Integrating Netty with Spring Boot to Provide a WebSocket Service

Author: wl_Honest  Updated: 2022-10-14  Category: Programming Languages

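Yesterday, while rewatching a Netty tutorial video, I found that Netty can also provide a WebSocket service. I had recently built a WebSocket endpoint that I was not very happy with, so I looked into how to integrate Netty with Spring Boot to provide a WebSocket service. This article combines what I found online with what I learned from the video, and records how to push messages on a schedule over a WebSocket endpoint.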

First, add the dependencies. Only the essential ones are listed here:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Since Spring Boot 2.3, the validation starter must be added explicitly if validation is needed -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

Configuration file:

server:
  port: 8099

netty:
  websocket:
    # WebSocket service port
    port: 1024
    # Network address (interface) to bind to
    ip: 0.0.0.0
    # Maximum size of a message frame, in bytes
    max-frame-size: 10240
    # URI path
    path: /channel
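
The four settings map naturally onto a small value object. The following is only a sketch, in plain Java with no Spring binding, of the constraints these values presumably need to satisfy (a valid TCP port, a path starting with "/", a positive frame size). The class WebsocketSettings is hypothetical and does not appear in the original project.

```java
// Hypothetical holder for the netty.websocket.* settings; not part of the original project.
final class WebsocketSettings {
    final String ip;
    final int port;
    final String path;
    final int maxFrameSize;

    WebsocketSettings(String ip, int port, String path, int maxFrameSize) {
        if (ip == null || ip.isEmpty()) {
            throw new IllegalArgumentException("ip must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port must be in 1..65535, got " + port);
        }
        if (path == null || !path.startsWith("/")) {
            throw new IllegalArgumentException("path must start with '/'");
        }
        if (maxFrameSize <= 0) {
            throw new IllegalArgumentException("maxFrameSize must be positive");
        }
        this.ip = ip;
        this.port = port;
        this.path = path;
        this.maxFrameSize = maxFrameSize;
    }

    public static void main(String[] args) {
        // Values taken from the configuration file above
        WebsocketSettings s = new WebsocketSettings("0.0.0.0", 1024, "/channel", 10240);
        System.out.println(s.ip + ":" + s.port + s.path + " maxFrameSize=" + s.maxFrameSize);
    }
}
```

Failing fast on a bad value at startup is usually easier to diagnose than a bind error or a rejected handshake later on.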

Starting the WebSocket service through an ApplicationRunner

package com.netty.demo.websocket.runner;

import com.netty.demo.websocket.handler.WebsocketMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * Initializes the Netty service
 * @author wl
 * @date 2022/10/9
 */
@Slf4j
@Component
public class NettyBootstrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
    @Value("${netty.websocket.port}")
    private int port;

    @Value("${netty.websocket.ip}")
    private String ip;

    @Value("${netty.websocket.path}")
    private String path;

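    // Declared as int so that the constructor call below selects the maxFrameSize overload;
    // with a long, the value would be passed as the handshake timeout in milliseconds instead
    @Value("${netty.websocket.max-frame-size}")
    private int maxFrameSize;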

    private ApplicationContext applicationContext;

    private Channel serverChannel;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(this.ip, this.port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new HttpServerCodec());
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(65536));
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    if(msg instanceof FullHttpRequest) {
                                        FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
                                        String uri = fullHttpRequest.uri();
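                                        if (!uri.equals(path)) {
                                            // The requested path is not the WebSocket endpoint: respond with 404.
                                            // Release the request here because it is not passed further down the pipeline.
                                            fullHttpRequest.release();
                                            ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
                                                    .addListener(ChannelFutureListener.CLOSE);
                                            return;
                                        }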
                                    }
                                    super.channelRead(ctx, msg);
                                }
                            });
                            ch.pipeline().addLast(new WebSocketServerCompressionHandler());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));
                            // Obtain the handler from the IoC container
                            ch.pipeline().addLast(applicationContext.getBean(WebsocketMessageHandler.class));
                        }
                    });
            Channel channel = serverBootstrap.bind().sync().channel();
            this.serverChannel = channel;
            log.info("Server started, ip={}, port={}", this.ip, this.port);
            // Blocks this thread until the server channel is closed (see onApplicationEvent)
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
        if (this.serverChannel != null) {
            this.serverChannel.close();
        }
        log.info("WebSocket service stopped");
    }
}

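NettyBootstrapRunner implements the ApplicationRunner, ApplicationListener<ContextClosedEvent>, and ApplicationContextAware interfaces.

This lets NettyBootstrapRunner start the WebSocket service when the application starts and stop it when the application shuts down. Through ApplicationContextAware it also obtains the ApplicationContext.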
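
One detail of the runner worth noting: its 404 check compares the whole request URI with the configured path using equals, so a request such as /channel?token=abc would be rejected even though its path matches. The sketch below shows, in plain Java, one way to compare only the path part. The class and method names are hypothetical. Netty's WebSocketServerProtocolHandler applies its own path check as well, so it would presumably need its checkStartsWith option enabled too; treat that as an assumption to verify against the Netty version in use.

```java
// Hypothetical helper; not part of the original project.
final class PathMatch {

    /** Returns the path part of a request URI, without the query string or fragment. */
    static String pathOf(String requestUri) {
        int end = requestUri.length();
        int query = requestUri.indexOf('?');
        if (query >= 0) {
            end = query;
        }
        int fragment = requestUri.indexOf('#');
        if (fragment >= 0 && fragment < end) {
            end = fragment;
        }
        return requestUri.substring(0, end);
    }

    /** True when the request targets the configured WebSocket path, ignoring any query string. */
    static boolean isWebsocketEndpoint(String requestUri, String configuredPath) {
        return pathOf(requestUri).equals(configuredPath);
    }

    public static void main(String[] args) {
        System.out.println(isWebsocketEndpoint("/channel?token=abc", "/channel"));
        System.out.println(isWebsocketEndpoint("/other", "/channel"));
    }
}
```

Netty also ships io.netty.handler.codec.http.QueryStringDecoder, whose path() method performs similar parsing and could replace the manual indexOf logic.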

Managing the Netty handler through the IoC container

package com.netty.demo.websocket.handler;

import com.netty.demo.websocket.task.SendMsgTask;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @author wl
 * @date 2022/10/9
 */
@ChannelHandler.Sharable
@Slf4j
@Component
public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
        if (msg instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
            log.info("Received message: {}", textWebSocketFrame.text());
            // Reply to the client
            ctx.channel().writeAndFlush(new TextWebSocketFrame("I received your message: " + System.currentTimeMillis()));
        } else {
            // Frame types other than text are not accepted: send a close frame, then close the connection
            ctx.channel().writeAndFlush(new CloseWebSocketFrame(WebSocketCloseStatus.INVALID_MESSAGE_TYPE))
                    .addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("Connection closed: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
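        log.info("Connection established: {}", ctx.channel().remoteAddress());
        // Note: the returned ScheduledFuture is not kept, so the task is never cancelled explicitly
        ctx.channel().eventLoop().next().scheduleAtFixedRate(new SendMsgTask(ctx), 0, 3, TimeUnit.SECONDS);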
    }
}

As the code shows, after reading a message from the client, this handler sends a reply back to the client.

In channelActive I also register my own scheduled task. Once the connection is established, the task runs every 3 seconds on the channel's eventLoop.
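
The handler does not stop this task when the connection closes. Netty's EventLoop implements java.util.concurrent.ScheduledExecutorService, so scheduleAtFixedRate returns a ScheduledFuture that can be cancelled, for example from channelInactive. The sketch below shows that pattern with a plain JDK executor standing in for the event loop. The class and method names are hypothetical and not part of the original project, and a counter stands in for the actual writeAndFlush call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a JDK executor stands in for the channel's event loop.
final class PeriodicPushDemo {

    /**
     * Schedules a fixed-rate task, waits for the given number of executions, then
     * cancels it (the step that would go in channelInactive).
     * Returns how many times the task ran.
     */
    static int runThenCancel(int ticks, long periodMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(ticks);
        try {
            // Same call shape as eventLoop().scheduleAtFixedRate(task, 0, 3, TimeUnit.SECONDS)
            ScheduledFuture<?> future = loop.scheduleAtFixedRate(() -> {
                sent.incrementAndGet();   // stands in for writeAndFlush(...)
                reached.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);

            reached.await();
            future.cancel(false);         // prevent further runs without interrupting one in progress
            loop.shutdown();
            loop.awaitTermination(1, TimeUnit.SECONDS);
            return sent.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("executions before cancel: " + runThenCancel(3, 100));
    }
}
```

In the shared handler, the future cannot live in an instance field because one handler instance serves every connection; storing it per channel (for example as a channel attribute) is one option.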

The code for SendMsgTask is as follows:

package com.netty.demo.websocket.task;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.time.LocalDateTime;

/**
 * @author wl
 * @date 2022/10/9
 */
public class SendMsgTask implements Runnable {
    private final ChannelHandlerContext ctx;

    public SendMsgTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public void run() {
        // Push the current server time to the client
        ctx.channel().writeAndFlush(new TextWebSocketFrame("Current time: " + LocalDateTime.now()));
    }
}

Start the service and test it. (The screenshot of the test result from the original post is not available here.)
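
One way to try the service from code is a small client. The sketch below uses the JDK's java.net.http WebSocket client (Java 11 or later). It assumes the server runs locally with the port 1024 and path /channel from the configuration file; the class name WsClientDemo, the "hello" message, and the 10-second wait are arbitrary choices for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

// Hypothetical test client; not part of the original project.
final class WsClientDemo {

    /** Builds the ws:// endpoint URI from host, port, and path. */
    static URI endpoint(String host, int port, String path) {
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) throws Exception {
        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                System.out.println("received: " + data);
                webSocket.request(1);   // ask for the next message
                return null;            // the data can be reclaimed immediately
            }
        };

        WebSocket webSocket = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(endpoint("localhost", 1024, "/channel"), listener)
                .get(5, TimeUnit.SECONDS);

        // The server replies to text frames and also pushes the time every 3 seconds
        webSocket.sendText("hello", true).get(5, TimeUnit.SECONDS);
        Thread.sleep(10_000);
        webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "done").get(5, TimeUnit.SECONDS);
    }
}
```

With the server running, the client should print the reply to "hello" followed by the periodic time messages until it closes the connection.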

Original article: https://blog.csdn.net/wl_Honest/article/details/127228073
