Concurrent Message Processing in Messaging
By default, when a subscription is created, each message that is received it process sequentially. There can be multiple subscriptions setup in a [queue group][queue] in which case the NATS server will distribute messages to each member of the group.
However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.
Code
package example;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) {
String natsURL = System.getenv("NATS_URL");
if (natsURL == null) {
natsURL = "nats://127.0.0.1:4222";
}
Initialize a connection to the server. The connection is AutoCloseable on exit.
try (Connection nc = Nats.connect(natsURL)) {
int total = 50;
CountDownLatch latch = new CountDownLatch(total);
Create message dispatchers with queue groups for handling messages in separate threads.
for (int i = 0; i < 4; i++) {
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
System.out.printf("Received %s\n",
new String(msg.getData(), StandardCharsets.UTF_8));
latch.countDown();
});
dispatcher.subscribe("greet", "queue");
}
for (int i = 0; i < total; i++) {
nc.publish("greet", String.format("hello %s", i).getBytes(StandardCharsets.UTF_8));
}
Await the dispatcher threads to have received all the messages before the program quits.
latch.await();
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
Output
Received hello 0 Received hello 4 Received hello 6 Received hello 1 Received hello 2 Received hello 7 Received hello 18 Received hello 5 Received hello 11 Received hello 21 Received hello 3 Received hello 19 Received hello 20 Received hello 8 Received hello 10 Received hello 22 Received hello 23 Received hello 12 Received hello 13 Received hello 9 Received hello 14 Received hello 24 Received hello 27 Received hello 28 Received hello 32 Received hello 25 Received hello 31 Received hello 35 Received hello 15 Received hello 29 Received hello 34 Received hello 37 Received hello 38 Received hello 36 Received hello 39 Received hello 40 Received hello 45 Received hello 43 Received hello 44 Received hello 46 Received hello 42 Received hello 47 Received hello 16 Received hello 17 Received hello 26 Received hello 49 Received hello 30 Received hello 33 Received hello 41 Received hello 48
Recording
Note, playback is half speed to make it a bit easier to follow.