news 2026/6/16 20:49:20

消息队列设计:从同步到异步的性能突破

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
消息队列设计:从同步到异步的性能突破

前言

2024年初,我们的订单系统经常出现"超时"问题。用户下单后,系统需要同时调用库存服务、支付服务、通知服务,任何一个服务慢都会导致整个请求超时。

我们决定引入消息队列,将同步调用改为异步处理。这个改造带来了显著的性能提升。


一、问题:同步调用的瓶颈

原始的订单流程是这样的:

python

@app.route('/api/orders', methods=['POST']) def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 同步调用库存服务 inventory_response = requests.post( 'http://inventory-service/deduct', json={'product_id': order.product_id, 'quantity': order.quantity} ) if inventory_response.status_code != 200: return {"error": "库存不足"}, 400 # 3. 同步调用支付服务 payment_response = requests.post( 'http://payment-service/pay', json={'order_id': order.id, 'amount': order.amount} ) if payment_response.status_code != 200: return {"error": "支付失败"}, 400 # 4. 同步调用通知服务 notify_response = requests.post( 'http://notify-service/send', json={'order_id': order.id, 'type': 'order_created'} ) return {"order_id": order.id}, 201

问题

  • 任何一个服务慢都会导致整个请求慢;
  • 任何一个服务故障都会导致订单创建失败;
  • 耦合度太高,难以扩展。

性能数据

  • 库存服务:200ms
  • 支付服务:300ms
  • 通知服务:150ms
  • 总耗时:200 + 300 + 150 =650ms

二、解决方案:引入RabbitMQ

我们选择RabbitMQ作为消息队列。改造后的流程:

2.1 发布订单创建事件

python

import pika import json def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 发布事件到消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() # 声明交换机和队列 channel.exchange_declare(exchange='orders', exchange_type='topic') # 发布消息 message = { 'order_id': order.id, 'product_id': order.product_id, 'quantity': order.quantity, 'amount': order.amount } channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message) ) connection.close() # 立即返回响应 return {"order_id": order.id}, 201

耗时:仅需10ms(发布到队列)

2.2 消费者:库存服务

python

def inventory_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='inventory_queue', durable=True) queue_name = result.method.queue # 绑定队列到交换机 channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 扣减库存 deduct_inventory( message['product_id'], message['quantity'] ) # 确认消息 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: # 拒绝消息,重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('库存服务已启动,等待消息...') channel.start_consuming() if __name__ == '__main__': inventory_consumer()

2.3 消费者:支付服务

python

def payment_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='payment_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 处理支付 process_payment( message['order_id'], message['amount'] ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('支付服务已启动,等待消息...') channel.start_consuming()

2.4 消费者:通知服务

python

def notify_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='notify_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 发送通知 send_notification( message['order_id'], 'order_created' ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('通知服务已启动,等待消息...') channel.start_consuming()


三、可靠性保证

3.1 消息持久化

python

# 声明持久化队列 channel.queue_declare( queue='payment_queue', durable=True # 队列持久化 ) # 发布持久化消息 channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) )

3.2 消息确认机制

python

Copy code

# 手动确认消息 def callback(ch, method, properties, body): try: process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认 except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 拒绝并重新入队 # 禁用自动确认 channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=False # 手动确认 )

3.3 死信队列

python

# 声明死信交换机 channel.exchange_declare(exchange='dlx', exchange_type='direct') channel.queue_declare(queue='dead_letter_queue', durable=True) channel.queue_bind(exchange='dlx', queue='dead_letter_queue') # 声明普通队列,指定死信交换机 channel.queue_declare( queue='payment_queue', durable=True, arguments={ 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'dead_letter' } )


四、监控和告警

python

import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def callback(ch, method, properties, body): start_time = time.time() try: process_message(body) duration = time.time() - start_time logger.info(f"消息处理成功, 耗时: {duration}ms") ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"消息处理失败: {str(e)}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


五、国际化团队的挑战

在跨国团队中,消息队列的错误日志和告警需要支持多语言。我们使用同言翻译(Transync AI)来自动翻译消息队列的错误信息和监控告警,确保不同语言背景的团队成员能够快速理解问题并做出响应。


六、性能对比

指标同步调用异步消息队列提升
平均响应时间650ms10ms-98.5%
P99响应时间2000ms50ms-97.5%
系统吞吐量1000 req/s10000 req/s+900%
故障隔离-

七、最佳实践

  1. 幂等性设计:消费者应该能够安全地处理重复消息;
  2. 超时设置:为消息处理设置合理的超时时间;
  3. 监控队列深度:及时发现消费者处理不过来的情况;
  4. 分离关注点:生产者和消费者应该解耦;
  5. 定期审查:定期检查死信队列,找出问题消息。

八、结语

消息队列的引入,从根本上改变了我们的系统架构。从同步的紧耦合,到异步的松耦合,系统的可扩展性和可靠性都得到了显著提升。

如果你的系统也在经历性能瓶颈,消息队列可能是一个很好的解决方案。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/16 17:11:07

java_base_面向对象原理省流

每日一学:基础知识精讲ok,终于也是要写完了,真是令人激动。话不多说有请今天的主角——封装(Encapsulation),依然用邮轮运输的场景讲透核心逻辑。封装的核心:“打包 控权”其本质可以总结两个方…

作者头像 李华
网站建设 2026/6/15 15:24:54

Git 开发常用命令速查手册

文章目录git命令开发最常用的核心命令一、基础配置(首次使用必做)二、仓库初始化和克隆1. 本地新建仓库2. 克隆远程仓库三、工作区或暂存区操作1. 查看状态(最常用!随时确认文件变更)2. 添加文件到暂存区(提…

作者头像 李华
网站建设 2026/6/16 1:02:28

如快(sofast)

链接:https://pan.quark.cn/s/06a8b6a75c81如快Sofast是一款致力于提升办公效率的启动器工具,通过简洁易用的操作界面和强大的功能,帮助用户快速访问应用程序和文件,提高工作效率。该工具支持中文拼音模糊搜索,方便用户…

作者头像 李华
网站建设 2026/6/14 14:59:41

Java EE 应用与 Spring MVC简介

什么是JavaEE(j2ee)?javaee是一套用于开发大型企业级应用的标准和规范。它定义了组件模型和各种API。JavaEE的分层模型JavaEE通常采用多层架构,将应用的关注点清晰地分离,我们这里介绍的层级划分为:Domain …

作者头像 李华
网站建设 2026/6/14 11:11:00

Buck Boost Buck-Boost

Buck 电感接在输出端&#xff0c;通过控制开关管的通断时间&#xff08;占空比D&#xff09;&#xff0c;来控制电感中存储的能量。当开关管导通时输入电压给电感储能&#xff0c;开关管关断后&#xff0c;电感释放能量到输出端&#xff0c;由于占空比D<1&#xff0c;达到降…

作者头像 李华
网站建设 2026/6/16 16:43:00

springboot基于vue的仓库管理系统 前端前台_g3y27qk1

目录已开发项目效果实现截图开发技术系统开发工具&#xff1a;核心代码参考示例1.建立用户稀疏矩阵&#xff0c;用于用户相似度计算【相似度矩阵】2.计算目标用户与其他用户的相似度系统测试总结源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&…

作者头像 李华