StupidBeauty
Read times:2269Posted at: - no title specified

RabbitMQ Java教程翻译:发布/订阅,Publish/Subscribe

(使用Java客户端)

前提

本教程假设RabbitMQ已经安装,并且在localhost的标准端口(5672)上运行。如果妳用的是别的主机名、端口号或认证信息,那么,连接设置会需要进行调整。

如何寻求帮助

如果妳在学习本教程时遇到困难,那么,可通过邮件列表来 与我们联系

前一个教程 中,我们创建了一个工作队列。工作队列背后 的假设是,每个任务 被传递给唯一一个工作者。 在这个部分中,我们要做点完全不同的事—— 我们会将一条消息传递给多个消费者。 这种模式被称作 " 发布/订阅 "

为了展示这种模式,我们要构建一个简单的日志系统。它由两个程序组成——第一个程序会发送日志消息,第二个程序会接收消息并且输出。

在我们的日志系统中,每个处于运行状态的接收者程序都会收到消息。这样的话,我们就可以运行一个接收者程序,将日志写入到磁盘;同时,运行另一个接收者程序,将日志直接输出到屏幕。

本质上,发出的日志消息都会被广播给所有的接收者。

交换机(Exchanges

在教程中之前的部分里,我们是利用队列来发送及接收消息的。而现在,让我们引入Rabbit 中的完整消息模型。

快速回顾一下之前教程中学过的东西:

  • •. 生产 ,是一个用户程序,它发送消息。

  • •. 队列 ,是一个缓冲区,它存储消息。

  • •. 消费 ,是一个用户程序,它接收消息。

RabbitMQ 的消息模型中,核心之处在于,生产者从不直接向队列发送消息。实际上,通常情况下,生产者甚至不知道一条消息是否会被传递给任何队列。

取而代之 的是,生产者只能向某个交换机 exchange )发送消息。交换机 是一个狠简单的东西。 一个方面,它从生产者那里接收消息, 另一方面,它将消息推入到队列中去。交换机 ,对于它所接收到的任何一条消息,都必须知道自己究竟该干什么。应当 将该消息追加到某个特定的队列中去? 还是应当将它追加到多个队列中去? 还是应当将它直接忽略。 这种规则,是由 交换机 的类型 来定义的。

有多种交换机类型: direct topic headers fanout 。此处,我们专注于最后一种——广播 fanout)。我们来创建一个这种类型的交换机,并且称它为 logs

channel . exchangeDeclare ( "logs" , "fanout" );

广播交换机狠简单。妳可能已经根据它的名字猜测到了,它的作用就是,将它接收到的所有消息,广播给它所知道的所有队列。这正是我们的日志系统所需要的能力。

列出所有交换机

要想列出服务器上所有的交换机,妳可以运行强大的 rabbitmqctl 命令:

$ sudo rabbitmqctl list_exchanges

Listing exchanges ...

direct

amq.direct direct

amq.fanout fanout

amq.headers headers

amq.match headers

amq.rabbitmq.log topic

amq.rabbitmq.trace topic

amq.topic topic

logs fanout

...done.

在输出的列表中,有一些 amq.* 交换机,还有默认的(未命名)交换机。这些交换机默认就会创建,但是,目前,妳似乎用不上它们。

无名交换机

在教程中之前的部分里,我们根本不知道还有交换机这么一回事,然而我们仍然能够向队列中发送消息。那是因为,我们使用的是默认的交换机,它是使用空白字符串( "" )来标识的。

回忆一下,我们之前是如何发布消息的:

channel . basicPublish ( "" , "hello" , null, message . getBytes ());

第一 个参数,即为交换机名字。空白字符串表示 ,使用默认或 无名 交换机 :如果对应 的队列存在,则,消息 会被路由到 名为 routingKey 的队列中去。

如今,我们可以换种方式,发布给已命名的交换机:

channel . basicPublish ( "logs" , "" , null, message . getBytes ());

临时队列

妳也许还记得,在之前,我们使用的都是具有特定名字的队列(hello task_queue)。在那个时候,对队列进行命名是狠必要的——我们需要将工作者指向相同的队列。当妳要在生产者和消费者之间分享队列时,给队列命名是狠重要的。

但是,对于我们的日志系统来说,就不是这么回事了。我们想要监听所有的日志消息,而不仅仅是其中的一个子集。另外,我们也只是对当前发出的消息感兴趣,而不关心旧的消息。要做到那一点,我们需要做两件事。

首先,当我们连接到Rabbit的时候,我们需要有一个全新的空白队列。要做到这一点的话,我们可以选择以一个随机的名字来创建队列,或者,更好的处理方式是——让服务器为我们选择一个随机的队列名字。

其次,当我们的消费者断开连接的时候,队列应当被自动删除。

Java客户端中,如果我们不向 queueDeclare() 传递任何参数,那么,就会创建一个非持久的、独占的、自动删除的队列,并且其名字是自动生成的:

String queueName = channel . queueDeclare (). getQueue ();

在那个时间点, queueName 中会包含一个随机生成的队列名字。例如,它可能长这样: amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

我们已经创建了一个广播交换机和一个队列。现在 ,我们需要告诉交换机将消息发送到我们的队列。交换机 和队列之间的这种关系,就称作 绑定

channel . queueBind ( queueName , "logs" , "" );

从现在开始, logs 交换机就会将消息追加到我们的队列中去。

列出所有绑定

妳可能已经猜到了,可以使用 rabbitmqctl list_bindings 命令来列表已有的所有绑定。

将所有代码凑到一起

生产者程序,也就是用来发送日志消息的程序,与前一个教程中的代码没什么差别。最大的差别就是,我们现在将消息发布到 logs 交换机,而不是一个无名交换机。我们在发送消息时需要提供一个 routingKey ,但是,对于 fanout 类型的交换机,这个参数的值会被无视掉。以下是 EmitLog.java 程序的代码:

import java.io.IOException ;

import com.rabbitmq.client.ConnectionFactory ;

import com.rabbitmq.client.Connection ;

import com.rabbitmq.client.Channel ;

public class EmitLog {

private static final String EXCHANGE_NAME = "logs" ;

public static void main ( String [] argv )

throws java . io . IOException {

ConnectionFactory factory = new ConnectionFactory ();

factory . setHost ( "localhost" );

Connection connection = factory . newConnection ();

Channel channel = connection . createChannel ();

channel . exchangeDeclare ( EXCHANGE_NAME , "fanout" );

String message = getMessage ( argv );

channel . basicPublish ( EXCHANGE_NAME , "" , null, message . getBytes ());

System . out . println ( " [x] Sent '" + message + "'" );

channel . close ();

connection . close ();

}

//...

}

(EmitLog.java 源代码 )

如妳所见,建立了连接之后,我们就声明了交换机。这个步骤是必要的,因为,向一个不存在的交换机发布消息是被禁止的。

如果此刻还未有任何队列被绑定到该交换机,那么,该消息会丢失,但是,这对于我们现在这个应用场景来说是完全可以接受的;如果没有任何消费者监听该队列,那么,我们可以安全地无视该消息。

ReceiveLogs.java 源代码:

import com.rabbitmq.client.* ;

import java.io.IOException ;

public class ReceiveLogs {

private static final String EXCHANGE_NAME = "logs" ;

public static void main ( String [] argv ) throws Exception {

ConnectionFactory factory = new ConnectionFactory ();

factory . setHost ( "localhost" );

Connection connection = factory . newConnection ();

Channel channel = connection . createChannel ();

channel . exchangeDeclare ( EXCHANGE_NAME , "fanout" );

String queueName = channel . queueDeclare (). getQueue ();

channel . queueBind ( queueName , EXCHANGE_NAME , "" );

System . out . println ( " [*] Waiting for messages. To exit press CTRL+C" );

Consumer consumer = new DefaultConsumer ( channel ) {

      @Override

public void handleDelivery ( String consumerTag , Envelope envelope ,

AMQP . BasicProperties properties , byte [] body ) throws IOException {

String message = new String ( body , "UTF-8" );

System . out . println ( " [x] Received '" + message + "'" );

}

};

channel . basicConsume ( queueName , true, consumer );

}

}

(ReceiveLogs.java 源代码 )

按照前面教程中讲的方法来编译,就完工了。

$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

如果妳想将日志内容保存到一个文件中去,那么,打开一个终端,并且执行:

$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

如果妳想在屏幕上实时观看日志信息,则,打开一个新的终端,并且执行:

$ java -cp .:rabbitmq-client.jar ReceiveLogs

当然,发送日志的命令是这样的:

$ java -cp .:rabbitmq-client.jar EmitLog

使用rabbitmqctl list_bindings命令,可以检查确认,这些代码确实是按照我们的设想正确地创建了对应的队列。当妳运行着两份 ReceiveLogs.java 程序实例的时候,妳会看到类似下面的输出内容:

$ sudo rabbitmqctl list_bindings

Listing bindings ...

logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []

logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []

...done.

妳可以狠直观地解释这个输出结果:来自交换机 logs 的数据,会进入两个队列,这两个队列的名字是由服务器分配的。而这正是我们所想要的行为。

要想学习如何监听消息中的一个子集,则,请移步到 教程4

女神

Your opinions
Your name:Email:Website url:Opinion content:
- no title specified

HxLauncher: Launch Android applications by voice commands