RabbitMQ .NET教程翻译:远程进程调用(RPC),Remote procedure call (RPC)
本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。
如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系 。
在 第二 个教程 中,我们学习了,如何使用 工作队列 来 将费时的任务分发给多个工作者。
但是,如果 ,我们需要在远程的机器上运行一个函数,并且等待它的结果,该怎么办?那样 ,事情就不同了。 这种模式,被人们称作远程过程调用( Remote Procedure Call ),或者 RPC 。
在这个教程中,我们将使用 RabbitMQ 来构建一个远程过程调用系统:其中包含一个客户端和一个可扩展的远程过程调用服务器。目前,我们并没有什么值得分发的耗时任务,所以,我们将要创建一个简单的远程过程调用服务,它会返回指定数字的 斐波纳契数值 。
为了展示如何使用一个远程过程调用服务,我们将要创建一个简单的客户端类。它会暴露出一个名为 call 的方法,该方法会发送一个远程过程调用请求,并且阻塞,直到接收到答复为止:
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
尽管远程过程调用在计算机行业是一种狠常用的模式,但是,它经常招致批评。问题在于,程序猿并不知道,某个特定的函数,是一个本地调用还是一个缓慢的远程过程调用。像那样的疑惑,会带来一个不可预估的系统,并且增加了调试的复杂性。误用了的远程过程调用,不但不能简化软件,反而会造成一堆混乱的代码。
心中念想着那些问题,请考虑以下建议:
•.确保能够明了地分辨出,哪个函数调用是本地调用,哪个函数调用是远程调用。
•.为妳的系统写文档。将各个组件之间的依赖关系写清楚。
•.处理出错的情形。如果远程过程调用服务器停机了狠久,客户端应当如何反应?
如果妳还有疑问,那么请避免使用远程过程调用。如果可以的话,妳应当使用一个异步的管道——与远程过程调用那样的阻塞不同,异步管道的计算结果是异步地推送到下一步计算阶段的。
总体上说,利用RabbitMQ 来实现远程过程调用,是狠简单的。客户端发送一条请求消息,而服务器回复一条响应消息。为了能够接收到响应,我们需要在请求中附带上一个“回调”队列地址:
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey : "rpc_queue" ,
basicProperties : props ,
body : messageBytes );
// ... 然后,写代码,从回调队列(callback_queue)读取响应消息 ...
AMQP协议中预定义了14个属性,它们会随着消息而被传递。大部分属性都狠少被使用,不过以下属性是例外:
•. deliveryMode :标记一条消息是持久的 ( 值为 2 ) 还是临时的 (任何其它 值 ) 。 妳可能还记得,在 第二 个教程 中使用了这个属性。
•. contentType :用来描述编码内容的多媒体内容。例如,对于常用的JSON编码来说,将这个属性设置为以下值比较好: application/json 。
•. replyTo :常用来对回调队列进行命名。
•. correlationId :用来将远程过程调用的响应与请求对应起来。
在上面展示的方法中,我们建议针对每个远程过程调用请求创建一个回调队列。那样狠不高效,但是, 幸运的是,还有一种更好的方法——让我们为每个客户端创建一个单一的回调队列。
那样的话,又带来另一个问题,在那个队列中接收到响应消息之后,需要确定该响应是针对哪个请求而发出的。那就是我们使用 correlationId 属性的原因。对于每个请求,我们都会将它设置为一个唯一的值。日后,当我们从回调队列中接收到一条消息,我们就会检查这个属性的值,然后,根据这个,我们就能够将响应和请求对应起来。如果我们遇到了一个未知的 correlationId 值,我们可以安全地忽略那条消息——因为它跟我们发出的请求无关。
妳可以会有疑问,为什么我们应当忽略掉回调队列中的未知消息,而不是报告错误并退出?这是因为,在服务器端可能会出现一种罕见的竞争情况(race condition)。尽管出现这种情况的可能性狠小,但是,还是有可能的,远程过程调用服务器可能在刚刚向我们发送了响应而尚未针对我们的请求发送回执消息的时刻死掉。如果发生了这种事,那么,重启的远程过程调用服务器会再次处理该请求。所以,在客户端,我们应当优雅地处理好重复的响应,而整个远程过程调用系统应当做成是幂等的。
我们的整个远程过程调用系统是这样工作的:
•.客户端(Client)启动时,它创建一个匿名的独占的回调队列。
•.对于每个远程过程调用请求,客户端发送的消息中都会带有两个属性: replyTo ,它的值被设置为回调队列的名字; correlationId ,对于每个请求,它的值都是唯一的。
•.请求被发送名为 rpc_queue 的队列。
•.远程过程调用工作者(即:服务器)在那个队列中等待接收请求。接收到一个请求之后,它会做出对应的工作,然后,使用 replyTo 字段中所指明的那个队列,向客户端发送一条消息,告知结果。
•.客户端在回调队列上等待接收响应数据。接收到一条消息之后,它检查其中的 correlationId 属性。如果它的值与自己所发出的请求相匹配,则,将该响应的内容返回给应用程序。
斐波纳契任务 :
private static int fib(int n)
{
if ( n == 0 || n == 1 ) return n ;
return fib ( n - 1 ) + fib ( n - 2 );
}
我们声明了一个斐波纳契计算函数。它会假设输入参数都是有效的正整数。(请不要预期它能够处理那些巨大的数字,并且,它可能是最慢的递归实现了)。
远程过程调用服务器 RPCServer.cs 的代码 会是这样的 :
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class RPCServer
{
public static void Main ()
{
var factory = new ConnectionFactory () { HostName = "localhost" };
using ( var connection = factory . CreateConnection ())
using ( var channel = connection . CreateModel ())
{
channel . QueueDeclare ( queue : "rpc_queue" ,
durable : false ,
exclusive : false ,
autoDelete : false ,
arguments : null );
channel . BasicQos ( 0 , 1 , false );
var consumer = new QueueingBasicConsumer ( channel );
channel . BasicConsume ( queue : "rpc_queue" ,
noAck : false ,
consumer : consumer );
Console . WriteLine ( " [x] Awaiting RPC requests" );
while ( true )
{
string response = null ;
var ea = ( BasicDeliverEventArgs ) consumer . Queue . Dequeue ();
var body = ea . Body ;
var props = ea . BasicProperties ;
var replyProps = channel . CreateBasicProperties ();
replyProps . CorrelationId = props . CorrelationId ;
try
{
var message = Encoding . UTF8 . GetString ( body );
int n = int . Parse ( message );
Console . WriteLine ( " [.] fib({0})" , message );
response = fib ( n ). ToString ();
}
catch ( Exception e )
{
Console . WriteLine ( " [.] " + e . Message );
response = "" ;
}
finally
{
var responseBytes = Encoding . UTF8 . GetBytes ( response );
channel . BasicPublish ( exchange : "" ,
routingKey : props . ReplyTo ,
basicProperties : replyProps ,
body : responseBytes );
channel . BasicAck ( deliveryTag : ea . DeliveryTag ,
multiple : false );
}
}
}
}
/// <summary>
/// 假设 妳只会向本函数提供有效的正整数输入参数。
/// 请不要预期本函数能够处理那些巨大的数字,
/// 同时,本函数可能是最慢 的递归实现了。
/// </summary>
private static int fib ( int n )
{
if ( n == 0 || n == 1 )
{
return n ;
}
return fib ( n - 1 ) + fib ( n - 2 );
}
}
服务器代码狠直观:
•.一如既往地,我们首先建立连接、频道,然后,声明队列。
•. 我们可能需要运行多于一个服务器进程。为了在多个服务器之间均衡地分发负载,我们需要设置channel.basicQos 中的 prefetchCount 选项。
•. 我们使用 basicConsume 来访问队列。然后,我们进入永久(while)循环,在其中,我们等待接收到请求消息,做出对应的工作,然后回发响应内容。
远程过程调用客户端 RPCClient.cs 的代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
class RPCClient
{
private IConnection connection ;
private IModel channel ;
private string replyQueueName ;
private QueueingBasicConsumer consumer ;
public RPCClient ()
{
var factory = new ConnectionFactory () { HostName = "localhost" };
connection = factory . CreateConnection ();
channel = connection . CreateModel ();
replyQueueName = channel . QueueDeclare (). QueueName ;
consumer = new QueueingBasicConsumer ( channel );
channel . BasicConsume ( queue : replyQueueName ,
noAck : true ,
consumer : consumer );
}
public string Call ( string message )
{
var corrId = Guid . NewGuid (). ToString ();
var props = channel . CreateBasicProperties ();
props . ReplyTo = replyQueueName ;
props . CorrelationId = corrId ;
var messageBytes = Encoding . UTF8 . GetBytes ( message );
channel . BasicPublish ( exchange : "" ,
routingKey : "rpc_queue" ,
basicProperties : props ,
body : messageBytes );
while ( true )
{
var ea = ( BasicDeliverEventArgs ) consumer . Queue . Dequeue ();
if ( ea . BasicProperties . CorrelationId == corrId )
{
return Encoding . UTF8 . GetString ( ea . Body );
}
}
}
public void Close ()
{
connection . Close ();
}
}
class RPC
{
public static void Main ()
{
var rpcClient = new RPCClient ();
Console . WriteLine ( " [x] Requesting fib(30)" );
var response = rpcClient . Call ( "30" );
Console . WriteLine ( " [.] Got '{0}'" , response );
rpcClient . Close ();
}
}
客户端代码,稍微有点复杂:
•.我们建立一个连接和频道,然后声明一个排他性的“回调”('callback')队列,用于接收回复。
•.我们订阅“回调”队列,这样我们就能够接收到远程过程调用的响应了。
•. 在 call 方法中进行真正的远程过程调用请求。
•. 在这个方法中,我们首先生成一个唯一的 correlationId 数字并且记录下来——永久(while)循环中会使用这个值来匹配出正确的那个响应。
•. 接下来,我们发布请求消息,它带有两个属性: replyTo 和 correlationId 。
•.现在,我们可以等待了,等待,直到正确的那个响应消息到来了。
•.永久(while)循环内部的工作内容狠简单,对于到来的每条响应消息,它都进行检查,其中的 correlationId 是否与我们正在等待的值匹配。如果匹配,那么,保存这个响应。
•.最终,将响应内容回复给用户。
发起客户端请求:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
现在 ,可以看看我们的完整代码了 (其中 还包括咯基本的异常处理 ) , RPCClient.cs 和 RPCServer.cs 。
$ csc /r:"RabbitMQ.Client.dll" RPCClient.cs
$ csc /r:"RabbitMQ.Client.dll" RPCServer.cs
远程过程调用服务已经准备好了。现在可以启动服务器:
$ RPCServer.exe
[ x ] Awaiting RPC requests
要请求计算一个斐波纳契数字,则运行客户端:
$ RPCClient.exe
[ x ] Requesting fib ( 30 )
此处展示的设计,并不是唯一可行的用来实现远程过程调用服务的方式,但是,它有着狠多重要的优点:
•.如果远程过程调用服务器太慢,那么,妳可以简单地加入另一个服务器,以进行扩展。试一试,在一个新的终端中,运行第二个 RPCServer 。
•. 在客户端,远程过程调用要求仅仅发送及接收一条消息。不需要进行像 queueDeclare 这样的同步调用。由此带来的结果是,远程过程调用客户端对于单个远程过程调用请求只需要使用一次往复的网络通信。
我们的代码仍然狠简单,也未尝试解决更复杂(然而重要)的问题,例如:
•.如果没有任何服务器运行,那么,客户端应当如何反应?
•.客户端是否应当针对远程过程调用来设置任何的超时呢?
•.如果服务器工作出错并且抛出一个异常,那么,这个异常应当被转发给客户端吗?
•.在进行处理之前,先进行检查(例如,检查绑定、类型),保护系统不受无效的传入消息的影响。
未知美人
Your opinionsHxLauncher: Launch Android applications by voice commands