We-Co

[We-Co] Airflow의 Operator에 대해서 알아보자!! 본문

Python/Airflow

[We-Co] Airflow의 Operator에 대해서 알아보자!!

위기의코딩맨 2024. 11. 8. 13:15
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 Airflow의 Operator에 대해서 간단하게 알아보도록 하겠습니다.

 

Operator는 Airflow의 실행 환경을 설정하고, 개별 작업을 정의하여

DAG 작업 과정에서 중요한 역할을 맡고있습니다!

조금 더 자세히 들어가보도록 하겠습니다.

 

[ Operator ]

Operator란 간단하게 설명하면

Airflow의 DAG 작업을 정의하고 실행하는 단위입니다.

 

단순하게 Bash로 실행할수 있고, 

Python 함수, 데이터베이스의 쿼리, 

외부 통신 등 다양한 작업을 수행하도록 설계할 수 있습니다.

 

이러한 작업들이 모여서, 순차적, 병렬적으로 실행하며 

워크플로우가 형성이 됩니다.

주요 역할에 대해서 알아보도록 하겠습니다.

  • 실행 환경 설정 ( Bash, Python.. )
  • DAG의 작업단위 (흐름, 순서 정의)
  • 의존성 관리 및 분기 처리 ( BranchPythonOperator )
  • Error 관리 및 재시도
  • 외부 서비스 통신 ( S3FileTransformOperator, KubernetesPodOperator )
  • 모니터링 및 빠른 대응

 

다양한 Operator를 사용하여 각각의 역할을 지정하여

상황에 맞도록 사용할 수 있습니다.

주로 사용되는 Operator를 알아보도록 하겠습니다.

 

BashOperator : Bash 명령어를 입력하여 작업을 실행하는 오퍼레이터, 파일처리, 데이터수집 사용

더보기
with DAG(
    'WeCo_Operator',
    default_args=default_args,
    schedule_interval=None,
) as dag: 
    run_script = BashOperator(
        task_id='run_script',
        bash_command='echo Hello, Airflow!'
    )

run_script

PythonOperator : Python 함수를 사용하기 위해서 사용하는 오퍼레이터로, 데이터 처리와 변환에 주로 사용

더보기
def my_function():
    print("데이터 처리 중...")

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}


with DAG(
    dag_id='process_data_dag',
    default_args=default_args,
    schedule_interval=None,  
) as dag:
    process_data = PythonOperator(
        task_id='process_data',
        python_callable=my_function
    )

process_data

BranchPythonOperator : 특정 조건에 따라서 흐름을 분기시킬 사용되는 오퍼레이터, 하나의 작업에서 여러 개의 작업 ,

하나를 선택 가능

더보기
def choose_branch():
    return "branch_a" if random.choice([True, False]) else "branch_b"

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='branch_dag',
    default_args=default_args,
    schedule_interval=None, ) as dag:
    
    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=choose_branch
    )

    branch_a = DummyOperator(task_id='branch_a')
    branch_b = DummyOperator(task_id='branch_b')
    
    join_task = DummyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch_task >> [branch_a, branch_b] >> join_task

DummyOperator : 별도의 작업을 수행하지 않고, DAG 특정 위치를 표시하거나, 시작과 끝을 정의할 사용,

작업의 단계를 구분할때 주로 사용

더보기
def process_data():
    print("데이터를 처리 중입니다...")

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='dummy_operator_dag',
    default_args=default_args,
    schedule_interval=None,  
) as dag:
    
    start = DummyOperator(task_id='start')

    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_data
    )

    end = DummyOperator(task_id='end') 
    start >> process_data_task >> end

 

EmailOperator : 작업의 성공여부, 상태를 설정한 이메일로 알려주는 오퍼레이터

더보기
def process_data():
    print("데이터를 처리 중입니다...")

default_args = {
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='email_operator_dag',
    default_args=default_args,
    schedule_interval=None,  
) as dag:
    
    process_data_task = PythonOperator(
        task_id='process_data_task',
        python_callable=process_data
    )

        send_email = EmailOperator(
        task_id='send_email',
        to='example@example.com',   # 이메일
        subject='Airflow Task Completed',   # 제목
        html_content="<h3>데이터 처리가 완료되었습니다.</h3><p>프로세스가 성공적으로 종료되었습니다.</p>"  # 이메일 내용
    )
    process_data_task >> send_email

MySqlOperator, BigQueryOperator, PostgresOperator : 특정 데이터베이스에 쿼리를 실행하는 오퍼레이터, ETL 자주 사용

사용 코드는 한번 찾아보시길 바랍니다~! 

S3FileTransformOperator : S3에서 파일을 읽고 변환, 저장 작업을 수행

SensorOperators : 특정 조건이 충족될때까지 기다리는 오퍼레이터

DockerOperator : Docker 컨테이너에서 작업을 실행, 독립된 환경에서 작업을 수행

 

이 외에도 다양한 오퍼레이터가 제공되고 있으니 확인 해보셔도 재밌을 것 같습니다.

오늘은 Airflow의 Operator에 대하여 간단하게 알아보았습니다.

공부 전에는 뭐가 뭔지 몰랐지만…

Operator를 사용하여 다양한 워크플로우를 구성하고 실행하는 것에 대해서 알게되었으며,

특정 목적에 맞게 사용하는 것이 중요하구나~ 생각이 들었습니다.

반응형