새소식

데이터 엔지니어링/기술스택 및 알쓸신잡

[Redis] 레디스 MQ 구현 및 활용법! (Python, Docker)

  • -

저번 포스팅에서 레디스의 필요성에 대해 설명했으니, (아직 못본 분들은 링크..)

https://9unu.tistory.com/29

 

[Redis] 레디스? Nosql? 인메모리? 왜 쓰는건데?

직전 포스팅에서 에어플로우 DAG, operator들을 뜯어보면서 sensor 특히, RedisSensor에 대해 설명하기로 했었다.근데 그 전에, Redis가 뭔지 알아야하지 않는가?그래서 레디스가 무엇인지, 왜 사용하는지

9unu.tistory.com

 

이번 포스팅에서는 실제로 어떻게 레디스 MQ를 구현하고, 활용하는지 설명하려한다.


TODO List

  • 레디스 활용 사례
  • 레디스 셋업 (Docker)
  • 레디스 자료구조 사용법 (Python)
  • 마무리

1. 레디스 활용 사례

본론에 들어가기전에, 실제 활용 사례를 먼저 풀자면

본인은 Redis를 분산 처리 시스템의 매니저 큐로서 활용했고, 현재도 활용 중에 있다.

실제 프로젝트 구조도는 다음과 같다.

간단히 설명하면,

네이버 항공권 사이트로 여러 대의 크롤러 머신이 request를 보내고, 받은 response (json)를 클라우드 스토리지로 올리고, 스파크 클러스터가 json 데이터를 파싱하여 RDBMS인 Postgres로 업로드하는 구조이다.

 

'여기서 레디스가 담당한 부분은 뭘까?'

 

바로 메시지 큐이다.

request 즉, 요청을 보내기 위해서는 언제, 어느 공항에서, 어느 공항으로, 어떤 등급의 항공권 데이터를 요청할지에 대한 파라미터가 필요하다.

 

예를 들어, 그럼 한국 -> 일본으로 가는 일반석 항공편을 내일 출발부터 300일 이후 출발 항공권까지 받으려면?

300개의 항공권 파라미터가 존재하고, 이걸 크롤러 머신이 순차적으로 처리해야 한다.

 

'여기서 잠깐, 300개 정도면 그냥 단일 파이썬 스크립트내에서 deque같은 자료구조로 관리하면 되는 거 아닌가?'

 

맞다. 심지어 크롤러 머신이 한 개라면 매우 그렇다.

 

'하지만 60,600개의 항공권 조합을 수집해야 한다면?'

 

일단 크롤러 머신이 여러 개가 필요하다. (CPU 자원, 네트워크 리소스 등)

그리고 각 크롤러는 모두 각기 다른 스케줄만을 가져가고, 중복된 스케줄을 가져가서는 안 된다.

또한 API 요청이 실패할 경우 다시 재시도하는 로직도 필요하다.

여기서 생각나는 대안은 대충 두 가지 정도 있다.

  1. 시작할 때 n개의 크롤러 머신에 60,600개의 조합을 n분할하여 균등 분배하고, 그 이후에는 각 크롤러가 알아서 처리한다.
  2. 레디스에 60,600개의 조합을 넣어놓고, 여러 크롤러 머신이 알아서 스케줄을 '원자적'으로 가져가고, 실패할 경우 '원자적'으로 다시 삽입한다.

어투에서도 알 수 있듯, 난 2번 방안을 선택했다.

특히 레디스는 본디 서버로 존재하기 때문에, 통신을 통해 스케줄을 가져가고, 이는 분산 처리 시스템 (여러 크롤러 머신)에서 동기화를 최대화할 수 있다.
이에 더해 인메모리 기반 작업? 속도도 빠르다.

 

결론적으로 레디스를 MQ로 사용함으로써

여러 크롤러 머신이 통일된 스케줄 큐로부터, 모든 스케줄이 처리될 때까지, 하나씩 가져가고 실패할 경우 다시 넣는 걸 반복할 수 있다.

심지어 크롤러 머신을 늘려도, 줄여도 바뀌는 건 방화벽 설정뿐이기에 유지 보수성도 좋다.


2. 레디스 셋업 (Docker)

레디스 셋업은 생각보다 매우 간단하다.

docker run --name redis-server -d -p 6379:6379 redis

그냥 공식 레디스 이미지를 컨테이너로 올리면 그만.

원한다면 포트매핑 같은 걸 수정할 수 있지만, 나는 굳이 안 했다

(나중에 연결할 때 포트번호 일일히 기억하기 귀찮아서...)

그 다음, 우리는 Python으로 레디스에 접속하고, 큐를 매니징할 거니까 redis 라이브러리를 설치해야 한다.

pip3 install redis

그럼 기본적인 Redis 세팅은 끝!


3. 레디스 자료구조별 사용법 (Python)

3.1 큐 이름 설계 및 원자성 구현

먼저 코드에서 볼 수 있는 큐 이름 정의부터 살펴보자.

self.schedule_queue = "flight:schedule:queue"
self.schedule_processing = "flight:schedule:processing"
self.insert_flag_key = "flight:insert:flag"
self.proxy_queue = "proxy:list"
self.proxy_processing = "proxy:processing"
self.crawling_status_key = "crawling:status"

여기서 주목할 점은 두 가지다.

  1. 네이밍 컨벤션: flight:schedule:queue와 같이 콜론(:)으로 구분하는 방식이다. 이는 레디스에서 자주 사용되는 네이밍 패턴으로, 마치 폴더 구조처럼 키를 논리적으로 구성할 수 있다.
  2. 큐와 프로세싱 이원화: 각 큐에 대해 queueprocessing 두 개의 키를 사용한다. 이는 데이터 무결성 구현을 위한 것이다.

'데이터 무결성이 뭐길래 이렇게 복잡하게 해?'

간단히 말하면, 여러 크롤러가 동시에 작업할 때 스케줄이 무조건 처리된다는 것을 보장하기 위해서이다.

 

작업 절차는 이렇다:

  1. 작업 가져오기: queue에서 항목을 꺼내 processing으로 이동
  2. 작업 처리: 크롤러가 작업을 처리
  3. 결과 처리: 성공하면 processing에서 제거, 실패하면 다시 queue로 이동

이렇게 하면 중간에 크롤러가 스케줄을 가져갔다가 처리하지 못하고 꺼져도 처리중 큐에 들어있기 때문에 다시 원본 큐로 넣어서 처리하기만하면 모든 스케줄이 무조건 처리되도록 강제할 수 있다.

 

즉, 모든 데이터가 처리되도록, 완전 무결하게 메시지를 관리할 수 있게 된다는 것이다.

3.2 List 자료구조 활용 (메시지 큐 구현)

레디스에서 가장 많이 사용되는 자료구조 중 하나가 바로 List이다.
List는 FIFO(First In First Out) 방식으로 작동하는 큐를 구현하기에 완벽하다.

기본적으로 MQ 구현 시 필요한 기능은 다음과 같다:

  1. 큐에 메시지 추가 (push)
  2. 큐에서 메시지 가져오기 (pop)
  3. 처리 중인 메시지 관리
  4. 실패한 메시지 다시 큐에 넣기

내가 작성한 Redis_Manager 클래스를 보면 이런 기능들이 구현되어 있다:

def bulk_add_schedules(self, schedules):
    """
    여러 스케줄을 원자적으로 추가
    """
    pipe = self.redis_client.pipeline()
    for schedule in schedules:
        schedule_str = json.dumps(schedule)
        pipe.rpush(self.schedule_queue, schedule_str)

    pipe.execute()
    print(f"Successfully added {len(schedules)} schedules")
    return True

위 코드는 여러 스케줄을 한 번에 큐에 추가하는 함수다. 여기서 주목할 점은:

  1. rpush 명령어: 리스트의 오른쪽(뒤)에 값을 추가한다.
  2. pipeline(): 여러 명령을 한 번의 네트워크 요청으로 보내기 위한 기능이다.

큐에서 메세지를 가져오는건 'lpop'명령으로 가져올 수 있다. (rpush -> lpop)

def get_schedule(self):
    """
    스케줄 큐에서 하나의 스케줄을 가져와 처리 중 큐로 이동
    """
    # 원자적 명령을 위한 Lua 스크립트
    lua_script = """
    local item = redis.call('lpop', KEYS[1])
    if item then
        redis.call('rpush', KEYS[2], item)
        return item
    else
        return nil
    end
    """
    script = self.redis_client.register_script(lua_script)
    result = script(keys=[self.schedule_queue, self.schedule_processing])

    if result:
        return json.loads(result)
    else:
        return None

여기서는 Lua 스크립트를 사용해 레디스에서 lpop과 rpush 명령을 원자적으로 실행하고 있다.

 

'원자성이 뭔데?'

 

이후 포스팅에서 시간이 된다면 추가 설명을 하겠지만, 간단히 말하자면
작업이 쪼개지지 않고 한 번에 완벽하게 실행되거나, 아예 실행되지 않거나" 둘 중 하나만 되는 것
을 말한다.


레디스는 싱글 스레드로 동작하기 때문에, 아무리 동시에 요청을 하더라도, 무조건 순차적으로 스케줄 또는 응답을 반환한다.
하지만 나는 데이터 무결성을 위해 기본 큐와 처리 중 큐를 이원화했기 때문에,

  1. A 크롤러가 lpop으로 큐에서 항목을 가져가고, processing큐에 항목을 넣는 과정 사이에 B크롤러의 다른 명령이 끼어들 가능성이 있다.
  2. A크롤러가 lpop으로 기본 큐에서 스케줄을 가져간 이후 rpush로 processing 큐로 항목을 넘기기전에 갑자기 죽을 가능성이 있다.

1번의 경우는 사실 내 경우에 크게상관은 없다. 어차피 중복된 데이터를 처리할 경우의 수는 없으니까.
하지만 2번의 경우는 큰 문제가 된다. 앞에서 말했던 데이터 무결성이 깨지게 되니까.

 

그래서 2번의 경우의 수를 완전 차단해주는 방법이 'Lua 스크립트'인 것이다.

스케줄을 기본 큐에서 빼내고, 처리 큐로 넣는 과정을 완벽히 성공하지 못할거면, 차라리 둘다 하지마!

를 강제하고, 크롤러가 꺼지면 처리큐에있는걸 복구하고 다시 실행하면 그만이다 ㅎㅎ

3.3 실패 처리 및 복구 기능

그래서 처리중 큐에 있는 스케줄을 다시 원본 큐로 복귀시키는 기능을 추가로 구현했다.

def recover_schedule_processing(self):
    """
    schedule processing 큐에 있는 모든 항목을 원래 큐로 다시 이동
    """
    pipe = self.redis_client.pipeline()

    # Schedule processing 큐의 모든 항목 가져오기
    schedules = self.redis_client.lrange(self.schedule_processing, 0, -1)

    # Processing 큐 비우기
    pipe.delete(self.schedule_processing)

    # 원래 큐로 항목들 다시 추가
    for schedule in schedules:
        pipe.rpush(self.schedule_queue, schedule)

    pipe.execute()

    return len(schedules)

이 함수는 처리 중 큐에 있는 모든 항목을 다시 원래 큐로 되돌린다.

3.4 파이프라인 활용

코드를 보면 pipeline()을 자주 사용한 것을 볼 수 있다. 이건 왜 필요할까?

파이프라인은 여러 레디스 명령을 묶어서 한 번에 실행하는 기능이다. 이렇게 하면:

  1. 네트워크 지연 시간을 줄일 수 있다
  2. 여러 명령을 원자적으로 실행할 수 있다
def clear_schedule_queue(self):
    """
    스케줄 큐의 내용을 원자적으로 비움
    """
    # 파이프라인으로 원자적 실행
    pipe = self.redis_client.pipeline()
    pipe.llen(self.schedule_queue)
    pipe.llen(self.schedule_processing)
    pipe.delete(self.schedule_queue)
    pipe.delete(self.schedule_processing)
    results = pipe.execute()

    queue_length = results[0] + results[1]
    print(f"Current queue size before clearing: {queue_length}")
    print("Schedule queue content cleared successfully")

    return queue_length

위 코드는 큐의 길이를 확인하고, 큐를 비우는 작업을 한 번의 네트워크 요청으로 처리한다.

3.6 Sorted Set 활용 예시

레디스의 또 다른 강력한 자료구조는 Sorted Set이다. 각 항목에 점수(score)를 부여하여 정렬할 수 있다.

예를 들어, 우선순위가 있는 작업 큐를 구현할 수 있다:

def add_priority_task(self, task, priority):
    """
    우선순위가 있는 작업 추가 (낮은 숫자가 높은 우선순위)
    """
    task_str = json.dumps(task)
    self.redis_client.zadd('priority_tasks', {task_str: priority})

def get_priority_task(self):
    """
    가장 높은 우선순위(가장 낮은 점수)의 작업 가져오기
    """
    # 가장 낮은 점수의 항목 가져오기
    result = self.redis_client.zrange('priority_tasks', 0, 0, withscores=True)
    if result:
        task_str, score = result[0]
        # 가져온 항목 삭제
        self.redis_client.zrem('priority_tasks', task_str)
        return json.loads(task_str)
    return None

이런 방식으로 긴급한 작업(예: 인기있는 항공권 조합)을 더 높은 우선순위로 처리할 수 있다.

3.7 Pub/Sub 패턴 활용

레디스는 발행-구독(Pub/Sub) 패턴도 지원한다. 이는 이벤트 알림에 유용하다:

# 발행자 코드
def publish_event(self, channel, message):
    """
    특정 채널에 메시지 발행
    """
    message_str = json.dumps(message) if isinstance(message, dict) else message
    self.redis_client.publish(channel, message_str)

# 구독자 코드 (별도 스레드에서 실행)
def subscribe_events(self, channel):
    """
    특정 채널 구독하고 메시지 처리
    """
    pubsub = self.redis_client.pubsub()
    pubsub.subscribe(channel)

    for message in pubsub.listen():
        if message['type'] == 'message':
            data = message['data']
            print(f"Received: {data}")
            # 여기서 메시지 처리...

 

사실 sorted set과 sub pub는 아직 활용해본 적은 없다 ㅎㅎ

다음에 기회가 되면 써봐야지ㅎㅎ

 


4. 마무리

이번 포스팅에서는 레디스를 메시지 큐로 활용하는 방법에 대해 살펴봤다.

  1. 레디스 MQ의 활용 사례: 분산 크롤링 시스템에서의 작업 분배
  2. 도커를 이용한 레디스 세팅
  3. 파이썬을 이용한 레디스 자료구조별 활용법
    • 큐 이름 설계 및 원자성 구현 전략
    • List를 이용한 기본적인 큐 구현
    • Pipeline을 통한 명령 최적화
    • Lua 스크립트를 이용한 원자적 명령 실행
    • Sorted Set을 이용한 우선순위 큐
    • Pub/Sub 패턴을 이용한 이벤트 알림

결론적으로, 레디스는 단순한 키-값 저장소를 넘어 분산 시스템의 핵심 인프라로 활용할 수 있다.
특히 메시지 큐로서의 레디스는 시스템 간 통신, 작업 분배, 이벤트 처리 등 다양한 용도로 활용 가능하다.

이제 레디스가 왜 필요하고, 어떻게 사용하는지 감이 왔을 것이다.
다음 포스팅에서는 당초 이 포스팅의 목적이었던 에어플로우의 RedisSensor에 대해 알아보겠다.
에어플로우에서 레디스를 통해 어떻게 데이터 파이프라인을 모니터링하고 제어하는지 살펴볼 예정이다!

ㄱㅊㅁ_ㅇㅈ

Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.