본문 바로가기
개발환경 & 도구

RabbitMQ 메시지 큐 동작

by DoRightting 2024. 10. 20.
  • 주요 키워드
    • Producer : 메시지 발송 주체
      • Queue에 저장되는데, Producer에 직접 접근하지 않고 Exchange를 통해 접근
    • Consumer : 메시지 수신 주체
      • Queue에 직접 접근하여 메시지를 가져옴
    • Queue
      • Producer들이 발송한 메시지들을 Consumer가 소비하기 전까지 보관되는 장소
      • Queue는 singleton으로 생성해야 함!
    • Broker
      • Producer로부터 메시지를 수신하여 Consumer에게 라우팅하는 소프트웨어
    • Exchange : 메시지의 라우터 역할. 4가지 유형이 있다.
  • Direct Exchange:
    • 간단하고 효율적인 라우팅
    • 특정 큐로 정확한 메시지 전달
    • 구현과 이해가 쉬움
    단점:
    • 라우팅 키가 정확히 일치해야 함
    • 복잡한 라우팅 패턴에는 적합하지 않음
    사용 사례: 특정 작업 큐로 작업을 분배할 때 (예: 특정 서비스로 요청 라우팅)
  • Fanout Exchange:
    • 모든 바인딩된 큐에 메시지를 브로드캐스트
    • 매우 빠른 성능 (라우팅 결정이 필요 없음)
    • 간단한 pub/sub 패턴 구현에 적합
    단점:
    • 메시지를 선택적으로 받을 수 없음
    • 모든 바인딩된 큐가 모든 메시지를 받음 (필터링 불가)
    사용 사례: 실시간 알림, 로그 브로드캐스팅, 채팅 애플리케이션
  • Topic Exchange:
    • 유연한 라우팅 패턴 (와일드카드 사용 가능)
    • 복잡한 라우팅 시나리오 처리 가능
    • 메시지를 여러 큐로 선택적 라우팅 가능
    단점:
    • 설정이 복잡할 수 있음
    • 라우팅 키 설계에 주의 필요
    • Direct Exchange보다 성능이 약간 떨어질 수 있음
    사용 사례: 다양한 기준으로 메시지를 필터링해야 하는 경우 (예: 로그 레벨별, 지역별 데이터 처리)
  • Headers Exchange:
    • 속성 기반 라우팅 (키-값 쌍 사용)
    • 라우팅 키 대신 메시지 속성으로 라우팅
    • 복잡한 라우팅 조건을 유연하게 처리
    단점:
    • 다른 exchange 유형보다 성능 오버헤드가 큼
    • 설정과 사용이 더 복잡함
    • 라우팅 키 기반 패턴에 익숙한 개발자에게는 직관적이지 않을 수 있음
    사용 사례: 여러 속성을 기반으로 메시지를 라우팅해야 하는 복잡한 시스템 (예: 다중 조건 기반 작업 분배)
    • 동작방식 

//Config를 통해 Quere, Exchange(나의 경우엔 Direct Exchange), Binding을 정의함.

@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "game-resource-queue";
    public static final String EXCHANGE_NAME = "game-resource-exchange";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
    }
}

 

 

//RabbitMQInitializer를 통해서 binding 함.

@Component
@Slf4j
public class RabbitMQInitializer {
    private final AmqpAdmin amqpAdmin;

    @Autowired
    public RabbitMQInitializer(AmqpAdmin amqpAdmin) {
        this.amqpAdmin = amqpAdmin;
    }

    @PostConstruct
    public void initialize() {
        try {
            log.info("Starting RabbitMQ initialization...");

            log.info("Declaring exchange: {}", RabbitMQConfig.EXCHANGE_NAME);
            Exchange exchange = new DirectExchange(RabbitMQConfig.EXCHANGE_NAME);
            amqpAdmin.declareExchange(exchange);

            log.info("Declaring queue: {}", RabbitMQConfig.QUEUE_NAME);
            Queue queue = new Queue(RabbitMQConfig.QUEUE_NAME, true, false, false);
            amqpAdmin.declareQueue(queue);

            log.info("Declaring binding between {} and {}", RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.QUEUE_NAME);
            Binding binding = new Binding(RabbitMQConfig.QUEUE_NAME, Binding.DestinationType.QUEUE,
                    RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.QUEUE_NAME, null);
            amqpAdmin.declareBinding(binding);

            log.info("RabbitMQ initialization completed successfully");
        } catch (Exception e) {
            log.error("Failed to initialize RabbitMQ", e);
            throw new RuntimeException("Failed to initialize RabbitMQ", e);
        }
    }
}

 

//producer 생성 예제

@Slf4j
@Component
public class GameResourceProducer {

    private final RabbitTemplate rabbitTemplate;

    public GameResourceProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message) {
        try {
            rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.QUEUE_NAME, message);
            log.info("Message sent successfully to exchange: {}", message);
        } catch (AmqpException e) {
            log.error("Failed to send message: {}", message, e);
        }
    }

    public void sendGameResourceCreationMessage(GameResourceCreationMessage message) {
        try {
            rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
            log.info("게임 리소스 생성 메시지를 큐에 전송: {}", message);
        } catch (AmqpException e) {
            log.error("게임 리소스 생성 메시지를 큐에 전송 실패", e);
            // 여기에 재시도 메커니즘 구현 또는 사용자 정의 예외 throw 가능
        }
    }
}



// consumer 생성 예제
@Slf4j
@Component
public class GameResourceConsumer {

    private final GameResourceService gameResourceService;
    private final FastAPIClientService fastAPIClientService;
    private final ObjectMapper objectMapper;

    public GameResourceConsumer(GameResourceService gameResourceService, FastAPIClientService fastAPIClientService, ObjectMapper objectMapper) {
        this.gameResourceService = gameResourceService;
        this.fastAPIClientService = fastAPIClientService;
        this.objectMapper = objectMapper;
        this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void processGameResourceCreation(String messageJson, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            log.debug("Received message: {}", messageJson);

            // 이중 JSON 인코딩 처리
            if (messageJson.startsWith("\"") && messageJson.endsWith("\"")) {
                messageJson = messageJson.substring(1, messageJson.length() - 1);
            }
            messageJson = messageJson.replace("\\\"", "\"");

            GameResourceCreationMessage message = objectMapper.readValue(messageJson, GameResourceCreationMessage.class);
            log.info("Parsed message for task: {}. File name: {}", message.getTaskId(), message.getFileName());

            processGameResourceCreationMessage(message);

            channel.basicAck(tag, false);
            log.info("Message processed successfully for task: {}", message.getTaskId());
        } catch (Exception e) {
            log.error("Error processing message: " + messageJson, e);
            try {
                channel.basicNack(tag, false, false);
                log.warn("Message sent to DLQ: {}", tag);
            } catch (IOException ioException) {
                log.error("Error sending nack", ioException);
            }
        }
    }

    private void processGameResourceCreationMessage(GameResourceCreationMessage message) {
        try {
            MultipartFile file = gameResourceService.convertToMultipartFile(message);
            Map<String, Object> result = fastAPIClientService.generateImage(file, message.getRequest().getPrompt());

            String taskId = message.getTaskId();
            gameResourceService.processTaskUpdate(taskId, result);

            while (true) {
                Map<String, Object> status = fastAPIClientService.getTaskStatus((String) result.get("task_id"));
                gameResourceService.processTaskUpdate(taskId, status);

                if ("completed".equals(status.get("status")) || "failed".equals(status.get("status"))) {
                    break;
                }

                Thread.sleep(5000);
            }
        } catch (Exception e) {
            log.error("Error processing game resource creation message", e);
        }
    }
}

 

  • 동작 순서 시각화
    •  

 

 

exchange와 binding이 처리되는 과정

 

  • Queue 아키텍쳐 구축을 통해 생기는 이점.
    1. 비동기 처리:
      • Service가 메시지를 큐에 보내고 즉시 응답할 수 있습니다.
      • 긴 작업(예: 이미지 생성)은 백그라운드에서 Consumer에 의해 처리됩니다.
      • 이는 시스템의 응답성을 크게 향상시키고, 클라이언트의 대기 시간을 줄입니다.
    2. 시스템 분리 및 확장성:
      • 리소스 생성 요청(Service)과 실제 처리(Consumer)가 분리됩니다.
      • 필요에 따라 여러 개의 컨슈머를 추가하여 처리량을 쉽게 늘릴 수 있습니다.
    3. 안정성 및 내구성:
      • RabbitMQ가 메시지를 저장하므로, 처리 중 시스템 장애가 발생해도 메시지가 손실되지 않습니다.
      • 재시작 후에도 미처리된 메시지를 처리할 수 있습니다.
    4. 부하 관리:
      • 큐를 통해 갑작스러운 요청 증가를 완충할 수 있습니다.
      • 시스템 용량을 초과하는 요청도 순차적으로 처리할 수 있습니다.
    5. 모니터링 및 추적:
      • RabbitMQInitializer와 로깅을 통해 메시지 흐름을 쉽게 모니터링할 수 있습니다.
      • 문제 발생 시 디버깅이 용이해집니다.
    6. 유연한 아키텍처:
      • RabbitMQConfig를 통해 exchange와 queue 설정을 중앙에서 관리할 수 있습니다.
      • 필요에 따라 다양한 유형의 exchange를 쉽게 추가하거나 변경할 수 있습니다.
    7. 작업 상태 추적:
      • Service에서 구현된 작업 상태 추적 메커니즘(taskStatusMap)을 통해 각 작업의 진행 상황을 실시간으로 확인할 수 있습니다.
    8. 오류 처리 향상:
      • Consumer에서의 예외 처리를 통해 개별 메시지 처리 실패가 전체 시스템에 영향을 미치지 않도록 합니다.
    9. 서비스 간 결합도 감소:
      • 메시지 큐를 사용함으로써 Service와 실제 처리 로직 간의 직접적인 의존성이 줄어듭니다.
    10. 배치 처리 가능성:
      • 필요한 경우, 여러 메시지를 한 번에 처리하는 배치 처리를 구현하기 쉬워집니다.