Как использовать модели SQLAlchemy в Apache Airflow DAG
Apache Airflow позволяет вам описывать задачи ETL как код на Python. Как результат, скучные конвейры данных превращаются в элегантный код, который приятно читать и отлаживать.
Основной режим использования Apache Airflow - массивные изменения баз данных. Для которых, безусловно, более уместны вручную оптимизированные SQL-запросы.
Но зачастую в ваших ETL DAG было бы удобно задействовать SQLAlchemy ORM для каких-то вспомогательных операций.
Интересно, что внутри Apache Airflow основана на SQLAlchemy.
Например, вот так вы можете получить из нее список соединения (Apache Airflow Connections)
начинающиеся с my_prefix_
:
from airflow import settings
from airflow.models import Connection
session = settings.Session()
try:
conns: Iterable[Connection] = (
session.query(Connection.conn_id)
.filter(Connection.conn_id.ilike('my_prefix_%'))
.all()
)
conn_ids = [conn.conn_id for conn in conns]
finally:
session.commit()
То, что SQLAlchemy обычно не используется для Apache Airflow DAG - понятно и обоснованно. Дополнительные уровни абстракции, с которыми так удобно работать в SQLAlchemy, в конвейерах по перемалыванию данных были бы только ненужным замедлителем работы.
Если же вам нужны модели SQLAlchemy их легко использовать, получив SQLAlchemy соединение, например, из PostgresHook
hook = PostgresHook(postgres_conn_id=my_conn_id)
engine = hook.get_sqlalchemy_engine()
session = sessionmaker(bind=engine)()
Но во-1х неправильно захламлять код, делая это в каждой Apache Airflow Task. Во-2х, что бы у вас не
было утечки соединений Postgres надо обязательно закрывать соединение после работы. Что означает
обрамление вашего прикладного кода в try-finally
и еще большее его захламление.
К счастью, в Apache Airflow вы с легкостью можете создавать свои операторы, где можно спрятать все эти не относящиеся к прикладному коду детали.
Вот пример SQLAlchemy Operator для Postgres. При необходимости можно с легкомтью сделать его универсальным - чтобы он работал с любым типом БД, а не только Postgres.
from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults
from sqlalchemy.orm import sessionmaker, Session
from airflow.hooks.postgres_hook import PostgresHook
def get_session(conn_id: str) -> Session:
hook = PostgresHook(postgres_conn_id=conn_id)
engine = hook.get_sqlalchemy_engine()
return sessionmaker(bind=engine)()
class SQLAlchemyOperator(PythonOperator):
"""
PythonOperator with SQLAlchemy session management - creates session for the Python callable
and commit/rollback it afterwards.
Set `conn_id` with you DB connection.
Pass `session` parameter to the python callable.
"""
@apply_defaults
def __init__(
self,
conn_id: str,
*args, **kwargs):
self.conn_id = conn_id
super().__init__(*args, **kwargs)
def execute_callable(self):
session = get_session(self.conn_id)
try:
result = self.python_callable(*self.op_args, session=session, **self.op_kwargs)
except Exception:
session.rollback()
raise
session.commit()
return result
Вот пример его использования:
dag = DAG(
dag_id='SQAlchemyDAG',
schedule_interval='0 2 1 * *', # monthly at 2:00 AM, 1st day of a month
start_date=datetime(2020, 8, 1),
)
def sqlalchemy_task(session: Session, **kwargs):
session.query(YourSQLAlchemyModel)
request_count = SQLAlchemyOperator(
dag=dag,
task_id='sqlalchemy',
conn_id='my_db',
python_callable=sqlalchemy_task,
provide_context=True,
)