Netty—Day1

小龙 726 2022-04-20

简介

Java中的IO有三种,BIO(阻塞IO)、NIO(非阻塞IO)、AIO(非阻塞IO)

BIO:每个连接创建成功之后都需要一个线程来维护,这就带来如下几个问题:

  1. 线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
  2. 线程切换效率低下:单机 CPU 核数固定,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。
  3. 除了以上两个问题,IO 编程中,我们看到数据读写是以字节流为单位。

为了解决这三个问题,JDK 在 1.4 之后提出了 NIO。
NIO:NIO 编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责。这就是 NIO 模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少

  • NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。

我们来看一段NIO 服务器端的代码

public class MyNioServer {
    private static final int PORT = 8080; // 服务器监听端口
    private static final int BUF_SIZE = 10240;// buffer 大小
    private static Selector serverSelector;
    private static Selector clientSelector;
    static {
        try {
            serverSelector = Selector.open();
            clientSelector = Selector.open();
        } catch (Exception e) {

        }
    }
    private void initServer() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
        System.out.println("在 " + PORT + " 端口监听");
        while (true) {
            serverSelector.select();
            Set<SelectionKey> keys = serverSelector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {// 是一个链接事件
                    doAccept(key);
                }
            }
        }
    }
    private void readAndWrite() throws IOException {
        while (true) {
            if (clientSelector.select(1) > 0) {
                Set<SelectionKey> keys = clientSelector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isReadable()) { // 是一个可读事件
                        doRead(key);
                    } else if (key.isWritable() && key.isValid()) {// 是一个读事件
                        doWrite(key);
                    }
                }
            }
        }
    }
    private void doAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        System.out.println("ServerSocketChannel正在循环监听");
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(clientSelector, SelectionKey.OP_READ);
    }
    private void doRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
        int read = socketChannel.read(byteBuffer);
        byte[] bytes = null;
        StringBuilder info = new StringBuilder();
        while (read > 0) {
            byteBuffer.flip(); // 重置缓冲区,将输入与输出结构转换
            bytes = byteBuffer.array();
            info.append(new String(bytes).trim());
            System.out.println("客户端发送来的消息是:" + info);
            byteBuffer.clear();
            read = socketChannel.read(byteBuffer);
        }
        byteBuffer.clear();
        byteBuffer.put(("[ECHO]" + info.toString()).getBytes(StandardCharsets.UTF_8));
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
        socketChannel.register(clientSelector, SelectionKey.OP_WRITE);
    }
    private void doWrite(SelectionKey key) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
        SocketChannel socketChannel = (SocketChannel) key.channel();
        socketChannel.read(buffer);
        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        socketChannel.close();
    }
    public static void main(String[] args){
        Thread init = new Thread(() -> {
            MyNioServer myNioServer = new MyNioServer();
            try {
                myNioServer.initServer();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        Thread read = new Thread(() -> {
            MyNioServer myNioServer = new MyNioServer();
            try {
                myNioServer.readAndWrite();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        read.start();
        init.start();
    }
}
  1. NIO 模型中通常会有两个线程,每个线程绑定一个轮询器 selector ,在我们这个例子中serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接是否有数据可读

  2. 服务端监测到新的连接之后,不再创建一个新的线程,而是直接将新连接绑定到clientSelector上,这样就不用 IO 模型中 1w 个 while 循环在死等

  3. clientSelector被一个 while 死循环包裹着,如果在某一时刻有多条连接有数据可读,那么通过 clientSelector.select(1)方法可以轮询出来,进而批量处理

  4. 数据的读写面向 Buffer

Netty编程

Netty 封装了 JDK 的 NIO,让你用得更爽,你不用再写一大堆复杂的代码了。 用官方正式的话来说就是:Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。

总结为什么使用 Netty 不使用 JDK 原生 NIO.

  1. 使用 JDK 自带的NIO需要了解太多的概念,编程复杂,一不小心 bug 横飞

  2. Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从 NIO 模型变身为 IO 模型

  3. Netty 自带的拆包解包,异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑

  4. Netty 解决了 JDK 的很多包括空轮询在内的 Bug

  5. Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理

  6. 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手

  7. Netty 社区活跃,遇到问题随时邮件列表或者 issue

  8. Netty 已经历各大 RPC 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大

Netty代码开发

引入 Maven 依赖,版本 4.1.76.Final 版本

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.76.Final</version>
</dependency>

接下来实现一个简单的 ECHO 程序
服务器端代码

public class NettyServer {
    public static void main(String[] args) {
        // netty服务器端 使用ServerBootstrap
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        /**
         * NioEventLoopGroup 创建一个Nio事件处理线程池,为指定线程池大小,默认开启的线程数是硬件线程数的两倍
         */
        // 会创建两个线程池,一个用于处理连接请求操作,一个用于处理数据操作
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); // 用于处理连接操作
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理数据操作

        serverBootstrap.group(bossGroup,workerGroup)
                .channel(NioServerSocketChannel.class) // 服务器端通道
                /**
                 * 子处理器,这里可以添加具体的处理业务逻辑
                 * NioSocketChannel:客户端的通道
                 */
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(StandardCharsets.UTF_8)); // 编码器
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                })
                .bind(8080);// 服务器端绑定端口
    }
}

这么一小段代码就实现了我们前面 NIO 编程中的所有的功能,包括服务端启动,接受新连接,打印客户端传来的数据。

  1. bossGroup 对应 IOServer.java 中的接受新连接线程,主要负责创建新连接

  2. workerGroup 对应 IOServer.java 中的负责读取数据的线程,主要用于读取数据以及业务逻辑处理

客户端代码

public class NettyClient {
    public static void main(String[] args) {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
                });
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080));
        Channel channel = future.channel();
        while (true) {
            channel.writeAndFlush(new Date() + "hello netty");
            try {
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

在客户端程序中,group对应了我们IOClient.java中 main 函数起的线程

服务端启动流程

来分析一下上面服务器端的代码

  • 首先看到,我们创建了两个 NioEventLoopGroup,这两个对象可以看做是传统IO编程模型的两大线程组,bossGroup 表示监听端口,accept 新连接的线程组,workerGroup 表示处理每一条连接的数据读写的线程组。用生活中的例子来讲就是,一个工厂要运作,必然要有一个老板负责从外面接活,然后有很多员工,负责具体干活,老板就是 bossGroup,员工们就是workerGroup,bossGroup接收完连接,扔给workerGroup去处理。

  • 接下来 我们创建了一个引导类 ServerBootstrap,这个类将引导我们进行服务端的启动工作,直接new出来开搞。

  • 我们通过 .group(bossGroup, workerGroup) 给引导类配置两大线程组,这个引导类的线程模型也就定型了。

  • 然后,我们指定我们服务端的 IO 模型为 NIO,我们通过 .channel(NioServerSocketChannel.class) 来指定 IO 模型,当然,这里也有其他的选择,如果你想指定 IO 模型为 BIO,那么这里配置上 OioServerSocketChannel.class 类型即可,当然通常我们也不会这么做,因为Netty的优势就在于 NIO

  • 接着,我们调用 childHandler() 方法,给这个引导类创建一个 ChannelInitializer ,这里主要就是定义后续每条连接的数据读写,业务处理逻辑ChannelInitializer 这个类中,我们注意到有一个泛型参数 NioSocketChannel,这个类是 Netty 对 NIO 类型的连接的抽象,而我们前面 NioServerSocketChannel 也是对 NIO 类型的连接的抽象,NioServerSocketChannelNioSocketChannel的概念可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上

一个Netty程序最小化参数配置到这里就完成了,要启动一个Netty服务端,必须要指定三类属性,分别是线程模型IO 模型连接读写处理逻辑,有了这三者,之后在调用 bind() 方法绑定一个端口,就可以在这个端口上跑起来

自动绑定递增端口

在上面代码中我们绑定了 8000 端口,接下来我们实现一个稍微复杂一点的逻辑,我们指定一个起始端口号,比如 1000,然后呢,我们从1000号端口往上找一个端口,直到这个端口能够绑定成功,比如 1000 端口不可用,我们就尝试绑定 1001,然后 1002,依次类推。

serverBootstrap.bind(8000);这个方法呢,它是一个 异步 的方法,调用之后是立即返回的,他的返回值是一个ChannelFuture,我们可以给这个ChannelFuture添加一个监听器GenericFutureListener,然后我们在GenericFutureListeneroperationComplete 方法里面,我们可以监听端口是否绑定成功,接下来是监测端口是否绑定成功的代码片段

private static void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("端口[ " + port + " ]绑定成功");
                } else {
                    System.out.println("端口[ " + port + " ]绑定失败");
                    // 通过递归的方式进行端口绑定
                    bind(serverBootstrap, port + 1);
                    System.out.println("将在[ " + port + 1 + " ]端口重新绑定");
                }
            }
        });
    }

服务端启动其他方法

handler()方法

serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
    protected void initChannel(NioServerSocketChannel ch) {
        System.out.println("服务端启动中");
    }
})

handler()方法必须在绑定端口后执行,它可以和我们前面分析的childHandler()方法对应起来,childHandler()用于指定处理新连接数据的读写处理逻辑,handler()用于指定在服务端启动过程中的一些逻辑。对应着NIO selector 中 OP_ACCEPT 事件。通常情况下呢,我们用不着这个方法。

attr() 方法

serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")

attr()方法可以给服务端的 channel,也就是NioServerSocketChannel指定一些自定义属性,然后我们可以通过channel.attr()取出这个属性,比如,上面的代码我们指定我们服务端channel的一个serverName属性,属性值为nettyServer,其实说白了就是给NioServerSocketChannel维护一个 map 而已,通常情况下,我们也用不上这个方法。

那么,当然,除了可以给服务端 channel NioServerSocketChannel指定一些自定义属性之外,我们还可以给每一条连接指定自定义属性

childAttr() 方法

serverBootstrap.childAttr(AttributeKey.newInstance("clientKey"), "clientValue")

上面的childAttr可以给每一条连接指定自定义属性,然后后续我们可以通过channel.attr()取出该属性。

attr()和childAttr()的区别:attr()是给netty服务器端自定义一个属性,childAttr()是给每一条链接自定义一个属性。它们都是 map 结构。都是通过channel.attr()取出。

option() 方法

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)

给服务端channel设置一些属性,最常见的就是so_backlog,如下设置

  • 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数

childOption() 方法

serverBootstrap
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)

childOption()可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种TCP属性,其中

  • ChannelOption.SO_KEEPALIVE表示是否开启TCP底层心跳机制,true为开启

  • ChannelOption.TCP_NODELAY表示是否开启Nagle算法,true表示关闭,false表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。

客户端启动流程

对于客户端的启动来说,和服务端的启动类似,依然需要线程模型、IO 模型,以及 IO 业务处理逻辑三大参数,下面,我们来看一下客户端启动的标准流程

public class NettyClient {
    public static void main(String[] args) {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                            }
                        });
                    }
                });
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                if (future.isSuccess()){
                    System.out.println("连接成功");
                }else {
                    System.out.println("连接失败");
                }
            }
        });
    }
}

客户端启动的引导类是 Bootstrap,负责启动客户端以及连接服务端,而上面在描述服务端的启动的时候,这个辅导类是 ServerBootstrap,引导类创建完成之后,下面我们描述一下客户端启动的流程

  1. 首先,与服务端的启动一样,我们需要给它指定线程模型,驱动着连接的数据读写

  2. 然后,我们指定 IO 模型为 NioSocketChannel,表示 IO 模型为 NIO,当然,你可以可以设置 IO 模型为 OioSocketChannel,但是通常不会这么做,因为 Netty 的优势在于 NIO

  3. 接着,给引导类指定一个 handler,这里主要就是定义连接的业务处理逻辑,不理解没关系,在后面我们会详细分析

  4. 配置完线程模型、IO 模型、业务处理逻辑之后,调用 connect 方法进行连接,可以看到 connect 方法有两个参数,第一个参数可以填写 IP 或者域名,第二个参数填写的是端口号,由于 connect 方法返回的是一个 Future,也就是说这个方是异步的,我们通过 addListener 方法可以监听到连接是否成功,进而打印出连接信息

失败重连

在网络情况差的情况下,客户端第一次连接可能会连接失败,这个时候我们可能会尝试重新连接,重新连接的逻辑写在连接失败的逻辑块里

重新连接的时候,依然是调用一样的逻辑,因此,我们把建立连接的逻辑先抽取出来,然后在重连失败的时候,递归调用自身

但是,通常情况下,连接建立失败不会立即重新连接,而是会通过一个指数退避的方式,比如每隔 1 秒、2 秒、4 秒、8 秒,以 2 的幂次来建立连接,然后到达一定次数之后就放弃连接,接下来我们就来实现一下这段逻辑,我们默认重试 5 次

connect(bootstrap, "juejin.cn", 80, MAX_RETRY);

private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
    bootstrap.connect(host, port).addListener(future -> {
        if (future.isSuccess()) {
            System.out.println("连接成功!");
        } else if (retry == 0) {
            System.err.println("重试次数已用完,放弃连接!");
        } else {
            // 第几次重连
            int order = (MAX_RETRY - retry) + 1;
            // 本次重连的间隔
            int delay = 1 << order;
            System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
            bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                    .SECONDS);
        }
    });
}

从上面的代码可以看到,通过判断连接是否成功以及剩余重试次数,分别执行不同的逻辑

  1. 如果连接成功则打印连接成功的消息

  2. 如果连接失败但是重试次数已经用完,放弃连接

  3. 如果连接失败但是重试次数仍然没有用完,则计算下一次重连间隔 delay,然后定期重连

在上面的代码中,我们看到,我们定时任务是调用 bootstrap.config().group().schedule(), 其中 bootstrap.config() 这个方法返回的是 BootstrapConfig,他是对 **Bootstrap **配置参数的抽象,然后 bootstrap.config().group() 返回的就是我们在一开始的时候配置的线程模型 workerGroup,调 workerGroupschedule 方法即可实现定时任务逻辑。

在 schedule 方法块里面,前面四个参数我们原封不动地传递,最后一个重试次数参数减掉一,就是下一次建立连接时候的上下文信息。

客户端启动其他方法

attr() 方法

bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")

attr() 方法可以给客户端 Channel,也就是 NioSocketChannel 绑定自定义属性,然后我们可以通过 channel.attr() 取出这个属性,比如,上面的代码我们指定我们客户端 Channel 的一个clientName属性,属性值为nettyClient,其实说白了就是给NioSocketChannel维护一个 map 而已,后续在这个 NioSocketChannel 通过参数传来传去的时候,就可以通过他来取出设置的属性,非常方便。

option() 方法

Bootstrap
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)

option() 方法可以给连接设置一些 TCP 底层相关的属性,比如上面,我们设置了三种 TCP 属性,其中

  • ChannelOption.CONNECT_TIMEOUT_MILLIS 表示连接的超时时间,超过这个时间还是建立不上的话则代表连接失败

  • ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制,true 为开启

  • ChannelOption.TCP_NODELAY 表示是否开始 Nagle 算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为 true 关闭,如果需要减少发送次数减少网络交互,就设置为 false 开启

服务器与客户端双向通信

知道了服务器端与客户端的启动流程,接下来就可以建立服务端与客户端之间的通信了,用一个小的demo来了解一下服务端和客户端是如何来通信的。

实现的功能是:客户端连接成功之后,向服务端写一段数据 ,服务端收到数据之后打印,并向客户端回一段数据

客户端发数据到服务端

客户端相关的数据读写逻辑是通过 Bootstrap 的 handler() 方法指定

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        // 指定连接数据读写逻辑
    }
});

在 initChannel() 方法里面给客户端添加一个逻辑处理器,这个处理器的作用就是负责向服务端写数据

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new FirstClientHandler());
    }
});
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(new Date() + "客户端写出数据");
        // 获取客户端的输出数据
        ByteBuf byteBuf = getByteBuf(ctx);
        // 2、向服务器端发送数据
        ctx.writeAndFlush(byteBuf);
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 获取二进制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        // 2. 准备数据,指定字符串的字符集为 utf-8
        byte[] bytes = "你好 Netty 服务器端".getBytes(StandardCharsets.UTF_8);
        // 3. 填充数据到 ByteBuf
        buffer.writeBytes(bytes);
        return buffer;
    }
}
  1. 这个逻辑处理器继承自 ChannelInboundHandlerAdapter,然后覆盖了 channelActive()方法,这个方法会在客户端连接建立成功之后被调用

  2. 客户端连接建立成功之后,调用到 channelActive() 方法,在这个方法里面,我们编写向服务端写数据的逻辑

  3. 写数据的逻辑分为两步:首先我们需要获取一个 netty 对二进制数据的抽象 ByteBuf,上面代码中, ctx.alloc() 获取到一个 ByteBuf 的内存管理器,这个

内存管理器的作用就是分配一个 ByteBuf,然后我们把字符串的二进制数据填充到 ByteBuf,这样我们就获取到了 Netty 需要的一个数据格式,最后我们调用 ctx.channel().writeAndFlush() 把数据写到服务端

服务端读取客户端数据

服务端相关的数据处理逻辑是通过 ServerBootstrap 的 childHandler() 方法指定

.childHandler(new ChannelInitializer<NioSocketChannel>() {
   protected void initChannel(NioSocketChannel ch) {
       // 指定连接数据读写逻辑
   }
});

我们在 initChannel() 方法里面给服务端添加一个逻辑处理器,这个处理器的作用就是负责读取客户端来的数据

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new FirstServerHandler());
}
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收客户端数据逻辑
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(new Date() + " : 接收到客户端数据 -> {" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

服务端侧的逻辑处理器同样继承自 ChannelInboundHandlerAdapter,与客户端不同的是,这里覆盖的方法是 channelRead() ,这个方法在接收到客户端发来的数据之后被回调。

这里的 msg 参数指的就是 Netty 里面数据读写的载体,为什么这里不直接是 ByteBuf,而需要我们强转一下,我们后面会分析到。这里我们强转之后,然后调用 byteBuf.toString() 就能够拿到我们客户端发过来的字符串数据。

服务端回数据给客户端

服务端向客户端写数据逻辑与客户端侧的写数据逻辑一样,先创建一个 ByteBuf,然后填充二进制数据,最后调用 writeAndFlush() 方法写出去,下面是服务端回数据的代码

public class FirstServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收客户端数据逻辑
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(new Date() + " : 接收到客户端数据 -> {" + byteBuf.toString(CharsetUtil.UTF_8));

        // 回应客户端逻辑
        System.out.println(new Date() + ": 服务端写出数据");
        byteBuf = getByteBuf(ctx, byteBuf);
        ctx.writeAndFlush(byteBuf);
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx,  ByteBuf byteBuf){
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(("[ECHO] " + byteBuf.toString(CharsetUtil.UTF_8)).getBytes(StandardCharsets.UTF_8));
        return buffer;
    }

客户端的读取数据的逻辑和服务端读取数据的逻辑一样,同样是覆盖 ChannelRead() 方法

public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(new Date() + "客户端写出数据");
        // 获取客户端的输出数据
        ByteBuf byteBuf = getByteBuf(ctx);
        // 2、向服务器端发送数据
        ctx.writeAndFlush(byteBuf);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接收客户端数据逻辑
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(new Date() + " : 接收到服务器端数据 -> {" + byteBuf.toString(CharsetUtil.UTF_8));
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 获取二进制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        // 2. 准备数据,指定字符串的字符集为 utf-8
        byte[] bytes = "你好 Netty 服务器端".getBytes(StandardCharsets.UTF_8);
        // 3. 填充数据到 ByteBuf
        buffer.writeBytes(bytes);
        return buffer;
    }
}
  • 客户端和服务端的逻辑处理是均是在启动的时候,通过给逻辑处理链 pipeline 添加逻辑处理器,来编写数据的读写逻辑,pipeline 的逻辑我们在后面会分析。

  • 在客户端连接成功之后会回调到逻辑处理器的 channelActive() 方法,而不管是服务端还是客户端,收到数据之后都会调用到 channelRead 方法。

  • 写数据调用writeAndFlush方法,客户端与服务端交互的二进制数据载体为 ByteBuf,ByteBuf 通过连接的内存管理器创建,字节数据填充到 ByteBuf 之后才能写到对端。


# Netty