三级毛片免费看

全站资源开放下载,感谢广大网友的支持
链接失效请移步职业司平台
非盈利平台

非盈利平台

只为分享一些优质内容

Java帮帮-微信公众号

Java帮帮-微信公众号

将分享做到极致

微信小程序

微信小程序

更方便的阅读

职业司微信公众号

职业司微信公众号

实时动态通知

安卓APP

安卓APP

我们从此不分开

程序员生活志-公众号

程序员生活志-公众号

程序员生活学习圈,互联网八卦黑料

从构建分布式秒杀系统聊聊分布式锁

8
发表时间:2018-11-08 11:59来源:Java帮帮-微信公众号


前言

最近懒成一坨屎,学不动系列一波接一波,大多还都是底层原理相关的。上周末抽时间重读了周志明大湿的 JVM 高效并发部分,每读一遍都有不同的感悟。路漫漫,借此,把前段时间搞着玩的秒杀案例中的分布式锁深入了解一下。

案例介绍

在尝试了解分布式锁之前,大家可以想象一下,什么场景下会使用分布式锁?


单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到秒杀商品互斥的目的。然而在分布式系统中,会存在多台机器并行去实现同一个功能。也就是说,在多进程中,如果还使用以上JDK提供的进程锁,来并发访问数据库资源就可能会出现商品超卖的情况。因此,需要我们来实现自己的分布式锁。

实现一个分布式锁应该具备的特性:

  • 高可用、高性能的获取锁与释放锁

  • 在分布式系统环境下,一个方法或者变量同一时间只能被一个线程操作

  • 具备锁失效机制,网络中断或宕机无法释放锁时,锁必须被删除,防止死锁

  • 具备阻塞锁特性,即没有获取到锁,则继续等待获取锁

  • 具备非阻塞锁特性,即没有获取到锁,则直接返回获取锁失败

  • 具备可重入特性,一个线程中可以多次获取同一把锁,比如一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法,而无需重新获得锁

在之前的秒杀案例中,我们曾介绍过关于分布式锁几种实现方式:

  • 基于数据库实现分布式锁

  • 基于 Redis 实现分布式锁

  • 基于 Zookeeper 实现分布式锁

前两种对于分布式生产环境来说并不是特别推荐,高并发下数据库锁性能太差,Redis在锁时间限制和缓存一致性存在一定问题。这里我们重点介绍一下 Zookeeper 如何实现分布式锁。

实现原理

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能存在唯一文件名。


数据模型

  • PERSISTENT 持久化节点,节点创建后,不会因为会话失效而消失

  • EPHEMERAL 临时节点, 客户端session超时此类节点就会被自动删除

  • EPHEMERAL_SEQUENTIAL 临时自动编号节点

  • PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1

监视器(watcher)

当创建一个节点时,可以注册一个该节点的监视器,当节点状态发生改变时,watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,因为watch只能被触发一次。

根据zookeeper的这些特性,我们来看看如何利用这些特性来实现分布式锁:

  • 创建一个锁目录lock

  • 线程A获取锁会在lock目录下,创建临时顺序节点

  • 获取锁目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁

  • 线程B创建临时节点并获取所有兄弟节点,判断自己不是最小节点,设置监听(watcher)比自己次小的节点(只关注比自己次小的节点是为了防止发生“羊群效应”)

  • 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是最小的节点,获得锁

代码分析

尽管ZooKeeper已经封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。但是如果让一个普通开发者去手撸一个分布式锁还是比较困难的,在秒杀案例中我们直接使用 Apache 开源的curator 开实现 Zookeeper 分布式锁。

这里我们使用以下版本,截止目前最新版4.0.1:

<!-- zookeeper 分布式锁、注意zookeeper版本  这里对应的是3.4.6--><dependency>    <groupId>org.apache.curator</groupId>    <artifactId>curator-recipes</artifactId>    <version>2.10.0</version></dependency>

首先,我们看下InterProcessLock接口中的几个方法:

/**  * 获取锁、阻塞等待、可重入  */public void acquire() throws Exception;/**  * 获取锁、阻塞等待、可重入、超时则获取失败  */public boolean acquire(long time, TimeUnit unit) throws Exception;/**  * 释放锁  */public void release() throws Exception;/**  * Returns true if the mutex is acquired by a thread in this JVM  */boolean isAcquiredInThisProcess();

获取锁:

//获取锁publicvoidacquire()throws Exception    {            if ( !internalLock(-1, null) )            {                thrownew IOException("Lost connection while trying to acquire lock: " + basePath);            }    }

privatebooleaninternalLock(long time, TimeUnit unit)throws Exception    {        /*         实现同一个线程可重入性,如果当前线程已经获得锁,         则增加锁数据中lockCount的数量(重入次数),直接返回成功        *///获取当前线程            Thread currentThread = Thread.currentThread();            //获取当前线程重入锁相关数据            LockData lockData = threadData.get(currentThread);            if ( lockData != null )        {                //原子递增一个当前值,记录重入次数,后面锁释放会用到                lockData.lockCount.incrementAndGet();                returntrue;           }            //尝试连接zookeeper获取锁            String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());            if ( lockPath != null )        {                //创建可重入锁数据,用于记录当前线程重入次数                LockData newLockData = new LockData(currentThread, lockPath);                threadData.put(currentThread, newLockData);                returntrue;            }           //获取锁超时或者zk通信异常返回失败    returnfalse;    }

Zookeeper获取锁实现:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes)throws Exception    {            //获取当前时间戳finallong      startMillis = System.currentTimeMillis();        //如果unit不为空(非阻塞锁),把当前传入time转为毫秒final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;        //子节点标识finalbyte[]    localLockNodeBytes = (revocable.get() != null) ? newbyte[0] : lockNodeBytes;        //尝试次数int             retryCount = 0;        String          ourPath = null;        boolean         hasTheLock = false;        boolean         isDone = false;        //自旋锁,循环获取锁while ( !isDone )        {                isDone = true;                try            {                //在锁节点下创建临时且有序的子节点,例如:_c_008c1b07-d577-4e5f-8699-8f0f98a013b4-lock-000000001                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);                //如果当前子节点序号最小,获得锁则直接返回,否则阻塞等待前一个子节点删除通知(release释放锁)                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);            }            catch (    KeeperException.NoNodeException e )            {               //异常处理,如果找不到节点,这可能发生在session过期等时,因此,如果重试允许,只需重试一次即可if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {                    isDone = false;                  }                    else                    {                        throw e;                    }               }           }        //如果获取锁则返回当前锁子节点路径if ( hasTheLock )        {            return ourPath;        }       returnnull;    }

privatebooleaninternalLockLoop(long startMillis, Long millisToWait, String ourPath)throws Exception    {        boolean     haveTheLock = false;        boolean     doDelete = false;        try        {            if ( revocable.get() != null )            {               client.getData().usingWatcher(revocableWatcher).forPath(ourPath);            }            //自旋获取锁while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )            {                //获取所有子节点集合               List<String>   children = getSortedChildren();                //判断当前子节点是否为最小子节点                String    sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);                //如果是最小节点则获取锁if ( predicateResults.getsTheLock() ){                   haveTheLock = true;                }else{                    //获取前一个节点,用于监听                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();                    synchronized(this){                        try{                            //这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于Zookeeper来说属于资源泄露                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);                            if ( millisToWait != null ) {                                millisToWait -= (System.currentTimeMillis() - startMillis);                                startMillis = System.currentTimeMillis();                                //如果设置了获取锁等待时间if ( millisToWait <= 0 ){                                    doDelete = true;    // 超时则删除子节点break;                                }                                //等待超时时间                                wait(millisToWait);                            } else{                                wait();//一直等待}                        } catch ( KeeperException.NoNodeException e ){                            // it has been deleted (i.e. lock released). Try to acquire again//如果前一个子节点已经被删除则deException,只需要自旋获取一次即可                        }                    }                }            }        }        catch ( Exception e )        {            ThreadUtils.checkInterrupted(e);            doDelete = true;            throw e;        }        finally{            if ( doDelete ){                deleteOurPath(ourPath);//获取锁超时则删除节点            }        }        return haveTheLock;    }

释放锁:

publicvoidrelease()throws Exception    {        Thread currentThread = Thread.currentThread();        LockData lockData = threadData.get(currentThread);        //没有获取锁,你释放个球球,如果为空抛出异常if ( lockData == null )        {            thrownew IllegalMonitorStateException("You do not own the lock: " + basePath);        }        //获取重入数量int newLockCount = lockData.lockCount.decrementAndGet();        //如果重入锁次数大于0,直接返回if ( newLockCount > 0 )        {            return;        }        //如果重入锁次数小于0,抛出异常if ( newLockCount < 0 )        {            thrownew IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);        }        try        {            //释放锁            internals.releaseLock(lockData.lockPath);        }        finally        {            //移除当前线程锁数据            threadData.remove(currentThread);        }    }

测试案例

为了更好的理解其原理和代码分析中获取锁的过程,这里我们实现一个简单的Demo:

/**  * 基于curator的zookeeper分布式锁  */publicclassCuratorUtil{    privatestatic String address = "192.168.1.180:2181";        publicstaticvoidmain(String[] args){       //1、重试策略:初试时间为1s 重试3次        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);         //2、通过工厂创建连接        CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);        //3、开启连接        client.start();        //4 分布式锁final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");         //读写锁//InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");                ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);                for (int i = 0; i < 5; i++) {            fixedThreadPool.submit(new Runnable() {                @Overridepublicvoidrun(){                    boolean flag = false;                    try {                        //尝试获取锁,最多等待5秒                        flag = mutex.acquire(5, TimeUnit.SECONDS);                        Thread currentThread = Thread.currentThread();                        if(flag){                            System.out.println("线程"+currentThread.getId()+"获取锁成功");                        }else{                            System.out.println("线程"+currentThread.getId()+"获取锁失败");                        }                        //模拟业务逻辑,延时4秒                        Thread.sleep(4000);                    } catch (Exception e) {                        e.printStackTrace();                    } finally{                        if(flag){                            try {                                mutex.release();                            } catch (Exception e) {                                e.printStackTrace();                            }                        }                    }                }            });        }    }}

这里我们开启5个线程,每个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。开始执行main方法,通过ZooInspector监控/curator/lock下的节点如下图:

对,没错,设置4秒的业务处理时长就是为了观察生成了几个顺序节点。果然如案例中所述,每个线程都会生成一个节点并且还是有序的。

观察控制台,我们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕我们刷新一下/curator/lock节点,发现刚才创建的五个子节点已经不存在了。

小结

通过分析第三方开源工具实现的分布式锁方式,收获还是满满的。学习本身就是一个由浅入深的过程,从如何调用API,到理解其代码逻辑实现,想要更深入可以去挖掘Zookeeper的核心算法ZAB协议。


Java帮帮学习群生态

Java帮帮学习群生态

总有一款能帮到你

Java学习群

Java学习群

与大牛一起交流

大数据学习群

大数据学习群

在数据中成长

九点编程学习群

九点编程学习群

深夜九点学编程

python学习群

python学习群

人工智能,爬虫

测试学习群

测试学习群

感受测试的魅力

Java帮帮生态承诺

Java帮帮生态承诺

一直坚守,不负重望

初心
勤俭
诚信
正义
分享
友链交换:加帮主QQ2524138991 留言即可 24小时内答复  
业司
教育资讯
会员登录
获取验证码
登录
登录
我的资料
留言
回到顶部