ITナレッジ データエンジニアリング

Airflow初心者必見!主要なオペレーターの種類と使い分けについて解説(ver 2.0〜)

Airflow初心者必見!主要なオペレーターの種類と使い分けについて解説(ver 2.0〜)

 ※本ページはプロモーションが含まれています

悩む人
悩む人

Airflowのオペレーターっていろいろあるけど、どうやって使い分けるの?

オペレーターの種類と使い分けを分かりやすく解説します!

きい
きい

 

この記事を書いた人

  • エンジニア歴4年のフリーランスデータエンジニア
  • 高卒工場勤務からエンジニア転職
  • 3年目でフリーランスになり年収1000万↑達成
  • フルリモ歴2年、2児の育児中

 

おすすめの
エージェント
特徴詳しい解説は
コチラ👇
geechs job・大手企業との取引が多い
・リモート案件80%以上
/geechs_job
Midworks・クラウド会計ソフトfreeeの利用が無料
・マージンが比較的低い
/midworks
TECH STOCK・平均年収が935万円と高い
・フルリモート案件が72%以上
/techstock
PE-BANK・マージンが低く手取りが多い、福利厚生も充実
・地方の案件も豊富に取り扱っている
/pe-bank
techadapt・エージェント全員がエンジニア経験者
・確定申告時の税理士報酬負担制度あり
/techadapt

 

Airflowのオペレーターとは?

Apache Airflowでは、タスク(ジョブ)を定義するためにオペレーター(Operator)を使用します。

オペレーターは、異なる種類の処理を実行するためのテンプレートのようなものです。

例えば、「Pythonスクリプトを実行する」「SQLクエリを実行する」「クラウドストレージにファイルをアップロードする」などのタスクを、オペレーターを使って簡単に設定できます。

 

オペレーターの種類と使い分け(Airflow 2.0 以降)

Airflowには多くのオペレーターが用意されていますが、ここでは実務でよく使われるものを用途別に紹介します。

1. 実行系オペレーター(タスクの実行)

PythonOperator(Pythonスクリプトを実行)

Python
def my_python_task():
    print("Hello from PythonOperator")

python_task = PythonOperator(
    task_id='python_task',
    python_callable=my_python_task
)

 

BashOperator(シェルスクリプトを実行)

Python
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello from BashOperator"'
)

 

2. データベース系オペレーター(SQLの実行)

PostgresOperator(PostgreSQLのクエリを実行)

Python
postgres_task = PostgresOperator(
    task_id='postgres_task',
    postgres_conn_id='postgres_default',
    sql='SELECT COUNT(*) FROM my_table;'
)postgres_task = PostgresOperator(
    task_id='postgres_task',
    postgres_conn_id='postgres_default',
    sql='SELECT COUNT(*) FROM my_table;'
)

 

MySqlOperator(MySQLのクエリを実行)

Python
mysql_task = MySqlOperator(
    task_id='mysql_task',
    mysql_conn_id='mysql_default',
    sql='SELECT * FROM users;'
)

 

3. クラウド連携系オペレーター(AWS・GCPとの連携)

BigQueryOperator(Google BigQueryと連携)

Python
bq_task = BigQueryOperator(
    task_id='bq_task',
    sql='SELECT * FROM `my_project.my_dataset.my_table`',
    use_legacy_sql=False
)

 

S3ToRedshiftOperator(AWS S3からRedshiftへデータ転送)

Python
s3_to_redshift_task = S3ToRedshiftOperator(
    task_id='s3_to_redshift',
    schema='public',
    table='my_table',
    s3_bucket='my-bucket',
    s3_key='data.csv',
    copy_options=['CSV'],
    aws_conn_id='aws_default',
    redshift_conn_id='redshift_default'
)

 

4. 条件分岐オペレーター(タスクの分岐)

BranchPythonOperator(タスクの条件分岐)

Python
def choose_branch():
    return 'task_a' if True else 'task_b'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch
)

 

5. 通知系オペレーター(アラート・通知)

EmailOperator(メール通知を送信)

Python
email_task = EmailOperator(
    task_id='send_email',
    to='user@example.com',
    subject='Airflow Task Complete',
    html_content='The task has been completed successfully.'
)

 

6. フロー制御系オペレーター(DAGの流れを制御)

DummyOperator(何もしないタスク)

Python
dummy_task = DummyOperator(
    task_id='dummy_task'
)

 

まとめ

オペレーターの選び方ポイント

  • Pythonを使うなら → PythonOperator
  • シェルスクリプトを実行するなら → BashOperator
  • SQLクエリを実行するなら → PostgresOperator / MySqlOperator / BigQueryOperator
  • AWSとGCPのデータ連携なら → S3ToRedshiftOperator / BigQueryOperator
  • タスクの分岐をしたいなら → BranchPythonOperator
  • DAGの制御ポイントを作るなら → DummyOperator
  • タスク完了時に通知を送るなら → EmailOperator

適切なオペレーターを活用して、Airflowのワークフローを最適化しましょう!

    きい(@kii_sfpy)

きい

エンジニア4年目のフリーランスデータエンジニア。

INTJ-A/5w4、最近はTypescriptが楽しい。

Python, Typescript, Salesforce, GCP, AWS, Tableau …etc

開発に関するご相談も、お気軽にご連絡ください! 

お問い合わせはコチラ

-ITナレッジ, データエンジニアリング
-,