Python 并发编程
Python 并发编程是提升程序执行效率的核心技术,尤其在处理多任务场景(如网络请求、数据计算、文件 IO 等)时至关重要。1、threading与线程池
多线程是 Python 中最常用的并发方式之一,通过创建多个线程实现任务并行执行。但受GIL(全局解释器锁) 限制,同一时刻只有一个线程执行 Python 字节码。
1.1 GIL
GIL 是 Python 解释器的一种机制,确保同一时刻只有一个线程执行 Python 代码。其影响:
[*]IO 密集型任务:线程等待 IO 时会释放 GIL,其他线程可执行,因此多线程有效(如网络请求时,线程阻塞等待响应,GIL 释放给其他线程)。
[*]CPU 密集型任务:线程持续占用 CPU,GIL 无法释放,多线程反而因切换开销降低效率(此时应选多进程)。
1.2 线程的创建与基本使用
通过threading.Thread类创建线程,核心参数:
[*]target:线程执行的函数。
[*]args/kwargs:传递给函数的参数。
[*]name:线程名称(用于调试)。
基础线程创建与执行
import threading
import time
def task(name, delay):
"""线程执行的任务:模拟IO操作(如网络请求)"""
print(f"线程 {name} 启动,延迟 {delay} 秒")
time.sleep(delay)# 模拟IO阻塞(此时GIL释放,其他线程可执行)
print(f"线程 {name} 完成")
# 创建线程
thread1 = threading.Thread(target=task, args=("Thread-1", 2), name="T1")
thread2 = threading.Thread(target=task, args=("Thread-2", 1), name="T2")
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成(主线程阻塞,直到子线程结束)
thread1.join()
thread2.join()
print("所有线程执行完毕")输出
线程 Thread-1 启动,延迟 2 秒
线程 Thread-2 启动,延迟 1 秒
线程 Thread-2 完成# 1秒后先完成
线程 Thread-1 完成# 再等1秒(共2秒)完成
所有线程执行完毕1.3 线程同步:避免竞态条件
多线程共享内存时,多个线程同时操作共享资源可能导致竞态条件(数据不一致)。需通过同步机制控制访问:
1.3.1 互斥锁(Lock)
最基础的同步工具
Lock是最简单的同步原语,通过 “加锁 - 操作 - 解锁” 流程保证临界区的互斥访问:
[*]acquire():获取锁(若锁已被占用,则阻塞等待)。
[*]release():释放锁(必须在acquire()后调用,否则报错)。
基本用法:解决竞态条件
import threading
counter = 0
lock = threading.Lock()# 创建互斥锁
def increment():
global counter
for _ in range(100000):
lock.acquire()# 加锁:确保只有一个线程进入临界区
try:
# 临界区操作(安全)
temp = counter
temp += 1
counter = temp
finally:
lock.release()# 解锁:无论是否异常,必须释放锁
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终counter值:{counter}")# 正确输出200000上下文管理器:简化锁操作
Lock支持with语句,自动完成acquire()和release(),避免遗漏解锁:
def increment():
global counter
for _ in range(100000):
with lock:# 等价于acquire() + finally release()
temp = counter
temp += 1
counter = temp1.3.2 可重入锁(RLock)
解决同一线程多次加锁问题
Lock不允许同一线程多次获取锁(会导致死锁),而RLock(可重入锁)允许同一线程多次获取锁,但需保证acquire()和release()次数相等。
递归函数或嵌套锁
import threading
rlock = threading.RLock()# 创建可重入锁
def inner():
with rlock:# 同一线程再次获取锁(RLock允许)
print("进入inner函数")
def outer():
with rlock:# 第一次获取锁
print("进入outer函数")
inner()# 调用inner,需要再次获取锁
# 单线程中嵌套获取锁
outer()输出:
进入outer函数
进入inner函数说明:若用Lock替代RLock,inner()中with lock会阻塞(锁已被outer()占用),导致死锁。
1.3.3 信号量(Semaphore)
限制并发访问数量
Semaphore通过维护一个 “计数器” 控制同时访问资源的线程数:
[*]初始计数器值为允许的最大并发数。
[*]acquire():计数器 - 1(若为 0 则阻塞)。
[*]release():计数器 + 1。
限制资源并发访问(如连接池、爬虫速率控制)
import threading
import time
# 信号量:最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def access_resource(name):
with semaphore:# 获取信号量(计数器-1)
print(f"线程 {name} 开始访问资源")
time.sleep(2)# 模拟资源访问耗时
print(f"线程 {name} 结束访问资源")
# 创建5个线程竞争资源
threads =
for t in threads:
t.start()
for t in threads:
t.join()输出
线程 T0 开始访问资源
线程 T1 开始访问资源
线程 T2 开始访问资源
(2秒后)
线程 T0 结束访问资源
线程 T1 结束访问资源
线程 T2 结束访问资源
线程 T3 开始访问资源
线程 T4 开始访问资源
(再2秒后)
线程 T3 结束访问资源
线程 T4 结束访问资源说明:同一时间最多 3 个线程访问资源,超出的线程需等待其他线程释放信号量。
1.3.4 事件(Event)
线程间的信号通知
Event通过一个 “标志位” 实现线程间通信:
[*]set():将标志位设为True,唤醒所有等待的线程。
[*]clear():将标志位设为False。
[*]wait():若标志位为False则阻塞,直到set()被调用。
[*]is_set():判断标志位是否为True。
一个线程等待另一个线程的 “信号”(如初始化完成通知)
import threading
import time
# 创建事件(初始标志位为False)
event = threading.Event()
def init_task():
print("初始化任务开始...")
time.sleep(3)# 模拟初始化耗时
print("初始化完成,发送信号")
event.set()# 发送信号:标志位设为True
def worker(name):
print(f"工作线程 {name} 等待初始化...")
event.wait()# 阻塞等待信号
print(f"工作线程 {name} 开始工作")
# 创建初始化线程和工作线程
t_init = threading.Thread(target=init_task)
t_workers =
t_init.start()
for t in t_workers:
t.start()
t_init.join()
for t in t_workers:
t.join()输出:
初始化任务开始...
工作线程 W0 等待初始化...
工作线程 W1 等待初始化...
工作线程 W2 等待初始化...
(3秒后)
初始化完成,发送信号
工作线程 W0 开始工作
工作线程 W1 开始工作
工作线程 W2 开始工作1.3.5 条件变量(Condition)
复杂条件的同步
Condition结合了Lock和Event的功能,允许线程等待某个 “条件成立”,并在条件变化时通知其他线程。核心方法:
[*]acquire()/release():与Lock一致,控制临界区。
[*]wait():释放锁并阻塞,等待被notify()唤醒(唤醒后需重新获取锁)。
[*]notify(n=1):唤醒最多n个等待的线程。
[*]notify_all():唤醒所有等待的线程。
生产者 - 消费者模型(队列满 / 空时的等待)
import threading
import time
from queue import Queue
# 创建条件变量(内部包含一个Lock)
condition = threading.Condition()
queue = Queue(maxsize=5)# 缓冲区:最多存5个数据
def producer(name):
for i in range(10):# 生产10个数据
with condition:# 获取锁
# 若队列满,则等待(释放锁,允许消费者取数据)
while queue.full():
print(f"生产者 {name}:队列满,等待...")
condition.wait()# 等待消费者通知
# 生产数据
data = f"数据{i}"
queue.put(data)
print(f"生产者 {name}:生产 {data},当前队列大小:{queue.qsize()}")
condition.notify()# 通知消费者:有新数据
time.sleep(0.5)# 模拟生产耗时
def consumer(name):
while True:
with condition:
# 若队列空,则等待(释放锁,允许生产者存数据)
while queue.empty():
print(f"消费者 {name}:队列空,等待...")
condition.wait()# 等待生产者通知
# 消费数据
data = queue.get()
print(f"消费者 {name}:消费 {data},当前队列大小:{queue.qsize()}")
condition.notify()# 通知生产者:有空闲位置
queue.task_done()# 标记任务完成
time.sleep(1)# 模拟消费耗时
# 创建生产者和消费者
t_producer = threading.Thread(target=producer, args=("P1",))
t_consumer1 = threading.Thread(target=consumer, args=("C1",), daemon=True)# 守护线程
t_consumer2 = threading.Thread(target=consumer, args=("C2",), daemon=True)
t_producer.start()
t_consumer1.start()
t_consumer2.start()
t_producer.join()
queue.join()# 等待所有数据被消费
print("所有数据处理完毕")输出
生产者 P1:生产 数据0,当前队列大小:1
消费者 C1:消费 数据0,当前队列大小:0
生产者 P1:生产 数据1,当前队列大小:1
消费者 C2:消费 数据1,当前队列大小:0
...说明:
[*]当队列满时,生产者调用wait()释放锁并等待,消费者取走数据后notify()唤醒生产者。
[*]当队列空时,消费者wait(),生产者放入数据后notify()唤醒消费者。
1.4 线程池
concurrent.futures.ThreadPoolExecutor
创建大量线程时,频繁创建 / 销毁线程会消耗资源。线程池通过复用线程提高效率,适合任务数量多的场景。
线程池处理批量任务
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_url(url, delay):
"""模拟网络请求任务"""
time.sleep(delay)
return f"URL {url} 耗时 {delay} 秒"
def main():
urls =
delays = # 每个URL的模拟延迟
# 创建线程池(最大5个线程)
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务到线程池(返回Future对象列表)
futures = [executor.submit(fetch_url, url, delay)
for url, delay in zip(urls, delays)]
# 遍历已完成的任务(按完成顺序)
for future in as_completed(futures):
print(future.result())# 获取任务结果
if __name__ == "__main__":
start = time.time()
main()
print(f"总耗时:{time.time() - start:.2f} 秒")# 约等于最长延迟4秒输出
URL https://example.com/0 耗时 1 秒
URL https://example.com/4 耗时 1 秒
URL https://example.com/2 耗时 2 秒
URL https://example.com/1 耗时 3 秒
URL https://example.com/3 耗时 4 秒
总耗时:4.01 秒2、multiprocessing与进程池
多进程通过创建独立进程实现并发,每个进程有自己的 Python 解释器和内存空间,完全避开 GIL 限制,适合 CPU 密集型任务(如数据计算、图像处理)。
2.1 进程与线程的核心区别
[*]内存空间:进程间内存不共享(需通过特定机制通信),线程共享同一进程的内存。
[*]开销:进程创建 / 销毁开销大(约是线程的 10 倍),切换成本高。
[*]GIL 影响:进程不受 GIL 限制,可利用多核 CPU;线程受 GIL 限制。
2.2 进程的创建与基本使用
通过multiprocessing.Process类创建进程,用法与threading.Thread类似,但需注意:
[*]Windows 系统中,进程函数需放在if __name__ == "__main__":内(避免无限递归创建进程)。
[*]进程间无法直接共享全局变量,需用multiprocessing.Queue、Pipe等工具通信。
基础进程创建与执行
import multiprocessing
import time
def process_task(name, delay):
"""进程执行的任务:模拟CPU计算(无IO阻塞)"""
print(f"进程 {name} 启动,计算 {delay} 秒")
# 模拟CPU密集型任务(循环计算)
start = time.time()
while time.time() - start < delay:
pass# 空循环消耗CPU
print(f"进程 {name} 完成")
if __name__ == "__main__":# Windows必须加此判断
# 创建进程
p1 = multiprocessing.Process(target=process_task, args=("Process-1", 2), name="P1")
p2 = multiprocessing.Process(target=process_task, args=("Process-2", 2), name="P2")
# 启动进程
p1.start()
p2.start()
# 等待进程完成
p1.join()
p2.join()
print("所有进程执行完毕")输出
进程 Process-1 启动,计算 2 秒
进程 Process-2 启动,计算 2 秒
进程 Process-1 完成# 约2秒后同时完成
进程 Process-2 完成
所有进程执行完毕2.3 进程间通信
进程内存不共享,需通过以下方式通信:
通信方式适用场景特点Queue多进程间传递数据(先进先出)线程安全,支持多生产者 / 多消费者Pipe两个进程间双向通信速度快,仅支持两个进程Manager共享复杂数据结构(列表、字典等)支持多种类型,但开销较大2.3.1 队列(Queue)
多进程安全的消息传递
Queue是基于消息传递的 IPC 方式,底层通过管道(Pipe)和锁(Lock)实现,支持多生产者、多消费者模型,是最常用的进程间通信工具。
核心特性:
[*]线程 / 进程安全:内部实现了同步机制,可安全用于多进程。
[*]FIFO(先进先出):数据按发送顺序接收。
[*]阻塞 / 非阻塞操作:put()/get()支持超时参数,避免永久阻塞。
生产者 - 消费者模型
from multiprocessing import Process, Queue
import time
def producer(queue, name):
"""生产者:向队列发送数据"""
for i in range(5):
data = f"生产者{name}的数据{i}"
queue.put(data)# 向队列放入数据(满时阻塞)
print(f"生产者{name}发送:{data}")
time.sleep(0.5)# 模拟生产耗时
def consumer(queue, name):
"""消费者:从队列接收数据"""
while True:
data = queue.get()# 从队列获取数据(空时阻塞)
if data == "EXIT":# 退出信号
break
print(f"消费者{name}接收:{data}")
time.sleep(1)# 模拟消费耗时
print(f"消费者{name}退出")
if __name__ == "__main__":
# 创建队列(默认无界,可指定maxsize限制容量)
queue = Queue(maxsize=3)# 队列最多存3个数据
# 创建2个生产者和2个消费者进程
producers = [
Process(target=producer, args=(queue, "P1")),
Process(target=producer, args=(queue, "P2"))
]
consumers = [
Process(target=consumer, args=(queue, "C1")),
Process(target=consumer, args=(queue, "C2"))
]
# 启动进程
for p in producers:
p.start()
for c in consumers:
c.start()
# 等待生产者完成
for p in producers:
p.join()
# 发送退出信号(每个消费者一个)
for _ in consumers:
queue.put("EXIT")
# 等待消费者完成
for c in consumers:
c.join()
print("所有进程结束")关键方法:
[*]queue.put(data, block=True, timeout=None)
[*]向队列放入数据,block=True时队列满则阻塞,timeout设置超时时间(超时抛queue.Full)。
[*]queue.get(block=True, timeout=None)
[*]从队列获取数据,block=True时队列空则阻塞,timeout设置超时时间(超时抛queue.Empty)。
[*]queue.qsize():返回当前队列中的数据量(非精确值,仅参考)。
[*]queue.empty()/queue.full():判断队列是否为空 / 满(非线程安全,仅参考)。
2.3.2 管道(Pipe)
两个进程的双向通信
Pipe用于两个进程之间的双向通信,底层基于操作系统的管道机制,速度比Queue快,但仅支持点对点通信。
核心特性:
[*]双向通信:管道两端均可发送和接收数据。
[*]无缓冲 / 有缓冲:默认是无缓冲(发送方阻塞直到接收方读取),也可创建有缓冲管道。
[*]仅支持两个进程:多进程使用可能导致数据混乱。
基本用法:双向通信示例
from multiprocessing import Process, Pipe
import time
def process_a(conn):
"""进程A:发送数据并接收响应"""
# 发送数据到进程B
conn.send("Hello from A")
print("A发送:Hello from A")
# 接收进程B的响应
response = conn.recv()
print(f"A接收:{response}")
# 关闭连接
conn.close()
def process_b(conn):
"""进程B:接收数据并发送响应"""
# 接收进程A的数据
data = conn.recv()
print(f"B接收:{data}")
# 发送响应到进程A
time.sleep(1)# 模拟处理耗时
conn.send("Hello from B")
print("B发送:Hello from B")
# 关闭连接
conn.close()
if __name__ == "__main__":
# 创建管道:返回两个连接对象(conn1和conn2)
conn_a, conn_b = Pipe()# 双向管道
# 创建两个进程,分别传递管道的一端
p_a = Process(target=process_a, args=(conn_a,))
p_b = Process(target=process_b, args=(conn_b,))
p_a.start()
p_b.start()
p_a.join()
p_b.join()
print("通信结束")关键方法:
[*]conn.send(data):发送数据(数据需可序列化,如字符串、列表等)。
[*]conn.recv():接收数据(阻塞直到有数据可接收)。
[*]conn.close():关闭连接。
[*]conn.poll(timeout=None):检查是否有数据可接收(非阻塞,timeout设置等待时间)。
2.3.3 共享内存(Value/Array)
高效的数据共享
共享内存允许多个进程直接访问同一块内存空间,是效率最高的 IPC 方式(无需数据拷贝),但需结合同步机制(如锁)避免竞态条件。multiprocessing提供Value(单个值)和Array(数组)两种共享内存工具。
核心特性:
[*]高效:数据直接在内存中共享,无需序列化 / 反序列化。
[*]类型限制:需指定数据类型(如'i'表示整数,'d'表示双精度浮点数)。
[*]需同步:多进程读写时必须加锁,否则会导致数据混乱。
共享内存与同步
from multiprocessing import Process, Value, Array, Lock
import time
def increment_counter(counter, lock):
"""递增共享计数器(需加锁保护)"""
for _ in range(100000):
with lock:# 加锁确保原子操作
counter.value += 1# 访问共享内存中的值
def modify_array(arr, lock, start, end):
"""修改共享数组(需加锁保护)"""
with lock:
for i in range(start, end):
arr = i * 2# 访问共享数组
time.sleep(1)# 模拟耗时
if __name__ == "__main__":
# 创建共享变量:Value(类型, 初始值)
counter = Value('i', 0)# 'i'表示有符号整数
# 创建共享数组:Array(类型, 初始值/长度)
arr = Array('i', * 10)# 长度为10的整数数组
# 同步锁(确保共享内存操作安全)
lock = Lock()
# 测试共享计数器(2个进程并发递增)
p1 = Process(target=increment_counter, args=(counter, lock))
p2 = Process(target=increment_counter, args=(counter, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"共享计数器结果:{counter.value}")# 预期200000
# 测试共享数组(2个进程分别修改不同区间)
p3 = Process(target=modify_array, args=(arr, lock, 0, 5))
p4 = Process(target=modify_array, args=(arr, lock, 5, 10))
p3.start()
p4.start()
p3.join()
p4.join()
print(f"共享数组结果:{list(arr)}")# 预期类型代码:
Value和Array的类型参数需使用ctypes兼容的类型代码,常见类型:
[*]'i':int(整数)
[*]'d':double(双精度浮点数)
[*]'f':float(单精度浮点数)
[*]'c':char(字符)
2.3.4 管理器(Manager)
共享复杂数据结构
Manager通过创建一个服务器进程管理共享数据,其他进程通过网络连接访问该进程,支持共享复杂数据结构(如字典、列表、队列等),灵活性高但效率低于共享内存。
核心特性:
[*]支持复杂类型:字典、列表、集合、自定义对象等。
[*]多进程安全:内部实现了同步机制。
[*]跨机器通信:可通过网络在不同机器的进程间共享数据(需配置地址)。
基本用法:共享字典和列表
from multiprocessing import Process, Manager
import time
def update_dict(shared_dict, name):
"""更新共享字典"""
for i in range(3):
shared_dict = i
time.sleep(0.5)
print(f"进程{name}更新字典:{shared_dict}")
def append_list(shared_list, name):
"""向共享列表追加元素"""
for i in range(3):
shared_list.append(f"{name}_{i}")
time.sleep(0.5)
print(f"进程{name}更新列表:{shared_list}")
if __name__ == "__main__":
# 创建Manager对象(启动服务器进程)
with Manager() as manager:
# 创建共享字典和列表
shared_dict = manager.dict()# 共享字典
shared_list = manager.list()# 共享列表
# 创建进程修改共享数据
p1 = Process(target=update_dict, args=(shared_dict, "P1"))
p2 = Process(target=update_dict, args=(shared_dict, "P2"))
p3 = Process(target=append_list, args=(shared_list, "P3"))
p4 = Process(target=append_list, args=(shared_list, "P4"))
# 启动进程
p1.start()
p2.start()
p3.start()
p4.start()
# 等待完成
p1.join()
p2.join()
p3.join()
p4.join()
# 打印最终结果
print("\n最终共享字典:", dict(shared_dict))
print("最终共享列表:", list(shared_list))支持的共享类型:
Manager支持多种数据结构,通过以下方法创建:
[*]manager.dict():共享字典
[*]manager.list():共享列表
[*]manager.set():共享集合
[*]manager.Queue():共享队列
[*]manager.Value()/manager.Array():共享值 / 数组(类似multiprocessing的同名工具,但通过 Manager 管理)
2.4 进程池
multiprocessing.Pool与ProcessPoolExecutor
进程池用于管理多个进程,复用进程资源,适合批量 CPU 密集型任务。
进程池处理 CPU 密集型任务(计算素数)
from concurrent.futures import ProcessPoolExecutor
import math
def is_prime(n):
"""判断是否为素数(CPU密集型任务)"""
if n < 2:
return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return False
return True
def count_primes(range_start, range_end):
"""统计指定范围内的素数数量"""
count = 0
for num in range(range_start, range_end):
if is_prime(num):
count += 1
return f"{range_start}-{range_end} 素数数量:{count}"
def main():
# 划分任务:将1-100000分为4个区间,交给4个进程
ranges = [(1, 25000), (25000, 50000), (50000, 75000), (75000, 100000)]
# 创建进程池(最大4个进程,与CPU核心数匹配)
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交所有任务并获取结果(按输入顺序返回)
results = list(executor.map(lambda r: count_primes(r, r), ranges))
for res in results:
print(res)
if __name__ == "__main__":
import time
start = time.time()
main()
print(f"总耗时:{time.time() - start:.2f} 秒")# 多进程并行计算,耗时约为单进程的1/43、异步编程:asyncio与协程
https://www.cnblogs.com/gange111/p/19109363
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]