IT/Airflow

[Airflow] Airflow 2.4.0에선 무엇이 달라졌을까?

wookiist 2022. 10. 15. 12:09

본 글은 Ash Berlin-Taylor가 작성한 ‘Apache Airflow 2.4.0: That Data Aware Release’ 글을 읽고 번역한 글입니다. 중간 중간에 필요한 제 사견이나 첨언도 들어가 있으니 참고 부탁드립니다.


Apache Airflow 2.4.0에는 650개 이상의 유저 커밋, 그리고 총 870개 이상의 커밋이 포함되어 있습니다. 이번 버전에는 46개의 새로운 기능, 39개의 개선 사항 그리고 52개의 버그 픽스가 포함됩니다.

Data-aware scheduling (AIP-48)

정말 대단한 기능입니다. 이제 Airflow는 데이터셋을 업데이트하는 다른 task를 기반으로 DAG을 스케줄링할 수 있게 되었습니다.

이게 정확히 무슨 뜻일까요? 이 기능으로 인해 DAG 작성자들은 더 작고, 독립적인 DAG을 생성해서, 더 큰 데이터 기반의 워크플로에 연결할 수 있습니다.

자 이야기는 이걸로 충분한 거 같습니다. 이제 my-dataset이라는 데이터셋을 생성하는 my_task라는 이름의 간단한 DAG을 생성해봅시다.

from airflow import Dataset

**dataset = Dataset(uri='my-dataset')**

with DAG(dag_id='producer', ...)
    **@task(outlets=[dataset])**
    def my_task():
        ...

데이터셋은 URI로 정의합니다. 이제, 이 데이터셋에 변화가 생기면 언제라도 스케줄되는 두 번째 DAG (consumer) 을 만들어볼까요.

from airflow import Dataset

**dataset = Dataset(uri='my-dataset')**

with DAG(dag_id='dataset-consumer', **scheduler=[dataset]**):
    ...

my_task 인스턴스가 완료되면, Airflow는 dataset-consumer를 실행하는 DAG run을 만들겁니다.

많은 사람들이 희망하던 dataset 관련 기능을 모두 만족시키지는 못했음을 알고 있습니다.그러나 앞으로 다가올 마이너 릴리즈(2.5, 2.6…)에서 이번 업데이트를 토대로 확장시키고 개선해나갈 예정입니다.

Dataset 은 데이터셋의 추상화된 컨셉입니다. 이번 릴리즈는 미래를 위한 초석을 심은 것으로, (아직까지는) 데이터셋을 직접 읽고 쓰는 기능은 포함되어 있지 않습니다만, 이 기능은 우리가 목표하는 바의 일부입니다.

데이터셋에 관련된 더 많은 정보가 필요하다면 ‘Data-aware scheduling’ 문서를 참조해주세요. 이 문서에서 데이터셋이 어떻게 URI로 확인할 수 있는지, 여러 개의 데이터셋에 기반할 수 있는지, 그리고 데이터셋이 무엇인지 생각해볼 수 있을 겁니다.

새로운 ExternalPythonOperator를 이용해서 Python 디펜던시 충돌을 더 쉽게 관리하기

모든 파이썬 라이브러리가 다같이 행복하게 사용될 수 있기를 바라는만큼, 우리가 사는 세상을 바라보면 (그렇지 못하다보니) 더 슬퍼집니다. 때론 Airflow 설치 과정에서 여러 개의 파이썬 라이브러리를 설치하고자 할 때 충돌이 발생하곤 하는데요, 최근 들어 dbt-core 에 관해 이 이슈가 자주 들리고 있습니다.

이런 문제를 더 쉽게 해결하고자 @task.external_python (== ExternalPythonOperator) 를 도입했습니다. 이를 이용하면 Airflow task로 파이썬 함수를 실행할 때, 사전에 정의해둔 virtualenv 내에서 실행하거나, 완전히 다른 파이썬 버전을 이용해 실행할 수 있습니다. 예를 들어볼까요?

@task.external_python(python='/opt/venvs/task_deps/bin/python')
def my_task(data_interval_start, data_interval_end):
    print(f'Looking at data between {data_interval_start} and {data_interval_end}')
    ...

접근하려는 context_variables에 따라 virtualenv에 설치해야 하는 몇 가지 요소들이 있으니 사용 전에 꼭 ‘how-to on using the ExternalPythonOperator’ 문서를 읽어봐주세요.

Dynamic Task Mapping에 관한 더 많은 발전 사항

여러분들의 요청사항을 잘 들었습니다. Dynamic Task Mapping은 이제 다음 사항을 지원합니다.

  • expand_kwargs : TaskFlow가 아닌 오퍼레이터에 여러 개의 파라미터를 할당할 수 있습니다.
  • zip : 벡터곱 없이 여러 가지를 합치고 싶을 때 활용할 수 있습니다.
  • map : task가 실행되기 전에 파리미터를 변형하고자 할 때 사용할 수 있습니다.

더 많은 dynamic task mapping 관련 정보를 얻고 싶다면, ‘Transforming Mapped Data, Combining upstream data (aka “zipping”)’ 문서와 ‘Assigning multiple parameters to a non-TaskFlow operator’ 문서를 읽어봐주세요.

context manager에서 DAG이 자동 등록됩니다 (더 이상 as dag: 가 필요하지 않아요)

이 업데이트는 삶의 질을 조금 높여줄 것입니다. 그리고 필자 입장에선 as dag: 를 쓰는 것을 얼마나 많이 잊어버렸었는지 인정하고 싶지 않습니다.

기존의 코드는 다음과 같았죠.

with DAG(dag_id='example') as dag:
    ...

@dag
def dag_maker():
    ...

dag2 = dag_maker()

이젠 이렇게만 해도 됩니다.

with DAG(dag_id='example'):
    ...

@dag
def my_dag():
    ...

my_dag()

만약 어떤 이유에선가 DAG이 자동 등록되는 것을 방지하고 싶다면, auto_register=False 옵션을 넣어주면 됩니다.

# variable로 할당되지 않았기 때문에, 이 dag은 Airflow에 의해 픽업되지 않을 겁니다.
with DAG(dag_id='example', auto_register=False):
    ...

기타 개선사항들

650개의 커밋 내용을 여기 포함하기엔 너무 많습니다. 하지만 몇 가지 눈에 띄는 피처를 좀 더 살펴볼까요?

  • 홈페이지에서 자동 새로고침 기능
  • TaskFlow 데코레이터로 @task.short_circuit 추가
  • CLI로 role delete 기능 추가
  • ExternalTaskSensor 에서 TaskGroup 지원
  • TaskFlow 데코레이터로 @task.kubernetes 추가 (오…)
  • 워커에서 Dynamic DAG의 최적화를 처리하도록 하는 parsing_context 실험적 기능 추가
  • 하나의 schedule 변수로 통합
  • 이전엔 ‘모 아니면 도’였던 컨피그 마스킹 기능을 이제 Admin → Configuration에서 민감하지 않은 데이터로 설정할 수 있도록 기능 제공
  • class 이름에서 operator 이름을 분리(더 이상 TaskFlow를 사용할 때 _PythonDecoratedOperator 를 이용하지 않아도 됨!)

제 개인적인 생각으론 데이터 리니지에 대한 관리도 편해졌지만, VirtualEnv를 이용해서, 새로운 이미지를 말 필요 없이 한 이미지에 모든 라이브러리를 넣어 둘 수 있다는 것도 큰 장점이라고 보입니다. 물론 이미지의 크기가 너무 커지면, 새로운 노드에서 실행될 때 시간을 많이 잡아 먹겠지만, 좀 더 생각해보면 좋은 활용 방법이 있을 거 같네요. 어서 빌드해서 테스트해봐야겠습니다~!

+) 헉 그나저나 벌써 2.4.1 이 출시되었네요. 버그가 많았나.. ㅎㅎ

마무리

만약 이 글이 도움이 되셨다면 글 좌측 하단의 하트❤를 눌러주시면 감사하겠습니다.

혹시라도 글에 이상이 있거나, 오역, 이상한 번역이 있거나, 이해가 가지 않으시는 부분, 또는 추가적으로 궁금하신 내용이 있다면 주저 마시고 댓글💬을 남겨주세요! 빠른 시간 안에 답변을 드리겠습니다 😊

참고

반응형