본 글은 Ash Berlin-Taylor가 작성한 ‘Apache Airflow 2.4.0: That Data Aware Release’ 글을 읽고 번역한 글입니다. 중간 중간에 필요한 제 사견이나 첨언도 들어가 있으니 참고 부탁드립니다.
Apache Airflow 2.4.0에는 650개 이상의 유저 커밋, 그리고 총 870개 이상의 커밋이 포함되어 있습니다. 이번 버전에는 46개의 새로운 기능, 39개의 개선 사항 그리고 52개의 버그 픽스가 포함됩니다.
Data-aware scheduling (AIP-48)
정말 대단한 기능입니다. 이제 Airflow는 데이터셋을 업데이트하는 다른 task를 기반으로 DAG을 스케줄링할 수 있게 되었습니다.
이게 정확히 무슨 뜻일까요? 이 기능으로 인해 DAG 작성자들은 더 작고, 독립적인 DAG을 생성해서, 더 큰 데이터 기반의 워크플로에 연결할 수 있습니다.
…
자 이야기는 이걸로 충분한 거 같습니다. 이제 my-dataset
이라는 데이터셋을 생성하는 my_task
라는 이름의 간단한 DAG을 생성해봅시다.
from airflow import Dataset
**dataset = Dataset(uri='my-dataset')**
with DAG(dag_id='producer', ...)
**@task(outlets=[dataset])**
def my_task():
...
데이터셋은 URI로 정의합니다. 이제, 이 데이터셋에 변화가 생기면 언제라도 스케줄되는 두 번째 DAG (consumer
) 을 만들어볼까요.
from airflow import Dataset
**dataset = Dataset(uri='my-dataset')**
with DAG(dag_id='dataset-consumer', **scheduler=[dataset]**):
...
my_task
인스턴스가 완료되면, Airflow는 dataset-consumer
를 실행하는 DAG run을 만들겁니다.
많은 사람들이 희망하던 dataset 관련 기능을 모두 만족시키지는 못했음을 알고 있습니다.그러나 앞으로 다가올 마이너 릴리즈(2.5, 2.6…)에서 이번 업데이트를 토대로 확장시키고 개선해나갈 예정입니다.
Dataset 은 데이터셋의 추상화된 컨셉입니다. 이번 릴리즈는 미래를 위한 초석을 심은 것으로, (아직까지는) 데이터셋을 직접 읽고 쓰는 기능은 포함되어 있지 않습니다만, 이 기능은 우리가 목표하는 바의 일부입니다.
데이터셋에 관련된 더 많은 정보가 필요하다면 ‘Data-aware scheduling’ 문서를 참조해주세요. 이 문서에서 데이터셋이 어떻게 URI로 확인할 수 있는지, 여러 개의 데이터셋에 기반할 수 있는지, 그리고 데이터셋이 무엇인지 생각해볼 수 있을 겁니다.
새로운 ExternalPythonOperator를 이용해서 Python 디펜던시 충돌을 더 쉽게 관리하기
모든 파이썬 라이브러리가 다같이 행복하게 사용될 수 있기를 바라는만큼, 우리가 사는 세상을 바라보면 (그렇지 못하다보니) 더 슬퍼집니다. 때론 Airflow 설치 과정에서 여러 개의 파이썬 라이브러리를 설치하고자 할 때 충돌이 발생하곤 하는데요, 최근 들어 dbt-core
에 관해 이 이슈가 자주 들리고 있습니다.
이런 문제를 더 쉽게 해결하고자 @task.external_python
(== ExternalPythonOperator
) 를 도입했습니다. 이를 이용하면 Airflow task로 파이썬 함수를 실행할 때, 사전에 정의해둔 virtualenv 내에서 실행하거나, 완전히 다른 파이썬 버전을 이용해 실행할 수 있습니다. 예를 들어볼까요?
@task.external_python(python='/opt/venvs/task_deps/bin/python')
def my_task(data_interval_start, data_interval_end):
print(f'Looking at data between {data_interval_start} and {data_interval_end}')
...
접근하려는 context_variables에 따라 virtualenv에 설치해야 하는 몇 가지 요소들이 있으니 사용 전에 꼭 ‘how-to on using the ExternalPythonOperator’ 문서를 읽어봐주세요.
Dynamic Task Mapping에 관한 더 많은 발전 사항
여러분들의 요청사항을 잘 들었습니다. Dynamic Task Mapping은 이제 다음 사항을 지원합니다.
expand_kwargs
: TaskFlow가 아닌 오퍼레이터에 여러 개의 파라미터를 할당할 수 있습니다.zip
: 벡터곱 없이 여러 가지를 합치고 싶을 때 활용할 수 있습니다.map
: task가 실행되기 전에 파리미터를 변형하고자 할 때 사용할 수 있습니다.
더 많은 dynamic task mapping 관련 정보를 얻고 싶다면, ‘Transforming Mapped Data, Combining upstream data (aka “zipping”)’ 문서와 ‘Assigning multiple parameters to a non-TaskFlow operator’ 문서를 읽어봐주세요.
context manager에서 DAG이 자동 등록됩니다 (더 이상 as dag:
가 필요하지 않아요)
이 업데이트는 삶의 질을 조금 높여줄 것입니다. 그리고 필자 입장에선 as dag:
를 쓰는 것을 얼마나 많이 잊어버렸었는지 인정하고 싶지 않습니다.
기존의 코드는 다음과 같았죠.
with DAG(dag_id='example') as dag:
...
@dag
def dag_maker():
...
dag2 = dag_maker()
이젠 이렇게만 해도 됩니다.
with DAG(dag_id='example'):
...
@dag
def my_dag():
...
my_dag()
만약 어떤 이유에선가 DAG이 자동 등록되는 것을 방지하고 싶다면, auto_register=False
옵션을 넣어주면 됩니다.
# variable로 할당되지 않았기 때문에, 이 dag은 Airflow에 의해 픽업되지 않을 겁니다.
with DAG(dag_id='example', auto_register=False):
...
기타 개선사항들
650개의 커밋 내용을 여기 포함하기엔 너무 많습니다. 하지만 몇 가지 눈에 띄는 피처를 좀 더 살펴볼까요?
- 홈페이지에서 자동 새로고침 기능
- TaskFlow 데코레이터로
@task.short_circuit
추가 - CLI로 role delete 기능 추가
ExternalTaskSensor
에서TaskGroup
지원- TaskFlow 데코레이터로
@task.kubernetes
추가 (오…) - 워커에서 Dynamic DAG의 최적화를 처리하도록 하는
parsing_context
실험적 기능 추가 - 하나의
schedule
변수로 통합 - 이전엔 ‘모 아니면 도’였던 컨피그 마스킹 기능을 이제 Admin → Configuration에서 민감하지 않은 데이터로 설정할 수 있도록 기능 제공
- class 이름에서 operator 이름을 분리(더 이상 TaskFlow를 사용할 때
_PythonDecoratedOperator
를 이용하지 않아도 됨!)
제 개인적인 생각으론 데이터 리니지에 대한 관리도 편해졌지만, VirtualEnv를 이용해서, 새로운 이미지를 말 필요 없이 한 이미지에 모든 라이브러리를 넣어 둘 수 있다는 것도 큰 장점이라고 보입니다. 물론 이미지의 크기가 너무 커지면, 새로운 노드에서 실행될 때 시간을 많이 잡아 먹겠지만, 좀 더 생각해보면 좋은 활용 방법이 있을 거 같네요. 어서 빌드해서 테스트해봐야겠습니다~!
+) 헉 그나저나 벌써 2.4.1 이 출시되었네요. 버그가 많았나.. ㅎㅎ
마무리
만약 이 글이 도움이 되셨다면 글 좌측 하단의 하트❤를 눌러주시면 감사하겠습니다.
혹시라도 글에 이상이 있거나, 오역, 이상한 번역이 있거나, 이해가 가지 않으시는 부분, 또는 추가적으로 궁금하신 내용이 있다면 주저 마시고 댓글💬을 남겨주세요! 빠른 시간 안에 답변을 드리겠습니다 😊
참고
'IT > Airflow' 카테고리의 다른 글
[Airflow] Backfill과 Clear를 정리해보자 (0) | 2022.09.18 |
---|---|
[Airflow] PythonSensor에 pod override 옵션 적용하기 (0) | 2022.09.04 |
[Airflow] Pool (0) | 2022.07.09 |
[Airflow] Dynamic Task Mapping (동적 태스크 매핑) (0) | 2022.06.06 |
[Airflow] Sensor를 정리해보자 (2) | 2022.05.22 |