StupidBeauty
Read times:2480Posted at: - no title specified

RabbitMQ .NET教程翻译:主题,Topics

(使用.NET客户端)

前提

本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。

如何寻求帮助

如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系

前一个教程 中,我们改善了我们的日志系统。 我们不再使用只能盲目地进行广播的 fanout 交换机,而是使用了 direct 交换机 这样,我们就能够有选择性地接收日志了。

尽管,对于 direct 交换机的使用,改善了我们的日志系统,但是,它仍然有一些局限性——它无法根据多个条件来进行路由。

在我们的日志系统中, 有可能 ,我们并不仅仅想根据严重 性来订 阅日志消息,还希望根据 发送日志的来源来订阅日志消息。 妳可能之前在 syslog 这个unix 工具中了解过这种概念, 它会根据严重性 (info/warn/crit...) 和软件本身 (auth/cron/kern...) 来进行日志的路由。

那将会给我们带来巨大的灵活性——例如,我们可能希望,对于来自'cron'的日志,只监听其中的致命错误(critical errors)消息,而对于来自'kern'的日志,监听其所有消息。

为了在我们的日志系统中实现那种功能,我们需要学习一下更复杂的 topic 交换机。

主题(Topic)交换机

发往 topic 交换机的消息,不能任意使用路由键( routing_key )——它必须是由多个单词组成的列表,以小数点分隔。这些单词,可以是任何东西,但是,通常,它们会表达某种与消息本身相关的特性。例如,这些都是有效的路由键:"stock.usd.nyse""nyse.vmw""quick.orange.rabbit"。在路由键中,可以按照妳的喜好塞入尽可能多的单词,不过总的限制是255字节

绑定键,也必须是使用相同的形式。 topic 交换机背后的逻辑,与 direct 交换机的逻辑类似——带有特定路由键的消息,会被发送给所有使用与之相匹配的绑定键来绑定的队列。然而,对于绑定键,有两种重要的特殊情况:

  • •. * (星号) 可代替一个单词。

  • •. # (井号) 可代替0个或多个单词。

使用示例来解释最简单:

在这个示例中,我们将要发送一些用来描述动物的消息。这些消息在发送的时候,会带有一个由3个单词(两个小数点)组成的路由键。路由键中的第一个单词,用来描述速度,第二个单词,用来描述颜色,第三个单词,用来描述种族:"<speed>.<colour>.<species>"

我们创建了三个绑定:Q1的绑定键是"*.orange.*"Q2的绑定键是"*.*.rabbit""lazy.#"

以下以人类语言描述了这些绑定:

  • •.Q1对所有橙色的动物感兴趣。

  • •.Q2想要知道关于兔子的所有事,以及关于懒惰的动物的所有事。

一条带有路由键" quick.orange.rabbit "的消息,会被同时传递给这两条队列。消息"lazy.orange.elephant"同样会进入两条队列。然而,"quick.orange.fox"只会进入第一条队列,"lazy.brown.fox"只会进入第二条队列。"lazy.pink.rabbit"只会被向第二条队列传递一次,尽管它匹配了两个绑定。"quick.brown.fox"不匹配任何绑定,所以,会被直接忽略。

如果我们打破这个约定,发送出带有一个单词或四个单词路由键的消息,例如"orange"或者"quick.orange.male.rabbit",会怎么样?狠简单,这些消息不会匹配任何绑定,因而会被丢弃。

另一方面,"lazy.orange.male.rabbit"呢,虽然有着四个单词,但是,它匹配了最后一个绑定,因而会被传递到第二个队列中。

主题交换机

主题交换机,功能强大,可以做到其它交换机所做到的事。

如果某个队列是使用" # " (井号)这个绑定键来绑定的——那么,它会接收到所有的消息,并且会忽略掉路由键——就像 fanout 交换机那样。

如果绑定键中未使用特殊字符"*" (星号)"#" (井号),那么,主题交换机的行为会与 direct 相同。

将所有代码凑到一起

我们要在我们的日志系统中使用 topic 交换机了。我们先要做一个假设,即,日志消息的路由键,由两个单词组成:"<facility>.<severity>"

代码 前一个教程 中的代码几乎完全相同。

EmitLogTopic.cs 的源代码:

using System;

using System.Linq;

using RabbitMQ.Client;

using System.Text;

class EmitLogTopic

{

public static void Main ( string [] args )

{

var factory = new ConnectionFactory () { HostName = "localhost" };

using ( var connection = factory . CreateConnection ())

using ( var channel = connection . CreateModel ())

{

channel . ExchangeDeclare ( exchange : "topic_logs" ,

type : "topic" );

var routingKey = ( args . Length > 0 ) ? args [ 0 ] : "anonymous.info" ;

var message = ( args . Length > 1 )

? string . Join ( " " , args . Skip ( 1 ). ToArray ())

: "Hello World!" ;

var body = Encoding . UTF8 . GetBytes ( message );

channel . BasicPublish ( exchange : "topic_logs" ,

routingKey : routingKey ,

basicProperties : null ,

body : body );

Console . WriteLine ( " [x] Sent '{0}':'{1}'" , routingKey , message );

}

}

}

ReceiveLogsTopic.cs 的源代码:

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

class ReceiveLogsTopic

{

public static void Main ( string [] args )

{

var factory = new ConnectionFactory () { HostName = "localhost" };

using ( var connection = factory . CreateConnection ())

using ( var channel = connection . CreateModel ())

{

channel . ExchangeDeclare ( exchange : "topic_logs" , type : "topic" );

var queueName = channel . QueueDeclare (). QueueName ;

if ( args . Length < 1 )

{

Console . Error . WriteLine ( "Usage: {0} [binding_key...]" ,

Environment . GetCommandLineArgs ()[ 0 ]);

Console . WriteLine ( " Press [enter] to exit." );

Console . ReadLine ();

Environment . ExitCode = 1 ;

return ;

}

foreach ( var bindingKey in args )

{

channel . QueueBind ( queue : queueName ,

exchange : "topic_logs" ,

routingKey : bindingKey );

}

Console . WriteLine ( " [*] Waiting for messages. To exit press CTRL+C" );

var consumer = new EventingBasicConsumer ( channel );

consumer . Received += ( model , ea ) =>

{

var body = ea . Body ;

var message = Encoding . UTF8 . GetString ( body );

var routingKey = ea . RoutingKey ;

Console . WriteLine ( " [x] Received '{0}':'{1}'" ,

routingKey ,

message );

};

channel . BasicConsume ( queue : queueName ,

noAck : true ,

consumer : consumer );

Console . WriteLine ( " Press [enter] to exit." );

Console . ReadLine ();

}

}

}

运行以下示例:

以下命令用来接收所有日志:

$ ReceiveLogsTopic.exe "#"

接收来自"kern"程序的所有日志:

$ ReceiveLogsTopic.exe "kern.*"

或者,如果妳只想监听" critical "级别的日志的话:

$ ReceiveLogsTopic.exe "*.critical"

可以创建多个绑定:

$ ReceiveLogsTopic.exe "kern.*" "*.critical"

要想发送一条带有路由键" kern.critical "的日志消息,则执行:

$ EmitLogTopic.exe "kern.critical" "A critical kernel error"

请好好玩弄一下这些程序。注意,这些代码中,并不对路由键及绑定键做任何假设,妳可以试试使用多于两个路由键的参数。

某些值得研究的小问题:

  • •. " * "这个绑定键,会捕获到带有空白路由键的消息吗?

  • •."#.*",会捕获到以" .. "作为路由键的消息吗?会捕获到以单个单词作为路由键的消息吗?

  • •."a.*.#""a.#"有什么区别?

( EmitLogTopic.cs ReceiveLogsTopic.cs 的完整代码 )

下一步,在 教程6 中,学习如何来自发送消息,以实现一个远程过程调用。

女神

孟茜

Your opinions
Your name:Email:Website url:Opinion content:
- no title specified

HxLauncher: Launch Android applications by voice commands

 
Recent comments
2017年4月~2019年4月垃圾短信排行榜Posted at:Thu Sep 26 04:51:48 2024
Qt5.7文档翻译:QWebEngineCookieStore类,QWebEngineCookieStore ClassPosted at:Fri Aug 11 06:50:35 2023盲盒kill -9 18289 Grebe.20230517.211749.552.mp4