全站資源開放下載,感謝廣大網友的支持
鏈接失效請移步職業司平臺
非盈利平臺

非盈利平臺

只為分享一些優質內容

Java幫幫-微信公眾號

Java幫幫-微信公眾號

將分享做到極致

微信小程序

微信小程序

更方便的閱讀

職業司微信公眾號

職業司微信公眾號

實時動態通知

安卓APP

安卓APP

我們從此不分開

程序員生活志-公眾號

程序員生活志-公眾號

程序員生活學習圈,互聯網八卦黑料

支付寶贊助-Java幫幫社區
微信贊助-Java幫幫社區

從構建分布式秒殺系統聊聊WebSocket推送通知

28
發表時間:2018-11-08 13:10來源:Java幫幫-微信公眾號

前言

秒殺架構到后期,我們采用了消息隊列的形式實現搶購邏輯,那么之前拋出過這樣一個問題:消息隊列異步處理完每個用戶請求后,如何通知給相應用戶秒殺成功?

場景映射

首先,我們舉一個生活中比較常見的例子:我們去銀行辦理業務,一般會選擇相關業務打印一個排號紙,然后就可以坐在小板凳上玩著手機,等待被小喇叭報號。當小喇叭喊到你所持有的號碼,就可以拿著排號紙去柜臺辦理自己的業務。

這里,假設當我們取排號紙的時候,銀行根據時間段內的排隊情況,比較人性化的提示用戶:排隊人數較多,您是否繼續等待?否的話我們可以換個時間段再來辦理。

由此我們把生活場景映射到真實的秒殺業務邏輯中來:

  • 我們可以把柜臺比喻成商品下單處理邏輯單元

  • 拿到排號紙說明你進入相應商品處理隊列

  • 拿到排號紙的請求直接返回前臺,提示用戶搶購進行中

  • 排號紙進入隊列后,等待商品業務處理邏輯

  • 小喇叭叫到自己的排號相當于服務端通知用戶秒殺成功,這時候可以進行支付邏輯

  • 那些拿不到票號的同學,相當于隊列已滿直接返回秒殺失敗

解決方案

通過上面的場景,我們很容易能夠想到一種方案就是服務端通知,那么如何做到服務端異步通知的呢?下面,主角開始登場了,就是我們的Websocket。

WebSocket是HTML5開始提供的一種瀏覽器與服務器間進行全雙工通訊的網絡技術。依靠這種技術可以實現客戶端和服務器端的長連接,雙向實時通信。


特點:

  • 異步、事件觸發

  • 可以發送文本,圖片等流文件

  • 數據格式比較輕量,性能開銷小,通信高效

  • 使用ws或者wss協議的客戶端socket,能夠實現真正意義上的推送功能

缺點:

  • 部分瀏覽器不支持,瀏覽器支持的程度與方式有區別,需要各種兼容寫法。

集成案例

由于我們的秒殺架構項目案例中使用了SpringBoot,因此集成webSocket也是相對比較簡單的。

首先pom.xml引入以下依賴:

<!-- webSocket 秒殺通知--><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-websocket</artifactId></dependency>

WebSocketConfig 配置:

/**  * WebSocket配置  * 創建者  爪哇筆記  * 創建時間    2018年5月29日  */@ConfigurationpublicclassWebSocketConfig{      @Beanpublic ServerEndpointExporter serverEndpointExporter(){              returnnew ServerEndpointExporter();          }  }

WebSocketServer 配置:

@ServerEndpoint("/websocket/{userId}")@ComponentpublicclassWebSocketServer{    privatefinalstatic Logger log = LoggerFactory.getLogger(WebSocketServer.class);    //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。privatestaticint onlineCount = 0;    //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。privatestatic CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();    //與某個客戶端的連接會話,需要通過它來給客戶端發送數據private Session session;    //接收userIdprivate String userId="";    /**     * 連接建立成功調用的方法*/@OnOpenpublicvoidonOpen(Session session,@PathParam("userId") String userId) {        this.session = session;        webSocketSet.add(this);     //加入set中        addOnlineCount();           //在線數加1        log.info("有新窗口開始監聽:"+userId+",當前在線人數為" + getOnlineCount());        this.userId=userId;        try {             sendMessage("連接成功");        } catch (IOException e) {            log.error("websocket IO異常");        }    }    /**    * 連接關閉調用的方法     */@OnClosepublicvoidonClose(){        webSocketSet.remove(this);  //從set中刪除        subOnlineCount();           //在線數減1        log.info("有一連接關閉!當前在線人數為" + getOnlineCount());    }    /**     * 收到客戶端消息后調用的方法     * @param message 客戶端發送過來的消息   */@OnMessagepublicvoidonMessage(String message, Session session){        log.info("收到來自窗口"+userId+"的信息:"+message);        //群發消息for (WebSocketServer item : webSocketSet) {            try {                item.sendMessage(message);            } catch (IOException e) {                e.printStackTrace();           }       }    }    /**     * @param session     * @param error     */@OnErrorpublicvoidonError(Session session, Throwable error){        log.error("發生錯誤");        error.printStackTrace();   }    /**     * 實現服務器主動推送     */publicvoidsendMessage(String message)throws IOException {        this.session.getBasicRemote().sendText(message);    }    /**     * 群發自定義消息     * */publicstaticvoidsendInfo(String message,@PathParam("userId") String userId){        log.info("推送消息到窗口"+userId+",推送內容:"+message);        for (WebSocketServer item : webSocketSet) {            try {                //這里可以設定只推送給這個userId的,為null則全部推送if(userId==null) {                   item.sendMessage(message);               }elseif(item.userId.equals(userId)){                   item.sendMessage(message);               }            } catch (IOException e) {                continue;           }       }    }    publicstaticsynchronizedintgetOnlineCount(){        return onlineCount;    }    publicstaticsynchronizedvoidaddOnlineCount(){        WebSocketServer.onlineCount++;    }    publicstaticsynchronizedvoidsubOnlineCount(){        WebSocketServer.onlineCount--;    }}

KafkaConsumer 消費配置,通知用戶是否秒殺成功:

/** * 消費者 spring-kafka 2.0 + 依賴JDK8 * @author  By http://blog.52itstyle.com */@ComponentpublicclassKafkaConsumer{    @Autowiredprivate ISeckillService seckillService;        privatestatic RedisUtil redisUtil = new RedisUtil();    /**    * 監聽seckill主題,有消息就讀取    * @param message     */@KafkaListener(topics = {"seckill"})   publicvoidreceiveMessage(String message){        //收到通道的消息之后執行秒殺操作        String[] array = message.split(";");         if(redisUtil.getValue(array[0])!=null){//control層已經判斷了,其實這里不需要再判斷了            Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.parseLong(array[1]));            if(result.equals(Result.ok())){                WebSocketServer.sendInfo(array[0].toString(), "秒殺成功");//推送給前臺            }else{                WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺                redisUtil.cacheValue(array[0], "ok");//秒殺結束            }        }else{            WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺        }   }}

webSocket.js 前臺通知邏輯:

$(function(){    socket.init();});var basePath = "ws://localhost:8080/seckill/";socket = {    webSocket : "",    init : function() {        //userId:自行追加if ('WebSocket'inwindow) {            webSocket = new WebSocket(basePath+'websocket/1');         }         elseif ('MozWebSocket'inwindow) {            webSocket = new MozWebSocket(basePath+"websocket/1");       }         else {            webSocket = new SockJS(basePath+"sockjs/websocket");        }       webSocket.onerror = function(event) {            alert("websockt連接發生錯誤,請刷新頁面重試!")        };        webSocket.onopen = function(event) {                    };        webSocket.onmessage = function(event) {           var message = event.data;           alert(message)//判斷秒殺是否成功、自行處理邏輯        };    }}

客戶端API

客戶端與服務器通信

  • send() 向遠程服務器發送數據

  • close() 關閉該websocket鏈接

監聽函數 

  • onopen 當網絡連接建立時觸發該事件

  • onerror 當網絡發生錯誤時觸發該事件

  • onclose 當websocket被關閉時觸發該事件

  • onmessage 當websocket接收到服務器發來的消息的時觸發的事件,也是通信中最重要的一個監聽事件。msg.data

readyState屬性

這個屬性可以返回websocket所處的狀態。

  • CONNECTING(0) websocket正嘗試與服務器建立連接

  • OPEN(1) websocket與服務器已經建立連接

  • CLOSING(2) websocket正在關閉與服務器的連接

  • CLOSED(3) websocket已經關閉了與服務器的連接

開源方案

goeasy

GoEasy實時Web推送,支持后臺推送和前臺推送兩種:后臺推送可以選擇Java SDK、 Restful API支持所有開發語言;前臺推送:JS推送。無論選擇哪種方式推送代碼都十分簡單(10分鐘可搞定)。由于它支持websocket 和polling兩種連接方式所以兼顧大多數主流瀏覽器,低版本的IE瀏覽器也是支持的。

地址:

Pushlets

Pushlets 是通過長連接方式實現“推”消息的。推送模式分為:Poll(輪詢)、Pull(拉)。

地址:

Pushlet

Pushlet 是一個開源的 Comet 框架,Pushlet 使用了觀察者模型:客戶端發送請求,訂閱感興趣的事件;服務器端為每個客戶端分配一個會話 ID 作為標記,事件源會把新產生的事件以多播的方式發送到訂閱者的事件隊列里。

地址:

總結

其實前面有提過,盡管WebSocket有諸多優點,但是,如果服務端維護很多長連接也是挺耗費資源的,服務器集群以及覽器或者客戶端兼容性問題,也會帶來了一些不確定性因素。大體了解了一下各大廠的做法,大多數都還是基于輪詢的方式實現的,比如:騰訊PC端微信掃碼登錄、京東商城支付成功通知等等。

有些小伙伴可能會問了,輪詢豈不是會更耗費資源?其實在我看來,有些輪詢是不可能穿透到后端數據庫查詢服務的,比如秒殺,一個緩存標記位就可以判定是否秒殺成功。相對于WS的長連接以及其不確定因素,在秒殺場景下,輪詢還是相對比較合適的。


Java幫幫學習群生態

Java幫幫學習群生態

總有一款能幫到你

Java學習群

Java學習群

與大牛一起交流

大數據學習群

大數據學習群

在數據中成長

九點編程學習群

九點編程學習群

深夜九點學編程

python學習群

python學習群

人工智能,爬蟲

測試學習群

測試學習群

感受測試的魅力

Java幫幫生態承諾

Java幫幫生態承諾

一直堅守,不負重望

初心
勤儉
誠信
正義
分享
友鏈交換:加幫主QQ2524138991 留言即可 24小時內答復  
業司
教育資訊
會員登錄
獲取驗證碼
登錄
登錄
我的資料
留言
回到頂部