새소식

데이터 엔지니어링/기술스택 및 알쓸신잡

[Airflow] 커스텀 센서 구현하기! (Redis key value sensor)

  • -

이 포스팅을 다루기 위해 레디스 필요성부터 구현방법까지 작성하고 돌아왔다.

드디어, 예전에 포스팅했던 에어플로우 DAG 작성법에 이어 커스텀 레디스 센서에 대해 작성하고자한다.

이전 포스팅이 궁긍한 분들은 여기..

 

[Airflow] 에어플로우 도커 컴포즈로 한방에 올리기

이번엔 저번 포스팅에 이어, 실제로 에어플로우를 어떻게 구현하고, 사용하는지에 대해 알아보자! TODO list도커 컴포즈로 Airflow 올리는 이유커스텀 airflow 이미지 만들기도커 컴포즈 yaml 파일 만들

9unu.tistory.com


 

TODO List

  • Airflow Sensoroperator 필요성
  • 기본 레디스 센서의 작동 방식 (poke 등)
  • 커스텀 센서 구현 방법
  • 마무리

1. Airflow Sensor operator 필요성

기본적으로 Airflow에는 Sensor operator가 존재한다.

예를 들어 file이 생성됐는지 확인하는 file operator나, 레디스에 key가 생성됐는지 확인하는 Redis operator가 있다.

 

'그럼 센서가 왜 있어야할까?'

 

어떤 상태 변화를 체크해야할 때 센서가 필요하다.

특히 복잡한 작업간의 관계를 스케줄링하는 Airflow에서 특정 조건이 만족되었을 때 다른 작업을 트리거해야하는 상황을 해결하기위해 센서는 꼭 있어야 하는 기능이다.

 

나의 경우에는 크롤링을 마쳤을 때 중간 집계 및 전체 테이블로 마이그레이션하는 작업이 트리거 되었어야하기 때문에

모든 스케줄이 완료되었다는 상태 변화를 지속적으로 체크해야했다.

 

'그럼 이때 가장 적합한 센서는?'

뭐 여러가지 있겠다만, 난 레디스 센서를 선택했다.

이미 레디스를 사용중이기도 했고, [크롤링 중 -> 크롤링 완료]의 상태 변화 확인이 주 목적이다보니 키 - 값 쌍으로 해시 접근이 가능한 레디스가 가장 적합하다고 판단했다.


2. 기본 레디스 센서의 작동 방식 (poke, reschedule 등)

이를 위해 Airflow에는 여러가지 센서 오퍼레이터가 내장되어있다.

레디스 오퍼레이터도 그 중 한가지이다.

실제 라이브러리 내부로 타고 들어가서 확인한 레디스 센서 클래스는 다음과 같다.

class RedisKeySensor(BaseSensorOperator):
    """Checks for the existence of a key in a Redis."""

    template_fields: Sequence[str] = ("key",)
    ui_color = "#f0eee4"

    def __init__(self, *, key: str, redis_conn_id: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.redis_conn_id = redis_conn_id
        self.key = key

    def poke(self, context: Context) -> bool:
        self.log.info("Sensor checks for existence of key: %s", self.key)
        return RedisHook(self.redis_conn_id).get_conn().exists(self.key)
class BaseSensorOperator(BaseOperator, SkipMixin):
    valid_modes = ["poke", "reschedule"]

    def __init__(
        self,
        *,
        poke_interval = 60,
        timeout = conf.getfloat("sensors", "default_timeout"),
        soft_fail = False,
        mode = "poke",
        exponential_backoff = False,
        max_wait = None,
        silent_fail = False,
        never_fail = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.poke_interval = self._coerce_poke_interval(poke_interval).total_seconds()
        self.soft_fail = soft_fail
        self.timeout = self._coerce_timeout(timeout).total_seconds()
        self.mode = mode
        self.exponential_backoff = exponential_backoff
        self.max_wait = self._coerce_max_wait(max_wait)

    def poke(self, context: Context) -> bool | PokeReturnValue:
        """Override when deriving this class."""
        raise AirflowException("Override me.")

    @property
    def reschedule(self):
        """Define mode rescheduled sensors."""
        return self.mode == "reschedule"

내가 사용하기 전에 확인 했던 내용 중 가장 핵심적인 부분만 떼온 내용이라 추가적인 내용은 직접 타고 들어가서 보는 걸 추천한다.

 

먼저 초기화 부분을 보면 매개변수 중 poke_interval, timeout, mode 같은 것들이 있다.

poke_interval은 센서가 얼마나 자주 상태를 확인할지를 초 단위로 설정한다. 기본값은 60초이다.

timeout은 센서가 얼마나 오래 대기할지를 초 단위로 설정한다. (설정 값을 벗어나면 fail 처리)

mode는 센서의 동작 방식을 정의한다. 두 가지 모드가 있다:

  1. poke 모드(기본값): 센서가 태스크 슬롯을 계속 점유하면서 주기적으로 상태를 확인한다. 조건이 충족될 때까지 태스크 슬롯을 잡고 있음.
  2. reschedule 모드: 센서가 상태를 확인한 후 태스크 슬롯을 반환하고, 다음 체크 시간에 다시 슬롯을 요청한다. 리소스 효율적.

이 두 모드의 차이는 Airflow의 워커 슬롯 사용에 영향을 미친다. poke 모드는 조건이 충족될 때까지 워커 슬롯을 계속 점유하고, reschedule 모드는 체크 사이에 슬롯을 반환하므로 다른 태스크가 그 슬롯을 사용할 수 있다.

(웬만하면 reschedule로 하는게 좋다는 말...) 

 

실제 RedisKeySensor 클래스를 보면 기본적으로 poke 메서드가 정의되어 있다. 이 메서드는 지정된 Redis 키가 존재하는지 확인하고, 키가 존재하면 True를 반환하여 센서가 성공했음을 알린다.

 

하지만 문제는, 기본 Redis 센서는 키의 존재 여부만 체크한다는 점이다.

 

예를 들어 해당 작업 완료를 확인하고 다음 작업을 트리거하는 패턴이 "계속 반복된다면",

키를 만들고, 확인하고, 없애는 과정을 계속 반복해야한다는 것이다.

 

나의 경우 키를 생성하고 삭제하는 방식보다는 이미 존재하는 키의 값을 상태에 맞춰 바꾸는게 효율적이라 판단해 커스텀 센서를 구현했다.


3. 커스텀 센서 구현 방법

이제 내가 구현한 커스텀 Redis 센서를 살펴보자. 이 센서는 키의 존재 여부가 아닌, 키의 값이 특정 조건을 만족하는지 확인한다.

from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.redis_hook import RedisHook
from typing import Any, Dict

class RedisValueSensor(BaseSensorOperator):
    """
    Redis의 특정 키에 저장된 값이 지정한 값과 일치하는지 확인하는 센서
    """
    template_fields = ('key', 'expected_value')
    ui_color = '#f0eee4'

    def __init__(
        self,
        *,
        key: str,
        expected_value: str,
        redis_conn_id: str = 'redis_default',
        **kwargs
    ):
        super().__init__(**kwargs)
        self.redis_conn_id = redis_conn_id
        self.key = key
        self.expected_value = expected_value

    def poke(self, context: Dict[str, Any]) -> bool:
        """
        Redis의 키 값이 기대하는 값과 일치하는지 확인
        """
        self.log.info(f"Checking if Redis key '{self.key}' has value '{self.expected_value}'")

        redis_conn = RedisHook(self.redis_conn_id).get_conn()

        # 키가 존재하는지 먼저 확인
        if not redis_conn.exists(self.key):
            self.log.info(f"Redis key '{self.key}' does not exist")
            return False

        # 키 값 가져오기
        current_value = redis_conn.get(self.key)

        # 바이트 문자열을 일반 문자열로 디코딩
        if isinstance(current_value, bytes):
            current_value = current_value.decode('utf-8')

        self.log.info(f"Current value for key '{self.key}': {current_value}")

        # 값 비교
        return current_value == self.expected_value

이 커스텀 센서(RedisValueSensor)는 기본 RedisKeySensor를 확장하여 다음과 같은 기능을 추가했다:

  1. 키의 존재 여부뿐만 아니라 키의 값도 체크한다.
  2. 키의 값이 기대하는 값(expected_value)과 일치하는지 확인한다.
  3. Redis에서 가져온 바이트 문자열을 적절히 디코딩하여 비교한다.

이 센서를 사용하면 키를 삭제하고 다시 만드는 대신, 키의 값을 변경하는 방식으로 상태를 관리할 수 있다. 예를 들어, crawling:status 키에  completed, processing 같은 값을 저장하여 크롤링 상태를 표시할 수 있다.

실제 DAG에서는 다음과 같이 사용할 수 있다:

# 크롤링 완료를 기다리는 센서
wait_for_crawling_completed = RedisValueSensor(
    task_id='wait_for_crawling_completed',
    key='crawling:status',
    expected_value='completed',
    poke_interval=60,  # 1분마다 체크
    timeout=86400,     # 최대 24시간 대기
    mode='reschedule',  # 리소스 효율적인 모드 사용
    redis_conn_id='redis_default'
)

# 이후 작업들
process_crawled_data = PythonOperator(
    task_id='process_crawled_data',
    python_callable=process_data_function
)

# 작업 순서 설정
wait_for_crawling_completed >> process_crawled_data

이런 식으로 구성하면, 크롤링이 완료되어 Redis의 crawling:status 키 값이 completed로 변경될 때까지 기다렸다가 데이터 처리 작업을 시작할 수 있다.


4. 마무리

이번 포스팅에서는 Airflow의 센서 오퍼레이터, 특히 Redis 센서에 대해 알아보았다.

  1. 센서 오퍼레이터의 필요성: 특정 조건이 충족될 때까지 기다렸다가 작업을 트리거하기 위해 필요함
  2. 기본 Redis 센서의 작동 방식: 키의 존재 여부만 체크하는 한계가 있음
  3. 커스텀 Redis 센서 구현: 키의 값까지 확인하는 추가 기능 구현

나는 키를 삭제하고 다시 만드는 방식보다는 키의 값을 변경하는 방식이 더 효율적이라고 판단하여 커스텀 센서를 만들었다.

이건 어디까지나 내 나름의 판단에 맞춰 수행한 것으로, 키를 만들고 없애는 게 더 효율적일 수 도 있다..ㅎㅎ

그래도 문제 상황에 맞춰 해결 방법을 직접 구현하는 것도 하나의 능력이니까!! 나름 좋은 경험이었다고 생각한다 ㅎㅎ

 

지금까지 에어플로우 구현과 관련된 대략적인 내용은 모두 다뤘다.

도커 컴포즈로 에어플로우를 올리고

DAG파일을 작성하고

커스텀 센서를 만들어서 상태 관리하는 것까지...

 

이제 다음부터는 기본적인 구현보다는 execution date처럼 내가 실제로 사용하면서 이해하기 어려웠던 부분들을 위주로 포스팅하려한다!

 

기술 스택 공부에 애먹는 학생(나)들을 위해...

 

화이팅!!

 

ㄱㅊㅁ_ㅇㅈ

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.