Python 并发编程是提升程序执行效率的核心技术,尤其在处理多任务场景(如网络请求、数据计算、文件 IO 等)时至关重要。
1、threading与线程池
多线程是 Python 中最常用的并发方式之一,通过创建多个线程实现任务并行执行。但受GIL(全局解释器锁) 限制,同一时刻只有一个线程执行 Python 字节码。
1.1 GIL
GIL 是 Python 解释器的一种机制,确保同一时刻只有一个线程执行 Python 代码。其影响:
- IO 密集型任务:线程等待 IO 时会释放 GIL,其他线程可执行,因此多线程有效(如网络请求时,线程阻塞等待响应,GIL 释放给其他线程)。
- CPU 密集型任务:线程持续占用 CPU,GIL 无法释放,多线程反而因切换开销降低效率(此时应选多进程)。
1.2 线程的创建与基本使用
通过threading.Thread类创建线程,核心参数:
- target:线程执行的函数。
- args/kwargs:传递给函数的参数。
- name:线程名称(用于调试)。
基础线程创建与执行- import threading
- import time
- def task(name, delay):
- """线程执行的任务:模拟IO操作(如网络请求)"""
- print(f"线程 {name} 启动,延迟 {delay} 秒")
- time.sleep(delay) # 模拟IO阻塞(此时GIL释放,其他线程可执行)
- print(f"线程 {name} 完成")
- # 创建线程
- thread1 = threading.Thread(target=task, args=("Thread-1", 2), name="T1")
- thread2 = threading.Thread(target=task, args=("Thread-2", 1), name="T2")
- # 启动线程
- thread1.start()
- thread2.start()
- # 等待线程完成(主线程阻塞,直到子线程结束)
- thread1.join()
- thread2.join()
- print("所有线程执行完毕")
复制代码 输出- 线程 Thread-1 启动,延迟 2 秒
- 线程 Thread-2 启动,延迟 1 秒
- 线程 Thread-2 完成 # 1秒后先完成
- 线程 Thread-1 完成 # 再等1秒(共2秒)完成
- 所有线程执行完毕
复制代码 1.3 线程同步:避免竞态条件
多线程共享内存时,多个线程同时操作共享资源可能导致竞态条件(数据不一致)。需通过同步机制控制访问:
1.3.1 互斥锁(Lock)
最基础的同步工具
Lock是最简单的同步原语,通过 “加锁 - 操作 - 解锁” 流程保证临界区的互斥访问:
- acquire():获取锁(若锁已被占用,则阻塞等待)。
- release():释放锁(必须在acquire()后调用,否则报错)。
基本用法:解决竞态条件- import threading
- counter = 0
- lock = threading.Lock() # 创建互斥锁
- def increment():
- global counter
- for _ in range(100000):
- lock.acquire() # 加锁:确保只有一个线程进入临界区
- try:
- # 临界区操作(安全)
- temp = counter
- temp += 1
- counter = temp
- finally:
- lock.release() # 解锁:无论是否异常,必须释放锁
- t1 = threading.Thread(target=increment)
- t2 = threading.Thread(target=increment)
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- print(f"最终counter值:{counter}") # 正确输出200000
复制代码 上下文管理器:简化锁操作
Lock支持with语句,自动完成acquire()和release(),避免遗漏解锁:- def increment():
- global counter
- for _ in range(100000):
- with lock: # 等价于acquire() + finally release()
- temp = counter
- temp += 1
- counter = temp
复制代码 1.3.2 可重入锁(RLock)
解决同一线程多次加锁问题
Lock不允许同一线程多次获取锁(会导致死锁),而RLock(可重入锁)允许同一线程多次获取锁,但需保证acquire()和release()次数相等。
递归函数或嵌套锁- import threading
- rlock = threading.RLock() # 创建可重入锁
- def inner():
- with rlock: # 同一线程再次获取锁(RLock允许)
- print("进入inner函数")
- def outer():
- with rlock: # 第一次获取锁
- print("进入outer函数")
- inner() # 调用inner,需要再次获取锁
- # 单线程中嵌套获取锁
- outer()
复制代码 输出:说明:若用Lock替代RLock,inner()中with lock会阻塞(锁已被outer()占用),导致死锁。
1.3.3 信号量(Semaphore)
限制并发访问数量
Semaphore通过维护一个 “计数器” 控制同时访问资源的线程数:
- 初始计数器值为允许的最大并发数。
- acquire():计数器 - 1(若为 0 则阻塞)。
- release():计数器 + 1。
限制资源并发访问(如连接池、爬虫速率控制)- import threading
- import time
- # 信号量:最多允许3个线程同时访问
- semaphore = threading.Semaphore(3)
- def access_resource(name):
- with semaphore: # 获取信号量(计数器-1)
- print(f"线程 {name} 开始访问资源")
- time.sleep(2) # 模拟资源访问耗时
- print(f"线程 {name} 结束访问资源")
- # 创建5个线程竞争资源
- threads = [threading.Thread(target=access_resource, args=(f"T{i}",)) for i in range(5)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
复制代码 输出- 线程 T0 开始访问资源
- 线程 T1 开始访问资源
- 线程 T2 开始访问资源
- (2秒后)
- 线程 T0 结束访问资源
- 线程 T1 结束访问资源
- 线程 T2 结束访问资源
- 线程 T3 开始访问资源
- 线程 T4 开始访问资源
- (再2秒后)
- 线程 T3 结束访问资源
- 线程 T4 结束访问资源
复制代码 说明:同一时间最多 3 个线程访问资源,超出的线程需等待其他线程释放信号量。
1.3.4 事件(Event)
线程间的信号通知
Event通过一个 “标志位” 实现线程间通信:
- set():将标志位设为True,唤醒所有等待的线程。
- clear():将标志位设为False。
- wait():若标志位为False则阻塞,直到set()被调用。
- is_set():判断标志位是否为True。
一个线程等待另一个线程的 “信号”(如初始化完成通知)- import threading
- import time
- # 创建事件(初始标志位为False)
- event = threading.Event()
- def init_task():
- print("初始化任务开始...")
- time.sleep(3) # 模拟初始化耗时
- print("初始化完成,发送信号")
- event.set() # 发送信号:标志位设为True
- def worker(name):
- print(f"工作线程 {name} 等待初始化...")
- event.wait() # 阻塞等待信号
- print(f"工作线程 {name} 开始工作")
- # 创建初始化线程和工作线程
- t_init = threading.Thread(target=init_task)
- t_workers = [threading.Thread(target=worker, args=(f"W{i}",)) for i in range(3)]
- t_init.start()
- for t in t_workers:
- t.start()
- t_init.join()
- for t in t_workers:
- t.join()
复制代码 输出:- 初始化任务开始...
- 工作线程 W0 等待初始化...
- 工作线程 W1 等待初始化...
- 工作线程 W2 等待初始化...
- (3秒后)
- 初始化完成,发送信号
- 工作线程 W0 开始工作
- 工作线程 W1 开始工作
- 工作线程 W2 开始工作
复制代码 1.3.5 条件变量(Condition)
复杂条件的同步
Condition结合了Lock和Event的功能,允许线程等待某个 “条件成立”,并在条件变化时通知其他线程。核心方法:
- acquire()/release():与Lock一致,控制临界区。
- wait():释放锁并阻塞,等待被notify()唤醒(唤醒后需重新获取锁)。
- notify(n=1):唤醒最多n个等待的线程。
- notify_all():唤醒所有等待的线程。
生产者 - 消费者模型(队列满 / 空时的等待)- import threading
- import time
- from queue import Queue
- # 创建条件变量(内部包含一个Lock)
- condition = threading.Condition()
- queue = Queue(maxsize=5) # 缓冲区:最多存5个数据
- def producer(name):
- for i in range(10): # 生产10个数据
- with condition: # 获取锁
- # 若队列满,则等待(释放锁,允许消费者取数据)
- while queue.full():
- print(f"生产者 {name}:队列满,等待...")
- condition.wait() # 等待消费者通知
- # 生产数据
- data = f"数据{i}"
- queue.put(data)
- print(f"生产者 {name}:生产 {data},当前队列大小:{queue.qsize()}")
- condition.notify() # 通知消费者:有新数据
- time.sleep(0.5) # 模拟生产耗时
- def consumer(name):
- while True:
- with condition:
- # 若队列空,则等待(释放锁,允许生产者存数据)
- while queue.empty():
- print(f"消费者 {name}:队列空,等待...")
- condition.wait() # 等待生产者通知
- # 消费数据
- data = queue.get()
- print(f"消费者 {name}:消费 {data},当前队列大小:{queue.qsize()}")
- condition.notify() # 通知生产者:有空闲位置
- queue.task_done() # 标记任务完成
- time.sleep(1) # 模拟消费耗时
- # 创建生产者和消费者
- t_producer = threading.Thread(target=producer, args=("P1",))
- t_consumer1 = threading.Thread(target=consumer, args=("C1",), daemon=True) # 守护线程
- t_consumer2 = threading.Thread(target=consumer, args=("C2",), daemon=True)
- t_producer.start()
- t_consumer1.start()
- t_consumer2.start()
- t_producer.join()
- queue.join() # 等待所有数据被消费
- print("所有数据处理完毕")
复制代码 输出- 生产者 P1:生产 数据0,当前队列大小:1
- 消费者 C1:消费 数据0,当前队列大小:0
- 生产者 P1:生产 数据1,当前队列大小:1
- 消费者 C2:消费 数据1,当前队列大小:0
- ...
复制代码 说明:
- 当队列满时,生产者调用wait()释放锁并等待,消费者取走数据后notify()唤醒生产者。
- 当队列空时,消费者wait(),生产者放入数据后notify()唤醒消费者。
1.4 线程池
concurrent.futures.ThreadPoolExecutor
创建大量线程时,频繁创建 / 销毁线程会消耗资源。线程池通过复用线程提高效率,适合任务数量多的场景。
线程池处理批量任务- from concurrent.futures import ThreadPoolExecutor, as_completed
- import time
- def fetch_url(url, delay):
- """模拟网络请求任务"""
- time.sleep(delay)
- return f"URL {url} 耗时 {delay} 秒"
- def main():
- urls = [f"https://example.com/{i}" for i in range(5)]
- delays = [1, 3, 2, 4, 1] # 每个URL的模拟延迟
-
- # 创建线程池(最大5个线程)
- with ThreadPoolExecutor(max_workers=5) as executor:
- # 提交任务到线程池(返回Future对象列表)
- futures = [executor.submit(fetch_url, url, delay)
- for url, delay in zip(urls, delays)]
-
- # 遍历已完成的任务(按完成顺序)
- for future in as_completed(futures):
- print(future.result()) # 获取任务结果
- if __name__ == "__main__":
- start = time.time()
- main()
- print(f"总耗时:{time.time() - start:.2f} 秒") # 约等于最长延迟4秒
复制代码 输出- URL https://example.com/0 耗时 1 秒
- URL https://example.com/4 耗时 1 秒
- URL https://example.com/2 耗时 2 秒
- URL https://example.com/1 耗时 3 秒
- URL https://example.com/3 耗时 4 秒
- 总耗时:4.01 秒
复制代码 2、multiprocessing与进程池
多进程通过创建独立进程实现并发,每个进程有自己的 Python 解释器和内存空间,完全避开 GIL 限制,适合 CPU 密集型任务(如数据计算、图像处理)。
2.1 进程与线程的核心区别
- 内存空间:进程间内存不共享(需通过特定机制通信),线程共享同一进程的内存。
- 开销:进程创建 / 销毁开销大(约是线程的 10 倍),切换成本高。
- GIL 影响:进程不受 GIL 限制,可利用多核 CPU;线程受 GIL 限制。
2.2 进程的创建与基本使用
通过multiprocessing.Process类创建进程,用法与threading.Thread类似,但需注意:
- Windows 系统中,进程函数需放在if __name__ == "__main__":内(避免无限递归创建进程)。
- 进程间无法直接共享全局变量,需用multiprocessing.Queue、Pipe等工具通信。
基础进程创建与执行- import multiprocessing
- import time
- def process_task(name, delay):
- """进程执行的任务:模拟CPU计算(无IO阻塞)"""
- print(f"进程 {name} 启动,计算 {delay} 秒")
- # 模拟CPU密集型任务(循环计算)
- start = time.time()
- while time.time() - start < delay:
- pass # 空循环消耗CPU
- print(f"进程 {name} 完成")
- if __name__ == "__main__": # Windows必须加此判断
- # 创建进程
- p1 = multiprocessing.Process(target=process_task, args=("Process-1", 2), name="P1")
- p2 = multiprocessing.Process(target=process_task, args=("Process-2", 2), name="P2")
-
- # 启动进程
- p1.start()
- p2.start()
-
- # 等待进程完成
- p1.join()
- p2.join()
-
- print("所有进程执行完毕")
复制代码 输出- 进程 Process-1 启动,计算 2 秒
- 进程 Process-2 启动,计算 2 秒
- 进程 Process-1 完成 # 约2秒后同时完成
- 进程 Process-2 完成
- 所有进程执行完毕
复制代码 2.3 进程间通信
进程内存不共享,需通过以下方式通信:
通信方式适用场景特点Queue多进程间传递数据(先进先出)线程安全,支持多生产者 / 多消费者Pipe两个进程间双向通信速度快,仅支持两个进程Manager共享复杂数据结构(列表、字典等)支持多种类型,但开销较大2.3.1 队列(Queue)
多进程安全的消息传递
Queue是基于消息传递的 IPC 方式,底层通过管道(Pipe)和锁(Lock)实现,支持多生产者、多消费者模型,是最常用的进程间通信工具。
核心特性:
- 线程 / 进程安全:内部实现了同步机制,可安全用于多进程。
- FIFO(先进先出):数据按发送顺序接收。
- 阻塞 / 非阻塞操作:put()/get()支持超时参数,避免永久阻塞。
生产者 - 消费者模型- from multiprocessing import Process, Queue
- import time
- def producer(queue, name):
- """生产者:向队列发送数据"""
- for i in range(5):
- data = f"生产者{name}的数据{i}"
- queue.put(data) # 向队列放入数据(满时阻塞)
- print(f"生产者{name}发送:{data}")
- time.sleep(0.5) # 模拟生产耗时
- def consumer(queue, name):
- """消费者:从队列接收数据"""
- while True:
- data = queue.get() # 从队列获取数据(空时阻塞)
- if data == "EXIT": # 退出信号
- break
- print(f"消费者{name}接收:{data}")
- time.sleep(1) # 模拟消费耗时
- print(f"消费者{name}退出")
- if __name__ == "__main__":
- # 创建队列(默认无界,可指定maxsize限制容量)
- queue = Queue(maxsize=3) # 队列最多存3个数据
-
- # 创建2个生产者和2个消费者进程
- producers = [
- Process(target=producer, args=(queue, "P1")),
- Process(target=producer, args=(queue, "P2"))
- ]
- consumers = [
- Process(target=consumer, args=(queue, "C1")),
- Process(target=consumer, args=(queue, "C2"))
- ]
-
- # 启动进程
- for p in producers:
- p.start()
- for c in consumers:
- c.start()
-
- # 等待生产者完成
- for p in producers:
- p.join()
-
- # 发送退出信号(每个消费者一个)
- for _ in consumers:
- queue.put("EXIT")
-
- # 等待消费者完成
- for c in consumers:
- c.join()
-
- print("所有进程结束")
复制代码 关键方法:
- queue.put(data, block=True, timeout=None)
- 向队列放入数据,block=True时队列满则阻塞,timeout设置超时时间(超时抛queue.Full)。
- queue.get(block=True, timeout=None)
- 从队列获取数据,block=True时队列空则阻塞,timeout设置超时时间(超时抛queue.Empty)。
- queue.qsize():返回当前队列中的数据量(非精确值,仅参考)。
- queue.empty()/queue.full():判断队列是否为空 / 满(非线程安全,仅参考)。
2.3.2 管道(Pipe)
两个进程的双向通信
Pipe用于两个进程之间的双向通信,底层基于操作系统的管道机制,速度比Queue快,但仅支持点对点通信。
核心特性:
- 双向通信:管道两端均可发送和接收数据。
- 无缓冲 / 有缓冲:默认是无缓冲(发送方阻塞直到接收方读取),也可创建有缓冲管道。
- 仅支持两个进程:多进程使用可能导致数据混乱。
基本用法:双向通信示例- from multiprocessing import Process, Pipe
- import time
- def process_a(conn):
- """进程A:发送数据并接收响应"""
- # 发送数据到进程B
- conn.send("Hello from A")
- print("A发送:Hello from A")
-
- # 接收进程B的响应
- response = conn.recv()
- print(f"A接收:{response}")
-
- # 关闭连接
- conn.close()
- def process_b(conn):
- """进程B:接收数据并发送响应"""
- # 接收进程A的数据
- data = conn.recv()
- print(f"B接收:{data}")
-
- # 发送响应到进程A
- time.sleep(1) # 模拟处理耗时
- conn.send("Hello from B")
- print("B发送:Hello from B")
-
- # 关闭连接
- conn.close()
- if __name__ == "__main__":
- # 创建管道:返回两个连接对象(conn1和conn2)
- conn_a, conn_b = Pipe() # 双向管道
-
- # 创建两个进程,分别传递管道的一端
- p_a = Process(target=process_a, args=(conn_a,))
- p_b = Process(target=process_b, args=(conn_b,))
-
- p_a.start()
- p_b.start()
-
- p_a.join()
- p_b.join()
-
- print("通信结束")
复制代码 关键方法:
- conn.send(data):发送数据(数据需可序列化,如字符串、列表等)。
- conn.recv():接收数据(阻塞直到有数据可接收)。
- conn.close():关闭连接。
- conn.poll(timeout=None):检查是否有数据可接收(非阻塞,timeout设置等待时间)。
2.3.3 共享内存(Value/Array)
高效的数据共享
共享内存允许多个进程直接访问同一块内存空间,是效率最高的 IPC 方式(无需数据拷贝),但需结合同步机制(如锁)避免竞态条件。multiprocessing提供Value(单个值)和Array(数组)两种共享内存工具。
核心特性:
- 高效:数据直接在内存中共享,无需序列化 / 反序列化。
- 类型限制:需指定数据类型(如'i'表示整数,'d'表示双精度浮点数)。
- 需同步:多进程读写时必须加锁,否则会导致数据混乱。
共享内存与同步- from multiprocessing import Process, Value, Array, Lock
- import time
- def increment_counter(counter, lock):
- """递增共享计数器(需加锁保护)"""
- for _ in range(100000):
- with lock: # 加锁确保原子操作
- counter.value += 1 # 访问共享内存中的值
- def modify_array(arr, lock, start, end):
- """修改共享数组(需加锁保护)"""
- with lock:
- for i in range(start, end):
- arr[i] = i * 2 # 访问共享数组
- time.sleep(1) # 模拟耗时
- if __name__ == "__main__":
- # 创建共享变量:Value(类型, 初始值)
- counter = Value('i', 0) # 'i'表示有符号整数
-
- # 创建共享数组:Array(类型, 初始值/长度)
- arr = Array('i', [0] * 10) # 长度为10的整数数组
-
- # 同步锁(确保共享内存操作安全)
- lock = Lock()
-
- # 测试共享计数器(2个进程并发递增)
- p1 = Process(target=increment_counter, args=(counter, lock))
- p2 = Process(target=increment_counter, args=(counter, lock))
-
- p1.start()
- p2.start()
- p1.join()
- p2.join()
-
- print(f"共享计数器结果:{counter.value}") # 预期200000
-
- # 测试共享数组(2个进程分别修改不同区间)
- p3 = Process(target=modify_array, args=(arr, lock, 0, 5))
- p4 = Process(target=modify_array, args=(arr, lock, 5, 10))
-
- p3.start()
- p4.start()
- p3.join()
- p4.join()
-
- print(f"共享数组结果:{list(arr)}") # 预期[0,2,4,6,8,10,12,14,16,18]
复制代码 类型代码:
Value和Array的类型参数需使用ctypes兼容的类型代码,常见类型:
- 'i':int(整数)
- 'd':double(双精度浮点数)
- 'f':float(单精度浮点数)
- 'c':char(字符)
2.3.4 管理器(Manager)
共享复杂数据结构
Manager通过创建一个服务器进程管理共享数据,其他进程通过网络连接访问该进程,支持共享复杂数据结构(如字典、列表、队列等),灵活性高但效率低于共享内存。
核心特性:
- 支持复杂类型:字典、列表、集合、自定义对象等。
- 多进程安全:内部实现了同步机制。
- 跨机器通信:可通过网络在不同机器的进程间共享数据(需配置地址)。
基本用法:共享字典和列表- from multiprocessing import Process, Manager
- import time
- def update_dict(shared_dict, name):
- """更新共享字典"""
- for i in range(3):
- shared_dict[f"{name}_{i}"] = i
- time.sleep(0.5)
- print(f"进程{name}更新字典:{shared_dict}")
- def append_list(shared_list, name):
- """向共享列表追加元素"""
- for i in range(3):
- shared_list.append(f"{name}_{i}")
- time.sleep(0.5)
- print(f"进程{name}更新列表:{shared_list}")
- if __name__ == "__main__":
- # 创建Manager对象(启动服务器进程)
- with Manager() as manager:
- # 创建共享字典和列表
- shared_dict = manager.dict() # 共享字典
- shared_list = manager.list() # 共享列表
-
- # 创建进程修改共享数据
- p1 = Process(target=update_dict, args=(shared_dict, "P1"))
- p2 = Process(target=update_dict, args=(shared_dict, "P2"))
- p3 = Process(target=append_list, args=(shared_list, "P3"))
- p4 = Process(target=append_list, args=(shared_list, "P4"))
-
- # 启动进程
- p1.start()
- p2.start()
- p3.start()
- p4.start()
-
- # 等待完成
- p1.join()
- p2.join()
- p3.join()
- p4.join()
-
- # 打印最终结果
- print("\n最终共享字典:", dict(shared_dict))
- print("最终共享列表:", list(shared_list))
复制代码 支持的共享类型:
Manager支持多种数据结构,通过以下方法创建:
- manager.dict():共享字典
- manager.list():共享列表
- manager.set():共享集合
- manager.Queue():共享队列
- manager.Value()/manager.Array():共享值 / 数组(类似multiprocessing的同名工具,但通过 Manager 管理)
2.4 进程池
multiprocessing.Pool与ProcessPoolExecutor
进程池用于管理多个进程,复用进程资源,适合批量 CPU 密集型任务。
进程池处理 CPU 密集型任务(计算素数)- from concurrent.futures import ProcessPoolExecutor
- import math
- def is_prime(n):
- """判断是否为素数(CPU密集型任务)"""
- if n < 2:
- return False
- for i in range(2, int(math.sqrt(n)) + 1):
- if n % i == 0:
- return False
- return True
- def count_primes(range_start, range_end):
- """统计指定范围内的素数数量"""
- count = 0
- for num in range(range_start, range_end):
- if is_prime(num):
- count += 1
- return f"{range_start}-{range_end} 素数数量:{count}"
- def main():
- # 划分任务:将1-100000分为4个区间,交给4个进程
- ranges = [(1, 25000), (25000, 50000), (50000, 75000), (75000, 100000)]
-
- # 创建进程池(最大4个进程,与CPU核心数匹配)
- with ProcessPoolExecutor(max_workers=4) as executor:
- # 提交所有任务并获取结果(按输入顺序返回)
- results = list(executor.map(lambda r: count_primes(r[0], r[1]), ranges))
-
- for res in results:
- print(res)
- if __name__ == "__main__":
- import time
- start = time.time()
- main()
- print(f"总耗时:{time.time() - start:.2f} 秒") # 多进程并行计算,耗时约为单进程的1/4
复制代码 3、异步编程:asyncio与协程
https://www.cnblogs.com/gange111/p/19109363
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |