Интеграция Apache CloudStack со сторонними системами. Подписка на события с помощью Apache Kafka

logo

В данной статье рассматривается подход к интеграции Apache CloudStack (ACS) со сторонними системами посредством экспорта событий в брокер очередей сообщений Apache Kafka.

В современном мире полноценное оказание услуг без интеграции продуктов практически невозможно. В случае сетевых и облачных сервисов, важной является интеграция с биллинговыми системами, системами мониторинга доступности, службами клиентского сервиса и прочими инфраструктурными и бизнес-ориентирванными компонентами. При этом, продукт, обычно, интегрируется со сторонними системами способами, изображенными на следующей картинке:

Таким образом, продукт предоставляет сторонним сервисам API, который они могут использовать для взаимодействия с продуктом, и поддерживает механизм расширения, который позволяет продукту взаимодействовать с внешними системами через их API.

В разрезе ACS данные функции реализуются следующими возможностями:

  1. Стандартный API — позволяет взаимодействовать с ACS типовым способом.
  2. Плагины API — позволяют разработчикам описывать свои расширения API, предназначенные для реализации специфических взаимодействий.
  3. Экспорт событий — позволяет взаимодействовать с внешними системами при возникновении событий внутри ACS, которые требуют действий от внешних систем.

Итак, ACS предоставляет нам все необходимые способы взаимодействия. В рамках статьи освещается третий способ взаимодействия — Экспорт событий. Вариантов, когда такой способ взаимодействия является полезным достаточно много, приведем несколько примеров:

  • уведомление пользователя посредством сторонних средств, например (SMS, IM) о некоторых событиях, например, событии остановки виртуальной машины;
  • уведомление биллинговой системы о выделении или удалении ресурсов (аккаунтинг, учет, списания);
  • уведомление биллинговой системы о новых аккаунтах ACS.

Вообще, в условиях отсутствия подсистемы уведомления сторонних систем о событиях внутри продукта, существует единственный способ решения такого класса задач — периодический опрос API. Само собой, что способ рабочий, но редко может считаться эффективным.

ACS позволяет экспортировать события в брокеры очередей сообщений двумя способами — с применением протокола AMPQ в RabbitMQ и по протоколу Apache Kafka в Apache Kafka, соответственно. Мы широко используем в своей практике Apache Kafka, поэтому в данной статье рассмотрим как подключить к серверу ACS экспорт событий в эту систему.

Экспорт событий в брокер очередей сообщений VS явный вызов API сторонней системы

При реализации подобных механизмов разработчики часто выбирают между двумя опциями — экспорт событий в брокеры очередей сообщений или явный вызов API третих систем. На наш взгляд, подход с экспортом в брокеры является крайне выгодным, и значительно превосходит по своей гибкости явный вызов API (например, REST с некоторым определенным протоколом). Связано это со следующими свойствами брокеров очередей сообщений:

  1. отказоустойчивость;
  2. высокая производительность и масштабируемость;
  3. возможность отложенной или запаздывающей обработки.

Данных свойств весьма сложно добиться в случае с непосредственным вызовом обрабатывающего кода сторонних систем без ущерба стабильности вызывающей подсистемы продукта. К примеру, представим, что при создании аккаунта требуется отправить SMS с уведомлением. В случае непосредственного вызова кода отправки возможны следующие классы ошибок:

  1. отказ вызываемого кода и неотправка уведомления;
  2. продолжительное выполнение кода на стороне вызываемой системы и возникновение ошибки на стороне вызывающей подсистемы продукта из-за переполнения пула обработчиков.

Единственным преимуществом данного подхода может считаться то, что время между экспортом события и обработкой события сторонней системой может быть меньше, чем в случае экспорта данного события в брокер очередей сообщений, однако требуется обеспечить определенные гарантии по времени и надежности обработки как на стороне вызывающей подсистемы продукта, так и на стороне вызываемой системы.

В том же случае, когда используется экспорт событий в брокер очередей сообщений, который настроен правильным образом (реплицируемая среда), данные проблемы не будут возникать, кроме того, код, обрабатывающий события из очереди брокера сообщений и вызывающий API сторонней системы, может разрабатываться и развертываться, исходя из усредненного ожидания к интенсивности потока событий, без необходимости обеспечения гарантий пиковой обработки.

Обратная сторона использования брокера сообщений — необходимость настройки данного сервиса и наличие опыта его администрирования и решения проблем. Хотя, Apache Kafka является весьма беcпроблемным сервисом, все же, рекомендуется посвятить время его настройке, моделированию аварийных ситуаций и разработке ответных мер.

Настройка экспорта событий ACS в Apache Kafka

В рамках настоящего руководства не уделяется внимание как настроить Apache Kafka для использования в “боевой” среде. Этому посвящено немало профильных руководств. Мы же уделим основное внимание тому, каким образом подключить Kafka к ACS и протестировать экспорт событий.

Для развертывания Kafka будем использовать Docker-контейнер spotify/kafka, который включает в себя все необходимые компоненты (Apache Zookeeper и Kafka) и поэтому отлично подходит для целей разработки.

Установка Docker (из официального гайда для установки в CentOS 7) выполняется элементарно:

# yum install -y yum-utils device-mapper-persistent-data lvm2
# yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
# yum makecache fast
# yum install docker-ce

Настройка Apache Kafka

Развернем контейнер с Apache Kafka:

# docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=10.0.0.66 --env ADVERTISED_PORT=9092 spotify/kafka
c660741b512a

Таким образом, Kafka будет доступен по адресу 10.0.0.66:9092, а Apache Zookeeper по адресу 10.0.0.66:2181. Протестировать Kafka и Zookeeper можно следующим образом:

Создадим и запишем в топик “cs” строку “test”:

# docker exec -i -t c660741b512a \
    bash -c "echo 'test' | /opt/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --broker-list 10.0.0.66:9092 --topic cs"
[2017-07-23 08:48:11,222] WARN Error while fetching metadata with correlation id 0 : {cs=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Прочитаем ее же:

# docker exec -i -t c660741b512a \
    /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server=10.0.0.66:9092 --topic cs --offset=earliest --partition=0
test
^CProcessed a total of 1 messages

Если все произошло так, как отображено на врезках кода выше, значит Kafka исправно функционирует.

Настройка Apache CloudStack

При подготовке данной статьи использовался ACS 4.9.2. Следующим шагом настроим экспорт событий в ACS (оригинал документации здесь).

Создадим файл настроек (/etc/cloudstack/management/kafka.producer.properties) для продюсера Kafka, который будет использоваться ACS со следующим содержимым:

bootstrap.servers=10.0.0.66:9092
acks=all
topic=cs
retries=1

Детальное описание настроек Kafka можно найти на странице официальной документации.

При использовании реплицируемого кластера Kafka в строке bootstrap.servers необходимо указать все известные серверы.

Создадим каталог для java bean, активирующего экспорт событий в Kafka:

# mkdir -p /etc/cloudstack/management/META-INF/cloudstack/core

И сам файл конфигурации для bean-a (/etc/cloudstack/management/META-INF/cloudstack/core/spring-event-bus-context.xml) со следующим содержимым:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context-3.0.xsd">
   <bean id="eventNotificationBus" class="org.apache.cloudstack.mom.kafka.KafkaEventBus">
     <property name="name" value="eventNotificationBus"/>
   </bean>
</beans>

Перезагрузим управляющий сервер ACS:

# systemctl restart cloudstack-management

Экспортируемые события теперь попадают в топик cs, при этом имеют формат JSON, пример событий отображен далее (отформатирован для удобства):

{
 "Role":"e767a39b-6b93-11e7-81e3-06565200012c",
 "Account":"54d5f55c-5311-48db-bbb8-c44c5175cb2a",
 "eventDateTime":"2017-07-23 14:09:08 +0700",
 "entityuuid":"54d5f55c-5311-48db-bbb8-c44c5175cb2a",
 "description":"Successfully completed creating Account. Account Name: null, Domain Id:1",
 "event":"ACCOUNT.CREATE",
 "Domain":"8a90b067-6b93-11e7-81e3-06565200012c",
 "user":"f484a624-6b93-11e7-81e3-06565200012c",
 "account":"f4849ae2-6b93-11e7-81e3-06565200012c",
 "entity":"com.cloud.user.Account","status":"Completed"
}

{
 "Role":"e767a39b-6b93-11e7-81e3-06565200012c",
 "Account":"54d5f55c-5311-48db-bbb8-c44c5175cb2a",
 "eventDateTime":"2017-07-23 14:09:08 +0700",
 "entityuuid":"4de64270-7bd7-4932-811a-c7ca7916cd2d",
 "description":"Successfully completed creating User. Account Name: null, DomainId:1",
 "event":"USER.CREATE",
 "Domain":"8a90b067-6b93-11e7-81e3-06565200012c",
 "user":"f484a624-6b93-11e7-81e3-06565200012c",
 "account":"f4849ae2-6b93-11e7-81e3-06565200012c",
 "entity":"com.cloud.user.User","status":"Completed"
}

{
 "eventDateTime":"2017-07-23 14:14:13 +0700",
 "entityuuid":"0f8ffffa-ae04-4d03-902a-d80ef0223b7b",
 "description":"Successfully completed creating User. UserName: test2, FirstName :test2, LastName: test2",
 "event":"USER.CREATE",
 "Domain":"8a90b067-6b93-11e7-81e3-06565200012c",
 "user":"f484a624-6b93-11e7-81e3-06565200012c",
 "account":"f4849ae2-6b93-11e7-81e3-06565200012c",
 "entity":"com.cloud.user.User","status":"Completed"
}

Первое событие — создание аккаунта, остальные два — создание пользователей в рамках аккаунта. Для проверки поступления событий в Kafka легче всего воспользоваться уже известным нам способом:

# docker exec -i -t c660741b512a \
    /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server=10.0.0.66:9092 --topic cs --offset=earliest --partition=0

Если события поступают, то дальше можно начинать разрабатывать интеграционные приложения с помощью любых языков программирования, для которых существует интерфейс потребителя для Apache Kafka. Вся настройка занимает 15-20 минут и не представляет никакой сложности даже для новичка.

В случае настройки экспорта событий для “боевой” среды необходимо помнить о следующем:

  1. Настройка должна быть выполнена для каждого управляющего сервера ACS;
  2. Kafka должен быть настроен в реплицируемом варианте (обычно, 3 сервера и репликация x3);
  3. Apache Zookeeper должен быть настроен в реплицируемом варианте (обычно, 3 сервера);
  4. Настройки /etc/cloudstack/management/kafka.producer.properties должны быть подобраны с учетом требуемого уровня надежности доставки событий.
  5. Не забудьте настроить период удаления старых данных для Kafka (например, 1 месяц).