rabbitmq是AMQP的一种实现,可为分布式系统提供可靠的消息通信机制。它从生产者接收消息,并将其传递给消费者,并可二者之间可以提供路由、缓冲和存留功能。
queue
一个无限缓冲,可以存储任意多的消息。多个producer
可以将消息发送给同一queue,多个consumer
也可以从同一queue接收消息。
在使用RabbitMQ之前,需要有RabbitMQ-server,如果是ubuntu,可以用如下命令安装
1
| sudo apt-get install rabbitmq-server
|
Hello World!
以下使用Java编写一个Hello world例程,包含两个程序,一个producer和一个consumer。
如果使用Maven工具,可以在pom.xml添加如下的依赖:
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.2</version> </dependency>
|
用Factory类建立连接和channel:
1 2 3 4
| ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel();
|
绑定queue:
1
| channel.queueDeclare(QUEUE_NAME, false, false,false, null);
|
如果该queue不存在,将新建该queue。因为producer和consumer的启动顺序是无保证的,所以consumer也需要declare queue,以确保接收消息前queue存在。
发送消息(producer)
第一个参数为exchange,这里暂不涉及,使用default:
1 2
| String message = "hello world!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
|
接收消息(consumer)
接收者可以是另一个进程,如果想接收producer发送的消息,它需要和producer绑定同一个队列,建立channel的操作与producer无异。
新建consumer,与channel和Queue关联。
1 2
| QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer);
|
消息将会以异步方式传递,所以我们提供一个回调对象QueueingConsumer
来缓存消息。
1 2 3 4 5
| while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); logger.debug("received a message {}",message); }
|
consumer.nextDelivery()
在收到消息之前,会阻塞进程。
我们先运行consumer监听消息,再运行producer发送一个消息。
运行结果。
producer:
1
| 2015-05-18 18:00:35,142 DEBUG azyet.MQSender (MQSender.java:main(32)) - sent a message hello world!
|
consumer
1 2
| 2015-05-18 18:00:28,931 DEBUG azyet.MQConsummer (MQConsummer.java:main(26)) - waiting for message... 2015-05-18 18:00:35,144 DEBUG azyet.MQConsummer (MQConsummer.java:main(34)) - received a message hello world!
|
附:完整程序清单
MQSender.java
producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package io.github.azyet; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Hello world! * */ public class MQSender { private static String QUEUE_NAME = "TEST_QUEUE"; private static Logger logger = LoggerFactory.getLogger(MQSender.class); public static void main( String[] args ) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false,false, null); String message = "hello world!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); logger.debug("sent a message {}",message); channel.close(); conn.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
|
MQConsumer.java
consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package io.github.azyet; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Hello world! * */ public class MQConsummer { private static String QUEUE_NAME = "TEST_QUEUE"; private static Logger logger = LoggerFactory.getLogger(MQConsummer.class); public static void main( String[] args ) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false,null); logger.debug("waiting for message..."); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); logger.debug("received a message {}",message); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } } }
|