java.lang.RuntimeException: wXb6Vnl31u :: Ошибка для HTML= 001 003 004 005 006 007 008 009
010

Настройка Kafka Consumer-ов 011 (кафка-получателей)

012 013

Программе сервера можно указать переменную окружения 014 MYBPM_CONSUMER_DIR например так:

015 016
017  MYBPM_CONSUMER_DIR=cons3
018  
019  
020  
021  
022 023

Тогда в среде конфигурирования будет создана директория:

024 025
026  /mybpm/consumers-cons3/
027  
028  
029  
030  
031 032

И в этой директории будут созданы файлы конфигурации для всех 033 кафка-получателей, которые активированы на данном сервере.

034 035

Далее эта директория будет называться директорией конфигурации 036 кафка-получателей.

037 038

Активацией тех или иных кафка-получателей занимаются переменные 039 окружения:

040 041
042  MYBPM_ALL_CONSUMERS_OFF, MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF
043  
044  
045  
046  
047 048

Подробнее про них смотрите в разделе Переменные окружения.

050 051

Если переменная окружения MYBPM_CONSUMER_DIR не 052 задана, то директорией конфигурации кафка-получателей будет 053 директория:

054 055
056  /mybpm/consumers/
057  
058  
059  
060  
061 062

Структура и значения конфигурационных файлов в 063 директории конфигураций кафка-получателей

064 065

Откроем например файл:

066 067
068  BoiEvents.workerCount
069  
070  
071  
072  
073 074

В нём будет такое содержимое:

075 076
077  # Генерирует события инстанций БО по их изменениям и кидает их в кафку
078  boiKafkaToBoiEventKafka = 1
079  
080  # Парсит события инстанций БО для отображения их на клиенте
081  applyBoiEventKafkaToMongo = 1
082  
083  
084  
085  
086 087

(некоторые строки не показаны).

088 089

В этом файле строки, которые начинаются с решётки (#) являются 090 комментарием и компьютер из игнорирует. Они нужны для описания 091 человеку.

092 093

Параметр:

094 095
096  boiKafkaToBoiEventKafka = 1
097  
098  
099  
100  
101 102

Обозначает, что нужно запустить один поток на каждом сервере, 103 который будет обслуживать данный кафка-продюсер. Что это за 104 продюсер описано в комментарии.

105 106

Если единицу поменять на тройку:

107 108
109  boiKafkaToBoiEventKafka = 3
110  
111  
112  
113  
114 115

То это обозначает, что на одном сервере для данного 116 кафка-получателя запуститься три потока. Соответственно, если у нас 117 два сервера, то обслуживать данный получатель будут шесть 118 потоков.

119 120

Данный кафка-получатель boiKafkaToBoiEventKafka 121 считывает данные с кафка-топика BOI. Если у этого 122 топика имеется 48 партиций, то эти партиции поровну поделятся между 123 всеми шести потоками, т.е. 48/6 = 8 патриций будет обслуживаться 124 каждым потоком.

125 126

За соотношением количества партиций и общим количеством потоков 127 нужно следить и не допускать, что количество потоков будет 128 превышено количество партиций. Если это будет превышено, то 129 некоторые потоки будут крутиться вхолостую, не обрабатывая данные, 130 но занимая ресурсы процессора и ОЗУ.

131 132

Если случилось так, что количество потоков нужно увеличивать, а 133 партиций уже не хватает, то нужно у кафка-топика увеличивать 134 количество партиций.

135 136

Распределение обязанностей между несколькими 137 серверами MyBPM

138 139

Лучшей практикой считается распределение кафка-получателей между 140 серверами MyBPM. По умолчанию сервер MyBPM обслуживает все 141 кафка-получатели. Это может привести к повышенной нагрузке на один 142 сервер, и её нужно распределить между несколькими серверами, для 143 этого нужно уметь настраивать потоки кафка-получателей по-разному 144 на разных серверах. Для этого предназначена переменная окружения 145 MYBPM_CONSUMER_DIR, которая может задать 146 индивидуальное расположение конфигурации кафка-получателей. Для 147 этого нужно настроить распределение обязанностей между несколькими 148 серверами MyBPM.

149 150

При высокой нагрузке рекомендуется иметь четыре сервера MyBPM, 151 которые будут обслуживать разные обязанности. Давайте придумает им 152 обозначения в соответствии с их обязанностями. А именно:

153 154 191 192

Здесь api обслуживает запросы пользователей, 193 поэтому на ней отключены кафка-получатели вообще с помощью 194 переменной окружения: MYBPM_ALL_CONSUMERS_OFF = 195 true.

196 197

А cons1, cons2, cons3 - 198 обслуживают только кафка-получатели, а запросы пользователей не 199 обслуживают. Поэтому на них отключены напоминания с помощью 200 переменной окружения MYBPM_ALL_KAFKA_NOTIFICATIONS_OFF = 201 true. Также средствами Kubernetes нужно запретить обращаться 202 к этим серверам по сервисам, иначе предоставляемый функционал будет 203 неполноценным.

204 205

В результате данных настроек будут созданы следующие директории 206 в конфигурационной среде:

207 208 217 218

С одинаковыми иерархиями файлов конфигураций кафка-получателей, 219 в которых всем получателям будет прописан один поток. Ну а в 220 результате каждый получатель будет обслуживать три потока из 221 серверов: cons1, cons2, cons3.

222 223

Для api тоже будет создана конфигурации, но там будут только 224 оповещения. Их можно оставить без изменений. Оповещения обслуживают 225 пользователей, работающие с платформой через браузер.

226 227

И теперь нужно пройтись по всем этим конфигурационным файлам и 228 включить и выключить те или иные потоки.

229 230

Рекомендуется сделать следующие настройки (тут указываются 231 настройки только в файлах с расширением .workerCount, остальные 232 конфигурационные файлы будут расписаны ниже):

233 234

api - на нём нужно настроить оповещения, следующим образом:

235 236 266 267

Если будут остальные в этой директории и в поддиректориях, то их 268 нужно поставить в 0.

269 270

cons1 - выделить для миграции данных, для этого нужно 271 активировать следующие настройки:

272 273 339 340

Остальные в этой директории нужно поставить в 0.

341 342

cons2 - нужно выделить для запуска процессов, для этого нужно 343 активировать следующие настройки:

344 345 362 363

Остальные в этой директории нужно поставить в 0.

364 365

cons3 - нужно выделить для обработки остальных 366 кафка-получателей. Нужно пройтись по директории

367 368 371 372

И во всех перечисленных выше файлах, но лежащих в этой 373 директории, нужно поставить 0. А остальные поставить в 1.

374 375

Другие файлы конфигурации кафка-получателей

376 377

Кроме файлов с расширением .workerCount есть ещё 378 файлы с именем params.config - они служат для 379 настройки других параметров кафка-получателей. Этот файл действует 380 на кафка-получателей, которые находятся в той же директории - на 381 другие под-директории и над-директории он не действует.

382 383

Например, файл:

384 385
386  /mybpm/consumers-cons3/params.config
387  
388  
389  
390  
391 392

Действует на кафка-получатель:

393 394
395  /mybpm/consumers-cons3/OutMigrationFromKafka.workerCount
396  
397  
398  
399  
400 401

И не действует на кафка-получатель:

402 403
404  /mybpm/consumers-cons2/run_process/RunProcess.workerCount
405  
406  
407  
408  
409 410

В то же время на этого получателя действует файл:

411 412
413  /mybpm/consumers-cons2/run_process/params.config
414  
415  
416  
417  
418 419

Вот пример файла params.config:

420 421
422  con.auto.commit.interval.ms = 1000
423  con.fetch.min.bytes = 1
424  con.max.partition.fetch.bytes = 141943040
425  con.connections.max.idle.ms = 540000
426  con.default.api.timeout.ms = 60000
427  con.fetch.max.bytes = 52428800
428  con.retry.backoff.ms = 2000
429  con.session.timeout.ms = 45000
430  con.heartbeat.interval.ms = 5000
431  con.max.poll.interval.ms = 3000000
432  con.max.poll.records = 500
433  con.receive.buffer.bytes = 65536
434  con.request.timeout.ms = 30000
435  con.send.buffer.bytes = 131072
436  con.fetch.max.wait.ms = 500
437  out.worker.count = 1
438  out.poll.duration.ms = 800
439  
440  
441  
442  
443 444

В нём параметр: out.worker.count - обозначает 445 количество потоков по умолчанию. А параметр: 446 out.poll.duration.ms - обозначает количество 447 миллисекунд, которые тратятся на ожидание данных с брокера в 448 реализации кафка-получателя на платформе. Значение 800 самое 449 подходящее - вам его никогда, скорее всего, менять не придётся.

450 451

Параметры, которые начинаются с префикса: con. 452 можно прочитать в документации к Apache Kafka, только этот префикс 453 нужно убрать, а именно, например нам нужно узнать что значить 454 параметр:

455 456
457  con.retry.backoff.ms = 2000
458  
459  
460  
461  
462 463

Убираем префикс con. и получаем параметр:

464 465
466  retry.backoff.ms = 2000
467  
468  
469  
470  
471 472

Теперь в документации к Apache Kafka ищем что обозначает этот 473 параметр и находим следующее описание:

474 475

477 https://kafka.apache.org/documentation/#producerconfigs_retry.backoff.ms

478 479

Так же можно добавить и другие параметры, которых здесь нет, но, 480 при этом, нужно не забыть добавить префикс con..

481 482

Применение изменений файлов конфигурации 483 кафка-получателей

484 485

Параметры применяются "на горячую" - это обозначает, что система 486 постоянно отслеживает изменения этих файлов. И как только это 487 произошло, она тут же их применяет - лаг составляет не больше пары 488 секунд.

489 490

Настройка масштабирования кафка получателей

491 492

Давайте разберёмся в масштабировании с одним кафка 493 получателем.

494 495

Пусть имеется запущенный сервер компонента с помощью yaml-файла, 496 который приведён здесь: Установка компонента 498 MyBPM Cons1

499 500

И пусть мы настраиваем входящую миграцию через кафку. Для неё 501 предусмотрен параметр:

502 503 516 517

Из пути конфига consumers-cons1 мы видим, что с 518 ним работает сервер, у которого определена переменная 519 окружения:

520 521
522  MYBPM_CONSUMER_DIR=cons1
523  
524  
525  
526  
527 528

В yaml-файле, фрагмент которого изображён ниже.

529 530

Фрагмент файла: 30-mybpm-cons1.yaml

531 532
533           env:
534              - { name: MYBPM_JAVA_OPTS, value: "-Xmx2Gi -Xms2Gi" }
535              - { name: MYBPM_ALL_CONSUMERS_OFF, value: "true" }
536              - { name: MYBPM_CONSUMER_DIR, value: "cons1" }  # <------------------- ВОТ
537              - { name: MYBPM_COMPANY_CODE, value: "greetgo" }
538  
539  
540 541

Так же в этом yaml-файле есть параметр replicas:

542 543

Фрагмент файла: 30-mybpm-cons1.yaml

544 545
546  spec:
547    selector:
548      matchLabels:
549        app: mybpm-cons1
550    replicas: 1  # <-------------------- ВОТ
551    template:
552      metadata:
553  
554  
555 556

На данный момент in_migration = 1 и replicas: 557 1 - это означает, что запущен один сервер, и на этом сервере 558 запущен один поток, который обслуживает данный кафка получатель 559 (входящая миграция данных).

560 561

И пусть теперь Ваша задача ускорить эту миграцию. Вначале вы 562 увеличиваете количество потоков, обслуживающие данный получатель, 563 установив:

564 565
566  in_migration = 16
567  
568  
569  
570  
571 572

Нужно поменять конфигурационный файл и сохранить его. 573 Перезапускать сервер не нужно - система обнаружит изменения файла и 574 автоматически запустит новые потоки. А кафка брокеры перестроятся и 575 начнут в данном случае отдавать 16-ти потокам данного 576 получателя.

577 578

При увеличении количества потоков, миграция должна 579 ускориться.

580 581

Если увеличение количество потоков не приводит к ускорению 582 миграции, например потому что упёрлись в производительность CPU, то 583 нужно увеличить количество подов данного сервера. Для этого в 584 yaml-файле увеличим параметр:

585 586

Фрагмент файла: 30-mybpm-cons1.yaml

587 588
589   replicas: 3
590  
591  
592 593

Применив этот файл, Kubernetes запустит три сервера. Каждый 594 сервер прочитает конфигурационный параметр in_migration = 595 16 и запустит 16 потоков на обслуживание данного получателя. 596 Итого количество потоков, которые обслуживают данный получатель, 597 станет:

598 599
600  3 * 16 = 48
601  
602  
603  
604  
605 606

Сорок восемь потоков будут обслуживать данный получатель.

607 608

Соответственно скорость обработки данного получателя должна тоже 609 увеличиться.

610 611

ВАЖНО: Кафка устроена так, что одна 612 партиция не может обслуживаться несколькими потоками одновременно. 613 Соответственно, если Вы настроили 50 потоков, а у кафка-топика 614 всего 48 партиций, то получается, что два потока будут крутиться 615 бездействия.

616 617

Если нужно увеличивать количество потоков, а партиций мало, то 618 количество партиций тоже нужно увеличивать. Это можно сделать в 619 кафке.

620 621

По умолчанию каждому топику даётся 48 партиций. Если этого 622 оказалось недостаточно, то сделайте настройку по умолчанию на 480 623 топиков и перезапустите кафка брокеры. Изменить значение по 624 умолчанию недостаточно, нужно ещё пройтись по всем кафка топиками и 625 индивидуально у каждого поменять это количество.

626
627 628 at kz.greetgo.md_reader.util.MdUtil.xmlTextToDoc(MdUtil.java:80) at kz.greetgo.md_reader.core.MdConverter.prepareHtmlFileFrom(MdConverter.java:136) at kz.greetgo.md_reader.core.MdConverter.convert(MdConverter.java:208) at kz.greetgo.md_reader.controller.RenderController.downloadToc(RenderController.java:360) at kz.greetgo.md_reader.controller.RenderController.request(RenderController.java:108) at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:207) at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:152) at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081) at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974) at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:903) at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:564) at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) at kz.greetgo.md_reader.interceptors.TextReplaceFilter.doFilter(TextReplaceFilter.java:36) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341) at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:390) at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894) at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741) at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.io.IOException: Server returned HTTP response code: 429 for URL: http://www.w3.org/TR/xhtml1/DTD/xhtml-lat1.ent at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:2000) at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.setupCurrentEntity(XMLEntityManager.java:677) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.startEntity(XMLEntityManager.java:1397) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.startEntity(XMLEntityManager.java:1333) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.startPE(XMLDTDScannerImpl.java:732) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.skipSeparator(XMLDTDScannerImpl.java:2101) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.scanDecls(XMLDTDScannerImpl.java:2064) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.scanDTDExternalSubset(XMLDTDScannerImpl.java:299) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$DTDDriver.dispatch(XMLDocumentScannerImpl.java:1165) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$DTDDriver.next(XMLDocumentScannerImpl.java:1040) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$PrologDriver.next(XMLDocumentScannerImpl.java:943) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:605) at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:542) at java.xml/com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:889) at java.xml/com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:825) at java.xml/com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141) at java.xml/com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:247) at java.xml/com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:342) at java.xml/javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:122) at kz.greetgo.md_reader.util.MdUtil.xmlTextToDoc(MdUtil.java:71) ... 48 more