Contents
  1. 1. 准备工作
    1. 1.1. 循环分发
    2. 1.2. 消息确认
  2. 2. 消息持久性
  3. 3. 公平分发
  4. 4. 附:最终代码
    1. 4.1. producer
    2. 4.2. consumer

  本节中我们将新建Work Queue用来在多个工作进程间分发比较耗时的任务。

  work queue的主要意图就是避免立即处理资源密集的任务并等待它完成,而是安排它稍后处理。将任务封闭为消息发送给queue,后台的工作进程会取得这个消息并处理它,如果有多个工作进程,它们分担任务。

  这种理念在web应用中非常有用,特别是在当无法在一个短时间窗内完成复杂请求时。

准备工作

  方便起见,我们将使用Thread.sleep()来代替一个真正的费时任务,使用.号来代表处理时长,比如消息hello...表示任务将费时3秒。

  对上一节的producer进行改动,使之能发送随机时长的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
int l = new Random().nextInt(100);
StringBuilder sb = new StringBuilder("hello.");
for (int i = 0; i < l % 5; i++) {
sb.append(".");
}
try {
logger.debug("sent a message {}", sb.toString());
channel.basicPublish("", QUEUE_NAME, null, sb.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}, 0, 1500);

  对consumer也进行相应地改动:

1
2
3
4
5
6
for (char ch: message.toCharArray()) {
if (ch == '.') {
logger.debug(".");
Thread.sleep(1000);
}
}

循环分发

  使用Task Queue的一个优势就是能够方便地并行工作。处理大量任务时,加入更多的工作线程即可,易于扩容。

  首先,同时运行两个工作进程,它们都将从队列收到消息。缺省情况下,rabbitMQ会将消息顺序地依次发给下一个consumer,每个consumer都会收到相同数量的消息,这就是所谓的round-robin

  注意,第一节提到QueueingConsumer是一个消息缓冲,一个工作进程可能在处理一个任务的时候,同时积压多个任务,而另一个工作进程可能是空闲(收到的都是轻量级任务),这是round-robin的不足之处。

消息确认

  一个工作进程如果意外终止,缺省状态(消息发出即从Queue删除)下会丢失正在处理的数据和接收到但未及处理的消息数据。显然这一般无法接受的,为了防止数据丢失,可以禁用自动确认,并在第次任务完成后手动确认。

1
2
3
4
5
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
……
//处理完成后
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

  一个常见又严重的错误就是忘记确认,消息将被和重传,但work进程已经退出,这将导致RabbitMQ占用越来越多的内存,因为它无法释放未被确认的消息。

  可以使用如下命令来检查这种情况:

1
2
3
$ sudo rabbitmqctl list_queues name messages_rdy messages_unacknowledged
Listing queues ...
TEST_QUEUE 0 0

消息持久性

  虽然工作进程终止可以不丢失数据了,但Server终止仍然会导致数据丢失。我们也可以进行配置,使Server保存数据,需要完成以下两步:

  1. 将Queue声明为durable,这样Server就不会丢失Queue了。
    1
    2
    boolean durable = true;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

  需要注意的是RabbitMQ不允许新建同名Queue。

  1. 将消息标志为persisten,设置 MessageProperties = PERSISTENT_TEXT_PLAIN.

    1
    2
    3
    channel.basicPublish("", "task_queue",
    MessageProperties.PERSISTENT_TEXT_PLAIN,
    message.getBytes());

    注:收到消息到保存到硬盘有一段时间,所以以上设置也不能100%保证消息不丢失,但对一般的简单任务足够。如果要更强的保证,可以使用publisher confirms.

公平分发

  上面已经提到round-robin可能导致有的工作进程繁忙而有的空闲,为此我们可以设置 prefetchCount = 1,这样工作进程将不会同时收到超过1个的任务,也就是繁忙中的进程不会再收到任务(关闭自动确认为前提)。

1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列大小。如果所有工作进程繁忙,队列就会越来越长,应该增加更多工作进程,或者改变策略。

附:最终代码

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package io.github.azyet;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeoutException;
/**
* rabbitmq producer
*
*/
public class MQSender
{
private static String QUEUE_NAME = "TEST_QUEUE";
private static Logger logger = LoggerFactory.getLogger(MQSender.class);
public static void main( String[] args )
{
logger.debug(args.length+"");
for (String arg : args) {
logger.debug(arg+"");
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
final Channel channel = conn.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false,false, null);
final String message = "hello world!";
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
int l = new Random().nextInt(100);
StringBuilder sb = new StringBuilder("hello.");
for (int i = 0; i < l % 5; i++) {
sb.append(".");
}
try {
logger.debug("sent a message {}", sb.toString());
channel.basicPublish("", QUEUE_NAME, null, sb.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}, 0, 2000);
// channel.close();
// conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package io.github.azyet;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* rabbitmq consumer
*
*/
public class MQConsummer
{
private static String QUEUE_NAME = "TEST_QUEUE";
private static Logger logger = LoggerFactory.getLogger(MQConsummer.class);
public static void main( String[] args )
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false,null);
int prefetchCount = 1;
channel.basicQos(prefetchCount);
logger.debug("waiting for message...");
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
logger.debug("received a message {}",message);
for (char ch: message.toCharArray()) {
if (ch == '.') {
logger.debug(".");
Thread.sleep(1000);
}
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
}
}
}
Contents
  1. 1. 准备工作
    1. 1.1. 循环分发
    2. 1.2. 消息确认
  2. 2. 消息持久性
  3. 3. 公平分发
  4. 4. 附:最终代码
    1. 4.1. producer
    2. 4.2. consumer