分布式锁的几种实现:
1.zookeeper分布式锁,基于自增节点
2.Redis分布式锁,基于setnx命令,
基于Redis实现分布式锁:http://blog.csdn.net/daiyudong2020/article/details/51760648
官网:http://redis.io/topics/distlock
译文:http://www.oschina.net/translate/redis-distlock
3.memcache分布式锁,基于add函数
这几种方案在网上有很多技术文章,不重复叙述,需要的Google一下

</div><!--end: blogTitle 博客的标题和副标题 -->
<div >
		<div >

评论 - 558

	</div><!--end: blogStats -->
</div><!--end: navigator 博客导航栏 -->

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe。

锁(Lock)

完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node)。

需要获得锁的客户端按照以下步骤来获取锁:

  1. 保证锁节点(lock root node)这个父根节点的存在,这个节点是每个要获取lock客户端共用的,这个节点是PERSISTENT的。
  2. 第一次需要创建本客户端要获取lock的节点,调用 create( ),并设置 节点为EPHEMERAL_SEQUENTIAL类型,表示该节点为临时的和顺序的。如果获取锁的节点挂掉,则该节点自动失效,可以让其他节点获取锁。

  3. 在父锁节点(lock root node)上调用 getChildren( ) ,不需要设置监视标志。 (为了避免“羊群效应”).

  4. 按照Fair竞争的原则,将步骤3中的子节点(要获取锁的节点)按照节点顺序的大小做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id
    是否就为owner id,如果是则返回,lock成功。如果不是则调用 exists( )监听比自己小的前一位的id,关注它锁释放的操作(也就是exist watch)。

  5. 如果第4步监听exist的watch被触发,则继续按4中的原则判断自己是否能获取到lock。

释放锁:需要释放锁的客户端只需要删除在第2步中创建的节点即可。

注意事项:

一个节点的删除只会导致一个客户端被唤醒,因为每个节点只被一个客户端watch,这避免了“羊群效应”。

一个分布式lock的实现:

基于ZooKeeper的分布式锁和队列
    





            

			基于ZooKeeper的分布式锁和队列
package org.apache.zookeeper.recipes.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/**

  • A <a href="package.html">protocol to implement an exclusive
  • write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to
  • start the process of grabbing the lock; you may get the lock then or it may be
  • some time later. <p/> You can register a listener so that you are invoked
  • when you get the lock; otherwise you can ask if you have the lock
  • by calling {@link #isOwner()}

*/
public class WriteLock extends ProtocolSupport {
private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class);

</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">final</span><span style="color: #000000;"> String dir;
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> String id;
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> ZNodeName idName;
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> String ownerId;
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> String lastChildId;
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">byte</span>[] data = {0x12, 0x34<span style="color: #000000;">};
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> LockListener callback;
</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> LockZooKeeperOperation zop;

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * zookeeper contructor for writelock
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> zookeeper zookeeper client instance
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> dir the parent path you want to use for locking
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> acls the acls that you want to use for all the paths, 
 * if null world read/write is used.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> WriteLock(ZooKeeper zookeeper, String dir, List&lt;ACL&gt;<span style="color: #000000;"> acl) {
    </span><span style="color: #0000ff;">super</span><span style="color: #000000;">(zookeeper);
    </span><span style="color: #0000ff;">this</span>.dir =<span style="color: #000000;"> dir;
    </span><span style="color: #0000ff;">if</span> (acl != <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
        setAcl(acl);
    }
    </span><span style="color: #0000ff;">this</span>.zop = <span style="color: #0000ff;">new</span><span style="color: #000000;"> LockZooKeeperOperation();
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * zookeeper contructor for writelock with callback
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> zookeeper the zookeeper client instance
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> dir the parent path you want to use for locking
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> acl the acls that you want to use for all the paths
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> callback the call back instance
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> WriteLock(ZooKeeper zookeeper, String dir, List&lt;ACL&gt;<span style="color: #000000;"> acl, 
        LockListener callback) {
    </span><span style="color: #0000ff;">this</span><span style="color: #000000;">(zookeeper, dir, acl);
    </span><span style="color: #0000ff;">this</span>.callback =<span style="color: #000000;"> callback;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * return the current locklistener
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the locklistener
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span><span style="color: #000000;"> LockListener getLockListener() {
    </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">this</span><span style="color: #000000;">.callback;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * register a different call back listener
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> callback the call back instance
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> setLockListener(LockListener callback) {
    </span><span style="color: #0000ff;">this</span>.callback =<span style="color: #000000;"> callback;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Removes the lock or associated znode if 
 * you no longer require the lock. this also 
 * removes your request in the queue for locking
 * in case you do not already hold the lock.
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> RuntimeException throws a runtime exception
 * if it cannot connect to zookeeper.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">synchronized</span> <span style="color: #0000ff;">void</span> unlock() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> RuntimeException {
    
    </span><span style="color: #0000ff;">if</span> (!isClosed() &amp;&amp; id != <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> we don't need to retry this operation in the case of failure
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> as ZK will remove ephemeral files and we don't wanna hang
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> this process when closing if we cannot reconnect to ZK</span>
        <span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            ZooKeeperOperation zopdel </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> ZooKeeperOperation() {
                </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span> execute() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException,
                    InterruptedException {
                    zookeeper.delete(id, </span>-1<span style="color: #000000;">);   
                    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> Boolean.TRUE;
                }
            };
            zopdel.execute();
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (InterruptedException e) {
            LOG.warn(</span>"Caught: " +<span style="color: #000000;"> e, e);
            </span><span style="color: #008000;">//</span><span style="color: #008000;">set that we have been interrupted.</span>

Thread.currentThread().interrupt();
}
catch (KeeperException.NoNodeException e) {
// do nothing
} catch (KeeperException e) {
LOG.warn(
"Caught: " + e, e);
throw (RuntimeException) new RuntimeException(e.getMessage()).
initCause(e);
}
finally {
if (callback != null) {
callback.lockReleased();
}
id
= null;
}
}
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;"> 
 * the watcher called on  
 * getting watch while watching 
 * my predecessor
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">private</span> <span style="color: #0000ff;">class</span> LockWatcher <span style="color: #0000ff;">implements</span><span style="color: #000000;"> Watcher {
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> process(WatchedEvent event) {
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> lets either become the leader or watch the new/updated node</span>
        LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +<span style="color: #000000;"> 
                event.getState() </span>+ " type " +<span style="color: #000000;"> event.getType());
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            lock();
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (Exception e) {
            LOG.warn(</span>"Failed to acquire lock: " +<span style="color: #000000;"> e, e);
        }
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * a zoookeeper operation that is mainly responsible
 * for all the magic required for locking.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">private</span>  <span style="color: #0000ff;">class</span> LockZooKeeperOperation <span style="color: #0000ff;">implements</span><span style="color: #000000;"> ZooKeeperOperation {
    
    </span><span style="color: #008000;">/**</span><span style="color: #008000;"> find if we have been created earler if not create our node
     * 
     * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> prefix the prefix node
     * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> zookeeper teh zookeeper client
     * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> dir the dir paretn
     * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
     * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
     </span><span style="color: #008000;">*/</span>
    <span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) 
        </span><span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
        List</span>&lt;String&gt; names = zookeeper.getChildren(dir, <span style="color: #0000ff;">false</span><span style="color: #000000;">);
        </span><span style="color: #0000ff;">for</span><span style="color: #000000;"> (String name : names) {
            </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (name.startsWith(prefix)) {
                id </span>= dir + "/" +<span style="color: #000000;"> name;
                </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (LOG.isDebugEnabled()) {
                    LOG.debug(</span>"Found id created last time: " +<span style="color: #000000;"> id);
                }
                </span><span style="color: #0000ff;">break</span><span style="color: #000000;">;
            }
        }
        </span><span style="color: #0000ff;">if</span> (id == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
            id </span>= zookeeper.create(dir + "/" +<span style="color: #000000;"> prefix, data, 
                    getAcl(), EPHEMERAL_SEQUENTIAL);

            </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (LOG.isDebugEnabled()) {
                LOG.debug(</span>"Created id: " +<span style="color: #000000;"> id);
            }
        }

    }
    
    </span><span style="color: #008000;">/**</span><span style="color: #008000;">
     * the command that is run and retried for actually 
     * obtaining the lock
     * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> if the command was successful or not
     </span><span style="color: #008000;">*/</span>
    <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span> execute() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
        </span><span style="color: #0000ff;">do</span><span style="color: #000000;"> {
            </span><span style="color: #0000ff;">if</span> (id == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
                </span><span style="color: #0000ff;">long</span> sessionId =<span style="color: #000000;"> zookeeper.getSessionId();
                String prefix </span>= "x-" + sessionId + "-"<span style="color: #000000;">;
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> lets try look up the current ID if we failed 
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> in the middle of creating the znode</span>

findPrefixInChildren(prefix, zookeeper, dir);
idName
= new ZNodeName(id);
}
if (id != null) {
List
<String> names = zookeeper.getChildren(dir, false);
if (names.isEmpty()) {
LOG.warn(
"No children in: " + dir + " when we've just " +
"created one! Lets recreate it...");
// lets force the recreation of the id
id = null;
}
else {
// lets sort them explicitly (though they do seem to come back in order ususally ????
SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
for (String name : names) {
sortedNames.add(
new ZNodeName(dir + "/" + name));
}
ownerId
= sortedNames.first().getName();
SortedSet
<ZNodeName> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
ZNodeName lastChildName
= lessThanMe.last();
lastChildId
= lastChildName.getName();
if (LOG.isDebugEnabled()) {
LOG.debug(
"watching less than me node: " + lastChildId);
}
Stat stat
= zookeeper.exists(lastChildId, new LockWatcher());
if (stat != null) {
return Boolean.FALSE;
}
else {
LOG.warn(
"Could not find the" +
" stats for less than me: " + lastChildName.getName());
}
}
else {
if (isOwner()) {
if (callback != null) {
callback.lockAcquired();
}
return Boolean.TRUE;
}
}
}
}
}
while (id == null);
return Boolean.FALSE;
}
};

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Attempts to acquire the exclusive write lock returning whether or not it was
 * acquired. Note that the exclusive lock may be acquired some time later after
 * this method has been invoked due to the current lock owner going away.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">synchronized</span> <span style="color: #0000ff;">boolean</span> lock() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
    </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (isClosed()) {
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">false</span><span style="color: #000000;">;
    }
    ensurePathExists(dir);

    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> (Boolean) retryOperation(zop);
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * return the parent dir for lock
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the parent dir used for locks.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span><span style="color: #000000;"> String getDir() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> dir;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns true if this node is the owner of the
 *  lock (or the leader)
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span><span style="color: #000000;"> isOwner() {
    </span><span style="color: #0000ff;">return</span> id != <span style="color: #0000ff;">null</span> &amp;&amp; ownerId != <span style="color: #0000ff;">null</span> &amp;&amp;<span style="color: #000000;"> id.equals(ownerId);
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * return the id for this lock
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the id for this lock
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span><span style="color: #000000;"> String getId() {
   </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">this</span><span style="color: #000000;">.id;
}

}

基于ZooKeeper的分布式锁和队列
    





            

			基于ZooKeeper的分布式锁和队列

注意这里的lock,可能会失败,会尝试多次,每次失败后会Sleep一段时间。

PS:官方的代码有个小bug,id和ownerId应该都是全路径,即id = dir + "/" + name;原代码在findPrefixInChildren里有问题。

用于辅助节点大小顺序排序的类:

package org.apache.zookeeper.recipes.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

  • Represents an ephemeral znode name which has an ordered sequence number and
  • can be sorted in order

*/
class ZNodeName implements Comparable<ZNodeName> {
private final String name;
private String prefix;
private int sequence = -1;
private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class);

</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> ZNodeName(String name) {
    </span><span style="color: #0000ff;">if</span> (name == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
        </span><span style="color: #0000ff;">throw</span> <span style="color: #0000ff;">new</span> NullPointerException("id cannot be null"<span style="color: #000000;">);
    }
    </span><span style="color: #0000ff;">this</span>.name =<span style="color: #000000;"> name;
    </span><span style="color: #0000ff;">this</span>.prefix =<span style="color: #000000;"> name;
    </span><span style="color: #0000ff;">int</span> idx = name.lastIndexOf('-'<span style="color: #000000;">);
    </span><span style="color: #0000ff;">if</span> (idx &gt;= 0<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">this</span>.prefix = name.substring(0<span style="color: #000000;">, idx);
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            </span><span style="color: #0000ff;">this</span>.sequence = Integer.parseInt(name.substring(idx + 1<span style="color: #000000;">));
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> If an exception occurred we misdetected a sequence suffix,
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> so return -1.</span>
        } <span style="color: #0000ff;">catch</span><span style="color: #000000;"> (NumberFormatException e) {
            LOG.info(</span>"Number format exception for " +<span style="color: #000000;"> idx, e);
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (ArrayIndexOutOfBoundsException e) {
            LOG.info(</span>"Array out of bounds for " +<span style="color: #000000;"> idx, e);
        }
    }
}

@Override
</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String toString() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> name.toString();
}

@Override
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span><span style="color: #000000;"> equals(Object o) {
    </span><span style="color: #0000ff;">if</span> (<span style="color: #0000ff;">this</span> ==<span style="color: #000000;"> o)
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">true</span><span style="color: #000000;">;
    </span><span style="color: #0000ff;">if</span> (o == <span style="color: #0000ff;">null</span> || getClass() !=<span style="color: #000000;"> o.getClass())
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">false</span><span style="color: #000000;">;

    ZNodeName sequence </span>=<span style="color: #000000;"> (ZNodeName) o;

    </span><span style="color: #0000ff;">if</span> (!<span style="color: #000000;">name.equals(sequence.name))
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">false</span><span style="color: #000000;">;

    </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">true</span><span style="color: #000000;">;
}

@Override
</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> hashCode() {
    </span><span style="color: #0000ff;">return</span> name.hashCode() + 37<span style="color: #000000;">;
}

</span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> compareTo(ZNodeName that) {
    </span><span style="color: #0000ff;">int</span> s1 = <span style="color: #0000ff;">this</span><span style="color: #000000;">.sequence;
    </span><span style="color: #0000ff;">int</span> s2 =<span style="color: #000000;"> that.sequence;
    </span><span style="color: #0000ff;">if</span> (s1 == -1 &amp;&amp; s2 == -1<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">this</span><span style="color: #000000;">.name.compareTo(that.name);
    }
    </span><span style="color: #0000ff;">if</span> (s1 == -1<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">return</span> -1<span style="color: #000000;">;
    } </span><span style="color: #0000ff;">else</span> <span style="color: #0000ff;">if</span> (s2 == -1<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">return</span> 1<span style="color: #000000;">;
    } </span><span style="color: #0000ff;">else</span><span style="color: #000000;"> {
        </span><span style="color: #0000ff;">return</span> s1 -<span style="color: #000000;"> s2;
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns the name of the znode
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span><span style="color: #000000;"> String getName() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> name;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns the sequence number
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> getZNodeName() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> sequence;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns the text prefix before the sequence number
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span><span style="color: #000000;"> String getPrefix() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> prefix;
}

}

View Code

PS:这个ZNodeName类是被我修改过的,官方的代码比较有问题,官方的先用了节点路径的前缀prefix比较,再去比较sequence序号是不对的,这样会导致sessionid小的总是能拿到锁。应该直接比较全局有序的sequence序号,小的先拿到锁,先到先得。

Zookeeper统一操作ZooKeeperOperation接口:

public interface ZooKeeperOperation {
</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Performs the operation - which may be involved multiple times if the connection
 * to ZooKeeper closes during this operation
 *
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the result of the operation or null
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span> execute() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException;

}

View Code

因为Zookeeper的操作会失败,这个类封装了多次尝试:

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.zookeeper.recipes.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.recipes.lock.ZooKeeperOperation;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**

  • A base class for protocol implementations which provides a number of higher
  • level helper methods for working with ZooKeeper along with retrying synchronous
  • operations if the connection to ZooKeeper closes such as
  • {@link #retryOperation(ZooKeeperOperation)}

*/
class ProtocolSupport {
private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);

</span><span style="color: #0000ff;">protected</span> <span style="color: #0000ff;">final</span><span style="color: #000000;"> ZooKeeper zookeeper;
</span><span style="color: #0000ff;">private</span> AtomicBoolean closed = <span style="color: #0000ff;">new</span> AtomicBoolean(<span style="color: #0000ff;">false</span><span style="color: #000000;">);
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">long</span> retryDelay = 500L<span style="color: #000000;">;
</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">int</span> retryCount = 10<span style="color: #000000;">;
</span><span style="color: #0000ff;">private</span> List&lt;ACL&gt; acl =<span style="color: #000000;"> ZooDefs.Ids.OPEN_ACL_UNSAFE;

</span><span style="color: #0000ff;">public</span><span style="color: #000000;"> ProtocolSupport(ZooKeeper zookeeper) {
    </span><span style="color: #0000ff;">this</span>.zookeeper =<span style="color: #000000;"> zookeeper;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Closes this strategy and releases any ZooKeeper resources; but keeps the
 *  ZooKeeper instance open
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> close() {
    </span><span style="color: #0000ff;">if</span> (closed.compareAndSet(<span style="color: #0000ff;">false</span>, <span style="color: #0000ff;">true</span><span style="color: #000000;">)) {
        doClose();
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * return zookeeper client instance
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> zookeeper client instance
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span><span style="color: #000000;"> ZooKeeper getZookeeper() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> zookeeper;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * return the acl its using
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the acl.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> List&lt;ACL&gt;<span style="color: #000000;"> getAcl() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> acl;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * set the acl 
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> acl the acl to set to
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span> setAcl(List&lt;ACL&gt;<span style="color: #000000;"> acl) {
    </span><span style="color: #0000ff;">this</span>.acl =<span style="color: #000000;"> acl;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * get the retry delay in milliseconds
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the retry delay
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">long</span><span style="color: #000000;"> getRetryDelay() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> retryDelay;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Sets the time waited between retry delays
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> retryDelay the retry delay
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span> setRetryDelay(<span style="color: #0000ff;">long</span><span style="color: #000000;"> retryDelay) {
    </span><span style="color: #0000ff;">this</span>.retryDelay =<span style="color: #000000;"> retryDelay;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Allow derived classes to perform 
 * some custom closing operations to release resources
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">protected</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> doClose() {
}


</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Perform the given operation, retrying if the connection fails
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> object. it needs to be cast to the callee's expected 
 * return type.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">protected</span><span style="color: #000000;"> Object retryOperation(ZooKeeperOperation operation) 
    </span><span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
    KeeperException exception </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">;
    </span><span style="color: #0000ff;">for</span> (<span style="color: #0000ff;">int</span> i = 0; i &lt; retryCount; i++<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> operation.execute();
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (KeeperException.SessionExpiredException e) {
            LOG.warn(</span>"Session expired for: " + zookeeper + " so reconnecting due to: " +<span style="color: #000000;"> e, e);
            </span><span style="color: #0000ff;">throw</span><span style="color: #000000;"> e;
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (KeeperException.ConnectionLossException e) {
            </span><span style="color: #0000ff;">if</span> (exception == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
                exception </span>=<span style="color: #000000;"> e;
            }
            LOG.debug(</span>"Attempt " + i + " failed with connection loss so " +
                    "attempting to reconnect: " +<span style="color: #000000;"> e, e);
            retryDelay(i);
        }
    }
    </span><span style="color: #0000ff;">throw</span><span style="color: #000000;"> exception;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Ensures that the given path exists with no data, the current
 * ACL and no flags
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> path
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">protected</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> ensurePathExists(String path) {
    ensureExists(path, </span><span style="color: #0000ff;">null</span><span style="color: #000000;">, acl, CreateMode.PERSISTENT);
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Ensures that the given path exists with the given data, ACL and flags
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> path
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> acl
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> flags
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">protected</span> <span style="color: #0000ff;">void</span> ensureExists(<span style="color: #0000ff;">final</span> String path, <span style="color: #0000ff;">final</span> <span style="color: #0000ff;">byte</span><span style="color: #000000;">[] data,
        </span><span style="color: #0000ff;">final</span> List&lt;ACL&gt; acl, <span style="color: #0000ff;">final</span><span style="color: #000000;"> CreateMode flags) {
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
        retryOperation(</span><span style="color: #0000ff;">new</span><span style="color: #000000;"> ZooKeeperOperation() {
            </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span> execute() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
                Stat stat </span>= zookeeper.exists(path, <span style="color: #0000ff;">false</span><span style="color: #000000;">);
                </span><span style="color: #0000ff;">if</span> (stat != <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
                    </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">true</span><span style="color: #000000;">;
                }
                zookeeper.create(path, data, acl, flags);
                </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">true</span><span style="color: #000000;">;
            }
        });
    } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (KeeperException e) {
        LOG.warn(</span>"Caught: " +<span style="color: #000000;"> e, e);
    } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (InterruptedException e) {
        LOG.warn(</span>"Caught: " +<span style="color: #000000;"> e, e);
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns true if this protocol has been closed
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> true if this protocol is closed
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">protected</span> <span style="color: #0000ff;">boolean</span><span style="color: #000000;"> isClosed() {
    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> closed.get();
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Performs a retry delay if this is not the first attempt
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> attemptCount the number of the attempts performed so far
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">protected</span> <span style="color: #0000ff;">void</span> retryDelay(<span style="color: #0000ff;">int</span><span style="color: #000000;"> attemptCount) {
    </span><span style="color: #0000ff;">if</span> (attemptCount &gt; 0<span style="color: #000000;">) {
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            Thread.sleep(attemptCount </span>*<span style="color: #000000;"> retryDelay);
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (InterruptedException e) {
            LOG.debug(</span>"Failed to sleep: " +<span style="color: #000000;"> e, e);
        }
    }
}

}

View Code

这个类是本客户端获取到lock和释放lock的时候触发操作的接口:

public interface LockListener {
    /**
     * call back called when the lock 
     * is acquired
     */
    public void lockAcquired();
</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * call back called when the lock is 
 * released.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> lockReleased();

}

View Code

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

实现步骤基本如下:

前提:需要一个队列root节点dir

入队:使用create()创建节点,将共享数据data放在该节点上,节点类型为PERSISTENT_SEQUENTIAL,永久顺序性的(也可以设置为临时的,看需求)。

出队:因为队列可能为空,2种方式处理:一种如果为空则wait等待,一种返回异常。

等待方式:这里使用了CountDownLatch的等待和Watcher的通知机制,使用了TreeMap的排序获取节点顺序最小的数据(FIFO)。

抛出异常:getChildren()获取队列数据时,如果size==0则抛出异常。

一个分布式Queue的实现,详细代码:

package org.apache.zookeeper.recipes.queue;

import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

/**
*

  • A <a href="package.html">protocol to implement a distributed queue</a>.

*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);

</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">final</span><span style="color: #000000;"> String dir;

</span><span style="color: #0000ff;">private</span><span style="color: #000000;"> ZooKeeper zookeeper;
</span><span style="color: #0000ff;">private</span> List&lt;ACL&gt; acl =<span style="color: #000000;"> ZooDefs.Ids.OPEN_ACL_UNSAFE;

</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">final</span> String prefix = "qn-"<span style="color: #000000;">;

</span><span style="color: #0000ff;">public</span> DistributedQueue(ZooKeeper zookeeper, String dir, List&lt;ACL&gt;<span style="color: #000000;"> acl){
    </span><span style="color: #0000ff;">this</span>.dir =<span style="color: #000000;"> dir;

    </span><span style="color: #0000ff;">if</span>(acl != <span style="color: #0000ff;">null</span><span style="color: #000000;">){
        </span><span style="color: #0000ff;">this</span>.acl =<span style="color: #000000;"> acl;
    }
    </span><span style="color: #0000ff;">this</span>.zookeeper =<span style="color: #000000;"> zookeeper;
    
    </span><span style="color: #008000;">//</span><span style="color: #008000;">Add root dir first if not exists</span>
    <span style="color: #0000ff;">if</span> (zookeeper != <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
            Stat s </span>= zookeeper.exists(dir, <span style="color: #0000ff;">false</span><span style="color: #000000;">);
            </span><span style="color: #0000ff;">if</span> (s == <span style="color: #0000ff;">null</span><span style="color: #000000;">) {
                zookeeper.create(dir, </span><span style="color: #0000ff;">new</span> <span style="color: #0000ff;">byte</span>[0<span style="color: #000000;">], acl, CreateMode.PERSISTENT);
            }
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (KeeperException e) {
            LOG.error(e.toString());
        } </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (InterruptedException e) {
            LOG.error(e.toString());
        }
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns a Map of the children, ordered by id.
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> watcher optional watcher on getChildren() operation.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> map from id to child name for all children
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">private</span> TreeMap&lt;Long,String&gt; orderedChildren(Watcher watcher) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
    TreeMap</span>&lt;Long,String&gt; orderedChildren = <span style="color: #0000ff;">new</span> TreeMap&lt;Long,String&gt;<span style="color: #000000;">();

    List</span>&lt;String&gt; childNames = <span style="color: #0000ff;">null</span><span style="color: #000000;">;
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
        childNames </span>=<span style="color: #000000;"> zookeeper.getChildren(dir, watcher);
    }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> (KeeperException.NoNodeException e){
        </span><span style="color: #0000ff;">throw</span><span style="color: #000000;"> e;
    }

    </span><span style="color: #0000ff;">for</span><span style="color: #000000;">(String childName : childNames){
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
            </span><span style="color: #008000;">//</span><span style="color: #008000;">Check format</span>
            <span style="color: #0000ff;">if</span>(!childName.regionMatches(0, prefix, 0<span style="color: #000000;">, prefix.length())){
                LOG.warn(</span>"Found child node with improper name: " +<span style="color: #000000;"> childName);
                </span><span style="color: #0000ff;">continue</span><span style="color: #000000;">;
            }
            String suffix </span>=<span style="color: #000000;"> childName.substring(prefix.length());
            Long childId </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> Long(suffix);
            orderedChildren.put(childId,childName);
        }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(NumberFormatException e){
            LOG.warn(</span>"Found child node with improper format : " + childName + " " +<span style="color: #000000;"> e,e);
        }
    }

    </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> orderedChildren;
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Find the smallest child node.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> The name of the smallest child node.
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">private</span> String smallestChildName() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
    </span><span style="color: #0000ff;">long</span> minId =<span style="color: #000000;"> Long.MAX_VALUE;
    String minName </span>= ""<span style="color: #000000;">;

    List</span>&lt;String&gt; childNames = <span style="color: #0000ff;">null</span><span style="color: #000000;">;

    </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
        childNames </span>= zookeeper.getChildren(dir, <span style="color: #0000ff;">false</span><span style="color: #000000;">);
    }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
        LOG.warn(</span>"Caught: " +<span style="color: #000000;">e,e);
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">null</span><span style="color: #000000;">;
    }

    </span><span style="color: #0000ff;">for</span><span style="color: #000000;">(String childName : childNames){
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
            </span><span style="color: #008000;">//</span><span style="color: #008000;">Check format</span>
            <span style="color: #0000ff;">if</span>(!childName.regionMatches(0, prefix, 0<span style="color: #000000;">, prefix.length())){
                LOG.warn(</span>"Found child node with improper name: " +<span style="color: #000000;"> childName);
                </span><span style="color: #0000ff;">continue</span><span style="color: #000000;">;
            }
            String suffix </span>=<span style="color: #000000;"> childName.substring(prefix.length());
            </span><span style="color: #0000ff;">long</span> childId =<span style="color: #000000;"> Long.parseLong(suffix);
            </span><span style="color: #0000ff;">if</span>(childId &lt;<span style="color: #000000;"> minId){
                minId </span>=<span style="color: #000000;"> childId;
                minName </span>=<span style="color: #000000;"> childName;
            }
        }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(NumberFormatException e){
            LOG.warn(</span>"Found child node with improper format : " + childName + " " +<span style="color: #000000;"> e,e);
        }
    }

    </span><span style="color: #0000ff;">if</span>(minId &lt;<span style="color: #000000;"> Long.MAX_VALUE){
        </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> minName;
    }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">null</span><span style="color: #000000;">;
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Return the head of the queue without modifying the queue.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> the data at the head of the queue.
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> NoSuchElementException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">byte</span>[] element() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> NoSuchElementException, KeeperException, InterruptedException {
    TreeMap</span>&lt;Long,String&gt;<span style="color: #000000;"> orderedChildren;

    </span><span style="color: #008000;">//</span><span style="color: #008000;"> element, take, and remove follow the same pattern.
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> We want to return the child node with the smallest sequence number.
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> Since other clients are remove()ing and take()ing nodes concurrently, 
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> the child with the smallest sequence number in orderedChildren might be gone by the time we check.
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> We don't call getChildren again until we have tried the rest of the nodes in sequence order.</span>
    <span style="color: #0000ff;">while</span>(<span style="color: #0000ff;">true</span><span style="color: #000000;">){
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
            orderedChildren </span>= orderedChildren(<span style="color: #0000ff;">null</span><span style="color: #000000;">);
        }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
            </span><span style="color: #0000ff;">throw</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> NoSuchElementException();
        }
        </span><span style="color: #0000ff;">if</span>(orderedChildren.size() == 0 ) <span style="color: #0000ff;">throw</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> NoSuchElementException();

        </span><span style="color: #0000ff;">for</span><span style="color: #000000;">(String headNode : orderedChildren.values()){
            </span><span style="color: #0000ff;">if</span>(headNode != <span style="color: #0000ff;">null</span><span style="color: #000000;">){
                </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
                    </span><span style="color: #0000ff;">return</span> zookeeper.getData(dir+"/"+headNode, <span style="color: #0000ff;">false</span>, <span style="color: #0000ff;">null</span><span style="color: #000000;">);
                }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
                    </span><span style="color: #008000;">//</span><span style="color: #008000;">Another client removed the node first, try next</span>

}
}
}

    }
}


</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Attempts to remove the head of the queue and return it.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> The former head of the queue
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> NoSuchElementException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">byte</span>[] remove() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> NoSuchElementException, KeeperException, InterruptedException {
    TreeMap</span>&lt;Long,String&gt;<span style="color: #000000;"> orderedChildren;
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> Same as for element.  Should refactor this.</span>
    <span style="color: #0000ff;">while</span>(<span style="color: #0000ff;">true</span><span style="color: #000000;">){
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
            orderedChildren </span>= orderedChildren(<span style="color: #0000ff;">null</span><span style="color: #000000;">);
        }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
            </span><span style="color: #0000ff;">throw</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> NoSuchElementException();
        }
        </span><span style="color: #0000ff;">if</span>(orderedChildren.size() == 0) <span style="color: #0000ff;">throw</span> <span style="color: #0000ff;">new</span><span style="color: #000000;"> NoSuchElementException();

        </span><span style="color: #0000ff;">for</span><span style="color: #000000;">(String headNode : orderedChildren.values()){
            String path </span>= dir +"/"+<span style="color: #000000;">headNode;
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
                </span><span style="color: #0000ff;">byte</span>[] data = zookeeper.getData(path, <span style="color: #0000ff;">false</span>, <span style="color: #0000ff;">null</span><span style="color: #000000;">);
                zookeeper.delete(path, </span>-1<span style="color: #000000;">);
                </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> data;
            }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> Another client deleted the node first.</span>

}
}
}
}

</span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">class</span> LatchChildWatcher <span style="color: #0000ff;">implements</span><span style="color: #000000;"> Watcher {

    CountDownLatch latch;

    </span><span style="color: #0000ff;">public</span><span style="color: #000000;"> LatchChildWatcher(){
        latch </span>= <span style="color: #0000ff;">new</span> CountDownLatch(1<span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> process(WatchedEvent event){
        LOG.debug(</span>"Watcher fired on path: " + event.getPath() + " state: " +<span style="color: #000000;"> 
                event.getState() </span>+ " type " +<span style="color: #000000;"> event.getType());
        latch.countDown();
    }
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span> await() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> InterruptedException {
        latch.await();
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Removes the head of the queue and returns it, blocks until it succeeds.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> The former head of the queue
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> NoSuchElementException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">byte</span>[] take() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
    TreeMap</span>&lt;Long,String&gt;<span style="color: #000000;"> orderedChildren;
    </span><span style="color: #008000;">//</span><span style="color: #008000;"> Same as for element.  Should refactor this.</span>
    <span style="color: #0000ff;">while</span>(<span style="color: #0000ff;">true</span><span style="color: #000000;">){
        LatchChildWatcher childWatcher </span>= <span style="color: #0000ff;">new</span><span style="color: #000000;"> LatchChildWatcher();
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
            orderedChildren </span>=<span style="color: #000000;"> orderedChildren(childWatcher);
        }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
            zookeeper.create(dir, </span><span style="color: #0000ff;">new</span> <span style="color: #0000ff;">byte</span>[0<span style="color: #000000;">], acl, CreateMode.PERSISTENT);
            </span><span style="color: #0000ff;">continue</span><span style="color: #000000;">;
        }
        </span><span style="color: #0000ff;">if</span>(orderedChildren.size() == 0<span style="color: #000000;">){
            childWatcher.await();
            </span><span style="color: #0000ff;">continue</span><span style="color: #000000;">;
        }

        </span><span style="color: #0000ff;">for</span><span style="color: #000000;">(String headNode : orderedChildren.values()){
            String path </span>= dir +"/"+<span style="color: #000000;">headNode;
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
                </span><span style="color: #0000ff;">byte</span>[] data = zookeeper.getData(path, <span style="color: #0000ff;">false</span>, <span style="color: #0000ff;">null</span><span style="color: #000000;">);
                zookeeper.delete(path, </span>-1<span style="color: #000000;">);
                </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> data;
            }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
                </span><span style="color: #008000;">//</span><span style="color: #008000;"> Another client deleted the node first.</span>

}
}
}
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Inserts data into queue.
 * </span><span style="color: #808080;">@param</span><span style="color: #008000;"> data
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> true if data was successfully added
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">boolean</span> offer(<span style="color: #0000ff;">byte</span>[] data) <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException{
    </span><span style="color: #0000ff;">for</span><span style="color: #000000;">(;;){
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
            zookeeper.create(dir</span>+"/"+<span style="color: #000000;">prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
            </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">true</span><span style="color: #000000;">;
        }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(KeeperException.NoNodeException e){
            zookeeper.create(dir, </span><span style="color: #0000ff;">new</span> <span style="color: #0000ff;">byte</span>[0<span style="color: #000000;">], acl, CreateMode.PERSISTENT);
        }
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Returns the data at the first element of the queue, or null if the queue is empty.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> data at the first element of the queue, or null.
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">byte</span>[] peek() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException{
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
        </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> element();
    }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(NoSuchElementException e){
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">null</span><span style="color: #000000;">;
    }
}

</span><span style="color: #008000;">/**</span><span style="color: #008000;">
 * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.
 * </span><span style="color: #808080;">@return</span><span style="color: #008000;"> Head of the queue or null.
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> KeeperException
 * </span><span style="color: #808080;">@throws</span><span style="color: #008000;"> InterruptedException
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">byte</span>[] poll() <span style="color: #0000ff;">throws</span><span style="color: #000000;"> KeeperException, InterruptedException {
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;">{
        </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> remove();
    }</span><span style="color: #0000ff;">catch</span><span style="color: #000000;">(NoSuchElementException e){
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">null</span><span style="color: #000000;">;
    }
}

}

View Code

Apache Curator

Curator是一个封装Zookeeper操作的库,使用这个库的好处是Curator帮你管理和Zookeeper的连接,当连接有问题时会自动重试(retry)。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

Curator已经封装了一些常用的Recipes

Distributed Lock

基于ZooKeeper的分布式锁和队列
    





            

			基于ZooKeeper的分布式锁和队列
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}
基于ZooKeeper的分布式锁和队列
    





            

			基于ZooKeeper的分布式锁和队列

Leader Election

基于ZooKeeper的分布式锁和队列
    





            

			基于ZooKeeper的分布式锁和队列
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
}

LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();
// not required, but this is behavior that you will probably expect
selector.start(); 

基于ZooKeeper的分布式锁和队列
    





            

			基于ZooKeeper的分布式锁和队列

 

参考:

http://zookeeper.apache.org/doc/trunk/recipes.html

http://curator.apache.org/curator-recipes/index.html

 

作者:阿凡卢
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
	</div>
	<div class="postDesc">posted @ <span >收藏</a></div>
</div>
<script type="text/javascript">var allowComments=true,cb_blogId=124362,cb_entryId=4889764,cb_blogApp=currentBlogApp,cb_blogUserGuid='1211fb30-2ddd-e111-aa3f-842b2b196315',cb_entryCreatedDate='2015/10/18 16:52:00';loadViewCount(cb_entryId);</script>
最新知识库文章:
技术的正宗与野路子
» 更多知识库文章...
历史上的今天:
2013-10-18 C# IP地址与整数之间的转换
</div><!--end: forFlow -->
</div><!--end: mainContent 主体内容容器-->

<div >
	<div >

公告

+加关注
		<div >
<tbody><tr><td colspan="7"><table class="CalTitle" cellspacing="0">
	<tbody><tr><td class="CalNextPrev"><a href="javascript:void(0);" onclick="loadBlogCalendar('2016/12/01');return false;">&lt;</a></td><td align="center">2017年1月</td><td class="CalNextPrev" align="right"><a href="javascript:void(0);" onclick="loadBlogCalendar('2017/02/01');return false;">&gt;</a></td></tr>
</tbody></table></td></tr><tr><th class="CalDayHeader" align="center" abbr="日" scope="col">日</th><th class="CalDayHeader" align="center" abbr="一" scope="col">一</th><th class="CalDayHeader" align="center" abbr="二" scope="col">二</th><th class="CalDayHeader" align="center" abbr="三" scope="col">三</th><th class="CalDayHeader" align="center" abbr="四" scope="col">四</th><th class="CalDayHeader" align="center" abbr="五" scope="col">五</th><th class="CalDayHeader" align="center" abbr="六" scope="col">六</th></tr><tr><td class="CalOtherMonthDay" align="center">25</td><td class="CalOtherMonthDay" align="center">26</td><td class="CalOtherMonthDay" align="center">27</td><td class="CalOtherMonthDay" align="center">28</td><td class="CalOtherMonthDay" align="center">29</td><td class="CalOtherMonthDay" align="center">30</td><td class="CalOtherMonthDay" align="center">31</td></tr><tr><td class="CalWeekendDay" align="center">1</td><td align="center">2</td><td align="center">3</td><td align="center">4</td><td align="center">5</td><td align="center">6</td><td class="CalWeekendDay" align="center">7</td></tr><tr><td class="CalWeekendDay" align="center">8</td><td align="center">9</td><td align="center">10</td><td align="center">11</td><td align="center">12</td><td align="center">13</td><td class="CalWeekendDay" align="center">14</td></tr><tr><td class="CalWeekendDay" align="center">15</td><td class="CalTodayDay" align="center">16</td><td align="center">17</td><td align="center">18</td><td align="center">19</td><td align="center">20</td><td class="CalWeekendDay" align="center">21</td></tr><tr><td class="CalWeekendDay" align="center">22</td><td align="center">23</td><td align="center">24</td><td align="center">25</td><td align="center">26</td><td align="center">27</td><td class="CalWeekendDay" align="center">28</td></tr><tr><td class="CalWeekendDay" align="center">29</td><td align="center">30</td><td align="center">31</td><td class="CalOtherMonthDay" align="center">1</td><td class="CalOtherMonthDay" align="center">2</td><td class="CalOtherMonthDay" align="center">3</td><td class="CalOtherMonthDay" align="center">4</td></tr>
		<div >
			<div >

搜索

最新评论

<div ><ul>
    <li class="recent_comment_title"><a href="http://www.cnblogs.com/luxiaoxun/p/4161782.html#3603827">1. Re:C#自定义工业控件开发</a></li>
    <li class="recent_comment_body">很不错,最近在研究组态软件的开发,博主,能否给点建议,看到请回复下</li>
    <li class="recent_comment_author">--YaoShuangQisBlogs</li>
    <li class="recent_comment_title"><a href="http://www.cnblogs.com/luxiaoxun/p/4454880.html#3603778">2. Re:百度谷歌离线地图解决方案(离线地图下载)</a></li>
    <li class="recent_comment_body">@chdyc百度不行,坐标不一致。...</li>
    <li class="recent_comment_author">--阿凡卢</li>
    <li class="recent_comment_title"><a href="http://www.cnblogs.com/luxiaoxun/p/4454880.html#3603503">3. Re:百度谷歌离线地图解决方案(离线地图下载)</a></li>
    <li class="recent_comment_body">请问这个软件能下载能用于百度地图api 的地图瓦片么 我试了一下 下载百度的地图的瓦片 不行啊</li>
    <li class="recent_comment_author">--chdyc</li>
    <li class="recent_comment_title"><a href="http://www.cnblogs.com/luxiaoxun/p/5022333.html#3603094">4. Re:Web GIS离线解决方案</a></li>
    <li class="recent_comment_body">@Dominic Xu那里面没有加百度的,用那个下载器下了百度的也没用,用真实的坐标转换不到下载器里的百度地图的坐标。...</li>
    <li class="recent_comment_author">--阿凡卢</li>
    <li class="recent_comment_title"><a href="http://www.cnblogs.com/luxiaoxun/p/4454880.html#3603088">5. Re:百度谷歌离线地图解决方案(离线地图下载)</a></li>
    <li class="recent_comment_body">@Dominic Xu这篇文章里有...</li>
    <li class="recent_comment_author">--阿凡卢</li>

相关文章: