简介
Java中的IO有三种,BIO(阻塞IO)、NIO(非阻塞IO)、AIO(非阻塞IO)
BIO:每个连接创建成功之后都需要一个线程来维护,这就带来如下几个问题:
- 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
- 线程切换效率低下:单机 CPU 核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
- 除了以上两个问题,IO 编程中,我们看到数据读写是以字节流为单位。
为了解决这三个问题,JDK 在 1.4 之后提出了 NIO。
NIO:NIO 编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责。这就是 NIO 模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少
- NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。
我们来看一段NIO 服务器端的代码
public class MyNioServer {
private static final int PORT = 8080; // 服务器监听端口
private static final int BUF_SIZE = 10240;// buffer 大小
private static Selector serverSelector;
private static Selector clientSelector;
static {
try {
serverSelector = Selector.open();
clientSelector = Selector.open();
} catch (Exception e) {
}
}
private void initServer() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
System.out.println("在 " + PORT + " 端口监听");
while (true) {
serverSelector.select();
Set<SelectionKey> keys = serverSelector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {// 是一个链接事件
doAccept(key);
}
}
}
}
private void readAndWrite() throws IOException {
while (true) {
if (clientSelector.select(1) > 0) {
Set<SelectionKey> keys = clientSelector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) { // 是一个可读事件
doRead(key);
} else if (key.isWritable() && key.isValid()) {// 是一个读事件
doWrite(key);
}
}
}
}
}
private void doAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
System.out.println("ServerSocketChannel正在循环监听");
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(clientSelector, SelectionKey.OP_READ);
}
private void doRead(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
int read = socketChannel.read(byteBuffer);
byte[] bytes = null;
StringBuilder info = new StringBuilder();
while (read > 0) {
byteBuffer.flip(); // 重置缓冲区,将输入与输出结构转换
bytes = byteBuffer.array();
info.append(new String(bytes).trim());
System.out.println("客户端发送来的消息是:" + info);
byteBuffer.clear();
read = socketChannel.read(byteBuffer);
}
byteBuffer.clear();
byteBuffer.put(("[ECHO]" + info.toString()).getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
socketChannel.write(byteBuffer);
socketChannel.register(clientSelector, SelectionKey.OP_WRITE);
}
private void doWrite(SelectionKey key) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.read(buffer);
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
socketChannel.close();
}
public static void main(String[] args){
Thread init = new Thread(() -> {
MyNioServer myNioServer = new MyNioServer();
try {
myNioServer.initServer();
} catch (IOException e) {
e.printStackTrace();
}
});
Thread read = new Thread(() -> {
MyNioServer myNioServer = new MyNioServer();
try {
myNioServer.readAndWrite();
} catch (IOException e) {
e.printStackTrace();
}
});
read.start();
init.start();
}
}
-
NIO 模型中通常会有两个线程,每个线程绑定一个轮询器 selector ,在我们这个例子中serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接是否有数据可读
-
服务端监测到新的连接之后,不再创建一个新的线程,而是直接将新连接绑定到clientSelector上,这样就不用 IO 模型中 1w 个 while 循环在死等
-
clientSelector被一个 while 死循环包裹着,如果在某一时刻有多条连接有数据可读,那么通过 clientSelector.select(1)方法可以轮询出来,进而批量处理
-
数据的读写面向 Buffer
Netty编程
Netty 封装了 JDK 的 NIO,让你用得更爽,你不用再写一大堆复杂的代码了。 用官方正式的话来说就是:Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
总结为什么使用 Netty 不使用 JDK 原生 NIO.
-
使用 JDK 自带的NIO需要了解太多的概念,编程复杂,一不小心 bug 横飞
-
Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从 NIO 模型变身为 IO 模型
-
Netty 自带的拆包解包,异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑
-
Netty 解决了 JDK 的很多包括空轮询在内的 Bug
-
Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理
-
自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
-
Netty 社区活跃,遇到问题随时邮件列表或者 issue
-
Netty 已经历各大 RPC 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
Netty代码开发
引入 Maven 依赖,版本 4.1.76.Final 版本
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>
接下来实现一个简单的 ECHO 程序
服务器端代码
public class NettyServer {
public static void main(String[] args) {
// netty服务器端 使用ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* NioEventLoopGroup 创建一个Nio事件处理线程池,为指定线程池大小,默认开启的线程数是硬件线程数的两倍
*/
// 会创建两个线程池,一个用于处理连接请求操作,一个用于处理数据操作
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); // 用于处理连接操作
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理数据操作
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class) // 服务器端通道
/**
* 子处理器,这里可以添加具体的处理业务逻辑
* NioSocketChannel:客户端的通道
*/
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // 编码器
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
})
.bind(8080);// 服务器端绑定端口
}
}
这么一小段代码就实现了我们前面 NIO 编程中的所有的功能,包括服务端启动,接受新连接,打印客户端传来的数据。
-
bossGroup 对应 IOServer.java 中的接受新连接线程,主要负责创建新连接
-
workerGroup 对应 IOServer.java 中的负责读取数据的线程,主要用于读取数据以及业务逻辑处理
客户端代码
public class NettyClient {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
});
ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = future.channel();
while (true) {
channel.writeAndFlush(new Date() + "hello netty");
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在客户端程序中,group对应了我们IOClient.java中 main 函数起的线程
服务端启动流程
来分析一下上面服务器端的代码
-
首先看到,我们创建了两个
NioEventLoopGroup
,这两个对象可以看做是传统IO编程模型的两大线程组,bossGroup
表示监听端口,accept
新连接的线程组,workerGroup
表示处理每一条连接的数据读写的线程组。用生活中的例子来讲就是,一个工厂要运作,必然要有一个老板负责从外面接活,然后有很多员工,负责具体干活,老板就是 bossGroup,员工们就是workerGroup,bossGroup接收完连接,扔给workerGroup去处理。 -
接下来 我们创建了一个引导类
ServerBootstrap
,这个类将引导我们进行服务端的启动工作,直接new出来开搞。 -
我们通过
.group(bossGroup, workerGroup)
给引导类配置两大线程组,这个引导类的线程模型也就定型了。 -
然后,我们指定我们服务端的
IO
模型为 NIO,我们通过.channel(NioServerSocketChannel.class)
来指定 IO 模型,当然,这里也有其他的选择,如果你想指定IO
模型为 BIO,那么这里配置上OioServerSocketChannel.class
类型即可,当然通常我们也不会这么做,因为Netty的优势就在于 NIO。 -
接着,我们调用
childHandler()
方法,给这个引导类创建一个ChannelInitializer
,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。ChannelInitializer
这个类中,我们注意到有一个泛型参数NioSocketChannel
,这个类是 Netty 对 NIO 类型的连接的抽象,而我们前面 NioServerSocketChannel 也是对 NIO 类型的连接的抽象,NioServerSocketChannel和NioSocketChannel的概念可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上
一个Netty程序最小化参数配置到这里就完成了,要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑,有了这三者,之后在调用 bind() 方法绑定一个端口,就可以在这个端口上跑起来
自动绑定递增端口
在上面代码中我们绑定了 8000 端口,接下来我们实现一个稍微复杂一点的逻辑,我们指定一个起始端口号,比如 1000,然后呢,我们从1000号端口往上找一个端口,直到这个端口能够绑定成功,比如 1000 端口不可用,我们就尝试绑定 1001,然后 1002,依次类推。
serverBootstrap.bind(8000)
;这个方法呢,它是一个 异步 的方法,调用之后是立即返回的,他的返回值是一个ChannelFuture,我们可以给这个ChannelFuture添加一个监听器GenericFutureListener,然后我们在GenericFutureListener的 operationComplete
方法里面,我们可以监听端口是否绑定成功,接下来是监测端口是否绑定成功的代码片段
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口[ " + port + " ]绑定成功");
} else {
System.out.println("端口[ " + port + " ]绑定失败");
// 通过递归的方式进行端口绑定
bind(serverBootstrap, port + 1);
System.out.println("将在[ " + port + 1 + " ]端口重新绑定");
}
}
});
}
服务端启动其他方法
handler()方法
serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
protected void initChannel(NioServerSocketChannel ch) {
System.out.println("服务端启动中");
}
})
handler()方法必须在绑定端口后执行,它可以和我们前面分析的childHandler()方法对应起来,childHandler()用于指定处理新连接数据的读写处理逻辑,handler()用于指定在服务端启动过程中的一些逻辑。对应着NIO selector 中 OP_ACCEPT
事件。通常情况下呢,我们用不着这个方法。
attr() 方法
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")
attr()方法可以给服务端的 channel,也就是NioServerSocketChannel指定一些自定义属性,然后我们可以通过channel.attr()取出这个属性,比如,上面的代码我们指定我们服务端channel的一个serverName属性,属性值为nettyServer,其实说白了就是给NioServerSocketChannel维护一个 map
而已,通常情况下,我们也用不上这个方法。
那么,当然,除了可以给服务端 channel NioServerSocketChannel指定一些自定义属性之外,我们还可以给每一条连接指定自定义属性
childAttr() 方法
serverBootstrap.childAttr(AttributeKey.newInstance("clientKey"), "clientValue")
上面的childAttr可以给每一条连接指定自定义属性,然后后续我们可以通过channel.attr()取出该属性。
attr()和childAttr()的区别:attr()是给netty服务器端自定义一个属性,childAttr()是给每一条链接自定义一个属性。它们都是 map
结构。都是通过channel.attr()取出。
option() 方法
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)
给服务端channel设置一些属性,最常见的就是so_backlog,如下设置
- 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
childOption() 方法
serverBootstrap
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
childOption()可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种TCP属性,其中
-
ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开启
-
ChannelOption.TCP_NODELAY表示是否开启Nagle算法,true表示关闭,false表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。
客户端启动流程
对于客户端的启动来说,和服务端的启动类似,依然需要线程模型、IO 模型,以及 IO 业务处理逻辑三大参数,下面,我们来看一下客户端启动的标准流程
public class NettyClient {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
});
ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()){
System.out.println("连接成功");
}else {
System.out.println("连接失败");
}
}
});
}
}
客户端启动的引导类是 Bootstrap
,负责启动客户端以及连接服务端,而上面在描述服务端的启动的时候,这个辅导类是 ServerBootstrap,引导类创建完成之后,下面我们描述一下客户端启动的流程
-
首先,与服务端的启动一样,我们需要给它指定线程模型,驱动着连接的数据读写
-
然后,我们指定 IO 模型为 NioSocketChannel,表示 IO 模型为 NIO,当然,你可以可以设置 IO 模型为 OioSocketChannel,但是通常不会这么做,因为 Netty 的优势在于 NIO
-
接着,给引导类指定一个 handler,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析
-
配置完线程模型、IO 模型、业务处理逻辑之后,调用 connect 方法进行连接,可以看到 connect 方法有两个参数,第一个参数可以填写 IP 或者域名,第二个参数填写的是端口号,由于 connect 方法返回的是一个 Future,也就是说这个方是异步的,我们通过 addListener 方法可以监听到连接是否成功,进而打印出连接信息
失败重连
在网络情况差的情况下,客户端第一次连接可能会连接失败,这个时候我们可能会尝试重新连接,重新连接的逻辑写在连接失败的逻辑块里
重新连接的时候,依然是调用一样的逻辑,因此,我们把建立连接的逻辑先抽取出来,然后在重连失败的时候,递归调用自身
但是,通常情况下,连接建立失败不会立即重新连接,而是会通过一个指数退避的方式,比如每隔 1 秒、2 秒、4 秒、8 秒,以 2 的幂次来建立连接,然后到达一定次数之后就放弃连接,接下来我们就来实现一下这段逻辑,我们默认重试 5 次
connect(bootstrap, "juejin.cn", 80, MAX_RETRY);
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
从上面的代码可以看到,通过判断连接是否成功以及剩余重试次数,分别执行不同的逻辑
-
如果连接成功则打印连接成功的消息
-
如果连接失败但是重试次数已经用完,放弃连接
-
如果连接失败但是重试次数仍然没有用完,则计算下一次重连间隔 delay,然后定期重连
在上面的代码中,我们看到,我们定时任务是调用 bootstrap.config().group().schedule()
, 其中 bootstrap.config()
这个方法返回的是 BootstrapConfig
,他是对 **Bootstrap **配置参数的抽象,然后 bootstrap.config().group() 返回的就是我们在一开始的时候配置的线程模型 workerGroup
,调 workerGroup
的 schedule
方法即可实现定时任务逻辑。
在 schedule 方法块里面,前面四个参数我们原封不动地传递,最后一个重试次数参数减掉一,就是下一次建立连接时候的上下文信息。
客户端启动其他方法
attr() 方法
bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")
attr() 方法可以给客户端 Channel,也就是 NioSocketChannel
绑定自定义属性,然后我们可以通过 channel.attr()
取出这个属性,比如,上面的代码我们指定我们客户端 Channel 的一个clientName属性,属性值为nettyClient,其实说白了就是给NioSocketChannel维护一个 map
而已,后续在这个 NioSocketChannel 通过参数传来传去的时候,就可以通过他来取出设置的属性,非常方便。
option() 方法
Bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
option() 方法可以给连接设置一些 TCP 底层相关的属性,比如上面,我们设置了三种 TCP 属性,其中
-
ChannelOption.CONNECT_TIMEOUT_MILLIS 表示连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
-
ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制,true 为开启
-
ChannelOption.TCP_NODELAY 表示是否开始 Nagle 算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为 true 关闭,如果需要减少发送次数减少网络交互,就设置为 false 开启
服务器与客户端双向通信
知道了服务器端与客户端的启动流程,接下来就可以建立服务端与客户端之间的通信了,用一个小的demo来了解一下服务端和客户端是如何来通信的。
实现的功能是:客户端连接成功之后,向服务端写一段数据 ,服务端收到数据之后打印,并向客户端回一段数据
客户端发数据到服务端
客户端相关的数据读写逻辑是通过 Bootstrap 的 handler() 方法指定
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 指定连接数据读写逻辑
}
});
在 initChannel() 方法里面给客户端添加一个逻辑处理器,这个处理器的作用就是负责向服务端写数据
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FirstClientHandler());
}
});
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(new Date() + "客户端写出数据");
// 获取客户端的输出数据
ByteBuf byteBuf = getByteBuf(ctx);
// 2、向服务器端发送数据
ctx.writeAndFlush(byteBuf);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 获取二进制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 2. 准备数据,指定字符串的字符集为 utf-8
byte[] bytes = "你好 Netty 服务器端".getBytes(StandardCharsets.UTF_8);
// 3. 填充数据到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
-
这个逻辑处理器继承自 ChannelInboundHandlerAdapter,然后覆盖了 channelActive()方法,这个方法会在客户端连接建立成功之后被调用
-
客户端连接建立成功之后,调用到 channelActive() 方法,在这个方法里面,我们编写向服务端写数据的逻辑
-
写数据的逻辑分为两步:首先我们需要获取一个 netty 对二进制数据的抽象 ByteBuf,上面代码中,
ctx.alloc()
获取到一个 ByteBuf 的内存管理器,这个
内存管理器的作用就是分配一个 ByteBuf,然后我们把字符串的二进制数据填充到 ByteBuf,这样我们就获取到了 Netty 需要的一个数据格式,最后我们调用 ctx.channel().writeAndFlush()
把数据写到服务端
服务端读取客户端数据
服务端相关的数据处理逻辑是通过 ServerBootstrap 的 childHandler() 方法指定
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 指定连接数据读写逻辑
}
});
我们在 initChannel() 方法里面给服务端添加一个逻辑处理器,这个处理器的作用就是负责读取客户端来的数据
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new FirstServerHandler());
}
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收客户端数据逻辑
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + " : 接收到客户端数据 -> {" + byteBuf.toString(CharsetUtil.UTF_8));
}
}
服务端侧的逻辑处理器同样继承自 ChannelInboundHandlerAdapter
,与客户端不同的是,这里覆盖的方法是 channelRead()
,这个方法在接收到客户端发来的数据之后被回调。
这里的 msg 参数指的就是 Netty 里面数据读写的载体,为什么这里不直接是 ByteBuf,而需要我们强转一下,我们后面会分析到。这里我们强转之后,然后调用 byteBuf.toString() 就能够拿到我们客户端发过来的字符串数据。
服务端回数据给客户端
服务端向客户端写数据逻辑与客户端侧的写数据逻辑一样,先创建一个 ByteBuf,然后填充二进制数据,最后调用 writeAndFlush() 方法写出去,下面是服务端回数据的代码
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收客户端数据逻辑
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + " : 接收到客户端数据 -> {" + byteBuf.toString(CharsetUtil.UTF_8));
// 回应客户端逻辑
System.out.println(new Date() + ": 服务端写出数据");
byteBuf = getByteBuf(ctx, byteBuf);
ctx.writeAndFlush(byteBuf);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx, ByteBuf byteBuf){
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(("[ECHO] " + byteBuf.toString(CharsetUtil.UTF_8)).getBytes(StandardCharsets.UTF_8));
return buffer;
}
客户端的读取数据的逻辑和服务端读取数据的逻辑一样,同样是覆盖 ChannelRead() 方法
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(new Date() + "客户端写出数据");
// 获取客户端的输出数据
ByteBuf byteBuf = getByteBuf(ctx);
// 2、向服务器端发送数据
ctx.writeAndFlush(byteBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收客户端数据逻辑
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + " : 接收到服务器端数据 -> {" + byteBuf.toString(CharsetUtil.UTF_8));
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 获取二进制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 2. 准备数据,指定字符串的字符集为 utf-8
byte[] bytes = "你好 Netty 服务器端".getBytes(StandardCharsets.UTF_8);
// 3. 填充数据到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
-
客户端和服务端的逻辑处理是均是在启动的时候,通过给逻辑处理链 pipeline 添加逻辑处理器,来编写数据的读写逻辑,pipeline 的逻辑我们在后面会分析。
-
在客户端连接成功之后会回调到逻辑处理器的 channelActive() 方法,而不管是服务端还是客户端,收到数据之后都会调用到 channelRead 方法。
-
写数据调用writeAndFlush方法,客户端与服务端交互的二进制数据载体为 ByteBuf,ByteBuf 通过连接的内存管理器创建,字节数据填充到 ByteBuf 之后才能写到对端。