Contents
  1. 1. Exchange
  2. 2. 临时QUEUE
  3. 3. 绑定
  4. 4. 汇总
    1. 4.1. 代码

  这节中我们将学习如何将一个消息传递给多个consummer,所谓的发布/订阅模式。

Exchange

  是时候学习完整的消息模型了。

  回顾一下之前的内容:

  1. producer是产生消息的用户进程。
  2. Queue是存储消息的缓存
  3. cunsumer是接收消息的用户进程

  切记消息从来不是直接发送给queue,实质上多数情况下producer甚至不知道有queue的存在。
  消息是直接发送给Exhange的。Exhange很简单,一方面它从producer接收消息并推送到Queue中,至于是将消息推送到特定的Queue,或所有QUEUE,或丢弃,这取决了Exchange的类型。

  有以下几种Exchange类型:direct,topic,headers和fanout。

  我们先看Fanout类型,创建如下的Exchange:

1
channel.exchangeDeclare("logs", "fanout");

  Fanout非常简单,顾名思义就是广播消息给所有它知道的QUEUE。

要查看Server上的Exchange,可以用如下命令:
1
2
3
4
5
6
7
8
9
10
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
其中smq.*和default (unnamed)是系统自动创建的。

匿名Exchange
之前两节我们没有指定Exchange,而是使用了""作为参数值,就是使用了匿名exchange。

  回顾消息发布的方法:

1
channel.basicPublish("", "hello", null, message.getBytes());

  第一个参数就是指定Exchange。

  现在我们使用具名的Exchange:

1
channel.basicPublish( "logs", "", null, message.getBytes());

临时QUEUE

  log案例中,需要接收所有的而不是部分消息,且对旧的消息不感兴趣。

  首先,每次连接Server我们需要一个新的Queue,因为Queue名不能重复,所以可以让Server来随机命名。

  第二,断开连接中Queue应该自动清空。

  当使用无参queueDeclare()中,将创建一个非持久的,专属且自动清空的队列,可用getQueue()返回自动生成的队列名:

1
String queueName = channel.queueDeclare().getQueue();

  返回值是类似amq.gen-JzTY20BRgKO-HjmUJj0wLg的随机字串。

绑定

现在告诉Exchange将消息发送给Queue,此操作为Bind。

1
channel.queueBind(queueName, "logs", "");

可用`rabbitmqctl list_bindings`命令查看bind.

汇总

producer的改动在于消息发送到了Exchange名,而不是queue名,在此之前须先声明Exchange,否则不存在的Exchange是非法的。
consumer的改动在于多了一个Bind操作,而且使用了匿名的Queue。


如果没有consumer绑定到这个Exchange,消息就会自动丢失,保证了安全性。

代码

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package io.github.azyet;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
/**
* rabbitmq producer
*/
public class MQSender {
private static String QUEUE_NAME = "TEST_QUEUE";
private static String EXCHANGE_NAME = "TEST_FANOUT_EXCHANGE";
private static Logger logger = LoggerFactory.getLogger(MQSender.class);
public static void main(String[] args) {
logger.debug(args.length + "");
for (String arg : args) {
logger.debug(arg + "");
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
final Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //####################
boolean durable = true;
final String message = "hello world!";
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
int l = new Random().nextInt(100);
StringBuilder sb = new StringBuilder("hello.");
for (int i = 0; i < l % 5; i++) {
sb.append(".");
}
try {
logger.debug("sent a message {}", sb.toString());
channel.basicPublish(EXCHANGE_NAME, "", null, sb.toString().getBytes());//########
} catch (IOException e) {
e.printStackTrace();
}
}
}, 0, 2000);
} catch (IOException e) {
e.printStackTrace();
}
}
}

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package io.github.azyet;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* rabbitmq consumer
*
*/
public class MQConsummer
{
private static String EXCHANGE_NAME = "TEST_FANOUT_EXCHANGE";
private static Logger logger = LoggerFactory.getLogger(MQConsummer.class);
public static void main( String[] args )
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //##########
String queueName = channel.queueDeclare().getQueue(); //##########
channel.queueBind(queueName, EXCHANGE_NAME, ""); //##########
logger.debug("waiting for message...");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
logger.debug("received a message {}",message);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
}
}
}

Contents
  1. 1. Exchange
  2. 2. 临时QUEUE
  3. 3. 绑定
  4. 4. 汇总
    1. 4.1. 代码