Python 基于线程的并行 threading

Posted on Wed, 25 Dec 2024 16:10:35 +0800 by LiangMingJian


前言

在开发过程中,如果需要并发的执行多个任务,Python 提供两种并行方式供开发者使用,分别是基于线程的并行 threading 和基于进程的并行 multiprocessing。

线程和进程的区别可以查看另一篇文章: 什么是多进程和多线程?什么是协程?

本文介绍基于线程的并行 threading 模块,另一种基于进程的将在另一篇文章介绍。

基于线程的并行 threading 是 Python 提供用以在单个进程内并发运行多个线程的一个模块。它允许创建和管理线程,以便能够平行地执行多个任务,并共享内存空间。

基本使用

threading 模块的使用,最重要的步骤是实例化类 Thread,并传入工作函数与参数。

Thread(group=None, target=None, name=None, args=(), kwargs={}, daemon=None)

  • group:保留参数,不用管,未来类 ThreadGroup 的扩展。
  • target:工作函数的函数名。
  • name:线程名,默认以 Thread-N 构建,也可以自定义。
  • args:工作函数的位置参数,使用元组形式传递。
  • kwargs:工作函数的关键字参数,使用字典形式传递。
  • daemon:True/False,是否设置为守护线程(守护线程是为其他线程提供支持性服务,如垃圾回收、日志记录、资源监控等的线程。注意,守护线程在所有线程结束后强制结束,不管守护线程有没有执行结束)。

在实例化类 Thread 后,便可以通过列表保存线程池,接着对线程池内的每一个实例通过 start() 方法进行调用,同时使用 join() 方法进行等待,避免主线程提前关闭(如果不使用 join() 方法阻塞主线程,那么主程序有可能会在启动多线程后便直接结束,不会等待多线程的执行返回,最终导致多线程执行失败)。

特别的,join() 方法支持超时参数 timeout(单位秒),允许设置超时时间来在线程异常时不再阻塞主线程,继续执行主线程。

不过要注意的是,子线程在超时后不会结束,只是放在后台执行,不再阻塞主线程,通过 thread.is_alive()threading.active_count() 还是可以看见线程在运行的(Python 的 threading 模块不提供直接结束线程的方法,因此请尽量保证线程可以正常的执行并返回值,避免设置超时时间)。

示例的代码如下:

import threading  
import time  
  
# 模拟阻塞操作  
def crawl(date, delay=3):  
    print(f"crawl started for {date}")  
    time.sleep(delay)  
    print(f"crawl ended for {date}")  
  
# 建立工作线程池,并传入工作函数  
threads = []  
for each in range(3):  
    # 实例化类 Thread    
    # 使用 `args` 传入位置参数并使用 `kwargs` 传入关键字参数  
    t = threading.Thread(target=crawl, args=(each,), kwargs={"delay": 2})  
    threads.append(t)  
  
# 设置并启动守护线程 
daemon = threading.Thread(target=crawl, args=(100,), daemon=True)  
daemon.start()  
  
# 启动每个线程
for t in threads:  
    t.start()  
  
# 检查子线程是否还没结束  
for t in threads:  
    if t.is_alive():  
        print(f'{t.name} 还在执行')  

# 查看当前的线程计数
# 输出 5(主线程 1 个 + 守护线程 1 个 + 子线程 3 个)
print(threading.active_count())  
  
# 等待所有线程结束  
for t in threads:  
    t.join()

线程局部数据

threading.local() 为基于线程的并发提供一个设置线程局部数据的功能,通过 threading.local() 设置的变量,在子线程内的所有调用修改都不会影响到主线程,主线程的所有调用修改也不会影响子线程,即每次线程里面的 local 变量都是新的

import threading  
import time  

# 创建一个线程局部数据
mydata = threading.local()  
# 在主线程中设置一个变量 number
mydata.number = 100  
  
# 模拟阻塞操作  
def crawl(date, delay=3): 
    # 在子线程内设置一个变量 number 和 name
    mydata.name = '子线程'  
    mydata.number = date  
    print(f'{mydata.name}: {mydata.number}')  # 子线程: 1
    time.sleep(delay)  
  
t = threading.Thread(target=crawl, args=(1,), kwargs={"delay": 2})  
t.start()  
t.join()  

print(mydata.number)  # 100
print(mydata.name)  # no attribute 'name'

特别的,可以通过继承 threading.local() 实现一些默认值支持。

import threading  
import time  
  
class MyLocal(threading.local):  
    name = '线程'  
  
mydata = MyLocal()  
mydata.number = 100  
  
# 模拟阻塞操作  
def crawl(date, delay=3):  
    mydata.number = date  
    print(f'{mydata.name}: {mydata.number}')  # 线程: 1 
    time.sleep(delay)  
  
t = threading.Thread(target=crawl, args=(1,), kwargs={"delay": 2})  
t.start()  
t.join()  
  
print(mydata.number)  # 100  
print(mydata.name)  # 线程

线程同步

线程锁

在 threading 模块中,提供 3 种类对象来实现线程同步,这些类对象有 LockRLockCondition

Lock 是最简单的互斥锁,其要求同一时间只有一个线程可以访问资源。使用时可直接通过 with 语句进行自动获取和释放(如需手动管理,则使用locked() 检查,acquire() 获取,release() 释放等方法)。

import threading  
import time  
  
lock = threading.Lock()  
data = 0  
  
def increment():  
    time.sleep(3)  
    global data  
    # 自动获取和释放锁  
    with lock:  
        data += 1
     
threads = []  
for each in range(3):  
    t = threading.Thread(target=increment)  
    threads.append(t)  
  
for t in threads:  
    t.start()  
  
for t in threads:  
    t.join()  

print(data)  # 输出结果 3

RLock 是适用于递归函数或嵌套函数调用时使用的锁,其要求在锁定状态下,某些线程拥有锁;在非锁定状态下,没有线程拥有它。使用时同样可以通过 with 语句进行自动获取和释放。

import threading  
  
rlock = threading.RLock()  
data = 0  

# 一个递归函数,需要重复使用锁
def recursive(n):  
    global data  
    with rlock:  
        if n > 0:  
            data += 1  
            recursive(n - 1)  
  
threads = []  
for each in range(3):  
    t = threading.Thread(target=recursive, args=(2, ))  
    threads.append(t)  
  
for t in threads:  
    t.start()  
  
for t in threads:  
    t.join()  
  
print(data)  # 输出结果 6

Condition 是结合了条件判断的一个锁,其要求线程在特定条件下等待或通知其他线程继续操作。与 Lock 和 RLock 不同,Condition 除了要通过 with 语句使用外,还需要手动通过 condition.wait()condition.notify(n=1)(默认唤醒一个等待这个条件的线程,最多随机唤醒 n 个等待的线程),condition.notify_all() (唤醒所有)来等待和通知其他线程执行。

import threading  
import time  
  
condition = threading.Condition()  
data = []  
  
def producer():  
    global data  
    with condition:  
        time.sleep(3)  
        data.append(1)  
        # 通知等待这个锁的随机 1 个线程继续执行  
        condition.notify()  
  
def consumer():  
    global data  
    with condition:  
        # 当列表为空时,释放锁,然后挂起等待 
        while not data:  
            condition.wait()  
        data.append(2)
  
# 启动线程  
t1 = threading.Thread(target=producer)  
t2 = threading.Thread(target=consumer)  
  
t1.start()  
t2.start()  
  
t1.join()  
t2.join()  
  
print(data)  # 输出:[1, 2]

线程信号量(数量限制)

此外,针对线程同步,threading 模块还提供类 SemaphoreBoundedSemaphore 用于控制同时访问共享资源的线程数量

SemaphoreBoundedSemaphore 的控制是通过锁的释放计数进行的,每次释放都会使内部的计数器加 1,当有锁被获取时,则计数器减 1。两者的区别在于 BoundedSemaphore 对资源限制更为严格,当锁释放超过设定值时,就会抛出异常。

Semaphore 的使用

import threading  
import time  
  
semaphore = threading.Semaphore(3)  # 允许最多 3 个线程同时访问  
  
def task():  
    with semaphore:  
        print(threading.current_thread().name, "执行任务")  
        time.sleep(3)  
        # 先打印 3 个线程,最后再打印剩下的 2 个线程
  
threads = [threading.Thread(target=task) for _ in range(5)]  
  
for t in threads:  
    t.start()  
  
for t in threads:  
    t.join()

BoundedSemaphore 的使用

import threading  

# 计数器最多是 3 
semaphore = threading.BoundedSemaphore(3)  # 允许最多 3 个线程同时访问
# 同时释放 4 个锁,导致计数器加 4,超过限制
semaphore.release(4)  # ValueError: Semaphore released too many times

线程栅栏(同步等待)

特别的,若要让线程在达到某个条件前都进行等待,然后在条件达到后一起执行,则可以使用 Barrier 对象。

Barrier 对象支持传入参数 parties(目标线程数),action(等待完成后的回调),timeout(超时时间)来进行实例化。

实例化后的对象可以通过方法 wait() 来进行等待,当等待的线程数等于 parties 时,所有线程开始执行。

存在方法 reset()abort() 重置等待和中断登录,但会导致已完成等待的线程或后续调用 wait() 操作的线程出现报错,请尽量避免使用。

import threading  
  
def ready_action():  
    print("所有线程已准备")  
  
barrier = threading.Barrier(3, ready_action)  
  
def worker(name):  
    print(f"{name}已准备")  
    barrier.wait()  
    print(f"{name}开始操作")  
  
threads = [  
    threading.Thread(target=worker, args=("A",)),  
    threading.Thread(target=worker, args=("B",)),  
    threading.Thread(target=worker, args=("C",))  
]  
  
for t in threads:  
    t.start()  
      
for t in threads:  
    t.join()

线程通信

threading 模块通过 Even 对象来实现线程通信。Even 即事件,也可以说是事件信号,一个线程通过发送事件信号给其他线程来完成通信。

一个 Even 对象通过调用 set() 方法设置/发送信号为 True,通过 wait() 方法等待/接收信号为真,通过 clear() 清理信号为 False。

import threading  
import time  
  
even = threading.Event()  
  
def producer():  
    print("设置信号...")  
    time.sleep(3)  
    even.set()   # 发送信号  
  
def consumer():  
    print("等待信号...")  
    even.wait()  # 阻塞等待  
    print("收到数据")  
  
t1 = threading.Thread(target=producer)  
t2 = threading.Thread(target=consumer)  
  
t1.start()  
t2.start() 

t1.join()  
t2.join()

拓展

threading 模块提供一个定时器,用以在给定时间后执行相应方法函数。

import threading  
  
def hello():  
    print("hello, world")  

# 3 秒后执行 hello 方法
t = threading.Timer(3, hello)  
t.start()

————————————

threading — 基于线程的并行