본문 바로가기
Python/Airflow

[We-Co] Airflow의 Operator에 대해서 알아보자!!

by 위기의코딩맨 2024. 11. 8.
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 Airflow의 Operator에 대해서 간단하게 알아보도록 하겠습니다.

 

Operator는 Airflow의 실행 환경을 설정하고, 개별 작업을 정의하여

DAG 작업 과정에서 중요한 역할을 맡고있습니다!

조금 더 자세히 들어가보도록 하겠습니다.

 

[ Operator ]

Operator란 간단하게 설명하면

Airflow의 DAG 작업을 정의하고 실행하는 단위입니다.

 

단순하게 Bash로 실행할수 있고, 

Python 함수, 데이터베이스의 쿼리, 

외부 통신 등 다양한 작업을 수행하도록 설계할 수 있습니다.

 

이러한 작업들이 모여서, 순차적, 병렬적으로 실행하며 

워크플로우가 형성이 됩니다.

주요 역할에 대해서 알아보도록 하겠습니다.

  • 실행 환경 설정 ( Bash, Python.. )
  • DAG의 작업단위 (흐름, 순서 정의)
  • 의존성 관리 및 분기 처리 ( BranchPythonOperator )
  • Error 관리 및 재시도
  • 외부 서비스 통신 ( S3FileTransformOperator, KubernetesPodOperator )
  • 모니터링 및 빠른 대응

 

다양한 Operator를 사용하여 각각의 역할을 지정하여

상황에 맞도록 사용할 수 있습니다.

주로 사용되는 Operator를 알아보도록 하겠습니다.

 

BashOperator : Bash 명령어를 입력하여 작업을 실행하는 오퍼레이터, 파일처리, 데이터수집 사용

더보기
with DAG(
    'WeCo_Operator',
    default_args=default_args,
    schedule_interval=None,
) as dag: 
    run_script = BashOperator(
        task_id='run_script',
        bash_command='echo Hello, Airflow!'
    )

run_script

PythonOperator : Python 함수를 사용하기 위해서 사용하는 오퍼레이터로, 데이터 처리와 변환에 주로 사용

더보기
def my_function():
    print("데이터 처리 중...")

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}


with DAG(
    dag_id='process_data_dag',
    default_args=default_args,
    schedule_interval=None,  
) as dag:
    process_data = PythonOperator(
        task_id='process_data',
        python_callable=my_function
    )

process_data

BranchPythonOperator : 특정 조건에 따라서 흐름을 분기시킬 사용되는 오퍼레이터, 하나의 작업에서 여러 개의 작업 ,

하나를 선택 가능

더보기
def choose_branch():
    return "branch_a" if random.choice([True, False]) else "branch_b"

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='branch_dag',
    default_args=default_args,
    schedule_interval=None, ) as dag:
    
    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=choose_branch
    )

    branch_a = DummyOperator(task_id='branch_a')
    branch_b = DummyOperator(task_id='branch_b')
    
    join_task = DummyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch_task >> [branch_a, branch_b] >> join_task

DummyOperator : 별도의 작업을 수행하지 않고, DAG 특정 위치를 표시하거나, 시작과 끝을 정의할 사용,

작업의 단계를 구분할때 주로 사용

더보기
def process_data():
    print("데이터를 처리 중입니다...")

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='dummy_operator_dag',
    default_args=default_args,
    schedule_interval=None,  
) as dag:
    
    start = DummyOperator(task_id='start')

    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_data
    )

    end = DummyOperator(task_id='end') 
    start >> process_data_task >> end

 

EmailOperator : 작업의 성공여부, 상태를 설정한 이메일로 알려주는 오퍼레이터

더보기
def process_data():
    print("데이터를 처리 중입니다...")

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='email_operator_dag',
    default_args=default_args,
    schedule_interval=None,  
) as dag:
    
    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_data
    )

        send_email = EmailOperator(
        task_id='send_email',
        to='example@example.com',   # 이메일
        subject='Airflow Task Completed',   # 제목
        html_content="<h3>데이터 처리가 완료되었습니다.</h3><p>프로세스가 성공적으로 종료되었습니다.</p>"  # 이메일 내용
    )
    process_data_task >> send_email

MySqlOperator, BigQueryOperator, PostgresOperator : 특정 데이터베이스에 쿼리를 실행하는 오퍼레이터, ETL 자주 사용

사용 코드는 한번 찾아보시길 바랍니다~! 

S3FileTransformOperator : S3에서 파일을 읽고 변환, 저장 작업을 수행

SensorOperators : 특정 조건이 충족될때까지 기다리는 오퍼레이터

DockerOperator : Docker 컨테이너에서 작업을 실행, 독립된 환경에서 작업을 수행

 

이 외에도 다양한 오퍼레이터가 제공되고 있으니 확인 해보셔도 재밌을 것 같습니다.

오늘은 Airflow의 Operator에 대하여 간단하게 알아보았습니다.

공부 전에는 뭐가 뭔지 몰랐지만…

Operator를 사용하여 다양한 워크플로우를 구성하고 실행하는 것에 대해서 알게되었으며,

특정 목적에 맞게 사용하는 것이 중요하구나~ 생각이 들었습니다.

반응형