Thoughts on message queue and work queue systems - overview & what's useful
I have just finished writing a message queue based system, using Redis as the message queue. The system has only been running in production for a couple of days, but I already have some thoughts on how I would like a message queue system to work, and on what seems to be available. There seem to be two names floating around to describe these kinds of queues: message queue as the more general term and work queue as the more specific one.
A work queue system helps with distributing and deferring work. For example, a user of a web site can ask for some processing to be done without having to wait for it while the web page slowly loads the result. Instead a quick confirmation is given, and later a message, usually an e-mail, is sent to the user. A message queue system can also help with scalability and reliability; how it does this is described further down in the text.
Here is a list of some interesting systems I have found that are freely available. I will divide them into three categories: messaging toolkits, message queue systems and work queue systems. This is just a rough categorization. With messaging toolkits you have complete freedom in how you want your system to work and behave; message queue systems have the fundamentals for building work queue systems (and oftentimes form the core of them); and finally work queue systems contain a lot of the functionality out of the box.
Messaging toolkits
zeromq - Toolkit targeted towards those who want to build their own high-speed systems.
Message queue systems
Lend themselves as a starting point for building work queue systems
Redis - Well documented and easy to get started with. Does not dabble in higher level constructs, so you may need to write your own code on that level, or use one of the frameworks that build on Redis. Redis seems to have started as an improvement on memcached, but has since taken on more functions that are also suitable for messaging systems. Much of the use of Redis on the web is still as a cache and session store, though. The building blocks of Redis - keys, lists, sets, sorted sets, transactions and timeouts - seem well thought out. Redis is what I currently use. Huge user base: a Google search for pages mentioning "Redis" yields 7.5 million results. However, a large part of that base is probably using the caching features more than the queueing.
RabbitMQ - Advanced system that follows the AMQP standard (which exists in different revisions; RabbitMQ is at 0-9-1, but there is also a 1.0 version of the standard). Used by many large operations. Not a caching system. Has an acknowledge function that means that a job can be requeued if it does not get completed in a configurable amount of time. Also has the concept of exchanges, which are a couple of pre-defined routing algorithms. There are other AMQP systems available such as ActiveMQ and HornetQ, and also commercial message queue systems that use the AMQP protocol. AMQP originated in the world of banking.
Work queue systems
Have features built in for managing jobs
sidekiq is a work queue system written in Ruby similar to Resque and in many ways API compatible with it. Uses threads. Can run on the JVM with JRuby, might even be the recommended way of running it.
Celery - Ambitious high level messaging system in Python that can run on top of RabbitMQ, Redis, beanstalkd or pretty much anything. Celery is extensively documented, but it can still be a bit hard to find one's way through the documentation to what it really is and how it works. It's here: User Guide — Celery 3.0.11 documentation. The slideshow Advanced task management with Celery helps a lot after having read through the docs. Here's a little thing I've been writing after initial tests: How to create a sequential workflow with error handling in Celery.
Besides running on the standard CPython (both 2 and 3), Celery can also run on the JVM with Jython, and with Pypy.
kombu - factored out of Celery - has a very nice API. Runs on top of RabbitMQ, Redis, beanstalkd or pretty much anything. Kombu is used in OpenStack - an infrastructure as a service project (basically automatically deploying virtual servers).
Gearman has been around for a while, originally written in Perl, but there is now also a C version. It was mentioned in a Reddit discussion pertaining to this post. Documentation is a bit sketchy and I cannot say much about it, but seems to have a user community and many client implementations. The main documentation does not give a good overview of Gearman as far as I can see, but these do:
Popularity of the different systems
I did an unscientific ranking by checking how many days back I had to go to get the 50 latest questions about each system on stackoverflow.com, and then calculating questions per week for each system. The results as of 2012-10-28 were, with the most popular first:
- Redis 35
- RabbitMQ 13
- Celery 9.5
- Resque 5.8
- ZeroMQ 5.4
- Gearman 2.9
- Kombu 1.6
- sidekiq 1.3
- Beanstalkd 1.1
I would say though that at least half of the Redis questions on stackoverflow are asked by people who are using Redis as a cache or session store rather than as a basis for a message queue or work queue.
I am surprised that beanstalkd ranked so low and Celery so high. It may well be that some of these systems have their communities do Q/A somewhere else than at stackoverflow or that some just do not generate that many questions, but still it is a rough estimate.
I checked the tag "beanstalk" too for beanstalkd, but that one was 98% about Amazon beanstalk, which is something else.
For other AMQP systems than RabbitMQ: ActiveMQ would have ranked similar to RabbitMQ, HornetQ somewhere near Kombu.
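The questions-per-week figure in the ranking above is simple arithmetic, sketched here. The function name is my own, and the day count in the example is made up for illustration; it is not one of the measurements behind the list.

```python
def questions_per_week(num_questions, days_spanned):
    """Average questions per week, given how many days back the questions reach."""
    if days_spanned <= 0:
        raise ValueError("days_spanned must be positive")
    return num_questions / days_spanned * 7


# Hypothetical input: the 50 latest questions reach 10 days back.
print(questions_per_week(50, 10))
```

A shorter span for the same 50 questions means a busier tag, which is all the ranking tries to capture.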
Features and concepts
In this text:
- task means the overarching thing you are trying to achieve with your application, e.g. do some searches, analyze the results and then email a report to the user
- subtask is a part of the task, e.g. e-mailing out a report
- worker is a program that does the subtask, e.g. a worker that is an emailer
- job is an instance of a subtask, e.g. an emailer worker emailing out the report to a specific user
- Control is some kind of central control function, e.g. a supervisor that checks for bad jobs
Scalability and reliability
Distributing and deferring work can help with scalability and reliability. With a work queue, several worker processes can feed from the same queue, so you get an automatically scalable system where each worker just needs to connect, snatch something from the queue and go to work. Reliability can be improved by revoking jobs from malfunctioning workers and rescheduling them to other workers. Just the fact that you have queues means that it is not a big deal if most of your complex web based system temporarily goes down; as long as the web interface and the process that puts things into the queues are up and running, the rest of the system can display a bit of volatility without jeopardizing the entire application.
Message queue systems also seem to be used as caches, similar to memcached. This is the function I am least interested in currently. In caching, the message queue system supplies a number of constructs (queues, obviously, among them) that can help serve out fast-changing information quicker than would be possible with slower back end services. It is basically a cache with a bit of intelligence, one that can sort, slice and dice. For this application speed is of the essence, with sub-millisecond replies being the order of the day.
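To make the "several workers feed from the same queue" idea concrete, here is a minimal sketch that uses only the Python standard library. Threads and an in-process queue stand in for separate worker processes and a real message queue; run_workers and the squaring "work" are made up for illustration.

```python
import queue
import threading


def run_workers(jobs, num_workers=3):
    """Let several worker threads pull jobs from one shared queue."""
    work_queue = queue.Queue()
    for job in jobs:
        work_queue.put(job)

    results = []
    results_lock = threading.Lock()

    def worker(name):
        while True:
            try:
                job = work_queue.get_nowait()
            except queue.Empty:
                return  # nothing left to do, so this worker exits
            outcome = (name, job, job * job)  # stand-in for real work
            with results_lock:
                results.append(outcome)

    threads = [
        threading.Thread(target=worker, args=(f"worker-{i}",))
        for i in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


for name, job, outcome in run_workers(range(5)):
    print(name, job, outcome)
```

Adding capacity is just a matter of raising num_workers; nothing has to decide up front which worker gets which job.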
If you divide your application into separate workers, they need a way to communicate. Often JSON is used for this in message queues, and since there are JSON parsers for all major languages you can mix and match workers written in completely different languages.
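As a small illustration of JSON as the common message format, here is a sketch of encoding and decoding a job. The field names "subtask" and "payload" are my own invention, not a convention of any of the systems discussed.

```python
import json


def encode_job(subtask, payload):
    """Serialize a job to a JSON string that can be put on a queue."""
    return json.dumps({"subtask": subtask, "payload": payload}, sort_keys=True)


def decode_job(message):
    """Parse a JSON job message back into (subtask, payload)."""
    job = json.loads(message)
    return job["subtask"], job["payload"]


message = encode_job("email_report", {"user": "alice@example.com", "report_id": 42})
print(message)
print(decode_job(message))
```

A worker in another language only needs a JSON parser and the same two field names to take part.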
Resilience and inspectability
Having the process divided into several steps becomes a bit like having break points in your code. You can check the state of the data at the end and beginning of each step; they provide a snapshot of the state of your application at that point. You can correct data or code to get jobs unstuck in the processing chain, which gives the system a higher service level, and can help when demands are that every task should get through.
Revoking and rescheduling
Sometimes a job does not finish, or it does not finish the way you like. It would be good if the message queue system could help in handling this. In Redis I simply rescheduled the job by putting it back last in the processing queue. However the processing in my system is fairly deterministic and the job is most likely not going to fare better on a second run. Hence jobs that fail are now taken out of the queue and go to human inspection.
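The "failed jobs go to human inspection" approach can be sketched as follows. An in-memory deque stands in for the Redis list, so none of this is Redis API; process_queue and the example handler are hypothetical names.

```python
from collections import deque


def process_queue(pending, handler):
    """Run every pending job once; failures are set aside, not requeued."""
    done = []
    failed = []  # jobs waiting for human inspection
    while pending:
        job = pending.popleft()
        try:
            done.append(handler(job))
        except Exception as exc:
            # Deterministic processing: a second run would most likely
            # fail the same way, so keep the job and the reason instead.
            failed.append((job, repr(exc)))
    return done, failed


def halve(job):
    if job % 2:
        raise ValueError(f"cannot halve odd number {job}")
    return job // 2


done, failed = process_queue(deque([2, 3, 8]), halve)
print(done)
print(failed)
```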
I figured out there could be different causes for a job not finishing and how it should be handled. It is assumed that a worker at least can catch its own exceptions. Here is what I came up with:
- Bad job: Data makes worker throw an exception, detectable from worker, can reach control
- Bad storage: Server makes worker throw an exception, detectable from worker, can reach control
- Bad network: Server makes worker throw an exception, detectable from worker, cannot reach control
- Power failure or other catastrophic failure: Server makes worker lock or crash, undetectable from worker
zeromq has some reasoning along the same lines. Since they have more experience than me I'll quote their take on it:
"So let's look at the possible causes of failure in a distributed ØMQ application, in roughly descending order of probability:
- Application code is the worst offender. It can crash and exit, freeze and stop responding to input, run too slowly for its input, exhaust all memory, etc.
- System code - like brokers we write using ØMQ - can die for the same reasons as application code. System code should be more reliable than application code but it can still crash and burn, and especially run out of memory if it tries to queue messages for slow clients.
- Message queues can overflow, typically in system code that has learned to deal brutally with slow clients. When a queue overflows, it starts to discard messages. So we get "lost" messages.
- Networks can fail (e.g. wifi gets switched off or goes out of range). ØMQ will automatically reconnect in such cases but in the meantime, messages may get lost.
- Hardware can fail and take with it all the processes running on that box.
- Networks can fail in exotic ways, e.g. some ports on a switch may die and those parts of the network become inaccessible.
- Entire data centers can be struck by lightning, earthquakes, fire, or more mundane power or cooling failures."
Some ideas from me on remedies for the items on my list above:
Bad job: The job should be taken out of the job queue and a bad job mail be sent to a human
Bad storage: The job should be resubmitted to another worker and the malfunctioning worker should be taken out of commission, i.e. terminate itself.
Bad network: The job should time out and be resubmitted and the worker should be taken out of commission, i.e. terminate itself.
Power failure: The job should time out and be resubmitted, and the worker should be taken out of commission from the point of control, since the malfunctioning worker can't do it itself. If the worker comes back to life and sends in a job that has already been processed by another worker, that result is ignored.
- The control server must make reasonable assumptions about the maximum time a job could take
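The failure classes and remedies above can be written down as a small lookup table, which is roughly what a control process would consult. This is only a sketch of my own list; the names Failure, REMEDIES and accept_result are invented for illustration.

```python
from enum import Enum


class Failure(Enum):
    BAD_JOB = "bad job"
    BAD_STORAGE = "bad storage"
    BAD_NETWORK = "bad network"
    POWER_FAILURE = "power failure"


# failure -> (what happens to the job, what happens to the worker, how control finds out)
REMEDIES = {
    Failure.BAD_JOB: ("bury and mail a human", "keeps running", "worker report"),
    Failure.BAD_STORAGE: ("resubmit to another worker", "terminates itself", "worker report"),
    Failure.BAD_NETWORK: ("time out and resubmit", "terminates itself", "control timeout"),
    Failure.POWER_FAILURE: ("time out and resubmit", "decommissioned by control", "control timeout"),
}


def accept_result(job_id, completed_ids):
    """Ignore a late result for a job that another worker already finished."""
    if job_id in completed_ids:
        return False
    completed_ids.add(job_id)
    return True


completed = set()
print(accept_result("job-7", completed))  # first result is accepted
print(accept_result("job-7", completed))  # late duplicate is ignored
```

The last function covers the case where a worker that was presumed dead comes back and hands in work that has already been redone elsewhere.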
Work queue strategies
The three systems I have found that seem to be able to help out-of-the-box with these kinds of things are Resque from GitHub, beanstalkd and Celery. After reading through the AMQP 0-9-1 standard as explained on the RabbitMQ site, it seems RabbitMQ should also be able to contribute out-of-the-box on this level.
zeromq and Redis on the other hand are more like toolkits, especially zeromq.
Resque puts in an abstraction layer with a parent worker process and a child worker process, where the parent worker process starts the child process for the actual work and watches it and changes its own state depending on whether the child process concludes or something else. Some quotes from their pages:
"Resque assumes your background workers will lock up, run too long, or have unwanted memory growth."
"If you want to kill a stale or stuck child, use USR1. Processing will continue as normal unless the child was not found. In that case Resque assumes the parent process is in a bad state and shuts down."
"For example, this [buried state] is useful in preventing the server from re-entering a timed-out task into the queue when large and unpredicatble run-times are involved. If a client reserves a job, then first buries it, then does its business logic work, then gets stuck in deadlock, the job will remain buried indefinitely, available for inspection by a human — it will not get rerun by another worker."
sidekiq has an interesting take on the same theme. Instead of burying a job for human inspection it retries with a timeout that gets longer and longer so that you should have time to fix the problem:
"Sidekiq will retry processing failures with an exponential backoff using the formula retry_count**4 + 15 (i.e. 15, 16, 31, 96, 271, ... seconds). It will perform 25 retries over approximately 20 days. Assuming you deploy a bug fix within that time, the message will get retried and successfully processed."
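The formula in the quote is easy to check. The sketch below just evaluates it in Python, assuming retry_count starts at 0, which matches the quoted series; it is not Sidekiq code, and retry_delays is a name I made up.

```python
def retry_delays(max_retries=25):
    """Delay in seconds before each retry, per the quoted formula."""
    return [retry_count ** 4 + 15 for retry_count in range(max_retries)]


delays = retry_delays()
total_days = sum(delays) / 86400  # 86400 seconds in a day

print(delays[:5])
print(round(total_days, 1))
```

The first five delays come out as 15, 16, 31, 96 and 271 seconds, and the 25 delays add up to a bit over 20 days, in line with the quote.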
beanstalkd also sports configurable time-outs that can give control a signal that a job is probably hung or unreachable. RabbitMQ uses acknowledgements (acks) to track finished jobs. Redis has time-outs on its key data type, but no built-in detection or event handler for timeouts, so you would have to construct something like that on a higher level than in Redis itself. A pop/push timeout in Redis would have been helpful methinks.
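Here is a sketch of the kind of thing one would have to build on top when the queue itself does not reschedule timed-out jobs: a worker reserves a job with a deadline, and a control function puts expired jobs back. It is an in-memory toy with invented names (ReservingQueue, requeue_expired), not Redis or beanstalkd API, and jobs are assumed to be hashable ids.

```python
import time
from collections import deque


class ReservingQueue:
    """Toy queue where reserved jobs return to the queue if not acked in time."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock    # injectable so time can be faked in tests
        self._ready = deque()
        self._reserved = {}    # job id -> deadline

    def put(self, job):
        self._ready.append(job)

    def reserve(self, timeout):
        """Hand out the next job; it must be acked within timeout seconds."""
        if not self._ready:
            return None
        job = self._ready.popleft()
        self._reserved[job] = self._clock() + timeout
        return job

    def ack(self, job):
        """Mark a job as finished. Returns False if the reservation is gone."""
        return self._reserved.pop(job, None) is not None

    def requeue_expired(self):
        """Run by control: put jobs whose deadline has passed back in the queue."""
        now = self._clock()
        expired = [job for job, deadline in self._reserved.items() if deadline <= now]
        for job in expired:
            del self._reserved[job]
            self._ready.append(job)
        return expired
```

Control would call requeue_expired periodically. A worker whose ack returns False knows that its job was handed to someone else in the meantime.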
Celery has advanced concepts in this department. Some concepts from Celery that seem to be of interest: linked workers, revoke, inspect, chains, groups, chords, scheduling. Chains, groups and chords are a way of stringing together subtasks into units, that can fan out into parallel processing and for example do map-reduce.
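To show what chains, groups and chords mean conceptually, here is a plain-Python sketch. It deliberately does not use Celery's API; the function names run_chain, run_group and run_chord are my own, and a thread pool stands in for the workers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_chain(value, *steps):
    """Chain: feed the result of each step into the next one."""
    return reduce(lambda acc, step: step(acc), steps, value)


def run_group(func, items):
    """Group: run the same subtask on many inputs in parallel."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(func, items))  # results keep the input order


def run_chord(func, items, callback):
    """Chord: a group whose collected results are passed to one callback."""
    return callback(run_group(func, items))


# Map-reduce style: count the words of each document in parallel, then sum.
documents = ["a b c", "d e", "f"]
print(run_chord(lambda doc: len(doc.split()), documents, sum))
```

In Celery the steps would be tasks executed by workers via the broker rather than local function calls, but the shape of the workflow is the same.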
Some concepts from kombu that seem to be of interest: message priority, retry_policy, recover, heartbeat_check.
There seem to be some systems that are pull-based when it comes to queues, such as Redis, while RabbitMQ seems to support both pull-based and push-based interactions. With pull-based systems you need less intelligence centrally, but the downside is then that you do have less intelligence centrally, of course. I guess you will need a bit of both: Workers know better how to distribute the load between them, but they cannot manage states where they do not function anymore. Then control needs to do that and be able to bury jobs.
In my current application, there is a different job queue for each kind of subtask, and it is each worker's responsibility to move the results of a job on to the next queue so that the next subtask can be executed.
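A sketch of that layout: one queue per kind of subtask, with each worker pushing its result onto the next queue. In-memory deques stand in for the Redis lists, and the stage names and handlers are invented for illustration.

```python
from collections import deque

PIPELINE = ["search", "analyze", "email"]  # hypothetical subtask names


def make_queues():
    """One job queue per kind of subtask."""
    return {stage: deque() for stage in PIPELINE}


def work_once(queues, stage, handler):
    """Take one job from a stage's queue and move the result onwards."""
    if not queues[stage]:
        return None
    job = queues[stage].popleft()
    result = handler(job)
    position = PIPELINE.index(stage)
    if position + 1 < len(PIPELINE):
        # It is the worker's responsibility to feed the next subtask.
        queues[PIPELINE[position + 1]].append(result)
    return result


queues = make_queues()
queues["search"].append("work queues")
work_once(queues, "search", lambda query: {"query": query, "hits": 3})
work_once(queues, "analyze", lambda found: f"{found['hits']} hits for {found['query']}")
print(queues["email"])
```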
I discovered that you can specify in Celery a mapping between task signatures and queue names, see: Routing Tasks. Still, it seems as if the responsibility for moving stuff onwards does not fall onto the worker, although I will have to look into that.
Update: I have looked into how Celery can be used for handling work queues in a way that you can move a task from worker to worker and branch out on error conditions, see my blog post: How to create a sequential workflow with error handling in Celery.
Parts and names
From Zeromq again with my boldface:
"sinks (process the messages without any response), proxies (send the messages on to other nodes), or services (send back replies)"
Furthermore you want to avoid having the workers produce side effects that cannot be revoked, or at least gather such side effects in one place. It is a good idea, I believe, if you are going to send out an email, to make that worker as simple and reliable as possible, and not trigger it unless everything else has lined up right. And do make sure it does not get stuck in a loop. In fact it should probably have a memory of what it has sent out before and refuse to send again if it detects the same content and recipient. Unless you are running a huge system or a spam operation or something else nefarious you don't need to have many e-mailing workers and can get by with just a singleton, which means you do not need to worry about spawning e-mail clients all over the place (and other places).
If a subtask doesn't have any side effects, it means that it can be run several times without any ill consequences. It is then said to be idempotent, a word that pops up here and there in the documentation for the different systems. Idempotency does allow side effects too, as long as they only happen on the first run. So a singleton e-mailer that refuses to send an e-mail it has already sent would also be called idempotent.
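A minimal sketch of such an e-mailer with a memory. The class name and the send callback are hypothetical, and the memory is just an in-process set; a real worker would need to persist it so that it survives a restart.

```python
import hashlib


class DedupEmailer:
    """Sends each (recipient, content) pair at most once."""

    def __init__(self, send):
        self._send = send   # callable that does the actual sending
        self._sent = set()  # fingerprints of what has already gone out

    def send_once(self, recipient, content):
        """Return True if the mail was sent, False if it was a repeat."""
        fingerprint = hashlib.sha256(
            f"{recipient}\n{content}".encode("utf-8")
        ).hexdigest()
        if fingerprint in self._sent:
            return False
        self._send(recipient, content)
        self._sent.add(fingerprint)  # only remember mails that really went out
        return True


outbox = []
emailer = DedupEmailer(lambda recipient, content: outbox.append((recipient, content)))
print(emailer.send_once("alice@example.com", "Your report is ready"))
print(emailer.send_once("alice@example.com", "Your report is ready"))
print(len(outbox))
```

Running the job a second time then has no further effect, which is what makes the subtask idempotent in the sense described above.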
Fallbacks & graceful degradation
One thing I think would be useful in a work queue system is fallbacks. If the biggest source for interruption of a job is a conflict between code and data (a nice way of saying that the code is buggy and/or bad data has been allowed in), then re-running the job will give the same result. Another way of handling that is to re-run the job with different code, that may not give as good a result but is more robust.
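The fallback idea could look something like this sketch, where a job is re-run with simpler, more robust code if the primary code chokes on the data. The function names and the word-count example are made up for illustration.

```python
def run_with_fallback(job, primary, fallback):
    """Try the primary code; on failure re-run the job with the fallback."""
    try:
        return primary(job), "primary"
    except Exception:
        # Re-running the same code on the same data would fail again,
        # so degrade gracefully to code that is cruder but sturdier.
        return fallback(job), "fallback"


def word_count_strict(text):
    if not text.isascii():
        raise ValueError("unexpected characters in input")
    return len(text.split())


def word_count_rough(text):
    return text.count(" ") + 1


print(run_with_fallback("plain old text", word_count_strict, word_count_rough))
print(run_with_fallback("smörgåsbord for all", word_count_strict, word_count_rough))
```

Returning which code path produced the result lets later steps, or a human, know that the answer is of the degraded kind.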
Could be time to start defining what would be interesting to have in a work queue system for me, right now. One should note that many things may best be left to the coder. There is no use in having a framework whose concepts are just far enough from what you actually want to make the code a bit cumbersome.
I'll start with Redis, which I have used and I will also fill in some preliminary info on Celery, but I need to run more tests on that one. Please note that the rating will change as I find out more. Do also note that since for example Celery can use Redis as a back end, by definition Redis can "do" everything Celery can, as long as e.g. Celery is doing the work.
I've also started putting in info on RabbitMQ and its implementation of the AMQP, which seems to have enough knobs and levers to express many of the concepts listed below. It can however be a blurry line between what is built-in and is merely a configuration issue, and what starts looking like bona fide programming.
A solid authentication layer
Redis claims to not be overly secure if exposed directly to the Internet. One can put an SSL tunnel in front with certificates, which I have done. That may be a better solution than what could have been built in.
Celery has built in support for signing and certificates right into the messages themselves, see Security — Celery 3.0.11 documentation, however the Celery documentation says that Celery should be treated as an "unsafe component", so SSL tunneling might be a good idea anyway.
A work queue that workers can take jobs from
Celery has this.
RabbitMQ has this. It can make it look like workers are pulling jobs from a queue, by using the configuration prefetch_count=1.
Possibility to bury jobs
Redis does not have this, you will have to build that on top
Celery has something called a persistent revoke, which if you specify a storage file, seems to do the trick, see: Workers Guide — Revoking tasks
Beanstalkd has this
Sidekiq retries the job with an exponentially rising delay, so that a fix can be applied, instead of burying it
According to the RabbitMQ docs here, there is something called a dead letter extension, which, as I read it, could be helpful in implementing this.
Time-outs after which jobs are rescheduled, revoked or buried
Redis does not have this, you will have to build that on top. There is a time-out option for keys though.
Sidekiq retries the job with an exponentially rising delay, so that a fix can be applied
Celery has this, with the acks_late option, see: Workers Guide.
Monitoring of processes
Redis does not have this, you will have to build that on top. There are third party modules, although it is unlikely that they work out-of-the-box with how you have designed your system
Group operations into atomic transactions
Redis has this
RabbitMQ has this
Redis has this
Celery has this as long as the back end is configured to persist
RabbitMQ has this.
Flexible reporting and web interface, pubsub logging
- There is a PHP web interface, among others. Pubsub exists and there is e.g. a Python logger that publishes to Redis.
Celery has a very ambitious support for this, in the shape of flower and django plugins, see: Monitoring and Management Guide — Celery 3.0.11 documentation
There is even a limited curses interface for Celery.
Sequential workflow with error handling
Redis does not have this, you will have to build that on top
Celery has this, see How to create a sequential workflow with error handling in Celery
sidekiq can send exceptions to an exception notification service
Redis does not have this concept, you will have to build that on top
Celery does not have this concept, you will have to build that on top
Language agnostic, easy to mix and use different languages
Redis has a defined wire protocol and support in a plethora of languages, and as long as you use a data format available on all platforms such as JSON, interoperability should not be a problem
Celery has a defined protocol as far as I can see, but it is not implemented in a plethora of clients. It uses Python's pickle format for serializing data by default, but it can be switched to e.g. JSON or YAML. Celery can together with Django be used with a simple HTTP protocol called Webhook, a protocol that according to Google searches seems to have been causing some enthusiasm back in 2009, and which today forms a part of GitHub's API. Celery can also operate with HTTP POSTs and GETs, see: HTTP Callback Tasks (Webhooks), celery/examples/celery_http_gateway.
Resque seems to run its child workers as system processes, and that could open up a possibility for it to run code in other languages, but that does not seem to be how it is used. I guess one could write a worker that used STDOUT and STDIN to communicate with the child process.
RabbitMQ works with AMQP and has standardization and interoperability built right in.
Redis - Well documented in a concise way. Redis itself is pretty concise, which helps. For every command there is information on how well that command scales, in "Big O" notation. There are also pages covering other aspects of Redis.
Celery is very well documented, from high level all the way down to the internals and message format.
beanstalkd - An FAQ that is one page on a five page wiki.
Zeromq - Lots of documentation and examples, given in parallel in several computer languages