这节中我们将学习如何将一个消息传递给多个consummer,所谓的发布/订阅模式。
Exchange
是时候学习完整的消息模型了。
回顾一下之前的内容:
- producer是产生消息的用户进程。
- Queue是存储消息的缓存
- cunsumer是接收消息的用户进程
切记消息从来不是直接发送给queue,实质上多数情况下producer甚至不知道有queue的存在。
消息是直接发送给Exhange的。Exhange很简单,一方面它从producer接收消息并推送到Queue中,至于是将消息推送到特定的Queue,或所有QUEUE,或丢弃,这取决了Exchange的类型。
有以下几种Exchange类型:direct,topic,headers和fanout。
我们先看Fanout类型,创建如下的Exchange:
1
| channel.exchangeDeclare("logs", "fanout");
|
Fanout非常简单,顾名思义就是广播消息给所有它知道的QUEUE。
要查看Server上的Exchange,可以用如下命令:
1 2 3 4 5 6 7 8 9 10
| $ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic
|
其中smq.*和default (unnamed)是系统自动创建的。
匿名Exchange
之前两节我们没有指定Exchange,而是使用了""作为参数值,就是使用了匿名exchange。
回顾消息发布的方法:
1
| channel.basicPublish("", "hello", null, message.getBytes());
|
第一个参数就是指定Exchange。
现在我们使用具名的Exchange:
1
| channel.basicPublish( "logs", "", null, message.getBytes());
|
临时QUEUE
log案例中,需要接收所有的而不是部分消息,且对旧的消息不感兴趣。
首先,每次连接Server我们需要一个新的Queue,因为Queue名不能重复,所以可以让Server来随机命名。
第二,断开连接中Queue应该自动清空。
当使用无参queueDeclare()中,将创建一个非持久的,专属且自动清空的队列,可用getQueue()返回自动生成的队列名:
1
| String queueName = channel.queueDeclare().getQueue();
|
返回值是类似amq.gen-JzTY20BRgKO-HjmUJj0wLg
的随机字串。
绑定
现在告诉Exchange将消息发送给Queue,此操作为Bind。
1
| channel.queueBind(queueName, "logs", "");
|
可用`rabbitmqctl list_bindings`命令查看bind.
汇总
producer的改动在于消息发送到了Exchange名,而不是queue名,在此之前须先声明Exchange,否则不存在的Exchange是非法的。
consumer的改动在于多了一个Bind操作,而且使用了匿名的Queue。
如果没有consumer绑定到这个Exchange,消息就会自动丢失,保证了安全性。
代码
producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| package io.github.azyet; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Random; import java.util.Timer; import java.util.TimerTask; /** * rabbitmq producer */ public class MQSender { private static String QUEUE_NAME = "TEST_QUEUE"; private static String EXCHANGE_NAME = "TEST_FANOUT_EXCHANGE"; private static Logger logger = LoggerFactory.getLogger(MQSender.class); public static void main(String[] args) { logger.debug(args.length + ""); for (String arg : args) { logger.debug(arg + ""); } ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); final Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //#################### boolean durable = true; final String message = "hello world!"; Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { int l = new Random().nextInt(100); StringBuilder sb = new StringBuilder("hello."); for (int i = 0; i < l % 5; i++) { sb.append("."); } try { logger.debug("sent a message {}", sb.toString()); channel.basicPublish(EXCHANGE_NAME, "", null, sb.toString().getBytes());//######## } catch (IOException e) { e.printStackTrace(); } } }, 0, 2000); } catch (IOException e) { e.printStackTrace(); } } }
|
consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package io.github.azyet; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * rabbitmq consumer * */ public class MQConsummer { private static String EXCHANGE_NAME = "TEST_FANOUT_EXCHANGE"; private static Logger logger = LoggerFactory.getLogger(MQConsummer.class); public static void main( String[] args ) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //########## String queueName = channel.queueDeclare().getQueue(); //########## channel.queueBind(queueName, EXCHANGE_NAME, ""); //########## logger.debug("waiting for message..."); QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = true; channel.basicConsume(queueName, autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); logger.debug("received a message {}",message); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } } }
|