Netty框架

前面我们学习了Java为我们提供的NIO框架,提供使用NIO提供的三大组件,我们就可以编写更加高性能的客户端/服务端网络程序了,甚至还可以自行规定一种通信协议进行通信。

NIO框架存在的问题

但是之前我们在使用NIO框架的时候,还是发现了一些问题,我们先来盘点一下。

客户端关闭导致服务端空轮询

可能在之前的实验中,你发现了这样一个问题:

image-20230306173647589

当我们的客户端主动与服务端断开连接时,会导致READ事件一直被触发,也就是说selector.select()会直接通过,并且是可读的状态,但是我们发现实际上读到是数据是一个空的(上面的图中在空轮询两次后抛出异常了,也有可能是无限的循环下去)所以这里我们得稍微处理一下:

java Copy
} else if(key.isReadable()) {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(128);
    //这里我们需要判断一下,如果read操作得到的结果是-1,那么说明服务端已经断开连接了
    if(channel.read(buffer) < 0) {
        System.out.println("客户端已经断开连接了:"+channel.getRemoteAddress());
        channel.close();   //直接关闭此通道
        continue;   //继续进行选择
    }
    buffer.flip();
    System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
    channel.write(ByteBuffer.wrap("已收到!".getBytes()));
}

这样,我们就可以在客户端主动断开时关闭连接了:

image-20230306173700652

当然,除了这种情况可能会导致空轮询之外,实际上还有一种可能,这种情况是NIO框架本身的BUG:

java Copy
while (true) {
    int count = selector.select();  //由于底层epoll机制的问题,导致select方法可能会一直返回0,造成无限循环的情况。
    System.out.println("监听到 "+count+" 个事件");
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectionKeys.iterator();

详细请看JDK官方BUG反馈:

  1. JDK-6670302 : (se) NIO selector wakes up with 0 selected keys infinitely
  2. JDK-6403933 : (se) Selector doesn't block on Selector.select(timeout) (lnx)

本质原因也是因为客户端的主动断开导致:

This is an issue with poll (and epoll) on Linux. If a file descriptor for a connected socket is polled with a request event mask of 0, and if the connection is abruptly terminated (RST) then the poll wakes up with the POLLHUP (and maybe POLLERR) bit set in the returned event set. The implication of this behaviour is that Selector will wakeup and as the interest set for the SocketChannel is 0 it means there aren't any selected events and the select method returns 0.

这个问题本质是与操作系统有关的,所以JDK一直都认为是操作系统的问题,不应该由自己来处理,所以这个问题在当时的好几个JDK版本都是存在的,这是一个很严重的空转问题,无限制地进行空转操作会导致CPU资源被疯狂消耗。

不过,这个问题,却被Netty框架巧妙解决了,我们后面再说。

粘包/拆包问题

除了上面的问题之外,我们接着来看下一个问题。

我们在计算机网络这门课程中学习过,操作系统通过TCP协议发送数据的时候,也会先将数据存放在缓冲区中,而至于什么时候真正地发出这些数据,是由TCP协议来决定的,这是我们无法控制的事情。

image-20230306173718414

也就是说,比如现在我们要发送两个数据包(P1/P2),理想情况下,这两个包应该是依次到达服务端,并由服务端正确读取两次数据出来,但是由于上面的机制,可能会出现下面的情况:

  1. 可能P1和P2被合在一起发送给了服务端(粘包现象)
  2. 可能P1和P2的前半部分合在一起发送给了服务端(拆包现象)
  3. 可能P1的前半部分就被单独作为一个部分发给了服务端,后面的和P2一起发给服务端(也是拆包现象)
image-20230306173728520

当然,对于这种问题,也有一些比较常见的解决方案:

  1. 消息定长,发送方和接收方规定固定大小的消息长度,例如每个数据包大小固定为200字节,如果不够,空位补空格,只有接收了200个字节之后,作为一个完整的数据包进行处理。
  2. 在每个包的末尾使用固定的分隔符,比如每个数据包末尾都是\r\n,这样就一定需要读取到这样的分隔符才能将前面所有的数据作为一个完整的数据包进行处理。
  3. 将消息分为头部和本体,在头部中保存有当前整个数据包的长度,只有在读到足够长度之后才算是读到了一个完整的数据包。

这里我们就来演示一下第一种解决方案:

java Copy
public static void main(String[] args) {
    try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
         Selector selector = Selector.open()){
        serverChannel.bind(new InetSocketAddress(8080));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        //一个数据包要求必须塞满30个字节
        ByteBuffer buffer = ByteBuffer.allocate(30);

        while (true) {
            int count = selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                ...
                    if(buffer.remaining() == 0) {
                        buffer.flip();
                        System.out.println("接收到客户端数据:"+new String(buffer.array(), 0, buffer.remaining()));
                        buffer.clear();
                    }
                    channel.write(ByteBuffer.wrap(("已收到 "+size+" 字节的数据!").getBytes()));
                }
               	...

现在,当我们的客户端发送消息时,如果没有达到30个字节,那么会暂时存储起来,等有30个之后再一次性得到,当然如果数据量超过了30,那么最多也只会读取30个字节,其他的放在下一批:

image-20230306173746619
image-20230306173755459

这样就可以在一定程度上解决粘包/拆包问题了。


走进Netty框架

前面我们盘点了一下NIO存在的一些问题,而在Netty框架中,这些问题都被巧妙的解决了。

Netty是由JBOSS提供的一个开源的java网络编程框架,主要是对java的nio包进行了再次封装。Netty比java原生的nio包提供了更加强大、稳定的功能和易于使用的api。 netty的作者是Trustin Lee,这是一个韩国人,他还开发了另外一个著名的网络编程框架,mina。二者在很多方面都十分相似,它们的线程模型也是基本一致 。不过netty社区的活跃程度要mina高得多。

Netty实际上应用场景非常多,比如我们的Minecraft游戏服务器:

image-20230306173806432

Java版本的Minecraft服务器就是使用Netty框架作为网络通信的基础,正是得益于Netty框架的高性能,我们才能愉快地和其他的小伙伴一起在服务器里面炸服。

学习了Netty框架后,说不定你也可以摸索到部分Minecraft插件/模组开发的底层细节(太折磨了,UP主高中搞了大半年这玩意)

当然除了游戏服务器之外,我们微服务之间的远程调用也可以使用Netty来完成,比如Dubbo的RPC框架,包括最新的SpringWebFlux框架,也抛弃了内嵌Tomcat而使用Netty作为通信框架。既然Netty这么强大,那么现在我们就开始Netty的学习吧!

导包先:

xml Copy
<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.76.Final</version>
    </dependency>
</dependencies>

ByteBuf介绍

Netty并没有使用NIO中提供的ByteBuffer来进行数据装载,而是自行定义了一个ByteBuf类。

那么这个类相比NIO中的ByteBuffer有什么不同之处呢?

  • 写操作完成后无需进行flip()翻转。
  • 具有比ByteBuffer更快的响应速度。
  • 动态扩容。

首先我们来看看它的内部结构:

java Copy
public abstract class AbstractByteBuf extends ByteBuf {
    ...
    int readerIndex;   //index被分为了读和写,是两个指针在同时工作
    int writerIndex;
    private int markedReaderIndex;    //mark操作也分两种
    private int markedWriterIndex;
    private int maxCapacity;    //最大容量,没错,这玩意能动态扩容

可以看到,读操作和写操作分别由两个指针在进行维护,每写入一次,writerIndex向后移动一位,每读取一次,也是readerIndex向后移动一位,当然readerIndex不能大于writerIndex,这样就不会像NIO中的ByteBuffer那样还需要进行翻转了。

image-20230814163638094

其中readerIndexwriterIndex之间的部分就是是可读的内容,而writerIndex之后到capacity都是可写的部分。

我们来实际使用一下看看:

java Copy
public static void main(String[] args) {
    //创建一个初始容量为10的ByteBuf缓冲区,这里的Unpooled是用于快速生成ByteBuf的工具类
    //至于为啥叫Unpooled是池化的意思,ByteBuf有池化和非池化两种,区别在于对内存的复用,我们之后再讨论
    ByteBuf buf = Unpooled.buffer(10);
    System.out.println("初始状态:"+Arrays.toString(buf.array()));
    buf.writeInt(-888888888);   //写入一个Int数据
    System.out.println("写入Int后:"+Arrays.toString(buf.array()));
    buf.readShort();   //无需翻转,直接读取一个short数据出来
    System.out.println("读取Short后:"+Arrays.toString(buf.array()));
    buf.discardReadBytes();   //丢弃操作,会将当前的可读部分内容丢到最前面,并且读写指针向前移动丢弃的距离
    System.out.println("丢弃之后:"+Arrays.toString(buf.array()));
    buf.clear();    //清空操作,清空之后读写指针都归零
    System.out.println("清空之后:"+Arrays.toString(buf.array()));
}

通过结合断点调试,我们可以观察读写指针的移动情况,更加清楚的认识一下ByteBuf的底层操作。

我们再来看看划分操作是不是和之前一样的:

java Copy
public static void main(String[] args) {
  	//我们也可以将一个byte[]直接包装进缓冲区(和NIO是一样的)不过写指针的值一开始就跑到最后去了,但是这玩意是不是只读的
    ByteBuf buf = Unpooled.wrappedBuffer("abcdefg".getBytes());
  	//除了包装,也可以复制数据,copiedBuffer()会完完整整将数据拷贝到一个新的缓冲区中
    buf.readByte();   //读取一个字节
    ByteBuf slice = buf.slice();   //现在读指针位于1,然后进行划分

    System.out.println(slice.arrayOffset());   //得到划分出来的ByteBuf的偏移地址
    System.out.println(Arrays.toString(slice.array()));
}

可以看到,划分也是根据当前读取的位置来进行的。

我们继续来看看它的另一个特性,动态扩容,比如我们申请一个容量为10的缓冲区:

java Copy
public static void main(String[] args) {
    ByteBuf buf = Unpooled.buffer(10);    //容量只有10字节
    System.out.println(buf.capacity());
  	//直接写一个字符串
    buf.writeCharSequence("卢本伟牛逼!", StandardCharsets.UTF_8);   //很明显这么多字已经超过10字节了
    System.out.println(buf.capacity());
}

通过结果我们发现,在写入一个超出当前容量的数据时,会进行动态扩容,扩容会从64开始,之后每次触发扩容都会x2,当然如果我们不希望它扩容,可以指定最大容量:

java Copy
public static void main(String[] args) {
    //在生成时指定maxCapacity也为10
    ByteBuf buf = Unpooled.buffer(10, 10);
    System.out.println(buf.capacity());
    buf.writeCharSequence("卢本伟牛逼!", StandardCharsets.UTF_8);
    System.out.println(buf.capacity());
}

可以看到现在无法再动态扩容了:

image-20230306173824953

我们接着来看一下缓冲区的三种实现模式:堆缓冲区模式、直接缓冲区模式、复合缓冲区模式。

堆缓冲区(数组实现)和直接缓冲区(堆外内存实现)不用多说,前面我们在NIO中已经了解过了,我们要创建一个直接缓冲区也很简单,直接调用:

java Copy
public static void main(String[] args) {
    ByteBuf buf = Unpooled.directBuffer(10);
    System.out.println(Arrays.toString(buf.array()));
}

同样的不能直接拿到数组,因为底层压根不是数组实现的:

image-20230306174001430

我们来看看复合模式,复合模式可以任意地拼凑组合其他缓冲区,比如我们可以:

image-20230306174009890

这样,如果我们想要对两个缓冲区组合的内容进行操作,我们就不用再单独创建一个新的缓冲区了,而是直接将其进行拼接操作,相当于是作为多个缓冲区组合的视图。

java Copy
//创建一个复合缓冲区
CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.copiedBuffer("abc".getBytes()));
buf.addComponent(Unpooled.copiedBuffer("def".getBytes()));

for (int i = 0; i < buf.capacity(); i++) {
    System.out.println((char) buf.getByte(i));
}

可以看到我们也可以正常操作组合后的缓冲区。

最后我们来看看,池化缓冲区和非池化缓冲区的区别。

我们研究一下Unpooled工具类中具体是如何创建buffer的:

java Copy
public final class Unpooled {
    private static final ByteBufAllocator ALLOC;  //实际上内部是有一个ByteBufAllocator对象的
    public static final ByteOrder BIG_ENDIAN;
    public static final ByteOrder LITTLE_ENDIAN;
    public static final ByteBuf EMPTY_BUFFER;

    public static ByteBuf buffer() {
        return ALLOC.heapBuffer();   //缓冲区的创建操作实际上是依靠ByteBufAllocator来进行的
    }
  
  	...
      
    static {   //ALLOC在静态代码块中进行指定,实际上真正的实现类是UnpooledByteBufAllocator
        ALLOC = UnpooledByteBufAllocator.DEFAULT;
        BIG_ENDIAN = ByteOrder.BIG_ENDIAN;
        LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN;
        EMPTY_BUFFER = ALLOC.buffer(0, 0);   //空缓冲区容量和最大容量都是0

        assert EMPTY_BUFFER instanceof EmptyByteBuf : "EMPTY_BUFFER must be an EmptyByteBuf.";

    }
}

那么我们来看看,这个ByteBufAllocator又是个啥,顾名思义,其实就是负责分配缓冲区的。

它有两个具体实现类:UnpooledByteBufAllocatorPooledByteBufAllocator,一个是非池化缓冲区生成器,还有一个是池化缓冲区生成器,那么池化和非池化有啥区别呢?

实际上池化缓冲区利用了池化思想,将缓冲区通过设置内存池来进行内存块复用,这样就不用频繁地进行内存的申请,尤其是在使用堆外内存的时候,避免多次重复通过底层malloc()函数系统调用申请内存造成的性能损失。Netty的内存管理机制主要是借鉴Jemalloc内存分配策略,感兴趣的小伙伴可以深入了解一下。

所以,由于是复用内存空间,我们来看个例子:

java Copy
public static void main(String[] args) {
    ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
    ByteBuf buf = allocator.directBuffer(10);   //申请一个容量为10的直接缓冲区
    buf.writeChar('T');    //随便操作操作
    System.out.println(buf.readChar());
    buf.release();    //释放此缓冲区

    ByteBuf buf2 = allocator.directBuffer(10);   //重新再申请一个同样大小的直接缓冲区
    System.out.println(buf2 == buf);
}

可以看到,在我们使用完一个缓冲区之后,我们将其进行资源释放,当我们再次申请一个同样大小的缓冲区时,会直接得到之前已经申请好的缓冲区,所以,PooledByteBufAllocator实际上是将ByteBuf实例放入池中在进行复用。

零拷贝简介

注意: 此小节作为选学内容,需要掌握操作系统计算机组成原理才能学习。

零拷贝是一种I/O操作优化技术,可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,首先第一个问题,什么是内核空间,什么又是用户空间呢?

其实早期操作系统是不区分内核空间和用户空间的,但是应用程序能访问任意内存空间,程序很容易不稳定,常常把系统搞崩溃,比如清除操作系统的内存数据。实际上让应用程序随便访问内存真的太危险了,于是就按照CPU 指令的重要程度对指令进行了分级,指令分为四个级别:Ring0 ~ Ring3,Linux 下只使用了 Ring0 和 Ring3 两个运行级别,进程运行在 Ring3 级别时运行在用户态,指令只访问用户空间,而运行在 Ring0 级别时被称为运行在内核态,可以访问任意内存空间。

image-20230306174025216

比如我们Java中创建一个新的线程,实际上最终是要交给操作系统来为我们进行分配的,而需要操作系统帮助我们完成任务则需要进行系统调用,是内核在进行处理,不是我们自己的程序在处理,这时就相当于我们的程序处于了内核态,而当操作系统底层分配完成,最后到我们Java代码中返回得到线程对象时,又继续由我们的程序进行操作,所以从内核态转换回了用户态。

而我们的文件操作也是这样,我们实际上也是需要让操作系统帮助我们从磁盘上读取文件数据或是向网络发送数据,比如使用传统IO的情况下,我们要从磁盘上读取文件然后发送到网络上,就会经历以下流程:

image-20230306174033340

可以看到整个过程中是经历了2次CPU拷贝+2次DMA拷贝,一共四次拷贝,虽然逻辑比较清晰,但是数据老是这样来回进行复制,是不是太浪费时间了点?所以我们就需要寻找一种更好的方式,来实现零拷贝。

实现零拷贝我们这里演示三种方案:

  1. 使用虚拟内存

    现在的操作系统基本都是支持虚拟内存的,我们可以让内核空间和用户空间的虚拟地址指向同一个物理地址,这样就相当于是直接共用了这一块区域,也就谈不上拷贝操作了:

    image-20230306174044056
  2. 使用mmap/write内存映射

    实际上这种方式就是将内核空间中的缓存直接映射到用户空间缓存,比如我们之前在学习NIO中使用的MappedByteBuffer,就是直接作为映射存在,当我们需要将数据发送到Socket缓冲区时,直接在内核空间中进行操作就行了:

    image-20230306174056380

    不过这样还是会出现用户态和内核态的切换,我们得再优化优化。

  3. 使用sendfile方式

    在Linux2.1开始,引入了sendfile方式来简化操作,我们可以直接告诉内核要把哪个文件数据拷贝拷贝到Socket上,直接在内核空间中一步到位:

    image-20230306174108253

    比如我们之前在NIO中使用的transferTo()方法,就是利用了这种机制来实现零拷贝的。

Netty工作模型

前面我们了解了Netty为我们提供的更高级的缓冲区类,我们接着来看看Netty是如何工作的,上一章我们介绍了Reactor模式,而Netty正是以主从Reactor多线程模型为基础,构建出了一套高效的工作模型。

大致工作模型图如下:

image-20230306174117322

可以看到,和我们之前介绍的主从Reactor多线程模型非常类似:

image-20230306174127616

所有的客户端需要连接到主Reactor完成Accept操作后,其他的操作由从Reactor去完成,这里也是差不多的思想,但是它进行了一些改进,我们来看一下它的设计:

  • Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接受客户端的连接, WorkerGroup专门负读写,就像我们前面说的主从Reactor一样。
  • 无论是BossGroup还是WorkerGroup,都是使用EventLoop(事件循环,很多系统都采用了事件循环机制,比如前端框架Node.js,事件循环顾名思义,就是一个循环,不断地进行事件通知)来进行事件监听的,整个Netty也是使用事件驱动来运作的,比如当客户端已经准备好读写、连接建立时,都会进行事件通知,说白了就像我们之前写NIO多路复用那样,只不过这里换成EventLoop了而已,它已经帮助我们封装好了一些常用操作,而且我们可以自己添加一些额外的任务,如果有多个EventLoop,会存放在EventLoopGroup中,EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
  • 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写操作监听。

前面我们大致了解了一下Netty的工作模型,接着我们来尝试创建一个Netty服务器:

java Copy
public static void main(String[] args) {
    //这里我们使用NioEventLoopGroup实现类即可,创建BossGroup和WorkerGroup
    //当然还有EpollEventLoopGroup,但是仅支持Linux,这是Netty基于Linux底层Epoll单独编写的一套本地实现,没有使用NIO那套
    EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();

    //创建服务端启动引导类
    ServerBootstrap bootstrap = new ServerBootstrap();
    //可链式,就很棒
    bootstrap
            .group(bossGroup, workerGroup)   //指定事件循环组
            .channel(NioServerSocketChannel.class)   //指定为NIO的ServerSocketChannel
            .childHandler(new ChannelInitializer<SocketChannel>() {   //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
                @Override
                protected void initChannel(SocketChannel channel) {
                    //获取流水线,当我们需要处理客户端的数据时,实际上是像流水线一样在处理,这个流水线上可以有很多Handler
                    channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){   //添加一个Handler,这里使用ChannelInboundHandlerAdapter
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {  //ctx是上下文,msg是收到的消息,默认以ByteBuf形式(也可以是其他形式,后面再说)
                            ByteBuf buf = (ByteBuf) msg;   //类型转换一下
                            System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                            //通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
                            ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                        }
                    });
                }
            });
    //最后绑定端口,启动
    bootstrap.bind(8080);
}

可以看到上面写了很多东西,但是你一定会懵逼,这些新来的东西,都是什么跟什么啊,怎么一个也没看明白?没关系,我们可以暂时先将代码写在这里,具体的各个部分,还请听后面细细道来。

我们接着编写一个客户端,客户端可以直接使用我们之前的:

java Copy
public static void main(String[] args) {
    //创建一个新的SocketChannel,一会通过通道进行通信
    try (SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
         Scanner scanner = new Scanner(System.in)){
        System.out.println("已连接到服务端!");
        while (true) {   //咱给它套个无限循环,这样就能一直发消息了
            System.out.println("请输入要发送给服务端的内容:");
            String text = scanner.nextLine();
            if(text.isEmpty()) continue;
            //直接向通道中写入数据,真舒服
            channel.write(ByteBuffer.wrap(text.getBytes()));
            System.out.println("已发送!");
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);   //直接从通道中读取数据
            buffer.flip();
            System.out.println("收到服务器返回:"+new String(buffer.array(), 0, buffer.remaining()));
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

通过通道正常收发数据即可,这样我们就成功搭建好了一个Netty服务器。

ページを読み込んでいます。しばらくお待ちください...