The core NATS Server does not persist messages on its own; in this example, NATS JetStream is used to add persistence.
[TOC]
Producers
:one: Create a connection
This part is no different from using core NATS:
//DurablePub.java
Options options = new Options.Builder()
.server("nats://localhost:4222")
.reconnectWait(Duration.ofSeconds(1))
.build();
Connection nc = Nats.connect(options);
:two: Configure and create the stream
First define the stream name and the subjects it captures:
//DurablePub.java
private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.*";
//DurablePub.java
//Configure and create Stream
StreamConfiguration streamConfiguration = StreamConfiguration.builder()
.name(STREAM)
.subjects(SUBJECTS)
.retentionPolicy(RetentionPolicy.WorkQueue)
.replicas(1)
.discardPolicy(DiscardPolicy.Old)
.duplicateWindow(Duration.ofSeconds(30))
.build();
StreamInfo streamInfo = nc.jetStreamManagement().addStream(streamConfiguration);
The attributes I configured here are explained as follows:
- `name`: the name of the stream
- `subjects`: the subjects captured by the stream
- `retentionPolicy`: the message retention policy; the default is `LimitsPolicy`, meaning retention is governed by the various limits
- `replicas`: the number of replicas of each message in the cluster, at most 5
- `discardPolicy`: the discard policy; the default is `DiscardOld`, meaning old messages are deleted when the store reaches its limits
- `duplicateWindow`: the time window used for message deduplication; keep it as small as practical (see the sketch after this list)
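The duplicate window only takes effect when the publisher attaches a message ID: JetStream drops any message whose ID has already been seen inside the window. A minimal sketch using the jnats PublishOptions (the ID below is a made-up business key):
//Publish with a message ID so JetStream can deduplicate within duplicateWindow
PublishOptions publishOptions = PublishOptions.builder()
        .messageId("contract-12345") //hypothetical key; a repeat within 30s would be dropped
        .build();
nc.jetStream().publish("CONTRACT.EFFECT", "payload".getBytes(StandardCharsets.UTF_8), publishOptions);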
The full attributes are shown in the following table (quoted from the official website):
Attribute | Description |
---|---|
MaxAge | The maximum age of any message in the stream |
MaxBytes | The maximum total size of the stream; when the combined size exceeds it, old messages are deleted |
MaxMsgSize | The largest single message the stream will accept |
MaxMsgs | The maximum number of messages the stream can store; when exceeded, old messages are deleted |
MaxConsumers | The maximum number of consumers the stream can have; -1 means unlimited |
Name | The name of the stream; it must not contain spaces, tabs, or `.` |
NoAck | Disables acknowledgement of messages received by the stream |
Replicas | The number of replicas of each message in the cluster, at most 5 |
Retention | The message retention policy: `LimitsPolicy` (default), `InterestPolicy`, or `WorkQueuePolicy` |
Discard | What happens when the stream hits its limits: `DiscardNew` rejects new messages, `DiscardOld` (default) deletes old messages |
Storage | How messages are stored: `file` or `memory` |
Subjects | The set of subjects the stream consumes; wildcards are supported |
Duplicates | The time window used for message deduplication |
Retention Strategy
Retention policy | Description |
---|---|
LimitsPolicy | Retention is bounded by the message count, storage capacity, and age limits |
WorkQueuePolicy | Messages are kept until they are consumed by an observer |
InterestPolicy | Messages are kept as long as any consumer is active and interested in them |
The MaxMsgs, MaxBytes, and MaxAge attributes mentioned above constrain the stream and are the limits that LimitsPolicy enforces.
Under WorkQueuePolicy, a message is deleted as soon as a consumer acknowledges it. Under InterestPolicy, a message is deleted immediately if no consumers are online.
Note that MaxMsgs, MaxBytes, and MaxAge remain valid under WorkQueuePolicy and InterestPolicy; they act as additional preconditions.
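To make the limits concrete, a stream under LimitsPolicy might be bounded like this (a sketch with arbitrary values; the stream name is made up):
//A LimitsPolicy stream bounded by message count, total size, and age
StreamConfiguration limitedStream = StreamConfiguration.builder()
        .name("CONTRACT_AUDIT")               //hypothetical stream name
        .subjects("CONTRACT_AUDIT.*")
        .retentionPolicy(RetentionPolicy.Limits)
        .maxMessages(1_000_000)               //MaxMsgs
        .maxBytes(1024L * 1024 * 1024)        //MaxBytes: 1 GiB
        .maxAge(Duration.ofDays(7))           //MaxAge
        .discardPolicy(DiscardPolicy.Old)     //delete old messages once a limit is hit
        .build();
nc.jetStreamManagement().addStream(limitedStream);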
Acknowledgement Models
There are three acknowledgement modes for consumers:
Mode | Description |
---|---|
AckExplicit | Each message must be acknowledged individually; this is the only mode supported by pull consumers |
AckAll | Acknowledging message 100 implicitly acknowledges messages 1–99 as well; useful for batch jobs to reduce acknowledgement overhead |
AckNone | No acknowledgement is required |
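With AckExplicit, the jnats Message API offers more responses than a plain acknowledgement; exactly one of these is sent per delivery. A small sketch, assuming message is a received JetStream message:
//Possible responses from an AckExplicit consumer (pick one per delivery)
message.ack();        //processed successfully
message.nak();        //processing failed, ask the server to redeliver
message.inProgress(); //still working, reset the AckWait timer
message.term();       //stop delivery of this message entirely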
:three: Send messages
Once the stream is configured and created, messages can be sent:
//Pub
for (int i = 0; i < 10; i++) {
nc.jetStream().publish("CONTRACT.EFFECT", generateData().getBytes(StandardCharsets.UTF_8));
}
//Generate Data
private static String generateData() {
    Contract contract = new Contract(UUID.randomUUID().toString(), "EFFECT", new Date());
    Gson gson = new GsonBuilder().create();
    return gson.toJson(contract);
}

private static class Contract {
    public String contractId;
    public String status;
    public Date signDate;

    public Contract(String contractId, String status, Date signDate) {
        this.contractId = contractId;
        this.status = status;
        this.signDate = signDate;
    }
}
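publish() also returns a PublishAck carrying the sequence number the stream assigned, and there is an asynchronous variant when throughput matters. A small sketch of both (error handling omitted):
//Synchronous publish: inspect the PublishAck
PublishAck publishAck = nc.jetStream()
        .publish("CONTRACT.EFFECT", generateData().getBytes(StandardCharsets.UTF_8));
System.out.println("Stored at stream sequence " + publishAck.getSeqno());

//Asynchronous publish: returns a CompletableFuture<PublishAck>
CompletableFuture<PublishAck> future = nc.jetStream()
        .publishAsync("CONTRACT.EFFECT", generateData().getBytes(StandardCharsets.UTF_8));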
:four: Close the connection
nc.flush(Duration.ZERO);
nc.close();
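If in-flight messages should be given a chance to finish first, the connection can be drained instead of closed immediately (a sketch; drain returns a future and closes the connection when done):
//Graceful alternative: drain flushes buffered messages before closing
nc.drain(Duration.ofSeconds(10)).get();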
Consumers
Creating the connection is the same as on the producer side, so it is skipped here.
:one: Configure and create the consumer:
private static final String STREAM = "CONTRACT";
private static final String SUBJECTS = "CONTRACT.EFFECT";
private static final String CONSUMER = "consumer-1";
//Configure and create consumer
ConsumerConfiguration configuration = ConsumerConfiguration.builder()
//Durable Consumer Name
.durable(CONSUMER)
.filterSubject(SUBJECTS)
.replayPolicy(ReplayPolicy.Instant)
//This requires every message to be specifically acknowledged, it's the only supported option for pull-based Consumers
.ackPolicy(AckPolicy.Explicit)
.ackWait(Duration.ofSeconds(30))
.deliverPolicy(DeliverPolicy.All)
.maxDeliver(20)
.rateLimit(100)
.maxAckPending(20000)
.build();
ConsumerInfo consumerInfo = nc.jetStreamManagement()
.addOrUpdateConsumer(STREAM, configuration);
:two: Create the pull subscribe options (PushSubscribeOptions works similarly)
PullSubscribeOptions pullSubscribeOptions = PullSubscribeOptions
.builder()
.configuration(configuration)
.build();
:three: Create the subscription, read messages in batches, print their content, and acknowledge them manually
JetStream js = nc.jetStream();
JetStreamSubscription jetStreamSubscription = js.subscribe(SUBJECTS, pullSubscribeOptions);
Iterator<Message> iter = jetStreamSubscription.iterate(10, Duration.ofMillis(1000));
while (iter.hasNext()) {
    Message message = iter.next();
    System.out.printf("Message Received : %s\n", new String(message.getData(), StandardCharsets.UTF_8));
    message.ack();
}
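For comparison with the pull model above, a push consumer delivers messages to a handler registered on a dispatcher. A minimal sketch using a separate, made-up durable name (a pull consumer cannot simply be reused as a push consumer):
//Push model: messages are pushed to a MessageHandler on a dispatcher
PushSubscribeOptions pushSubscribeOptions = PushSubscribeOptions.builder()
        .durable("consumer-push")              //hypothetical durable name
        .build();
Dispatcher dispatcher = nc.createDispatcher();
nc.jetStream().subscribe(SUBJECTS, dispatcher, message -> {
    System.out.printf("Message Received : %s\n", new String(message.getData(), StandardCharsets.UTF_8));
    message.ack();
}, false, pushSubscribeOptions);               //false = acknowledge manually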
The list of consumer configuration properties is as follows:
Attribute | Description |
---|---|
AckPolicy | The acknowledgement mode; supports `AckNone`, `AckAll`, and `AckExplicit` |
AckWait | How long a delivered message may remain unacknowledged before it is redelivered |
DeliverPolicy | Where the consumer starts in the stream; supports `DeliverAll`, `DeliverLast`, `DeliverNew`, `DeliverByStartSequence`, and `DeliverByStartTime` |
DeliverySubject | The subject that observed messages are delivered to; if not set, a pull-based consumer is created |
Durable | The durable name of the consumer |
FilterSubject | When consuming from a stream with many subjects or wildcards, selects only messages matching this subject; wildcards are supported |
MaxDeliver | The maximum number of times a particular message is delivered, to avoid poison messages that crash your system (for example by falling into an infinite loop) |
OptStartSeq | When first consuming from the stream, start at the message with this sequence number |
ReplayPolicy | The message replay mode; supports `ReplayInstant` and `ReplayOriginal` |
SampleFrequency | The percentage of acknowledgements to sample for observability, from 0 to 100 |
OptStartTime | When first consuming from the stream, start with messages at or after this time |
RateLimit | The message delivery rate limit, in bits per second |
MaxAckPending | The maximum number of outstanding unacknowledged messages; once reached, delivery is suspended |
Consumer Starting Position
When configuring where a consumer starts consuming, NATS supports the following deliver policies:
Policy | Description |
---|---|
all | Deliver all available messages |
last | Deliver only the most recent message, similar to `tail -n 1 -f` |
new | Deliver only new messages that arrive after the subscription is created |
by_start_time | Deliver messages from a specific point in time onwards; requires `OptStartTime` |
by_start_sequence | Deliver messages starting at a specific sequence number in the stream; requires `OptStartSeq` |
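As an illustration of the table above, starting a consumer from a particular point in the stream could look like this (the durable names, sequence number, and time are arbitrary examples):
//Start delivery at a specific stream sequence number
ConsumerConfiguration byStartSequence = ConsumerConfiguration.builder()
        .durable("consumer-replay")                     //hypothetical durable name
        .deliverPolicy(DeliverPolicy.ByStartSequence)
        .startSequence(100)
        .build();

//Or start delivery with messages at or after a specific time
ConsumerConfiguration byStartTime = ConsumerConfiguration.builder()
        .durable("consumer-since")                      //hypothetical durable name
        .deliverPolicy(DeliverPolicy.ByStartTime)
        .startTime(ZonedDateTime.now().minusHours(1))
        .build();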
Conclusion
NATS is much easier to operate than message-oriented middleware such as RabbitMQ and Kafka, but the Java client does not feel very user-friendly, and the official Spring client has been shelved, so some extra wrapping work is needed to simplify its use.