Implementing Messaging Queue NSQ in Golang using Docker

Timothy Agustian
Published in Level Up Coding
5 min read · Nov 10, 2020


Message queues give services an asynchronous way to communicate with each other. Compared with a direct, synchronous API call, a message queue offers several advantages:

  • Performance
    The producer doesn’t need to wait for the consumer to finish its work, so it can move on to its next task right away.
  • Reliability
    Messages persisted in the queue help minimize data loss when part of the system goes down.
  • Scalability
    Producers and consumers can be scaled independently to meet your needs.

1. Introduction to NSQ

NSQ is an open source, realtime distributed messaging platform and the successor to simplequeue.

The core components of NSQ are:

  • nsqd is the daemon that receives, queues, and delivers messages to clients.
  • nsqlookupd is the daemon that manages topology information.
    Clients query nsqlookupd to discover nsqd producers for a specific topic, and nsqd nodes broadcast their topic and channel information to it.
  • nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.

A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the messages for a topic.

Topics and channels are not pre-configured. A topic is created the first time a message is published to it or a channel subscribes to it. A channel is created the first time a client subscribes to it. Topics and channels buffer data independently of each other.

A channel can, and generally does, have multiple clients connected. Each message on the channel is delivered to one randomly chosen client.
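
To make the topic and channel semantics concrete, here is a minimal in-memory sketch in Go. It does not use NSQ at all. The topic and channel types, the publish method, and the channel and client names are hypothetical and only mimic the delivery rule described above: every channel gets a copy of each message, and within a channel a single randomly chosen client receives it.

```go
package main

import (
	"fmt"
	"math/rand"
)

// channel is a toy stand-in for an NSQ channel (not the real implementation).
// It records which client received each message.
type channel struct {
	name      string
	clients   []string
	delivered map[string][]string // client name -> messages it received
}

// topic is a toy stand-in for an NSQ topic holding its channels.
type topic struct {
	name     string
	channels []*channel
}

func newChannel(name string, clients ...string) *channel {
	return &channel{name: name, clients: clients, delivered: map[string][]string{}}
}

// publish copies msg to every channel; within each channel, pick chooses
// the index of the single client that receives it.
func (t *topic) publish(msg string, pick func(n int) int) {
	for _, ch := range t.channels {
		if len(ch.clients) == 0 {
			continue // real NSQ would buffer the message; this toy drops it
		}
		client := ch.clients[pick(len(ch.clients))]
		ch.delivered[client] = append(ch.delivered[client], msg)
	}
}

// total returns how many messages the channel received across all clients.
func (ch *channel) total() int {
	n := 0
	for _, msgs := range ch.delivered {
		n += len(msgs)
	}
	return n
}

func main() {
	metrics := newChannel("metrics", "client-a", "client-b")
	archive := newChannel("archive", "client-c")
	t := &topic{name: "Topic_Example", channels: []*channel{metrics, archive}}

	for i := 1; i <= 4; i++ {
		t.publish(fmt.Sprintf("message-%d", i), rand.Intn)
	}

	// Each channel sees all 4 messages, split among its own clients.
	for _, ch := range t.channels {
		fmt.Printf("%s: %d messages, per client: %v\n", ch.name, ch.total(), ch.delivered)
	}
}
```

Both channels end up with four messages in total, but how the "metrics" messages are split between client-a and client-b varies from run to run.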

2. Code and NSQAdmin

Here I build a simple NSQ setup in Golang using Docker, based on github.com/nsqio/go-nsq. The producer passes a struct through the message queue, and we then inspect the result in NSQAdmin.

  • docker-compose.yml
# docker-compose.yml
version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --broadcast-address=nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4151:4151"
      - "4150:4150"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171"

Note: Don’t forget to map nsqd to 127.0.0.1 in your /etc/hosts (add the line "127.0.0.1 nsqd").
The producer and consumer run on localhost, while the NSQ services run in Docker. nsqd registers itself with nsqlookupd under the name "nsqd" (its --broadcast-address), and that is the address the consumer gets back from nsqlookupd. The hosts entry is a workaround that makes this name resolve on localhost as well as inside Docker.
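
To see why the hosts entry matters, the sketch below parses a lookup reply and builds the addresses a consumer would dial. It assumes the JSON shape that recent NSQ releases (1.0 and later) return from nsqlookupd's GET /lookup?topic=... endpoint. The lookupResponse type, the nsqdAddrs helper, and the sample payload are made up for illustration and are not captured from a real run.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// lookupResponse models only the parts of the nsqlookupd /lookup reply
// used here (field names assumed from NSQ 1.0+).
type lookupResponse struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

// nsqdAddrs extracts the host:port pairs a consumer would connect to.
func nsqdAddrs(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

// sampleLookup is a made-up reply for illustration, not captured output.
const sampleLookup = `{"channels":["Channel_Example"],"producers":[{"broadcast_address":"nsqd","tcp_port":4150,"http_port":4151}]}`

func main() {
	addrs, err := nsqdAddrs([]byte(sampleLookup))
	if err != nil {
		panic(err)
	}
	fmt.Println(addrs) // prints [nsqd:4150]
}
```

The consumer on localhost is told to connect to nsqd:4150, so that hostname has to resolve outside Docker too.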

  • producer.go
package main

import (
	"encoding/json"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

// Message is the payload sent through the queue.
type Message struct {
	Name      string
	Content   string
	Timestamp string
}

func main() {
	// The only valid way to instantiate the Config
	config := nsq.NewConfig()

	// Create the producer using the nsqd TCP address
	producer, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Fatal(err)
	}
	// Gracefully stop the producer when main returns
	defer producer.Stop()

	// Init topic name and message
	topic := "Topic_Example"
	msg := Message{
		Name:      "Message Name Example",
		Content:   "Message Content Example",
		Timestamp: time.Now().String(),
	}

	// Convert the message to []byte
	payload, err := json.Marshal(msg)
	if err != nil {
		log.Println(err)
		return
	}

	// Publish the message
	if err := producer.Publish(topic, payload); err != nil {
		log.Println(err)
	}
}
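
The producer sends the struct as JSON, so it can help to look at the exact payload on the wire. The sketch below is a variation on the Message struct rather than the code used in this article: the json tags, the time.Time field, and the encode and decode helpers are assumptions added for illustration. A time.Time field is used because the Go documentation describes time.Now().String() as a debugging format, whereas time.Time marshals to RFC 3339, which the consumer can parse back.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Message mirrors the struct from producer.go; the json tags and the
// time.Time field are assumptions made for this sketch.
type Message struct {
	Name      string    `json:"name"`
	Content   string    `json:"content"`
	Timestamp time.Time `json:"timestamp"`
}

// encode turns a Message into the bytes that would be published.
func encode(m Message) ([]byte, error) {
	return json.Marshal(m)
}

// decode is what a consumer handler would run on the message body.
func decode(body []byte) (Message, error) {
	var m Message
	err := json.Unmarshal(body, &m)
	return m, err
}

func main() {
	sent := Message{
		Name:      "Message Name Example",
		Content:   "Message Content Example",
		Timestamp: time.Date(2020, time.November, 10, 12, 0, 0, 0, time.UTC),
	}

	payload, err := encode(sent)
	if err != nil {
		panic(err)
	}
	// prints {"name":"Message Name Example","content":"Message Content Example","timestamp":"2020-11-10T12:00:00Z"}
	fmt.Println(string(payload))

	got, err := decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Timestamp.Equal(sent.Timestamp)) // prints true
}
```

If you adopt something like this, the producer and consumer must agree on the same struct definition, since the field names are part of the payload.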
  • consumer.go
package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

type messageHandler struct{}

// Message must match the struct used by the producer.
type Message struct {
	Name      string
	Content   string
	Timestamp string
}

func main() {
	// The only valid way to instantiate the Config
	config := nsq.NewConfig()

	// Tweak several common settings in the config
	// Maximum number of times this consumer will attempt to process a message before giving up
	config.MaxAttempts = 10
	// Maximum number of messages to allow in flight
	config.MaxInFlight = 5
	// Maximum delay when requeueing a message
	config.MaxRequeueDelay = time.Second * 900
	// Default delay when requeueing a message (0 requeues immediately)
	config.DefaultRequeueDelay = time.Second * 0

	// Init topic name and channel
	topic := "Topic_Example"
	channel := "Channel_Example"

	// Create the consumer
	consumer, err := nsq.NewConsumer(topic, channel, config)
	if err != nil {
		log.Fatal(err)
	}

	// Set the handler for messages received by this consumer
	consumer.AddHandler(&messageHandler{})

	// Use nsqlookupd (HTTP address) to find nsqd instances
	if err := consumer.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
		log.Fatal(err)
	}

	// Wait for a signal to exit
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// Gracefully stop the consumer and wait until it has shut down
	consumer.Stop()
	<-consumer.StopChan
}

// HandleMessage implements the Handler interface.
func (h *messageHandler) HandleMessage(m *nsq.Message) error {
	// Process the message
	var request Message
	if err := json.Unmarshal(m.Body, &request); err != nil {
		log.Println("Error when unmarshaling the message body, Err : ", err)
		// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
		return err
	}

	// Print the message
	log.Println("Message")
	log.Println("--------------------")
	log.Println("Name : ", request.Name)
	log.Println("Content : ", request.Content)
	log.Println("Timestamp : ", request.Timestamp)
	log.Println("--------------------")
	log.Println("")

	// Returning nil automatically marks the message as finished
	return nil
}
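
The MaxAttempts, DefaultRequeueDelay, and MaxRequeueDelay settings interact when a handler returns an error. The sketch below models that interaction as I understand go-nsq's default behavior: the requeue delay grows linearly with the attempt count and is capped at the maximum, and a message is given up on once its attempts exceed MaxAttempts. The requeueDelay and givesUp helpers are hypothetical names written for this illustration, not functions from the library, so treat this as an approximation and check the go-nsq source for the exact rules.

```go
package main

import (
	"fmt"
	"time"
)

// requeueDelay sketches a linear backoff: the default delay multiplied by
// the attempt count, capped at maxDelay. (Assumed to match go-nsq's
// default requeue behavior; not library code.)
func requeueDelay(attempts uint16, defaultDelay, maxDelay time.Duration) time.Duration {
	delay := defaultDelay * time.Duration(attempts)
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay
}

// givesUp reports whether a message has exceeded maxAttempts.
// A maxAttempts of 0 is treated as "no limit" in this sketch.
func givesUp(attempts, maxAttempts uint16) bool {
	return maxAttempts > 0 && attempts > maxAttempts
}

func main() {
	const maxAttempts = 10
	maxDelay := 900 * time.Second

	// Compare the article's setting (0) with a 90-second base delay.
	for _, base := range []time.Duration{0, 90 * time.Second} {
		fmt.Printf("DefaultRequeueDelay=%v\n", base)
		for _, attempts := range []uint16{1, 5, 10, 11} {
			fmt.Printf("  attempt %2d: delay=%v giveUp=%v\n",
				attempts, requeueDelay(attempts, base, maxDelay), givesUp(attempts, maxAttempts))
		}
	}
}
```

With DefaultRequeueDelay set to 0, as in consumer.go, every retry is immediate, so a message that can never be decoded would burn through its 10 attempts almost instantly.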

With the code in place, run it as follows:

  • Start the NSQ services with the command
    docker-compose up -d
  • Build producer.go and consumer.go
    go build producer.go
    go build consumer.go
  • Run the consumer binary first, then run the producer binary several times
    ./consumer
    ./producer

Note: The consumer may need a moment to query nsqlookupd before it starts receiving messages.

Here is the resulting screenshot

You can explore NSQAdmin, which the compose file above exposes on port 4171 (http://127.0.0.1:4171), as follows:

  • Click on a topic
    This displays an overview of the topic, including the total message count, the list of channels, and the in-flight and deferred message counts per channel, among other details.
  • Click on a channel
    This shows the same overview at the channel level.
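
The same counters can also be read programmatically. nsqd serves statistics over its HTTP port (4151 in the compose file) at /stats?format=json. The sketch below only parses such a reply. It assumes the unwrapped JSON layout of NSQ 1.0 and later, the type names and the parseStats helper are hypothetical, and the sample payload is invented for illustration rather than captured from a running nsqd.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// channelStats holds the per-channel counters shown in NSQAdmin
// (field names assumed from NSQ 1.0+).
type channelStats struct {
	Name          string `json:"channel_name"`
	Depth         int64  `json:"depth"`
	InFlightCount int64  `json:"in_flight_count"`
	DeferredCount int64  `json:"deferred_count"`
	MessageCount  int64  `json:"message_count"`
}

// topicStats holds the per-topic counters and the topic's channels.
type topicStats struct {
	Name         string         `json:"topic_name"`
	MessageCount int64          `json:"message_count"`
	Channels     []channelStats `json:"channels"`
}

// nsqdStats models only the part of the /stats reply used here.
type nsqdStats struct {
	Topics []topicStats `json:"topics"`
}

// parseStats decodes the body of a /stats?format=json reply.
func parseStats(body []byte) (nsqdStats, error) {
	var s nsqdStats
	err := json.Unmarshal(body, &s)
	return s, err
}

// sampleStats is a made-up reply for illustration, not captured output.
const sampleStats = `{"topics":[{"topic_name":"Topic_Example","message_count":3,"channels":[{"channel_name":"Channel_Example","depth":0,"in_flight_count":1,"deferred_count":0,"message_count":3}]}]}`

func main() {
	stats, err := parseStats([]byte(sampleStats))
	if err != nil {
		panic(err)
	}
	for _, t := range stats.Topics {
		fmt.Printf("topic %s: %d messages\n", t.Name, t.MessageCount)
		for _, c := range t.Channels {
			fmt.Printf("  channel %s: in-flight=%d deferred=%d depth=%d\n",
				c.Name, c.InFlightCount, c.DeferredCount, c.Depth)
		}
	}
}
```

In a real program the body would come from an HTTP GET against nsqd instead of the constant used here.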

3. In Summary

Implementing NSQ in Golang is straightforward. Most of the provided functions are self-explanatory, and there is plenty of documentation on the topic. The reference at https://godoc.org/github.com/nsqio/go-nsq is also easy for beginners to follow when learning the basics of message queues with NSQ and Golang.

As always, we have an opening at Tokopedia.
We are an Indonesian technology company with a mission to democratize commerce through technology and help everyone achieve more.
Find your Dream Job with us in Tokopedia!
https://www.tokopedia.com/careers/

https://godoc.org/github.com/nsqio/go-nsq
https://levelup.gitconnected.com/messaging-platform-comparison-nsq-and-apache-kafka-60f96f7466b1
https://nsq.io/
