Today’s sharing started, please give us more advice ~
One, foreword
Spring-integration is based on Spring, which enables lightweight messaging in applications and enables Integration with external systems through declarative adapters. This section Outlines the purpose of Integration as a whole. Personally, messaging is the real focus.
The typical producer-consumer pattern, as shown in the figure above, has a specific channel through which data is transferred. In fact, the default channel is blockingQueue.
Today is mainly to see a simple Hello Word come in to analyze the entire implementation process.
Take a look at the code:
Second, ServiceActivator
The code above demonstrates the inbound channel adapter for calling a method and the standard outbound channel adapter, with an annotated ServiceActivator between them. The ServiceActivator is a message endpoint.
The primary role of message endpoints is to connect application code to the messaging framework in a non-invasive manner. In other words, ideally, the application code should not know about message objects or message pipes. This is similar to the role of controller in the MVC paradigm. Just as controllers handle HTTP requests, message endpoints handle messages. Just as controllers map to URL patterns, message endpoints map to message channels. The goal is the same in both cases.
ServiceActivator is a generic endpoint for connecting service instances to messaging systems. An input message channel must be configured, and an output message channel can be provided if the service method to be invoked returns a value.
The specific process is as follows:
The above code is relatively simple, but you may find that we only define the output channel OC, the input channel IC should not be defined, is not strange? With that in mind let’s take a look at ServiceActivator source:
The comment makes it clear that if the input channel does not exist, a DirectChannel with that name will be registered in the application context. And we’ll see what that is, but we’re not going to worry about it, but we’re going to look at it step by step.
Our global search ServiceActivator, see which side he is processed, finally found MessagingAnnotationPostProcessor class, used to handle method level message annotations BeanPostProcessor implementations.
In the afterPropertiesSet method, we see that we define a postprocessor, postProcessors, that registers the relevant annotation processing classes. Contains a variety of message endpoint processing, in addition to the ServiceActivator written above, as well as various endpoint methods such as filters, routing, converters, and so on.
Then look down, and now that implements the BeanPostProcessor, it must use postProcessAfterInitialization method, the process here is probably traversal contains @ ServiceActivator bean method, used for subsequent processing. Let’s get right to the code.
Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
Three, postProcess
There is a common method in AbstractMethodAnnotationPostProcessor postProcess used to generate the corresponding endpoint information. Specific code:
There are two main things:
Generate the corresponding MessageHandler based on the different createHandler abstract method implementations in the template pattern. For example, our side ServiceActivatorAnnotationPostProcessor
The MessageHandler implementation is connected to the message endpoint to generate the corresponding endpoint.
1.createHandler
CreateHandler generates ServiceActivatingHandler based on the attributes of the annotations and the corresponding method parameters. Back down in the ServiceActivatingHandler finally generates a delegate object MessagingMethodInvokerHelper used to reflect the way to carry out the target method.
2.createEndpoint
CreateEndpoint literally generates the message endpoint, and in fact associates the generated handler with the corresponding pipe. Take a look at the code:
In the above code, we can clearly see why we didn’t register the input channel in the demo and it works, thus answering the previous question.
There are two channel types, one is publish and subscribe, and the other is pollable. We choose the first one by default, because DirectChannel is a SubscribableChannel by default. So we ended up generating the corresponding information endpoint class, EventDrivenConsumer.
Let’s take a look at the overall structure of EventDrivenConsumer:
EventDrivenConsumer contains an abstract class called AbstractEndpoint that implements the Lifecycle interface, so we can skip straight to the star method:
This code basically registers a handler to an inputChannel, so that as soon as the inputChannel receives a message, it tells its registered Handlers to handle it. All operations are clearly recorded in the code, so there is not much explanation.
Four, send messages
After performing the above series of registrations, these channels are cleared, and all that remains is the actual send operation. Inputchannel. send(new GenericMessage(“World”)) Look at the send operation:
Send a message on this channel. If the channel is full, this method blocks until a timeout occurs or the sending thread interrupts. If the timeout specified is 0, the method returns immediately. If less than zero, it blocks indefinitely (see Send (Message)).
Parameters:
MessageArg – The message to be sent
Timeout – timeout in milliseconds
Returns:
True if the message is sent successfully. False If the message cannot be sent within the specified time or the sending thread is interrupted
Following the actual send operation, we’ll find that the level is very deep, so for the sake of space, we’ll go straight to the main code:
HandleRequestMessage we handler before operation is to use the delegate class MessagingMethodInvokerHelper run to reflect the corresponding endpoint method, and then send the results outputChannel. Finally, we directly navigate to the specific send operation:
So when we see that, we know where the data is going, it’s stored in the queue, the data that was generated by the producer has already been generated, so the send operation is basically done.
Five, receive information
Outputchannel.receive (0).getpayload ()
Receives the first available message from the channel. If the channel does not contain any messages, this method blocks until the allocated timeout has elapsed. If the timeout specified is 0, the method returns immediately. If less than zero, it blocks indefinitely (see Receive ()).
Parameters:
Timeout – timeout in milliseconds
Returns:
First available message or NULL if no message is available or the receiving thread is interrupted during the allocated time.
The final doReceive operation, as we all know, is to read the data directly from the above queue. The code is simple, so I don’t comment on it:
Six, the concluding
Spring-integration involves more branches of the application. If you get interested in Spring-Integration, this article has served its purpose. You need to go out and do your own research, and it’s always rewarding.
Today’s share has ended, please forgive and give advice!