Перейти к основному содержимому

Основы asyncio в Python

Разработчику Архитектору
Связанные статьи

Словарь главы

ТерминКратко
asyncioСтандартный модуль Python для асинхронного кода
Event loop (цикл событий)Диспетчер, который по очереди запускает корутины и ждёт готовности I/O
КорутинаФункция с async def; может приостанавливаться на await
TaskКорутина, поставленная в очередь event loop
awaitТочка паузы. Корутина ждёт результат и отдаёт управление циклу
I/O-boundЗадача упирается в ожидание сети, диска или БД, а не в скорость CPU — см. теорию
КонкурентностьНесколько задач продвигаются вперёд в одном промежутке времени
ПараллелизмНесколько задач реально выполняются одновременно на разных ядрах

Однопоточная конкурентность

Модуль asyncio запускает асинхронный код в одном потоке одного процесса. Event loop выбирает готовую корутину, выполняет её до следующего await, затем переключается на другую.

Пока одна корутина ждёт ответ от сервера, чтение файла или таймер (asyncio.sleep), цикл может выполнять остальные. Именно перекрытие ожиданий даёт ускорение на I/O-bound нагрузке.

Параллелизма на нескольких ядрах CPU здесь нет — в каждый момент интерпретатор выполняет байт-код одной корутины. Для вычислений на всех ядрах нужен multiprocessing.

Пример. Сто HTTP-запросов, каждый ждёт ответ ~1 с.

  • последовательный синхронный код — около 100 с;
  • asyncio с конкурентным запуском — около 1 с плюс накладные расходы, потому что все ожидания идут параллельно по времени.

Кооперативная многозадачность

Вытесняющая многозадачность — модель потоков и процессов. Операционная система сама решает, когда прервать задачу. Переключение контекста стоит дорого.

Кооперативная многозадачность — модель asyncio. Задача сама объявляет паузу через await. Event loop переключается только в этих точках.

Преимущества кооперативной модели:

  • меньше накладных расходов, чем у потоков при тысячах соединений;
  • пауза обычно ставится перед ожиданием I/O — удобный момент для переключения;
  • один поток интерпретатора — проще рассуждать о порядке выполнения.

Ограничение: любой долгий синхронный участок без await блокирует весь event loop. Все корутины в процессе "замирают", пока этот код не закончится. Подробнее о блокирующих вызовах — в разделе Типичные ошибки.

Теория кооперативного планирования — Корутины в общем разделе по асинхронности.


Корутина, await и asyncio.run

Корутина объявляется ключевым словом async def. Обычный вызов fetch() не запускает тело функции — он возвращает объект корутины, который нужно передать в event loop.

import asyncio

async def fetch():
await asyncio.sleep(0.1)
return 42

# coro = fetch() объект корутины, выполнение ещё не началось
asyncio.run(fetch())

Разбор:

  • asyncio.run(coro) — стандартная точка входа из обычного синхронного скрипта;
  • внутри создаётся event loop, выполняется корутина, loop корректно закрывается;
  • внутри других корутин вместо run используют await.

await ставится перед объектом, который можно ожидать (awaitable). Пока результат не готов, текущая корутина приостановлена, а loop обслуживает остальные задачи.


Последовательное и конкурентное ожидание

Несколько await подряд выполняются по очереди. Второй запрос стартует только после завершения первого.

import asyncio

async def slow(name: str, seconds: float) -> str:
print(f"{name}: старт")
await asyncio.sleep(seconds)
print(f"{name}: готово")
return name

async def one_after_another():
await slow("A", 1)
await slow("B", 1)
await slow("C", 1)
# суммарно около 3 с

async def at_the_same_time():
t1 = asyncio.create_task(slow("A", 1))
t2 = asyncio.create_task(slow("B", 1))
t3 = asyncio.create_task(slow("C", 1))
await asyncio.gather(t1, t2, t3)
# суммарно около 1 с — все sleep идут одновременно

Разбор create_task:

  • корутина сразу попадает в очередь event loop;
  • функция at_the_same_time продолжает работу, не дожидаясь завершения slow;
  • gather в конце собирает все результаты.
ЗаписьЧто происходит
await coro()Текущая корутина ждёт. Следующая строка кода выполнится только после результата
task = create_task(coro())Задача запланирована, выполнение текущей корутины идёт дальше
await taskОжидание конкретной задачи и получение её результата

Задачу из create_task нужно где-то дождаться — через await, gather или wait. Если asyncio.run завершится раньше, незавершённая задача будет отменена.


asyncio.gather и сбор результатов

asyncio.gather принимает несколько корутин или задач, запускает их конкурентно и возвращает список результатов в порядке аргументов.

import asyncio

async def fetch_id(user_id: int) -> dict:
await asyncio.sleep(0.2)
return {"id": user_id}

async def main():
results = await asyncio.gather(
fetch_id(1),
fetch_id(2),
fetch_id(3),
)
print(results)

asyncio.run(main())

Полезные параметры:

  • return_exceptions=True — исключение в одной задаче попадает в список как значение, остальные задачи не прерываются;
  • можно передавать уже созданные Task, если нужен контроль до вызова gather.

Справочник по API — stdlib, раздел asyncio.


as_completed и wait

Когда важен порядок готовности, а не порядок аргументов, используют asyncio.as_completed.

import asyncio

async def job(n: int) -> int:
await asyncio.sleep(n * 0.1)
return n

async def main():
tasks = [asyncio.create_task(job(i)) for i in (3, 1, 2)]
for finished in asyncio.as_completed(tasks):
print(await finished) # 1, затем 2, затем 3

asyncio.run(main())

asyncio.wait возвращает пару (done, pending) и параметр return_when:

  • asyncio.FIRST_COMPLETED — продолжить, как только завершилась хотя бы одна задача;
  • asyncio.FIRST_EXCEPTION — остановиться при первой ошибке;
  • asyncio.ALL_COMPLETED — дождаться всех (поведение по умолчанию).

Task, Future и Awaitable

Типы asyncio связаны иерархией. Для чтения traceback и документации библиотек полезно их различать.

  • Awaitable — любой объект после await. Реализует протокол __await__.
  • Coroutine — результат вызова async def. Сама по себе в loop не планируется, пока её не обернут в Task или не передадут в gather / create_task.
  • Task — корутина, поставленная в event loop. Хранит состояние, поддерживает отмену через task.cancel().
  • Future — "обещание" результата в будущем. Низкоуровневый примитив; Task наследует Future. В прикладном коде Future создают редко.
import asyncio

async def main():
task = asyncio.create_task(asyncio.sleep(0))
print(isinstance(task, asyncio.Task)) # True
print(isinstance(task, asyncio.Future)) # True
await task

asyncio.run(main())

Таймауты и отмена

Сетевой запрос без лимита времени может зависнуть навсегда. Корутина удерживает память и слот в пуле соединений.

asyncio.wait_for(aw, timeout=...) оборачивает awaitable и отменяет его при превышении лимита.

import asyncio

async def slow():
await asyncio.sleep(10)

async def main():
task = asyncio.create_task(slow())
try:
await asyncio.wait_for(task, timeout=1.0)
except asyncio.TimeoutError:
print("Превышен лимит времени, задача отменена:", task.cancelled())

asyncio.run(main())

Ручная отмена через task.cancel():

task = asyncio.create_task(slow())
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Задача остановлена")

cancel() не прерывает синхронный Python-код мгновенно. Исключение CancelledError возникает на следующем await внутри задачи.

asyncio.shield(aw) защищает внутреннюю операцию от отмены обёртки. Применяют, когда тайм-аут уже сработал, но транзакцию в БД всё равно нужно дописать до конца.

Обработка ошибок в async-коде — Исключения в Python.


Типичные ошибки

Тяжёлые вычисления внутри корутины

Цикл for на миллион итераций, парсинг огромного JSON, хеширование — всё это идёт без пауз. Event loop не переключится на другие задачи.

Что делать:

  • вынести CPU-работу в multiprocessing или ProcessPoolExecutor;
  • оставить в asyncio только координацию и I/O.

Блокирующий ввод-вывод

Синхронные вызовы останавливают единственный поток, в котором крутится loop:

  • time.sleep
  • requests.get
  • синхронный cursor.execute в SQLAlchemy

Варианты решения:

  • асинхронная библиотека (httpx.AsyncClient, asyncpg, aiofiles) — см. сетевой справочник;
  • вынос в поток через asyncio.to_thread (Python 3.9+):
import asyncio

def blocking_read(path: str) -> str:
with open(path, encoding="utf-8") as f:
return f.read()

async def main():
text = await asyncio.to_thread(blocking_read, "data.txt")
print(len(text))

asyncio.run(main())
  • для тяжёлой CPU-функции — loop.run_in_executor с ProcessPoolExecutor.

Работа с файлами в целом — Файлы, сеть и внешние API.

Задача без ожидания результата

async def main():
asyncio.create_task(background_worker())
await asyncio.sleep(0)

Паттерн "запустил и забыл" допустим для фоновых воркеров, но исключение внутри background_worker может остаться незамеченным. Надёжнее:

  • хранить ссылки на Task в списке;
  • обрабатывать ошибки через task.add_done_callback;
  • или собирать задачи через gather(..., return_exceptions=True).

Синхронизация в одном потоке

Даже при одном потоке между двумя await другая корутина может изменить общие данные. Если между чтением и записью есть await, нужна защита.

import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
global counter
async with lock:
value = counter
await asyncio.sleep(0)
counter = value + 1

async def main():
await asyncio.gather(*(increment() for _ in range(100)))
print(counter) # 100

asyncio.run(main())

Примитивы asyncio:

ПримитивПрименение
asyncio.LockТолько одна корутина в критической секции
asyncio.Semaphore(n)Не больше n операций одновременно — лимит HTTP-запросов
asyncio.EventОдна задача сигнализирует, другая ждёт
asyncio.ConditionОжидание условия с уведомлением
asyncio.QueueОчередь между производителем и потребителем

Теория синхронизации — Синхронизация потоков. Пример Semaphore в обзоре — Асинхронность и многопоточность.


Как устроен event loop

Под asyncio лежат неблокирующие сокеты и механизмы ОС (epoll в Linux, kqueue в macOS, IOCP в Windows). Модуль selectors даёт к ним переносимый доступ — подробнее в Сетевое программирование.

Упрощённый алгоритм цикла:

ready = [стартовая корутина]
пока есть готовые задачи, таймеры или ожидающий I/O:
выполнить корутины из ready до следующего await
зарегистрировать сокеты приостановленных корутин в селекторе
дождаться события сокета или таймера
вернуть завершившие ожидание корутины в ready

Корутина вызывает await sock.recv() — loop регистрирует сокет и переключается на другую задачу. Когда данные пришли, корутина снова в очереди ready. Тот же принцип у asyncio.sleep и сетевых библиотек.

Ускоренная реализация цикла — пакет uvloop (libuv, тот же низкий уровень, что у Node.js). Подключение — в обзоре моделей.


Параллельные HTTP-запросы

Типовой шаблон для клиентского кода:

import asyncio
import httpx

URLS = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]

async def fetch(client: httpx.AsyncClient, url: str) -> int:
response = await client.get(url, timeout=5.0)
response.raise_for_status()
return response.status_code

async def main():
sem = asyncio.Semaphore(10)

async def bounded(url: str) -> int:
async with sem:
return await fetch(client, url)

async with httpx.AsyncClient() as client:
codes = await asyncio.gather(*(bounded(u) for u in URLS))
print(codes)

asyncio.run(main())

Разбор:

  • httpx.AsyncClient переиспользует TCP-соединения;
  • Semaphore(10) не даёт открыть слишком много одновременных запросов;
  • timeout на каждый вызов ограничивает зависание.

Серверный asyncio-стек:


Дальнейшее чтение