Curator是如何利用Zookeeper实现分布式锁的——InterProcessMutex源码分析


分布式锁

分布式锁就是在不同的进程甚至不同的机器上也能限制资源互斥访问的锁。分布式锁一般使用独立于应用的其它组件来实现,比如Redis、Zookeeper。

Zookeeper中有临时有序节点,这让使用Zookeeper来进行分布式公平锁开发更加方便:

  1. 临时节点会在创建它的节点下线时被销毁,所以,不用担心持有分布式锁的节点宕机导致锁永远不被释放
  2. 顺序节点可以用来保存加锁请求到来的先后顺序,这可以用来实现公平锁

下面是在zkCli中创建顺序临时节点的例子,Zookeeper会针对每个节点维护一个计数器,这个计数器用来记录它底下的有序子节点的序号,每一个创建的有序子节点在后面都会加上这个十位的序号。

[zk: localhost:2181(CONNECTED) 6] create /test data
Created /test
[zk: localhost:2181(CONNECTED) 7] create -s -e /test/lock-1- data
Created /test/lock-1-0000000000
[zk: localhost:2181(CONNECTED) 8] create -s -e /test/lock-1- data
Created /test/lock-1-0000000001
[zk: localhost:2181(CONNECTED) 9] create -s -e /test/lock-2- data
Created /test/lock-2-0000000002
[zk: localhost:2181(CONNECTED) 10] ls /test
[lock-2-0000000002, lock-1-0000000001, lock-1-0000000000]

InterProcessMutex编写分布式锁的一个示例分析

InterProcessMutex,是Curator中提供的一个进程间互斥量,它将使用ZooKeeper的顺序临时节点来实现分布式锁。InterProcessMutex是一个锁!是一个锁!是一个锁!一定要把它想成是一个锁。

client = CuratorFactory.createLocal();
client.start();
// 创建一个分布式锁 lockPath是锁的路径
mutex = new InterProcessMutex(client, lockPath);

try {
    // 获取分布式锁
    mutex.acquire();
    if (mutex.isOwnedByCurrentThread()) {
        System.out.println("Thread " + Thread.currentThread().getId() + " own the dlock!");
        Thread.sleep(3000);
        System.out.println("Thread " + Thread.currentThread().getId() + " release the dlock!");
        // 释放分布式锁
        mutex.release();
    }
} catch (Exception e) {
    e.printStackTrace();
}

我们创建十个线程去不断的循环执行这段代码块(释放锁后再获取),看看Zookeeper里啥样:

[zk: localhost:2181(CONNECTED) 4] ls /ipmlock2
[_c_84da3060-4aac-42fb-b090-6596224cdeb3-lock-0000000003, _c_98c1f513-761b-448b-94eb-262f2ab8e742-lock-0000000008, _c_32834e46-7cfc-4964-8ff1-f4be8047876b-lock-0000000009, _c_01efe906-c8b6-413b-804a-68a4473c1838-lock-0000000000, _c_3a822fc1-490f-4b6e-8252-b882b9ff866d-lock-0000000001, _c_d70ee803-20b0-4fa5-ba14-b603d6dc05bd-lock-0000000002, _c_b348eb4b-d58f-4f3e-bddc-09c56ac2b382-lock-0000000005, _c_552cb9f7-2b28-44c7-bf4d-599ae3ef510b-lock-0000000007, _c_f69d4ec7-6a03-40a7-80c9-658269e0e5a7-lock-0000000004, _c_c9d91ee1-d873-446c-a985-6f664f83fd33-lock-0000000006]

十个名字奇奇怪怪的节点被创建出来了,它们的序号是0~9,也就是说,Zookeeper刚刚创建了十个临时顺序节点。

过几秒钟再看,0~3已经没了,多了10~13,这意味着在刚刚的一段时间内,有四个线程释放了锁并重新获取了锁,并且,顺序节点越先被创建(序号越小)就越早持有锁。

[zk: localhost:2181(CONNECTED) 5] ls /ipmlock2
[_c_98c1f513-761b-448b-94eb-262f2ab8e742-lock-0000000008, _c_32834e46-7cfc-4964-8ff1-f4be8047876b-lock-0000000009, _c_25c05f71-75a8-48fd-a5fe-0a7b1527780a-lock-0000000013, _c_4a6b805c-e181-4e1b-9291-51212db8c6ae-lock-0000000010, _c_d67f8e0b-5e47-4cf5-a8ba-7a9a14ab42eb-lock-0000000012, _c_b348eb4b-d58f-4f3e-bddc-09c56ac2b382-lock-0000000005, _c_552cb9f7-2b28-44c7-bf4d-599ae3ef510b-lock-0000000007, _c_f69d4ec7-6a03-40a7-80c9-658269e0e5a7-lock-0000000004, _c_c9d91ee1-d873-446c-a985-6f664f83fd33-lock-0000000006, _c_604fc4bf-13a6-4ed3-9161-5f2d51b70eaa-lock-0000000011]

InterProcessMutex源码分析

所以,我们可以大胆猜测,当你调用InterProcessMutex.acquire时,只不过是向锁路径下添加了一个临时有序节点,而当你调用InterProcessMutex.release时,这个节点就会被删除,这个其实不用猜,肯定就是这样实现的,这是实现锁的过程中最简单的部分,重要的是,如何实现互斥?

对,锁最重要的就是保护临界区,让多个线程对临界区的访问互斥,上面我们的猜测内容中只有关于InterProcessMutex如何操作Zookeeper的,却没有任何关于如何实现互斥的猜测,光靠Zookeeper的特性好像不足以实现互斥,当然我对Zookeeper的了解甚少,就顺序节点我还是现查的,InterProcessMutex很可能是在Java层面实现的互斥,比如:

  1. InterProcessMutex不断watch锁路径,监控里面子节点的变化,总是第一时间获取到新节点
  2. 获取锁的时候,先将一个临时顺序节点放到锁路径中,然后根据返回的path,截取后面的序号
  3. 如果当前序号是最小的,当前线程就获取到锁了
  4. 否则,锁正在被其它线程持有,当前线程需要阻塞

为了方便,后面我们称InterProcessMutex为IPM

猜是这么猜的,我们来实际通过源码看看,下面是acquire方法:

/**
 * 申请互斥量 - 阻塞,直到它可用。
 * 注意:一个线程可以重入的调用acquire,每一个acquire调用必须有一个对应的release调用
 *
 * @throws Exception ZK errors, connection interruptions
 */
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

acquire中调用了一个internalLock,从方法名中来看,就是一个在IPM内部实现锁定的方法:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

我们从代码中大体可以猜出,一个IPM可以由多个线程操作,对于每一个线程,IPM都维护了一个LockData,存到一个Map<Thread, LockData>中,由于一个ThreadData只会被一个线程操作,所以这个方法中没用什么同步机制。下面是LockData的结构:

final Thread owningThread; // 拥有者线程
final String lockPath;     // 锁路径
// 当前锁了几次
final AtomicInteger lockCount = new AtomicInteger(1);

回到internalLock,它根据当前线程是否有一个LockData对象,判断了当前线程是否已经有accquire的申请,如果有就简单的将它的lockCount增1,否则新建lockData,这就是可重入锁的简单实现。

重点是这一行代码:

String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

调用一个internals成员的attemptLock方法,从名字来判断就是尝试获取锁,返回的lockPath应该是加锁之后的路径,也就是带有顺序序号的路径。不过现在只能靠猜,我也无法确定,因为Curator中的非API接口方法是很少有注释文档的。

我们先来看看internals是个啥,它是一个LockInternals类型的成员:

private final LockInternals internals;

它是从哪里传过来的呢?我们关注一下构造方法:

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}

上面是一个非公有的构造方法,也就是说不给我们用,internals在这里被构建。如果再这样追查下去,可能会越来越陷入难以理解的逻辑当中,所以我决定回到刚刚internalLock里的重点代码行:

String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

点进去这个attemptLock方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

乍一看,代码贼多,根本无从下手。阅读源代码时遇到这种情况我们需要过滤掉一些看起来和我们需要的逻辑关系不大的代码,如果你手边有个文本编辑器,你可以把那些行删除,就像这样:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{

    try {
        ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
        hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
    }
    catch ( KeeperException.NoNodeException e ) {}

    if ( hasTheLock ) {
        return ourPath;
    }

    return null;
}

好了,这下简单多了,首先调用了driver.createsTheLock,然后调用了一个什么internalLockLoop,最后判断了一下是否已经锁定了,如果是的话返回createsTheLock返回的路径,也就是我们所估计的带序号的顺序临时节点路径。

所以,可以猜测,createsTheLock肯定是用于创建顺序临时锁节点的,而internalLockLoop返回了一个hasTheLock,应该是用某种循环机制来在当前线程没有获得锁之前阻塞的。

点进去driver.createsTheLock,是一个接口:

public interface LockInternalsDriver extends LockInternalsSorter
{
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception;

    public String createsTheLock(CuratorFramework client,  String path, byte[] lockNodeBytes) throws Exception;
}

所以,我们要知道实现类从哪来,是什么。回到IPM的构造方法中:

public InterProcessMutex(CuratorFramework client, String path)
{
    this(client, path, new StandardLockInternalsDriver());
}

public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
{
    this(client, path, LOCK_NAME, 1, driver);
}

答案已经很明了了,在我们没有指定LockInternalsDriver的情况下,会使用StandardLockInternalsDriver,所以我们需要到StandardLockInternalsDriver中查看createsTheLock方法究竟写了啥:

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

这个if分支里面基本是一样的逻辑,区别是一个有锁节点的字节,一个没有,所以我们看一个就行。这里确实就是创建顺序临时节点而已,并把创建后的节点路径返回,这也证实了确实interalLock中最终接到的path确实是带有顺序序号的path。

下面我们再看attemptLock中的第二个调用internalLockLoop,这个代码更长并且没啥好简化的:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{

    // 判断是否尚未获得锁,若未获得锁,继续循环
    while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
    {
        // 获取排序后的children,这里盲猜就是获取当前锁路径下的所有有序临时节点,并把它们按照后面的序号排序
        // 篇幅原因,这个代码的跟踪我就不写出来了,不过也不复杂。如果找不到相关的代码,另一个思路就是在这里打断点
        List<String>        children = getSortedChildren();
        // 这里是得到当前节点的序号
        String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

        // 这里用全部临时有序节点和当前节点的序号判断,有了这些就能判断出当前节点是否已经持有锁了
        // maxLeases应该是最多能允许几个线程同时获得锁,这个在IPM里写死了,是1
        PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
        // 如果已经持有锁了,haveTheLock=true
        if ( predicateResults.getsTheLock() )
        {
            haveTheLock = true;
        }
        else
        {
            // 为了确保你不一下接触太多代码,else分支里,也就是尚未获得锁的逻辑先省略
        }
    }
    return haveTheLock;
}

下面我们看看driver.getsTheLock干了啥,我们已经知道driverStandardLockInternalsDriver了,所以直接进去看getsTheLock就行:

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
    // 查看我们的序号在所有有序节点中的位置
    int             ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);

    // 如果我们的位置小于最小可获得锁的线程数,说明我们还没获取到锁
    boolean         getsTheLock = ourIndex < maxLeases;
    // pathToWatch即我们需要等待(它释放锁)的最小序号的节点路径(我不理解的是在maxLeases>1的情况下,万一前面的序号先结束了咋办)
    String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

    return new PredicateResults(pathToWatch, getsTheLock);
}

所以,很明了,getsTheLock方法基于我们的序号,判断了我们当前是否获得了锁,并且返回了我们需要等待的最小节点路径。那么上面代码中,未获得锁的else分支中的逻辑也明确了,就是观察这个节点,等它被删除。

PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
    haveTheLock = true;
}
else
{
    // 获取需要等待的前一个节点的路径,也就是`getsTheLock`中计算出来的那个节点
    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

    synchronized(this)
    {
        try
        {

            // 监视这个节点
            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
            // 如果是超时锁,判断是否超时,如果超时就把当前锁作废,否则wait挂起当前线程
            if ( millisToWait != null )
            {
                millisToWait -= (System.currentTimeMillis() - startMillis);
                startMillis = System.currentTimeMillis();
                if ( millisToWait <= 0 )
                {
                    doDelete = true;    // timed out - delete our node
                    break;
                }

                wait(millisToWait);
            }
            else
            {
                wait();
            }
        }
        catch ( KeeperException.NoNodeException e ){}
        // ... 省略一些代码 ...

EMMM,你既然把这个线程挂起了,那么等它可以持有锁的时候,一定要notify通知它,否则它将一直挂起。我们猜测这个notify是前一个锁节点的Watcher调用的,所以我们看看这个watcher到底是啥。

private final Watcher watcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        client.postSafeNotify(LockInternals.this);
    }
};

从这里我们可以看出,这个Watcher的实现很简单,没有判断事件是什么,直接调用client.postSafeNotify,官方文档里说该方法就是调用给定对象的notifyAll方法,而之前用于挂起线程的wait,也是在LockInternals.this对象上调用的,这样就连上了。

关于为啥这里没有判断WatchedEvent的类型,我也不知道,大概是在里,这个事件只能是节点被删除的事件吧,或者是什么其它原因。

所以,现在加锁的实现细节已经完全明确:

  1. 向Zookeeper中添加顺序临时节点
  2. 根据顺序节点的序号,判断当前线程是否已经获取了锁(序号排在第一位)
  3. 如果没获取到锁就挂起当前线程,并且监视正在持有锁的节点,在它发生变动时唤醒被挂起的线程
  4. 否则加锁成功

原创文章,作者:kepupublish,如若转载,请注明出处:https://blog.ytso.com/273279.html

(0)
上一篇 2022年7月10日
下一篇 2022年7月10日

相关推荐

发表回复

登录后才能评论