NATS by Example

JSON for Message Payloads in Messaging

The basic structure of a NATS message modeled in the client libraries includes the subject the message was published to, the application-defined payload, and an optional set of headers (for requests, there is also a reply-to subject). The payload is a sequence of bytes, so it is up to the application to define how to serialize and deserialize the payload.
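
As a rough illustration of that structure, the sketch below models a message as a plain Java class. `SketchMessage` and its field names are hypothetical, chosen only for this illustration, and are not the types exposed by the Java client. The point it tries to make is that the payload is an opaque byte array whose meaning is decided by the application.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a client library's message type (not the real client API).
final class SketchMessage {
    final String subject;                    // subject the message was published to
    final String replyTo;                    // null unless the message is a request
    final Map<String, List<String>> headers; // optional; empty when none were set
    final byte[] payload;                    // opaque bytes, interpreted by the application

    SketchMessage(String subject, String replyTo,
                  Map<String, List<String>> headers, byte[] payload) {
        this.subject = subject;
        this.replyTo = replyTo;
        this.headers = headers == null
                ? Collections.<String, List<String>>emptyMap()
                : headers;
        this.payload = payload.clone(); // defensive copy of the caller's array
    }

    // One possible interpretation of the bytes: UTF-8 text (which JSON normally is).
    String payloadAsText() {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] json = "{\"foo\":\"bar\",\"bar\":27}".getBytes(StandardCharsets.UTF_8);
        SketchMessage msg = new SketchMessage("foo", null, null, json);
        System.out.println(msg.subject + " carries " + msg.payload.length
                + " bytes: " + msg.payloadAsText());
    }
}
```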

JSON is a ubiquitous and simple data-interchange format that is supported in virtually all programming languages. This example demonstrates how to serialize and deserialize a message payload using a JSON library.

$ nbe run messaging/json/java

Code

package example;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Dispatcher;


import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;


public class Main {


    public static void main(String[] args) {
        String natsURL = System.getenv("NATS_URL");
        if (natsURL == null) {
            natsURL = "nats://127.0.0.1:4222";
        }

Initialize a connection to the server. Connection implements AutoCloseable, so the try-with-resources statement closes it when the block exits.

        try (Connection nc = Nats.connect(natsURL)) {

Construct a payload and serialize it. This example uses Jackson, but any other JSON library would work as well, as would any other message format, since the payload is just bytes.

            ObjectMapper objectMapper = new ObjectMapper();
            Payload payload = new Payload("bar", 27);
            byte[] messageBytes = objectMapper.writeValueAsBytes(payload);


            // Two messages are published below, so the latch counts down from 2.
            CountDownLatch latch = new CountDownLatch(2);

Create a message dispatcher for handling messages in a separate thread and then subscribe to the target subject.

            Dispatcher dispatcher = nc.createDispatcher((msg) -> {

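Attempt to deserialize the payload. If deserialization fails, Jackson throws a subtype of IOException, and the handler falls back to printing the raw bytes as UTF-8 text. Any other alternative handling could be performed there instead.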

                try {
                    Payload deserializedPayload = objectMapper.readValue(msg.getData(), Payload.class);


                    System.out.printf("received valid JSON payload: %s\n", deserializedPayload);
                } catch (IOException e) {
                    System.out.printf("received invalid JSON payload: %s\n",
                            new String(msg.getData(), StandardCharsets.UTF_8));
                } finally {
                    latch.countDown();
                }
            });


            dispatcher.subscribe("foo");

Publish the serialized payload, followed by a message that is not valid JSON so that the fallback path is exercised as well.

            nc.publish("foo", messageBytes);
            nc.publish("foo", "not json".getBytes(StandardCharsets.UTF_8));

Wait until the dispatcher thread has handled both messages before the program exits.

            latch.await();


        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }
}


class Payload {
    private final String foo;
    private final int bar;


    @JsonCreator
    Payload(@JsonProperty("foo") String foo,
            @JsonProperty("bar") int bar) {
        this.foo = foo;
        this.bar = bar;
    }


    public String getFoo() {
        return foo;
    }


    public int getBar() {
        return bar;
    }


    @Override
    public String toString() {
        return "Payload{" +
                "foo='" + foo + '\'' +
                ", bar=" + bar +
                '}';
    }
}
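
The remark above that any message format can be used, since the payload is just bytes, can be illustrated without a JSON library. The following is a minimal sketch under stated assumptions: `BinaryPayloadCodec`, its `Decoded` type, and the byte layout (`bar` as a 4-byte int, then the length of `foo`, then the UTF-8 bytes of `foo`) are all hypothetical and not part of the original example. The resulting `byte[]` could be passed to `nc.publish` in the same way as the JSON bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical binary codec; assumed layout: [int bar][int fooLength][UTF-8 bytes of foo].
final class BinaryPayloadCodec {

    // Simple value holder for the decoded fields.
    static final class Decoded {
        final String foo;
        final int bar;

        Decoded(String foo, int bar) {
            this.foo = foo;
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Decoded{foo='" + foo + "', bar=" + bar + "}";
        }
    }

    static byte[] encode(String foo, int bar) {
        byte[] fooBytes = foo.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + fooBytes.length);
        buf.putInt(bar);
        buf.putInt(fooBytes.length);
        buf.put(fooBytes);
        return buf.array();
    }

    // Throws IllegalArgumentException when the bytes do not match the assumed layout,
    // which plays the role of the IOException branch in the JSON version.
    static Decoded decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        if (buf.remaining() < Integer.BYTES * 2) {
            throw new IllegalArgumentException("payload too short");
        }
        int bar = buf.getInt();
        int fooLength = buf.getInt();
        if (fooLength < 0 || fooLength != buf.remaining()) {
            throw new IllegalArgumentException("length prefix does not match payload size");
        }
        byte[] fooBytes = new byte[fooLength];
        buf.get(fooBytes);
        return new Decoded(new String(fooBytes, StandardCharsets.UTF_8), bar);
    }

    public static void main(String[] args) {
        byte[] bytes = encode("bar", 27);
        System.out.println("decoded: " + decode(bytes));
        try {
            decode("not json".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A hand-rolled layout like this is compact but has no schema or versioning, which is one reason a self-describing format such as JSON is often the more convenient choice.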

Output

received valid JSON payload: Payload{foo='bar', bar=27}
received invalid JSON payload: not json
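
Both lines appear before the program exits because the main thread blocks on the latch until each message has been handled. The sketch below isolates that coordination pattern using only the standard library. `LatchSketch` and `handleAll` are hypothetical names, a single-thread executor stands in for the dispatcher thread, and the timeout is an assumption added here: the original calls `latch.await()` with no timeout, which would block indefinitely if a message never arrived.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of waiting for a background handler with a CountDownLatch.
final class LatchSketch {

    // Handles each message on a background thread and waits up to timeoutMillis
    // for all of them. Returns true only if every message was handled in time.
    static boolean handleAll(List<String> messages, List<String> handled, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(messages.size());
        // Stand-in for the dispatcher thread; a single thread preserves message order.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            for (String message : messages) {
                worker.submit(() -> {
                    try {
                        handled.add(message);
                    } finally {
                        latch.countDown(); // count down even if handling fails
                    }
                });
            }
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> handled = new CopyOnWriteArrayList<>();
        boolean done = handleAll(Arrays.asList("first", "second"), handled, 2000);
        System.out.println("all handled: " + done + ", messages: " + handled);
    }
}
```

Counting down in a `finally` block, as the original handler also does, keeps a failing handler from leaving the waiting thread stuck.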
