IO
IO 是系统中“输入/输出”(Input/Output)的简称,通常指代操作系统执行读写操作的整个过程。在操作系统中,用户程序和内核程序之间存在严格的权限隔离,分别处于“用户态”和“内核态”。
- 用户态主要用于运行用户加载到内存中的应用程序,包括数据处理和逻辑执行。
- 内核态则负责管理系统资源,其核心是以
task_struct
为代表的数据结构,用于描述进程信息,并与内核各个子系统(如文件系统、内存管理、设备驱动等)交互,同时记录用户进程与内核资源之间的绑定关系。
当用户程序需要执行读写等 IO 操作时,会通过系统调用接口发起请求。这一请求会引发权限切换,即从用户态“陷入”(trap)到内核态,由内核代表应用程序操作底层资源(如文件、硬盘、内存),以保障系统安全性和稳定性,防止用户程序直接访问或破坏关键内存区域。
一个典型的文件写入流程如下:
- 用户程序创建文件抽象对象,内核创建底层文件资源;
- 通过系统调用进入内核态,由内核创建相应的底层文件结构,并返回一个 文件描述符 (file descriptor);
- 用户程序使用该描述符,在用户态创建流对象用于数据写入;
- 当写入发生时,再次触发系统调用,要求内核将数据写入对应的文件或设备;
- 数据从用户态复制到内核态缓冲区,再由内核完成实际的写入;
- 如果是读取操作,则由内核将文件内容从磁盘加载到内核缓冲区,再复制回用户态内存;
- 用户态程序随后可以对这些数据进行字节流或字符流的处理。
由上述流程我们可以看到 I/O 流程存在以下性能瓶颈 :
- 阻塞 阻塞是指应用程序在执行 I/O 操作时,如果数据还未准备好(如文件未读完、磁盘还在寻址、网络数据还未到达),进程会进入 等待状态 ,无法继续执行,直到 I/O 操作完成为止。
- 用户态与内核态的切换属于一种“上下文切换”,指操作系统在执行系统调用时,需要保存当前用户态的执行环境(寄存器、程序计数器、栈指针等),切换到内核态并加载内核的运行环境。
- 在用户态和内核态之间的数据交互并不是“共享内存”,而是通过 显式复制(copy)实现。通常发生在读写文件或网络数据时。
IO模型
同步阻塞IO
用户空间 主动发起,需要等待内核IO 操作彻底完成才返回到用户空间的IO
流程
- 发起启动调用read,用户态线程阻塞
- 内核收到read系统调用,进行数据准备。
- 内核等到完整数据到达,将数据从内核缓冲区拷贝到用户缓冲区
- 内核返回后用户线程解除阻塞状态,重新运行
优点 : 实现简单
缺点 :每个连接都使用自己的线程进行IO 操作,高并发场景下 切换开销和阻塞会导致性能很低
内核态准备数据和复制数据是同步进行的,直到数据可读后才返回
同步非阻塞IO
用户空间程序不需要等待内核IO操作彻底完成,可以立即返回用户空间执行后续命令。
流程
- 用户线程 向 内核发起查询请求,查询内核缓冲区是否存在数据
- 发起系统调用,查询内核缓冲区是否存在数据,如果不存在,返回一个调用失败信息
- 如果存在数据,将内核缓冲区中的数据复制到用户缓冲区,这时阻塞用户态线程,直到复制完成返回系统调用成功
优点 : 通过不断查询检查缓存状态,不需要阻塞线程
缺点 :不断的向内核请求查询会增加上下文切换开销
IO多路复用
通过select/epoll 使得一个线程可以同时监听多个资源描述符,一旦某个就绪后就可以将对应的资源描述符返回,供用户空间程序使用
流程
- 选择器注册。将需要read操作的 目标文件描述符提前注册到Linux的select/epoll选择器中,
- 就绪状态的轮询。通过选择器的查询方法,查询所有提前注册过的目标文件描述符I/O就绪状态。返回一个就绪的socket列表和就绪事件,提示用户线程I/O就绪
- 用户线程得到了就绪的socket列表,向对应的连接发起系统调用,阻塞线程,从内核复制数据,将数据从内核缓冲区复制到用户缓冲区
- 复制完成后,内核返回结果,用户线程解除阻塞状态,用户线程读取到了数据继续执行。
优点 :一个线程可以同时处理许多读写连接,使得用户不必维护大量的线程
缺点 :select 在监听事件时 仍然是阻塞的。但 epoll 支持边缘触发,可结合非阻塞通道实现高性能 IO
异步IO
内核空间成为主动调用者,当资源准备就绪时向用户线程通知 资源就绪
流程
- 用户发起read系统调用,不阻塞后续逻辑
- 内核开始I/O 第一个阶段,准备数据,内核将数据从内核复制到用户缓冲区
- 内核向用户线程发送信号,通知用户线程数据已经复制到缓冲区
- 用户线程读取缓冲区数据,继续后续任务
优点 : IO过程非阻塞,真正的异步调用
缺点 : 需要进行事务的注册与接收,需要操作系统支持
NIO
java 提供了非阻塞IO 库 NIO
核心组件
Channel 通道
Buffer 缓冲区
Selector 选择器
缓冲区(Buffer)
Buffer 是一个非线程安全类
capacity 读写容量 设置buffer 创建的类型数组的最大容量
position 记录读写位置 在读写过程中记录当前写入/读出的位置
limit 读写的最大上限 记录实际写入的容量/实际读取的最大内容
mark 读写位置记录缓存 暂时存储position的位置
allocate()
1 2 3 4 5 6 7 8 9
| public class UserBuffer { static IntBuffer intBuffer = null; public static void allocateTest(){ intBuffer = IntBuffer.allocate(20); ... } }
|
put()
1 2 3 4 5 6 7 8 9 10 11 12
| public class UserBuffer { static IntBuffer intBuffer = null; public static void putTest() { for(int i = 0 ; i < 5 ; i++) { intBuffer.put(i); } } }
|
flip()
1 2 3 4 5 6 7 8 9
| public class UserBuffer { static IntBuffer intBuffer = null; public static void flipTest() { intBuffer.flip(); } }
|
get()
1 2 3 4 5 6 7 8 9 10 11 12
| public class UserBuffer { static IntBuffer intBuffer = null; public static void getTest() { for(int i = 0 ; i < 2 ; i++){ intBuffer.get(); } } }
|
rewind()
1 2 3 4 5 6 7 8 9 10
| public class UserBuffer { static IntBuffer intBuffer = null; public static void rewindTest() { intBuffer.rewind(); } }
|
mark()和 reset()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class UserBuffer { static IntBuffer intBuffer = null ; public static void reRead(){ for (int i = 0 ; i < 5 ; i++){ if(i == 2){ intBuffer.mark(); } int j = intBuffer.get(); } } public static void afterReset(){ intBuffer.reset(); for(int i = 2 ; i< 5 ; i++){ int j = intBuffer.get(); } } }
|
clear()
1 2 3 4 5 6 7
| public class UserBuffer { static IntBuffer intBuffer = null ; public static void clearDemo(){ intBuffer.clear(); } }
|
通道 (Channel )
FileChannel 文件通道
SocketChannel 套接字通道
ServerSocketChannel 服务器套接字通道
DatagramChannel 数据报通道
FileChannel 文件通道
创建通道对象
1 2 3 4 5 6 7
| FileInputStream fis = new FileInputStream(srcFile); FileChannel inChannel = fis.getChannel(); FileOutputStream fos = new FileOutputStream(destFile); FileChannel outChannel = fos.getChannel();
RandomAccessFile rFile = new RandomAccessFile("filename.txt","rw"); FileChannel channel = rFile.getChannel();
|
读取
1 2 3 4 5 6 7
| RandomAccessFile aFile = new RandomAccessFile(fileName,"rw"); FileChannel channel = aFile.getChannel(); ByteBuffer buf = ByteBuffer.allocate(CAPACITY); int length = -1 while((length = channel.read(buf)) != -1){ ... }
|
写入
1 2 3 4 5
| buf.flip(); int outlength = 0; while((outlength = outChannel.write(buf)) != 0){ System.out.println("写入字节数:" + outlength); }
|
关闭通道
1 2 3
| channel.close();
channel.force(true);
|
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class FileNIOCopyDemo{ public static void main(String[] args){ nioCopyResourceFile(); } public static void nioCopyResouceFile(){ String sourcePath = NioDemoConfig.FILE_RESOUCE_SRC_PATH; String srcPath = IOUtil.getResoucePath(sourcePath); Logger.info("srcPath = " + srcPath);
String destPath = NioDemoConfig.FILE_RESOURCE_DEST_PATH; String destDecodePath = IOUtil.builderResourcePath(destPath); Logger.info("destDecodePath=" + destDecodePath); nioCopyFile(srcDecodePath , destDecodePath); }
public static void nioCopyFile(String srcPath , String destPath){ File srcFile = new File(srcPath); File destFile = new File(destPath); try { if(!destFile.exists()){ destFile.createNewFile(); } long startTime = System.currentTimeMillis(); FileInputStream fis = null; FileOutputStream fos = null ; FileChannel inChannel = null ; FileChannel outChannel = null ; try { fis = new FileInputStream(srcFile); fos = new FileOutputStream(destFile); inChannel = fis.getChannel(); outChannel = fos.getChannel(); int length = -1; ByteBuffer buf = ByteBuffer.allocate(1024); while((length = inChannel.read(buf)) != -1) { buf.flip(); int outlength = 0; while (( outlength = outChannel.write(buf))!=0 ){ buf.clear(); } outChannel.force(true); }finally{ IOUtil.closeQuietly(outChannel); IOUtil.closeQuietly(fos); IOUtil.closeQuietly(inchannel); IOUtil.closeQuietly(fis); } long endTime = System.currentTimeMillis(); Logger.info("base复制毫秒数:" + (endTime - startTime)); }catch (IOException e){ e.printStackTrace(); } }
|
SocketChannel
创建隧道
1 2 3 4 5 6 7 8
| SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("127.0.0.1",80)); while(! socketChannel.finishConnect()){ } ServerSocketChannel server = (ServerSocketChannel) key.channel()); SocketChannel socketChannel = server.accept(); socketChannel.configureBlocking(false);
|
读取通道
1 2
| ByteBuffer buf = ByteBuffer.allocate(1024); int bytesRead = socketChannel.read(buf);
|
写入
1 2
| buffer.flip(); socketChannel.write(buffer);
|
关闭
1 2
| socketChannel.shutdownOutput(); IOUtil.closeQuietly(socketChannel);
|
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| public class NioSendClient{
private Charset charset = Charset.forName("UTF-8"); public void sendFile() { try { String sourcePath = NioDemoConfig.SOCKET_SEND_FILE; String srcPath = IOUtil.getResourcePath(sourcePath); Logger.debug("srcPath=" + srcPath); String destFile = NioDemoConfig.SOCKET_RECEIVE_FILE; Logger.debug("destFile=" + destFile); File file = new File(srcPath); if (!file.exists()){ Logger.debug("文件不存在"); return ; } FileChannel fileChannel = new FileInputStream(file).getChannel(); SocketChannel socketChannel = SocketChannel.open(); socketChannel.socket().connect(new InetSocketAddress("127.0.0.1",18899)); socketChannel.configureBlocking(false); Logger.debug("Client 已连接"); while (!socketChannel.finishConnect()){ ... } ByteBuffer buffer = sendFileNameAndLength(destFile, file,socketChannel); int length = sendContent(file,fileChannel,socketChannel,buffer); if (length == -1){ IOUtil.closeQuietly(fileChannel); socketChannel.shutdownOutput(); IOUtil.closeQuietly(socketChannel); } Logger.debug(" 文件传输成功 "); } catch (Exception e){ e.printStackTrace(); } } public int sendContent(File file , FileChannel fileChannel, SocketChannel socketChannel, ByteBuffer buffer) throws IOException{ Logger.debug("开始传输文件"); int length = 0; long progess = 0 ; while ((length = fileChannel.read(buffer)) >0 ) { buffer.flip(); socketChannel.write(buffer); buffer.clear(); progess += length ; Logger.debug("| + " (100 * progress/ file.length()) + "%|"); } return length; } public ByteBuffer sendFileNameAndLength(String destFile, File file , SocketChannel socketChannel) throws IOException{ ByteBuffer fileNameByteBuffer = charset.encode(destFile); ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE); int fileNameLen = fileNameByteBuffer.capacity(); buffer.putInt(fileNameLen); buffer.flip(); socketChannel.write(buffer); buffer.clear(); Logger.info("发送完成" , fileNameLen) ; socketChannel.write(fileNameByteBuffer); Logger.info("Client 文件名称发送完成", destFile); buffer.putLong(file.length()); buffer.flip(); socketChannel.write(buffer); buffer.clear(); Logger.info("Client 文件长度发送完成:" , file.length()); return buffer ; }
}
|
DatagramChannel
获取
1 2 3 4
| DatagramChannel channel = DatagramChannel.open(); datagramChannel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(18080));
|
读取
1 2
| ByteBuffer buf = ByteBuffer.allocate(1024); SocketAddress clientAddr = datagramChannel.receive(buf);
|
写入
1 2 3
| buffer.flip(); dChannel.send(buffer,new InetSocketAddress("127.0.0.1",18899)); buffer.clear();
|
关闭
案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| public class UDPClient{ public void send() throws IOException{ DatagramChannel dChannel = DatagramChannel.open(); dChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE); Scanner scanner = new Scanner(System.in); Print.tcfo("请输入发送内容"); while(scanner.hasNext()){ String next = scanner.next(); buffer.put((Dateutil.getNow() + ">>" + next).getBytes(); buffer.flip(); dChannel.send(buffer,new InetSocketAddress("127.0.0.1",18899)); buffer.clear(); } dChannel.close(); } public static void main(String[] args) throws IOException{ new UDPClient().send(); } }
public class UDPServer{ public void receive() throws IOException{ DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.configureBlocking(false); datagramChannel.bind(new InetSocketAddress("127.0.0.1",18899)); Print.tcfo("UDP 服务器启动成功"); Selector selector = Selector.open(); datagramChannel.register(selector, SelectionKey.OP_READ); while(selector.select()>0){ Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE); while(iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); if(selectionKey.isReadable()){ SocketAddress client = datagramChannel.receive(buffer); buffer.flip(); Print.tcfo(new String(buffer.array(),0,buffer.limit)); buffer.clear(); } } iterator.remove(); } selector.close(); datagramChannel.close(); } public static void main(String[] args) throws IOException{ new UDPServer().receive(); }
}
|
选择器 (Selector)
将通道注册到选择器,通过选择器监听IO事件
1 可读 SelectionKey.OP_READ ( 通道有数据可读 )
2 可写 SelectionKey.OP_WRITE ( 通道有数据可写 )
3 连接 SelectionKey.OP_CONNECT ( 连接建立 )
4 接收 SelectionKey.OP_ACCEPT ( 有新的连接到来 )
创建 selector
1 2 3 4 5 6 7 8 9 10
| Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(18899));
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| while (selector.select() > 0){ Set selectedKeys = selector.selectedKeys(); Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); if(key.isAcceptable()){ } else if (key.isConnectable()){ } else if (key.isReadable()){ } else if (key.isWritable()){ } keyIterator.remove(); }
}
|
实现discard服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
| public class NioDiscardClient{ public static void startServer() throws IOException{ Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(18899)); Logger.info("服务器启动"); serverSocketChannel.register(selector,selectionKey.OP_ACCEPT); while(selector.select()>0){ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (selectedKeys.hasNext()){ SelectionKey selectedKey = selectedKeys.next(); if(selectedKey.isAcceptable()){ SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); }else if (selectedKey.isReadable()){ SocketChannel socketChannel = (SocketChannel) selectedKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int length = 0 ; while((length = socketChannel.read(byteBuffer))>0){ byteBuffer.flip(); Logger.info(new String(byteBuffer.array(),0,length)); byteBuffer.clear(); } socketChannel.close(); } selectedKeys.remove(); } } serverSocketChannel.close(); } public static void main(String[] args) throws IOException{ startServer(); }
}
public class NioReceiveServer{ private static final String RECEIVE_PATH = NioDemoConfig.SOCKET_RECEIVE_PATH; private Charset charset = Charset.forName("UTF-8"); static class Client { String fileName ; long fileLength ; long startTime ; InetSocketAddress remoteAddress ; FileChannel outChannel ; long receiveLength ; public boolean isFinished(){ return receiveLength >= fileLength ; } }
private ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SERVER_BUFFER_SIZE); Map<SelectableChannel , Client> clientMap = new HashMap<SelectableChannel , Client>(); public void startServer() throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverChannel.socket(); serverSocket.configureBlocking(false); InetSocketAddress address = new InetSocketAddress(18899); serverSocket.bind(address); serverChannel.register(selector , SelectionKey.OP_ACCEPT); Print.tcfo("server is listening ..."); while(selector.select() >0 ){ Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = it.next(); if(key.isAcceptable()){ ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel socketChannel = server.accept(); if (socketChannel == null) continue ; socketChannel.configureBlocking(false); SelectionKey selectionKey = socketChannel.register(selector,SelectionKey.OP_READ); Client client = new Client(); client.remoteAddress = (InetSocketAddress)socketChannel.getRemoteAddress(); clientMap.put(socketChannel,client); Logger.debug(socketChannel.getRemoteAddress()+"连接成功 ..."); } else if (key.isReadable()){ processData(key); } it.remove(); } } } private void processData (SelectionKey key) throws IOException { Client client = clientMap.get(key.channel()); SocketChannel socketChannel = (SocketChannel)key.channel(); int num = 0; try { buffer.clear(); while((num = socketChannel.read(buffer)) > 0){ if (null == client.fileName){ if(buffer.capacity() < 4){ continue; } int fileNameLen = buffer.getInt(); byte[] fileNameBytes = new byte[fileNameLen]; buffer.get(fileNameBytes); String fileName = new String(fileNameBytes,charset); File dirctory = new File(RECEIVE_PATH); if (!dirctory.exists()){ dirctory.mkdirs(); } Logger.info("NIO 传输目标dir : "+ dirctory); client.fileName = fileName; String fullName = dirctory.getAbsolutePath() + File.separator + fileName; Logger.info("NIO 传输目标文件:" + fullName); File file = new File(fullName.trim()); if (!file.exists()){ file.createNewFile(); } FileChannel fileChannel = new FileOutputStream(file).getChannel(); client.outChannel = fileChannel; if(buffer.capacity() < 8){ continue; } long fileLength = buffer.getLong(); client.fileLength = fileLength; client.startTime = System.currentTimeMillis(); Logger.info("传输开始"); client.receiveLength += buffer.capacity(); if(buffer.capacity() > 0){ client.outChannel.write(buffer); }if (client.isFinished()){ finished(key,client); } buffer.clear(); }else{ client.receiveLength += buffer.capacity(); client.outChannel.write(buffer); if (client.isFinished()){ finished(key,client); } buffer.clear(); } }key.cancel(); } catch (IOException e) { key.cancel(); e.printStackTrace(); return ; } if(num == 0 ){ finished(key,client); buffer.clear(); } } private void finished(SelectionKey key,Client client){ ImageOutputStreamImpl.closeQuietly(client.outChannel); Logger.info("传输结束"); key.cancel(); Logger.debug("文件传输成功,File Name : "+ client.fileName); Logger.debug("文件传输成功,File Length : "+ client.fileLength); long endTime = System.currentTimeMillis(); Logger.debug("文件传输成功,File Speed : "+ client.fileLength/(endTime - client.startTime)); } public static void main(String[] args) throws IOException{ NioReceiveServer server = new NioReceiveServer(); server.startServer(); } }
|