sequence
This article focuses on CanalMQStarter
CanalMQStarter
Canal – 1.1.4 / server/SRC/main/Java/com/alibaba/otter/canal/server/CanalMQStarter. Java
public class CanalMQStarter {
private static final Logger logger = LoggerFactory.getLogger(CanalMQStarter.class);
private volatile boolean running = false;
private ExecutorService executorService;
private CanalMQProducer canalMQProducer;
private MQProperties properties;
private CanalServerWithEmbedded canalServer;
private Map<String, CanalMQRunnable> canalMQWorks = new ConcurrentHashMap<>();
private static Thread shutdownThread = null;
public CanalMQStarter(CanalMQProducer canalMQProducer){
this.canalMQProducer = canalMQProducer;
}
public synchronized void start(MQProperties properties, String destinations) {
try {
if (running) {
return;
}
this.properties = properties;
canalMQProducer.init(properties);
// set filterTransactionEntry
if (properties.isFilterTransactionEntry()) {
System.setProperty("canal.instance.filter.transaction.entry"."true"); } canalServer = CanalServerWithEmbedded.instance(); / / for each instance starts a worker thread executorService = Executors. NewCachedThreadPool (); logger.info("## start the MQ workers.");
String[] dsts = StringUtils.split(destinations, ",");
for (String destination : dsts) {
destination = destination.trim();
CanalMQRunnable canalMQRunnable = new CanalMQRunnable(destination);
canalMQWorks.put(destination, canalMQRunnable);
executorService.execute(canalMQRunnable);
}
running = true;
logger.info("## the MQ workers is running now ......");
shutdownThread = new Thread() {
public void run() {
try {
logger.info("## stop the MQ workers");
running = false;
executorService.shutdown();
canalMQProducer.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping MQ workers:", e);
} finally {
logger.info("## canal MQ is down."); }}}; Runtime.getRuntime().addShutdownHook(shutdownThread); } catch (Throwable e) { logger.error("## Something goes wrong when starting up the canal MQ workers:", e);
}
}
public synchronized void destroy() {
running = false;
if(executorService ! = null) { executorService.shutdown(); }if(canalMQProducer ! = null) { canalMQProducer.stop(); }if(shutdownThread ! = null) { Runtime.getRuntime().removeShutdownHook(shutdownThread); shutdownThread = null; }} / /... }Copy the code
- CanalMQStarter provides the start and destroy methods. The start method using MQProperties to initialize canalMQProducer, then through CanalServerWithEmbedded. The instance () to obtain canalServer, after traversal destinations, Create canalMQRunnable, submit it to executorService, and register shutdownThread to execute executorservice.shutdown () and canalmqproducer.stop () when the JVM is shutdown. Executorservice.shutdown () and canalmqproducer.stop (), which also removes the shutdownThread from the shutdownHook of Runtime.getruntime ()
CanalMQRunnable
Canal – 1.1.4 / server/SRC/main/Java/com/alibaba/otter/canal/server/CanalMQStarter. Java
private class CanalMQRunnable implements Runnable {
private String destination;
CanalMQRunnable(String destination){
this.destination = destination;
}
private AtomicBoolean running = new AtomicBoolean(true);
@Override
public void run() {
worker(destination, running);
}
public void stop() {
running.set(false); }}Copy the code
- CanalMQRunnable implements the Runnable interface, and its run method executes worker(destination, running).
worker
Canal – 1.1.4 / server/SRC/main/Java/com/alibaba/otter/canal/server/CanalMQStarter. Java
public class CanalMQStarter {
//......
private void worker(String destination, AtomicBoolean destinationRunning) {
while(! running || ! destinationRunning.get()) { try { Thread.sleep(100); } catch (InterruptedException e) { // ignore } } logger.info("## start the MQ producer: {}.", destination);
MDC.put("destination", destination);
final ClientIdentity clientIdentity = new ClientIdentity(destination, (short) 1001, "");
while (running && destinationRunning.get()) {
try {
CanalInstance canalInstance = canalServer.getCanalInstances().get(destination);
if (canalInstance == null) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// ignore
}
continue;
}
MQProperties.CanalDestination canalDestination = new MQProperties.CanalDestination();
canalDestination.setCanalDestination(destination);
CanalMQConfig mqConfig = canalInstance.getMqConfig();
canalDestination.setTopic(mqConfig.getTopic());
canalDestination.setPartition(mqConfig.getPartition());
canalDestination.setDynamicTopic(mqConfig.getDynamicTopic());
canalDestination.setPartitionsNum(mqConfig.getPartitionsNum());
canalDestination.setPartitionHash(mqConfig.getPartitionHash());
canalServer.subscribe(clientIdentity);
logger.info("## the MQ producer: {} is running now ......", destination);
Long getTimeout = properties.getCanalGetTimeout();
int getBatchSize = properties.getCanalBatchSize();
while (running && destinationRunning.get()) {
Message message;
if(getTimeout ! = null && getTimeout > 0) { message = canalServer.getWithoutAck(clientIdentity, getBatchSize, getTimeout, TimeUnit.MILLISECONDS); }else {
message = canalServer.getWithoutAck(clientIdentity, getBatchSize);
}
final long batchId = message.getId();
try {
int size = message.isRaw() ? message.getRawEntries().size() : message.getEntries().size();
if(batchId ! = -1 && size ! = 0) { canalMQProducer.send(canalDestination, message, new CanalMQProducer.Callback() {
@Override
public void commit() { canalServer.ack(clientIdentity, batchId); } @override public voidrollback() { canalServer.rollback(clientIdentity, batchId); }}); // Send message to topic}else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// ignore
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error("process error!", e); }}} / /... }Copy the code
- Create ClientIdentity worker method, and then according to the destination from canalServer. GetCanalInstances get canalInstance (), and then create canalDestination, Subscribe (clientIdentity); And while loop execution canalServer getWithoutAck pull the message, by canalMQProducer. Send to send
summary
CanalMQStarter provides the start and destroy methods. The start method using MQProperties to initialize canalMQProducer, then through CanalServerWithEmbedded. The instance () to obtain canalServer, after traversal destinations, Create canalMQRunnable, submit it to executorService, and register shutdownThread to execute executorservice.shutdown () and canalmqproducer.stop () when the JVM is shutdown. Executorservice.shutdown () and canalmqproducer.stop (), which also removes the shutdownThread from the shutdownHook of Runtime.getruntime ()
doc
- CanalMQStarter