肿圬后 发表于 2025-9-26 17:07:16

【RabbitMQ】主题(Topics)与主题交换机(Topic Exchange)

本章目标


[*]理解主题交换机(Topic Exchange)的强大路由能力。
[*]掌握通配符*和#的使用规则。
[*]学习基于模式匹配的复杂消息路由。
[*]实现一个支持多维度过滤的智能消息系统。
一、理论部分

1. 主题交换机(Topic Exchange)简介

主题交换机是RabbitMQ中最灵活也是最强大的交换机类型。它结合了扇形交换机的广播能力和直连交换机的精确匹配能力,同时引入了模式匹配的概念。
主题交换机的工作方式:

[*]消息仍然带有路由键(Routing Key),但路由键必须是由点号分隔的单词列表(如:usa.news、europe.weather.alert)。
[*]队列通过绑定键(Binding Key) 绑定到交换机,绑定键也使用相同的点号分隔格式。
[*]绑定键支持两种通配符进行模式匹配。
2. 通配符规则

主题交换机的强大之处在于绑定键支持通配符:

[*]*(星号):匹配恰好一个单词

[*]示例:*.orange.* 可以匹配 quick.orange.rabbit,但不能匹配 quick.orange.fox.jumps

[*]#(井号):匹配零个或多个单词

[*]示例:lazy.# 可以匹配 lazy、lazy.fox、lazy.brown.fox、lazy.pink.fox.jumps.over

3. 路由键格式最佳实践

路由键通常采用层次结构,便于模式匹配:

[*].:auth.info、kernel.error
[*]..:usa.payment.success、europe.order.cancelled
[*]..:news.sports.update、weather.alert.severe
4. 使用场景

主题交换机适用于需要复杂、灵活的消息路由场景:

[*]新闻订阅系统:用户可以根据兴趣订阅特定主题(如sports.*、*.finance)
[*]物联网设备监控:按设备类型、地理位置、告警级别路由消息
[*]微服务事件总线:基于事件类型和来源进行精细路由
二、实操部分:构建智能新闻分发系统

我们将构建一个新闻分发系统,其中:

[*]生产者发送带有分类路由键的新闻消息
[*]消费者可以根据兴趣订阅特定模式的新闻
第1步:创建项目


[*]创建一个新的解决方案。
[*]添加一个控制台应用程序项目作为生产者:EmitLogTopic。
[*]添加多个消费者项目:

[*]ReceiveNewsAll - 接收所有新闻
[*]ReceiveSportsNews - 接收所有体育新闻
[*]ReceiveUSNews - 接收所有美国新闻
[*]ReceiveCriticalAlerts - 接收所有紧急警报
[*]ReceiveWeatherUpdates - 接收所有天气更新

[*]为所有项目添加RabbitMQ.Client NuGet包。
第2步:编写新闻生产者(EmitLogTopic.cs)

using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // 声明主题交换机
    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);

    // 路由键格式:<category>.<region>.<severity>
    // 示例:news.usa.info, sports.europe.alert, weather.asia.critical
    var routingKey = (args.Length > 0) ? args : "anonymous.info";
    var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
    var body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(exchange: "topic_logs",
                         routingKey: routingKey,
                         basicProperties: null,
                         body: body);

    Console.WriteLine($" Sent '{routingKey}':'{message}'");
}

Console.WriteLine(" Press to exit.");
Console.ReadLine();第3步:编写接收所有新闻的消费者(ReceiveNewsAll.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
    var queueName = channel.QueueDeclare().QueueName;

    // 使用 # 匹配所有消息
    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "#");

    Console.WriteLine($" [*] Waiting for ALL news. Queue: {queueName}");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      Console.WriteLine($" Received '{ea.RoutingKey}':'{message}'");
    };

    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    Console.WriteLine(" Press to exit.");
    Console.ReadLine();
}第4步:编写接收体育新闻的消费者(ReceiveSportsNews.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
    var queueName = channel.QueueDeclare().QueueName;

    // 匹配所有体育相关的新闻:sports.*.*
    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "sports.#");

    Console.WriteLine($" [*] Waiting for SPORTS news. Queue: {queueName}");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      Console.WriteLine($" Received '{ea.RoutingKey}':'{message}'");
    };

    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    Console.WriteLine(" Press to exit.");
    Console.ReadLine();
}第5步:编写接收美国新闻的消费者(ReceiveUSNews.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
    var queueName = channel.QueueDeclare().QueueName;

    // 匹配所有美国相关的新闻:*.usa.*
    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.usa.*");

    Console.WriteLine($" [*] Waiting for USA news. Queue: {queueName}");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      Console.WriteLine($" Received '{ea.RoutingKey}':'{message}'");
    };

    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    Console.WriteLine(" Press to exit.");
    Console.ReadLine();
}第6步:编写接收紧急警报的消费者(ReceiveCriticalAlerts.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
    var queueName = channel.QueueDeclare().QueueName;

    // 匹配所有紧急级别的消息:*.*.critical
    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.*.critical");

    Console.WriteLine($" [*] Waiting for CRITICAL alerts. Queue: {queueName}");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      Console.WriteLine($" Received '{ea.RoutingKey}':'{message}'");
      Console.WriteLine("    -> Sending emergency notification!");
    };

    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    Console.WriteLine(" Press to exit.");
    Console.ReadLine();
}第7步:编写接收天气更新的消费者(ReceiveWeatherUpdates.cs)

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory() { HostName = "localhost", UserName = "myuser", Password = "mypassword" };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "topic_logs", type: ExchangeType.Topic);
    var queueName = channel.QueueDeclare().QueueName;

    // 匹配所有天气相关的更新:weather.*
    // 一个队列可以绑定多个模式
    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "weather.#");
    channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: "*.alert"); // 也接收所有警报

    Console.WriteLine($" [*] Waiting for WEATHER updates and ALERTS. Queue: {queueName}");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      Console.WriteLine($" Received '{ea.RoutingKey}':'{message}'");
    };

    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    Console.WriteLine(" Press to exit.");
    Console.ReadLine();
}第8步:运行与演示


[*]启动所有消费者
打开六个终端窗口,分别运行所有消费者程序。
[*]发送各种类型的新闻消息
cd EmitLogTopic

# 发送体育新闻
dotnet run "sports.usa.score" "Team USA wins gold medal"
dotnet run "sports.europe.update" "Champions League finals scheduled"

# 发送美国相关新闻
dotnet run "news.usa.politics" "Election results announced"
dotnet run "tech.usa.innovation" "Silicon Valley startup raises $10M"

# 发送紧急警报
dotnet run "weather.usa.critical" "Tornado warning for Midwest"
dotnet run "safety.europe.critical" "Security alert: System maintenance"

# 发送天气更新
dotnet run "weather.asia.update" "Monsoon season begins"
dotnet run "news.europe.alert" "Breaking: Major announcement"

# 发送其他消息
dotnet run "entertainment.hollywood.gossip" "Celebrity wedding announced"
[*]观察路由结果并分析模式匹配
消息路由键ALLSPORTSUSACRITICALWEATHER/ALERTsports.usa.score✅✅✅❌❌sports.europe.update✅✅❌❌❌news.usa.politics✅❌✅❌❌tech.usa.innovation✅❌✅❌❌weather.usa.critical✅❌✅✅✅safety.europe.critical✅❌❌✅✅ (*.alert)weather.asia.update✅❌❌❌✅news.europe.alert✅❌❌❌✅ (*.alert)entertainment.hollywood.gossip✅❌❌❌❌
[*]测试复杂场景

[*]发送 weather.alert.severe.critical - 观察哪些消费者能收到
[*]发送 sports.alert - 测试多个模式的匹配
[*]在管理后台查看绑定关系,理解通配符的实际效果

第9步:通配符规则详解示例

为了更好理解通配符,让我们看一些匹配示例:
绑定键 *.orange.* 的匹配情况:

[*]✅ quick.orange.rabbit (匹配)
[*]✅ lazy.orange.elephant (匹配)
[*]❌ quick.orange.fox.lazy (不匹配 - 四个单词)
[*]❌ orange (不匹配 - 只有一个单词)
[*]❌ quick.brown.fox (不匹配 - 中间不是orange)
绑定键 lazy.# 的匹配情况:

[*]✅ lazy (匹配)
[*]✅ lazy.fox (匹配)
[*]✅ lazy.brown.fox (匹配)
[*]✅ lazy.pink.fox.jumps.over (匹配)
[*]❌ quick.lazy.fox (不匹配 - 第一个单词不是lazy)
本章总结

在这一章中,我们深入学习了RabbitMQ中最强大的主题交换机,掌握了基于模式匹配的复杂消息路由:

[*]主题交换机(Topic Exchange):理解了基于通配符的模式匹配路由机制。
[*]通配符规则:掌握了*(匹配一个单词)和#(匹配零个或多个单词)的使用方法。
[*]路由键设计:学习了使用点号分隔的层次化路由键设计最佳实践。
[*]复杂路由场景:实现了支持多维度过滤的智能新闻分发系统。
[*]多重模式绑定:掌握了单个队列绑定多个模式的高级用法。
主题交换机提供了无与伦比的灵活性,是构建复杂事件驱动系统的理想选择。在下一章,我们将转向另一个重要主题:消息可靠性保障,学习如何确保消息在复杂的分布式环境中绝不丢失。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 【RabbitMQ】主题(Topics)与主题交换机(Topic Exchange)