python 多进程+协程 实现并发(Python multi process + CO process to achieve concurrency)-python
python 多进程+协程 实现并发(Python multi process + CO process to achieve concurrency)
使用进程池创(pool)建进程,用队列(queue)进行进程间通信。在子进程里边用协程去处理。
直接上代码:
from datetime import datetime
from multiprocessing import Pool, Manager
import asyncio
from random import randint
import math
import os
'''
需求:
有一个列表,列表中的元素求二次幂,并将值返回
方案:
用3个子进程,并在每个子进程中用协程去处理
'''
async def power(num):
# 幂运算 用延时来模拟阻塞
# print(f'pid: {os.getpid}')
await asyncio.sleep(randint(1, 5))
return num*num
def create_task(data:list, queue):
# print(f'pid: {os.getpid}')
# 创建event_loop
loop = asyncio.new_event_loop()
# 创建task 将每一个元素创建一个task去执行
tasks = [loop.create_task(power(el)) for el in data]
# 执行
loop.run_until_complete(asyncio.wait(tasks))
# 获取task结果
for task in tasks:
# 将结果写到队列中
# print(f'---: {task.result()}')
queue.put(task.result())
def handle(data: list):
process_num = 3
# 初始化进程池
pool = Pool(processes=process_num)
# 创建队列 使用进程池的时候,创建队列用Manager才能使用
queue = Manager().Queue()
start = 0
num = math.ceil(len(data) / process_num) # 数据长度 / 子进程数量 向上取整,将数据分成份,分给给每个进程
for i in range(1, num + 1):
end = i*num
# 创建子进程,并传入数据和队列
pool.apply_async(create_task, args=(data[start:end], queue))
start = end
# 等所有进程执行完之后关闭进程
pool.close()
pool.join()
# 获取执行结果
res = list()
while not queue.empty():
# 从队列中获取数据
res.append(queue.get())
return res
if __name__ == '__main__':
bg = datetime.now()
data = list(range(10))
res = handle(data)
print('res', res)
print('spend time: ', datetime.now()-bg)
在django中使用多进程时,如果出现
django.core.exceptions.AppRegistryNotReady: Apps aren’t loaded yet.
解决方案:
在使用到多进程的文件的最上边添加以下代码:
import django
django.setup()
————————
< strong > use the process pool to create processes, and use the queue to communicate between processes. In the subprocess, it is handled with a coroutine strong>
Direct code:
from datetime import datetime
from multiprocessing import Pool, Manager
import asyncio
from random import randint
import math
import os
'''
需求:
有一个列表,列表中的元素求二次幂,并将值返回
方案:
用3个子进程,并在每个子进程中用协程去处理
'''
async def power(num):
# 幂运算 用延时来模拟阻塞
# print(f'pid: {os.getpid}')
await asyncio.sleep(randint(1, 5))
return num*num
def create_task(data:list, queue):
# print(f'pid: {os.getpid}')
# 创建event_loop
loop = asyncio.new_event_loop()
# 创建task 将每一个元素创建一个task去执行
tasks = [loop.create_task(power(el)) for el in data]
# 执行
loop.run_until_complete(asyncio.wait(tasks))
# 获取task结果
for task in tasks:
# 将结果写到队列中
# print(f'---: {task.result()}')
queue.put(task.result())
def handle(data: list):
process_num = 3
# 初始化进程池
pool = Pool(processes=process_num)
# 创建队列 使用进程池的时候,创建队列用Manager才能使用
queue = Manager().Queue()
start = 0
num = math.ceil(len(data) / process_num) # 数据长度 / 子进程数量 向上取整,将数据分成份,分给给每个进程
for i in range(1, num + 1):
end = i*num
# 创建子进程,并传入数据和队列
pool.apply_async(create_task, args=(data[start:end], queue))
start = end
# 等所有进程执行完之后关闭进程
pool.close()
pool.join()
# 获取执行结果
res = list()
while not queue.empty():
# 从队列中获取数据
res.append(queue.get())
return res
if __name__ == '__main__':
bg = datetime.now()
data = list(range(10))
res = handle(data)
print('res', res)
print('spend time: ', datetime.now()-bg)
< strong > when using multiple processes in Django, if < / strong >
django.core.exceptions.AppRegistryNotReady: Apps aren’t loaded yet.
Solution:
Add the following code at the top of the file used to multiple processes:
import django
django.setup()