Scalable Open Financial Architecture Stack (SOFAStack) is a finance-level cloud native Architecture independently developed by Ant Financial, which contains various components required to build the finance-level cloud native Architecture and is the best practice developed in the Financial scene.
SOFATracer is a component used for distributed system call tracing. It uses unified TraceId to record various network calls in the invocation link in a log to achieve the purpose of perspective network calls. The link data can be used for fault discovery and service governance.
SOFATracer:github.com/sofastack/s…
Disruptor profile
Disruptor aims to provide low-latency, high-throughput work queues in an asynchronous event processing architecture. It ensures that any data is owned by only one thread for write access, thus reducing write contention compared to other constructs. Disruptor is currently used for high performance by a number of well-known projects including Apache Storm, Camel, Log4j 2, and others.
SOFATracer also provides the ability to print logs to local disks asynchronously based on Disruptor high-performance lockless circular queues. SOFATracer provides two similar types of log printing: digest logs and statistics logs. Digest logs: logs that land on disk with each invocation. Statistics logs: statistics logs generated at regular intervals. Regardless of the type of log output, SOFATracer needs to ensure high performance to reduce the impact on the overall business process time.
For a theoretical analysis of Disruptor, see: Disruptor.
A High Performance Inter-thread Messaging Library
case
Start with a small example of Disruptor for perspective; Let’s take a look at its constructor:
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
Copy the code
- EventFactory: creates the eventFactory in the ring buffer;
- RingBufferSize: The size of the ring buffer, which must be a power of 2;
- ThreadFactory: Used to create threads for the processor;
- ProducerType: generator type to support RingBuffer creation with the correct sequencer and publisher; Enumeration type, SINGLE, MULTI two items. Corresponding to SingleProducerSequencer and MultiProducerSequencer;
- WaitStrategy: waitStrategy.
If we want to build a Disruptor, we need these components. From an eventFactory perspective, you also need a concrete Event to act as a carrier for message events. [The following is a simple modification according to the official case as an example]
Message event LongEvent, a data carrier that can be consumed
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
public long getValue(a) {
returnvalue; }}Copy the code
Create a factory for the message event
public class LongEventFactory implements EventFactory<LongEvent> {
@Override
public LongEvent newInstance(a) {
return newLongEvent(); }}Copy the code
ConsumerThreadFactory
public class ConsumerThreadFactory implements ThreadFactory {
private final AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "disruptor-thread-"+ index.getAndIncrement()); }}Copy the code
Create a disruptor. Create a disruptor.
private int ringBufferCapacity = 8;
// Message event production Factory
LongEventFactory longEventFactory = new LongEventFactory();
// Execute the event handler thread Factory
ConsumerThreadFactory consumerThreadFactory = new ConsumerThreadFactory();
// Wait policy for ring buffer.
WaitStrategy waitStrategy = new BlockingWaitStrategy();
/ / build disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(
longEventFactory,
ringBufferCapacity,
longEventThreadFactory,
ProducerType.SINGLE,
waitStrategy);
Copy the code
Disruptor is now available and can be started with: start
/ / start disruptor
disruptor.start();
Copy the code
At this point, you have built a disruptor; But how is it currently used to publish and consume messages?
news
Here are five pieces of data to publish in the for loop:
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
for (long l = 0; l < 5; l++)
{
long sequence = ringBuffer.next();
LongEvent event = ringBuffer.get(sequence);
event.set(100+l);
System.out.println("publish event :" + l);
ringBuffer.publish(sequence);
Thread.sleep(1000);
}
Copy the code
Now that the message has been published, you need to set the consumer processor for the current Disruptor. We already have LongEvent and EventFactory; Consume messages via EventHandler within Disruptor.
Writing consumer code
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue()+"- >" + Thread.currentThread().getName());
Thread.sleep(2000); }}Copy the code
Set eventHandler to your disruptor’s processing chain:
// The event handler that will process the event -> the handler that consumes the event
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);
Copy the code
Run results (here)
publish event :0
Event: 0 -> disruptor-thread-1
-------------------------------->
publish event :1
Event: 1 -> disruptor-thread-1
-------------------------------->
publish event :2
Event: 2 -> disruptor-thread-1
-------------------------------->
publish event :3
Event: 3 -> disruptor-thread-1
-------------------------------->
publish event :4
Event: 4 -> disruptor-thread-1
-------------------------------->
Copy the code
Basic concepts and principles
Disruptor
The entire container based on the producer-consumer pattern implemented by ringBuffer. Main attributes:
private final RingBuffer<T> ringBuffer;
private final Executor executor;
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
Copy the code
- RingBuffer: Holds a ringBuffer object that is used to publish internal Disruptor events.
- Executor: a thread pool that consumes events;
- ConsumerRepository: Provides a repository mechanism for associating EventHandler with EventProcessor;
- Started: Indicates whether the current Disruptor has been started.
- ExceptionHandler: exceptionHandler that handles the uncaught exceptions of the BatchEventProcessor event cycle.
RingBuffer
Ring queue can be analogous to BlockingQueue. The use of ringBuffer allows memory to be recycled, reducing time-consuming operations such as memory allocation, recycling and expansion in some scenarios.
public final class RingBuffer<E> extends RingBufferFields<E>
implements Cursored.EventSequencer<E>, EventSink<E>
Copy the code
- E: implementation that stores data for sharing during exchange or parallel coordination of events -> message events;
Sequencer
The top parent interface of the producer in RingBuffer, which directly realizes SingleProducerSequencer and MultiProducerSequencer; Corresponding to SINGLE and MULTI enumeration values.
EventHandler
The event handler interface is extended to implement concrete consumption logic. Such as the LongEventHandler in the Demo above;
// Callback interface for processing events available in {@link RingBuffer}
public interface EventHandler<T> {
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
Copy the code
- Event: RingBuffer published events.
- Sequence: Sequence number of the event being processed.
- EndOfBatch: used to identify whether it is the last event in the batch from RingBuffer;
SequenceBarrier
Consumer roadblocks dictate how consumers go down. In fact, the barricade is a kind of deflected lock.
final class ProcessingSequenceBarrier implements SequenceBarrier {
// Wait policy when the need to wait (probe) is not available
private final WaitStrategy waitStrategy;
// The serial number of other dependent consumers, which is used in the case of dependent consumption,
// For example, there are two consumers: A and B.
private final Sequence dependentSequence;
private volatile boolean alerted = false;
// Write pointer to Ringbuffer
private final Sequence cursorSequence;
// Sequencer corresponding to RingBuffer
private final Sequencer sequencer;
//exclude method
}
Copy the code
WaitStrategy determines which waiting strategy the consumer uses.
WaitStrategy
Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.
Wait strategy for EventProcessor; There are eight implementations of disruptor:
At the heart of the difference in these wait strategies is how waitFor is implemented.
EventProcessor
The event handler, which can be understood as a framework for the consumer model, implements the run method of the thread Runnable, enclosing operations such as loop judgments. This interface has three implementation classes:
1, BatchEventProcessor
public final class BatchEventProcessor<T> implements EventProcessor {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence( Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
//exclude method
}
Copy the code
- ExceptionHandler: ExceptionHandler;
- DataProvider: indicates the data source, corresponding to RingBuffer.
- EventHandler: callback object that handles events;
- 1. SequenceBarrier: a corresponding ordinal barrier;
- TimeoutHandler: TimeoutHandler. By default, the TimeoutHandler is null. If you want to set the TimeoutHandler, you only need to implement TimeoutHandler for the associated EventHandler.
If we choose to use EventHandler, the default is BatchEventProcessor, which corresponds to EventHandler one to one and executes in a single thread.
If a RingBuffer has multiple batcheventProcessors, there will be one thread for each BatchEventProcessor.
2, WorkProcessor
public final class WorkProcessor<T> implements EventProcessor {
private final AtomicBoolean running = new AtomicBoolean(false);
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer<T> ringBuffer;
private final SequenceBarrier sequenceBarrier;
private final WorkHandler<? super T> workHandler;
private final ExceptionHandler<? super T> exceptionHandler;
private final Sequence workSequence;
private final EventReleaser eventReleaser = new EventReleaser() {
@Override
public void release(a) { sequence.set(Long.MAX_VALUE); }};private final TimeoutHandler timeoutHandler;
}
Copy the code
Basically similar to BatchEventProcessor, except that the callback object used to process events is WorkHandler.
Schematic diagram
In the absence of consumers, producers keep producing, but remainingCapacity remains constant.
In writing the Demo, it was intended to observe variations in the available RingBuffer capacity without setting consumers. However, in the verification process, the expected result has not been achieved, (note: there is no set of consumers, only producers), let’s look at the results:
publish event :0 bufferSie:8 remainingCapacity:8 cursor:0 --------------------------------> publish event :1 bufferSie:8 remainingCapacity:8 cursor:1 --------------------------------> publish event :2 bufferSie:8 remainingCapacity:8 cursor:2 --------------------------------> publish event :3 bufferSie:8 remainingCapacity:8 cursor:3 --------------------------------> publish event :4 bufferSie:8 remainingCapacity:8 cursor:4 --------------------------------> publish event :5 bufferSie:8 remainingCapacity:8 cursor:5 --------------------------------> publish event :6 bufferSie:8 remainingCapacity:8 cursor:6 --------------------------------> publish event :7 bufferSie:8 remainingCapacity:8 cursor:7 --------------------------------> publish event :8 bufferSie:8 remainingCapacity:8 cursor:8 --------------------------------> publish event :9 bufferSie:8 remainingCapacity:8 cursor:9 -------------------------------->Copy the code
Judging from the results, the value of remainingCapacity should decrease with the number of posts; But it hasn’t really changed at all.
Look at ringBuffer. RemainingCapacity () this method:
/**
* Get the remaining capacity for this ringBuffer.
*
* @return The number of slots remaining.
*/
public long remainingCapacity(a)
{
return sequencer.remainingCapacity();
}
Copy the code
It and use the sequencer. RemainingCapacity () the method to calculate. In the above example, producerType. SINGLE is used, so let’s look at the implementation of remainingCapacity in SingleProducerSequencer.
@Override
public long remainingCapacity(a)
{
// The sequence value of the last application completed
long nextValue = this.nextValue;
// Calculates the sequence values currently consumed
long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
// The sequence value of the current production
long produced = nextValue;
return getBufferSize() - (produced - consumed);
}
Copy the code
To explain what this code means:
Assume that the current ringBuffer bufferSize is 8; The serial number applied last time is 5, in fact, it means that the serial number used in production is 5; Assuming that the current consumed serial number is 3, the remaining capacity is: 8- (5-2) = 5.
Because here we can determine the bufferSize and produced values, the result of remainingCapacity depends on the calculation result of the getMinimumSequence.
public static long getMinimumSequence(final Sequence[] sequences, long minimum)
{
for (int i = 0, n = sequences.length; i < n; i++)
{
long value = sequences[i].get();
minimum = Math.min(minimum, value);
}
return minimum;
}
Copy the code
This method takes the smallest Sequence from the Sequence array. If sequences is empty, return minimum. Go back to the previous step and take a look at where the SEQUENCES array came from and where its values were set.
long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
Copy the code
GatingSequences are member variables in SingleProducerSequencer’s parent class AbstractSequencer:
protected volatile Sequence[] gatingSequences = new Sequence[0];
Copy the code
GatingSequences are managed in the following method.
/ * * *@seeSequencer#addGatingSequences(Sequence...) * /
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
Copy the code
The stack of calls to this method traces back to these places:
WorkerPool to manage multiple consumers; The hangdlerEventsWith method is also used to set the consumer. However, in the above test case we wanted to observe the occupation of the ring queue by setting only the producer without setting the consumer, so the gatingSequences would always be empty, so the produced value would be returned as a minimum in the calculation. So each calculation is equivalent to:
return getBufferSize() - (produced - produced) === getBufferSize();
Copy the code
This verifies why the value of remainingCapacity will remain unchanged without setting consumers.
Disruptor practices in SOFATracer
SOFATracer AsyncCommonDigestAppenderManager Disruptor for the packaging, to deal with external components of Tracer in the log. The part with the aid of AsyncCommonDigestAppenderManager SOFATracer under source to analyze how to use the Disruptor.
SOFATracer uses two different event models, a StringEvent used internally by SOFATracer and a SofaTacerSpanEvent used by external extensions. The analysis is based on the event model SofaTacerSpanEvent. StringEvent news event model corresponding to the disruptor is AsyncCommonAppenderManager class encapsulation.
SofaTracerSpanEvent ( -> LongEvent)
Define the message event model. SofaTacerSpanEvent has the same basic structure as LongEvent in the previous Demo, except that it holds different message data internally. LongEvent is a data of type Long. SofaTacerSpanEvent holds SofaTracerSpan.
public class SofaTracerSpanEvent {
private volatile SofaTracerSpan sofaTracerSpan;
public SofaTracerSpan getSofaTracerSpan(a) {
return sofaTracerSpan;
}
public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) {
this.sofaTracerSpan = sofaTracerSpan; }}Copy the code
Consumer ( -> LongEventHandler)
Consumer is AsyncCommonDigestAppenderManager inner classes; By implementing the EventHandler interface, the consumer exists as a consumer.
There is also a in AsyncCommonAppenderManager, the local people think can go out, This enables AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager code looks more clean.
private class Consumer implements EventHandler<SofaTracerSpanEvent> {
// Set of log types. Log types outside this set will not be processed
protected Set<String> logTypes = Collections.synchronizedSet(new HashSet<String>());
@Override
public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch)
throws Exception {
// Get specific message data sofaTracerSpan
SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan();
// If there is no data, no processing is done
if(sofaTracerSpan ! =null) {
try {
String logType = sofaTracerSpan.getLogType();
// Verify that the current log type can be consumed by the current consumer
if (logTypes.contains(logType)) {
// Get the encoding type
SpanEncoder encoder = contextEncoders.get(logType);
/ / get the appender
TraceAppender appender = appenders.get(logType);
// Encode the data
String encodedStr = encoder.encode(sofaTracerSpan);
if (appender instanceof LoadTestAwareAppender) {
((LoadTestAwareAppender) appender).append(encodedStr,
TracerUtils.isLoadTest(sofaTracerSpan));
} else {
appender.append(encodedStr);
}
// Flush buffer, log outputappender.flush(); }}catch (Exception e) {
// Exception omitted}}}public void addLogType(String logType) { logTypes.add(logType); }}Copy the code
SofaTracerSpanEventFactory (- > LongEventFactory)
A Factory used to generate message events.
public class SofaTracerSpanEventFactory implements EventFactory<SofaTracerSpanEvent> {
@Override
public SofaTracerSpanEvent newInstance(a) {
return newSofaTracerSpanEvent(); }}Copy the code
ConsumerThreadFactory (-> LongEventThreadFactory )
The Factory used to generate the consuming thread.
public class ConsumerThreadFactory implements ThreadFactory {
private String workName;
public String getWorkName(a) {
return workName;
}
public void setWorkName(String workName) {
this.workName = workName;
}
@Override
public Thread newThread(Runnable runnable) {
Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName);
worker.setDaemon(true);
returnworker; }}Copy the code
Build the Disruptor
Disruptor build is done in the middle of AsyncCommonDigestAppenderManager constructor.
public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) {
// Use this calculation to ensure that realQueueSize is a power of 2 (return the smallest power of 2 currently greater than or equal to queueSize)
int realQueueSize = 1< < (32 - Integer.numberOfLeadingZeros(queueSize - 1));
// Build Disruptor using producerType.multi
The wait strategy is BlockingWaitStrategy
disruptor = new Disruptor<SofaTracerSpanEvent>(new SofaTracerSpanEventFactory(),
realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
// Consumer list
this.consumers = new ArrayList<Consumer>(consumerNumber);
for (int i = 0; i < consumerNumber; i++) {
Consumer consumer = new Consumer();
consumers.add(consumer);
// Set the exception handler
disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler());
// Bind the consumer
disruptor.handleEventsWith(consumer);
}
// Whether discarding is allowed can be obtained from the configuration file
this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD));
if (allowDiscard) {
// Whether to record the number of lost logs
this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER,
DEFAULT_IS_OUT_DISCARD_NUMBER));
// Whether to record TraceId and RpcId of lost logs
this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID,
DEFAULT_IS_OUT_DISCARD_ID));
// When the number of lost logs reaches the threshold, logs are generated once
this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD,
DEFAULT_DISCARD_OUT_THRESHOLD));
if (isOutDiscardNumber) {
this.discardCount = new PaddedAtomicLong(0L); }}}Copy the code
Start the Disruptor
Disruptor startup entrusted to the AsyncCommonDigestAppenderManager start method to execute.
public void start(final String workerName) {
this.threadFactory.setWorkName(workerName);
this.ringBuffer = this.disruptor.start();
}
Copy the code
Let’s see where SOFATracer calls start:
- CommonTracerManager: this hold the AsyncCommonDigestAppenderManager inside a singleton class, and the static static block of code to invoke the start method; This is used to print normal logs;
- SofaTracerDigestReporterAsyncManager: This class is also holding a singleton AsyncCommonDigestAppenderManager class to like, and provides a getSofaTracerDigestReporterAsyncManager method to get the singleton, The start method is called in this method; This object is used to output summary logs;
Publish event
In the previous Demo, events were published through a for loop. In SOFATracer, event publishing is triggered when there is a Tracer log that needs to be output. This corresponds to the log append operation, which appends the log to the ring buffer.
public boolean append(SofaTracerSpan sofaTracerSpan) {
long sequence = 0L;
// Whether discarding is allowed
if (allowDiscard) {
try {
If tryNext is allowed to discard the sequence, no exception will be thrown
sequence = ringBuffer.tryNext();
} catch (InsufficientCapacityException e) {
// Whether to output TraceId and RpcId of lost logs
if (isOutDiscardId) {
SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan
.getSofaTracerSpanContext();
if(sofaTracerSpanContext ! =null) {
SynchronizingSelfLog.warn("discarded tracer: traceId["
+ sofaTracerSpanContext.getTraceId()
+ "]; spanId[" + sofaTracerSpanContext.getSpanId()
+ "]"); }}// Whether to output the number of lost logs
if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) {
discardCount.set(0);
if (isOutDiscardNumber) {
SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs"); }}return false; }}else {
// Use the next method if discards are not allowed
sequence = ringBuffer.next();
}
try {
SofaTracerSpanEvent event = ringBuffer.get(sequence);
event.setSofaTracerSpan(sofaTracerSpan);
} catch (Exception e) {
SynchronizingSelfLog.error("fail to add event");
return false;
}
/ / release
ringBuffer.publish(sequence);
return true;
}
Copy the code
Call logic for SOFATracer event publishing:
Tracing the flow of the call, you can see that a message event was published when the current SPAN call to Finish or reportSpan was called in SOFATracer.
summary
This article provides a brief analysis of SOFATracer code that uses Disruptor for logging. For more internal details, check out SOFATracer code. As a relatively low-level middleware component, SOFATracer is basically invisible in actual business development. But as a technique for learning, there’s still a lot to dig for.
SOFATracer:github.com/sofastack/s…
If you are interested in middleware, welcome to join our team. Those interested in SOFA technology system can follow the SOFAStack community
Financial Class Distributed Architecture (Antfin_SOFA)