일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- Ai
- 부스트클래스
- tensorflow 예제
- kaggle
- Transformer
- 캐글
- 연남 맛집
- TensorFlow
- 서울 맛집
- pycharm
- NLP
- yolo
- 연남동 맛집
- 자연어
- AI Tech 준비과정
- r
- 부스트캠프 ai tech 준비과정
- AI tech
- Spark MLlib
- spark
- mllib
- AI 엔지니어 기초 다지기
- RDD
- Python
- 부스트캠프
- 데이터 시각화
- DataSet
- 맛집
- 위기의코딩맨
- 홍대 맛집
- Today
- Total
We-Co
[We-Co] Airflow의 Operator에 대해서 알아보자!! 본문
안녕하세요. 위기의 코딩맨입니다.
오늘은 Airflow의 Operator에 대해서 간단하게 알아보도록 하겠습니다.
Operator는 Airflow의 실행 환경을 설정하고, 개별 작업을 정의하여
DAG의 작업 과정에서 중요한 역할을 맡고있습니다!
조금 더 자세히 들어가보도록 하겠습니다.
[ Operator ]
Operator란 간단하게 설명하면
Airflow의 DAG 작업을 정의하고 실행하는 단위입니다.
단순하게 Bash로 실행할수 있고,
Python 함수, 데이터베이스의 쿼리,
외부 통신 등 다양한 작업을 수행하도록 설계할 수 있습니다.
이러한 작업들이 모여서, 순차적, 병렬적으로 실행하며
워크플로우가 형성이 됩니다.
주요 역할에 대해서 알아보도록 하겠습니다.
- 실행 환경 설정 ( Bash, Python.. )
- DAG의 작업단위 (흐름, 순서 정의)
- 의존성 관리 및 분기 처리 ( BranchPythonOperator )
- Error 관리 및 재시도
- 외부 서비스 통신 ( S3FileTransformOperator, KubernetesPodOperator )
- 모니터링 및 빠른 대응
다양한 Operator를 사용하여 각각의 역할을 지정하여
상황에 맞도록 사용할 수 있습니다.
주로 사용되는 Operator를 알아보도록 하겠습니다.
BashOperator : Bash 명령어를 입력하여 작업을 실행하는 오퍼레이터, 파일처리, 데이터수집 등 사용
with DAG(
'WeCo_Operator',
default_args=default_args,
schedule_interval=None,
) as dag:
run_script = BashOperator(
task_id='run_script',
bash_command='echo Hello, Airflow!'
)
run_script
PythonOperator : Python 함수를 사용하기 위해서 사용하는 오퍼레이터로, 데이터 처리와 변환에 주로 사용
def my_function():
print("데이터 처리 중...")
default_args = {
'start_date': datetime(2024, 1, 1),
'retries': 1,
}
with DAG(
dag_id='process_data_dag',
default_args=default_args,
schedule_interval=None,
) as dag:
process_data = PythonOperator(
task_id='process_data',
python_callable=my_function
)
process_data
BranchPythonOperator : 특정 조건에 따라서 흐름을 분기시킬 때 사용되는 오퍼레이터, 하나의 작업에서 여러 개의 작업 중,
하나를 선택 가능
def choose_branch():
return "branch_a" if random.choice([True, False]) else "branch_b"
default_args = {
'start_date': datetime(2024, 1, 1),
'retries': 1,
}
with DAG(
dag_id='branch_dag',
default_args=default_args,
schedule_interval=None, ) as dag:
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch
)
branch_a = DummyOperator(task_id='branch_a')
branch_b = DummyOperator(task_id='branch_b')
join_task = DummyOperator(task_id='join', trigger_rule='none_failed_min_one_success')
branch_task >> [branch_a, branch_b] >> join_task
DummyOperator : 별도의 작업을 수행하지 않고, DAG 특정 위치를 표시하거나, 시작과 끝을 정의할 때 사용,
작업의 단계를 구분할때 주로 사용
def process_data():
print("데이터를 처리 중입니다...")
default_args = {
'start_date': datetime(2024, 1, 1),
'retries': 1,
}
with DAG(
dag_id='dummy_operator_dag',
default_args=default_args,
schedule_interval=None,
) as dag:
start = DummyOperator(task_id='start')
process_data_task = PythonOperator(
task_id='process_data_task',
python_callable=process_data
)
end = DummyOperator(task_id='end')
start >> process_data_task >> end
EmailOperator : 작업의 성공여부, 상태를 설정한 이메일로 알려주는 오퍼레이터
def process_data():
print("데이터를 처리 중입니다...")
default_args = {
'start_date': datetime(2024, 1, 1),
'retries': 1,
}
with DAG(
dag_id='email_operator_dag',
default_args=default_args,
schedule_interval=None,
) as dag:
process_data_task = PythonOperator(
task_id='process_data_task',
python_callable=process_data
)
send_email = EmailOperator(
task_id='send_email',
to='example@example.com', # 이메일
subject='Airflow Task Completed', # 제목
html_content="<h3>데이터 처리가 완료되었습니다.</h3><p>프로세스가 성공적으로 종료되었습니다.</p>" # 이메일 내용
)
process_data_task >> send_email
MySqlOperator, BigQueryOperator, PostgresOperator : 특정 데이터베이스에 쿼리를 실행하는 오퍼레이터, ETL에 자주 사용
사용 코드는 한번 찾아보시길 바랍니다~!
S3FileTransformOperator : S3에서 파일을 읽고 변환, 저장 작업을 수행
SensorOperators : 특정 조건이 충족될때까지 기다리는 오퍼레이터
DockerOperator : Docker 컨테이너에서 작업을 실행, 독립된 환경에서 작업을 수행
이 외에도 다양한 오퍼레이터가 제공되고 있으니 확인 해보셔도 재밌을 것 같습니다.
오늘은 Airflow의 Operator에 대하여 간단하게 알아보았습니다.
공부 전에는 뭐가 뭔지 몰랐지만…
Operator를 사용하여 다양한 워크플로우를 구성하고 실행하는 것에 대해서 알게되었으며,
특정 목적에 맞게 사용하는 것이 중요하구나~ 생각이 들었습니다.
'Python > Airflow' 카테고리의 다른 글
[We-Co] Airflow Backfill에 대해서 알아보자! (0) | 2024.11.07 |
---|---|
[We-Co] Airflow Dag - Dag 생성 및 적용방법 (0) | 2024.10.31 |
[We-Co] Airflow - 설치 및 가상환경 셋팅 with Mac (2) | 2024.10.30 |