Как использовать модели SQLAlchemy в Apache Airflow DAG

Apache Airflow позволяет вам описывать задачи ETL как код на Python. Как результат, скучные конвейры данных превращаются в элегантный код, который приятно читать и отлаживать.

Основной режим использования Apache Airflow - массивные изменения баз данных. Для которых, безусловно, более уместны вручную оптимизированные SQL-запросы.

Но зачастую в ваших ETL DAG было бы удобно задействовать SQLAlchemy ORM для каких-то вспомогательных операций.

Интересно, что внутри Apache Airflow основана на SQLAlchemy.

Например, вот так вы можете получить из нее список соединения (Apache Airflow Connections) начинающиеся с my_prefix_:

from airflow import settings
from airflow.models import Connection

session = settings.Session()
try:
    conns: Iterable[Connection] = (
        session.query(Connection.conn_id)
        .filter(Connection.conn_id.ilike('my_prefix_%'))
        .all()
    )
    conn_ids = [conn.conn_id for conn in conns]
finally:
    session.commit()

То, что SQLAlchemy обычно не используется для Apache Airflow DAG - понятно и обоснованно. Дополнительные уровни абстракции, с которыми так удобно работать в SQLAlchemy, в конвейерах по перемалыванию данных были бы только ненужным замедлителем работы.

Если же вам нужны модели SQLAlchemy их легко использовать, получив SQLAlchemy соединение, например, из PostgresHook

hook = PostgresHook(postgres_conn_id=my_conn_id)
engine = hook.get_sqlalchemy_engine()
session = sessionmaker(bind=engine)()

Но во-1х неправильно захламлять код, делая это в каждой Apache Airflow Task. Во-2х, что бы у вас не было утечки соединений Postgres надо обязательно закрывать соединение после работы. Что означает обрамление вашего прикладного кода в try-finally и еще большее его захламление.

К счастью, в Apache Airflow вы с легкостью можете создавать свои операторы, где можно спрятать все эти не относящиеся к прикладному коду детали.

Вот пример SQLAlchemy Operator для Postgres. При необходимости можно с легкомтью сделать его универсальным - чтобы он работал с любым типом БД, а не только Postgres.

from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults
from sqlalchemy.orm import sessionmaker, Session
from airflow.hooks.postgres_hook import PostgresHook


def get_session(conn_id: str) -> Session:
    hook = PostgresHook(postgres_conn_id=conn_id)
    engine = hook.get_sqlalchemy_engine()
    return sessionmaker(bind=engine)()


class SQLAlchemyOperator(PythonOperator):
    """
    PythonOperator with SQLAlchemy session management - creates session for the Python callable
    and commit/rollback it afterwards.

    Set `conn_id` with you DB connection.

    Pass `session` parameter to the python callable.
    """
    @apply_defaults
    def __init__(
            self,
            conn_id: str,
            *args, **kwargs):
        self.conn_id = conn_id
        super().__init__(*args, **kwargs)

    def execute_callable(self):
        session = get_session(self.conn_id)
        try:
            result = self.python_callable(*self.op_args, session=session, **self.op_kwargs)
        except Exception:
            session.rollback()
            raise
        session.commit()
        return result

Вот пример его использования:

dag = DAG(
    dag_id='SQAlchemyDAG',
    schedule_interval='0 2 1 * *',  # monthly at 2:00 AM, 1st day of a month
    start_date=datetime(2020, 8, 1),  
)


def sqlalchemy_task(session: Session, **kwargs):
    session.query(YourSQLAlchemyModel)


request_count = SQLAlchemyOperator(
    dag=dag,
    task_id='sqlalchemy',
    conn_id='my_db',
    python_callable=sqlalchemy_task,
    provide_context=True,
)