这节中我们将实现只订阅消息的一个子集。例如,只接收Error的日志保存到磁盘,而将debug类的信息直接在console打印。
Bindings
上一节已经提到Binding,代码如下:
1
| channel.queueBind(queueName, EXCHANGE_NAME, "");
|
Binding是Exchange与Queue之间的关系,可以理解为:queue对Exchange中的消息感兴趣。
Binding可以添加额外的路由参数,为免混淆,这里称为bindingKey。以下是使用BindingKey来绑定:
1
| channel.queueBind(queueName, EXCHANGE_NAME, "black");
|
BindingKey的解释取决于Exchange类型,fanout类Exchange直接无视BindingKey.
Direct exchange
我们希望通过日志级别来区别对待,Fanout只会无脑广播肯定不行,这里我们使用Direct Exchange。它的路由算法也很简单:如消息的`routingKey == BindingKey``,则推送到相应的Queue中。
注意routingkey是消息属性,BindingKey是queue属性。
如上图,routingkey = orange的会去向C1,black和green(允许指定多个BindingKey)会去向C2,其它的会被丢弃。
Multiple bindings
多个Queue指定相同的BindingKey也是完全合法的,此时Exhange行为类似Fanout。
产生日志
我们将使用Direct Exchange,同时为消息指定routingKey代表日志级别。
首先,你得有一个Exchange。
1
| channel.exchangeDeclare(EXCHANGE_NAME, "direct");
|
可以发送消息了:
1
| channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
|
日志级别可以是以下之一: debug, warning, error.
订阅
consumer和之前差不多,只是将分别指定BindingKey。
1 2 3 4 5
| String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
|
汇总
这次producer consumer都用参数来指定key,bindingKey可以指定多个。
运行效果:
producer源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package io.github.azyet; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * rabbitmq producer */ public class MQSender { private static String QUEUE_NAME = "TEST_QUEUE"; private static String EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE"; private static Logger logger = LoggerFactory.getLogger(MQSender.class); public static void main(String[] args) { if (args.length < 1) { logger.error("input param"); return; } ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); final Channel channel = conn.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); boolean durable = true; channel.close(); conn.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
|
consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package io.github.azyet; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * rabbitmq consumer * */ public class MQConsummer { private static String EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE"; private static Logger logger = LoggerFactory.getLogger(MQConsummer.class); public static void main( String[] args ) { if (args.length < 1) { logger.error("need param"); return; } ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); for (String arg : args) { channel.queueBind(queueName, EXCHANGE_NAME, arg); } int prefetchCount = 1; channel.basicQos(prefetchCount); logger.debug("waiting for message..."); QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume(queueName, autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); logger.debug("received a message {}",message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } } }
|