Redis+Lua脚本和RabbitMQ解决高并发秒杀类问题

在一般的一些高并发场景比如秒杀等,一般是按照如下路径:

前端请求->查数据库库存->扣减库存->生成记录

一旦并发量上来,数据库要接受几千甚至几万的冲击,很容易挂掉,或者出现”超卖“现象。

今天结合我的项目来使用Redis+Lua脚本结合RabbitMQ解决这个问题。

Redis+Lua脚本(原子性+防超卖)

当一堆用户在同一时间点击抢购或者报名时,如果请求全打到MySQL,不仅慢,还会因为事务并发控制导致死锁或超卖

我的思路是:把库存扣减和重复报名校验全部前置到Redis中。

为什么用Lua脚本?因为Redis执行Lua脚本是原子性的。你可以把“判断库存是否足够”、“判断用户是否已经报过名”、“扣减库存”、“记录用户已报名”这四个操作写在一个脚本里,Redis 会保证它们的原子性,中间绝不会有其他命令插队。

比如一个最简单的实现就是某个商品库存有100个,那么就在Redis里面预设100个库存进行检验。

引入RabbitMQ

Redis+Lua实现原子性,RabbitMQ则实现流量控制,保护数据库。

什么是RabbitMQ?

RabbitMQ就相当于一个“邮局”,RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),是用Erlang语言实现的。

流程是生产者消息发送给RabbitMQ而RabbitMQ会把这些消息保存起来,然后消费者从RabbitMQ里拿消息进行处理。

为什么用RabbitMQ?

为什么不用Redis List?RabbitMQ有以下优点

  • 异步响应:用户在Redis扣减成功之后,只需要把相关信息交给MQ就可以返回给前端报名成功速度很快
  • 稳定:RabbitMQ可以处理的消息很多,并且稳定
  • 可靠:RabbitMQ自带消息确认机制(ACK)持久化等特性,十分可靠能保证消息绝对不丢

怎么用?

docker一键安装

docker run -d \
  --name rabbitmq \ 
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  --restart=always \
  rabbitmq:3-management
  • -p 5672:5672:这是给你 Java 代码连接用的端口。
  • -p 15672:15672:这是Web可视化控制台的端口

然后打开浏览器就可以访问了

http://127.0.0.1:15672 中间换成你自己的ip地址 输入账号密码就是上面你自己设置的。

1.定义 MQ 的交换机和队列

我们要先在 Spring Boot 里声明交换机(Exchange)、队列(Queue),并用路由键(Routing Key)把它们绑定起来。

@Configuration
public class ActivitySignupRabbitConfig {

    public static final String ACTIVITY_SIGNUP_EXCHANGE = "activity.signup.direct";
    public static final String ACTIVITY_SIGNUP_QUEUE = "activity.signup.queue";
    public static final String ACTIVITY_SIGNUP_ROUTING_KEY = "activity.signup.routing";

    // 1. 声明 Direct 交换机
    @Bean
    public DirectExchange activitySignupExchange() {
        return new DirectExchange(ACTIVITY_SIGNUP_EXCHANGE, true, false);
    }

    // 2. 声明队列(持久化)
    @Bean
    public Queue activitySignupQueue() {
        return new Queue(ACTIVITY_SIGNUP_QUEUE, true);
    }

    // 3. 将队列和交换机通过 RoutingKey 绑定
    @Bean
    public Binding activitySignupBinding() {
        return BindingBuilder.bind(activitySignupQueue())
                .to(activitySignupExchange())
                .with(ACTIVITY_SIGNUP_ROUTING_KEY);
    }
}

2.生产者发送消息

@Service
public class UserActivityServiceImpl implements UserActivityService {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // ... 其他依赖

    @Override
    public void signup(Long userId, Long id) {
        // 1. (省略) 执行 Lua 脚本,在 Redis 中进行库存扣减和防重校验
        // ...
        
        // 2. Redis 扣减成功后,将消息异步丢入 RabbitMQ
        try {
            // 构造消息体,这里简单拼接了 userId, activityId 和时间戳
            String payload = userId + ":" + id + ":" + System.currentTimeMillis();
            
            // 发送到 Direct 交换机,指定 Routing Key
            rabbitTemplate.convertAndSend(
                    ActivitySignupRabbitConfig.ACTIVITY_SIGNUP_EXCHANGE,
                    ActivitySignupRabbitConfig.ACTIVITY_SIGNUP_ROUTING_KEY,
                    payload
            );
            
            log.info("用户 {} 报名活动 {} 成功,已发送 MQ 排队", userId, id);
        } catch (Exception e) {
            log.error("发送 RabbitMQ 消息失败: userId={}, activityId={}", userId, id, e);
            // 这里可以做一些补偿机制,比如本地重试或记录异常日志
            throw new RuntimeException("报名排队异常,请稍后重试");
        }
    }
}

3.消费者监听消息并进行处理

@Component
@Slf4j
public class ActivitySignupStreamTask {

    @Autowired
    private UserActivityMapper userActivityMapper;

    // 监听指定的队列
    @RabbitListener(queues = ActivitySignupRabbitConfig.ACTIVITY_SIGNUP_QUEUE)
    public void receiveSignupMessage(String message) {
        try {
            log.info("MQ 消费者收到报名消息: {}", message);
            
            // 1. 解析消息内容
            String[] parts = message.split(":");
            Long userId = Long.parseLong(parts[0]);
            Long activityId = Long.parseLong(parts[1]);

            // 2. 构建实体并插入 MySQL 数据库
            UserActivity userActivity = new UserActivity();
            userActivity.setUserId(userId);
            userActivity.setActivityId(activityId);
            userActivity.setSignupTime(LocalDateTime.now());
            userActivity.setStatus(1); // 1: 报名成功

            // 这里最好在数据库层面给 (userId, activityId) 加一个唯一索引,做最终的兜底防重!
            userActivityMapper.insert(userActivity);
            
            log.info("用户 {} 报名活动 {} 异步落库成功!", userId, activityId);
            
        } catch (Exception e) {
            log.error("MQ 消费报名消息异常,消息内容: {}", message, e);
            // 生产环境中,这里如果消费失败,可以考虑拒绝消息并丢入死信队列 (Dead Letter Exchange) 人工排查
        }
    }
}

总结

使用Redis+Lua解决原子性再使用RabbitMQ解决异步削峰减轻数据库的压力是一般”超卖“类问题的较好实践。

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇