Skip to content

消息队列开发指南

目录

系统概述

消息队列系统用于异步处理业务任务,支持多种队列驱动(数据库、Redis、RabbitMQ),提供统一的生产者/消费者接口,确保任务处理的可靠性、幂等性和高并发安全。

核心特性

  • 多驱动支持:数据库(DB)、Redis List、Redis Stream、RabbitMQ
  • 幂等性保证:通过 biz_key 唯一标识,防止重复处理
  • 延迟任务:支持延迟执行(delay_sec
  • 优先级控制:支持任务优先级排序
  • 自动重试:失败任务自动重试,支持指数退避
  • 并发安全:原子抢占机制,防止多进程重复消费
  • 统一存储:所有任务统一入库 sys_task_queue 表,便于监控和管理
  • 事务统一管理:事务由 QueueConsumer 层统一管理,确保业务逻辑和状态更新的一致性

目录结构

app/deshang/queue/
├── core/                          # 核心类
│   ├── QueueProducer.php         # 生产者(统一入队入口)
│   └── QueueConsumer.php         # 消费者(定时任务消费)
├── driver/                        # 驱动实现
│   ├── DbDriver.php              # 数据库驱动
│   ├── RedisListDriver.php       # Redis List 驱动
│   ├── RedisStreamDriver.php     # Redis Stream 驱动
│   └── RabbitMqDriver.php        # RabbitMQ 驱动
├── handler/                       # 任务处理器
│   ├── QueueHandlerInterface.php  # 处理器接口
│   ├── order/                     # 订单相关处理器
│   │   ├── OrderGenerateSalesIncQueue.php    # 订单生成销量增加
│   │   ├── OrderCancelSalesDecQueue.php       # 订单取消销量减少
│   │   ├── OrderCloseSalesDecQueue.php        # 订单关闭销量减少
│   │   ├── OrderPayUserPointsQueue.php        # 订单支付增加积分
│   │   ├── OrderCancelUserPointsQueue.php     # 订单取消扣除积分
│   │   ├── OrderPayUserGrowthQueue.php         # 订单支付增加成长值
│   │   └── OrderCancelUserGrowthQueue.php      # 订单取消扣除成长值
│   ├── goods/                     # 商品相关处理器
│   └── user/                      # 用户相关处理器
└── config.php                     # 队列配置文件

架构设计

工作流程

业务代码

QueueProducer::enqueue([...])  // 批量入队接口

1. 写入 sys_task_queue 表(status=0,payload 字段存储 JSON)

2. 根据驱动类型推送队列(DB 驱动跳过此步)

定时任务 /crontab/queue/consume

QueueConsumer::consume()

1. Driver::popBatch() 原子抢占任务(status 0→3)

2. 根据 queue_type 路由到对应 Handler(通过 config('queue.handlers') 映射)

3. 开启事务 → Handler::handle() 执行业务逻辑 → markSuccess() 更新状态 → 提交事务

4. 如果失败:回滚事务 → handleFailure() 重试/标记失败

关键组件

1. QueueProducer(生产者)

统一的生产者入口,封装底层驱动差异,业务代码只需调用 enqueue() 方法。

职责:

  • 统一批量入队接口(单个任务也传入数组)
  • 写入 sys_task_queue 表(所有任务)
  • 根据驱动类型推送到队列(非 DB 驱动)

重要特性:

  • 统一使用批量接口,简化代码维护
  • 数据库已有唯一约束,biz_key 重复时会自动抛出异常
  • 外部调用已有事务时,内部不重复开启事务

2. QueueConsumer(消费者)

队列消费者,由定时任务唤起,批量处理任务。

职责:

  • 从驱动批量获取任务(原子抢占)
  • 根据 queue_type 路由到对应处理器
  • 统一事务管理:在 processTask 中开启事务,确保业务逻辑和状态更新的一致性
  • 处理成功/失败后的状态回写
  • 失败重试机制(指数退避)

事务管理:

  • 事务由 QueueConsumer 层统一管理
  • Handler 不需要自己管理事务(无需 Db::startTrans()
  • 如果 Handler 抛出异常,事务会自动回滚

3. Driver(驱动)

底层队列驱动实现,支持多种队列系统。

接口方法:

  • push(array $payload, int $delaySec = 0): void - 推送任务
  • popBatch(int $max = 200): array - 批量获取任务(返回 JSON 字符串数组)
  • migrateDueDelay(int $max = 500): void - 迁移到期延迟任务

4. Handler(处理器)

具体业务处理逻辑,实现 QueueHandlerInterface 接口。

接口方法:

  • handle(array $params): void - 执行处理逻辑(无需 supports() 方法)

重要说明:

  • 路由通过 config('queue.handlers') 配置映射完成,无需实现 supports() 方法
  • 事务由 QueueConsumer 统一管理,Handler 无需自己管理事务
  • Handler 只需专注业务逻辑,异常会自动被捕获并回滚

配置说明

配置文件

配置文件位置:app/deshang/queue/config.php

php
<?php

return [
    // 任务类型 => 处理器类(需实现 \app\deshang\queue\handler\QueueHandlerInterface)
    'handlers' => [
        // 订单相关队列处理器
        'OrderGenerateSalesIncQueue'     => \app\deshang\queue\handler\order\OrderGenerateSalesIncQueue::class,
        'OrderCancelSalesDecQueue'       => \app\deshang\queue\handler\order\OrderCancelSalesDecQueue::class,
        'OrderCloseSalesDecQueue'        => \app\deshang\queue\handler\order\OrderCloseSalesDecQueue::class,
        
        // 用户积分相关队列处理器
        'OrderPayUserPointsQueue'        => \app\deshang\queue\handler\order\OrderPayUserPointsQueue::class,
        'OrderCancelUserPointsQueue'     => \app\deshang\queue\handler\order\OrderCancelUserPointsQueue::class,
        
        // 用户成长值相关队列处理器
        'OrderPayUserGrowthQueue'        => \app\deshang\queue\handler\order\OrderPayUserGrowthQueue::class,
        'OrderCancelUserGrowthQueue'     => \app\deshang\queue\handler\order\OrderCancelUserGrowthQueue::class,
    ],

    // 默认驱动(可选:redis_list | redis_stream | rabbitmq | db)
    'driver' => 'db',
];

配置项说明

handlers(处理器映射)

格式: '任务类型' => 处理器类全路径

说明:

  • 键名(任务类型)必须与 QueueProducer::enqueue()type 参数完全一致
  • 值必须是实现了 QueueHandlerInterface 的类
  • 处理器类无需实现 supports() 方法,路由通过配置映射完成

示例:

php
'handlers' => [
    'OrderGenerateSalesIncQueue' => \app\deshang\queue\handler\order\OrderGenerateSalesIncQueue::class,
    'OrderCancelSalesDecQueue' => \app\deshang\queue\handler\order\OrderCancelSalesDecQueue::class,
],

driver(队列驱动)

可选值:

  • db - 数据库驱动(默认,推荐用于中小规模)
  • redis_list - Redis List 驱动
  • redis_stream - Redis Stream 驱动
  • rabbitmq - RabbitMQ 驱动

注意事项:

  • DB 驱动:所有任务存储在数据库,无需额外中间件
  • Redis/RabbitMQ 驱动:需要额外配置 Redis 或 RabbitMQ 连接
  • 无论使用哪种驱动,所有任务都会写入 sys_task_queue

数据库表结构

队列任务存储表:sys_task_queue

关键字段:

字段名类型说明
idint主键
queue_typevarchar(50)任务类型(对应配置键)
biz_keyvarchar(255)业务唯一键(用于幂等,唯一索引)
payloadtext任务载荷(JSON格式,包含完整的任务信息)
statustinyint状态:0=待处理,1=完成,2=失败,3=处理中
prioritytinyint优先级(数值越大优先级越高)
retry_countint重试次数
max_retriesint最大重试次数(默认 3)
error_messagetext错误信息
scheduled_atint计划执行时间(时间戳)
consume_msint消费耗时(毫秒)
queue_groupvarchar(50)队列分组
versionint乐观锁版本号(预留字段)
create_atint创建时间
update_atint更新时间

状态枚举:

php
use app\common\enum\system\SysTaskQueueEnum;

SysTaskQueueEnum::STATUS_PENDING    = 0;  // 待处理
SysTaskQueueEnum::STATUS_SUCCESS    = 1;  // 完成
SysTaskQueueEnum::STATUS_FAILED     = 2;  // 失败
SysTaskQueueEnum::STATUS_PROCESSING = 3;  // 处理中

队列分组枚举:

php
SysTaskQueueEnum::GROUP_DEFAULT = 'default';  // 默认分组
SysTaskQueueEnum::GROUP_ORDER   = 'order';    // 订单分组
SysTaskQueueEnum::GROUP_USER    = 'user';     // 用户分组

快速开始

1. 创建任务处理器

创建文件:php-server/app/deshang/queue/handler/order/OrderCancelQueue.php

php
<?php

namespace app\deshang\queue\handler\order;

use app\deshang\queue\handler\QueueHandlerInterface;
use app\deshang\exceptions\CommonException;
use app\common\dao\order\TblOrderDao;

/**
 * 订单取消队列处理器
 */
class OrderCancelQueue implements QueueHandlerInterface
{
    /**
     * 执行业务处理逻辑
     *
     * 注意:
     * - 事务由 QueueConsumer 层统一管理,这里不需要 Db::startTrans()
     * - 如果抛出异常,事务会自动回滚
     *
     * @param array $params 任务参数
     * @return void
     * @throws CommonException
     */
    public function handle(array $params): void
    {
        $orderId = (int)($params['order_id'] ?? 0);
        if ($orderId <= 0) {
            throw new CommonException('order_id 缺失');
        }

        // 业务处理逻辑(无需事务管理)
        $order = (new TblOrderDao())->getOrderInfo(['id' => $orderId]);
        if (empty($order)) {
            throw new CommonException('订单不存在');
        }

        // 执行取消操作...
        // ...
    }
}

2. 注册处理器

编辑 app/deshang/queue/config.php

php
'handlers' => [
    'OrderGenerateSalesIncQueue'     => \app\deshang\queue\handler\order\OrderGenerateSalesIncQueue::class,
    'OrderCancelSalesDecQueue'       => \app\deshang\queue\handler\order\OrderCancelSalesDecQueue::class,
    'OrderCancelQueue'               => \app\deshang\queue\handler\order\OrderCancelQueue::class,  // 新增
],

3. 投递任务

在业务代码中投递任务(批量接口):

php
use app\deshang\queue\core\QueueProducer;
use app\common\enum\system\SysTaskQueueEnum;

// 单个任务(也使用数组格式)
(new QueueProducer())->enqueue([
    [
        'type' => 'OrderCancelQueue',
        'data' => [
            'order_id' => 12345,
            'cancel_reason' => '用户主动取消',
        ],
        'options' => [
            'biz_key' => 'OrderCancelQueue_12345',  // 必填:业务唯一键
            'priority' => 1,                         // 可选:优先级(默认 1)
            'delay_sec' => 0,                        // 可选:延迟秒数(默认 0)
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,  // 可选:队列分组(默认 default)
        ],
    ],
]);

// 批量任务
(new QueueProducer())->enqueue([
    [
        'type' => 'OrderCancelSalesDecQueue',
        'data' => ['order_goods_list' => $orderGoodsList],
        'options' => [
            'biz_key' => 'OrderCancelSalesDecQueue_' . $orderId,
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
    [
        'type' => 'OrderCancelUserPointsQueue',
        'data' => ['order_info' => $orderInfo],
        'options' => [
            'biz_key' => 'OrderCancelUserPointsQueue_' . $orderId,
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
]);

4. 配置定时任务

在 Linux Crontab 或系统定时任务中配置:

bash
# 每 30 秒执行一次队列消费(每次处理最多 200 条)
*/30 * * * * curl -s http://your-domain.com/crontab/queue/consume?batch=200 > /dev/null 2>&1

生产者使用

基本用法

php
use app\deshang\queue\core\QueueProducer;
use app\common\enum\system\SysTaskQueueEnum;

$producer = new QueueProducer();

// 单个任务(也使用数组格式)
$producer->enqueue([
    [
        'type' => 'OrderGenerateSalesIncQueue',
        'data' => [
            'order_goods_list' => $orderGoodsList,
        ],
        'options' => [
            'biz_key' => 'OrderGenerateSalesIncQueue_' . $orderId,
            'priority' => 1,
            'delay_sec' => 0,
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
]);

// 批量任务
$producer->enqueue([
    [
        'type' => 'OrderCancelSalesDecQueue',
        'data' => ['order_goods_list' => $orderGoodsList],
        'options' => ['biz_key' => 'OrderCancelSalesDecQueue_' . $orderId, ...],
    ],
    [
        'type' => 'OrderCancelUserPointsQueue',
        'data' => ['order_info' => $orderInfo],
        'options' => ['biz_key' => 'OrderCancelUserPointsQueue_' . $orderId, ...],
    ],
]);

参数说明

enqueue(array $tasks): void

参数:

参数类型必填说明
$tasksarray任务数组,每个元素包含 typedataoptions

任务元素结构:

字段类型必填说明
typestring任务类型,必须与 config('queue.handlers') 中的键一致
dataarray任务参数,将传递给处理器的 handle() 方法
optionsarray选项配置

options 选项:

选项类型必填默认值说明
biz_keystring-业务唯一键,用于幂等性控制
priorityint1优先级(0=普通,1=较高,2=最高)
delay_secint0延迟执行秒数
queue_groupstring'default'队列分组(建议使用枚举常量)

使用场景示例

1. 订单生成时增加销量

php
use app\deshang\queue\core\QueueProducer;
use app\common\enum\system\SysTaskQueueEnum;

// 订单生成后,异步增加商品和店铺销量
(new QueueProducer())->enqueue([
    [
        'type' => 'OrderGenerateSalesIncQueue',
        'data' => [
            'order_goods_list' => $orderGoodsList,  // 直接传递订单商品列表,避免重复查询
        ],
        'options' => [
            'biz_key' => 'OrderGenerateSalesIncQueue_' . $orderId,
            'priority' => 1,  // 较高优先级
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
]);

2. 订单取消时减少销量

php
// 订单取消成功,异步减少商品和店铺销量
(new QueueProducer())->enqueue([
    [
        'type' => 'OrderCancelSalesDecQueue',
        'data' => [
            'order_goods_list' => $orderGoodsList,
        ],
        'options' => [
            'biz_key' => 'OrderCancelSalesDecQueue_' . $orderId,
            'priority' => 1,
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
]);

3. 延迟任务(30 分钟后检查订单状态)

php
// 30 分钟后检查订单是否超时未支付
(new QueueProducer())->enqueue([
    [
        'type' => 'OrderTimeoutCheckQueue',
        'data' => [
            'order_id' => $orderId,
        ],
        'options' => [
            'biz_key' => 'OrderTimeoutCheckQueue_' . $orderId,
            'delay_sec' => 1800,  // 延迟 30 分钟(1800 秒)
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
]);

4. 批量任务(订单取消时同时处理销量和积分)

php
// 订单取消时,批量处理销量减少和积分扣除
(new QueueProducer())->enqueue([
    [
        'type' => 'OrderCancelSalesDecQueue',
        'data' => ['order_goods_list' => $orderGoodsList],
        'options' => [
            'biz_key' => 'OrderCancelSalesDecQueue_' . $orderId,
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
    [
        'type' => 'OrderCancelUserPointsQueue',
        'data' => ['order_info' => $orderInfo],
        'options' => [
            'biz_key' => 'OrderCancelUserPointsQueue_' . $orderId,
            'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
        ],
    ],
]);

biz_key 设计规范

biz_key 用于保证任务的幂等性,建议遵循以下规范:

格式: {任务类型}_{业务标识}

示例:

php
// 订单相关
'OrderGenerateSalesIncQueue_' . $orderId
'OrderCancelSalesDecQueue_' . $orderId
'OrderPayUserPointsQueue_' . $orderId

// 商品相关
'GoodsScoreUpdateQueue_' . $goodsId
'GoodsStockReleaseQueue_' . $orderId . '_' . $skuId

// 用户相关
'UserBalanceAddQueue_' . $userId . '_' . $orderId

注意事项:

  • biz_key 必须唯一,相同 biz_key 的任务只会执行一次(数据库唯一索引)
  • 建议包含业务标识(如订单ID、商品ID等),便于追踪和调试
  • 避免使用随机字符串,确保相同业务操作生成相同的 biz_key
  • 格式建议:{QueueType}_{业务ID},如 OrderGenerateSalesIncQueue_12345

处理器开发

接口定义

所有处理器必须实现 QueueHandlerInterface 接口:

php
interface QueueHandlerInterface
{
    /**
     * 执行业务处理逻辑
     *
     * 注意:
     * - 事务由 QueueConsumer 层统一管理,这里不需要 Db::startTrans()
     * - 如果抛出异常,事务会自动回滚
     *
     * @param array $params 任务参数(由生产者传入)
     * @return void
     * @throws \app\deshang\exceptions\CommonException
     */
    public function handle(array $params): void;
}

重要说明:

  • 无需实现 supports() 方法:路由通过 config('queue.handlers') 配置映射完成
  • 无需管理事务:事务由 QueueConsumer 层统一管理,Handler 只需专注业务逻辑
  • 异常处理:抛出异常会自动回滚事务,并由消费者处理重试

开发步骤

1. 创建处理器类

创建文件:php-server/app/deshang/queue/handler/{模块}/{HandlerName}Queue.php

命名规范:

  • 文件名:{业务名称}Queue.php(如 OrderCancelQueue.php
  • 类名:与文件名一致
  • 命名空间:app\deshang\queue\handler\{模块}

2. 实现接口方法

php
<?php

namespace app\deshang\queue\handler\order;

use app\deshang\queue\handler\QueueHandlerInterface;
use app\deshang\exceptions\CommonException;
use app\common\dao\order\TblOrderDao;

class OrderCancelQueue implements QueueHandlerInterface
{
    /**
     * 执行业务处理逻辑
     *
     * 注意:
     * - 事务由 QueueConsumer 层统一管理,这里不需要 Db::startTrans()
     * - 如果抛出异常,事务会自动回滚
     *
     * @param array $params 任务参数
     * @return void
     * @throws CommonException
     */
    public function handle(array $params): void
    {
        // 1. 参数验证
        $orderId = (int)($params['order_id'] ?? 0);
        if ($orderId <= 0) {
            throw new CommonException('order_id 缺失');
        }

        // 2. 业务处理(无需事务管理)
        $order = (new TblOrderDao())->getOrderInfo(['id' => $orderId]);
        if (empty($order)) {
            throw new CommonException('订单不存在');
        }

        // 执行业务逻辑...
        // ...
    }
}

3. 注册到配置

app/deshang/queue/config.php 中注册:

php
'handlers' => [
    'OrderCancelQueue' => \app\deshang\queue\handler\order\OrderCancelQueue::class,
],

开发规范

1. 幂等性保证

处理器必须保证幂等性,即多次执行相同参数的任务,结果应该一致。

实现方式:

  • 方案一:原子操作(推荐)
php
public function handle(array $params): void
{
    $goodsId = (int)($params['goods_id'] ?? 0);
    $quantity = (int)($params['quantity'] ?? 0);
    
    // 使用数据库原子操作,天然幂等
    (new TblGoodsDao())->setGoodsInc(['id' => $goodsId], 'sales_num', $quantity);
}
  • 方案二:悲观锁 + 状态标记
php
public function handle(array $params): void
{
    $orderId = (int)($params['order_id'] ?? 0);
    
    // 悲观锁订单行(注意:事务由 QueueConsumer 管理,这里只是查询)
    $order = (new TblOrderDao())->getOrderInfo(
        ['id' => $orderId], 
        'id,status,is_processed', 
        true  // FOR UPDATE
    );
    
    if (empty($order)) {
        throw new CommonException('订单不存在');
    }
    
    // 检查是否已处理
    if ($order['is_processed'] == 1) {
        return;  // 已处理,直接返回(幂等)
    }
    
    // 执行业务逻辑...
    
    // 更新标记
    (new TblOrderDao())->updateOrder(['id' => $orderId], [
        'is_processed' => 1,
    ]);
}

2. 异常处理

处理器必须抛出 CommonException,消费者会自动捕获并处理重试。

php
use app\deshang\exceptions\CommonException;

public function handle(array $params): void
{
    // 参数验证失败
    if (empty($params['order_id'])) {
        throw new CommonException('order_id 缺失');
    }
    
    // 业务逻辑失败
    if ($order['status'] !== 'pending') {
        throw new CommonException('订单状态不允许此操作');
    }
    
    // 系统异常会自动转换为 CommonException
}

3. 事务管理

重要:事务由 QueueConsumer 统一管理,Handler 无需自己管理事务。

php
// ✅ 正确:无需事务管理
public function handle(array $params): void
{
    // 业务逻辑...
    (new TblOrderDao())->updateOrder(['id' => $orderId], ['status' => 'cancelled']);
    // 如果抛出异常,QueueConsumer 会自动回滚
}

// ❌ 错误:不要在 Handler 中管理事务
public function handle(array $params): void
{
    Db::startTrans();  // 不要这样做!
    try {
        // ...
        Db::commit();
    } catch (\Throwable $e) {
        Db::rollback();
    }
}

4. 使用 Dao 层

所有数据库操作必须通过 Dao 层,避免直接使用 Db::name()

php
// ✅ 正确
$order = (new TblOrderDao())->getOrderInfo(['id' => $orderId], '*', true);

// ❌ 错误
$order = Db::name('tbl_order')->where('id', $orderId)->find();

完整示例

订单生成销量增加处理器:

php
<?php

namespace app\deshang\queue\handler\order;

use app\deshang\queue\handler\QueueHandlerInterface;
use app\common\dao\goods\TblGoodsDao;
use app\common\dao\store\TblStoreDao;
use app\deshang\exceptions\CommonException;

/**
 * 订单生成销量增加队列处理器
 *
 * 说明:
 * - 订单生成时,基于订单商品表(tbl_order_goods)将商品/店铺销量进行累加
 * - 参数直接传递 order_goods_list,避免重复查询
 * - 并发安全:事务由 QueueConsumer 层统一管理
 */
class OrderGenerateSalesIncQueue implements QueueHandlerInterface
{
    /**
     * 执行业务处理
     *
     * @param array $params 必须包含:
     *   - order_goods_list: array 订单商品列表(已在 Listener 中查询)
     * @return void
     * @throws CommonException
     */
    public function handle(array $params): void
    {
        $orderGoodsList = $params['order_goods_list'] ?? [];
        if (empty($orderGoodsList) || !is_array($orderGoodsList)) {
            throw new CommonException('order_goods_list 缺失或格式错误');
        }

        // 累加商品与店铺销量
        // 注意:事务由 QueueConsumer 层统一管理
        foreach ($orderGoodsList as $og) {
            $goodsId = (int)($og['goods_id'] ?? 0);
            $storeId = (int)($og['store_id'] ?? 0);
            $goodsNum = (int)($og['goods_num'] ?? 0);

            if ($goodsId <= 0 || $storeId <= 0 || $goodsNum <= 0) {
                continue; // 跳过无效数据
            }

            // 使用 Dao 层自增,保持数据访问一致性
            (new TblGoodsDao())->setGoodsInc(['id' => $goodsId], 'sales_num', $goodsNum);
            (new TblStoreDao())->setStoreInc(['id' => $storeId], 'sales_num', $goodsNum);
        }
    }
}

消费者配置

定时任务配置

队列消费者通过定时任务定期执行,建议配置为每 30 秒执行一次。

Linux Crontab

bash
# 编辑 crontab
crontab -e

# 添加定时任务(每 30 秒执行一次,每次处理 200 条)
*/30 * * * * curl -s http://your-domain.com/crontab/queue/consume?batch=200 > /dev/null 2>&1

# 或者使用 PHP 命令行(推荐)
*/30 * * * * cd /path/to/php-server && php think queue:consume 200

Windows 计划任务

创建批处理文件 consume-queue.bat

batch
@echo off
curl -s http://localhost/crontab/queue/consume?batch=200

在 Windows 计划任务中配置每 30 秒执行一次。

消费接口

URL: /crontab/queue/consume

请求方式: GET

参数:

参数类型必填默认值说明
batchint200每次最大处理条数(建议 100~500)

响应:

json
{
    "code": 200,
    "msg": "执行成功",
    "data": null
}

批量处理建议

  • 小规模系统batch=100,每 30 秒执行一次
  • 中等规模系统batch=200,每 30 秒执行一次
  • 大规模系统batch=500,每 10 秒执行一次,或部署多个消费者

监控建议

建议监控以下指标:

  • 待处理任务数SELECT COUNT(*) FROM sys_task_queue WHERE status = 0
  • 处理中任务数SELECT COUNT(*) FROM sys_task_queue WHERE status = 3
  • 失败任务数SELECT COUNT(*) FROM sys_task_queue WHERE status = 2
  • 平均处理耗时SELECT AVG(consume_ms) FROM sys_task_queue WHERE status = 1

驱动说明

DB 驱动(默认)

特点:

  • 所有任务存储在 sys_task_queue
  • 无需额外中间件
  • 支持事务和复杂查询
  • 适合中小规模系统

使用场景:

  • 任务量 < 1000 条/分钟
  • 不需要分布式队列
  • 希望统一管理所有任务

配置:

php
// app/deshang/queue/config.php
'driver' => 'db',

Redis List 驱动

特点:

  • 高性能,适合大规模任务
  • 支持延迟任务(使用 ZSET)
  • 需要配置 Redis 连接

使用场景:

  • 任务量 > 1000 条/分钟
  • 需要分布式队列
  • 已有 Redis 基础设施

配置:

php
// app/deshang/queue/config.php
'driver' => 'redis_list',

Redis 配置: 需要在 config/cache.php 中配置 Redis 连接。

Redis Stream 驱动

特点:

  • 支持消息持久化
  • 支持消费者组
  • 适合高可靠性场景

使用场景:

  • 需要消息持久化
  • 需要消费者组功能
  • 高可靠性要求

配置:

php
// app/deshang/queue/config.php
'driver' => 'redis_stream',

RabbitMQ 驱动

特点:

  • 企业级消息队列
  • 支持多种队列模式
  • 需要单独部署 RabbitMQ

使用场景:

  • 大规模分布式系统
  • 需要复杂路由规则
  • 已有 RabbitMQ 基础设施

配置:

php
// app/deshang/queue/config.php
'driver' => 'rabbitmq',

注意事项:

  • 无论使用哪种驱动,所有任务都会写入 sys_task_queue
  • DB 驱动无需推送队列,其他驱动需要先写入表再推送队列

最佳实践

1. 任务类型命名规范

推荐格式: {模块}{动作}Queue(PascalCase)

示例:

php
'OrderGenerateSalesIncQueue'        // 订单生成销量增加
'OrderCancelSalesDecQueue'         // 订单取消销量减少
'OrderPayUserPointsQueue'          // 订单支付增加积分
'OrderCancelUserPointsQueue'       // 订单取消扣除积分

不推荐:

php
'order_sales_inc'          // 不要使用下划线命名
'OrderSalesInc'            // 不要缺少 Queue 后缀

2. biz_key 设计规范

格式: {任务类型}_{业务标识}

示例:

php
// 单一业务标识
'OrderGenerateSalesIncQueue_' . $orderId

// 多个业务标识(用下划线连接)
'GoodsStockReleaseQueue_' . $orderId . '_' . $skuId

3. 优先级使用建议

优先级值枚举常量使用场景
0PRIORITY_NORMAL普通任务
1PRIORITY_HIGH重要任务(如订单处理,默认值)
2PRIORITY_URGENT紧急任务(如库存告警)

4. 延迟任务使用场景

  • 订单超时检查:延迟 30 分钟检查订单是否支付
  • 库存释放:延迟 10 分钟释放未支付订单的库存
  • 优惠券过期提醒:延迟到优惠券过期前 1 天发送提醒

5. 事务管理规范

重要:事务由 QueueConsumer 统一管理,Handler 无需自己管理事务。

php
// ✅ 正确:Handler 无需事务管理
public function handle(array $params): void
{
    // 业务逻辑...
    // 如果抛出异常,QueueConsumer 会自动回滚
}

// ❌ 错误:不要在 Handler 中管理事务
public function handle(array $params): void
{
    Db::startTrans();  // 不要这样做!
    // ...
}

6. 错误处理规范

  • 参数验证失败:直接抛出 CommonException,不重试
  • 业务逻辑失败:抛出 CommonException,自动重试
  • 系统异常:消费者自动捕获并重试

7. 性能优化建议

  • 批量处理:使用 setIncsetDec 等原子操作,避免逐条更新
  • 减少查询:一次查询获取所有需要的数据(如直接传递 order_goods_list
  • 索引优化:确保 biz_keyqueue_typestatusscheduled_at 有索引

8. 监控和告警

建议监控以下指标:

  • 任务积压:待处理任务数 > 1000 时告警
  • 失败率:失败任务数 / 总任务数 > 5% 时告警
  • 处理耗时:平均处理耗时 > 5 秒时告警
  • 重试率:重试次数 > 1 的任务占比 > 10% 时告警

常见问题

1. 任务重复执行怎么办?

原因:

  • biz_key 不唯一或未设置
  • 处理器未保证幂等性

解决方案:

  • 确保 biz_key 唯一且有意义(数据库唯一索引)
  • 在处理器中使用原子操作或状态标记保证幂等性

2. 任务一直处于处理中状态?

原因:

  • 消费者进程异常退出
  • 处理时间过长,超过定时任务执行间隔

解决方案:

  • 检查消费者进程是否正常运行
  • 优化处理器性能,减少处理时间
  • 增加超时机制,自动将超时任务重置为待处理

3. 任务失败后如何重试?

系统自动重试机制:

  • 重试次数:默认最多重试 3 次(可在创建任务时通过 max_retries 配置)
  • 重试策略:指数退避(60s, 120s, 240s)
  • 失败处理:超过最大重试次数后标记为失败(status=2)

手动重试:

sql
-- 将失败任务重置为待处理
UPDATE sys_task_queue 
SET status = 0, retry_count = 0, error_message = NULL 
WHERE status = 2 AND id = ?;

4. 如何查看任务执行情况?

sql
-- 查看待处理任务
SELECT * FROM sys_task_queue WHERE status = 0 ORDER BY priority DESC, scheduled_at ASC LIMIT 100;

-- 查看处理中任务
SELECT * FROM sys_task_queue WHERE status = 3;

-- 查看失败任务
SELECT * FROM sys_task_queue WHERE status = 2 ORDER BY update_at DESC LIMIT 100;

-- 查看任务统计
SELECT 
    status,
    COUNT(*) as count,
    AVG(consume_ms) as avg_ms,
    MAX(consume_ms) as max_ms
FROM sys_task_queue 
WHERE create_at >= UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 DAY))
GROUP BY status;

5. 如何清理历史任务?

sql
-- 删除 30 天前的已完成任务
DELETE FROM sys_task_queue 
WHERE status = 1 AND create_at < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 DAY));

-- 删除 7 天前的失败任务(需要先人工处理)
DELETE FROM sys_task_queue 
WHERE status = 2 AND update_at < UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 7 DAY));

6. 如何切换队列驱动?

  1. 修改 app/deshang/queue/config.php 中的 driver 配置
  2. 确保对应的中间件(Redis/RabbitMQ)已配置
  3. 重启消费者进程

7. 任务处理太慢怎么办?

优化方向:

  • 增加消费者数量(多个定时任务并行)
  • 优化处理器性能(减少数据库查询、使用批量操作)
  • 调整 batch 参数(增加单次处理数量)
  • 使用更快的队列驱动(Redis/RabbitMQ)

8. 如何保证高可用?

  • 多消费者部署:部署多个消费者进程,自动负载均衡
  • 监控告警:监控任务积压、失败率等指标
  • 自动恢复:失败任务自动重试,超过重试次数后人工介入

9. Handler 中是否需要管理事务?

不需要。 事务由 QueueConsumer 层统一管理:

  • Handler 抛出异常时,事务会自动回滚
  • Handler 成功执行后,markSuccess() 会在同一事务中更新状态
  • 这确保了业务逻辑和状态更新的原子性

错误示例:

php
// ❌ 错误:不要在 Handler 中管理事务
public function handle(array $params): void
{
    Db::startTrans();
    try {
        // ...
        Db::commit();
    } catch (\Throwable $e) {
        Db::rollback();
    }
}

正确示例:

php
// ✅ 正确:无需事务管理
public function handle(array $params): void
{
    // 业务逻辑...
    // 如果抛出异常,QueueConsumer 会自动回滚
}

总结

消息队列系统提供了完整的异步任务处理方案,支持多种队列驱动、自动重试、优先级控制等功能。在使用过程中,需要注意:

  1. 幂等性:确保处理器幂等,避免重复执行
  2. 异常处理:正确抛出 CommonException,让系统自动重试
  3. 事务管理:事务由 QueueConsumer 统一管理,Handler 无需自己管理事务
  4. 监控告警:及时监控任务执行情况,避免任务积压
  5. 批量接口:统一使用批量入队接口,简化代码维护

如有问题,请参考本文档或联系开发团队。


最后更新:2024-01-20
维护者:DSPlatform技术团队(德尚网络)

获取帮助

如果您在使用过程中遇到问题,可以通过以下方式获取帮助:

  • 官方网站https://www.csdeshang.com
  • 电话咨询:15364080101(微信同号)
  • QQ咨询:858761000
  • 邮箱咨询:858761000@qq.com
  • 工作时间:工作日 9:00-18:00
  • 微信咨询:扫码添加微信
微信二维码

版权所有 © 2014-至今 德尚网络