Настройка Kafka Consumer-ов (кафка-получателей)
Программе сервера можно указать переменную окружения MYBPM_CONSUMER_DIR
например так:
MYBPM_CONSUMER_DIR=cons3
Тогда в среде конфигурирования будет создана директория:
/mybpm/consumers-cons3/
И в этой директории будут созданы файлы конфигурации для всех кафка-получателей, которые активированы на данном сервере.
Далее эта директория будет называться директорией конфигурации кафка-получателей.
Активацией тех или иных кафка-получателей занимаются переменные окружения:
MYBPM_ALL_CONSUMERS_OFF, MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF
Подробнее про них смотрите в разделе Переменные окружения.
Если переменная окружения MYBPM_CONSUMER_DIR
не задана, то директорией конфигурации кафка-получателей
будет директория:
/mybpm/consumers/
Структура и значения конфигурационных файлов в директории конфигураций кафка-получателей
Откроем например файл:
BoiEvents.workerCount
В нём будет такое содержимое:
# Генерирует события инстанций БО по их изменениям и кидает их в кафку
boiKafkaToBoiEventKafka = 1
# Парсит события инстанций БО для отображения их на клиенте
applyBoiEventKafkaToMongo = 1
(некоторые строки не показаны).
В этом файле строки, которые начинаются с решётки (#) являются комментарием и компьютер из игнорирует. Они нужны для описания человеку.
Параметр:
boiKafkaToBoiEventKafka = 1
Обозначает, что нужно запустить один поток на каждом сервере, который будет обслуживать данный кафка-продюсер. Что это за продюсер описано в комментарии.
Если единицу поменять на тройку:
boiKafkaToBoiEventKafka = 3
То это обозначает, что на одном сервере для данного кафка-получателя запуститься три потока. Соответственно, если у нас два сервера, то обслуживать данный получатель будут шесть потоков.
Данный кафка-получатель boiKafkaToBoiEventKafka
считывает данные с кафка-топика BOI
. Если у этого топика имеется
48 партиций, то эти партиции поровну поделятся между всеми шести потоками, т.е. 48/6 = 8 патриций будет обслуживаться
каждым потоком.
За соотношением количества партиций и общим количеством потоков нужно следить и не допускать, что количество потоков будет превышено количество партиций. Если это будет превышено, то некоторые потоки будут крутиться вхолостую, не обрабатывая данные, но занимая ресурсы процессора и ОЗУ.
Если случилось так, что количество потоков нужно увеличивать, а партиций уже не хватает, то нужно у кафка-топика увеличивать количество партиций.
Распределение обязанностей между несколькими серверами MyBPM
Лучшей практикой считается распределение кафка-получателей между серверами MyBPM. По умолчанию сервер MyBPM обслуживает
все кафка-получатели. Это может привести к повышенной нагрузке на один сервер, и её нужно распределить между несколькими
серверами, для этого нужно уметь настраивать потоки кафка-получателей по-разному на разных серверах. Для этого
предназначена переменная окружения MYBPM_CONSUMER_DIR
, которая может задать индивидуальное расположение конфигурации
кафка-получателей. Для этого нужно настроить распределение обязанностей между несколькими серверами MyBPM.
При высокой нагрузке рекомендуется иметь четыре сервера MyBPM, которые будут обслуживать разные обязанности. Давайте придумает им обозначения в соответствии с их обязанностями. А именно:
- api
- MYBPM_CONSUMER_DIR = api
- MYBPM_ALL_CONSUMERS_OFF = true
- cons1
- MYBPM_CONSUMER_DIR = cons1
- MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF = true
- cons2
- MYBPM_CONSUMER_DIR = cons2
- MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF = true
- cons3
- MYBPM_CONSUMER_DIR = cons3
- MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF = true
Здесь api
обслуживает запросы пользователей, поэтому на ней отключены кафка-получатели вообще с помощью переменной
окружения: MYBPM_ALL_CONSUMERS_OFF = true
.
А cons1
, cons2
, cons3
- обслуживают только кафка-получатели, а запросы пользователей не обслуживают.
Поэтому на них отключены напоминания с помощью переменной окружения MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF = true
.
Также средствами Kubernetes нужно запретить обращаться к этим серверам по сервисам, иначе предоставляемый функционал
будет неполноценным.
В результате данных настроек будут созданы следующие директории в конфигурационной среде:
- /mybpm/consumers-api/
- /mybpm/consumers-cons1/
- /mybpm/consumers-cons2/
- /mybpm/consumers-cons3/
С одинаковыми иерархиями файлов конфигураций кафка-получателей, в которых всем получателям будет прописан один поток. Ну а в результате каждый получатель будет обслуживать три потока из серверов: cons1, cons2, cons3.
Для api тоже будет создана конфигурации, но там будут только оповещения. Их можно оставить без изменений. Оповещения обслуживают пользователей, работающие с платформой через браузер.
И теперь нужно пройтись по всем этим конфигурационным файлам и включить и выключить те или иные потоки.
Рекомендуется сделать следующие настройки (тут указываются настройки только в файлах с расширением .workerCount, остальные конфигурационные файлы будут расписаны ниже):
api - на нём нужно настроить оповещения, следующим образом:
- /mybpm/consumers-api/
- KafkaToWebsocket.workerCount
- kafka__to__websocket = 1
- kafkaToWebSocketBo = 1
- kafkaToWebSocketNotification = 1
- kafkaToWebsocketTelephony = 1
- kafkaToWebsocketGeoMap = 1
- UserNotification.workerCount
- chatMessageToUserNotification = 1
- eventToUserNotification = 1
- KafkaToWebsocket.workerCount
Если будут остальные в этой директории и в поддиректориях, то их нужно поставить в 0.
cons1 - выделить для миграции данных, для этого нужно активировать следующие настройки:
- /mybpm/consumers-cons1/
- InMigration.workerCount
- in_migration = 1
- InMigrationFromKafka.workerCount
- in_migration = 1
- InMigrationRunProcess.workerCount
- in_migration_run_process = 1
- OutMigration.workerCount
- out_migration = 1
- OutMigrationFromKafka.workerCount
- comeFromKafka = 1
- OperativeReport.workerCount
- report_materialized_view_create = 1
- OperativeReportStandTables.workerCount
- operative_report_bo = 1
- report_materialized_view_co = 1
- operative_report_migration = 1
- ORRef.workerCount
- operative_report_radio_button_ref = 1
- InMigration.workerCount
Остальные в этой директории нужно поставить в 0.
cons2 - нужно выделить для запуска процессов, для этого нужно активировать следующие настройки:
- /mybpm/consumers-cons2/
- run_process/RunProcess.workerCount
- runProcess = 1
- runProcessManualSave = 1
- runProcessInMigration = 1
- run_process/RunProcess.workerCount
Остальные в этой директории нужно поставить в 0.
cons3 - нужно выделить для обработки остальных кафка-получателей. Нужно пройтись по директории
- /mybpm/consumers-cons3/
И во всех перечисленных выше файлах, но лежащих в этой директории, нужно поставить 0. А остальные поставить в 1.
Другие файлы конфигурации кафка-получателей
Кроме файлов с расширением .workerCount
есть ещё файлы с именем params.config
- они служат для настройки других
параметров кафка-получателей. Этот файл действует на кафка-получателей, которые находятся в той же директории - на
другие под-директории и над-директории он не действует.
Например, файл:
/mybpm/consumers-cons3/params.config
Действует на кафка-получатель:
/mybpm/consumers-cons3/OutMigrationFromKafka.workerCount
И не действует на кафка-получатель:
/mybpm/consumers-cons2/run_process/RunProcess.workerCount
В то же время на этого получателя действует файл:
/mybpm/consumers-cons2/run_process/params.config
Вот пример файла params.config
:
con.auto.commit.interval.ms = 1000
con.fetch.min.bytes = 1
con.max.partition.fetch.bytes = 141943040
con.connections.max.idle.ms = 540000
con.default.api.timeout.ms = 60000
con.fetch.max.bytes = 52428800
con.retry.backoff.ms = 2000
con.session.timeout.ms = 45000
con.heartbeat.interval.ms = 5000
con.max.poll.interval.ms = 3000000
con.max.poll.records = 500
con.receive.buffer.bytes = 65536
con.request.timeout.ms = 30000
con.send.buffer.bytes = 131072
con.fetch.max.wait.ms = 500
out.worker.count = 1
out.poll.duration.ms = 800
В нём параметр: out.worker.count
- обозначает количество потоков по умолчанию.
А параметр: out.poll.duration.ms
- обозначает количество миллисекунд, которые тратятся на ожидание данных с брокера
в реализации кафка-получателя на платформе. Значение 800 самое подходящее - вам его никогда, скорее всего,
менять не придётся.
Параметры, которые начинаются с префикса: con.
можно прочитать в документации к Apache Kafka, только этот префикс
нужно убрать, а именно, например нам нужно узнать что значить параметр:
con.retry.backoff.ms = 2000
Убираем префикс con.
и получаем параметр:
retry.backoff.ms = 2000
Теперь в документации к Apache Kafka ищем что обозначает этот параметр и находим следующее описание:
https://kafka.apache.org/documentation/#producerconfigs_retry.backoff.ms
Так же можно добавить и другие параметры, которых здесь нет, но, при этом, нужно не забыть добавить префикс con.
.
Применение изменений файлов конфигурации кафка-получателей
Параметры применяются "на горячую" - это обозначает, что система постоянно отслеживает изменения этих файлов. И как только это произошло, она тут же их применяет - лаг составляет не больше пары секунд.
Настройка масштабирования кафка получателей
Давайте разберёмся в масштабировании с одним кафка получателем.
Пусть имеется запущенный сервер компонента с помощью yaml-файла, который приведён здесь: Установка компонента MyBPM Cons1
И пусть мы настраиваем входящую миграцию через кафку. Для неё предусмотрен параметр:
- /mybpm/consumers-cons1/
- InMigration.workerCount
- in_migration = 1
- InMigration.workerCount
Из пути конфига consumers-cons1 мы видим, что с ним работает сервер, у которого определена переменная окружения:
MYBPM_CONSUMER_DIR=cons1
В yaml-файле, фрагмент которого изображён ниже.
Фрагмент файла: 30-mybpm-cons1.yaml
env:
- { name: MYBPM_JAVA_OPTS, value: "-Xmx2Gi -Xms2Gi" }
- { name: MYBPM_ALL_CONSUMERS_OFF, value: "true" }
- { name: MYBPM_CONSUMER_DIR, value: "cons1" } # <------------------- ВОТ
- { name: MYBPM_COMPANY_CODE, value: "greetgo" }
Так же в этом yaml-файле есть параметр replicas:
Фрагмент файла: 30-mybpm-cons1.yaml
spec:
selector:
matchLabels:
app: mybpm-cons1
replicas: 1 # <-------------------- ВОТ
template:
metadata:
На данный момент in_migration = 1
и replicas: 1
- это означает, что запущен один сервер, и на этом сервере запущен
один поток, который обслуживает данный кафка получатель (входящая миграция данных).
И пусть теперь Ваша задача ускорить эту миграцию. Вначале вы увеличиваете количество потоков, обслуживающие данный получатель, установив:
in_migration = 16
Нужно поменять конфигурационный файл и сохранить его. Перезапускать сервер не нужно - система обнаружит изменения файла и автоматически запустит новые потоки. А кафка брокеры перестроятся и начнут в данном случае отдавать 16-ти потокам данного получателя.
При увеличении количества потоков, миграция должна ускориться.
Если увеличение количество потоков не приводит к ускорению миграции, например потому что упёрлись в производительность CPU, то нужно увеличить количество подов данного сервера. Для этого в yaml-файле увеличим параметр:
Фрагмент файла: 30-mybpm-cons1.yaml
replicas: 3
Применив этот файл, Kubernetes запустит три сервера. Каждый сервер прочитает конфигурационный параметр
in_migration = 16
и запустит 16 потоков на обслуживание данного получателя. Итого количество потоков,
которые обслуживают данный получатель, станет:
3 * 16 = 48
Сорок восемь потоков будут обслуживать данный получатель.
Соответственно скорость обработки данного получателя должна тоже увеличиться.
ВАЖНО: Кафка устроена так, что одна партиция не может обслуживаться несколькими потоками одновременно. Соответственно, если Вы настроили 50 потоков, а у кафка-топика всего 48 партиций, то получается, что два потока будут крутиться бездействия.
Если нужно увеличивать количество потоков, а партиций мало, то количество партиций тоже нужно увеличивать. Это можно сделать в кафке.
По умолчанию каждому топику даётся 48 партиций. Если этого оказалось недостаточно, то сделайте настройку по умолчанию на 480 топиков и перезапустите кафка брокеры. Изменить значение по умолчанию недостаточно, нужно ещё пройтись по всем кафка топиками и индивидуально у каждого поменять это количество.