The core interface of the server module is CanalServer, which has two implementation classes: CanalServerWithNetty and CanalServerWithEmbedded. The official documentation describes CanalServer as follows:

Below is the author’s further description of the official document:

The figure on the left

The left side represents a standalone Canal deployment. Different applications communicate with the Canal server through the Canal client, and all client requests are received by CanalServerWithNetty. CanalServerWithNetty then passes each request to CanalServerWithEmbedded for actual processing. CanalServerWithEmbedded internally maintains multiple CanalInstance objects, each masquerading as a slave of a different MySQL instance, and uses the destination parameter carried by the client request to decide which CanalInstance should serve it.

The figure on the right

Here CanalServerWithEmbedded is embedded directly in the application, so Canal does not need to be deployed separately. With fewer network hops, binlog information can be synchronized more efficiently, but the technical demands on the user are higher. In the application, the CanalServerWithEmbedded instance is obtained through the CanalServerWithEmbedded.instance() method; it is a singleton.

The source directory structure of the server module is as follows:

The red box above is an embedded implementation, while the blue box below is a Netty based implementation.

It looks like there’s a bit more netty-based implementation code, but this is just an illusion. CanalServerWithNetty delegates all requests to CanalServerWithEmbedded.

The embedded implementation has only one class, CanalServerWithEmbedded, because it selects a specific CanalInstance based on the destination to process each client request. The implementation of CanalInstance lives in the instance module, which we will examine later. From the perspective of CanalServer, therefore, CanalServerWithEmbedded is the real core of the server module.

Both CanalServerWithNetty and CanalServerWithEmbedded are singletons that provide a static instance() method for obtaining the instance. In the CanalController source code reviewed in the previous section, the part of the constructor that prepares the canal server obtains the instances through these two static methods.

public CanalController(final Properties properties) {
    ....
    // Prepare the canal server
    ip = getProperty(properties, CanalConstants.CANAL_IP);
    port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
    embededCanalServer = CanalServerWithEmbedded.instance();
    embededCanalServer.setCanalInstanceGenerator(instanceGenerator); // set the custom instanceGenerator
    canalServer = CanalServerWithNetty.instance();
    canalServer.setIp(ip);
    canalServer.setPort(port);
    ....
}
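The singleton access pattern described above can be sketched as follows. This is only an illustration using the initialization-on-demand holder idiom; the class name EmbeddedServerSketch is hypothetical, and Canal's own classes may implement the singleton differently.

```java
// Hypothetical stand-in for a singleton server; not Canal's actual class.
final class EmbeddedServerSketch {
    private boolean running = false;

    // Private constructor: instances can only be created inside this class.
    private EmbeddedServerSketch() {
    }

    // The holder class is loaded lazily, on the first call to instance().
    private static final class Holder {
        private static final EmbeddedServerSketch INSTANCE = new EmbeddedServerSketch();
    }

    static EmbeddedServerSketch instance() {
        return Holder.INSTANCE;
    }

    synchronized void start() {
        running = true;
    }

    synchronized boolean isStart() {
        return running;
    }
}

class SingletonDemo {
    public static void main(String[] args) {
        EmbeddedServerSketch a = EmbeddedServerSketch.instance();
        EmbeddedServerSketch b = EmbeddedServerSketch.instance();
        a.start();
        // Both references point to the same object, so state is shared.
        System.out.println(a == b);
        System.out.println(b.isStart());
    }
}
```

Because every caller receives the same object, a component that starts the server and a component that delegates to it always see the same state.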

CanalServer interface

The CanalServer interface inherits the CanalLifeCycle interface, primarily to redefine the start and stop methods to throw CanalServerException.

public interface CanalServer extends CanalLifeCycle {
 
    void start() throws CanalServerException;
 
    void stop() throws CanalServerException;
}

CanalServerWithNetty accepts requests from clients and delegates them to CanalServerWithEmbedded. The source code below shows the fields and constructor defined by CanalServerWithNetty:

public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {

    // All client requests received by the listener are handed to CanalServerWithEmbedded
    private CanalServerWithEmbedded embeddedServer; // embedded server

    // IP address and port to listen on; clients communicate with the server through them
    private String ip;
    private int port;

    // Netty components
    private Channel serverChannel = null;
    private ServerBootstrap bootstrap = null;

    // ... singleton pattern implementation

    private CanalServerWithNetty() {
        // Assign embeddedServer
        this.embeddedServer = CanalServerWithEmbedded.instance();
    }

    // ... start and stop methods
    // ... setters and getters
}

Field Description:

  • embeddedServer: CanalServerWithNetty delegates requests to CanalServerWithEmbedded, so it holds a reference to the embeddedServer object.
  • ip and port: the network IP address and port that Netty listens on. Clients communicate with the server through this address and port.
  • serverChannel and bootstrap: these are Netty APIs. ServerBootstrap is used to start the server; calling its bind method returns a Channel object, serverChannel, that represents the server-side channel. Netty itself is not the focus of this tutorial.

The start method

The start method contains the core logic for netty startup, as shown below:

com.alibaba.otter.canal.server.netty.CanalServerWithNetty#start

public void start() {
    super.start();
    // Start the embedded Canal server first, since the Netty-based implementation delegates requests to it
    if (!embeddedServer.isStart()) {
        embeddedServer.start();
    }

    /*
     * Create the bootstrap instance. NioServerSocketChannelFactory is a Netty API that takes two
     * thread pools: the first is the accept thread pool and the second is the worker thread pool.
     * After accepting a client connection, the accept thread pool hands the object representing
     * the client to the worker thread pool for processing. Netty uses these threads to handle
     * highly concurrent client requests.
     */
    this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

    /*
     * The pipeline is Netty's chain of processors for client requests. It is analogous to the
     * Filter chain of responsibility in Java EE, where the next Filter runs after the previous
     * one finishes, except that in Netty the unit is a ChannelHandler rather than a Filter.
     */
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipelines = Channels.pipeline();
            // Mainly for encoding and decoding: incoming network traffic is a binary stream,
            // and FixedHeaderFrameDecoder parses it
            pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
            // Handles the handshake between client and server
            pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler());
            // Client authentication
            pipelines.addLast(ClientAuthenticationHandler.class.getName(), new ClientAuthenticationHandler(embeddedServer));
            SessionHandler sessionHandler = new SessionHandler(embeddedServer);
            pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
            return pipelines;
        }
    });

    // Start: when bind is called, Netty actually begins listening on the port
    if (StringUtils.isNotEmpty(ip)) {
        this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
    } else {
        this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
    }
}
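The handler-chain idea behind the pipeline can be sketched without Netty. The following is a simplified illustration, not Netty's ChannelPipeline API: PipelineSketch, the handler names, and the "user=canal;" request prefix are all hypothetical, and each handler is reduced to a predicate that decides whether the request may move on to the next handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, much simplified pipeline: handlers run in the order they were added.
final class PipelineSketch {
    private final List<String> names = new ArrayList<>();
    private final List<Predicate<String>> handlers = new ArrayList<>();

    PipelineSketch addLast(String name, Predicate<String> handler) {
        names.add(name);
        handlers.add(handler);
        return this;
    }

    // Passes the request through the chain and returns the names of the handlers
    // that were visited. A handler returning false stops the chain.
    List<String> fire(String request) {
        List<String> visited = new ArrayList<>();
        for (int i = 0; i < handlers.size(); i++) {
            visited.add(names.get(i));
            if (!handlers.get(i).test(request)) {
                break;
            }
        }
        return visited;
    }

    // Mirrors the order used in the start method: decode, handshake, authenticate, session.
    static PipelineSketch canalLike() {
        return new PipelineSketch()
            .addLast("frameDecoder", req -> !req.isEmpty())
            .addLast("handshake", req -> true)
            .addLast("authentication", req -> req.startsWith("user=canal;"))
            .addLast("session", req -> true);
    }
}

class PipelineDemo {
    public static void main(String[] args) {
        PipelineSketch pipeline = PipelineSketch.canalLike();
        System.out.println(pipeline.fire("user=canal;GET"));
        System.out.println(pipeline.fire("GET"));
    }
}
```

The point of the ordering is that the session handler, which does the real work, only ever sees requests that have already been decoded and authenticated.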

The stop method only performs a few close operations; the code is simple and is not covered here.

SessionHandler

Canal's core logic for handling client requests is in SessionHandler. Note that the embeddedServer object is passed in when it is instantiated. As mentioned earlier, CanalServerWithNetty delegates requests to CanalServerWithEmbedded, so SessionHandler also holds the embeddedServer instance.

SessionHandler's messageReceived method is invoked when a client request is received. Let us look at how SessionHandler parses the client request and delegates it to CanalServerWithEmbedded. To highlight the core forwarding logic, the code below omits large portions of the source:

SessionHandler#messageReceived

public class SessionHandler extends SimpleChannelHandler {
    ....
    // Receives a client request
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ....
        // Delegate to the matching embeddedServer method according to the request type
        switch (packet.getType()) {
            case SUBSCRIPTION: // subscription request
                embeddedServer.subscribe(clientIdentity);
                ...
                break;
            case UNSUBSCRIPTION: // unsubscribe request
                ...
                embeddedServer.unsubscribe(clientIdentity);
                ...
                break;
            case GET: // request to fetch binlog
                ....
                // Call a different method depending on whether the client specified a timeout
                if (get.getTimeout() == -1) {
                    message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                } else {
                    ...
                    message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
                }
                ...
                break;
            case CLIENTACK: // the client acks a successfully consumed batch
                ...
                embeddedServer.ack(clientIdentity, ack.getBatchId());
                ...
                break;
            case CLIENTROLLBACK: // the client requests a rollback after failed consumption
                ...
                if (rollback.getBatchId() == 0L) {
                    embeddedServer.rollback(clientIdentity);
                } else {
                    embeddedServer.rollback(clientIdentity, rollback.getBatchId()); // roll back only a single batch
                }
                ...
                break;
            default:
                NettyUtils.error(400, MessageFormatter.format("packet type={} is NOT supported!", packet.getType()).getMessage(), ctx.getChannel(), null);
                break;
        }
        ...
    }
    ...
}

As you can see, SessionHandler parses the client request and, depending on the request type, delegates it to the corresponding method of CanalServerWithEmbedded. The core logic is therefore all in CanalServerWithEmbedded.
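The dispatch-by-type pattern can be reduced to a small, self-contained sketch. The types below (PacketTypeSketch, RecordingServer, SessionDispatchSketch) are hypothetical stand-ins and not Canal classes; the recording server only logs which method was called, so the routing is easy to observe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical request types mirroring the cases in the switch statement.
enum PacketTypeSketch { SUBSCRIPTION, UNSUBSCRIPTION, GET, CLIENTACK, CLIENTROLLBACK }

// Hypothetical stand-in for the embedded server: it only records the calls it receives.
final class RecordingServer {
    final List<String> calls = new ArrayList<>();

    void subscribe(String client) { calls.add("subscribe:" + client); }
    void unsubscribe(String client) { calls.add("unsubscribe:" + client); }
    void getWithoutAck(String client, int fetchSize) { calls.add("getWithoutAck:" + client + ":" + fetchSize); }
    void ack(String client, long batchId) { calls.add("ack:" + client + ":" + batchId); }
    void rollbackAll(String client) { calls.add("rollbackAll:" + client); }
    void rollback(String client, long batchId) { calls.add("rollback:" + client + ":" + batchId); }
}

final class SessionDispatchSketch {
    private final RecordingServer server;

    SessionDispatchSketch(RecordingServer server) {
        this.server = server;
    }

    // Routes one request to the matching server method based on its type.
    void dispatch(PacketTypeSketch type, String client, long batchId, int fetchSize) {
        switch (type) {
            case SUBSCRIPTION:
                server.subscribe(client);
                break;
            case UNSUBSCRIPTION:
                server.unsubscribe(client);
                break;
            case GET:
                server.getWithoutAck(client, fetchSize);
                break;
            case CLIENTACK:
                server.ack(client, batchId);
                break;
            case CLIENTROLLBACK:
                // As in the code above, batchId 0 means "roll back everything".
                if (batchId == 0L) {
                    server.rollbackAll(client);
                } else {
                    server.rollback(client, batchId);
                }
                break;
            default:
                throw new IllegalArgumentException("packet type " + type + " is not supported");
        }
    }
}

class DispatchDemo {
    public static void main(String[] args) {
        RecordingServer server = new RecordingServer();
        SessionDispatchSketch handler = new SessionDispatchSketch(server);
        handler.dispatch(PacketTypeSketch.SUBSCRIPTION, "c1", 0L, 0);
        handler.dispatch(PacketTypeSketch.GET, "c1", 0L, 100);
        handler.dispatch(PacketTypeSketch.CLIENTROLLBACK, "c1", 7L, 0);
        System.out.println(server.calls);
    }
}
```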

CanalServerWithEmbedded

CanalServerWithEmbedded implements the CanalServer and CanalService interfaces. Internally it maintains a Map whose key is the destination and whose value is the corresponding CanalInstance. Each client request is forwarded to the matching CanalInstance according to the destination parameter it carries.

public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
    ...
    // The key is the destination and the value is the corresponding CanalInstance
    private Map<String, CanalInstance> canalInstances;
    ...
}
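Routing by destination amounts to a map lookup. A minimal sketch, assuming a registry keyed by destination name: RouterSketch and InstanceSketch are hypothetical names, and the real class creates and starts its CanalInstance objects through other components that are not modelled here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a CanalInstance; it only remembers its destination.
final class InstanceSketch {
    final String destination;

    InstanceSketch(String destination) {
        this.destination = destination;
    }
}

// Hypothetical router: one instance per destination, looked up on every request.
final class RouterSketch {
    private final Map<String, InstanceSketch> instances = new ConcurrentHashMap<>();

    // Registers an instance for the destination if none exists yet.
    void register(String destination) {
        instances.computeIfAbsent(destination, InstanceSketch::new);
    }

    // Returns the instance serving the destination, or fails if it was never registered.
    InstanceSketch route(String destination) {
        InstanceSketch instance = instances.get(destination);
        if (instance == null) {
            throw new IllegalStateException("destination is not started: " + destination);
        }
        return instance;
    }
}

class RouterDemo {
    public static void main(String[] args) {
        RouterSketch router = new RouterSketch();
        router.register("example");
        System.out.println(router.route("example").destination);
    }
}
```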

The start and stop methods defined in the CanalServer interface are relatively simple to implement and will not be described here.

Depending on the type of the request message, a different CanalServerWithEmbedded method is called. These methods are defined in the CanalService interface as follows:

public interface CanalService {

    // Subscribe
    void subscribe(ClientIdentity clientIdentity) throws CanalServerException;

    // Unsubscribe
    void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException;

    // Fetch a batch of data and ack it automatically
    Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;

    // Fetch a batch of data within the timeout and ack it automatically
    Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;

    // Fetch a batch of data without acking it
    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException;

    // Fetch a batch of data within the timeout without acking it
    Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException;

    // Ack a batch of data
    void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException;

    // Roll back all batches that have not been acked
    void rollback(ClientIdentity clientIdentity) throws CanalServerException;

    // Roll back a single batch
    void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException;
}

A careful reader will notice that each method contains a ClientIdentity type parameter, which identifies the client.

public class ClientIdentity implements Serializable {
    private String destination;
    private short clientId;
    private String filter;
    ...
}

CanalServerWithEmbedded determines which CanalInstance should handle the request based on the destination parameter in the ClientIdentity.

Here’s a look at what each method does:

The subscribe method:

The subscribe method handles client subscription requests. Currently a CanalInstance can be subscribed to by only one client, although that client may subscribe repeatedly. A subscription is processed in the following steps:

  • Find the corresponding CanalInstance based on the destination the client wants to subscribe to.
  • Record the client's subscription through the CanalMetaManager component of this CanalInstance.
  • Get the client's current subscription Position. First try to get it from CanalMetaManager, which records the binlog position that a client has currently reached. On a first subscription no position is recorded there, so try to get the position of the first binlog from CanalEventStore instead. This works because a CanalInstance starts pulling binlog events and storing them in the CanalEventStore as soon as it is started; for a first subscription, the position of the first binlog in the CanalEventStore is where the client starts consuming.
  • Notify the CanalInstance that the subscription has changed.

/**
 * Client subscription
 */
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    // 1. Find the corresponding CanalInstance by destination
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start();
    }
    // 2. Record in CanalMetaManager that a client is subscribed to this CanalInstance
    canalInstance.getMetaManager().subscribe(clientIdentity); // execute the meta subscription
    // 3. Get the binlog position of the current subscription, trying CanalMetaManager first
    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) {
        // 3.1 On a first subscription, use the position of the first binlog in
        // CanalEventStore as the starting position of the client subscription
        position = canalInstance.getEventStore().getFirstPosition();
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position);
        }
        logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
    } else {
        logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
    }
    // 4. Notify the CanalInstance of the subscription change
    canalInstance.subscribeChange(clientIdentity);
}
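The position-resolution rule in step 3 can be isolated in a small sketch. Everything here is hypothetical: SubscribeSketch models positions as plain numbers and keeps the cursors in a map, whereas Canal uses Position objects managed by CanalMetaManager and CanalEventStore.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: a cursor per client plus the first position available in the store.
final class SubscribeSketch {
    private final Map<String, Long> cursors = new HashMap<>();
    private final Long firstStorePosition; // null when the store is still empty

    SubscribeSketch(Long firstStorePosition) {
        this.firstStorePosition = firstStorePosition;
    }

    // Returns the position the client starts consuming from, or null if none is known yet.
    Long subscribe(String clientId) {
        Long position = cursors.get(clientId);
        if (position == null) {
            // First subscription: fall back to the first position in the store
            // and remember it as the client's cursor.
            position = firstStorePosition;
            if (position != null) {
                cursors.put(clientId, position);
            }
        }
        return position;
    }

    // Moves the client's cursor, as an ack would.
    void updateCursor(String clientId, long position) {
        cursors.put(clientId, position);
    }
}

class SubscribeDemo {
    public static void main(String[] args) {
        SubscribeSketch meta = new SubscribeSketch(100L);
        System.out.println(meta.subscribe("client-1")); // first subscription
        meta.updateCursor("client-1", 250L);
        System.out.println(meta.subscribe("client-1")); // repeated subscription
    }
}
```

A repeated subscription therefore resumes from the recorded cursor instead of jumping back to the start of the store.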

The unsubscribe method:

The unsubscribe method cancels a subscription. It finds the CanalMetaManager of the corresponding CanalInstance and calls its unsubscribe method. Note that unsubscribing does not stop the CanalInstance: after one client unsubscribes, another client may subscribe to the same CanalInstance, so it cannot be stopped.

/**
 * Unsubscribe
 */
@Override
public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    canalInstance.getMetaManager().unsubscribe(clientIdentity); // execute the meta unsubscription
    logger.info("unsubscribe successfully, {}", clientIdentity);
}

The listAllSubscribe method:

This management method lists all clients subscribed to a destination. It returns a List even though, as mentioned several times, a destination can currently be subscribed to by only one client. The reason is that Canal originally planned to let multiple clients subscribe to the same destination, but this feature was never implemented, so the List actually contains only one ClientIdentity.

public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {
    CanalInstance canalInstance = canalInstances.get(destination);
    return canalInstance.getMetaManager().listAllSubscribeInfo(destination);
}

/**
 * Query the list of batches that have not been acked, sorted by batchId
 */
public List<Long> listBatchIds(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    Map<Long, PositionRange> batchs = canalInstance.getMetaManager().listAllBatchs(clientIdentity);
    List<Long> result = new ArrayList<Long>(batchs.keySet());
    Collections.sort(result);
    return result;
}

The getWithoutAck method:

The getWithoutAck method is used by the client to fetch a batch of binlog events. Canal generates a unique batchId for each batch. If the client consumes the batch successfully, it calls the ack method to confirm it; if consumption fails, it can call the rollback method. The client may call getWithoutAck several times in a row to fetch binlog events, but it must ack the batches in the order in which they were fetched. If a later batch is acked, the earlier un-acked batches are acknowledged automatically as well. The general steps of getWithoutAck are:

  • Find the CanalInstance to fetch binlog events from, based on the destination.
  • Determine the Position from which to continue consuming. Normally this is stored in the CanalMetaManager. On the very first fetch, CanalMetaManager has no position stored yet, so the position of the first binlog in the CanalEventStore is used as the starting point.
  • Fetch binlog events from the CanalEventStore starting at that Position. For efficiency, events are fetched a batch at a time rather than one by one; the batch size (batchSize) is specified by the client. The client can also specify a timeout: if batchSize events are obtained within the timeout, they are returned immediately; if the timeout elapses first, whatever has been fetched so far is returned. If no timeout is set, the call returns immediately with whatever is available, even if nothing was obtained.
  • Record the batch in the CanalMetaManager, which generates a unique, incrementing batchId for it. If no binlog events were fetched, the batchId is set to -1.

@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
    return getWithoutAck(clientIdentity, batchSize, null, null);
}

/**
 * <pre>
 * Several cases:
 * a. If timeout is null, tryGet is used and returns immediately
 * b. If timeout is 0, get is used; it blocks without a timeout until there is enough data for batchSize
 * c. If timeout is not 0, get+timeout is used; it returns when the timeout elapses even if there is not enough data for batchSize
 * </pre>
 */
@Override
public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
        throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);
    // 1. Find the CanalInstance by destination
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    synchronized (canalInstance) {
        // 2. Get from CanalMetaManager the position range of the last batch that was fetched but not acked
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
        // 3. Fetch binlog events from CanalEventStore
        Events<Event> events = null;
        if (positionRanges != null) {
            // 3.1 CanalMetaManager has position information: fetch from that position
            events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
        } else {
            // 3.2 No un-acked batch is recorded: start from the cursor
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            if (start == null) {
                // First fetch, no ack recorded yet: use the first position in the store
                start = canalInstance.getEventStore().getFirstPosition();
            }
            // Fetch binlog events from CanalEventStore
            events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
        }

        if (CollectionUtils.isEmpty(events.getEvents())) {
            // Nothing was fetched: build an empty Message and return it to the client
            logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", new Object[] {
                    clientIdentity.getClientId(), batchSize });
            return new Message(-1, new ArrayList<Entry>());
        } else {
            // Events were fetched: record the batch in CanalMetaManager, which generates a unique batchId
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            // Convert the events to entries
            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {

                public Entry apply(Event input) {
                    return input.getEntry();
                }
            });
            logger.info("getWithoutAck successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                clientIdentity.getClientId(), batchSize, entrys.size(), batchId, events.getPositionRange());
            return new Message(batchId, entrys);
        }
    }
}

/**
 * Fetch events with a different store call depending on the parameters
 */
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize);
    } else {
        try {
            if (timeout <= 0) {
                return eventStore.get(start, batchSize);
            } else {
                return eventStore.get(start, batchSize, timeout, unit);
            }
        } catch (Exception e) {
            throw new CanalServerException(e);
        }
    }
}
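The three timeout cases handled by getEvents can be illustrated with a plain java.util.concurrent queue. This is only a sketch under stated assumptions: FetchModesSketch is a hypothetical class, events are strings, and fetching removes them from the queue, whereas Canal's event store keeps events until they are acked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical store that demonstrates the three fetch modes.
final class FetchModesSketch {
    private final BlockingQueue<String> store = new LinkedBlockingQueue<>();

    void put(String event) {
        store.add(event);
    }

    List<String> fetch(int batchSize, Long timeout, TimeUnit unit) {
        List<String> batch = new ArrayList<>();
        try {
            if (timeout == null) {
                // Case a: take whatever is available right now, without waiting.
                store.drainTo(batch, batchSize);
            } else if (timeout <= 0) {
                // Case b: block until a full batch has been collected.
                while (batch.size() < batchSize) {
                    batch.add(store.take());
                }
            } else {
                // Case c: collect until the batch is full or the deadline passes.
                long deadline = System.nanoTime() + unit.toNanos(timeout);
                while (batch.size() < batchSize) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        break;
                    }
                    String event = store.poll(remaining, TimeUnit.NANOSECONDS);
                    if (event == null) {
                        break; // timed out while waiting
                    }
                    batch.add(event);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what has been collected so far.
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}

class FetchModesDemo {
    public static void main(String[] args) {
        FetchModesSketch store = new FetchModesSketch();
        store.put("e1");
        store.put("e2");
        System.out.println(store.fetch(5, null, null).size());
        System.out.println(store.fetch(5, 50L, TimeUnit.MILLISECONDS).size());
    }
}
```

Case a suits clients that poll in a loop, case b suits clients that want full batches at any cost in latency, and case c bounds the wait while still favouring full batches.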

The ack method:

The ack method is called by the client to confirm that a batch of binlog events has been consumed successfully. The batch is confirmed by its batchId, and once it is confirmed, all messages with a batchId less than or equal to it are considered acknowledged. Note: acks must be sent in batchId order, which the client is responsible for guaranteeing. The ack performs the following steps:

  • Remove the batch information from the CanalMetaManager. The batch was recorded there by getWithoutAck and is removed on ack.
  • Record the binlog position that has been consumed successfully, so that the next fetch can start from it. This is also recorded through the CanalMetaManager.
  • Remove the batch's binlog content from the CanalEventStore. Once it has been consumed, there is no point in keeping it.

@Override
public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    PositionRange<LogPosition> positionRanges = null;
    // 1. Remove the batch information from CanalMetaManager
    positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
    if (positionRanges == null) { // the batch is no longer recorded, for example after a rollback
        throw new CanalServerException(String.format("ack error, clientId:%s batchId:%d is not exist , please check",
            clientIdentity.getClientId(), batchId));
    }
    // 2. Record the binlog position that has been consumed successfully, so that the next
    // fetch can start from it. This is recorded through CanalMetaManager
    if (positionRanges.getAck() != null) {
        canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
        logger.info("ack successfully, clientId:{} batchId:{} position:{}", clientIdentity.getClientId(), batchId, positionRanges);
    }
    // 3. Remove the binlog content of this batch from CanalEventStore
    canalInstance.getEventStore().ack(positionRanges.getEnd());
}

/**
 * Roll back to the position where {@link #ack} has not been performed, so that the next fetch starts from there
 */
@Override
public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    // A rollback is issued automatically on the first connection, so ignore clients that have not subscribed
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }
    synchronized (canalInstance) {
        // Clear the batch information
        canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
        // Roll back the state of the eventStore
        canalInstance.getEventStore().rollback();
        logger.info("rollback successfully, clientId:{}", new Object[] { clientIdentity.getClientId() });
    }
}

/**
 * Roll back to the position where {@link #ack} has not been performed, so that the next fetch starts from there
 */
@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    // A rollback is issued automatically on the first connection, so ignore clients that have not subscribed
    boolean hasSubscribe = canalInstance.getMetaManager().hasSubscribe(clientIdentity);
    if (!hasSubscribe) {
        return;
    }
    synchronized (canalInstance) {
        // Clear the batch information
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
        if (positionRanges == null) { // the batch is no longer recorded, for example after a rollback
            throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                clientIdentity.getClientId(), batchId));
        }
        // lastRollbackPostions.put(clientIdentity, positionRanges.getEnd()); // record the position of the last rollback
        // TODO: later, roll back to the position of the specified batchId
        canalInstance.getEventStore().rollback();
        logger.info("rollback successfully, clientId:{} batchId:{} position:{}", clientIdentity.getClientId(), batchId, positionRanges);
    }
}
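The bookkeeping behind getWithoutAck, ack and rollback can be summarised in a small sketch. BatchBookSketch is hypothetical: positions are plain numbers, there is a single client, and ordering of acks is left to the caller, as the text above says it is for Canal clients.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical single-client bookkeeping of fetched-but-unacked batches.
final class BatchBookSketch {
    private final Map<Long, long[]> batches = new LinkedHashMap<>(); // batchId -> {start, end}
    private long nextBatchId = 1;
    private long cursor = 0; // last position confirmed by an ack

    // Called when a batch is handed to the client: assigns an incrementing batchId.
    long addBatch(long start, long end) {
        long batchId = nextBatchId++;
        batches.put(batchId, new long[] { start, end });
        return batchId;
    }

    // Confirms a batch: forget it and advance the cursor to its end position.
    void ack(long batchId) {
        long[] range = batches.remove(batchId);
        if (range == null) {
            throw new IllegalStateException("ack error, batchId " + batchId + " does not exist");
        }
        cursor = range[1];
    }

    // Rolls back everything that is un-acked; the cursor stays where the last ack left it.
    void rollback() {
        batches.clear();
    }

    long cursor() {
        return cursor;
    }

    int outstanding() {
        return batches.size();
    }
}

class BatchBookDemo {
    public static void main(String[] args) {
        BatchBookSketch book = new BatchBookSketch();
        long first = book.addBatch(0, 10);
        book.addBatch(10, 20);
        book.ack(first);   // the cursor moves to the end of the first batch
        book.rollback();   // the second batch is dropped and will be fetched again
        System.out.println(book.cursor() + " " + book.outstanding());
    }
}
```

After a rollback the next fetch starts again from the cursor, which is why un-acked data is delivered again rather than lost.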

The get method:

The get method follows the same main flow as getWithoutAck, except that it performs the ack itself just before returning the data, regardless of whether the client goes on to consume the data successfully.

@Override
public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
    return get(clientIdentity, batchSize, null, null);
}

/*
 * Several cases:
 * a. If timeout is null, tryGet is used and returns immediately
 * b. If timeout is 0, get is used; it blocks without a timeout until there is enough data for batchSize
 * c. If timeout is not 0, get+timeout is used; it returns when the timeout elapses even if there is not enough data for batchSize
 */
@Override
public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit)
        throws CanalServerException {
    checkStart(clientIdentity.getDestination());
    checkSubscribe(clientIdentity);
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    synchronized (canalInstance) {
        // Get the position range of the last batch that was fetched but not acked
        PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
        if (positionRanges != null) {
            throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data",
                clientIdentity.getClientId(), positionRanges));
        }

        Events<Event> events = null;
        Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
        events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);

        if (CollectionUtils.isEmpty(events.getEvents())) {
            logger.debug("get successfully, clientId:{} batchSize:{} but result is null", new Object[] {
                    clientIdentity.getClientId(), batchSize });
            // Return an empty packet to avoid generating a batchId and wasting performance
            return new Message(-1, new ArrayList<Entry>());
        } else {
            // Record the batch in the stream information
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            List<Entry> entrys = Lists.transform(events.getEvents(), new Function<Event, Entry>() {

                public Entry apply(Event input) {
                    return input.getEntry();
                }
            });
            logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]",
                clientIdentity.getClientId(), batchSize, entrys.size(), batchId, events.getPositionRange());
            // Submit the ack directly
            ack(clientIdentity, batchId);
            return new Message(batchId, entrys);
        }
    }
}