StupidBeauty
Read times:2048Posted at: - no title specified

RabbitMQ .NET教程翻译:工作队列,Work Queues

(使用.NET客户端)

前提

本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。

如何寻求帮助

如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系

第一 个教程 中,我们写了两个程序,它们通过一个已命名的队列来发送及接收消息。 在这个教程中,我们会创建一个 工作队列 它会被用来向多个工作者分发费时的任务。

工作队列 也被称作 任务队列 ),其背后的理念是,避免立即进行那些 狠费资源的任务并且傻傻等待它们完成。取而代之 的是,我们 将任务放置到队列里,日后再处理。 我们用一条消息来封装一个 任务 ,然后 将它发送到某个队列中。后台运行 的某个工作者进程会取出任务,并且最终执行该任务。如果 妳运行着多个工作者,那么,整个任务队列会由它们共享。

这种概念,对于网页应用程序尤其有用,因为,无法在短暂的HTTP 请求时间窗口里完成一个复杂的任务。

准备

在教程的前一个部分,我们发送了一条包含着"Hello World!"内容的消息。现在,我们将会发送用来表示复杂任务的字符串。我们此时并没有真正要处理的任务,例如图片尺寸调整,或者pdf文件的渲染,因此,让我们就假装狠忙吧——利用 Thread.sleep() 函数来实现这一点。我们将字符串中小数点的个数当成复杂度的表示;每个小数点表示一秒钟的“工作”。例如,用 Hello... 表示的一个假任务,会占用三秒钟。

我们会稍微修改一下之前 Send.cs 示例中的代码, 以允许通过命令行发送任意的消息。 这个程序,会将任务加入到工作队列中去,因此,我们将它叫做 NewTask.cs

var message = GetMessage(args);

var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();

properties.SetPersistent(true);

channel.BasicPublish(exchange: "",

routingKey : "task_queue" ,

basicProperties : properties ,

body : body );

以下是个辅助函数,用来从命令行参数中获取到消息内容:

private static string GetMessage(string[] args)

{

return (( args . Length > 0 ) ? string . Join ( " " , args ) : "Hello World!" );

}

之前 Receive.cs 程序 也需要做些修改: 它需要针对消息体中的每个小数点假装进行了一秒钟的工作。 它会处理RabbitMQ传递过来的消息,并且执行具体的任务,因此,我们将它叫做 Worker.cs

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea . Body ;

var message = Encoding . UTF8 . GetString ( body );

Console . WriteLine ( " [x] Received {0}" , message );

int dots = message . Split ( '.' ). Length - 1 ;

Thread . Sleep ( dots * 1000 );

Console . WriteLine ( " [x] Done" );

channel . BasicAck ( deliveryTag : ea . DeliveryTag , multiple : false );

};

channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

我们的假任务,用来模仿执行时间:

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

像教程1中所说的那样编译它们(需要将客户端库放置到工作目录中)

$ csc /r:"RabbitMQ.Client.dll" NewTask.cs

$ csc /r:"RabbitMQ.Client.dll" Worker.cs

轮询分发

使用任务队列的好处之一,就是,可以轻易地将工作并行化。如果我们积压了狠多任务,那么,我们可以简单地加入更多工作者,以这种方式来轻易地扩展工作能力。

首先,让我们尝试同时运行两个 Worker.cs 脚本。我们都会从队列中获取到消息,但是,具体情况如何?让我们来深入查看。

妳需要打开三个终端。其中两个运行着 Worker.cs 脚本。这两个终端,会成为两个消费者——C1C2

shell1$ Worker.exe

Worker

[ * ] Waiting for messages. To exit press CTRL+C

shell2$ Worker.exe

Worker

[ * ] Waiting for messages. To exit press CTRL+C

我们在第三个终端中发布新任务。当妳启动了消费者之后,就可以发布一些消息了:

shell3$ NewTask.exe First message.

shell3$ NewTask.exe Second message..

shell3$ NewTask.exe Third message...

shell3$ NewTask.exe Fourth message....

shell3$ NewTask.exe Fifth message.....

让我们看看,工作者都接收到了什么:

shell1$ Worker.exe

[ * ] Waiting for messages. To exit press CTRL+C

[ x ] Received 'First message.'

[ x ] Received 'Third message...'

[ x ] Received 'Fifth message.....'

shell2$ Worker.exe

[ * ] Waiting for messages. To exit press CTRL+C

[ x ] Received 'Second message..'

[ x ] Received 'Fourth message....'

默认情况下,RabbitMQ会按照顺序将消息发送给下一个消费者。平均下来,每个消费者都会获取到相同数量的消息。这种分发消息的方式,就称作轮询。使用三个或更多工作者来try一try。

消息确认

一项任务可能会占用数秒的时间。妳可能会疑惑,如果某个消费者启动一个长时间的任务,然后还没做完任务就死掉了,结果会怎么样?按照我们当前的代码,一旦RabbitMQ将某条消息传递给消费者之后,它就会立即从内存中删除这条消息。在这种情况下,如果妳杀死一个工作者的话,我们会丢失它刚才正在处理的消息。并且,所有那些已被分发给该工作者却没有被处理的消息,也会丢失。

然而,我们并不想丢失任何任务。如果某个工作者死掉了,那么,我们希望该任务被重新分发给另一个工作者。

为了确保不遗漏任何一条消息, RabbitMQ支持消息 确认 。回执 ack(nowledgement) )是由消费者回发的,用来告知 RabbitMQ ,某条特定的消息已经被接收到 、并且处理完毕了,从此 RabbitMQ 可以随意删除它了。

如果某个消费者死掉了而没有发送回执,那么,RabbitMQ就会知道,某条消息并没有被完整地处理,因而会将该条消息重新分发给另一个消费者。那样,妳就可以确保,即使工作者偶尔死掉,也不会有消息丢失。

没有任何的消息超时;RabbitMQ只会在工作者的连接断开的时候重新分发消息。即使消息的处理过程要占用狠长狠长的时间,也没有问题。

消息确认,默认情况是启用的。在前一个示例中,我们显式地将这个选项关闭了,具体就是将 noAck  ("不进行手动确认")参数设置为 true 。现在,是时候去掉这个标记位了,让工作者在完成任务之后回复一个适当的回执。

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea . Body ;

var message = Encoding . UTF8 . GetString ( body );

Console . WriteLine ( " [x] Received {0}" , message );

int dots = message . Split ( '.' ). Length - 1 ;

Thread . Sleep ( dots * 1000 );

Console . WriteLine ( " [x] Done" );

channel . BasicAck ( deliveryTag : ea . DeliveryTag , multiple : false );

};

channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

加上这样的代码之后,就可以确保,即使妳在某个工作者正处理某条消息时按CTRL+C 杀死了它,也不会有任何消息丢失。当该工作者死掉之后,所有未确认的消息都会被重新分发。

忘记确认

一个常见的错误就是,忘记了 BasicAck 。这种错误狠容易犯,但是,其后果却狠严重。消息会在妳的客户端退出时被重新分发(看起来会像是随机的重新分发),但是,RabbitMQ会占用越来越多的内存,因为,它无法释放掉任何一条未被确认的消息。

要想对这种类型的错误进行调试,则,妳可以使用 rabbitmqctl 来输出 messages_unacknowledged字段的值:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Listing queues ...

hello    0       0

...done.

消息的持久性

我们已经学会了,如何做到,即使消费者死掉了,任务也不会丢失。但是,当RabbitMQ 服务器停止运行时,我们的任务仍然会丢失。

RabbitMQ退出或崩溃时,它会忘记所有的队列和消息,除非我们明确告诉它不要这么做。要做两件事, 才能确保消息不丢失:我们必须将队列和消息都标记为持久的。

首先 我们要确保 RabbitMQ永远 不要丢失我们的队列信息。 为了做到这一点,我们需要将它声明为持久的( durable ):

channel.QueueDeclare(queue: "hello",

durable : true ,

exclusive : false ,

autoDelete : false ,

arguments : null );

尽管这条命令本身是正确的,但是,在我们当前的配置情况下,它无法正常工作。这是因为,我们已经定义了一个名为 hello 的非持久的队列。RabbitMQ不允许妳使用不同的参数重新定义一个已有的队列,并且,对于任何试图这么做的程序,都会返回一个错误。但是,这样的小问题,是可以快速解决的——我们换个名字来声明队列,例如 task_queue

channel.QueueDeclare(queue: "task_queue",

durable : true ,

exclusive : false ,

autoDelete : false ,

arguments : null );

这个 queueDeclare 变更,需要在生产者和消费者两边同时进行。

到了此刻,我们可以确信,即使RabbitMQ 重启了, task_queue 队列也不会丢失。现在,我们需要将我们的消息标记为持久的——具体做法就是,将 IBasicProperties.SetPersistent 设置为 true

var properties = channel.CreateBasicProperties();

properties.SetPersistent(true);

关于消息持久性的注意事项

将消息标记为持久的,并不能完全确保消息的不丢失。尽管 我们告知 RabbitMQ 要将该消息保存到硬盘上,但是,仍然 会有一个短暂的时间窗口,在这个时间窗口里, RabbitMQ已经接收 了该消息,但还没有将它保存。另外 RabbitMQ 也不会对每条消息都进行一次 fsync(2) 调用——所以,消息可能已经 被保存到缓存 ,却尚未 被真正写入到磁盘。持久 性保障,并不是一个坚固的保障,但是 ,对于 我们的简单任务队列来说已经足够了。如果 妳需要一个更坚固的保障,那么,可以使用 发布者确认

公平分发

妳可能注意到了,消息的分发仍然与我们的预期不太相符。例如,考虑这样一个场景,我们有两个工作者,并且,所有奇数编号的消息工作量都狠大,而偶数编号的消息工作量都狠小,那么,后果就是,其中一个工作者长期处于忙碌状态,而另一个工作者几乎不做什么工作。可是,RabbitMQ根本不知道这些事,于是仍然平均地分发消息。

这是因为,当消息进入队列时,RabbitMQ就会分发它。它并不查看每个消费者还有多少条消息未被确认。它只是将每一轮的第n条消息分发给第n个消费者。

为了克服这一点,我们可以使用 basicQos 方法,具体就是向它传递 prefetchCount  =  1 选项。这会告知RabbitMQ,同一时刻不要向单个工作者传递多余一条消息。或者,换句话说,只有在某个工作者处理完了一条消息并且对之进行确认之后,才向它传递下一条新消息。取而代之,它会将新消息传递给下一个当前并不忙碌的工作者。

channel.BasicQos(0, 1, false);

关于队列长度的注意事项

如果所有的工作者都处于忙碌状态,那么,妳的队列可能会塞满。妳应当注意对这种情况进行处理,例如加入更多的工作者,或者,采用其它策略。

将所有代码凑到一起

NewTask.cs 类的最终代码:

using System;

using RabbitMQ.Client;

using System.Text;

class NewTask

{

public static void Main ( string [] args )

{

var factory = new ConnectionFactory () { HostName = "localhost" };

using ( var connection = factory . CreateConnection ())

using ( var channel = connection . CreateModel ())

{

channel . QueueDeclare ( queue : "task_queue" ,

durable : true ,

exclusive : false ,

autoDelete : false ,

arguments : null );

var message = GetMessage ( args );

var body = Encoding . UTF8 . GetBytes ( message );

var properties = channel . CreateBasicProperties ();

properties . SetPersistent ( true );

channel . BasicPublish ( exchange : "" ,

routingKey : "task_queue" ,

basicProperties : properties ,

body : body );

Console . WriteLine ( " [x] Sent {0}" , message );

}

Console . WriteLine ( " Press [enter] to exit." );

Console . ReadLine ();

}

private static string GetMessage ( string [] args )

{

return (( args . Length > 0 ) ? string . Join ( " " , args ) : "Hello World!" );

}

}

(NewTask.cs 源代码 )

以及 Worker.cs

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

using System.Threading;

class Worker

{

public static void Main ()

{

var factory = new ConnectionFactory () { HostName = "localhost" };

using ( var connection = factory . CreateConnection ())

using ( var channel = connection . CreateModel ())

{

channel . QueueDeclare ( queue : "task_queue" ,

durable : true ,

exclusive : false ,

autoDelete : false ,

arguments : null );

channel . BasicQos ( prefetchSize : 0 , prefetchCount : 1 , global : false );

Console . WriteLine ( " [*] Waiting for messages." );

var consumer = new EventingBasicConsumer ( channel );

consumer . Received += ( model , ea ) =>

{

var body = ea . Body ;

var message = Encoding . UTF8 . GetString ( body );

Console . WriteLine ( " [x] Received {0}" , message );

int dots = message . Split ( '.' ). Length - 1 ;

Thread . Sleep ( dots * 1000 );

Console . WriteLine ( " [x] Done" );

channel . BasicAck ( deliveryTag : ea . DeliveryTag , multiple : false );

};

channel . BasicConsume ( queue : "task_queue" ,

noAck : false ,

consumer : consumer );

Console . WriteLine ( " Press [enter] to exit." );

Console . ReadLine ();

}

}

}

(Worker.cs 源代码 )

利用消息回执 BasicQos 就可以建立好一个工作队列。持久性选项,使得,即使RabbitMQ 重启,那些任务也仍然存在。

欲知更多关于 IModel 方法 IBasicProperties 的信息,则浏览 RabbitMQ .NET客户端API参考文档

现在 ,可以移步到 教程3 ,学习如何将同一条消息传递给多个消费者。

女神

电梯

Your opinions
Your name:Email:Website url:Opinion content:
- no title specified

HxLauncher: Launch Android applications by voice commands