Contents
  1. 1. 基类
  2. 2. Barrier实现
    1. 2.1. 构造方法
    2. 2.2. 创建父node
  3. 3. 生产者-消费者队列(Producer-Consumer Queues)
    1. 3.1. Queue构造方法
    2. 3.2. 向队列添加元素
    3. 3.3. 取队列元素

这一节中我们演示用ZooKeeper创建一个简单的barrier和producer-consumer队列。当然首先你必须有至少一个ZooKeeper Server在运行。

基类

barrier和queue都将用到以下共同代码,构造方法会创建到Server的连接。

注意:官网教程没有提到,这个基类是要实现Watcher接口的,这样process方法才能作为回调方法收到消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static ZooKeeper zk = null;
static final Object mutex = new Object();
String root;
SyncPrimitive(String address)
throws KeeperException, IOException {
if(zk == null){
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, (Watcher) this);
System.out.println("Finished starting ZK: " + zk);
}
}
public void process(WatcherEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
public void process(WatchedEvent watchedEvent) {
synchronized (mutex) {
mutex.notify();
}
}

WatchEvent包含的信息有:

1
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testRoot1

两个类都继承SyncPrimitive,为了简化例程,我们在第一次实例化barrier或queue时创建ZooKeeper对象,并声明一个静态变量指向该对象。后续的Barrier/queue检查对象是否存在。另替代一种方案,我们也可以创建一个ZooKeeper对象,并将它传递给Barrier和Queue的构造方法。

我们用process()方法处理监听后收到的消息,监听(watch)是一种使Zookeeper能在节点更新时通知client的内建机制。例如一个client想监听其它client的Barrier,那么它可以设置监听并等待,指定节点一旦被更改就会收到通知,从而等待结束。

下面的例程来将加深这方面的印]。

Barrier实现

Barrier是能使一组进程能够在计算的开始和结束时同步的基本单元。基本的思想是让一个Barrier node担当所有进程的父node。假设/b1为父node,那么每一个进程p创建一个相应的node /b1/p ,当进程数足够时,就可以开始计算了。

构造方法

本例中,每个进程实例化一个Barrier对象,其构造函数会传入以下参数:

+ Server地址(e.g., "zoo1.foo.com:2181");
+ barrier节点路径(e.g., "/b1");
+ 组的大小

server地址会被传递给父类的构造方法创建一个ZooKeeper对象,接着Barrier的构造方法在ZooKeeper上创建Barrier node, 作为所有进程node的父node,我们称之为root(不同于zookeeper 的 root).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ZKBarrier(String address, String name, int size)
throws KeeperException, InterruptedException, IOException {
super(address);
this.root = name;
this.size = size;
// Create barrier node
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
// My node name
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
}

创建父node

进程可调用enter()进入Barrier,在父Node下创建名称为主机名的node代表自身,之后等待其它进程就绪,直到父node下的进程node数量达到预定值。为了能接收父node的更新信息,必须设置watch,这里是通过调用getChildren(),第一个参数为需要读取的node,第二个参数是布尔值flag用于设置watch,这里是true.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
******* Join barrier
*******
******* @return
******* @throws KeeperException
******* @throws InterruptedException
******* /
boolean enter() throws KeeperException, InterruptedException {
zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() < size) {
mutex.wait();
} else {
return true;
}
}
}
}

enter()抛出KeeperException和InterruptedException异常,所以程序需要负责捕获和处理这些异常。

当计算结束,进程调用leave()离开barrier。首先删除对应的进程node, 检查父node的children,如果不为0,那么就等待通知(之前已通过boolean true设置了watch)。收到通知后,再次确认子node数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* ****** Wait until all reach barrier
* ******
* ****** @return
* ****** @throws KeeperException
* ****** @throws InterruptedException
*******/
boolean leave() throws KeeperException, InterruptedException {
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (mutex) {
ArrayList<String> list = (ArrayList<String>) zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}

生产者-消费者队列(Producer-Consumer Queues)

以下简称队列(queu)。

队列是一个分布式的数据,一组进程用它来生成和消费item。生产者进程生成新元素加入队列,消费都进程从队列中取出元素并消费。本例中元素是简单的整数。队列用一个root node来表示,生产都在root node下加入子Node,代表将新元素加入队列。

Queue也是SyncPrimitive的子类。

Queue构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
******* Constructor of producer-consumer queue
*******
******* @param address
******* @param name
*******/
ZKQueue(String address, String name)
throws KeeperException, InterruptedException, IOException {
super(address);
this.root = name;
// Create ZK node name
if (zk != null) {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
}

向队列添加元素

生产者进程调用produce方法将元素加入队列,用create创建node,传入SEQUENCE Flag告知ZooKeeper追加root node的序列计数器,这样就保证了队列的先FIFO特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
******* Add element to the queue.
*******
******* @param i
******* @return
*******/
boolean produce(int i) throws KeeperException, InterruptedException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
// Add child with value i
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
return true;
}

取队列元素

消费者进程从root node读取计数最小的node,并返回node. 注意如果遇到冲突,那么两个consumer中有一个删除node和失败抛出异常。 getChildren()按字典顺序返回子node, 所以需要我们自己寻找counter最小的node.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* ****** Remove first element from the queue.
* ******
* ****** @return
* ****** @throws KeeperException
* ****** @throws InterruptedException
*******/
int consume() throws KeeperException, InterruptedException {
int retvalue = -1;
Stat stat = null;
// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.isEmpty()) {
System.out.println("Going to wait");
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for (String s : list) {
Integer tempValue = new Integer(s.substring(7));
if (tempValue < min) min = tempValue;
}
System.out.println("Temporary value: " + root + "/element" + min);
byte[] b = zk.getData(root + "/element" + min, false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
Contents
  1. 1. 基类
  2. 2. Barrier实现
    1. 2.1. 构造方法
    2. 2.2. 创建父node
  3. 3. 生产者-消费者队列(Producer-Consumer Queues)
    1. 3.1. Queue构造方法
    2. 3.2. 向队列添加元素
    3. 3.3. 取队列元素