找回密码
 立即注册
首页 业界区 安全 Python 并发编程

Python 并发编程

忌才砟 2025-9-25 20:59:05
Python 并发编程是提升程序执行效率的核心技术,尤其在处理多任务场景(如网络请求、数据计算、文件 IO 等)时至关重要。
1、threading与线程池

多线程是 Python 中最常用的并发方式之一,通过创建多个线程实现任务并行执行。但受GIL(全局解释器锁) 限制,同一时刻只有一个线程执行 Python 字节码
1.1 GIL

GIL 是 Python 解释器的一种机制,确保同一时刻只有一个线程执行 Python 代码。其影响:

  • IO 密集型任务:线程等待 IO 时会释放 GIL,其他线程可执行,因此多线程有效(如网络请求时,线程阻塞等待响应,GIL 释放给其他线程)。
  • CPU 密集型任务:线程持续占用 CPU,GIL 无法释放,多线程反而因切换开销降低效率(此时应选多进程)。
1.2 线程的创建与基本使用

通过threading.Thread类创建线程,核心参数:

  • target:线程执行的函数。
  • args/kwargs:传递给函数的参数。
  • name:线程名称(用于调试)。
基础线程创建与执行
  1. import threading
  2. import time
  3. def task(name, delay):
  4.     """线程执行的任务:模拟IO操作(如网络请求)"""
  5.     print(f"线程 {name} 启动,延迟 {delay} 秒")
  6.     time.sleep(delay)  # 模拟IO阻塞(此时GIL释放,其他线程可执行)
  7.     print(f"线程 {name} 完成")
  8. # 创建线程
  9. thread1 = threading.Thread(target=task, args=("Thread-1", 2), name="T1")
  10. thread2 = threading.Thread(target=task, args=("Thread-2", 1), name="T2")
  11. # 启动线程
  12. thread1.start()
  13. thread2.start()
  14. # 等待线程完成(主线程阻塞,直到子线程结束)
  15. thread1.join()
  16. thread2.join()
  17. print("所有线程执行完毕")
复制代码
输出
  1. 线程 Thread-1 启动,延迟 2 秒
  2. 线程 Thread-2 启动,延迟 1 秒
  3. 线程 Thread-2 完成  # 1秒后先完成
  4. 线程 Thread-1 完成  # 再等1秒(共2秒)完成
  5. 所有线程执行完毕
复制代码
1.3 线程同步:避免竞态条件

多线程共享内存时,多个线程同时操作共享资源可能导致竞态条件(数据不一致)。需通过同步机制控制访问:
1.3.1 互斥锁(Lock)

最基础的同步工具
Lock是最简单的同步原语,通过 “加锁 - 操作 - 解锁” 流程保证临界区的互斥访问:

  • acquire():获取锁(若锁已被占用,则阻塞等待)。
  • release():释放锁(必须在acquire()后调用,否则报错)。
基本用法:解决竞态条件
  1. import threading
  2. counter = 0
  3. lock = threading.Lock()  # 创建互斥锁
  4. def increment():
  5.     global counter
  6.     for _ in range(100000):
  7.         lock.acquire()  # 加锁:确保只有一个线程进入临界区
  8.         try:
  9.             # 临界区操作(安全)
  10.             temp = counter
  11.             temp += 1
  12.             counter = temp
  13.         finally:
  14.             lock.release()  # 解锁:无论是否异常,必须释放锁
  15. t1 = threading.Thread(target=increment)
  16. t2 = threading.Thread(target=increment)
  17. t1.start()
  18. t2.start()
  19. t1.join()
  20. t2.join()
  21. print(f"最终counter值:{counter}")  # 正确输出200000
复制代码
上下文管理器:简化锁操作
Lock支持with语句,自动完成acquire()和release(),避免遗漏解锁:
  1. def increment():
  2.     global counter
  3.     for _ in range(100000):
  4.         with lock:  # 等价于acquire() + finally release()
  5.             temp = counter
  6.             temp += 1
  7.             counter = temp
复制代码
1.3.2 可重入锁(RLock)

解决同一线程多次加锁问题
Lock不允许同一线程多次获取锁(会导致死锁),而RLock(可重入锁)允许同一线程多次获取锁,但需保证acquire()和release()次数相等。
递归函数或嵌套锁
  1. import threading
  2. rlock = threading.RLock()  # 创建可重入锁
  3. def inner():
  4.     with rlock:  # 同一线程再次获取锁(RLock允许)
  5.         print("进入inner函数")
  6. def outer():
  7.     with rlock:  # 第一次获取锁
  8.         print("进入outer函数")
  9.         inner()  # 调用inner,需要再次获取锁
  10. # 单线程中嵌套获取锁
  11. outer()
复制代码
输出
  1. 进入outer函数
  2. 进入inner函数
复制代码
说明:若用Lock替代RLock,inner()中with lock会阻塞(锁已被outer()占用),导致死锁。
1.3.3 信号量(Semaphore)

限制并发访问数量
Semaphore通过维护一个 “计数器” 控制同时访问资源的线程数:

  • 初始计数器值为允许的最大并发数。
  • acquire():计数器 - 1(若为 0 则阻塞)。
  • release():计数器 + 1。
限制资源并发访问(如连接池、爬虫速率控制)
  1. import threading
  2. import time
  3. # 信号量:最多允许3个线程同时访问
  4. semaphore = threading.Semaphore(3)
  5. def access_resource(name):
  6.     with semaphore:  # 获取信号量(计数器-1)
  7.         print(f"线程 {name} 开始访问资源")
  8.         time.sleep(2)  # 模拟资源访问耗时
  9.         print(f"线程 {name} 结束访问资源")
  10. # 创建5个线程竞争资源
  11. threads = [threading.Thread(target=access_resource, args=(f"T{i}",)) for i in range(5)]
  12. for t in threads:
  13.     t.start()
  14. for t in threads:
  15.     t.join()
复制代码
输出
  1. 线程 T0 开始访问资源
  2. 线程 T1 开始访问资源
  3. 线程 T2 开始访问资源
  4. (2秒后)
  5. 线程 T0 结束访问资源
  6. 线程 T1 结束访问资源
  7. 线程 T2 结束访问资源
  8. 线程 T3 开始访问资源
  9. 线程 T4 开始访问资源
  10. (再2秒后)
  11. 线程 T3 结束访问资源
  12. 线程 T4 结束访问资源
复制代码
说明:同一时间最多 3 个线程访问资源,超出的线程需等待其他线程释放信号量。
1.3.4 事件(Event)

线程间的信号通知
Event通过一个 “标志位” 实现线程间通信:

  • set():将标志位设为True,唤醒所有等待的线程。
  • clear():将标志位设为False。
  • wait():若标志位为False则阻塞,直到set()被调用。
  • is_set():判断标志位是否为True。
一个线程等待另一个线程的 “信号”(如初始化完成通知)
  1. import threading
  2. import time
  3. # 创建事件(初始标志位为False)
  4. event = threading.Event()
  5. def init_task():
  6.     print("初始化任务开始...")
  7.     time.sleep(3)  # 模拟初始化耗时
  8.     print("初始化完成,发送信号")
  9.     event.set()  # 发送信号:标志位设为True
  10. def worker(name):
  11.     print(f"工作线程 {name} 等待初始化...")
  12.     event.wait()  # 阻塞等待信号
  13.     print(f"工作线程 {name} 开始工作")
  14. # 创建初始化线程和工作线程
  15. t_init = threading.Thread(target=init_task)
  16. t_workers = [threading.Thread(target=worker, args=(f"W{i}",)) for i in range(3)]
  17. t_init.start()
  18. for t in t_workers:
  19.     t.start()
  20. t_init.join()
  21. for t in t_workers:
  22.     t.join()
复制代码
输出
  1. 初始化任务开始...
  2. 工作线程 W0 等待初始化...
  3. 工作线程 W1 等待初始化...
  4. 工作线程 W2 等待初始化...
  5. (3秒后)
  6. 初始化完成,发送信号
  7. 工作线程 W0 开始工作
  8. 工作线程 W1 开始工作
  9. 工作线程 W2 开始工作
复制代码
1.3.5 条件变量(Condition)

复杂条件的同步
Condition结合了Lock和Event的功能,允许线程等待某个 “条件成立”,并在条件变化时通知其他线程。核心方法:

  • acquire()/release():与Lock一致,控制临界区。
  • wait():释放锁并阻塞,等待被notify()唤醒(唤醒后需重新获取锁)。
  • notify(n=1):唤醒最多n个等待的线程。
  • notify_all():唤醒所有等待的线程。
生产者 - 消费者模型(队列满 / 空时的等待)
  1. import threading
  2. import time
  3. from queue import Queue
  4. # 创建条件变量(内部包含一个Lock)
  5. condition = threading.Condition()
  6. queue = Queue(maxsize=5)  # 缓冲区:最多存5个数据
  7. def producer(name):
  8.     for i in range(10):  # 生产10个数据
  9.         with condition:  # 获取锁
  10.             # 若队列满,则等待(释放锁,允许消费者取数据)
  11.             while queue.full():
  12.                 print(f"生产者 {name}:队列满,等待...")
  13.                 condition.wait()  # 等待消费者通知
  14.             # 生产数据
  15.             data = f"数据{i}"
  16.             queue.put(data)
  17.             print(f"生产者 {name}:生产 {data},当前队列大小:{queue.qsize()}")
  18.             condition.notify()  # 通知消费者:有新数据
  19.         time.sleep(0.5)  # 模拟生产耗时
  20. def consumer(name):
  21.     while True:
  22.         with condition:
  23.             # 若队列空,则等待(释放锁,允许生产者存数据)
  24.             while queue.empty():
  25.                 print(f"消费者 {name}:队列空,等待...")
  26.                 condition.wait()  # 等待生产者通知
  27.             # 消费数据
  28.             data = queue.get()
  29.             print(f"消费者 {name}:消费 {data},当前队列大小:{queue.qsize()}")
  30.             condition.notify()  # 通知生产者:有空闲位置
  31.             queue.task_done()  # 标记任务完成
  32.         time.sleep(1)  # 模拟消费耗时
  33. # 创建生产者和消费者
  34. t_producer = threading.Thread(target=producer, args=("P1",))
  35. t_consumer1 = threading.Thread(target=consumer, args=("C1",), daemon=True)  # 守护线程
  36. t_consumer2 = threading.Thread(target=consumer, args=("C2",), daemon=True)
  37. t_producer.start()
  38. t_consumer1.start()
  39. t_consumer2.start()
  40. t_producer.join()
  41. queue.join()  # 等待所有数据被消费
  42. print("所有数据处理完毕")
复制代码
输出
  1. 生产者 P1:生产 数据0,当前队列大小:1
  2. 消费者 C1:消费 数据0,当前队列大小:0
  3. 生产者 P1:生产 数据1,当前队列大小:1
  4. 消费者 C2:消费 数据1,当前队列大小:0
  5. ...
复制代码
说明

  • 当队列满时,生产者调用wait()释放锁并等待,消费者取走数据后notify()唤醒生产者。
  • 当队列空时,消费者wait(),生产者放入数据后notify()唤醒消费者。
1.4 线程池

concurrent.futures.ThreadPoolExecutor
创建大量线程时,频繁创建 / 销毁线程会消耗资源。线程池通过复用线程提高效率,适合任务数量多的场景。
线程池处理批量任务
  1. from concurrent.futures import ThreadPoolExecutor, as_completed
  2. import time
  3. def fetch_url(url, delay):
  4.     """模拟网络请求任务"""
  5.     time.sleep(delay)
  6.     return f"URL {url} 耗时 {delay} 秒"
  7. def main():
  8.     urls = [f"https://example.com/{i}" for i in range(5)]
  9.     delays = [1, 3, 2, 4, 1]  # 每个URL的模拟延迟
  10.    
  11.     # 创建线程池(最大5个线程)
  12.     with ThreadPoolExecutor(max_workers=5) as executor:
  13.         # 提交任务到线程池(返回Future对象列表)
  14.         futures = [executor.submit(fetch_url, url, delay)
  15.                   for url, delay in zip(urls, delays)]
  16.         
  17.         # 遍历已完成的任务(按完成顺序)
  18.         for future in as_completed(futures):
  19.             print(future.result())  # 获取任务结果
  20. if __name__ == "__main__":
  21.     start = time.time()
  22.     main()
  23.     print(f"总耗时:{time.time() - start:.2f} 秒")  # 约等于最长延迟4秒
复制代码
输出
  1. URL https://example.com/0 耗时 1 秒
  2. URL https://example.com/4 耗时 1 秒
  3. URL https://example.com/2 耗时 2 秒
  4. URL https://example.com/1 耗时 3 秒
  5. URL https://example.com/3 耗时 4 秒
  6. 总耗时:4.01 秒
复制代码
2、multiprocessing与进程池

多进程通过创建独立进程实现并发,每个进程有自己的 Python 解释器和内存空间,完全避开 GIL 限制,适合 CPU 密集型任务(如数据计算、图像处理)。
2.1 进程与线程的核心区别


  • 内存空间:进程间内存不共享(需通过特定机制通信),线程共享同一进程的内存。
  • 开销:进程创建 / 销毁开销大(约是线程的 10 倍),切换成本高。
  • GIL 影响:进程不受 GIL 限制,可利用多核 CPU;线程受 GIL 限制。
2.2 进程的创建与基本使用

通过multiprocessing.Process类创建进程,用法与threading.Thread类似,但需注意:

  • Windows 系统中,进程函数需放在if __name__ == "__main__":内(避免无限递归创建进程)。
  • 进程间无法直接共享全局变量,需用multiprocessing.Queue、Pipe等工具通信。
基础进程创建与执行
  1. import multiprocessing
  2. import time
  3. def process_task(name, delay):
  4.     """进程执行的任务:模拟CPU计算(无IO阻塞)"""
  5.     print(f"进程 {name} 启动,计算 {delay} 秒")
  6.     # 模拟CPU密集型任务(循环计算)
  7.     start = time.time()
  8.     while time.time() - start < delay:
  9.         pass  # 空循环消耗CPU
  10.     print(f"进程 {name} 完成")
  11. if __name__ == "__main__":  # Windows必须加此判断
  12.     # 创建进程
  13.     p1 = multiprocessing.Process(target=process_task, args=("Process-1", 2), name="P1")
  14.     p2 = multiprocessing.Process(target=process_task, args=("Process-2", 2), name="P2")
  15.    
  16.     # 启动进程
  17.     p1.start()
  18.     p2.start()
  19.    
  20.     # 等待进程完成
  21.     p1.join()
  22.     p2.join()
  23.    
  24.     print("所有进程执行完毕")
复制代码
输出
  1. 进程 Process-1 启动,计算 2 秒
  2. 进程 Process-2 启动,计算 2 秒
  3. 进程 Process-1 完成  # 约2秒后同时完成
  4. 进程 Process-2 完成
  5. 所有进程执行完毕
复制代码
2.3 进程间通信

进程内存不共享,需通过以下方式通信:
通信方式适用场景特点Queue多进程间传递数据(先进先出)线程安全,支持多生产者 / 多消费者Pipe两个进程间双向通信速度快,仅支持两个进程Manager共享复杂数据结构(列表、字典等)支持多种类型,但开销较大2.3.1 队列(Queue)

多进程安全的消息传递
Queue是基于消息传递的 IPC 方式,底层通过管道(Pipe)和锁(Lock)实现,支持多生产者、多消费者模型,是最常用的进程间通信工具。
核心特性:

  • 线程 / 进程安全:内部实现了同步机制,可安全用于多进程。
  • FIFO(先进先出):数据按发送顺序接收。
  • 阻塞 / 非阻塞操作:put()/get()支持超时参数,避免永久阻塞。
生产者 - 消费者模型
  1. from multiprocessing import Process, Queue
  2. import time
  3. def producer(queue, name):
  4.     """生产者:向队列发送数据"""
  5.     for i in range(5):
  6.         data = f"生产者{name}的数据{i}"
  7.         queue.put(data)  # 向队列放入数据(满时阻塞)
  8.         print(f"生产者{name}发送:{data}")
  9.         time.sleep(0.5)  # 模拟生产耗时
  10. def consumer(queue, name):
  11.     """消费者:从队列接收数据"""
  12.     while True:
  13.         data = queue.get()  # 从队列获取数据(空时阻塞)
  14.         if data == "EXIT":  # 退出信号
  15.             break
  16.         print(f"消费者{name}接收:{data}")
  17.         time.sleep(1)  # 模拟消费耗时
  18.     print(f"消费者{name}退出")
  19. if __name__ == "__main__":
  20.     # 创建队列(默认无界,可指定maxsize限制容量)
  21.     queue = Queue(maxsize=3)  # 队列最多存3个数据
  22.    
  23.     # 创建2个生产者和2个消费者进程
  24.     producers = [
  25.         Process(target=producer, args=(queue, "P1")),
  26.         Process(target=producer, args=(queue, "P2"))
  27.     ]
  28.     consumers = [
  29.         Process(target=consumer, args=(queue, "C1")),
  30.         Process(target=consumer, args=(queue, "C2"))
  31.     ]
  32.    
  33.     # 启动进程
  34.     for p in producers:
  35.         p.start()
  36.     for c in consumers:
  37.         c.start()
  38.    
  39.     # 等待生产者完成
  40.     for p in producers:
  41.         p.join()
  42.    
  43.     # 发送退出信号(每个消费者一个)
  44.     for _ in consumers:
  45.         queue.put("EXIT")
  46.    
  47.     # 等待消费者完成
  48.     for c in consumers:
  49.         c.join()
  50.    
  51.     print("所有进程结束")
复制代码
关键方法:

  • queue.put(data, block=True, timeout=None)

    • 向队列放入数据,block=True时队列满则阻塞,timeout设置超时时间(超时抛queue.Full)。

  • queue.get(block=True, timeout=None)

    • 从队列获取数据,block=True时队列空则阻塞,timeout设置超时时间(超时抛queue.Empty)。

  • queue.qsize():返回当前队列中的数据量(非精确值,仅参考)。
  • queue.empty()/queue.full():判断队列是否为空 / 满(非线程安全,仅参考)。
2.3.2 管道(Pipe)

两个进程的双向通信
Pipe用于两个进程之间的双向通信,底层基于操作系统的管道机制,速度比Queue快,但仅支持点对点通信。
核心特性:

  • 双向通信:管道两端均可发送和接收数据。
  • 无缓冲 / 有缓冲:默认是无缓冲(发送方阻塞直到接收方读取),也可创建有缓冲管道。
  • 仅支持两个进程:多进程使用可能导致数据混乱。
基本用法:双向通信示例
  1. from multiprocessing import Process, Pipe
  2. import time
  3. def process_a(conn):
  4.     """进程A:发送数据并接收响应"""
  5.     # 发送数据到进程B
  6.     conn.send("Hello from A")
  7.     print("A发送:Hello from A")
  8.    
  9.     # 接收进程B的响应
  10.     response = conn.recv()
  11.     print(f"A接收:{response}")
  12.    
  13.     # 关闭连接
  14.     conn.close()
  15. def process_b(conn):
  16.     """进程B:接收数据并发送响应"""
  17.     # 接收进程A的数据
  18.     data = conn.recv()
  19.     print(f"B接收:{data}")
  20.    
  21.     # 发送响应到进程A
  22.     time.sleep(1)  # 模拟处理耗时
  23.     conn.send("Hello from B")
  24.     print("B发送:Hello from B")
  25.    
  26.     # 关闭连接
  27.     conn.close()
  28. if __name__ == "__main__":
  29.     # 创建管道:返回两个连接对象(conn1和conn2)
  30.     conn_a, conn_b = Pipe()  # 双向管道
  31.    
  32.     # 创建两个进程,分别传递管道的一端
  33.     p_a = Process(target=process_a, args=(conn_a,))
  34.     p_b = Process(target=process_b, args=(conn_b,))
  35.    
  36.     p_a.start()
  37.     p_b.start()
  38.    
  39.     p_a.join()
  40.     p_b.join()
  41.    
  42.     print("通信结束")
复制代码
关键方法:

  • conn.send(data):发送数据(数据需可序列化,如字符串、列表等)。
  • conn.recv():接收数据(阻塞直到有数据可接收)。
  • conn.close():关闭连接。
  • conn.poll(timeout=None):检查是否有数据可接收(非阻塞,timeout设置等待时间)。
2.3.3 共享内存(Value/Array)

高效的数据共享
共享内存允许多个进程直接访问同一块内存空间,是效率最高的 IPC 方式(无需数据拷贝),但需结合同步机制(如锁)避免竞态条件。multiprocessing提供Value(单个值)和Array(数组)两种共享内存工具。
核心特性:

  • 高效:数据直接在内存中共享,无需序列化 / 反序列化。
  • 类型限制:需指定数据类型(如'i'表示整数,'d'表示双精度浮点数)。
  • 需同步:多进程读写时必须加锁,否则会导致数据混乱。
共享内存与同步
  1. from multiprocessing import Process, Value, Array, Lock
  2. import time
  3. def increment_counter(counter, lock):
  4.     """递增共享计数器(需加锁保护)"""
  5.     for _ in range(100000):
  6.         with lock:  # 加锁确保原子操作
  7.             counter.value += 1  # 访问共享内存中的值
  8. def modify_array(arr, lock, start, end):
  9.     """修改共享数组(需加锁保护)"""
  10.     with lock:
  11.         for i in range(start, end):
  12.             arr[i] = i * 2  # 访问共享数组
  13.         time.sleep(1)  # 模拟耗时
  14. if __name__ == "__main__":
  15.     # 创建共享变量:Value(类型, 初始值)
  16.     counter = Value('i', 0)  # 'i'表示有符号整数
  17.    
  18.     # 创建共享数组:Array(类型, 初始值/长度)
  19.     arr = Array('i', [0] * 10)  # 长度为10的整数数组
  20.    
  21.     # 同步锁(确保共享内存操作安全)
  22.     lock = Lock()
  23.    
  24.     # 测试共享计数器(2个进程并发递增)
  25.     p1 = Process(target=increment_counter, args=(counter, lock))
  26.     p2 = Process(target=increment_counter, args=(counter, lock))
  27.    
  28.     p1.start()
  29.     p2.start()
  30.     p1.join()
  31.     p2.join()
  32.    
  33.     print(f"共享计数器结果:{counter.value}")  # 预期200000
  34.    
  35.     # 测试共享数组(2个进程分别修改不同区间)
  36.     p3 = Process(target=modify_array, args=(arr, lock, 0, 5))
  37.     p4 = Process(target=modify_array, args=(arr, lock, 5, 10))
  38.    
  39.     p3.start()
  40.     p4.start()
  41.     p3.join()
  42.     p4.join()
  43.    
  44.     print(f"共享数组结果:{list(arr)}")  # 预期[0,2,4,6,8,10,12,14,16,18]
复制代码
类型代码:
Value和Array的类型参数需使用ctypes兼容的类型代码,常见类型:

  • 'i':int(整数)
  • 'd':double(双精度浮点数)
  • 'f':float(单精度浮点数)
  • 'c':char(字符)
2.3.4 管理器(Manager)

共享复杂数据结构
Manager通过创建一个服务器进程管理共享数据,其他进程通过网络连接访问该进程,支持共享复杂数据结构(如字典、列表、队列等),灵活性高但效率低于共享内存。
核心特性:

  • 支持复杂类型:字典、列表、集合、自定义对象等。
  • 多进程安全:内部实现了同步机制。
  • 跨机器通信:可通过网络在不同机器的进程间共享数据(需配置地址)。
基本用法:共享字典和列表
  1. from multiprocessing import Process, Manager
  2. import time
  3. def update_dict(shared_dict, name):
  4.     """更新共享字典"""
  5.     for i in range(3):
  6.         shared_dict[f"{name}_{i}"] = i
  7.         time.sleep(0.5)
  8.         print(f"进程{name}更新字典:{shared_dict}")
  9. def append_list(shared_list, name):
  10.     """向共享列表追加元素"""
  11.     for i in range(3):
  12.         shared_list.append(f"{name}_{i}")
  13.         time.sleep(0.5)
  14.         print(f"进程{name}更新列表:{shared_list}")
  15. if __name__ == "__main__":
  16.     # 创建Manager对象(启动服务器进程)
  17.     with Manager() as manager:
  18.         # 创建共享字典和列表
  19.         shared_dict = manager.dict()  # 共享字典
  20.         shared_list = manager.list()  # 共享列表
  21.         
  22.         # 创建进程修改共享数据
  23.         p1 = Process(target=update_dict, args=(shared_dict, "P1"))
  24.         p2 = Process(target=update_dict, args=(shared_dict, "P2"))
  25.         p3 = Process(target=append_list, args=(shared_list, "P3"))
  26.         p4 = Process(target=append_list, args=(shared_list, "P4"))
  27.         
  28.         # 启动进程
  29.         p1.start()
  30.         p2.start()
  31.         p3.start()
  32.         p4.start()
  33.         
  34.         # 等待完成
  35.         p1.join()
  36.         p2.join()
  37.         p3.join()
  38.         p4.join()
  39.         
  40.         # 打印最终结果
  41.         print("\n最终共享字典:", dict(shared_dict))
  42.         print("最终共享列表:", list(shared_list))
复制代码
支持的共享类型:
Manager支持多种数据结构,通过以下方法创建:

  • manager.dict():共享字典
  • manager.list():共享列表
  • manager.set():共享集合
  • manager.Queue():共享队列
  • manager.Value()/manager.Array():共享值 / 数组(类似multiprocessing的同名工具,但通过 Manager 管理)
2.4 进程池

multiprocessing.Pool与ProcessPoolExecutor
进程池用于管理多个进程,复用进程资源,适合批量 CPU 密集型任务。
进程池处理 CPU 密集型任务(计算素数)
  1. from concurrent.futures import ProcessPoolExecutor
  2. import math
  3. def is_prime(n):
  4.     """判断是否为素数(CPU密集型任务)"""
  5.     if n < 2:
  6.         return False
  7.     for i in range(2, int(math.sqrt(n)) + 1):
  8.         if n % i == 0:
  9.             return False
  10.     return True
  11. def count_primes(range_start, range_end):
  12.     """统计指定范围内的素数数量"""
  13.     count = 0
  14.     for num in range(range_start, range_end):
  15.         if is_prime(num):
  16.             count += 1
  17.     return f"{range_start}-{range_end} 素数数量:{count}"
  18. def main():
  19.     # 划分任务:将1-100000分为4个区间,交给4个进程
  20.     ranges = [(1, 25000), (25000, 50000), (50000, 75000), (75000, 100000)]
  21.    
  22.     # 创建进程池(最大4个进程,与CPU核心数匹配)
  23.     with ProcessPoolExecutor(max_workers=4) as executor:
  24.         # 提交所有任务并获取结果(按输入顺序返回)
  25.         results = list(executor.map(lambda r: count_primes(r[0], r[1]), ranges))
  26.    
  27.     for res in results:
  28.         print(res)
  29. if __name__ == "__main__":
  30.     import time
  31.     start = time.time()
  32.     main()
  33.     print(f"总耗时:{time.time() - start:.2f} 秒")  # 多进程并行计算,耗时约为单进程的1/4
复制代码
3、异步编程:asyncio与协程

https://www.cnblogs.com/gange111/p/19109363

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册