multiprocessing模块
1 Python为什么慢
我们经常听程序员争论”XXX才是世界上最好的语言!”有一个理由常常让Python的拥趸者哑口无言“Python太慢了!”
以下是 The Benchmarks Game 面向主流编程语言设计的性能测试榜单,灰色反映的是时间效率,越短代表性能越好,棕色则是基于执行时间和内存开销的加权值。

Pthon3处于垫底水平,那Python为什么会慢呢?
开发者写的代码不够pythonic
对象复制
下面两种对象复制方法有数倍的速度差异。
from random import randint from time import time data = [str(randint(1000000, 9999999)) for _ in range(10000000)] s = time() all_add = '' for x in data: all_add += x e = time() print(e-s) # 1.4891037940979004 s = time() all_join = ''.join(data) e = time() print(e-s) # 0.1853940486907959
迭代器
用好迭代器能加速部分场景。
迭代器有惰性求值的特性,并非一次性将所有元素加载到内存中,而是在每次需要时才计算并返回下一个元素。
运行下面的程序,会发现get_data_1与get_data_2所用的时间类似,但是get_data_1的三个url几乎是同时输出,而get_data_2的url是一秒一个。
from time import time, sleep urls = [ "url1", "url2", "url3", ] def get_data_1(urls): results = [] for url in urls: sleep(1) results.append("data for " + url) return results def get_data_2(urls): for url in urls: sleep(1) yield "data for " + url s = time() for data in get_data_1(urls): print(data) e = time() print(e-s) # 3.0031187534332275 s = time() for data in get_data_2(urls): print(data) e = time() print(e-s) # 3.003131628036499
把代码修改成这样应该很容易看出来了:
from time import time, sleep urls = [ "url1", "url2", "url3", ] def get_data_1(urls): results = [] for url in urls: sleep(1) results.append("data for " + url) return results def get_data_2(urls): for url in urls: sleep(1) yield "data for " + url s = time() for i, data in enumerate(get_data_1(urls)): if i == 0: break print(i, data) e = time() print(e-s) # 3.0031187534332275 s = time() for i, data in enumerate(get_data_2(urls)): if i == 0: break print(i, data) e = time() print(e-s) # 1.0010180473327637
不难看到,迭代器的i=0后面的程序就没有再运行了。
数据结构
不同的数据结构容易造成较大的性能差异,比如:
from time import time data1 = list(range(0, 100000)) data2 = set(range(0, 100000)) s1 = time() 99999 in data1 e1 = time() print(e1-s1) # 0.0006175041198730469 s2 = time() 99999 in data2 # 1.1920928955078125e-06 e2 = time() print(e2-s2)
set是用hash table实现的,hash表查询速度更快
GIL (Global Interpret Lock)

全局解释器锁,确保在任何一个时刻,同一个Python进程中只有一个线程能够执行Python字节码。因此如果两个线程并发的话还是要做序列化,所以我们才会说Python中无法实现真正的多线程。
下面是一个例子:
from threading import Thread
from time import time
def empty_loop():
for _ in range(100000000):
pass # 耗时程序
def test(thread_count=1, func=empty_loop):
thread_list = [Thread(target=func) for _ in range(thread_count)]
s = time()
for task in thread_list:
task.start()
for task in thread_list:
task.join()
e = time()
print('{} threads, {} seconds'.format(thread_count, int(e-s)))
if __name__ == "__main__":
test(1) # 1 thread, 1 seconds
test(2) # 2 threads, 2 seconds
test(3) # 3 threads, 3 seconds
loop是占用了CPU,而同一进程内只能有一个线程执行,所以会慢,这个代码只是虚有多线程的壳子,没有起到多线程的作用。
2 Python并发解决方案
当然,对于IO密集型的任务(比如文件读写),上面的多线程代码是成立的,因为计算机在等待IO的过程中不占用CPU,可以切换下一个线程。针对IO密集型的任务可以使用各类异步编程库:asyncio, aiohttp, eventlet, twisted等,可以后续文章再细讲。
针对上述CPU密集型的任务(比如计算正则表达式),可以使用多进程。
将上述代码改成多进程,就可以顺利执行:
from multiprocessing import Process
from time import time
def empty_loop():
for _ in range(100000000):
pass # 耗时程序
def test(thread_count=1, func=empty_loop):
thread_list = [Process(target=func) for _ in range(thread_count)]
s = time()
for task in thread_list:
task.start()
for task in thread_list:
task.join()
e = time()
print('{} process, {} seconds'.format(thread_count, int(e-s)))
if __name__ == "__main__":
test(1) # 1 process, 1 seconds
test(2) # 2 processes, 1 seconds
test(3) # 3 processed, 1 seconds
2.1 并行与并发的区别
并发(Concurrency):看起来在同时处理任务,实际可能是使用单个处理器通过快速切换交替执行,给人同时执行的错觉。
并行(Parallelism):在多核CPU或者多设备上真正的同时执行。
2.2 multiprocessing模块
Process类
创建进程的类,由该类实例化得到的对象,表示一个子进程中的任务。
p=Process()
p.start() 启动进程
p.run() 进程启动时运行的方法,用于调用target来指定需要执行的函数;这个方法通常不应该由用户直接调用
p.terminate() 强制终止进程p,但不会做任何的清理操作
p.is_alive() 判断p是否还在运行
p.join([timeout]) 主线程等待p结束,确保主进程在子进程完成之后再继续执行
import multiprocessing
import time
def worker(name):
print(f"进程 {name} 开始工作...")
time.sleep(3)
print(f"进程 {name} 完成工作。")
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=('A',))
print("主进程:准备启动子进程。")
p.start()
print(f"主进程:子进程已启动,状态 is_alive: {p.is_alive()}")
print("主进程:等待子进程结束...")
p.join()
print(f"主进程:子进程已结束,状态 is_alive: {p.is_alive()}")
print("主进程:所有工作完成。")
Lock
多进程的本质是减少并发过程中锁的使用,但有些时候为了避免资源错乱,不得不用锁。
不加锁的后果:
import multiprocessing
def worker_without_lock(shared_value):
for _ in range(100000):
current_value = shared_value.value
shared_value.value = current_value + 1 # 对共享值进行累加
if __name__ == "__main__":
shared_number = multiprocessing.Value('i', 0)
processes = []
for _ in range(10):
p = multiprocessing.Process(target=worker_without_lock, args=(shared_number,))
processes.append(p)
p.start()
for p in processes:
p.join()
# 预期结果:1000000
print(f"实际结果: {shared_number.value}") # 172275,或者任意小于1000000的数字
加锁之后:
import multiprocessing
def worker_without_lock(shared_value):
for _ in range(100000):
# 加锁
with shared_value.get_lock():
current_value = shared_value.value
shared_value.value = current_value + 1 # 对共享值进行累加
if __name__ == "__main__":
shared_number = multiprocessing.Value('i', 0)
processes = []
for _ in range(10):
p = multiprocessing.Process(target=worker_without_lock, args=(shared_number,))
processes.append(p)
p.start()
for p in processes:
p.join()
# 预期结果:1000000
print(f"实际结果: {shared_number.value}") # 1000000
如果一个进程已经持有了某个 Lock
,在再次尝试获取这个 Lock
时,会造成自死锁。

相比于普通的Lock,Rlock允许同一个进程多次获取锁:
RLock
内部维护着一个持有者标识和一个递归计数器
当一个进程第一次获取
RLock
时,RLock
会记录标识,并将计数器设为 1。当同一个进程再次获取这个
RLock
时,它会检查到持有者就是自己,于是不会阻塞,而是将计数器加 1。每次调用
release()
,计数器会减 1。当计数器减到 0 时,锁才会被真正释放,供其它进程获取。
Pool
手动创建进程需要一个一个start并join,略为繁琐,使用进程池可以简化这一过程。
import multiprocessing
import time
def square(x):
result = x * x
time.sleep(1)
return result
if __name__ == "__main__":
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
start_time = time.time()
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(square, numbers)
end_time = time.time()
一个经典的并发过程如下图所示:

4 条件变量与生产者消费者
上面的过程实际就是生产者与消费者的关系。
现在有两个队列,生产者队列负责添加数据,消费者队列负责取出数据。
在这个过程中,有一个固定大小的缓冲区:
当缓冲区已满时,生产者不应再向其中添加数据,否则会覆盖尚未被消费的数据。
当缓冲区为空时,消费者不应尝试从中取出数据,因为无数据可取。
如果有一些特殊需求需要满足:
如果消费者要求必须含有至少3个数据才开始消费,应该怎么办
同理生产者呢?
如果每个消费者只能消费特定的数据,应该怎么办?
多进程中可以通过一个共享变量信号量解决:
import multiprocessing
import time
BUFFER_SIZE = 5
def producer(buffer, empty_slots, filled_slots, mutex):
"""生产者进程"""
for i in range(10):
# P(empty_slots): 等待一个空位。如果empty_slots为0,则阻塞。
empty_slots.acquire()
# P(mutex): 获取互斥锁,保护缓冲区。
mutex.acquire()
# --- 临界区 ---
# 使用共享内存数组作为循环队列
buffer[i % BUFFER_SIZE] = i
print(f"生产了 -> {i}")
# --- 临界区结束 ---
# V(mutex): 释放互斥锁。
mutex.release()
# V(filled_slots): 通知消费者,物品数+1。
filled_slots.release()
time.sleep(0.1)
def consumer(buffer, empty_slots, filled_slots, mutex):
"""消费者进程"""
for i in range(10):
# P(filled_slots): 等待一个物品。如果filled_slots为0,则阻塞。
filled_slots.acquire()
# P(mutex): 获取互斥锁。
mutex.acquire()
# --- 临界区 ---
item = buffer[i % BUFFER_SIZE]
print(f" 消费了 <- {item}")
# --- 临界区结束 ---
# V(mutex): 释放互斥锁。
mutex.release()
# V(empty_slots): 通知生产者,空位数+1。
empty_slots.release()
time.sleep(0.4)
if __name__ == '__main__':
# 创建共享内存
shared_buffer = multiprocessing.Array('i', BUFFER_SIZE)
# 1. 计数信号量,代表空槽位
empty_slots = multiprocessing.Semaphore(BUFFER_SIZE)
# 2. 计数信号量,代表已填充的槽位
filled_slots = multiprocessing.Semaphore(0)
# 3. 二进制信号量 (用Lock实现),用于互斥访问缓冲区
mutex = multiprocessing.Lock()
# 创建并运行进程
p = multiprocessing.Process(target=producer, args=(shared_buffer, empty_slots, filled_slots, mutex))
c = multiprocessing.Process(target=consumer, args=(shared_buffer, empty_slots, filled_slots, mutex))
p.start()
c.start()
p.join()
c.join()
参考
Last updated