網站首頁 編程語言 正文
1:pom.xml配置
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.73.Final</version>
</dependency>
2:Netty作為HTTP服務器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
public class HttpServer {
private final int port;
public HttpServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加 HTTP 編解碼器和自定義的ChannelHandler
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1024 * 1024)); // 設置最大聚合大小為1MB
p.addLast(new LargeJsonHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口,開始接受進來的連接
ChannelFuture f = b.bind(port).sync();
// 等待服務器 socket 關閉
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new HttpServer(8080).start();
}
}
class LargeJsonHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
}
ByteBuf content = request.content();
String jsonStr = content.toString(CharsetUtil.UTF_8);
// 在這里對 JSON 數據進行處理
System.out.println(jsonStr);
// 發送響應
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
response.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response);
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 寫一個空的buf,并刷新寫出區域。完成后關閉sock channel連接。
//ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
注意:如果發送的JSO數據如果大于1M,是會分包發送的,每次發送都會執行channelReadComplete方法,所以不可以關閉通道,發送完數據才執行channelRead0方法
3:Netty作為webSocket服務器
package com.example.slave.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
/**
* @Description:
* @Author: xu
* @Data: 2024-2024/1/4-11
* @Version: V1.0
*/
public class CustomWebSocket {
private final int port;
public CustomWebSocket(int port) {
this.port = port;
}
public void start() throws Exception {
//設置用于連接的boss組, 可在構造器中定義使用的線程數 監聽端口接收客戶端連接,一個端口一個線程,然后轉給worker組
//boss組用于監聽客戶端連接請求,有連接傳入時就生成連接channel傳給worker,等worker 接收請求 io多路復用,
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//定義使用的通道 可以選擇是NIO或者是OIO 代表了worker在處理socket channel時的不同情況。oio只能1對1, nio則沒有1對1對關系
//當netty要處理長連接時最好使用NIO,不然如果要保證效率 需要創建大量的線程,和io多路復用一致
.channel(NioServerSocketChannel.class)
//.channel(OioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加 HTTP 編解碼器和自定義的ChannelHandler
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1024 * 1024)); // 設置最大聚合大小為1MB
p.addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
p.addLast(new MyWebSocketHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口,開始接受進來的連接
ChannelFuture f = b.bind(port).sync();
// 等待服務器 socket 關閉
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static ChannelGroup channelGroup;
static {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
//客戶端與服務器建立連接的時候觸發,
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("與客戶端建立連接,通道開啟!");
//添加到channelGroup通道組
channelGroup.add(ctx.channel());
}
//客戶端與服務器關閉連接的時候觸發,
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("與客戶端斷開連接,通道關閉!");
channelGroup.remove(ctx.channel());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 寫一個空的buf,并刷新寫出區域。完成后關閉sock channel連接。
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
?
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 關閉發生異常的連接
ctx.close();
}
//服務器接受客戶端的數據信息,
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){
System.out.println("服務器收到的數據:" + msg.text());
//sendMessage(ctx);
sendAllMessage();
}
//給固定的人發消息
private void sendMessage(ChannelHandlerContext ctx) {
String message = "你好,"+ctx.channel().localAddress()+" 給固定的人發消息";
ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
}
//發送群消息,此時其他客戶端也能收到群消息
private void sendAllMessage(){
String message = "我是服務器,這里發送的是群消息";
channelGroup.writeAndFlush( new TextWebSocketFrame(message));
}
}
原文鏈接:https://blog.csdn.net/qq_19891197/article/details/135381585
- 上一篇:沒有了
- 下一篇:沒有了
相關推薦
- 2022-09-03 python四則運算表達式求值示例詳解_python
- 2022-04-15 Android開發Jetpack組件WorkManager用例詳解_Android
- 2022-05-19 Python學習之異常處理的避坑指南_python
- 2022-06-26 詳解Python數據類型、進制轉換、字符串格式化的問題_python
- 2022-11-03 Python?pandas中apply函數簡介以及用法詳解_python
- 2023-04-21 如何在C#項目中鏈接一個文件夾下的所有文件詳解_C#教程
- 2023-01-14 python的數據與matlab互通問題:SciPy_python
- 2021-12-12 C語言的常量和字符串_C 語言
- 欄目分類
-
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支