網站首頁 編程語言 正文
原理
websocket的訂閱就是在前后端建立ws連接之后,前端通過發送一定格式的消息,后端解析出來去訂閱或者取消訂閱redis頻道。
訂閱頻道消息格式:
{
"cmd":"subscribe",
"topic":[
"topic_name"
]
}
模糊訂閱格式
{
"cmd":"psubscribe",
"topic":[
"topic_name"
]
}
取消訂閱格式
{
"cmd":"unsubscribe",
"topic":[
"topic_name"
]
}
兩個核心類,一個是redis的訂閱監聽類,一個是websocket的發布訂閱類。
redis訂閱監聽類
package com.curtain.core;
import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import java.util.Arrays;
/**
?* @Author Curtain
?* @Date 2021/6/7 14:27
?* @Description
?*/
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
? ? private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
? ? private Jedis jedis;
? ? //訂閱
? ? public void subscribe(String... channels) {
? ? ? ? jedis = jedisPool.getResource();
? ? ? ? try {
? ? ? ? ? ? jedis.subscribe(this, channels);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? //遇到異常后關閉連接重新訂閱
? ? ? ? ? ? log.info("監聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? subscribe(channels);
? ? ? ? }
? ? }
? ? //模糊訂閱
? ? public void psubscribe(String... channels) {
? ? ? ? Jedis jedis = jedisPool.getResource();
? ? ? ? try {
? ? ? ? ? ? jedis.psubscribe(this, channels);
? ? ? ? } catch (ArithmeticException e) {//取消訂閱故意造成的異常
? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? jedis.close();
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? //遇到異常后關閉連接重新訂閱
? ? ? ? ? ? log.info("監聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? psubscribe(channels);
? ? ? ? }
? ? }
? ? public void unsubscribeAndClose(String... channels){
? ? ? ? unsubscribe(channels);
? ? ? ? if (jedis != null && !isSubscribed())
? ? ? ? ? ? jedis.close();
? ? }
? ? public void punsubscribeAndClose(String... channels){
? ? ? ? punsubscribe(channels);
? ? ? ? if (jedis != null && !isSubscribed())
? ? ? ? ? ? jedis.close();
? ? }
? ? @Override
? ? public void onSubscribe(String channel, int subscribedChannels) {
? ? ? ? log.info("subscribe redis channel:" + channel + ", 線程id:" + Thread.currentThread().getId());
? ? }
? ? @Override
? ? public void onPSubscribe(String pattern, int subscribedChannels) {
? ? ? ? log.info("psubscribe redis channel:" + pattern + ", 線程id:" + Thread.currentThread().getId());
? ? }
? ? @Override
? ? public void onPMessage(String pattern, String channel, String message) {
? ? ? ? log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 線程id:" + Thread.currentThread().getId());
? ? ? ? WebSocketServer.publish(message, pattern);
? ? ? ? WebSocketServer.publish(message, channel);
? ? }
? ? @Override
? ? public void onMessage(String channel, String message) {
? ? ? ? log.info("receive from redis channal: " + channel + ",message:" + message + ", 線程id:" + Thread.currentThread().getId());
? ? ? ? WebSocketServer.publish(message, channel);
? ? }
? ? @Override
? ? public void onUnsubscribe(String channel, int subscribedChannels) {
? ? ? ? log.info("unsubscribe redis channel:" + channel);
? ? }
? ? @Override
? ? public void onPUnsubscribe(String pattern, int subscribedChannels) {
? ? ? ? log.info("punsubscribe redis channel:" + pattern);
? ? }
}
1.jedis監聽redis頻道的時候如果遇見異常會關閉連接導致后續沒有監聽該頻道,所以這里在subscribe捕獲到異常的時候會重新創建一個jedis連接訂閱該redis頻道。
webSocket訂閱推送類
這個類會有兩個ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>類型類變量,分別存儲訂閱和模糊訂閱的信息。
外面一層的String對應的值是topic_name,里面一層的String對應的值是sessionId。前端發送過來的消息里面對應的這三類操作其實就是對這兩個map里面的。
還有個ConcurrentHashMap<String, RedisPubSub>類型的變量,存儲的是事件-RedisPubSub,便于取消訂閱的時候找到監聽該頻道(事件)的RedisPubSub對象。
信息進行增加或者刪除;后端往前端推送數據也會根據不同的topic_name推送到不同的訂閱者這邊。
package com.curtain.core;
import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
?* @Author Curtain
?* @Date 2021/5/14 16:49
?* @Description
?*/
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
? ? /**
? ? ?* concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
? ? ?*/
? ? private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 存放psub的事件
? ? ?**/
? ? private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 存放topic(pattern)-對應的RedisPubsub
? ? ?*/
? ? private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 與某個客戶端的連接會話,需要通過它來給客戶端發送數據
? ? ?*/
? ? private Session session;
? ? private String sessionId = "";
? ? //要注入的對象
? ? private static TaskExecuteService executeService;
? ? private static WebsocketProperties properties;
? ? private Cancelable cancelable;
? ? @Autowired
? ? public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
? ? ? ? WebSocketServer.executeService = taskExecuteService;
? ? }
? ? @Autowired
? ? public void setWebsocketProperties(WebsocketProperties properties) {
? ? ? ? WebSocketServer.properties = properties;
? ? }
? ? /**
? ? ?* 連接建立成功調用的方法
? ? ?*/
? ? @OnOpen
? ? public void onOpen(Session session) {
? ? ? ? this.session = session;
? ? ? ? this.sessionId = session.getId();
? ? ? ? //構造推送數據
? ? ? ? Map pubHeader = new HashMap();
? ? ? ? pubHeader.put("name", "connect_status");
? ? ? ? pubHeader.put("type", "create");
? ? ? ? pubHeader.put("from", "pubsub");
? ? ? ? pubHeader.put("time", new Date().getTime() / 1000);
? ? ? ? Map pubPayload = new HashMap();
? ? ? ? pubPayload.put("status", "success");
? ? ? ? Map pubMap = new HashMap();
? ? ? ? pubMap.put("header", pubHeader);
? ? ? ? pubMap.put("payload", pubPayload);
? ? ? ? sendMessage(JSON.toJSONString(pubMap));
? ? ? ? cancelable = executeService.runPeriodly(() -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? if (cancelable != null && !session.isOpen()) {
? ? ? ? ? ? ? ? ? ? log.info("斷開連接,停止發送ping");
? ? ? ? ? ? ? ? ? ? cancelable.cancel();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? String data = "ping";
? ? ? ? ? ? ? ? ? ? ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
? ? ? ? ? ? ? ? ? ? session.getBasicRemote().sendPing(payload);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }, properties.getPeriod());
? ? }
? ? @OnMessage
? ? public void onMessage(String message) {
? ? ? ? synchronized (session) {
? ? ? ? ? ? Map msgMap = (Map) JSON.parse(message);
? ? ? ? ? ? String cmd = (String) msgMap.get("cmd");
? ? ? ? ? ? //訂閱消息
? ? ? ? ? ? if ("subscribe".equals(cmd)) {
? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic");
? ? ? ? ? ? ? ? //本地記錄訂閱信息
? ? ? ? ? ? ? ? for (int i = 0; i < topics.size(); i++) {
? ? ? ? ? ? ? ? ? ? String topic = topics.get(i);
? ? ? ? ? ? ? ? ? ? log.info("============================subscribe-start============================");
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? if (webSocketMap.containsKey(topic)) {//有人訂閱過了
? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.get(topic).put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? } else {//之前還沒人訂閱過,所以需要訂閱redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
? ? ? ? ? ? ? ? ? ? ? ? map.put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.put(topic, map);
? ? ? ? ? ? ? ? ? ? ? ? new Thread(() -> {
? ? ? ? ? ? ? ? ? ? ? ? ? ? RedisPubSub redisPubSub = new RedisPubSub();
? ? ? ? ? ? ? ? ? ? ? ? ? ? //存入map
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.put(topic, redisPubSub);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSub.subscribe(topic);
? ? ? ? ? ? ? ? ? ? ? ? }).start();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? log();
? ? ? ? ? ? ? ? ? ? log.info("============================subscribe-end============================");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //psubscribe
? ? ? ? ? ? if ("psubscribe".equals(cmd)) {
? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic");
? ? ? ? ? ? ? ? //本地記錄訂閱信息
? ? ? ? ? ? ? ? for (int i = 0; i < topics.size(); i++) {
? ? ? ? ? ? ? ? ? ? String topic = topics.get(i);
? ? ? ? ? ? ? ? ? ? log.info("============================psubscribe-start============================");
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始模糊訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? if (pWebSocketMap.containsKey(topic)) {//有人訂閱過了
? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.get(topic).put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? } else {//之前還沒人訂閱過,所以需要訂閱redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
? ? ? ? ? ? ? ? ? ? ? ? map.put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.put(topic, map);
? ? ? ? ? ? ? ? ? ? ? ? new Thread(() -> {
? ? ? ? ? ? ? ? ? ? ? ? ? ? RedisPubSub redisPubSub = new RedisPubSub();
? ? ? ? ? ? ? ? ? ? ? ? ? ? //存入map
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.put(topic, redisPubSub);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSub.psubscribe(topic);
? ? ? ? ? ? ? ? ? ? ? ? }).start();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成模糊訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? log();
? ? ? ? ? ? ? ? ? ? log.info("============================psubscribe-end============================");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //取消訂閱
? ? ? ? ? ? if ("unsubscribe".equals(cmd)) {
? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic");
? ? ? ? ? ? ? ? //刪除本地對應的訂閱信息
? ? ? ? ? ? ? ? for (String topic : topics) {
? ? ? ? ? ? ? ? ? ? log.info("============================unsubscribe-start============================");
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始刪除訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? if (webSocketMap.containsKey(topic)) {
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? ? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? if (pWebSocketMap.containsKey(topic)) {
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
? ? ? ? ? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成刪除訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? log();
? ? ? ? ? ? ? ? ? ? log.info("============================unsubscribe-end============================");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? @OnMessage
? ? public void onPong(PongMessage pongMessage) {
? ? ? ? try {
? ? ? ? ? ? log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
? ? ? ? } catch (UnsupportedEncodingException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? /**
? ? ?* 連接關閉調用的方法
? ? ?*/
? ? @OnClose
? ? public void onClose() {
? ? ? ? synchronized (session) {
? ? ? ? ? ? log.info("============================onclose-start============================");
? ? ? ? ? ? //刪除訂閱
? ? ? ? ? ? Iterator iterator = webSocketMap.keySet().iterator();
? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iterator.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //刪除模糊訂閱
? ? ? ? ? ? Iterator iteratorP = pWebSocketMap.keySet().iterator();
? ? ? ? ? ? while (iteratorP.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iteratorP.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",斷開連接:");
? ? ? ? ? ? //debug
? ? ? ? ? ? log();
? ? ? ? ? ? log.info("============================onclose-end============================");
? ? ? ? }
? ? }
? ? /**
? ? ?* @param session
? ? ?* @param error
? ? ?*/
? ? @OnError
? ? public void onError(Session session, Throwable error) {
? ? ? ? synchronized (session) {
? ? ? ? ? ? log.info("============================onError-start============================");
? ? ? ? ? ? log.error("用戶錯誤,sessionId:" + session.getId() + ",原因:" + error.getMessage());
? ? ? ? ? ? error.printStackTrace();
? ? ? ? ? ? log.info("關閉錯誤用戶對應的連接");
? ? ? ? ? ? //刪除訂閱
? ? ? ? ? ? Iterator iterator = webSocketMap.keySet().iterator();
? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iterator.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //刪除模糊訂閱
? ? ? ? ? ? Iterator iteratorP = pWebSocketMap.keySet().iterator();
? ? ? ? ? ? while (iteratorP.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iteratorP.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? log.info("完成錯誤用戶對應的連接關閉");
? ? ? ? ? ? //debug
? ? ? ? ? ? log();
? ? ? ? ? ? log.info("============================onError-end============================");
? ? ? ? }
? ? }
? ? /**
? ? ?* 實現服務器主動推送
? ? ?*/
? ? public void sendMessage(String message) {
? ? ? ? synchronized (session) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? this.session.getBasicRemote().sendText(message);
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void publish(String msg, String topic) {
? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? if (map != null && map.values() != null) {
? ? ? ? ? ? for (WebSocketServer webSocketServer : map.values())
? ? ? ? ? ? ? ? webSocketServer.sendMessage(msg);
? ? ? ? }
? ? ? ? map = pWebSocketMap.get(topic);
? ? ? ? if (map != null && map.values() != null) {
? ? ? ? ? ? for (WebSocketServer webSocketServer : map.values())
? ? ? ? ? ? ? ? webSocketServer.sendMessage(msg);
? ? ? ? }
? ? }
? ? private void log() {
? ? ? ? log.info("<<<<<<<<<<<完成操作后,打印訂閱信息開始>>>>>>>>>>");
? ? ? ? Iterator iterator1 = webSocketMap.keySet().iterator();
? ? ? ? while (iterator1.hasNext()) {
? ? ? ? ? ? String topic = (String) iterator1.next();
? ? ? ? ? ? log.info("topic:" + topic);
? ? ? ? ? ? Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
? ? ? ? ? ? while (iterator2.hasNext()) {
? ? ? ? ? ? ? ? String session = (String) iterator2.next();
? ? ? ? ? ? ? ? log.info("訂閱" + topic + "的sessionId:" + session);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? log.info("<<<<<<<<<<<完成操作后,打印訂閱信息結束>>>>>>>>>>");
? ? }
}
項目地址
上面介紹了核心代碼,下面是完整代碼地址
https://github.com/Curtain-Wang/websocket-redis-subscribe.git
Update20220415
參考評論區老哥的建議,將redis訂閱監聽類里面的subscribe和psubscribe方法調整如下:
? ? //訂閱
? ? @Override
? ? public void subscribe(String... channels) {
? ? ? ? boolean done = true;
? ? ? ? while (done){
? ? ? ? ? ? Jedis jedis = jedisPool.getResource();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? jedis.subscribe(this, channels);
? ? ? ? ? ? ? ? done = false;
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? ? ? //遇到異常后關閉連接重新訂閱
? ? ? ? ? ? ? ? log.info("監聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? //模糊訂閱
? ? @Override
? ? public void psubscribe(String... channels) {
? ? ? ? boolean done = true;
? ? ? ? while (done){
? ? ? ? ? ? Jedis jedis = jedisPool.getResource();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? jedis.psubscribe(this, channels);
? ? ? ? ? ? ? ? done = false;
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? ? ? //遇到異常后關閉連接重新訂閱
? ? ? ? ? ? ? ? log.info("監聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
原文鏈接:https://blog.csdn.net/qq_41953872/article/details/117361928
相關推薦
- 2022-07-21 Hadoop-HDFS分布式文件系統基礎
- 2022-11-05 Nginx監控模塊(vts模塊)詳解_nginx
- 2022-06-16 nginx限流及配置管理實戰記錄_nginx
- 2023-04-18 Python中selenium獲取token的方法_python
- 2022-09-18 面試必問Linux?命令su和sudo的區別解析_linux shell
- 2022-12-24 Typescript中interface與type的相同點與不同點的詳細說明_基礎知識
- 2023-01-12 Python讀取及保存mat文件的注意事項說明_python
- 2022-12-29 基于Python寫一個番茄鐘小工具_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支