定时任务接入步骤说明
2023年1月14日大约 4 分钟
定时任务接入步骤说明
步骤说明
步骤1.定义任务类型
当需要接入一个新的任务类型时,可以自定义一个新的任务类型(taskType)
步骤2.定义任务执行类型
以此参数定义定时任务的执行方式。
步骤3.定义任务执行体
此处为长文本的类型,可存储自定义的json数据。以此获取定时任务执行时需要的参数。
步骤4.定义任务处理类型
以此参数定义定时任务最终需要达成什么动作。
步骤5.定义任务处理执行体
此处为长文本的类型,可存储自定义的json数据。以此获取定时任务最终处理时需要的参数。
步骤6.实现公共处理接口并编写业务逻辑
在代码中编写一个任务类型的实现类实现TaskCommService,编写定时任务处理逻辑。
提示
尚不清楚参数内容,请移步这里
步骤例子
以数据预警为例
{
"openState":0,
"taskName":"生产环境定时任务测试",
"taskType":"dataAlert",
"businessCode":"pro",
"cornConfig":"0 */1 * * * ?",
"executeType":"sql",
"executeStructure":"select * from sys_base_user where id = 1",
"processingType":"message",
"processingStructure":"{\r\n \"messageType\":[\r\n \"inMail\",\r\n \"enterpriseMailbox\",\r\n \"dingding\",\r\n \"weixin\"\r\n ],\r\n \"userIds\":[\r\n \"111111\",\r\n \"222222\"\r\n ],\r\n \"messageContent\":\"这是消息内容\"\r\n,,\r\n \"messageTitle\":\"消息标题\"\r\n}",
}- 定义了一个
dataAlert的任务类型(taskType)。 - 定义了
sql的定时任务执行方式(executeType)。 - 在定时任务执行体(
executeStructure)中写了select * from sys_base_user where id = 1的sql执行内容。 - 定义了
message的处理类型(processingType),用以发送消息提醒。 - 定义了消息处理体(
processingStructure)的json字符串。
{
// 消息通知方式
"messageType":[
"inMail", // 站内信
"enterpriseMailbox",
"dingding", // 钉钉
"email" // 邮件
],
// 接收人
"userIds":[
"111111",
"222222"
],
// 消息内容
"messageContent":"这是消息内容",
// 消息标题
"messageTitle":"这是消息标题"
}- 编写了DataAlertTaskImpl实现了TaskCommonService,并在execute中编写了业务逻辑。

实际运用代码
public JSONObject disposeEarlyTask(MeterEarlyWarning earlyWarning,MeterConfig meterConfig) {
try {
if(V.isEmpty(earlyWarning.getRemindType())){
throw new BusinessException("预警的提醒频率类型为空!");
}
else if(earlyWarning.getWarningState() != 1){
throw new BusinessException("预警的提醒未启动!");
}
JSONObject scheduledTask = new JSONObject();
// 生成业务编码
String businessCode = UUID.randomUUID().toString();
scheduledTask.put("businessCode",businessCode);
earlyWarning.setWarningFlag(businessCode);
// 解析cron表达式
Cron cron = new Cron();
cron.setJobType(earlyWarning.getRemindType());
switch (earlyWarning.getRemindType()){
case 1:
cron.setDayOfMonths(earlyWarning.getRemindTime().split(","));
break;
case 2:
cron.setDayOfMonths(earlyWarning.getRemindTime().split(","));
break;
default:
throw new BusinessException("未找到相关提醒频率类型!");
}
// cron表达式
String cronExpression = CustomCronUtil.createCronExpression(cron);
scheduledTask.put("cornConfig",cronExpression);
// 获取判断sql
// 获取表单
WfFormInfo formInfo = meterCommonFun.getFormData(meterConfig.getFormId());
String tableName = formInfo.getTableName();
// 获取数据源
String dataSourceName = meterCommonFun.getDataSource(meterConfig.getSourceType());
// 创建动态sql
DynamicSql dynamicSql = new DynamicSql();
dynamicSql.setTableName(tableName);
dynamicSql.setDataScore(dataSourceName);
dynamicSql.setFormKey(formInfo.getFormKey());
// 获取字段
FormField formField = earlyWarning.getWarningCondition();
// 设置查询字段
String field = S.toSnakeCase(formField.getField());
// 处理指标计算函数
if(V.notEmpty(formField) && V.notEmpty(formField.getFun()) && V.notEmpty(formField.getCriteria())){
// 处理后的指标函数
String funStr = meterCommonFun.funDispose(dynamicSql.getTableName(),formField.getFun(),field);
dynamicSql.getSelectMap().put(funStr,field);
// 处理条件
String conEx = meterCommonFun.criteriaData(formField.getCriteria(),formField.getValues());
// 拼接查询参数
StringBuilder paramSql = new StringBuilder();
paramSql.append("funStr").append(" ").append(conEx);
dynamicSql.getSelectMap().put(paramSql.toString(),"state");
dynamicSql.setSubQuery(false);
// 判断的sql语句
String sql = meterCommonFun.joinSql(dynamicSql);
scheduledTask.put("executeType","sql");
scheduledTask.put("executeStructure",sql);
}
JSONObject message = new JSONObject();
// 消息标题
message.put("messageTitle","数据预警");
String remindWay = earlyWarning.getRemindWay();
if(S.isNotEmpty(remindWay)){
// 提醒方式
List<String> ways = Arrays.asList(remindWay.split(","));
message.put("messageType",ways);
}
// 消息内容(仪表盘名称+图表名称)
StringBuilder builder = new StringBuilder();
String messageContent = builder.append("[").append(earlyWarning.getPanelName()).append("]")
.append("仪表盘中,").append("[").append(meterConfig.getMeterTitle()).append("]")
.append("的数据触发预警。").toString();
message.put("messageContent",messageContent);
// 提醒对象的用户id集查询
JsonResult jsonResult = workflowEngineService.getTargetUsers(earlyWarning.getRemindObjectJoin());
if(jsonResult.getCode() != 0){
log.error("提醒对象数据处理返回数据:{}", JSONUtil.toJsonStr(jsonResult));
throw new BusinessException("获取处理后的提醒对象数据失败!");
}
List<String> ids = JSONArray.parseArray(JSONUtil.toJsonStr(jsonResult.getData()),String.class);
message.put("userIds",ids);
// 任务处理类型-消息
scheduledTask.put("processingType","message");
// 任务处理体
scheduledTask.put("processingStructure",message.toJSONString());
// 任务名称
scheduledTask.put("taskName",earlyWarning.getPanelName() + "-" + meterConfig.getMeterTitle());
// 任务类型-消息预警
scheduledTask.put("taskType","dataAlert");
JSONObject taskData = earlyWarning.getTaskData();
// 存在id为修改
if(Objects.nonNull(taskData) && taskData.containsKey("id")){
Long id = Long.valueOf(taskData.get("id").toString());
try {
JsonResult result = scheduledTaskFeignService.updateEntityMapping(id, scheduledTask);
if(Objects.nonNull(result) && result.isOK()){
taskData = JSONObject.parseObject(JSONUtil.toJsonStr(result.getData()));
earlyWarning.setTaskData(taskData);
}
}catch (Exception e){
log.error("远程调用修改定时任务失败,原因:",e);
}
}else {
// 新增
try {
JsonResult result = scheduledTaskFeignService.createEntityMapping(scheduledTask);
if(Objects.nonNull(result) && result.isOK()){
taskData = JSONObject.parseObject(JSONUtil.toJsonStr(result.getData()));
earlyWarning.setTaskData(taskData);
}
}catch (Exception e){
log.error("远程调用创建定时任务失败,原因:",e);
}
}
return JSONObject.parseObject(JSONUtil.toJsonStr(earlyWarning));
}catch (BusinessException b){
log.error("预警任务处理失败!失败信息:",b);
throw b;
}
catch (Exception e){
log.error("预警任务处理错误!错误信息:",e);
throw new BusinessException("预警任务处理错误!请联系管理员!");
}
}