Как использовать asyncio и multiprocessing в Python
Модуль multiprocessing module позволяет вам писать компактные приложения использующие множество процессов.
Но иногда у вас есть старый код который запускал один процесс и вы хотите чтобы теперь он запускал несколько одновременно.
Вы можете полностью переписать его используя модуль multiprocessing.
Но тогда вам придется заново отлаживать это по сути новое приложение.
Я предлагаю иной подход - чуть модифицировать ваш старый код, добавив в него asyncio. В итоге ваше приложение по сути останется тем же, вам не потребуется долго отлаживать его заново.
import asyncio
import subprocess
import random
semaphore = asyncio.Queue(maxsize=3-1) # Max 3 processes
async def worker(id):
"""
We could use more straightforward consumer-producer pattern:
* producer puts tasks into the queue
* worker waits for tasks in the queue
But for this tiny code sniped that would produce too much boilerplates.
"""
delay = random.random()
print('>'*5, f'task {id} starts with delay {delay:.1} seconds')
process = await asyncio.create_subprocess_exec(
'sleep', str(delay),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
(output, err) = await process.communicate()
status = await process.wait()
print('<'*5, f'task {id} finished with status {status}')
print(f'Stdout: {output}, Stderr: {err}')
await semaphore.get()
async def main(loop):
for task_id in range(6):
await semaphore.put(task_id) # It does'n matter what we put in the queue. We use it as semaphore.
loop.create_task(worker(task_id))
# all the tasks are scheduled at the moment but not all done
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks())) # Wait for all tasks in the loop.
>>>>> task 0 starts with delay 0.9 seconds
>>>>> task 1 starts with delay 0.4 seconds
>>>>> task 2 starts with delay 0.6 seconds
<<<<< task 1 finished with status 0
Stdout: b'', Stderr: b''
>>>>> task 3 starts with delay 0.3 seconds
<<<<< task 3 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 2 finished with status 0
Stdout: b'', Stderr: b''
>>>>> task 4 starts with delay 0.9 seconds
>>>>> task 5 starts with delay 0.3 seconds
<<<<< task 0 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 5 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 4 finished with status 0
Stdout: b'', Stderr: b''
Process finished with exit code 0
Этот код запускает 6 задач в максимум трех одновременно работающих экземплярах внешнего процесса. Я ограничиваю это с помощью asyncio.Queue.
Когда процесс завершается вы можете обработать его результаты. Следующий процесс запустится автоматически так что в любой момент времени будет работать одновременно максимум процесса.
Как видите, код очень прост и содержит практически только прикладную логику.
Он выглядит как синхронный. То есть как предположительный старый код, который запускал только один процесс и который мы теперь конвертировали в асинхронный.