15%

Сэкономьте 15% на всех хостинговых услугах

Проверьте свои навыки и получите скидку на любой тарифный план

Используйте код:

Skills
Начать
09.10.2024

Python Multiprocessing: Полное техническое руководство по параллельному выполнению

Модуль multiprocessing в Python обеспечивает истинное параллельное выполнение путём создания независимых процессов на уровне ОС, каждый из которых имеет собственное пространство памяти и интерпретатор Python — полностью обходя Global Interpreter Lock (GIL). В отличие от потоков, которые совместно используют единое состояние интерпретатора и сериализуются GIL, отдельные процессы выполняются параллельно на всех доступных ядрах CPU, что делает multiprocessing правильным инструментом для задач, ограниченных CPU, таких как численные вычисления, обработка изображений и инференс в машинном обучении.

Это руководство охватывает всё: от базовой архитектуры модели процессов Python до продвинутых паттернов, включая разделяемую память, пулы процессов, межпроцессное взаимодействие и производственные подводные камни, которые большинство руководств полностью упускают.

Почему GIL делает многопоточность недостаточной для задач, ограниченных CPU

Global Interpreter Lock — это мьютекс, защищающий внутренние счётчики ссылок объектов CPython. Только один поток может удерживать GIL и выполнять байткод Python в любой момент времени. Для задач, ограниченных I/O — сетевых запросов, запросов к базам данных, чтения файлов — потоки остаются полезными, поскольку GIL освобождается во время блокирующих системных вызовов I/O. Однако для чистых вычислений потоки непрерывно конкурируют за GIL, не обеспечивая реального параллелизма даже на 64-ядерной машине.

Multiprocessing полностью обходит эту проблему. Каждый порождённый процесс является полноценным независимым процессом ОС со своим собственным интерпретатором CPython, кучей и GIL. Планировщик операционной системы распределяет эти процессы по физическим ядрам, обеспечивая подлинный параллелизм.

Влияние GIL: конкретный пример

Рассмотрим функцию, выполняющую 10 миллионов целочисленных сложений. Запуск её в двух потоках на двухъядерной машине займёт примерно столько же реального времени, что и запуск в одном потоке — иногда дольше из-за накладных расходов на конкуренцию за GIL. Запуск в двух отдельных процессах сократит реальное время вдвое.

Multiprocessing vs. Multithreading vs. Asyncio

Понимание того, когда использовать каждую модель параллелизма, так же важно, как знание того, как их использовать.

Характеристика`multiprocessing``threading``asyncio`
Тип параллелизмаИстинный (процессы ОС)Псевдо (ограничен GIL)Кооперативный (однопоточный)
Обход GILДаНетНет
Модель памятиОтдельная для каждого процессаОбщаяОбщая
Лучший вариант использованияЗадачи, ограниченные CPUI/O-bound + устаревшие библиотекиI/O-bound, высокий параллелизм
Накладные расходы на коммуникациюВысокие (требуется IPC)Низкие (общая память)Низкие (корутины)
Изоляция сбоевВысокая (изоляция аварий)Низкая (аварийное завершение одного потока может убить все)Низкая
Накладные расходы на запускВысокиеНизкиеОчень низкие
Типичное потребление памятиВысокоеНизкоеОчень низкое

Практическое правило: Используйте `multiprocessing` для задач, ограниченных CPU, `threading` или `asyncio` для задач, ограниченных I/O. Если вам нужно и то, и другое, `concurrent.futures` предоставляет унифицированный интерфейс для обеих моделей.

Базовая архитектура: как Python порождает процессы

Python поддерживает три метода запуска для создания дочерних процессов, и выбор имеет существенные последствия:

  • `fork` (по умолчанию на Linux/macOS): Копирует память родительского процесса с использованием copy-on-write. Быстрый, но может вызывать проблемы с многопоточными родительскими процессами или C-расширениями, удерживающими блокировки.
  • `spawn` (по умолчанию на Windows, доступен на всех платформах): Запускает новый интерпретатор Python и импортирует модуль. Медленнее, но безопаснее. Требует, чтобы весь код был импортируемым, поэтому защита `if __name__ == "__main__":` является обязательной.
  • `forkserver`: Выделенный серверный процесс выполняет fork по требованию. Позволяет избежать проблем с безопасностью fork, будучи более эффективным, чем чистый spawn для многих короткоживущих процессов.

Явно задайте метод запуска в начале вашей точки входа:

“`python

import multiprocessing

if __name__ == "__main__":

multiprocessing.set_start_method("spawn")

“`

Непонимание методов запуска является одним из наиболее распространённых источников тонких, платформозависимых ошибок в производственном коде с multiprocessing.

Импорт модуля

“`python

import multiprocessing

from multiprocessing import Process, Pool, Queue, Lock, Pipe, Value, Array

“`

Ключевые примитивы и их роли

ПримитивНазначение
`Process`Порождает единственный независимый процесс
`Pool`Управляет пулом переиспользуемых рабочих процессов
`Queue`Потокобезопасная и процессобезопасная очередь FIFO для IPC
`Pipe`Быстрое двухточечное соединение между двумя процессами
`Lock` / `RLock`Взаимное исключение для общих ресурсов
`Value` / `Array`Разделяемая память для простых типов
`Manager`Прокси-объекты для сложного общего состояния
`Event` / `Semaphore`Примитивы синхронизации

Пример 1: Порождение одного процесса

Класс `Process` является фундаментальным строительным блоком. Он напрямую соответствует процессу ОС.

“`python

from multiprocessing import Process

def compute_square(n):

result = n ** 2

print(f"Square of {n} is {result}")

if __name__ == "__main__":

process = Process(target=compute_square, args=(7,))

process.start()

process.join()

print(f"Process exit code: {process.exitcode}")

“`

Ключевые атрибуты и методы:

  • `target`: Вызываемый объект для выполнения в дочернем процессе.
  • `args` / `kwargs`: Аргументы, передаваемые целевой функции.
  • `start()`: Выполняет fork или spawn дочернего процесса.
  • `join(timeout=None)`: Блокирует вызывающий процесс до завершения процесса. Всегда вызывайте `join()`, чтобы предотвратить появление зомби-процессов.
  • `exitcode`: `0` при чистом завершении, отрицательное значение при завершении по сигналу, положительное значение если процесс вызвал необработанное исключение.
  • `is_alive()`: Возвращает `True`, если процесс ещё выполняется.
  • `terminate()` / `kill()`: Отправляет `SIGTERM` / `SIGKILL` соответственно. Используйте с осторожностью — ресурсы могут быть не освобождены.

Критический подводный камень: Если вы порождаете процесс без вызова `join()`, дочерний процесс становится зомби-процессом в Unix-системах, занимая запись в таблице процессов до завершения родителя.

Пример 2: Пулы процессов с `multiprocessing.Pool`

Для рабочих нагрузок, применяющих одну и ту же функцию ко многим элементам данных, `Pool` значительно эффективнее, чем ручное управление отдельными экземплярами `Process`. Он поддерживает фиксированное количество рабочих процессов и распределяет работу между ними.

“`python

from multiprocessing import Pool

import os

def process_chunk(data_chunk):

worker_pid = os.getpid()

result = sum(x ** 2 for x in data_chunk)

return result, worker_pid

if __name__ == "__main__":

dataset = [range(i, i + 1000) for i in range(0, 10000, 1000)]

with Pool(processes=4) as pool:

results = pool.map(process_chunk, dataset)

for result, pid in results:

print(f"Worker PID {pid} computed sum: {result}")

“`

Сравнение методов Pool

МетодБлокирующийВозвращаетЛучший вариант использования
`pool.map(f, iterable)`ДаСписок результатовПростой параллельный map
`pool.imap(f, iterable)`ЛенивыйИтераторБольшие итерируемые объекты, эффективность памяти
`pool.imap_unordered(f, iterable)`ЛенивыйИтератор (неупорядоченный)Когда порядок не важен
`pool.starmap(f, iterable)`ДаСписок результатовФункции с несколькими аргументами
`pool.apply_async(f, args)`Нет`AsyncResult`Запуск без ожидания результата или с обратными вызовами
`pool.map_async(f, iterable)`Нет`AsyncResult`Неблокирующая пакетная отправка задач

Подводный камень — выбор размера пула: Установка `processes` выше, чем `os.cpu_count()`, редко улучшает пропускную способность для задач, ограниченных CPU, и увеличивает накладные расходы на переключение контекста. Распространённая эвристика — `processes = os.cpu_count() – 1`, чтобы оставить одно ядро для ОС и главного процесса.

Подводный камень — сериализация: Все аргументы и возвращаемые значения, передаваемые между главным процессом и рабочими, сериализуются с использованием `pickle`. Объекты, которые не могут быть сериализованы (лямбда-функции, вложенные функции, определённые внутри других функций, файловые дескрипторы, соединения с базами данных), вызовут `PicklingError`. Используйте `pool.starmap` с функциями уровня модуля или реструктурируйте код, чтобы избежать передачи несериализуемых объектов.

Пример 3: Межпроцессное взаимодействие с Queue

`multiprocessing.Queue` — это процессобезопасная очередь FIFO, построенная поверх канала и блокировки. Это стандартный механизм для паттерна производитель-потребитель.

“`python

from multiprocessing import Process, Queue

import time

def producer(queue, items):

for item in items:

queue.put(item)

print(f"[Producer] Enqueued: {item}")

time.sleep(0.01)

queue.put(None) # Sentinel value to signal completion

def consumer(queue):

while True:

item = queue.get()

if item is None:

print("[Consumer] Received sentinel, shutting down.")

break

print(f"[Consumer] Processing: {item}")

if __name__ == "__main__":

q = Queue(maxsize=10) # Bounded queue prevents unbounded memory growth

data = list(range(20))

p = Process(target=producer, args=(q, data))

c = Process(target=consumer, args=(q,))

p.start()

c.start()

p.join()

c.join()

“`

Критическое замечание по дизайну: Никогда не используйте `queue.empty()` для определения момента остановки потребления. Проверка `empty()` ненадёжна в контексте multiprocessing — между проверкой и последующим `get()` существует состояние гонки. Всегда используйте сигнальное значение (например, `None` или специальный объект `STOP`) для сигнализации о завершении производства.

Пример 4: Разделяемая память с Value и Array

Когда процессам необходимо совместно использовать простое числовое состояние без накладных расходов `Queue`, `multiprocessing.Value` и `multiprocessing.Array` предоставляют прямую разделяемую память, поддерживаемую `ctypes`.

“`python

from multiprocessing import Process, Value, Array, Lock

import ctypes

def increment_counter(counter, lock, iterations):

for _ in range(iterations):

with lock:

counter.value += 1

if __name__ == "__main__":

counter = Value(ctypes.c_int, 0)

lock = Lock()

processes = [

Process(target=increment_counter, args=(counter, lock, 1000))

for _ in range(4)

]

for p in processes:

p.start()

for p in processes:

p.join()

print(f"Final counter value: {counter.value}") # Expected: 4000

“`

Без блокировки итоговое значение было бы непредсказуемо меньше 4000 из-за состояний гонки в цикле чтение-изменение-запись. Всегда защищайте общее изменяемое состояние с помощью `Lock`.

Для сложных общих структур данных (списков, словарей, пользовательских объектов) используйте `multiprocessing.Manager`, который создаёт серверный процесс, управляющий объектами и предоставляющий прокси-доступ. Компромисс — более высокая задержка на каждый доступ по сравнению с прямой разделяемой памятью.

Пример 5: Pipe для прямой коммуникации между двумя процессами

`multiprocessing.Pipe` создаёт пару объектов соединения. Он быстрее, чем `Queue`, для точечной коммуникации между ровно двумя процессами, поскольку имеет меньше накладных расходов.

“`python

from multiprocessing import Process, Pipe

def worker(conn):

data = conn.recv()

result = [x ** 3 for x in data]

conn.send(result)

conn.close()

if __name__ == "__main__":

parent_conn, child_conn = Pipe()

p = Process(target=worker, args=(child_conn,))

p.start()

parent_conn.send([1, 2, 3, 4, 5])

result = parent_conn.recv()

p.join()

print(f"Cubed values: {result}")

“`

Используйте `Queue`, когда задействованы несколько производителей или потребителей. Используйте `Pipe`, когда ровно два процесса обмениваются данными напрямую.

Пример 6: Использование `concurrent.futures.ProcessPoolExecutor`

Для современного кода Python (3.2+) `concurrent.futures.ProcessPoolExecutor` предоставляет более высокоуровневый и чистый API поверх `multiprocessing.Pool` и естественно интегрируется с объектами `Future`.

“`python

from concurrent.futures import ProcessPoolExecutor, as_completed

def heavy_computation(n):

return sum(i * i for i in range(n))

if __name__ == "__main__":

inputs = [106, 2 * 106, 3 * 106, 4 * 106]

with ProcessPoolExecutor(max_workers=4) as executor:

futures = {executor.submit(heavy_computation, n): n for n in inputs}

for future in as_completed(futures):

n = futures[future]

try:

result = future.result()

print(f"Input {n}: result = {result}")

except Exception as e:

print(f"Input {n} raised an exception: {e}")

“`

`as_completed()` возвращает futures по мере их завершения, а не в порядке отправки, что полезно, когда продолжительность задач существенно варьируется.

Производственные подводные камни и продвинутые соображения

Демон-процессы

Установка `process.daemon = True` перед вызовом `start()` делает дочерний процесс демоном. Демон-процессы автоматически завершаются при выходе родительского процесса, предотвращая появление осиротевших фоновых рабочих процессов. Однако демон-процессы сами не могут порождать дочерние процессы.

Обработка исключений в рабочих процессах

Исключения, возникающие внутри рабочих функций, не распространяются в родительский процесс автоматически при использовании `Pool.map()` — они повторно вызываются при вызове `result()` на возвращённом значении или при возврате `map()`. При использовании `apply_async` необходимо явно вызывать `.get()` на `AsyncResult` для обнаружения исключений.

“`python

from multiprocessing import Pool

def risky_function(x):

if x == 3:

raise ValueError(f"Cannot process value {x}")

return x * 10

if __name__ == "__main__":

with Pool(2) as pool:

try:

results = pool.map(risky_function, [1, 2, 3, 4])

except ValueError as e:

print(f"Caught worker exception: {e}")

“`

Потребление памяти

Каждый порождённый процесс дублирует объём памяти родителя (при `fork`) или повторно импортирует все модули (при `spawn`). Для родительского процесса, потребляющего 2 GB RAM, порождение 8 рабочих процессов в системе на основе `fork` может казаться потребляющим 16 GB до вступления в силу copy-on-write. Тщательно профилируйте потребление памяти перед масштабированием количества рабочих процессов.

Избегание глобального состояния

Глобальные переменные в родительском процессе не разделяются с дочерними процессами после `spawn`. Изменения глобальных переменных в дочернем процессе невидимы для родителя и других дочерних процессов. Если вы полагаетесь на глобальную конфигурацию, передавайте её явно в качестве аргументов или используйте `Manager`.

Разбиение на чанки для эффективности Pool

`pool.map()` принимает параметр `chunksize`. Для больших итерируемых объектов установка подходящего размера чанка снижает накладные расходы IPC путём пакетирования нескольких элементов за один цикл сериализации/десериализации:

“`python

results = pool.map(process_item, large_list, chunksize=500)

“`

Выбор правильного оборудования для рабочих нагрузок с multiprocessing

Потолок производительности любого приложения с multiprocessing в конечном счёте определяется количеством доступных физических ядер CPU. Пул процессов с 32 рабочими на 4-ядерной машине не превзойдёт пул из 4 рабочих — он будет медленнее из-за накладных расходов на переключение контекста.

Для производственного развёртывания CPU-интенсивных Python-приложений — конвейеров обработки данных, научных вычислений, пакетного инференса ML — вам нужны выделенные вычислительные ресурсы. Выделенные серверы с процессорами с большим количеством ядер устраняют конкуренцию за ресурсы, присущую общим средам, предоставляя каждому рабочему процессу беспрепятственный доступ к физическому ядру.

Для разработки, тестирования или умеренных рабочих нагрузок правильно подобранный экземпляр VPS Хостинга предоставляет экономически эффективную среду, где вы можете настраивать количество рабочих процессов относительно доступных vCPU. Если вам нужна панель управления для управления средой Python-приложения, VPS с cPanel упрощает развёртывание и мониторинг процессов.

Для рабочих нагрузок с ускорением GPU, где Python multiprocessing сочетается с CUDA-библиотеками, такими как PyTorch или CuPy, GPU Хостинг предоставляет необходимое оборудование для параллельного выполнения CPU-предобработки наряду с конвейерами GPU-вычислений.

При развёртывании приложений, предоставляющих API на основе multiprocessing через HTTPS, сопряжение вашего сервера с правильно настроенным SSL-сертификатом является обязательным базовым требованием для производственной безопасности.

Практическая матрица принятия решений

Используйте следующий контрольный список для определения правильного подхода к вашей рабочей нагрузке:

Используйте `multiprocessing.Process` напрямую, когда:

  • У вас небольшое фиксированное количество разнородных задач
  • Каждая задача имеет отдельный жизненный цикл и требует индивидуального мониторинга
  • Вам нужен детальный контроль над атрибутами процесса (демон, имя, привязка к ядру)

Используйте `multiprocessing.Pool` или `ProcessPoolExecutor`, когда:

  • Вы применяете одну и ту же функцию ко многим элементам данных
  • Вам нужно автоматическое управление жизненным циклом рабочих процессов
  • Вам нужен сбор результатов с минимальным шаблонным кодом

Используйте `multiprocessing.Queue`, когда:

  • У вас архитектура производитель-потребитель
  • Задействованы несколько производителей или потребителей
  • Вам нужен контроль обратного давления через `maxsize`

Используйте `multiprocessing.Pipe`, когда:

  • Ровно два процесса общаются напрямую
  • Задержка на сообщение важнее гибкости

Используйте `multiprocessing.Value` / `Array`, когда:

  • Вы разделяете простое числовое состояние между многими рабочими процессами
  • Частота доступа высока, а накладные расходы прокси Manager неприемлемы

Используйте `multiprocessing.Manager`, когда:

  • Вам нужно разделять сложные Python-объекты (списки, словари)
  • Согласованность важнее скорости доступа

Полностью избегайте multiprocessing, когда:

  • Узкое место — I/O (сеть, диск) — используйте `asyncio` или `threading`
  • Задачи очень короткоживущие (< 1 мс) — накладные расходы на порождение процессов будут доминировать
  • Ваша кодовая база в значительной мере опирается на несериализуемые объекты

FAQ

В: Почему я должен использовать `if __name__ == "__main__":` в скриптах Python с multiprocessing?

На Windows и при использовании метода запуска `spawn` Python повторно импортирует главный модуль в каждом дочернем процессе. Без защиты `__main__` дочерний процесс попытается рекурсивно порождать собственных потомков, вызывая бесконечную fork-бомбу. Эта защита обязательна на Windows и является лучшей практикой на всех платформах.

В: В чём разница между `pool.map()` и `pool.imap()`?

`pool.map()` немедленно потребляет весь итерируемый объект, сериализует все элементы, распределяет их по рабочим процессам и блокируется до сбора всех результатов в список. `pool.imap()` является ленивым — он отправляет элементы постепенно и возвращает итератор, что делает его эффективным по памяти для очень больших наборов данных. Используйте `imap`, когда входной итерируемый объект не помещается в память комфортно.

В: Могут ли процессы Python multiprocessing совместно использовать соединение с базой данных?

Нет. Соединения с базами данных не сериализуемы и не могут передаваться между процессами. Каждый рабочий процесс должен устанавливать собственное соединение. Используйте библиотеку пула соединений (например, `SQLAlchemy` с `pool_pre_ping=True`), инициализированную внутри рабочей функции, а не в родительском процессе.

В: Как корректно обрабатывать прерывания клавиатуры (Ctrl+C) в пуле multiprocessing?

Оберните вызов `pool.map()` в блок `try/except KeyboardInterrupt` и вызовите `pool.terminate()` с последующим `pool.join()` в блоке `except`. Кроме того, установите рабочие процессы как демон-процессы, если хотите, чтобы они автоматически завершались при уничтожении родителя. Без явной обработки рабочие процессы могут продолжать выполняться как осиротевшие после прерывания родителя.

В: Безопасно ли использовать Python multiprocessing с `fork` на macOS?

Начиная с Python 3.8, метод запуска по умолчанию на macOS изменился с `fork` на `spawn` именно потому, что `fork` в сочетании со средой выполнения Objective-C macOS и определёнными C-расширениями (включая используемые NumPy и PyTorch) вызывал взаимоблокировки. Всегда используйте `spawn` или `forkserver` на macOS и явно задавайте метод запуска, а не полагайтесь на значения по умолчанию, которые различаются в разных операционных системах.

15%

Сэкономьте 15% на всех хостинговых услугах

Проверьте свои навыки и получите скидку на любой тарифный план

Используйте код:

Skills
Начать