news 2026/6/16 17:06:29

【RabbitMQ】主题(Topics)与主题交换机(Topic Exchange)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【RabbitMQ】主题(Topics)与主题交换机(Topic Exchange)

一、理论部分

1. 主题交换机(Topic Exchange)简介

主题交换机是RabbitMQ中最灵活也是最强大的交换机类型。它结合了扇形交换机的广播能力和直连交换机的精确匹配能力,同时引入了模式匹配的概念。

主题交换机的工作方式:

消息仍然带有路由键(Routing Key),但路由键必须是由点号分隔的单词列表(如:usa.news、europe.weather.alert)。

队列通过绑定键(Binding Key) 绑定到交换机,绑定键也使用相同的点号分隔格式。

绑定键支持两种通配符进行模式匹配。

2. 通配符规则

主题交换机的强大之处在于绑定键支持通配符:

*(星号):匹配恰好一个单词

示例:*.orange.* 可以匹配 quick.orange.rabbit,但不能匹配 quick.orange.fox.jumps

#(井号):匹配零个或多个单词

示例:lazy.# 可以匹配 lazy、lazy.fox、lazy.brown.fox、lazy.pink.fox.jumps.over

3. 路由键格式最佳实践

路由键通常采用层次结构,便于模式匹配:

<facility>.<severity>:auth.info、kernel.error

<region>.<service>.<event>:usa.payment.success、europe.order.cancelled

<category>.<subcategory>.<action>:news.sports.update、weather.alert.severe

4. 使用场景

主题交换机适用于需要复杂、灵活的消息路由场景:

新闻订阅系统:用户可以根据兴趣订阅特定主题(如sports.*、*.finance)

物联网设备监控:按设备类型、地理位置、告警级别路由消息

微服务事件总线:基于事件类型和来源进行精细路由

二、实操部分:构建智能新闻分发系统

我们将构建一个新闻分发系统,其中:

生产者发送带有分类路由键的新闻消息

消费者可以根据兴趣订阅特定模式的新闻

第1步:创建项目

创建一个新的解决方案。

添加一个控制台应用程序项目作为生产者:EmitLogTopic。

添加多个消费者项目:

ReceiveNewsAll - 接收所有新闻

ReceiveSportsNews - 接收所有体育新闻

ReceiveUSNews - 接收所有美国新闻

ReceiveCriticalAlerts - 接收所有紧急警报

ReceiveWeatherUpdates - 接收所有天气更新

为所有项目添加RabbitMQ.Client NuGet包。

第2步:编写新闻生产者(EmitLogTopic.cs)

复制代码

using System.Text;

using RabbitMQ.Client;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

// 声明主题交换机

channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

// 路由键格式:<category>.<region>.<severity>

// 示例:news.usa.info, sports.europe.alert, weather.asia.critical

var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";

var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "topic_logs",

routingKey: routingKey,

basicProperties: null,

body: body);

Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");

}

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

复制代码

第3步:编写接收所有新闻的消费者(ReceiveNewsAll.cs)

复制代码

using System.Text;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

// 使用 # 匹配所有消息

channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "#");

Console.WriteLine($" [*] Waiting for ALL news. Queue: {queueName}");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea.Body.ToArray();

var message = Encoding.UTF8.GetString(body);

Console.WriteLine($" [x] [ALL] Received '{ea.RoutingKey}':'{message}'");

};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

复制代码

第4步:编写接收体育新闻的消费者(ReceiveSportsNews.cs)

复制代码

using System.Text;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

// 匹配所有体育相关的新闻:sports.*.*

channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "sports.#");

Console.WriteLine($" [*] Waiting for SPORTS news. Queue: {queueName}");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea.Body.ToArray();

var message = Encoding.UTF8.GetString(body);

Console.WriteLine($" [x] [SPORTS] Received '{ea.RoutingKey}':'{message}'");

};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

复制代码

第5步:编写接收美国新闻的消费者(ReceiveUSNews.cs)

复制代码

using System.Text;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

// 匹配所有美国相关的新闻:*.usa.*

channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.usa.*");

Console.WriteLine($" [*] Waiting for USA news. Queue: {queueName}");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea.Body.ToArray();

var message = Encoding.UTF8.GetString(body);

Console.WriteLine($" [x] [USA] Received '{ea.RoutingKey}':'{message}'");

};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

复制代码

第6步:编写接收紧急警报的消费者(ReceiveCriticalAlerts.cs)

复制代码

using System.Text;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

// 匹配所有紧急级别的消息:*.*.critical

channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.*.critical");

Console.WriteLine($" [*] Waiting for CRITICAL alerts. Queue: {queueName}");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea.Body.ToArray();

var message = Encoding.UTF8.GetString(body);

Console.WriteLine($" [x] [CRITICAL] Received '{ea.RoutingKey}':'{message}'");

Console.WriteLine(" -> Sending emergency notification!");

};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

复制代码

第7步:编写接收天气更新的消费者(ReceiveWeatherUpdates.cs)

复制代码

using System.Text;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

var queueName = channel.QueueDeclare().QueueName;

// 匹配所有天气相关的更新:weather.*

// 一个队列可以绑定多个模式

channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "weather.#");

channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.alert"); // 也接收所有警报

Console.WriteLine($" [*] Waiting for WEATHER updates and ALERTS. Queue: {queueName}");

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea.Body.ToArray();

var message = Encoding.UTF8.GetString(body);

Console.WriteLine($" [x] [WEATHER/ALERT] Received '{ea.RoutingKey}':'{message}'");

};

channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");

Console.ReadLine();

}

复制代码

第8步:运行与演示

启动所有消费者

打开六个终端窗口,分别运行所有消费者程序。

发送各种类型的新闻消息

复制代码

cd EmitLogTopic

# 发送体育新闻

dotnet run "sports.usa.score" "Team USA wins gold medal"

dotnet run "sports.europe.update" "Champions League finals scheduled"

# 发送美国相关新闻

dotnet run "news.usa.politics" "Election results announced"

dotnet run "tech.usa.innovation" "Silicon Valley startup raises $10M"

# 发送紧急警报

dotnet run "weather.usa.critical" "Tornado warning for Midwest"

dotnet run "safety.europe.critical" "Security alert: System maintenance"

# 发送天气更新

dotnet run "weather.asia.update" "Monsoon season begins"

dotnet run "news.europe.alert" "Breaking: Major announcement"

# 发送其他消息

dotnet run "entertainment.hollywood.gossip" "Celebrity wedding announced"

复制代码

观察路由结果并分析模式匹配

消息路由键 ALL SPORTS USA CRITICAL WEATHER/ALERT

sports.usa.score ✅ ✅ ✅ ❌ ❌

sports.europe.update ✅ ✅ ❌ ❌ ❌

news.usa.politics ✅ ❌ ✅ ❌ ❌

tech.usa.innovation ✅ ❌ ✅ ❌ ❌

weather.usa.critical ✅ ❌ ✅ ✅ ✅

safety.europe.critical ✅ ❌ ❌ ✅ ✅ (*.alert)

weather.asia.update ✅ ❌ ❌ ❌ ✅

news.europe.alert ✅ ❌ ❌ ❌ ✅ (*.alert)

entertainment.hollywood.gossip ✅ ❌ ❌ ❌ ❌

测试复杂场景

发送 weather.alert.severe.critical - 观察哪些消费者能收到

发送 sports.alert - 测试多个模式的匹配

在管理后台查看绑定关系,理解通配符的实际效果

第9步:通配符规则详解示例

为了更好理解通配符,让我们看一些匹配示例:

绑定键 *.orange.* 的匹配情况:

✅ quick.orange.rabbit (匹配)

✅ lazy.orange.elephant (匹配)

❌ quick.orange.fox.lazy (不匹配 - 四个单词)

❌ orange (不匹配 - 只有一个单词)

❌ quick.brown.fox (不匹配 - 中间不是orange)

绑定键 lazy.# 的匹配情况:

✅ lazy (匹配)

✅ lazy.fox (匹配)

✅ lazy.brown.fox (匹配)

✅ lazy.pink.fox.jumps.over (匹配)

❌ quick.lazy.fox (不匹配 - 第一个单词不是lazy)

本章总结

在这一章中,我们深入学习了RabbitMQ中最强大的主题交换机,掌握了基于模式匹配的复杂消息路由:

主题交换机(Topic Exchange):理解了基于通配符的模式匹配路由机制。

通配符规则:掌握了*(匹配一个单词)和#(匹配零个或多个单词)的使用方法。

路由键设计:学习了使用点号分隔的层次化路由键设计最佳实践。

复杂路由场景:实现了支持多维度过滤的智能新闻分发系统。

多重模式绑定:掌握了单个队列绑定多个模式的高级用法。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/14 20:58:42

开源语音合成工具abogen:5分钟将电子书转为有声书

开源语音合成工具abogen&#xff1a;5分钟将电子书转为有声书 【免费下载链接】abogen Generate audiobooks from EPUBs, PDFs and text with synchronized captions. 项目地址: https://gitcode.com/GitHub_Trending/ab/abogen abogen是一款强大的开源语音合成工具&…

作者头像 李华
网站建设 2026/6/16 3:31:43

跨境电商独立站怎么做?新手卖家入门指南

一、独立站&#xff1a;自主电商的新起点独立站&#xff0c;即拥有独立域名与服务器&#xff0c;不依赖第三方平台的自建网站。企业或个人对其有完全控制权与自主权。搭建独立站好处多多&#xff1a;品牌塑造与自主性&#xff1a;能打造独特品牌形象&#xff0c;摆脱平台限制。…

作者头像 李华
网站建设 2026/6/15 14:32:34

6个宝藏图库,高清/免版权/风格全,直接拿走!

做设计/自媒体最头疼啥&#xff1f;——找不到又高清又能商用的图&#xff01;这6个网站是我压箱底的宝藏&#xff0c;免费付费都有&#xff0c;从此告别版权警告和渣画质&#xff01; 1、菜鸟图库 https://www.sucai999.com/pic.html?vNTYwNDUx 菜鸟图库除了设计类素材之外&…

作者头像 李华
网站建设 2026/6/14 1:48:41

素人推广没效果?可能是你没用到 “精准匹配” 的正确方式

最近和几位品牌市场负责人聊天&#xff0c;发现大家普遍有个困惑&#xff1a;明明投了上百个素人博主&#xff0c;笔记阅读量勉强破千&#xff0c;转化却连0.5%都不到。更扎心的是&#xff0c;有报告显示&#xff0c;63%的品牌在素人推广中遇到过「内容发了但没人买」的困境——…

作者头像 李华
网站建设 2026/6/13 17:20:14

ArcGIS大师之路500技---022空间校正

文章目录前言一、 数据准备1.1 展点1.2控制点二、 空间校正2.1 空间校正工具条2.2 控制点导入2.3 添加链接2.4 校正三、控制点选择技巧总结前言 空间校正&#xff08;Spatial Adjustment&#xff09;是ArcGIS中用于纠正空间数据位置误差的工具&#xff0c;通过将数据从一个坐标…

作者头像 李华
网站建设 2026/6/14 14:19:12

Transformer架构完全解析:从自注意力到文本分类实战

Transformer架构完全解析&#xff1a;从自注意力到文本分类实战掌握自然语言处理领域的革命性架构2017年&#xff0c;一篇名为《Attention Is All You Need》的论文彻底改变了自然语言处理领域的格局。这篇论文的核心思想非常简单&#xff1a;神经注意力机制本身就可以构建强大…

作者头像 李华