본문 바로가기

카테고리 없음

대용량 알림 개선기 (3) - RabbitMQ Consumer 서버 분리

0. 개요

해당 글은 < 대용량 알림 개선기 (2) > 에 이어지는 글입니다

 

 

이전 글에서는 Spring Application Server 내에서 RabbitMQ를 도입하며

 

BlockingQueue + 동기 전송 + RateLimiter 조합을 통해

 

(2) 구조


내부 흐름 제어 및 확장을 시도해보았습니다.

 

 

 

하지만 다시 생각해보니, 메시지 큐의 도입 목적 자체가 알림 발송의 완급 조절을 위함이었는데요.

 

즉, 알림 전송 대상들을 바로 FCM으로 전송하는 것이 아니라,

 

먼저 큐에 적재함으로써 전송 부담을 줄이고,

 

실제 발송은 별도로 분리된 RabbitMQ Consumer 서버에서 처리하는 구조로 설계하는 것이 맞다고 생각해

 

발행(Producer) 애플리케이션과 소비(Consumer) 애플리케이션을 분리한 스케일 아웃 구조로 개선해보았습니다 !

 

 

 


1. RabbitMQ Consumer App 분리 및 DLQ 로직 설계

도입하고자 하는 흐름

 

 

@Configuration
public class RabbitMQConfig {

    ...

    /**
     * Durable Queue - true : 재시작 후에도 queue 메세지  유지
     * @return
     */
    @Bean
    public Queue mainQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchangeName);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(args).build();
    }


    /**
     * Dead Letter Queue 설정
     * @return
     */
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(deadLetterQueueName).build();
    }

    @Bean
    public DirectExchange mainExchange() {
        return new DirectExchange(exchangeName);
    }

    // DLQ 익스체인지
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(deadLetterExchangeName);
    }

    @Bean
    public Binding mainBinding() {
        return BindingBuilder.bind(mainQueue()).to(mainExchange()).with(routingKey);
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterRoutingKey);
    }

    /**
     * TTL 후 mainQueue 로 메시지를 다시 던져줌.
     * 그걸 다시 @RabbitListener 가 받아서 재시도 → 횟수 초과하면 DLQ 로 적재
     * @return
     */
    @Bean
    public Queue retryQueue() {
        return QueueBuilder.durable(retryQueueName)
                .withArgument("x-dead-letter-exchange", exchangeName)
                .withArgument("x-dead-letter-routing-key", routingKey)
                .withArgument("x-message-ttl", 3000)
                .build();
    }

    @Bean
    public Binding retryBinding() {
        return BindingBuilder.bind(retryQueue()).to(deadLetterExchange()).with(retryRoutingKey);
    }

    ...

    /**
     * set Mandatory true : 메모리에 저장되기 때문에 서버가 죽으면 휘발되는 문제
     * 따라서 메세지를 persistence 하게 전송
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }


	...

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        factory.setMessageConverter(jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);

        return factory;
    }
}

 

Main Queue, Retry Queue, Dead Letter Queue는

 

서로 역할이 다르기 때문에 각각의 세 개의 큐로 분리하였습니다.

 

 

그리고 Retry Queue와 Dead Letter Queue는 메시지 처리 실패 시의 흐름에서 함께 사용되기 때문에,

 

공통된 목적을 가진 흐름상 연결 구조를 가지고 있습니다.

 

 

메시지가 처리 도중 실패하면, 재시도 횟수(x-retry-count)를 확인하여

  • 재시도 가능한 경우에는 Retry Queue로,
  • 재시도 횟수를 초과한 경우에는 Dead Letter Queue로
    이동하는 구조를 가지도록 설계했습니다 !

 

이처럼 Retry Queue, Dead Letter Queue모두 실패 메시지를 분기 처리하는 한 흐름 안에 존재하기 때문에

 

하나의 deadLetterExchange를 공유하면서 Routing Key만 다르게 설정해줬습니다.

 

 

즉, 공통된 실패 처리 흐름 내에서 메시지를 조건에 따라 라우팅하기 때문에,

 

하나의 Exchange를 공유하는 것이 구조적으로 효율적이라고 생각했습니다.

 

@RabbitListener(queues = "${rabbitmq.queue.name}", containerFactory = "rabbitListenerContainerFactory")
    public void consumeFcmMessage(FcmTokenRequestDto dto, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            if ("FAIL".equalsIgnoreCase(dto.getTitle())) {
                throw new RuntimeException("테스트용 강제 실패 발생!");
            }
            // 정상 메시지 전송 시도
            firebaseService.sendMessage(dto);
            channel.basicAck(deliveryTag, false);
        }

        catch (Exception e) {
            Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count");
            if (retryCount == null) retryCount = 0;

            if (retryCount < 3) {
                // 3번까지 재시도
                log.warn("🔁 재시도 메시지 전송 to Retry Queue: dto={}, retryCount={}", dto, retryCount);

                // 재시도 횟수 증가시켜줌
                MessageProperties newProps = new MessageProperties();
                newProps.setContentType(message.getMessageProperties().getContentType());
                newProps.getHeaders().putAll(message.getMessageProperties().getHeaders());
                newProps.setHeader("x-retry-count", retryCount + 1);

                Message newMessage = new Message(message.getBody(), newProps);

                // Retry Queue로 전송
                rabbitTemplate.send(
                        deadLetterExchangeName,  // retry 큐를 연결할 익스체인지
                        retryRoutingKey,         // retry 큐의 라우팅 키
                        newMessage
                );

                channel.basicAck(deliveryTag, false);
            } else {
                log.error("❌ 재시도 실패! 메시지 폐기, DLQ로 이동: dto={}, error={}", dto, e.getMessage());
                // 더 이상 재시도하지 않고 DLQ로 보내기
                sendToDeadLetterQueue(dto, message);
                channel.basicAck(deliveryTag, false);
            }
        }

    }

 

[RabbitMQ Main Queue]
       |
       v
[consumeFcmMessage() 호출]
       |
       v
[Exception 발생] -----> 예 -----> 예외 발생 (테스트 실패)
       |                          |
       |                          v
       |                     [재시도 로직]
       |                          |
       |               [x-retry-count >= 3 ?]
       |                      |             |
       |                     아니오         예
       |                      |             |
       |                      v             v
       |                  Retry Queue 전송  DLQ 전송
       |                  
       |                         
       |
       v
[정상 메시지 처리 시도 → firebaseService.sendMessage()]

 

 

Retry Queue 재시도를 위한 지연 큐, 3초 TTL 후 Main Queue로 메시지를 다시 던져주고


그걸 다시 @RabbitListener가 받아서 재시도 → 3회 Retry Count 초과하면 최종적으로 DLQ에 적재되는 흐름입니다.

 

 

DLQ Rabbit Listener
서버 강제 종료 후에도 남아있는 message들

 

 

서버를 강제 종료 후 재시작 하여도

 

DLQ에 메세지들이 유실되지 않고 잘 저장되어 있음을 확인할 수 있었습니다.

 

 

 

또한 재시도에 실패한 메세지가 최종적으로 DLQ에 적재되면

 

Slack 채널에 알림이 가도록 webhook url을 설정해두었습니다.

 

 

 

< 자세한 Dead Letter Queue 처리 흐름 >

[RabbitMQ DLQ Queue]
       |
       v
[handleDeadLetter() 호출]
       |
       v
[Redis에 메시지 ID 존재?] -----> 예 -----> 중복 Slack 전송 방지 → return
       |
      아니오
       |
       v
Slack으로 전송 시도
       |
       v
[성공] → Redis에 메시지 ID 저장 (중복 방지)
[실패] → 로그만 출력 (재처리 없음)

 

    @RabbitListener(queues = "${rabbitmq.queue.dead-letter.name}", containerFactory = "rabbitListenerContainerFactory")
    public void handleDeadLetter(FcmTokenRequestDto dto) {
        String messageId = dto.getId();

        // Redis에서 이미 처리된 메시지인지 확인
        if (redisService.isMessageProcessed(messageId)) {
            log.warn("메시지가 이미 처리되었습니다. 중복 처리 방지. dto={}", dto);
            return;
        }

        try { // 메시지 처리 로직 (예: Slack에 메시지 전송)
            slackService.sendMessage(dto);
            log.info("DLQ에서 메시지 수신 및 처리 완료: dto={}", dto);
            redisService.markMessageProcessed(messageId);
        } catch (Exception e) {
            log.error("DLQ 처리 중 오류 발생: dto={}, error={}", dto, e.getMessage());
        }
    }

 

 

RabbitListener는 해당 Queue 내부에 메시지가 존재하기만 하면 메서드가 자동으로 바로 호출되기 때문에

 

서버 재시작 후 Slack 전송과 같은 메서드가 중복 처리되는 것을 방지하기 위해

 

Redis에 메세지 ID를 기준으로 존재 여부를 먼저 확인해주는 흐름을 추가해주었습니다.

 

 

 

실패 테스트 케이스를 발생시켜줘서 확인해 본 결과

 

TTL 3초 간격으로 3회 재시도 후 DLQ에 메세지를 잘 전송한 후

 

Slack 채널에도 알림이 잘 갔음을 확인할 수 있었습니다.

 

 

 


 

2. 마무리

 

 

CPU 평균 사용률 

 

Producer Server : 11%대

Consumer Server : 1%대

 

CPU 최대 사용률 

 

Producer Server : 30%대

Consumer Server : 3%대

 

 

 

대용량 알림 개선기 (1) ~ (3)을 작성하면서 가장 안정적인 수준을 보였는데요.

 

메시지 큐 기반의 비동기 처리 구조로 ​서버의 부하를 최소화할 수 있었던 것 같습니다.

 

 

특히 Consumer를 별도 서비스로 분리한 뒤부터는, 알림 처리 과정에서의 병목 현상이 완전히 줄었고

 

전체적인 응답 속도와 안정성 면에서도 훨씬 나아졌습니다.