目录
本文概览:介绍了调度平台和执行器通过RPC进行交互的流程。
选用版本:版本 v2.1.0
1 RPC
调度平台在XxlJobScheduler初始化Adminbiz Server和 Executorbiz Client;执行器在XxxlJobExecutor中初始化Adminbiz Client和 Executorbiz Server。
- ExecutorBiz Server。直接使用调度平台的服务作为Http Server。
- ExecutorBiz Client。调度平台新建一个XxlRpcReferenceBean,通过一个NettyClient发送给执行器信息:ExecutorBiz类名称、method信息,执行器拿到这个信息之后,执行相应ExecutorBiz的method。这个是一个RPC的流程。
- AdminBiz Server。使用netty实现一个Http Servrer。
- AdminBiz Client。执行器新建一个XxlRpcReferenceBean,通过一个NettyClient发送给调度平台信息:AdminBiz类名称、method信息,调度平台拿到这个信息之后,执行相应AdminBiz的method。这个是一个RPC的流程。
1.1 调度平台端的RPC-XxlJobScheduler
1.1.1 AdminBiz服务端
服务端AdminBiz是怎么接受客户端的RPC的请求?没有单独定义AdminBiz sever,直接使用调度平台服务作为server。
STEP1: 调度平台提供JobApiController#api接口,接受所有uri为/api的请求。
STEP2:XxlJobScheduler.invokeAdminService
STEP3: 通过ServletServerHandler来实现
- 将htppservletRequest转换成XxlRpcRequest
- 根据XxlRpcRequest找到中类名和函数名,使用反射调用真正AdminBiz的实现类。
1.1.2 ExcutorBiz的客户端
以查看日志为例
STEP1 通过XxlJobScheduler.getExecutorBiz获取ExcutorBiz的动态代理对象
1 2 3 4 5 6 7 |
@RequestMapping("/logDetailCat") @ResponseBody public ReturnT<LogResult> logDetailCat(String executorAddress, long triggerTime, long logId, int fromLineNum) { try { ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(executorAddress); ReturnT<LogResult> logResult = executorBiz.log(triggerTime, logId, fromLineNum); } |
STEP2 获取ExcutorBiz的动态代理对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#### XxlJobSchedule ##### public static ExecutorBiz getExecutorBiz(String address) throws Exception { // valid if (address==null || address.trim().length()==0) { return null; } // load-cache address = address.trim(); ExecutorBiz executorBiz = executorBizRepository.get(address); if (executorBiz != null) { return executorBiz; } // set-cache executorBiz = (ExecutorBiz) new XxlRpcReferenceBean( NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC, LoadBalance.ROUND, ExecutorBiz.class, null, 3000, address, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null).getObject(); executorBizRepository.put(address, executorBiz); return executorBiz; } |
STEP3 动态代理对象XxlRpcReferenceBean作为一个RPC的client来执行远程调用RPC服务端。
1.2 执行器端的RPC-XxxlJobExecutor
1.2.1 AdminBiz客户端
XxxlJobExecutor通过XxlRpcReferenceBean构建AdminBiz的客户端,通过XxxlJobExecutor#getAdminBizList来获取AdminBiz的Client,调用调用平台的AdMinBiz的Server。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#### XxxlJobExecutor ### private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { serializer = Serializer.SerializeEnum.HESSIAN.getSerializer(); if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { String addressUrl = address.concat(AdminBiz.MAPPING); AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean( NetEnum.NETTY_HTTP, serializer, CallType.SYNC, LoadBalance.ROUND, AdminBiz.class, null, 3000, addressUrl, accessToken, null, null ).getObject(); if (adminBizList == null) { adminBizList = new ArrayList<AdminBiz>(); } adminBizList.add(adminBiz); } } } } |
1.2.2 ExecutorBiz的服务端
通过XxlJobScheduler定义ExecutorBiz的Http Server,接受调度平台的ExecutorBiz Client的请求,比如调度任务的操作ExecutorBiz#run(triggerParm)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception { // init, provider factory String address = IpUtil.getIpPort(ip, port); Map<String, String> serviceRegistryParam = new HashMap<String, String>(); serviceRegistryParam.put("appName", appName); serviceRegistryParam.put("address", address); xxlRpcProviderFactory = new XxlRpcProviderFactory(); // 定义HTTP server xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam); // add services xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl()); // start xxlRpcProviderFactory.start(); } |
2 系统交互
调度平台和客户端功能如下:
(1)调度平台
- 注册中心,提供心跳功能
- 触发任务逻辑。
(2)客户端ExecutorBiz
- 客户端调用服务端心跳接口
- 服务端发送触发任务信息,在客户端映射为@JobHandler,然后返回触发成功。在客户端通过JobTread触发所有的任务@JobHadnerl
- 任务执行完成之后通知服务端,返回执行成功。客户端执行完成任务,通过TriggerCallbackThread将执行完成任务的结果发动给服务端
2.1 注册中心
1、注册信息的数据表
- xxl_job_registry,单实例的信息,如心跳信息
- xxl_job_group,每个服务注册实例list
2、执行器启动注册线程ExecutorRegistryThread:每30秒向注册表请求一次,更新执行器心跳信息。
3、调度中心启动线程JobRegistryMonitorHelper:每30秒检测一次xxl_job_registry,将超过90秒还没有收到心跳的实例信息从xxl_job_registry中删除,并更新xxl_job_group服务的实例列表信息。
2.2 调度平台触发任务流程
2.2.1 调度平台端
1、XxxJobScheduler入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public void afterPropertiesSet() throws Exception { // init i18n initI18n(); // 注册中心的线程 JobRegistryMonitorHelper.getInstance().start(); // admin monitor run JobFailMonitorHelper.getInstance().start(); // admin-server initRpcProvider(); // 扫描待触发任务,触发任务 JobScheduleHelper.getInstance().start(); logger.info(">>>>>>>>> init xxl-job admin success."); } |
2、JobScheduleHelper 扫描5s待触发的任务,加入到ring中或者直接执行。
这个环抽象了一个分钟秒盘,比如现在是14:15:10,即此时在环的10这个位置,那么执行14:15:10(对应环的10)和14:15:11 (对应环的11位置)的任务。
3、JobTriggerPoolHelper#trigger 触发任务
4、XxlJobTrigger#trigger
- 通过路由规则选择执行器实例
- 通过XxlJobScheduler获取RPC对象ExecutorBiz(类似于一个Dubble对象),执行触发任务
1 2 3 4 5 6 7 8 |
##### XxlJobTrigger ####### public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) { // ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); runResult = executorBiz.run(triggerParam); } |
新建一个XxlRpcReferenceBean,通过一个NettyClient发送给客户端信息:ExecutorBiz类名称、method信息,客户端拿到这个信息之后,执行相应ExecutorBiz的method。这个是一个RPC的流程。
1 2 3 4 5 6 7 8 9 10 11 |
// request XxlRpcRequest xxlRpcRequest = new XxlRpcRequest(); xxlRpcRequest.setRequestId(UUID.randomUUID().toString()); xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis()); xxlRpcRequest.setAccessToken(accessToken); xxlRpcRequest.setClassName(className); xxlRpcRequest.setMethodName(methodName); xxlRpcRequest.setParameterTypes(parameterTypes); xxlRpcRequest.setParameters(parameters); |
2.2.2 执行器端
1、JobThread在队列为空时如何处理?当30次*3s=90s空闲,线程就被中断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
########### JobThread.java ############ while(!toStop){ // 空闲次数 idelTimes++ triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { 执行任务 ...... } else { // 超过30次,中断线程。 if (idleTimes > 30) { XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } } } |
2、JobThread维护一个阻塞队列LinedBlockingQueue的原因是可能一个任务被触发多次。这个时候只有一个JobThread来进行处理,同时只有一次调用被处理,其他调用放置在队列中。
2.3 执行器初始化
1、XxlJobSpringExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
### XxlJobSpringExecutor #### public void start() throws Exception { //初始化@JobHandler initJobHandlerRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); // super start super.start(); } |
2、XxxlJobExecutor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public void start() throws Exception { // 初始化日志路径 // private static String logBasePath = "/data/applogs/xxl-job/jobhandler"; XxlJobFileAppender.initLogPath(this.logPath); // 初始化注册中心列表 (把注册地址放到 List) this.initAdminBizList(this.adminAddresses, this.accessToken); // 启动日志文件清理线程 (一天清理一次) // 每天清理一次过期日志,配置参数必须大于3才有效 JobLogFileCleanThread.getInstance().start((long)this.logRetentionDays); // 开启触发器回调线程 TriggerCallbackThread.getInstance().start(); // 指定端口 this.port = this.port > 0 ? this.port : NetUtil.findAvailablePort(9999); // 指定IP this.ip = this.ip != null && this.ip.trim().length() > 0 ? this.ip : IpUtil.getIp(); // 初始化RPC 将执行器注册到调度中心 30秒一次 this.initRpcProvider(this.ip, this.port, this.appName, this.accessToken); } |
3 数据表
xxl_job_group :服务的注册信息。
xxl_job_registry :服务注册的每个实例信息,包括心跳。
xxl_job_info:任务信息
xxl_job_lock: 分布式锁
xxl_job_log :执行log
xxl_job_logglue:shell脚本
xxl_job_user :用户和用户权限