A robust RabbitMQ client in Go

A few months ago I started working on a project that relies heavily on RabbitMQ as a message broker. Two clients communicate with our Go service, one via AMQP (RabbitMQ) and the other over HTTP. Because we depend so heavily on RabbitMQ, I had to write a robust client that shuts down gracefully, recovers from panics, is multithreaded, logs everything nicely, and more.

10th October 2020: Fixed reconnection logic in example

Modern cloud applications tend to be decoupled into smaller, independent services (microservices) that, compared to classic monolithic applications, are easier to develop, deploy, and maintain. Message queues provide communication and coordination for these distributed applications. They can significantly simplify the coding of decoupled applications while improving performance, reliability, and scalability. Message queues provide asynchronous communication: services produce and consume messages through the queue, not by calling each other directly. Producers can add new messages to the queue without waiting for them to be processed, and consumers process messages only when they are available. This optimizes the data flow by removing the need for services to wait for each other.
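The sketch below is an in-process analogy for that decoupling. It is not a broker: a Go buffered channel has no persistence or network transport. The producer enqueues every message without waiting for any of them to be processed. A separate consumer goroutine then drains the messages as they become available. The function name produceAndConsume is made up for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// produceAndConsume enqueues n messages into a buffered channel that stands in
// for a queue, then lets a separate consumer goroutine process them.
// It returns the messages in the order the consumer processed them.
func produceAndConsume(n int) []string {
	queue := make(chan string, n) // buffer large enough that the producer never blocks

	// Producer: adds every message without waiting for any to be processed.
	for i := 1; i <= n; i++ {
		queue <- fmt.Sprintf("msg-%d", i)
	}
	close(queue) // signals that no more messages will arrive

	var processed []string
	var wg sync.WaitGroup
	wg.Add(1)
	// Consumer: processes messages only as they become available.
	go func() {
		defer wg.Done()
		for m := range queue {
			processed = append(processed, m)
		}
	}()
	wg.Wait() // wg.Wait makes the consumer's writes visible here
	return processed
}

func main() {
	fmt.Println(produceAndConsume(3)) // [msg-1 msg-2 msg-3]
}
```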

RabbitMQ describes itself as the most widely deployed open-source message broker. It is lightweight and easy to deploy on-premises and in the cloud. It supports multiple messaging protocols: AMQP 0-9-1 natively, plus others such as STOMP and MQTT through plugins. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Go doesn't have an official RabbitMQ library. Instead, there's a very popular one maintained by Sean Treadway, located under streadway/amqp. (That was true at the time of writing. streadway/amqp has since been archived, and the RabbitMQ team now maintains a largely API-compatible fork, rabbitmq/amqp091-go.) The library provides some examples, and there are plenty of tutorials explaining how to use it for basic tasks. This post wraps it all up and explains how to build a robust RabbitMQ consumer/publisher in Go. An honorable mention goes to harrisonturton's Gist, where a big part of this code originates.

The code is available as a Gist HERE.

Custom errors needed for proper handling of failures:

var (
	ErrDisconnected             = errors.New("disconnected from rabbitmq, trying to reconnect")
	errInvalidQueueNameProvided = errors.New("invalid queue name provided")
)

The client holds all the methods needed for streaming, pushing, and connecting to RabbitMQ. It has the following fields:

// Client holds the necessary information for RabbitMQ.
type Client struct {
	pushQueue     string
	streamQueue   string
	logger        zerolog.Logger
	connection    *amqp.Connection
	channel       *amqp.Channel
	done          chan os.Signal
	notifyClose   chan *amqp.Error
	notifyConfirm chan amqp.Confirmation
	isConnected   bool
	alive         bool
	threads       int
	wg            *sync.WaitGroup
}
  • pushQueue is the name of the queue the client pushes to. Instead of storing it in the client, it could be passed as an argument to the Push method.
  • streamQueue is the name of the (listen) queue that Stream consumes from.
  • The logger is not mandatory and could be replaced with any other (structured) logger, such as Zap or Logrus.
  • connection and channel hold the AMQP connection and channel. More info is available in the linked GoDoc pages.
  • The done channel is triggered once the server receives the shutdown signal, which stops the client from trying to reconnect to the RabbitMQ server.
  • notifyClose tells the reconnect method that the connection was closed and the client needs to reconnect.
  • notifyConfirm receives the server's publisher confirms, acknowledging that the data was pushed to the server.
  • isConnected tracks whether a connection is currently open. alive is set to false by Close to stop the reconnect loop. threads is the number of consumer goroutines.
  • The WaitGroup is used to shut down gracefully once all messages are processed.

const (
	// When reconnecting to the server after connection failure
	reconnectDelay = 5 * time.Second
	// When resending messages the server didn't confirm (used by Push)
	resendDelay = 5 * time.Second
)

New is a constructor that takes the listen and push queue names, the server address, a logger, and a channel that notifies the RabbitMQ client when the server shuts down. It calculates the number of threads (the larger of GOMAXPROCS and the number of CPUs), creates the client, and starts the connection process in a background goroutine. The connect method dials the RabbitMQ server, opens a channel, and declares the push and listen queues if they don't exist.

func New(listenQueue, pushQueue, addr string, l zerolog.Logger, done chan os.Signal) *Client {
	threads := runtime.GOMAXPROCS(0)
	if numCPU := runtime.NumCPU(); numCPU > threads {
		threads = numCPU
	}
	client := Client{
		logger:      l,
		threads:     threads,
		pushQueue:   pushQueue,
		streamQueue: listenQueue,
		done:        done,
		alive:       true,
		wg:          &sync.WaitGroup{},
	}
	// One WaitGroup slot per consumer goroutine started by Stream.
	client.wg.Add(threads)
	// Connect, and keep reconnecting, in the background.
	go client.handleReconnect(listenQueue, addr)
	return &client
}
// handleReconnect will wait for a connection error on
// notifyClose, and then continuously attempt to reconnect.
func (c *Client) handleReconnect(listenQueue, addr string) {
	for c.alive {
		c.isConnected = false
		t := time.Now()
		fmt.Printf("Attempting to connect to rabbitMQ: %s\n", addr)
		var retryCount int
		for !c.connect(listenQueue, addr) {
			if !c.alive {
				return
			}
			select {
			case <-c.done:
				return
			// Linear backoff: wait one second longer after each failed attempt.
			case <-time.After(reconnectDelay + time.Duration(retryCount)*time.Second):
				c.logger.Printf("disconnected from rabbitMQ and failed to connect")
				retryCount++
			}
		}
		c.logger.Printf("Connected to rabbitMQ in: %vms", time.Since(t).Milliseconds())
		// Block until shutdown or until the channel reports that it was closed.
		select {
		case <-c.done:
			return
		case <-c.notifyClose:
		}
	}
}
// connect will make a single attempt to connect to
// RabbitMQ. It returns the success of the attempt.
func (c *Client) connect(listenQueue, addr string) bool {
	conn, err := amqp.Dial(addr)
	if err != nil {
		c.logger.Printf("failed to dial rabbitMQ server: %v", err)
		return false
	}
	ch, err := conn.Channel()
	if err != nil {
		c.logger.Printf("failed connecting to channel: %v", err)
		conn.Close() // don't leak the connection on a failed attempt
		return false
	}
	// Put the channel into confirm mode so the server acks each publish.
	if err := ch.Confirm(false); err != nil {
		c.logger.Printf("failed to put channel into confirm mode: %v", err)
		conn.Close()
		return false
	}
	_, err = ch.QueueDeclare(
		listenQueue,
		true,  // Durable
		false, // Delete when unused
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	)
	if err != nil {
		c.logger.Printf("failed to declare listen queue: %v", err)
		conn.Close()
		return false
	}
	_, err = ch.QueueDeclare(
		c.pushQueue,
		true,  // Durable
		false, // Delete when unused
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	)
	if err != nil {
		c.logger.Printf("failed to declare push queue: %v", err)
		conn.Close()
		return false
	}
	c.changeConnection(conn, ch)
	c.isConnected = true
	return true
}
// changeConnection takes a new connection to the queue,
// and updates the channel listeners to reflect this.
func (c *Client) changeConnection(connection *amqp.Connection, channel *amqp.Channel) {
	c.connection = connection
	c.channel = channel
	// Buffered so the library's notification goroutine does not block
	// when nobody is receiving at that exact moment.
	c.notifyClose = make(chan *amqp.Error, 1)
	c.notifyConfirm = make(chan amqp.Confirmation, 1)
	c.channel.NotifyClose(c.notifyClose)
	c.channel.NotifyPublish(c.notifyConfirm)
}
// Push will push data onto the queue and wait for a confirmation.
// If no confirm is received within resendDelay, it resends the
// message until a confirmation is received.
// This will block until the server sends a confirm.
func (c *Client) Push(data []byte) error {
	if !c.isConnected {
		return errors.New("failed to push: not connected")
	}
	for {
		err := c.UnsafePush(data)
		if err != nil {
			if errors.Is(err, ErrDisconnected) {
				// Give handleReconnect time to restore the connection
				// instead of retrying in a tight loop.
				time.Sleep(resendDelay)
				continue
			}
			return err
		}
		select {
		case confirm := <-c.notifyConfirm:
			if confirm.Ack {
				return nil
			}
		case <-time.After(resendDelay):
		}
	}
}
// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (c *Client) UnsafePush(data []byte) error {
	if !c.isConnected {
		return ErrDisconnected
	}
	return c.channel.Publish(
		"",          // Exchange (the default exchange routes by queue name)
		c.pushQueue, // Routing key
		false,       // Mandatory
		false,       // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        data,
		},
	)
}

The Push method is quite simple. You may need to push to multiple queues, which is often the case. If so, pass the queue name to Push as an argument instead of providing it in the constructor. Push blocks until the message is confirmed, which may not always be what you're looking for.

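Stream starts c.threads goroutines that consume messages from the server and process them in parseEvent. If the connection drops, it returns ErrDisconnected so the caller can start streaming again once the client has reconnected.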

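func (c *Client) Stream(cancelCtx context.Context) error {
	// Wait until handleReconnect has established a connection.
	for !c.isConnected {
		time.Sleep(1 * time.Second)
	}
	// Prefetch at most one unacknowledged message per consumer.
	err := c.channel.Qos(1, 0, false)
	if err != nil {
		return err
	}
	// Consumers report a dropped connection here; one slot per consumer,
	// so no send blocks and there is no data race on a shared bool.
	dropped := make(chan struct{}, c.threads)
	for i := 1; i <= c.threads; i++ {
		msgs, err := c.channel.Consume(
			c.streamQueue,
			consumerName(i), // Consumer
			false,           // Auto-Ack
			false,           // Exclusive
			false,           // No-local
			false,           // No-Wait
			nil,             // Args
		)
		if err != nil {
			return err
		}
		go func() {
			defer c.wg.Done()
			for {
				select {
				case <-cancelCtx.Done():
					return
				case msg, ok := <-msgs:
					if !ok {
						// The deliveries channel is closed when the channel or connection drops.
						dropped <- struct{}{}
						return
					}
					c.parseEvent(msg)
				}
			}
		}()
	}
	c.wg.Wait()
	if len(dropped) > 0 {
		// New arms the WaitGroup with c.threads slots; re-arm it for the next Stream call.
		c.wg.Add(c.threads)
		return ErrDisconnected
	}
	return nil
}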

The call to the Stream method looks something like this, where rmq is the *Client returned by New:

go func() {
	for {
		err := rmq.Stream(cancelCtx)
		if errors.Is(err, rabbitmq.ErrDisconnected) {
			// The connection dropped; Stream waits for the reconnect before consuming again.
			continue
		}
		break
	}
}()

A stripped-down version of the event received from the stream. It can include any data you need.

type event struct {
	Job  string `json:"job"`
	Data string `json:"data"`
}
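With the struct tags above, a body such as {"job":"job1","data":"hello"} decodes into event. Missing fields stay empty, which is why parseEvent checks evt.Data. The standalone check below mirrors those first validation steps. The decodeEvent name and the sample payloads are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type event struct {
	Job  string `json:"job"`
	Data string `json:"data"`
}

// decodeEvent unmarshals a message body and rejects events without data,
// mirroring the first checks in parseEvent.
func decodeEvent(body []byte) (event, error) {
	var evt event
	if err := json.Unmarshal(body, &evt); err != nil {
		return event{}, fmt.Errorf("unmarshalling body: %s - %w", body, err)
	}
	if evt.Data == "" {
		return event{}, fmt.Errorf("received event without data")
	}
	return evt, nil
}

func main() {
	evt, err := decodeEvent([]byte(`{"job":"job1","data":"hello"}`))
	fmt.Println(evt.Job, evt.Data, err) // job1 hello <nil>

	_, err = decodeEvent([]byte(`{"job":"job1"}`))
	fmt.Println(err) // received event without data
}
```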

parseEvent is responsible for the actual application logic. It acts on the job named in the event and calls other services/interfaces, passed in as dependencies, to do the actual work. Panic recovery is responsible for NACKing the message and logging the panic.

func (c *Client) parseEvent(msg amqp.Delivery) {
	l := c.logger.Log().Timestamp()
	startTime := time.Now()
	var evt event
	err := json.Unmarshal(msg.Body, &evt)
	if err != nil {
		logAndNack(msg, l, startTime, "unmarshalling body: %s - %s", string(msg.Body), err.Error())
		return
	}
	if evt.Data == "" {
		logAndNack(msg, l, startTime, "received event without data")
		return
	}
	// Recover from a panic in a job handler: log the stack and nack the
	// message so one bad message cannot crash the consumer goroutine.
	defer func() {
		if r := recover(); r != nil {
			stack := make([]byte, 8192)
			stack = stack[:runtime.Stack(stack, false)]
			l.Bytes("stack", stack).Str("level", "fatal").Interface("error", r).Msg("panic recovery for rabbitMQ message")
			msg.Nack(false, false)
		}
	}()
	switch evt.Job {
	case "job1":
		// Call the actual handler here, e.g. err = handleJob1(evt.Data) (hypothetical)
	case "job2":
		// err = handleJob2(evt.Data) (hypothetical)
	default:
		// Unknown job: reject without requeueing.
		msg.Reject(false)
		return
	}
	if err != nil {
		logAndNack(msg, l, startTime, "%s failed: %v", evt.Job, err)
		return
	}
	l.Str("level", "info").Int64("took-ms", time.Since(startTime).Milliseconds()).Msgf("%s succeeded", evt.Job)
	msg.Ack(false)
}
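The recover-in-defer pattern only catches the panic because the deferred function runs in the same goroutine as the panicking handler. The minimal sketch below isolates that pattern. safeRun is a hypothetical helper that turns a panic into an error, so a caller can nack the message instead of crashing the consumer. The job functions are made up.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// safeRun calls job and converts a panic into an error that includes the stack trace.
func safeRun(job func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			stack := make([]byte, 8192)
			stack = stack[:runtime.Stack(stack, false)]
			err = fmt.Errorf("panic: %v\n%s", r, stack) // overwrite the named result
		}
	}()
	return job()
}

func main() {
	fmt.Println(safeRun(func() error { return nil }))                      // <nil>
	fmt.Println(safeRun(func() error { return errors.New("job failed") })) // job failed
	err := safeRun(func() error {
		var m map[string]int
		m["x"] = 1 // panics: assignment to entry in nil map
		return nil
	})
	fmt.Println(err != nil) // true: the panic became an error the caller can nack on
}
```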

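logAndNack sends a negative acknowledgment to RabbitMQ and logs the error. Because requeue is false, RabbitMQ discards the message, or dead-letters it if the queue has a dead-letter exchange configured.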

func logAndNack(msg amqp.Delivery, l *zerolog.Event, t time.Time, format string, args ...interface{}) {
	// multiple=false, requeue=false: the message is not put back on the queue.
	msg.Nack(false, false)
	l.Int64("took-ms", time.Since(t).Milliseconds()).Str("level", "error").Msgf(format, args...)
}
// Close will cleanly shut down the channel and connection after there are no messages in the system.
// Cancel the context passed to Stream first: c.wg.Wait blocks until every consumer goroutine has exited.
func (c *Client) Close() error {
	if !c.isConnected {
		return nil
	}
	c.alive = false
	fmt.Println("Waiting for current messages to be processed...")
	c.wg.Wait()
	for i := 1; i <= c.threads; i++ {
		fmt.Println("Closing consumer: ", i)
		err := c.channel.Cancel(consumerName(i), false)
		if err != nil {
			return fmt.Errorf("error canceling consumer %s: %w", consumerName(i), err)
		}
	}
	err := c.channel.Close()
	if err != nil {
		return err
	}
	err = c.connection.Close()
	if err != nil {
		return err
	}
	c.isConnected = false
	fmt.Println("gracefully stopped rabbitMQ connection")
	return nil
}

func consumerName(i int) string {
	return fmt.Sprintf("go-consumer-%d", i)
}