Конвертация видеопотока в циклический буфер видеофрагментов фиксированной продолжительности с помощью OpenCV, Python и ZeroMQ

Задача, описываемая в данной статье часто встречается в решениях интеллектуальной видеоаналитики. В общем случае, пользователю требуется предоставить доступ к фрагменту, в котором содержатся некоторые события. Фрагмент – это набор из одного или более небольших видеофайлов, которые содержат как сам инцидент, так и связанную с ним историю и развитие ситуации.

Такую задачу удобно решать с помощью формирования циклического буфера из видеофрагментов, разбитых по небольшим файлам, например, по 30 секунд. В этом случае, когда приложение обнаруживает, что в некоторых из них содержатся важные сигналы, оно копирует файлы, которые формируют окружение сигнала из данного циклического буфера в постоянное хранилище. Буфер является циклическим, поскольку старые файлы, созданные давно, удаляются с диска (например, после прошествия 10 минут).

В данной статье мы рассмотрим реализацию такого циклического буфера, который подключается к основному пайплайну обработки видео и обслуживает создание и удаление видеофайлов. Для решения задачи будем использовать OpenCV, Python, LZ4 и ZeroMQ. Для простоты считаем, что FPS видеофайлов соответствует FPS потока, то есть все видеофреймы из основного потока пишутся в файлы. В реальных реализациях может иметь место удаление избыточных фреймов из потока, изменение разрешения и т.п.

Обобщенная архитектура решения представлена ниже:

В рамках данной архитектуры имеется основной пайплайн обработки, который представлен цепочкой: Decode → Pub → Process → DB, который выполняет аналитическую функции и может быть сам представлен распределенным графом обработки. Для сохранения видеофрагментов используется дополнительный пайплайн обработки, который представлен цепочкой Sub → Encode → Файлы. Фрейм передается из основного пайплайна в дополнительный посредством механизма Pub/Sub ZeroMQ. Этот механизм идеально подходит для решения данной задачи поскольку:

  • основной пайплайн не блокируется, если дополнительный пайплайн не запущен (свойство Pub/Sub);
  • несколько дополнительных пайплайнов могут использоваться параллельно – к примеру, первый сохраняет видео как есть, а второй сохраняет видео с другим FPS в масштабе 1/4;
  • ZeroMQ может использовать несколько типов транспорта – сетевые сокеты unicast или multicast, сокеты Unix, что позволяет реализовать распределенную обработку без лишних усилий и дополнительного программного обеспечения.

Поскольку передаваемый фрейм занимает существенный объем памяти, для уменьшения требования к сетевому каналу и пропуска 2K видео через канал 1Gbit/s мы будем использовать сжатие фрейма с помощью LZ4. Рассмотрим саму реализацию на Python.

Основной пайплайн

Примитивная реализация основного пайплайна выглядит следующим образом:

import cv2
import zmq
import lz4.frame
import pickle
from time import time

cap = cv2.VideoCapture(0)

context = zmq.Context()
dst = context.socket(zmq.PUB)
dst.bind("tcp://127.0.0.1:5557")

frameno = 0
COMPRESSED = True

while True:
    ret, frame = cap.read()
    ts = time()
    frameno += 1
    if COMPRESSED:
        dst.send(lz4.frame.compress(pickle.dumps(frame)))
    else:
        dst.send_pyobj(frame)
    dst.send_pyobj(dict(ts=ts, frameno=frameno))

Данный пайплайн не блокируется, даже в отсутствии хотя бы одного подписчика, что удобно.

Дополнительный пайплайн

Данный пайплайн получает сообщения из PUB/SUB ZeroMQ и формирует циклический буфер:

import zmq
import cv2
from time import time
import os
import glob
import lz4.frame
import pickle


class SplitWriter:
    def __init__(self, split_size=30,
                 pub_address="tcp://127.0.0.1:5557",
                 directory='/tmp',
                 split_history=2,
                 split_prefix='split',
                 compressed=True,
                 fps=30):
        self.pub_address = pub_address
        self.split_size = split_size
        self.directory = directory
        self.split_history = split_history
        self.fps = fps
        self.compressed = compressed
        self.split_prefix = split_prefix

        self.zmq_context = zmq.Context()
        self.src = self.zmq_context.socket(zmq.SUB)
        self.src.connect(self.pub_address)
        self.src.setsockopt_string(zmq.SUBSCRIBE, "")

        self.current_split = 0
        self.new_split = 0
        self.writer = None
        self.last_frame_delay = 0
        self.remote_frameno = 0
        self.frameno = 0

    def _gen_split_name(self):
        return os.path.join(self.directory, self.split_prefix + '.%d.%d.avi' % (self.current_split, self.split_size))

    def _start_new_split(self, frame):
        self.current_split = int(time())
        self.new_split = self.current_split + self.split_size
        if self.writer:
            self.writer.release()
        self.writer = cv2.VideoWriter(self._gen_split_name(),
                                      cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'),
                                      self.fps, (frame.shape[1], frame.shape[0]))
        print("++", self._gen_split_name(),
              "Last_Frame_Delay", self.last_frame_delay,
              "Frame_Delta", self.remote_frameno - self.frameno)
        self._clear_old_splits()

    def write(self):
        if self.compressed:
            frame = pickle.loads(lz4.frame.decompress(self.src.recv()))
        else:
            frame = self.src.recv_pyobj()
        meta = self.src.recv_pyobj()
        now = time()
        self.frameno += 1
        self.remote_frameno = meta['frameno']
        self.last_frame_delay = int((now - meta['ts']) * 1000)
        if now > self.new_split:
            self._start_new_split(frame)
        self.writer.write(frame)

    def _clear_old_splits(self):
        for f in glob.glob(os.path.join(self.directory, self.split_prefix + '.*.*.avi')):
            parts = f.split('.')
            ts = int(parts[-3])
            if ts < time() - self.split_size * self.split_history:
                print("--", f)
                os.unlink(f)

    def release(self):
        self.writer.release()
        self.src.close()
        self.zmq_context.destroy()


if __name__ == "__main__":
    w = SplitWriter(split_history=3, split_size=5)
    while True:
        w.write()

ZeroMQ самостоятельно обрабатывает исключительные ситуации на PUB/SUB, поэтому оба пайплайна можно как запускать, так и перезапускать независимо друг от друга. Для удобства мониторинга при добавлении нового сегмента выводится отставание от основного пайплайна в количестве фреймов и миллисекундах.

Имена файлов формируются в формате PREFIX.ID.SPLITSIZE.avi, где PREFIX – произвольная строка, ID – UNIX timestamp, а SPLITSIZE – размер части в секундах.

Для сборки требуются следующие пакеты:

opencv-python
pyzmq
lz4
numpy

Если вам понравился этот пост, поделитесь им с друзьями.