Python编程 VI 多任务处理

操作系统视角下的任务执行

在操作系统中,所有的程序都是以进程或线程的形式被调度执行的。

什么是进程?

进程是程序被加载到内存后的运行时实例,是操作系统进行资源分配和调度的基本单位。

进程负责管理其所需的各种资源,包括但不限于用户空间中的代码段、堆、栈、动态链接库,以及通过内核中的 task_struct 结构体关联的系统资源,例如命名空间、文件挂载点、网络接口、用户权限、信号处理、共享内存和信号量等。

什么是线程?

线程是进程内的一个执行单元,是操作系统调度的最小单位。

线程与进程的区别在于资源的共享方式:

多个线程可以共享同一个进程的用户空间资源(如内存、文件描述符、信号处理器等),但每个线程依然会拥有自己独立的 task_struct、内核栈和调度上下文。

在线程创建时,可以通过 clone() 系统调用指定共享哪些资源,从而实现不同粒度的资源隔离或共享。

什么是任务调度?

操作系统通过 task_struct 结构体来描述每一个可调度实体(无论是进程还是线程)。

当一个进程或线程被创建时,内核会为其分配一个 task_struct,并将其加入调度器的就绪队列中,等待 CPU 分配时间片进行执行。

多线程对调度的影响

由于每个线程都对应一个独立的 task_struct,因此操作系统会将它们作为独立的调度实体进行调度。

例如:

假设系统中共有 10 个任务(调度实体)在等待 CPU,而某进程只包含一个线程,那么它获得 CPU 的概率约为 1/10。

当该进程创建了一个新线程后,它就拥有了两个 task_struct,调度概率提升至约 2/11,从而提高了该进程整体的执行频率和并发能力。

这种方式尤其适用于多核处理器架构,能够实现多个线程在不同 CPU 核心上同时运行,提升整体性能和响应速度。

Python中的多任务处理

多线程

如上面所讲,多线程就是通过在系统层面使用clone指定共享资源区在内核创建新的task_struct 增加被系统调度的可能性,在python中,使用threading模块来构建python的多线程应用

1
2
3
4
5
6
7
8
9
10
11
12
13
#示例1 创建函数,将函数添加到多线程对象中启动多线程
import threading
def count():
for i in range (0,100):
print(i)

session1 = threading.Thread(target=count,name="thread1") #创建线程1,指定线程1执行的方法
session2 = threading.Thread(target=count,name="thread2") #创建线程2,指定线程2执行的方法
session1.start() #创建c层面的线程结构
session2.start() #创建c层面的线程结构
session1.join() #执行线程1
session2.join() #执行线程2
# session1和session2 分别正常输出

这时,session1和session2 分别正常输出,因为函数中所使用的变量都是在栈中创建的,而不同线程的栈空间是相互独立的,所以不存在影响。

调用同一个对象读写时 这时就会出现输出与目标输出不一致的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#示例2 创建一个类,通过操作类中的属性,来查看多线程操作时可能出现的问题
import threading
import time

class count1 :
def __init__(self,num):
self.num = num
a = count1(0)
test_max = 10000
def count(count_t, max):
while count_t.num < max:
time.sleep(0.001) #增加执行时间,增加线程切换的可能性
count_t.num += 1

session1 = threading.Thread(target=count,name="thread1",args=(a,test_max))
session2 = threading.Thread(target=count,name="thread2",args=(a,test_max))
session3 = threading.Thread(target=count,name="thread3",args=(a,test_max))
session4 = threading.Thread(target=count,name="thread4",args=(a,test_max))
session5 = threading.Thread(target=count,name="thread5",args=(a,test_max))

session1.start()
session2.start()
session3.start()
session4.start()
session5.start()

session1.join()
session2.join()
session3.join()
session4.join()
session5.join()

# 输出 10005(不固定)

基于类继承实现多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 示例3 基于类继承实现多线程
import threading
import time
class anum :
def __init__(self,num):
self.num = num

class count1(threading.Thread):
def __init__(self,num,max):
super().__init__()
self.num = num
self.max = max
def run(self):
while self.num.num<self.max:
time.sleep(0.001)
self.num.num += 1
return self
num1 = anum(0)
session1 = count1(num1,1000)
session2 = count1(num1,1000)
session3 = count1(num1,1000)
session4 = count1(num1,1000)
session5 = count1(num1,1000)
session1.start()
session2.start()
session3.start()
session4.start()
session5.start()
session1.join()
session2.join()
session3.join()
session4.join()
session5.join()
print(num1.num)
# 最终结果同样存在问题

多进程

在Python中,支持两种多进程的创建方式 :

  1. 通过fork 创建进程 即复制当前进程创建一个进程
  2. spawn , 重新加载解释器创建进程
1
2
3
import multiprocessing
multiprocessing.set_start_method("spawn") # 设置解释器创建进程
multiprocessing.set_start_method("fork") # 设置fork创建进程

进程的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 使用Process 传入worker函数在新进程中执行
from multiprocessing import Process
import os

def worker(name):
print(f"{name} running in PID: {os.getpid()}")

if __name__ == '__main__':
processes = []
for i in range(5):
p = Process(target=worker, args=(f'Process-{i}',))
processes.append(p)
p.start()

for p in processes:
p.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 继承Process类 , 重写run方法创建进程
from multiprocessing import Process

class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name

def run(self):
print(f"{self.name} running in PID: {os.getpid()}")

p = MyProcess("CustomProcess")
p.start()
p.join()

再次理解多进程/多线程

我们可以看到多进程或者多线程在python中的实现都是通过 传入函数-启动线程/进程对象,或者继承thread或process类,重写run方法来实现的。这本质上对应着‘通过载入函数/对象定义创建可调度体’这一过程

同时要注意的是,Python文件中定义的线程/进程创建过程并不能与执行后的行为直接关联。对于线程来说,创建过程是 加载Python文件到内存中,创建线程,创建后的所有线程仍然可以使用文件中定义的对象。但是需要注意线程安全问题。而对于进程来说,当进程被创建后,在系统层面上是创建了一个原解释器的副本(fork) 或者 一个新的python解释器进程(spawn)。这时新进程已经和原进程的内存空间进行了隔离。相当于将 Python文件的副本加载到了两个进程中,各自的后续修改和使用已经完全隔离,如果需要进程交互,则需要进行一些进程间通信的设计。

协程

协程是一种异步编程设计,首先它将定义好的任务顺序执行到等待点,如何将任务注册到任务等待队列中当条件达成后,唤醒等待队列中的任务继续执行。协程不依赖系统时间片调度,而采用协作式调度模式(cooperative scheduling)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def task(name, delay): #定义了一个异步函数
print(f"{name} 开始执行") #前半部分执行
await asyncio.sleep(delay) #await将栈帧挂起
print(f"{name} 执行结束(等待了 {delay} 秒)") #等待过后继续执行

async def main():
# 并发运行多个协程
await asyncio.gather(
task("任务A", 2),
task("任务B", 1),
task("任务C", 3)
)

asyncio.run(main())
# 定义了多个协程执行。
# 其执行过程为 taskA , taskB, taskC 被python解释器解释,按次序执行,执行到await后被挂起,注册到事件循环的“挂起任务队列”中

多任务处理的局限性

多线程的问题- 线程安全问题

多线程可以提高程序被执行的效率,但是在经过CPU执行后回写的过程中,因为线程是共享内存资源的,所以会存在资源的写入问题。

程序的实际运行过程并非简单的执行-写入。而是分步骤:

  • 加载内存器中的值到寄存器
  • 寄存器执行运算,返回运算结果
  • 将运算结果写回内存

而在多个线程执行时,如果都对同一块内存进行处理,这个结果是不可预知的,会出现:

  • 加载内存器中的值到寄存器(线程1)
  • 寄存器执行运算,返回运算结果(线程1)
  • CPU资源被抢占,发生上下文切换,切换至线程2执行
  • 加载内存器中的值到寄存器(线程2)
  • 寄存器执行运算,返回运算结果(线程2)
  • 将运算结果写回内存,线程2退出,切换至线程1执行
  • 将运算结果写回内存(线程1)

这可能导致数据被覆盖,造成操作丢失(即‘写冲突’)

解决方法

锁是一种由操作系统定义的同步原语。保护了一个线程在访问内存时,将其他尝试获取该资源的线程阻塞,直到处理资源的线程释放它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading
import time

class count1 :
def __init__(self,num):
self.lock = threading.Lock()
self.num = num
a = count1(0)
test_max = 10000
def count(count_t, max):
while count_t.num < max:
time.sleep(0.001)
count_t.lock.acquire()
if count_t.num >= max :
count_t.lock.release()
break
count_t.num += 1
count_t.lock.release()
session1 = threading.Thread(target=count,name="thread1",args=(a,test_max))
session2 = threading.Thread(target=count,name="thread2",args=(a,test_max))
session3 = threading.Thread(target=count,name="thread3",args=(a,test_max))
session4 = threading.Thread(target=count,name="thread4",args=(a,test_max))
session5 = threading.Thread(target=count,name="thread5",args=(a,test_max))

session1.start()
session2.start()
session3.start()
session4.start()
session5.start()

session1.join()
session2.join()
session3.join()
session4.join()
session5.join()

还有如信号量,允许设置当前最多可以执行的线程数量

1
2
3
4
sem_t = threading.Semaphore(1)
sem_t.acquire() # 请求信号量
sem_t.release() # 释放信号量

以及一些其他的用来保证线程安全的设计

多线程问题 - 数据通信

对于多个进程来说,因为每个进程的管理的资源是相互独立的,所以就不存在执行结果被其他进程覆盖的可能性。但是又会出现另一个问题,因为即使多个进程之间的操作是互相独立的,不同进程间需要通过一些方法进行数据传递,才能完成通信。

队列

队列是通过定义了一个数据结构,在两个进程间完成收发

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process, Queue

def producer(q):
q.put("你好")

def consumer(q):
print("收到:", q.get())

q = Queue()
Process(target=producer, args=(q,)).start()
Process(target=consumer, args=(q,)).start()

管道

管道定义了点对点的 进程间通信方式

1
2
3
4
5
6
7
8
9
10
from multiprocessing import Process, Pipe

def worker(conn):
conn.send("hello")
conn.close()

parent_conn, child_conn = Pipe()
Process(target=worker, args=(child_conn,)).start()
print(parent_conn.recv())

一些共享内存页/映射表 比如Value,Array或者SharedMemory

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Value, Process

def add(v):
v.value += 1

v = Value('i', 0) # 'i' = int # 实际上是直接使用了内核中的内存存储了这个变量
p = Process(target=add, args=(v,))
p.start()
p.join()
print(v.value)

基于系统的文件描述符/缓冲结构

文件: 一个进程向系统写入文件,另一个进程接收文件并读取

Socket: 套接字,通过网络协议栈转发

不过实现原理都是基于操作系统中的一些内存映射机制,或者实现在内核内的缓存机制来完成的。任务处理设计整体的使用场景较为复杂,且很多都依赖于实际场景的设计实现,后期再进一步扩充这一部分内容。


Python编程 VI 多任务处理
http://gadoid.io/2025/04/16/Python编程-VI-多任务处理/
作者
Codfish
发布于
2025年4月16日
许可协议