網站首頁 編程語言 正文
昨天重溫Netty學習視頻,發現Netty還能夠提供WebSocket服務,剛好前段時間做了個WebSocket服務的接口,感覺做的不是很好,特地查了一下如何用Springboot整合Netty提供WebSocket服務。經過網上的搜查和昨天視頻學習的現學現用,整理出此篇文章記錄如何用WebSocket接口定時發送消息。
首先引入依賴,這里只放出最基本的依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<optional>true</optional>
</dependency>
<!-- spring boot 2.3版本后,如果需要使用校驗,需手動導入validation包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
配置文件:
server:
port: 8099
netty:
websocket:
# Websocket服務端口
port: 1024
# 綁定的網卡
ip: 0.0.0.0
# 消息幀最大體積
max-frame-size: 10240
# URI路徑
path: /channel
通過 ApplicationRunner 啟動Websocket服務
package com.netty.demo.websocket.runner;
import com.netty.demo.websocket.handler.WebsocketMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 初始化Netty服務
* @author wl
* @date 2022/10/9
*/
@Slf4j
@Component
public class NettyBootstrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
@Value("${netty.websocket.port}")
private int port;
@Value("${netty.websocket.ip}")
private String ip;
@Value("${netty.websocket.path}")
private String path;
@Value("${netty.websocket.max-frame-size}")
private long maxFrameSize;
private ApplicationContext applicationContext;
private Channel serverChannel;
@Override
public void run(ApplicationArguments args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(this.ip, this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
String uri = fullHttpRequest.uri();
if (!uri.equals(path)) {
// 訪問的路徑不是 websocket的端點地址,響應404
ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
.addListener(ChannelFutureListener.CLOSE);
return ;
}
}
super.channelRead(ctx, msg);
}
});
ch.pipeline().addLast(new WebSocketServerCompressionHandler());
ch.pipeline().addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));
/**
* 從IOC中獲取到Handler
*/
ch.pipeline().addLast(applicationContext.getBean(WebsocketMessageHandler.class));
}
});
Channel channel = serverBootstrap.bind().sync().channel();
this.serverChannel = channel;
log.info("服務啟動, ip={},port={}", this.ip, this.port);
channel.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
if (this.serverChannel != null) {
this.serverChannel.close();
}
log.info("Websocket服務停止");
}
}
NettyBootsrapRunner
?實現了 ApplicationRunner,?ApplicationListener<ContextClosedEvent>
,?ApplicationContextAware
?接口。
這樣一來,NettyBootsrapRunner
?可以在App的啟動和關閉時執行Websocket服務的啟動和關閉。而且通過?ApplicationContextAware
?還能獲取到?ApplicationContext
通過IOC管理 Netty 的Handler
package com.netty.demo.websocket.handler;
import com.netty.demo.websocket.task.SendMsgTask;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author wl
* @date 2022/10/9
*/
@ChannelHandler.Sharable
@Slf4j
@Component
public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
log.info("獲取消息:{}", textWebSocketFrame.text());
// 響應客戶端
ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis()));
} else {
// 不接受文本以外的數據幀類型
ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
log.info("連接斷開: {}", ctx.channel().remoteAddress());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
log.info("連接創建, {}", ctx.channel().remoteAddress());
ctx.channel().eventLoop().next().scheduleAtFixedRate(new SendMsgTask(ctx), 0, 3, TimeUnit.SECONDS);
}
}
如代碼所示,這個handler在讀取完客戶端的消息后,會給客戶端回復一條消息。
channelActive這個方法里,我又加入了自己的定時任務。連接創建成功后,每隔3秒會執行一次,由eventLoop執行。
其中SendMsgTask代碼如下:
package com.netty.demo.websocket.task;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* @author wl
* @date 2022/10/9
*/
public class SendMsgTask implements Runnable{
private ChannelHandlerContext ctx;
public SendMsgTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
ctx.channel().writeAndFlush(new TextWebSocketFrame("當前時間:" + LocalDateTime.now()));
}
}
啟動服務,測試結果如圖:
?
原文鏈接:https://blog.csdn.net/wl_Honest/article/details/127228073
相關推薦
- 2022-06-18 C語言簡明講解單引號與雙引號的使用_C 語言
- 2022-02-18 微信小程序----------父組件調用子組件的方法
- 2023-12-12 線程并發協作(生產者/消費者模式)
- 2023-03-17 一文掌握git?push命令_相關技巧
- 2022-02-26 Assert.assertEquals()方法參數詳解_Android
- 2021-10-04 Flutter輸入框TextField屬性及監聽事件介紹_Android
- 2022-04-27 Python腳本后臺運行的五種方式_python
- 2022-07-18 Element-UI:el-table樣式修改
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支