RabbitMQ .NET教程翻译:主题,Topics
本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。
如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系 。
在 前一个教程 中,我们改善了我们的日志系统。 我们不再使用只能盲目地进行广播的 fanout 交换机,而是使用了 direct 交换机 , 这样,我们就能够有选择性地接收日志了。
尽管,对于 direct 交换机的使用,改善了我们的日志系统,但是,它仍然有一些局限性——它无法根据多个条件来进行路由。
在我们的日志系统中, 有可能 ,我们并不仅仅想根据严重 性来订 阅日志消息,还希望根据 发送日志的来源来订阅日志消息。 妳可能之前在 syslog 这个unix 工具中了解过这种概念, 它会根据严重性 (info/warn/crit...) 和软件本身 (auth/cron/kern...) 来进行日志的路由。
那将会给我们带来巨大的灵活性——例如,我们可能希望,对于来自'cron'的日志,只监听其中的致命错误(critical errors)消息,而对于来自'kern'的日志,监听其所有消息。
为了在我们的日志系统中实现那种功能,我们需要学习一下更复杂的 topic 交换机。
发往 topic 交换机的消息,不能任意使用路由键( routing_key )——它必须是由多个单词组成的列表,以小数点分隔。这些单词,可以是任何东西,但是,通常,它们会表达某种与消息本身相关的特性。例如,这些都是有效的路由键:"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"。在路由键中,可以按照妳的喜好塞入尽可能多的单词,不过总的限制是255字节。
绑定键,也必须是使用相同的形式。 topic 交换机背后的逻辑,与 direct 交换机的逻辑类似——带有特定路由键的消息,会被发送给所有使用与之相匹配的绑定键来绑定的队列。然而,对于绑定键,有两种重要的特殊情况:
•. * (星号) 可代替一个单词。
•. # (井号) 可代替0个或多个单词。
使用示例来解释最简单:
在这个示例中,我们将要发送一些用来描述动物的消息。这些消息在发送的时候,会带有一个由3个单词(两个小数点)组成的路由键。路由键中的第一个单词,用来描述速度,第二个单词,用来描述颜色,第三个单词,用来描述种族:"<speed>.<colour>.<species>"。
我们创建了三个绑定:Q1的绑定键是"*.orange.*",Q2的绑定键是"*.*.rabbit"和"lazy.#"。
以下以人类语言描述了这些绑定:
•.Q1对所有橙色的动物感兴趣。
•.Q2想要知道关于兔子的所有事,以及关于懒惰的动物的所有事。
一条带有路由键" quick.orange.rabbit "的消息,会被同时传递给这两条队列。消息"lazy.orange.elephant"同样会进入两条队列。然而,"quick.orange.fox"只会进入第一条队列,"lazy.brown.fox"只会进入第二条队列。"lazy.pink.rabbit"只会被向第二条队列传递一次,尽管它匹配了两个绑定。"quick.brown.fox"不匹配任何绑定,所以,会被直接忽略。
如果我们打破这个约定,发送出带有一个单词或四个单词路由键的消息,例如"orange"或者"quick.orange.male.rabbit",会怎么样?狠简单,这些消息不会匹配任何绑定,因而会被丢弃。
另一方面,"lazy.orange.male.rabbit"呢,虽然有着四个单词,但是,它匹配了最后一个绑定,因而会被传递到第二个队列中。
主题交换机,功能强大,可以做到其它交换机所做到的事。
如果某个队列是使用" # " (井号)这个绑定键来绑定的——那么,它会接收到所有的消息,并且会忽略掉路由键——就像 fanout 交换机那样。
如果绑定键中未使用特殊字符"*" (星号)和"#" (井号),那么,主题交换机的行为会与 direct 相同。
我们要在我们的日志系统中使用 topic 交换机了。我们先要做一个假设,即,日志消息的路由键,由两个单词组成:"<facility>.<severity>"。
代码 与 前一个教程 中的代码几乎完全相同。
EmitLogTopic.cs 的源代码:
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogTopic
{
public static void Main ( string [] args )
{
var factory = new ConnectionFactory () { HostName = "localhost" };
using ( var connection = factory . CreateConnection ())
using ( var channel = connection . CreateModel ())
{
channel . ExchangeDeclare ( exchange : "topic_logs" ,
type : "topic" );
var routingKey = ( args . Length > 0 ) ? args [ 0 ] : "anonymous.info" ;
var message = ( args . Length > 1 )
? string . Join ( " " , args . Skip ( 1 ). ToArray ())
: "Hello World!" ;
var body = Encoding . UTF8 . GetBytes ( message );
channel . BasicPublish ( exchange : "topic_logs" ,
routingKey : routingKey ,
basicProperties : null ,
body : body );
Console . WriteLine ( " [x] Sent '{0}':'{1}'" , routingKey , message );
}
}
}
ReceiveLogsTopic.cs 的源代码:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogsTopic
{
public static void Main ( string [] args )
{
var factory = new ConnectionFactory () { HostName = "localhost" };
using ( var connection = factory . CreateConnection ())
using ( var channel = connection . CreateModel ())
{
channel . ExchangeDeclare ( exchange : "topic_logs" , type : "topic" );
var queueName = channel . QueueDeclare (). QueueName ;
if ( args . Length < 1 )
{
Console . Error . WriteLine ( "Usage: {0} [binding_key...]" ,
Environment . GetCommandLineArgs ()[ 0 ]);
Console . WriteLine ( " Press [enter] to exit." );
Console . ReadLine ();
Environment . ExitCode = 1 ;
return ;
}
foreach ( var bindingKey in args )
{
channel . QueueBind ( queue : queueName ,
exchange : "topic_logs" ,
routingKey : bindingKey );
}
Console . WriteLine ( " [*] Waiting for messages. To exit press CTRL+C" );
var consumer = new EventingBasicConsumer ( channel );
consumer . Received += ( model , ea ) =>
{
var body = ea . Body ;
var message = Encoding . UTF8 . GetString ( body );
var routingKey = ea . RoutingKey ;
Console . WriteLine ( " [x] Received '{0}':'{1}'" ,
routingKey ,
message );
};
channel . BasicConsume ( queue : queueName ,
noAck : true ,
consumer : consumer );
Console . WriteLine ( " Press [enter] to exit." );
Console . ReadLine ();
}
}
}
运行以下示例:
以下命令用来接收所有日志:
$ ReceiveLogsTopic.exe "#"
接收来自"kern"程序的所有日志:
$ ReceiveLogsTopic.exe "kern.*"
或者,如果妳只想监听" critical "级别的日志的话:
$ ReceiveLogsTopic.exe "*.critical"
可以创建多个绑定:
$ ReceiveLogsTopic.exe "kern.*" "*.critical"
要想发送一条带有路由键" kern.critical "的日志消息,则执行:
$ EmitLogTopic.exe "kern.critical" "A critical kernel error"
请好好玩弄一下这些程序。注意,这些代码中,并不对路由键及绑定键做任何假设,妳可以试试使用多于两个路由键的参数。
某些值得研究的小问题:
•. " * "这个绑定键,会捕获到带有空白路由键的消息吗?
•."#.*",会捕获到以" .. "作为路由键的消息吗?会捕获到以单个单词作为路由键的消息吗?
•."a.*.#"与"a.#"有什么区别?
( EmitLogTopic.cs 和 ReceiveLogsTopic.cs 的完整代码 )
下一步,在 教程6 中,学习如何来自发送消息,以实现一个远程过程调用。
女神
孟茜
Your opinionsHxLauncher: Launch Android applications by voice commands