NATS Server itself does not support persistence; in this example, NATS JetStream is used

[TOC]

producers

:one: creates a connection

This part is no different from the native NATS

//DurablePub.java
Options options = new Options.Builder()
    .server("nats://localhost:4222")
    .reconnectWait(Duration.ofSeconds(1))
    .build();
Connection nc = Nats.connect(options);
Copy the code

:two: configure and create flows

NATS Server: NATS Server: NATS Server

//DurablePub.java
private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.*";
Copy the code
//DurablePub.java
//Configure and create Stream
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
    .name(STREAM)
    .subjects(SUBJECTS)
    .retentionPolicy(RetentionPolicy.WorkQueue)
    .replicas(1)
    .discardPolicy(DiscardPolicy.Old)
    .duplicateWindow(Duration.ofSeconds(30))
    .build();

StreamInfo streamInfo = nc.jetStreamManagement().addStream(streamConfiguration);
Copy the code

The attributes I configured here are explained as follows:

  • name: The name of the flow
  • subjectsSubjects in flow
  • retentionPolicy: Retention policy for messages. The default value isLimitsPolicyThat is, the retention of messages is determined by various constraints
  • replicas: Number of copies of the message in the cluster. The maximum value is 5
  • discardPolicy: Discard policy. The default value isDiscardOldWhen the message store reaches its limit, the old message is deleted
  • duplicateWindow: You are advised to keep the time window for message deduplication as small as possible

The full attributes are shown in the following table (quoted from the official website):

attribute describe
MaxAge The maximum age of messages in the flow, in microseconds
MaxBytes The maximum storage capacity of a stream. When the size of the merged stream exceeds this size, old messages are deleted
MaxMsgSize The maximum message size that a stream can receive
MaxMsgs The maximum number of messages that can be stored in a stream. When the number exceeds this value, old messages are deleted
MaxConsumers The maximum number of consumers a stream can have is- 1When an unlimited
Name The name of the stream,Can’tWith a space, TAB or.
NoAck Disables ACK for messages received by a stream
Replicas Number of copies of messages in the cluster, maximum 5
Retention Message retention policy, yesLimitsPolicy(Default),InterestPolicy.WorkQueuePolicy
Discard When the stream hits the limit,DiscardNewThe policy rejects new messages,DiscardOldOld messages are deleted (default)
Storage How messages are stored, there arefileandmemoryTwo kinds of
Subjects For consumptionsubjectsCollection, which supports wildcards
Duplicates Time window for message dereuse

Retention Strategy

Retention policy describe
LimitsPolicy Limit the number of messages, storage capacity, and age
WorkQueuePolicy Messages are kept until consumed by an observer
InterestPolicy As long as any consumer is active, the message is saved

The MaxMsgs, MaxMsgs, and MaxAge mentioned above are used to restrict messages and are also attributes that can only be used by the LimitsPolicy policy.

Under the WorkQueuePolicy, messages are deleted as soon as a consumer receives an acknowledgement. InterestPolicy, on the other hand, deletes stories immediately if no consumers are online.

Note that under WorkQueuePolicy and InterestPolicy, the MaxMsgs, MaxMsgs, and MaxAge attributes are still valid and exist as preconditions.

Acknowledgement Models

There are three confirmation modes for consumers:

model describe
AckExplicit Manual validation is required for each message, which is the only way the pull model supports it
AckAll In this mode, if you confirm the no100A piece of news, then199Each message is automatically acknowledged and is suitable for batch tasks to reduce the overhead of validation
AckNone No validation is supported

:three: sends the message

Once the flow is configured and created, messages can be sent

//Pub
for (int i = 0; i < 10; i++) {
  nc.jetStream().publish("CONTRACT.EFFECT", generateData().getBytes(StandardCharsets.UTF_8));
}
Copy the code
//Generate Data
private static String generateData(a) {
  Contract contract = new Contract(UUID.randomUUID().toString(), "EFFECT".new Date());
  GsonBuilder builder = new GsonBuilder();
  Gson gson = builder.create();
  return gson.toJson(contract);
}

 private static class Contract {

    public String contractId;

    public String status;

    public Date signDate;

    public Contract(String contractId, String status, Date signDate) {
      this.contractId = contractId;
      this.status = status;
      this.signDate = signDate; }}Copy the code

:four: Closes the connection

nc.flush(Duration.ZERO);
nc.close();
Copy the code

consumers

The connection between the consumer and the producer will be skipped

:one: Configure and create consumer:

private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.EFFECT";
private static final String CONSUMER = "consumer-1";
Copy the code
//Configure and create consumer
ConsumerConfiguration configuration = ConsumerConfiguration.builder()
    //Durable Consumer Name
    .durable(CONSUMER)
    .filterSubject(SUBJECTS)
    .replayPolicy(ReplayPolicy.Instant)
    //This requires every message to be specifically acknowledged, it's the only supported option for pull-based Consumers
    .ackPolicy(AckPolicy.Explicit)
    .ackWait(Duration.ofSeconds(30))
    .deliverPolicy(DeliverPolicy.All)
    .maxDeliver(20)
    .rateLimit(100)
    .maxAckPending(20000)
    .build();

ConsumerInfo consumerInfo = nc.jetStreamManagement()
    .addOrUpdateConsumer(STREAM, configuration);
Copy the code

:two: creates a pull configuration (PushSubscribeOptions, which is similar)

PullSubscribeOptions pullSubscribeOptions = PullSubscribeOptions
          .builder()
          .configuration(configuration)
          .build();
Copy the code

:three: Creates the subscription, reads it in batches, prints the message content, and confirms it manually

JetStreamSubscription jetStreamSubscription = js.subscribe(SUBJECTS, pullSubscribeOptions);

Iterator<Message> iter = jetStreamSubscription.iterate(10, Duration.ofMillis(1000));
while (iter.hasNext()) {
  Message message = iter.next();
  System.out.printf("Message Received : %s\n".new String(message.getData(), StandardCharsets.UTF_8));
  message.ack();
}
Copy the code

The list of consumer configuration properties is as follows:

attribute describe
AckPolicy Message confirmation mode supportedAckNone``, ``AckAllAckExplicit
AckWait How long a message is allowed to remain unacknowledged before it is redelivered
DeliverPolicy Consumer starting location strategy, supportedDeliverAll.DeliverLast.DeliverNew.DeliverByStartSequenceDeliverByStartTime
DeliverySubject The Subject passing observed messages, if not set, creates a consumer of pull mode
Durable Name of consumer
FilterSubject When consuming from a stream with many Subjects or wildcards, only one particular incoming subject is selected, and wildcards are supported.
MaxDeliver The maximum number of times a particular message can be delivered to avoid poison-type messages that crash your system (such as falling into an infinite loop)
OptStartSeq When a message is first consumed from the flow, it is read from this particular message in the collection
ReplayPolicy Message sending mode supportedReplayInstantandReplayOriginal
SampleFrequency The percentage of confirmed messages in the observed sample ranges from 0 to 100
OptStartTime When consuming a message from a stream for the first time, start with a message at or after this time
RateLimit Message transfer rate in bit/s
MaxAckPending The maximum number of unacknowledged messages, once reached, messages will be suspended

Consumer Starting Position

When configuring consumers to decide where to start spending, NATS supports the following deliverpolicies:

strategy describe
all Deliver all available messages
last Deliver the most recent message, something liketail -n 1 -f
new Only new messages that arrive after the subscription are delivered
by_start_time To deliver messages after a specific time, you need to setOptStartTime
by_start_sequence Starts with a message with a specific serial number in the streamOptStartSeq

conclusion

NATS is much easier to use than message-oriented middleware such as RabbitMQ and Kafka, but the Java client doesn’t feel very user-friendly and the official Spring client has been shelved, which is a bit of a hassle and requires some extra work to simplify use.