본문 바로가기
Python/Airflow

[We-Co] Airflow Sensor - polling에 대해서 알아보자

by 위기의코딩맨 2025. 6. 26.
반응형

안녕하세요. 위기의 코딩맨입니다.

오늘은 오랜만에 Airflow 포스팅을 해보도록 하겠습니다~!

 

오늘은 Sensor에 대해서 공부해보도록 하겠습니다.

Sensor에 대해서 공부하면 자연스럽게 따라들어오는 plolling에 대해서도 알아보도록 하겠습니다.

 

 

[ Sensor ]

외부 시스템(S3, DB, API), Airflow(다른 DAG, 태스크) 특정 조건이 만족할때까지 대기하기 위한 오퍼레이터

  • 외부조건 : FileSensor, S3KeySensor, HttpSensor, SqlSensor 
  • 내부조건 : ExternalTaskSensor (다른 DAG/태스크가 성공했는지)

동작 방식

  • 태스크가 워커에 할당 execute() -> poke()
  • poke_interval 만큼 워커 슬롯 점유
  • 조건 만족시 종료 or poke()
wait_for_file = FileSensor(
    task_id='wait_file',
    filepath='/data/{{ ds }}/orders.csv',
    poke_interval=20,      # 20초마다 폴링
    timeout=300,           # 최대 5분 대기
    mode='poke',           # 기본값
)

 

상황에 따라서 poke(), reschedule, 비동기 방식으로 사용하여 트래픽과 점유를 조정하면서 진행하면 좋습니다!

 

[ Polling ]

설정한 poke_interval 마다 어떤 상태나 결과를 확인하는 pork 방법!

  • 센서가 조건을 확인하기 위해 쓰는 방식
  • 센서 자체가 그 방식을 구현한 연산자
wait = ExternalTaskSensor(
    task_id='dag_wait',
    external_dag_id='a_dag',
    external_task_id='a_task',
    poke_interval=60,
)
wait >> downstream_task

 

a_dag.a_task 성공여부를 폴링하고, 외부 DAG 연쇄 실행이 필요할때 사용합니다.

  • Sensor : 조건 대기 연산자
  • Polling : 센서가 내부에서 반복 확인하는 방법

 

오늘은 간단하게 sensor와 polling에 대해서 알아보았습니다.

앞으로 다양한 공부를 진행해보도록 하겠습니다.

반응형