定时任务与异步处理
本文介绍 BPMAX 插件中的定时任务、批处理、队列、重试和并发控制设计,适用于自动化和后台处理型插件。
学习目标
- 学会定义插件定时任务
- 理解何时应该使用异步处理而不是同步接口
- 掌握重试、幂等和并发控制的基本设计方法
适用场景
定时任务和异步处理通常适用于以下场景:
- 定时同步第三方数据
- 周期性扫描待处理对象
- 批量上报统计和指标
- 长耗时任务拆分执行
定时任务结构
定时任务配置
定时任务通常通过一个独立配置文件定义。核心字段一般包括:
cron或intervalenablehandle
一个最小配置示例如下:
export default {
reportDailyStats: {
cron: '0 */15 * * * *',
enable: true,
handle: '/api/plugin_example/job/reportDailyStats',
},
};cron 与 interval
cron适合明确的时间表达式interval适合固定频率轮询
优先选哪种,取决于业务语义是否需要“整点、每 15 分钟、每天凌晨”这类明确时间点。
enable 条件
enable 不建议始终写死为 true。通常要结合环境判断,例如:
- 只在自动化环境启用
- 只在非开发环境启用
- 依赖某些配置存在时才启用
handle 执行入口
建议统一走一个明确的执行入口,便于:
- 手工调试
- 查看日志
- 权限控制
控制器驱动的定时任务
通过接口统一入口
推荐让定时任务最终落到一个明确接口或服务入口,而不是把复杂逻辑直接写在配置文件里。
例如可以定义一个专门的任务控制器:
import BaseRest from '../rest.js';
@think.RestController()
export default class extends BaseRest {
async reportDailyStatsAction() {
const result = await this.service('plugin_example_job').reportDailyStats();
return this.success(result);
}
}便于人工调试
如果定时任务和手工调用共用同一个入口,就可以在问题排查时直接人工触发,降低调试成本。
与权限控制的关系
如果入口是可访问接口,需要明确:
- 是否对外开放
- 是否仅允许管理员或内部环境调用
异步处理设计
同步接口与后台任务的边界
以下情况更适合异步处理:
- 调用链长
- 第三方响应慢
- 单次处理数据量大
- 失败后需要重试
同步接口更适合:
- 轻量即时操作
- 用户强依赖即时结果的交互
队列缓存
如果任务可能堆积,建议增加队列或缓存层,用于:
- 解耦写入和执行
- 缓冲瞬时高峰
- 记录待处理状态
例如先把待处理对象写入队列:
await this.redis.lpush(
'plugin_example:pending_tasks',
JSON.stringify({
object_id: projectId,
retry_count: 0,
})
);批量处理
批量处理要控制:
- 单批大小
- 超时时间
- 失败回滚策略
不要为了“快”而一次吞下全部数据。
批处理服务可以先写成:
export default class PluginExampleJobService extends think.Service {
async reportDailyStats() {
const batch = await this.redis.lrange('plugin_example:pending_tasks', 0, 49);
for (const item of batch) {
const payload = JSON.parse(item);
await this.handleSingleTask(payload);
}
return {
size: batch.length,
};
}
}重试策略
重试要区分错误类型:
- 临时网络错误可重试
- 配置错误不应盲目重试
- 数据格式错误通常应直接失败并报警
并发与幂等
锁与防抖
定时任务最常见的问题是重复执行。常见做法包括:
- 分布式锁
- 防抖时间窗
- 执行计数控制
例如可以先用 Redis 锁限制并发:
const lockKey = 'plugin_example:job_lock';
const locked = await this.redis.set(lockKey, '1', 'EX', 300, 'NX');
if (!locked) {
return;
}
try {
await this.reportDailyStats();
} finally {
await this.redis.del(lockKey);
}幂等键设计
如果任务会重复投递或重试,建议设计幂等键,避免同一对象被重复处理。
例如:
const idempotentKey = `plugin_example:task:${projectId}:${action}`;
const exists = await this.redis.get(idempotentKey);
if (exists) {
return;
}
await this.redis.set(idempotentKey, '1', 'EX', 86400);重复执行保护
至少要能回答两个问题:
- 同一个任务在 1 分钟内重复执行会怎样
- 同一个对象被重复消费会怎样
典型案例
催办类插件
适合定期扫描待处理数据,并按规则触发提醒或批量动作。
资源上报类插件
适合先本地收集,再批量发送,配合重试和清理策略。
常见问题
定时任务未执行
enable条件不满足- 调度表达式配置错误
- 执行入口不可用
重复执行导致重复发送
- 没有并发保护
- 没有幂等键
- 重试策略与重复保护冲突
失败重试放大问题
- 重试次数过高
- 单批过大
- 失败后未区分可重试和不可重试错误
