010
IN Migration Process
011
012
In миграция работает с помощью кафки.
013
014
Состоит из 3 частей:
015
016
1) Забор данных из in таблиц и отправка в кафку (топик =
017 IN_MIGRATION) 2) Получения данных из кафки и подготовка boi для
018 отправки в другой топик кафки (BOI) 3) Дальше consumer-ы топика BOI
019 делают свою часть.
020
021
Часть 1. Забор данных из in таблиц и отправка в
022 кафку
023
024
025 - После того как при помощи DDL были созданы in
026 таблицы.
027
028 - И после того как была настроена связка с базой где находится in
029 таблицы.
030
031 - И после того как был настроен
InMigrationScheduler
032 или вызван контроллер MigrationController
.
033
034
035
InMigrationStarterImpl
начинает искать таблицу
036 process_tracking, которая отвечает за реальный
037 запуск миграции.
038
039
Таблица process_tracking - служит для контроля
040 обмена данными между нашей системой и системой заказчика (которая
041 заполняет in таблицы). То есть она состоит из 2 частей.
042
043
044 - Сторона заказчика создает новую запись в таблице и заполняет
045 свои информационные поля. Как только они заканчивают заполнения
046 всех in таблиц они дают нам сигнал об окончании.
047
048 - Наша сторона ждет появления этого сигнала, и как дожидается,
049 дальше начинается миграция in. После того как мы закончили забор
050 данных из in таблиц в кафку, мы даем сигнал об окончании забора
051 данных из in таблиц. Сторона заказчика видя сигнал с нашей стороны
052 обратно начинает свою часть. И так по кругу.
053
054
055
ВАЖНО! Нужно договорится со стороной заказчика
056 чтоб они после получения сигнала от нас не очищали in таблицы сразу
057 же. Эти in таблицы дальше используются во 2 части
058 миграции. Необходимо увидеть время, когда количество
059 lag-ов в кафке (топик = IN_MIGRATION, groupId = in_migration) уйдет
060 в 0. Это время и есть время через которое необходимо чистить in
061 таблицы для следующей загрузки.
062
063
Описание колонок в таблице process_tracking: 1)
064 id - просто autoincrement id 2) oper_date - опер день, за который
065 мы получаем данные 3) start_load_time - начало загрузки данных в in
066 таблицы 4) end_load_time - конец загрузки данных в in таблицы 5)
067 load_status - сигнал об окончании загрузки данных
068 в in таблицы 6) start_ins_inkafka_time - начало забора данных из in
069 таблиц 7) end_ins_inkafka_time - конец забора данных из in таблиц
070 8) ins_inkafka_status - сигнал об окончании забора
071 данных из in таблиц и отправки в кафку 9) occupied_by_process - id
072 миграции 10) migrate_cur_date - дата миграции 11)
073 periodic_update_time - поле нужное для борьбы с постоянным
074 рестартом api (замечался в проекте jusan collection) 12)
075 inserted_at - дата и время создании записи
076
077
Как только наша система удовлетворена таблицей
078 process_tracking мы переходим к
079 InMigrationImpl
и начинаем забирать данные с in таблиц
080 и отправлять их в кафку.
081
082
Часть 2. Получения данных из кафки и
083 подготовка boi для отправки в другой топик кафки (BOI)
084
085
Consumer in миграции принимает объекты
086
087
088 1. KafkaInMigration
089 2. KafkaInMigrationRefChecker
090
091
092
093
094
095
096
KafkaInMigration
- хранит данные одной записи в
097 одной in таблице.
098
099
KafkaInMigrationRefChecker
- специальный объект,
100 который нужен для дополнительной проверки связок между
101 инстанциями.
102
103
Но самый главный здесь это KafkaInMigration
. Ниже
104 расписан каждый этап прохождения одной записи через
105 InMigrationKafkaRegisterImpl
:
106
107
1) Достаем все данные, которые были залиты в in таблицы одной
108 записи. Тут имеется ввиду сама таблица этой записи и также все
109 данные из связанных таблиц.
110
111
2) Если у этой записи нет никаких данных для мигрирования, то
112 она скипается.
113
114
3) При помощи externalId
и boId
115 проверяем есть ли уже по ним какая-нибудь инстанция. Если есть, то
116 KafkaBoi.isCreate = false
иначе
117 KafkaBoi.isCreate = true
118
119
4) Проверяем нет ли признака removed=true
в in
120 таблице. Если есть, то мы ставим
121 kafkaBoi.actual(false)
и заканчиваем здесь проход по
122 миграции для этой записи.
123
124
5) Заполняем в kafka.changes
новые данные. С начало
125 простые поля, а дальше поля типа BO и CO. Если мы ожидали какой-то
126 объект в поля BO или CO а его еще нет в системе, то мы делаем
127 insert
в таблицу ожидания связок
128 boi_reference
.
129
130
6) Делаем kafkaBoi.apply
если хоть одно поле этой
131 записи изменилось в этой миграции, иначе мы просто садим в монго
132 inMigrationUpdatedAt=new Date()
чтоб не засорять кафку
133 пустыми данными.
134
135
7) При ошибке MongoErrorDuplicateKey
находим
136 инстанцию по который был конфликт и делаем слияние данных с
137 существующей и новой записью. Все новые данные обновляем в
138 существующей инстанции и делаем kafkaBoi.apply
. И
139 также дальше работаем уже по существующей инстанции.
140
141
8) Если мы НЕ словили
142 MongoErrorDuplicateKey
, то сохраняем свою boiId в
143 BoiInMigrationRef
для того, чтоб далее нас находили по
144 externalId
и boId
145
146
9) В конце запись начинает искать себя в таблице
147 boi_reference
чтоб узнать кто его ожидает. Если есть
148 такие инстанции, то мы добавляем себя к ним.
149
150
Часть 3. Дальше consumer-ы топика BOI
151 делают свою часть.
152
153
Это уже отдельная от in миграции часть. Тут происходит
154 добавление в elasticSearch и тд работы.
155
156
Дополнительные замечания.
157
158
Не нужно забывать что при мигрирования системных BO как:
159
160
161 Person
162 Department
163 PersonGroup
164
165
166
167
168
169
Нужно в обязательном порядке мигрировать хоть одно системное
170 поле. Иначе не появится запись.
171
172
Пример:
173
174
При этом случае Person появится в mybpm_boi
, но в
175 коллекции Person
будет пусто. Что означает что этого
176 Person не существует в системе.
177
178
ВАЖНО! Company не мигрирует через in
179 миграцию.
180
181
182
at kz.greetgo.md_reader.util.MdUtil.xmlTextToDoc(MdUtil.java:80)
at kz.greetgo.md_reader.core.MdConverter.prepareHtmlFileFrom(MdConverter.java:136)
at kz.greetgo.md_reader.core.MdConverter.convert(MdConverter.java:208)
at kz.greetgo.md_reader.controller.RenderController.downloadToc(RenderController.java:360)
at kz.greetgo.md_reader.controller.RenderController.request(RenderController.java:108)
at jdk.internal.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:207)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:152)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1081)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:903)
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:564)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885)
at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:205)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
at kz.greetgo.md_reader.interceptors.TextReplaceFilter.doFilter(TextReplaceFilter.java:36)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:174)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:149)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:166)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:482)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:115)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:341)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:390)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:894)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Server returned HTTP response code: 429 for URL: http://www.w3.org/TR/xhtml1/DTD/xhtml-lat1.ent
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:2000)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.setupCurrentEntity(XMLEntityManager.java:677)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.startEntity(XMLEntityManager.java:1397)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLEntityManager.startEntity(XMLEntityManager.java:1333)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.startPE(XMLDTDScannerImpl.java:732)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.skipSeparator(XMLDTDScannerImpl.java:2101)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.scanDecls(XMLDTDScannerImpl.java:2064)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDTDScannerImpl.scanDTDExternalSubset(XMLDTDScannerImpl.java:299)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$DTDDriver.dispatch(XMLDocumentScannerImpl.java:1165)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$DTDDriver.next(XMLDocumentScannerImpl.java:1040)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$PrologDriver.next(XMLDocumentScannerImpl.java:943)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:605)
at java.xml/com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:542)
at java.xml/com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:889)
at java.xml/com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:825)
at java.xml/com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
at java.xml/com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:247)
at java.xml/com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:342)
at java.xml/javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:122)
at kz.greetgo.md_reader.util.MdUtil.xmlTextToDoc(MdUtil.java:71)
... 48 more