時(shí)間:2023-04-18 20:22:01 | 來(lái)源:網(wǎng)站運(yùn)營(yíng)
時(shí)間:2023-04-18 20:22:01 來(lái)源:網(wǎng)站運(yùn)營(yíng)
Websocket集群解決方案:最近在項(xiàng)目中在做一個(gè)消息推送的功能,比如客戶下單之后通知給給對(duì)應(yīng)的客戶發(fā)送系統(tǒng)通知,這種消息推送需要使用到全雙工的websocket
推送消息。所謂的全雙工表示客戶端和服務(wù)端都能向?qū)Ψ桨l(fā)送消息。不使用同樣是全雙工的上一篇文章Spring Boot 整合單機(jī)websocket介紹了http
是因?yàn)?code>http只能由客戶端主動(dòng)發(fā)起請(qǐng)求,服務(wù)接收后返回消息。websocket
建立起連接之后,客戶端和服務(wù)端都能主動(dòng)向?qū)Ψ桨l(fā)送消息。
websocket
在單機(jī)模式下進(jìn)行消息的發(fā)送和接收:用戶A
和用戶B
和web
服務(wù)器建立連接之后,用戶A
發(fā)送一條消息到服務(wù)器,服務(wù)器再推送給用戶B
,在單機(jī)系統(tǒng)上所有的用戶都和同一個(gè)服務(wù)器建立連接,所有的session
都存儲(chǔ)在同一個(gè)服務(wù)器中。session
,服務(wù)器的保存維持連接的session
??蛻舳嗣看沃荒芎图悍?wù)器其中的一個(gè)服務(wù)器連接,后續(xù)也是和該服務(wù)器進(jìn)行數(shù)據(jù)傳輸。session共享
的問(wèn)題,客戶端成功連接服務(wù)器之后,其他服務(wù)器也知道客戶端連接成功。websocket
類似的http
是如何解決集群?jiǎn)栴}的?解決方案之一就是共享session
,客戶端登錄服務(wù)端之后,將session
信息存儲(chǔ)在Redis
數(shù)據(jù)庫(kù)中,連接其他服務(wù)器時(shí),從Redis
獲取session
,實(shí)際就是將session
信息存儲(chǔ)在Redis
中,實(shí)現(xiàn)redis的共享。session
可以被共享的前提是可以被序列化,而websocket
的session
是無(wú)法被序列化的,http
的session
記錄的是請(qǐng)求的數(shù)據(jù),而websocket
的session
對(duì)應(yīng)的是連接,連接到不同的服務(wù)器,session
也不同,無(wú)法被序列化。http
不使用session
共享,就可以使用Nginx
負(fù)載均衡的ip hash
算法,客戶端每次都是請(qǐng)求同一個(gè)服務(wù)器,客戶端的session
都保存在服務(wù)器上,而后續(xù)請(qǐng)求都是請(qǐng)求該服務(wù)器,都能獲取到session
,就不存在分布式session
問(wèn)題了。websocket
相對(duì)http
來(lái)說(shuō),可以由服務(wù)端主動(dòng)推動(dòng)消息給客戶端,如果接收消息的服務(wù)端和發(fā)送消息消息的服務(wù)端不是同一個(gè)服務(wù)端,發(fā)送消息的服務(wù)端無(wú)法找到接收消息對(duì)應(yīng)的session
,即兩個(gè)session不處于同一個(gè)服務(wù)端,也就無(wú)法推送消息。如下圖所示:解決問(wèn)題的方法是將所有消息的發(fā)送方和接收方都處于同一個(gè)服務(wù)器下,而消息發(fā)送方和接收方都是不確定的,顯然是無(wú)法實(shí)現(xiàn)的。
websocket
實(shí)現(xiàn)消息的推送。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
@Configurationpublic class WebSocketConfig { //tomcat啟動(dòng)無(wú)需該配置 @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
@Component@ServerEndpoint(value = "/message")@Slf4jpublic class WebSocket { private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>(); private Session session; @OnOpen public void onOpen(Session session) throws SocketException { this.session = session; webSocketSet.put(this.session.getId(),this); log.info("【websocket】有新的連接,總數(shù):{}",webSocketSet.size()); } @OnClose public void onClose(){ String id = this.session.getId(); if (id != null){ webSocketSet.remove(id); log.info("【websocket】連接斷開(kāi):總數(shù):{}",webSocketSet.size()); } } @OnMessage public void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客戶端發(fā)送的消息,message={}",message); sendMessage(message); } } /** * 發(fā)送消息 * @param message * @return */ public void sendMessage(String message){ for (WebSocket webSocket : webSocketSet.values()) { webSocket.session.getAsyncRemote().sendText(message); } log.info("【wesocket】發(fā)送消息,message={}", message); }}
<div> <input type="text" name="message" id="message"> <button id="sendBtn">發(fā)送</button></div><div style="width:100px;height: 500px;" id="content"></div><script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script><script type="text/javascript"> var ws = new WebSocket("ws://127.0.0.1:8080/message"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); var p = $("<p>"+evt.data+"</p>") $("#content").prepend(p); $("#message").val(""); }; ws.onclose = function(evt) { console.log("Connection closed."); }; $("#sendBtn").click(function(){ var aa = $("#message").val(); ws.send(aa); })</script>
服務(wù)端和客戶端中的OnOpen
、onclose
、onmessage
都是一一對(duì)應(yīng)的。ws.onopen
調(diào)用服務(wù)端的@OnOpen
注解的方法,儲(chǔ)存客戶端的session信息,握手建立連接。ws.send
發(fā)送消息,對(duì)應(yīng)服務(wù)端的@OnMessage
注解下面的方法接收消息。session.getAsyncRemote().sendText
發(fā)送消息,對(duì)應(yīng)的客戶端ws.onmessage
接收消息。@GetMapping({"","index.html"})public ModelAndView index() { ModelAndView view = new ModelAndView("index"); return view;}
RabbitMQ
作為消息中間件,而RabbitMQ
支持發(fā)布訂閱模式:@Configurationpublic class RabbitConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE"); } @Bean public Queue psQueue() throws SocketException { // ip + 端口 為隊(duì)列名 String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort(); return new Queue("ps_" + ip); } @Bean public Binding routingFirstBinding() throws SocketException { return BindingBuilder.bind(psQueue()).to(fanoutExchange()); }}
獲取服務(wù)器IP和端口可以具體查看Github源碼,這里就不做詳細(xì)描述了。
WebSocket
添加消息的接收方法,@RabbitListener
接收消息,隊(duì)列名稱使用常量命名,動(dòng)態(tài)隊(duì)列名稱使用 #{name}
,其中的name
是Queue
的bean
名稱:@RabbitListener(queues= "#{psQueue.name}")public void pubsubQueueFirst(String message) { System.out.println(message); sendMessage(message);}
然后再調(diào)用sendMessage
方法發(fā)送給所在連接的客戶端。WebSocket
類的onMessage
方法將消息發(fā)送改成RabbitMQ
方式發(fā)送:@OnMessagepublic void onMessage(String message){ if (!message.equals("ping")){ log.info("【wesocket】收到客戶端發(fā)送的消息,message={}",message); //sendMessage(message); if (rabbitTemplate == null) { rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate"); } rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message); }}
消息通知流程如下所示:Edit Configurations
:server.port=8081
:8080
和8081
。在啟動(dòng)8081
端口的服務(wù),將前端連接端口改成8081
:var ws = new WebSocket("ws://127.0.0.1:8081/message");
關(guān)鍵詞:方案,解決
客戶&案例
營(yíng)銷資訊
關(guān)于我們
客戶&案例
營(yíng)銷資訊
關(guān)于我們
微信公眾號(hào)
版權(quán)所有? 億企邦 1997-2025 保留一切法律許可權(quán)利。