>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…

1. Preface

This article begins an in-depth study of the Mycat source code and, working from that code, tries to identify the points that can be customized and the ways Mycat can be extended.

You can pull the corresponding source code, mycat-server, from Git. Its core startup class is MycatStartup. Mycat is an open-source project from China, and its code comments are comparatively clear. This article mainly walks through the code as a learning exercise.

Mycat project structure

|-- main
|   |-- java
|   |   `-- io
|   |       `-- mycat
|   |           |-- MycatServer.java
|   |           |-- MycatShutdown.java
|   |           |-- MycatStartup.java : startup class; builds MycatServer and starts it
|   |           |-- backend : returning results and related processing
|   |           |-- buffer : NIO Buffer implementation, modeled on Netty
|   |           |-- cache
|   |           |-- catlets : parsing of Join statements and the like
|   |           |-- config
|   |           |-- manager
|   |           |-- memory
|   |           |-- migrate : automatic upgrade
|   |           |-- net : NIO connections and database connection classes
|   |           |-- route : routing rule processing
|   |           |-- server : TODO
|   |           |-- sqlengine : SQL executor
|   |           |-- statistic
|   |           `-- util
|   `-- resources
|       |-- cacheservice.properties
|       |-- ehcache.xml
|       |-- log4j2.xml
|       |-- rule.xml : sharding rules
|       |-- schema.xml : node rules
|       |-- server.xml : MySQL data information
|       |-- zkconf : Zookeeper cluster configuration
|       |   |-- rule.xml
|       |   |-- schema.xml
|       |   `-- server.xml
|       `-- zkdownload
|           `-- auto-sharding-long.txt

2. Source startup and pattern

Mycat’s startup class, MycatStartup, obtains a MycatServer instance and uses it to perform initialization.

public final class MycatStartup {

    public static void main(String[] args) {
        
        // S1: If the configuration is performed using Zookeeper, initialization is performed here
        ZkConfig.getInstance().initZk();
        
        // S2: Obtain the Home path
        String home = SystemConfig.getHomePath();
        
        // S3: get MycatServer instance object
        MycatServer server = MycatServer.getInstance();

        // S4: start the server
        server.startup();
    }
}

2.1 Core startup classes

private MycatServer() {

    // Read the file configuration
    this.config = new MycatConfig();

    // Timed thread pool, single thread pool
    scheduler = Executors.newSingleThreadScheduledExecutor();

    // Heartbeat scheduling is independent from other tasks
    heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();

    // SQL recorder
    this.sqlRecorder = new SQLRecorder(config.getSystem().getSqlRecordCount());

    /**
     * Whether MyCat is online; controlled by commands in the MyCat manager
     * | offline | Change MyCat status to OFF |
     * | online  | Change MyCat status to ON  |
     */
    this.isOnline = new AtomicBoolean(true);

    // The cache service is initialized
    cacheService = new CacheService();

    // Route calculation initialization
    routerService = new RouteService(cacheService);

    // load datanode active index from properties
    dnIndexProperties = loadDnIndexProps();
    try {
        // SQL interceptor, loaded by class name from the system configuration
        sqlInterceptor = (SQLInterceptor) Class.forName(
                config.getSystem().getSqlInterceptor()).newInstance();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    // catlet loader
    catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
            + File.separator + "catlet", config.getSystem().getCatletClassCheckSeconds());

    // Record the startup time
    this.startupTime = TimeUtil.currentTimeMillis();
    if (isUseZkSwitch()) {
        String path = ZKUtils.getZKBasePath() + "lock/dnindex.lock";
        dnindexLock = new InterProcessMutex(ZKUtils.getConnection(), path);
    }
}
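The constructor above loads the SQL interceptor by class name, which makes it one of the customizable points this article is looking for. The following is a minimal sketch of that pattern, not Mycat's own code: `SqlHook`, `PassThroughHook`, `TrimmingHook`, and `HookLoader` are hypothetical names standing in for `SQLInterceptor` and its implementations, and the sketch uses `getDeclaredConstructor().newInstance()` instead of the deprecated `Class.newInstance()` seen in the excerpt.

```java
/** Hypothetical extension point, standing in for Mycat's SQLInterceptor. */
interface SqlHook {
    String intercept(String sql);
}

/** Hypothetical default implementation that passes SQL through unchanged. */
class PassThroughHook implements SqlHook {
    @Override
    public String intercept(String sql) {
        return sql;
    }
}

/** Hypothetical custom implementation that trims surrounding whitespace. */
class TrimmingHook implements SqlHook {
    @Override
    public String intercept(String sql) {
        return sql.trim();
    }
}

final class HookLoader {
    private HookLoader() {
    }

    /** Instantiates the class named in configuration through its no-arg constructor. */
    static SqlHook load(String className) {
        try {
            Class<?> type = Class.forName(className);
            // Non-deprecated replacement for Class.newInstance()
            return (SqlHook) type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot load hook " + className, e);
        }
    }

    public static void main(String[] args) {
        // In a real setup the class name would come from a configuration file.
        SqlHook hook = load(TrimmingHook.class.getName());
        System.out.println(hook.intercept("  select 1  "));
    }
}
```

Because only the class name is fixed in configuration, swapping in a different implementation does not require touching the loader.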

3. Overview of the connection process

If you are not yet familiar with the overall flow of the source, the easiest way in is to read the logs, which show each stage of the process.

3.1 Obtaining the Configuration

The configuration is scanned using XMLSchemaLoader and initialized using ConfigInitializer. The configuration process is as follows:

// S1: Build MycatConfig while building MycatServer
- MycatServer.getInstance()
    |- this.config = new MycatConfig();

// S2: Builds the ConfigInitializer object
public MycatConfig() {
    // Read schema.xml, rule.xml, and server.xml
    ConfigInitializer confInit = new ConfigInitializer(true);
    //....
}    

// S3: ConfigInitializer initialization process
3-1> new XMLSchemaLoader() : initializes the schema.xml loader, which reads rule.xml and schema.xml
3-2> new XMLConfigLoader(schemaLoader) : reads the server.xml file
3-3> initDataHosts : loads dataHosts
3-4> initDataNodes : loads dataNodes
3-5> getFirewallConfig + initCobarCluster : loads the permission configuration

// -------------------

// Supplement: loading schema.xml -> XMLSchemaLoader
- Prepares new XMLRuleLoader(ruleFile); if no file is configured, the default is used
    |- private final static String DEFAULT_XML = "/rule.xml";
    |- private final static String DEFAULT_XML = "/schema.xml";
- getResourceAsStream loads the stream, which is parsed into HashMaps
    |- private final Map<String, TableRuleConfig> tableRules;
    |- private final Map<String, AbstractPartitionAlgorithm> functions;
    
    
// Supplement: loading server.xml -> XMLConfigLoader
C- XMLServerLoader # load
    |- XMLServerLoader.class.getResourceAsStream("/server.xml") : obtains the resource stream
    |- ConfigUtil.getDocument : obtains the Document
    |- loadSystem : loads the system tags
    |- loadUsers : loads the user tags
    |- ClusterConfig : loads the cluster configuration
    |- loadFirewall : loads the firewall
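The loading steps above (open a stream, build a Document, walk the tags) can be sketched with the JDK's own DOM parser. This is an illustration only: `SystemPropertyReader` is a hypothetical class, the XML is a simplified stand-in for server.xml rather than its real schema, and the stream comes from a string so the example runs without a classpath resource.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

final class SystemPropertyReader {
    private SystemPropertyReader() {
    }

    /** Parses the stream and collects every <property name="..."> found under a <system> tag. */
    static Map<String, String> read(InputStream in) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
            Map<String, String> props = new LinkedHashMap<>();
            NodeList systems = doc.getElementsByTagName("system");
            for (int i = 0; i < systems.getLength(); i++) {
                NodeList children = ((Element) systems.item(i)).getElementsByTagName("property");
                for (int j = 0; j < children.getLength(); j++) {
                    Element property = (Element) children.item(j);
                    props.put(property.getAttribute("name"), property.getTextContent().trim());
                }
            }
            return props;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot parse configuration", e);
        }
    }

    public static void main(String[] args) {
        // Simplified, hypothetical document; the property names are assumptions for illustration.
        String xml = "<server><system>"
                + "<property name=\"serverPort\">8066</property>"
                + "<property name=\"managerPort\">9066</property>"
                + "</system></server>";
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        System.out.println(read(in));
    }
}
```

In the real loader the stream would come from `getResourceAsStream("/server.xml")`, as listed above, instead of an in-memory string.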

3.2 Establishing a Connection

Mycat creates its backend connections in PhysicalDatasource # createNewConnection, which uses a ThreadPoolExecutor to create each MySQLConnection. The process is as follows:

// S1: server.startup() loads the application at startup
- startup does many things, including initializing classes, the connection pool, the AIO processors, and so on
- this section focuses only on the MySQL connection

<---------------------------------->

// S2: initializes the data hosts
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
for (PhysicalDBPool node : dataHosts.values()) {
    // Build the database mapping index
    String index = dnIndexProperties.getProperty(node.getHostName(), "0");
    // Initialize the node
    node.init(Integer.parseInt(index));
    // Establish a heartbeat association
    node.startHeartbeat();
}

<---------------------------------->

// S3: PhysicalDBPool initializes the data source; this is where the connection pool is initialized
- ds.getConnection(this.schemas[i % schemas.length], true, getConHandler, null)

<---------------------------------->

// S4: PhysicalDatasource # getConnection : obtains a connection
- increamentCount.longValue() + totalConnectionCount : the connection count that is checked against the maximum number of connections
- createNewConnection(handler, attachment, schema) : creates a connection if the count is below the maximum number of connections

<---------------------------------->

// S5: create a connection from a thread pool
MycatServer.getInstance().getBusinessExecutor().execute(....)

<---------------------------------->

// S6: pool create connection -> MySQLConnectionFactory
- MySQLConnectionFactory -> make -> new MySQLConnection -> create Socket

<---------------------------------->

// Supplement: InetSocketAddress
If processing is asynchronous, AsynchronousSocketChannel is called with the InetSocketAddress
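Steps S4 to S6 above boil down to "reuse an idle connection if one exists, otherwise create one on an executor while staying under the limit". The sketch below illustrates that idea under stated assumptions: `FakeConnection` and `SimplePool` are hypothetical, no socket is opened, and a `CompletableFuture` stands in for the handler callbacks that Mycat passes around.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a backend connection; it only carries an id. */
final class FakeConnection {
    final int id;

    FakeConnection(int id) {
        this.id = id;
    }
}

final class SimplePool {
    private final int maxSize;
    private final Executor creator;
    private final AtomicInteger total = new AtomicInteger();
    private final ConcurrentLinkedQueue<FakeConnection> idle = new ConcurrentLinkedQueue<>();

    SimplePool(int maxSize, Executor creator) {
        this.maxSize = maxSize;
        this.creator = creator;
    }

    /** Reuses an idle connection if there is one; otherwise creates one on the executor. */
    CompletableFuture<FakeConnection> getConnection() {
        FakeConnection con = idle.poll();
        if (con != null) {
            return CompletableFuture.completedFuture(con);
        }
        // Reserve a slot first so that concurrent callers cannot exceed maxSize.
        int reserved = total.incrementAndGet();
        if (reserved > maxSize) {
            total.decrementAndGet();
            throw new IllegalStateException("connection limit " + maxSize + " reached");
        }
        return CompletableFuture.supplyAsync(() -> new FakeConnection(reserved), creator);
    }

    /** Returns a connection to the idle queue so that later callers can reuse it. */
    void release(FakeConnection con) {
        idle.offer(con);
    }
}
```

A caller would construct it with any executor, for example `new SimplePool(10, Executors.newFixedThreadPool(2))`, and wait on the returned future with `join()`.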

4. Query process overview

We have seen above how Mycat relates to the physical databases. Now let's look at how the pieces fit together over a whole query.

4.1 Navicat connection

Connections between Mycat and external clients are handled by NIOReactor. NIO is used very widely across frameworks, so the specific implementation is not examined here.

// S1: MycatServer # startup
- NIOReactorPool reactorPool = new NIOReactorPool
- connector = new NIOConnector
- ((NIOConnector) connector).start()

// 1-1: Create the Manager connection -> port 9066
manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
                    + "Manager", system.getBindIp(), system.getManagerPort(), mf, reactorPool);
                    
// 1-2: Create the Server connection -> port 8066
server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
                    + "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool);


<---------------------------------->

// S2: Create NIOReactor with NIOReactorPool
for (int i = 0; i < poolSize; i++) {
    NIOReactor reactor = new NIOReactor(name + "-" + i);
    reactor.startup();
}

<---------------------------------->

// S3: NIOReactor processes requests in an infinite loop
public void run() {
    for (;;) {
        // ... handle the request
    }
}
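For readers who have not worked with NIO, the acceptor and reactor roles above can be illustrated with a plain `java.nio` selector loop. This is a rough sketch, not Mycat's implementation: `EchoReactor` is a hypothetical class that folds accepting and reading into one thread and simply echoes bytes back, whereas Mycat separates NIOAcceptor from a pool of NIOReactor instances.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/** Hypothetical single-threaded reactor that accepts connections and echoes bytes back. */
final class EchoReactor implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;
    private final int port;

    EchoReactor() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick a free port
        server.register(selector, SelectionKey.OP_ACCEPT);
        port = ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    int port() {
        return port;
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            while (selector.isOpen()) {
                selector.select(); // blocks until a channel is ready or the selector is closed
                if (!selector.isOpen()) {
                    break;
                }
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        echo((SocketChannel) key.channel(), buffer);
                    }
                }
            }
        } catch (IOException | ClosedSelectorException | CancelledKeyException e) {
            // The selector was closed or a channel failed; the loop simply ends.
        }
    }

    private static void echo(SocketChannel client, ByteBuffer buffer) throws IOException {
        buffer.clear();
        if (client.read(buffer) < 0) {
            client.close(); // the peer closed the connection
            return;
        }
        buffer.flip();
        while (buffer.hasRemaining()) {
            client.write(buffer); // fine for small payloads; a full server would wait for OP_WRITE
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();
        server.close();
    }
}
```

To try it, start the reactor on its own thread with `new Thread(reactor).start()` and connect a client socket to `reactor.port()`.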

4.2 Mycat Query process

The MySQL query flow is concentrated in AbstractConnection # onReadData, in the parent class of MySQLConnection. A select statement is used below to trace the flow.


// The overall query process
- FrontendConnection # handle : the connection is established
- FrontendCommandHandler # handle :
- ServerQueryHandler # query :
- ServerConnection # execute :
- SingleNodeHandler # execute :
- MySQLConnection # MySQLConnection
- MySQLConnection # synAndDoExecute
- MySQLConnection # sendQueryCmd : sends the SQL query command

Main Query Process


// S1: NIOReactor receives requests and reads asynchronously
- con.asynRead()

// S2: Intermediate data conversion and processing

<---------------------------------->

// S3: FrontendCommandHandler Distributes commands
public void handle(byte[] data){
    // Switch on data[4], the command byte that follows the 4-byte packet header
    switch (data[4]) { // matched against the final byte constants in MySQLPacket
        // As you can see, each statement type is processed by the corresponding handler
        // - ServerParse.SELECT -> SelectHandler.handle
        // - ServerParse.SHOW -> ShowHandler.handle
        // - ServerParse.SET -> SetHandler.handle
        // - ServerParse.START -> StartHandler.handle
        // - ....
    }
}

// Take Select as an example
<---------------------------------->

// S4: ServerConnection performs Route and Execute
RouteResultset rrs = MycatServer.getInstance().getRouterservice().route(...)
session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type) : the Session executes the query

<---------------------------------->

// S5: MultiNodeQueryHandler
for (final RouteResultsetNode node : rrs.getNodes()){
    // Process each routed node in the loop
    PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
    dn.getConnection(dn.getDatabase(), autocommit, node, this, node);
}

<---------------------------------->

// S6: PhysicalDBNode # getConnection : obtains a connection from the connection pool
dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment, this.database);

<---------------------------------->

// S7: PhysicalDatasource Obtains the Source resource
BackendConnection con = this.conmap. tryTakeCon(schema, autocommit) : obtain the established connection from Map takeCon(con, handler, Attachment, Schema) : Binding corresponds to the front-end request handler < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S8: MySQLConnection processing
xaTXID = sc.getSession2().getXaTXID()+", ""+getSchema()+"'"Build transactionID
synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(a),autocommit, sc.isTxReadonly(a), sc.getSqlSelectLimit(a)) : execute asynchronously <---------------------------------->// S9: MySQLConnection # synAndDoExecute- Process: Build CommandPacket used to send Command - Execute: sendQueryCmd to send CommandCopy the code
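The `switch (data[4])` in step S3 relies on the MySQL packet layout: a 3-byte little-endian payload length, a 1-byte sequence id, and then the payload, whose first byte is the command. The sketch below builds such a packet and dispatches on that byte. `CommandDispatcher` is a hypothetical class and the returned strings are placeholders for real handlers; the command values are those of the MySQL client/server protocol.

```java
import java.nio.charset.StandardCharsets;

final class CommandDispatcher {
    // Command bytes from the MySQL client/server protocol
    static final byte COM_QUIT = 0x01;
    static final byte COM_INIT_DB = 0x02;
    static final byte COM_QUERY = 0x03;
    static final byte COM_PING = 0x0e;

    private CommandDispatcher() {
    }

    /** Builds a packet: 3-byte little-endian payload length, 1-byte sequence id, then the payload. */
    static byte[] packet(byte command, String argument) {
        byte[] arg = argument.getBytes(StandardCharsets.UTF_8);
        int payloadLength = 1 + arg.length;
        byte[] data = new byte[4 + payloadLength];
        data[0] = (byte) (payloadLength & 0xff);
        data[1] = (byte) ((payloadLength >>> 8) & 0xff);
        data[2] = (byte) ((payloadLength >>> 16) & 0xff);
        data[3] = 0; // sequence id of a new command
        data[4] = command;
        System.arraycopy(arg, 0, data, 5, arg.length);
        return data;
    }

    /** Dispatches on data[4], the first payload byte, and returns a description of the action. */
    static String handle(byte[] data) {
        if (data.length < 5) {
            throw new IllegalArgumentException("packet too short");
        }
        switch (data[4]) {
            case COM_INIT_DB:
                return "use " + text(data);
            case COM_QUERY:
                return "query " + text(data);
            case COM_PING:
                return "ping";
            case COM_QUIT:
                return "quit";
            default:
                return "unknown command " + (data[4] & 0xff);
        }
    }

    private static String text(byte[] data) {
        return new String(data, 5, data.length - 5, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(handle(packet(COM_QUERY, "select 1")));
    }
}
```

A second dispatch level, keyed on the parsed statement type (SELECT, SHOW, SET and so on), would sit behind the `COM_QUERY` branch.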

Aggregating the data

The query has been sent above; the results returned by the nodes are aggregated as follows.

// S1: handle in MySQLConnectionHandler

// S2: MySQLConnectionHandler # handleData
- Analyzes the resultStatus, which involves three statuses and four packet types
private static final int RESULT_STATUS_INIT = 0;
private static final int RESULT_STATUS_HEADER = 1;
private static final int RESULT_STATUS_FIELD_EOF = 2;

OkPacket > byte FIELD_COUNT = 0x00;
ErrorPacket > byte FIELD_COUNT = (byte) 0xff;
RequestFilePacket > byte FIELD_COUNT = (byte) 251;
EOFPacket > byte FIELD_COUNT = (byte) 0xfe;

// S3: MultiNodeQueryHandler # fieldEofResponse


// S4: DataMergeService # addPack
- server.getBusinessExecutor().execute(this) : puts itself into the thread pool as a Runnable
- run : the thread pool calls run by default to execute the task
    - for loop : result.get(pack.dataNode).add(row) : adds the row to the result map
    - response : source.write(source.writeToBuffer(eof, buffer))
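The two ideas in this section, classifying a response by its first byte and collecting rows per data node before writing them out, can be sketched as follows. `ResultMerger` is a hypothetical class, not DataMergeService itself, and rows are plain strings. The header values are those of the MySQL protocol, and the classification is simplified: in the real protocol an EOF packet is also recognized by its short length, since a data row may begin with the same byte.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ResultMerger {
    /** Packet kinds distinguished by the first payload byte of a server response. */
    enum Kind { OK, ERROR, EOF, LOCAL_INFILE, DATA }

    /** Classifies a response by its header byte (simplified; ignores packet length). */
    static Kind classify(byte header) {
        switch (header & 0xff) {
            case 0x00:
                return Kind.OK;
            case 0xff:
                return Kind.ERROR;
            case 0xfe:
                return Kind.EOF;
            case 0xfb: // 251, the local-infile request
                return Kind.LOCAL_INFILE;
            default:
                return Kind.DATA;
        }
    }

    // Rows grouped by the data node that produced them, in order of first arrival.
    private final Map<String, List<String>> rowsByNode = new LinkedHashMap<>();

    /** Stores a row under the data node that produced it. */
    synchronized void addRow(String dataNode, String row) {
        rowsByNode.computeIfAbsent(dataNode, k -> new ArrayList<>()).add(row);
    }

    /** Concatenates the rows of all nodes, node by node. */
    synchronized List<String> merged() {
        List<String> all = new ArrayList<>();
        for (List<String> rows : rowsByNode.values()) {
            all.addAll(rows);
        }
        return all;
    }
}
```

A real merge step would also apply ordering, grouping, and limits across nodes; this sketch only concatenates.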

PS: The whole process is quite complicated, so there is no need to spend a lot of time on every detail at this early stage.

Conclusion

This article followed Mycat from startup through a complete query. Later articles will study some of the core functions step by step, analyze problems encountered in production, and extend the customization points.

Reference documentation

Distributed Database Architecture – Mycat Middleware