mybpm!
Скачать в формате: PDF pdf DOCX word

IN Migration Process

In миграция работает с помощью кафки.

Состоит из 3 частей:

1) Забор данных из in таблиц и отправка в кафку (топик = IN_MIGRATION) 2) Получения данных из кафки и подготовка boi для отправки в другой топик кафки (BOI) 3) Дальше consumer-ы топика BOI делают свою часть.

Часть 1. Забор данных из in таблиц и отправка в кафку

InMigrationStarterImpl начинает искать таблицу process_tracking, которая отвечает за реальный запуск миграции.

Таблица process_tracking - служит для контроля обмена данными между нашей системой и системой заказчика (которая заполняет in таблицы). То есть она состоит из 2 частей.

ВАЖНО! Нужно договорится со стороной заказчика чтоб они после получения сигнала от нас не очищали in таблицы сразу же. Эти in таблицы дальше используются во 2 части миграции. Необходимо увидеть время, когда количество lag-ов в кафке (топик = IN_MIGRATION, groupId = in_migration) уйдет в 0. Это время и есть время через которое необходимо чистить in таблицы для следующей загрузки.

Описание колонок в таблице process_tracking: 1) id - просто autoincrement id 2) oper_date - опер день, за который мы получаем данные 3) start_load_time - начало загрузки данных в in таблицы 4) end_load_time - конец загрузки данных в in таблицы 5) load_status - сигнал об окончании загрузки данных в in таблицы 6) start_ins_inkafka_time - начало забора данных из in таблиц 7) end_ins_inkafka_time - конец забора данных из in таблиц 8) ins_inkafka_status - сигнал об окончании забора данных из in таблиц и отправки в кафку 9) occupied_by_process - id миграции 10) migrate_cur_date - дата миграции 11) periodic_update_time - поле нужное для борьбы с постоянным рестартом api (замечался в проекте jusan collection) 12) inserted_at - дата и время создании записи

Как только наша система удовлетворена таблицей process_tracking мы переходим к InMigrationImpl и начинаем забирать данные с in таблиц и отправлять их в кафку.

Часть 2. Получения данных из кафки и подготовка boi для отправки в другой топик кафки (BOI)

Consumer in миграции принимает объекты

1. KafkaInMigration
2. KafkaInMigrationRefChecker

KafkaInMigration - хранит данные одной записи в одной in таблице.

KafkaInMigrationRefChecker - специальный объект, который нужен для дополнительной проверки связок между инстанциями.

Но самый главный здесь это KafkaInMigration. Ниже расписан каждый этап прохождения одной записи через InMigrationKafkaRegisterImpl:

1) Достаем все данные, которые были залиты в in таблицы одной записи. Тут имеется ввиду сама таблица этой записи и также все данные из связанных таблиц.

2) Если у этой записи нет никаких данных для мигрирования, то она скипается.

3) При помощи externalId и boId проверяем есть ли уже по ним какая-нибудь инстанция. Если есть, то KafkaBoi.isCreate = false иначе KafkaBoi.isCreate = true

4) Проверяем нет ли признака removed=true в in таблице. Если есть, то мы ставим kafkaBoi.actual(false) и заканчиваем здесь проход по миграции для этой записи.

5) Заполняем в kafka.changes новые данные. С начало простые поля, а дальше поля типа BO и CO. Если мы ожидали какой-то объект в поля BO или CO а его еще нет в системе, то мы делаем insert в таблицу ожидания связок boi_reference.

6) Делаем kafkaBoi.apply если хоть одно поле этой записи изменилось в этой миграции, иначе мы просто садим в монго inMigrationUpdatedAt=new Date() чтоб не засорять кафку пустыми данными.

7) При ошибке MongoErrorDuplicateKey находим инстанцию по который был конфликт и делаем слияние данных с существующей и новой записью. Все новые данные обновляем в существующей инстанции и делаем kafkaBoi.apply. И также дальше работаем уже по существующей инстанции.

8) Если мы НЕ словили MongoErrorDuplicateKey, то сохраняем свою boiId в BoiInMigrationRefдля того, чтоб далее нас находили по externalId и boId

9) В конце запись начинает искать себя в таблице boi_reference чтоб узнать кто его ожидает. Если есть такие инстанции, то мы добавляем себя к ним.

Часть 3. Дальше consumer-ы топика BOI делают свою часть.

Это уже отдельная от in миграции часть. Тут происходит добавление в elasticSearch и тд работы.

Дополнительные замечания.

Не нужно забывать что при мигрирования системных BO как:

Person
Department
PersonGroup

Нужно в обязательном порядке мигрировать хоть одно системное поле. Иначе не появится запись.

Пример:

При этом случае Person появится в mybpm_boi, но в коллекции Person будет пусто. Что означает что этого Person не существует в системе.

ВАЖНО! Company не мигрирует через in миграцию.