I’ve been searching for alternatives to Kafka for some time,
and found that NATS and/or NATS Streaming Server (a.k.a. STAN) could be a good fit.
Unlike Kafka, NATS and STAN are:
not dependent on the JVM
not dependent on ZooKeeper
and quite lightweight; they even run on Raspberry Pis!
They are not as famous as Kafka yet,
but are known to be used by many large companies in production,
so I made up my mind to try them on my Raspberry Pis.
This post was written and tested with:
Get Ready
I have Go development environments on all of my Pis, so installation is not a big deal.
1. Install nats-server
$ go get -u github.com/nats-io/gnatsd
then gnatsd will be installed as $GOPATH/bin/gnatsd.
2. Install nats-streaming-server
$ go get -u github.com/nats-io/nats-streaming-server
then nats-streaming-server will be installed as $GOPATH/bin/nats-streaming-server.
There are two Pis in my house, connected behind a router:
3B+ : running on an external HDD, being used as a file server
Zero W : being used as an application server
And there is one at work, also behind a router:
3B : camera connected, being used as an application server (mostly for hobby projects)
I plan to build a NATS cluster made up of 3B+ and Zero W ,
and run STAN cluster on all three: 3B+ , Zero W , and 3B .
1. Generate self-signed certificates
My NATS servers will be exposed to the outer world,
so they should be run with TLS options.
For TLS support, I needed to generate some self-signed certificates.
I used this script:
$ ./gen_selfsigned_certs.sh myhostname.com 192.168.0.101 192.168.0.102
myhostname.com is the domain name of my home router.
192.168.0.101 and 192.168.0.102 are the internal IP addresses of my Pis: 3B+ and Zero W.
2. Build a NATS/STAN cluster with two Pis
As mentioned above, two Pis - 3B+ and Zero W - are in the same local area network.
I configured my router to port-forward these two through port 60100-60102 and 60200-60202.
A. 3B+: will work as STAN (with embedded NATS)
3B+ will work as the seed (primary) NATS cluster member,
and also as the bootstrap node of the STAN cluster.
The configuration file for both NATS and STAN is as follows:
# filename: stan.conf
# Configuration file for nats-streaming-server: Seed (Primary)
################
# NATS specific configurations
listen: 0.0.0.0:60100 # host/port to listen for client connections
https: 0.0.0.0:60102 # HTTPS monitoring port
tls: {
cert_file: "./certs/server-cert.pem"
key_file: "./certs/server-key.pem"
ca_file: "./certs/ca.pem"
verify: true
timeout : 10
}
# Authorization for client connections
# (Generate passwords with: gnatsd/util/mkpasswd -p)
authorization {
users = [
{
user: USER
password: PASSWORD
}
]
}
# Cluster definition (seed)
cluster {
listen: 0.0.0.0:60101 # host/port for inbound route connections
tls {
cert_file: "./certs/server-cert.pem"
key_file: "./certs/server-key.pem"
ca_file: "./certs/ca.pem"
verify: true
timeout : 10
}
# Authorization for route connections
authorization {
user: CLUSTER_USER
password: PASSWORD
}
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
# "nats://CLUSTER_USER:PASSWORD@192.168.0.102:60201"
]
}
# logging options
debug: false
trace: false
logtime: true
log_file: "/tmp/nats.log"
# pid file
pid_file: "/tmp/nats.pid"
# Some system overrides
# max_connections
max_connections: 100
# max_subscriptions (per connection)
max_subscriptions: 100
# maximum protocol control line
max_control_line: 512
# maximum payload
max_payload: 65536
# Duration the server can block on a socket write to a client.
# Exceeding the deadline will designate a client as a slow consumer.
write_deadline: "10s"
################
# STAN specific configurations
streaming: {
id : "stan-rpi"
store: "file"
dir : "/home/pi/stan/datastore"
hb_interval: "10s"
hb_timeout: "10s"
hb_fail_count: 5
#ft_group: "ft" # for fault-tolerance setting
sd: false # debug logging
sv: false # trace logging
tls: {
client_cert: "./certs/cert.pem"
client_key: "./certs/key.pem"
client_ca: "./certs/ca.pem"
}
store_limits: {
max_channels: 100
max_msgs: 1000
max_bytes: 10MB
max_age: "6h"
max_subs: 100
max_inactivity: "6h"
}
file: {
compact: true
compact_fragmentation: 50
compact_interval: 300
compact_min_size: 100MB
buffer_size: 2MB
crc: true
crc_poly: 3988292384
sync_on_flush: true
file_descriptors_limit: 100
parallel_recovery: 2
}
cluster: {
node_id: "stan-node-rpi3bplus"
bootstrap: true
#peers: ["stan-node-rpi0w", "stan-node-rpi3b"]
log_path: "/home/pi/stan/log"
log_cache_size: 1024
log_snapshots: 1
trailing_logs: 256
sync : true
raft_logging: false
}
}
With this config, NATS will run on ports 60100 and 60101 (for the cluster),
and the web monitoring interface on port 60102.
Now run it with the config file:
$ nats-streaming-server -sc stan.conf -c stan.conf -user USER -pass PASSWORD
The -sc option specifies a config file for STAN, and -c one for NATS,
so this STAN server (with an embedded NATS server) can run with just this one config file.
B. Zero W: will work as STAN (with standalone NATS)
Zero W will work as a secondary NATS cluster member,
and also as a non-bootstrap node of STAN cluster.
I wanted to set it up like 3B+, as a single STAN instance with embedded NATS,
but I couldn’t make it run properly due to this issue.
So I had to run the NATS and STAN servers separately.
Configuration files for NATS and STAN are:
# filename: nats.conf
# Configuration file for nats-streaming-server: Secondary
################
# NATS specific configurations
listen: 0.0.0.0:60200 # host/port to listen for client connections
https: 0.0.0.0:60202 # HTTPS monitoring port
tls: {
cert_file: "./certs/server-cert.pem"
key_file: "./certs/server-key.pem"
ca_file: "./certs/ca.pem"
verify: true
timeout : 10
}
# Authorization for client connections
# (Generate passwords with: gnatsd/util/mkpasswd -p)
authorization {
users = [
{
user: USER
password: PASSWORD
}
]
}
# Cluster definition
cluster {
listen: 0.0.0.0:60201 # host/port for inbound route connections
tls {
cert_file: "./certs/server-cert.pem"
key_file: "./certs/server-key.pem"
ca_file: "./certs/ca.pem"
verify: true
timeout : 10
}
# Authorization for route connections
authorization {
user: CLUSTER_USER
password: PASSWORD
}
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
"nats://CLUSTER_USER:PASSWORD@192.168.0.101:60101"
]
}
# logging options
debug: false
trace: false
logtime: true
log_file: "/tmp/nats.log"
# pid file
pid_file: "/tmp/nats.pid"
# Some system overrides
# max_connections
max_connections: 100
# max_subscriptions (per connection)
max_subscriptions: 100
# maximum protocol control line
max_control_line: 512
# maximum payload
max_payload: 65536
# Duration the server can block on a socket write to a client.
# Exceeding the deadline will designate a client as a slow consumer.
write_deadline: "10s"
and
# filename: stan.conf
# Configuration file for nats-streaming-server: Secondary
################
# NATS specific configurations
# (There is `nats_server_url` for STAN, so other NATS options are not needed)
# logging options
debug: false
trace: false
logtime: true
log_file: "/tmp/nats.log"
# pid file
pid_file: "/tmp/nats.pid"
# Some system overrides
# max_connections
max_connections: 100
# max_subscriptions (per connection)
max_subscriptions: 100
# maximum protocol control line
max_control_line: 512
# maximum payload
max_payload: 65536
# Duration the server can block on a socket write to a client.
# Exceeding the deadline will designate a client as a slow consumer.
write_deadline: "10s"
################
# STAN specific configurations
streaming: {
id : "stan-rpi"
store: "file"
dir : "/home/pi/stan/datastore"
nats_server_url: "nats://USER:PASSWORD@192.168.0.102:60200"
hb_interval: "10s"
hb_timeout: "10s"
hb_fail_count: 5
#ft_group: "ft" # for fault-tolerance setting
sd: false # debug logging
sv: false # trace logging
tls: {
client_cert: "./certs/cert.pem"
client_key: "./certs/key.pem"
client_ca: "./certs/ca.pem"
}
store_limits: {
max_channels: 100
max_msgs: 1000
max_bytes: 10MB
max_age: "6h"
max_subs: 100
max_inactivity: "6h"
}
file: {
compact: true
compact_fragmentation: 50
compact_interval: 300
compact_min_size: 100MB
buffer_size: 2MB
crc: true
sync_on_flush: true
file_descriptors_limit: 100
parallel_recovery: 1
}
cluster: {
node_id: "stan-node-rpi0w"
bootstrap: false
#peers: ["stan-node-rpi3bplus", "stan-node-rpi3b"]
log_path: "/home/pi/stan/log"
log_cache_size: 1024
log_snapshots: 1
trailing_logs: 256
sync : true
raft_logging: false
}
}
With these config files, NATS will run on ports 60200 and 60201 (for the cluster),
and the web interface on port 60202.
They can be run with the following commands:
$ gnatsd -c nats.conf
and
$ nats-streaming-server -sc stan.conf -user USER -pass PASSWORD
3. Build a remote STAN cluster
3B is not in the same network, and is not even exposed to the outside world,
but it can still be a node of the STAN cluster.
A. 3B: will work as STAN node
Configuration file for STAN:
# filename: stan.conf
# Configuration file for nats-streaming-server: External
# logging options
debug: false
trace: false
logtime: true
log_file: "/tmp/nats.log"
# pid file
pid_file: "/tmp/nats.pid"
# Some system overrides
# max_connections
max_connections: 100
# max_subscriptions (per connection)
max_subscriptions: 100
# maximum protocol control line
max_control_line: 512
# maximum payload
max_payload: 65536
# Duration the server can block on a socket write to a client.
# Exceeding the deadline will designate a client as a slow consumer.
write_deadline: "15s"
################
# STAN specific configurations
streaming: {
id : "stan-rpi"
store: "file"
dir : "/home/pi/stan/datastore"
nats_server_url: "nats://USER:PASSWORD@myhostname.com:60100, nats://USER:PASSWORD@myhostname.com:60200"
hb_interval: "10s"
hb_timeout: "10s"
hb_fail_count: 5
#ft_group: "ft" # for fault-tolerance setting
sd: false # debug logging
sv: false # trace logging
tls: {
client_cert: "./certs/cert.pem"
client_key: "./certs/key.pem"
client_ca: "./certs/ca.pem"
}
store_limits: {
max_channels: 100
max_msgs: 1000
max_bytes: 10MB
max_age: "6h"
max_subs: 100
max_inactivity: "6h"
}
file: {
compact: true
compact_fragmentation: 50
compact_interval: 300
compact_min_size: 100MB
buffer_size: 2MB
crc: true
sync_on_flush: true
file_descriptors_limit: 100
parallel_recovery: 1
}
cluster: {
node_id: "stan-node-rpi3b"
bootstrap: false
#peers: ["stan-node-rpi3bplus", "stan-node-rpi0w"]
log_path: "/home/pi/stan/log"
log_cache_size: 1024
log_snapshots: 1
trailing_logs: 256
sync : true
raft_logging: false
}
}
Now run it with:
$ nats-streaming-server -sc stan.conf
then it will connect to the NATS servers already configured on myhostname.com, and start STAN.
Resulting Network Topology
Monitor
You can make sure all servers are running well by looking into the logs saved in /tmp/nats.log.
You can also connect to the web interfaces at https://192.168.0.101:60102 or https://192.168.0.102:60202.
Run as services
The following are the systemd service files I made.
They can be enabled/started/stopped with commands like these:
$ sudo cp stan.service /lib/systemd/system/
$ sudo systemctl enable stan.service
$ sudo systemctl start stan.service
$ sudo systemctl stop stan.service
1. STAN service file for 3B+
stan.service
file:
[Unit]
Description=NATS Streaming Server
After=syslog.target network.target

[Service]
PrivateTmp=true
Type=simple
WorkingDirectory=/home/pi/stan
ExecStart=/home/pi/go/bin/nats-streaming-server -sc stan.conf -c stan.conf -user USER -pass PASSWORD
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s SIGINT $MAINPID
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
2. NATS and STAN service files for Zero W
nats.service
file:
[Unit]
Description=NATS Server
After=syslog.target network.target

[Service]
PrivateTmp=true
Type=simple
WorkingDirectory=/home/pi/stan
ExecStart=/home/pi/go/bin/gnatsd -c nats.conf
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s SIGINT $MAINPID
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
stan.service
file:
[Unit]
Description=NATS Streaming Server
Wants=nats.service
After=syslog.target network.target nats.service

[Service]
PrivateTmp=true
Type=simple
WorkingDirectory=/home/pi/stan
ExecStart=/home/pi/go/bin/nats-streaming-server -sc stan.conf -user USER -pass PASSWORD
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s SIGINT $MAINPID
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
STAN depends on NATS, so nats.service was added to Wants and After in the stan.service file.
3. STAN service file for 3B
stan.service
file:
[Unit]
Description=NATS Streaming Server
After=syslog.target network.target

[Service]
PrivateTmp=true
Type=simple
WorkingDirectory=/home/pi/stan
ExecStart=/home/pi/go/bin/nats-streaming-server -sc stan.conf
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s SIGINT $MAINPID
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
Test with a client
Create a sample client from this code:
// main.go
package main

import (
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/meinside/stan-client-go"
	"github.com/nats-io/go-nats-streaming"
)

const (
	delaySeconds = 5

	queueGroupUnique = "unique"
	durableDefault   = "durable"

	natsServerURL1 = "nats://myhostname.com:60100"
	natsServerURL2 = "nats://myhostname.com:60200"

	clientCertPath = "./certs/cert.pem"
	clientKeyPath  = "./certs/key.pem"
	rootCaPath     = "./certs/ca.pem"

	natsUsername = "USER"
	natsPassword = "PASSWORD"

	clusterID = "stan-rpi"
	clientID  = "pingpong-client"

	subjectPing = "ping"
	subjectPong = "pong"
)

type pingPong struct {
	Message string `json:"message"`
}

type logger struct{}

func (l *logger) Log(msg string) {
	log.Println(msg)
}

func (l *logger) Error(err string) {
	log.Println("ERROR: " + err)
}

var sc *stanclient.Client

func main() {
	// for catching signals
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	sc = stanclient.Connect(
		[]string{natsServerURL1, natsServerURL2},
		stanclient.AuthOptionWithUsernameAndPassword(natsUsername, natsPassword),
		stanclient.SecOptionWithCerts(clientCertPath, clientKeyPath, rootCaPath),
		clusterID,
		clientID,
		[]stanclient.ToSubscribe{
			stanclient.ToSubscribe{
				Subject:        subjectPing,
				QueueGroupName: queueGroupUnique,
				DurableName:    durableDefault,
				DeliverAll:     true,
			},
			stanclient.ToSubscribe{
				Subject:        subjectPong,
				QueueGroupName: queueGroupUnique,
				DurableName:    durableDefault,
				DeliverAll:     true,
			},
		},
		handlePingPong,
		publishFailed,
		&logger{},
	)

	go func() {
		time.Sleep(delaySeconds * time.Second)
		sc.Publish(subjectPing, pingPong{Message: "initial ping"})
	}()

	go sc.Poll()

	// wait...
loop:
	for {
		select {
		case <-interrupt:
			break loop
		}
	}

	sc.Close()

	log.Println("Application terminating...")
}

func handlePingPong(message *stan.Msg) {
	var data pingPong
	err := json.Unmarshal(message.Data, &data)
	if err != nil {
		log.Printf("Failed to unmarshal data: %s", err)
		return
	}

	switch message.Subject {
	case subjectPing:
		log.Printf("Received PING: %s", data.Message)
		time.Sleep(delaySeconds * time.Second)
		sc.Publish(subjectPong, pingPong{Message: "pong for ping"})
	case subjectPong:
		log.Printf("Received PONG: %s", data.Message)
		time.Sleep(delaySeconds * time.Second)
		sc.Publish(subjectPing, pingPong{Message: "ping for pong"})
	}
}

func publishFailed(subject, nuid string, obj interface{}) {
	log.Printf("Failed to publish to subject: %s, nuid: %s, value: %+v", subject, nuid, obj)

	// resend it later
	go func(subject string, obj interface{}) {
		time.Sleep(delaySeconds * time.Second)
		log.Println("Retrying publishing...")
		sc.Publish(subject, obj)
	}(subject, obj)
}
and run it with:
$ go run main.go
Pings and pongs!
Now we have a tiny, but clustered message queue system on Raspberry Pis!
Troubleshooting
1. Bad certificates?
If you see errors about certificates, make sure all the needed IP addresses and domain names are included in them.
You can see the contents of your certificates with:
$ openssl x509 -in server-cert.pem -text
2. Timeout Errors?
If you see any timeout errors on server launch, increase the timeout values in the config files and retry.
Some tasks may be too much for our little Raspberry Pis… :-(
Wrap-Up
NATS/STAN is a lightweight but powerful message queue system that can run even on Raspberry Pis.
Message queues are often said to be the best infrastructure for microservices,
so this would be a good way to start playing with your spare Raspberry Pis :-)