StupidBeauty
Read times:2107Posted at: - no title specified

RabbitMQ Java教程翻译:工作队列,Work Queues

(使用Java客户端)

前提

本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。

如何寻求帮助

如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系

第一 个教程 中,我们 写了两个程序,它们会利用一个命名队列发送及接收消息。 在这个教程中,我们会创建一个 工作队列 它会被用来向多个工作者分发费时的任务。

工作队列 也被称作 任务队列 ),其背后的理念是,避免立即进行那些 狠费资源的任务并且傻傻等待它们完成。取而代之 的是,我们 将任务放置到队列里,日后再处理。 我们用一条消息来封装一个 任务 ,然后 将它发送到某个队列中。后台运行 的某个工作者进程会取出任务,并且最终执行该任务。如果 妳运行着多个工作者,那么,整个任务队列会由它们共享。

这种概念,对于网页应用程序尤其有用,因为,无法在短暂的HTTP 请求时间窗口里完成一个复杂的任务。

准备

在教程的前一个部分,我们发送了一条包含着"Hello World!"内容的消息。现在,我们将会发送用来表示复杂任务的字符串。我们此时并没有真正要处理的任务,例如图片尺寸调整,或者pdf文件的渲染,因此,让我们就假装狠忙吧——利用 Thread.sleep() 函数来实现这一点。我们将字符串中小数点的个数当成复杂度的表示;每个小数点表示一秒钟的“工作”。例如,用 Hello... 表示的一个假任务,会占用三秒钟。

我们会稍微修改一下之前 Send.java 示例中的代码,以允许通过命令行发送任意的消息。 这个程序,会将任务加入到工作队列中去,因此,我们将它叫做 NewTask.java

String message = getMessage ( argv );

channel . basicPublish ( "" , "hello" , null, message . getBytes ());

System . out . println ( " [x] Sent '" + message + "'" );

以下是个辅助函数,用来从命令行参数中获取到消息内容:

private static String getMessage ( String [] strings ){

if ( strings . length < 1 )

return "Hello World!" ;

return joinStrings ( strings , " " );

}

private static String joinStrings ( String [] strings , String delimiter ) {

int length = strings . length ;

if ( length == 0 ) return "" ;

StringBuilder words = new StringBuilder ( strings [ 0 ]);

for ( int i = 1 ; i < length ; i ++) {

words . append ( delimiter ). append ( strings [ i ]);

}

return words . toString ();

}

之前 Recv.java 程序也需要做些修改: 它需要针对消息体中的每个小数点假装进行了一秒钟的工作。 它会处理RabbitMQ传递过来的消息,并且执行具体的任务,因此,我们将它叫做 Worker.java

final Consumer consumer = new DefaultConsumer ( channel ) {

@Override

public void handleDelivery ( String consumerTag , Envelope envelope , AMQP . BasicProperties properties , byte [] body ) throws IOException {

String message = new String ( body , "UTF-8" );

System . out . println ( " [x] Received '" + message + "'" );

try {

doWork ( message );

} finally {

System . out . println ( " [x] Done" );

}

}

};

channel . basicConsume ( TASK_QUEUE_NAME , true, consumer );

我们的假任务,用来模仿执行时间:

private static void doWork ( String task ) throws InterruptedException {

for ( char ch: task . toCharArray ()) {

if ( ch == '.' ) Thread . sleep ( 1000 );

}

}

像教程1中所说的那样编译它们(需要将那些jar文件放置到工作目录中)

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

轮询分发

使用任务队列的好处之一,就是,可以轻易地将工作并行化。如果我们积压了狠多任务,那么,我们可以简单地加入更多工作者,以这种方式来轻易地扩展工作能力。

首先,让我们尝试同时运行两个工作者 实例 。我们都会从队列中获取到消息,但是,具体情况如何?让我们来深入查看。

妳需要打开三个终端。其中两个运行着工作者程序。这两个终端,会成为两个消费者——C1C2

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

[ * ] Waiting for messages. To exit press CTRL+C

shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

[ * ] Waiting for messages. To exit press CTRL+C

我们在第三个终端中发布新任务。当妳启动了消费者之后,就可以发布一些消息了:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask First message.

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Second message..

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Third message...

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Fourth message....

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Fifth message.....

让我们看看,工作者都接收到了什么:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

[ * ] Waiting for messages. To exit press CTRL+C

[ x ] Received 'First message.'

[ x ] Received 'Third message...'

[ x ] Received 'Fifth message.....'

shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

[ * ] Waiting for messages. To exit press CTRL+C

[ x ] Received 'Second message..'

[ x ] Received 'Fourth message....'

默认情况下,RabbitMQ会按照顺序将消息发送给下一个消费者。平均下来,每个消费者都会获取到相同数量的消息。这种分发消息的方式,就称作轮询。使用三个或更多工作者来try一try。

消息确认

一项任务可能会占用数秒的时间。妳可能会疑惑,如果某个消费者启动一个长时间的任务,然后还没做完任务就死掉了,结果会怎么样?按照我们当前的代码,一旦RabbitMQ将某条消息传递给消费者之后,它就会立即从内存中删除这条消息。在这种情况下,如果妳杀死一个工作者的话,我们会丢失它刚才正在处理的消息。并且,所有那些已被分发给该工作者却没有被处理的消息,也会丢失。

然而,我们并不想丢失任何任务。如果某个工作者死掉了,那么,我们希望该任务被重新分发给另一个工作者。

为了确保不遗漏任何一条消息, RabbitMQ支持消息 确认 。回执 ack(nowledgement) )是由消费者回发的,用来告知 RabbitMQ ,某条特定的消息已经被接收到 、并且处理完毕了,从此 RabbitMQ 可以随意删除它了。

如果某个消费者死掉了而没有发送回执,那么,RabbitMQ就会知道,某条消息并没有被完整地处理,因而会将该条消息重新分发给另一个消费者。那样,妳就可以确保,即使工作者偶尔死掉,也不会有消息丢失。

没有任何的消息超时;RabbitMQ只会在工作者的连接断开的时候重新分发消息。即使消息的处理过程要占用狠长狠长的时间,也没有问题。

消息确认,默认情况是启用的。在前一个示例中,我们显式地将这个选项关闭了,具体就是设置了 autoAck=true 标志位。现在,是时候去掉这个标记位了,让工作者在完成任务之后回复一个适当的回执。

channel . basicQos ( 1 );

final Consumer consumer = new DefaultConsumer ( channel ) {

@Override

public void handleDelivery ( String consumerTag , Envelope envelope , AMQP . BasicProperties properties , byte [] body ) throws IOException {

String message = new String ( body , "UTF-8" );

System . out . println ( " [x] Received '" + message + "'" );

try {

doWork ( message );

} finally {

System . out . println ( " [x] Done" );

channel . basicAck ( envelope . getDeliveryTag (), false);

}

}

};

加上这样的代码之后,就可以确保,即使妳在某个工作者正处理某条消息时按CTRL+C 杀死了它,也不会有任何消息丢失。当该工作者死掉之后,所有未确认的消息都会被重新分发。

忘记确认

一个常见的错误就是,忘记了 basicAck 。这种错误狠容易犯,但是,其后果却狠严重。消息会在妳的客户端退出时被重新分发(看起来会像是随机的重新分发),但是,RabbitMQ会占用越来越多的内存,因为,它无法释放掉任何一条未被确认的消息。

要想对这种类型的错误进行调试,则,妳可以使用 rabbitmqctl 来输出 messages_unacknowledged字段的值:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Listing queues ...

hello    0       0

...done.

消息的持久性

我们已经学会了,如何做到,即使消费者死掉了,任务也不会丢失。但是,当RabbitMQ 服务器停止运行时,我们的任务仍然会丢失。

RabbitMQ退出或崩溃时,它会忘记所有的队列和消息,除非我们明确告诉它不要这么做。要做两件事,才能确保消息不丢失:我们必须将队列和消息都标记为持久的。

首先 我们要确保 RabbitMQ永远 不要丢失我们的队列信息。 为了做到这一点,我们需要将它声明为持久的( durable ):

boolean durable = true;

channel . queueDeclare ( "hello" , durable , false, false, null);

尽管这条命令本身是正确的,但是,在我们当前的配置情况下,它无法正常工作。这是因为,我们已经定义了一个名为 hello 的非持久的队列。RabbitMQ不允许妳使用不同的参数重新定义一个已有的队列,并且,对于任何试图这么做的程序,都会返回一个错误。但是,这样的小问题,是可以快速解决的——我们换个名字来声明队列,例如 task_queue

boolean durable = true;

channel . queueDeclare ( "task_queue" , durable , false, false, null);

这个 queueDeclare 变更,需要在生产者和消费者两边同时进行。

到了此刻,我们可以确信,即使RabbitMQ 重启了, task_queue 队列也不会丢失。现在,我们需要将我们的消息标记为持久的——具体做法就是,将 MessageProperties  (它实现了 BasicProperties )的值设置为 PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel . basicPublish ( "" , "task_queue" ,

MessageProperties . PERSISTENT_TEXT_PLAIN ,

message . getBytes ());

关于消息持久性的注意事项

将消息标记为持久的,并不能完全确保消息的不丢失。尽管 我们告知 RabbitMQ 要将该消息保存到硬盘上,但是,仍然 会有一个短暂的时间窗口,在这个时间窗口里, RabbitMQ已经接收 了该消息,但还没有将它保存。另外 RabbitMQ 也不会对每条消息都进行一次 fsync(2) 调用——所以,消息可能已经 被保存到缓存 ,却尚未 被真正写入到磁盘。持久 性保障,并不是一个坚固的保障,但是 ,对于 我们的简单任务队列来说已经足够了。如果 妳需要一个更坚固的保障,那么,可以使用 发布者确认

公平分发

妳可能注意到了,消息的分发仍然与我们的预期不太相符。例如,考虑这样一个场景,我们有两个工作者,并且,所有奇数编号的消息工作量都狠大,而偶数编号的消息工作量都狠小,那么,后果就是,其中一个工作者长期处于忙碌状态,而另一个工作者几乎不做什么工作。可是,RabbitMQ根本不知道这些事,于是仍然平均地分发消息。

这是因为,当消息进入队列时,RabbitMQ就会分发它。它并不查看每个消费者还有多少条消息未被确认。它只是将每一轮的第n条消息分发给第n个消费者。

为了克服这一点,我们可以使用 basicQos 方法,具体就是向它传递 prefetchCount  =  1 选项。这会告知RabbitMQ,同一时刻不要向单个工作者传递多余一条消息。或者,换句话说,只有在某个工作者处理完了一条消息并且对之进行确认之后,才向它传递下一条新消息。取而代之,它会将新消息传递给下一个当前并不忙碌的工作者。

int prefetchCount = 1 ;

channel . basicQos ( prefetchCount );

关于队列长度的注意事项

如果所有的工作者都处于忙碌状态,那么,妳的队列可能会塞满。妳应当注意对这种情况进行处理,例如加入更多的工作者,或者,采用其它策略。

将所有代码凑到一起

NewTask.java 类的最终代码:

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.MessageProperties;

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue" ;

public static void main ( String [] argv )

throws java . io . IOException {

ConnectionFactory factory = new ConnectionFactory ();

factory . setHost ( "localhost" );

Connection connection = factory . newConnection ();

Channel channel = connection . createChannel ();

channel . queueDeclare ( TASK_QUEUE_NAME , true, false, false, null);

String message = getMessage ( argv );

channel . basicPublish ( "" , TASK_QUEUE_NAME ,

MessageProperties . PERSISTENT_TEXT_PLAIN ,

message . getBytes ());

System . out . println ( " [x] Sent '" + message + "'" );

channel . close ();

connection . close ();

}

//...

}

(NewTask.java 源代码 )

以及 Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue" ;

public static void main ( String [] argv ) throws Exception {

ConnectionFactory factory = new ConnectionFactory ();

factory . setHost ( "localhost" );

final Connection connection = factory . newConnection ();

final Channel channel = connection . createChannel ();

channel . queueDeclare ( TASK_QUEUE_NAME , true, false, false, null);

System . out . println ( " [*] Waiting for messages. To exit press CTRL+C" );

channel . basicQos ( 1 );

final Consumer consumer = new DefaultConsumer ( channel ) {

@Override

public void handleDelivery ( String consumerTag , Envelope envelope , AMQP . BasicProperties properties , byte [] body ) throws IOException {

String message = new String ( body , "UTF-8" );

System . out . println ( " [x] Received '" + message + "'" );

try {

doWork ( message );

} finally {

System . out . println ( " [x] Done" );

channel . basicAck ( envelope . getDeliveryTag (), false);

}

}

};

channel . basicConsume ( TASK_QUEUE_NAME , false, consumer );

}

private static void doWork ( String task ) {

for ( char ch : task . toCharArray ()) {

if ( ch == '.' ) {

try {

Thread . sleep ( 1000 );

} catch ( InterruptedException _ignored ) {

Thread . currentThread (). interrupt ();

}

}

}

}

}

(Worker.java 源代码 )

利用消息回执 prefetchCount 就可以建立好一个工作队列。持久性选项,使得,即使RabbitMQ 重启,那些任务也仍然存在。

欲知更多关于 Channel 方法 MessageProperties 的信息,则浏览 在线 javadocs

现在 ,可以移步到 教程3 学习如何将同一条消息传递给多个消费者。

美甲

Your opinions
Your name:Email:Website url:Opinion content:
- no title specified

HxLauncher: Launch Android applications by voice commands