Contents
  1. 1. Bindings
  2. 2. Direct exchange
  3. 3. Multiple bindings
  4. 4. 产生日志
  5. 5. 订阅
  6. 6. 汇总
    1. 6.1. producer源码
    2. 6.2. consumer

这节中我们将实现只订阅消息的一个子集。例如,只接收Error的日志保存到磁盘,而将debug类的信息直接在console打印。

Bindings

上一节已经提到Binding,代码如下:

1
channel.queueBind(queueName, EXCHANGE_NAME, "");

Binding是Exchange与Queue之间的关系,可以理解为:queue对Exchange中的消息感兴趣。

Binding可以添加额外的路由参数,为免混淆,这里称为bindingKey。以下是使用BindingKey来绑定:

1
channel.queueBind(queueName, EXCHANGE_NAME, "black");

BindingKey的解释取决于Exchange类型,fanout类Exchange直接无视BindingKey.

Direct exchange

我们希望通过日志级别来区别对待,Fanout只会无脑广播肯定不行,这里我们使用Direct Exchange。它的路由算法也很简单:如消息的`routingKey == BindingKey``,则推送到相应的Queue中。

注意routingkey是消息属性,BindingKey是queue属性。

如上图,routingkey = orange的会去向C1,black和green(允许指定多个BindingKey)会去向C2,其它的会被丢弃。

Multiple bindings

多个Queue指定相同的BindingKey也是完全合法的,此时Exhange行为类似Fanout。

产生日志

我们将使用Direct Exchange,同时为消息指定routingKey代表日志级别。

首先,你得有一个Exchange。

1
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

可以发送消息了:

1
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

日志级别可以是以下之一: debug, warning, error.

订阅

consumer和之前差不多,只是将分别指定BindingKey。

1
2
3
4
5
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

汇总

这次producer consumer都用参数来指定key,bindingKey可以指定多个。

运行效果:

producer源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package io.github.azyet;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* rabbitmq producer
*/
public class MQSender {
private static String QUEUE_NAME = "TEST_QUEUE";
private static String EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE";
private static Logger logger = LoggerFactory.getLogger(MQSender.class);
public static void main(String[] args) {
if (args.length < 1) {
logger.error("input param");
return;
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
final Channel channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
boolean durable = true;
channel.close();
conn.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package io.github.azyet;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* rabbitmq consumer
*
*/
public class MQConsummer
{
private static String EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE";
private static Logger logger = LoggerFactory.getLogger(MQConsummer.class);
public static void main( String[] args )
{
if (args.length < 1) {
logger.error("need param");
return;
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
boolean durable = true;
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
for (String arg : args) {
channel.queueBind(queueName, EXCHANGE_NAME, arg);
}
int prefetchCount = 1;
channel.basicQos(prefetchCount);
logger.debug("waiting for message...");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
logger.debug("received a message {}",message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
}
}
}
Contents
  1. 1. Bindings
  2. 2. Direct exchange
  3. 3. Multiple bindings
  4. 4. 产生日志
  5. 5. 订阅
  6. 6. 汇总
    1. 6.1. producer源码
    2. 6.2. consumer