Contents
  1. 1. Barrier
    1. 1.1. 测试代码
    2. 1.2. 日志输出设置
    3. 1.3. 运行结果与分析
  2. 2. Queue
    1. 2.1. 测试代码
    2. 2.2. 输出与分析
  3. 3. 附完整代码
    1. 3.1. SysncPrimitive
    2. 3.2. zkQueue
    3. 3.3. zkBarrier

这一节中我们要对上一节中的Barrier和Queue用例进行测试和分析,填补忽略的细节,加深印象。

Barrier

首先我们来看测试main方法,为了更方便地进行Debug和日志输出,暂没有使用不是太熟悉的Junit.

测试代码

1
2
3
4
5
6
7
8
9
10
ZKBarrier zkBarrier = new ZKBarrier("dell:2181", "/testRoot3", 3);
//purge the children under testrRoot
zkBarrier.name = args[0];
System.out.println("Entering the Barrier");
zkBarrier.enter();
System.out.println("Work done, leaving");
zkBarrier.leave();
return;

大致的流程如下:

等待其它进程(总数3)就绪
开始工作
等待所有Node删除,退出

日志输出设置

enter方法中有如下打印语句:

监听也是在enter方法中设置的List<String> list = zk.getChildren(root, true);第二个参数True代表监听root Node,Node的任何更新都会被通知.

1
2
3
4
5
6
7
8
ArrayList<String> list = (ArrayList<String>) zk.getChildren(root, true);
if (list.size() > 0) {
System.out.print(list.size()+" children: ");
for (String s : list) {
System.out.print(s + " ");
}
System.out.print("\n");

每次收到消息,即打印children node, 数量到达3时, 将进入工作阶段。

leave方法中也有类似日志输出,当数量为0时,退出程序。

运行结果与分析

首先运行nodej进程,再依次运行node1 和node2进程。

nodej输出:

1
2
3
4
5
6
7
Entering the Barrier
1 children: nodej
2 children: node1 nodej
3 children: node2 node1 nodej
time to work!!!
Work done, leaving
1children: node1

Node1输出:

1
2
3
4
5
Entering the Barrier
2 children: node1 nodej
3 children: node2 node1 nodej
time to work!!!
Work done, leaving

Node2 输出:

1
2
3
4
Entering the Barrier
3 children: node2 node1 nodej
time to work!!!
Work done, leaving

可以看到,Nodej Node1 Node2依次进入Barrier。每次有Node进入,在线的Node都会收到消息(广播),打印信息。当Node2进入时,达到预定数量3, 所有进程退出enter方法开始进入工作状态(只是打印一句日志),完成后开始进入退出状态。退出时,每个进程删除自己对应的Node,但是并不立即退出方法,而是待到所有Node删除后才返回。

这里测试模拟的工作时间很短,过程中出现过问题,即Node2刚创建,Node1 Nodej结束等待并瞬间工作完毕,删除自身Node,导致Node2尚未进入工作状态,因而将一直在等待。加一句sleep(1000)延长工作时间可解决。实际应用中如果任务运行时间很短,也需要考虑这个问题。

通过以上机制,分布式的三个进程实现了同时开始工作和同时退出,也就是达到了协同的效果。

Queue

我们之前介绍了RabbitMQ,它是一个专门的AMQP应用,使用ZooKeeper的znode的有序性,也可以实现类似的功能。

测试代码

依然采用了main方法来测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
ZKQueue zkQueue = new ZKQueue("dell:2181", "/queueRoot1");
if (zkQueue == null) {
return;
}
if (args.length == 1) {
zkQueue.produce(Integer.parseInt(args[0]));
} else {
while (true) {
System.out.println("node value is:" + zkQueue.consume());
}
}
无参时作为consumer一直监听消息,有参时作为producer发送参数值(整数)。

输出与分析

producer:

1
2
3
4
5
$ java -jar target/zkQueue-1.0-SNAPSHOT.jar 1
$ java -jar target/zkQueue-1.0-SNAPSHOT.jar 2
$ java -jar target/zkQueue-1.0-SNAPSHOT.jar 5

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
Going to wait
node name: element0000000003
Temporary value: /queueRoot1/element0000000003
node value is:1
Going to wait
node name: element0000000004
Temporary value: /queueRoot1/element0000000004
node value is:2
Going to wait
node name: element0000000005
Temporary value: /queueRoot1/element0000000005
node value is:5
Going to wait

这里我们可以看到,当创建模式为EPHEMERAL_SEQUENTIAL时,ZooKeeper会为节点名自动添加序号,对应是创建的顺序。因为只有一个producer,任务也简单,所以Node数总是1。如果有多个consumer,上述代码就会出错,因为消息是广播的,获取节点数据之后二者都会执行删除操作。可见它用来实现1对1的通信还是比较便捷的,但如果要实现工作队列来在多个线程中分发任务,实现并行处理和负载平衡,就不是很方便。

附完整代码

实质上一节的代码有一些小Bug,这里附上完整的代码

SysncPrimitive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package io.github.azyet;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.proto.WatcherEvent;
import java.io.IOException;
/**
* Created by chen on 15-5-19.
*/
public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null;
static final Object mutex = new Object();
String root;
SyncPrimitive(String address)
throws KeeperException, IOException {
if (zk == null) {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, (Watcher) this);
System.out.println("Finished starting ZK: " + zk);
}
}
public void process(WatchedEvent watchedEvent) {
synchronized (mutex) {
mutex.notify();
}
}
}

zkQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package io.github.azyet;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
/**
* Hello world!
*/
public class ZKQueue extends SyncPrimitive {
ZKQueue(String address, String name)
throws KeeperException, InterruptedException, IOException {
super(address);
this.root = name;
// Create ZK node name
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
/**
* ****** Add element to the queue.
* ******
* ****** @param i
* ****** @return
*******/
boolean produce(int i) throws KeeperException, InterruptedException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
// Add child with value i
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
return true;
}
/**
* ****** Remove first element from the queue.
* ******
* ****** @return
* ****** @throws KeeperException
* ****** @throws InterruptedException
*******/
int consume() throws KeeperException, InterruptedException {
int retvalue = -1;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.isEmpty()) {
System.out.println("Going to wait");
mutex.wait();
} else {
for (String s : list) {
System.out.println("node name: " + s);
}
Integer min = new Integer(list.get(0).substring(7));
String minName = list.get(0);
for (String s : list) {
Integer tempValue = new Integer(s.substring(7));
if (tempValue < min) {
min = tempValue;
minName = s;
}
}
System.out.println("Temporary value: " + root + "/" + minName);
byte[] b = zk.getData(root + "/" + minName, false, stat);
zk.delete(root + "/" + minName, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
public static void main(String args[]) throws KeeperException, InterruptedException, IOException {
ZKQueue zkQueue = new ZKQueue("dell:2181", "/queueRoot1");
if (zkQueue == null) {
return;
}
if (args.length == 1) {
zkQueue.produce(Integer.parseInt(args[0]));
} else {
while (true) {
System.out.println("node value is:" + zkQueue.consume());
}
}
}
}

zkBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package io.github.azyet;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
/**
* Hello world!
*/
public class ZKBarrier extends SyncPrimitive {
String root;
private String name;
private int size;
/**
* ****** Barrier constructor
* ******
* ****** @param address
* ****** @param name
* ****** @param size
*******/
ZKBarrier(String address, String rootNode, int size)
throws KeeperException, InterruptedException, IOException {
super(address);
this.root = rootNode;
this.size = size;
// Create barrier node
if (zk != null) {
Stat s = zk.exists(this.root, false);
if (s == null) {
zk.create(this.root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// My node name
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
}
boolean enter() throws KeeperException, InterruptedException {
zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
System.out.print(list.size()+" children: ");
for (String s : list) {
System.out.print(s+" ");
}
System.out.print("\n");
if (list.size() < size) {
mutex.wait();
} else {
System.out.println("time to work!!!");
return true;
}
}
}
}
/**
* ****** Wait until all reach barrier
* ******
* ****** @return
* ****** @throws KeeperException
* ****** @throws InterruptedException
*******/
boolean leave() throws KeeperException, InterruptedException {
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (mutex) {
ArrayList<String> list = (ArrayList<String>) zk.getChildren(root, true);
if (list.size() > 0) {
System.out.print(list.size()+" children: ");
for (String s : list) {
System.out.print(s + " ");
}
Thread.sleep(1000);
System.out.print("\n");
mutex.wait();
} else {
return true;
}
}
}
}
public static void main(String[] args) {
if (args.length < 1) {
System.out.println("input node name");
return;
}
try {
ZKBarrier zkBarrier = new ZKBarrier("dell:2181", "/testRoot3", 3);
//purge the children under testrRoot
zkBarrier.name = args[0];
System.out.println("Entering the Barrier");
zkBarrier.enter();
System.out.println("Work done, leaving");
zkBarrier.leave();
return;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Contents
  1. 1. Barrier
    1. 1.1. 测试代码
    2. 1.2. 日志输出设置
    3. 1.3. 运行结果与分析
  2. 2. Queue
    1. 2.1. 测试代码
    2. 2.2. 输出与分析
  3. 3. 附完整代码
    1. 3.1. SysncPrimitive
    2. 3.2. zkQueue
    3. 3.3. zkBarrier