python 多进程+协程 实现并发(Python multi process + CO process to achieve concurrency)

使用进程池创(pool)建进程,用队列(queue)进行进程间通信。在子进程里边用协程去处理。

直接上代码:

from datetime import datetime
from multiprocessing import Pool, Manager
import asyncio
from random import randint
import math
import os

'''
  需求:
    有一个列表,列表中的元素求二次幂,并将值返回
  方案:
    用3个子进程,并在每个子进程中用协程去处理
'''
async def power(num):
  # 幂运算 用延时来模拟阻塞
  # print(f'pid: {os.getpid}')
  await asyncio.sleep(randint(1, 5))
  return num*num

def create_task(data:list, queue):
  # print(f'pid: {os.getpid}')
  # 创建event_loop
  loop = asyncio.new_event_loop()
  # 创建task 将每一个元素创建一个task去执行
  tasks = [loop.create_task(power(el)) for el in data]
  # 执行
  loop.run_until_complete(asyncio.wait(tasks))

  # 获取task结果
  for task in tasks:
    # 将结果写到队列中
    # print(f'---: {task.result()}')
    queue.put(task.result())

  

def handle(data: list):
  process_num = 3
  # 初始化进程池
  pool = Pool(processes=process_num)
  # 创建队列 使用进程池的时候,创建队列用Manager才能使用
  queue = Manager().Queue()  

  start = 0
  num = math.ceil(len(data) / process_num)  # 数据长度 / 子进程数量 向上取整,将数据分成份,分给给每个进程

  for i in range(1, num + 1):
    end = i*num
    # 创建子进程,并传入数据和队列
    pool.apply_async(create_task, args=(data[start:end], queue))
    start = end
  
  # 等所有进程执行完之后关闭进程
  pool.close()
  pool.join()

  # 获取执行结果
  res = list()
  while not queue.empty():
    # 从队列中获取数据
    res.append(queue.get())
  
  return res

if __name__ == '__main__':
  bg = datetime.now()
  data = list(range(10))
  res = handle(data)
  print('res', res)
  print('spend time: ', datetime.now()-bg)

在django中使用多进程时,如果出现 

django.core.exceptions.AppRegistryNotReady: Apps aren’t loaded yet.

解决方案:

  在使用到多进程的文件的最上边添加以下代码:

import django
django.setup()
————————

< strong > use the process pool to create processes, and use the queue to communicate between processes. In the subprocess, it is handled with a coroutine

Direct code:

from datetime import datetime
from multiprocessing import Pool, Manager
import asyncio
from random import randint
import math
import os

'''
  需求:
    有一个列表,列表中的元素求二次幂,并将值返回
  方案:
    用3个子进程,并在每个子进程中用协程去处理
'''
async def power(num):
  # 幂运算 用延时来模拟阻塞
  # print(f'pid: {os.getpid}')
  await asyncio.sleep(randint(1, 5))
  return num*num

def create_task(data:list, queue):
  # print(f'pid: {os.getpid}')
  # 创建event_loop
  loop = asyncio.new_event_loop()
  # 创建task 将每一个元素创建一个task去执行
  tasks = [loop.create_task(power(el)) for el in data]
  # 执行
  loop.run_until_complete(asyncio.wait(tasks))

  # 获取task结果
  for task in tasks:
    # 将结果写到队列中
    # print(f'---: {task.result()}')
    queue.put(task.result())

  

def handle(data: list):
  process_num = 3
  # 初始化进程池
  pool = Pool(processes=process_num)
  # 创建队列 使用进程池的时候,创建队列用Manager才能使用
  queue = Manager().Queue()  

  start = 0
  num = math.ceil(len(data) / process_num)  # 数据长度 / 子进程数量 向上取整,将数据分成份,分给给每个进程

  for i in range(1, num + 1):
    end = i*num
    # 创建子进程,并传入数据和队列
    pool.apply_async(create_task, args=(data[start:end], queue))
    start = end
  
  # 等所有进程执行完之后关闭进程
  pool.close()
  pool.join()

  # 获取执行结果
  res = list()
  while not queue.empty():
    # 从队列中获取数据
    res.append(queue.get())
  
  return res

if __name__ == '__main__':
  bg = datetime.now()
  data = list(range(10))
  res = handle(data)
  print('res', res)
  print('spend time: ', datetime.now()-bg)

< strong > when using multiple processes in Django, if < / strong >

django.core.exceptions.AppRegistryNotReady: Apps aren’t loaded yet.

Solution:

Add the following code at the top of the file used to multiple processes:

import django
django.setup()