RabbitMQ Java教程翻译:发布/订阅,Publish/Subscribe
本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。
如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系 。
在 前一个教程 中,我们创建了一个工作队列。工作队列背后 的假设是,每个任务 被传递给唯一一个工作者。 在这个部分中,我们要做点完全不同的事—— 我们会将一条消息传递给多个消费者。 这种模式被称作 " 发布/订阅 " 。
为了展示这种模式,我们要构建一个简单的日志系统。它由两个程序组成——第一个程序会发送日志消息,第二个程序会接收消息并且输出。
在我们的日志系统中,每个处于运行状态的接收者程序都会收到消息。这样的话,我们就可以运行一个接收者程序,将日志写入到磁盘;同时,运行另一个接收者程序,将日志直接输出到屏幕。
本质上,发出的日志消息都会被广播给所有的接收者。
在教程中之前的部分里,我们是利用队列来发送及接收消息的。而现在,让我们引入Rabbit 中的完整消息模型。
快速回顾一下之前教程中学过的东西:
•. 生产 者 ,是一个用户程序,它发送消息。
•. 队列 ,是一个缓冲区,它存储消息。
•. 消费 者 ,是一个用户程序,它接收消息。
RabbitMQ 的消息模型中,核心之处在于,生产者从不直接向队列发送消息。实际上,通常情况下,生产者甚至不知道一条消息是否会被传递给任何队列。
取而代之 的是,生产者只能向某个交换机 ( exchange )发送消息。交换机 是一个狠简单的东西。 一个方面,它从生产者那里接收消息, 另一方面,它将消息推入到队列中去。交换机 ,对于它所接收到的任何一条消息,都必须知道自己究竟该干什么。应当 将该消息追加到某个特定的队列中去? 还是应当将它追加到多个队列中去? 还是应当将它直接忽略。 这种规则,是由 交换机 的类型 来定义的。
有多种交换机类型: direct 、 topic 、 headers 和 fanout 。此处,我们专注于最后一种——广播 (fanout)。我们来创建一个这种类型的交换机,并且称它为 logs :
channel . exchangeDeclare ( "logs" , "fanout" );
广播交换机狠简单。妳可能已经根据它的名字猜测到了,它的作用就是,将它接收到的所有消息,广播给它所知道的所有队列。这正是我们的日志系统所需要的能力。
要想列出服务器上所有的交换机,妳可以运行强大的 rabbitmqctl 命令:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
在输出的列表中,有一些 amq.* 交换机,还有默认的(未命名)交换机。这些交换机默认就会创建,但是,目前,妳似乎用不上它们。
在教程中之前的部分里,我们根本不知道还有交换机这么一回事,然而我们仍然能够向队列中发送消息。那是因为,我们使用的是默认的交换机,它是使用空白字符串( "" )来标识的。
回忆一下,我们之前是如何发布消息的:
channel . basicPublish ( "" , "hello" , null, message . getBytes ());
第一 个参数,即为交换机名字。空白字符串表示 ,使用默认或 无名 交换机 :如果对应 的队列存在,则,消息 会被路由到 名为 routingKey 的队列中去。
如今,我们可以换种方式,发布给已命名的交换机:
channel . basicPublish ( "logs" , "" , null, message . getBytes ());
妳也许还记得,在之前,我们使用的都是具有特定名字的队列(hello 和 task_queue)。在那个时候,对队列进行命名是狠必要的——我们需要将工作者指向相同的队列。当妳要在生产者和消费者之间分享队列时,给队列命名是狠重要的。
但是,对于我们的日志系统来说,就不是这么回事了。我们想要监听所有的日志消息,而不仅仅是其中的一个子集。另外,我们也只是对当前发出的消息感兴趣,而不关心旧的消息。要做到那一点,我们需要做两件事。
首先,当我们连接到Rabbit的时候,我们需要有一个全新的空白队列。要做到这一点的话,我们可以选择以一个随机的名字来创建队列,或者,更好的处理方式是——让服务器为我们选择一个随机的队列名字。
其次,当我们的消费者断开连接的时候,队列应当被自动删除。
在Java客户端中,如果我们不向 queueDeclare() 传递任何参数,那么,就会创建一个非持久的、独占的、自动删除的队列,并且其名字是自动生成的:
String queueName = channel . queueDeclare (). getQueue ();
在那个时间点, queueName 中会包含一个随机生成的队列名字。例如,它可能长这样: amq.gen-JzTY20BRgKO-HjmUJj0wLg 。
我们已经创建了一个广播交换机和一个队列。现在 ,我们需要告诉交换机将消息发送到我们的队列。交换机 和队列之间的这种关系,就称作 绑定 。
channel . queueBind ( queueName , "logs" , "" );
从现在开始, logs 交换机就会将消息追加到我们的队列中去。
妳可能已经猜到了,可以使用 rabbitmqctl list_bindings 命令来列表已有的所有绑定。
生产者程序,也就是用来发送日志消息的程序,与前一个教程中的代码没什么差别。最大的差别就是,我们现在将消息发布到 logs 交换机,而不是一个无名交换机。我们在发送消息时需要提供一个 routingKey ,但是,对于 fanout 类型的交换机,这个参数的值会被无视掉。以下是 EmitLog.java 程序的代码:
import java.io.IOException ; import com.rabbitmq.client.ConnectionFactory ; import com.rabbitmq.client.Connection ; import com.rabbitmq.client.Channel ; public class EmitLog { private static final String EXCHANGE_NAME = "logs" ; public static void main ( String [] argv ) throws java . io . IOException { ConnectionFactory factory = new ConnectionFactory (); factory . setHost ( "localhost" ); Connection connection = factory . newConnection (); Channel channel = connection . createChannel (); channel . exchangeDeclare ( EXCHANGE_NAME , "fanout" ); String message = getMessage ( argv ); channel . basicPublish ( EXCHANGE_NAME , "" , null, message . getBytes ()); System . out . println ( " [x] Sent '" + message + "'" ); channel . close (); connection . close (); } //... } |
如妳所见,建立了连接之后,我们就声明了交换机。这个步骤是必要的,因为,向一个不存在的交换机发布消息是被禁止的。
如果此刻还未有任何队列被绑定到该交换机,那么,该消息会丢失,但是,这对于我们现在这个应用场景来说是完全可以接受的;如果没有任何消费者监听该队列,那么,我们可以安全地无视该消息。
ReceiveLogs.java 源代码:
import com.rabbitmq.client.* ; import java.io.IOException ; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs" ; public static void main ( String [] argv ) throws Exception { ConnectionFactory factory = new ConnectionFactory (); factory . setHost ( "localhost" ); Connection connection = factory . newConnection (); Channel channel = connection . createChannel (); channel . exchangeDeclare ( EXCHANGE_NAME , "fanout" ); String queueName = channel . queueDeclare (). getQueue (); channel . queueBind ( queueName , EXCHANGE_NAME , "" ); System . out . println ( " [*] Waiting for messages. To exit press CTRL+C" ); Consumer consumer = new DefaultConsumer ( channel ) { @Override public void handleDelivery ( String consumerTag , Envelope envelope , AMQP . BasicProperties properties , byte [] body ) throws IOException { String message = new String ( body , "UTF-8" ); System . out . println ( " [x] Received '" + message + "'" ); } }; channel . basicConsume ( queueName , true, consumer ); } } |
按照前面教程中讲的方法来编译,就完工了。
$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果妳想将日志内容保存到一个文件中去,那么,打开一个终端,并且执行:
$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果妳想在屏幕上实时观看日志信息,则,打开一个新的终端,并且执行:
$ java -cp .:rabbitmq-client.jar ReceiveLogs
当然,发送日志的命令是这样的:
$ java -cp .:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings命令,可以检查确认,这些代码确实是按照我们的设想正确地创建了对应的队列。当妳运行着两份 ReceiveLogs.java 程序实例的时候,妳会看到类似下面的输出内容:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
妳可以狠直观地解释这个输出结果:来自交换机 logs 的数据,会进入两个队列,这两个队列的名字是由服务器分配的。而这正是我们所想要的行为。
要想学习如何监听消息中的一个子集,则,请移步到 教程4 。
女神
Your opinionsHxLauncher: Launch Android applications by voice commands