안녕하세요. 위기의 코딩맨입니다.
오늘은 Airflow의 Dag에 대해서 간단하게 알아보고 적용해보도록 하겠습니다~!
먼저 Dag가 무엇인지부터 알아보도록 해야겠죠?!
[ Dag ]
DAG는 Directed Acyclic Graphs를 의미하는 약어를 의미하며
방향성 비순환 그래프를 뜻하고 있습니다.
방향성 비순환 그래프가 무슨 의미를 갖고있냐 하면
아래 이미지와 같이 방향성을 갖고 Task를 실행하지만 다시 돌아오거나, 다시 실행되지않고
정해진 길로만 가는것을 의미합니다.
기본적으로 Task로 구성되어 있으며, 서로 의존성을 갖고 흐름을 형성합니다.
실행은 스케줄러에 의해서 실행될수 있으며, 소스에서도 실행을 제어할 수 있습니다.
이러한 Dag의 시각적인 표현은
Graph 메뉴에서 확인하실 수 있습니다.
[ 생성 및 적용 ]
이제 Dag 파일을 생성해서 적용해보도록 하겠습니다.
먼저, 필요한 라이브러리를 불러오고, 시간설정을 진행했습니다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
## 로컬 타임존 생성
local_tz = pendulum.timezone("Asia/Seoul")
다음은, Task에 적용되는 기본 설정을 알아보도록 하겠습니다.
- owner : 소유자
- depends_on_past : 이전 실행 여부
- start_date : 시작 시간
- email_on_failure : 실패시, 이메일 전송
- email_on_retry : 재실행 실패시, 이메일전송
- retries : 실패 시, 재시도 횟수
- retry_delay : 재시도 간격
더 다양한 설정도 있지만 이렇게만 설정해보도록 하겠습니다.
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2024, 10, 31, tzinfo=local_tz),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
Task의 기본설정을 하고 Dag를 정의해보도록 하겠습니다.
- DAG의 이름
- default_args : 위에서 설정한 Task의 기본설정
- description : 설명
- schedule_interval : 실행주기
- catchup : 백필 여부
이렇게 DAG를 정의하도록 하겠습니다.
dag = DAG(
'WeCo_Dag',
default_args=default_args,
description='A simple example DAG',
schedule_interval=timedelta(days=1),
catchup=False
)
이제 위에서 설정한 Dag를 실행 Task에 적용하도록 하겠습니다.
위에서 설정한 Task는 전체적인 기본 설정이고,
지금은 각각의 Task 설정으로 생가해주시면 됩니다.
task_1은 Hello, world를 출력하고,
task_2는 5초간 대기,
task_3은 All tasks are done!을 출력하도록 설정했습니다.
# Task 1: 간단한 Bash 명령어 실행 (Hello World 출력)
task_1 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello, World!"',
dag=dag,
)
# Task 2: 5초 대기하는 Bash 명령어 실행
task_2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag,
)
# Task 3: task_1과 task_2가 끝난 후 실행될 태스크
task_3 = BashOperator(
task_id='print_done',
bash_command='echo "All tasks are done!"',
dag=dag,
)
그리고 마지막으로 설정한 task_1,2,3의 의존성을 설정해주어야합니다.
쉽게 1실행, 2실행, 3실행 순서를 정한다고 생각하시면 됩니다.
task_1 >> task_2 >> task_3
전체소스는 아래를 확인해주세요~!
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
## 로컬 타임존 생성
local_tz = pendulum.timezone("Asia/Seoul")
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2024, 10, 31,tzinfo=local_tz),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
'WeCo_Dag',
default_args=default_args,
description='A simple example DAG',
schedule_interval=timedelta(days=1),
catchup=False
)
# Task 1: 간단한 Bash 명령어 실행 (Hello World 출력)
task_1 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello, World!"',
dag=dag,
)
# Task 2: 5초 대기하는 Bash 명령어 실행
task_2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag,
)
# Task 3: task_1과 task_2가 끝난 후 실행될 태스크
task_3 = BashOperator(
task_id='print_done',
bash_command='echo "All tasks are done!"',
dag=dag,
)
# 의존성 설정
task_1 >> task_2 >> task_3
다음은 생성한 파일을 dags 폴더에 복사해주세요~!
dags 폴더는 airflow/dags 에 있습니다.
저는 WeCo_Airflow_001.py로 작성했습니다.
dag를 적용하기 위해서
재실행하도록 합니다.
airflow scheduler
airflow webserver
그럼 홈페이지에 WeCo_Dag가 생성된 것을 확인할 수 있습니다! 하하하
옆에 재생버튼을 클릭하면
순차적으로 실행 Task를 확인할 수있습니다.
위에서 설정한 3개의 task도 확인해볼수 있습니다.
각각의 테스트를 클릭해서 로그도 확인해 보았는데
Hellow world 등등 잘 출력되는것도 확인할 수 있었습니다.
오늘은 좀 많은 양의 포스팅이 되었는데
기본적인 DAG를 생성 및 적용하고
결과를 확인하는 방법에 대해서 아주 간단하게 알아보았습니다.
너무 재밌네요~!!

'Python > Airflow' 카테고리의 다른 글
[We-Co] Airflow의 Operator에 대해서 알아보자!! (1) | 2024.11.08 |
---|---|
[We-Co] Airflow Backfill에 대해서 알아보자! (0) | 2024.11.07 |
[We-Co] Airflow - 설치 및 가상환경 셋팅 with Mac (2) | 2024.10.30 |