RabbitMQ .NET教程翻译:工作队列,Work Queues
本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。
如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系 。
在 第一 个教程 中,我们写了两个程序,它们通过一个已命名的队列来发送及接收消息。 在这个教程中,我们会创建一个 工作队列 , 它会被用来向多个工作者分发费时的任务。
工作队列 ( 也被称作 任务队列 ),其背后的理念是,避免立即进行那些 狠费资源的任务并且傻傻等待它们完成。取而代之 的是,我们 将任务放置到队列里,日后再处理。 我们用一条消息来封装一个 任务 ,然后 将它发送到某个队列中。后台运行 的某个工作者进程会取出任务,并且最终执行该任务。如果 妳运行着多个工作者,那么,整个任务队列会由它们共享。
这种概念,对于网页应用程序尤其有用,因为,无法在短暂的HTTP 请求时间窗口里完成一个复杂的任务。
在教程的前一个部分,我们发送了一条包含着"Hello World!"内容的消息。现在,我们将会发送用来表示复杂任务的字符串。我们此时并没有真正要处理的任务,例如图片尺寸调整,或者pdf文件的渲染,因此,让我们就假装狠忙吧——利用 Thread.sleep() 函数来实现这一点。我们将字符串中小数点的个数当成复杂度的表示;每个小数点表示一秒钟的“工作”。例如,用 Hello... 表示的一个假任务,会占用三秒钟。
我们会稍微修改一下之前 Send.cs 示例中的代码, 以允许通过命令行发送任意的消息。 这个程序,会将任务加入到工作队列中去,因此,我们将它叫做 NewTask.cs :
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
channel.BasicPublish(exchange: "",
routingKey : "task_queue" ,
basicProperties : properties ,
body : body );
以下是个辅助函数,用来从命令行参数中获取到消息内容:
private static string GetMessage(string[] args)
{
return (( args . Length > 0 ) ? string . Join ( " " , args ) : "Hello World!" );
}
之前 的 Receive.cs 程序 也需要做些修改: 它需要针对消息体中的每个小数点假装进行了一秒钟的工作。 它会处理RabbitMQ传递过来的消息,并且执行具体的任务,因此,我们将它叫做 Worker.cs :
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea . Body ;
var message = Encoding . UTF8 . GetString ( body );
Console . WriteLine ( " [x] Received {0}" , message );
int dots = message . Split ( '.' ). Length - 1 ;
Thread . Sleep ( dots * 1000 );
Console . WriteLine ( " [x] Done" );
channel . BasicAck ( deliveryTag : ea . DeliveryTag , multiple : false );
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
我们的假任务,用来模仿执行时间:
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
像教程1中所说的那样编译它们(需要将客户端库放置到工作目录中):
$ csc /r:"RabbitMQ.Client.dll" NewTask.cs
$ csc /r:"RabbitMQ.Client.dll" Worker.cs
使用任务队列的好处之一,就是,可以轻易地将工作并行化。如果我们积压了狠多任务,那么,我们可以简单地加入更多工作者,以这种方式来轻易地扩展工作能力。
首先,让我们尝试同时运行两个 Worker.cs 脚本。我们都会从队列中获取到消息,但是,具体情况如何?让我们来深入查看。
妳需要打开三个终端。其中两个运行着 Worker.cs 脚本。这两个终端,会成为两个消费者——C1和C2。
shell1$ Worker.exe
Worker
[ * ] Waiting for messages. To exit press CTRL+C
shell2$ Worker.exe
Worker
[ * ] Waiting for messages. To exit press CTRL+C
我们在第三个终端中发布新任务。当妳启动了消费者之后,就可以发布一些消息了:
shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....
让我们看看,工作者都接收到了什么:
shell1$ Worker.exe
[ * ] Waiting for messages. To exit press CTRL+C
[ x ] Received 'First message.'
[ x ] Received 'Third message...'
[ x ] Received 'Fifth message.....'
shell2$ Worker.exe
[ * ] Waiting for messages. To exit press CTRL+C
[ x ] Received 'Second message..'
[ x ] Received 'Fourth message....'
默认情况下,RabbitMQ会按照顺序将消息发送给下一个消费者。平均下来,每个消费者都会获取到相同数量的消息。这种分发消息的方式,就称作轮询。使用三个或更多工作者来try一try。
一项任务可能会占用数秒的时间。妳可能会疑惑,如果某个消费者启动一个长时间的任务,然后还没做完任务就死掉了,结果会怎么样?按照我们当前的代码,一旦RabbitMQ将某条消息传递给消费者之后,它就会立即从内存中删除这条消息。在这种情况下,如果妳杀死一个工作者的话,我们会丢失它刚才正在处理的消息。并且,所有那些已被分发给该工作者却没有被处理的消息,也会丢失。
然而,我们并不想丢失任何任务。如果某个工作者死掉了,那么,我们希望该任务被重新分发给另一个工作者。
为了确保不遗漏任何一条消息, RabbitMQ支持消息 的 确认 。回执 ( ack(nowledgement) )是由消费者回发的,用来告知 RabbitMQ ,某条特定的消息已经被接收到 、并且处理完毕了,从此 RabbitMQ 可以随意删除它了。
如果某个消费者死掉了而没有发送回执,那么,RabbitMQ就会知道,某条消息并没有被完整地处理,因而会将该条消息重新分发给另一个消费者。那样,妳就可以确保,即使工作者偶尔死掉,也不会有消息丢失。
没有任何的消息超时;RabbitMQ只会在工作者的连接断开的时候重新分发消息。即使消息的处理过程要占用狠长狠长的时间,也没有问题。
消息确认,默认情况是启用的。在前一个示例中,我们显式地将这个选项关闭了,具体就是将 noAck ("不进行手动确认")参数设置为 true 。现在,是时候去掉这个标记位了,让工作者在完成任务之后回复一个适当的回执。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea . Body ;
var message = Encoding . UTF8 . GetString ( body );
Console . WriteLine ( " [x] Received {0}" , message );
int dots = message . Split ( '.' ). Length - 1 ;
Thread . Sleep ( dots * 1000 );
Console . WriteLine ( " [x] Done" );
channel . BasicAck ( deliveryTag : ea . DeliveryTag , multiple : false );
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
加上这样的代码之后,就可以确保,即使妳在某个工作者正处理某条消息时按CTRL+C 杀死了它,也不会有任何消息丢失。当该工作者死掉之后,所有未确认的消息都会被重新分发。
一个常见的错误就是,忘记了 BasicAck 。这种错误狠容易犯,但是,其后果却狠严重。消息会在妳的客户端退出时被重新分发(看起来会像是随机的重新分发),但是,RabbitMQ会占用越来越多的内存,因为,它无法释放掉任何一条未被确认的消息。
要想对这种类型的错误进行调试,则,妳可以使用 rabbitmqctl 来输出 messages_unacknowledged字段的值:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
我们已经学会了,如何做到,即使消费者死掉了,任务也不会丢失。但是,当RabbitMQ 服务器停止运行时,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记所有的队列和消息,除非我们明确告诉它不要这么做。要做两件事, 才能确保消息不丢失:我们必须将队列和消息都标记为持久的。
首先 , 我们要确保 RabbitMQ永远 不要丢失我们的队列信息。 为了做到这一点,我们需要将它声明为持久的( durable ):
channel.QueueDeclare(queue: "hello",
durable : true ,
exclusive : false ,
autoDelete : false ,
arguments : null );
尽管这条命令本身是正确的,但是,在我们当前的配置情况下,它无法正常工作。这是因为,我们已经定义了一个名为 hello 的非持久的队列。RabbitMQ不允许妳使用不同的参数重新定义一个已有的队列,并且,对于任何试图这么做的程序,都会返回一个错误。但是,这样的小问题,是可以快速解决的——我们换个名字来声明队列,例如 task_queue :
channel.QueueDeclare(queue: "task_queue",
durable : true ,
exclusive : false ,
autoDelete : false ,
arguments : null );
这个 queueDeclare 变更,需要在生产者和消费者两边同时进行。
到了此刻,我们可以确信,即使RabbitMQ 重启了, task_queue 队列也不会丢失。现在,我们需要将我们的消息标记为持久的——具体做法就是,将 IBasicProperties.SetPersistent 设置为 true 。
var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
将消息标记为持久的,并不能完全确保消息的不丢失。尽管 我们告知 RabbitMQ 要将该消息保存到硬盘上,但是,仍然 会有一个短暂的时间窗口,在这个时间窗口里, RabbitMQ已经接收 了该消息,但还没有将它保存。另外 , RabbitMQ 也不会对每条消息都进行一次 fsync(2) 调用——所以,消息可能已经 被保存到缓存 ,却尚未 被真正写入到磁盘。持久 性保障,并不是一个坚固的保障,但是 ,对于 我们的简单任务队列来说已经足够了。如果 妳需要一个更坚固的保障,那么,可以使用 发布者确认 。
妳可能注意到了,消息的分发仍然与我们的预期不太相符。例如,考虑这样一个场景,我们有两个工作者,并且,所有奇数编号的消息工作量都狠大,而偶数编号的消息工作量都狠小,那么,后果就是,其中一个工作者长期处于忙碌状态,而另一个工作者几乎不做什么工作。可是,RabbitMQ根本不知道这些事,于是仍然平均地分发消息。
这是因为,当消息进入队列时,RabbitMQ就会分发它。它并不查看每个消费者还有多少条消息未被确认。它只是将每一轮的第n条消息分发给第n个消费者。
为了克服这一点,我们可以使用 basicQos 方法,具体就是向它传递 prefetchCount = 1 选项。这会告知RabbitMQ,同一时刻不要向单个工作者传递多余一条消息。或者,换句话说,只有在某个工作者处理完了一条消息并且对之进行确认之后,才向它传递下一条新消息。取而代之,它会将新消息传递给下一个当前并不忙碌的工作者。
channel.BasicQos(0, 1, false);
如果所有的工作者都处于忙碌状态,那么,妳的队列可能会塞满。妳应当注意对这种情况进行处理,例如加入更多的工作者,或者,采用其它策略。
NewTask.cs 类的最终代码:
using System;
using RabbitMQ.Client;
using System.Text;
class NewTask
{
public static void Main ( string [] args )
{
var factory = new ConnectionFactory () { HostName = "localhost" };
using ( var connection = factory . CreateConnection ())
using ( var channel = connection . CreateModel ())
{
channel . QueueDeclare ( queue : "task_queue" ,
durable : true ,
exclusive : false ,
autoDelete : false ,
arguments : null );
var message = GetMessage ( args );
var body = Encoding . UTF8 . GetBytes ( message );
var properties = channel . CreateBasicProperties ();
properties . SetPersistent ( true );
channel . BasicPublish ( exchange : "" ,
routingKey : "task_queue" ,
basicProperties : properties ,
body : body );
Console . WriteLine ( " [x] Sent {0}" , message );
}
Console . WriteLine ( " Press [enter] to exit." );
Console . ReadLine ();
}
private static string GetMessage ( string [] args )
{
return (( args . Length > 0 ) ? string . Join ( " " , args ) : "Hello World!" );
}
}
以及 Worker.cs :
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
class Worker
{
public static void Main ()
{
var factory = new ConnectionFactory () { HostName = "localhost" };
using ( var connection = factory . CreateConnection ())
using ( var channel = connection . CreateModel ())
{
channel . QueueDeclare ( queue : "task_queue" ,
durable : true ,
exclusive : false ,
autoDelete : false ,
arguments : null );
channel . BasicQos ( prefetchSize : 0 , prefetchCount : 1 , global : false );
Console . WriteLine ( " [*] Waiting for messages." );
var consumer = new EventingBasicConsumer ( channel );
consumer . Received += ( model , ea ) =>
{
var body = ea . Body ;
var message = Encoding . UTF8 . GetString ( body );
Console . WriteLine ( " [x] Received {0}" , message );
int dots = message . Split ( '.' ). Length - 1 ;
Thread . Sleep ( dots * 1000 );
Console . WriteLine ( " [x] Done" );
channel . BasicAck ( deliveryTag : ea . DeliveryTag , multiple : false );
};
channel . BasicConsume ( queue : "task_queue" ,
noAck : false ,
consumer : consumer );
Console . WriteLine ( " Press [enter] to exit." );
Console . ReadLine ();
}
}
}
利用消息回执和 BasicQos 就可以建立好一个工作队列。持久性选项,使得,即使RabbitMQ 重启,那些任务也仍然存在。
欲知更多关于 IModel 方法 和 IBasicProperties 的信息,则浏览 RabbitMQ .NET客户端API参考文档 。
现在 ,可以移步到 教程3 ,学习如何将同一条消息传递给多个消费者。
女神
电梯
Your opinionsHxLauncher: Launch Android applications by voice commands