IT/Airflow

[Airflow] Sensor를 정리해보자

wookiist 2022. 5. 22. 10:12

Overview

최근 들어 Airflow를 적극적으로 다루는 일이 많아지다보니, 여러 요구사항을 만나게 되는데요. Airflow 자체적으로 제공하지 않는 스케줄링 처리, 특정 파일 및 조건을 만족할 때까지 대기하는 구간 등을 정의하게 되었습니다. 전자는 간단한 Operator를 구현해, 정공법은 아니지만 Workaround 느낌으로 파훼했고, 후자는 이번에 알아볼 Sensor로 해결할 수 있었습니다.

Apache Airflow Sensor는 어떤 사건이 발생할 때 까지 기다리도록 설계된 특수한 종류의 오퍼레이터입니다. 실행된 Sensor는 특정 조건을 만족하면, 성공으로 마킹되며 이후 다운스트림 태스크를 실행합니다. 적절하게 사용되면, DAG를 좀 더 Event-Driven하게 작성할 수 있도록 도와줍니다.

Sensor가 필요한 경우를 한 번 예로 들어보겠습니다. 데이터 파이프라인이 시작하는 일반적인 조건은 파이프라인에 사용할 새로운 데이터가 도착했을 경우입니다. 데이터의 입수 과정부터 직접 담당하고 있다면 정해진 시간마다 특정 기간 동안 쌓인 데이터를 가지고 파이프라인을 시작하면 됩니다. 하지만 타사, 타부서로부터 데이터를 받아 처리하는 입장이라면 어떨까요? 그리고 그 데이터가 매번 같은 시간에 입수되는 것이 아니라면요? 이럴 경우엔, 데이터 입수 시간의 마지노선을 기준으로 파이프라인을 스케줄링할 수도 있습니다. 하지만, 이렇게 할 경우, 데이터가 빨리 입수될 수록 시간적으로 손해를 보게 됩니다.

바로 이럴 때, 특정 조건을 만족하는 경우에 파이프라인을 실행하도록 하는 Sensor의 도움을 받을 수 있습니다.

Sensor Basic

Sensor는 개념적으로 간단하게 이해할 수 있습니다. 특정 주기 동안 어떤 조건을 만족했는지 확인하는 오퍼레이터입니다. 조건을 만족하면, 해당 태스크를 성공을 마킹하고, 이후의 다운스트림 태스크를 실행합니다. 만약 조건이 아직 만족되지 않았다면, Sensor는 다시 확인하기까지의 또다른 주기를 기다립니다.

모든 Sensor들은 BaseSensorOperator를 상속하고 있으며, 다음의 파라미터를 갖고 있습니다.

  • mode : Sensor의 동작 방식을 결정합니다. 다음의 두 가지 방식이 있습니다.
    • poke : 아무런 설정도 하지 않는다면 기본적으로 poke 방식으로 동작합니다. poke 를 사용하면, Sensor는 해당 Sensor의 전체 실행 시간동안 worker 슬롯을 점유하며, poke 사이에는 sleep 상태로 존재합니다. 이 poke 방식은 Sensor가 짧은 실행시간을 가질 것으로 예상할 때 사용하기에 적절합니다.
    • reschedule : 이 방식을 사용하면, 조건을 만족하지 않았을 때 Sensor는 worker 슬롯을 놓아주고, 다음 번 확인을 위해 reschedule 됩니다. 이 방식은 Sensor가 긴 실행시간을 가질 것으로 예상될 때 적절합니다. worker 슬롯을 놓아주기 때문에 다른 태스크를 실행할 수 있고, 자원을 덜 소모하기 때문입니다.
  • poke_interval : poke 방식을 사용할 때, 이 옵션은 Sensor가 조건을 확인하는 다음번 주기(초)를 결정합니다. 기본 값은 30초 입니다.
  • exponential_backoff : True 로 설정하면, poke 모드에서 이전 poke와 다음 poke 사이의 간격을 지수적으로 증가시킵니다. Kubernetes의 CrashLoopBackoff와 유사하네요.
  • timeout : Sensor가 조건을 확인을 시도할 최대 시간(초)을 설정합니다. 이 시간까지도 조건을 만족하지 못하면, 해당 태스크는 실패합니다.
  • soft_fail : True 로 설정하면, 해당 태스크가 timeout 이 나더라도 실패가 아닌 스킵으로 마킹됩니다.

일반적으로 많이 사용되는 Sensors

  • FileSensor : 특정 파일이 존재하는지 확인하고 파일이 있으면 true를, 그렇지 않다면 false를 리턴하는 Sensor입니다.

  • PythonSensor : python_callable로 설정한 함수가 True를 반환할 때까지 기다리는 Sensor입니다. DAG 내에서 복잡한 조건을 구현하고 싶을 때 유용합니다. FileSensor만 사용한다면 파일 유무만을 조건으로 활용하게 되지만, PythonSensor를 이용하면, 파일의 유무와 더불어서 특정 조건이 만족되는지도 확인이 가능합니다. 다음처럼 True, False를 리턴하는 python 함수를 하나 작성해서 사용하면 됩니다.

      from airflow.sensors.python import PythonSensor
      ...
      def check_condition():
          if ...:
              return True
          return False
    
      python_sensor_task = PythonSensor(python_callable=check_condition,
                                        task_id='python_sensor_task',
                                        poke_interval=10,
                                        dag=dag,
                                      )
  • DateTimeSensor : 지정한 날짜와 시간이 지나기를 기다리는 Sensor입니다. 동일한 DAG 내에서 다른 태스크를 다른 시간에 실행하려는 경우에 유용합니다.

  • ExternalTaskSensor : Airflow 태스크가 완료되기를 끼다리는 Sensor입니다. 같은 Airflow 환경에서 DAG 간 의존성을 정의해야할 때 유용합니다.

  • HttpSensor : API requests가 성공하는 것을 보장해야 할 때 유용한 Sensor입니다.

  • SqlSensor : 데이터가 SQL 테이블에 존재할 때까지 기다리는 Sensor입니다. 데이터가 DB에 보관되었을 때 해당 데이터를 처리하는 DAG를 작성해야 할 때 유용합니다.

Sensor를 사용하기 위한 가장 효과적인 방법

Sensor는 구현하기 쉽지만, 최고의 Airflow 경험을 획득하기 위해선 몇 가지 기억해야 하는 사항이 있습니다. 다음의 팁을 따르면, Sensor를 사용함으로써 발생할 수 있는 성능 이슈를 피할 수 있을 겁니다.

  • 언제나 의미있는 timeout 파라미터를 Sensor에 설정하세요. 이 파라미터의 기본값은 7일입니다. 7일은 Sensor가 실행되는 시간치고는 긴 편이죠. Sensor를 정의할 땐, 사용 목적과 Sensor가 얼마나 기다리게 될 지 등등을 모두 고려해서 Sensor의 Timeout을 적절하게 정의해야 합니다.
  • 긴 시간동안 실행될 거 같은 Sensor라면, reschedule 방식을 사용해서 Sensor가 Worker 슬롯을 지속적으로 점유하지 않도록 하세요. 이렇게 하면, Sensor 들이 가용한 Worker 슬롯을 모두 점거해버리는 데드락 현상을 피할 수 있습니다. 다만, 다음의 경우는 예외입니다.
    • poke_interval 이 약 5분보다도 짧다면, poke 방식을 활용하세요. 이런 상황에서 reschedule 방식을 사용하는 것은 스케줄러에 부하를 줄 수 있습니다.
  • 사용 사례에 적절한 poke_interval 값을 설정하세요. 예를 들어, 기다려야 하는 시간이 30분이라는 걸 이미 알고 있을 때, (기본값인) 30초마다 조건을 확인할 필요가 없는 것처럼 말이죠.

Sensor Deadlock

실행중인 태스크의 조건이 true가 될 때까지 다른 태스크들이 대기하게 되므로, 모든 슬롯이 데드락 상태가 됩니다.

Deferrable Operators

Asynchronous Operators라고도 불리는 Deferrable Operators는 Operator나 Sensor가 자신들이 실행되는 동안 모든 Worker 슬롯을 점거해버리는 상황을 막기 위해 Airflow 2.2에서 등장했습니다. Airflow 2.2.4에서 Deprecate된 Smart Sensor와 유사한 문제를 해결하고 있지만, 그 쓰임새는 더 광범위합니다.

DAG를 작성하는 사람들에게 빌트인 deferrable operator를 사용하는 것은, 다른 일반적인 operator를 활용하는 것과 다르지 않습니다. 단지, 스케줄러와 함께 Triggerer 프로세스가 실행되고 있음을 보장하기만 하면 됩니다. 현재 오픈소스 Airflow 진영 바깥에서는 DateTimeSensorAsyncTimeDeltaSensorAsync 가 활용되고 있는데요, 더 많은 deferrable operators는 향후 출시될 Airflow 버전에서 추가될 것으로 기대하고 있습니다.

Reference

마무리

Airflow를 막 사용하기 시작한지 얼마 되지 않은 유저의 입장에서, Sensor를 알아보는 글을 번역 + 추가 소개를 해보았습니다. TimeSensor가 있는 줄은 몰랐는데, 이번에 알게 되면서 요것도 한번 사용해볼까 고민중이네요!

만약 이 글이 도움이 되셨다면 글 좌측 하단의 하트❤를 눌러주시면 감사하겠습니다.

혹시라도 글에 이상이 있거나, 오역, 이상한 번역이 있거나, 이해가 가지 않으시는 부분, 또는 추가적으로 궁금하신 내용이 있다면 주저 마시고 댓글💬을 남겨주세요! 빠른 시간 안에 답변을 드리겠습니다 😊

반응형