当前正在阅读
Java NIO 网络编程
预计阅读时间: 4 小时
  知识库提供的所有文档均为本站版权所有,禁止任何未经授权的个人或企业发布、传播、售卖本站提供的文档,如经发现,本站有权起诉侵权方并追究法律责任。

Channel详解

在学习NIO时,我们就已经接触到Channel了,我们可以通过通道来进行数据的传输,并且通道支持双向传输。

而在Netty中,也有对应的Channel类型:

java 复制代码
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();   //通道ID
    EventLoop eventLoop();   //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中
    Channel parent();   //Channel是具有层级关系的,这里是返回父Channel
    ChannelConfig config();
    boolean isOpen();   //通道当前的相关状态
    boolean isRegistered();
    boolean isActive();
    ChannelMetadata metadata();   //通道相关信息
    SocketAddress localAddress(); 
    SocketAddress remoteAddress();
    ChannelFuture closeFuture();  //关闭通道,但是会用到ChannelFuture,后面说
    boolean isWritable();
    long bytesBeforeUnwritable();
    long bytesBeforeWritable();
    Unsafe unsafe();
    ChannelPipeline pipeline();   //流水线,之后也会说
    ByteBufAllocator alloc();   //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf
    Channel read();
    Channel flush();   //刷新,基操
}

可以看到,Netty中的Channel相比NIO功能就多得多了。Netty中的Channel主要特点如下:

  • 所有的IO操作都是异步的,并不是在当前线程同步运行,方法调用之后就直接返回了,那怎么获取操作的结果呢?还记得我们在前面JUC篇教程中学习的Future吗,没错,这里的ChannelFuture也是干这事的。

我们可以来看一下Channel接口的父接口ChannelOutboundInvoker接口,这里面定义了大量的I/O操作:

java 复制代码
public interface ChannelOutboundInvoker {   //通道出站调用(包含大量的网络出站操作,比如写)
    ChannelFuture bind(SocketAddress var1);  //Socket绑定、连接、断开、关闭等操作
    ChannelFuture connect(SocketAddress var1);
    ChannelFuture connect(SocketAddress var1, SocketAddress var2);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress var1, ChannelPromise var2);    //下面这一系列还有附带ChannelPromise的,ChannelPromise我们后面再说,其实就是ChannelFuture的增强版
    ChannelFuture connect(SocketAddress var1, ChannelPromise var2);
    ChannelFuture connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
    ChannelFuture disconnect(ChannelPromise var1);
    ChannelFuture close(ChannelPromise var1);
    ChannelFuture deregister(ChannelPromise var1);
    ChannelOutboundInvoker read();
  
    ChannelFuture write(Object var1);    //可以看到这些常见的写操作,都是返回的ChannelFuture,而不是直接给结果
    ChannelFuture write(Object var1, ChannelPromise var2);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object var1, ChannelPromise var2);
    ChannelFuture writeAndFlush(Object var1);
  
    ChannelPromise newPromise();   //其他的暂时不提
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable var1);
    ChannelPromise voidPromise();
}

当然它还实现了AttributeMap接口,其实有点类似于Session那种感觉,我们可以添加一些属性之类的:

java 复制代码
public interface AttributeMap {
    <T> Attribute<T> attr(AttributeKey<T> var1);

    <T> boolean hasAttr(AttributeKey<T> var1);
}

我们了解了Netty底层的Channel之后,我们接着来看ChannelHandler,既然现在有了通道,那么怎么进行操作呢?我们可以将需要处理的事情放在ChannelHandler中,ChannelHandler充当了所有入站和出站数据的应用程序逻辑的容器,实际上就是我们之前Reactor模式中的Handler,全靠它来处理读写操作。

不过这里不仅仅是一个简单的ChannelHandler在进行处理,而是一整套流水线,我们之后会介绍ChannelPipeline。

比如我们上面就是使用了ChannelInboundHandlerAdapter抽象类,它是ChannelInboundHandler接口的实现,用于处理入站数据,可以看到我们实际上就是通过重写对应的方法来进行处理,这些方法会在合适的时间被调用:

java 复制代码
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {  
      	//ctx是上下文,msg是收到的消息,以ByteBuf形式
        ByteBuf buf = (ByteBuf) msg;   //类型转换一下
        System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
        //通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
        ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
    }
});

我们先从顶层接口开始看起:

java 复制代码
public interface ChannelHandler {
  	//当ChannelHandler被添加到流水线中时调用
    void handlerAdded(ChannelHandlerContext var1) throws Exception;
		//当ChannelHandler从流水线中移除时调用
    void handlerRemoved(ChannelHandlerContext var1) throws Exception;

    /** @deprecated 已过时那咱就不管了 */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;

    @Inherited
    @Documented
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Sharable {
    }
}

顶层接口的定义比较简单,就只有一些流水线相关的回调方法,我们接着来看下一级:

java 复制代码
//ChannelInboundHandler用于处理入站相关事件
public interface ChannelInboundHandler extends ChannelHandler {
  	//当Channel已经注册到自己的EventLoop上时调用,前面我们说了,一个Channel只会注册到一个EventLoop上,注册到EventLoop后,这样才会在发生对应事件时被通知。
    void channelRegistered(ChannelHandlerContext var1) throws Exception;
		//从EventLoop上取消注册时
    void channelUnregistered(ChannelHandlerContext var1) throws Exception;
		//当Channel已经处于活跃状态时被调用,此时Channel已经连接/绑定,并且已经就绪
    void channelActive(ChannelHandlerContext var1) throws Exception;
		//跟上面相反,不再活跃了,并且不在连接它的远程节点
    void channelInactive(ChannelHandlerContext var1) throws Exception;
		//当从Channel读取数据时被调用,可以看到数据被自动包装成了一个Object(默认是ByteBuf)
    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
		//上一个读取操作完成后调用
    void channelReadComplete(ChannelHandlerContext var1) throws Exception;
		//暂时不介绍
    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
		//当Channel的可写状态发生改变时被调用
    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
		//出现异常时被调用
    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

而我们上面用到的ChannelInboundHandlerAdapter实际上就是对这些方法实现的抽象类,相比直接用接口,我们可以只重写我们需要的方法,没有重写的方法会默认向流水线下一个ChannelHandler发送。

我们来测试一下吧:

java 复制代码
public class TestChannelHandler extends ChannelInboundHandlerAdapter {

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
        //这次我们就直接使用ctx.alloc()来生成缓冲区
        ByteBuf back = ctx.alloc().buffer();
        back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
        ctx.writeAndFlush(back);
        System.out.println("channelRead");
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete");
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("userEventTriggered");
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelWritabilityChanged");
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught"+cause);
    }
}
java 复制代码
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
      			//ChannelInitializer是一个特殊的ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成Channel的初始化
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    //将我们自定义的ChannelHandler添加到流水线
                    channel.pipeline().addLast(new TestChannelHandler());
                }
            });
    bootstrap.bind(8080);
}

现在我们启动服务器,让客户端来连接并发送一下数据试试看:

image-20230306174158886

可以看到ChannelInboundHandler的整个生命周期,首先是Channel注册成功,然后才会变成可用状态,接着就差不多可以等待客户端来数据了,当客户端主动断开连接时,会再次触发一次channelReadComplete,然后不可用,最后取消注册。

我们来测试一下出现异常的情况呢?

java 复制代码
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
    ByteBuf back = ctx.alloc().buffer();
    back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
    ctx.writeAndFlush(back);
    System.out.println("channelRead");
    throw new RuntimeException("我是自定义异常1");  //弄点异常上去
}

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channelReadComplete");
    throw new RuntimeException("我是自定义异常2");   //弄点异常上去
}

...

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("exceptionCaught"+cause);
}

可以看到发生异常时,会接着调用exceptionCaught方法:

image-20230306174211952

与ChannelInboundHandler对应的还有ChannelOutboundHandler用于处理出站相关的操作,这里就不进行演示了。

我们接着来看看ChannelPipeline,每一个Channel都对应一个ChannelPipeline(在Channel初始化时就被创建了)

image-20230306174211952

它就像是一条流水线一样,整条流水线上可能会有很多个Handler(包括入站和出站),整条流水线上的两端还有两个默认的处理器(用于一些预置操作和后续操作,比如释放资源等),我们只需要关心如何安排这些自定义的Handler即可,比如我们现在希望创建两个入站ChannelHandler,一个用于接收请求并处理,还有一个用于处理当前接收请求过程中出现的异常:

java 复制代码
.childHandler(new ChannelInitializer<SocketChannel>() {   //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
    @Override
    protected void initChannel(SocketChannel channel) {
        channel.pipeline()   //直接获取pipeline,然后添加两个Handler,注意顺序
                .addLast(new ChannelInboundHandlerAdapter(){   //第一个用于处理消息接收
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                        throw new RuntimeException("我是异常");
                    }
                })
                .addLast(new ChannelInboundHandlerAdapter(){   //第二个用于处理异常
                    @Override
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                        System.out.println("我是异常处理:"+cause);
                    }
                });
    }
});

那么它是如何运作的呢?实际上如果我们不在ChannelInboundHandlerAdapter中重写对应的方法,它会默认传播到流水线的下一个ChannelInboundHandlerAdapter进行处理,比如:

java 复制代码
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);   //通过ChannelHandlerContext来向下传递,ChannelHandlerContext是在Handler添加进Pipeline中时就被自动创建的
}

比如我们现在需要将一个消息在两个Handler中进行处理:

java 复制代码
@Override
protected void initChannel(SocketChannel channel) {
    channel.pipeline()   //直接获取pipeline,然后添加两个Handler
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.fireChannelRead(msg);   //通过ChannelHandlerContext
                }
            })
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                }
            });
}

我们接着来看看出站相关操作,我们可以使用ChannelOutboundHandlerAdapter来完成:

java 复制代码
@Override
protected void initChannel(SocketChannel channel) {
    channel.pipeline()
            .addLast(new ChannelOutboundHandlerAdapter(){   
              //注意出栈站操作应该在入站操作的前面,当我们使用ChannelHandlerContext的write方法时,是从流水线的当前位置倒着往前找下一个ChannelOutboundHandlerAdapter,而我们之前使用的ChannelInboundHandlerAdapter是从前往后找下一个,如果我们使用的是Channel的write方法,那么会从整个流水线的最后开始倒着往前找ChannelOutboundHandlerAdapter,一定要注意顺序。
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  //当执行write操作时,会
                    System.out.println(msg);   //write的是啥,这里就是是啥
                  	//我们将其转换为ByteBuf,这样才能发送回客户端
                    ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.toString().getBytes()));
                }
            })
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.fireChannelRead(msg);
                }
            })
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.writeAndFlush("不会吧不会吧,不会还有人都看到这里了还没三连吧");   //这里可以write任何对象
                  	//ctx.channel().writeAndFlush("啊对对对"); 或是通过Channel进行write也可以
                }
            });
}

现在我们来试试看,搞两个出站的Handler,验证一下是不是上面的样子:

java 复制代码
@Override
protected void initChannel(SocketChannel channel) {
    channel.pipeline()   //直接获取pipeline,然后添加两个Handler
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.fireChannelRead(msg);
                }
            })
            .addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                    ctx.channel().writeAndFlush("伞兵一号卢本伟");  //这里我们使用channel的write
                }
            })
            .addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("1号出站:"+msg);
                }
            })
            .addLast(new ChannelOutboundHandlerAdapter(){
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("2号出站:"+msg);
                    ctx.write(msg);  //继续write给其他的出站Handler,不然到这里就断了
                }
            });
}

所以,出站操作在流水线上是反着来的,整个流水线操作大概流程如下:

image-20230306174237906

有关Channel及其处理相关操作,就先讲到这里。

EventLoop和任务调度

前面我们讲解了Channel,那么在EventLoop中具体是如何进行调度的呢?实际上我们之前在编写NIO的时候,就是一个while循环在源源不断地等待新的事件,而EventLoop也正是这种思想,它本质就是一个事件等待/处理线程。

image-20230306174245836

我们上面使用的就是EventLoopGroup,包含很多个EventLoop,我们每创建一个连接,就需要绑定到一个EventLoop上,之后EventLoop就会开始监听这个连接(只要连接不关闭,一直都是这个EventLoop负责此Channel),而一个EventLoop可以同时监听很多个Channel,实际上就是我们之前学习的Selector罢了。

当然,EventLoop并不只是用于网络操作的,我们前面所说的EventLoop其实都是NioEventLoop,它是专用于网络通信的,除了网络通信之外,我们也可以使用普通的EventLoop来处理一些其他的事件。

比如我们现在编写的服务端,虽然结构上和主从Reactor多线程模型差不多,但是我们发现,Handler似乎是和读写操作在一起进行的,而我们之前所说的模型中,Handler是在读写之外的单独线程中进行的:

java 复制代码
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);  //线程数先限制一下
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)   //指定事件循环组
            .channel(NioServerSocketChannel.class)   //指定为NIO的ServerSocketChannel
            .childHandler(new ChannelInitializer<SocketChannel>() {   //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
                @Override
                protected void initChannel(SocketChannel channel) {
                    channel.pipeline()
                            .addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                                    Thread.sleep(10000);   //这里我们直接卡10秒假装在处理任务
                                    ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                                }
                            });
                }
            });
    bootstrap.bind(8080);
}

可以看到,如果在这里卡住了,那么就没办法处理EventLoop绑定的其他Channel了,所以我们这里就创建一个普通的EventLoop来专门处理读写之外的任务:

java 复制代码
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);  //线程数先限制一下
    EventLoopGroup handlerGroup = new DefaultEventLoopGroup();  //使用DefaultEventLoop来处理其他任务
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    channel.pipeline()
                            .addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                  	System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                                    handlerGroup.submit(() -> {   
                                //由于继承自ScheduledExecutorService,我们直接提交任务就行了,是不是感觉贼方便
                                        try {
                                            Thread.sleep(10000);
                                        } catch (InterruptedException e) {
                                            throw new RuntimeException(e);
                                        }
                                        ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                                    });
                                }
                            });
                }
            });
    bootstrap.bind(8080);
}

当然我们也可以写成一条流水线:

java 复制代码
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1);  //线程数先限制一下
    EventLoopGroup handlerGroup = new DefaultEventLoopGroup();  //使用DefaultEventLoop来处理其他任务
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) {
                    channel.pipeline()
                            .addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                                    ctx.fireChannelRead(msg);
                                }
                            }).addLast(handlerGroup, new ChannelInboundHandlerAdapter(){  //在添加时,可以直接指定使用哪个EventLoopGroup
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    try {
                                        Thread.sleep(10000);
                                    } catch (InterruptedException e) {
                                        throw new RuntimeException(e);
                                    }
                                    ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                                }
                            });
                }
            });
    bootstrap.bind(8080);
}

这样,我们就进一步地将EventLoop利用起来了。

按照前面服务端的方式,我们来把Netty版本的客户端也给写了:

java 复制代码
public static void main(String[] args) {
    Bootstrap bootstrap = new Bootstrap();   //客户端也是使用Bootstrap来启动
    bootstrap
            .group(new NioEventLoopGroup())   //客户端就没那么麻烦了,直接一个EventLoop就行,用于处理发回来的数据
            .channel(NioSocketChannel.class)   //客户端肯定就是使用SocketChannel了
            .handler(new ChannelInitializer<SocketChannel>() {   //这里的数据处理方式和服务端是一样的
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){   
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            System.out.println(">> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                        }
                    });
                }
            });
    Channel channel = bootstrap.connect("localhost", 8080).channel();  //连接后拿到对应的Channel对象
  	//注意上面连接操作是异步的,调用之后会继续往下走,下面我们就正式编写客户端的数据发送代码了
    try(Scanner scanner = new Scanner(System.in)){    //还是和之前一样,扫了就发
        while (true) {
            System.out.println("<< 请输入要发送给服务端的内容:");
            String text = scanner.nextLine();
            if(text.isEmpty()) continue;
            channel.writeAndFlush(Unpooled.wrappedBuffer(text.getBytes()));  //通过Channel对象发送数据
        }
    }
}

我们来测试一下吧:

image-20230306174303352

Future和Promise

我们接着来看ChannelFuture,前面我们提到,Netty中Channel的相关操作都是异步进行的,并不是在当前线程同步执行,我们不能立即得到执行结果,如果需要得到结果,那么我们就必须要利用到Future。

我们先来看看ChannelFutuer接口怎么定义的:

java 复制代码
public interface ChannelFuture extends Future<Void> {
    Channel channel();    //我们可以直接获取此任务的Channel
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);  //当任务完成时,会直接执行GenericFutureListener的任务,注意执行的位置也是在EventLoop中
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
    ChannelFuture sync() throws InterruptedException;   //在当前线程同步等待异步任务完成,任务失败会抛出异常
    ChannelFuture syncUninterruptibly();   //同上,但是无法响应中断
    ChannelFuture await() throws InterruptedException;  //同上,但是任务中断不会抛出异常,需要手动判断
    ChannelFuture awaitUninterruptibly();    //不用我说了吧?
    boolean isVoid();   //返回类型是否为void
}

此接口是继承自Netty中的Future接口的(不是JDK的那个):

java 复制代码
public interface Future<V> extends java.util.concurrent.Future<V> {   //再往上才是JDK的Future
    boolean isSuccess();    //用于判断任务是否执行成功的
    boolean isCancellable();
    Throwable cause();    //获取导致任务失败的异常
    
  	...
    
    V getNow();  //立即获取结果,如果还未产生结果,得到null,不过ChannelFuture定义V为Void,就算完成了获取也是null
    boolean cancel(boolean var1);    //取消任务
}

Channel的很多操作都是异步完成的,直接返回一个ChannelFuture,比如Channel的write操作,返回的就是一个ChannelFuture对象:

java 复制代码
.addLast(new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
        ChannelFuture future = ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
        System.out.println("任务完成状态:"+future.isDone());   //通过ChannelFuture来获取相关信息
    }
});

包括我们的服务端启动也是返回的ChannelFuture:

java 复制代码
...
								}
            });
    ChannelFuture future = bootstrap.bind(8080);
    System.out.println("服务端启动状态:"+future.isDone());
    System.out.println("我是服务端启动完成之后要做的事情!");
}

可以看到,服务端的启动就比较慢了,所以在一开始直接获取状态会返回false,但是这个时候我们又需要等到服务端启动完成之后做一些事情,这个时候该怎么办呢?现在我们就有两种方案了:

java 复制代码
                }
            });
    ChannelFuture future = bootstrap.bind(8080);
    future.sync();   //让当前线程同步等待任务完成
    System.out.println("服务端启动状态:"+future.isDone());
    System.out.println("我是服务端启动完成之后要做的事情!");
}

第一种方案是直接让当前线程同步等待异步任务完成,我们可以使用sync()方法,这样当前线程会一直阻塞直到任务结束。第二种方案是添加一个监听器,等待任务完成时通知:

java 复制代码
                }
            });
    ChannelFuture future = bootstrap.bind(8080);
		//直接添加监听器,当任务完成时自动执行,但是注意执行也是异步的,不是在当前线程
    future.addListener(f -> System.out.println("我是服务端启动完成之后要做的事情!"));
}

包括客户端的关闭,也是异步进行的:

java 复制代码
try(Scanner scanner = new Scanner(System.in)){
    while (true) {
        System.out.println("<< 请输入要发送给服务端的内容:");
        String text = scanner.nextLine();
        if(text.isEmpty()) continue;
        if(text.equals("exit")) {   //输入exit就退出
            ChannelFuture future = channel.close();
            future.sync();    //等待Channel完全关闭
            break;
        }
        channel.writeAndFlush(Unpooled.wrappedBuffer(text.getBytes()));
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);
} finally {
    group.shutdownGracefully();   //优雅退出EventLoop,其实就是把还没发送的数据之类的事情做完,当然也可以shutdownNow立即关闭
}

我们接着来看看Promise接口,它支持手动设定成功和失败的结果:

java 复制代码
//此接口也是继承自Netty中的Future接口
public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V var1);    //手动设定成功
    boolean trySuccess(V var1);
    Promise<V> setFailure(Throwable var1);  //手动设定失败
    boolean tryFailure(Throwable var1);
    boolean setUncancellable();
		//这些就和之前的Future是一样的了
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);
    Promise<V> await() throws InterruptedException;
    Promise<V> awaitUninterruptibly();
    Promise<V> sync() throws InterruptedException;
    Promise<V> syncUninterruptibly();
}

比如我们来测试一下:

java 复制代码
public static void main(String[] args) throws ExecutionException, InterruptedException {
    Promise<String> promise = new DefaultPromise<>(new DefaultEventLoop());
    System.out.println(promise.isSuccess());    //在一开始肯定不是成功的
    promise.setSuccess("lbwnb");    //设定成功
    System.out.println(promise.isSuccess());   //再次获取,可以发现确实成功了
    System.out.println(promise.get());    //获取结果,就是我们刚刚给进去的
}

可以看到我们可以手动指定成功状态,包括ChannelOutboundInvoker中的一些基本操作,都是支持ChannelPromise的:

java 复制代码
.addLast(new ChannelInboundHandlerAdapter(){
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        String text = buf.toString(StandardCharsets.UTF_8);
        System.out.println("接收到客户端发送的数据:"+text);
        ChannelPromise promise = new DefaultChannelPromise(channel);
        System.out.println(promise.isSuccess());
        ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()), promise);
        promise.sync();  //同步等待一下
        System.out.println(promise.isSuccess());
    }
});

最后结果就是我们想要的了,当然我们也可以像Future那样添加监听器,当成功时自动通知:

java 复制代码
public static void main(String[] args) throws ExecutionException, InterruptedException {
    Promise<String> promise = new DefaultPromise<>(new DefaultEventLoop()); 
    promise.addListener(f -> System.out.println(promise.get()));   //注意是在上面的DefaultEventLoop执行的
    System.out.println(promise.isSuccess());
    promise.setSuccess("lbwnb");
    System.out.println(promise.isSuccess());
}

有关Future和Promise就暂时讲解到这里。

大纲 (于 2025年1月1日 更新)
正在加载页面,请稍后...