Netty — Day4 ChannelHandler生命周期

小龙 884 2022-04-24

channelHandler 的生命周期

ChannelHandler 有很多回调方法,这些回调方法的执行是有顺序的,而这个执行顺序可以称为 ChannelHandler 的生命周期。

了解channelHandler的生命周期我们可以添加一个自定义 ChannelHandler 来测试一下各个回调方法的执行顺序。

public class ChannelHandlerLifecycle extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("逻辑处理器被添加:handlerAdded()");
        super.handlerAdded(ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 绑定到线程(NioEventLoop):channelRegistered()");
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 准备就绪:channelActive()");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channel 有数据可读:channelRead()");
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 某次数据读完:channelReadComplete()");
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 被关闭:channelInactive()");
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel 取消线程(NioEventLoop) 的绑定: channelUnregistered()");
        super.channelUnregistered(ctx);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("逻辑处理器被移除:handlerRemoved()");
        super.handlerRemoved(ctx);
    }
}

执行结果

逻辑处理器被添加:handlerAdded()
channel 绑定到线程(NioEventLoop):channelRegistered()
channel 准备就绪:channelActive()
channel 有数据可读:channelRead()
rsthe 开始登陆!
channel 某次数据读完:channelReadComplete()
channel 有数据可读:channelRead()
2022-04-24 17:39:37 收到客户端的消息 :1
channel 被关闭:channelInactive()
channel 取消线程(NioEventLoop) 的绑定: channelUnregistered()
逻辑处理器被移除:handlerRemoved()

可以看到,ChannelHandler 回调方法的执行顺序为

handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()

逐个解释一下每个回调方法的含义

  1. handlerAdded() :指的是当检测到新连接之后,调用 ch.pipeline().addLast(new LifeCyCleTestHandler()); 之后的回调,表示在当前的 channel 中,已经成功添加了一个 handler 处理器。

  2. channelRegistered():这个回调方法,表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系,类似 BIO 编程中,accept 到新的连接,然后创建一个线程来处理这条连接的读写,只不过 Netty 里面是使用了线程池的方式,只需要从线程池里面去抓一个线程绑定在这个 channel 上即可,这里的 NIO 线程通常指的是 NioEventLoop,不理解没关系,后面我们还会讲到。

  3. channelActive() :当 channel 的所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经添加完所有的 handler)以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法。

  4. channelRead():客户端向服务端发来数据,每次都会回调此方法,表示有数据可读。

  5. channelReadComplete():服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕。

客户端关闭,这个时候对于服务端来说,其实就是 channel 被关闭,这时候 ChannelHandler 回调方法的执行顺序为
channelInactive() -> channelUnregistered() -> handlerRemoved()

到了这里,相信大家应该已经能够看到,这里的回调方法的执行顺序是新连接建立时候的逆操作,下面我们还是来解释一下每个方法的含义

  1. channelInactive(): 表面这条连接已经被关闭了,这条连接在 TCP 层面已经不再是 ESTABLISH 状态了

  2. channelUnregistered(): 既然连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了,这个回调就表明与这条连接对应的 NIO 线程移除掉对这条连接的处理

  3. chandlerRemoved():最后,我们给这条连接上添加的所有的业务逻辑处理器都给移除掉。

用一幅图来标识 ChannelHandler 的生命周期

image.png

ChannelHandler 生命周期各回调方法用法举例

来看一下这些回调方法的使用场景

Netty 对于一条连接的在各个不同状态下回调方法的定义还是蛮细致的,这个好处就在于我们能够基于这个机制写出扩展性较好的应用程序。

ChannelInitializer 的实现原理

我们在给新连接定义 handler 的时候,其实只是通过 childHandler() 方法给新连接设置了一个 handler,这个 handler 就是 ChannelInitializer,而在 ChannelInitializer 的 initChannel() 方法里面,我们通过拿到 channel 对应的 pipeline,然后往里面塞 handler

这里的 ChannelInitializer 其实就利用了 Netty 的 handler 生命周期中 channelRegistered()handlerAdded() 两个特性,我们简单翻一翻 ChannelInitializer 这个类的源代码:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(new ConcurrentHashMap());
    public ChannelInitializer() { }
    protected abstract void initChannel(C var1) throws Exception;
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.initChannel(ctx)) {
            ctx.pipeline().fireChannelRegistered();
            this.removeState(ctx);
        } else {
            ctx.fireChannelRegistered();
        }
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
        }
        ctx.close();
    }
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered() && this.initChannel(ctx)) {
            this.removeState(ctx);
        }
    }
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        this.initMap.remove(ctx);
    }
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (this.initMap.add(ctx)) {
            try {
                this.initChannel(ctx.channel());
            } catch (Throwable var6) {
                this.exceptionCaught(ctx, var6);
            } finally {
                if (!ctx.isRemoved()) {
                    ctx.pipeline().remove(this);
                }
            }
            return true;
        } else {
            return false;
        }
    }
    private void removeState(final ChannelHandlerContext ctx) {
        if (ctx.isRemoved()) {
            this.initMap.remove(ctx);
        } else {
            ctx.executor().execute(new Runnable() {
                public void run() {
                    ChannelInitializer.this.initMap.remove(ctx);
                }
            });
        }
    }
}
  1. ChannelInitializer 定义了一个抽象的方法 initChannel(),这个抽象方法由我们自行实现,我们在服务端启动的流程里面的实现逻辑就是往 pipeline 里面塞我们的 handler 链

  2. handlerAdded()channelRegistered() 方法,都会尝试去调用 initChannel() 方法,initChannel() 使用 putIfAbsent() 来防止 initChannel() 被调用多次

  3. 如果你 debug 了 ChannelInitializer 的上述两个方法,你会发现,在 handlerAdded() 方法被调用的时候,channel 其实已经和某个线程绑定上了,所以,就我们的应用程序来说,这里的 channelRegistered() 其实是多余的,那为什么这里还要尝试调用一次呢?我猜测应该是担心我们自己写了个类继承自 ChannelInitializer,然后覆盖掉了 handlerAdded() 方法,这样即使覆盖掉,在 channelRegistered() 方法里面还有机会再调一次 initChannel(),把我们自定义的 handler 都添加到 pipeline 中去。

handlerAdded() 与 handlerRemoved()

这两个方法通常可以用在一些资源的申请和释放

channelActive() 与 channelInActive()

  1. 对我们的应用程序来说,这两个方法表明的含义是 TCP 连接的建立与释放,通常我们在这两个回调里面统计单机的连接数,channelActive() 被调用,连接数加一,channelInActive() 被调用,连接数减一

  2. 另外,我们也可以在 channelActive() 方法中,实现对客户端连接 ip 黑白名单的过滤,具体这里就不展开了

channelRead()

拆包粘包原理,服务端根据自定义协议来进行拆包,其实就是在这个方法里面,每次读到一定的数据,都会累加到一个容器里面,然后判断是否能够拆出来一个完整的数据包,如果够的话就拆了之后,往下进行传递

channelReadComplete()

前面小节中,我们在每次向客户端写数据的时候,都通过 writeAndFlush() 的方法写并刷新到底层,其实这种方式不是特别高效,我们可以在之前调用 writeAndFlush() 的地方都调用 write() 方法,然后在这个方面里面调用 ctx.channel().flush() 方法,相当于一个批量刷新的机制,当然,如果你对性能要求没那么高,writeAndFlush() 足矣。


# Netty