衡阳派盒市场营销有限公司

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

一個注解實現(xiàn)WebSocket集群的方案

jf_ro2CN3Fa ? 來源:CSDN ? 2023-04-10 11:37 ? 次閱讀

介紹

WebSocket大家應(yīng)該是再熟悉不過了,如果是單體應(yīng)用確實不會有什么問題,但是當(dāng)我們的項目使用微服務(wù)架構(gòu)時,就可能會存在問題

比如服務(wù)A有兩個實例A1和A2,前端的WebSocket客戶端C通過網(wǎng)關(guān)的負(fù)載均衡連到了A1,這個時候當(dāng)A2觸發(fā)消息發(fā)送的邏輯,需要將某個消息發(fā)送給所有的客戶端時,C就接受不到消息

這個時候我們很快就能想到一種最簡單的解決方案,就是把A2的消息轉(zhuǎn)發(fā)給A1,A1再把消息發(fā)送給C,這樣C就能收到A2發(fā)送的消息了

81f46d56-d6c8-11ed-bfe3-dac502259ad0.png

基于這個思路,我實現(xiàn)了一個庫,一個配置注解搞定一切

用法

接下來讓我們看看這個庫的用法

首先我們需要在啟動類上添加一個注解@EnableWebSocketLoadBalanceConcept

@EnableWebSocketLoadBalanceConcept
@EnableDiscoveryClient
@SpringBootApplication
publicclassAServiceApplication{

publicstaticvoidmain(String[]args){
SpringApplication.run(AServiceApplication.class,args);
}
}

接著我們在需要發(fā)送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨實例發(fā)消息啦

@RestController
@RequestMapping("/ws")
publicclassWsController{

@Autowired
privateWebSocketLoadBalanceConceptconcept;

@RequestMapping("/send")
publicvoidsend(@RequestParamStringmsg){
concept.send(msg);
}
}

是不是很簡單,有沒有覺得比自己集成單體應(yīng)用的WebSocket還要簡單!

當(dāng)你的同事還在頭疼要實現(xiàn)手動轉(zhuǎn)發(fā)時你已經(jīng)通過一個配置注解實現(xiàn)了功能并開始泡茶喝

你的同事肯定對你刮目相看啊(又能開始摸魚了)

不知道大家看了之后是不是對具體實現(xiàn)已經(jīng)有了一些思路呢

接下來我就來講講這個庫的實現(xiàn)流程

抽象思路

其實我之前有專門針對WebSocket實現(xiàn)過類似功能的模塊,只是當(dāng)時的一些場景都是基于項目定死的,所以相對來說實現(xiàn)比較簡單,但是過于定制化不好擴展

有一天在和我的一個前同事聊天的過程中得知,他們在考慮讓設(shè)備和服務(wù)直連,并且服務(wù)要部署成多實例

設(shè)備和服務(wù)直連無非就是通過TCP這種長連接來實現(xiàn),可以使用緩存來保存連接和服務(wù)地址的映射關(guān)系來實現(xiàn)點對點轉(zhuǎn)發(fā)的功能需求

聽到這里,是不是感覺似曾相識?當(dāng)時就有一道光穿過我的腦瓜子,真相只有一個!這不就和WebSocket在集群模式下的問題一樣么

于是我從原來針對WebSocket的思考,變成了對各種長連接的思考,最終我將這個問題抽象成了:長連接的集群方案

而不管是WebSocket還是TCP都是長連接的一種具體實現(xiàn)

所以我們可以抽象一個頂級接口Connection,然后實現(xiàn)WebSocketConnection或者是TCPConnection

其實從抽象的角度來說不僅僅是長連接,短連接也在我們的抽象范圍之內(nèi),只不過類似HTTP等協(xié)議并不存在上述的問題,但是并不妨礙你實現(xiàn)一個HTTPConnection用于轉(zhuǎn)發(fā)消息,所以大家不要被先入為主的思維束縛住了

轉(zhuǎn)發(fā)思路

之前講到,這個庫的主要思路就是將消息轉(zhuǎn)發(fā)給其他的服務(wù)實例來達(dá)到一個單播或廣播的效果

所以消息轉(zhuǎn)發(fā)的設(shè)計就非常重要了

首先消息轉(zhuǎn)發(fā)需要憑借一些支持?jǐn)?shù)據(jù)交互的技術(shù)手段

比如HTTP,MQ,TCP,WebSocket

說到這里。。。大家是不是。。。你TM原來自己就能搞定?。ㄏ谱溃?/p>

長連接不就是用來交互數(shù)據(jù)的嗎,所以完全可以自給自足啊

于是就有一個精妙的想法在我腦子里形成:

如果每個服務(wù)實例都把自己作為一個客戶端,連接到其他服務(wù)上呢?

WebSocket的場景下,我們將當(dāng)前服務(wù)實例作為一個WebSocket客戶端去連接其他服務(wù)實例的WebSocket服務(wù)端

TCP的場景下,我們將當(dāng)前服務(wù)實例作為一個TCP的客戶端去連接其他服務(wù)實例的TCP服務(wù)端

這樣其他服務(wù)實例就可以把消息發(fā)到這些偽裝的客戶端上,當(dāng)服務(wù)實例上偽裝的客戶端接收到消息之后就可以再轉(zhuǎn)發(fā)給自己管理的真正的客戶端

撒花家人們,自閉(自我閉環(huán))了屬于是

所以我們首先需要先讓服務(wù)實例之間相互連接上

連接流程

讓我們來看看互相建立連接是怎么設(shè)計的

82155278-d6c8-11ed-bfe3-dac502259ad0.png

我定義了一個ConnectionSubscriber的接口,大家可以理解為我們的服務(wù)實例要去訂閱監(jiān)聽其他服務(wù)發(fā)送的消息

同時提供了默認(rèn)實現(xiàn),就是基于自身的協(xié)議進行連接和消息的發(fā)送

當(dāng)然也能夠靈活的支持其他方式,只需要自定義一個ConnectionSubscriber就可以了,如果使用MQ的方式就可以實現(xiàn)一個MQConnectionSubscriber或者使用HTTP就可以實現(xiàn)一個HTTPConnectionSubscriber

只不過使用自身的協(xié)議就可以不用依賴其他的庫或是中間件了,當(dāng)然如果你對消息的丟失率有比較嚴(yán)格的要求也可以使用MQ作為消息轉(zhuǎn)發(fā)的中介,而以我之前參與過的項目來說,一般普通的WebSocket場景基本上還是能忍受一定的丟失率的

獲取服務(wù)實例信息

那么我們怎么知道要去連接哪些實例呢

我定義了一個ConnectionServerManager的接口用來管理服務(wù)信息

當(dāng)然我們完全可以自己實現(xiàn)一個,比如通過配置文件來配置服務(wù)實例信息

不過我們有更方便的方式,那就是依賴Spring Cloud的服務(wù)發(fā)現(xiàn)組件了,不管是Eureka還是Nacos還是其他的注冊中心相當(dāng)于都支持了,這就是抽象的魅力啊

我們可以通過DiscoveryClient#getInstances(Registration.getServiceId())來獲得所有的實例,排除掉自身就是需要連接的服務(wù)實例了

當(dāng)我們的服務(wù)實例連接上其他的服務(wù)實例之后,發(fā)送一個自身實例信息的消息過去,其他的服務(wù)實例接收到對應(yīng)的消息之后反過來連接我們的服務(wù)實例,保證一定的連接及時性,這樣雙方的連接就搭建起來了,可以互相轉(zhuǎn)發(fā)消息了

同時我還添加了心跳檢測和自動重連,當(dāng)一段時間沒有收到心跳回復(fù)后就會斷開連接,并且每隔一段時間就會重新查詢一遍實例信息,如果發(fā)現(xiàn)存在某個服務(wù)實例沒有對應(yīng)的連接,就會重新進行連接,這樣就能在某些偶爾網(wǎng)絡(luò)不好的情況下有一定的容錯

到目前為止,我們基本的框架已經(jīng)建立了,當(dāng)我們啟動服務(wù)之后,服務(wù)間就會自動建立連接

連接區(qū)分和管理

基于上述的思路,我們肯定需要區(qū)分真實的客戶端和用來轉(zhuǎn)發(fā)的客戶端

于是我就把這些連接做了一個分類

類別 說明
Client 普通的連接
Subscriber 服務(wù)實例偽裝的連接,用于接受需要轉(zhuǎn)發(fā)的消息
Observable 服務(wù)實例偽裝的連接,用于發(fā)送需要轉(zhuǎn)發(fā)的消息

然后對于這些連接進行一個統(tǒng)一的管理

826dec12-d6c8-11ed-bfe3-dac502259ad0.png

通過連接工廠ConnectionFactory我們可以將任意的連接適配成Connection對象,并實現(xiàn)各種連接間的消息轉(zhuǎn)發(fā)

每個連接都會配置一個MessageEncoder和MessageDecoder用于消息的編碼和解碼,而且不同類別的連接對應(yīng)的編碼器和解碼器肯定是不一樣的,比如轉(zhuǎn)發(fā)的消息和發(fā)給真實客戶端的消息很大程度上都是有區(qū)別的,所以額外定義了一個MessageCodecAdapter用來適配不同類型的編解碼器,也能讓大家在自定義時方便管理

消息發(fā)送

現(xiàn)在當(dāng)我們發(fā)送某條消息之后,消息就會被轉(zhuǎn)發(fā)到其他的服務(wù)實例,所有的客戶端就都能收到了

不對啊,在有些情況下我們不想讓所有客戶端都收到啊,能不能我們想讓誰收到就讓誰收到啊

真麻煩,來,我把所有的連接都給你,你自己選吧

連接選擇

我們需要在消息發(fā)送時確定發(fā)送給哪些連接

82cd30be-d6c8-11ed-bfe3-dac502259ad0.png

于是我就定義了一個連接選擇器ConnectionSelector

每次要發(fā)送消息的時候,我都會匹配一個連接選擇器,然后通過選擇器來獲得需要發(fā)送消息的連接,而我們可以通過自定義連接選擇器來實現(xiàn)我們消息的精準(zhǔn)發(fā)送

這里其實就是我為什么會取名WebSocketLoadBalanceConcept的原因,為什么要叫LoadBalance呢

Ribbon通過IRule來選擇一個Server

我通過ConnectionSelector來選擇一個Connection集合

是不是有異曲同工之妙

繼續(xù)來說自定義選擇器

準(zhǔn)備工作:

我們的Connection有一個metadata字段用于存放自定義屬性

我們的Message有一個headers字段用于存放消息頭

給指定用戶發(fā)送消息

很多場景下我們需要給指定的用戶發(fā)送消息

首先當(dāng)客戶端連接上來時,可以通過參數(shù)或者主動發(fā)送一個消息將userId發(fā)給服務(wù)端,然后服務(wù)端將得到的userId存在Connection的metadata中

接著我們給需要發(fā)送的Message添加一個header,將對應(yīng)的userId作為消息頭

這樣我們就可以自定義一個連接選擇器通過判斷Message是否包含userId消息頭來作為匹配的條件,當(dāng)Message的headers中存在userId時,對Connection中的metadata進行userId的匹配來篩選需要發(fā)送消息的連接

由于userId是唯一的,當(dāng)我們自身服務(wù)連上來的客戶端中已經(jīng)匹配到就不需要再轉(zhuǎn)發(fā)了,如果沒有匹配到就通過其他服務(wù)實例的客戶端進行消息轉(zhuǎn)發(fā)

庫中已經(jīng)實現(xiàn)了對應(yīng)的UserSelector和UserMessage,可以使用配置開啟并通過在連接路徑上添加userId參數(shù)來標(biāo)記用戶

當(dāng)然我們也可以借用緩存來精確的判斷需不需要轉(zhuǎn)發(fā)或者是需要轉(zhuǎn)發(fā)給哪幾個服務(wù),把userId和服務(wù)的instanceId等一些具有唯一性的數(shù)據(jù)緩存在Redis中,當(dāng)給用戶發(fā)送消息時,從Redis中獲得用戶對應(yīng)的服務(wù)實例的instanceId或是具有唯一性的數(shù)據(jù),如果經(jīng)過匹配就是當(dāng)前服務(wù)就可以直接下發(fā),如果是其他服務(wù)就轉(zhuǎn)發(fā)給那個對應(yīng)的服務(wù)就行了

給指定路徑發(fā)送消息

還有一種場景也比較常見就是類似主題訂閱,如訂閱設(shè)備狀態(tài)更新的數(shù)據(jù),就要給每一個對應(yīng)路徑的連接發(fā)送消息了

我們可以使用不同的路徑來表示不同主題,然后自定義一個連接選擇器來匹配連接的路徑和消息頭中指定的路徑

當(dāng)然庫中也已經(jīng)實現(xiàn)了對應(yīng)的PathSelector和PathMessage,可以通過配置開啟

結(jié)束

最后請允許我發(fā)表一點對于抽象的拙見

抽象其實就和 “道生一,一生二,二生三,三生萬物” 一樣,根據(jù)你的頂級接口(也就是核心功能)不斷的向外展開,你的頂級接口就是道(狹義的來講)

以這個庫為例,ConnectionLoadBalanceConcept就是這個庫的道,他的核心功能就是發(fā)送消息,至于怎么發(fā),發(fā)給誰,不確定,像是一個混沌的狀態(tài)

那么什么是一,二,三呢,我們發(fā)送消息需要載體于是就有了Connection和Message,我們需要對Connection進行管理于是就有了ConnectionRepository,我們需要轉(zhuǎn)發(fā)消息于是就有了ConnectionSubscriber等等

而萬物就像是具體的實現(xiàn),是能落實的,基于Spring Cloud服務(wù)發(fā)現(xiàn)的連接管理器DiscoveryConnectionServerManager,基于路徑的連接選擇器PathSelector,基于Reactive的WebSocket連接ReactiveWebSocketConnection

就像是你創(chuàng)造的世界,不斷的衍生出各種各樣的規(guī)則,這些規(guī)則相輔相成,讓你的世界平穩(wěn)的運行。






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • HTTP協(xié)議
    +關(guān)注

    關(guān)注

    0

    文章

    66

    瀏覽量

    9795
  • TCP通信
    +關(guān)注

    關(guān)注

    0

    文章

    146

    瀏覽量

    4292
  • WebSocket
    +關(guān)注

    關(guān)注

    0

    文章

    29

    瀏覽量

    3783

原文標(biāo)題:一個注解實現(xiàn) WebSocket 集群方案,這樣玩才爽!

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    EE-98:使用外部總線仲裁將兩以上的ADSP-21065L組合到多處理集群

    電子發(fā)燒友網(wǎng)站提供《EE-98:使用外部總線仲裁將兩以上的ADSP-21065L組合到多處理集群中.pdf》資料免費下載
    發(fā)表于 01-05 09:38 ?0次下載
    EE-98:使用外部總線仲裁將兩<b class='flag-5'>個</b>以上的ADSP-21065L組合到<b class='flag-5'>一</b><b class='flag-5'>個</b>多處理<b class='flag-5'>集群</b>中

    國產(chǎn)智算集群黑馬!曦源號SADA算力集群綜合評測表現(xiàn)優(yōu)異

    穩(wěn)定性、線性度、模型支持度等多個維度均表現(xiàn)優(yōu)異。加佳科技長期深耕國產(chǎn)替代數(shù)字科技的技術(shù)研發(fā)、平臺運營與解決方案提供。旗下曦源號SADA萬卡集群通過構(gòu)建開放、標(biāo)準(zhǔn)、
    的頭像 發(fā)表于 12-25 11:16 ?371次閱讀
    國產(chǎn)智算<b class='flag-5'>集群</b>黑馬!曦源<b class='flag-5'>一</b>號SADA算力<b class='flag-5'>集群</b>綜合評測表現(xiàn)優(yōu)異

    韓國無晶圓廠初創(chuàng)公司Panmnesia展示第一個支持CXL的AI集群

    在2024?OCP全球峰會上,開發(fā)CXL交換機SoC和CXL IP的韓國無晶圓廠初創(chuàng)公司Panmnesia展示了第一個支持CXL的AI集群,該集群采用CXL 3.1交換機。 OCP全球峰會由世界上
    的頭像 發(fā)表于 11-28 11:04 ?486次閱讀

    socket 和 WebSocket 的區(qū)別

    在現(xiàn)代網(wǎng)絡(luò)通信中,Socket和WebSocket是兩種常見的通信協(xié)議。它們在實現(xiàn)網(wǎng)絡(luò)通信、數(shù)據(jù)傳輸?shù)确矫姘l(fā)揮著重要作用。然而,它們之間存在些關(guān)鍵的區(qū)別。 1. Socket(套接字) 1.1
    的頭像 發(fā)表于 11-12 14:33 ?616次閱讀

    socket與WebSocket的區(qū)別與聯(lián)系

    ) : Socket是種通信端點,它在網(wǎng)絡(luò)編程中用于實現(xiàn)不同主機之間的通信。Socket可以是TCP套接字或UDP套接字,分別對應(yīng)于TCP(傳輸控制協(xié)議)和UDP(用戶數(shù)據(jù)報協(xié)議)。 TCP套接字提供了可靠的、面向連接的通信服務(wù),而UDP套接字則提供了不可靠的、無連接的
    的頭像 發(fā)表于 11-04 09:19 ?582次閱讀

    不可錯過的Air780E之WebSocket應(yīng)用示范!小白篇

    咋們今天說的Air780E之WebSocket應(yīng)用示范針對小白絕對是不可以錯過的示例。
    的頭像 發(fā)表于 11-03 20:16 ?916次閱讀
    不可錯過的Air780E之<b class='flag-5'>WebSocket</b>應(yīng)用示范!小白篇

    世界前十科技集群 中國有4 深圳-香港-廣州 上海-蘇州上榜

    根據(jù)WIPO中國公布的2024GII百強科技集群排名榜單顯示;排名世界前十科技集群我國有4 ;分別是深圳-香港-廣州 ;北京;上海-蘇州;南京;? 此外我國的武漢、杭州表現(xiàn)也非常不錯,排名分別位列
    的頭像 發(fā)表于 08-29 14:16 ?502次閱讀

    在多FPGA集群實現(xiàn)高級并行編程

    今天我們看的這篇論文介紹了在多FPGA集群實現(xiàn)高級并行編程的研究,其主要目標(biāo)是為非FPGA專家提供成熟且易于使用的環(huán)境,以便在多個并行運行的設(shè)備上擴展高性能計算(HPC)應(yīng)用。
    的頭像 發(fā)表于 07-24 14:54 ?1394次閱讀

    websocket.c RTOS演示中缺少對wifi_connect()的調(diào)用怎么辦?

    在 RTOS SDK 1.3 中,有名為 /examples/websocket_demo/websocket/websocket.c
    發(fā)表于 07-18 06:37

    請問websocket庫怎么讀取服務(wù)器發(fā)來的數(shù)據(jù)?

    官方websocket庫怎么讀取服務(wù)器發(fā)來的數(shù)據(jù)?
    發(fā)表于 06-25 06:40

    鴻蒙開發(fā)網(wǎng)絡(luò)管理:ohos.net.webSocket WebSocket連接

    使用WebSocket建立服務(wù)器與客戶端的雙向連接,需要先通過[createWebSocket]方法創(chuàng)建[WebSocket]對象,然后通過[connect]方法連接到服務(wù)器。當(dāng)連接成功后,客戶端
    的頭像 發(fā)表于 06-19 17:12 ?667次閱讀
    鴻蒙開發(fā)網(wǎng)絡(luò)管理:ohos.net.<b class='flag-5'>webSocket</b> <b class='flag-5'>WebSocket</b>連接

    ESP32進行websocket通信接收數(shù)據(jù)出錯的原因?

    開發(fā),支持一對一私聊和對多公聊 W (104983) WEBSOCKET: Total payload length=277, data_len=277, current payload offset
    發(fā)表于 06-14 07:42

    HBase集群數(shù)據(jù)在線遷移方案探索

    、背景 訂單本地化系統(tǒng)目前一個月的訂單的讀寫已經(jīng)切至jimkv存儲,對應(yīng)的HBase集群已下線。但存儲全量數(shù)據(jù)的HBase集群仍在使用,計劃將這個HBase
    的頭像 發(fā)表于 06-12 11:54 ?1217次閱讀
    HBase<b class='flag-5'>集群</b>數(shù)據(jù)在線遷移<b class='flag-5'>方案</b>探索

    鴻蒙原生應(yīng)用開發(fā)-網(wǎng)絡(luò)管理WebSocket連接

    。使用該功能需要申請ohos.permission.INTERNET權(quán)限。具體接口說明如下表。 三、開發(fā)步驟 1.導(dǎo)入需要的webSocket模塊。 2.創(chuàng)建WebSocket連接
    發(fā)表于 04-07 09:46

    浪潮信息推出全球首個單存儲即可支持16節(jié)點的SAP HANA集群方案

    近日,浪潮信息成功實現(xiàn)并推出全球首個單存儲即可支持16節(jié)點的SAP HANA集群方案,全閃存儲HF5000系列作為該方案的數(shù)據(jù)存儲底座,助力以最優(yōu)的性能和成本為企業(yè)創(chuàng)建大規(guī)模、易擴展、
    的頭像 發(fā)表于 04-01 10:03 ?489次閱讀
    八宿县| 百家乐官网强弱走势图| 百家乐赢多少该止赢| 大悟县| 百家乐牌具公司| 百家乐官网最新首存优惠| 百家乐娱乐城会员| 大发888游戏注册送98| 真人百家乐官网分析软件是骗局| 百家乐猜大小规则| 博马百家乐官网娱乐城| 博彩网百家乐中和局| 永利高百家乐官网开户| 赌场百家乐的玩法技巧和规则| 百家乐官网庄9点| 大发888优惠代码| 百家乐去哪里玩最好| 佛坪县| 基础百家乐的玩法技巧和规则| 百家乐官网赌博娱乐城大全| 棋牌新闻| 广东百家乐扫描分析仪| 网上百家乐官网内幕| 威尼斯人娱乐场官网h00| 迪士尼百家乐官网的玩法技巧和规则 | 新大发888pt老虎机| 百家乐有没有稳赢| 金臂百家乐| 百家乐路单怎样| 百家乐官网凯时娱乐平台| 网上大发扑克| 百家乐屏风| 至尊百家乐官网赌场娱乐网规则| 娱乐城棋牌| 百家乐太阳城真人游戏| 百家乐官网电子路单谁| 绥江县| 全讯网.com| 百家乐游戏规则介绍| 使用的百家乐官网软件| 大发888开户注册|