Microsoft eShopOnContainers

eShopOnContainers is a simplified reference application for a microservices architecture, built on .NET Core, Docker, and related technologies. It not only covers a great deal of terminology, design patterns, and architectural styles, but also uses a range of common technologies (RabbitMQ, EventBus, IdentityServer4, Polly, API Gateway, Redis, CQRS, CAP, CI/CD, etc.) and related tools (Docker, K8s, etc.). It can fairly be called a comprehensive reference implementation that integrates these technologies. Studying it will give you a clearer understanding of how the technologies involved are applied in practice.

Compared to the rest of eShopOnContainers, its domain events, integration events, and the way both collaborate with the event bus are somewhat obscure. So let's analyze them.

Domain Events

Use domain events to explicitly implement the side effects of changes within the domain. In DDD terms, domain events make side effects that span multiple aggregates explicit. Optionally, to improve scalability and reduce the impact of database locks, eventual consistency can be used between aggregates within the same domain.

For example, in the eShopOnContainers application, when an order is created the user becomes a buyer, so an OrderStartedDomainEvent is raised and handled in ValidateOrAddBuyerAggregateWhenOrderStartedDomainEventHandler. In eShop, each domain event has a corresponding handler, which is implemented using MediatR's INotificationHandler.

Semantically, domain events and integration events are the same thing: both are notifications that something has happened. However, their implementations must be different. A domain event is a message pushed to a domain event dispatcher, which can be implemented as an in-memory mediator based on the IoC container or by any other method (e.g., MediatR).
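
To make the idea of an in-memory dispatcher concrete, here is a minimal sketch. It does not use MediatR; IDomainEvent, IDomainEventHandler, InMemoryDomainEventDispatcher, and AddBuyerWhenOrderStartedHandler are hypothetical names invented for illustration, and the OrderStartedDomainEvent shown here is a simplified stand-in for the real eShop class.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical marker interface for domain events (not MediatR's INotification).
public interface IDomainEvent { }

// Hypothetical handler contract, similar in spirit to MediatR's INotificationHandler<T>.
public interface IDomainEventHandler<in TEvent> where TEvent : IDomainEvent
{
    Task Handle(TEvent domainEvent);
}

// Simplified stand-in for the eShop event: an order was started by a user.
public sealed class OrderStartedDomainEvent : IDomainEvent
{
    public OrderStartedDomainEvent(string userId) => UserId = userId;
    public string UserId { get; }
}

// Illustrative handler: records the user as a buyer (a set makes this idempotent).
public sealed class AddBuyerWhenOrderStartedHandler : IDomainEventHandler<OrderStartedDomainEvent>
{
    public HashSet<string> Buyers { get; } = new HashSet<string>();

    public Task Handle(OrderStartedDomainEvent domainEvent)
    {
        Buyers.Add(domainEvent.UserId);
        return Task.CompletedTask;
    }
}

// Minimal in-memory dispatcher: handlers are looked up by event type and run in-process.
public sealed class InMemoryDomainEventDispatcher
{
    private readonly Dictionary<Type, List<Func<IDomainEvent, Task>>> _handlers =
        new Dictionary<Type, List<Func<IDomainEvent, Task>>>();

    public void Register<TEvent>(IDomainEventHandler<TEvent> handler) where TEvent : IDomainEvent
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<IDomainEvent, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler.Handle((TEvent)e));
    }

    public async Task Dispatch(IDomainEvent domainEvent)
    {
        if (_handlers.TryGetValue(domainEvent.GetType(), out var list))
        {
            foreach (var handle in list)
            {
                await handle(domainEvent);
            }
        }
    }
}
```

Because everything happens in the same process, the handler can take part in the same database transaction as the change that raised the event, which is exactly what distinguishes a domain event from an integration event.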

The purpose of integration events is to propagate committed transactions and updates to other subsystems, whether they are other microservices, bounded contexts, or external applications. (Integration events cross service boundaries; domain events do not.)

Some examples of domain events:

  1. When the user initiates an order, the Order aggregate raises the OrderStarted domain event. The Buyer aggregate handles this event to create a buyer object in the ordering microservice, based on the original user information from the identity microservice (including the information provided in the CreateOrder command).
  2. Each OrderItem child entity can raise an event when the item price is higher than a certain amount, or when the product item amount is too high. The aggregate root can then receive these events and perform a global calculation of the Order total.
  3. In an application, when an article category is added or deleted, the cache needs to be refreshed to stay consistent. A cache-refresh event can therefore be raised whenever a category is added or deleted:
/// <summary>
///Add category
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<CommandResult> Handle(CreateCategoryCommand request, CancellationToken cancellationToken)
{
    var rep = _uow.GetBaseRepository<Category>();
    var model = rep.GetFirstOrDefault(predicate:x => x.DisplayName == request.Name,disableTracking:true);
    if (model != null)
    {
        return CommandResult.Fail("Duplicate name");
    }
    var entity = new Category(categoryName: request.Name, displayName: request.Name);
    var res= await rep.InsertAsync(entity);
    if (res.Entity != null)
    {
        entity.AddCacheChangeDomainEvent(new List<string> { _cacheKeyMgr.PostCategoryListlKey() }); // After the event is emitted, the corresponding handler flushes the cache
        return CommandResult.Success();
    }
    return CommandResult.Fail("Add failed");
}
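
How might AddCacheChangeDomainEvent and its handler fit together? The following is only a sketch under assumptions: the Entity base class, CacheChangedDomainEvent, InMemoryCache, and CacheInvalidation are hypothetical types written for illustration, not the types of the application above. The point is that the entity merely records the event, and a handler reacts to it later by evicting the cache keys.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event carrying the cache keys that must be evicted.
public sealed class CacheChangedDomainEvent
{
    public CacheChangedDomainEvent(IReadOnlyList<string> cacheKeys) => CacheKeys = cacheKeys;
    public IReadOnlyList<string> CacheKeys { get; }
}

// Hypothetical base class: entities collect events instead of handling them directly.
public abstract class Entity
{
    private readonly List<object> _domainEvents = new List<object>();
    public IReadOnlyCollection<object> DomainEvents => _domainEvents.AsReadOnly();
    protected void AddDomainEvent(object domainEvent) => _domainEvents.Add(domainEvent);
    public void ClearDomainEvents() => _domainEvents.Clear();
}

public sealed class Category : Entity
{
    public Category(string displayName) => DisplayName = displayName;
    public string DisplayName { get; }

    // Records the event; nothing is evicted until a handler processes it.
    public void AddCacheChangeDomainEvent(List<string> cacheKeys) =>
        AddDomainEvent(new CacheChangedDomainEvent(cacheKeys));
}

// Toy cache used only to make the sketch observable.
public sealed class InMemoryCache
{
    private readonly Dictionary<string, string> _entries = new Dictionary<string, string>();
    public void Set(string key, string value) => _entries[key] = value;
    public bool Contains(string key) => _entries.ContainsKey(key);
    public void Remove(string key) => _entries.Remove(key);
}

// Stands in for the event handler: evicts the listed keys, then clears the pending events.
public static class CacheInvalidation
{
    public static void HandlePendingEvents(Entity entity, InMemoryCache cache)
    {
        foreach (var domainEvent in entity.DomainEvents)
        {
            if (domainEvent is CacheChangedDomainEvent cacheChanged)
            {
                foreach (var key in cacheChanged.CacheKeys)
                {
                    cache.Remove(key);
                }
            }
        }
        entity.ClearDomainEvents();
    }
}
```

In a real application the pending events would typically be dispatched around the time the unit of work is saved, so the command handler itself never needs to know how the cache is refreshed.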

Integration Events

In event-based communication, a microservice publishes an event when something noteworthy happens, such as when a business entity is updated. Other microservices subscribe to these events. When a microservice receives an event, it can update its own business entities, which may lead to more events being published. This is the essence of eventual consistency. The publish/subscribe system is typically implemented with an event bus. An eventually consistent transaction consists of a series of distributed operations; in each operation, a microservice updates a business entity and publishes an event that triggers the next operation.

Integration events are used to keep domain state synchronized across multiple microservices or external systems. This is done by publishing integration events outside the microservice. When an event is published to multiple receiving microservices, it is processed by the appropriate event handler in each of them. (This is the producer/consumer model of a message queue.)

Integration events should be defined at the level of a single application. Sharing the same integration event definition across applications is not recommended, because it blurs where events come from (microservices must remain independent).

EventBus

The event bus enables publish/subscribe communication without the components having to know about each other explicitly. Microservice A publishes to the event bus, which distributes the event to the subscribing microservices B and C, without the publisher knowing who the subscribers are.

How is anonymity between publisher and subscriber achieved? One simple way is to let a relay handle all the traffic. The event bus is one such relay.

An event bus typically consists of two parts:

  1. An abstraction or interface.
  2. One or more implementations (RabbitMQ, Azure Service Bus, Kafka, etc.).

The interface is very simple: it only needs to publish (the integration events of this system) and subscribe (to the events of other subsystems). Neither side needs to know which subsystems are involved in the process.
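
The two parts can be sketched as follows. This is a simplified illustration, not the actual eShopOnContainers interface: the delegate-based Subscribe signature, InMemoryEventBus, and OrderCancelledIntegrationEvent are assumptions made for this example, and a real implementation would sit on top of a broker such as RabbitMQ rather than an in-process dictionary.

```csharp
using System;
using System.Collections.Generic;

// Base type for integration events; Id and CreationDate identify each occurrence.
public abstract class IntegrationEvent
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTime CreationDate { get; } = DateTime.UtcNow;
}

// Part 1: the abstraction. Only publish and subscribe are exposed.
public interface IEventBus
{
    void Publish(IntegrationEvent integrationEvent);
    void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : IntegrationEvent;
}

// Part 2: one possible implementation, here purely in memory for illustration.
public sealed class InMemoryEventBus : IEventBus
{
    private readonly Dictionary<string, List<Action<IntegrationEvent>>> _subscriptions =
        new Dictionary<string, List<Action<IntegrationEvent>>>();

    public void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : IntegrationEvent
    {
        var eventName = typeof(TEvent).Name;
        if (!_subscriptions.TryGetValue(eventName, out var handlers))
        {
            handlers = new List<Action<IntegrationEvent>>();
            _subscriptions[eventName] = handlers;
        }
        handlers.Add(e => handler((TEvent)e));
    }

    public void Publish(IntegrationEvent integrationEvent)
    {
        // The publisher never sees the subscriber list; routing is by event name only.
        if (_subscriptions.TryGetValue(integrationEvent.GetType().Name, out var handlers))
        {
            foreach (var handle in handlers)
            {
                handle(integrationEvent);
            }
        }
    }
}

// Hypothetical event used to exercise the bus.
public sealed class OrderCancelledIntegrationEvent : IntegrationEvent
{
    public OrderCancelledIntegrationEvent(int orderId) => OrderId = orderId;
    public int OrderId { get; }
}
```

Swapping the in-memory class for a broker-backed one would not change the publishing or subscribing code, which is the reason for keeping the abstraction this small.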

Collaboration of domain events, integration events, and the event bus (using the eShop implementation as an example)

The three work together to achieve eventual consistency across the entire microservice system. When the data of microservice A changes, the chain of reactions caused by that change may lead to different outcomes for the system as a whole. To ensure that the outcome matches expectations, consistency must be addressed, and this is where the CAP theorem of distributed systems comes in.

In eShopOnContainers, when a domain event is raised, its handler saves the corresponding integration event into the integration event log table. The save uses the transaction object of the DbContext in which the domain entity was changed, which guarantees strong consistency inside the service. In other words, the domain event handler writes the event to the log table through the transaction obtained from the entity's DbContext.

// Handler invoked when OrderCancelledDomainEvent is raised (OrderCancelledDomainEventHandler.cs)
public async Task Handle(OrderCancelledDomainEvent orderCancelledDomainEvent, CancellationToken cancellationToken)
{
    _logger.CreateLogger<OrderCancelledDomainEvent>()
        .LogTrace("Order with Id: {OrderId} has been successfully updated to status {Status} ({Id})",
            orderCancelledDomainEvent.Order.Id, nameof(OrderStatus.Cancelled), OrderStatus.Cancelled.Id);

    var order = await _orderRepository.GetAsync(orderCancelledDomainEvent.Order.Id);
    var buyer = await _buyerRepository.FindByIdAsync(order.GetBuyerId.Value.ToString());

    var orderStatusChangedToCancelledIntegrationEvent = new OrderStatusChangedToCancelledIntegrationEvent(order.Id, order.OrderStatus.Name, buyer.Name);
    // Save the integration event through the integration event service
    await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStatusChangedToCancelledIntegrationEvent);
}

// Integration event service (OrderingIntegrationEventService.cs)
public async Task AddAndSaveEventAsync(IntegrationEvent evt)
{
    _logger.LogInformation("----- Enqueuing integration event {IntegrationEventId} to repository ({@IntegrationEvent})",
        evt.Id, evt);
    // _orderingContext.GetCurrentTransaction() returns the current transaction of the context in which the entity that raised the domain event is being saved.
    // Because the context's transaction object is fetched here, eShop uses TransactionBehaviour to ensure that a current transaction exists.
    await _eventLogService.SaveEventAsync(evt, _orderingContext.GetCurrentTransaction());
}

Then, in TransactionBehaviour, the transaction is committed, so the entity changes and the corresponding event log entries are written to the database together. After that, the integration event service queries the pending events by transaction ID and processes each of them as follows:

  1. Mark the event as in progress.
  2. Publish it via the event bus (a message queue or another component).
  3. If publishing fails, a retry policy can be added at this point, although it lengthens the response time of the operation. Alternatively, a background task can periodically retry the failed events to ensure eventual consistency. Note that consistency on the consumer side is not guaranteed by this mechanism and has to be handled by other strategies.
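
The steps above can be sketched roughly as follows. This is an illustration under assumptions, not the eShop code: EventLogEntry, EventState, and OutboxPublisher are hypothetical names, the event log is a plain in-memory list instead of a database table, and the publish delegate stands in for the event bus.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical publishing states of a logged event.
public enum EventState { NotPublished, InProgress, Published, PublishedFailed }

// Hypothetical row of the integration event log table.
public sealed class EventLogEntry
{
    public EventLogEntry(Guid transactionId, string payload)
    {
        TransactionId = transactionId;
        Payload = payload;
    }

    public Guid TransactionId { get; }
    public string Payload { get; }
    public EventState State { get; set; } = EventState.NotPublished;
    public int TimesSent { get; set; }
}

public sealed class OutboxPublisher
{
    private readonly List<EventLogEntry> _log;
    private readonly Action<string> _publish; // stands in for the event bus

    public OutboxPublisher(List<EventLogEntry> log, Action<string> publish)
    {
        _log = log;
        _publish = publish;
    }

    // Called right after the transaction commits: publish the events it logged.
    public void PublishPending(Guid transactionId)
    {
        var pending = _log
            .Where(e => e.TransactionId == transactionId && e.State == EventState.NotPublished)
            .ToList();
        foreach (var entry in pending)
        {
            TryPublish(entry);
        }
    }

    // Called periodically by a background task: retry whatever failed earlier.
    public void RetryFailed()
    {
        foreach (var entry in _log.Where(e => e.State == EventState.PublishedFailed).ToList())
        {
            TryPublish(entry);
        }
    }

    private void TryPublish(EventLogEntry entry)
    {
        entry.State = EventState.InProgress;   // step 1: mark as in progress
        entry.TimesSent++;
        try
        {
            _publish(entry.Payload);           // step 2: publish via the event bus
            entry.State = EventState.Published;
        }
        catch (Exception)
        {
            entry.State = EventState.PublishedFailed; // step 3: leave it for a later retry
        }
    }
}
```

Since a retried event may reach a consumer more than once, consumers generally need to be idempotent; that is one of the "other strategies" needed on the consumer side.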