温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

netty pipeline中的inbound和outbound事件怎么传播

发布时间:2023-04-25 17:13:08 来源:亿速云 阅读:102 作者:iii 栏目:开发技术

这篇文章主要介绍了netty pipeline中的inbound和outbound事件怎么传播的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇netty pipeline中的inbound和outbound事件怎么传播文章都会有所收获,下面我们一起来看看吧。

    传播inbound事件

    有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程。

    在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到channelRead方法中了呢, 也是我们这一小节要剖析的内容。

    在业务代码中, 传递channelRead事件方式是通过fireChannelRead方法进行传播的。

    两种写法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //写法1:
        ctx.fireChannelRead(msg);
        //写法2
        ctx.pipeline().fireChannelRead(msg);
    }

    这里重写了channelRead方法, 并且方法体内继续通过fireChannelRead方法进行传播channelRead事件, 那么这两种写法有什么异同?

    我们先以写法2为例, 将这种写法进行剖析。

    这里首先获取当前contextpipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的DefaultChannelpipeline

    DefaultChannelPipeline.fireChannelRead(msg)

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    这里首先调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

    AbstractChannelHandlerContext.invokeChannelRead(head, msg)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    这里的m通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理

    AbstractChannelHandlerContext.invokeChannelRead(m)
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handlerchanelRead方法, 其实这里就明白了, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContextchannelRead方法中

    HeadContext的channelRead方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //向下传递channelRead事件
        ctx.fireChannelRead(msg);
    }

    在这里我们看到, 这里通过fireChannelRead方法继续往下传递channelRead事件, 而这种调用方式, 就是我们刚才分析用户代码的第一种调用方式

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //写法1:
        ctx.fireChannelRead(msg);
        //写法2
        ctx.pipeline().fireChannelRead(msg);
    }

    这里直接通过context对象调用fireChannelRead方法, 那么和使用pipeline调用有什么区别的, 我会回到HeadConetxchannelRead方法, 我们来剖析ctx.fireChannelRead(msg)这句, 大家就会对这个问题有答案了, 跟到ctxfireChannelRead方法中, 这里会走到AbstractChannelHandlerContext类中的fireChannelRead方法中

    AbstractChannelHandlerContext.fireChannelRead(msg)

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    这里我们看到, invokeChannelRead方法中传入了一个findContextInbound()参数, 而这findContextInbound方法其实就是找到当前Context的下一个节点

    AbstractChannelHandlerContext.findContextInbound()

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    这里的逻辑也比较简单, 是通过一个doWhile循环, 找到当前handlerContext的下一个节点, 这里要注意循环的终止条件, while (!ctx.inbound)表示下一个context标志的事件不是inbound的事件, 则循环继续往下找, 言外之意就是要找到下一个标注inbound事件的节点

    有关事件的标注, 之前已经进行了分析, 如果是用户定义的handler, 是通过handler继承的接口而定的, 如果tail或者head, 那么是在初始化的时候就已经定义好, 这里不再赘述

    回到AbstractChannelHandlerContext.fireChannelRead(msg)

    AbstractChannelHandlerContext.fireChannelRead(msg)

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    找到下一个节点后, 继续调用invokeChannelRead方法, 传入下一个和消息对象

    AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    这里的逻辑我们又不陌生了, 因为我们传入的是当前context的下一个节点, 所以这里会调用下一个节点invokeChannelRead方法, 因我们刚才剖析的是head节点, 所以下一个节点有可能是用户添加的handler的包装类HandlerConext的对象

    AbstractChannelHandlerContext.invokeChannelRead(Object msg)
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try { 
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                //发生异常的时候在这里捕获异常
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    又是我们熟悉的逻辑, 调用了自身handlerchannelRead方法, 如果是用户自定义的handler, 则会走到用户定义的channelRead()方法中去, 所以这里就解释了为什么通过传递channelRead事件, 最终会走到用户重写的channelRead方法中去

    同样, 也解释了该小节最初提到过的两种写法的区别

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //写法1:
        ctx.fireChannelRead(msg);
        //写法2
        ctx.pipeline().fireChannelRead(msg);
    }
    • 写法1是通过当前节点往下传播事件

    • 写法2是通过头节点往下传递事件

    • 所以, 在handler中如果要在channelRead方法中传递channelRead事件, 一定要采用写法1的方式向下传递, 或者交给其父类处理, 如果采用2的写法则每次事件传输到这里都会继续从head节点传输, 从而陷入死循环或者发生异常

    • 还有一点需要注意, 如果用户代码中channelRead方法, 如果没有显示的调用ctx.fireChannelRead(msg)那么事件则不会再往下传播, 则事件会在这里终止, 所以如果我们写业务代码的时候要考虑有关资源释放的相关操作

    如果ctx.fireChannelRead(msg)则事件会继续往下传播, 如果每一个handler都向下传播事件, 当然, 根据我们之前的分析channelRead事件只会在标识为inbound事件的HandlerConetext中传播, 传播到最后, 则最终会调用到tail节点的channelRead方法

    tailConext的channelRead方法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    onUnhandledInboundMessage(msg)

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            //释放资源
            ReferenceCountUtil.release(msg);
        }
    }

    这里做了释放资源的相关的操作

    到这里,对于inbound事件的传输流程以及channelRead方法的执行流程已经分析完毕。

    传播outBound事件

    有关于outBound事件, 和inbound正好相反,以自己为基准, 流向对方的事件, 比如最常见的wirte事件

    在业务代码中, , 有可能使用wirte方法往写数据

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().write("test data");
    }

    当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程

    两种写法

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //写法1
        ctx.channel().write("test data");
        //写法2
        ctx.write("test data");
    }

    这两种写法有什么区别, 首先分析第一种写法

    //这里获取ctx所绑定的channel
    ctx.channel().write("test data");

    AbstractChannel.write(Object msg)

    public ChannelFuture write(Object msg) {
    	//这里pipeline是DefaultChannelPipeline
        return pipeline.write(msg);
    }

    继续跟踪DefaultChannelPipeline.write(msg)

    DefaultChannelPipeline.write(msg)

    public final ChannelFuture write(Object msg) {
        //从tail节点开始(从最后的节点往前写)
        return tail.write(msg);
    }

    这里调用tail节点write方法, 这里我们应该能分析到, outbound事件, 是通过tail节点开始往上传播的。

    其实tail节点并没有重写write方法, 最终会调用其父类AbstractChannelHandlerContext.write方法

    AbstractChannelHandlerContext.write(Object msg)
    public ChannelFuture write(Object msg) { 
        return write(msg, newPromise());
    }

    这里有个newPromise()这个方法, 这里是创建一个Promise对象, 有关Promise的相关知识会在以后章节进行分析,继续分析write

    AbstractChannelHandlerContext.write(final Object msg, final ChannelPromise promise)
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        /**
         * 省略
         * */
        write(msg, false, promise);
        return promise;
    }
    AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
    private void write(Object msg, boolean flush, ChannelPromise promise) { 
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                //没有调flush
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    这里跟我们之前分析过channelRead方法有点类似, 但是事件传输的方向有所不同, 这里findContextOutbound()是获取上一个标注outbound事件的HandlerContext

    AbstractChannelHandlerContext.findContextOutbound()
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    这里的逻辑跟之前的findContextInbound()方法有点像, 只是过程是反过来的

    在这里, 会找到当前context的上一个节点, 如果标注的事件不是outbound事件, 则继续往上找, 意思就是找到上一个标注outbound事件的节点

    回到AbstractChannelHandlerContext.write方法

    AbstractChannelHandlerContext next = findContextOutbound();

    这里将找到节点赋值到next属性中,因为我们之前分析的write事件是从tail节点传播的, 所以上一个节点就有可能是用户自定的handler所属的context

    然后判断是否为当前eventLoop线程, 如果是不是, 则封装成task异步执行, 如果不是, 则继续判断是否调用了flush方法, 因为我们这里没有调用, 所以会执行到next.invokeWrite(m, promise)

    AbstractChannelHandlerContext.invokeWrite(Object msg, ChannelPromise promise)
    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    这里会判断当前handler的状态是否是添加状态, 这里返回的是true, 将会走到invokeWrite0(msg, promise)这一步

    AbstractChannelHandlerContext.invokeWrite0(Object msg, ChannelPromise promise)
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //调用当前handler的wirte()方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    这里的逻辑也似曾相识, 调用了当前节点包装的handlerwrite方法, 如果用户没有重写write方法, 则会交给其父类处理

    ChannelOutboundHandlerAdapter.write
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    这里调用了当前ctxwrite方法, 这种写法和我们小节开始的写法是相同的, 我们回顾一下

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //写法1
        ctx.channel().write("test data");
        //写法2
        ctx.write("test data");
    }

    我们跟到其write方法中, 这里走到的是AbstractChannelHandlerContext类的write方法

    AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
    private void write(Object msg, boolean flush, ChannelPromise promise) { 
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                //没有调flush
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    又是我们所熟悉逻辑, 找到当前节点的上一个标注事件为outbound事件的节点, 继续执行invokeWrite方法, 根据之前的剖析, 我们知道最终会执行到上一个handlerwrite方法中。

    走到这里已经不难理解, ctx.channel().write("test data")其实是从tail节点开始传播写事件, 而ctx.write("test data")是从自身开始传播写事件。

    所以, 在handler中如果重写了write方法要传递write事件, 一定采用ctx.write("test data")这种方式或者交给其父类处理处理, 而不能采用ctx.channel().write("test data")这种方式, 因为会造成每次事件传输到这里都会从tail节点重新传输, 导致不可预知的错误。

    如果用代码中没有重写handlerwrite方法, 则事件会一直往上传输, 当传输完所有的outbound节点之后, 最后会走到head节点的wirte方法中。

    HeadContext.write
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    我们看到write事件最终会流向这里, 通过unsafe对象进行最终的写操作

    inbound事件和outbound事件的传输流程图

    netty pipeline中的inbound和outbound事件怎么传播

    关于“netty pipeline中的inbound和outbound事件怎么传播”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“netty pipeline中的inbound和outbound事件怎么传播”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI