A robust RabbitMQ client in Go

A few months ago I started working on a project that heavily relies on RabbitMQ as a message broker. We have two clients communicating with Go, one via AMQP (RabbitMQ) and the other through HTTP. As our dependency on RabbitMQ is big, I had to write a robust client that does graceful shutdowns, panic recoveries, is multithreaded, logs everything nicely, and more.

10th October 2020: Fixed reconnection logic in example

Modern cloud applications tend to be decoupled into smaller, independent services (microservice) that, compared to classic/monolith applications, are easier to develop, deploy, and maintains. Message queues provide communication and coordination for these distributed applications. They can significantly simplify the coding of decoupled applications while improving performance, reliability, and scalability. Message queues provide asynchronous communication, allowing different services to produce and consume messages with the queue, not each other. New messages can be added to the queue without waiting for them to be processed. And consumers only process messages when they are available. This optimizes the data flow by removing the need for services to wait for each other.

RabbitMQ is the most widely deployed open-source message broker. RabbitMQ is lightweight and easy to deploy on-premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Go doesn’t have an official rabbitmq library. Instead there’s a very popular one maintained by Sean Treadway, located under streadway/amqp. While it does provide some examples, and there are plenty of tutorials explaining how to use it for a basic task, this one wraps it all up and explains how to build a robust rabbitmq consumer/publisher with Go. Honorable mention goes to harrisonturton’s Gist, where a big part of this code originates from.

The code is available as a Gist HERE.

Custom errors needed for proper handling of failures:

var (
ErrDisconnected = errors.New("disconnected from rabbitmq, trying to reconnect")
errInvalidQueueNameProvided = errors.New("invalid queue name provided")
)

The client which holds all the methods needed for streaming/pushing and connecting to rabbitmq has the following fields.

// Client holds necessery information for rabbitMQ
type Client struct {
pushQueue string
logger zerolog.Logger
connection *amqp.Connection
channel *amqp.Channel
done chan os.Signal
notifyClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
isConnected bool
alive bool
threads int
wg *sync.WaitGroup
}
  • The pushQueue is the name of queue client will push to. Instead of storing it in the client, it could be passed as an argument to Push method.
  • Logger is not mandatory and could be replaced with any other (structured) logger like Zap/Logrus.
  • Connection and Channel are AMQP connection and channel. More info is available in the linked GoDoc pages.
  • The done channel is triggered once the server has received the shutdown signal, which will stop the client from trying to reconnect to RabbitMQ server.
  • Notify close tells the reconnect method that the connections closed and the client needs to reconnect
  • NotifyConfirm is used to acknowledge that the data was pushed to the server
  • The wg waitGroup is used to gracefully shutdown the server once all messages are processed.
const (
// When reconnecting to the server after connection failure
reconnectDelay = 5 * time.Second
)

The New function is a constructor that takes address, push and listen queue names, logger, and a channel that will notify rabbitmq client on server shutdown. We calculate the number of threads, create the client, and start the connection process. Connect method connects to the rabbitmq server and creates push/listen channels if they don’t exist.

func New(listenQueue, pushQueue, addr string, l zerolog.Logger, done chan os.Signal) *Client {
threads := runtime.GOMAXPROCS(0)
if numCPU := runtime.NumCPU(); numCPU > threads {
threads = numCPU
}

The push method is quite simple. If you need to push to multiple/different queues which may often be the case, instead of providing the queue name in constructor — provide it as an argument to Push. The Push method is blocking until it succeeds, which may not be always what you’re looking for.

Stream creates threads number of goroutines that listen to messages from server, and process in in parseEvent method.

func (c *Client) Stream(cancelCtx context.Context) error {
for {
if c.isConnected {
break
}
time.Sleep(1 * time.Second)
}

The called to stream method looks something like:

go func() {
for {
err = rmq.Stream(cancelCtx)
if errors.Is(err, rabbitmq.ErrDisconnected) {
continue
}
break
}
}()

A stripped-down version of event received from the stream. It can include any data you need.

type event struct{
Job string `json:"job"`
Data string `json:"data"`
}

ParseEvent is responsible for the actual application logic. It does things depending on the job received in the event, and calls other services/interfaces passed as arguments to Queue to do the actual job. Panic recovery is responsible for nACKing the message and logging the panic.

func (c *Client) parseEvent(msg amqp.Delivery) {
l := c.logger.Log().Timestamp()
startTime := time.Now()

Log and nack function is used to send negative acknowledgment to RabbitMQ and log the error

func logAndNack(msg amqp.Delivery, l *zerolog.Event, t time.Time, err string, args ...interface{}) {
msg.Nack(false, false)
l.Int64("took-ms", time.Since(t).Milliseconds()).Str("level", "error").Msg(fmt.Sprintf(err, args...))
}