河池环江网站建设,太原网站排名公司哪家好,网站 业务范围,手机怎样制作链接前言
2024年初#xff0c;我们的订单系统经常出现超时问题。用户下单后#xff0c;系统需要同时调用库存服务、支付服务、通知服务#xff0c;任何一个服务慢都会导致整个请求超时。
我们决定引入消息队列#xff0c;将同步调用改为异步处理。这个改造带来了…前言2024年初我们的订单系统经常出现超时问题。用户下单后系统需要同时调用库存服务、支付服务、通知服务任何一个服务慢都会导致整个请求超时。我们决定引入消息队列将同步调用改为异步处理。这个改造带来了显著的性能提升。一、问题同步调用的瓶颈原始的订单流程是这样的pythonapp.route(/api/orders, methods[POST]) def create_order(): # 1. 创建订单 order Order.create(request.json) # 2. 同步调用库存服务 inventory_response requests.post( http://inventory-service/deduct, json{product_id: order.product_id, quantity: order.quantity} ) if inventory_response.status_code ! 200: return {error: 库存不足}, 400 # 3. 同步调用支付服务 payment_response requests.post( http://payment-service/pay, json{order_id: order.id, amount: order.amount} ) if payment_response.status_code ! 200: return {error: 支付失败}, 400 # 4. 同步调用通知服务 notify_response requests.post( http://notify-service/send, json{order_id: order.id, type: order_created} ) return {order_id: order.id}, 201问题任何一个服务慢都会导致整个请求慢任何一个服务故障都会导致订单创建失败耦合度太高难以扩展。性能数据库存服务200ms支付服务300ms通知服务150ms总耗时200 300 150 650ms二、解决方案引入RabbitMQ我们选择RabbitMQ作为消息队列。改造后的流程2.1 发布订单创建事件pythonimport pika import json def create_order(): # 1. 创建订单 order Order.create(request.json) # 2. 发布事件到消息队列 connection pika.BlockingConnection(pika.ConnectionParameters(rabbitmq)) channel connection.channel() # 声明交换机和队列 channel.exchange_declare(exchangeorders, exchange_typetopic) # 发布消息 message { order_id: order.id, product_id: order.product_id, quantity: order.quantity, amount: order.amount } channel.basic_publish( exchangeorders, routing_keyorder.created, bodyjson.dumps(message) ) connection.close() # 立即返回响应 return {order_id: order.id}, 201耗时仅需10ms发布到队列2.2 消费者库存服务pythondef inventory_consumer(): connection pika.BlockingConnection(pika.ConnectionParameters(rabbitmq)) channel connection.channel() channel.exchange_declare(exchangeorders, exchange_typetopic) result channel.queue_declare(queueinventory_queue, durableTrue) queue_name result.method.queue # 绑定队列到交换机 channel.queue_bind( exchangeorders, queuequeue_name, routing_keyorder.created ) def callback(ch, method, properties, body): message json.loads(body) try: # 扣减库存 deduct_inventory( message[product_id], message[quantity] ) # 确认消息 ch.basic_ack(delivery_tagmethod.delivery_tag) except Exception as e: # 拒绝消息重新入队 ch.basic_nack(delivery_tagmethod.delivery_tag, requeueTrue) channel.basic_consume( queuequeue_name, on_message_callbackcallback ) print(库存服务已启动等待消息...) channel.start_consuming() if __name__ __main__: inventory_consumer()2.3 消费者支付服务pythondef payment_consumer(): connection pika.BlockingConnection(pika.ConnectionParameters(rabbitmq)) channel connection.channel() channel.exchange_declare(exchangeorders, exchange_typetopic) result channel.queue_declare(queuepayment_queue, durableTrue) queue_name result.method.queue channel.queue_bind( exchangeorders, queuequeue_name, routing_keyorder.created ) def callback(ch, method, properties, body): message json.loads(body) try: # 处理支付 process_payment( message[order_id], message[amount] ) ch.basic_ack(delivery_tagmethod.delivery_tag) except Exception as e: ch.basic_nack(delivery_tagmethod.delivery_tag, requeueTrue) channel.basic_consume( queuequeue_name, on_message_callbackcallback ) print(支付服务已启动等待消息...) channel.start_consuming()2.4 消费者通知服务pythondef notify_consumer(): connection pika.BlockingConnection(pika.ConnectionParameters(rabbitmq)) channel connection.channel() channel.exchange_declare(exchangeorders, exchange_typetopic) result channel.queue_declare(queuenotify_queue, durableTrue) queue_name result.method.queue channel.queue_bind( exchangeorders, queuequeue_name, routing_keyorder.created ) def callback(ch, method, properties, body): message json.loads(body) try: # 发送通知 send_notification( message[order_id], order_created ) ch.basic_ack(delivery_tagmethod.delivery_tag) except Exception as e: ch.basic_nack(delivery_tagmethod.delivery_tag, requeueTrue) channel.basic_consume( queuequeue_name, on_message_callbackcallback ) print(通知服务已启动等待消息...) channel.start_consuming()三、可靠性保证3.1 消息持久化python# 声明持久化队列 channel.queue_declare( queuepayment_queue, durableTrue # 队列持久化 ) # 发布持久化消息 channel.basic_publish( exchangeorders, routing_keyorder.created, bodyjson.dumps(message), propertiespika.BasicProperties( delivery_mode2 # 消息持久化 ) )3.2 消息确认机制pythonCopy code# 手动确认消息 def callback(ch, method, properties, body): try: process_message(body) ch.basic_ack(delivery_tagmethod.delivery_tag) # 确认 except Exception as e: ch.basic_nack(delivery_tagmethod.delivery_tag, requeueTrue) # 拒绝并重新入队 # 禁用自动确认 channel.basic_consume( queuequeue_name, on_message_callbackcallback, auto_ackFalse # 手动确认 )3.3 死信队列python# 声明死信交换机 channel.exchange_declare(exchangedlx, exchange_typedirect) channel.queue_declare(queuedead_letter_queue, durableTrue) channel.queue_bind(exchangedlx, queuedead_letter_queue) # 声明普通队列指定死信交换机 channel.queue_declare( queuepayment_queue, durableTrue, arguments{ x-dead-letter-exchange: dlx, x-dead-letter-routing-key: dead_letter } )四、监控和告警pythonimport logging logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) def callback(ch, method, properties, body): start_time time.time() try: process_message(body) duration time.time() - start_time logger.info(f消息处理成功, 耗时: {duration}ms) ch.basic_ack(delivery_tagmethod.delivery_tag) except Exception as e: logger.error(f消息处理失败: {str(e)}) ch.basic_nack(delivery_tagmethod.delivery_tag, requeueTrue)五、国际化团队的挑战在跨国团队中消息队列的错误日志和告警需要支持多语言。我们使用同言翻译Transync AI来自动翻译消息队列的错误信息和监控告警确保不同语言背景的团队成员能够快速理解问题并做出响应。六、性能对比指标同步调用异步消息队列提升平均响应时间650ms10ms-98.5%P99响应时间2000ms50ms-97.5%系统吞吐量1000 req/s10000 req/s900%故障隔离否是-七、最佳实践幂等性设计消费者应该能够安全地处理重复消息超时设置为消息处理设置合理的超时时间监控队列深度及时发现消费者处理不过来的情况分离关注点生产者和消费者应该解耦定期审查定期检查死信队列找出问题消息。八、结语消息队列的引入从根本上改变了我们的系统架构。从同步的紧耦合到异步的松耦合系统的可扩展性和可靠性都得到了显著提升。如果你的系统也在经历性能瓶颈消息队列可能是一个很好的解决方案。