操作系统视角下的任务执行
在操作系统中,所有的程序都是以进程或线程的形式被调度执行的。
什么是进程?
进程是程序被加载到内存后的运行时实例,是操作系统进行资源分配和调度的基本单位。
进程负责管理其所需的各种资源,包括但不限于用户空间中的代码段、堆、栈、动态链接库,以及通过内核中的 task_struct
结构体关联的系统资源,例如命名空间、文件挂载点、网络接口、用户权限、信号处理、共享内存和信号量等。
什么是线程?
线程是进程内的一个执行单元,是操作系统调度的最小单位。
线程与进程的区别在于资源的共享方式:
多个线程可以共享同一个进程的用户空间资源(如内存、文件描述符、信号处理器等),但每个线程依然会拥有自己独立的 task_struct
、内核栈和调度上下文。
在线程创建时,可以通过 clone()
系统调用指定共享哪些资源,从而实现不同粒度的资源隔离或共享。
什么是任务调度?
操作系统通过 task_struct
结构体来描述每一个可调度实体(无论是进程还是线程)。
当一个进程或线程被创建时,内核会为其分配一个 task_struct
,并将其加入调度器的就绪队列中,等待 CPU 分配时间片进行执行。
多线程对调度的影响
由于每个线程都对应一个独立的 task_struct
,因此操作系统会将它们作为独立的调度实体进行调度。
例如:
假设系统中共有 10 个任务(调度实体)在等待 CPU,而某进程只包含一个线程,那么它获得 CPU 的概率约为 1/10。
当该进程创建了一个新线程后,它就拥有了两个 task_struct
,调度概率提升至约 2/11,从而提高了该进程整体的执行频率和并发能力。
这种方式尤其适用于多核处理器架构,能够实现多个线程在不同 CPU 核心上同时运行,提升整体性能和响应速度。
Python中的多任务处理
多线程
如上面所讲,多线程就是通过在系统层面使用clone指定共享资源区在内核创建新的task_struct 增加被系统调度的可能性,在python中,使用threading模块来构建python的多线程应用
1 2 3 4 5 6 7 8 9 10 11 12 13
| import threading def count(): for i in range (0,100): print(i)
session1 = threading.Thread(target=count,name="thread1") session2 = threading.Thread(target=count,name="thread2") session1.start() session2.start() session1.join() session2.join()
|
这时,session1和session2 分别正常输出,因为函数中所使用的变量都是在栈中创建的,而不同线程的栈空间是相互独立的,所以不存在影响。
调用同一个对象读写时 这时就会出现输出与目标输出不一致的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import threading import time
class count1 : def __init__(self,num): self.num = num a = count1(0) test_max = 10000 def count(count_t, max): while count_t.num < max: time.sleep(0.001) count_t.num += 1
session1 = threading.Thread(target=count,name="thread1",args=(a,test_max)) session2 = threading.Thread(target=count,name="thread2",args=(a,test_max)) session3 = threading.Thread(target=count,name="thread3",args=(a,test_max)) session4 = threading.Thread(target=count,name="thread4",args=(a,test_max)) session5 = threading.Thread(target=count,name="thread5",args=(a,test_max))
session1.start() session2.start() session3.start() session4.start() session5.start()
session1.join() session2.join() session3.join() session4.join() session5.join()
|
基于类继承实现多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import threading import time class anum : def __init__(self,num): self.num = num class count1(threading.Thread): def __init__(self,num,max): super().__init__() self.num = num self.max = max def run(self): while self.num.num<self.max: time.sleep(0.001) self.num.num += 1 return self num1 = anum(0) session1 = count1(num1,1000) session2 = count1(num1,1000) session3 = count1(num1,1000) session4 = count1(num1,1000) session5 = count1(num1,1000) session1.start() session2.start() session3.start() session4.start() session5.start() session1.join() session2.join() session3.join() session4.join() session5.join() print(num1.num)
|
多进程
在Python中,支持两种多进程的创建方式 :
- 通过fork 创建进程 即复制当前进程创建一个进程
- spawn , 重新加载解释器创建进程
1 2 3
| import multiprocessing multiprocessing.set_start_method("spawn") multiprocessing.set_start_method("fork")
|
进程的创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from multiprocessing import Process import os
def worker(name): print(f"{name} running in PID: {os.getpid()}")
if __name__ == '__main__': processes = [] for i in range(5): p = Process(target=worker, args=(f'Process-{i}',)) processes.append(p) p.start()
for p in processes: p.join()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from multiprocessing import Process
class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name
def run(self): print(f"{self.name} running in PID: {os.getpid()}")
p = MyProcess("CustomProcess") p.start() p.join()
|
再次理解多进程/多线程
我们可以看到多进程或者多线程在python中的实现都是通过 传入函数-启动线程/进程对象,或者继承thread或process类,重写run方法来实现的。这本质上对应着‘通过载入函数/对象定义创建可调度体’这一过程
同时要注意的是,Python文件中定义的线程/进程创建过程并不能与执行后的行为直接关联。对于线程来说,创建过程是 加载Python文件到内存中,创建线程,创建后的所有线程仍然可以使用文件中定义的对象。但是需要注意线程安全问题。而对于进程来说,当进程被创建后,在系统层面上是创建了一个原解释器的副本(fork) 或者 一个新的python解释器进程(spawn)。这时新进程已经和原进程的内存空间进行了隔离。相当于将 Python文件的副本加载到了两个进程中,各自的后续修改和使用已经完全隔离,如果需要进程交互,则需要进行一些进程间通信的设计。
协程
协程是一种异步编程设计,首先它将定义好的任务顺序执行到等待点,如何将任务注册到任务等待队列中当条件达成后,唤醒等待队列中的任务继续执行。协程不依赖系统时间片调度,而采用协作式调度模式(cooperative scheduling)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import asyncio
async def task(name, delay): print(f"{name} 开始执行") await asyncio.sleep(delay) print(f"{name} 执行结束(等待了 {delay} 秒)")
async def main(): await asyncio.gather( task("任务A", 2), task("任务B", 1), task("任务C", 3) )
asyncio.run(main())
|
多任务处理的局限性
多线程的问题- 线程安全问题
多线程可以提高程序被执行的效率,但是在经过CPU执行后回写的过程中,因为线程是共享内存资源的,所以会存在资源的写入问题。
程序的实际运行过程并非简单的执行-写入。而是分步骤:
- 加载内存器中的值到寄存器
- 寄存器执行运算,返回运算结果
- 将运算结果写回内存
而在多个线程执行时,如果都对同一块内存进行处理,这个结果是不可预知的,会出现:
- 加载内存器中的值到寄存器(线程1)
- 寄存器执行运算,返回运算结果(线程1)
- CPU资源被抢占,发生上下文切换,切换至线程2执行
- 加载内存器中的值到寄存器(线程2)
- 寄存器执行运算,返回运算结果(线程2)
- 将运算结果写回内存,线程2退出,切换至线程1执行
- 将运算结果写回内存(线程1)
这可能导致数据被覆盖,造成操作丢失(即‘写冲突’)
解决方法
锁
锁是一种由操作系统定义的同步原语。保护了一个线程在访问内存时,将其他尝试获取该资源的线程阻塞,直到处理资源的线程释放它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import threading import time
class count1 : def __init__(self,num): self.lock = threading.Lock() self.num = num a = count1(0) test_max = 10000 def count(count_t, max): while count_t.num < max: time.sleep(0.001) count_t.lock.acquire() if count_t.num >= max : count_t.lock.release() break count_t.num += 1 count_t.lock.release() session1 = threading.Thread(target=count,name="thread1",args=(a,test_max)) session2 = threading.Thread(target=count,name="thread2",args=(a,test_max)) session3 = threading.Thread(target=count,name="thread3",args=(a,test_max)) session4 = threading.Thread(target=count,name="thread4",args=(a,test_max)) session5 = threading.Thread(target=count,name="thread5",args=(a,test_max))
session1.start() session2.start() session3.start() session4.start() session5.start()
session1.join() session2.join() session3.join() session4.join() session5.join()
|
还有如信号量,允许设置当前最多可以执行的线程数量
1 2 3 4
| sem_t = threading.Semaphore(1) sem_t.acquire() # 请求信号量 sem_t.release() # 释放信号量
|
以及一些其他的用来保证线程安全的设计
多线程问题 - 数据通信
对于多个进程来说,因为每个进程的管理的资源是相互独立的,所以就不存在执行结果被其他进程覆盖的可能性。但是又会出现另一个问题,因为即使多个进程之间的操作是互相独立的,不同进程间需要通过一些方法进行数据传递,才能完成通信。
队列
队列是通过定义了一个数据结构,在两个进程间完成收发
1 2 3 4 5 6 7 8 9 10 11 12
| from multiprocessing import Process, Queue
def producer(q): q.put("你好")
def consumer(q): print("收到:", q.get())
q = Queue() Process(target=producer, args=(q,)).start() Process(target=consumer, args=(q,)).start()
|
管道
管道定义了点对点的 进程间通信方式
1 2 3 4 5 6 7 8 9 10
| from multiprocessing import Process, Pipe
def worker(conn): conn.send("hello") conn.close()
parent_conn, child_conn = Pipe() Process(target=worker, args=(child_conn,)).start() print(parent_conn.recv())
|
一些共享内存页/映射表 比如Value,Array或者SharedMemory
1 2 3 4 5 6 7 8 9 10 11
| from multiprocessing import Value, Process
def add(v): v.value += 1
v = Value('i', 0) p = Process(target=add, args=(v,)) p.start() p.join() print(v.value)
|
基于系统的文件描述符/缓冲结构
文件: 一个进程向系统写入文件,另一个进程接收文件并读取
Socket: 套接字,通过网络协议栈转发
不过实现原理都是基于操作系统中的一些内存映射机制,或者实现在内核内的缓存机制来完成的。任务处理设计整体的使用场景较为复杂,且很多都依赖于实际场景的设计实现,后期再进一步扩充这一部分内容。