- 주요 키워드
- Producer : 메시지 발송 주체
- Queue에 저장되는데, Producer에 직접 접근하지 않고 Exchange를 통해 접근
- Consumer : 메시지 수신 주체
- Queue에 직접 접근하여 메시지를 가져옴
- Queue
- Producer들이 발송한 메시지들을 Consumer가 소비하기 전까지 보관되는 장소
- Queue는 singleton으로 생성해야 함!
- Broker
- Producer로부터 메시지를 수신하여 Consumer에게 라우팅하는 소프트웨어
- Exchange : 메시지의 라우터 역할. 4가지 유형이 있다.
- Producer : 메시지 발송 주체
- Direct Exchange:
- 간단하고 효율적인 라우팅
- 특정 큐로 정확한 메시지 전달
- 구현과 이해가 쉬움
- 라우팅 키가 정확히 일치해야 함
- 복잡한 라우팅 패턴에는 적합하지 않음
- Fanout Exchange:
- 모든 바인딩된 큐에 메시지를 브로드캐스트
- 매우 빠른 성능 (라우팅 결정이 필요 없음)
- 간단한 pub/sub 패턴 구현에 적합
- 메시지를 선택적으로 받을 수 없음
- 모든 바인딩된 큐가 모든 메시지를 받음 (필터링 불가)
- Topic Exchange:
- 유연한 라우팅 패턴 (와일드카드 사용 가능)
- 복잡한 라우팅 시나리오 처리 가능
- 메시지를 여러 큐로 선택적 라우팅 가능
- 설정이 복잡할 수 있음
- 라우팅 키 설계에 주의 필요
- Direct Exchange보다 성능이 약간 떨어질 수 있음
- Headers Exchange:
- 속성 기반 라우팅 (키-값 쌍 사용)
- 라우팅 키 대신 메시지 속성으로 라우팅
- 복잡한 라우팅 조건을 유연하게 처리
- 다른 exchange 유형보다 성능 오버헤드가 큼
- 설정과 사용이 더 복잡함
- 라우팅 키 기반 패턴에 익숙한 개발자에게는 직관적이지 않을 수 있음
- 동작방식
//Config를 통해 Quere, Exchange(나의 경우엔 Direct Exchange), Binding을 정의함.
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "game-resource-queue";
public static final String EXCHANGE_NAME = "game-resource-exchange";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);
}
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME);
}
}
//RabbitMQInitializer를 통해서 binding 함.
@Component
@Slf4j
public class RabbitMQInitializer {
private final AmqpAdmin amqpAdmin;
@Autowired
public RabbitMQInitializer(AmqpAdmin amqpAdmin) {
this.amqpAdmin = amqpAdmin;
}
@PostConstruct
public void initialize() {
try {
log.info("Starting RabbitMQ initialization...");
log.info("Declaring exchange: {}", RabbitMQConfig.EXCHANGE_NAME);
Exchange exchange = new DirectExchange(RabbitMQConfig.EXCHANGE_NAME);
amqpAdmin.declareExchange(exchange);
log.info("Declaring queue: {}", RabbitMQConfig.QUEUE_NAME);
Queue queue = new Queue(RabbitMQConfig.QUEUE_NAME, true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Declaring binding between {} and {}", RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.QUEUE_NAME);
Binding binding = new Binding(RabbitMQConfig.QUEUE_NAME, Binding.DestinationType.QUEUE,
RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.QUEUE_NAME, null);
amqpAdmin.declareBinding(binding);
log.info("RabbitMQ initialization completed successfully");
} catch (Exception e) {
log.error("Failed to initialize RabbitMQ", e);
throw new RuntimeException("Failed to initialize RabbitMQ", e);
}
}
}
//producer 생성 예제
@Slf4j
@Component
public class GameResourceProducer {
private final RabbitTemplate rabbitTemplate;
public GameResourceProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message) {
try {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.QUEUE_NAME, message);
log.info("Message sent successfully to exchange: {}", message);
} catch (AmqpException e) {
log.error("Failed to send message: {}", message, e);
}
}
public void sendGameResourceCreationMessage(GameResourceCreationMessage message) {
try {
rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_NAME, message);
log.info("게임 리소스 생성 메시지를 큐에 전송: {}", message);
} catch (AmqpException e) {
log.error("게임 리소스 생성 메시지를 큐에 전송 실패", e);
// 여기에 재시도 메커니즘 구현 또는 사용자 정의 예외 throw 가능
}
}
}
// consumer 생성 예제
@Slf4j
@Component
public class GameResourceConsumer {
private final GameResourceService gameResourceService;
private final FastAPIClientService fastAPIClientService;
private final ObjectMapper objectMapper;
public GameResourceConsumer(GameResourceService gameResourceService, FastAPIClientService fastAPIClientService, ObjectMapper objectMapper) {
this.gameResourceService = gameResourceService;
this.fastAPIClientService = fastAPIClientService;
this.objectMapper = objectMapper;
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void processGameResourceCreation(String messageJson, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
log.debug("Received message: {}", messageJson);
// 이중 JSON 인코딩 처리
if (messageJson.startsWith("\"") && messageJson.endsWith("\"")) {
messageJson = messageJson.substring(1, messageJson.length() - 1);
}
messageJson = messageJson.replace("\\\"", "\"");
GameResourceCreationMessage message = objectMapper.readValue(messageJson, GameResourceCreationMessage.class);
log.info("Parsed message for task: {}. File name: {}", message.getTaskId(), message.getFileName());
processGameResourceCreationMessage(message);
channel.basicAck(tag, false);
log.info("Message processed successfully for task: {}", message.getTaskId());
} catch (Exception e) {
log.error("Error processing message: " + messageJson, e);
try {
channel.basicNack(tag, false, false);
log.warn("Message sent to DLQ: {}", tag);
} catch (IOException ioException) {
log.error("Error sending nack", ioException);
}
}
}
private void processGameResourceCreationMessage(GameResourceCreationMessage message) {
try {
MultipartFile file = gameResourceService.convertToMultipartFile(message);
Map<String, Object> result = fastAPIClientService.generateImage(file, message.getRequest().getPrompt());
String taskId = message.getTaskId();
gameResourceService.processTaskUpdate(taskId, result);
while (true) {
Map<String, Object> status = fastAPIClientService.getTaskStatus((String) result.get("task_id"));
gameResourceService.processTaskUpdate(taskId, status);
if ("completed".equals(status.get("status")) || "failed".equals(status.get("status"))) {
break;
}
Thread.sleep(5000);
}
} catch (Exception e) {
log.error("Error processing game resource creation message", e);
}
}
}
- 동작 순서 시각화
exchange와 binding이 처리되는 과정
- Queue 아키텍쳐 구축을 통해 생기는 이점.
- 비동기 처리:
- Service가 메시지를 큐에 보내고 즉시 응답할 수 있습니다.
- 긴 작업(예: 이미지 생성)은 백그라운드에서 Consumer에 의해 처리됩니다.
- 이는 시스템의 응답성을 크게 향상시키고, 클라이언트의 대기 시간을 줄입니다.
- 시스템 분리 및 확장성:
- 리소스 생성 요청(Service)과 실제 처리(Consumer)가 분리됩니다.
- 필요에 따라 여러 개의 컨슈머를 추가하여 처리량을 쉽게 늘릴 수 있습니다.
- 안정성 및 내구성:
- RabbitMQ가 메시지를 저장하므로, 처리 중 시스템 장애가 발생해도 메시지가 손실되지 않습니다.
- 재시작 후에도 미처리된 메시지를 처리할 수 있습니다.
- 부하 관리:
- 큐를 통해 갑작스러운 요청 증가를 완충할 수 있습니다.
- 시스템 용량을 초과하는 요청도 순차적으로 처리할 수 있습니다.
- 모니터링 및 추적:
- RabbitMQInitializer와 로깅을 통해 메시지 흐름을 쉽게 모니터링할 수 있습니다.
- 문제 발생 시 디버깅이 용이해집니다.
- 유연한 아키텍처:
- RabbitMQConfig를 통해 exchange와 queue 설정을 중앙에서 관리할 수 있습니다.
- 필요에 따라 다양한 유형의 exchange를 쉽게 추가하거나 변경할 수 있습니다.
- 작업 상태 추적:
- Service에서 구현된 작업 상태 추적 메커니즘(taskStatusMap)을 통해 각 작업의 진행 상황을 실시간으로 확인할 수 있습니다.
- 오류 처리 향상:
- Consumer에서의 예외 처리를 통해 개별 메시지 처리 실패가 전체 시스템에 영향을 미치지 않도록 합니다.
- 서비스 간 결합도 감소:
- 메시지 큐를 사용함으로써 Service와 실제 처리 로직 간의 직접적인 의존성이 줄어듭니다.
- 배치 처리 가능성:
- 필요한 경우, 여러 메시지를 한 번에 처리하는 배치 처리를 구현하기 쉬워집니다.
- 비동기 처리: