Preface
Most frameworks provide some form of event subscription, also known as the observer pattern or an event mechanism: a subscriber registers for an event, and one of its methods is called back when that event fires. The feature is easy to use, and SOFA implements it internally and relies on it in many places. Let's see how it is designed.
Source code analysis
There are three core classes:
- EventBus: the event bus itself
- Event: the event being observed
- Subscriber: the observer
Subscriber is an abstract class; subclasses must implement the onEvent method, which is the callback. It also carries a flag that says whether the callback is executed synchronously.
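To make the two roles concrete, here is a minimal sketch of an event type and a subscriber. It is not SOFA's source: only Event, Subscriber, onEvent and isSync come from the code discussed in this article, while RequestFinishedEvent, LatencyRecorder and the constructor shape are hypothetical, and modeling Event as a marker interface is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the Event and Subscriber roles; not SOFA's actual source.
class SubscriberSketch {

    // Assumption: Event is modeled as a plain marker interface.
    interface Event {
    }

    abstract static class Subscriber {
        // true: the bus calls onEvent on the posting thread; false: on a pool thread
        private final boolean sync;

        protected Subscriber(boolean sync) {
            this.sync = sync;
        }

        public boolean isSync() {
            return sync;
        }

        // The callback every concrete subscriber has to implement.
        public abstract void onEvent(Event event);
    }

    // Hypothetical event type, for illustration only.
    static final class RequestFinishedEvent implements Event {
        final long elapsedMillis;

        RequestFinishedEvent(long elapsedMillis) {
            this.elapsedMillis = elapsedMillis;
        }
    }

    // Hypothetical subscriber that records the latencies it is told about.
    static final class LatencyRecorder extends Subscriber {
        final List<Long> latencies = new ArrayList<>();

        LatencyRecorder() {
            super(true); // ask for synchronous delivery
        }

        @Override
        public void onEvent(Event event) {
            if (event instanceof RequestFinishedEvent) {
                latencies.add(((RequestFinishedEvent) event).elapsedMillis);
            }
        }
    }

    public static void main(String[] args) {
        LatencyRecorder recorder = new LatencyRecorder();
        // In a real system the bus makes this call, not the application.
        recorder.onEvent(new RequestFinishedEvent(12L));
        System.out.println(recorder.isSync() + " " + recorder.latencies);
    }
}
```

The subscriber only decides what to do with an event and whether it wants synchronous delivery; when and on which thread onEvent runs is left to the bus.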
The EventBus class implements registration and unregistration, and notifies subscribers when an event occurs.
Internally, a “large data structure” is used to hold information about events and subscribers.
ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP
All relevant information is stored in this data structure.
Look at the registration feature.
public static void register(Class<? extends Event> eventClass, Subscriber subscriber) {
    CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
    if (set == null) {
        set = new CopyOnWriteArraySet<Subscriber>();
        CopyOnWriteArraySet<Subscriber> old = SUBSCRIBER_MAP.putIfAbsent(eventClass, set);
        if (old != null) {
            set = old;
        }
    }
    set.add(subscriber);
}
The parameters are the event's Class and a subscriber.
We first look up the subscriber Set for that event Class in the Map. Note that both the Map and the Set are concurrent containers.
If the lookup returns null, which happens the first time an event class is registered, a new empty Set is created. It is stored with putIfAbsent rather than put; putIfAbsent behaves like the following, except that it runs atomically:
if (!map.containsKey(key))
    return map.put(key, value);
else
    return map.get(key);
This again guards against concurrency: if another thread stores a Set for the same event class in the gap between get and putIfAbsent, putIfAbsent returns that thread's Set and we use it instead of our own. It is a cautious design, and it performs much better than locking. Registration is rarely contended, but it is still a worthwhile optimization.
If such a race occurs, the existing Set is used. Either way, the subscriber is then added to the Set, which completes the mapping from event to subscribers.
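The race described above can be provoked on purpose. The sketch below repeats the same get / putIfAbsent sequence on a stand-in registry and registers from many threads at once. PingEvent, the string subscribers and the helper names are hypothetical; computeIfAbsent is shown only as an equivalent form available on Java 8 and later, not as what SOFA uses.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;

class RegistrySketch {
    // Hypothetical event type; plain strings stand in for Subscriber objects.
    static final class PingEvent {
    }

    static final ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<String>> MAP =
        new ConcurrentHashMap<>();

    // Same get / putIfAbsent sequence as the register method above.
    static void register(Class<?> eventClass, String subscriber) {
        CopyOnWriteArraySet<String> set = MAP.get(eventClass);
        if (set == null) {
            set = new CopyOnWriteArraySet<>();
            // Atomic: returns null if our set was stored, otherwise the set that won the race.
            CopyOnWriteArraySet<String> old = MAP.putIfAbsent(eventClass, set);
            if (old != null) {
                set = old;
            }
        }
        set.add(subscriber);
    }

    // Equivalent one-liner on Java 8 and later.
    static void registerWithCompute(Class<?> eventClass, String subscriber) {
        MAP.computeIfAbsent(eventClass, k -> new CopyOnWriteArraySet<>()).add(subscriber);
    }

    // Registers from many threads at once and returns the resulting set size.
    static int registerConcurrently(int threads) {
        final CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final String name = "subscriber-" + t;
            workers[t] = new Thread(() -> {
                try {
                    start.await(); // line all threads up to provoke the race
                    register(PingEvent.class, name);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[t].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return MAP.get(PingEvent.class).size();
    }

    public static void main(String[] args) {
        System.out.println("registered: " + registerConcurrently(16));
    }
}
```

No registration is lost, because every thread ends up adding to whichever Set actually made it into the Map. A plain put in place of putIfAbsent could overwrite a Set that already holds another thread's subscriber.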
Look at the deregistration method.
public static void unRegister(Class<? extends Event> eventClass, Subscriber subscriber) {
    CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
    if (set != null) {
        set.remove(subscriber);
    }
}
It is very simple: the subscriber is just removed from the Set.
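One reason such a bare remove is safe is the container itself: a CopyOnWriteArraySet iterator works on a snapshot taken when the iteration starts, so removing a subscriber while a loop over the Set is in progress does not throw ConcurrentModificationException. The small sketch below shows that JDK behavior in isolation; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.CopyOnWriteArraySet;

class SnapshotIterationSketch {
    // Removes "b" while iterating and returns how many elements the loop visited.
    static int visitWhileRemoving(CopyOnWriteArraySet<String> subscribers) {
        int visited = 0;
        for (String ignored : subscribers) {
            subscribers.remove("b"); // would break a HashSet iteration; safe here
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        CopyOnWriteArraySet<String> set =
            new CopyOnWriteArraySet<>(Arrays.asList("a", "b", "c"));
        int visited = visitWhileRemoving(set);
        System.out.println(visited + " visited, " + set.size() + " left");
    }
}
```

The loop still visits all three elements of its snapshot, while the Set itself ends up with two. For an event bus this means a subscriber that is unregistered in the middle of a notification round may still receive that one event.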
Now look at notifications:
public static void post(final Event event) {
    if (!isEnable()) {
        return;
    }
    CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
    if (CommonUtils.isNotEmpty(subscribers)) {
        for (final Subscriber subscriber : subscribers) {
            if (subscriber.isSync()) {
                handleEvent(subscriber, event);
            } else { // asynchronous
                AsyncRuntime.getAsyncThreadPool().execute(
                    new Runnable() {
                        @Override
                        public void run() {
                            handleEvent(subscriber, event);
                        }
                    });
            }
        }
    }
}
First it checks whether the event bus is enabled; the bus may be switched off, for example during performance testing.
If it is enabled, the subscribers are looked up by the event's class, and the handleEvent method (which essentially calls the subscriber's onEvent method) is invoked for each of them in a loop.
Each subscriber is checked for synchronous or asynchronous delivery; for an asynchronous subscriber, the call is executed in the asynchronous thread pool.
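The dispatch rule can be tried out with a stripped-down bus. The sketch below is an illustration under assumptions, not SOFA's implementation: a small fixed pool stands in for AsyncRuntime, and BaseEvent, ChildEvent and ThreadRecorder are hypothetical. It also makes one detail of the lookup visible: because the Map is queried with event.getClass(), only subscribers registered for the exact runtime class are notified.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DispatchSketch {
    interface Event { }
    static class BaseEvent implements Event { }
    static class ChildEvent extends BaseEvent { }

    abstract static class Subscriber {
        private final boolean sync;
        Subscriber(boolean sync) { this.sync = sync; }
        boolean isSync() { return sync; }
        abstract void onEvent(Event event);
    }

    // Hypothetical subscriber that remembers which thread delivered the event.
    static class ThreadRecorder extends Subscriber {
        volatile String threadName;
        private final CountDownLatch called = new CountDownLatch(1);

        ThreadRecorder(boolean sync) { super(sync); }

        @Override
        void onEvent(Event event) {
            threadName = Thread.currentThread().getName();
            called.countDown();
        }

        // Waits up to two seconds for the callback; returns false on timeout.
        boolean awaitCall() {
            try {
                return called.await(2, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    static final ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> MAP =
        new ConcurrentHashMap<>();

    // Stand-in for the async pool; daemon threads let the demo JVM exit on its own.
    static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "async-callback");
        t.setDaemon(true);
        return t;
    });

    static void register(Class<? extends Event> eventClass, Subscriber subscriber) {
        MAP.computeIfAbsent(eventClass, k -> new CopyOnWriteArraySet<>()).add(subscriber);
    }

    static void post(final Event event) {
        // Exact runtime class lookup: subscribers of a superclass are not found.
        CopyOnWriteArraySet<Subscriber> subscribers = MAP.get(event.getClass());
        if (subscribers == null) {
            return;
        }
        for (final Subscriber subscriber : subscribers) {
            if (subscriber.isSync()) {
                subscriber.onEvent(event);                     // caller's thread
            } else {
                POOL.execute(() -> subscriber.onEvent(event)); // pool thread
            }
        }
    }

    public static void main(String[] args) {
        ThreadRecorder sync = new ThreadRecorder(true);
        ThreadRecorder async = new ThreadRecorder(false);
        register(ChildEvent.class, sync);
        register(ChildEvent.class, async);
        post(new ChildEvent());
        async.awaitCall();
        System.out.println("sync on " + sync.threadName + ", async on " + async.threadName);
    }
}
```

A synchronous subscriber therefore adds its running time to the thread that posts the event, while an asynchronous one trades that for a hop through the pool and the possibility of rejection when the pool is saturated.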
The thread pool is obtained from the AsyncRuntime class:
public static ThreadPoolExecutor getAsyncThreadPool(boolean build) {
    if (asyncThreadPool == null && build) {
        synchronized (AsyncRuntime.class) {
            if (asyncThreadPool == null && build) {
                // Some system parameters can be obtained from the configuration or registry.
                int coresize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_CORE);
                int maxsize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_MAX);
                int queuesize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_QUEUE);
                int keepAliveTime = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_TIME);
                BlockingQueue<Runnable> queue = ThreadPoolUtils.buildQueue(queuesize);
                NamedThreadFactory threadFactory = new NamedThreadFactory("SOFA-RPC-CB", true);
                RejectedExecutionHandler handler = new RejectedExecutionHandler() {
                    private int i = 1;

                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        if (i++ % 7 == 0) {
                            i = 1;
                            if (LOGGER.isWarnEnabled()) {
                                LOGGER.warn("Task:{} has been reject because of threadPool exhausted!" +
                                    " pool:{}, active:{}, queue:{}, taskcnt: {}", r,
                                    executor.getPoolSize(),
                                    executor.getActiveCount(),
                                    executor.getQueue().size(),
                                    executor.getTaskCount());
                            }
                        }
                        throw new RejectedExecutionException("Callback handler thread pool has bean exhausted");
                    }
                };
                asyncThreadPool = ThreadPoolUtils.newCachedThreadPool(
                    coresize, maxsize, keepAliveTime, queue, threadFactory, handler);
            }
        }
    }
    return asyncThreadPool;
}
There’s also a double-checked lock here.
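For readers who have not met the idiom, here is a minimal sketch of double-checked locking for a lazily created pool. It is an illustration, not AsyncRuntime itself: LazyPoolSketch, CREATED and the helper method are hypothetical, and the field is declared volatile because the Java memory model needs that for the idiom to be safe; whether SOFA declares its field that way is not visible in the excerpt above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class LazyPoolSketch {
    // volatile publishes the fully constructed pool to threads that skip the lock.
    private static volatile ExecutorService pool;

    // Counts constructions so the demo can show that only one happens.
    static final AtomicInteger CREATED = new AtomicInteger();

    static ExecutorService getPool() {
        ExecutorService local = pool;             // first check, no lock
        if (local == null) {
            synchronized (LazyPoolSketch.class) {
                local = pool;                     // second check, under the lock
                if (local == null) {
                    CREATED.incrementAndGet();
                    local = Executors.newCachedThreadPool();
                    pool = local;
                }
            }
        }
        return local;
    }

    // Calls getPool from many threads and reports how many pools were built.
    static int createdAfterConcurrentCalls(int threads) {
        Thread[] callers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            callers[t] = new Thread(LazyPoolSketch::getPool);
            callers[t].start();
        }
        try {
            for (Thread caller : callers) {
                caller.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return CREATED.get();
    }

    public static void main(String[] args) {
        System.out.println("pools created: " + createdAfterConcurrentCalls(8));
    }
}
```

The first check keeps the common path lock-free once the pool exists; the second check stops two threads that both saw null from each building a pool.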
By default the core pool size is 10, the maximum pool size is 200, the queue size is 256, and idle threads are reclaimed after 60 seconds.
With a queue size of 256, the queue that buildQueue returns is therefore a LinkedBlockingQueue.
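A bounded queue changes how such a pool grows. Assuming these settings end up in a standard java.util.concurrent.ThreadPoolExecutor (the body of ThreadPoolUtils.newCachedThreadPool is not shown here, so this is an assumption), the executor starts core threads first, then fills the queue, and only creates threads beyond the core size once the queue is full. The sketch below shows that order with small numbers (core 2, max 4, queue capacity 2) rather than 10 / 200 / 256; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueFirstSketch {
    // Submits `tasks` blocking tasks and reports {pool size, queued tasks}.
    static int[] poolAndQueueSize(int tasks, int core, int max, int capacity) {
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(capacity));
        try {
            for (int n = 0; n < tasks; n++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // core 2, max 4, queue capacity 2: at most 6 tasks fit before rejection starts
        for (int tasks = 1; tasks <= 6; tasks++) {
            int[] r = poolAndQueueSize(tasks, 2, 4, 2);
            System.out.println(tasks + " tasks -> threads=" + r[0] + ", queued=" + r[1]);
        }
    }
}
```

Scaled up to the defaults above, this would mean the pool stays at 10 threads until 256 callbacks are waiting in the queue, and the rejection handler only comes into play once 200 threads are busy and the queue is full.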
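The rejection handler logs details (current pool size, number of active threads, queue size, total task count) only once every seven rejections: six rejections pass silently and the seventh is logged. Every rejection still throws a RejectedExecutionException. I do not know why it was designed this way.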
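The counting is easy to misread, so here is the same counter logic isolated in a small sketch. ThrottledHandler and logsAfter are hypothetical names, and an AtomicInteger stands in for the LOGGER.warn call so that the number of log lines can be counted.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledRejectionSketch {

    static final class ThrottledHandler implements RejectedExecutionHandler {
        private int i = 1; // plain int, exactly as in the handler above
        final AtomicInteger logged = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (i++ % 7 == 0) {           // true on the 7th, 14th, 21st ... rejection
                i = 1;
                logged.incrementAndGet(); // stands in for LOGGER.warn(...)
            }
            // Logged or not, the caller always sees the rejection.
            throw new RejectedExecutionException("pool exhausted");
        }
    }

    // Feeds `rejections` rejected tasks to a fresh handler and returns the log count.
    static int logsAfter(int rejections) {
        ThrottledHandler handler = new ThrottledHandler();
        for (int n = 0; n < rejections; n++) {
            try {
                handler.rejectedExecution(() -> { }, null); // executor is unused in this sketch
            } catch (RejectedExecutionException expected) {
                // every call throws
            }
        }
        return handler.logged.get();
    }

    public static void main(String[] args) {
        System.out.println(logsAfter(6) + " " + logsAfter(7) + " " + logsAfter(14));
    }
}
```

Throttling of this kind keeps a saturated pool from flooding the log, which may be the intent here. The counter is an unsynchronized int, so under concurrent rejections the one-in-seven ratio is only approximate.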
The framework contains many Event implementations, as seen in the earlier source code analyses. There is currently only one subscriber, FaultToleranceSubscriber, which is used for fault tolerance and belongs to the FaultToleranceModule. That module is itself an extension point; when the system initializes, it registers the subscriber for the ClientSyncReceiveEvent and ClientAsyncReceiveEvent events.
Conclusion
This event bus is a good practical example of the observer pattern: it lets external modules, such as the fault-tolerance module described above, perceive and handle events that occur inside the system. When a subscribed event occurs, the external module is notified and can respond to it.