IN Migration Process
In миграция работает с помощью кафки.
Состоит из 3 частей:
1) Забор данных из in таблиц и отправка в кафку (топик = IN_MIGRATION) 2) Получения данных из кафки и подготовка boi для отправки в другой топик кафки (BOI) 3) Дальше consumer-ы топика BOI делают свою часть.
Часть 1. Забор данных из in таблиц и отправка в кафку
- После того как при помощи DDL были созданы in таблицы.
- И после того как была настроена связка с базой где находится in таблицы.
- И после того как был настроен
InMigrationScheduler
или вызван контроллерMigrationController
.
InMigrationStarterImpl
начинает искать таблицу process_tracking, которая отвечает за реальный запуск миграции.
Таблица process_tracking - служит для контроля обмена данными между нашей системой и системой заказчика (которая заполняет in таблицы). То есть она состоит из 2 частей.
- Сторона заказчика создает новую запись в таблице и заполняет свои информационные поля. Как только они заканчивают заполнения всех in таблиц они дают нам сигнал об окончании.
- Наша сторона ждет появления этого сигнала, и как дожидается, дальше начинается миграция in. После того как мы закончили забор данных из in таблиц в кафку, мы даем сигнал об окончании забора данных из in таблиц. Сторона заказчика видя сигнал с нашей стороны обратно начинает свою часть. И так по кругу.
ВАЖНО! Нужно договорится со стороной заказчика чтоб они после получения сигнала от нас не очищали in таблицы сразу же. Эти in таблицы дальше используются во 2 части миграции. Необходимо увидеть время, когда количество lag-ов в кафке (топик = IN_MIGRATION, groupId = in_migration) уйдет в 0. Это время и есть время через которое необходимо чистить in таблицы для следующей загрузки.
Описание колонок в таблице process_tracking: 1) id - просто autoincrement id 2) oper_date - опер день, за который мы получаем данные 3) start_load_time - начало загрузки данных в in таблицы 4) end_load_time - конец загрузки данных в in таблицы 5) load_status - сигнал об окончании загрузки данных в in таблицы 6) start_ins_inkafka_time - начало забора данных из in таблиц 7) end_ins_inkafka_time - конец забора данных из in таблиц 8) ins_inkafka_status - сигнал об окончании забора данных из in таблиц и отправки в кафку 9) occupied_by_process - id миграции 10) migrate_cur_date - дата миграции 11) periodic_update_time - поле нужное для борьбы с постоянным рестартом api (замечался в проекте jusan collection) 12) inserted_at - дата и время создании записи
Как только наша система удовлетворена таблицей process_tracking мы переходим
к InMigrationImpl
и начинаем забирать данные с in таблиц и отправлять их в кафку.
Часть 2. Получения данных из кафки и подготовка boi для отправки в другой топик кафки (BOI)
Consumer in миграции принимает объекты
1. KafkaInMigration
2. KafkaInMigrationRefChecker
KafkaInMigration
- хранит данные одной записи в одной in таблице.
KafkaInMigrationRefChecker
- специальный объект, который нужен для дополнительной проверки связок между
инстанциями.
Но самый главный здесь это KafkaInMigration
. Ниже расписан каждый этап прохождения одной записи
через InMigrationKafkaRegisterImpl
:
1) Достаем все данные, которые были залиты в in таблицы одной записи. Тут имеется ввиду сама таблица этой записи и также все данные из связанных таблиц.
2) Если у этой записи нет никаких данных для мигрирования, то она скипается.
3) При помощи externalId
и boId
проверяем есть ли уже по ним какая-нибудь инстанция.
Если есть, то KafkaBoi.isCreate = false
иначе KafkaBoi.isCreate = true
4) Проверяем нет ли признака removed=true
в in таблице.
Если есть, то мы ставим kafkaBoi.actual(false)
и заканчиваем здесь проход по миграции для этой записи.
5) Заполняем в kafka.changes
новые данные. С начало простые поля, а дальше поля типа BO и CO.
Если мы ожидали какой-то объект в поля BO или CO а его еще нет в системе, то мы делаем insert
в
таблицу ожидания связок boi_reference
.
6) Делаем kafkaBoi.apply
если хоть одно поле этой записи изменилось в этой миграции,
иначе мы просто садим в монго inMigrationUpdatedAt=new Date()
чтоб не засорять кафку пустыми данными.
7) При ошибке MongoErrorDuplicateKey
находим инстанцию по который был конфликт и делаем
слияние данных с существующей и новой записью. Все новые данные обновляем в существующей инстанции
и делаем kafkaBoi.apply
. И также дальше работаем уже по существующей инстанции.
8) Если мы НЕ словили MongoErrorDuplicateKey
, то сохраняем свою boiId в BoiInMigrationRef
для того, чтоб далее
нас находили по externalId
и boId
9) В конце запись начинает искать себя в таблице boi_reference
чтоб узнать кто его ожидает.
Если есть такие инстанции, то мы добавляем себя к ним.
Часть 3. Дальше consumer-ы топика BOI делают свою часть.
Это уже отдельная от in миграции часть. Тут происходит добавление в elasticSearch и тд работы.
Дополнительные замечания.
Не нужно забывать что при мигрирования системных BO как:
Person
Department
PersonGroup
Нужно в обязательном порядке мигрировать хоть одно системное поле. Иначе не появится запись.
Пример:
При этом случае Person появится в mybpm_boi
, но в коллекции Person
будет пусто.
Что означает что этого Person не существует в системе.
ВАЖНО! Company не мигрирует через in миграцию.