【Netty之轨迹】对 NIO 的理解与实战

0. NIO 简介

Java 共支持 3 中 I/O 模型:BIO,NIO 和 AIO

  • BIO(Blocking IO):同步并阻塞,是传统的阻塞型没实现模式为 一个线程处理一个连接 (适用于连接数目少且固定的架构)
  • NIO(Non-blocking IO):同步非阻塞,实现模式为 一个线程处理多个连接,连接请求都会注册到多路复用器上,多路复用器采用轮询的方式处理连接请求(适用于连接数目多且时间比较短的架构,如聊天、弹幕等,从 JDK1.4 后开始支持)
  • AIO:异步非阻塞,引入了异步通道的概念,采取 Proactor 模式,适用于连接数多且连接时间长的应用,但目前还没得到广泛引用(适用于连接数目多且时间长的架构, 从 JDK1.7 后开始支持)

1. NIO 三大核心

NIO 有三大核心部分:Selector(选择器),Channel(通道)和 Buffer(缓冲区)

三大核心的关系:

  • 每个 Selector 对应一个线程,并管理多个 Channel,且它们时双向的。
    Selector 会根据不同的事件(Event)在各个哦通道上切换
  • 每个 Channel 对应一个 Buffer
  • 每个 Buffer 对应 多个连接,数据的读取和写入通过 Buffer 实现,即可读又可写,是双向的

面向 缓冲区 / 块 编程,先将数据读取到一个稍后处理的缓冲区中,需要时可以在缓冲区中前后移动,实现了非阻塞(BIO 是面向流的,无法实现)


2. Buffer 缓存区

本质是一个可读写数据的内存块,底层是一个数组

基本操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void bufferTest() {
// 床架一个 Buffer(可以存放 5 个 int 的缓冲区)
ByteBuffer byteBuffer = ByteBuffer.allocate(128);

// 写入
byteBuffer.putInt(5);
byteBuffer.putLong(500);
byteBuffer.putChar('a');

// 读写切换
byteBuffer.flip();

// 读取
System.out.println(byteBuffer.getInt());
System.out.println(byteBuffer.getLong());
System.out.println(byteBuffer.getChar());
}

▲ 凝神:

  • 读取转写入和写入转读取都需要使用 flip() ,否则结果将出错
  • 读取的类型必须和写入的类型顺序需一致,否则也会导致结果不正确,甚至出现 BufferUnderflowException 异常(如把最后的 getChar 改为 getLong)
  • 可以通过 byteBuffer.asReadOnlyBuffer() 获得只读的 Buffer,注意啦,只读的是产生的新 Buffer,原来的 Buffer 还是依旧可以读写
  • 此外 Buffer 还有其他变体,如 IntBuffer,FloatBuffer 等

NIO 支持将数据读取到一个 Buffer 数组中,会依次写入各个数组(分散)
同时也可以将一个 Buffer 数组的内容读出来,依次从各个数组读(聚合)


3. Channel 通道

通道类似于流,其特点为:

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读取数据
  • 通道可以重缓存中读取数据,也可以将数据写入到缓存中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void fileChannelTest01() throws IOException {
String str = "Hello IceClean";

// 创建输出流
FileOutputStream fileOutputStream = new FileOutputStream("d:\\test01.txt");

// 获取对应的 FileChannel
FileChannel channel = fileOutputStream.getChannel();

// 保存到缓存区中
ByteBuffer byteBuffer = ByteBuffer.allocate(str.length());
byteBuffer.put(str.getBytes());

// buffer 提供数据,为读
byteBuffer.flip();

// channel 接收数据,为写
channel.write(byteBuffer);

// 关闭输出流
fileOutputStream.close();
}

读取文件反过来,一样的做法

▲ 凝神
要写入文件时,buffer 作为输出端需要 ’读‘ 的状态,而 Channel 作为输入端需要 ‘写’ 的状态,读取文件时相反
channel.write(buffer) 是将 buffer 中的数据写入到通道中
channel.read(buffer) 是将通道中的数据读取到 buffer 中


4. Selector 选择器

执行流程如下:

  • 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel

  • 客户的通道需要注册到选择器中,而后由选择器进行管理

  • 选择器可以轮询各个通道,看有没有事件发生,有的话则处理

    1
    2
    3
    4
    5
    6
    轮询的方法有
    select(); 阻塞监听,如果没有通道发生事件,则一直阻塞再这里
    select(1000); 每隔 1 秒询问 1
    selectNow(); 非阻塞监听,询问后如果没有事件,立即返回

    返回的是 0 说明没有事件发生,大于 0 说明有
  • 有事件发生的话,可以通过 selector.selectedKeys() 获取到所有发生事件的 key (在服务端将客户通道注册到选择器时产生的)

  • 之后可以通过这些 key 反向获取到发生事件所在的通道

  • 每个通道在注册时都会设定自己关心的事件,往后想更改关心的事件可以通过对应的 key 执行 selectionKey.interestOps(xxx);

    1
    2
    3
    4
    5
    6
    7
    8
    9
    能被关心的事件如下:
    1) SelectionKey.OP_ACCEPT:连接事件
    只要有新的客户想要连接到客户端,该通道会进行处理

    2) SelectionKey.OP_READ:读取操作
    该通道负责读取数据到服务端,相当于客户端发送消息到服务端

    3) SelectionKey.OP_WRITE:写入操作
    该通道负责写入服务端的消息,相当于服务端给客户端发送消息
  • 显然,选择器一开始就应该有一个通道来接收其他客户端的连接,该通道本质上和其他客户端的通道时一样的,但它一开始就被设定为只关心连接操作,所以当有客户俩连接时,它将进行处理,之后由程序为客户端注册新的通道进行使用


5. 知识厘清:客户端与服务端的连接

在服务端和通道之间有一块缓存区,用于通道和服务端进行双向的数据交流
而在通道与客户端之间也有一块缓存区,用于通道与客户端进行双向的数据交流
而除了一开始接收连接的哪个通道是在服务器创建的(ServerSocketChannel)专门用于监听连接操作
其他的通道均由客户端创建,客户端想连接服务端时,会首先创建一个通道 SocketChannel,然后通过 socketChannel.connect(inetSocketAddress)(详细见下面实战)尝试连接到服务端,之后就被服务端的监听通道捕捉,监听通道通过 serverSocketChannel.accept(); 获取到客户端创建的通道,之后由程序注册到 selector 选择器中
至此,客户端和服务端的连接达成!


6. 实战演练:附带详细注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
服务器端代码:

public static void main(String[] args) throws IOException {
// 创建选择器
Selector selector = Selector.open();

// 创建服务端的监听通道(通过工厂模式 open)
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 添加端口,并设置为非阻塞
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);

// 用来监听的通道也和其他通道一样,需要注册到选择器中
// 并且注册为 OP_ACCEPT,即关心连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 接下来则右选择器去轮询,看有没有哪个通道有事件
while (true) {

// 设置监听 1 秒(即需要阻塞 1 秒),若返回值为 0 说明没有新事件
if (selector.select(1000) == 0) {
System.out.println("目前没有事件呐~");
continue;
}

// 有新事件的话,则取出选择器中所有的 selectKey(在事件进来时就已经产生的)
Set<SelectionKey> selectionKeys = selector.selectedKeys();

// 遍历所有的 key,一个一个处理掉
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();

// 看看这个 key 具体要做什么操作
if (selectionKey.isAcceptable()) {
// 是连接操作,则为客户注册连接通道
// 先通过 accept 获取到客户的通道,并设置为非阻塞
SocketChannel accept = serverSocketChannel.accept();
accept.configureBlocking(false);
// 再将该通道注册到选择器上,设置该通道关心读取操作(即客户端发送信息给服务端,因为通道要读取数据到服务端)
// 同时关联一个 Buffer 缓存区
accept.register(selector, SelectionKey.OP_WRITE, ByteBuffer.allocate(1024));
}
else if (selectionKey.isReadable()) {
// 是读取操作,则通过 key 反向获取到客户所用的通道,并设置为非阻塞
SocketChannel channel = (SocketChannel) selectionKey.channel();
channel.configureBlocking(false);
// 获取到该通道关联的缓存区(关联的是缓存区,强转一下就好了)
// 缓冲区里边就有客户端写入的消息
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
// 通过通道读取缓存区中的东西
channel.read(buffer);
System.out.println("客户端发来:" + new String(buffer.array()));
}

// 处理完成后要删除当前的 selectionKey,不然可能会被其他线程再次处理,造成重复
keyIterator.remove();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
客户端代码:

public static void main(String[] args) throws IOException {
// 首先创建一个连接通道去连接服务端,并设置为非阻塞
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);

// 尝试连接服务端
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("客户端正在连接中");
}
}

// 准备发送数据给服务端
ByteBuffer buffer = ByteBuffer.wrap("你好呀~我是客户端".getBytes());

// 客户端要发送,即通道要写入
socketChannel.write(buffer);

// 暂时阻塞在这里
System.in.read();
}

▲ 凝神:
在客户端中,最后一句 System.in.read(); 是在模拟客户的无事件状态,不可少
少了的话由于客户端运行完毕直接结束掉,会出现 远程主机强迫关闭了一个现有的连接 的错误
这个错误可以手动通过关闭客户端的通道来实现,但这就相当于断开了客户端与服务端的连接了


7. SelectionKey 解释

该 key 指 的有发生事件的那些 key,由 selector.selectedKeys() 获得
而由 selector.keys() 得到的时所有通道的 key ,含义不同注意区分好

① 获取选择器: 通过 selector() 方法得到与之关联的选择器
② 获取通道: 通过 channel() 方法反向获取到该 key 对应的通道
③ 改变关心事件: 通过 interestOps(xxx) 更改关心的事件
④ 事件判断:

1
2
3
4
5
6
7
boolean isAcceptable():判断是否发生连接操作
boolean isReadable():判断是否发生读取操作
boolean isWritable():判断是否发生写入操作

注意,SelectionKey 中的这些判断时站在服务端的角度来说的
也就是说,读取操作时服务端从通道中读取数据,即客户端发送消息给服务端
而写入操作时服务端想通道中写入数据,进而服务端发送消息给客户端

夕阳东起,仅在朝朝暮暮(IceClean)