Skip to content

高并发一致性设计文档

概述

业务场景

DSPlatform 电商系统在高并发场景下面临的主要挑战:

  • 库存扣减:多用户同时购买同一商品
  • 余额操作:用户余额、商户余额、分销余额的并发修改
  • 积分成长值:订单支付后批量增加积分和成长值
  • 数据一致性:确保余额、库存等关键数据的一致性
  • 系统可用性:99.9%可用性,故障自动恢复

设计目标

  • 强一致性:关键业务数据保证强一致性
  • 高可用性:系统故障不影响核心业务
  • 高性能:支持高并发,低延迟响应
  • 可扩展性:支持水平扩展

架构设计

分层保护机制

系统采用四层保护机制,确保高并发场景下的数据一致性:

┌─────────────────────────────────────────────────────────┐
│                    业务层                                │
│  (订单支付、库存扣减、余额操作)                           │
└────────────────────┬────────────────────────────────────┘

┌────────────────────▼────────────────────────────────────┐
│              第一层:分布式锁                             │
│  (减少并发冲突,防止重复处理)                             │
│  实现:KvManager::lock()                                 │
└────────────────────┬────────────────────────────────────┘

┌────────────────────▼────────────────────────────────────┐
│              第二层:数据库事务                           │
│  (保证单次操作的原子性)                                   │
│  实现:Db::startTrans()                                  │
└────────────────────┬────────────────────────────────────┘

┌────────────────────▼────────────────────────────────────┐
│              第三层:乐观锁                               │
│  (版本号机制,确保数据一致性)                             │
│  实现:version 字段 + WHERE 条件检查                     │
└────────────────────┬────────────────────────────────────┘

┌────────────────────▼────────────────────────────────────┐
│              异步层:消息队列                             │
│  (异步处理非关键路径,提升响应速度)                       │
│  实现:QueueProducer/QueueConsumer                       │
└─────────────────────────────────────────────────────────┘

核心组件

  1. 分布式锁(LockService)

    • 基于 Redis 的原子操作(SET NX EX)
    • 支持 File 驱动降级处理
    • 防止同一资源被多个请求同时处理
  2. 数据库事务

    • MySQL 事务(ACID)
    • 保证单次操作的原子性
    • 所有写操作都在事务内
  3. 乐观锁

    • version 字段(int类型,默认0)
    • WHERE 条件检查版本号
    • 版本冲突自动重试(最多3次)
  4. 消息队列

    • 统一队列系统(支持 DB/Redis/RabbitMQ)
    • 按用户ID分组处理,避免同一用户的任务并发
    • 事务统一管理,确保业务逻辑和状态更新的一致性

技术实现

1. 分布式锁实现

核心代码(用户余额修改示例):

php
// app/deshang/service/user/DeshangUserBalanceService.php

public function modifyUserBalance($data)
{
    // ========== 第一层:分布式锁(减少并发冲突)==========
    $lockKey = sprintf(LockKeyManager::LOCK_USER_BALANCE_KEY, $data['user_id']);
    $lockValue = KvManager::lock()->acquire($lockKey, 5);
    if (!$lockValue) {
        throw new SystemException('余额更新失败,系统繁忙,请稍后重试');
    }
    
    try {
        // 业务逻辑(包含乐观锁)
        // ...
    } finally {
        // 释放分布式锁
        KvManager::lock()->release($lockValue, $lockKey);
    }
}

性能指标

  • 获取锁耗时:< 5ms(Redis)
  • 支持并发:> 10,000 QPS

2. 乐观锁实现

核心代码(用户余额修改示例):

php
private function tryModifyUserBalance($data)
{
    // 读取数据(包含 version)
    $user_info = (new UserDao())->getUserInfoById(
        $data['user_id'], 
        'id,balance,balance_in,balance_out,version'
    );
    
    // 计算新余额
    $after_balance = $data['change_mode'] == 1 
        ? $user_info['balance'] + $data['change_amount'] 
        : $user_info['balance'] - $data['change_amount'];
    
    // 使用条件更新,确保更新的是读取时的余额和版本号(双重验证)
    $affectedRows = (new UserDao())->updateUser(
        [
            ['id', '=', $data['user_id']],
            ['balance', '=', $user_info['balance']],  // 条件1:余额必须等于读取时的值
            ['version', '=', $user_info['version']]   // 条件2:版本号必须等于读取时的值
        ],
        [
            'balance' => $after_balance,
            'version' => $user_info['version'] + 1    // 版本号+1
        ]
    );
    
    // 如果影响行数为0,则表示更新失败(版本冲突)
    return $affectedRows > 0;
}

重试机制

php
// 在分布式锁内使用乐观锁重试机制
$maxRetries = 3;
$retryCount = 0;

while ($retryCount < $maxRetries) {
    $result = $this->tryModifyUserBalance($data);
    if ($result) {
        return true;
    }
    $retryCount++;
    
    // 指数退避延迟(避免惊群效应)
    if ($retryCount < $maxRetries) {
        ds_retry_delay($retryCount); // 第1次约5ms,第2次约10ms,第3次约20ms
    }
}

throw new SystemException('余额更新失败,版本冲突,已重试' . $maxRetries . '次', 409);

性能指标

  • 版本冲突率:< 1%(在分布式锁保护下)
  • 重试次数:最多3次
  • 重试延迟:指数退避(5ms, 10ms, 20ms)

3. 消息队列实现

核心代码(订单支付后增加积分):

php
// app/listener/order/OrderPaySuccessListener.php

public function payGetPoints($order_info)
{
    // 检查是否开启支付获取积分功能
    $points_pay_enabled = sysConfig('points:points_pay_enabled');
    if ($points_pay_enabled != 1) {
        return;
    }

    // 使用消息队列异步处理
    (new QueueProducer())->enqueue([
        [
            'type' => 'OrderPayUserPointsQueue',
            'data' => [
                'order_info' => $order_info,
            ],
            'options' => [
                'biz_key' => 'OrderPayUserPointsQueue_' . $order_info['id'],
                'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
                'priority' => 2,
            ],
        ],
    ]);
}

消费者处理(按用户ID分组,避免并发):

php
// app/deshang/queue/core/QueueConsumer.php

public function consume(int $batch = 200): void
{
    $rawList = $this->driver->popBatch($batch);
    
    // 按用户ID分组(避免同一用户的任务并发处理)
    $tasksByUser = [];
    $tasksWithoutUser = [];
    
    foreach ($rawList as $raw) {
        $task = json_decode($raw, true) ?: [];
        $userId = $this->extractUserId($task);
        
        if ($userId > 0) {
            $tasksByUser[$userId][] = $raw;
        } else {
            $tasksWithoutUser[] = $raw;
        }
    }
    
    // 按用户串行处理(同一用户的任务不会并发,避免版本冲突)
    foreach ($tasksByUser as $userId => $tasks) {
        foreach ($tasks as $raw) {
            $this->processTask($raw, $handlerMap);
        }
    }
}

性能指标

  • 入队耗时:< 10ms
  • 处理吞吐量:> 500 任务/秒
  • 任务延迟:< 30秒(定时任务间隔)

使用场景

场景1:用户余额操作

适用场景:用户充值、提现、支付、退款等余额操作

实现方式

  1. 分布式锁:防止同一用户并发操作
  2. 乐观锁:确保余额数据一致性
  3. 数据库事务:保证操作原子性

代码位置app/deshang/service/user/DeshangUserBalanceService.php

场景2:订单支付

适用场景:订单支付、防止重复支付

实现方式

  1. 分布式锁:防止重复支付
  2. 数据库事务:保证支付状态更新原子性

代码位置app/api/service/trade/TradePayService.php

场景3:积分成长值增加

适用场景:订单支付后增加积分、成长值

实现方式

  1. 消息队列:异步处理,提升响应速度
  2. 按用户ID分组:避免同一用户的任务并发处理
  3. 数据库事务:保证业务逻辑和状态更新的一致性

代码位置

  • 生产者:app/listener/order/OrderPaySuccessListener.php
  • 消费者:app/deshang/queue/handler/order/OrderPayUserPointsQueue.php

场景4:商品销量统计

适用场景:订单生成后增加销量、订单取消后减少销量

实现方式

  1. 消息队列:异步处理,不影响订单创建性能
  2. 数据库事务:保证销量更新原子性

代码位置

  • 生产者:app/listener/order/OrderGenerateListener.php
  • 消费者:app/deshang/queue/handler/order/OrderGenerateSalesIncQueue.php

最佳实践

1. 何时使用分布式锁

推荐使用

  • 用户余额操作
  • 商户余额操作
  • 库存扣减
  • 订单状态修改
  • 支付处理

不推荐使用

  • 只读操作
  • 幂等操作(已有 biz_key 保护)
  • 低并发场景

2. 何时使用乐观锁

推荐使用

  • 读多写少的场景
  • 数据一致性要求高的场景
  • 配合分布式锁使用

不推荐使用

  • 写多读少的场景(冲突率高)
  • 不需要强一致性的场景

3. 何时使用消息队列

推荐使用

  • 非关键路径的异步处理
  • 批量操作(如销量统计)
  • 需要解耦的业务逻辑
  • 提升响应速度的场景

不推荐使用

  • 需要立即返回结果的场景
  • 关键路径的业务逻辑(如库存扣减)

4. 代码规范

分布式锁使用规范

php
// ✅ 正确:使用 try-finally 确保释放锁
$lockValue = KvManager::lock()->acquire($lockKey, 5);
if (!$lockValue) {
    throw new SystemException('系统繁忙,请稍后重试');
}

try {
    // 业务逻辑
} finally {
    KvManager::lock()->release($lockValue, $lockKey);
}

// ❌ 错误:忘记释放锁
$lockValue = KvManager::lock()->acquire($lockKey, 5);
// 业务逻辑(没有释放锁)

乐观锁使用规范

php
// ✅ 正确:WHERE 条件包含业务字段和版本号
$affectedRows = $dao->updateUser(
    [
        ['id', '=', $userId],
        ['balance', '=', $oldBalance],  // 业务字段验证
        ['version', '=', $oldVersion]   // 版本号验证
    ],
    [
        'balance' => $newBalance,
        'version' => $oldVersion + 1
    ]
);

// ❌ 错误:只验证版本号,不验证业务字段
$affectedRows = $dao->updateUser(
    [['id', '=', $userId], ['version', '=', $oldVersion]],
    ['balance' => $newBalance, 'version' => $oldVersion + 1]
);

监控告警

关键指标

  • 分布式锁获取失败率:> 1% 时告警
  • 乐观锁版本冲突率:> 5% 时告警
  • 消息队列积压:> 1000 任务时告警
  • 数据库事务失败率:> 0.1% 时告警

监控SQL

sql
-- 查看消息队列积压
SELECT COUNT(*) as pending_count 
FROM sys_task_queue 
WHERE status = 0;

-- 查看消息队列处理中
SELECT COUNT(*) as processing_count 
FROM sys_task_queue 
WHERE status = 3;

-- 查看消息队列失败率
SELECT 
    COUNT(*) as total,
    SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as failed,
    ROUND(SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as failure_rate
FROM sys_task_queue 
WHERE create_at >= UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 HOUR));

后台也可以查看任务处理情况


故障处理

1. 分布式锁故障

症状:锁获取失败率突然升高

处理步骤

  1. 检查 Redis 连接状态
  2. 检查 Redis 内存使用率
  3. 检查网络延迟
  4. 临时增加锁超时时间

2. 乐观锁冲突

症状:版本冲突率突然升高

处理步骤

  1. 检查是否有慢查询
  2. 检查数据库连接池
  3. 临时增加重试次数
  4. 考虑使用悲观锁

3. 消息队列积压

症状:待处理任务数 > 1000

处理步骤

  1. 增加消费者数量
  2. 优化处理器性能
  3. 检查是否有死循环
  4. 临时提高处理批次大小

相关文档


最后更新:2024-01-20
维护者:DSPlatform技术团队(德尚网络)

获取帮助

如果您在使用过程中遇到问题,可以通过以下方式获取帮助:

  • 官方网站https://www.csdeshang.com
  • 电话咨询:15364080101(微信同号)
  • QQ咨询:858761000
  • 邮箱咨询:858761000@qq.com
  • 工作时间:工作日 9:00-18:00
  • 微信咨询:扫码添加微信
微信二维码

版权所有 © 2014-至今 德尚网络