Java编程 VII IO

IO

IO 是系统中“输入/输出”(Input/Output)的简称,通常指代操作系统执行读写操作的整个过程。在操作系统中,用户程序和内核程序之间存在严格的权限隔离,分别处于“用户态”和“内核态”。

  • 用户态主要用于运行用户加载到内存中的应用程序,包括数据处理和逻辑执行。
  • 内核态则负责管理系统资源,其核心是以 task_struct 为代表的数据结构,用于描述进程信息,并与内核各个子系统(如文件系统、内存管理、设备驱动等)交互,同时记录用户进程与内核资源之间的绑定关系。

当用户程序需要执行读写等 IO 操作时,会通过系统调用接口发起请求。这一请求会引发权限切换,即从用户态“陷入”(trap)到内核态,由内核代表应用程序操作底层资源(如文件、硬盘、内存),以保障系统安全性和稳定性,防止用户程序直接访问或破坏关键内存区域。

一个典型的文件写入流程如下:

  1. 用户程序创建文件抽象对象,内核创建底层文件资源;
  2. 通过系统调用进入内核态,由内核创建相应的底层文件结构,并返回一个 文件描述符 (file descriptor);
  3. 用户程序使用该描述符,在用户态创建流对象用于数据写入;
  4. 当写入发生时,再次触发系统调用,要求内核将数据写入对应的文件或设备;
  5. 数据从用户态复制到内核态缓冲区,再由内核完成实际的写入;
  6. 如果是读取操作,则由内核将文件内容从磁盘加载到内核缓冲区,再复制回用户态内存;
  7. 用户态程序随后可以对这些数据进行字节流或字符流的处理。

由上述流程我们可以看到 I/O 流程存在以下性能瓶颈 :

  1. 阻塞 阻塞是指应用程序在执行 I/O 操作时,如果数据还未准备好(如文件未读完、磁盘还在寻址、网络数据还未到达),进程会进入 等待状态 ,无法继续执行,直到 I/O 操作完成为止。
  2. 用户态与内核态的切换属于一种“上下文切换”,指操作系统在执行系统调用时,需要保存当前用户态的执行环境(寄存器、程序计数器、栈指针等),切换到内核态并加载内核的运行环境。
  3. 在用户态和内核态之间的数据交互并不是“共享内存”,而是通过 显式复制(copy)实现。通常发生在读写文件或网络数据时。

IO模型

同步阻塞IO

用户空间 主动发起,需要等待内核IO 操作彻底完成才返回到用户空间的IO

流程

  1. 发起启动调用read,用户态线程阻塞
  2. 内核收到read系统调用,进行数据准备。
  3. 内核等到完整数据到达,将数据从内核缓冲区拷贝到用户缓冲区
  4. 内核返回后用户线程解除阻塞状态,重新运行

优点 : 实现简单

缺点 :每个连接都使用自己的线程进行IO 操作,高并发场景下 切换开销和阻塞会导致性能很低

内核态准备数据和复制数据是同步进行的,直到数据可读后才返回

同步非阻塞IO

用户空间程序不需要等待内核IO操作彻底完成,可以立即返回用户空间执行后续命令。

流程

  1. 用户线程 向 内核发起查询请求,查询内核缓冲区是否存在数据
  2. 发起系统调用,查询内核缓冲区是否存在数据,如果不存在,返回一个调用失败信息
  3. 如果存在数据,将内核缓冲区中的数据复制到用户缓冲区,这时阻塞用户态线程,直到复制完成返回系统调用成功

优点 : 通过不断查询检查缓存状态,不需要阻塞线程

缺点 :不断的向内核请求查询会增加上下文切换开销

IO多路复用

通过select/epoll 使得一个线程可以同时监听多个资源描述符,一旦某个就绪后就可以将对应的资源描述符返回,供用户空间程序使用

流程

  1. 选择器注册。将需要read操作的 目标文件描述符提前注册到Linux的select/epoll选择器中,
  2. 就绪状态的轮询。通过选择器的查询方法,查询所有提前注册过的目标文件描述符I/O就绪状态。返回一个就绪的socket列表和就绪事件,提示用户线程I/O就绪
  3. 用户线程得到了就绪的socket列表,向对应的连接发起系统调用,阻塞线程,从内核复制数据,将数据从内核缓冲区复制到用户缓冲区
  4. 复制完成后,内核返回结果,用户线程解除阻塞状态,用户线程读取到了数据继续执行。

优点 :一个线程可以同时处理许多读写连接,使得用户不必维护大量的线程

缺点 :select 在监听事件时 仍然是阻塞的。但 epoll 支持边缘触发,可结合非阻塞通道实现高性能 IO

异步IO

内核空间成为主动调用者,当资源准备就绪时向用户线程通知 资源就绪

流程

  1. 用户发起read系统调用,不阻塞后续逻辑
  2. 内核开始I/O 第一个阶段,准备数据,内核将数据从内核复制到用户缓冲区
  3. 内核向用户线程发送信号,通知用户线程数据已经复制到缓冲区
  4. 用户线程读取缓冲区数据,继续后续任务

优点 : IO过程非阻塞,真正的异步调用

缺点 : 需要进行事务的注册与接收,需要操作系统支持

NIO

java 提供了非阻塞IO 库 NIO

核心组件

Channel 通道

Buffer 缓冲区

Selector 选择器

缓冲区(Buffer)

Buffer 是一个非线程安全类

capacity 读写容量 设置buffer 创建的类型数组的最大容量

position 记录读写位置 在读写过程中记录当前写入/读出的位置

limit 读写的最大上限 记录实际写入的容量/实际读取的最大内容

mark 读写位置记录缓存 暂时存储position的位置

allocate()

1
2
3
4
5
6
7
8
9
public class UserBuffer
{
static IntBuffer intBuffer = null;
public static void allocateTest(){
intBuffer = IntBuffer.allocate(20);
...
}
}
// 创建后会定义好 .capacity = 20 , .position = 0 , .limit = 20

put()

1
2
3
4
5
6
7
8
9
10
11
12
public class UserBuffer
{
static IntBuffer intBuffer = null;
public static void putTest()
{
for(int i = 0 ; i < 5 ; i++)
{
intBuffer.put(i);
}
}
}
// 向buffer中写入5个数据 .capacity = 20 , .position = 5 , .limit = 20

flip()

1
2
3
4
5
6
7
8
9
public class UserBuffer
{
static IntBuffer intBuffer = null;
public static void flipTest()
{
intBuffer.flip();
}
}
// 反转buffer .capacity = 20 , .position = 0 , .limit = 5

get()

1
2
3
4
5
6
7
8
9
10
11
12
public class UserBuffer
{
static IntBuffer intBuffer = null;
public static void getTest()
{
for(int i = 0 ; i < 2 ; i++){
intBuffer.get();
}
}
}

// 从buffer中取数据 .capacity = 20 , .position = 2 , .limit = 5

rewind()

1
2
3
4
5
6
7
8
9
10
public class UserBuffer
{
static IntBuffer intBuffer = null;
public static void rewindTest()
{
intBuffer.rewind();
}
}

// 从buffer中重置定位 .capacity = 20 , .position = 0 , .limit = 5

mark()和 reset()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class UserBuffer
{
static IntBuffer intBuffer = null ;
public static void reRead(){
for (int i = 0 ; i < 5 ; i++){
if(i == 2){
intBuffer.mark();
}
int j = intBuffer.get();
}
}
public static void afterReset(){
intBuffer.reset();
for(int i = 2 ; i< 5 ; i++){
int j = intBuffer.get();
}
}
}
// 使用mark标记 position位置,使用reset 恢复position位置,打印。

clear()

1
2
3
4
5
6
7
public class UserBuffer
{
static IntBuffer intBuffer = null ;
public static void clearDemo(){
intBuffer.clear();
}
}

通道 (Channel )

FileChannel 文件通道

SocketChannel 套接字通道

ServerSocketChannel 服务器套接字通道

DatagramChannel 数据报通道

FileChannel 文件通道

创建通道对象

1
2
3
4
5
6
7
FileInputStream fis = new FileInputStream(srcFile);
FileChannel inChannel = fis.getChannel();
FileOutputStream fos = new FileOutputStream(destFile);
FileChannel outChannel = fos.getChannel();

RandomAccessFile rFile = new RandomAccessFile("filename.txt","rw");
FileChannel channel = rFile.getChannel();

读取

1
2
3
4
5
6
7
RandomAccessFile aFile = new RandomAccessFile(fileName,"rw");
FileChannel channel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(CAPACITY);
int length = -1
while((length = channel.read(buf)) != -1){
...
}

写入

1
2
3
4
5
buf.flip();
int outlength = 0;
while((outlength = outChannel.write(buf)) != 0){
System.out.println("写入字节数:" + outlength);
}

关闭通道

1
2
3
channel.close();

channel.force(true); // 强制刷新

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class FileNIOCopyDemo{
public static void main(String[] args){
nioCopyResourceFile();
}
public static void nioCopyResouceFile(){
String sourcePath = NioDemoConfig.FILE_RESOUCE_SRC_PATH;
String srcPath = IOUtil.getResoucePath(sourcePath);
Logger.info("srcPath = " + srcPath);

String destPath = NioDemoConfig.FILE_RESOURCE_DEST_PATH;
String destDecodePath = IOUtil.builderResourcePath(destPath);
Logger.info("destDecodePath=" + destDecodePath);
nioCopyFile(srcDecodePath , destDecodePath);
}

public static void nioCopyFile(String srcPath , String destPath){
File srcFile = new File(srcPath);
File destFile = new File(destPath);
try {
if(!destFile.exists()){
destFile.createNewFile();
}
long startTime = System.currentTimeMillis();
FileInputStream fis = null;
FileOutputStream fos = null ;
FileChannel inChannel = null ;
FileChannel outChannel = null ;
try {
fis = new FileInputStream(srcFile);
fos = new FileOutputStream(destFile);
inChannel = fis.getChannel();
outChannel = fos.getChannel();
int length = -1;
ByteBuffer buf = ByteBuffer.allocate(1024);
while((length = inChannel.read(buf)) != -1) {
buf.flip();
int outlength = 0;
while (( outlength = outChannel.write(buf))!=0 ){
buf.clear();
}
outChannel.force(true);
}finally{
IOUtil.closeQuietly(outChannel);
IOUtil.closeQuietly(fos);
IOUtil.closeQuietly(inchannel);
IOUtil.closeQuietly(fis);
}
long endTime = System.currentTimeMillis();
Logger.info("base复制毫秒数:" + (endTime - startTime));

}catch (IOException e){
e.printStackTrace();
}
}

SocketChannel

创建隧道

1
2
3
4
5
6
7
8
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1",80));
while(! socketChannel.finishConnect()){
}
ServerSocketChannel server = (ServerSocketChannel) key.channel());
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);

读取通道

1
2
ByteBuffer buf = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buf);

写入

1
2
buffer.flip();
socketChannel.write(buffer);

关闭

1
2
socketChannel.shutdownOutput();
IOUtil.closeQuietly(socketChannel);

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class NioSendClient{

private Charset charset = Charset.forName("UTF-8");
public void sendFile()
{
try
{
String sourcePath = NioDemoConfig.SOCKET_SEND_FILE;
String srcPath = IOUtil.getResourcePath(sourcePath);
Logger.debug("srcPath=" + srcPath);

String destFile = NioDemoConfig.SOCKET_RECEIVE_FILE;
Logger.debug("destFile=" + destFile);

File file = new File(srcPath);
if (!file.exists()){
Logger.debug("文件不存在");
return ;
}
FileChannel fileChannel = new FileInputStream(file).getChannel();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress("127.0.0.1",18899));
socketChannel.configureBlocking(false);
Logger.debug("Client 已连接");
while (!socketChannel.finishConnect()){
...
}
ByteBuffer buffer = sendFileNameAndLength(destFile, file,socketChannel);
int length = sendContent(file,fileChannel,socketChannel,buffer);
if (length == -1){
IOUtil.closeQuietly(fileChannel);
socketChannel.shutdownOutput();
IOUtil.closeQuietly(socketChannel);
}
Logger.debug(" 文件传输成功 ");
} catch (Exception e){
e.printStackTrace();
}
}
public int sendContent(File file , FileChannel fileChannel, SocketChannel socketChannel,
ByteBuffer buffer) throws IOException{
Logger.debug("开始传输文件");
int length = 0;
long progess = 0 ;
while ((length = fileChannel.read(buffer)) >0 )
{
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
progess += length ;
Logger.debug("| + " (100 * progress/ file.length()) + "%|");
}
return length;
}

public ByteBuffer sendFileNameAndLength(String destFile, File file , SocketChannel socketChannel) throws IOException{
ByteBuffer fileNameByteBuffer = charset.encode(destFile);
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
int fileNameLen = fileNameByteBuffer.capacity();
buffer.putInt(fileNameLen);
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
Logger.info("发送完成" , fileNameLen) ;
socketChannel.write(fileNameByteBuffer);
Logger.info("Client 文件名称发送完成", destFile);
buffer.putLong(file.length());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
Logger.info("Client 文件长度发送完成:" , file.length());
return buffer ;
}

}

DatagramChannel

获取

1
2
3
4
DatagramChannel channel = DatagramChannel.open();
datagramChannel.configureBlocking(false);

channel.socket().bind(new InetSocketAddress(18080));

读取

1
2
ByteBuffer buf = ByteBuffer.allocate(1024);
SocketAddress clientAddr = datagramChannel.receive(buf);

写入

1
2
3
buffer.flip();
dChannel.send(buffer,new InetSocketAddress("127.0.0.1",18899));
buffer.clear();

关闭

1
dChannel.close();

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 客户端 :
public class UDPClient{
public void send() throws IOException{
DatagramChannel dChannel = DatagramChannel.open();
dChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容");
while(scanner.hasNext()){
String next = scanner.next();
buffer.put((Dateutil.getNow() + ">>" + next).getBytes();
buffer.flip();
dChannel.send(buffer,new InetSocketAddress("127.0.0.1",18899));
buffer.clear();
}
dChannel.close();
}
public static void main(String[] args) throws IOException{
new UDPClient().send();
}
}
// 客户端 :
public class UDPServer{
public void receive() throws IOException{
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.configureBlocking(false);
datagramChannel.bind(new InetSocketAddress("127.0.0.1",18899));
Print.tcfo("UDP 服务器启动成功");
Selector selector = Selector.open();
datagramChannel.register(selector, SelectionKey.OP_READ);
while(selector.select()>0){
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isReadable()){
SocketAddress client = datagramChannel.receive(buffer);
buffer.flip();
Print.tcfo(new String(buffer.array(),0,buffer.limit));
buffer.clear();
}
}
iterator.remove();
}
selector.close();
datagramChannel.close();
}
public static void main(String[] args) throws IOException{
new UDPServer().receive();
}

}

选择器 (Selector)

将通道注册到选择器,通过选择器监听IO事件

1 可读 SelectionKey.OP_READ ( 通道有数据可读 )

2 可写 SelectionKey.OP_WRITE ( 通道有数据可写 )

3 连接 SelectionKey.OP_CONNECT ( 连接建立 )

4 接收 SelectionKey.OP_ACCEPT ( 有新的连接到来 )

创建 selector

1
2
3
4
5
6
7
8
9
10
Selector selector = Selector.open();

//获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置非阻塞
serverSocketChannel.configureBlocking(false);
//绑定连接
serverSocketChannel.bind(new InetSocketAddress(18899));
//将通道注册到选择器,设置监听事件
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
while (selector.select() > 0){
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
if(key.isAcceptable()){
//
} else if (key.isConnectable()){
//
} else if (key.isReadable()){
//
} else if (key.isWritable()){
//
}
keyIterator.remove();
}

}

实现discard服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// 客户端
public class NioDiscardClient{
public static void startServer() throws IOException{
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(18899));
Logger.info("服务器启动");
serverSocketChannel.register(selector,selectionKey.OP_ACCEPT);
while(selector.select()>0){
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()){
SelectionKey selectedKey = selectedKeys.next();
if(selectedKey.isAcceptable()){
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}else if (selectedKey.isReadable()){
SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0 ;
while((length = socketChannel.read(byteBuffer))>0){
byteBuffer.flip();
Logger.info(new String(byteBuffer.array(),0,length));
byteBuffer.clear();
}
socketChannel.close();
}
selectedKeys.remove();
}
}
serverSocketChannel.close();
}
public static void main(String[] args) throws IOException{
startServer();
}

}

// 服务端
public class NioReceiveServer{
private static final String RECEIVE_PATH = NioDemoConfig.SOCKET_RECEIVE_PATH;
private Charset charset = Charset.forName("UTF-8");
static class Client
{
String fileName ;
long fileLength ;
long startTime ;
InetSocketAddress remoteAddress ;
FileChannel outChannel ;
long receiveLength ;
public boolean isFinished(){
return receiveLength >= fileLength ;
}
}

private ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SERVER_BUFFER_SIZE);
Map<SelectableChannel , Client> clientMap = new HashMap<SelectableChannel , Client>();
public void startServer() throws IOException
{
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
serverSocket.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(18899);
serverSocket.bind(address);
serverChannel.register(selector , SelectionKey.OP_ACCEPT);
Print.tcfo("server is listening ...");
while(selector.select() >0 ){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey key = it.next();
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = server.accept();
if (socketChannel == null) continue ;
socketChannel.configureBlocking(false);
SelectionKey selectionKey = socketChannel.register(selector,SelectionKey.OP_READ);
Client client = new Client();
client.remoteAddress = (InetSocketAddress)socketChannel.getRemoteAddress();
clientMap.put(socketChannel,client);
Logger.debug(socketChannel.getRemoteAddress()+"连接成功 ...");
} else if (key.isReadable()){
processData(key);
}
it.remove();
}
}
}
private void processData (SelectionKey key) throws IOException
{
Client client = clientMap.get(key.channel());
SocketChannel socketChannel = (SocketChannel)key.channel();
int num = 0;
try {
buffer.clear();
while((num = socketChannel.read(buffer)) > 0){
if (null == client.fileName){
if(buffer.capacity() < 4){
continue;
}
int fileNameLen = buffer.getInt();
byte[] fileNameBytes = new byte[fileNameLen];
buffer.get(fileNameBytes);
String fileName = new String(fileNameBytes,charset);
File dirctory = new File(RECEIVE_PATH);
if (!dirctory.exists()){
dirctory.mkdirs();
}
Logger.info("NIO 传输目标dir : "+ dirctory);
client.fileName = fileName;
String fullName = dirctory.getAbsolutePath() + File.separator + fileName;
Logger.info("NIO 传输目标文件:" + fullName);
File file = new File(fullName.trim());
if (!file.exists()){
file.createNewFile();
}
FileChannel fileChannel = new FileOutputStream(file).getChannel();
client.outChannel = fileChannel;
if(buffer.capacity() < 8){
continue;
}
long fileLength = buffer.getLong();
client.fileLength = fileLength;
client.startTime = System.currentTimeMillis();
Logger.info("传输开始");
client.receiveLength += buffer.capacity();
if(buffer.capacity() > 0){
client.outChannel.write(buffer);
}if (client.isFinished()){
finished(key,client);
}
buffer.clear();
}else{
client.receiveLength += buffer.capacity();
client.outChannel.write(buffer);
if (client.isFinished()){
finished(key,client);
}
buffer.clear();
}
}key.cancel();
} catch (IOException e) {
key.cancel();
e.printStackTrace();
return ;
}
if(num == 0 ){
finished(key,client);
buffer.clear();
}
}
private void finished(SelectionKey key,Client client){
ImageOutputStreamImpl.closeQuietly(client.outChannel);
Logger.info("传输结束");
key.cancel();
Logger.debug("文件传输成功,File Name : "+ client.fileName);
Logger.debug("文件传输成功,File Length : "+ client.fileLength);
long endTime = System.currentTimeMillis();
Logger.debug("文件传输成功,File Speed : "+ client.fileLength/(endTime - client.startTime));
}
public static void main(String[] args) throws IOException{
NioReceiveServer server = new NioReceiveServer();
server.startServer();
}
}

Java编程 VII IO
http://gadoid.io/2025/05/07/Java编程-VII-IO/
作者
Codfish
发布于
2025年5月7日
许可协议