Appearance
高并发一致性设计文档
概述
业务场景
DSPlatform 电商系统在高并发场景下面临的主要挑战:
- 库存扣减:多用户同时购买同一商品
- 余额操作:用户余额、商户余额、分销余额的并发修改
- 积分成长值:订单支付后批量增加积分和成长值
- 数据一致性:确保余额、库存等关键数据的一致性
- 系统可用性:99.9%可用性,故障自动恢复
设计目标
- ✅ 强一致性:关键业务数据保证强一致性
- ✅ 高可用性:系统故障不影响核心业务
- ✅ 高性能:支持高并发,低延迟响应
- ✅ 可扩展性:支持水平扩展
架构设计
分层保护机制
系统采用四层保护机制,确保高并发场景下的数据一致性:
┌─────────────────────────────────────────────────────────┐
│ 业务层 │
│ (订单支付、库存扣减、余额操作) │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────┐
│ 第一层:分布式锁 │
│ (减少并发冲突,防止重复处理) │
│ 实现:KvManager::lock() │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────┐
│ 第二层:数据库事务 │
│ (保证单次操作的原子性) │
│ 实现:Db::startTrans() │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────┐
│ 第三层:乐观锁 │
│ (版本号机制,确保数据一致性) │
│ 实现:version 字段 + WHERE 条件检查 │
└────────────────────┬────────────────────────────────────┘
│
┌────────────────────▼────────────────────────────────────┐
│ 异步层:消息队列 │
│ (异步处理非关键路径,提升响应速度) │
│ 实现:QueueProducer/QueueConsumer │
└─────────────────────────────────────────────────────────┘核心组件
分布式锁(LockService)
- 基于 Redis 的原子操作(SET NX EX)
- 支持 File 驱动降级处理
- 防止同一资源被多个请求同时处理
数据库事务
- MySQL 事务(ACID)
- 保证单次操作的原子性
- 所有写操作都在事务内
乐观锁
- version 字段(int类型,默认0)
- WHERE 条件检查版本号
- 版本冲突自动重试(最多3次)
消息队列
- 统一队列系统(支持 DB/Redis/RabbitMQ)
- 按用户ID分组处理,避免同一用户的任务并发
- 事务统一管理,确保业务逻辑和状态更新的一致性
技术实现
1. 分布式锁实现
核心代码(用户余额修改示例):
php
// app/deshang/service/user/DeshangUserBalanceService.php
public function modifyUserBalance($data)
{
// ========== 第一层:分布式锁(减少并发冲突)==========
$lockKey = sprintf(LockKeyManager::LOCK_USER_BALANCE_KEY, $data['user_id']);
$lockValue = KvManager::lock()->acquire($lockKey, 5);
if (!$lockValue) {
throw new SystemException('余额更新失败,系统繁忙,请稍后重试');
}
try {
// 业务逻辑(包含乐观锁)
// ...
} finally {
// 释放分布式锁
KvManager::lock()->release($lockValue, $lockKey);
}
}性能指标:
- 获取锁耗时:< 5ms(Redis)
- 支持并发:> 10,000 QPS
2. 乐观锁实现
核心代码(用户余额修改示例):
php
private function tryModifyUserBalance($data)
{
// 读取数据(包含 version)
$user_info = (new UserDao())->getUserInfoById(
$data['user_id'],
'id,balance,balance_in,balance_out,version'
);
// 计算新余额
$after_balance = $data['change_mode'] == 1
? $user_info['balance'] + $data['change_amount']
: $user_info['balance'] - $data['change_amount'];
// 使用条件更新,确保更新的是读取时的余额和版本号(双重验证)
$affectedRows = (new UserDao())->updateUser(
[
['id', '=', $data['user_id']],
['balance', '=', $user_info['balance']], // 条件1:余额必须等于读取时的值
['version', '=', $user_info['version']] // 条件2:版本号必须等于读取时的值
],
[
'balance' => $after_balance,
'version' => $user_info['version'] + 1 // 版本号+1
]
);
// 如果影响行数为0,则表示更新失败(版本冲突)
return $affectedRows > 0;
}重试机制:
php
// 在分布式锁内使用乐观锁重试机制
$maxRetries = 3;
$retryCount = 0;
while ($retryCount < $maxRetries) {
$result = $this->tryModifyUserBalance($data);
if ($result) {
return true;
}
$retryCount++;
// 指数退避延迟(避免惊群效应)
if ($retryCount < $maxRetries) {
ds_retry_delay($retryCount); // 第1次约5ms,第2次约10ms,第3次约20ms
}
}
throw new SystemException('余额更新失败,版本冲突,已重试' . $maxRetries . '次', 409);性能指标:
- 版本冲突率:< 1%(在分布式锁保护下)
- 重试次数:最多3次
- 重试延迟:指数退避(5ms, 10ms, 20ms)
3. 消息队列实现
核心代码(订单支付后增加积分):
php
// app/listener/order/OrderPaySuccessListener.php
public function payGetPoints($order_info)
{
// 检查是否开启支付获取积分功能
$points_pay_enabled = sysConfig('points:points_pay_enabled');
if ($points_pay_enabled != 1) {
return;
}
// 使用消息队列异步处理
(new QueueProducer())->enqueue([
[
'type' => 'OrderPayUserPointsQueue',
'data' => [
'order_info' => $order_info,
],
'options' => [
'biz_key' => 'OrderPayUserPointsQueue_' . $order_info['id'],
'queue_group' => SysTaskQueueEnum::GROUP_ORDER,
'priority' => 2,
],
],
]);
}消费者处理(按用户ID分组,避免并发):
php
// app/deshang/queue/core/QueueConsumer.php
public function consume(int $batch = 200): void
{
$rawList = $this->driver->popBatch($batch);
// 按用户ID分组(避免同一用户的任务并发处理)
$tasksByUser = [];
$tasksWithoutUser = [];
foreach ($rawList as $raw) {
$task = json_decode($raw, true) ?: [];
$userId = $this->extractUserId($task);
if ($userId > 0) {
$tasksByUser[$userId][] = $raw;
} else {
$tasksWithoutUser[] = $raw;
}
}
// 按用户串行处理(同一用户的任务不会并发,避免版本冲突)
foreach ($tasksByUser as $userId => $tasks) {
foreach ($tasks as $raw) {
$this->processTask($raw, $handlerMap);
}
}
}性能指标:
- 入队耗时:< 10ms
- 处理吞吐量:> 500 任务/秒
- 任务延迟:< 30秒(定时任务间隔)
使用场景
场景1:用户余额操作
适用场景:用户充值、提现、支付、退款等余额操作
实现方式:
- 分布式锁:防止同一用户并发操作
- 乐观锁:确保余额数据一致性
- 数据库事务:保证操作原子性
代码位置:app/deshang/service/user/DeshangUserBalanceService.php
场景2:订单支付
适用场景:订单支付、防止重复支付
实现方式:
- 分布式锁:防止重复支付
- 数据库事务:保证支付状态更新原子性
代码位置:app/api/service/trade/TradePayService.php
场景3:积分成长值增加
适用场景:订单支付后增加积分、成长值
实现方式:
- 消息队列:异步处理,提升响应速度
- 按用户ID分组:避免同一用户的任务并发处理
- 数据库事务:保证业务逻辑和状态更新的一致性
代码位置:
- 生产者:
app/listener/order/OrderPaySuccessListener.php - 消费者:
app/deshang/queue/handler/order/OrderPayUserPointsQueue.php
场景4:商品销量统计
适用场景:订单生成后增加销量、订单取消后减少销量
实现方式:
- 消息队列:异步处理,不影响订单创建性能
- 数据库事务:保证销量更新原子性
代码位置:
- 生产者:
app/listener/order/OrderGenerateListener.php - 消费者:
app/deshang/queue/handler/order/OrderGenerateSalesIncQueue.php
最佳实践
1. 何时使用分布式锁
✅ 推荐使用:
- 用户余额操作
- 商户余额操作
- 库存扣减
- 订单状态修改
- 支付处理
❌ 不推荐使用:
- 只读操作
- 幂等操作(已有 biz_key 保护)
- 低并发场景
2. 何时使用乐观锁
✅ 推荐使用:
- 读多写少的场景
- 数据一致性要求高的场景
- 配合分布式锁使用
❌ 不推荐使用:
- 写多读少的场景(冲突率高)
- 不需要强一致性的场景
3. 何时使用消息队列
✅ 推荐使用:
- 非关键路径的异步处理
- 批量操作(如销量统计)
- 需要解耦的业务逻辑
- 提升响应速度的场景
❌ 不推荐使用:
- 需要立即返回结果的场景
- 关键路径的业务逻辑(如库存扣减)
4. 代码规范
分布式锁使用规范:
php
// ✅ 正确:使用 try-finally 确保释放锁
$lockValue = KvManager::lock()->acquire($lockKey, 5);
if (!$lockValue) {
throw new SystemException('系统繁忙,请稍后重试');
}
try {
// 业务逻辑
} finally {
KvManager::lock()->release($lockValue, $lockKey);
}
// ❌ 错误:忘记释放锁
$lockValue = KvManager::lock()->acquire($lockKey, 5);
// 业务逻辑(没有释放锁)乐观锁使用规范:
php
// ✅ 正确:WHERE 条件包含业务字段和版本号
$affectedRows = $dao->updateUser(
[
['id', '=', $userId],
['balance', '=', $oldBalance], // 业务字段验证
['version', '=', $oldVersion] // 版本号验证
],
[
'balance' => $newBalance,
'version' => $oldVersion + 1
]
);
// ❌ 错误:只验证版本号,不验证业务字段
$affectedRows = $dao->updateUser(
[['id', '=', $userId], ['version', '=', $oldVersion]],
['balance' => $newBalance, 'version' => $oldVersion + 1]
);监控告警
关键指标
- 分布式锁获取失败率:> 1% 时告警
- 乐观锁版本冲突率:> 5% 时告警
- 消息队列积压:> 1000 任务时告警
- 数据库事务失败率:> 0.1% 时告警
监控SQL
sql
-- 查看消息队列积压
SELECT COUNT(*) as pending_count
FROM sys_task_queue
WHERE status = 0;
-- 查看消息队列处理中
SELECT COUNT(*) as processing_count
FROM sys_task_queue
WHERE status = 3;
-- 查看消息队列失败率
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as failed,
ROUND(SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as failure_rate
FROM sys_task_queue
WHERE create_at >= UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 1 HOUR));后台也可以查看任务处理情况
故障处理
1. 分布式锁故障
症状:锁获取失败率突然升高
处理步骤:
- 检查 Redis 连接状态
- 检查 Redis 内存使用率
- 检查网络延迟
- 临时增加锁超时时间
2. 乐观锁冲突
症状:版本冲突率突然升高
处理步骤:
- 检查是否有慢查询
- 检查数据库连接池
- 临时增加重试次数
- 考虑使用悲观锁
3. 消息队列积压
症状:待处理任务数 > 1000
处理步骤:
- 增加消费者数量
- 优化处理器性能
- 检查是否有死循环
- 临时提高处理批次大小
相关文档
最后更新:2024-01-20
维护者:DSPlatform技术团队(德尚网络)
获取帮助
如果您在使用过程中遇到问题,可以通过以下方式获取帮助:
- 官方网站:https://www.csdeshang.com
- 电话咨询:15364080101(微信同号)
- QQ咨询:858761000
- 邮箱咨询:858761000@qq.com
- 工作时间:工作日 9:00-18:00
- 微信咨询:扫码添加微信

版权所有 © 2014-至今 德尚网络