【RuoYi-Eggjs】:基于 Bull Queue 的企业级定时任务调度系统

本文介绍 RuoYi-Eggjs 中基于 Bull Queue 构建的企业级定时任务调度系统,涵盖动态任务管理、分布式执行、失败重试、日志监控等核心功能的设计与实现。

一、引言:为什么需要强大的定时任务系统?

在企业级应用中,定时任务是不可或缺的基础设施:

RuoYi-Eggjs 基于 Bull Queue[1] 构建了一套完整的定时任务调度系统,具备以下企业级特性:

二、系统架构设计

整体架构


    
    
    
  ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   管理端 Web     │    │   Bull Queue    │    │   任务执行器     │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ 任务管理API  │ │───▶│ │ Redis 队列   │ │───▶│ │ ryTask 处理器│ │
│ │ - 增删改查   │ │    │ │ - 任务调度   │ │    │ │ - 任务执行   │ │
│ │ - 启停控制   │ │    │ │ - 失败重试   │ │    │ │ - 结果记录   │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────────────────────────────────────────────────────┐
│                     数据存储层                                   │
│  ┌─────────────────┐            ┌─────────────────────────────┐ │
│  │     MySQL       │            │          Redis              │ │
│  │ - sys_job 表    │            │ - 队列数据                   │ │
│  │ - sys_job_log表 │            │ - 分布式锁                   │ │
│  └─────────────────┘            └─────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

核心组件

组件职责实现文件
JobService任务管理、调度控制app/service/monitor/job.js
Bull Queue分布式任务队列app/queue/ryTask.js
CronUtilsCron 表达式工具app/util/cronUtils.js
JobLog任务执行日志app/service/monitor/jobLog.js
RyTask任务执行器app/service/ryTask.js

三、Cron 表达式处理

智能 Cron 工具类

系统内置了强大的 Cron 表达式处理工具,支持表达式验证、下次执行时间计算等功能:


    
    
    
  // app/util/cronUtils.js
const
 parser = require('cron-parser');

class
 CronUtils {
  // 验证 cron 表达式是否有效

  static
 isValid(cronExpression) {
    try
 {
      parser.parseExpression(cronExpression);
      return
 true;
    } catch (err) {
      return
 false;
    }
  }

  // 获取下次执行时间

  static
 getNextExecution(cronExpression) {
    try
 {
      const
 interval = parser.parseExpression(cronExpression);
      return
 interval.next().toDate();
    } catch (err) {
      return
 null;
    }
  }
}

常用 Cron 表达式

系统预置了常用的 Cron 表达式模板:


    
    
    
  static get EXAMPLES() {
  return
 {
    EVERY_MINUTE
: '0 * * * * *',           // 每分钟执行
    EVERY_HOUR
: '0 0 * * * *',             // 每小时执行
    EVERY_DAY
: '0 0 0 * * *',              // 每天 0 点执行
    WORKDAY_MORNING
: '0 0 9 * * 1-5',      // 工作日早上 9 点
    EVERY_WEEK
: '0 0 0 * * 0',             // 每周日 0 点执行
  };
}

四、Bull Queue 队列处理

队列初始化

基于 Redis 创建分布式任务队列:


    
    
    
  // app/queue/ryTask.js
const
 Queue = require('bull');

module
.exports = app => {
  const
 queue = new Queue('ryTask', {
    redis
: {
      port
: app.config.bull.client.port,
      host
: app.config.bull.client.host,
      password
: app.config.bull.client.password,
      db
: app.config.bull.client.db,
    },
  });

  // 配置任务处理器

  queue.process(async (job) => {
    const
 ctx = app.createAnonymousContext();
    const
 { invokeTarget, jobInfo } = job.data;
    
    // 执行任务逻辑...

  });

  return
 queue;
};

任务执行流程

每个任务的执行都经过完整的生命周期管理:


    
    
    
  queue.process(async (job) => {
  const
 startTime = Date.now();
  let
 status = '0'; // 0-成功 1-失败
  let
 jobMessage = '';
  let
 exceptionInfo = '';

  try
 {
    // 1. 记录开始执行

    ctx.logger.info(`[Bull] 开始执行任务: ${jobInfo.jobName}`);
    
    // 2. 调用任务执行器

    const
 result = await ctx.service.ryTask.execute(invokeTarget);
    
    jobMessage = result.message || '任务执行成功';
    ctx.logger.info(`[Bull] 任务执行成功: ${jobInfo.jobName}`);
  } catch (err) {
    status = '1';
    jobMessage = '任务执行失败';
    exceptionInfo = err.message;
    
    ctx.logger.error(`[Bull] 任务执行失败: ${jobInfo.jobName}`, err);
    throw
 err; // 抛出错误,让 Bull 处理重试
  } finally {
    const
 duration = Date.now() - startTime;
    
    // 3. 记录执行日志

    await
 ctx.service.monitor.jobLog.insertJobLog({
      jobName
: jobInfo.jobName,
      jobGroup
: jobInfo.jobGroup,
      invokeTarget
: invokeTarget,
      jobMessage
: `${jobMessage} (耗时: ${duration}ms)`,
      status,
      exceptionInfo
: exceptionInfo.substring(0, 2000),
      createTime
: ctx.helper.formatDate(new Date(startTime)),
    });
  }
});

五、动态任务管理

创建定时任务

系统支持动态创建定时任务,任务信息存储在数据库中:


    
    
    
  // app/service/monitor/job.js
async
 createBullJob(job) {
  const
 { app, ctx } = this;
  
  try
 {
    // 使用 jobId + invokeTarget 作为唯一标识

    const
 uniqueId = `${job.jobId}:${job.invokeTarget}`;
    
    // 清理可能存在的旧任务

    await
 this.cleanOldRepeatableJobs(job);
    
    // 添加新的重复任务

    await
 app.queue.ryTask.add(
      {
        invokeTarget
: job.invokeTarget,
        jobInfo
: {
          jobId
: job.jobId,
          jobName
: job.jobName,
          jobGroup
: job.jobGroup,
          uniqueId,
        },
      },
      {
        jobId
: uniqueId,
        repeat
: {
          cron
: job.cronExpression,
          key
: uniqueId,
        },
        removeOnComplete
: true,
        removeOnFail
: 100,
      }
    );
    
    ctx.logger.info(`[Bull] 创建定时任务成功: ${job.jobName}`);
    return
 true;
  } catch (err) {
    ctx.logger.error(`[Bull] 创建定时任务失败: ${job.jobName}`, err);
    return
 false;
  }
}

任务状态控制

支持任务的启动、暂停、删除等状态控制:


    
    
    
  // 暂停任务
async
 pauseBullJob(job) {
  return
 await this.deleteBullJob(job);
}

// 恢复任务

async
 resumeBullJob(job) {
  return
 await this.createBullJob(job);
}

// 立即执行任务

async
 runBullJob(job) {
  await
 app.queue.ryTask.add(
    {
      invokeTarget
: job.invokeTarget,
      jobInfo
: job,
    },
    {
      removeOnComplete
: true,
      priority
: 1, // 高优先级立即执行
    }
  );
  
  return
 true;
}

六、任务执行与监控

任务执行日志

系统完整记录每次任务执行的详细日志:


    
    
    
  // app/service/monitor/jobLog.js
async
 insertJobLog(jobLog) {
  const
 { ctx } = this;
  
  try
 {
    const
 mapper = ctx.helper.getDB(ctx).sysJobLogMapper;
    
    // 设置日志ID和创建时间

    jobLog.jobLogId = ctx.helper.generateId();
    jobLog.createTime = jobLog.createTime || ctx.helper.formatDate(new Date());
    
    return
 await mapper.insertJobLog([], jobLog);
  } catch (err) {
    ctx.logger.error('插入任务执行日志失败:', err);
    throw
 err;
  }
}

日志查询与统计

提供丰富的日志查询功能,支持按任务名、执行状态、时间范围等条件筛选:


    
    
    
  async selectJobLogPage(params = {}) {
  const
 { ctx } = this;
  const
 mapper = ctx.helper.getDB(ctx).sysJobLogMapper;

  return
 await ctx.helper.pageQuery(
    mapper.selectJobLogListMapper([], params),
    params,
    mapper.db()
  );
}

七、企业级特性

1. 分布式支持

基于 Redis 实现分布式任务调度,支持多实例部署:

2. 失败重试机制

Bull Queue 内置强大的失败重试功能:


    
    
    
  // 配置重试策略
{
  attempts
: 3,                    // 重试次数
  backoff
: {
    type
: 'exponential',          // 指数退避
    delay
: 2000,                  // 初始延迟
  },
  removeOnFail
: 100,              // 保留失败任务数量
}

3. 性能监控

系统提供完整的性能监控指标:

4. 安全控制

严格的任务执行安全控制:


    
    
    
  // 任务安全校验
checkJobSecurity
(job) {
  const
 securityCheck = { pass: true, message: '' };
  
  // 检查调用目标是否安全

  if
 (job.invokeTarget && !job.invokeTarget.match(/^[\w\.]+(\(.*\))?$/)) {
    securityCheck.pass = false;
    securityCheck.message = '调用目标格式不正确';
  }
  
  return
 securityCheck;
}

八、最佳实践

1. 任务设计原则

2. Cron 表达式优化


    
    
    
  // 推荐的 Cron 表达式
'0 0 2 * * *'
          // 每天凌晨2点(避开高峰期)
'0 0 9-17 * * 1-5'
     // 工作日工作时间
'0 */5 * * * *'
        // 每5分钟(适度频率)

// 避免的表达式

'* * * * * *'
          // 每秒执行(过于频繁)
'0 0 0 * * *'
          // 午夜12点(高峰期)

3. 监控告警

建议配置任务执行监控告警:


    
    
    
  // 监控失败任务
queue.on('failed', (job, err) => {
  // 发送告警通知

  notificationService.sendAlert({
    title
: '任务执行失败',
    content
: `任务 ${job.data.jobInfo.jobName} 执行失败: ${err.message}`,
    level
: 'error'
  });
});

九、总结

RuoYi-Eggjs 的定时任务调度系统通过以下技术栈实现了企业级的任务管理能力:

核心优势

  1. 1. 技术先进:基于 Bull Queue + Redis 的成熟方案
  2. 2. 功能完整:涵盖任务的全生命周期管理
  3. 3. 高可用性:分布式部署 + 失败重试保证系统稳定
  4. 4. 易于使用:直观的管理界面 + 丰富的 API

引用链接

[1] Bull Queue: https://github.com/OptimalBits/bull
[2] RuoYi-Eggjs: https://github.com/undsky/ruoyi-eggjs
[3] RuoYi-Eggjs 文档: https://www.undsky.com/blog/?category=RuoYi-Eggjs#