4.6. Миграции данных
Если требуется сделать какое-то изменение в базе данных, которая выполняет
- Изменение данных
создание каких-то строк, обновление или удаление, разовая загрузка данных из какого-то источника
- Изменение структуры данных
типа колонки с конвертацией данных в другой тип
- Выполнить любой произвольный код
при релизе какой-то функциональности
Нам требуется выполнить какой-то код и только один раз, мы можем воспользоваться миграцией данных.
Для этого нужно выполнить команду, которая создаст файл миграции, который нам нужно будет наполнить нашим кодом.
Есть 3 варианта создания файла с миграцией, от которых зависит когда именно будет выполнен код миграции
4.6.1. Создание мигации с выполнением перед DDL
dc exec manage ./manage.py syncdb_migration --exec_type pre_ddl
В этом случае, код буде выполнен перед тем, как система создаст все нужные таблицы и колонки в них после сравнения всех моделей с базой данных
4.6.2. Создание мигации с выполнением после DDL
dc exec manage ./manage.py syncdb_migration --exec_type post_ddl
В этом случае, код буде выполнен после того, как система создаст все нужные таблицы и колонки в них после сравнения всех моделей с базой данных
4.6.3. Создание мигации с выполнением в фоновом режиме
dc exec manage ./manage.py syncdb_migration --exec_type queue
В этом случае, в очередь будет выставлена задача и будет выполнена в очереди с названием default. Это может потребоваться при загрузке каких-то данных по интеграции, обработке большого объема данных, когда это не требуется сделать непосредственно перед запуском системы после релиза
4.6.4. Содержимое файла миграции общего вида
В сгененированном файле, вам необходимо наполнить кодом метод run В описании класса DataMigration отраженые некоторые, на наш взгляд, важные моменты, которые следует учитывать при этом. Подробнее чем это уже описано, повторять не имеет особого смысла.
Отдельно прошу заполнять описание аттрибута description осмысленным текстом, в котором содержится информация о номере задачи, по которой выполняется код и что он делает. Это очень полезно для записи в лог (этот параметр используется для логгирования действий, выполняемых миграциями)
from datetime import datetime
from sphere.lib.syncdb import DataMigrationBase
class DataMigration(DataMigrationBase):
"""
- Если применение миграции требует создания файла с отчетом (миграция удаляет или меняет данные),
кладите его в /app/media/syncdb_report
- Если для миграции нужны входные данные из файла,
кладите файл в syncdb_migrations/assets/
- Чтобы отправить в лог промежуточные шаги, используйте
self.emit_log(f"Обновлены {idx} строк из {count}")
- Перед операциями по удалению колонок, индексов, не забывайте проверять их существование в БД
self.has_table
self.has_index
...
Посмотрите в sphere/lib/syncdb.py класс DataMigrationBase
- Для выполнения SQL-запроса, используйте:
self.exec_sql('update test set col = null')
"""
description = "" # Заполните для правильного отображения в логе
exec_type = "post_ddl" # post_ddl или pre_ddl
def run(self, last_try_data=None):
if self.has_table("auth_user"):
self.exec_sql('update auth_user set last_name = null where id = 12')
4.6.5. Содержимое файла миграции для queue
В общем, все тоже самое, только в методе run дополнительно появляется входной именованный аргумент «last_try_data». Если выполнение миграции выбрасывает исключение, то через «retry_delay_sec» секунд будут выполняться повторные попытки в кол-ве «retry_count» и в повторные вызовы будет передаваться результат возврата метода get_last_try_data. Если попыток оказывается больше чем retry_count, то миграция остается в не выполненном состоянии. Необходимо реагировать на логи ошибок (об этом чуть ниже).
from datetime import datetime
from sphere.lib.syncdb import DataMigrationBase
class DataMigration(DataMigrationBase):
"""
- Если применение миграции требует создания файла с отчетом (миграция удаляет или меняет данные),
кладите его в /app/media/syncdb_report
- Если для миграции нужны входные данные из файла,
кладите файл в syncdb_migrations/assets/
- Чтобы отправить в лог промежуточные шаги, используйте
self.emit_log(f"Обновлены {idx} строк из {count}")
- Перед операциями по удалению колонок, индексов, не забывайте проверять их существование в БД
self.has_table
self.has_index
...
Посмотрите в sphere/lib/syncdb.py класс DataMigrationBase
- Для выполнения SQL-запроса, используйте:
self.exec_sql('update test set col = null')
"""
description = "Выполняем операцию обновления карточек по задаче 122345" # Заполните для правильного отображения в логе
exec_type = "queue" # post_ddl или pre_ddl
execute_time = None # Время запуска (опционально) в виде datetime(2023, 1, 15, 23, 0, 0)
retry_count = 4 # Кол-во попыток при ошибке
retry_delay_sec = 60 * 5 # Задержки в секундах между попытками (5 минут в данном случае)
def run(self, last_try_data=None):
"""
Код для выполнения
@last_try_data: Данные из предыдущей (упавшей с ошибкой) попытки,
чтобы продолжить с момента, на котором код упал
"""
large_query = Model.query.order_by(Model.id)
# Если были попытки предыдущие, продолжим с нужного id
if last_try_data:
large_query = large_query.filter(Model.id > last_try_data["last_id"])
for idx, obj in enumerate(large_query):
obj.do_action()
if idx % 5000 == 0:
self.db.session.commit()
self.last_id = obj.id
def get_last_try_data(self):
"""
Вернуть данные для следующей попытки в случае падения,
если retry_count > 1
чтобы продолжить обрабатывать данные с точки последнего успешного коммита порции, который прошел
"""
last_id = getattr(self, "last_id", None)
if last_id:
return {"last_id": last_id}
4.6.6. Логгирование
Все миграции логгируются в 2 логгера - sphere.syncdb в который идет вся информация, в том числе, и промежуточная, которую правильно выводить при больших обновлениях - sphere.syncdb.summary в который идет только информация о финальном состоянии, например, о успешном выполнении миграций
Отслеживайте логи ошибок через систему аггрегации логов и автоматической обработки сообщений