Contents
  1. 1. Client接口
  2. 2. Callback queue
  3. 3. Correlation Id
  4. 4. Summary总结
  5. 5. 汇总
    1. 5.1. client
    2. 5.2. RPC Sever

第二节中我们学习了如何使用工作队列来在多个工作进程中分发费时的任务。

但是如果我们需要运行一个远程的函数并且需要等待返回结果会怎样?这是一种完全不同的机制,通常叫做RPC(Remote Procedure Call)远程方法调用。

这一节中我们将使用RabbitMQ来建造一个RPC系统:一个Client和一个可扩容的RPC Server。因为没有真正值得分布式化的任务,我们用Fibonacci数列来代替。

Client接口

为了演示RPC Service是如何使用的,我们将编写一个简单的Client类。它提供一个名为call的方法,发出RPC请求并在返回之前阻塞。

1
2
3
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
注:虽然RPC很常见,但也备受批评。问题在于程序无法得知调用的方法是远程(较慢)还是本地,这种模糊将导致系统不可预期以及不必要的额外调试复杂度,可能导致不可维护的`意面代码`。
确保方法远程或本地的明确性,作好文档。使组件之间的依赖关系清晰。处理异常情况,比如RPC不可用。存疑时尽量避免使用RPC。如果可能,使用异步方式,用状态转移代替阻塞。

Callback queue

总的来说通过RabbitMQ使用RPC是很容易的。Client发送请求,Server回复响应。为了收到回复,我们发送request时需要提供一个callback queue名,供Server使用。这里我们使用匿名Queue,它是专属的。

1
2
3
4
5
6
7
8
9
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
Message properties
AMQP协议定义了14种消息属性,大部分很少使用,除了以下几种:
deliveryMode: 标识消息持久(2)还是瞬时(其它)的,我们在第二节中使用过。
contentType: 描述编码的mine类型。例如Json是一种常见编码,此属性可调置为application/json
replyTo: 指定回调Queue
correlationId: 关联RPC请求与响应

Correlation Id

上面的代码中为每一个RPC请求建立了一个回调queue,这样很不经济,我们可以为每个Clinet创建一个回调queue。

那么怎么将请求与响应对应起来呢?使用上面介绍的correlationId属性。每一次请求都将其设置为一个唯一值,响应中也会包含此值,这样就可以对应起来了。而如果收到不符的correlationId时,则可以安全地将它丢弃。

为什么是丢弃而不是报错呢?因为有一种不太可能出现的情况,即RPC Server发出响应后就终止了,却未发出确认信息。那么Server再次运行时就会重复收到请求,Client就会收到重复的响应。这也是为什么RPC理想状态下是幂等的原因。

Summary总结

我们的RPC将这样工作:

Client启动后,1 建立一个匿名专属的回调queue;2 每个请求都包含两个属性,replyTocorrelationId,后者必须是唯一值不能雷同;3 请求被RPC server(worker)收到并完成后,会将结果通过属性replyTo中指定的queue并附上correlationId返回;4 Client收到结果后退出阻塞状态(监听回调queue),检查correlationId,如果匹配,则将值返回给程序。

汇总

我们可以看到,RabbitMQ并没有提供一个所谓的RPC消息类型的Queue或者Exchange,总之RPC其实是由程序员自己来实现的,只是利用了AMQP协议的replayTo和correlationId两个属性。

fibonacci函数定义:

1
2
3
4
5
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}

运行效果:

client

总的来说,Client的改动要大一些,因为它发送完请求后立即变成一个Consumer,等待来自callbackQueue的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package io.github.azyet;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* rabbitmq producer
*/
public class MQSender {
private static String QUEUE_NAME = "TEST_RPC_QUEUE";
private static Logger logger = LoggerFactory.getLogger(MQSender.class);
private static String callBackQueueName;
public static void main(String[] args) {
if (args.length < 1) {
logger.error("input param");
return;
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
final Channel channel = conn.createChannel();
callBackQueueName = channel.queueDeclare().getQueue();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(callBackQueueName, consumer);
String corrId = java.util.UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.correlationId(corrId)
.replyTo(callBackQueueName)
.build();
channel.basicPublish("", QUEUE_NAME, props, args[0].getBytes());
logger.debug("request sent, waiting for responese");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
logger.debug("the result is: {}",new String(delivery.getBody()));
}
channel.close();
conn.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

RPC Sever

相对Consumer,它的变化在于处理完消息之后变成一个producer,即把结果以消息的形式发送到callbackQUEUE。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package io.github.azyet;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* rabbitmq consumer
*
*/
public class MQConsummer
{
private static String QUEUE_NAME = "TEST_RPC_QUEUE";
private static Logger logger = LoggerFactory.getLogger(MQConsummer.class);
public static void main( String[] args )
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
int prefetchCount = 1;
channel.basicQos(prefetchCount);
logger.debug("waiting for message...");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
AMQP.BasicProperties properties = delivery.getProperties();
String message = new String(delivery.getBody());
int i = Integer.parseInt(message);
logger.debug("received a message {}",message);
int fib = fib(i);
String res = ""+fib;
logger.debug("the result is : {}",res);
AMQP.BasicProperties replyProps = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish( "", properties.getReplyTo(), replyProps, res.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ShutdownSignalException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
}
Contents
  1. 1. Client接口
  2. 2. Callback queue
  3. 3. Correlation Id
  4. 4. Summary总结
  5. 5. 汇总
    1. 5.1. client
    2. 5.2. RPC Sever