这一节中我们演示用ZooKeeper创建一个简单的barrier和producer-consumer队列。当然首先你必须有至少一个ZooKeeper Server在运行。
基类
barrier和queue都将用到以下共同代码,构造方法会创建到Server的连接。
注意:官网教程没有提到,这个基类是要实现Watcher接口的,这样process方法才能作为回调方法收到消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static ZooKeeper zk = null; static final Object mutex = new Object(); String root; SyncPrimitive(String address) throws KeeperException, IOException { if(zk == null){ System.out.println("Starting ZK:"); zk = new ZooKeeper(address, 3000, (Watcher) this); System.out.println("Finished starting ZK: " + zk); } } public void process(WatcherEvent event) { synchronized (mutex) { mutex.notify(); } } public void process(WatchedEvent watchedEvent) { synchronized (mutex) { mutex.notify(); } }
|
WatchEvent包含的信息有:
1
| WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testRoot1
|
两个类都继承SyncPrimitive,为了简化例程,我们在第一次实例化barrier或queue时创建ZooKeeper对象,并声明一个静态变量指向该对象。后续的Barrier/queue检查对象是否存在。另替代一种方案,我们也可以创建一个ZooKeeper对象,并将它传递给Barrier和Queue的构造方法。
我们用process()方法处理监听后收到的消息,监听(watch)是一种使Zookeeper能在节点更新时通知client的内建机制。例如一个client想监听其它client的Barrier,那么它可以设置监听并等待,指定节点一旦被更改就会收到通知,从而等待结束。
下面的例程来将加深这方面的印]。
Barrier实现
Barrier是能使一组进程能够在计算的开始和结束时同步的基本单元。基本的思想是让一个Barrier node担当所有进程的父node。假设/b1为父node,那么每一个进程p创建一个相应的node /b1/p ,当进程数足够时,就可以开始计算了。
构造方法
本例中,每个进程实例化一个Barrier对象,其构造函数会传入以下参数:
+ Server地址(e.g., "zoo1.foo.com:2181");
+ barrier节点路径(e.g., "/b1");
+ 组的大小
server地址会被传递给父类的构造方法创建一个ZooKeeper对象,接着Barrier的构造方法在ZooKeeper上创建Barrier node, 作为所有进程node的父node,我们称之为root(不同于zookeeper 的 root).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ZKBarrier(String address, String name, int size) throws KeeperException, InterruptedException, IOException { super(address); this.root = name; this.size = size; // Create barrier node if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } // My node name name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString()); }
|
创建父node
进程可调用enter()进入Barrier,在父Node下创建名称为主机名的node代表自身,之后等待其它进程就绪,直到父node下的进程node数量达到预定值。为了能接收父node的更新信息,必须设置watch,这里是通过调用getChildren()
,第一个参数为需要读取的node,第二个参数是布尔值flag用于设置watch,这里是true.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| /** ******* Join barrier ******* ******* @return ******* @throws KeeperException ******* @throws InterruptedException ******* / boolean enter() throws KeeperException, InterruptedException { zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } }
|
enter()
抛出KeeperException和InterruptedException异常,所以程序需要负责捕获和处理这些异常。
当计算结束,进程调用leave()
离开barrier。首先删除对应的进程node, 检查父node的children,如果不为0,那么就等待通知(之前已通过boolean true设置了watch)。收到通知后,再次确认子node数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| /** * ****** Wait until all reach barrier * ****** * ****** @return * ****** @throws KeeperException * ****** @throws InterruptedException *******/ boolean leave() throws KeeperException, InterruptedException { zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { ArrayList<String> list = (ArrayList<String>) zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } }
|
生产者-消费者队列(Producer-Consumer Queues)
以下简称队列(queu)。
队列是一个分布式的数据,一组进程用它来生成和消费item。生产者进程生成新元素加入队列,消费都进程从队列中取出元素并消费。本例中元素是简单的整数。队列用一个root node来表示,生产都在root node下加入子Node,代表将新元素加入队列。
Queue也是SyncPrimitive的子类。
Queue构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| /** ******* Constructor of producer-consumer queue ******* ******* @param address ******* @param name *******/ ZKQueue(String address, String name) throws KeeperException, InterruptedException, IOException { super(address); this.root = name; // Create ZK node name if (zk != null) { Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } }
|
向队列添加元素
生产者进程调用produce方法将元素加入队列,用create创建node,传入SEQUENCE Flag告知ZooKeeper追加root node的序列计数器,这样就保证了队列的先FIFO特性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| /** ******* Add element to the queue. ******* ******* @param i ******* @return *******/ boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; // Add child with value i b.putInt(i); value = b.array(); zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); return true; }
|
取队列元素
消费者进程从root node读取计数最小的node,并返回node. 注意如果遇到冲突,那么两个consumer中有一个删除node和失败抛出异常。 getChildren()按字典顺序返回子node, 所以需要我们自己寻找counter最小的node.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| /** * ****** Remove first element from the queue. * ****** * ****** @return * ****** @throws KeeperException * ****** @throws InterruptedException *******/ int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; // Get the first element available while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.isEmpty()) { System.out.println("Going to wait"); mutex.wait(); } else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tempValue = new Integer(s.substring(7)); if (tempValue < min) min = tempValue; } System.out.println("Temporary value: " + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } }
|