※本ページはプロモーションが含まれています

悩む人
Airflowのオペレーターっていろいろあるけど、どうやって使い分けるの?
オペレーターの種類と使い分けを分かりやすく解説します!

きい
この記事を書いた人

- エンジニア歴4年のフリーランスデータエンジニア
- 高卒工場勤務からエンジニア転職
- 3年目でフリーランスになり年収1000万↑達成
- フルリモ歴2年、2児の育児中
おすすめの エージェント | 特徴 | 詳しい解説は コチラ👇 |
---|---|---|
geechs job | ・大手企業との取引が多い ・リモート案件80%以上 | /geechs_job |
Midworks | ・クラウド会計ソフトfreeeの利用が無料 ・マージンが比較的低い | /midworks |
TECH STOCK | ・平均年収が935万円と高い ・フルリモート案件が72%以上 | /techstock |
PE-BANK | ・マージンが低く手取りが多い、福利厚生も充実 ・地方の案件も豊富に取り扱っている | /pe-bank |
techadapt | ・エージェント全員がエンジニア経験者 ・確定申告時の税理士報酬負担制度あり | /techadapt |
目次
Airflowのオペレーターとは?
Apache Airflowでは、タスク(ジョブ)を定義するためにオペレーター(Operator)を使用します。
オペレーターは、異なる種類の処理を実行するためのテンプレートのようなものです。
例えば、「Pythonスクリプトを実行する」「SQLクエリを実行する」「クラウドストレージにファイルをアップロードする」などのタスクを、オペレーターを使って簡単に設定できます。
オペレーターの種類と使い分け(Airflow 2.0 以降)
Airflowには多くのオペレーターが用意されていますが、ここでは実務でよく使われるものを用途別に紹介します。
1. 実行系オペレーター(タスクの実行)
PythonOperator(Pythonスクリプトを実行)
Python
def my_python_task():
print("Hello from PythonOperator")
python_task = PythonOperator(
task_id='python_task',
python_callable=my_python_task
)
BashOperator(シェルスクリプトを実行)
Python
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo "Hello from BashOperator"'
)
2. データベース系オペレーター(SQLの実行)
PostgresOperator(PostgreSQLのクエリを実行)
Python
postgres_task = PostgresOperator(
task_id='postgres_task',
postgres_conn_id='postgres_default',
sql='SELECT COUNT(*) FROM my_table;'
)postgres_task = PostgresOperator(
task_id='postgres_task',
postgres_conn_id='postgres_default',
sql='SELECT COUNT(*) FROM my_table;'
)
MySqlOperator(MySQLのクエリを実行)
Python
mysql_task = MySqlOperator(
task_id='mysql_task',
mysql_conn_id='mysql_default',
sql='SELECT * FROM users;'
)
3. クラウド連携系オペレーター(AWS・GCPとの連携)
BigQueryOperator(Google BigQueryと連携)
Python
bq_task = BigQueryOperator(
task_id='bq_task',
sql='SELECT * FROM `my_project.my_dataset.my_table`',
use_legacy_sql=False
)
S3ToRedshiftOperator(AWS S3からRedshiftへデータ転送)
Python
s3_to_redshift_task = S3ToRedshiftOperator(
task_id='s3_to_redshift',
schema='public',
table='my_table',
s3_bucket='my-bucket',
s3_key='data.csv',
copy_options=['CSV'],
aws_conn_id='aws_default',
redshift_conn_id='redshift_default'
)
4. 条件分岐オペレーター(タスクの分岐)
BranchPythonOperator(タスクの条件分岐)
Python
def choose_branch():
return 'task_a' if True else 'task_b'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch
)
5. 通知系オペレーター(アラート・通知)
EmailOperator(メール通知を送信)
Python
email_task = EmailOperator(
task_id='send_email',
to='user@example.com',
subject='Airflow Task Complete',
html_content='The task has been completed successfully.'
)
6. フロー制御系オペレーター(DAGの流れを制御)
DummyOperator(何もしないタスク)
Python
dummy_task = DummyOperator(
task_id='dummy_task'
)
まとめ
オペレーターの選び方ポイント
- Pythonを使うなら → PythonOperator
- シェルスクリプトを実行するなら → BashOperator
- SQLクエリを実行するなら → PostgresOperator / MySqlOperator / BigQueryOperator
- AWSとGCPのデータ連携なら → S3ToRedshiftOperator / BigQueryOperator
- タスクの分岐をしたいなら → BranchPythonOperator
- DAGの制御ポイントを作るなら → DummyOperator
- タスク完了時に通知を送るなら → EmailOperator
適切なオペレーターを活用して、Airflowのワークフローを最適化しましょう!