在学习NIO时,我们就已经接触到Channel了,我们可以通过通道来进行数据的传输,并且通道支持双向传输。
而在Netty中,也有对应的Channel类型:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelId id(); //通道ID
EventLoop eventLoop(); //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中
Channel parent(); //Channel是具有层级关系的,这里是返回父Channel
ChannelConfig config();
boolean isOpen(); //通道当前的相关状态
boolean isRegistered();
boolean isActive();
ChannelMetadata metadata(); //通道相关信息
SocketAddress localAddress();
SocketAddress remoteAddress();
ChannelFuture closeFuture(); //关闭通道,但是会用到ChannelFuture,后面说
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
Unsafe unsafe();
ChannelPipeline pipeline(); //流水线,之后也会说
ByteBufAllocator alloc(); //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf
Channel read();
Channel flush(); //刷新,基操
}
可以看到,Netty中的Channel相比NIO功能就多得多了。Netty中的Channel主要特点如下:
我们可以来看一下Channel接口的父接口ChannelOutboundInvoker接口,这里面定义了大量的I/O操作:
public interface ChannelOutboundInvoker { //通道出站调用(包含大量的网络出站操作,比如写)
ChannelFuture bind(SocketAddress var1); //Socket绑定、连接、断开、关闭等操作
ChannelFuture connect(SocketAddress var1);
ChannelFuture connect(SocketAddress var1, SocketAddress var2);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress var1, ChannelPromise var2); //下面这一系列还有附带ChannelPromise的,ChannelPromise我们后面再说,其实就是ChannelFuture的增强版
ChannelFuture connect(SocketAddress var1, ChannelPromise var2);
ChannelFuture connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
ChannelFuture disconnect(ChannelPromise var1);
ChannelFuture close(ChannelPromise var1);
ChannelFuture deregister(ChannelPromise var1);
ChannelOutboundInvoker read();
ChannelFuture write(Object var1); //可以看到这些常见的写操作,都是返回的ChannelFuture,而不是直接给结果
ChannelFuture write(Object var1, ChannelPromise var2);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object var1, ChannelPromise var2);
ChannelFuture writeAndFlush(Object var1);
ChannelPromise newPromise(); //其他的暂时不提
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable var1);
ChannelPromise voidPromise();
}
当然它还实现了AttributeMap接口,其实有点类似于Session那种感觉,我们可以添加一些属性之类的:
public interface AttributeMap {
<T> Attribute<T> attr(AttributeKey<T> var1);
<T> boolean hasAttr(AttributeKey<T> var1);
}
我们了解了Netty底层的Channel之后,我们接着来看ChannelHandler,既然现在有了通道,那么怎么进行操作呢?我们可以将需要处理的事情放在ChannelHandler中,ChannelHandler充当了所有入站和出站数据的应用程序逻辑的容器,实际上就是我们之前Reactor模式中的Handler,全靠它来处理读写操作。
不过这里不仅仅是一个简单的ChannelHandler在进行处理,而是一整套流水线,我们之后会介绍ChannelPipeline。
比如我们上面就是使用了ChannelInboundHandlerAdapter抽象类,它是ChannelInboundHandler接口的实现,用于处理入站数据,可以看到我们实际上就是通过重写对应的方法来进行处理,这些方法会在合适的时间被调用:
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//ctx是上下文,msg是收到的消息,以ByteBuf形式
ByteBuf buf = (ByteBuf) msg; //类型转换一下
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
我们先从顶层接口开始看起:
public interface ChannelHandler {
//当ChannelHandler被添加到流水线中时调用
void handlerAdded(ChannelHandlerContext var1) throws Exception;
//当ChannelHandler从流水线中移除时调用
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
/** @deprecated 已过时那咱就不管了 */
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}
顶层接口的定义比较简单,就只有一些流水线相关的回调方法,我们接着来看下一级:
//ChannelInboundHandler用于处理入站相关事件
public interface ChannelInboundHandler extends ChannelHandler {
//当Channel已经注册到自己的EventLoop上时调用,前面我们说了,一个Channel只会注册到一个EventLoop上,注册到EventLoop后,这样才会在发生对应事件时被通知。
void channelRegistered(ChannelHandlerContext var1) throws Exception;
//从EventLoop上取消注册时
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
//当Channel已经处于活跃状态时被调用,此时Channel已经连接/绑定,并且已经就绪
void channelActive(ChannelHandlerContext var1) throws Exception;
//跟上面相反,不再活跃了,并且不在连接它的远程节点
void channelInactive(ChannelHandlerContext var1) throws Exception;
//当从Channel读取数据时被调用,可以看到数据被自动包装成了一个Object(默认是ByteBuf)
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
//上一个读取操作完成后调用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
//暂时不介绍
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
//当Channel的可写状态发生改变时被调用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
//出现异常时被调用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
而我们上面用到的ChannelInboundHandlerAdapter实际上就是对这些方法实现的抽象类,相比直接用接口,我们可以只重写我们需要的方法,没有重写的方法会默认向流水线下一个ChannelHandler发送。
我们来测试一下吧:
public class TestChannelHandler extends ChannelInboundHandlerAdapter {
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered");
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered");
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
//这次我们就直接使用ctx.alloc()来生成缓冲区
ByteBuf back = ctx.alloc().buffer();
back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
ctx.writeAndFlush(back);
System.out.println("channelRead");
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered");
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged");
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught"+cause);
}
}
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//ChannelInitializer是一个特殊的ChannelHandler,它本身不处理任何出站/入站事件,它的目的仅仅是完成Channel的初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
//将我们自定义的ChannelHandler添加到流水线
channel.pipeline().addLast(new TestChannelHandler());
}
});
bootstrap.bind(8080);
}
现在我们启动服务器,让客户端来连接并发送一下数据试试看:
可以看到ChannelInboundHandler的整个生命周期,首先是Channel注册成功,然后才会变成可用状态,接着就差不多可以等待客户端来数据了,当客户端主动断开连接时,会再次触发一次channelReadComplete
,然后不可用,最后取消注册。
我们来测试一下出现异常的情况呢?
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ByteBuf back = ctx.alloc().buffer();
back.writeCharSequence("已收到!", StandardCharsets.UTF_8);
ctx.writeAndFlush(back);
System.out.println("channelRead");
throw new RuntimeException("我是自定义异常1"); //弄点异常上去
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete");
throw new RuntimeException("我是自定义异常2"); //弄点异常上去
}
...
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught"+cause);
}
可以看到发生异常时,会接着调用exceptionCaught
方法:
与ChannelInboundHandler对应的还有ChannelOutboundHandler用于处理出站相关的操作,这里就不进行演示了。
我们接着来看看ChannelPipeline,每一个Channel都对应一个ChannelPipeline(在Channel初始化时就被创建了)
它就像是一条流水线一样,整条流水线上可能会有很多个Handler(包括入站和出站),整条流水线上的两端还有两个默认的处理器(用于一些预置操作和后续操作,比如释放资源等),我们只需要关心如何安排这些自定义的Handler即可,比如我们现在希望创建两个入站ChannelHandler,一个用于接收请求并处理,还有一个用于处理当前接收请求过程中出现的异常:
.childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler,注意顺序
.addLast(new ChannelInboundHandlerAdapter(){ //第一个用于处理消息接收
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
throw new RuntimeException("我是异常");
}
})
.addLast(new ChannelInboundHandlerAdapter(){ //第二个用于处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("我是异常处理:"+cause);
}
});
}
});
那么它是如何运作的呢?实际上如果我们不在ChannelInboundHandlerAdapter中重写对应的方法,它会默认传播到流水线的下一个ChannelInboundHandlerAdapter进行处理,比如:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause); //通过ChannelHandlerContext来向下传递,ChannelHandlerContext是在Handler添加进Pipeline中时就被自动创建的
}
比如我们现在需要将一个消息在两个Handler中进行处理:
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg); //通过ChannelHandlerContext
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
}
});
}
我们接着来看看出站相关操作,我们可以使用ChannelOutboundHandlerAdapter来完成:
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelOutboundHandlerAdapter(){
//注意出栈站操作应该在入站操作的前面,当我们使用ChannelHandlerContext的write方法时,是从流水线的当前位置倒着往前找下一个ChannelOutboundHandlerAdapter,而我们之前使用的ChannelInboundHandlerAdapter是从前往后找下一个,如果我们使用的是Channel的write方法,那么会从整个流水线的最后开始倒着往前找ChannelOutboundHandlerAdapter,一定要注意顺序。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { //当执行write操作时,会
System.out.println(msg); //write的是啥,这里就是是啥
//我们将其转换为ByteBuf,这样才能发送回客户端
ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.toString().getBytes()));
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.writeAndFlush("不会吧不会吧,不会还有人都看到这里了还没三连吧"); //这里可以write任何对象
//ctx.channel().writeAndFlush("啊对对对"); 或是通过Channel进行write也可以
}
});
}
现在我们来试试看,搞两个出站的Handler,验证一下是不是上面的样子:
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline() //直接获取pipeline,然后添加两个Handler
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("1接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
})
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("2接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.channel().writeAndFlush("伞兵一号卢本伟"); //这里我们使用channel的write
}
})
.addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("1号出站:"+msg);
}
})
.addLast(new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("2号出站:"+msg);
ctx.write(msg); //继续write给其他的出站Handler,不然到这里就断了
}
});
}
所以,出站操作在流水线上是反着来的,整个流水线操作大概流程如下:
有关Channel及其处理相关操作,就先讲到这里。
前面我们讲解了Channel,那么在EventLoop中具体是如何进行调度的呢?实际上我们之前在编写NIO的时候,就是一个while循环在源源不断地等待新的事件,而EventLoop也正是这种思想,它本质就是一个事件等待/处理线程。
我们上面使用的就是EventLoopGroup,包含很多个EventLoop,我们每创建一个连接,就需要绑定到一个EventLoop上,之后EventLoop就会开始监听这个连接(只要连接不关闭,一直都是这个EventLoop负责此Channel),而一个EventLoop可以同时监听很多个Channel,实际上就是我们之前学习的Selector罢了。
当然,EventLoop并不只是用于网络操作的,我们前面所说的EventLoop其实都是NioEventLoop,它是专用于网络通信的,除了网络通信之外,我们也可以使用普通的EventLoop来处理一些其他的事件。
比如我们现在编写的服务端,虽然结构上和主从Reactor多线程模型差不多,但是我们发现,Handler似乎是和读写操作在一起进行的,而我们之前所说的模型中,Handler是在读写之外的单独线程中进行的:
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup) //指定事件循环组
.channel(NioServerSocketChannel.class) //指定为NIO的ServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
Thread.sleep(10000); //这里我们直接卡10秒假装在处理任务
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
bootstrap.bind(8080);
}
可以看到,如果在这里卡住了,那么就没办法处理EventLoop绑定的其他Channel了,所以我们这里就创建一个普通的EventLoop来专门处理读写之外的任务:
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
handlerGroup.submit(() -> {
//由于继承自ScheduledExecutorService,我们直接提交任务就行了,是不是感觉贼方便
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
});
}
});
}
});
bootstrap.bind(8080);
}
当然我们也可以写成一条流水线:
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(1); //线程数先限制一下
EventLoopGroup handlerGroup = new DefaultEventLoopGroup(); //使用DefaultEventLoop来处理其他任务
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
}).addLast(handlerGroup, new ChannelInboundHandlerAdapter(){ //在添加时,可以直接指定使用哪个EventLoopGroup
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
}
});
}
});
bootstrap.bind(8080);
}
这样,我们就进一步地将EventLoop利用起来了。
按照前面服务端的方式,我们来把Netty版本的客户端也给写了:
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap(); //客户端也是使用Bootstrap来启动
bootstrap
.group(new NioEventLoopGroup()) //客户端就没那么麻烦了,直接一个EventLoop就行,用于处理发回来的数据
.channel(NioSocketChannel.class) //客户端肯定就是使用SocketChannel了
.handler(new ChannelInitializer<SocketChannel>() { //这里的数据处理方式和服务端是一样的
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(">> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).channel(); //连接后拿到对应的Channel对象
//注意上面连接操作是异步的,调用之后会继续往下走,下面我们就正式编写客户端的数据发送代码了
try(Scanner scanner = new Scanner(System.in)){ //还是和之前一样,扫了就发
while (true) {
System.out.println("<< 请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
channel.writeAndFlush(Unpooled.wrappedBuffer(text.getBytes())); //通过Channel对象发送数据
}
}
}
我们来测试一下吧:
我们接着来看ChannelFuture,前面我们提到,Netty中Channel的相关操作都是异步进行的,并不是在当前线程同步执行,我们不能立即得到执行结果,如果需要得到结果,那么我们就必须要利用到Future。
我们先来看看ChannelFutuer接口怎么定义的:
public interface ChannelFuture extends Future<Void> {
Channel channel(); //我们可以直接获取此任务的Channel
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); //当任务完成时,会直接执行GenericFutureListener的任务,注意执行的位置也是在EventLoop中
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... var1);
ChannelFuture sync() throws InterruptedException; //在当前线程同步等待异步任务完成,任务失败会抛出异常
ChannelFuture syncUninterruptibly(); //同上,但是无法响应中断
ChannelFuture await() throws InterruptedException; //同上,但是任务中断不会抛出异常,需要手动判断
ChannelFuture awaitUninterruptibly(); //不用我说了吧?
boolean isVoid(); //返回类型是否为void
}
此接口是继承自Netty中的Future接口的(不是JDK的那个):
public interface Future<V> extends java.util.concurrent.Future<V> { //再往上才是JDK的Future
boolean isSuccess(); //用于判断任务是否执行成功的
boolean isCancellable();
Throwable cause(); //获取导致任务失败的异常
...
V getNow(); //立即获取结果,如果还未产生结果,得到null,不过ChannelFuture定义V为Void,就算完成了获取也是null
boolean cancel(boolean var1); //取消任务
}
Channel的很多操作都是异步完成的,直接返回一个ChannelFuture,比如Channel的write操作,返回的就是一个ChannelFuture对象:
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
ChannelFuture future = ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
System.out.println("任务完成状态:"+future.isDone()); //通过ChannelFuture来获取相关信息
}
});
包括我们的服务端启动也是返回的ChannelFuture:
...
}
});
ChannelFuture future = bootstrap.bind(8080);
System.out.println("服务端启动状态:"+future.isDone());
System.out.println("我是服务端启动完成之后要做的事情!");
}
可以看到,服务端的启动就比较慢了,所以在一开始直接获取状态会返回false
,但是这个时候我们又需要等到服务端启动完成之后做一些事情,这个时候该怎么办呢?现在我们就有两种方案了:
}
});
ChannelFuture future = bootstrap.bind(8080);
future.sync(); //让当前线程同步等待任务完成
System.out.println("服务端启动状态:"+future.isDone());
System.out.println("我是服务端启动完成之后要做的事情!");
}
第一种方案是直接让当前线程同步等待异步任务完成,我们可以使用sync()
方法,这样当前线程会一直阻塞直到任务结束。第二种方案是添加一个监听器,等待任务完成时通知:
}
});
ChannelFuture future = bootstrap.bind(8080);
//直接添加监听器,当任务完成时自动执行,但是注意执行也是异步的,不是在当前线程
future.addListener(f -> System.out.println("我是服务端启动完成之后要做的事情!"));
}
包括客户端的关闭,也是异步进行的:
try(Scanner scanner = new Scanner(System.in)){
while (true) {
System.out.println("<< 请输入要发送给服务端的内容:");
String text = scanner.nextLine();
if(text.isEmpty()) continue;
if(text.equals("exit")) { //输入exit就退出
ChannelFuture future = channel.close();
future.sync(); //等待Channel完全关闭
break;
}
channel.writeAndFlush(Unpooled.wrappedBuffer(text.getBytes()));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
group.shutdownGracefully(); //优雅退出EventLoop,其实就是把还没发送的数据之类的事情做完,当然也可以shutdownNow立即关闭
}
我们接着来看看Promise接口,它支持手动设定成功和失败的结果:
//此接口也是继承自Netty中的Future接口
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V var1); //手动设定成功
boolean trySuccess(V var1);
Promise<V> setFailure(Throwable var1); //手动设定失败
boolean tryFailure(Throwable var1);
boolean setUncancellable();
//这些就和之前的Future是一样的了
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> await() throws InterruptedException;
Promise<V> awaitUninterruptibly();
Promise<V> sync() throws InterruptedException;
Promise<V> syncUninterruptibly();
}
比如我们来测试一下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Promise<String> promise = new DefaultPromise<>(new DefaultEventLoop());
System.out.println(promise.isSuccess()); //在一开始肯定不是成功的
promise.setSuccess("lbwnb"); //设定成功
System.out.println(promise.isSuccess()); //再次获取,可以发现确实成功了
System.out.println(promise.get()); //获取结果,就是我们刚刚给进去的
}
可以看到我们可以手动指定成功状态,包括ChannelOutboundInvoker中的一些基本操作,都是支持ChannelPromise的:
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
String text = buf.toString(StandardCharsets.UTF_8);
System.out.println("接收到客户端发送的数据:"+text);
ChannelPromise promise = new DefaultChannelPromise(channel);
System.out.println(promise.isSuccess());
ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()), promise);
promise.sync(); //同步等待一下
System.out.println(promise.isSuccess());
}
});
最后结果就是我们想要的了,当然我们也可以像Future那样添加监听器,当成功时自动通知:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Promise<String> promise = new DefaultPromise<>(new DefaultEventLoop());
promise.addListener(f -> System.out.println(promise.get())); //注意是在上面的DefaultEventLoop执行的
System.out.println(promise.isSuccess());
promise.setSuccess("lbwnb");
System.out.println(promise.isSuccess());
}
有关Future和Promise就暂时讲解到这里。