本节中我们将新建Work Queue用来在多个工作进程间分发比较耗时的任务。
work queue的主要意图就是避免立即处理资源密集的任务并等待它完成,而是安排它稍后处理。将任务封闭为消息发送给queue,后台的工作进程会取得这个消息并处理它,如果有多个工作进程,它们分担任务。
这种理念在web应用中非常有用,特别是在当无法在一个短时间窗内完成复杂请求时。
准备工作
方便起见,我们将使用Thread.sleep()来代替一个真正的费时任务,使用.
号来代表处理时长,比如消息hello...
表示任务将费时3秒。
对上一节的producer进行改动,使之能发送随机时长的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { int l = new Random().nextInt(100); StringBuilder sb = new StringBuilder("hello."); for (int i = 0; i < l % 5; i++) { sb.append("."); } try { logger.debug("sent a message {}", sb.toString()); channel.basicPublish("", QUEUE_NAME, null, sb.toString().getBytes()); } catch (IOException e) { e.printStackTrace(); } } }, 0, 1500);
|
对consumer也进行相应地改动:
1 2 3 4 5 6
| for (char ch: message.toCharArray()) { if (ch == '.') { logger.debug("."); Thread.sleep(1000); } }
|
循环分发
使用Task Queue的一个优势就是能够方便地并行工作。处理大量任务时,加入更多的工作线程即可,易于扩容。
首先,同时运行两个工作进程,它们都将从队列收到消息。缺省情况下,rabbitMQ会将消息顺序地依次发给下一个consumer,每个consumer都会收到相同数量的消息,这就是所谓的round-robin
。
注意,第一节提到QueueingConsumer是一个消息缓冲,一个工作进程可能在处理一个任务的时候,同时积压多个任务,而另一个工作进程可能是空闲(收到的都是轻量级任务),这是round-robin
的不足之处。
消息确认
一个工作进程如果意外终止,缺省状态(消息发出即从Queue删除)下会丢失正在处理的数据和接收到但未及处理的消息数据。显然这一般无法接受的,为了防止数据丢失,可以禁用自动确认,并在第次任务完成后手动确认。
1 2 3 4 5
| boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); …… channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
一个常见又严重的错误就是忘记确认,消息将被和重传,但work进程已经退出,这将导致RabbitMQ占用越来越多的内存,因为它无法释放未被确认的消息。
可以使用如下命令来检查这种情况:
1 2 3
| $ sudo rabbitmqctl list_queues name messages_rdy messages_unacknowledged Listing queues ... TEST_QUEUE 0 0
|
消息持久性
虽然工作进程终止可以不丢失数据了,但Server终止仍然会导致数据丢失。我们也可以进行配置,使Server保存数据,需要完成以下两步:
- 将Queue声明为durable,这样Server就不会丢失Queue了。
1 2
| boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
|
需要注意的是RabbitMQ不允许新建同名Queue。
将消息标志为persisten,设置 MessageProperties = PERSISTENT_TEXT_PLAIN.
1 2 3
| channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
注:收到消息到保存到硬盘有一段时间,所以以上设置也不能100%保证消息不丢失,但对一般的简单任务足够。如果要更强的保证,可以使用publisher confirms
.
公平分发
上面已经提到round-robin可能导致有的工作进程繁忙而有的空闲,为此我们可以设置 prefetchCount = 1,这样工作进程将不会同时收到超过1个的任务,也就是繁忙中的进程不会再收到任务(关闭自动确认为前提)。
1 2
| int prefetchCount = 1; channel.basicQos(prefetchCount);
|
注意队列大小。如果所有工作进程繁忙,队列就会越来越长,应该增加更多工作进程,或者改变策略。
附:最终代码
producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package io.github.azyet; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeoutException; /** * rabbitmq producer * */ public class MQSender { private static String QUEUE_NAME = "TEST_QUEUE"; private static Logger logger = LoggerFactory.getLogger(MQSender.class); public static void main( String[] args ) { logger.debug(args.length+""); for (String arg : args) { logger.debug(arg+""); } ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); final Channel channel = conn.createChannel(); boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false,false, null); final String message = "hello world!"; Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { int l = new Random().nextInt(100); StringBuilder sb = new StringBuilder("hello."); for (int i = 0; i < l % 5; i++) { sb.append("."); } try { logger.debug("sent a message {}", sb.toString()); channel.basicPublish("", QUEUE_NAME, null, sb.toString().getBytes()); } catch (IOException e) { e.printStackTrace(); } } }, 0, 2000); // channel.close(); // conn.close(); } catch (IOException e) { e.printStackTrace(); } } }
|
consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package io.github.azyet; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * rabbitmq consumer * */ public class MQConsummer { private static String QUEUE_NAME = "TEST_QUEUE"; private static Logger logger = LoggerFactory.getLogger(MQConsummer.class); public static void main( String[] args ) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false,null); int prefetchCount = 1; channel.basicQos(prefetchCount); logger.debug("waiting for message..."); QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); logger.debug("received a message {}",message); for (char ch: message.toCharArray()) { if (ch == '.') { logger.debug("."); Thread.sleep(1000); } } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ShutdownSignalException e) { e.printStackTrace(); } } }
|