ITナレッジ データエンジニアリング

AirflowのDAGとは?定義方法と最適な設計パターンについて解説!

AirflowのDAGとは?定義方法と最適な設計パターンについて解説!

 

悩む人
悩む人

AirflowのDAGって何?どうやって定義するの?

DAGの基本概念と、効率的な設計パターンについて解説します!

きい
きい

 

この記事を書いた人

  • エンジニア歴4年のフリーランスデータエンジニア
  • 高卒工場勤務からエンジニア転職
  • 3年目でフリーランスになり年収1000万↑達成
  • フルリモ歴2年、2児の育児中

 

おすすめの
エージェント
特徴詳しい解説は
コチラ👇
geechs job・大手企業との取引が多い
・リモート案件80%以上
/geechs_job
Midworks・クラウド会計ソフトfreeeの利用が無料
・マージンが比較的低い
/midworks
TECH STOCK・平均年収が935万円と高い
・フルリモート案件が72%以上
/techstock
PE-BANK・マージンが低く手取りが多い、福利厚生も充実
・地方の案件も豊富に取り扱っている
/pe-bank
techadapt・エージェント全員がエンジニア経験者
・確定申告時の税理士報酬負担制度あり
/techadapt

 

DAGとは?

DAG(Directed Acyclic Graph)は、Apache Airflowでワークフローを管理するための構造です。

DAGは、タスク(ジョブ)の流れを定義し、依存関係を管理するための仕組みです。 例えば、以下のようなデータ処理フローを管理できます。

  • データ取得データ変換データ保存
  • ETL処理データ検証レポート作成

DAGは「有向非巡回グラフ」という意味を持ち、循環(ループ)が発生しないように設計されます。

 

DAGの基本的な定義方法

基本的なDAGの定義(タスクの依存関係を含む)

この例では、データ取得(extract)→ データ変換(transform)→ データ保存(load) の順に処理を行うDAGを定義しています。

Python
# DAGの定義(ワークフロー全体の管理)
with DAG(
    'example_dag',  # DAG名(Airflow UI上での識別子)
    start_date=datetime(2023, 1, 1),  # DAGの開始日時
    schedule_interval='@daily',  # スケジュール(毎日実行)
    catchup=False  # 過去分の実行をスキップ
) as dag:
    
    # データを取得する関数
    def extract_data():
        print("データを取得しています...")
    
    # データを変換する関数
    def transform_data():
        print("データを変換しています...")
    
    # データを保存する関数
    def load_data():
        print("データを保存しています...")
    
    # 各処理をPythonOperatorとして定義(Airflow内で実行されるタスク)
    extract_task = PythonOperator(
        task_id='extract_data',  # タスクID(Airflow上での識別名)
        python_callable=extract_data  # 実行するPython関数
    )
    
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data
    )
    
    # タスクの依存関係を設定(実行順序を定義)
    extract_task >> transform_task >> load_task

 

DAGの最適な設計パターン

1. DAGのモジュール化

DAGの処理内容をPythonファイルに分割することで、再利用性を向上し管理しやすくします。

tasks.py(DAGとは別のファイルに処理を定義)
def process_data():
    print("Processing data")

def validate_data():
    print("Validating data")
dag.py(DAG定義ファイル)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from tasks import process_data, validate_data

with DAG('modular_dag', start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    task1 = PythonOperator(task_id='process_data', python_callable=process_data)
    task2 = PythonOperator(task_id='validate_data', python_callable=validate_data)

    task1 >> task2  # タスクの依存関係

 

2. 動的DAGの生成

類似パターンのタスクを繰り返し処理することで、複数のワークフローを一括作成します。

Python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# DAGを動的に生成する関数
def generate_dag(dag_id):
    with DAG(dag_id, start_date=datetime(2023, 1, 1), schedule_interval='@daily', catchup=False) as dag:
        task = BashOperator(
            task_id='echo_task',
            bash_command=f'echo "Running {dag_id}"'
        )
        return dag

# 複数のDAGを自動生成(管理しやすくする)
for i in range(3):
    dag_id = f'generated_dag_{i}'
    globals()[dag_id] = generate_dag(dag_id)

 

3. Branching

取得したデータや処理の結果に応じて、タスクの条件分岐を行います。

Python
from airflow.operators.python import BranchPythonOperator

# 条件分岐を決定する関数
def choose_branch():
    return 'task_a' if True else 'task_b'

# 分岐処理を行うオペレーター
branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch
)

 

4. イベント駆動DAG

センサーを活用し、設定した条件に応じてトリガーします。

Python
from airflow.sensors.filesystem import FileSensor

# 指定されたファイルが存在するか監視するセンサー
file_sensor = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file',
    poke_interval=30,  # 30秒ごとにチェック
    timeout=600  # 10分間待機
)

 

まとめ

AirflowのDAGは、ワークフローの流れを定義し、スケジュール管理やタスク依存関係を設定する重要な仕組みです。

DAG設計のベストプラクティス

DAGのモジュール化 → Pythonファイルを分割して再利用性を高める。
動的DAGの活用 → 複数のDAGを自動生成し、メンテナンスコストを削減。
Branchingを活用 → 条件分岐を明確にし、無駄なタスク実行を減らす。
センサーを活用 → イベントベースでDAGを実行し、効率的なワークフローを構築。

DAGの適切な設計によって、スケーラブルで効率的なデータパイプラインを構築しましょう!

リモート案件を探せる
エージェント
特徴
geechs job90%以上がリモート案件。高単価・優良案件が多い。
Midworksフルリモート案件だけに絞って検索可能。福利厚生が手厚い。
TECH STOCK72%がリモート案件。平均年収935万
PE-BANKマージンが低く、福利厚生が手厚い。地方の案件も豊富。
techadapt条件を満たせば確定申告時の税理士費用を負担してもらえる。

    きい(@kii_sfpy)

きい

エンジニア4年目のフリーランスデータエンジニア。

INTJ-A/5w4、最近はTypescriptが楽しい。

Python, Typescript, Salesforce, GCP, AWS, Tableau …etc

開発に関するご相談も、お気軽にご連絡ください! 

お問い合わせはコチラ

-ITナレッジ, データエンジニアリング
-,