所謂的全雙工表示客戶端和服務(wù)端都能向?qū)Ψ桨l(fā)送消" />

国产成人精品无码青草_亚洲国产美女精品久久久久∴_欧美人与鲁交大毛片免费_国产果冻豆传媒麻婆精东

18143453325 在線咨詢 在線咨詢
18143453325 在線咨詢
所在位置: 首頁(yè) > 營(yíng)銷資訊 > 網(wǎng)站運(yùn)營(yíng) > Websocket集群解決方案

Websocket集群解決方案

時(shí)間:2023-04-18 20:22:01 | 來(lái)源:網(wǎng)站運(yùn)營(yíng)

時(shí)間:2023-04-18 20:22:01 來(lái)源:網(wǎng)站運(yùn)營(yíng)

Websocket集群解決方案:最近在項(xiàng)目中在做一個(gè)消息推送的功能,比如客戶下單之后通知給給對(duì)應(yīng)的客戶發(fā)送系統(tǒng)通知,這種消息推送需要使用到全雙工的websocket推送消息。

所謂的全雙工表示客戶端和服務(wù)端都能向?qū)Ψ桨l(fā)送消息。不使用同樣是全雙工的http是因?yàn)?code>http只能由客戶端主動(dòng)發(fā)起請(qǐng)求,服務(wù)接收后返回消息。websocket建立起連接之后,客戶端和服務(wù)端都能主動(dòng)向?qū)Ψ桨l(fā)送消息。
上一篇文章Spring Boot 整合單機(jī)websocket介紹了websocket在單機(jī)模式下進(jìn)行消息的發(fā)送和接收:

用戶A用戶Bweb服務(wù)器建立連接之后,用戶A發(fā)送一條消息到服務(wù)器,服務(wù)器再推送給用戶B,在單機(jī)系統(tǒng)上所有的用戶都和同一個(gè)服務(wù)器建立連接,所有的session都存儲(chǔ)在同一個(gè)服務(wù)器中。

單個(gè)服務(wù)器是無(wú)法支撐幾萬(wàn)人同時(shí)連接同一個(gè)服務(wù)器,需要使用到分布式或者集群將請(qǐng)求連接負(fù)載均衡到到不同的服務(wù)下。消息的發(fā)送方和接收方在同一個(gè)服務(wù)器,這就和單體服務(wù)器類似,能成功接收到消息:

但負(fù)載均衡使用輪詢的算法,無(wú)法保證消息發(fā)送方和接收方處于同一個(gè)服務(wù)器,當(dāng)發(fā)送方和接收方不是在同一個(gè)服務(wù)器時(shí),接收方是無(wú)法接受到消息的:

websocket集群?jiǎn)栴}解決思路

客戶端和服務(wù)端每次建立連接時(shí)候,會(huì)創(chuàng)建有狀態(tài)的會(huì)話session,服務(wù)器的保存維持連接的session??蛻舳嗣看沃荒芎图悍?wù)器其中的一個(gè)服務(wù)器連接,后續(xù)也是和該服務(wù)器進(jìn)行數(shù)據(jù)傳輸。

要解決集群的問(wèn)題,應(yīng)該考慮session共享的問(wèn)題,客戶端成功連接服務(wù)器之后,其他服務(wù)器也知道客戶端連接成功。

方案一:session 共享(不可行)

websocket類似的http是如何解決集群?jiǎn)栴}的?解決方案之一就是共享session,客戶端登錄服務(wù)端之后,將session信息存儲(chǔ)在Redis數(shù)據(jù)庫(kù)中,連接其他服務(wù)器時(shí),從Redis獲取session,實(shí)際就是將session信息存儲(chǔ)在Redis中,實(shí)現(xiàn)redis的共享。

session可以被共享的前提是可以被序列化,而websocketsession是無(wú)法被序列化的,httpsession記錄的是請(qǐng)求的數(shù)據(jù),而websocketsession對(duì)應(yīng)的是連接,連接到不同的服務(wù)器,session也不同,無(wú)法被序列化。

方案二:ip hash(不可行)

http不使用session共享,就可以使用Nginx負(fù)載均衡的ip hash算法,客戶端每次都是請(qǐng)求同一個(gè)服務(wù)器,客戶端的session都保存在服務(wù)器上,而后續(xù)請(qǐng)求都是請(qǐng)求該服務(wù)器,都能獲取到session,就不存在分布式session問(wèn)題了。

websocket相對(duì)http來(lái)說(shuō),可以由服務(wù)端主動(dòng)推動(dòng)消息給客戶端,如果接收消息的服務(wù)端和發(fā)送消息消息的服務(wù)端不是同一個(gè)服務(wù)端,發(fā)送消息的服務(wù)端無(wú)法找到接收消息對(duì)應(yīng)的session,即兩個(gè)session不處于同一個(gè)服務(wù)端,也就無(wú)法推送消息。如下圖所示:

解決問(wèn)題的方法是將所有消息的發(fā)送方和接收方都處于同一個(gè)服務(wù)器下,而消息發(fā)送方和接收方都是不確定的,顯然是無(wú)法實(shí)現(xiàn)的。

方案三:廣播模式

將消息的發(fā)送方和接收方都處于同一個(gè)服務(wù)器下才能發(fā)送消息,那么可以轉(zhuǎn)換一下思路,可以將消息以消息廣播的方式通知給所有的服務(wù)器,可以使用消息中間件發(fā)布訂閱模式,消息脫離了服務(wù)器的限制,通過(guò)發(fā)送到中間件,再發(fā)送給訂閱的服務(wù)器,類似廣播一樣,只要訂閱了消息,都能接收到消息的通知:

發(fā)布者發(fā)布消息到消息中間件,消息中間件再將發(fā)送給所有訂閱者:

廣播模式的實(shí)現(xiàn)

搭建單機(jī) websocket

參考以前寫的websocket單機(jī)搭建 文章,先搭建單機(jī)websocket實(shí)現(xiàn)消息的推送。

1. 添加依賴

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>

2. 創(chuàng)建 ServerEndpointExporter 的 bean 實(shí)例

ServerEndpointExporter 的 bean 實(shí)例自動(dòng)注冊(cè) @ServerEndpoint 注解聲明的 websocket endpoint,使用springboot自帶tomcat啟動(dòng)需要該配置,使用獨(dú)立 tomcat 則不需要該配置。

@Configurationpublic class WebSocketConfig { //tomcat啟動(dòng)無(wú)需該配置 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}

3. 創(chuàng)建服務(wù)端點(diǎn) ServerEndpoint 和 客戶端端

@Component@ServerEndpoint(value = "/message")@Slf4jpublic class WebSocket { private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); private Session session; @OnOpen public void onOpen(Session session) throws SocketException { this.session = session; webSocketSet.put(this.session.getId(),this); log.info("【websocket】有新的連接,總數(shù):{}",webSocketSet.size()); } @OnClose public void onClose(){ String id = this.session.getId(); if (id != null){ webSocketSet.remove(id); log.info("【websocket】連接斷開(kāi):總數(shù):{}",webSocketSet.size()); } } @OnMessage public void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客戶端發(fā)送的消息,message={}",message); sendMessage(message); } } /** * 發(fā)送消息 * @param message * @return */ public void sendMessage(String message){ for (WebSocket webSocket : webSocketSet.values()) { webSocket.session.getAsyncRemote().sendText(message); } log.info("【wesocket】發(fā)送消息,message={}", message); }}<div> <input type="text" name="message" id="message"> <button id="sendBtn">發(fā)送</button></div><div style="width:100px;height: 500px;" id="content"></div><script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script><script type="text/javascript"> var ws = new WebSocket("ws://127.0.0.1:8080/message"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); var p = $("<p>"+evt.data+"</p>") $("#content").prepend(p); $("#message").val(""); }; ws.onclose = function(evt) { console.log("Connection closed."); }; $("#sendBtn").click(function(){ var aa = $("#message").val(); ws.send(aa); })</script>服務(wù)端和客戶端中的OnOpenonclose、onmessage都是一一對(duì)應(yīng)的。

添加 controller

@GetMapping({"","index.html"})public ModelAndView index() { ModelAndView view = new ModelAndView("index"); return view;}

效果展示

打開(kāi)兩個(gè)客戶端,其中的一個(gè)客戶端發(fā)送消息,另一個(gè)客戶端也能接收到消息。

添加 RabbitMQ 中間件

這里使用比較常用的RabbitMQ作為消息中間件,而RabbitMQ支持發(fā)布訂閱模式

添加消息訂閱

交換機(jī)使用扇形交換機(jī),消息分發(fā)給每一條綁定該交換機(jī)的隊(duì)列。以服務(wù)器所在的IP + 端口作為唯一標(biāo)識(shí)作為隊(duì)列的命名,啟動(dòng)一個(gè)服務(wù),使用隊(duì)列綁定交換機(jī),實(shí)現(xiàn)消息的訂閱:

@Configurationpublic class RabbitConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE"); } @Bean public Queue psQueue() throws SocketException { // ip + 端口 為隊(duì)列名 String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort(); return new Queue("ps_" + ip); } @Bean public Binding routingFirstBinding() throws SocketException { return BindingBuilder.bind(psQueue()).to(fanoutExchange()); }}
獲取服務(wù)器IP和端口可以具體查看Github源碼,這里就不做詳細(xì)描述了。

修改服務(wù)端點(diǎn) ServerEndpoint

WebSocket添加消息的接收方法,@RabbitListener 接收消息,隊(duì)列名稱使用常量命名,動(dòng)態(tài)隊(duì)列名稱使用 #{name},其中的nameQueuebean 名稱:

@RabbitListener(queues= "#{psQueue.name}")public void pubsubQueueFirst(String message) { System.out.println(message); sendMessage(message);}然后再調(diào)用sendMessage方法發(fā)送給所在連接的客戶端。

修改消息發(fā)送

WebSocket類的onMessage方法將消息發(fā)送改成RabbitMQ方式發(fā)送:

@OnMessagepublic void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客戶端發(fā)送的消息,message={}",message); //sendMessage(message); if (rabbitTemplate == null) { rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate"); } rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message); }}消息通知流程如下所示:

啟動(dòng)兩個(gè)實(shí)例,模擬集群環(huán)境

打開(kāi)idea的Edit Configurations

點(diǎn)擊左上角的COPY,然后添加端口server.port=8081

啟動(dòng)兩個(gè)服務(wù),端口分別是80808081。在啟動(dòng)8081端口的服務(wù),將前端連接端口改成8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");

效果展示

源碼

github源碼

參考

關(guān)鍵詞:方案,解決

74
73
25
news

版權(quán)所有? 億企邦 1997-2025 保留一切法律許可權(quán)利。

為了最佳展示效果,本站不支持IE9及以下版本的瀏覽器,建議您使用谷歌Chrome瀏覽器。 點(diǎn)擊下載Chrome瀏覽器
關(guān)閉