IT/Airflow

[Airflow] Pool

wookiist 2022. 7. 9. 11:27

Pool

Airflow가 동시에 실행하는 Task가 너무 많을 경우, 시스템에 부하를 줄 수 있습니다. Pool은 이러한 문제가 발생하는 것을 방지하기 위해, 해당 Pool을 사용하는 DAG들에서 병렬로 실행되는 Task의 개수를 제한하기 위해 도입되었습니다.

Pool을 사용하면, Task Set에 대한 병렬 처리를 제한하여 각 Task가 실행되는 시기를 세밀하게 제어할 수 있습니다. 이 내용에 대해서는 마지막에 usecase를 보면서 설명드리겠습니다. 특히, 특정 Task를 수행하는 병렬 Task의 수를 제한하기 위해 자주 사용합니다. 예를 들어, 동일한 API Endpoint나 DB를 찌르는 작업일 때, 또는 Kubernetes 클러스터의 GPU 노드에서 GPU 할당을 제어하기 위해 사용합니다.

기본적으로 Airflow의 모든 Task는 별도의 명시된 Pool이 없는한, default_pool 이라는 이름의 Pool에 할당됩니다. 이 Pool에는 128개의 Slot이 기본값으로 설정되어 있습니다. 이 값은 수정이 가능합니다만, default_pool 을 삭제할 수는 없습니다.

만약, default_pool 을 사용하지 않고 별도 Pool을 이용하려면 다음처럼 pool 파라미터를 작성해주면 됩니다.

task_a = PythonOperator(
    task_id='sample',
    python_callable=some_function,
    **pool='single_task_pool'**
)

별도의 Pool을 생성하는 방법은 매우 간단합니다. CLI를 활용하는 방법도 있고, Web UI에서 생성하는 방법도 있습니다. 여기에선 Web UI에서 생성해보도록 하겠습니다.

  • 상단의 Admin 메뉴에서 Pool 메뉴를 선택하고 새로운 Pool을 추가합니다. 다음처럼 Pool의 이름과, Slot의 개수 그리고 해당 Pool에 대한 설명을 정의할 수 있습니다.

    https://github.com/wookiist/tistory-images/blob/main/2022/07/09/airflow_pool.png?raw=true

Priority Weight in Pool

같은 Pool 내에 속해있더라도, 모든 Task가 같은 중요도를 갖지 않을 수 있습니다. 중요도가 더 높은 Task에 가중치를 더 주어서, 해당 Task에 비해 상대적으로 중요도가 낮은 Task보다 해당 Task를 먼저 실행할 수 있습니다.

이 값은 priority_weight 라는 파라미터를 설정해서 pool level에서 제어할 수 있습니다. priority_weight 는 임의의 정수 값을 가질 수 있으며, 기본값은 1 입니다. 이 값이 더 높을수록 우선 순위가 더 높아집니다.

다음과 같은 DAG를 예로 들어보겠습니다.

with DAG(
    dag_id='pool_priority_dag',
    schedule_interval=@daily,
    catchup=False,
    default_args=default_args
) as dag:

  task_a = PythonOPerator(
        task_id='task_a',
        python_callable=some_function,
        pool='single_task_pool'
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=some_function,
        pool='single_task_pool',
        priority_weight=2
    )

위처럼 작성했을 때, task_atask_b 는 모두 single_task_pool 이라는 이름의 pool에 할당됩니다. task_a 의 경우, priority_weight를 별도로 작성하지 않았으므로 1 이라는 값을 갖게 됩니다. task_b2 라고 명시하고 있으므로, task_a 에 비해 상대적으로 가중치가 더 높은 task_b 가 먼저 실행됩니다.

하나의 Task에 여러 Slot 할당하기

pool_slots 라는 파라미터를 추가해주면, 하나의 Task가 해당 Pool에서 pool_slots 만큼의 Slot 동시에 점유하도록 만들 수 있습니다. 특히, 무거운 작업을 수행해야 한다면, 가벼운 작업에 비해 상대적으로 더 많은 리소스를 활용할 수 있도록 충분한 여지를 주고자 할 때 활용하면 좋습니다. 다음과 같은 예시를 들어보겠습니다.

maintenance 라는 이름의 Pool이 있고, 해당 Pool의 Slot은 2 개라고 가정하겠습니다.

BashOperator(
    task_id="heavy_task",
    bash_command="bash backup_data.sh",
    pool_slots=2,
    pool="maintenance",
)

BashOperator(
    task_id="light_task1",
    bash_command="bash check_files.sh",
    pool_slots=1,
    pool="maintenance",
)

BashOperator(
    task_id="light_task2",
    bash_command="bash remove_files.sh",
    pool_slots=1,
    pool="maintenance",
)

heavy_task 가 실행되면 maintenance Pool의 Slot을 2개 점유하므로 해당 Pool의 Slot이 고갈됩니다. 따라서 나머지 light_task[1,2]heavy_task와 동시에 실행될 수 없고, heavy_task가 끝나기를 기다려야 합니다. 이렇게 하면, heavy_tasklight_task 가 동시에 실행되었을 때 발생할 수 있는 시스템 과부하를 방지할 수 있습니다.

Limitation

그러나 Pool의 능력에도 한계는 있습니다. Pool을 사용하기 전에 다음과 같은 사항을 염두에 둘 필요가 있습니다.

  • 각 Task는 단일 Pool에만 할당할 수 있습니다. 즉, 여러 Pool에 동시에 할당할 수 없다는 점을 알아둬야 합니다. 선택적으로 가져가는 것도 불가능합니다.
  • SubDAG를 작성한 경우, SubDAG 내에 있는 Task에 Pool을 직접 적용해줘야 합니다. 즉, 상위 DAG에서 일괄로 적용한 pool은 SubDAG의 Task에는 적용되지 않는다는 점을 인지하고 있어야 합니다.
  • Pool은 Task Instance의 병렬 처리를 제어하고자 하는 목적으로 도입되었습니다. Task Instance가 아닌 단일 DAG나 모든 DAG에 대한 DagRun 동시성을 제어하고자 하는 경우라면, max_active_runs 또는 core.max_active_runs_per_dag 파라미터를 확인해보는 것이 좋습니다.

UseCase: API Endpoint를 찌르는 작업의 수를 제한하기

예를 들어, 한 번에 3개의 요청만 받기를 원하는 API Endpoint가 있다고 가정해보겠습니다. 본 시나리오에서는 DAG 일정에 따라 동시에 API Endpoint를 찌르게 될 수 있는 2개의 서로 다른 DAG에 총 5개의 Task가 있습니다. 다만, 한 번에 최대 3개의 Task만 해당 API Endpoint를 찌르게 하고 싶으므로, 3개의 Slot을 가진 api_pool 이라는 Pool을 하나 생성하겠습니다.

아래 pool_priority_dag DAG를 살펴보면, default_args 파라미터에 poolpriority_weight 값을 설정하고 있습니다. 이렇게 하면 해당 DAG에 속한 모든 Task (단, SubDAG 제외) 에 일괄 적용됩니다. 따라서, task_a , task_b , task_c 는 동시에 api_pool 을 점유하여 실행되며, API Endpoint에 요청을 보낼 수 있습니다.

  • pool_priority_dag 코드

      import requests
      from datetime import datetime, timedelta
    
      from airflow import DAG
      from airflow.operators.python_operator import PythonOperator
    
      def api_function(**kwargs):
          url = 'https://covidtracking.com/api/v1/states/'
          filename = '{0}/{1}.csv'.format('wa', '2021-07-08')
          res = requests.get(url+filename)
          return res
    
      with DAG(
              dag_id='pool_priority_dag',
              start_date=datetime(2022, 7, 9),
              schedule_interval='@daily',
              catchup=False,
              default_args={
                  **'pool': 'api_pool',**
                  'retries': 1,
                  'retry_delay': timedelta(minutes=5),
                  **'priority_weight': 3**
              }
          ) as dag:
    
          task_a = PythonOperator(
              task_id='task_a',
              python_callable=api_function
          )
    
          task_b = PythonOperator(
              task_id='task_b',
              python_callable=api_function
          )
    
          task_c = PythonOperator(
              task_id='task_c',
              python_callable=api_function
          )
    
          [task_a, task_b, task_c]

해당 시점에는 pool_priority_dag DAG이 실행됨과 동시에, 아래의 pool_unimportant_dag DAG도 함께 실행되고 있습니다.

  • pool_unimportant_dag 코드

      import requests
      from datetime import datetime, timedelta
    
      from airflow import DAG
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.operators.python_operator import PythonOperator
    
      def api_function(**kwargs):
          url = 'https://covidtracking.com/api/v1/states/'
          filename = '{0}/{1}.csv'.format('wa', '2021-07-08')
          res = requests.get(url+filename)
          return res
    
      with DAG(
              dag_id='pool_unimportant_dag',
              start_date=datetime(2022, 7, 9),
              schedule_interval='@daily',
              catchup=False,
              default_args={
                  'retries': 1,
                  'retry_delay': timedelta(minutes=5)
              }
          ) as dag:
    
          task_w = DummyOperator(
              task_id='start'
          )
    
          task_x = PythonOperator(
              task_id='task_x',
              python_callable=api_function,
              **pool='api_pool',**
              **priority_weight=2**
          )
    
          task_y = PythonOperator(
              task_id='task_y',
              python_callable=api_function,
              **pool='api_pool'**
          )
    
          task_z = DummyOperator(
              task_id='end'
          )
    
          task_w >> [task_x, task_y] >> task_z

이 과정에서 task_wdefault_pool 을 점유하므로, task_a , task_b , task_c 와는 별개로 실행될 수 있습니다.

그러나 task_xtask_y 는 각각 2와 1의 priority_weight 가 설정되어 있습니다. 따라서 pool_priority_dag DAG의 모든 Task 중에 하나라도 종료되면 task_xtask_y 가 실행될 수 있습니다.

단, task_xtask_y 사이에서도 가중치의 차이가 존재한다는 점을 주목해야 합니다. task_x 의 가중치가 task_y 에 비해 상대적으로 높으므로, pool_priority_dag 의 Task 중 하나가 끝나면 task_x 가 그 자리를 차지하게 됩니다. 이후 pool_priority_dag 의 나머지 두 Task 중 하나 또는 task_x 가 완료되면 그 남은 자리를 task_y 가 점유하게 됩니다.

이런 방식으로 API Endpoint에 동시에 최대 3개까지만 요청을 보낼 수 있도록 제어할 수 있습니다.

이러한 방식으로 여러 DAG간의 실행 제어도 가능하다는 것을 살펴볼 수 있었습니다. 물론, DagRun 자체에 동시성을 제어하고자 한다면, 앞서 소개드린 max_active_runs 또는 core.max_active_runs_per_dag 를 확인하는 것이 필요합니다.

Reference

반응형