日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

netty使用http和webSocket

作者:小徐敲java 更新時間: 2024-04-04 編程語言

1:pom.xml配置

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.73.Final</version>
		</dependency>

2:Netty作為HTTP服務器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class HttpServer {
    private final int port;

    public HttpServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();

                    // 添加 HTTP 編解碼器和自定義的ChannelHandler
                    p.addLast(new HttpServerCodec());
                    p.addLast(new HttpObjectAggregator(1024 * 1024)); // 設置最大聚合大小為1MB
                    p.addLast(new LargeJsonHandler());
                }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 綁定端口,開始接受進來的連接
            ChannelFuture f = b.bind(port).sync();

            // 等待服務器 socket 關閉
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new HttpServer(8080).start();
    }
}

class LargeJsonHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }

        ByteBuf content = request.content();
        String jsonStr = content.toString(CharsetUtil.UTF_8);

        // 在這里對 JSON 數據進行處理
        System.out.println(jsonStr);

        // 發送響應
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        ctx.writeAndFlush(response);
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }
	@Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 寫一個空的buf,并刷新寫出區域。完成后關閉sock channel連接。
        //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

注意:如果發送的JSO數據如果大于1M,是會分包發送的,每次發送都會執行channelReadComplete方法,所以不可以關閉通道,發送完數據才執行channelRead0方法

3:Netty作為webSocket服務器

package com.example.slave.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

/**
 * @Description:
 * @Author: xu
 * @Data: 2024-2024/1/4-11
 * @Version: V1.0
 */
public class CustomWebSocket {
    private final int port;

    public CustomWebSocket(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        //設置用于連接的boss組, 可在構造器中定義使用的線程數  監聽端口接收客戶端連接,一個端口一個線程,然后轉給worker組
        //boss組用于監聽客戶端連接請求,有連接傳入時就生成連接channel傳給worker,等worker 接收請求 io多路復用,
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            //定義使用的通道 可以選擇是NIO或者是OIO 代表了worker在處理socket channel時的不同情況。oio只能1對1, nio則沒有1對1對關系
            //當netty要處理長連接時最好使用NIO,不然如果要保證效率 需要創建大量的線程,和io多路復用一致
                    .channel(NioServerSocketChannel.class)
                    //.channel(OioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // 添加 HTTP 編解碼器和自定義的ChannelHandler
                            p.addLast(new HttpServerCodec());
                            p.addLast(new HttpObjectAggregator(1024 * 1024)); // 設置最大聚合大小為1MB
                            p.addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
                            p.addLast(new MyWebSocketHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 綁定端口,開始接受進來的連接
            ChannelFuture f = b.bind(port).sync();

            // 等待服務器 socket 關閉
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    public static ChannelGroup channelGroup;
    static {
        channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }
    //客戶端與服務器建立連接的時候觸發,
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("與客戶端建立連接,通道開啟!");
        //添加到channelGroup通道組
        channelGroup.add(ctx.channel());
    }
    //客戶端與服務器關閉連接的時候觸發,
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("與客戶端斷開連接,通道關閉!");
        channelGroup.remove(ctx.channel());
    }
	@Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 寫一個空的buf,并刷新寫出區域。完成后關閉sock channel連接。
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
?
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 關閉發生異常的連接
        ctx.close();
    }


    //服務器接受客戶端的數據信息,
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){
        System.out.println("服務器收到的數據:" + msg.text());
        //sendMessage(ctx);
        sendAllMessage();
    }
    //給固定的人發消息
    private void sendMessage(ChannelHandlerContext ctx) {
        String message = "你好,"+ctx.channel().localAddress()+" 給固定的人發消息";
        ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
    }
    //發送群消息,此時其他客戶端也能收到群消息
    private void sendAllMessage(){
        String message = "我是服務器,這里發送的是群消息";
        channelGroup.writeAndFlush( new TextWebSocketFrame(message));
    }
}

原文鏈接:https://blog.csdn.net/qq_19891197/article/details/135381585

  • 上一篇:沒有了
  • 下一篇:沒有了
欄目分類
最近更新