This past weekend I popped over to Goa to chill, take my mind off work, and get in some good food, fresh air, and general beach-side relaxo. I’d not been here a day when I started thinking about message-queues and simple task delegation, so I decided to write a small library, which I have called amqp-delegate, that simplifies, to the point of triviality, the creation and use of AMQP remote workers.
npm install amqp-delegate
An example:
worker.js
const { makeWorker } = require('amqp-delegate')

// waits for 10ms then adds two numbers.
const task = (a, b) =>
  new Promise(resolve =>
    setTimeout(() => resolve(a + b), 10))

const worker = makeWorker({
  name: 'adder',
  task
})

worker.start()
delegate.js
const { makeDelegator } = require('amqp-delegate')

const delegator = makeDelegator()

delegator
  .start()
  .then(() => delegator.invoke('adder', 10, 15))
  .then(result => {
    console.log('result', result)
  })
  .catch(err => {
    console.error('caught', err)
  })
You can run as many of the workers as you like, and, because they are all asynchronous, they can do things like scrape web pages, or interact with databases, or whatever you like. The first worker with the matching name to pick up the job will do it.
How does it work?
It’s just a simple implementation of the standard remote procedure call pattern, but with the details of the message-queue interactions wrapped up in a simple high-level interface so you don’t have to see or understand them.
Workers
When you first make a worker you give it a name and a task to perform. The name can be any string, and the task can be any pure asynchronous function that accepts simple parameters and returns a simple result. By simple I mean parameters that can be marshalled into a JSON string and converted to a buffer, buffers being AMQP’s preferred message content format.
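To make “simple” concrete, here is a minimal sketch of that marshalling step using only Node’s built-in JSON and Buffer. The helper names encode and decode are my own for illustration and are not necessarily what the library uses internally.

```javascript
// Hypothetical helpers, named for illustration only.

// Marshal an array of task parameters into JSON, then into a buffer.
const encode = params => Buffer.from(JSON.stringify(params))

// Turn a buffer back into the original array of parameters.
const decode = buffer => JSON.parse(buffer.toString())

const content = encode([10, 15])
const [a, b] = decode(content)

console.log(a + b) // 25
```

Anything that does not survive a trip through JSON.stringify, such as functions or class instances with methods, would not count as simple in this sense.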
When you start a worker it connects to your AMQP server, then starts listening to a queue with the same name as the worker. When it hears a message on that queue it grabs it, racing any other workers with the same name, decodes the message content into an array of parameters, and passes those parameters to the worker’s task.
When the task is complete it marshals the result into JSON, turns it into a buffer, and sends it back to the caller using the incoming request’s replyTo queue, and supplying the incoming request’s correlationId. This way the delegator knows that this message is the reply to its request and not some other random message on the same queue.
* makeWorker({ url, name, task, onError, onClose }) // => worker
* async worker.start()
* async worker.stop()
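The worker’s message handling described above can be sketched roughly as follows. This is not the library’s actual source. It assumes a channel object shaped like an amqplib channel (with sendToQueue and ack methods) and a message carrying content, properties.replyTo, and properties.correlationId. The name makeHandler is hypothetical.

```javascript
// A sketch of a worker's consumer callback, assuming an amqplib-style channel.
// makeHandler is a hypothetical name, not part of amqp-delegate's public API.
const makeHandler = (channel, task) => async message => {
  // Decode the message content into an array of parameters.
  const params = JSON.parse(message.content.toString())

  // Run the worker's task with those parameters.
  const result = await task(...params)

  // Reply on the request's replyTo queue, echoing its correlationId
  // so the caller can match this reply to its request.
  channel.sendToQueue(
    message.properties.replyTo,
    Buffer.from(JSON.stringify(result)),
    { correlationId: message.properties.correlationId }
  )

  // Acknowledge the request so the broker does not redeliver it.
  channel.ack(message)
}
```

Passing the channel in as a parameter keeps the handler easy to exercise with a stub, without a running AMQP server.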
Delegators
When you start a delegator you don’t need to tell it much of anything. It just connects to the AMQP server and waits for you to invoke remote workers by name. When you call delegator.invoke you give it the name of the worker you want to invoke, and the params you want the worker to use when performing its task. Then you just await the result.
The delegator marshals the parameters into JSON and then into a buffer, creates a unique correlationId, publishes the request to the queue named after the worker, and starts listening on the request’s replyTo queue for the message with the right correlationId.
* makeDelegator({ url, onError, onClose }) // => delegator
* async delegator.start()
* async delegator.invoke(name, ...params) // => result
* async delegator.stop()
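The correlationId matching on the delegator side can be sketched like this. Again, this is an illustration under assumptions rather than the library’s real code: the channel is assumed to be amqplib-shaped, makeInvoker and onReply are hypothetical names, and crypto.randomUUID is just one way to generate a unique id.

```javascript
const { randomUUID } = require('crypto')

// A sketch of request/reply matching, assuming an amqplib-style channel.
// makeInvoker and onReply are hypothetical names used for illustration.
const makeInvoker = (channel, replyTo) => {
  const pending = new Map() // correlationId -> resolve function

  // Call this for every message that arrives on the reply queue.
  const onReply = message => {
    const { correlationId } = message.properties
    const resolve = pending.get(correlationId)
    if (!resolve) return // not a reply to one of our requests, so ignore it
    pending.delete(correlationId)
    resolve(JSON.parse(message.content.toString()))
  }

  // Send the params to the named worker's queue and wait for the reply.
  const invoke = (name, ...params) =>
    new Promise(resolve => {
      const correlationId = randomUUID()
      pending.set(correlationId, resolve)
      channel.sendToQueue(name, Buffer.from(JSON.stringify(params)), {
        correlationId,
        replyTo
      })
    })

  return { invoke, onReply }
}
```

A fuller version would also want a timeout, so that a request nobody answers does not wait forever.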
Error handling
Both a worker and a delegator can be supplied onClose and onError hooks that your code can use to handle error conditions gracefully. Other than that you can use the standard try / catch wrappers around your await calls, or, if you are using promises, use the standard catch handler.
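As a rough sketch of the try / catch approach, the function below wraps a single invocation. It uses only the start, invoke, and stop methods listed above. The function name invokeAdder is hypothetical, and returning undefined on failure is just one possible choice.

```javascript
// A sketch of try / catch around an invocation.
// invokeAdder is a hypothetical helper; `delegator` is anything with the
// async start, invoke, and stop methods listed above.
const invokeAdder = async (delegator, a, b) => {
  await delegator.start()
  try {
    return await delegator.invoke('adder', a, b)
  } catch (err) {
    console.error('caught', err)
    return undefined
  } finally {
    // Always stop the delegator, whether or not the invocation worked.
    await delegator.stop()
  }
}
```

You would call it with a real delegator, for example invokeAdder(makeDelegator(), 10, 15), assuming an AMQP server is reachable and an adder worker is running.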
Links
- See the repo at github.com/davesag/amqp-delegate.
- The official RabbitMQ RPC tutorial.