全站資源開放下載,感謝廣大網友的支持
鏈接失效請移步職業司平臺
非盈利平臺

非盈利平臺

只為分享一些優質內容

Java幫幫-微信公眾號

Java幫幫-微信公眾號

將分享做到極致

微信小程序

微信小程序

更方便的閱讀

職業司微信公眾號

職業司微信公眾號

實時動態通知

安卓APP

安卓APP

我們從此不分開

程序員生活志-公眾號

程序員生活志-公眾號

程序員生活學習圈,互聯網八卦黑料

支付寶贊助-Java幫幫社區
微信贊助-Java幫幫社區

從構建分布式秒殺系統聊聊Disruptor高性能隊列

23
發表時間:2018-11-08 13:24來源:Java幫幫-微信公眾號


前言

秒殺架構持續優化中,基于自身認知不足之處在所難免,也請大家指正,共同進步。文章標題來自碼友<tukangzheng>的建議,希望可以把阻塞隊列ArrayBlockingQueue這個隊列替換成Disruptor,由于之前曾接觸過這個東西,聽說很不錯,正好借此機會整合進來。

簡介

LMAX Disruptor是一個高性能的線程間消息庫。它源于LMAX對并發性,性能和非阻塞算法的研究,如今構成了Exchange基礎架構的核心部分。

  • Disruptor它是一個開源的并發框架,并獲得2011 Duke’s 程序框架創新獎,能夠在無鎖的情況下實現網絡的Queue并發操作。

  • Disruptor是一個高性能的異步處理框架,或者可以認為是最快的消息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。

在這里你可以跟BlockingQueue隊列作比對,簡單的理解為它是一種高效的"生產者-消費者"模型,先了解后深入底層原理。

核心

寫代碼案例之前,大家最好先了解 Disruptor 的核心概念,至少知道它是如何運作的。

  • Ring Buffer
    如其名,環形的緩沖區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定義實現來完全替代。

  • Sequence Disruptor
    通過順序遞增的序號來編號管理通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用于標識進度,但定義 Sequence 來負責該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存偽共享(Flase Sharing)問題。

  • Sequencer
    Sequencer 是 Disruptor 的真正核心。此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的并發算法。

  • Sequence Barrier
    用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。

  • Wait Strategy
    定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現)

  • Event
    在 Disruptor 的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定。

  • EventProcessor
    EventProcessor 持有特定消費者(Consumer)的 Sequence,并提供用于調用事件處理實現的事件循環(Event Loop)。

  • EventHandler
    Disruptor 定義的事件處理接口,由用戶實現,用于處理事件,是 Consumer 的真正實現。

  • Producer
    即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。


優點

  • 剖析Disruptor:為什么會這么快?(一)鎖的缺點

  • 剖析Disruptor:為什么會這么快?(二)神奇的緩存行填充

  • 剖析Disruptor:為什么會這么快?(三)偽共享

  • 剖析Disruptor:為什么會這么快?(四)揭秘內存屏障

有興趣的參考:

使用案例

這里以我們系統中的秒殺作為案例,后面有相對復雜的場景介紹。

定義秒殺事件對象:

/** * 事件對象(秒殺事件) */publicclassSeckillEventimplementsSerializable{    privatestaticfinallong serialVersionUID = 1L;    privatelong seckillId;    privatelong userId;        publicSeckillEvent(){            }    publiclonggetSeckillId(){        return seckillId;    }    publicvoidsetSeckillId(long seckillId){        this.seckillId = seckillId;    }    publiclonggetUserId(){        return userId;    }    publicvoidsetUserId(long userId){        this.userId = userId;    }}

為了讓Disruptor為我們預先分配這些事件,我們需要一個將執行構造的EventFactory:

/** * 事件生成工廠(用來初始化預分配事件對象)*/publicclassSeckillEventFactoryimplementsEventFactory<SeckillEvent> {    public SeckillEvent newInstance(){        returnnew SeckillEvent();    }}

然后,我們需要創建一個處理這些事件的消費者:

/** * 消費者(秒殺處理器)*/publicclassSeckillEventConsumerimplementsEventHandler<SeckillEvent> {    //業務處理、這里是無法注入的,需要手動獲取,見源碼private ISeckillService seckillService = (ISeckillService) SpringUtil.getBean("seckillService");       publicvoidonEvent(SeckillEvent seckillEvent, long seq, boolean bool)throws Exception {       seckillService.startSeckil(seckillEvent.getSeckillId(), seckillEvent.getUserId());    }}

既然有消費者,我們將需要這些秒殺事件的來源:

/** * 使用translator方式生產者*/publicclassSeckillEventProducer{        privatefinalstatic EventTranslatorVararg<SeckillEvent> translator = new EventTranslatorVararg<SeckillEvent>() {        publicvoidtranslateTo(SeckillEvent seckillEvent, long seq, Object... objs){            seckillEvent.setSeckillId((Long) objs[0]);            seckillEvent.setUserId((Long) objs[1]);        }    };    privatefinal RingBuffer<SeckillEvent> ringBuffer;        publicSeckillEventProducer(RingBuffer<SeckillEvent> ringBuffer){        this.ringBuffer = ringBuffer;    }        publicvoidseckill(long seckillId, long userId){        this.ringBuffer.publishEvent(translator, seckillId, userId);    }}

最后,我們來寫一個測試類,運行一下(跑不通,需要修改消費者):

/** * 測試類 */publicclassSeckillEventMain{    publicstaticvoidmain(String[] args){        producerWithTranslator();    }    publicstaticvoidproducerWithTranslator(){       SeckillEventFactory factory = new SeckillEventFactory();        int ringBufferSize = 1024;        ThreadFactory threadFactory = new ThreadFactory() {            public Thread newThread(Runnable runnable){                returnnew Thread(runnable);           }       };        //創建disruptor        Disruptor<SeckillEvent> disruptor = new Disruptor<SeckillEvent>(factory, ringBufferSize, threadFactory);        //連接消費事件方法        disruptor.handleEventsWith(new SeckillEventConsumer());        //啟動        disruptor.start();        RingBuffer<SeckillEvent> ringBuffer = disruptor.getRingBuffer();        SeckillEventProducer producer = new SeckillEventProducer(ringBuffer);        for(long i = 0; i<10; i++){            producer.seckill(i, i);        }        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;    }}

使用場景

  • PCP (生產者-消費者問題)

  • 網上搜了下國內實戰案例并不多,大廠可能有在使用

這里舉一個大家日常的例子,停車場景。當汽車進入停車場時(A),系統首先會記錄汽車信息(B)。同時也會發送消息到其他系統處理相關業務(C),最后發送短信通知車主收費開始(D)。

一個生產者A與三個消費者B、C、D,D的事件處理需要B與C先完成。則該模型結構如下:


在這個結構下,每個消費者擁有各自獨立的事件序號Sequence,消費者之間不存在共享競態。SequenceBarrier1監聽RingBuffer的序號cursor,消費者B與C通過SequenceBarrier1等待可消費事件。SequenceBarrier2除了監聽cursor,同時也監聽B與C的序號Sequence,從而將最小的序號返回給消費者D,由此實現了D依賴B與C的邏輯。


Java幫幫學習群生態

Java幫幫學習群生態

總有一款能幫到你

Java學習群

Java學習群

與大牛一起交流

大數據學習群

大數據學習群

在數據中成長

九點編程學習群

九點編程學習群

深夜九點學編程

python學習群

python學習群

人工智能,爬蟲

測試學習群

測試學習群

感受測試的魅力

Java幫幫生態承諾

Java幫幫生態承諾

一直堅守,不負重望

初心
勤儉
誠信
正義
分享
友鏈交換:加幫主QQ2524138991 留言即可 24小時內答復  
業司
教育資訊
會員登錄
獲取驗證碼
登錄
登錄
我的資料
留言
回到頂部