mybpm!
Скачать в формате: PDF pdf DOCX word

Настройка Kafka Consumer-ов (кафка-получателей)

Программе сервера можно указать переменную окружения MYBPM_CONSUMER_DIR например так:

MYBPM_CONSUMER_DIR=cons3

Тогда в среде конфигурирования будет создана директория:

/mybpm/consumers-cons3/

И в этой директории будут созданы файлы конфигурации для всех кафка-получателей, которые активированы на данном сервере.

Далее эта директория будет называться директорией конфигурации кафка-получателей.

Активацией тех или иных кафка-получателей занимаются переменные окружения:

MYBPM_ALL_CONSUMERS_OFF, MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF

Подробнее про них смотрите в разделе Переменные окружения.

Если переменная окружения MYBPM_CONSUMER_DIR не задана, то директорией конфигурации кафка-получателей будет директория:

/mybpm/consumers/

Структура и значения конфигурационных файлов в директории конфигураций кафка-получателей

Откроем например файл:

BoiEvents.workerCount

В нём будет такое содержимое:

# Генерирует события инстанций БО по их изменениям и кидает их в кафку
boiKafkaToBoiEventKafka = 1

# Парсит события инстанций БО для отображения их на клиенте
applyBoiEventKafkaToMongo = 1

(некоторые строки не показаны).

В этом файле строки, которые начинаются с решётки (#) являются комментарием и компьютер из игнорирует. Они нужны для описания человеку.

Параметр:

boiKafkaToBoiEventKafka = 1

Обозначает, что нужно запустить один поток на каждом сервере, который будет обслуживать данный кафка-продюсер. Что это за продюсер описано в комментарии.

Если единицу поменять на тройку:

boiKafkaToBoiEventKafka = 3

То это обозначает, что на одном сервере для данного кафка-получателя запуститься три потока. Соответственно, если у нас два сервера, то обслуживать данный получатель будут шесть потоков.

Данный кафка-получатель boiKafkaToBoiEventKafka считывает данные с кафка-топика BOI. Если у этого топика имеется 48 партиций, то эти партиции поровну поделятся между всеми шести потоками, т.е. 48/6 = 8 патриций будет обслуживаться каждым потоком.

За соотношением количества партиций и общим количеством потоков нужно следить и не допускать, что количество потоков будет превышено количество партиций. Если это будет превышено, то некоторые потоки будут крутиться вхолостую, не обрабатывая данные, но занимая ресурсы процессора и ОЗУ.

Если случилось так, что количество потоков нужно увеличивать, а партиций уже не хватает, то нужно у кафка-топика увеличивать количество партиций.

Распределение обязанностей между несколькими серверами MyBPM

Лучшей практикой считается распределение кафка-получателей между серверами MyBPM. По умолчанию сервер MyBPM обслуживает все кафка-получатели. Это может привести к повышенной нагрузке на один сервер, и её нужно распределить между несколькими серверами, для этого нужно уметь настраивать потоки кафка-получателей по-разному на разных серверах. Для этого предназначена переменная окружения MYBPM_CONSUMER_DIR, которая может задать индивидуальное расположение конфигурации кафка-получателей. Для этого нужно настроить распределение обязанностей между несколькими серверами MyBPM.

При высокой нагрузке рекомендуется иметь четыре сервера MyBPM, которые будут обслуживать разные обязанности. Давайте придумает им обозначения в соответствии с их обязанностями. А именно:

Здесь api обслуживает запросы пользователей, поэтому на ней отключены кафка-получатели вообще с помощью переменной окружения: MYBPM_ALL_CONSUMERS_OFF = true.

А cons1, cons2, cons3 - обслуживают только кафка-получатели, а запросы пользователей не обслуживают. Поэтому на них отключены напоминания с помощью переменной окружения MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF = true. Также средствами Kubernetes нужно запретить обращаться к этим серверам по сервисам, иначе предоставляемый функционал будет неполноценным.

В результате данных настроек будут созданы следующие директории в конфигурационной среде:

С одинаковыми иерархиями файлов конфигураций кафка-получателей, в которых всем получателям будет прописан один поток. Ну а в результате каждый получатель будет обслуживать три потока из серверов: cons1, cons2, cons3.

Для api тоже будет создана конфигурации, но там будут только оповещения. Их можно оставить без изменений. Оповещения обслуживают пользователей, работающие с платформой через браузер.

И теперь нужно пройтись по всем этим конфигурационным файлам и включить и выключить те или иные потоки.

Рекомендуется сделать следующие настройки (тут указываются настройки только в файлах с расширением .workerCount, остальные конфигурационные файлы будут расписаны ниже):

api - на нём нужно настроить оповещения, следующим образом:

Если будут остальные в этой директории и в поддиректориях, то их нужно поставить в 0.

cons1 - выделить для миграции данных, для этого нужно активировать следующие настройки:

Остальные в этой директории нужно поставить в 0.

cons2 - нужно выделить для запуска процессов, для этого нужно активировать следующие настройки:

Остальные в этой директории нужно поставить в 0.

cons3 - нужно выделить для обработки остальных кафка-получателей. Нужно пройтись по директории

И во всех перечисленных выше файлах, но лежащих в этой директории, нужно поставить 0. А остальные поставить в 1.

Другие файлы конфигурации кафка-получателей

Кроме файлов с расширением .workerCount есть ещё файлы с именем params.config - они служат для настройки других параметров кафка-получателей. Этот файл действует на кафка-получателей, которые находятся в той же директории - на другие под-директории и над-директории он не действует.

Например, файл:

/mybpm/consumers-cons3/params.config

Действует на кафка-получатель:

/mybpm/consumers-cons3/OutMigrationFromKafka.workerCount

И не действует на кафка-получатель:

/mybpm/consumers-cons2/run_process/RunProcess.workerCount

В то же время на этого получателя действует файл:

/mybpm/consumers-cons2/run_process/params.config

Вот пример файла params.config:

con.auto.commit.interval.ms = 1000
con.fetch.min.bytes = 1
con.max.partition.fetch.bytes = 141943040
con.connections.max.idle.ms = 540000
con.default.api.timeout.ms = 60000
con.fetch.max.bytes = 52428800
con.retry.backoff.ms = 2000
con.session.timeout.ms = 45000
con.heartbeat.interval.ms = 5000
con.max.poll.interval.ms = 3000000
con.max.poll.records = 500
con.receive.buffer.bytes = 65536
con.request.timeout.ms = 30000
con.send.buffer.bytes = 131072
con.fetch.max.wait.ms = 500
out.worker.count = 1
out.poll.duration.ms = 800

В нём параметр: out.worker.count - обозначает количество потоков по умолчанию. А параметр: out.poll.duration.ms - обозначает количество миллисекунд, которые тратятся на ожидание данных с брокера в реализации кафка-получателя на платформе. Значение 800 самое подходящее - вам его никогда, скорее всего, менять не придётся.

Параметры, которые начинаются с префикса: con. можно прочитать в документации к Apache Kafka, только этот префикс нужно убрать, а именно, например нам нужно узнать что значить параметр:

con.retry.backoff.ms = 2000

Убираем префикс con. и получаем параметр:

retry.backoff.ms = 2000

Теперь в документации к Apache Kafka ищем что обозначает этот параметр и находим следующее описание:

https://kafka.apache.org/documentation/#producerconfigs_retry.backoff.ms

Так же можно добавить и другие параметры, которых здесь нет, но, при этом, нужно не забыть добавить префикс con..

Применение изменений файлов конфигурации кафка-получателей

Параметры применяются "на горячую" - это обозначает, что система постоянно отслеживает изменения этих файлов. И как только это произошло, она тут же их применяет - лаг составляет не больше пары секунд.

Настройка масштабирования кафка получателей

Давайте разберёмся в масштабировании с одним кафка получателем.

Пусть имеется запущенный сервер компонента с помощью yaml-файла, который приведён здесь: Установка компонента MyBPM Cons1

И пусть мы настраиваем входящую миграцию через кафку. Для неё предусмотрен параметр:

Из пути конфига consumers-cons1 мы видим, что с ним работает сервер, у которого определена переменная окружения:

MYBPM_CONSUMER_DIR=cons1

В yaml-файле, фрагмент которого изображён ниже.

Фрагмент файла: 30-mybpm-cons1.yaml

          env:
            - { name: MYBPM_JAVA_OPTS, value: "-Xmx2Gi -Xms2Gi" }
            - { name: MYBPM_ALL_CONSUMERS_OFF, value: "true" }
            - { name: MYBPM_CONSUMER_DIR, value: "cons1" }  # <------------------- ВОТ
            - { name: MYBPM_COMPANY_CODE, value: "greetgo" }

Так же в этом yaml-файле есть параметр replicas:

Фрагмент файла: 30-mybpm-cons1.yaml

spec:
  selector:
    matchLabels:
      app: mybpm-cons1
  replicas: 1  # <-------------------- ВОТ
  template:
    metadata:

На данный момент in_migration = 1 и replicas: 1 - это означает, что запущен один сервер, и на этом сервере запущен один поток, который обслуживает данный кафка получатель (входящая миграция данных).

И пусть теперь Ваша задача ускорить эту миграцию. Вначале вы увеличиваете количество потоков, обслуживающие данный получатель, установив:

in_migration = 16

Нужно поменять конфигурационный файл и сохранить его. Перезапускать сервер не нужно - система обнаружит изменения файла и автоматически запустит новые потоки. А кафка брокеры перестроятся и начнут в данном случае отдавать 16-ти потокам данного получателя.

При увеличении количества потоков, миграция должна ускориться.

Если увеличение количество потоков не приводит к ускорению миграции, например потому что упёрлись в производительность CPU, то нужно увеличить количество подов данного сервера. Для этого в yaml-файле увеличим параметр:

Фрагмент файла: 30-mybpm-cons1.yaml

  replicas: 3

Применив этот файл, Kubernetes запустит три сервера. Каждый сервер прочитает конфигурационный параметр in_migration = 16 и запустит 16 потоков на обслуживание данного получателя. Итого количество потоков, которые обслуживают данный получатель, станет:

3 * 16 = 48

Сорок восемь потоков будут обслуживать данный получатель.

Соответственно скорость обработки данного получателя должна тоже увеличиться.

ВАЖНО: Кафка устроена так, что одна партиция не может обслуживаться несколькими потоками одновременно. Соответственно, если Вы настроили 50 потоков, а у кафка-топика всего 48 партиций, то получается, что два потока будут крутиться бездействия.

Если нужно увеличивать количество потоков, а партиций мало, то количество партиций тоже нужно увеличивать. Это можно сделать в кафке.

По умолчанию каждому топику даётся 48 партиций. Если этого оказалось недостаточно, то сделайте настройку по умолчанию на 480 топиков и перезапустите кафка брокеры. Изменить значение по умолчанию недостаточно, нужно ещё пройтись по всем кафка топиками и индивидуально у каждого поменять это количество.