>>>>
Github:
github.com/black-ant CASE Backup:
gitee.com/antblack/ca…
A. The preface
This article begins an in-depth study of the relevant source code of Mycat, as well as from the source code to try to analyze the customizable point and the way of extension.
You can pull the corresponding source code mycat-server from Git, and its core startup class is MycatStartup. Mycat is a domestic open source, relatively speaking, the annotation is very clear. This article is mainly about code handling and learning
Mycat project structure
|-- main | |-- java | | `-- io | | `-- mycat | | |-- MycatServer.java | | |-- MycatShutdown.java | | |-- MycatStartup. Java: start the class, will build MycatServer to start | | | - backend: results back and other processing | | | - buffer: NIO Buffer implementation, copy Netty | | | - cache | | | - catlets: Join statements such as parsing | | | - config | | | - manager | | | -- - | memory | | - migrate: automatic upgraded | | | - net: NIO connection, database connection related classes | | | - the route, the route rule processing | | | - server: TODO | | | - sqlengine: SQL actuators | | | - statistic | | ` - util | ` - resources | | -- cacheservice. Properties | | -- ehcache. XML | | -- log4j2. XML Rule. | | - XML: fragmentation rules | | - schema. The XML: node rules | | - server XML: Mysql data information | | - zkconf: Zookeeper cluster configuration | | | -- rule. XML | | | -- schema. The XML | | | -- server. XML | ` - zkdownload | ` - auto - shardinglong.txt
Copy the code
Two. Source start and pattern
Mycat’s startup class is called MycatServer for initialization
public final class MycatStartup {
public static void main(String[] args) {
// S1: If the configuration is performed using Zookeeper, initialization is performed here
ZkConfig.getInstance().initZk();
// S2: Obtain the Home path
String home = SystemConfig.getHomePath();
// S3: get MycatServer instance object
MycatServer server = MycatServer.getInstance();
// S4: bootserver.startup(); }}Copy the code
2.1 Core startup classes
private MycatServer(a) {
// Read the file configuration
this.config = new MycatConfig();
// Timed thread pool, single thread pool
scheduler = Executors.newSingleThreadScheduledExecutor();
// Heartbeat scheduling is independent from other tasks
heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
/ / SQL recorder
this.sqlRecorder = new SQLRecorder(config.getSystem().getSqlRecordCount());
/ * * * whether online, MyCat manager have command control * | offline | Change MyCat status to OFF | | * online | Change MyCat status to ON | * /
this.isOnline = new AtomicBoolean(true);
// The cache service is initialized
cacheService = new CacheService();
// Route calculation initialization
routerService = new RouteService(cacheService);
// load datanode active index from properties
dnIndexProperties = loadDnIndexProps();
try {
/ / SQL parser
sqlInterceptor = (SQLInterceptor) Class.forName(
config.getSystem().getSqlInterceptor()).newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
/ / catlet loader
catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
+ File.separator + "catlet", config.getSystem().getCatletClassCheckSeconds());
// Record the startup time
this.startupTime = TimeUtil.currentTimeMillis();
if (isUseZkSwitch()) {
String path = ZKUtils.getZKBasePath() + "lock/dnindex.lock";
dnindexLock = newInterProcessMutex(ZKUtils.getConnection(), path); }}Copy the code
Iii. Overview of connection process
If you don’t know much about the entire source process at first, the easiest way to go deeper is to read the log, which will show us all aspects of the process.
3.1 Obtaining the Configuration
The configuration is scanned using XMLSchemaLoader and initialized using ConfigInitializer. The configuration process is as follows:
// S1: Build MycatConfig while building MycatServer
- MycatServer.getInstance()
|- this.config = new MycatConfig();
// S2: Builds the ConfigInitializer object
public MycatConfig(a) {
// Read schema. XML, rule-xml, and server.xml
ConfigInitializer confInit = new ConfigInitializer(true);
//....
}
S3: ConfigInitializer initialization process
3-1 > newXMLSchemaLoader() : Initializes the schema.xml loader to read rul.xml and schema.xml3-2 > newXMLConfigLoader(schemaLoader) : Reads the server. XML file3-3> initDataHosts: loads dataHosts3-4> initDataNodes: loads dataNodes3-5> getFirewallConfig + initCobarCluster: load permission configuration// -------------------
// Add: load schema.xml -> XMLSchemaLoader- preparenewXMLRuleLoader (ruleFile), if not configured | - will use the default configurationprivate final static String DEFAULT_XML = "/rule.xml";
|- private final static String DEFAULT_XML = "/schema.xml"; - getResourceAsStream load flow and parsing a HashMap | -private final Map<String, TableRuleConfig> tableRules;
|- private final Map<String, AbstractPartitionAlgorithm> functions;
// Add: Server load -> XMLConfigLoader
C- XMLServerLoader # load
|- XMLServerLoader.class.getResourceAsStream("/server.xml") : access to the Resource flow | - ConfigUtil. GetDocument: access to the Document | - loadSystem: loading System tags | - loadUsers: Loading User tags | - ClusterConfig: loading the cluster configuration | - loadFirewall: load the firewallCopy the code
3.2 Establishing a Connection
Mycat Connect = PhysicalDatasource # createNewConnection ThreadPoolExecutor is used to create a MySQLConnection. The process for creating a MySQLConnection is as follows:
// S1: server.startup() Loads the application at startup- startup to do a lot of things, including the initial chemical class, initialize the Connection pool, initialize the AIO processor, etc. - this link is only focus on Mysql Connection Connection < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S2: initializes datahost
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts()
for (PhysicalDBPool node : dataHosts.values()) {
// Build the database mapping index
String index = dnIndexProperties.getProperty(node.getHostName(), "0");
// Initialize the node
node.init(Integer.parseInt(index));
// Establish a heartbeat association
node.startHeartbeat()
}
<---------------------------------->
S3: PhysicalDBPool initializes the Source. This initializes the connection pool
- ds.getConnection(this.schemas[i % schemas.length], true, getConHandler, null) < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S4: PhysicalDatasource # getConnection: Obtain the connection- increamentCount. LongValue () + totalConnectionCount: to obtain the maximum number of connections - createNewConnection (handler, attachment, schema) : If the number of connections is smaller than the maximum number of connections, and create a connection < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S5: create a connection from a thread pool
MycatServer.getInstance().getBusinessExecutor().execute(....)
<---------------------------------->
// S6: pool create connection -> MySQLConnectionFactory
- MySQLConnectionFactory -> make -> new MySQLConnection -> create Socket
<---------------------------------->
// 补充 : InetSocketAddressIf it is asynchronous processing, will call AsynchronousSocketChannel InetSocketAddress processingCopy the code
4. Query process overview
We have seen the relationship between Mycat and the physical library above, so let’s look at the overall relationship in the whole query process
4.1 Navicat connection
The connection between Mycat and the outside is handled by NIOReactor. NIO really has a very wide range of applications in various frameworks, so I don’t care about the specific implementation here
// S1: MycatServer # startup
- NIOReactorPool reactorPool = new NIOReactorPool
- connector = new NIOConnector
- ((NIOConnector) connector).start()
1-1Create Manager connection ->9066
manager = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Manager", system.getBindIp(), system.getManagerPort(), mf, reactorPool);
1-2Set up a Server connection ->8066
server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool);
<---------------------------------->
// S2: Create NIOReactor with NIOReactorPool
for (int i = 0; i < poolSize; i++) {
NIOReactor reactor = new NIOReactor(name + "-"+ i); reactor.startup(); } < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S3: NIOReactor processes requests in an infinite loop
public void run(a) {
for (;;) {
/ /... Handle the request}}Copy the code
4.2 Mycat Query process
MySQL query flow is concentrated in MySQLConnection’s parent class, AbstractConnection # onReadData, with a select statement to view the flow
// The overall query process-frontendConnection # handle: Connection establishes a connection. -frontendCommandHandler # handle: -serverQueryHandler # query: ServerConnection # execute: -singlenodeHandler # execute: -mysqlConnection # MySQLConnection -mysqlConnection # synAndDoExecute -mysqlConnection # sendQueryCmd: Send the SQL Query commandCopy the code
Main Query Process
// S1: NIOReactor receives requests and reads asynchronously
- con.asynRead()
// S2: Intermediate data conversion and processing
<---------------------------------->
// S3: FrontendCommandHandler Distributes commands
public void handle(byte[] data){
// Select the fourth byte for Switch
switch (data[4]) {// Based on final byte in MySQLPacket
// As you can see, it is processed by the corresponding Handler- ServerParse.SELECT -> SelectHandler.handle - ServerParse.SHOW -> ShowHandler.handle - ServerParse.SET -> SetHandler.handle - ServerParse.START -> StartHandler.handle - .... }}// Take Select as an example
<---------------------------------->
// S4: ServerConnection for Route and ExcuteRouteResultset = MycatServer.getInstance().getRouterservice().route(...) session.execute(rrs, rrs.isSelectForUpdate()? ServerParse. UPDATE: type) : the Session to execute queries < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S5: MultiNodeQueryHandler
for (final RouteResultsetNode node : rrs.getNodes()){
// For loop For processing
PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
dn.getConnection(dn.getDatabase(), autocommit, node, this, node); } < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S6: PhysicalDBNode # getConnection The connection pool
dbPool.getRWBanlanceCon(schema,autoCommit, handler, attachment, this.database);
<---------------------------------->
// S7: PhysicalDatasource Obtains the Source resource
BackendConnection con = this.conmap. tryTakeCon(schema, autocommit) : obtain the established connection from Map takeCon(con, handler, Attachment, Schema) : Binding corresponds to the front-end request handler < -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- >// S8: MySQLConnection processing
xaTXID = sc.getSession2().getXaTXID()+", ""+getSchema()+"'"Build transactionID
synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(a),autocommit, sc.isTxReadonly(a), sc.getSqlSelectLimit(a)) : execute asynchronously <---------------------------------->// S9: MySQLConnection # synAndDoExecute- Process: Build CommandPacket used to send Command - Execute: sendQueryCmd to send CommandCopy the code
Integration of the data
The data set was queried above, and the results of the query are aggregated below
// S1: handle in MySQLConnectionHandler
// S2: MySQLConnectionHandler # handleData- Analyzes the resultStatus, including three categories and four methodsprivate static final int RESULT_STATUS_INIT = 0;
private static final int RESULT_STATUS_HEADER = 1;
private static final int RESULT_STATUS_FIELD_EOF = 2;
OkPacket > byte FIELD_COUNT = 0x00;
RequestFilePacket > byte FIELD_COUNT = (byte) 0xff;
ErrorPacket > byte FIELD_COUNT = (byte) 251;
EOFPacket > byte FIELD_COUNT = (byte) 0xfe;
MultiNodeQueryHandler # fieldEofResponse
// S4: DataMergeService # addPack
- server.getBusinessExecutor().execute(this) : puts itself into the thread pool as Runnable - run: The thread pool calls the run command by default to execute the thread -for- result.get(pack.datanode). Add (row) : add result to map-response result: source.write(source.writeToBuffer(eof, buffer))Copy the code
PS: The whole process is quite complicated, so I don’t need to spend a lot of time in the early stage
conclusion
This article studied and understood from the startup of Mycat to a complete query. In the subsequent articles, part of the core functions will be studied step by step. At the same time, problems in the production process will be analyzed and customized points will be extended
Reference documentation
Distributed Database Architecture – Mycat Middleware