Executor examples
XXL-JOB provides sample code that integrates the executor in several different ways.
The most commonly used approach is Spring Boot integration, so we use that example to study the startup process of the XXL-JOB client executor.
Startup process
The startup entry point is the client executor's XxlJobSpringExecutor class. It implements the Spring extension interface SmartInitializingSingleton, so once all singleton beans in the IoC container have been instantiated, Spring calls its afterSingletonsInstantiated() method:
@Override
public void afterSingletonsInstantiated() {
    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);
    // refresh GlueFactory
    GlueFactory.refreshInstance(1);
    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
Parsing bean-mode tasks
First, let's analyze the initJobHandlerMethodRepository(applicationContext) method:
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    // init job handler from method
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);
        Map<Method, XxlJob> annotatedMethods = null; // referred to: org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods == null || annotatedMethods.isEmpty()) {
            continue;
        }
        // ...
    }
}
The logic walks through every bean in the IoC container and collects the methods annotated with @XxlJob. The result is a Map whose key is the annotated Method and whose value is the @XxlJob annotation instance found on that method.
Each @XxlJob-annotated method is then processed in turn:
// traverse
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
    Method method = methodXxlJobEntry.getKey();
    XxlJob xxlJob = methodXxlJobEntry.getValue();
    if (xxlJob == null) {
        continue;
    }
    // Get the value configured on the @XxlJob annotation
    String name = xxlJob.value();
    if (name.trim().length() == 0) {
        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
    }
    // Check whether a handler with the same name is already loaded; if so, throw an exception
    if (loadJobHandler(name) != null) {
        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
    }
    // Parameter check for the @XxlJob method: exactly one parameter, whose type must accept a String; otherwise an exception is thrown
    if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
        throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "]." + "The correct method format like \" public ReturnT<String> execute(String param) \" .");
    }
    // Return type check for the @XxlJob method: it must return ReturnT
    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
        throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "]." + "The correct method format like \" public ReturnT<String> execute(String param) \" .");
    }
    method.setAccessible(true);
    // init and destroy
    Method initMethod = null;
    Method destroyMethod = null;
    // Parse the init configuration of the @XxlJob annotation
    if (xxlJob.init().trim().length() > 0) {
        try {
            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
            initMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
        }
    }
    // Parse the destroy configuration of the @XxlJob annotation
    if (xxlJob.destroy().trim().length() > 0) {
        try {
            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
            destroyMethod.setAccessible(true);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "].");
        }
    }
    // Wrap the bean, the @XxlJob method, initMethod and destroyMethod in a MethodJobHandler
    // and put it into the handler Map, keyed by @XxlJob.value(), to complete the registration
    registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
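To make the scan-validate-register pattern easier to see in isolation, here is a minimal sketch that uses only JDK reflection. It is not XXL-JOB code: the DemoJob annotation, DemoJobRegistry, and DemoJobs are hypothetical names invented for illustration, and the sketch leaves out Spring, ReturnT, and the init/destroy hooks.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @XxlJob (assumption, not the real annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical registry that mimics the name -> handler Map described above.
class DemoJobRegistry {
    private static final class Handler {
        final Object bean;
        final Method method;
        Handler(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    // Find annotated methods on one bean, validate them, and register them by name.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job method
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("job name invalid for " + method);
            }
            if (handlers.containsKey(name)) {
                throw new IllegalStateException("job handler [" + name + "] naming conflicts");
            }
            Class<?>[] params = method.getParameterTypes();
            if (!(params.length == 1 && params[0].isAssignableFrom(String.class))) {
                throw new IllegalStateException("job method must take a single String parameter: " + method);
            }
            method.setAccessible(true);
            handlers.put(name, new Handler(bean, method));
        }
    }

    // Look up a handler by name and invoke it reflectively.
    Object execute(String name, String param) {
        Handler handler = handlers.get(name);
        if (handler == null) {
            throw new IllegalArgumentException("no handler named " + name);
        }
        try {
            return handler.method.invoke(handler.bean, param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("job [" + name + "] failed", e);
        }
    }
}

// Hypothetical bean with one job method.
class DemoJobs {
    @DemoJob("echoJob")
    String echo(String param) {
        return "echo:" + param;
    }
}
```

Calling registry.scan(new DemoJobs()) followed by registry.execute("echoJob", "hi") invokes the annotated method by its registered name. Scanning a second bean that declares the same job name fails with a naming conflict, which mirrors the duplicate check in the listing above.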
GlueFactory initialization
GlueFactory handles GLUE(Java) jobs: it compiles and invokes the source code of GLUE(Java) jobs, and it supports Spring dependency injection, for example @Autowired, @Resource, and @Qualifier.
The executor startup process
XxlJobExecutor#start:
public void start() throws Exception {
    // Initialize the log path
    XxlJobFileAppender.initLogPath(logPath);
    // Initialize AdminBizClient for remote interaction with admin
    initAdminBizList(adminAddresses, accessToken);
    // Start the log cleanup thread, which clears expired log files in the log directory
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
    // Start the callback thread, TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();
    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}
The last two statements are the important ones. TriggerCallbackThread.getInstance().start() starts the callback thread, which reports the result back to the admin module after a job has executed; it is covered in detail in the next section, which analyzes the client executor's job execution process. The other statement is initEmbedServer(address, ip, port, appname, accessToken):
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    // fill ip port
    port = port > 0 ? port : NetUtil.findAvailablePort(9999);
    ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
    // generate address
    if (address == null || address.trim().length() == 0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }
    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}
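The fallback rules in initEmbedServer can be sketched with the JDK alone. The class below is a hypothetical illustration, not the XXL-JOB implementation: findAvailablePort is only a simplified guess at what a helper such as NetUtil.findAvailablePort might do (probe upward from a default port), and resolveAddress assumes an IPv4-style ip:port string.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical helper class; names and behavior are assumptions for illustration.
class ExecutorAddressSketch {

    // Probe ports upward from defaultPort until one can be bound, then release it.
    static int findAvailablePort(int defaultPort) {
        for (int port = defaultPort; port < 65536; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return port; // bind succeeded, so the port is free right now
            } catch (IOException e) {
                // port is in use, try the next one
            }
        }
        throw new IllegalStateException("no available port at or above " + defaultPort);
    }

    // Keep an explicitly configured address; otherwise build http://ip:port/.
    static String resolveAddress(String address, String ip, int port) {
        if (address != null && !address.trim().isEmpty()) {
            return address;
        }
        return "http://" + ip + ":" + port + "/";
    }
}
```

For example, resolveAddress(null, "10.0.0.5", 9999) yields "http://10.0.0.5:9999/", while a non-empty configured address is returned unchanged. Note that a port found this way can still be taken by another process before the server binds it, so the probe is a best-effort default rather than a guarantee.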
EmbedServer#start internally creates a Thread and starts it:
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            ...
        }
    });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
Let’s take a look at what’s going on inside this thread:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
        0,
        200,
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
            }
        },
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
            }
        });
try {
    // start server
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & response to FULL
                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true);
    // bind
    ChannelFuture future = bootstrap.bind(port).sync();
    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
    // start registry
    startRegistry(appname, address);
    // wait until stop
    future.channel().closeFuture().sync();
} // the catch/finally part is not shown in this excerpt
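The bizThreadPool configuration (zero core threads, a bounded queue, and a rejection handler that throws) is worth a closer look. The sketch below reproduces the same shape with tiny limits so that exhaustion is easy to trigger. BoundedPoolSketch and its method names are hypothetical, and the limits of 1 thread and 1 queue slot are chosen only for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; mirrors the shape of bizThreadPool with smaller limits.
class BoundedPoolSketch {

    static ThreadPoolExecutor newPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                r -> new Thread(r, "demo-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    // Same idea as the handler above: fail loudly instead of dropping work silently.
                    throw new RuntimeException("demo bizThreadPool is EXHAUSTED!");
                });
    }

    // Returns true if the third task is rejected by a pool with 1 thread and 1 queue slot.
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = newPool(1, 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Task 1 occupies the only worker thread until released.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await(); // make sure task 1 has left the queue and is running
            pool.execute(() -> { }); // task 2 takes the single queue slot
            try {
                pool.execute(() -> { }); // task 3: queue full, thread limit reached
                return false;
            } catch (RuntimeException e) {
                return true; // thrown by the rejection handler
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

One consequence of this ThreadPoolExecutor configuration is that, with a core size of 0, the pool prefers queueing: it starts a single worker and only adds more threads once the queue is full. With the values in the listing above, that means up to 2000 requests can wait in the queue before the pool grows toward 200 threads, and only after both limits are hit does the handler throw.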
The code above is long, but its logic is simple. It mainly does two things:
- It uses netty to initialize and start an HTTP server, mainly to receive the instructions that the admin module sends to the executor, such as run-job and kill-job instructions. The main processing logic is encapsulated in EmbedHttpServerHandler;
- startRegistry(appname, address) starts the thread that periodically registers the client executor with the admin module. The logic lives in the ExecutorRegistryThread#start method and is relatively simple.
The XXL-JOB client executor registration process is roughly as follows:
1. The client periodically sends its registration information to the admin module by calling adminBiz.registry(registryParam);
2. When the admin module receives the registration information, it inserts a record into the xxl_job_registry table or updates the update_time field of the existing record;
3. The admin module starts a thread in JobRegistryMonitorHelper that periodically scans the xxl_job_registry table, removes the entries that have timed out, and joins the remaining online instances together to update the address_list field of each executor whose address is registered automatically.
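The three steps above amount to a heartbeat registry with a timeout sweep. The following in-memory sketch illustrates that idea only: RegistrySketch and its methods are hypothetical, a Map stands in for the xxl_job_registry table, and the 90-second timeout is taken from the description in this article rather than from the admin module's source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the registration table (assumption for illustration).
class RegistrySketch {
    static final long DEAD_TIMEOUT_MS = 90_000L;

    // appname -> (executor address -> time of last registration, in milliseconds)
    private final Map<String, Map<String, Long>> lastSeen = new ConcurrentHashMap<>();

    // Steps 1 and 2: a heartbeat inserts the entry or refreshes its timestamp.
    void registry(String appname, String address, long nowMs) {
        lastSeen.computeIfAbsent(appname, k -> new ConcurrentHashMap<>()).put(address, nowMs);
    }

    // Step 3: drop timed-out entries and return the sorted list of online addresses.
    List<String> sweep(String appname, long nowMs) {
        Map<String, Long> entries = lastSeen.get(appname);
        if (entries == null) {
            return new ArrayList<>();
        }
        entries.values().removeIf(ts -> nowMs - ts > DEAD_TIMEOUT_MS);
        List<String> online = new ArrayList<>(entries.keySet());
        Collections.sort(online);
        return online;
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic: an executor that registered at time 0 and never again is dropped by a sweep at 100 seconds, while one that re-registered at 60 seconds stays online.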
Conclusion
The XXL-JOB client executor startup process is relatively simple. Its core comes down to two points:
- It uses netty to start an HTTP server and sends its IP:PORT to the admin module as part of the registration information, so that admin can send the executor instructions such as run job and kill job;
- The executor registers with admin periodically (every 30 seconds by default). If admin receives no registration information from an executor within 90 seconds, it considers the executor timed out and takes it offline.