
网络编程的核心是实现跨主机的进程间通信,而IO模型则是决定通信性能、并发能力的核心底层逻辑。在深入Netty之前,必须先彻底搞懂IO模型的核心分类与本质区别,这是所有网络编程的根基。
基于UNIX网络编程的标准定义,先明确两个极易混淆的核心维度:
IO模型 | 核心特点 | 适用场景 | 核心痛点 |
|---|---|---|---|
同步阻塞IO(BIO) | 一连接一线程,阻塞等待连接与数据读写 | 连接数少、架构简单的场景 | 线程资源消耗大,并发上限极低,高并发下性能雪崩 |
同步非阻塞IO(NIO) | 单线程管理多连接,非阻塞轮询事件就绪状态 | 高并发、短连接场景 | 轮询消耗CPU,API复杂,开发门槛极高 |
IO多路复用 | 单线程通过Selector监听多个连接的事件,仅事件就绪时才通知线程处理 | 高并发、长连接场景,是当前主流架构的底层基石 | 原生API复杂,需处理大量底层细节,存在已知底层BUG |
异步非阻塞IO(AIO) | 操作系统完成IO全流程后通知用户线程,全程无阻塞 | 连接数多、读写操作耗时的场景 | Linux系统下实现不完善,实际生产应用极少 |
通俗类比:把IO操作比作餐厅点餐
Java从JDK1.4开始提供NIO API,但原生NIO存在无法忽视的致命问题,这也是Netty成为Java网络编程事实标准的核心原因:
Netty是一款基于Java NIO封装的高性能、异步事件驱动的网络编程框架,屏蔽了原生NIO的底层复杂度,提供了极简易用的API,同时具备极高的性能、稳定性和可扩展性,是Dubbo、RocketMQ、Elasticsearch、网关等中间件的核心通信层底座。
Netty的核心架构基于Doug Lea在《Scalable IO in Java》中提出的Reactor模式实现,默认采用主从Reactor多线程模型,这是支撑Netty高并发、高性能的核心架构。

主从Reactor模型的核心分工:

Netty的所有功能都基于核心组件实现,彻底理解每个组件的作用、底层逻辑与使用规范,是掌握Netty的核心前提。
这两个类是Netty服务端与客户端的启动入口,负责整个Netty程序的初始化、配置与启动,是程序的总入口。
核心配置规范:
这是Netty线程模型的核心组件,负责管理Reactor线程,处理所有IO事件与任务。
核心特性与规范:
CPU核心数 * 2,可根据业务场景手动调整Channel是Netty对网络通信套接字的抽象,对应Java NIO的Channel,是网络读写操作的核心载体,代表了一个客户端与服务端的连接。
核心特性:
Netty基于责任链模式实现了事件的流转与处理,核心就是ChannelPipeline、ChannelHandler与ChannelHandlerContext。
核心事件流转规则(90%开发者都会踩坑的点):
fireChannelRead()方法传递给下一个入站HandlerwriteAndFlush()等方法传递给下一个出站HandlerChannelHandler是开发者实现业务逻辑的核心入口,所有业务逻辑、编解码、异常处理都通过ChannelHandler实现,是Netty中最常用的组件。
核心分类:
Netty提供了两个默认的适配器类,简化开发:ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter,开发者只需继承适配器类,重写对应的事件处理方法即可。
核心使用规范:
fireChannelRead()方法将事件传递给下一个Handler,否则事件会中断流转exceptionCaught()方法,处理异常事件,关闭资源,否则异常会被吞掉,无法定位问题ByteBuf是Netty对字节数据的容器抽象,替代了Java NIO的ByteBuffer,解决了原生ByteBuffer的所有痛点,是Netty高性能的核心组件之一。
对比原生ByteBuffer,ByteBuf具备颠覆性的优势:
分类维度 | 类型 | 核心特点 | 适用场景 |
|---|---|---|---|
内存管理 | 池化PooledByteBuf | 从内存池中获取内存,复用对象,性能极高 | 生产环境默认使用,高频读写场景 |
内存管理 | 非池化UnpooledByteBuf | 每次创建都分配新的内存,性能较低 | 低频读写、简单测试场景 |
内存类型 | 堆内HeapByteBuf | 内存分配在JVM堆中,受GC管理,无需手动释放 | 简单数据处理,无高频读写场景 |
内存类型 | 堆外DirectByteBuf | 内存分配在操作系统的堆外内存中,不受GC管理,需手动释放 | 网络IO传输,零拷贝场景,生产环境首选 |
内存类型 | 复合CompositeByteBuf | 将多个ByteBuf合并为一个逻辑上的ByteBuf,无需物理内存拷贝 | 多数据包合并场景,零拷贝优化 |
PooledByteBufAllocator.DEFAULT.directBuffer(),性能最优,零拷贝支持最好Netty的所有IO操作都是异步的,调用后会立即返回,无法立即获取操作结果,Netty通过ChannelFuture与Promise实现异步结果的处理。
核心使用规范:
零拷贝是Netty高性能的核心特性之一,核心目标是减少数据在用户态与内核态之间的拷贝次数,减少CPU的上下文切换开销,提升数据传输效率。
Netty的零拷贝分为两个维度:操作系统级零拷贝与用户态零拷贝。
Netty的内存池是其高性能的核心支撑,解决了频繁内存分配与回收带来的性能开销、GC压力、内存碎片问题。
核心实现原理:
TCP是面向流的协议,数据以字节流的形式传输,没有消息边界,这就会导致半包粘包问题,是网络编程中必须解决的核心问题。
Netty提供了成熟的编解码器,彻底解决半包粘包问题,行业主流的解决方案是消息头+长度字段的固定格式协议,对应Netty提供的LengthFieldBasedFrameDecoder解码器,这是生产环境首选的解决方案。
LengthFieldBasedFrameDecoder核心参数说明:
长连接场景下,必须通过心跳机制检测连接的可用性,及时处理断连、网络波动等异常场景,Netty提供了IdleStateHandler空闲检测处理器,可极简实现心跳机制。
IdleStateHandler核心参数:
核心实现逻辑:
本案例基于JDK17、Netty最新稳定版实现,包含自定义协议、半包粘包处理、心跳检测、服务端与客户端完整实现。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.4</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>netty-demo</artifactId>
<version>1.0.0</version>
<name>netty-demo</name>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netty.version>4.1.112.Final</netty.version>
<lombok.version>1.18.30</lombok.version>
<fastjson2.version>2.0.52</fastjson2.version>
<guava.version>32.1.3-jre</guava.version>
<mybatis.plus.version>3.5.6</mybatis.plus.version>
<mysql.version>8.0.36</mysql.version>
<springdoc.version>2.5.0</springdoc.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis.plus.version}</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.12.1</version>
<configuration>
<source>17</source>
<target>17</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
采用行业标准的固定格式协议,彻底解决半包粘包问题,协议格式如下:
字段名 | 长度(字节) | 说明 |
|---|---|---|
魔数 | 4 | 固定值0x12345678,用于校验数据包合法性 |
版本号 | 1 | 协议版本号,当前为1 |
指令类型 | 1 | 0-业务数据,1-心跳请求,2-心跳响应 |
数据长度 | 4 | 消息体数据的字节长度 |
消息体 | 不固定 | 业务数据,JSON格式 |
package com.jam.demo.protocol;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 自定义通信协议实体
* @author ken
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
publicclass CustomProtocol {
/**
* 魔数,固定值,用于校验数据包合法性
*/
publicstaticfinalint MAGIC_NUMBER = 0x12345678;
/**
* 协议版本号
*/
publicstaticfinalbyte VERSION = 1;
/**
* 指令类型-业务数据
*/
publicstaticfinalbyte COMMAND_TYPE_DATA = 0;
/**
* 指令类型-心跳请求
*/
publicstaticfinalbyte COMMAND_TYPE_HEARTBEAT_REQUEST = 1;
/**
* 指令类型-心跳响应
*/
publicstaticfinalbyte COMMAND_TYPE_HEARTBEAT_RESPONSE = 2;
/**
* 协议头部固定长度
*/
publicstaticfinalint HEADER_LENGTH = 10;
/**
* 魔数
*/
privateint magicNumber;
/**
* 版本号
*/
privatebyte version;
/**
* 指令类型
*/
privatebyte commandType;
/**
* 数据长度
*/
privateint dataLength;
/**
* 消息体数据
*/
privatebyte[] data;
}
package com.jam.demo.codec;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
/**
* 自定义协议编码器
* @author ken
*/
@Slf4j
publicclass CustomProtocolEncoder extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) {
out.writeInt(msg.getMagicNumber());
out.writeByte(msg.getVersion());
out.writeByte(msg.getCommandType());
out.writeInt(msg.getDataLength());
if (msg.getDataLength() > 0) {
out.writeBytes(msg.getData());
}
log.debug("协议编码完成,指令类型:{},数据长度:{}", msg.getCommandType(), msg.getDataLength());
}
}
package com.jam.demo.codec;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
/**
* 自定义协议解码器
* 基于LengthFieldBasedFrameDecoder实现,彻底解决半包粘包问题
* @author ken
*/
@Slf4j
publicclass CustomProtocolDecoder extends LengthFieldBasedFrameDecoder {
/**
* 解码器构造函数
* maxFrameLength:最大帧长度,这里设置为10MB
* lengthFieldOffset:长度字段偏移量,魔数4+版本1+指令1=6字节
* lengthFieldLength:长度字段长度,4字节
* lengthAdjustment:长度调整值,0
* initialBytesToStrip:跳过的字节数,0,完整保留头部信息
*/
public CustomProtocolDecoder() {
super(10 * 1024 * 1024, 6, 4, 0, 0);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
returnnull;
}
try {
int magicNumber = frame.readInt();
if (magicNumber != CustomProtocol.MAGIC_NUMBER) {
log.error("非法数据包,魔数校验失败,远程地址:{}", ctx.channel().remoteAddress());
ctx.close();
returnnull;
}
CustomProtocol protocol = new CustomProtocol();
protocol.setMagicNumber(magicNumber);
protocol.setVersion(frame.readByte());
protocol.setCommandType(frame.readByte());
protocol.setDataLength(frame.readInt());
if (protocol.getDataLength() > 0) {
byte[] data = newbyte[protocol.getDataLength()];
frame.readBytes(data);
protocol.setData(data);
}
log.debug("协议解码完成,指令类型:{},数据长度:{}", protocol.getCommandType(), protocol.getDataLength());
return protocol;
} finally {
frame.release();
}
}
}
package com.jam.demo.handler;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* 服务端心跳处理器
* @author ken
*/
@Slf4j
@ChannelHandler.Sharable
publicclass ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
/**
* 最大空闲次数,超过该次数关闭连接
*/
privatestaticfinalint MAX_IDLE_COUNT = 3;
/**
* 当前空闲次数
*/
privateint idleCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.READER_IDLE) {
idleCount++;
log.warn("读空闲事件触发,远程地址:{},当前空闲次数:{}", ctx.channel().remoteAddress(), idleCount);
if (idleCount >= MAX_IDLE_COUNT) {
log.error("连续{}次读空闲,关闭连接,远程地址:{}", MAX_IDLE_COUNT, ctx.channel().remoteAddress());
ctx.close();
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
idleCount = 0;
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() == CustomProtocol.COMMAND_TYPE_HEARTBEAT_REQUEST) {
log.debug("收到客户端心跳请求,远程地址:{}", ctx.channel().remoteAddress());
CustomProtocol response = new CustomProtocol();
response.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
response.setVersion(CustomProtocol.VERSION);
response.setCommandType(CustomProtocol.COMMAND_TYPE_HEARTBEAT_RESPONSE);
response.setDataLength(0);
ctx.writeAndFlush(response);
return;
}
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("服务端心跳处理器异常,远程地址:{}", ctx.channel().remoteAddress(), cause);
ctx.close();
}
}
package com.jam.demo.handler;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
/**
* 客户端心跳处理器
* @author ken
*/
@Slf4j
publicclass ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event) {
if (event.state() == IdleState.WRITER_IDLE) {
log.debug("写空闲事件触发,发送心跳请求到服务端");
CustomProtocol request = new CustomProtocol();
request.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
request.setVersion(CustomProtocol.VERSION);
request.setCommandType(CustomProtocol.COMMAND_TYPE_HEARTBEAT_REQUEST);
request.setDataLength(0);
ctx.writeAndFlush(request);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() == CustomProtocol.COMMAND_TYPE_HEARTBEAT_RESPONSE) {
log.debug("收到服务端心跳响应");
return;
}
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("客户端心跳处理器异常", cause);
ctx.close();
}
}
package com.jam.demo.handler;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* 服务端业务处理器
* @author ken
*/
@Slf4j
@ChannelHandler.Sharable
publicclass ServerBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("客户端连接成功,远程地址:{}", ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("客户端连接断开,远程地址:{}", ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() != CustomProtocol.COMMAND_TYPE_DATA) {
log.warn("非业务数据指令,忽略处理,指令类型:{}", protocol.getCommandType());
return;
}
String data = new String(protocol.getData(), StandardCharsets.UTF_8);
log.info("收到客户端业务数据,远程地址:{},数据内容:{}", ctx.channel().remoteAddress(), data);
if (!StringUtils.hasText(data)) {
log.warn("业务数据为空,忽略处理");
return;
}
Map<String, Object> requestMap = JSON.parseObject(data);
Map<String, Object> responseMap = Map.of("code", 200, "msg", "处理成功", "data", requestMap);
byte[] responseData = JSON.toJSONString(responseMap).getBytes(StandardCharsets.UTF_8);
CustomProtocol response = new CustomProtocol();
response.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
response.setVersion(CustomProtocol.VERSION);
response.setCommandType(CustomProtocol.COMMAND_TYPE_DATA);
response.setDataLength(responseData.length);
response.setData(responseData);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("服务端业务处理器异常,远程地址:{}", ctx.channel().remoteAddress(), cause);
ctx.close();
}
}
package com.jam.demo.handler;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
/**
* 客户端业务处理器
* @author ken
*/
@Slf4j
publicclass ClientBusinessHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("成功连接到服务端,服务端地址:{}", ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("与服务端的连接断开");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
CustomProtocol protocol = (CustomProtocol) msg;
if (protocol.getCommandType() != CustomProtocol.COMMAND_TYPE_DATA) {
log.warn("非业务数据指令,忽略处理,指令类型:{}", protocol.getCommandType());
return;
}
String data = new String(protocol.getData(), StandardCharsets.UTF_8);
log.info("收到服务端响应数据:{}", data);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("客户端业务处理器异常", cause);
ctx.close();
}
}
package com.jam.demo.server;
import com.jam.demo.codec.CustomProtocolDecoder;
import com.jam.demo.codec.CustomProtocolEncoder;
import com.jam.demo.handler.ServerBusinessHandler;
import com.jam.demo.handler.ServerHeartbeatHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Netty服务端启动类
* @author ken
*/
@Slf4j
@Component
publicclass NettyServer {
/**
* 服务端监听端口
*/
privatestaticfinalint PORT = 9000;
/**
* 读空闲时间,单位秒
*/
privatestaticfinalint READER_IDLE_TIME = 10;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
/**
* 启动Netty服务端
*/
public void start() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS))
.addLast(new CustomProtocolDecoder())
.addLast(new CustomProtocolEncoder())
.addLast(new ServerHeartbeatHandler())
.addLast(new ServerBusinessHandler());
}
});
ChannelFuture future = bootstrap.bind(PORT).sync();
log.info("Netty服务端启动成功,监听端口:{}", PORT);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Netty服务端启动异常", e);
Thread.currentThread().interrupt();
} finally {
shutdown();
}
}
/**
* 关闭Netty服务端,释放资源
*/
public void shutdown() {
if (bossGroup != null && !bossGroup.isShutdown()) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null && !workerGroup.isShutdown()) {
workerGroup.shutdownGracefully();
}
log.info("Netty服务端已关闭,资源释放完成");
}
}
package com.jam.demo.client;
import com.alibaba.fastjson2.JSON;
import com.jam.demo.codec.CustomProtocolDecoder;
import com.jam.demo.codec.CustomProtocolEncoder;
import com.jam.demo.handler.ClientBusinessHandler;
import com.jam.demo.handler.ClientHeartbeatHandler;
import com.jam.demo.protocol.CustomProtocol;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Netty客户端启动类
* @author ken
*/
@Slf4j
@Component
publicclass NettyClient {
/**
* 服务端地址
*/
privatestaticfinal String HOST = "127.0.0.1";
/**
* 服务端端口
*/
privatestaticfinalint PORT = 9000;
/**
* 写空闲时间,单位秒
*/
privatestaticfinalint WRITER_IDLE_TIME = 5;
/**
* 重连间隔时间,单位秒
*/
privatestaticfinalint RECONNECT_INTERVAL = 3;
private EventLoopGroup workerGroup;
private Channel channel;
private Bootstrap bootstrap;
/**
* 初始化客户端
*/
public void init() {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, WRITER_IDLE_TIME, 0, TimeUnit.SECONDS))
.addLast(new CustomProtocolDecoder())
.addLast(new CustomProtocolEncoder())
.addLast(new ClientHeartbeatHandler())
.addLast(new ClientBusinessHandler());
}
});
}
/**
* 连接服务端
*/
public void connect() {
if (bootstrap == null) {
init();
}
try {
ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
if (future.isSuccess()) {
channel = future.channel();
log.info("Netty客户端连接服务端成功,服务端地址:{}:{}", HOST, PORT);
}
future.channel().closeFuture().addListener(f -> {
log.warn("与服务端的连接断开,{}秒后尝试重连", RECONNECT_INTERVAL);
workerGroup.schedule(this::connect, RECONNECT_INTERVAL, TimeUnit.SECONDS);
});
} catch (InterruptedException e) {
log.error("Netty客户端连接服务端异常", e);
Thread.currentThread().interrupt();
}
}
/**
* 发送业务数据到服务端
* @param data 待发送的业务数据
*/
public void sendData(Map<String, Object> data) {
if (channel == null || !channel.isActive()) {
log.error("与服务端的连接未就绪,无法发送数据");
return;
}
String jsonData = JSON.toJSONString(data);
byte[] dataBytes = jsonData.getBytes(StandardCharsets.UTF_8);
CustomProtocol protocol = new CustomProtocol();
protocol.setMagicNumber(CustomProtocol.MAGIC_NUMBER);
protocol.setVersion(CustomProtocol.VERSION);
protocol.setCommandType(CustomProtocol.COMMAND_TYPE_DATA);
protocol.setDataLength(dataBytes.length);
protocol.setData(dataBytes);
channel.writeAndFlush(protocol).addListener(future -> {
if (future.isSuccess()) {
log.debug("数据发送成功,数据内容:{}", jsonData);
} else {
log.error("数据发送失败", future.cause());
}
});
}
/**
* 关闭客户端,释放资源
*/
public void shutdown() {
if (channel != null) {
channel.close();
}
if (workerGroup != null && !workerGroup.isShutdown()) {
workerGroup.shutdownGracefully();
}
log.info("Netty客户端已关闭,资源释放完成");
}
}
package com.jam.demo;
import com.jam.demo.server.NettyServer;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
* Netty Demo项目启动类
* @author ken
*/
@Slf4j
@SpringBootApplication
@OpenAPIDefinition(info = @Info(title = "Netty Demo API", version = "1.0.0", description = "Netty实战项目API文档"))
publicclass NettyDemoApplication {
public static void main(String[] args) {
SpringApplication.run(NettyDemoApplication.class, args);
log.info("Netty Demo项目启动成功");
}
/**
* 启动Netty服务端
*/
@Bean
public CommandLineRunner startNettyServer(NettyServer nettyServer) {
return args -> new Thread(nettyServer::start, "netty-server-thread").start();
}
}
package com.jam.demo.controller;
import com.jam.demo.client.NettyClient;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* Netty测试接口
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/netty")
@RequiredArgsConstructor
@Tag(name = "Netty测试接口", description = "Netty客户端数据发送测试接口")
publicclass NettyTestController {
privatefinal NettyClient nettyClient;
/**
* 发送数据到Netty服务端
*/
@PostMapping("/send")
@Operation(summary = "发送数据", description = "通过Netty客户端发送数据到服务端")
public Map<String, Object> sendData(@RequestBody Map<String, Object> data) {
nettyClient.sendData(data);
return Map.of("code", 200, "msg", "数据发送成功");
}
/**
* 初始化并连接Netty客户端
*/
@PostMapping("/connect")
@Operation(summary = "连接服务端", description = "初始化Netty客户端并连接服务端")
public Map<String, Object> connect() {
nettyClient.init();
new Thread(nettyClient::connect, "netty-client-thread").start();
return Map.of("code", 200, "msg", "客户端连接请求已发起");
}
/**
* 关闭Netty客户端
*/
@PostMapping("/shutdown")
@Operation(summary = "关闭客户端", description = "关闭Netty客户端并释放资源")
public Map<String, Object> shutdown() {
nettyClient.shutdown();
return Map.of("code", 200, "msg", "客户端已关闭,资源释放完成");
}
}
CREATE TABLE`netty_heartbeat_log` (
`id`bigintNOTNULL AUTO_INCREMENT COMMENT'主键ID',
`client_address`varchar(128) NOTNULLCOMMENT'客户端地址',
`heartbeat_type`tinyintNOTNULLCOMMENT'心跳类型:1-请求,2-响应',
`create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPCOMMENT'创建时间',
PRIMARY KEY (`id`),
KEY`idx_client_address` (`client_address`),
KEY`idx_create_time` (`create_time`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='Netty心跳日志表';
package com.jam.demo.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* Netty心跳日志实体
* @author ken
*/
@Data
@TableName("netty_heartbeat_log")
publicclass HeartbeatLog {
@TableId(type = IdType.AUTO)
private Long id;
private String clientAddress;
private Integer heartbeatType;
private LocalDateTime createTime;
}
-Dio.netty.leakDetection.level=PARANOID;ByteBuf若未传递到下一个Handler,必须手动release();使用try-finally结构确保释放LengthFieldBasedFrameDecoder解码器,基于固定协议格式处理,不要手动实现Netty作为Java网络编程的事实标准,屏蔽了原生NIO的底层复杂度与BUG,提供了高性能、高可靠、易扩展的网络编程能力,是分布式中间件、网关、游戏服务器、即时通信等系统的核心技术底座。
本文从IO模型的底层原理出发,彻底讲透了Netty的核心架构、核心组件、高级特性,提供了生产级可运行的完整实战案例,同时总结了最佳实践与高频踩坑指南,帮助开发者彻底掌握Netty,从入门到生产落地。
掌握Netty的核心,本质是掌握Reactor线程模型、责任链模式、内存管理、零拷贝这些底层核心思想,这些思想不仅适用于Netty,更是高性能分布式系统设计的核心基石。