“The ocean is the original internet; it connects all things.” — Renzo Piano

Delegating Work Using NodeJS and AMQP

Dave Sag · Published in ITNEXT · 3 min read · Jan 28, 2019

This past weekend I popped over to Goa to chill, take my mind off work, and get in some good food, fresh air, and general beach-side relaxo. I’d not been here a day when I started thinking about message queues and simple task delegation, so I decided to write a small library, which I have called amqp-delegate, that simplifies the creation and use of AMQP remote workers to the point of triviality.

npm install amqp-delegate

An example:

worker.js

const { makeWorker } = require('amqp-delegate')

// waits for 10ms then adds two numbers.
const task = (a, b) =>
  new Promise(resolve =>
    setTimeout(() =>
      resolve(a + b), 10))

const worker = makeWorker({
  name: 'adder',
  task
})

worker.start()

delegate.js

const { makeDelegator } = require('amqp-delegate')

const delegator = makeDelegator()

delegator
  .start()
  .then(() => delegator.invoke('adder', 10, 15))
  .then(result => {
    console.log('result', result)
  })
  .catch(err => {
    console.error('caught', err)
  })

You can run as many of the workers as you like, and, because they are all asynchronous, they can do things like scrape web pages, or interact with databases, or whatever you like. The first worker with the matching name to pick up the job will do it.

How does it work?

It’s just a simple implementation of the standard remote procedure call pattern, but with the details of the message-queue interactions wrapped up in a simple high-level interface so you don’t have to see or understand them.

Workers

When you first make a worker you give it a name and a task to perform. The name can be any string, and the task can be any pure asynchronous function that accepts simple parameters and returns a simple result. By simple I mean values that can be marshalled into a JSON string and converted to a buffer, buffers being AMQP’s preferred message content format.
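To make “simple” concrete, here is a minimal sketch of that round trip using only Node’s built-in JSON and Buffer. The helper names encode and decode are mine, purely for illustration; they are not part of amqp-delegate’s API, and the library’s actual wire format may differ.

```javascript
// Hypothetical helpers, not part of amqp-delegate's API.
// Marshal an array of parameters into a JSON string, then into a buffer.
const encode = params => Buffer.from(JSON.stringify(params))

// Reverse the process: buffer to JSON string to array of parameters.
const decode = content => JSON.parse(content.toString())

const content = encode([10, 15])
const params = decode(content)
console.log(Buffer.isBuffer(content), params)

// Values that JSON cannot represent do not survive the trip:
// a function stored on an object is silently dropped.
const lossy = decode(encode([{ fn: () => 1, n: 2 }]))
console.log(lossy)
```

Anything that comes out of decode looking different from what went into encode (functions, class instances, undefined) is not a simple parameter in this sense.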

When you start a worker it connects to your AMQP server, then starts listening to a queue with the same name as the worker. When it hears a message on that queue it grabs it, racing any other workers with the same name, decodes the message content into an array of parameters, and passes those parameters to the worker’s task.

When the task is complete it marshals the result into JSON, turns it into a buffer, and sends it back to the caller using the incoming request’s replyTo queue, supplying the incoming request’s correlationId. This way the delegator knows that this message is the reply to its request and not some other random message on the same queue.

* makeWorker({ url, name, task, onError, onClose }) // => worker
* async worker.start()
* async worker.stop()
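
The worker’s side of the exchange can be sketched without a real AMQP connection. This is not the library’s source, just my reading of the steps described above: handleMessage and send are hypothetical names, and the message object only mimics the general shape of an AMQP message (a content buffer plus replyTo and correlationId properties).

```javascript
// A sketch of the worker's message handling, with no real AMQP connection.
// `send` is a hypothetical stand-in for publishing to a queue.
const handleMessage = async (task, message, send) => {
  const { replyTo, correlationId } = message.properties
  // Decode the content buffer into an array of parameters.
  const params = JSON.parse(message.content.toString())
  // Run the task with those parameters and wait for it to finish.
  const result = await task(...params)
  // Marshal the result and reply on the caller's replyTo queue,
  // echoing the correlationId so the caller can match it up.
  send(replyTo, Buffer.from(JSON.stringify(result)), { correlationId })
}

// Usage with the adder task and an in-memory `send` that records replies.
const task = (a, b) =>
  new Promise(resolve => setTimeout(() => resolve(a + b), 10))
const sent = []
const send = (queue, content, options) => sent.push({ queue, content, options })

const done = handleMessage(
  task,
  {
    content: Buffer.from(JSON.stringify([10, 15])),
    properties: { replyTo: 'reply-queue', correlationId: 'abc-123' }
  },
  send
).then(() => {
  console.log(sent[0].queue, sent[0].content.toString(), sent[0].options)
})
```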

Delegators

When you start a delegator you don’t need to tell it much of anything. It just connects to the AMQP server and waits for you to invoke remote workers by name. When you call delegator.invoke you give it the name of the worker you want to invoke, and the params you want the worker to use when performing its task. Then you just await the result.

The delegator marshals the parameters into JSON and then into a buffer, creates a unique correlationId, sends the request to the queue named after the worker, and starts listening on its replyTo queue for the message with the right correlationId.

* makeDelegator({ url, onError, onClose }) // => delegator
* async delegator.start()
* async delegator.invoke(name, ...params) // => result
* async delegator.stop()
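
The correlationId bookkeeping is the heart of the delegator, so here is a rough in-memory sketch of it. It assumes nothing about how amqp-delegate does this internally: makeFakeDelegator, publish, onReply and the 'reply-queue' name are all hypothetical, and the “worker” is just a function called in place of a real queue.

```javascript
const { randomUUID } = require('crypto')

// A sketch of correlationId matching, with no real AMQP connection.
// `publish` is a hypothetical stand-in for sending a message to a queue.
const makeFakeDelegator = publish => {
  const pending = new Map() // correlationId => resolve function

  // Called for every message that arrives on the reply queue.
  const onReply = message => {
    const { correlationId } = message.properties
    const resolve = pending.get(correlationId)
    if (!resolve) return // not a reply to one of our requests, so ignore it
    pending.delete(correlationId)
    resolve(JSON.parse(message.content.toString()))
  }

  const invoke = (name, ...params) =>
    new Promise(resolve => {
      const correlationId = randomUUID()
      pending.set(correlationId, resolve)
      publish(name, Buffer.from(JSON.stringify(params)), {
        correlationId,
        replyTo: 'reply-queue'
      })
    })

  return { invoke, onReply }
}

// Usage: an in-memory "worker" that adds the parameters and replies.
const delegator = makeFakeDelegator((name, content, properties) => {
  const [a, b] = JSON.parse(content.toString())
  // A stray message with an unknown correlationId is ignored.
  delegator.onReply({
    content: Buffer.from(JSON.stringify('noise')),
    properties: { correlationId: 'unknown' }
  })
  // The genuine reply echoes the request's correlationId.
  delegator.onReply({ content: Buffer.from(JSON.stringify(a + b)), properties })
})

const reply = delegator.invoke('adder', 10, 15)
reply.then(result => console.log('result', result))
```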

Error handling

Both a worker and a delegator can be supplied onClose and onError hooks that your code can use to handle error conditions gracefully. Other than that you can use the standard try / catch wrappers around your awaits, or, if you are using promises, use the standard catch handler.
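
For the await style, a sketch might look like the following. It assumes a failed invocation surfaces as a rejected promise, which the catch handler in the example at the top suggests but which I have not verified for every failure mode. The addRemotely and stub helpers are hypothetical, and the stand-in delegators exist only so the sketch runs without an AMQP server.

```javascript
// A sketch of await-style error handling. `delegator` is any object with
// the start, invoke and stop methods listed above.
const addRemotely = async (delegator, a, b) => {
  await delegator.start()
  try {
    return await delegator.invoke('adder', a, b)
  } catch (err) {
    console.error('caught', err.message)
    return null // fall back to a default rather than crashing
  } finally {
    await delegator.stop() // always close the connection
  }
}

// Hypothetical stand-in delegators, so the sketch runs without a server.
const stub = invoke => ({ start: async () => {}, stop: async () => {}, invoke })
const working = stub(async (name, a, b) => a + b)
const failing = stub(async () => {
  throw new Error('worker failed')
})

const outcomes = Promise.all([
  addRemotely(working, 10, 15),
  addRemotely(failing, 10, 15)
])
outcomes.then(results => console.log(results))
```

With a real delegator you would pass in the object returned by makeDelegator instead of a stub.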
