Messaging with
RabbitMQ in Golang
I’m Andi Pangeran
Currently : GOPAY - Backend Developer
Jurnal.id - Senior Software Engineer
Prev :
J-travel - Tech Lead
@apangeran
Community :
@a_pangeran
https://
cybercoding.wordpress.com
Overview
Communication basic, Synchronous vs.
Asynchronous
Delivery semantic principal
RabbitMQ in a nutshell
Golang meet RabbitMQ
Production Checklist
watch your communication...
Synchronous
Sender must know the address of the receiver
Both parties need to be ready
Complete when the result has been delivered to the
Sender Receiver
sender (from receiver), assuming there is a return
value.
(Command) Message contract determined by receiver
Easier to debug (Trace)
Example : webservices (SOAP / Rest), RPC (GRPC)
Sync Overview
3.
Payment
External H2H
Service
2.
1.
4.
Order Invoice
6. Service Service
5.
Mail
Service
Asynchronous
Location Transparency
Sender Broker Receiver Sender can send whether the recipient ready or not
Does not wait around for a reply
Support message as Command & Event
Asynchronous complexity. Durable / Non Durable,
Pull vs Push, Message Sequence
Harder to Debug (Trace)
Example : RabbitMQ, Kafka, etc
Async Overview
Payment
External H2H
Service
Order Invoice
BROKER
Service Service
Mail
Service
delivery guarantees...
Two General Problem
General General
A B
Castle
Two armies, each led by a different general, are preparing to attack a fortified city. The armies are encamped near the city, each in its
own valley. A third valley separates the two hills, and the only way for the two generals to communicate is by sending messengers
through the valley. Unfortunately, the valley is occupied by the city's defenders and there's a chance that any given messenger sent
through the valley will be captured.
It is required that the two generals have their armies attack the city at the same time in order to succeed, else the lone
attacker army will die trying.
At Most Once
General General
A B
Castle
Request sent once
If it lost or the recipient fails to process it, there is no attempt to recover it (retried)
At Least Once
send and wait
confirmation from
blue, need resend okay get the message,
attack plan if no will confirm to Yellow
answer in xx time
General General
A B
Castle
Sender send message, retain message, resend it if doesn’t receive confirmation
Recipient must acknowledge receipt or execution by sending confirmation message
Resend can be the result of lost confirmation, the recipient may receive the message more than once
Exactly Once
send and wait
confirmation from
blue, need resend okay get the message,
attack plan if no will confirm to Yellow
answer in xx time
General General
A B
Castle
if can, process the message Idempotently
Implement deduplicator for keep track of which request already been processed
RabbitMQ in Nutshell...
AMQP Model
Broker
Routes
Sender Exchanger Queue Receiver
When your application publishes a message, it publishes to an exchange.
An exchange routes a message to a queue. there are Direct, Fanout, Topic
Queues wait for a consumer to be available, then deliver the message..
Direct Exchange
Broker
Queue
A queue binds to the exchange with a routing key K
When a new message with routing key R arrives at the direct exchange,
Exchanger
the exchange routes it to the queue if K = R
Queue
route message in a round robin manner.
Queue
Fanout Exchange
Broker
Queue
routes messages to all of the queues that are bound to it and the routing
key is ignored.
Exchanger when a new message is published to that exchange a copy of the message
Queue is delivered to all N queues
Queue
Topic Exchange
Broker
Queue
A queue binds to the exchange with a routing key K
When a new message with routing key R arrives at the direct exchange,
Exchanger
the exchange routes it to the queue if K = R
Queue
when a new message is published to that exchange a copy of the message
is delivered to all N queues
Queue
Acknowledgements
When a consumer (subscription) is registered, messages will be delivered
(pushed) by RabbitMQ using the basic.deliver method.
Queue Receiver
Routes RabbitMQ consider a message to be successfully
Delivery Auto-Ack,
delivered immediately after it is sent out (written to a TCP socket)
Delivery Manual, client need send acknowledgements ACK | NACK
( Requeue )
GO GET github.com/streadway/amqp
Publisher
func (client *amqpPublisher) Publish(payload interface{}, cid string) error {
if client.publishChannel == nil {
return fmt.Errorf("connection currently not available.")
}
exchangeName := ""
routingKey := client.publishQueue
data, err := json.Marshal(payload)
if err != nil {
return err
}
publishing := amqp.Publishing{
ContentType: messaging.MESSAGING_CONTENT_TYPE,
ContentEncoding: "UTF-8",
CorrelationId: cid,
DeliveryMode: amqp.Persistent,
Body: data,
}
return client.publishChannel.Publish(exchangeName, routingKey, false, false, publishing)
}
Subscriber
deadLetterQueue := c.QueueName + "_DEAD_LETTER"
if _, err := channel.QueueDeclare(deadLetterQueue, true, false, false, false, nil); err != nil {
return err
}
table := make(amqp.Table)
table["x-dead-letter-exchange"] = ""
table["x-dead-letter-routing-key"] = deadLetterQueue
if _, err := channel.QueueDeclare(c.QueueName, true, false, false, false, table); err != nil {
return err
}
if (c.Key != "") && (c.Exchange != "") {
if err := channel.QueueBind(c.QueueName, c.Key, c.Exchange, false, nil); err != nil {
return err
}
}
deliveryChan, err := channel.Consume(c.QueueName, c.consumerId, c.AutoAck, false, false, false, nil)
if err != nil {
return err
}
go c.runEventProcessor(deliveryChan)
check github.com/meong1234/go-rabbit
production checklist...
Graceful Shutdown
Wait OS interrupt signal and trigger stop
for {
select {
On Close, stop process new message wait for all worker done. ex: case d := <-deliveryChan:
if len(d.Body) == 0 {
using stop channel and waitgroup d.Nack(false, false)
return
}
func AppRunner(daemon util.Daemon) error { c.Logger.Debugf("recieved message %s\n", d.CorrelationId)
err := daemon.Start()
func (c *amqpSubscriber) Close() error { workerNumber := pool.Get()
if err != nil {
c.Logger.Debugf("close triggered") event := amqpEvent{d}
return err c.processingWG.Add(1)
close(c.stopChan)
}
go func(worker int, event *amqpEvent, log util.Logger) {
// Waiting for any tasks being processed to finish logger := log.WithField("workerNumber", worker).
osSignals := make(chan os.Signal) WithField("correlationId", event.GetCorrelationID())
c.processingWG.Wait()
signal.Notify(osSignals, os.Interrupt)
ctx := util.NewSessionCtx(event.GetCorrelationID(), logger)
if err :=
select { c.channel.Cancel(c.consumerId, false); err != nil { logger.WithField("message", string(event.GetBody())).Debug("process event started")
return err
case <-osSignals: c.processor(ctx, event)
logger.Debug("process event finish")
}util.Log.Infof("osSignal Interrupt trigerred")
return
return c.channel.Close()
daemon.Stop() defer func() {
}} c.processingWG.Done()
pool.Put(worker)
} }()
}(workerNumber, &event, c.Logger)
case <-c.stopChan:
c.Logger.Debugf("stop triggered")
return
}
}
Reconnect
Remember fallacy of distributed system, The network is reliable. we can handle by listening close event
for reconnect
for {
errors := make(chan *amqp.Error) select {
b.conn.NotifyClose(errors) case err := <-b.errors:
util.Log.Warnf("Connection lost: %v\n", err)
time.Sleep(10 * time.Second)
b.connect()
case stop := <-b.watchStop:
if stop {
return
}
}
}
Consumer Prefetch
Quality of Service (QOS), The value of this property determines the
service-time manner in which AMQP Consumers read messages from Queues.
basic.delivery
Prefetch Processing
time Auto Ack or manual ack with unlimited prefetch will lead to memory
Queue Buf Logic
consumption growth. remember golang GC -> Stop The world
basic.ack time
Remember Pool the goroutine if you are read concurrently on
delivery channel
Deduplicator ?
Remember, on Manual Acknowledge, acknowledgements can fail because connection loss or
other error.
Use Deduplicator Pattern for make sure message processed just once.
func (r *RedisDeduplicator) Visit(key string) error {
conn := r.Pool.Get()
defer conn.Close()
skey := fmt.Sprintf("%s_%s", r.Table, key)
_, err := conn.Do("SET", skey, key)
if err != nil {
type ( return err
}
Deduplicator interface {
Visit(key string) error _, err = conn.Do("EXPIRE", skey, int64(r.Ttl))
if err != nil {
IsVisited(key string) bool }
return err
}
) return nil
}
func (r *RedisDeduplicator) IsVisited(key string) bool {
conn := r.Pool.Get()
defer conn.Close()
resp, err := conn.Do("EXISTS", fmt.Sprintf("%s_%s", r.Table, key))
if err != nil {
return false
}
return resp.(int64) >= 1
}
Measurements
Nodes Utilization:
File descriptors used, Count of file descriptors used by RabbitMQ processes
File descriptors used as sockets, Count of file descriptors used as network
sockets by RabbitMQ processes
Disk space used, Bytes of disk used by a RabbitMQ node
Memory used, Bytes in RAM used by a RabbitMQ node
Exchange performance :
Messages published in, Messages published to an exchange
Messages published out, Messages that have left an exchange
Messages unroutable, Count of messages not routed to a queue
Measurements
Queue Performance:
Queue depth, Count of all messages in the queue
Messages unacknowledged, Count of messages a queue has delivered without receiving
acknowledgment from a consumer
Messages ready, Count of messages available to consumer
Message rates, Messages that move in or out of a queue per second, whether unacknowledged,
delivered, acknowledged, or redelivered
Messages persistent, Count of messages written to disk
Number of consumers, Count of consumers for a given queue
Consumer utilization, Proportion of time that the queue can deliver messages to consumers
Reference
https://www.rabbitmq.com/
https://insidethecpu.com/2014/11/11/rabbitmq-qos-vs-competing-consumers/
https://www.manning.com/books/rabbitmq-in-depth
https://www.datadoghq.com/blog/rabbitmq-monitoring/
https://www.slideshare.net/ktoso/building-a-reactive-system-with-akka-workshop-oreilly-saconf-nyc
https://en.wikipedia.org/wiki/Two_Generals%27_Problem