Switching queues

Adapters in JavaScript

In my recent project I work with a set of processes which communicate with each other using a message queue, not unlike the idea I described in one of my previous posts.

The prototype was built using bull, a small message queue built on top of Redis.

It was perfect for the prototype, but I ran into stability problems when trying to scale the system to a much higher throughput (around 200 msg/s).

I admit that I was probably doing something wrong, and it wasn’t necessarily a problem with bull or Redis. Did I mention I had limited resources to run this thing on?

The circumstances forced me to change the underlying queue. Instead of using an in-memory database, I decided to switch to a “real” message queue. RabbitMQ is one of the best in its class and I was hopeful that it would fix my problem. At least in theory, it should sustain a much higher throughput than what I was planning to generate.

The only challenge was to adapt my code to use the new solution.

My code for interacting with bull was laughably small. The only thing it did was to abstract getting the Redis connection data from the config file. The whole module, in its entirety, looked like this:

const queue = require('bull');
const config = require('../config');

module.exports = name => {
  return queue(name, config.redis.port, config.redis.host);
};

The fact that it’s so tiny proves that bull is a well-abstracted library. I could achieve everything I needed with the interface it provides. Almost the perfect fit.

The module above was used in the following fashion.

const getQueue = require('getQueue');
const queue = getQueue('queue-name');

queue.add({data: {/* something to do later */}});

queue.process(job => {
    // process the job in an asynchronous manner, returns promise
    return asyncHandle(job.data);
});

One interesting point is that bull expects you to return a Promise from the job processing function. If the promise resolves, the job will be marked as processed. In case of rejection, it will be returned to the queue and processed again later. This gives reassurance that no work will be lost.
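To make that contract concrete, here is a minimal in-memory sketch of it. It is not how bull works internally (bull keeps its state in Redis); `createMemoryQueue` and the `maxAttempts` limit are hypothetical names and assumptions of mine, there only to show "resolve means done, reject means try again".

```javascript
// Hypothetical in-memory queue that models the resolve/reject contract.
// It is an illustration only and does not use bull or Redis.
function createMemoryQueue(maxAttempts = 3) {
  const pending = [];
  const completed = [];
  const failed = [];

  return {
    completed,
    failed,

    add(data) {
      pending.push({ data, attempts: 0 });
      return Promise.resolve();
    },

    // Drains the queue, calling `handler` once per attempt of each job.
    async process(handler) {
      while (pending.length > 0) {
        const job = pending.shift();
        job.attempts += 1;
        try {
          await handler(job);        // promise resolved: the job is done
          completed.push(job.data);
        } catch (err) {
          if (job.attempts < maxAttempts) {
            pending.push(job);       // promise rejected: back to the queue
          } else {
            failed.push(job.data);   // assumed limit reached: give up
          }
        }
      }
    }
  };
}

// Usage: a handler that fails on the first attempt and succeeds on the second.
const demoQueue = createMemoryQueue();
demoQueue.add({ id: 1 });
demoQueue
  .process(job => (job.attempts < 2 ? Promise.reject(new Error('flaky')) : Promise.resolve()))
  .then(() => console.log(demoQueue.completed));
```

The handler never tells the queue what happened explicitly. The state of the returned promise is the whole protocol, which is what makes this interface easy to reproduce on top of another queue.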

To interact with RabbitMQ I decided to use jackrabbit.

The queue interface, although also very simple, is a bit different from what bull has to offer. My plan was to write a small adapter which would let me keep all of my existing code unchanged.

The result surprised me. Only 18 lines of code. It’s quite elegant.

const jackrabbit = require('jackrabbit');
const config = require('../config');
const rabbit = jackrabbit(config.rabbitmq.url);

module.exports = name => {
  const exchange = rabbit.default();

  return {
    add: message => {
      exchange.publish(message, {key: name, persistent: true});
      return Promise.resolve();
    },

    process: callback => {
      const queue = exchange.queue({name});
      queue.consume((data, ack, nack) => {
        callback({data}).then(ack).catch(nack);
      });
    }
  };
};

It preserves the most important property of the old solution: if the promise returned by the worker resolves, the message is acknowledged and marked as processed. If it rejects, the message is nacked instead.
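A quick way to check that property without a running RabbitMQ is to pass the exchange in as an argument and swap in a fake. The sketch below does that. `createAdapter` and `createFakeExchange` are hypothetical names, and the fake mimics only the three calls the adapter above makes (`publish`, `queue`, `consume`), not jackrabbit as a whole.

```javascript
// Same adapter shape as above, but the exchange is injected so it can be faked.
function createAdapter(exchange, name) {
  return {
    add: message => {
      exchange.publish(message, { key: name, persistent: true });
      return Promise.resolve();
    },

    process: callback => {
      const queue = exchange.queue({ name });
      queue.consume((data, ack, nack) => {
        callback({ data }).then(ack).catch(nack);
      });
    }
  };
}

// Hypothetical stand-in for the exchange; it records 'ack' or 'nack' per message.
function createFakeExchange() {
  const log = [];
  const backlog = [];   // messages published before a consumer is attached
  let consumer = null;

  const deliver = message => {
    consumer(message, () => log.push('ack'), () => log.push('nack'));
  };

  return {
    log,
    publish(message) {
      if (consumer) deliver(message);
      else backlog.push(message);
    },
    queue() {
      return {
        consume(handler) {
          consumer = handler;
          backlog.splice(0).forEach(deliver);
        }
      };
    }
  };
}

// Usage: one job succeeds, one fails.
const fakeExchange = createFakeExchange();
const jobs = createAdapter(fakeExchange, 'queue-name');
jobs.add({ ok: true });
jobs.add({ ok: false });
jobs.process(job => (job.data.ok ? Promise.resolve() : Promise.reject(new Error('boom'))));
setTimeout(() => console.log(fakeExchange.log), 0);
```

The worker code stays exactly as it was with bull: it receives an object with a `data` property and returns a promise.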

I was able to write so little code because both libraries abstract away most of the details of the underlying queues. Also, JavaScript shines here, showing how flexible it can be.