忌才砟 发表于 2025-9-25 20:59:05

Python 并发编程

Python 并发编程是提升程序执行效率的核心技术,尤其在处理多任务场景(如网络请求、数据计算、文件 IO 等)时至关重要。
1、threading与线程池

多线程是 Python 中最常用的并发方式之一,通过创建多个线程实现任务并行执行。但受GIL(全局解释器锁) 限制,同一时刻只有一个线程执行 Python 字节码。
1.1 GIL

GIL 是 Python 解释器的一种机制,确保同一时刻只有一个线程执行 Python 代码。其影响:

[*]IO 密集型任务:线程等待 IO 时会释放 GIL,其他线程可执行,因此多线程有效(如网络请求时,线程阻塞等待响应,GIL 释放给其他线程)。
[*]CPU 密集型任务:线程持续占用 CPU,GIL 无法释放,多线程反而因切换开销降低效率(此时应选多进程)。
1.2 线程的创建与基本使用

通过threading.Thread类创建线程,核心参数:

[*]target:线程执行的函数。
[*]args/kwargs:传递给函数的参数。
[*]name:线程名称(用于调试)。
基础线程创建与执行
import threading
import time

def task(name, delay):
    """线程执行的任务:模拟IO操作(如网络请求)"""
    print(f"线程 {name} 启动,延迟 {delay} 秒")
    time.sleep(delay)# 模拟IO阻塞(此时GIL释放,其他线程可执行)
    print(f"线程 {name} 完成")

# 创建线程
thread1 = threading.Thread(target=task, args=("Thread-1", 2), name="T1")
thread2 = threading.Thread(target=task, args=("Thread-2", 1), name="T2")

# 启动线程
thread1.start()
thread2.start()

# 等待线程完成(主线程阻塞,直到子线程结束)
thread1.join()
thread2.join()

print("所有线程执行完毕")输出
线程 Thread-1 启动,延迟 2 秒
线程 Thread-2 启动,延迟 1 秒
线程 Thread-2 完成# 1秒后先完成
线程 Thread-1 完成# 再等1秒(共2秒)完成
所有线程执行完毕1.3 线程同步:避免竞态条件

多线程共享内存时,多个线程同时操作共享资源可能导致竞态条件(数据不一致)。需通过同步机制控制访问:
1.3.1 互斥锁(Lock)

最基础的同步工具
Lock是最简单的同步原语,通过 “加锁 - 操作 - 解锁” 流程保证临界区的互斥访问:

[*]acquire():获取锁(若锁已被占用,则阻塞等待)。
[*]release():释放锁(必须在acquire()后调用,否则报错)。
基本用法:解决竞态条件
import threading

counter = 0
lock = threading.Lock()# 创建互斥锁

def increment():
    global counter
    for _ in range(100000):
      lock.acquire()# 加锁:确保只有一个线程进入临界区
      try:
            # 临界区操作(安全)
            temp = counter
            temp += 1
            counter = temp
      finally:
            lock.release()# 解锁:无论是否异常,必须释放锁

t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

t1.start()
t2.start()
t1.join()
t2.join()

print(f"最终counter值:{counter}")# 正确输出200000上下文管理器:简化锁操作
Lock支持with语句,自动完成acquire()和release(),避免遗漏解锁:
def increment():
    global counter
    for _ in range(100000):
      with lock:# 等价于acquire() + finally release()
            temp = counter
            temp += 1
            counter = temp1.3.2 可重入锁(RLock)

解决同一线程多次加锁问题
Lock不允许同一线程多次获取锁(会导致死锁),而RLock(可重入锁)允许同一线程多次获取锁,但需保证acquire()和release()次数相等。
递归函数或嵌套锁
import threading

rlock = threading.RLock()# 创建可重入锁

def inner():
    with rlock:# 同一线程再次获取锁(RLock允许)
      print("进入inner函数")

def outer():
    with rlock:# 第一次获取锁
      print("进入outer函数")
      inner()# 调用inner,需要再次获取锁

# 单线程中嵌套获取锁
outer()输出:
进入outer函数
进入inner函数说明:若用Lock替代RLock,inner()中with lock会阻塞(锁已被outer()占用),导致死锁。
1.3.3 信号量(Semaphore)

限制并发访问数量
Semaphore通过维护一个 “计数器” 控制同时访问资源的线程数:

[*]初始计数器值为允许的最大并发数。
[*]acquire():计数器 - 1(若为 0 则阻塞)。
[*]release():计数器 + 1。
限制资源并发访问(如连接池、爬虫速率控制)
import threading
import time

# 信号量:最多允许3个线程同时访问
semaphore = threading.Semaphore(3)

def access_resource(name):
    with semaphore:# 获取信号量(计数器-1)
      print(f"线程 {name} 开始访问资源")
      time.sleep(2)# 模拟资源访问耗时
      print(f"线程 {name} 结束访问资源")

# 创建5个线程竞争资源
threads =

for t in threads:
    t.start()
for t in threads:
    t.join()输出
线程 T0 开始访问资源
线程 T1 开始访问资源
线程 T2 开始访问资源
(2秒后)
线程 T0 结束访问资源
线程 T1 结束访问资源
线程 T2 结束访问资源
线程 T3 开始访问资源
线程 T4 开始访问资源
(再2秒后)
线程 T3 结束访问资源
线程 T4 结束访问资源说明:同一时间最多 3 个线程访问资源,超出的线程需等待其他线程释放信号量。
1.3.4 事件(Event)

线程间的信号通知
Event通过一个 “标志位” 实现线程间通信:

[*]set():将标志位设为True,唤醒所有等待的线程。
[*]clear():将标志位设为False。
[*]wait():若标志位为False则阻塞,直到set()被调用。
[*]is_set():判断标志位是否为True。
一个线程等待另一个线程的 “信号”(如初始化完成通知)
import threading
import time

# 创建事件(初始标志位为False)
event = threading.Event()

def init_task():
    print("初始化任务开始...")
    time.sleep(3)# 模拟初始化耗时
    print("初始化完成,发送信号")
    event.set()# 发送信号:标志位设为True

def worker(name):
    print(f"工作线程 {name} 等待初始化...")
    event.wait()# 阻塞等待信号
    print(f"工作线程 {name} 开始工作")

# 创建初始化线程和工作线程
t_init = threading.Thread(target=init_task)
t_workers =

t_init.start()
for t in t_workers:
    t.start()

t_init.join()
for t in t_workers:
    t.join()输出:
初始化任务开始...
工作线程 W0 等待初始化...
工作线程 W1 等待初始化...
工作线程 W2 等待初始化...
(3秒后)
初始化完成,发送信号
工作线程 W0 开始工作
工作线程 W1 开始工作
工作线程 W2 开始工作1.3.5 条件变量(Condition)

复杂条件的同步
Condition结合了Lock和Event的功能,允许线程等待某个 “条件成立”,并在条件变化时通知其他线程。核心方法:

[*]acquire()/release():与Lock一致,控制临界区。
[*]wait():释放锁并阻塞,等待被notify()唤醒(唤醒后需重新获取锁)。
[*]notify(n=1):唤醒最多n个等待的线程。
[*]notify_all():唤醒所有等待的线程。
生产者 - 消费者模型(队列满 / 空时的等待)
import threading
import time
from queue import Queue

# 创建条件变量(内部包含一个Lock)
condition = threading.Condition()
queue = Queue(maxsize=5)# 缓冲区:最多存5个数据

def producer(name):
    for i in range(10):# 生产10个数据
      with condition:# 获取锁
            # 若队列满,则等待(释放锁,允许消费者取数据)
            while queue.full():
                print(f"生产者 {name}:队列满,等待...")
                condition.wait()# 等待消费者通知
            # 生产数据
            data = f"数据{i}"
            queue.put(data)
            print(f"生产者 {name}:生产 {data},当前队列大小:{queue.qsize()}")
            condition.notify()# 通知消费者:有新数据
      time.sleep(0.5)# 模拟生产耗时

def consumer(name):
    while True:
      with condition:
            # 若队列空,则等待(释放锁,允许生产者存数据)
            while queue.empty():
                print(f"消费者 {name}:队列空,等待...")
                condition.wait()# 等待生产者通知
            # 消费数据
            data = queue.get()
            print(f"消费者 {name}:消费 {data},当前队列大小:{queue.qsize()}")
            condition.notify()# 通知生产者:有空闲位置
            queue.task_done()# 标记任务完成
      time.sleep(1)# 模拟消费耗时

# 创建生产者和消费者
t_producer = threading.Thread(target=producer, args=("P1",))
t_consumer1 = threading.Thread(target=consumer, args=("C1",), daemon=True)# 守护线程
t_consumer2 = threading.Thread(target=consumer, args=("C2",), daemon=True)

t_producer.start()
t_consumer1.start()
t_consumer2.start()

t_producer.join()
queue.join()# 等待所有数据被消费
print("所有数据处理完毕")输出
生产者 P1:生产 数据0,当前队列大小:1
消费者 C1:消费 数据0,当前队列大小:0
生产者 P1:生产 数据1,当前队列大小:1
消费者 C2:消费 数据1,当前队列大小:0
...说明:

[*]当队列满时,生产者调用wait()释放锁并等待,消费者取走数据后notify()唤醒生产者。
[*]当队列空时,消费者wait(),生产者放入数据后notify()唤醒消费者。
1.4 线程池

concurrent.futures.ThreadPoolExecutor
创建大量线程时,频繁创建 / 销毁线程会消耗资源。线程池通过复用线程提高效率,适合任务数量多的场景。
线程池处理批量任务
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch_url(url, delay):
    """模拟网络请求任务"""
    time.sleep(delay)
    return f"URL {url} 耗时 {delay} 秒"

def main():
    urls =
    delays = # 每个URL的模拟延迟
   
    # 创建线程池(最大5个线程)
    with ThreadPoolExecutor(max_workers=5) as executor:
      # 提交任务到线程池(返回Future对象列表)
      futures = [executor.submit(fetch_url, url, delay)
                  for url, delay in zip(urls, delays)]
      
      # 遍历已完成的任务(按完成顺序)
      for future in as_completed(futures):
            print(future.result())# 获取任务结果

if __name__ == "__main__":
    start = time.time()
    main()
    print(f"总耗时:{time.time() - start:.2f} 秒")# 约等于最长延迟4秒输出
URL https://example.com/0 耗时 1 秒
URL https://example.com/4 耗时 1 秒
URL https://example.com/2 耗时 2 秒
URL https://example.com/1 耗时 3 秒
URL https://example.com/3 耗时 4 秒
总耗时:4.01 秒2、multiprocessing与进程池

多进程通过创建独立进程实现并发,每个进程有自己的 Python 解释器和内存空间,完全避开 GIL 限制,适合 CPU 密集型任务(如数据计算、图像处理)。
2.1 进程与线程的核心区别


[*]内存空间:进程间内存不共享(需通过特定机制通信),线程共享同一进程的内存。
[*]开销:进程创建 / 销毁开销大(约是线程的 10 倍),切换成本高。
[*]GIL 影响:进程不受 GIL 限制,可利用多核 CPU;线程受 GIL 限制。
2.2 进程的创建与基本使用

通过multiprocessing.Process类创建进程,用法与threading.Thread类似,但需注意:

[*]Windows 系统中,进程函数需放在if __name__ == "__main__":内(避免无限递归创建进程)。
[*]进程间无法直接共享全局变量,需用multiprocessing.Queue、Pipe等工具通信。
基础进程创建与执行
import multiprocessing
import time

def process_task(name, delay):
    """进程执行的任务:模拟CPU计算(无IO阻塞)"""
    print(f"进程 {name} 启动,计算 {delay} 秒")
    # 模拟CPU密集型任务(循环计算)
    start = time.time()
    while time.time() - start < delay:
      pass# 空循环消耗CPU
    print(f"进程 {name} 完成")

if __name__ == "__main__":# Windows必须加此判断
    # 创建进程
    p1 = multiprocessing.Process(target=process_task, args=("Process-1", 2), name="P1")
    p2 = multiprocessing.Process(target=process_task, args=("Process-2", 2), name="P2")
   
    # 启动进程
    p1.start()
    p2.start()
   
    # 等待进程完成
    p1.join()
    p2.join()
   
    print("所有进程执行完毕")输出
进程 Process-1 启动,计算 2 秒
进程 Process-2 启动,计算 2 秒
进程 Process-1 完成# 约2秒后同时完成
进程 Process-2 完成
所有进程执行完毕2.3 进程间通信

进程内存不共享,需通过以下方式通信:
通信方式适用场景特点Queue多进程间传递数据(先进先出)线程安全,支持多生产者 / 多消费者Pipe两个进程间双向通信速度快,仅支持两个进程Manager共享复杂数据结构(列表、字典等)支持多种类型,但开销较大2.3.1 队列(Queue)

多进程安全的消息传递
Queue是基于消息传递的 IPC 方式,底层通过管道(Pipe)和锁(Lock)实现,支持多生产者、多消费者模型,是最常用的进程间通信工具。
核心特性:

[*]线程 / 进程安全:内部实现了同步机制,可安全用于多进程。
[*]FIFO(先进先出):数据按发送顺序接收。
[*]阻塞 / 非阻塞操作:put()/get()支持超时参数,避免永久阻塞。
生产者 - 消费者模型
from multiprocessing import Process, Queue
import time

def producer(queue, name):
    """生产者:向队列发送数据"""
    for i in range(5):
      data = f"生产者{name}的数据{i}"
      queue.put(data)# 向队列放入数据(满时阻塞)
      print(f"生产者{name}发送:{data}")
      time.sleep(0.5)# 模拟生产耗时

def consumer(queue, name):
    """消费者:从队列接收数据"""
    while True:
      data = queue.get()# 从队列获取数据(空时阻塞)
      if data == "EXIT":# 退出信号
            break
      print(f"消费者{name}接收:{data}")
      time.sleep(1)# 模拟消费耗时
    print(f"消费者{name}退出")

if __name__ == "__main__":
    # 创建队列(默认无界,可指定maxsize限制容量)
    queue = Queue(maxsize=3)# 队列最多存3个数据
   
    # 创建2个生产者和2个消费者进程
    producers = [
      Process(target=producer, args=(queue, "P1")),
      Process(target=producer, args=(queue, "P2"))
    ]
    consumers = [
      Process(target=consumer, args=(queue, "C1")),
      Process(target=consumer, args=(queue, "C2"))
    ]
   
    # 启动进程
    for p in producers:
      p.start()
    for c in consumers:
      c.start()
   
    # 等待生产者完成
    for p in producers:
      p.join()
   
    # 发送退出信号(每个消费者一个)
    for _ in consumers:
      queue.put("EXIT")
   
    # 等待消费者完成
    for c in consumers:
      c.join()
   
    print("所有进程结束")关键方法:

[*]queue.put(data, block=True, timeout=None)

[*]向队列放入数据,block=True时队列满则阻塞,timeout设置超时时间(超时抛queue.Full)。

[*]queue.get(block=True, timeout=None)

[*]从队列获取数据,block=True时队列空则阻塞,timeout设置超时时间(超时抛queue.Empty)。

[*]queue.qsize():返回当前队列中的数据量(非精确值,仅参考)。
[*]queue.empty()/queue.full():判断队列是否为空 / 满(非线程安全,仅参考)。
2.3.2 管道(Pipe)

两个进程的双向通信
Pipe用于两个进程之间的双向通信,底层基于操作系统的管道机制,速度比Queue快,但仅支持点对点通信。
核心特性:

[*]双向通信:管道两端均可发送和接收数据。
[*]无缓冲 / 有缓冲:默认是无缓冲(发送方阻塞直到接收方读取),也可创建有缓冲管道。
[*]仅支持两个进程:多进程使用可能导致数据混乱。
基本用法:双向通信示例
from multiprocessing import Process, Pipe
import time

def process_a(conn):
    """进程A:发送数据并接收响应"""
    # 发送数据到进程B
    conn.send("Hello from A")
    print("A发送:Hello from A")
   
    # 接收进程B的响应
    response = conn.recv()
    print(f"A接收:{response}")
   
    # 关闭连接
    conn.close()

def process_b(conn):
    """进程B:接收数据并发送响应"""
    # 接收进程A的数据
    data = conn.recv()
    print(f"B接收:{data}")
   
    # 发送响应到进程A
    time.sleep(1)# 模拟处理耗时
    conn.send("Hello from B")
    print("B发送:Hello from B")
   
    # 关闭连接
    conn.close()

if __name__ == "__main__":
    # 创建管道:返回两个连接对象(conn1和conn2)
    conn_a, conn_b = Pipe()# 双向管道
   
    # 创建两个进程,分别传递管道的一端
    p_a = Process(target=process_a, args=(conn_a,))
    p_b = Process(target=process_b, args=(conn_b,))
   
    p_a.start()
    p_b.start()
   
    p_a.join()
    p_b.join()
   
    print("通信结束")关键方法:

[*]conn.send(data):发送数据(数据需可序列化,如字符串、列表等)。
[*]conn.recv():接收数据(阻塞直到有数据可接收)。
[*]conn.close():关闭连接。
[*]conn.poll(timeout=None):检查是否有数据可接收(非阻塞,timeout设置等待时间)。
2.3.3 共享内存(Value/Array)

高效的数据共享
共享内存允许多个进程直接访问同一块内存空间,是效率最高的 IPC 方式(无需数据拷贝),但需结合同步机制(如锁)避免竞态条件。multiprocessing提供Value(单个值)和Array(数组)两种共享内存工具。
核心特性:

[*]高效:数据直接在内存中共享,无需序列化 / 反序列化。
[*]类型限制:需指定数据类型(如'i'表示整数,'d'表示双精度浮点数)。
[*]需同步:多进程读写时必须加锁,否则会导致数据混乱。
共享内存与同步
from multiprocessing import Process, Value, Array, Lock
import time

def increment_counter(counter, lock):
    """递增共享计数器(需加锁保护)"""
    for _ in range(100000):
      with lock:# 加锁确保原子操作
            counter.value += 1# 访问共享内存中的值

def modify_array(arr, lock, start, end):
    """修改共享数组(需加锁保护)"""
    with lock:
      for i in range(start, end):
            arr = i * 2# 访问共享数组
      time.sleep(1)# 模拟耗时

if __name__ == "__main__":
    # 创建共享变量:Value(类型, 初始值)
    counter = Value('i', 0)# 'i'表示有符号整数
   
    # 创建共享数组:Array(类型, 初始值/长度)
    arr = Array('i', * 10)# 长度为10的整数数组
   
    # 同步锁(确保共享内存操作安全)
    lock = Lock()
   
    # 测试共享计数器(2个进程并发递增)
    p1 = Process(target=increment_counter, args=(counter, lock))
    p2 = Process(target=increment_counter, args=(counter, lock))
   
    p1.start()
    p2.start()
    p1.join()
    p2.join()
   
    print(f"共享计数器结果:{counter.value}")# 预期200000
   
    # 测试共享数组(2个进程分别修改不同区间)
    p3 = Process(target=modify_array, args=(arr, lock, 0, 5))
    p4 = Process(target=modify_array, args=(arr, lock, 5, 10))
   
    p3.start()
    p4.start()
    p3.join()
    p4.join()
   
    print(f"共享数组结果:{list(arr)}")# 预期类型代码:
Value和Array的类型参数需使用ctypes兼容的类型代码,常见类型:

[*]'i':int(整数)
[*]'d':double(双精度浮点数)
[*]'f':float(单精度浮点数)
[*]'c':char(字符)
2.3.4 管理器(Manager)

共享复杂数据结构
Manager通过创建一个服务器进程管理共享数据,其他进程通过网络连接访问该进程,支持共享复杂数据结构(如字典、列表、队列等),灵活性高但效率低于共享内存。
核心特性:

[*]支持复杂类型:字典、列表、集合、自定义对象等。
[*]多进程安全:内部实现了同步机制。
[*]跨机器通信:可通过网络在不同机器的进程间共享数据(需配置地址)。
基本用法:共享字典和列表
from multiprocessing import Process, Manager
import time

def update_dict(shared_dict, name):
    """更新共享字典"""
    for i in range(3):
      shared_dict = i
      time.sleep(0.5)
      print(f"进程{name}更新字典:{shared_dict}")

def append_list(shared_list, name):
    """向共享列表追加元素"""
    for i in range(3):
      shared_list.append(f"{name}_{i}")
      time.sleep(0.5)
      print(f"进程{name}更新列表:{shared_list}")

if __name__ == "__main__":
    # 创建Manager对象(启动服务器进程)
    with Manager() as manager:
      # 创建共享字典和列表
      shared_dict = manager.dict()# 共享字典
      shared_list = manager.list()# 共享列表
      
      # 创建进程修改共享数据
      p1 = Process(target=update_dict, args=(shared_dict, "P1"))
      p2 = Process(target=update_dict, args=(shared_dict, "P2"))
      p3 = Process(target=append_list, args=(shared_list, "P3"))
      p4 = Process(target=append_list, args=(shared_list, "P4"))
      
      # 启动进程
      p1.start()
      p2.start()
      p3.start()
      p4.start()
      
      # 等待完成
      p1.join()
      p2.join()
      p3.join()
      p4.join()
      
      # 打印最终结果
      print("\n最终共享字典:", dict(shared_dict))
      print("最终共享列表:", list(shared_list))支持的共享类型:
Manager支持多种数据结构,通过以下方法创建:

[*]manager.dict():共享字典
[*]manager.list():共享列表
[*]manager.set():共享集合
[*]manager.Queue():共享队列
[*]manager.Value()/manager.Array():共享值 / 数组(类似multiprocessing的同名工具,但通过 Manager 管理)
2.4 进程池

multiprocessing.Pool与ProcessPoolExecutor
进程池用于管理多个进程,复用进程资源,适合批量 CPU 密集型任务。
进程池处理 CPU 密集型任务(计算素数)
from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    """判断是否为素数(CPU密集型任务)"""
    if n < 2:
      return False
    for i in range(2, int(math.sqrt(n)) + 1):
      if n % i == 0:
            return False
    return True

def count_primes(range_start, range_end):
    """统计指定范围内的素数数量"""
    count = 0
    for num in range(range_start, range_end):
      if is_prime(num):
            count += 1
    return f"{range_start}-{range_end} 素数数量:{count}"

def main():
    # 划分任务:将1-100000分为4个区间,交给4个进程
    ranges = [(1, 25000), (25000, 50000), (50000, 75000), (75000, 100000)]
   
    # 创建进程池(最大4个进程,与CPU核心数匹配)
    with ProcessPoolExecutor(max_workers=4) as executor:
      # 提交所有任务并获取结果(按输入顺序返回)
      results = list(executor.map(lambda r: count_primes(r, r), ranges))
   
    for res in results:
      print(res)

if __name__ == "__main__":
    import time
    start = time.time()
    main()
    print(f"总耗时:{time.time() - start:.2f} 秒")# 多进程并行计算,耗时约为单进程的1/43、异步编程:asyncio与协程

https://www.cnblogs.com/gange111/p/19109363

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Python 并发编程