>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…
1. Preface
This series spends three articles on XXL-JOB concepts. This one focuses on the client-side (executor) request process.
2. Usage
Usage consists of two parts:
- Extend IJobHandler and override its execute method
- Annotate the class with @JobHandler(value = "testJobHandler")
@JobHandler(value = "testJobHandler")
@Component
@Service
public class TestHandleService extends IJobHandler {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ReturnT<String> execute(String s) throws Exception {
        logger.info("param is :{}", s);
        return null;
    }
}
Server Configuration
3. The source code
In a Spring application, XxlJobSpringExecutor serves as the executor:
3.1 Initialization Operations
XxlJobSpringExecutor receives the ApplicationContext by implementing ApplicationContextAware. Startup is triggered by the initMethod of the bean defined in the configuration class:
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    // Set AdminAddresses, AppName, IP, etc
    return xxlJobSpringExecutor;
}
3.1.1 XxlJobSpringExecutor # start
// C- XxlJobSpringExecutor
public void start() throws Exception {
    // init JobHandler Repository -> 3.2 JobHandler scan
    initJobHandlerRepository(applicationContext);

    // refresh GlueFactory
    // Create a GlueFactory or a SpringGlueFactory depending on the type argument
    GlueFactory.refreshInstance(1);

    // super start, calls XxlJobExecutor.start
    super.start();
}
3.1.2 XxlJobExecutor # start section:
// C- XxlJobExecutor
public void start() throws Exception {
    // init logpath
    // Prepares logBasePath and glueSrcPath and creates the directories (mkdir)
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    // Step 1: adminAddresses.trim().split(",") -> cluster nodes are separated by commas
    // Step 2: new XxlRpcReferenceBean to build an AdminBiz for each address
    // Step 3: add it to List<AdminBiz> adminBizList
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread
    // A separate thread that processes the log files
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread -> see 3.1.3
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    // Default the port to 9999 (via NetUtil.findAvailablePort) and resolve the IP when they are not configured
    port = port > 0 ? port : NetUtil.findAvailablePort(9999);
    ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
    initRpcProvider(ip, port, appName, accessToken);
}
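Step 1 of initAdminBizList (splitting the comma-separated cluster addresses) can be illustrated with a small stand-alone sketch. The class AdminAddressParser and its parse method are hypothetical names introduced for illustration and are not part of xxl-job; the sketch only shows the trim-and-split handling, not the creation of the AdminBiz proxies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of xxl-job: shows how a comma-separated
// admin address list can be turned into one entry per admin node.
class AdminAddressParser {

    static List<String> parse(String adminAddresses) {
        List<String> result = new ArrayList<>();
        if (adminAddresses == null || adminAddresses.trim().isEmpty()) {
            return result; // nothing configured
        }
        for (String address : adminAddresses.trim().split(",")) {
            String trimmed = address.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed); // one entry per admin node
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example addresses are made up for the demonstration.
        System.out.println(parse("http://127.0.0.1:8080/xxl-job-admin, http://127.0.0.1:8081/xxl-job-admin"));
    }
}
```

In the real executor each resulting address would then be wrapped in an AdminBiz client and appended to adminBizList, as Steps 2 and 3 describe.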
3.1.3 TriggerCallbackThread callback processing
start() creates two threads:
- triggerCallbackThread
- triggerRetryCallbackThread
public void start() {
    triggerCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // while loop until end signal
            while (!toStop) {
                try {
                    // Contains a core object: ReturnT<String> executeResult
                    // executeResult = code + msg + content
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {
                        // callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        // LinkedBlockingQueue<HandleCallbackParam> callBackQueue
                        // drainTo moves everything else currently queued into the list and returns the count
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);
                        // callback, will retry if error
                        if (callbackParamList != null && callbackParamList.size() > 0) {
                            // invoke the callback
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    // take() throws InterruptedException, so the loop body needs a try/catch to compile
                    logger.error(e.getMessage(), e);
                }
            }
            // last callback
            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
            if (callbackParamList != null && callbackParamList.size() > 0) {
                doCallback(callbackParamList);
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();

    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    retryFailCallbackFile();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();
}
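The take-then-drainTo pattern used by triggerCallbackThread can be tried in isolation. The following is a minimal sketch using only the JDK; CallbackBatcher, push and nextBatch are hypothetical names, and plain strings stand in for HandleCallbackParam. Unlike the excerpt above, the sketch puts the element returned by take() first so that the batch keeps queue order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the callback queue handling; not xxl-job code.
class CallbackBatcher {

    private final LinkedBlockingQueue<String> callBackQueue = new LinkedBlockingQueue<>();

    // Producer side: job threads would push their results here.
    void push(String callback) {
        callBackQueue.add(callback);
    }

    // Consumer side: block until one element exists, then collect whatever
    // else is already queued so a single remote call can carry all of it.
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        try {
            batch.add(callBackQueue.take());    // blocks while the queue is empty
            callBackQueue.drainTo(batch);       // non-blocking, may add nothing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }

    public static void main(String[] args) {
        CallbackBatcher batcher = new CallbackBatcher();
        batcher.push("job-1");
        batcher.push("job-2");
        batcher.push("job-3");
        System.out.println(batcher.nextBatch());
    }
}
```

The blocking take() keeps the thread idle while nothing has finished, and drainTo() turns a burst of finished jobs into one batch instead of one remote call per job.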
doCallback
private void doCallback(List<HandleCallbackParam> callbackParamList) {
    boolean callbackRet = false;
    // callback, will retry if error
    for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
        // callback initiates an RPC remote call
        ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
        if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
            callbackRet = true;
            break;
        } else {
            // Create a callback log
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
        }
    }
    if (!callbackRet) {
        // No admin accepted the callback: append it to the fail-callback file
        appendFailCallbackFile(callbackParamList);
    }
}
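The control flow of doCallback (try each admin in turn, stop at the first success, otherwise keep the batch for a later retry) is sketched below with JDK types only. CallbackFailover, send and pendingRetry are hypothetical names; a Predicate plays the role of one admin node, and an in-memory list stands in for the fail-callback file. Treating an exception from a node as a failed attempt is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of "first admin that accepts wins, otherwise keep for retry".
class CallbackFailover {

    // Stand-in for the fail-callback file: batches that no node accepted.
    final List<List<String>> pendingRetry = new ArrayList<>();

    // Each Predicate plays the role of one admin node and returns true on success.
    // Returns the index of the node that accepted the batch, or -1 if none did.
    int send(List<Predicate<List<String>>> admins, List<String> batch) {
        for (int i = 0; i < admins.size(); i++) {
            boolean accepted;
            try {
                accepted = admins.get(i).test(batch);
            } catch (RuntimeException e) {
                accepted = false; // assumption: an unreachable node counts as a failure
            }
            if (accepted) {
                return i; // stop at the first success
            }
        }
        pendingRetry.add(new ArrayList<>(batch)); // nobody accepted: remember the batch
        return -1;
    }

    public static void main(String[] args) {
        List<Predicate<List<String>>> admins = new ArrayList<>();
        admins.add(batch -> false); // first node rejects
        admins.add(batch -> true);  // second node accepts
        CallbackFailover failover = new CallbackFailover();
        System.out.println(failover.send(admins, Arrays.asList("job-1")));
    }
}
```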
3.1.4 initRpcProvider Registers the service
// C- XxlJobExecutor
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
    // init, provider factory
    String address = IpUtil.getIpPort(ip, port);

    // Prepare the remote call properties
    Map<String, String> serviceRegistryParam = new HashMap<String, String>();
    serviceRegistryParam.put("appName", appName);
    serviceRegistryParam.put("address", address);

    // Initialize the configuration, including the use of NettyHttpServer, the serializer, and ExecutorServiceRegistry
    // NetEnum includes four common types:
    // - NettyServer / NettyClient
    // - NettyHttpServer / NettyHttpClient
    // - MinaServer / MinaClient
    // - JettyServer / JettyClient
    xxlRpcProviderFactory = new XxlRpcProviderFactory();
    xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

    // add services -> put into Map<String, Object> serviceData
    xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

    // start -> Service registration
    xxlRpcProviderFactory.start();
}
Remote registration Service
// C-
public void start() throws Exception {
    // start server
    serviceAddress = IpUtil.getIpPort(this.ip, port);
    server = netType.serverClass.newInstance();
    server.setStartedCallback(new BaseCallback() {    // serviceRegistry started
        @Override
        public void run() throws Exception {
            // start registry
            if (serviceRegistryClass != null) {
                // Build serviceRegistry for registration
                serviceRegistry = serviceRegistryClass.newInstance();
                // Here serviceRegistry is an ExecutorServiceRegistry, whose start() calls
                // ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
                serviceRegistry.start(serviceRegistryParam);
                if (serviceData.size() > 0) {
                    serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                }
            }
        }
    });
    // Register the stop callback
    server.setStopedCallback(new BaseCallback() {    // serviceRegistry stoped
        @Override
        public void run() {
            // stop registry
            if (serviceRegistry != null) {
                if (serviceData.size() > 0) {
                    // Remove and stop
                    serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                }
                // registryThread.interrupt();
                // registryThread.join();
                serviceRegistry.stop();
                serviceRegistry = null;
            }
        }
    });
    // enable thread processing
    server.start(this);
}
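The started/stopped callback arrangement shown above can be reduced to a miniature that runs on the JDK alone. DemoServer and DemoProvider are hypothetical classes written for this illustration and do not mirror the xxl-rpc API; a sorted in-memory set stands in for the remote registry. The point is only the ordering: services are registered once the server reports that it has started, and removed when it stops.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical miniature of a server with started/stopped callbacks; not xxl-rpc code.
class DemoServer {
    private Runnable startedCallback = () -> { };
    private Runnable stoppedCallback = () -> { };

    void setStartedCallback(Runnable callback) { this.startedCallback = callback; }
    void setStoppedCallback(Runnable callback) { this.stoppedCallback = callback; }

    void start() {
        // a real server would bind its port here before signalling readiness
        startedCallback.run();
    }

    void stop() {
        // a real server would release its port here
        stoppedCallback.run();
    }
}

class DemoProvider {
    final Set<String> registry = new TreeSet<>();           // stand-in for the remote registry
    private final Set<String> serviceKeys = new TreeSet<>(); // plays the role of serviceData.keySet()
    private final DemoServer server = new DemoServer();
    private final String address;

    DemoProvider(String address) { this.address = address; }

    void addService(String key) { serviceKeys.add(key); }

    void start() {
        // register every service once the server is up
        server.setStartedCallback(() -> {
            for (String key : serviceKeys) {
                registry.add(key + "@" + address);
            }
        });
        // remove the same entries when the server goes down
        server.setStoppedCallback(() -> {
            for (String key : serviceKeys) {
                registry.remove(key + "@" + address);
            }
        });
        server.start();
    }

    void stop() { server.stop(); }

    public static void main(String[] args) {
        DemoProvider provider = new DemoProvider("127.0.0.1:9999");
        provider.addService("ExecutorBiz");
        provider.start();
        System.out.println(provider.registry);
        provider.stop();
        System.out.println(provider.registry);
    }
}
```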
Server and ServiceRegistry class structures
3.2 Scanning of annotations
All JobHandlers are scanned in XxlJobSpringExecutor#initJobHandlerRepository:
private void initJobHandlerRepository(ApplicationContext applicationContext) {
    // Use the applicationContext to get all beans annotated with @JobHandler
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
    if (serviceBeanMap != null && serviceBeanMap.size() > 0) {
        for (Object serviceBean : serviceBeanMap.values()) {
            // This check is why job classes must extend IJobHandler
            if (serviceBean instanceof IJobHandler) {
                // Get the name carried by the annotation
                String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                IJobHandler handler = (IJobHandler) serviceBean;
                // jobHandlerRepository cannot contain jobs with the same name
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                }
                // Add the handler to the ConcurrentMap
                registJobHandler(name, handler);
            }
        }
    }
}

// jobHandlerRepository is actually a Map
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
    return jobHandlerRepository.put(name, jobHandler);
}

public static IJobHandler loadJobHandler(String name) {
    return jobHandlerRepository.get(name);
}
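The scan-and-register idea can be reproduced without Spring or xxl-job. In the sketch below every name (DemoJobHandler, DemoHandler, EchoHandler, DemoHandlerRegistry) is hypothetical, and a plain list of objects stands in for the beans returned by getBeansWithAnnotation. One deliberate difference from the excerpt above: the sketch uses putIfAbsent so that the conflict check and the insert happen in one atomic step.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical annotation playing the role of @JobHandler.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoJobHandler {
    String value();
}

// Hypothetical base class playing the role of IJobHandler.
abstract class DemoHandler {
    abstract String execute(String param);
}

@DemoJobHandler("echoJob")
class EchoHandler extends DemoHandler {
    @Override
    String execute(String param) {
        return "echo:" + param;
    }
}

class DemoHandlerRegistry {
    private final ConcurrentMap<String, DemoHandler> repository = new ConcurrentHashMap<>();

    // Registers every object that carries the annotation AND extends the base class.
    void scan(List<?> beans) {
        for (Object bean : beans) {
            DemoJobHandler annotation = bean.getClass().getAnnotation(DemoJobHandler.class);
            if (annotation == null || !(bean instanceof DemoHandler)) {
                continue; // not a handler, skip it
            }
            String name = annotation.value();
            // putIfAbsent returns the previous value if the name is already taken
            if (repository.putIfAbsent(name, (DemoHandler) bean) != null) {
                throw new IllegalStateException("handler naming conflict: " + name);
            }
        }
    }

    DemoHandler load(String name) {
        return repository.get(name);
    }

    public static void main(String[] args) {
        DemoHandlerRegistry registry = new DemoHandlerRegistry();
        registry.scan(Arrays.asList(new EchoHandler(), "not a handler"));
        System.out.println(registry.load("echoJob").execute("hello"));
    }
}
```

Looking a handler up by the name stored in the annotation is what lets a scheduler refer to jobs by a plain string such as "testJobHandler".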
3.3 Registration process
This section looks at how the executor that hosts the @JobHandler beans is registered with Admin. The main class involved is ExecutorRegistryThread.
// C- ExecutorServiceRegistry
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));

// C- ExecutorRegistryThread
public void start(final String appName, final String address) {
    // If either appName or XxlJobExecutor.getAdminBizList() is null, return directly
    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // registry: loop until toStop is set
            while (!toStop) {
                try {
                    // Prepare the RegistryParam object for registration processing
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                    for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList()) {
                        // Initiate the registration
                        // client.asyncSend(finalAddress, xxlRpcRequest)
                        // TODO: the RPC call of xxl-job is also a large module
                        ReturnT<String> registryResult = adminBiz.registry(registryParam);
                        if (registryResult != null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    }
                } catch (Exception e) {
                    // Prints exception logs
                }
                // Wait BEAT_TIMEOUT seconds inside the while loop before registering again
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    // sleep() throws InterruptedException, so it needs its own try/catch to compile
                }
            }
            // registry remove, which mainly does the following:
            // - for (AdminBiz adminBiz : XxlJobExecutor.getAdminBizList())
            // - ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
            // - registryResult = ReturnT.SUCCESS;
        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
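The shape of this registration loop (register, sleep, repeat until a stop flag is set, then unregister) is sketched below with JDK types only. HeartbeatRegistry and its methods are hypothetical names; the two Runnable arguments stand in for the remote registry and registryRemove calls, and the interval is passed in milliseconds so the sketch runs quickly. How the real thread is stopped is not shown in the excerpt, so the stop() method here is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a registration heartbeat; names are illustrative only.
class HeartbeatRegistry {

    private volatile boolean toStop = false;
    private final CountDownLatch firstBeat = new CountDownLatch(1);
    private Thread thread;

    void start(long intervalMillis, Runnable register, Runnable unregister) {
        thread = new Thread(() -> {
            while (!toStop) {
                try {
                    register.run();   // stands in for the remote "registry" call
                } catch (RuntimeException e) {
                    // a failed beat is not fatal; try again on the next round
                }
                firstBeat.countDown();
                try {
                    TimeUnit.MILLISECONDS.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // stop() interrupts the sleep; the loop condition then ends the loop
                }
            }
            unregister.run();         // stands in for the remote "registryRemove" call
        });
        thread.setDaemon(true);
        thread.setName("demo-heartbeat");
        thread.start();
    }

    // Waits until the first registration attempt has finished.
    boolean awaitFirstBeat(long timeoutMillis) {
        try {
            return firstBeat.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Assumption of this sketch: set the flag, interrupt the sleep, wait for unregister.
    void stop() {
        toStop = true;
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        HeartbeatRegistry heartbeat = new HeartbeatRegistry();
        heartbeat.start(50,
                () -> System.out.println("register"),
                () -> System.out.println("unregister"));
        heartbeat.awaitFirstBeat(1000);
        heartbeat.stop();
    }
}
```

Repeating the registration on a fixed interval doubles as a liveness signal: a server that stops receiving beats can treat the executor as gone even if the explicit remove call never arrives.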
3.4 Invocation of JobThread
TODO: the client-side callback logic is left for the next article.
Conclusion
XXL-JOB is an easy-to-use task scheduling platform. Besides the comprehensive features it provides, its code and structure are very clear.
- On deployment, the executor registers its service with the server. There are several ways to register the service
- After a job executes, the result is reported back to the server via a callback