Contents
  1. 1. 一个简单的Watch客户端
    1. 1.1. Requirements
    2. 1.2. 程序设计
  2. 2. Executor类
    1. 2.1. 接口的定义
    2. 2.2. 接口在Executor中的实现
  3. 3. DataMonitor类
  4. 4. 运行效果
    1. 4.1. 输入
    2. 4.2. 输出
  5. 5. 代码分析
  6. 6. 附完整源码
    1. 6.1. Executor
    2. 6.2. DataMonitor

  之前小节中已经写过快速入门的例程,但在进一步了解了更多的概念之后,需要更多的实践来加深印象。总之理论学习和实践穿插进行在我看来是一种不错的方式,重要的是在参照教程编写代码的同时不要只求快速跑通,而应该与之前的看过的原理结合起来。

一个简单的Watch客户端

  watch一个znode,用启动和停止特定程序的方式作出回应。

Requirements

  1. 接收的参数:

    服务地址
    需要watch的znode
    可执行程序+参数

  2. 从zonde中取得数据并运行程序
  3. 如果znode变化,用新的数据重启程序
  4. 如果znode删除,终止程序

程序设计

  通常ZooKeeper应用分解为两部分,一部分Executor用于维护连接,另一部分DataMonitor监控数据。(注:还记得上一节中讲到的事件线程和io线程吗?)同时,executor包含主线程和执行逻辑,它负责用户交互以及与可执行程序的交互。这可执行程序是通过参数传递进来的,它根据znode的更改而启停。

Executor类

  例程的主体,包含ZooKeeper对象和DataMonitor。Executor把自己作为Watcher传递进ZooKeeper的构造方法,为此Executor必须实现Watcher接口。

1
2
3
4
5
6
7
8
9
10
public class ZKExecutor
implements Watcher, Runnable, DataMonitor.DataMonitorListener {
……
public ZKExecutor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}

  Watcher接口由ZooKeeper定义,ZooKeeper通过这个接口回信给它的容器,接口只包含一个方法process(),通过它通信主线程可能感兴趣的事件,例如连接和会话的状态改变等。本例中Executor只是简单地将事件下发给DataMonitor,由它决定事件的处理。这样做主要是为了展示惯例的做法,即拥有ZooKeeper连接的Executor可以自由地将事件以新的形式(可重新封装) 委派/代理给其它对象。

1
2
3
public void process(WatchedEvent watchedEvent) {
dm.process(watchedEvent);
}

  至于DataMonitorListener接口则完全是为这个例程自定义的,与zooKeeper无关,DataMonitor通过它来回信给它的容器(Executor)。接口在DataMonitor中定义,在Executor中实现,方法由DataMonitor调用,但业务逻辑由Executor来决定。

接口的定义

1
2
3
4
5
6
public interface DataMonitorListener {
void exists(byte data[]);
void closing(int rc);
}

接口在Executor中的实现

1
2
3
4
5
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}

  总之,这就是调用与回调:Execuotr将事件代理给DataMonitor,DataMonitor处理完自己的任务(监控数据)后通过回调接口将结果返回给上层容器Executor继续后续业务(管理可执行程序)。

DataMonitor类

  DataMonitor类比较能体现ZooKeeper的逻辑,异步与事件驱动。DataMonitor通过zk.exists(znode, true, this, null)启动业务,因为所有的处理都是在收到事件后开始的。

不要把完成回调和watch回调混淆,ZooKeeper.exists()的完成回调是DataMonitor对StatCallback.processResult()接口的实现,它在异步操作ZooKeeper.exists()完成后调用。而watch事件触发后,将被发送给Executor对象,因为在建立连接时作为Watcher传递给了ZooKeeper。

1
2
3
4
5
6
7
8
9
public class DataMonitor implements Watcher, StatCallback {
……
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
……

  exists操作返回后,processResult被调用,它确认操作完成情况,读取znode数据,最后回调Executor。

运行效果

  初始状态下,节点不存在,使用cli模式来创建和修改指定node,Executor收到watch事件后,将调用echo命令打印znode数据。

输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[zk: 127.0.0.1:2181(CONNECTED) 17] create /queueRoot1 createNode
Created /queueRoot1
[zk: 127.0.0.1:2181(CONNECTED) 18] set /queueRoot1 setData1
cZxid = 0x3ac
ctime = Thu May 21 03:15:58 CST 2015
mZxid = 0x3ad
mtime = Thu May 21 03:16:20 CST 2015
pZxid = 0x3ac
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: 127.0.0.1:2181(CONNECTED) 19] set /queueRoot1 setData2
……
[zk: 127.0.0.1:2181(CONNECTED) 20] delete /queueRoot1

  setData的输出比较多,上两节中我们介绍过stat structure,这里是一一对应的。例如cZxid < mZxid , mtime和ctime都是时钟时间,有三种version号,node非瞬时故ephemeralOwner = 0x0。

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ java -jar target/zkMonitor-1.0-SNAPSHOT.jar dell:2181 /queueRoot1 log echo running program... data=
Picked up JAVA_TOOL_OPTIONS: -javaagent:/usr/share/java/jayatanaag.jar
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. ##启动后停留于此
Starting child #创建node后开始运行
running program... data= createNode
Stopping child #第一次修改node
Starting child
running program... data= setData1
Stopping child #第二次修改node
Starting child
running program... data= setData2
Killing process #删除node

代码分析

  System.arraycopy(args, 3, exec, 0, exec.length)可用于数组Copy.

  整个的流程如下所示:

  实际上有两个线程在运行,一个为实现了Runnable的Executor,另一个为ZooKeeper,Executor自启动后就进行了Wait状态,直到znode被删除,DataMonitor回调Executor的closing方法它才会继续运行,此时如果dm.dead被置位,程序结束。

附完整源码

Executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package io.github.azyet;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
* Hello world!
*/
public class ZKExecutor
implements Watcher, Runnable, DataMonitor.DataMonitorListener {
String znode;
DataMonitor dm;
ZooKeeper zk;
String filename;
String exec[];
Process child;
public ZKExecutor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
/**
* @param args
*/
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new ZKExecutor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
/***************************************************************************
* We do process any events ourselves, we just need to forward them on.
*/
public void process(WatchedEvent watchedEvent) {
dm.process(watchedEvent);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
static class StreamWriter extends Thread {
OutputStream os;
InputStream is;
StreamWriter(InputStream is, OutputStream os) {
this.is = is;
this.os = os;
start();
}
public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b, 0, rc);
}
} catch (IOException e) {
}
}
}
public void exists(byte[] data) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
String[] cmd = new String[exec.length + 1];
System.arraycopy(exec, 0, cmd, 0, exec.length);
cmd[exec.length] = new String(data);
child = Runtime.getRuntime().exec(cmd);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

DataMonitor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package io.github.azyet;
/**
* Created by chen on 15-5-21.
* <p>
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
/**
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.Arrays;
public class DataMonitor implements Watcher, StatCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
boolean dead;
DataMonitorListener listener;
byte prevData[];
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}
Contents
  1. 1. 一个简单的Watch客户端
    1. 1.1. Requirements
    2. 1.2. 程序设计
  2. 2. Executor类
    1. 2.1. 接口的定义
    2. 2.2. 接口在Executor中的实现
  3. 3. DataMonitor类
  4. 4. 运行效果
    1. 4.1. 输入
    2. 4.2. 输出
  5. 5. 代码分析
  6. 6. 附完整源码
    1. 6.1. Executor
    2. 6.2. DataMonitor