Задача, описываемая в данной статье часто встречается в решениях интеллектуальной видеоаналитики. В общем случае, пользователю требуется предоставить доступ к фрагменту, в котором содержатся некоторые события. Фрагмент – это набор из одного или более небольших видеофайлов, которые содержат как сам инцидент, так и связанную с ним историю и развитие ситуации.
Такую задачу удобно решать с помощью формирования циклического буфера из видеофрагментов, разбитых по небольшим файлам, например, по 30 секунд. В этом случае, когда приложение обнаруживает, что в некоторых из них содержатся важные сигналы, оно копирует файлы, которые формируют окружение сигнала из данного циклического буфера в постоянное хранилище. Буфер является циклическим, поскольку старые файлы, созданные давно, удаляются с диска (например, после прошествия 10 минут).
В данной статье мы рассмотрим реализацию такого циклического буфера, который подключается к основному пайплайну обработки видео и обслуживает создание и удаление видеофайлов. Для решения задачи будем использовать OpenCV, Python, LZ4 и ZeroMQ. Для простоты считаем, что FPS видеофайлов соответствует FPS потока, то есть все видеофреймы из основного потока пишутся в файлы. В реальных реализациях может иметь место удаление избыточных фреймов из потока, изменение разрешения и т.п.
Обобщенная архитектура решения представлена ниже:
В рамках данной архитектуры имеется основной пайплайн обработки, который представлен цепочкой: Decode → Pub → Process → DB, который выполняет аналитическую функции и может быть сам представлен распределенным графом обработки. Для сохранения видеофрагментов используется дополнительный пайплайн обработки, который представлен цепочкой Sub → Encode → Файлы. Фрейм передается из основного пайплайна в дополнительный посредством механизма Pub/Sub ZeroMQ. Этот механизм идеально подходит для решения данной задачи поскольку:
- основной пайплайн не блокируется, если дополнительный пайплайн не запущен (свойство Pub/Sub);
- несколько дополнительных пайплайнов могут использоваться параллельно – к примеру, первый сохраняет видео как есть, а второй сохраняет видео с другим FPS в масштабе 1/4;
- ZeroMQ может использовать несколько типов транспорта – сетевые сокеты unicast или multicast, сокеты Unix, что позволяет реализовать распределенную обработку без лишних усилий и дополнительного программного обеспечения.
Поскольку передаваемый фрейм занимает существенный объем памяти, для уменьшения требования к сетевому каналу и пропуска 2K видео через канал 1Gbit/s мы будем использовать сжатие фрейма с помощью LZ4. Рассмотрим саму реализацию на Python.
Основной пайплайн
Примитивная реализация основного пайплайна выглядит следующим образом:
import cv2
import zmq
import lz4.frame
import pickle
from time import time
cap = cv2.VideoCapture(0)
context = zmq.Context()
dst = context.socket(zmq.PUB)
dst.bind("tcp://127.0.0.1:5557")
frameno = 0
COMPRESSED = True
while True:
ret, frame = cap.read()
ts = time()
frameno += 1
if COMPRESSED:
dst.send(lz4.frame.compress(pickle.dumps(frame)))
else:
dst.send_pyobj(frame)
dst.send_pyobj(dict(ts=ts, frameno=frameno))
Данный пайплайн не блокируется, даже в отсутствии хотя бы одного подписчика, что удобно.
Дополнительный пайплайн
Данный пайплайн получает сообщения из PUB/SUB ZeroMQ и формирует циклический буфер:
import zmq
import cv2
from time import time
import os
import glob
import lz4.frame
import pickle
class SplitWriter:
def __init__(self, split_size=30,
pub_address="tcp://127.0.0.1:5557",
directory='/tmp',
split_history=2,
split_prefix='split',
compressed=True,
fps=30):
self.pub_address = pub_address
self.split_size = split_size
self.directory = directory
self.split_history = split_history
self.fps = fps
self.compressed = compressed
self.split_prefix = split_prefix
self.zmq_context = zmq.Context()
self.src = self.zmq_context.socket(zmq.SUB)
self.src.connect(self.pub_address)
self.src.setsockopt_string(zmq.SUBSCRIBE, "")
self.current_split = 0
self.new_split = 0
self.writer = None
self.last_frame_delay = 0
self.remote_frameno = 0
self.frameno = 0
def _gen_split_name(self):
return os.path.join(self.directory, self.split_prefix + '.%d.%d.avi' % (self.current_split, self.split_size))
def _start_new_split(self, frame):
self.current_split = int(time())
self.new_split = self.current_split + self.split_size
if self.writer:
self.writer.release()
self.writer = cv2.VideoWriter(self._gen_split_name(),
cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'),
self.fps, (frame.shape[1], frame.shape[0]))
print("++", self._gen_split_name(),
"Last_Frame_Delay", self.last_frame_delay,
"Frame_Delta", self.remote_frameno - self.frameno)
self._clear_old_splits()
def write(self):
if self.compressed:
frame = pickle.loads(lz4.frame.decompress(self.src.recv()))
else:
frame = self.src.recv_pyobj()
meta = self.src.recv_pyobj()
now = time()
self.frameno += 1
self.remote_frameno = meta['frameno']
self.last_frame_delay = int((now - meta['ts']) * 1000)
if now > self.new_split:
self._start_new_split(frame)
self.writer.write(frame)
def _clear_old_splits(self):
for f in glob.glob(os.path.join(self.directory, self.split_prefix + '.*.*.avi')):
parts = f.split('.')
ts = int(parts[-3])
if ts < time() - self.split_size * self.split_history:
print("--", f)
os.unlink(f)
def release(self):
self.writer.release()
self.src.close()
self.zmq_context.destroy()
if __name__ == "__main__":
w = SplitWriter(split_history=3, split_size=5)
while True:
w.write()
ZeroMQ самостоятельно обрабатывает исключительные ситуации на PUB/SUB, поэтому оба пайплайна можно как запускать, так и перезапускать независимо друг от друга. Для удобства мониторинга при добавлении нового сегмента выводится отставание от основного пайплайна в количестве фреймов и миллисекундах.
Имена файлов формируются в формате PREFIX
.ID
.SPLITSIZE
.avi, где PREFIX
– произвольная строка, ID
– UNIX timestamp, а SPLITSIZE
– размер части в секундах.
Для сборки требуются следующие пакеты:
opencv-python
pyzmq
lz4
numpy
Если вам понравился этот пост, поделитесь им с друзьями.