Appearance
消息队列开发指南
目录
系统概述
消息队列系统用于异步处理业务任务,支持多种队列驱动(数据库、Redis、RabbitMQ),提供统一的生产者/消费者接口,确保任务处理的可靠性、幂等性和高并发安全。
核心特性
- ✅ 多驱动支持:数据库(DB)、Redis List、Redis Stream、RabbitMQ
- ✅ 幂等性保证:通过
biz_key唯一标识,防止重复处理 - ✅ 延迟任务:支持延迟执行(
delay_sec) - ✅ 优先级控制:支持任务优先级排序
- ✅ 自动重试:失败任务自动重试,支持指数退避
- ✅ 并发安全:原子抢占机制,防止多进程重复消费
- ✅ 统一存储:所有任务统一入库
sys_task_queue表,便于监控和管理 - ✅ 事务统一管理:事务由
QueueConsumer层统一管理,确保业务逻辑和状态更新的一致性
目录结构
app/deshang/queue/
├── core/ # 核心类
│ ├── QueueProducer.php # 生产者(统一入队入口)
│ └── QueueConsumer.php # 消费者(定时任务消费)
├── driver/ # 驱动实现
│ ├── DbDriver.php # 数据库驱动
│ ├── RedisListDriver.php # Redis List 驱动
│ ├── RedisStreamDriver.php # Redis Stream 驱动
│ └── RabbitMqDriver.php # RabbitMQ 驱动
├── handler/ # 任务处理器
│ ├── QueueHandlerInterface.php # 处理器接口
│ ├── order/ # 订单相关处理器
│ │ ├── OrderGenerateSalesIncQueue.php # 订单生成销量增加
│ │ ├── OrderCancelSalesDecQueue.php # 订单取消销量减少
│ │ ├── OrderCloseSalesDecQueue.php # 订单关闭销量减少
│ │ ├── OrderPayUserPointsQueue.php # 订单支付增加积分
│ │ ├── OrderCancelUserPointsQueue.php # 订单取消扣除积分
│ │ ├── OrderPayUserGrowthQueue.php # 订单支付增加成长值
│ │ └── OrderCancelUserGrowthQueue.php # 订单取消扣除成长值
│ ├── goods/ # 商品相关处理器
│ └── user/ # 用户相关处理器
└── config.php # 队列配置文件架构设计
工作流程
业务代码
↓
QueueProducer::enqueue([...]) // 批量入队接口
↓
1. 写入 sys_task_queue 表(status=0,payload 字段存储 JSON)
↓
2. 根据驱动类型推送队列(DB 驱动跳过此步)
↓
定时任务 /crontab/queue/consume
↓
QueueConsumer::consume()
↓
1. Driver::popBatch() 原子抢占任务(status 0→3)
↓
2. 根据 queue_type 路由到对应 Handler(通过 config('queue.handlers') 映射)
↓
3. 开启事务 → Handler::handle() 执行业务逻辑 → markSuccess() 更新状态 → 提交事务
↓
4. 如果失败:回滚事务 → handleFailure() 重试/标记失败关键组件
1. QueueProducer(生产者)
统一的生产者入口,封装底层驱动差异,业务代码只需调用 enqueue() 方法。
职责:
- 统一批量入队接口(单个任务也传入数组)
- 写入
sys_task_queue表(所有任务) - 根据驱动类型推送到队列(非 DB 驱动)
重要特性:
- 统一使用批量接口,简化代码维护
- 数据库已有唯一约束,
biz_key重复时会自动抛出异常 - 外部调用已有事务时,内部不重复开启事务
2. QueueConsumer(消费者)
队列消费者,由定时任务唤起,批量处理任务。
职责:
- 从驱动批量获取任务(原子抢占)
- 根据
queue_type路由到对应处理器 - 统一事务管理:在
processTask中开启事务,确保业务逻辑和状态更新的一致性 - 处理成功/失败后的状态回写
- 失败重试机制(指数退避)
事务管理:
- 事务由
QueueConsumer层统一管理 - Handler 不需要自己管理事务(无需
Db::startTrans()) - 如果 Handler 抛出异常,事务会自动回滚
3. Driver(驱动)
底层队列驱动实现,支持多种队列系统。
接口方法:
push(array $payload, int $delaySec = 0): void- 推送任务popBatch(int $max = 200): array- 批量获取任务(返回 JSON 字符串数组)migrateDueDelay(int $max = 500): void- 迁移到期延迟任务
4. Handler(处理器)
具体业务处理逻辑,实现 QueueHandlerInterface 接口。
接口方法:
handle(array $params): void- 执行处理逻辑(无需supports()方法)
重要说明:
- 路由通过
config('queue.handlers')配置映射完成,无需实现supports()方法 - 事务由
QueueConsumer统一管理,Handler 无需自己管理事务 - Handler 只需专注业务逻辑,异常会自动被捕获并回滚
配置说明
配置文件
配置文件位置:app/deshang/queue/config.php
php
<?php
return [
// 任务类型 => 处理器类(需实现 \app\deshang\queue\handler\QueueHandlerInterface)
'handlers' => [
// 订单相关队列处理器
'OrderGenerateSalesIncQueue' => \app\deshang\queue\handler\order\OrderGenerateSalesIncQueue::class,
'OrderCancelSalesDecQueue' => \app\deshang\queue\handler\order\OrderCancelSalesDecQueue::class,
'OrderCloseSalesDecQueue' => \app\deshang\queue\handler\order\OrderCloseSalesDecQueue::class,
// 用户积分相关队列处理器
'OrderPayUserPointsQueue' => \app\deshang\queue\handler\order\OrderPayUserPointsQueue::class,
'OrderCancelUserPointsQueue' => \app\deshang\queue\handler\order\OrderCancelUserPointsQueue::class,
// 用户成长值相关队列处理器
'OrderPayUserGrowthQueue' => \app\deshang\queue\handler\order\OrderPayUserGrowthQueue::class,
'OrderCancelUserGrowthQueue' => \app\deshang\queue\handler\order\OrderCancelUserGrowthQueue::class,
],
// 默认驱动(可选:redis_list | redis_stream | rabbitmq | db)
'driver' => 'db',
];配置项说明
handlers(处理器映射)
格式: '任务类型' => 处理器类全路径
说明:
- 键名(任务类型)必须与
QueueProducer::enqueue()的type参数完全一致 - 值必须是实现了
QueueHandlerInterface的类 - 处理器类无需实现
supports()方法,路由通过配置映射完成
示例:
php
'handlers' => [
'OrderGenerateSalesIncQueue' => \app\deshang\queue\handler\order\OrderGenerateSalesIncQueue::class,
'OrderCancelSalesDecQueue' => \app\deshang\queue\handler\order\OrderCancelSalesDecQueue::class,
],driver(队列驱动)
可选值:
db- 数据库驱动(默认,推荐用于中小规模)redis_list- Redis List 驱动redis_stream- Redis Stream 驱动rabbitmq- RabbitMQ 驱动
注意事项:
- DB 驱动:所有任务存储在数据库,无需额外中间件
- Redis/RabbitMQ 驱动:需要额外配置 Redis 或 RabbitMQ 连接
- 无论使用哪种驱动,所有任务都会写入
sys_task_queue表
数据库表结构
队列任务存储表:sys_task_queue
关键字段:
| 字段名 | 类型 | 说明 |
|---|---|---|
id | int | 主键 |
queue_type | varchar(50) | 任务类型(对应配置键) |
biz_key | varchar(255) | 业务唯一键(用于幂等,唯一索引) |
payload | text | 任务载荷(JSON格式,包含完整的任务信息) |
status | tinyint | 状态:0=待处理,1=完成,2=失败,3=处理中 |
priority | tinyint | 优先级(数值越大优先级越高) |
retry_count | int | 重试次数 |
max_retries | int | 最大重试次数(默认 3) |
error_message | text | 错误信息 |
scheduled_at | int | 计划执行时间(时间戳) |
consume_ms | int | 消费耗时(毫秒) |
queue_group | varchar(50) | 队列分组 |
version | int | 乐观锁版本号(预留字段) |
create_at | int | 创建时间 |
update_at | int | 更新时间 |
状态枚举:
php
use app\common\enum\system\SysTaskQueueEnum;
SysTaskQueueEnum::STATUS_PENDING = 0; // 待处理
SysTaskQueueEnum::STATUS_SUCCESS = 1; // 完成
SysTaskQueueEnum::STATUS_FAILED = 2; // 失败
SysTaskQueueEnum::STATUS_PROCESSING = 3; // 处理中队列分组枚举:
php
SysTaskQueueEnum::GROUP_DEFAULT = 'default'; // 默认分组
SysTaskQueueEnum::GROUP_ORDER = 'order'; // 订单分组
SysTaskQueueEnum::GROUP_USER = 'user'; // 用户分组快速开始
1. 创建任务处理器
创建文件:php-server/app/deshang/queue/handler/order/OrderCancelQueue.php
php
<?php
namespace app\deshang\queue\handler\order;
use app\deshang\queue\handler\QueueHandlerInterface;
use app\deshang\exceptions\CommonException;
use app\common\dao\order\TblOrderDao;
/**
* 订单取消队列处理器
*/
class OrderCancelQueue implements QueueHandlerInterface
{
/**
* 执行业务处理逻辑
*
* 注意:
* - 事务由 QueueConsumer 层统一管理,这里不需要 Db::startTrans()
* - 如果抛出异常,事务会自动回滚
*
* @param array $params 任务参数
* @return void
* @throws CommonException
*/
public function handle(array $params): void
{
$orderId = (int)($params['order_id'] ?? 0);
if ($orderId <= 0) {
throw new CommonException('order_id 缺失');
}
// 业务处理逻辑(无需事务管理)
$order = (new TblOrderDao())->getOrderInfo(['id' => $orderId]);
if (empty($order)) {
throw new CommonException('订单不存在');
}
// 执行取消操作...
// ...
}
}2. 注册处理器
编辑 app/deshang/queue/config.php:
php
'handlers' => [
'OrderGenerateSalesIncQueue' => \app\deshang\queue\handler\order\OrderGenerateSalesIncQueue::class,
'OrderCancelSalesDecQueue' => \app\deshang\queue\handler\order\OrderCancelSalesDecQueue::class,
'OrderCancelQueue' => \app\deshang\queue\handler\order\OrderCancelQueue::class, // 新增
],3. 投递任务
在业务代码中投递任务(批量接口):
php
use app\deshang\queue\core\QueueProducer;
use app\common\enum\system\SysTaskQueueEnum;
// 单个任务(也使用数组格式)
(new QueueProducer())->enqueue([
[
'type' => 'OrderCancelQueue',
'data' => [
'order_id' => 12345,
'cancel_reason' => '用户主动取消',
],
'options' => [
'biz_key' => 'OrderCancelQueue_12345', // 必填:业务唯一键
'priority' => 1, // 可选:优先级(默认 1)
'delay_sec' => 0, // 可选:延迟秒数(默认 0)
'queue_group' => SysTaskQueueEnum::GROUP_ORDER, // 可选:队列分组(默认 default)
],
],
]);
// 批量任务
(new QueueProducer())->enqueue([
[
'type' => 'OrderCancelSalesDecQueue',
'data' => ['order_goods_list' => $orderGoodsList],
'options' => [
'biz_key' => 'OrderCancelSalesDecQueue_' . $orderId,
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
[
'type' => 'OrderCancelUserPointsQueue',
'data' => ['order_info' => $orderInfo],
'options' => [
'biz_key' => 'OrderCancelUserPointsQueue_' . $orderId,
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
]);4. 配置定时任务
在 Linux Crontab 或系统定时任务中配置:
bash
# 每 30 秒执行一次队列消费(每次处理最多 200 条)
*/30 * * * * curl -s http://your-domain.com/crontab/queue/consume?batch=200 > /dev/null 2>&1生产者使用
基本用法
php
use app\deshang\queue\core\QueueProducer;
use app\common\enum\system\SysTaskQueueEnum;
$producer = new QueueProducer();
// 单个任务(也使用数组格式)
$producer->enqueue([
[
'type' => 'OrderGenerateSalesIncQueue',
'data' => [
'order_goods_list' => $orderGoodsList,
],
'options' => [
'biz_key' => 'OrderGenerateSalesIncQueue_' . $orderId,
'priority' => 1,
'delay_sec' => 0,
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
]);
// 批量任务
$producer->enqueue([
[
'type' => 'OrderCancelSalesDecQueue',
'data' => ['order_goods_list' => $orderGoodsList],
'options' => ['biz_key' => 'OrderCancelSalesDecQueue_' . $orderId, ...],
],
[
'type' => 'OrderCancelUserPointsQueue',
'data' => ['order_info' => $orderInfo],
'options' => ['biz_key' => 'OrderCancelUserPointsQueue_' . $orderId, ...],
],
]);参数说明
enqueue(array $tasks): void
参数:
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
$tasks | array | 是 | 任务数组,每个元素包含 type、data、options |
任务元素结构:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
type | string | 是 | 任务类型,必须与 config('queue.handlers') 中的键一致 |
data | array | 是 | 任务参数,将传递给处理器的 handle() 方法 |
options | array | 是 | 选项配置 |
options 选项:
| 选项 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
biz_key | string | 是 | - | 业务唯一键,用于幂等性控制 |
priority | int | 否 | 1 | 优先级(0=普通,1=较高,2=最高) |
delay_sec | int | 否 | 0 | 延迟执行秒数 |
queue_group | string | 否 | 'default' | 队列分组(建议使用枚举常量) |
使用场景示例
1. 订单生成时增加销量
php
use app\deshang\queue\core\QueueProducer;
use app\common\enum\system\SysTaskQueueEnum;
// 订单生成后,异步增加商品和店铺销量
(new QueueProducer())->enqueue([
[
'type' => 'OrderGenerateSalesIncQueue',
'data' => [
'order_goods_list' => $orderGoodsList, // 直接传递订单商品列表,避免重复查询
],
'options' => [
'biz_key' => 'OrderGenerateSalesIncQueue_' . $orderId,
'priority' => 1, // 较高优先级
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
]);2. 订单取消时减少销量
php
// 订单取消成功,异步减少商品和店铺销量
(new QueueProducer())->enqueue([
[
'type' => 'OrderCancelSalesDecQueue',
'data' => [
'order_goods_list' => $orderGoodsList,
],
'options' => [
'biz_key' => 'OrderCancelSalesDecQueue_' . $orderId,
'priority' => 1,
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
]);3. 延迟任务(30 分钟后检查订单状态)
php
// 30 分钟后检查订单是否超时未支付
(new QueueProducer())->enqueue([
[
'type' => 'OrderTimeoutCheckQueue',
'data' => [
'order_id' => $orderId,
],
'options' => [
'biz_key' => 'OrderTimeoutCheckQueue_' . $orderId,
'delay_sec' => 1800, // 延迟 30 分钟(1800 秒)
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
]);4. 批量任务(订单取消时同时处理销量和积分)
php
// 订单取消时,批量处理销量减少和积分扣除
(new QueueProducer())->enqueue([
[
'type' => 'OrderCancelSalesDecQueue',
'data' => ['order_goods_list' => $orderGoodsList],
'options' => [
'biz_key' => 'OrderCancelSalesDecQueue_' . $orderId,
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
[
'type' => 'OrderCancelUserPointsQueue',
'data' => ['order_info' => $orderInfo],
'options' => [
'biz_key' => 'OrderCancelUserPointsQueue_' . $orderId,
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
],
],
]);biz_key 设计规范
biz_key 用于保证任务的幂等性,建议遵循以下规范:
格式: {任务类型}_{业务标识}
示例:
php
// 订单相关
'OrderGenerateSalesIncQueue_' . $orderId
'OrderCancelSalesDecQueue_' . $orderId
'OrderPayUserPointsQueue_' . $orderId
// 商品相关
'GoodsScoreUpdateQueue_' . $goodsId
'GoodsStockReleaseQueue_' . $orderId . '_' . $skuId
// 用户相关
'UserBalanceAddQueue_' . $userId . '_' . $orderId注意事项:
biz_key必须唯一,相同biz_key的任务只会执行一次(数据库唯一索引)- 建议包含业务标识(如订单ID、商品ID等),便于追踪和调试
- 避免使用随机字符串,确保相同业务操作生成相同的
biz_key - 格式建议:
{QueueType}_{业务ID},如OrderGenerateSalesIncQueue_12345
处理器开发
接口定义
所有处理器必须实现 QueueHandlerInterface 接口:
php
interface QueueHandlerInterface
{
/**
* 执行业务处理逻辑
*
* 注意:
* - 事务由 QueueConsumer 层统一管理,这里不需要 Db::startTrans()
* - 如果抛出异常,事务会自动回滚
*
* @param array $params 任务参数(由生产者传入)
* @return void
* @throws \app\deshang\exceptions\CommonException
*/
public function handle(array $params): void;
}重要说明:
- 无需实现
supports()方法:路由通过config('queue.handlers')配置映射完成 - 无需管理事务:事务由
QueueConsumer层统一管理,Handler 只需专注业务逻辑 - 异常处理:抛出异常会自动回滚事务,并由消费者处理重试
开发步骤
1. 创建处理器类
创建文件:php-server/app/deshang/queue/handler/{模块}/{HandlerName}Queue.php
命名规范:
- 文件名:
{业务名称}Queue.php(如OrderCancelQueue.php) - 类名:与文件名一致
- 命名空间:
app\deshang\queue\handler\{模块}
2. 实现接口方法
php
<?php
namespace app\deshang\queue\handler\order;
use app\deshang\queue\handler\QueueHandlerInterface;
use app\deshang\exceptions\CommonException;
use app\common\dao\order\TblOrderDao;
class OrderCancelQueue implements QueueHandlerInterface
{
/**
* 执行业务处理逻辑
*
* 注意:
* - 事务由 QueueConsumer 层统一管理,这里不需要 Db::startTrans()
* - 如果抛出异常,事务会自动回滚
*
* @param array $params 任务参数
* @return void
* @throws CommonException
*/
public function handle(array $params): void
{
// 1. 参数验证
$orderId = (int)($params['order_id'] ?? 0);
if ($orderId <= 0) {
throw new CommonException('order_id 缺失');
}
// 2. 业务处理(无需事务管理)
$order = (new TblOrderDao())->getOrderInfo(['id' => $orderId]);
if (empty($order)) {
throw new CommonException('订单不存在');
}
// 执行业务逻辑...
// ...
}
}3. 注册到配置
在 app/deshang/queue/config.php 中注册:
php
'handlers' => [
'OrderCancelQueue' => \app\deshang\queue\handler\order\OrderCancelQueue::class,
],开发规范
1. 幂等性保证
处理器必须保证幂等性,即多次执行相同参数的任务,结果应该一致。
实现方式:
- 方案一:原子操作(推荐)
php
public function handle(array $params): void
{
$goodsId = (int)($params['goods_id'] ?? 0);
$quantity = (int)($params['quantity'] ?? 0);
// 使用数据库原子操作,天然幂等
(new TblGoodsDao())->setGoodsInc(['id' => $goodsId], 'sales_num', $quantity);
}- 方案二:悲观锁 + 状态标记
php
public function handle(array $params): void
{
$orderId = (int)($params['order_id'] ?? 0);
// 悲观锁订单行(注意:事务由 QueueConsumer 管理,这里只是查询)
$order = (new TblOrderDao())->getOrderInfo(
['id' => $orderId],
'id,status,is_processed',
true // FOR UPDATE
);
if (empty($order)) {
throw new CommonException('订单不存在');
}
// 检查是否已处理
if ($order['is_processed'] == 1) {
return; // 已处理,直接返回(幂等)
}
// 执行业务逻辑...
// 更新标记
(new TblOrderDao())->updateOrder(['id' => $orderId], [
'is_processed' => 1,
]);
}2. 异常处理
处理器必须抛出 CommonException,消费者会自动捕获并处理重试。
php
use app\deshang\exceptions\CommonException;
public function handle(array $params): void
{
// 参数验证失败
if (empty($params['order_id'])) {
throw new CommonException('order_id 缺失');
}
// 业务逻辑失败
if ($order['status'] !== 'pending') {
throw new CommonException('订单状态不允许此操作');
}
// 系统异常会自动转换为 CommonException
}3. 事务管理
重要:事务由 QueueConsumer 统一管理,Handler 无需自己管理事务。
php
// ✅ 正确:无需事务管理
public function handle(array $params): void
{
// 业务逻辑...
(new TblOrderDao())->updateOrder(['id' => $orderId], ['status' => 'cancelled']);
// 如果抛出异常,QueueConsumer 会自动回滚
}
// ❌ 错误:不要在 Handler 中管理事务
public function handle(array $params): void
{
Db::startTrans(); // 不要这样做!
try {
// ...
Db::commit();
} catch (\Throwable $e) {
Db::rollback();
}
}4. 使用 Dao 层
所有数据库操作必须通过 Dao 层,避免直接使用 Db::name():
php
// ✅ 正确
$order = (new TblOrderDao())->getOrderInfo(['id' => $orderId], '*', true);
// ❌ 错误
$order = Db::name('tbl_order')->where('id', $orderId)->find();完整示例
订单生成销量增加处理器:
php
<?php
namespace app\deshang\queue\handler\order;
use app\deshang\queue\handler\QueueHandlerInterface;
use app\common\dao\goods\TblGoodsDao;
use app\common\dao\store\TblStoreDao;
use app\deshang\exceptions\CommonException;
/**
* 订单生成销量增加队列处理器
*
* 说明:
* - 订单生成时,基于订单商品表(tbl_order_goods)将商品/店铺销量进行累加
* - 参数直接传递 order_goods_list,避免重复查询
* - 并发安全:事务由 QueueConsumer 层统一管理
*/
class OrderGenerateSalesIncQueue implements QueueHandlerInterface
{
/**
* 执行业务处理
*
* @param array $params 必须包含:
* - order_goods_list: array 订单商品列表(已在 Listener 中查询)
* @return void
* @throws CommonException
*/
public function handle(array $params): void
{
$orderGoodsList = $params['order_goods_list'] ?? [];
if (empty($orderGoodsList) || !is_array($orderGoodsList)) {
throw new CommonException('order_goods_list 缺失或格式错误');
}
// 累加商品与店铺销量
// 注意:事务由 QueueConsumer 层统一管理
foreach ($orderGoodsList as $og) {
$goodsId = (int)($og['goods_id'] ?? 0);
$storeId = (int)($og['store_id'] ?? 0);
$goodsNum = (int)($og['goods_num'] ?? 0);
if ($goodsId <= 0 || $storeId <= 0 || $goodsNum <= 0) {
continue; // 跳过无效数据
}
// 使用 Dao 层自增,保持数据访问一致性
(new TblGoodsDao())->setGoodsInc(['id' => $goodsId], 'sales_num', $goodsNum);
(new TblStoreDao())->setStoreInc(['id' => $storeId], 'sales_num', $goodsNum);
}
}
}消费者配置
定时任务配置
队列消费者通过定时任务定期执行,建议配置为每 30 秒执行一次。
Linux Crontab
bash
# 编辑 crontab
crontab -e
# 添加定时任务(每 30 秒执行一次,每次处理 200 条)
*/30 * * * * curl -s http://your-domain.com/crontab/queue/consume?batch=200 > /dev/null 2>&1
# 或者使用 PHP 命令行(推荐)
*/30 * * * * cd /path/to/php-server && php think queue:consume 200Windows 计划任务
创建批处理文件 consume-queue.bat:
batch
@echo off
curl -s http://localhost/crontab/queue/consume?batch=200在 Windows 计划任务中配置每 30 秒执行一次。
消费接口
URL: /crontab/queue/consume
请求方式: GET
参数:
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
batch | int | 否 | 200 | 每次最大处理条数(建议 100~500) |
响应:
json
{
"code": 200,
"msg": "执行成功",
"data": null
}批量处理建议
- 小规模系统:
batch=100,每 30 秒执行一次 - 中等规模系统:
batch=200,每 30 秒执行一次 - 大规模系统:
batch=500,每 10 秒执行一次,或部署多个消费者
监控建议
建议监控以下指标:
- 待处理任务数:
SELECT COUNT(*) FROM sys_task_queue WHERE status = 0 - 处理中任务数:
SELECT COUNT(*) FROM sys_task_queue WHERE status = 3 - 失败任务数:
SELECT COUNT(*) FROM sys_task_queue WHERE status = 2 - 平均处理耗时:
SELECT AVG(consume_ms) FROM sys_task_queue WHERE status = 1
驱动说明
DB 驱动(默认)
特点:
- 所有任务存储在
sys_task_queue表 - 无需额外中间件
- 支持事务和复杂查询
- 适合中小规模系统
使用场景:
- 任务量 < 1000 条/分钟
- 不需要分布式队列
- 希望统一管理所有任务
配置:
php
// app/deshang/queue/config.php
'driver' => 'db',Redis List 驱动
特点:
- 高性能,适合大规模任务
- 支持延迟任务(使用 ZSET)
- 需要配置 Redis 连接
使用场景:
- 任务量 > 1000 条/分钟
- 需要分布式队列
- 已有 Redis 基础设施
配置:
php
// app/deshang/queue/config.php
'driver' => 'redis_list',Redis 配置: 需要在 config/cache.php 中配置 Redis 连接。
Redis Stream 驱动
特点:
- 支持消息持久化
- 支持消费者组
- 适合高可靠性场景
使用场景:
- 需要消息持久化
- 需要消费者组功能
- 高可靠性要求
配置:
php
// app/deshang/queue/config.php
'driver' => 'redis_stream',RabbitMQ 驱动
特点:
- 企业级消息队列
- 支持多种队列模式
- 需要单独部署 RabbitMQ
使用场景:
- 大规模分布式系统
- 需要复杂路由规则
- 已有 RabbitMQ 基础设施
配置:
php
// app/deshang/queue/config.php
'driver' => 'rabbitmq',注意事项:
- 无论使用哪种驱动,所有任务都会写入
sys_task_queue表 - DB 驱动无需推送队列,其他驱动需要先写入表再推送队列
最佳实践
1. 任务类型命名规范
推荐格式: {模块}{动作}Queue(PascalCase)
示例:
php
'OrderGenerateSalesIncQueue' // 订单生成销量增加
'OrderCancelSalesDecQueue' // 订单取消销量减少
'OrderPayUserPointsQueue' // 订单支付增加积分
'OrderCancelUserPointsQueue' // 订单取消扣除积分不推荐:
php
'order_sales_inc' // 不要使用下划线命名
'OrderSalesInc' // 不要缺少 Queue 后缀2. biz_key 设计规范
格式: {任务类型}_{业务标识}
示例:
php
// 单一业务标识
'OrderGenerateSalesIncQueue_' . $orderId
// 多个业务标识(用下划线连接)
'GoodsStockReleaseQueue_' . $orderId . '_' . $skuId3. 优先级使用建议
| 优先级值 | 枚举常量 | 使用场景 |
|---|---|---|
| 0 | PRIORITY_NORMAL | 普通任务 |
| 1 | PRIORITY_HIGH | 重要任务(如订单处理,默认值) |
| 2 | PRIORITY_URGENT | 紧急任务(如库存告警) |
4. 延迟任务使用场景
- 订单超时检查:延迟 30 分钟检查订单是否支付
- 库存释放:延迟 10 分钟释放未支付订单的库存
- 优惠券过期提醒:延迟到优惠券过期前 1 天发送提醒
5. 事务管理规范
重要:事务由 QueueConsumer 统一管理,Handler 无需自己管理事务。
php
// ✅ 正确:Handler 无需事务管理
public function handle(array $params): void
{
// 业务逻辑...
// 如果抛出异常,QueueConsumer 会自动回滚
}
// ❌ 错误:不要在 Handler 中管理事务
public function handle(array $params): void
{
Db::startTrans(); // 不要这样做!
// ...
}6. 错误处理规范
- 参数验证失败:直接抛出
CommonException,不重试 - 业务逻辑失败:抛出
CommonException,自动重试 - 系统异常:消费者自动捕获并重试
7. 性能优化建议
- 批量处理:使用
setInc、setDec等原子操作,避免逐条更新 - 减少查询:一次查询获取所有需要的数据(如直接传递
order_goods_list) - 索引优化:确保
biz_key、queue_type、status、scheduled_at有索引
8. 监控和告警
建议监控以下指标:
- 任务积压:待处理任务数 > 1000 时告警
- 失败率:失败任务数 / 总任务数 > 5% 时告警
- 处理耗时:平均处理耗时 > 5 秒时告警
- 重试率:重试次数 > 1 的任务占比 > 10% 时告警
常见问题
1. 任务重复执行怎么办?
原因:
biz_key不唯一或未设置- 处理器未保证幂等性
解决方案:
- 确保
biz_key唯一且有意义(数据库唯一索引) - 在处理器中使用原子操作或状态标记保证幂等性
2. 任务一直处于处理中状态?
原因:
- 消费者进程异常退出
- 处理时间过长,超过定时任务执行间隔
解决方案:
- 检查消费者进程是否正常运行
- 优化处理器性能,减少处理时间
- 增加超时机制,自动将超时任务重置为待处理
3. 任务失败后如何重试?
系统自动重试机制:
- 重试次数:默认最多重试 3 次(可在创建任务时通过
max_retries配置) - 重试策略:指数退避(60s, 120s, 240s)
- 失败处理:超过最大重试次数后标记为失败(status=2)
手动重试:
sql
-- 将失败任务重置为待处理
UPDATE sys_task_queue
SET status = 0, retry_count = 0, error_message = NULL
WHERE status = 2 AND id = ?;4. 如何查看任务执行情况?
sql
-- 查看待处理任务
SELECT * FROM sys_task_queue WHERE status = 0 ORDER BY priority DESC, scheduled_at ASC LIMIT 100;
-- 查看处理中任务
SELECT * FROM sys_task_queue WHERE status = 3;
-- 查看失败任务
SELECT * FROM sys_task_queue WHERE status = 2 ORDER BY update_at DESC LIMIT 100;
-- 查看任务统计
SELECT
status,
COUNT(*) as count,
AVG(consume_ms) as avg_ms,
MAX(consume_ms) as max_ms
FROM sys_task_queue
WHERE create_at >= UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 DAY))
GROUP BY status;5. 如何清理历史任务?
sql
-- 删除 30 天前的已完成任务
DELETE FROM sys_task_queue
WHERE status = 1 AND create_at < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 DAY));
-- 删除 7 天前的失败任务(需要先人工处理)
DELETE FROM sys_task_queue
WHERE status = 2 AND update_at < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 7 DAY));6. 如何切换队列驱动?
- 修改
app/deshang/queue/config.php中的driver配置 - 确保对应的中间件(Redis/RabbitMQ)已配置
- 重启消费者进程
7. 任务处理太慢怎么办?
优化方向:
- 增加消费者数量(多个定时任务并行)
- 优化处理器性能(减少数据库查询、使用批量操作)
- 调整
batch参数(增加单次处理数量) - 使用更快的队列驱动(Redis/RabbitMQ)
8. 如何保证高可用?
- 多消费者部署:部署多个消费者进程,自动负载均衡
- 监控告警:监控任务积压、失败率等指标
- 自动恢复:失败任务自动重试,超过重试次数后人工介入
9. Handler 中是否需要管理事务?
不需要。 事务由 QueueConsumer 层统一管理:
- Handler 抛出异常时,事务会自动回滚
- Handler 成功执行后,
markSuccess()会在同一事务中更新状态 - 这确保了业务逻辑和状态更新的原子性
错误示例:
php
// ❌ 错误:不要在 Handler 中管理事务
public function handle(array $params): void
{
Db::startTrans();
try {
// ...
Db::commit();
} catch (\Throwable $e) {
Db::rollback();
}
}正确示例:
php
// ✅ 正确:无需事务管理
public function handle(array $params): void
{
// 业务逻辑...
// 如果抛出异常,QueueConsumer 会自动回滚
}总结
消息队列系统提供了完整的异步任务处理方案,支持多种队列驱动、自动重试、优先级控制等功能。在使用过程中,需要注意:
- 幂等性:确保处理器幂等,避免重复执行
- 异常处理:正确抛出
CommonException,让系统自动重试 - 事务管理:事务由
QueueConsumer统一管理,Handler 无需自己管理事务 - 监控告警:及时监控任务执行情况,避免任务积压
- 批量接口:统一使用批量入队接口,简化代码维护
如有问题,请参考本文档或联系开发团队。
最后更新:2024-01-20
维护者:DSPlatform技术团队(德尚网络)
获取帮助
如果您在使用过程中遇到问题,可以通过以下方式获取帮助:
- 官方网站:https://www.csdeshang.com
- 电话咨询:15364080101(微信同号)
- QQ咨询:858761000
- 邮箱咨询:858761000@qq.com
- 工作时间:工作日 9:00-18:00
- 微信咨询:扫码添加微信

版权所有 © 2014-至今 德尚网络