Tuning Redismq - How to Use Redis in Go

In my last article I talked about how redismq uses a simple Redis client to model a message queue. The first iteration was a proof of concept, written mostly to satisfy my own curiosity. Since then adeven decided to use it in production, so we developed the initial idea into a full-featured queue.

On the way we picked up a lot of interesting stuff which I’d like to share with you.

Performance

While not exactly slow, the first version of redismq was only capable of pushing and consuming around 5k messages per second. Something had to be done…

Pipelining

One thing that stood out was that the payload size didn’t seem to have any impact on performance: a 1MB payload was almost as fast as a 1KB payload. This led us to the assumption that the round trip of sending the Redis command and waiting for its answer from the server was the step that took most of the time.

Upon investigating the Redis client library, we found the option to pipeline commands, so that multiple commands are transmitted to the Redis server in a single call.

So one of the first improvements was to pipeline all the statistics counter commands.

Before:

 func (self *Consumer) AckPackage(p *Package) error {
  answer := self.GetQueue().redisClient.RPop(self.WorkingName())
  self.GetQueue().redisClient.Incr(self.AckCounterName())
  return answer.Err()
 }

After:

 func (self *Consumer) AckPackage(p *Package) error {
  _, err := self.GetQueue().redisClient.Pipelined(func(c *redis.PipelineClient) {
    c.RPop(self.WorkingName())
    c.Incr(self.AckCounterName())
  })
  return err
 }

This change immediately doubled write performance, confirming the assumption that most of the time was spent waiting for the Redis server to answer.

So after this little adjustment our benchmark for 1 writer and 2 consumers shows:

InputRate:  11949
WorkRate:   8045

Rates are measured in messages per second.

Improving MultiGet

After pipelining the easy commands we refactored the MultiGet() command.

Instead of issuing multiple single Get() calls, we now pipeline multiple reads together and increment the counter only once.

Before:

func (self *Consumer) unsafeGet() (*Package, error) {
  var answer *redis.StringReq

  self.GetQueue().redisClient.Pipelined(func(c *redis.PipelineClient) {
      answer = c.BRPopLPush(self.GetQueue().InputName(), self.WorkingName(), 0)
      c.Incr(self.WorkingCounterName())
  })

  return self.parseRedisAnswer(answer)
}

func (self *Consumer) MultiGet(length int) ([]*Package, error) {
  var collection []*Package
  if self.HasUnacked() {
      return nil, fmt.Errorf("unacked Packages found!")
  }
  for i := 0; i < length; i++ {
      p, err := self.unsafeGet()
      if err != nil {
          return nil, err
      }
      p.Collection = &collection
      collection = append(collection, p)
  }
  return collection, nil
}

After:

func (self *Consumer) MultiGet(length int) ([]*Package, error) {
  var collection []*Package
  if self.HasUnacked() {
      return nil, fmt.Errorf("unacked Packages found!")
  }

  reqs, err := self.GetQueue().redisClient.Pipelined(func(c *redis.PipelineClient) {
      c.BRPopLPush(self.GetQueue().InputName(), self.WorkingName(), 0)
      for i := 1; i < length; i++ {
          c.RPopLPush(self.GetQueue().InputName(), self.WorkingName())
      }
  })
  if err != nil {
      return nil, err
  }

  for _, answer := range reqs {
      switch answer := answer.(type) {
      default:
          return nil, fmt.Errorf("unexpected answer type: %T", answer)
      case *redis.StringReq:
          if answer.Val() == "" {
              continue
          }
          p, err := self.parseRedisAnswer(answer)
          if err != nil {
              return nil, err
          }
          p.Collection = &collection
          collection = append(collection, p)
      }
  }
  self.GetQueue().redisClient.IncrBy(self.WorkingCounterName(), int64(length))

  return collection, nil
}

As you can see we also fixed a bug where MultiGet() would block until all of the requested messages had been fetched, so the N-1 already fetched messages would never be processed if the queue ran dry. Now it only blocks for the first package and then tries to fetch as many more as requested without blocking.

Using this, 2 writers and 2 consumers clock in at:

InputRate:  18688
WorkRate:   16000

Buffered Queues

Since waiting for Redis commands to execute takes most of the time, it seemed logical to pipeline not only the consuming side of the queue but also the write process.

This, however, has far more complex implications, as it requires a local buffer in the client. Since persistence is important to us, this buffer and its flushing to Redis need to be controllable by the program using redismq.

To implement BufferedQueue we used a nice design pattern that Go enables: while Go does not strictly have inheritance, it does have embedding.

So we can embed Queue into BufferedQueue:

type BufferedQueue struct {
  *Queue
  BufferSize  int
  Buffer      chan *Package
  ...
}

This way all the methods that Queue has are available on BufferedQueue as well. Another cool feature we discovered while writing this is that you can selectively override single methods of the “parent”.

In order to change Put() we simply override it:

func (self *BufferedQueue) Put(payload string) error {
  p := &Package{CreatedAt: time.Now(), Payload: payload, Queue: self}
  self.Buffer <- p
  return nil
}

To flush the buffer periodically, or to write to Redis when the buffer is full, we use a go func() that constantly checks those conditions:

func (queue *BufferedQueue) startWritingBufferToRedis() {
  go func() {
      queue.nextWrite = time.Now().Unix()
      for {
          if len(queue.Buffer) == cap(queue.Buffer) || time.Now().Unix() >= queue.nextWrite {
              size := len(queue.Buffer)
              queue.redisClient.Pipelined(func(c *redis.PipelineClient) {
                  for i := 0; i < size; i++ {
                      p := <-queue.Buffer
                      c.LPush(queue.InputName(), p.GetString())
                  }
                  c.IncrBy(queue.InputCounterName(), int64(size))
              })
              for i := 0; i < len(queue.flushStatus); i++ {
                  c := <-queue.flushStatus
                  c <- true
              }
              queue.nextWrite = time.Now().Unix() + 1
          }
          if len(queue.Buffer) == 0 {
              time.Sleep(10 * time.Millisecond)
          }
      }
  }()
}

This approach to flushing the buffer requires that only one of those background jobs is running. So, just like for Consumer, we use a heartbeat to check whether a running queue with the same name already exists.

func (self *BufferedQueue) startHeartbeat() {
  go func() {
      for {
          self.redisClient.SetEx(self.heartbeatName(), 1, "ping")
          time.Sleep(500 * time.Millisecond)
      }
  }()
}

Last but not least, we need to be able to trigger the flushing of the buffer programmatically. For example, when shutting down a server, to be sure no messages are lost in the buffer, you would first shut down the HTTP handlers and then flush the queue.

func (self *BufferedQueue) FlushBuffer() {
  flushing := make(chan bool, 1)
  self.flushStatus <- flushing
  <-flushing
  return
}

You already saw the other half of this functionality in startWritingBufferToRedis(). The pattern of using channels of channels to process answers from long-running requests is taken from “Effective Go”.

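For example, a shutdown sequence could look roughly like the following sketch. Only FlushBuffer() is part of redismq here; the signal handling and the stopAcceptingRequests() helper are hypothetical placeholders for however your application stops its HTTP handlers (imports of os, os/signal and syscall omitted, as in the other snippets).

func gracefulShutdown(queue *redismq.BufferedQueue) {
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig

    stopAcceptingRequests() // hypothetical: stop the HTTP handlers that produce messages
    queue.FlushBuffer()     // wait until the local buffer has been flushed to Redis
}
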
Putting all of this together, the constructor for the BufferedQueue looks like this:

type BufferedQueue struct {
  *Queue
  BufferSize  int
  Buffer      chan *Package
  nextWrite   int64
  flushStatus chan (chan bool)
}

func NewBufferedQueue(redisURL, redisPassword string, redisDB int64, name string, bufferSize int) (q *BufferedQueue) {
  q = &BufferedQueue{
      Queue:       &Queue{Name: name},
      BufferSize:  bufferSize,
      Buffer:      make(chan *Package, bufferSize),
      flushStatus: make(chan chan bool, 1),
  }
  q.redisClient = redis.NewTCPClient(redisURL, redisPassword, redisDB)
  return q
}

func (queue *BufferedQueue) heartbeatName() string {
  return queue.InputName() + "::buffered::heartbeat"
}

func (queue *BufferedQueue) Start() error {
  queue.redisClient.SAdd(masterQueueKey(), queue.Name)
  val := queue.redisClient.Get(queue.heartbeatName()).Val()
  if val == "ping" {
      return fmt.Errorf("buffered queue with this name is already started")
  }
  queue.startHeartbeat()
  queue.startWritingBufferToRedis()
  return nil
}

The usage of BufferedQueue is a lot less complex than the implementation.

...
bufferSize := 100
testQueue := redismq.NewBufferedQueue("localhost:6379", "password", 9, "clicks", bufferSize)
err := testQueue.Start()
if err != nil {
  panic(err)
}
...

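Once started, writing to the queue goes through the overridden Put() shown above. A minimal sketch, with a made-up payload:

...
if err := testQueue.Put(`{"user": 42, "action": "click"}`); err != nil {
    panic(err)
}
...
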
But how fast do all these shenanigans make our beloved redismq? In a word: fast…

Or in raw numbers (1 buffered writer, 4 multi-consumers):

InputRate:  39600
WorkRate:   18600

So we now have a persistent and atomic queue that can move close to 60k messages per second.

Monitoring

Besides making it faster we also needed the capability to monitor the size of the queues and the rates at which new messages are added and existing ones are consumed.

A simple web API that returns JSON would be enough to hook it up to monitoring tools like Zabbix.

Therefore we implemented a simple web server that uses a handler to display the data collected by our observer model.

Setting it up is straightforward:

func main() {
  server := redismq.NewServer("localhost:6379", "password", 9, "9999")
  server.Start()
  ...
}

Now you can check http://localhost:9999/stats to see your queue information as JSON in real time.

{
   "Stats":{
      "example":{
         "InputRate":27900,
         "WorkRate":13200,
         "AckRate":13138,
         "FailRate":0,
         "InputSize":479717,
         "UnAckSize":55,
         "FailSize":0,
         "ConsumerStats":{
            "testconsumer1":{
               "WorkRate":3300,
               "AckRate":3299,
               "UnAckSize":15
            },
            "testconsumer2":{
               "WorkRate":3300,
               "AckRate":3265,
               "UnAckSize":0
            },
            "testconsumer3":{
               "WorkRate":3300,
               "AckRate":3278,
               "UnAckSize":0
            },
            "testconsumer4":{
               "WorkRate":3300,
               "AckRate":3296,
               "UnAckSize":40
            }
         }
      }
   }
}

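To hook this up to a monitoring tool you only need to poll that endpoint and pick out the values you care about. As a rough sketch, the following hypothetical helper (not part of redismq; imports of net/http and encoding/json omitted) reads the InputSize of a single queue from the stats JSON:

// hypothetical monitoring helper, not part of redismq
func fetchInputSize(queueName string) (int64, error) {
    resp, err := http.Get("http://localhost:9999/stats")
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()

    var data struct {
        Stats map[string]struct {
            InputSize int64
        }
    }
    if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
        return 0, err
    }
    return data.Stats[queueName].InputSize, nil
}
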
With all those additions we’ll start testing redismq in our production environment very soon. Once we have collected some real-world data I may write another follow-up.

Till then, have fun.
