Contents
  1. 1. Hello World!
    1. 1.1. 发送消息(producer)
    2. 1.2. 接收消息(consumer)
  2. 2. 附:完整程序清单
    1. 2.1. MQSender.java
    2. 2.2. MQConsumer.java

  rabbitmq是AMQP的一种实现,可为分布式系统提供可靠的消息通信机制。它从生产者接收消息,并将其传递给消费者,并可二者之间可以提供路由、缓冲和存留功能。

queue一个无限缓冲,可以存储任意多的消息。多个producer可以将消息发送给同一queue,多个consumer也可以从同一queue接收消息。

  在使用RabbitMQ之前,需要有RabbitMQ-server,如果是ubuntu,可以用如下命令安装

1
sudo apt-get install rabbitmq-server

Hello World!

  以下使用Java编写一个Hello world例程,包含两个程序,一个producer和一个consumer。

  如果使用Maven工具,可以在pom.xml添加如下的依赖:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.2</version>
</dependency>

  用Factory类建立连接和channel:

1
2
3
4
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();

  绑定queue:

1
channel.queueDeclare(QUEUE_NAME, false, false,false, null);

  如果该queue不存在,将新建该queue。因为producer和consumer的启动顺序是无保证的,所以consumer也需要declare queue,以确保接收消息前queue存在。

发送消息(producer)

  第一个参数为exchange,这里暂不涉及,使用default:

1
2
String message = "hello world!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

接收消息(consumer)

  接收者可以是另一个进程,如果想接收producer发送的消息,它需要和producer绑定同一个队列,建立channel的操作与producer无异。

  新建consumer,与channel和Queue关联。

1
2
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

  消息将会以异步方式传递,所以我们提供一个回调对象QueueingConsumer来缓存消息。

1
2
3
4
5
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
logger.debug("received a message {}",message);
}

consumer.nextDelivery()在收到消息之前,会阻塞进程。

  我们先运行consumer监听消息,再运行producer发送一个消息。
  运行结果。
producer:

1
2015-05-18 18:00:35,142 DEBUG azyet.MQSender (MQSender.java:main(32)) - sent a message hello world!

consumer

1
2
2015-05-18 18:00:28,931 DEBUG azyet.MQConsummer (MQConsummer.java:main(26)) - waiting for message...
2015-05-18 18:00:35,144 DEBUG azyet.MQConsummer (MQConsummer.java:main(34)) - received a message hello world!

附:完整程序清单

MQSender.java

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package io.github.azyet;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Hello world!
*
*/
public class MQSender
{
private static String QUEUE_NAME = "TEST_QUEUE";
private static Logger logger = LoggerFactory.getLogger(MQSender.class);
public static void main( String[] args )
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false,false, null);
String message = "hello world!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
logger.debug("sent a message {}",message);
channel.close();
conn.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}

MQConsumer.java

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package io.github.azyet;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Hello world!
*
*/
public class MQConsummer
{
private static String QUEUE_NAME = "TEST_QUEUE";
private static Logger logger = LoggerFactory.getLogger(MQConsummer.class);
public static void main( String[] args )
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false,null);
logger.debug("waiting for message...");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
logger.debug("received a message {}",message);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
}
}
}

Contents
  1. 1. Hello World!
    1. 1.1. 发送消息(producer)
    2. 1.2. 接收消息(consumer)
  2. 2. 附:完整程序清单
    1. 2.1. MQSender.java
    2. 2.2. MQConsumer.java