Как использовать модели SQLAlchemy в Apache Airflow DAG
Apache Airflow позволяет вам описывать задачи ETL как код
на Python. Как результат, скучные конвейры данных превращаются в элегантный код,
который приятно читать и отлаживать.
Основной режим использования Apache Airflow - массивные изменения баз данных. Для которых,
безусловно, более уместны вручную оптимизированные SQL-запросы.
Но зачастую в ваших ETL DAG было бы удобно
задействовать SQLAlchemy ORM для каких-то вспомогательных операций.
Интересно, что внутри Apache Airflow основана на SQLAlchemy.
Например, вот так вы можете получить из нее список соединения (Apache Airflow Connections)
начинающиеся с my_prefix_:
То, что SQLAlchemy обычно не используется для Apache Airflow DAG - понятно и обоснованно.
Дополнительные уровни абстракции, с которыми так удобно работать в SQLAlchemy, в конвейерах по
перемалыванию данных были бы только ненужным замедлителем работы.
Если же вам нужны модели SQLAlchemy их легко использовать, получив SQLAlchemy соединение,
например, из
PostgresHook
Но во-1х неправильно захламлять код, делая это в каждой Apache Airflow Task. Во-2х, что бы у вас не
было утечки соединений Postgres надо обязательно закрывать соединение после работы. Что означает
обрамление вашего прикладного кода в try-finally и еще большее его захламление.
К счастью, в Apache Airflow вы с легкостью можете создавать свои операторы, где можно спрятать все
эти не относящиеся к прикладному коду детали.
Вот пример SQLAlchemy Operator для Postgres. При необходимости можно с легкомтью сделать его
универсальным - чтобы он работал с любым типом БД, а не только Postgres.