Совместный доступ к структурам Ctypes и массивам NumPy в неродственных процессах с помощью разделяемой памяти POSIX в Python

Совместное использование различных средств межпроцессного взаимодействия отлично поддерживается в Python с помощью стандартных библиотек, таких как Threading и Multiprocessing. Однако, данные средства предназначены для организации различных механизмов IPC между родственными процессами, то есть такими, которые порождаются из общего предка и наследуют объекты IPC. Однако, часто возникает задача использования средств IPC между неродственными процессами, которые запускаются независимо. В этом случае применяются именованные объекты IPC (POSIX или SysV), которые позволяют неродственным процессам получить объект IPC по уникальному имени. Такое взаимодействие не поддерживается стандартными инструментами Python.

В Python 3.8 появилась библиотека multiprocessing.shared_memory, которая представляет собой первый шаг к реализации средств IPC, предназначенных для связи неродственных процессов. Данная статья как раз была задумана как проба этой библиотеки. Однако, все пошло не так. На 29 ноября 2019 года реализация разделяемой памяти в этой библиотеке некорректна – объект разделяемой памяти удаляется даже в том случае, если вы просто хотите прекратить использовать объект в одном из процессов, не удаляя его. Несмотря на наличие двух вызовов close() и unlink(), независимо от их вызова или невызова, объект удаляется когда любой из процессов-пользователей объекта завершает свою работу.

Решить проблему мы смогли с помощью сторонней реализации POSIX IPC, которая хоть и является низкоуровневой, но прекрасно работает. Далее, мы реализуем две программы:

  • write.py, которая будет читать фрейм OpenCV с камеры (NumPy Ndarray) и передавать его через разделяемую память программе read.py;
  • read.py, которая получает фрейм OpenCV из разделяемой памяти и отображает его на экране с помощью стандартного инструментария OpenCV.

Зачем это надо. Передача и совместное использование объектов между процессами посредством разделяемой памяти значительно эффективнее чем сериализации и десериализация (поскольку практически бесплатна), поэтому актуальна для организации обмена данными с минимальной задержкой между процессами в рамках одного узла. К сожалению, при необходимости обмена данными между несколькими узлами необходимо использовать традиционные подходы, основанные на передаче сообщений.

При реализации мы продемонстрируем:

  • семафор POSIX для синхронизации двух неродственных процессов;
  • структуру Ctypes, данные которой хранятся в разделяемой памяти;
  • буфер для хранения массива NumPy Ndarray в разделяемой памяти.

Исходный код прокта можно найти в репозитории на GitHub: https://github.com/bwsw/shared-ctypes-numpy-posix-ipc-python.

Структура с метаданными

from ctypes import Structure, c_int32, c_int64


class MD(Structure):
    _fields_ = [
        ('shape_0', c_int32),
        ('shape_1', c_int32),
        ('shape_2', c_int32),
        ('size', c_int64),
        ('count', c_int64)
    ]

В данной структуре мы определим метаданные, которые будем использовать для передачи спецификации фрейма. Через нее мы будем передавать читателю (read.py) какой размер буфера мы используем и в какая форма массива NumPy Ndarray должна использоваться при восстановлении объекта из буфера разделяемой памяти. Данный код должен быть размещен в файле structures.py.

Писатель

Писатель получает фрейм из видеокамеры с помощью OpenCV и передает его спецификацию через структуру метаданных в процесс-читатель. На основании этих данных читатель сможет подключиться к буферу разделяемой памяти, который используется для передачи самого фрейма.

Для защиты от ситуации гонки за ресурсы используется бинарный семафор, что позволяет не бояться ситуации, когда читатель может прочить не полностью обновленные данные из буфера метаданных и буфера фрейма.

import cv2
import numpy as np
import mmap
from posix_ipc import Semaphore, O_CREX, ExistentialError, O_CREAT, SharedMemory, unlink_shared_memory
from ctypes import sizeof, memmove, addressof, create_string_buffer
from structures import MD

md_buf = create_string_buffer(sizeof(MD))


class ShmWrite:
    def __init__(self, name):
        self.shm_region = None

        self.md_region = SharedMemory(name + '-meta', O_CREAT, size=sizeof(MD))
        self.md_buf = mmap.mmap(self.md_region.fd, self.md_region.size)
        self.md_region.close_fd()

        self.shm_buf = None
        self.shm_name = name
        self.count = 0

        try:
            self.sem = Semaphore(name, O_CREX)
        except ExistentialError:
            sem = Semaphore(name, O_CREAT)
            sem.unlink()
            self.sem = Semaphore(name, O_CREX)
        self.sem.release()

    def add(self, frame: np.ndarray):
        byte_size = frame.nbytes
        if not self.shm_region:
            self.shm_region = SharedMemory(self.shm_name, O_CREAT, size=byte_size)
            self.shm_buf = mmap.mmap(self.shm_region.fd, byte_size)
            self.shm_region.close_fd()

        self.count += 1
        md = MD(frame.shape[0], frame.shape[1], frame.shape[2], byte_size, self.count)
        self.sem.acquire()
        memmove(md_buf, addressof(md), sizeof(md))
        self.md_buf[:] = bytes(md_buf)
        self.shm_buf[:] = frame.tobytes()
        self.sem.release()

    def release(self):
        self.sem.acquire()

        self.md_buf.close()
        unlink_shared_memory(self.shm_name + '-meta')

        self.shm_buf.close()
        unlink_shared_memory(self.shm_name)

        self.sem.release()
        self.sem.close()


if __name__ == '__main__':

    cap = cv2.VideoCapture(0)

    shm_w = ShmWrite('abc')

    try:
        while True:
            ret, frame = cap.read()
            shm_w.add(frame)
    except KeyboardInterrupt:
        pass

    shm_w.release()

Читатель

Читатель получает из буфера метаданных спецификацию фрейма и воссоздает по ней массив NumPy Ndarray требуемой размерности. Для избежания ситуации чтения нецелостных данных используется бинарный семафор.

import cv2
import numpy as np
import mmap
from posix_ipc import Semaphore, SharedMemory, ExistentialError
from ctypes import sizeof, memmove, addressof, create_string_buffer
from time import sleep

from structures import MD

md_buf = create_string_buffer(sizeof(MD))


class ShmRead:
    def __init__(self, name):
        self.shm_buf = None
        self.md_buf = None

        while not self.md_buf:
            try:
                print("Waiting for MetaData shared memory is available.")
                md_region = SharedMemory(name + '-meta')
                self.md_buf = mmap.mmap(md_region.fd, sizeof(MD))
                md_region.close_fd()
                sleep(1)
            except ExistentialError:
                sleep(1)

        self.shm_name = name
        self.sem = Semaphore(name, 0)

    def get(self):
        md = MD()

        self.sem.acquire()
        md_buf[:] = self.md_buf
        memmove(addressof(md), md_buf, sizeof(md))
        self.sem.release()

        while not self.shm_buf:
            try:
                print("Waiting for Data shared memory is available.")
                shm_region = SharedMemory(name=self.shm_name)
                self.shm_buf = mmap.mmap(shm_region.fd, md.size)
                shm_region.close_fd()
                sleep(1)
            except ExistentialError:
                sleep(1)

        self.sem.acquire()
        f = np.ndarray(shape=(md.shape_0, md.shape_1, md.shape_2), dtype='uint8', buffer=self.shm_buf)
        self.sem.release()
        return f

    def release(self):
        self.md_buf.close()
        self.shm_buf.close()


if __name__ == '__main__':

    shm_r = ShmRead('abc')

    while True:
        sleep(0.03)
        f = shm_r.get()
        cv2.imshow('frame', f)
        if cv2.waitKey(1) & 0xFF == ord('q'):
             break

    shm_r.release()

Зависимости

В проекте используются следующие зависимости:

opencv-python
numpy
posix_ipc

Объединение пространства IPC для приложений в контейнерах Docker

Рассмотрим, как можно использовать разделяемую память в Docker, поскольку эта среда является самой популярной при распространении современных приложений и позволяет в полной мере раскрыть возможности разделяемой памяти POSIX, описанные в настоящей статье.

По умолчанию контейнеры Docker не разделяют объекты IPC, что не позволяет использовать разделяемую память приложениям, выполняющимся в разных контейнерах. Это правильное поведение, поскольку оно отражает идею контроля и ограничения доступа к ресурсам контейнера и изоляцию ресурсов между контейнерами. Однако, существует стандартный механизм Docker, который позволяет объединить пространства IPC нескольких контейнеров. Рассмотрим как это сделать.

Создадим контейнер, который будет использоваться для тестирования кода, с помощью Dockerfile следующего вида:

FROM python:3.8

MAINTAINER Bitworks Software info@bitworks.software

RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections

RUN DEBIAN_FRONTEND=noninteractive apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get install -y -q python3-pip python-dev
RUN pip3 install --upgrade pip
RUN pip3 install opencv-python numpy posix_ipc
COPY ./*.py /opt/

WORKDIR /opt

ENTRYPOINT ["/usr/local/bin/python3.8"]
CMD ["/opt/write.py"]

Выполним его сборку:

docker build -t opencv .

Теперь можно запустить процесс write.py, настроив проброс видеокамеры внутрь:

docker run -it --rm --name write --device /dev/video0:/dev/video0 opencv

Процесс запущен в интерактивном режиме. Откройте еще одну консоль для запуска процесса read.py.

Убедимся, что процесс read.py не имеет доступ к IPC контейнера write по умолчанию:

xhost +local:
docker run -it --name read --rm -e DISPLAY=unix$DISPLAY -v /tmp/.X11-unix:/tmp/.X11-unix opencv /opt/read.py

Waiting for MetaData shared memory is available.
Waiting for MetaData shared memory is available.
Waiting for MetaData shared memory is available.
...

Сообщения свидетельствуют о том, что объекты IPC контейнера write недоступны.

При запуске используется проброс доступа к X11 в контейнер, иначе у приложения read.py не будет возможности использовать средства отображения изображения на экране.

Теперь выполним запуск с объединением IPC с контейнером write:

docker run -it --name read --ipc container:write --rm -e DISPLAY=unix$DISPLAY -v /tmp/.X11-unix:/tmp/.X11-unix opencv /opt/read.py

Сейчас вы должны увидеть окно с изображением с вебкамеры, которое формируется посредством передачи фрейма через разделяемую память между контейнерами write и read через объединенное пространство IPC.

Заключение

Исходный код прокта можно найти в репозитории на GitHub: https://github.com/bwsw/shared-ctypes-numpy-posix-ipc-python.

Современные стандартные инструменты Python не позволяют реализовать взаимодействие неродственных процессов с помощью POSIX IPC или SystemV IPC, однако, с помощью сторонних библиотек данное взаимодействие возможно. Разработчики Python прилагают усилия для включения в стандартный инструментарий средств для организации такого взаимодействия, что позволяет надеяться на полноценную поддержку таких механизмов IPC в будущем.