In the previous blog post, we took a look at how NATS works in general and created a pub-sub connection between two services, allowing them to communicate. The problem was that it only worked when both the publisher and the subscriber were running at the same time. What if we want to allow particular pieces to go down, get redeployed, and be updated, while still keeping all the messages so that they are processed once the receiver wakes up? This is where we need NATS Streaming.

Streaming vs "regular" NATS

If you want to compare regular NATS with Streaming, you could say that the former is a radio show and the latter is a phone call. On the radio you can broadcast lots of things without needing anyone to be listening: you say things and hope they reach the audience. On a phone call, you know someone is on the other end, so the message you are communicating is expected to reach a specific party.

Regular NATS acts as a simple message bus: you put messages on it, and if someone is listening at the moment they appear, they can process them. Streaming persists the messages, so if somebody is late to the party and starts listening after the messages have been sent, they can still access them. It also lets the listener choose whether to see the past or just focus on the future; at least they have the choice.
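To make the difference concrete, here is a toy, in-memory model of the two behaviours. It is not the NATS or stan API: the `bus` and `stream` types are hypothetical and only show why a late subscriber sees nothing on a plain bus but can replay a persisted log.

```go
package main

import "fmt"

// bus is a toy model of "regular" NATS: a message is handed only to the
// subscribers that exist at publish time and is then forgotten.
type bus struct {
	subs []func(string)
}

func (b *bus) Subscribe(h func(string)) { b.subs = append(b.subs, h) }

func (b *bus) Publish(msg string) {
	for _, h := range b.subs {
		h(msg)
	}
}

// stream is a toy model of NATS Streaming: every message is appended to a
// log, so a late subscriber can still choose to replay what it missed.
type stream struct {
	log  []string
	subs []func(string)
}

func (s *stream) Publish(msg string) {
	s.log = append(s.log, msg)
	for _, h := range s.subs {
		h(msg)
	}
}

// Subscribe registers h; if replay is true, h first receives the stored log.
func (s *stream) Subscribe(h func(string), replay bool) {
	if replay {
		for _, m := range s.log {
			h(m)
		}
	}
	s.subs = append(s.subs, h)
}

func main() {
	var b bus
	b.Publish("E01") // nobody is listening yet, so this message is lost
	var gotBus []string
	b.Subscribe(func(m string) { gotBus = append(gotBus, m) })
	b.Publish("E02")

	var s stream
	s.Publish("E01") // stored even though nobody is listening yet
	var gotStream []string
	s.Subscribe(func(m string) { gotStream = append(gotStream, m) }, true)
	s.Publish("E02")

	fmt.Println(gotBus, gotStream) // [E02] [E01 E02]
}
```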

Messages can be persisted in a number of ways: in memory (mostly for development), in flat files on disk, or in an SQL database. How long they are kept is up to you, governed by the retention limits you set in the server configuration.

Publishing messages

To publish messages to the NATS Streaming server, we need a clusterID, which identifies the environment, and a unique clientID, which identifies the particular service doing the work. For the purpose of this example, the client identifier is generated as a UUID v4:

import (
    ...
    stan "github.com/nats-io/go-nats-streaming"
    ...
)
...
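// Connect to NATS Streaming. The clientID must be unique among the clients
// connected to the cluster, so a random UUID v4 is used here.
natsClient, err := stan.Connect(clusterID, uuid.NewV4().String(), stan.NatsURL(natsSrvAddr))
if err != nil {
    log.Fatalf("Can't connect: %v.\nMake sure a NATS Streaming Server is running at: %s", err, natsSrvAddr)
}
// Close the connection when the service shuts down.
defer natsClient.Close()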
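The snippet above relies on a third-party UUID package. If you prefer to stay on the standard library, a minimal sketch of a version 4 UUID generator could look like this; `newClientID` is a hypothetical helper, not part of stan. Whatever you use, the server expects client IDs to be unique and will typically refuse a second connection reusing one that is still registered, which is why a random value is convenient here.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newClientID is a hypothetical stand-in for uuid.NewV4().String(): it builds
// a random UUID (version 4, RFC 4122 variant) from crypto/rand bytes.
func newClientID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newClientID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id) // a fresh random ID on every run
}
```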

As in our previous example, we will use protocol buffers to marshal the messages into a slice of bytes, because a byte slice is what gets published on the channel. This looks very similar to what we did in the pub-sub example:

bs, err := proto.Marshal(msg)
if err != nil {
    return errors.Wrap(err, "failed to marshal proto message")
}

if err := natsClient.Publish(topicPublishEpisode, bs); err != nil {
    return errors.Wrap(err, "failed to publish message")
}

The difference is that we don't need to call Flush() anymore: Publish() blocks until the server acknowledges the message, so if it returns no error, the message has been stored and is available to subscribers.

Subscription types

When it comes to the other side, the connection part looks exactly the same. What is different is that the subscriber needs to... subscribe to the message queue to get notified when something gets published:

subscription, err := natsClient.QueueSubscribe(topic, qgroup, handler, startOpt)
if err != nil {
    ...
}

To subscribe as part of a queue, a client passes qgroup, the name of a group of listeners. Each message is then delivered to only one member of the group, so the work is shared among the members instead of being processed by every one of them.
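A toy model of that delivery rule, assuming round-robin selection for simplicity (the real server's choice of member may differ): each message reaches exactly one member of the group, so the work is split rather than duplicated. `queueGroup` is a hypothetical type, not part of stan.

```go
package main

import "fmt"

// queueGroup delivers each message to exactly one member, picked round-robin.
type queueGroup struct {
	members []func(string)
	next    int
}

func (q *queueGroup) Join(h func(string)) { q.members = append(q.members, h) }

// Deliver hands msg to a single member. In this toy, a group with no members
// simply drops the message; a real streaming channel would keep it stored.
func (q *queueGroup) Deliver(msg string) {
	if len(q.members) == 0 {
		return
	}
	q.members[q.next%len(q.members)](msg)
	q.next++
}

func main() {
	var g queueGroup
	counts := make([]int, 2)
	for i := range counts {
		i := i // capture the loop variable for the closure
		g.Join(func(string) { counts[i]++ })
	}
	for _, ep := range []string{"E01", "E02", "E03", "E04"} {
		g.Deliver(ep)
	}
	fmt.Println(counts) // [2 2]
}
```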

Apart from the three mandatory parameters (topic, queue group, and handler), you can pass subscription options that alter the way data is delivered to the service. One particularly important decision is which messages the service should be interested in. We might want to process:

  • all that have ever been sent,
  • only the new ones,
  • most recent and all new,
  • all from a particular point in time,
  • all not yet processed by this service, etc.

NATS Streaming lets you define this when you start subscribing to the channel, so that the server knows which messages to send to the client. We will use an environment variable to make that choice:

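switch optString {
case "MOST_RECENT":
    // Start with the last message stored on the channel, then receive new ones.
    return stan.StartWithLastReceived()
case "ALL":
    // Replay every message still stored on the channel.
    return stan.DeliverAllAvailable()
default:
    // Receive only messages published after the subscription is created.
    return stan.StartAt(stanpb.StartPosition_NewOnly)
}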
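One drawback of the `default` branch is that a typo such as `MOST_RECNT` silently falls back to "new only". A small variation, sketched with a local `startMode` type so it runs without the client library, reports whether the value was recognized so the service can log a warning. The `START_OPT` variable name, the `NEW_ONLY` value and `parseStartMode` are hypothetical.

```go
package main

import (
	"fmt"
	"os"
)

// startMode is a local stand-in for the stan subscription option.
type startMode string

const (
	modeNewOnly    startMode = "NEW_ONLY"
	modeMostRecent startMode = "MOST_RECENT"
	modeAll        startMode = "ALL"
)

// parseStartMode maps a configuration value to a start mode. The bool is
// false when the value was non-empty but unrecognized (e.g. a typo).
func parseStartMode(s string) (startMode, bool) {
	switch m := startMode(s); m {
	case modeMostRecent, modeAll:
		return m, true
	case "", modeNewOnly:
		return modeNewOnly, true
	default:
		return modeNewOnly, false
	}
}

func main() {
	raw := os.Getenv("START_OPT")
	mode, ok := parseStartMode(raw)
	if !ok {
		fmt.Fprintf(os.Stderr, "unknown START_OPT %q, falling back to %s\n", raw, mode)
	}
	fmt.Println("start mode:", mode)
}
```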

Working example

In our example, we will have Neatflyx, a pseudo-video-streaming service that publishes new episodes of a TV series. Then three watcher services will start with different subscription start options, so that we can compare how they behave.

First, let's start with Neatflyx and publish a few episodes of a widely beloved TV series, Friends:

neatflyx_1         | time="2018-05-02T18:17:08Z" level=info msg="Publishing on S11E01 of 'Friends' on 'https://frien.ds/11/1/episode.mp4'"
neatflyx_1         | time="2018-05-02T18:17:40Z" level=info msg="Publishing on S11E02 of 'Friends' on 'https://frien.ds/11/2/episode.mp4'"

Now we can start our first watcher, which asks for the most recently published message:

watcher_curious_1  | time="2018-05-02T18:18:48Z" level=info msg="Starting watcher service"
watcher_curious_1  | time="2018-05-02T18:18:48Z" level=info msg="Watching on S11E02 of 'Friends' on 'https://frien.ds/11/2/episode.mp4'"

As you can see, it didn't receive the first episode, only the second one. The next watcher is more patient and watches only new episodes (so we need to publish something for it):

watcher_patient_1  | time="2018-05-02T18:20:18Z" level=info msg="Starting watcher service"
neatflyx_1         | time="2018-05-02T18:20:53Z" level=info msg="Publishing on S11E03 of 'Friends' on 'https://frien.ds/11/3/episode.mp4'"
watcher_patient_1  | time="2018-05-02T18:20:53Z" level=info msg="Watching on S11E03 of 'Friends' on 'https://frien.ds/11/3/episode.mp4'"

Last, but not least, is the binge watcher who wants to watch all available episodes, even the oldest ones:

watcher_binge_1    | time="2018-05-02T18:21:47Z" level=info msg="Starting watcher service"
watcher_binge_1    | time="2018-05-02T18:21:47Z" level=info msg="Watching on S11E01 of 'Friends' on 'https://frien.ds/11/1/episode.mp4'"
watcher_binge_1    | time="2018-05-02T18:21:47Z" level=info msg="Watching on S11E02 of 'Friends' on 'https://frien.ds/11/2/episode.mp4'"
watcher_binge_1    | time="2018-05-02T18:21:47Z" level=info msg="Watching on S11E03 of 'Friends' on 'https://frien.ds/11/3/episode.mp4'"

As you can see, all watchers behaved just the way we expected. The full source code of this example is available on GitHub.


Versions

  • go -> 1.10
  • protoc -> 3.0.0
  • nats-streaming-server -> 0.9.2
  • nats-server -> 1.0.7