NATS Logo by Example

Queue Push Consumers (legacy) in JetStream

A queue push consumer is analogous to a core NATS queue group, but designed to work with streams. Unlike a standard push consumer which only supports a single bound subscription at any time, a queue-based one supports multiple subscriptions bound to the consumer. Messages from the consumer will be distributed randomly among active subscribers which can freely come and go.

In this example, we will demonstrate how to create a durable queue push consumer and how to bind subscriptions to receive and process messages.

Note that as of NATS server v2.8.4, ephemeral queue push consumers are not supported. This means that the server does not currently keep track of these and will auto-cleanup if no active subscriptions are bound. You can, of course, create a durable and then delete once you are done with it, but if the deletion fails to happen (program crashes), you will need to be sure to check when the starts up again.

It is recommended to review the standard push consumer example in order to understand the general concepts. The queue-based variant is a small extension to this type of consumer.

CLI Go Python JavaScript Rust C# .NET V2 Java Ruby Elixir Crystal C
Jump to the output or the recording
$ nbe run jetstream/queue-push-consumer/java
View the source code or learn how to run this example yourself

Code

package example;


import io.nats.client.*;
import io.nats.client.api.*;


import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;


public class Main {
    public static void main(String[] args) {
        String natsURL = System.getenv("NATS_URL");
        if (natsURL == null) {
            natsURL = "nats://127.0.0.1:4222";
        }

Initialize a connection to the server. The connection is AutoCloseable on exit.

        try (Connection nc = Nats.connect(natsURL)) {

Access JetStream and JetStreamManagement which provide methods to create streams and consumers as well as convenience methods for publishing to streams and consuming messages from the streams.

            JetStream js = nc.jetStream();
            JetStreamManagement jsm = nc.jetStreamManagement();

Declare a simple stream.

            String streamName = "EVENTS";
            StreamConfiguration config = StreamConfiguration.builder()
                    .name(streamName)
                    .subjects("events.>")
                    .build();


            StreamInfo stream = jsm.addStream(config);

Durable (implicit)

Like the standard push consumer, the JetStream context provides a simple way to create a queue push consumer. The only additional argument is the “group name”.

            System.out.println("# Durable (implicit)");
            PushSubscribeOptions pso = ConsumerConfiguration.builder()
                    .durable("durable")
                    .deliverGroup("event-processor")
                    .buildPushSubscribeOptions();
            JetStreamSubscription sub1 = js.subscribe("events.>", "event-processor", pso);

If we inspect the consumer info, we will notice a property that was not defined for the non-queue push consumer. The DeliverGroup is the unique name of the group of subscribers. Internally, this corresponds to a core NATS queue group name which we will see below.

            ConsumerInfo info = jsm.getConsumerInfo(streamName, "durable");
            ConsumerConfiguration consumerConfig = info.getConsumerConfiguration();
            System.out.printf("Deliver group: '%s'\n", consumerConfig.getDeliverGroup());

Using the same helper method, we can create another subscription in the group. This method implicitly checks for whether the consumer has been created and binds to it based on the subject and group name.

            JetStreamSubscription sub2 = js.subscribe("events.>", "event-processor", pso);

As noted above, a queue push consumer relies on a core NATS queue group for distributing messages to active members. As a result, we can bind a subscription by using the DeliverSubject and the DeliverGroup Since messages are publishing to a dedicated subject and is part of a group, we can also create a core NATS subscription to join the group. As a reminder, the DeliverSubject is randomly generated by default, but this can be set explicitly in the consumer config if desired.

            Subscription sub3 = nc.subscribe(consumerConfig.getDeliverSubject(), consumerConfig.getDeliverGroup());
            System.out.printf("Deliver subject: '%s'\n", consumerConfig.getDeliverSubject());

Now we can publish some messages to the stream to observe how they are distributed to the subscribers.

            js.publish("events.1", null);
            js.publish("events.2", null);
            js.publish("events.3", null);

As noted in the push consumer example, subscriptions enqueue messages proactively. When there are a group of subscriptions, each will receive a different subset of the messages. When calling nextMessage this means, messages can be processed out of order. There is no correlation with message order and subscription creation order 😉. In fact, it is possible that not all subscriptions will necessarily get a message.

            Message msg = sub1.nextMessage(Duration.ofSeconds(1));
            if (msg != null) {
                System.out.printf("sub1: received message '%s'\n", msg.getSubject());
                msg.ack();
            } else {
                System.out.println("sub1: receive timeout");
            }


            msg = sub2.nextMessage(Duration.ofSeconds(1));
            if (msg != null) {
                System.out.printf("sub2: received message '%s'\n", msg.getSubject());
                msg.ack();
            } else {
                System.out.println("sub2: receive timeout");
            }


            msg = sub3.nextMessage(Duration.ofSeconds(1));
            if (msg != null) {
                System.out.printf("sub3: received message '%s'\n", msg.getSubject());
                msg.ack();
            } else {
                System.out.println("sub3: receive timeout");
            }

Since we created this consumer using the helper method, when we unsubscribe (or call drain), the consumer will be deleted.

            sub1.unsubscribe();
            sub2.unsubscribe();
            sub3.unsubscribe();

Durable (explicit)

To create a (safe) durable consumer, use the addOrUpdateConsumer method. Although it may seem redundant, a durable name and the deliver group name must be defined. This is simply because the durable name is used for all consumer types, while the deliver group is exclusive to the queue push consumer. In this example, the same name is used as convention which is what the helper method above did as well.

            System.out.println("\n# Durable (explicit)");


            String consumerName = "event-processor";
            ConsumerConfiguration cc = ConsumerConfiguration.builder()
                    .durable(consumerName)
                    .deliverSubject("my-subject")
                    .deliverGroup(consumerName)
                    .ackPolicy(AckPolicy.Explicit)
                    .build();
            info = jsm.addOrUpdateConsumer(streamName, cc);

Wait for all 6 messages to be received before exiting.

            CountDownLatch latch = new CountDownLatch(6);

For this part, we will use async core NATS queue subscriptions. Since core NATS subscriptions are JetStream-unaware, we must call msg.ack explicitly to notify the server that the message has been processed.

            Dispatcher dispatcher = nc.createDispatcher();
            dispatcher.subscribe("my-subject", "event-processor", m -> {
                System.out.printf("sub1: received message '%s'\n", m.getSubject());
                m.ack();
                latch.countDown();
            });
            dispatcher.subscribe("my-subject", "event-processor", m -> {
                System.out.printf("sub2: received message '%s'\n", m.getSubject());
                m.ack();
                latch.countDown();
            });
            dispatcher.subscribe("my-subject", "event-processor", m -> {
                System.out.printf("sub3: received message '%s'\n", m.getSubject());
                m.ack();
                latch.countDown();
            });

Publish some more messages.

            js.publish("events.4", null);
            js.publish("events.5", null);
            js.publish("events.6", null);
            js.publish("events.7", null);
            js.publish("events.8", null);
            js.publish("events.9", null);


            latch.await();
        } catch (InterruptedException | IOException | JetStreamApiException e) {
            e.printStackTrace();
        }
    }
}

Output

# Durable (implicit)
Deliver group: 'event-processor'
Deliver subject: '_INBOX.t3PHt1Zy6CkrgtWzKOGM8Y'
sub1: received message 'events.2'
sub2: received message 'events.1'
sub3: receive timeout

# Durable (explicit)
sub1: received message 'events.1'
sub1: received message 'events.2'
sub1: received message 'events.3'
sub1: received message 'events.4'
sub2: received message 'events.5'
sub2: received message 'events.6'
sub2: received message 'events.7'
sub2: received message 'events.8'
sub3: received message 'events.9'

Recording

Note, playback is half speed to make it a bit easier to follow.