
AirflowのDAGって何?どうやって定義するの?
DAGの基本概念と、効率的な設計パターンについて解説します!

この記事を書いた人

- エンジニア歴4年のフリーランスデータエンジニア
- 高卒工場勤務からエンジニア転職
- 3年目でフリーランスになり年収1000万↑達成
- フルリモ歴2年、2児の育児中
おすすめの エージェント | 特徴 | 詳しい解説は コチラ👇 |
---|---|---|
geechs job | ・大手企業との取引が多い ・リモート案件80%以上 | /geechs_job |
Midworks | ・クラウド会計ソフトfreeeの利用が無料 ・マージンが比較的低い | /midworks |
TECH STOCK | ・平均年収が935万円と高い ・フルリモート案件が72%以上 | /techstock |
PE-BANK | ・マージンが低く手取りが多い、福利厚生も充実 ・地方の案件も豊富に取り扱っている | /pe-bank |
techadapt | ・エージェント全員がエンジニア経験者 ・確定申告時の税理士報酬負担制度あり | /techadapt |
DAGとは?
DAG(Directed Acyclic Graph)は、Apache Airflowでワークフローを管理するための構造です。
DAGは、タスク(ジョブ)の流れを定義し、依存関係を管理するための仕組みです。 例えば、以下のようなデータ処理フローを管理できます。
- データ取得 → データ変換 → データ保存
- ETL処理 → データ検証 → レポート作成
DAGは「有向非巡回グラフ」という意味を持ち、循環(ループ)が発生しないように設計されます。
DAGの基本的な定義方法
基本的なDAGの定義(タスクの依存関係を含む)
この例では、データ取得(extract)→ データ変換(transform)→ データ保存(load) の順に処理を行うDAGを定義しています。
# DAGの定義(ワークフロー全体の管理)
with DAG(
'example_dag', # DAG名(Airflow UI上での識別子)
start_date=datetime(2023, 1, 1), # DAGの開始日時
schedule_interval='@daily', # スケジュール(毎日実行)
catchup=False # 過去分の実行をスキップ
) as dag:
# データを取得する関数
def extract_data():
print("データを取得しています...")
# データを変換する関数
def transform_data():
print("データを変換しています...")
# データを保存する関数
def load_data():
print("データを保存しています...")
# 各処理をPythonOperatorとして定義(Airflow内で実行されるタスク)
extract_task = PythonOperator(
task_id='extract_data', # タスクID(Airflow上での識別名)
python_callable=extract_data # 実行するPython関数
)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data
)
load_task = PythonOperator(
task_id='load_data',
python_callable=load_data
)
# タスクの依存関係を設定(実行順序を定義)
extract_task >> transform_task >> load_task
DAGの最適な設計パターン
1. DAGのモジュール化
DAGの処理内容をPythonファイルに分割することで、再利用性を向上し管理しやすくします。
def process_data():
print("Processing data")
def validate_data():
print("Validating data")
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from tasks import process_data, validate_data
with DAG('modular_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False) as dag:
task1 = PythonOperator(task_id='process_data', python_callable=process_data)
task2 = PythonOperator(task_id='validate_data', python_callable=validate_data)
task1 >> task2 # タスクの依存関係
2. 動的DAGの生成
類似パターンのタスクを繰り返し処理することで、複数のワークフローを一括作成します。
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
# DAGを動的に生成する関数
def generate_dag(dag_id):
with DAG(dag_id, start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False) as dag:
task = BashOperator(
task_id='echo_task',
bash_command=f'echo "Running {dag_id}"'
)
return dag
# 複数のDAGを自動生成(管理しやすくする)
for i in range(3):
dag_id = f'generated_dag_{i}'
globals()[dag_id] = generate_dag(dag_id)
3. Branching
取得したデータや処理の結果に応じて、タスクの条件分岐を行います。
from airflow.operators.python import BranchPythonOperator
# 条件分岐を決定する関数
def choose_branch():
return 'task_a' if True else 'task_b'
# 分岐処理を行うオペレーター
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch
)
4. イベント駆動DAG
センサーを活用し、設定した条件に応じてトリガーします。
from airflow.sensors.filesystem import FileSensor
# 指定されたファイルが存在するか監視するセンサー
file_sensor = FileSensor(
task_id='wait_for_file',
filepath='/path/to/file',
poke_interval=30, # 30秒ごとにチェック
timeout=600 # 10分間待機
)
まとめ
AirflowのDAGは、ワークフローの流れを定義し、スケジュール管理やタスク依存関係を設定する重要な仕組みです。
DAG設計のベストプラクティス
✅ DAGのモジュール化 → Pythonファイルを分割して再利用性を高める。
✅ 動的DAGの活用 → 複数のDAGを自動生成し、メンテナンスコストを削減。
✅ Branchingを活用 → 条件分岐を明確にし、無駄なタスク実行を減らす。
✅ センサーを活用 → イベントベースでDAGを実行し、効率的なワークフローを構築。
DAGの適切な設計によって、スケーラブルで効率的なデータパイプラインを構築しましょう!
リモート案件を探せる エージェント | 特徴 |
---|---|
geechs job | 90%以上がリモート案件。高単価・優良案件が多い。 |
Midworks | フルリモート案件だけに絞って検索可能。福利厚生が手厚い。 |
TECH STOCK | 72%がリモート案件。平均年収935万。 |
PE-BANK | マージンが低く、福利厚生が手厚い。地方の案件も豊富。 |
techadapt | 条件を満たせば確定申告時の税理士費用を負担してもらえる。 |