Preface
This article takes a look at the MasterElection class in chronos.
MasterElection
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/ha/MasterElection.java
public class MasterElection {
private static final Logger SWITCH_LOGGER = LogUtils.SWITCH_LOGGER;
private static volatile ServerState state = ServerState.BACKUPING;
public static void election(final CountDownLatch cdl) {
final CuratorFramework client = ZkUtils.getCuratorClient();
final LeaderSelector selector = new LeaderSelector(client, Constants.MASTER_PATH, new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
SWITCH_LOGGER.info("take master leadership"); long seekTimestamp = MetaService.getSeekTimestamp(); long zkSeekTimestamp = MetaService.getZkSeekTimestamp(); final long sleepMs = 200; long sleepCount = 0; // If zk is lost, zkSeekTimestamp is 0, and chronos is blockedwhile (seekTimestamp < zkSeekTimestamp && zkSeekTimestamp > 0) {
SWITCH_LOGGER.info("sleep {}ms to wait seekTimestamp:{} to catch up with zkSeekTimestamp:{}",
sleepMs, seekTimestamp, zkSeekTimestamp);
TimeUnit.MILLISECONDS.sleep(sleepMs);
seekTimestamp = MetaService.getSeekTimestamp();
zkSeekTimestamp = MetaService.getZkSeekTimestamp();
sleepCount++;
}
state = ServerState.MASTERING;
SWITCH_LOGGER.info("change server state to {}, totalSleepMs:{}ms", state, sleepCount * sleepMs);
cdl.await();
state = ServerState.BACKUPING;
SWITCH_LOGGER.info("release master leadership"); }}); selector.autoRequeue(); selector.start(); } public static booleanisMaster() {
return state == ServerState.MASTERING;
}
public static boolean isBackup() {
return state == ServerState.BACKUPING;
}
public static void standAlone() {
state = ServerState.MASTERING;
}
public static ServerState getState() {
returnstate; }}Copy the code
- MasterElection provides the election, isMaster, isBackup, standAlone and getState methods. The election method uses the LeaderSelector from Curator recipes. The takeLeadership method of its LeaderSelectorListenerAdapter reads seekTimestamp and zkSeekTimestamp and loops (sleeping 200 ms per iteration) until seekTimestamp is greater than or equal to zkSeekTimestamp. It then sets state to ServerState.MASTERING and blocks on the CountDownLatch's await method; once await returns, it sets state back to ServerState.BACKUPING. After the LeaderSelector is created, its autoRequeue and start methods are called.
ChronosStartup
DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/ChronosStartup.java
public class ChronosStartup {
private static final Logger LOGGER = LoggerFactory.getLogger(ChronosStartup.class);
private CountDownLatch waitForShutdown;
private String configFilePath = "chronos.yaml";
private PullWorker pullWorker;
private PushWorker pushWorker;
private DeleteBgWorker deleteBgWorker;
private NettyHttpServer nettyHttpServer;
ChronosStartup(final String configFilePath) {
if (StringUtils.isNotBlank(configFilePath)) {
this.configFilePath = configFilePath;
}
}
public void start() throws Exception {
LOGGER.info("start to launch chronos...");
final long start = System.currentTimeMillis();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
LOGGER.info("start to stop chronos...");
final long start = System.currentTimeMillis();
ChronosStartup.this.stop();
final long cost = System.currentTimeMillis() - start;
LOGGER.info("succ stop chronos, cost:{}ms", cost);
} catch (Exception e) {
LOGGER.error("error while shutdown chronos, err:{}", e.getMessage(), e);
} finally {
/* shutdown log4j2 */ LogManager.shutdown(); }}}); */ * init config */ configManager.initConfig (configFilePath); /* init metrics */if(! MetricService.init()) { System.exit(-1); } /* init rocksdb */ RDB.init(ConfigManager.getConfig().getDbConfig().getDbPath()); /* init zk */ ZkUtils.init(); /* init seektimestamp */ MetaService.load();waitForShutdown = new CountDownLatch(1);
if (ConfigManager.getConfig().isStandAlone()) {
/* standalone */
MasterElection.standAlone();
} else{/* Master election */ masterelection.election (waitForShutdown);
}
/* init pull worker */
if (ConfigManager.getConfig().isPullOn()) {
pullWorker = PullWorker.getInstance();
pullWorker.start();
}
/* init push worker */
if (ConfigManager.getConfig().isPushOn()) {
pushWorker = PushWorker.getInstance();
pushWorker.start();
}
/* init delete worker */
if (ConfigManager.getConfig().isDeleteOn()) {
deleteBgWorker = DeleteBgWorker.getInstance();
deleteBgWorker.start();
}
final long cost = System.currentTimeMillis() - start;
LOGGER.info("succ start chronos, cost:{}ms", cost);
/* init http server */
nettyHttpServer = NettyHttpServer.getInstance();
nettyHttpServer.start();
waitForShutdown.await();
}
void stop() {
/* shutdown netty http server */
if(nettyHttpServer ! = null) { nettyHttpServer.shutdown(); } /* stop pull from MQ */if(pullWorker ! = null) { pullWorker.stop(); } /* stop push to MQ */if(pushWorker ! = null) { pushWorker.stop(); } /* stop delete */if(deleteBgWorker ! = null) { deleteBgWorker.stop(); } MqConsumeStatService.getInstance().stop(); /* close zk client */ ZkUtils.close(); /* close rocksdb */ RDB.close();if (waitForShutdown ! = null) {waitForShutdown.countDown();
waitForShutdown = null; }}}Copy the code
- The start method of ChronosStartup creates the waitForShutdown latch and, in non-standalone mode, calls MasterElection.election(waitForShutdown).
summary
MasterElection provides the election, isMaster, isBackup, standAlone and getState methods. The election method uses the LeaderSelector from Curator recipes. The takeLeadership method of its LeaderSelectorListenerAdapter reads seekTimestamp and zkSeekTimestamp and loops (sleeping 200 ms per iteration) until seekTimestamp is greater than or equal to zkSeekTimestamp. It then sets state to ServerState.MASTERING and blocks on the CountDownLatch's await method; once await returns, it sets state back to ServerState.BACKUPING. After the LeaderSelector is created, its autoRequeue and start methods are called.
doc
- carrera-chronos
- A look at LeaderLatch in Curator recipes