Redis is not just a very good in-memory database, it can also double as a message queue. I want to show you our first shot at implementing a queue in Redis that can fulfill all our requirements: reliable, persistent, fast, monitorable. While still being a Gedankenexperiment the basic idea should be a very good starting point.
Although we used Go for our implementation, the concept could be transferred to any language you like.
The main reason we used Go is that this approach to build a message queue uses a native Redis client as basis and our backend is written in Go.
Criteria for a feasible message queue system
In order to be used in production at adeven we need the following features:
- persistent (none to little data loss in case of a crash)
- reliable, atomic (each message can only be fetched once)
- monitorable (message rates, queue length and worker statuses need to be monitored through an API)
- multi-message ack and reject (acknowledge multiple messages at once while rejecting them individually)
We do not need any complex broker system. Simple queues with multi publishers and multi consumers are the only thing we currently use. So if you want to have a fancy broker or exchange system this is probably not what you’re looking for.
Our starting point was the “Reliable queue” pattern described here: http://redis.io/commands/rpoplpush
The basis of our message queue system is that Redis has atomic list operations as well as blocking (waiting) versions of those. Using those commands it is possible to write a system where all messages are atomically moved between Redis lists. This way a crashing consumer cannot lose any messages if you use a “processing list” or “working queue”.
The other great feature of Redis is, that you can exactly tune the level of persistence vs. performance you need and easily adopt it to your specific needs.
So we already got the persistence and the reliability covered. Performance is usually one of the big advantages when using redis. As we’ll see later this also applies to this case.
While the underlying idea is simply using
RPOP with a bunch of cleverly named lists, we added some magic with statistics counters and unique consumers.
Each queue is a
LIST in Redis with multiple “publishers”
LPUSHing into it. It also comes with a
failed list to collect all rejected messages for later use.
Consumers each have a “working queue” where the currently unacked messages are stored. In order to make a consumer tag unique we write a “Hearbeat-Key” and check if it’s already set. See consumer.go for the implementation. This also enables us to monitor which consumers have subscribed to a queue even from another program.
Message rates are measured by
INCRementing counters for each stream of messages (
FAILED). An observer then resets them each second to get the throughput.
We used a scheme where
redismq::queue_name is the input list and every related list and key has the prefix
redismq::queue_name:: to group related keys together.
All names of the lists can be found here:
First off we give each message a package with headers that can be expanded for stuff like rejection handling.
1 2 3 4
So the basic PUSH command is simply:
1 2 3 4 5 6
Consumer is a bit more complex since it includes message rate, checking for unacked messages of the same consumer tag and so on.
But the basic commands are easy to understand:
1 2 3 4 5
It’s just called unsafe because the check for unacked messages has to be done separately. So once we
GET a message we can either
REJECT (with or without re-queuing) it.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
Everything else is just to facilitate
MULTI-ACK and monitoring. As I already said: you can easily rewrite this into any language you want as long as you have a Redis client at hand.
As this is only a preliminary piece of work some commands may be implemented in a slower than optimal way. The general functionality is fully tested.
First off, this software creates quite a bunch of keys in your redis (5 per queue plus 5 per consumer). So make sure to pick an otherwise empty db. You can set this up in the config.yml as we use goenv to configure everything.
A basic example is included with the code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
As you can see the usage is very straight forward. Just import the package, create a queue and you can start pushing into it. Reading is equally simple and does work quite well even for huge payloads greater than 10 MB.
To give you a rough idea what this baby can do i conducted some simple benchmarks. Please keep in mind that these are highly dependent on the setting of your Redis and what kind of machine you run this on.
So on my 2.4GHz i7 and SSD for Redis i got following results.
2 writers and 4 consumers with standard Redis persistence:
That means we can publish and read around 8k messages per second simultaneously.
I am pretty sure though that there is a lot of space for optimizations as we, so far, only focused on implementing the basic functionality.
In order to make this a viable option for our own production system we need to access the monitoring information via a JSON API. This will enable our Zabbix to keep an eye on the message queue system and warn us if anything looks fishy.
Therefore we’ll write a standalone monitoring server that enables access to the queue observers. In a next step a pretty little webview with control functions and stats overview could follow.
I hope you enjoyed our little idea and if you interested to contribute, just send us a pull request.
I have written a new article that explains how we implemented some of the next steps and also made it 6 times faster.