A defining characteristic of session-based data processing is that all data generated within the same Session must be delivered to the same processing unit. Unlike stateless services, the difficulty is that when the cluster scales out or in, it must maintain not only load balance but also a stable mapping from data to processing units. Starting from a classic case, this article introduces how Sharding solves this problem and then discusses how to handle the rebalance scenario of a Kafka Consumer.

Analysis of session-based routing problems

What is a Session

Stream processing must answer four basic questions 1:

  • What: What results are computed? For example sums, statistics, machine-learning output, and other computation logic determined by the business scenario.
  • Where: Where in Event Time are results computed? Data is generated, observed, transmitted, and processed in different stages, with time lags between them; Event Time restores the original time and order of the events.
  • When: When in Processing Time are results materialized? Raw data keeps pouring into the processing nodes, and results can only be produced according to some strategy, for example processing one record at a time or processing in centralized batches, which is a trade-off between latency and throughput. Processing Time describes when raw data is turned into results.
  • How: How do successive refinements of results relate? Because data arrives as a stream, staged results are reprocessed as later data comes in to get the final result. Accumulation modes include: Discarding (the latest result always replaces the old one), Accumulating, Accumulating & Retracting, and so on.

A Session is one manifestation of the Where problem: events are not independent of each other but belong to a hierarchy; some events share the same context and must be processed as a whole to get the correct result. For example, all of a user's online operations belong to the same session, which starts when the user logs in and ends when the user logs out. Only one session can be maintained for the same user at a time, because some services may depend on global state. Here is a concrete example from games.

Application example: negative play in games

In MOBA and other competitive games, the core gameplay takes the form of individual matches, and each match can be considered a session. Besides the basic gameplay, a match has side features such as cheat detection and negative-play detection.

Judge Cluster is the negative-play adjudication service cluster, containing multiple process instances. Each instance runs multiple adjudicators, each corresponding to a specific Game i; Event j of Game i denotes the j-th event in match i. Adjudication depends on the event context, so the events of a match must be processed completely and in order by the same adjudicator. The decision logic depends on the causality of behavior, and causality relates to both Event Time order and the completeness of the events. For example, it is normal for a character to die in combat, but standing still and letting opponents attack is suspected negative play. This scenario places the following requirements on the architecture:

  • Must have
    • Events of a match can be routed correctly to its adjudicator.
    • The routing mapping is stable.
    • Data is ordered.
  • Optional
    • Lost data can be recovered. If it cannot, negative play in the affected matches simply goes undetected, which does not harm normal users, so the experience is acceptable.

These requirements capture the technical points of data delivery in Session scenarios. Below they are abstracted in order to distill a general solution.

Abstraction

  • Message
    • Message Key: the unique identifier of a message.
    • Session Key: the unique identifier of the Session the message belongs to.
    • Sequence ID: the sequence number of a message, used for ordering; it can be a timestamp.
  • Cluster
    • Node: a process instance.
    • Cluster: a collection of Nodes that provides service to the outside as a whole.
    • Entity: the message-processing entity; it corresponds to a Session and is unique within the Cluster.
  • Session
    • It has a duration: Start and Stop. Start corresponds to Entity creation, Stop corresponds to Entity destruction, and Processing refers to the Entity handling messages.
    • Messages in the same Session must be processed by the same Entity.
    • The message-to-Entity mapping is stable.
    • The Entity-to-Node mapping may change dynamically, for example due to Cluster scaling, an unreachable or slow Node, or a Rebalance of Entity allocation across Nodes.
    • Whether an Entity must process messages in Event Time order depends on the scenario; if so, pay attention to Sequence IDs.
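
The abstractions above can be sketched as a couple of types. This is a minimal, illustrative sketch in Scala; the names (Message, Entity, handle) are assumptions of this article, not any library's API:

```scala
// A Session-scoped message: all Messages with the same sessionKey must reach the same Entity,
// no matter which Node of the Cluster received them.
final case class Message(
  messageKey: String,  // unique identifier of the message
  sessionKey: String,  // identifier of the Session the message belongs to
  sequenceId: Long,    // ordering within the Session, e.g. an event timestamp
  payload: Array[Byte] // business data
)

// The message-processing entity; one per Session, unique within the Cluster.
trait Entity {
  def entityId: String            // usually equal to the sessionKey
  def handle(msg: Message): Unit  // business logic, e.g. negative-play adjudication
}
```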

Problem analysis

There is only one core issue: ensuring that messages of a Session are delivered correctly to the corresponding Entity. It sounds simple, but Entities are hosted on Nodes, and Node changes are almost inevitable for various reasons. The difficulty thus becomes: how to decouple an Entity's logical address from its physical address (the address of the Node it lives on), and how to automatically update the physical address behind an unchanged logical address.

Let's look at what causes the mapping between Nodes and Entities to change.

  • Entity creation and destruction, corresponding to changes in the Session lifecycle.
  • Cluster changes. Adding or removing Nodes causes Entities to migrate to other Nodes. Possible causes:
    • Scaling out or in; with automated container scheduling by an orchestration system, this can be frequent.
    • Disaster recovery: Nodes fail or respond slowly.
    • Rolling updates.

Rebalancing means updating the mapping between Nodes and Entities. Depending on the application scenario, consider the following factors:

  • Route updates. Decide, via algorithms and/or storage, which Node a message should be sent to.
  • Load balancing. Keep the load spread evenly enough that the service remains continuously available.
  • Data locality, in both space and time. For example, a Kafka Producer pushes data to Partitions based on the Session key, so data of the same Session lands in the same Partition; as a consumer, we should try to process the pulled Session data locally, otherwise it has to be forwarded again.

The figure above shows the basic process of data routing in the Session scenario, which can be summarized as two core steps (a minimal interface sketch follows the list):

  • Determine the Entity ID from the message.
  • Watch for Cluster changes and resolve the Node location from the Entity ID.
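
A minimal sketch of these two steps as an interface, assuming illustrative names rather than a specific library API:

```scala
final case class NodeAddress(host: String, port: Int)

trait SessionRouter[Msg] {
  // Step 1: derive the logical address (Entity ID) from the message itself.
  def entityId(msg: Msg): String

  // Step 2: resolve the logical address to the current physical address.
  // Implementations must watch Cluster membership and keep this mapping fresh.
  def nodeFor(entityId: String): NodeAddress
}
```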

Let’s look at the solution.

Existing solutions

  • (a) Direct connection: the service client acts as the requester, is configured with the addresses of all Nodes in the back-end cluster, and implements routing locally. Advantages: closely tied to the business scenario, very flexible. Disadvantages: cannot automatically sense back-end cluster changes; unfriendly to the requester, which must integrate an intrusive SDK.
  • (b) Via a third party: to sense back-end cluster changes automatically, a third-party Service Registry can watch the cluster, and the requester queries the Service Registry. Examples: common service-discovery components such as ZooKeeper and etcd; directory services and DNS; databases; and so on.
  • (c) Gateway forwarding: the requester does not have to integrate a complex SDK; routing and service discovery are moved into a gateway. Advantages: less invasive; services are decoupled, so the requester only needs the service name and never worries about cluster-change details. Disadvantages: a general-purpose gateway carries general-purpose logic, so personalized routing requires a personalized gateway, adding complexity; the communication path gains an extra hop.
  • (d) In-cluster routing: the complex routing is hidden inside the Cluster, and the internal Nodes take on the forwarding role. Compared with gateway forwarding, the advantages are: the outside only needs load balancing and never needs to care about routing; at worst there is one extra hop.

The four approaches above each have their applicable scenarios and are not mutually exclusive. To simplify the problem, the rest of this article describes a Sharding implementation of (d), in-cluster routing. In this setting, external access to the Cluster is reduced to:

  • Service discovery.
  • Load balancing, coarse-grained at the Node level, with many possible algorithms: random, round robin, least load, and so on.

Stable routing from Session messages to Entities is handled entirely by the Sharding mechanism inside the Cluster.

Cluster Sharding

Cluster Sharding implements message routing within the Cluster: no matter which Node receives a message, it forwards the message to the Node that hosts the target Entity. This requires deriving the Node location from the message's own content and adapting the mapping as the Cluster scales, which is the core Sharding problem. The approach hides the complexity of routing, and since the Cluster exists to process messages anyway, it can run all sorts of custom logic, which increases flexibility. This article introduces the Sharding architecture and terminology with reference to Akka Cluster, and then describes the routing and rebalance processes.

System introduction

Table 1: Introduction to Sharding terminology

| Symbol | Full name | Description |
| --- | --- | --- |
| ST | Sharding Type | An independent Sharding system, determined by two factors: the type and the name of the messages it processes. |
| E | Entity | The message-processing entity; all messages of a Session are delivered to it for processing. |
| S | Shard | A group of Entities with the same function; a Shard resides on exactly one Node and is responsible for creating, destroying, and routing its internal Entities. |
| SC | Shard Coordinator | One per ST; maintains the mapping between Shards and Nodes: Allocate and Rebalance. |
| SR | Shard Region | One per Node; responsible for Shard creation and routing: direct delivery to local Shards, forwarding to remote Shards. |

A Sharding system contains four modules: Entity, Shard, Shard Region, and Shard Coordinator. Sharding systems are distinguished by logical function. For example, in-match combat events and chat events have different processing logic and can be distinguished by message type; however, the same combat events may serve both negative-play adjudication and statistics, which requires a name to tell them apart. So an independent Sharding system uses the combination of message type and name as its unique tag. Once the Sharding system is determined, a class of Entities with the same function is defined, represented as EntityTypeKey[MsgType](name): MsgType is the type of message such an Entity can handle, and name is a string token.
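
For reference, this is roughly how such a type key is declared in Akka Typed Cluster Sharding; the message type GameMsg and the key name are assumptions made up for the negative-play example:

```scala
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey

// Messages one match's adjudicator Entity can handle (illustrative type).
sealed trait GameMsg
final case class GameEvent(gameId: String, seqId: Long, data: String) extends GameMsg

// Message type + name together identify one independent Sharding system.
val JudgeTypeKey: EntityTypeKey[GameMsg] = EntityTypeKey[GameMsg]("negative-play-judge")
```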

The number of Entities can be large; in the match scenario there may be hundreds of thousands when millions of users are online. Managing Entities directly is costly, so, mimicking real-world organizational structures, a layer is introduced: Entities are divided into groups, and each group is managed as a basic unit called a Shard. Access to an Entity becomes access to its Shard, moving from fine granularity to coarse granularity. For disaster recovery and scalability, Shards should not all sit on the same Node but must be distributed across different Nodes. This raises two problems:

  • From a Shard's perspective: which Node should I live on? This is the unavoidable Allocate / Rebalance problem.
  • From a Node's perspective: which Shards do I manage? And for a Shard that is not mine, where does it live?

These two problems are solved by the Shard Coordinator and the Shard Region respectively.

  • Shard Coordinator: when a Shard is created or rebalanced, it decides the mapping between the Shard and a Node. This kind of decision needs global Cluster information and a single point of judgment, so the SC is the globally unique piece of logic within a Cluster.
  • Shard Region: one per Node. As the manager of all Shards on the current Node, it performs three functions:
    • Within the Cluster, it talks to the SC to determine Shard locations and talks to the SRs on other Nodes to route messages.
    • Toward the outside of the Cluster, it receives message requests and forwards them to the correct Shard.
    • Within the Node, it creates and destroys Shards and forwards messages to them.

The number of Shards is usually fixed, while the number of Entities in each Shard changes dynamically. This point is critical, because it makes a stable Entity-to-Shard mapping possible. A Node is a physical entity and its changes are inevitable, whereas a Shard is a logical entity and can stay stable. In this way Sessions are decoupled from Nodes, reducing the problem to the much simpler Session-to-Shard mapping. To keep Shards evenly distributed across Nodes, set the number of Shards well above the maximum expected number of Nodes, for example 10 times the number of Nodes.
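
A minimal sketch of this stable mapping, assuming the Shard count is fixed up front; the constant and the 10x rule of thumb follow the paragraph above:

```scala
// Fixed for the lifetime of the Sharding system; roughly 10x the expected maximum Node count.
val NumberOfShards: Int = 1000

// The Entity ID is the Session ID (e.g. the GameID), so the Session-to-Shard mapping
// never changes, no matter how Nodes come and go.
def shardId(entityId: String): Int =
  Math.floorMod(entityId.hashCode, NumberOfShards)
```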

The concrete operation of this system is described below through its two key processes: routing and rebalancing.

Routing

Routing creates the target when it does not exist and forwards when it does. There are two scenarios depending on the destination: the target Shard is on the Node that received the message, or the target Shard is on a different Node. Both are described below.

Local routing

  1. Node A receives message Msg1 and hands it to its SR for processing. Msg is of type Type(GameID, SeqID, GameData): GameID is the ID of the match, i.e. the Session Key (Session ID); SeqID is the sequence number of the message within the match; GameData is the message payload.
  2. SR derives the Entity ID (E1) and Shard ID (S1) from Msg1:
    • Obtain the Entity ID. Generally the Entity corresponds one-to-one to the Session, so the Entity ID is the Session ID carried in the message; in the negative-play example it is the GameID.
    • Obtain the Shard ID from the Entity ID. Since the total number of Shards is constant, a simple and effective way is to take the Entity ID modulo the total number of Shards, which is always stable.
  3. If the SR encounters S1 for the first time, it does not know where S1 is and must query the SC. If it has queried before and nothing has changed since, it can simply use its locally cached result.
  4. SC returns the query result to SR: S1 is on Node A.
  5. SR finds that S1 is local, so it creates S1.
  6. S1 in turn creates Entity E1 and routes the message to E1.

The procedure above describes how a Node receives a message and creates the Shard and Entity locally. Once the local route exists, later messages do not need to interact with the SC: the SR keeps the local routing information and forwards them straight to the local Shard.
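
The shard-location cache mentioned in steps 3 and 4 can be pictured roughly as follows. This is an illustrative sketch with invented names, not the internals of any particular Sharding implementation:

```scala
import scala.collection.mutable

// Hypothetical cache an SR could keep: shardId -> node address.
// `queryCoordinator` stands in for the round-trip to the SC.
final class ShardLocationCache(queryCoordinator: Int => String) {
  private val locations = mutable.Map.empty[Int, String]

  // First sight of a shard triggers an SC query; later lookups hit the local cache.
  def locate(shardId: Int): String =
    locations.getOrElseUpdate(shardId, queryCoordinator(shardId))

  // Called when the SC announces that this shard has been rebalanced elsewhere.
  def invalidate(shardId: Int): Unit = locations.remove(shardId)
}
```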

Remote routing

The overall flow of remote routing is similar to local routing. The difference: when SR-A queries the SC and learns that the target Shard (S2) is on Node B, it forwards the message to Node B, and SR-B creates S2 locally and routes the message to it. In this case delivery costs one extra hop. Two optimizations are worth considering:

  • Besides load balancing, SC allocation should take locality into account to reduce forwarding between Nodes: when a Node receives the first message for a Shard, allocate that Shard to that Node if possible.
  • When sending messages into the Cluster, any Node will do for correctness, but locality is exploited better if messages of the same Session are consistently sent to the same Node.

Rebalancing

Changes in Node status within a Cluster, such as a Node being added, removed, or becoming unreachable, necessarily require adjusting the Shard-to-Node mapping; this is called rebalancing. There are two cases:

  • Adding a Node. Shards need to migrate from old Nodes to the new Node, or new Shards can simply be created on the new Node.
  • Removing a Node. This is mostly uncontrolled and amounts to recreating its Shards on the remaining Nodes, following the routing process of the previous section.

So rebalancing is about two things:

  • How the Shard-to-Node mapping is updated when a Node is added.
  • How Shard state is migrated.

Adding a Node

  1. The Shard Region on the new Node is initialized, and the SC decides, according to its rebalance strategy, to move Shard S1 from Node B to the new Node; the migration process then starts (an illustrative sketch of the hand-off messages follows the list).
  2. The SC suspends all SR queries about S1's location.
  3. It notifies all SRs to stop sending messages to S1 and to buffer any new input locally. The goal is to stop feeding S1 new messages so that it can drain its backlog before moving to the new location.
  4. After S1 has processed all its pending messages, it informs the SC that it has no remaining work and can be shut down. For stateful Shards, the state must also be synchronized to S1′ (the new incarnation). The SC then considers the migration complete.
  5. The SC shuts down S1 on Node B.
  6. The SC resumes answering SR queries about S1's location and proactively announces S1's new address, S1′.
  7. Buffered messages, as well as newly arriving ones, are routed to the new Node according to the new address S1′.
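
The hand-off can be pictured as a small message protocol between the SC and the SRs. The following is an illustrative sketch with invented message names; it does not reproduce Akka's actual internal protocol:

```scala
// Hypothetical coordinator <-> region messages for rebalancing one shard.
sealed trait HandOffMsg
final case class BeginHandOff(shardId: Int)     extends HandOffMsg // steps 2-3: stop sending, buffer locally
final case class BeginHandOffAck(shardId: Int)  extends HandOffMsg // an SR confirms it is buffering
final case class ShardDrained(shardId: Int)     extends HandOffMsg // step 4: the old shard has no remaining work
final case class ShardLocationUpdated(shardId: Int, newNode: String) extends HandOffMsg // step 6: announce S1'
```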

Shard state migration

If the Entities in a Shard are stateful and their state must not be lost, the Entity state needs to be synchronized to the new Node. How to synchronize is closely tied to the business scenario, so the solution should not, and need not, be provided by the underlying Sharding layer, as long as the overall process is well controlled. There are generally two approaches:

  • Event replay: store the inputs that cause state changes, in order, in third-party persistent storage such as Kafka or another message middleware. When the new Entity synchronizes, these events are pulled and replayed locally. This approach requires that the internal state be fully reconstructible from the inputs alone, so be very careful with random seeds, absolute timestamps, and the like; if the message volume is large, recovery takes longer. A classic example is the binlog.
  • Snapshot: save the state itself, like a snapshot, and have the new Entity load the snapshot data locally. This may mean saving a large amount of data, and it places demands on update frequency; otherwise information is lost.
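
A minimal sketch of the two approaches as interfaces, with hypothetical names and an invented JudgeState type; which approach (or combination) to implement depends on the business:

```scala
// State of one match's adjudicator Entity (illustrative).
final case class JudgeState(lastSeqId: Long, suspicionScore: Double)

// Approach 1: event replay ("binlog" style). Rebuild state by replaying ordered inputs.
trait EventReplayMigration {
  def events(entityId: String): Iterator[String]                // ordered inputs from durable storage, e.g. a Kafka topic
  def applyEvent(state: JudgeState, event: String): JudgeState  // must be deterministic: beware random seeds, wall-clock time
  def recover(entityId: String): JudgeState =
    events(entityId).foldLeft(JudgeState(0L, 0.0))(applyEvent)
}

// Approach 2: snapshot. Persist the state itself and load it on the new Node.
trait SnapshotMigration {
  def saveSnapshot(entityId: String, state: JudgeState): Unit
  def loadSnapshot(entityId: String): Option[JudgeState]
}
```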

In practice, start from the requirements:

  • Must Session data really never be lost? Is a lossy service acceptable? Negative-play adjudication, at least, does not require state synchronization at migration time: the worst case is that some negative behavior goes undetected during the migration, and the proportion is very low.
  • If Session data must not be lost, decide which of event replay (binlog) and snapshot fits the service better, or even use a combination of both.

Ordering

Careful readers may spot a problem: if a Session's data can be sent to any Node at random, the messages will eventually be routed to the correct Entity, but their order is not guaranteed. This is the basic problem of network communication: if there are multiple paths from A to B, the order in which A sends messages is not guaranteed to be the order in which B receives them. There are two solutions:

  • Keep the communication path single. For Cluster Sharding this means messages of the same Session should always be sent to the same Node; any Node will do, as long as it stays the same.
  • Have the Entity re-sort messages after receiving them. This requires a certain amount of buffer space and may increase processing latency (a small re-sort buffer sketch follows the list).
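
A minimal sketch of such a re-sort buffer keyed on SeqID, assuming sequence numbers are contiguous per Session starting at 1; the names are illustrative:

```scala
import scala.collection.mutable

// Buffers out-of-order events of one Session and releases them in SeqID order.
final class ReorderBuffer(deliver: (Long, String) => Unit) {
  private var nextSeqId = 1L
  private val pending = mutable.TreeMap.empty[Long, String] // seqId -> payload

  def onMessage(seqId: Long, data: String): Unit = {
    pending(seqId) = data
    // Release the longest contiguous prefix starting at nextSeqId.
    while (pending.contains(nextSeqId)) {
      deliver(nextSeqId, pending.remove(nextSeqId).get)
      nextSeqId += 1
    }
  }
}
```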

Let's now combine Sharding with the negative-play adjudication example and see how it solves these problems in practice.

Kafka application

Let's revisit the downstream data-processing example from Section 1.2. Match data has multiple application scenarios, such as match settlement, player career statistics, market data statistics, and negative-play adjudication. This is a typical publish-subscribe scenario: Kafka can be used to store match events and decouple the services from one another. Since negative-play adjudication requires that the original order of messages be preserved, the communication path must be kept single.

Game Cluster is the match service cluster; a match lives on exactly one Node, for example Game 3 on Node 2. Each game is a Session, and the match service pushes its Game ID into Kafka as the Session ID and message key, which guarantees that all messages of the same game enter the same Partition in order. The adjudication service cluster acts as a Consumer Group pulling data from Kafka, and each Partition is consumed by exactly one fixed Consumer. As long as the Consumer Group is stable, the Partition-to-Consumer mapping is stable, and so is the path to the Entity. This lets Entity 3 consume Game 3's data in a stable and ordered way.
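
A minimal producer-side sketch using the standard Kafka client from Scala; the topic name, serializers, and bootstrap address are assumptions. The key point is that the GameID is the record key, so Kafka's default partitioner sends all events of one game to the same Partition:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// GameID as the record key: the default partitioner hashes the key,
// so every event of game "game-3" lands in the same Partition, in send order.
def publishGameEvent(gameId: String, seqId: Long, data: String): Unit =
  producer.send(new ProducerRecord[String, String]("game-events", gameId, s"$seqId:$data"))
```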

Consumer Rebalance

This seems sufficient, but in a real production environment it is hard to keep the Consumer Group unchanged. For example:

  • The Consumer Group is scaled up or down.
  • Kafka kicks a Consumer out because it consumes too slowly.
  • A Consumer fails.
  • If the Consumers are hosted on Kubernetes, containers may be rescheduled based on resource usage.

As shown in the figure, Partition 3 was originally consumed by Consumer 1 (C1). The events of Game 1 are M1, M2, M3 in order. After C1 has consumed M1 and M2, Consumer 3 (C3) joins and Partition 3 is reassigned to C3, so M3 of Game 1 is handled by C3 instead. Game 1's data is cut in two and processed by different Consumers, and the result is incorrect.

Sharding

The desired effect is that data of the same Session is always forwarded to the same Entity for processing, no matter which Consumer ends up consuming the Partition. Given the earlier introduction, Sharding fits this need well; the application process is as follows (a sketch of the extractor follows the list):

  1. The match message type is defined as GameMsgType(GameID, SeqID, GameData); GameID is the unique tag of a Session, i.e. the Session ID.
  2. The match service pushes the GameID to Kafka as the message key, ensuring that messages of the same match reach the same Partition in order.
  3. The negative-play adjudication service connects to Kafka as a consumer with the Sharding mechanism enabled. First define the Sharding system's EntityTypeKey: the message type is GameMsgType and the name is the Consumer Group ID, which makes it unique among all of Kafka's consumers.
  4. Define an Extractor that derives the Entity ID from the message. A stable Session-to-Entity mapping is required; in this scenario the Entity ID can simply be the GameID.
  5. Derive the Shard ID from the Entity ID: ShardID = EntityID % ShardCount, with ShardCount >= PartitionCount. ShardCount is a logical concept and should not be smaller than the number of Partitions.
  6. Rebalance strategy: when the SC receives an SR's request to allocate a Node for a Shard, it preferentially allocates the Shard to the Node that received the message. The benefit is that a Consumer can process messages locally right after pulling them, saving one network forward.
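
A minimal sketch of steps 4 and 5, with illustrative names; ShardCount is assumed to be at least the topic's Partition count, as required above:

```scala
// Match message as defined in step 1.
final case class GameMsgType(gameId: String, seqId: Long, gameData: String)

object JudgeExtractor {
  val ShardCount: Int = 128 // logical shard count, chosen >= the topic's Partition count

  // Step 4: stable Session-to-Entity mapping; the Entity ID is simply the GameID.
  def entityId(msg: GameMsgType): String = msg.gameId

  // Step 5: stable Entity-to-Shard mapping; a hash modulo over a fixed shard count.
  def shardId(entityId: String): Int = Math.floorMod(entityId.hashCode, ShardCount)
}
```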

Conclusion

Sharding decouples logical processing units from physical nodes, hides the complex routing logic inside the cluster, and reduces the complexity of external access. Combined with message queues and other data-recovery techniques, it can also support message ordering and state migration between nodes, making it well suited as a routing solution for fine-grained stateful services in distributed systems.

Footnotes

1 Tyler Akidau, Slava Chernyak & Reuven Lax. Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing. O'Reilly Media, 2018.