江苏省建设工程注册中心网站,网站模板整站资源,wordpress 获取页面链接,水土保持与生态建设网站近有球友问我#xff1a;MQ的使用场景有哪些#xff1f;工作中一定要使用MQ吗#xff1f;记得刚工作那会儿#xff0c;我总是想不明白#xff1a;为什么明明直接调用接口就能完成的功能#xff0c;非要引入MQ这么个中间商#xff1f;直到经历了系统崩溃、数…近有球友问我MQ的使用场景有哪些工作中一定要使用MQ吗记得刚工作那会儿我总是想不明白为什么明明直接调用接口就能完成的功能非要引入MQ这么个中间商直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后我才真正理解了MQ的价值。今天我想和大家分享我在实际工作中使用消息队列MQ的10种典型场景希望对你会有所帮助。加苏三的工作内推群一、为什么需要消息队列MQ在深入具体场景之前我们先来思考一个基本问题为什么要使用消息队列系统间的直接调用image引入消息队列后image接下来我们将通过10个具体场景带大家来深入理解MQ的价值。场景一系统解耦背景描述在我早期参与的一个电商项目中订单创建后需要通知多个系统// 早期的紧耦合设计public class OrderService {private InventoryService inventoryService;private PointsService pointsService;private EmailService emailService;private AnalyticsService analyticsService;public void createOrder(Order order) {// 1. 保存订单orderDao.save(order);// 2. 调用库存服务inventoryService.updateInventory(order);// 3. 调用积分服务pointsService.addPoints(order.getUserId(), order.getAmount());// 4. 发送邮件通知emailService.sendOrderConfirmation(order);// 5. 记录分析数据analyticsService.trackOrderCreated(order);// 更多服务...}}这种架构存在严重问题紧耦合订单服务需要知道所有下游服务单点故障任何一个下游服务挂掉都会导致订单创建失败性能瓶颈同步调用导致响应时间慢MQ解决方案引入MQ后架构变为image代码实现// 订单服务 - 生产者Servicepublic class OrderService {Autowiredprivate RabbitTemplate rabbitTemplate;public void createOrder(Order order) {// 1. 保存订单orderDao.save(order);// 2. 发送消息到MQrabbitTemplate.convertAndSend(order.exchange,order.created,new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount()));}}// 库存服务 - 消费者ComponentRabbitListener(queues inventory.queue)public class InventoryConsumer {Autowiredprivate InventoryService inventoryService;RabbitHandlerpublic void handleOrderCreated(OrderCreatedEvent event) {inventoryService.updateInventory(event.getOrderId());}}技术要点消息协议选择根据业务需求选择RabbitMQ、Kafka或RocketMQ消息格式使用JSON或Protobuf等跨语言格式错误处理实现重试机制和死信队列场景二异步处理背景描述用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作如果同步处理用户需要等待很长时间。MQ解决方案// 视频服务 - 生产者Servicepublic class VideoService {Autowiredprivate KafkaTemplateString, Object kafkaTemplate;public UploadResponse uploadVideo(MultipartFile file, String userId) {// 1. 保存原始视频String videoId saveOriginalVideo(file);// 2. 发送处理消息kafkaTemplate.send(video-processing, new VideoProcessingEvent(videoId, userId));// 3. 立即返回响应return new UploadResponse(videoId, upload_success);}}// 视频处理服务 - 消费者Servicepublic class VideoProcessingConsumer {KafkaListener(topics video-processing)public void processVideo(VideoProcessingEvent event) {// 异步执行耗时操作videoProcessor.transcode(event.getVideoId());videoProcessor.generateThumbnails(event.getVideoId());contentModerationService.checkContent(event.getVideoId());// 发送处理完成通知notificationService.notifyUser(event.getUserId(), event.getVideoId());}}架构优势快速响应用户上传后立即得到响应弹性扩展可以根据处理压力动态调整消费者数量故障隔离处理服务故障不会影响上传功能场景三流量削峰背景描述电商秒杀活动时瞬时流量可能是平时的百倍以上直接冲击数据库和服务。MQ解决方案image代码实现// 秒杀服务Servicepublic class SecKillService {Autowiredprivate RedisTemplateString, Object redisTemplate;Autowiredprivate RabbitTemplate rabbitTemplate;public SecKillResponse secKill(SecKillRequest request) {// 1. 校验用户资格if (!checkUserQualification(request.getUserId())) {return SecKillResponse.failed(用户无资格);}// 2. 预减库存Redis原子操作Long remaining redisTemplate.opsForValue().decrement(sec_kill_stock: request.getItemId());if (remaining null || remaining 0) {// 库存不足恢复库存redisTemplate.opsForValue().increment(sec_kill_stock: request.getItemId());return SecKillResponse.failed(库存不足);}// 3. 发送秒杀成功消息到MQrabbitTemplate.convertAndSend(sec_kill.exchange,sec_kill.success,new SecKillSuccessEvent(request.getUserId(), request.getItemId()));return SecKillResponse.success(秒杀成功);}}// 订单处理消费者ComponentRabbitListener(queues sec_kill.order.queue)public class SecKillOrderConsumer {RabbitHandlerpublic void handleSecKillSuccess(SecKillSuccessEvent event) {// 异步创建订单orderService.createSecKillOrder(event.getUserId(), event.getItemId());}}技术要点库存预扣使用Redis原子操作避免超卖队列缓冲MQ缓冲请求避免直接冲击数据库限流控制在网关层进行限流拒绝过多请求场景四数据同步背景描述在微服务架构中不同服务有自己的数据库需要保证数据一致性。MQ解决方案// 用户服务 - 数据变更时发送消息Servicepublic class UserService {Transactionalpublic User updateUser(User user) {// 1. 更新数据库userDao.update(user);// 2. 发送消息在事务内rocketMQTemplate.sendMessageInTransaction(user-update-topic,MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus())).build(),null);return user;}}// 其他服务 - 消费用户更新消息ServiceRocketMQMessageListener(topic user-update-topic, consumerGroup order-group)public class UserUpdateConsumer implements RocketMQListenerUserUpdateEvent {Overridepublic void onMessage(UserUpdateEvent event) {// 更新本地用户信息缓存orderService.updateUserCache(event.getUserId(), event.getStatus());}}一致性保证本地事务表将消息和业务数据放在同一个数据库事务中事务消息使用RocketMQ的事务消息机制幂等消费消费者实现幂等性避免重复处理场景五日志收集背景描述分布式系统中日志分散在各个节点需要集中收集和分析。MQ解决方案image代码实现// 日志收集组件Componentpublic class LogCollector {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void collectLog(String appId, String level, String message, MapString, Object context) {LogEvent logEvent new LogEvent(appId, level, message, context, System.currentTimeMillis());// 发送到KafkakafkaTemplate.send(app-logs, appId, JsonUtils.toJson(logEvent));}}// 日志消费者Servicepublic class LogConsumer {KafkaListener(topics app-logs, groupId log-es)public void consumeLog(String message) {LogEvent logEvent JsonUtils.fromJson(message, LogEvent.class);// 存储到ElasticsearchelasticsearchService.indexLog(logEvent);// 实时监控检查if (ERROR.equals(logEvent.getLevel())) {alertService.checkAndAlert(logEvent);}}}技术优势解耦应用节点无需关心日志如何处理缓冲应对日志产生速率波动多消费同一份日志可以被多个消费者处理场景六消息广播背景描述系统配置更新后需要通知所有服务节点更新本地配置。MQ解决方案// 配置服务 - 广播配置更新Servicepublic class ConfigService {Autowiredprivate RedisTemplateString, Object redisTemplate;public void updateConfig(String configKey, String configValue) {// 1. 更新配置存储configDao.updateConfig(configKey, configValue);// 2. 广播配置更新消息redisTemplate.convertAndSend(config-update-channel,new ConfigUpdateEvent(configKey, configValue));}}// 服务节点 - 订阅配置更新Componentpublic class ConfigUpdateListener {Autowiredprivate LocalConfigCache localConfigCache;RedisListener(channel config-update-channel)public void handleConfigUpdate(ConfigUpdateEvent event) {// 更新本地配置缓存localConfigCache.updateConfig(event.getKey(), event.getValue());}}应用场景功能开关动态开启或关闭功能参数调整调整超时时间、限流阈值等黑白名单更新黑白名单配置场景七顺序消息背景描述在某些业务场景中消息的处理顺序很重要如订单状态变更。MQ解决方案// 订单状态变更服务Servicepublic class OrderStateService {Autowiredprivate RocketMQTemplate rocketMQTemplate;public void changeOrderState(String orderId, String oldState, String newState) {OrderStateEvent event new OrderStateEvent(orderId, oldState, newState);// 发送顺序消息使用orderId作为sharding keyrocketMQTemplate.syncSendOrderly(order-state-topic,event,orderId // 保证同一订单的消息按顺序处理);}}// 订单状态消费者ServiceRocketMQMessageListener(topic order-state-topic,consumerGroup order-state-group,consumeMode ConsumeMode.ORDERLY // 顺序消费)public class OrderStateConsumer implements RocketMQListenerOrderStateEvent {Overridepublic void onMessage(OrderStateEvent event) {// 按顺序处理订单状态变更orderService.processStateChange(event);}}顺序保证机制分区顺序同一分区内的消息保证顺序顺序投递MQ保证消息按发送顺序投递顺序处理消费者顺序处理消息场景八延迟消息背景描述需要实现定时任务如订单超时未支付自动取消。MQ解决方案// 订单服务 - 发送延迟消息Servicepublic class OrderService {Autowiredprivate RabbitTemplate rabbitTemplate;public void createOrder(Order order) {// 保存订单orderDao.save(order);// 发送延迟消息30分钟后检查支付状态rabbitTemplate.convertAndSend(order.delay.exchange,order.create,new OrderCreateEvent(order.getId()),message - {message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟return message;});}}// 订单超时检查消费者ComponentRabbitListener(queues order.delay.queue)public class OrderTimeoutConsumer {RabbitHandlerpublic void checkOrderPayment(OrderCreateEvent event) {Order order orderDao.findById(event.getOrderId());if (UNPAID.equals(order.getStatus())) {// 超时未支付取消订单orderService.cancelOrder(order.getId(), 超时未支付);}}}替代方案对比方案 优点 缺点数据库轮询 实现简单 实时性差数据库压力大延时队列 实时性好 实现复杂消息堆积问题定时任务 可控性强 分布式协调复杂