RabbitMQ Python教程翻译:路由,Routing
本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。
如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系 。
在 前一个教程 中,我们构建了一个简单的日志系统。 我们能够将日志消息广播给多个接收器。
在这个教程中,我们将要向它再加入一个特性——我们将实现只订阅全部消息中某个子集的功能。例如,我们能够仅仅将致命的错误消息写入到文件中(以便节省磁盘空间),同时仍然将全部的日志消息输出到屏幕上。
在前一个示例中,我们已经创建了绑定。回忆一下这样的代码:
channel . queue_bind(exchange = exchange_name,
queue = queue_name )
绑定,指的是,交换机和队列之间的一种关系。这种关系可以简单地说成:这个队列,对于来自这个交换机的消息感兴趣。
绑定,可能有另一个额外的 routing_key 参数。为了避免与 basic_publish 中的参数相混淆,我们将它称为 binding key 。以下是用来创建带有键名的绑定的代码:
channel . queue_bind(exchange = exchange_name,
queue = queue_name ,
routing_key = 'black' )
绑定键的具体意义,取决于交换机的类型。我们之前使用的 fanout 类型的交换机,会直接无视掉这个值。
前一个教程中的日志系统,会将所有消息广播给所有消费者。我们想对这个系统进行扩展,以实现根据重要性来过滤消息。例如,我们希望,向磁盘写入日志消息的那个脚本,仅仅接收致命错误信息,而不要因为警告或提示性日志消息浪废磁盘空间。
我们之前使用的是 fanout 交换机,那种交换机使用起来并不灵活——它只会进行无脑的广播。
这次,我们将使用 direct 交换机。 direct 交换机背后的路由算法狠简单——如果某些队列的 binding key 与该消息的 routing key 完全匹配,那么,消息会进入那些队列。
我们使用下图来帮助理解这一点:
在这个场景中,我们可以看到 direct 类型的交换机 X ,它还绑定了两个队列。第一个队列,是使用绑定键 orange 来绑定的;而第二个队列拥有两个绑定,第一个是以键 black 来绑定的,第二个是以键 green 来绑定的。
在这种情况下,发送到该交换机的带有路由键 orange 的消息会被路由到队列 Q1 。带有路由键 black 或 green 的消息,会进入队列 Q2 。其它所有消息会被忽略。
妳完全可以让多个队列使用同一个绑定键来绑定。在我们的示例中,可以使用绑定键 black 来在 X 和 Q1 之间添加一个绑定。在那种情况下,这个 direct交换机的行为会与 fanout相同,会将消息广播给所有相匹配的队列。带有路由键 black 的消息,会同时被传递给 Q1 和 Q2 。
我们会在我们的日志系统中使用这种模型。这次,我们不使用 fanout 交换机,而使用 direct 交换机。我们会将日志的严重性当作路由键( routing key )来使用。这样的话,接收者脚本就可以选择自己要接收哪个严重级别的日志。首先,让我们来关注于日志消息的发送。
一如既往地,我们需要先创建一个交换机:
channel . exchange_declare(exchange = 'direct_logs',
type = 'direct' )
然后,我们就可以发送消息了:
channel . basic_publish(exchange = 'direct_logs',
routing_key = severity ,
body = message )
为了简单起见,我们假设,“严重性”('severity')的取值是'info'、'warning'、'error'中的一个。
接收消息,与前一个教程中的做法类似,只是有一个例外——我们将会为我们感兴趣的每个严重性程度分别创建一个新的绑定。
result = channel . queue_declare(exclusive = True)
queue_name = result . method . queue
for severity in severities:
channel . queue_bind ( exchange = 'direct_logs' ,
queue = queue_name ,
routing_key = severity )
emit_log_direct.py 的代码:
#!/usr/bin/env python import pika import sys connection = pika . BlockingConnection(pika . ConnectionParameters( host = 'localhost')) channel = connection . channel() channel . exchange_declare(exchange = 'direct_logs', type = 'direct') severity = sys . argv[1] if len(sys . argv) > 1 else 'info' message = ' ' . join(sys . argv[2:]) or 'Hello World!' channel . basic_publish(exchange = 'direct_logs', routing_key = severity, body = message) print " [x] Sent %r:%r" % (severity, message) connection . close() |
receive_logs_direct.py 的代码:
#!/usr/bin/env python import pika import sys connection = pika . BlockingConnection(pika . ConnectionParameters( host = 'localhost')) channel = connection . channel() channel . exchange_declare(exchange = 'direct_logs', type = 'direct') result = channel . queue_declare(exclusive = True) queue_name = result . method . queue severities = sys . argv[1:] if not severities: print >> sys . stderr, "Usage: %s [info] [warning] [error]" % \ (sys . argv[0],) sys . exit(1) for severity in severities: channel . queue_bind(exchange = 'direct_logs', queue = queue_name, routing_key = severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method . routing_key, body,) channel . basic_consume(callback, queue = queue_name, no_ack = True) channel . start_consuming() |
如果妳想将“警告”('warning')和“错误”('error')(而不包括“提示信息”('info'))级别的日志消息保存到文件中去,那么,打开一个终端,执行以下命令:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果妳想在屏幕上查看所有的日志消息,那么,打开一个终端,执行:
$ python receive_logs_direct.py info warning error
[ * ] Waiting for logs. To exit press CTRL+C
然后,举个例子,要想发送一条 error 级别的日志消息,则执行:
$ python emit_log_direct.py error "Run. Run. Or it will explode."
[ x ] Sent 'error' : 'Run. Run. Or it will explode.'
( emit_log_direct.py 和 receive_logs_direct.py 的完整源代码 )
请移步到 教程5 ,以学习,如何基于模式 来监听消息。
女神
Your opinionsHxLauncher: Launch Android applications by voice commands