본문 바로가기
기술 트렌드 & 새로운 학습

CompletableFuture

by DoRightting 2024. 11. 14.

1. 구현 사례

1.1 OpenAI API 비동기 처리

public class OpenAiApi {
    public CompletableFuture<String> chatAsync(String systemPrompt, String userPrompt) {
        return CompletableFuture.supplyAsync(() -> {
            if (!rateLimiter.tryAcquire()) {
                throw new RateLimitException("API 호출 한도 초과");
            }
            return chat(systemPrompt, userPrompt);
        });
    }

    public CompletableFuture<double[]> embeddingsAsync(String text) {
        return CompletableFuture.supplyAsync(() -> {
            if (!rateLimiter.tryAcquire()) {
                throw new RateLimitException("API 호출 한도 초과");
            }
            return embeddings(text);
        });
    }
}

1.2 프롬프트 서비스에서의 병렬 처리

@Service
public class PromptService {
    @Transactional
    public void processPrompt(PromptMessage message) {
        try {
            CompletableFuture<String> analysisFuture = openAiApi.chatAsync(
                OpenAiApi.ANALYSIS_SYSTEM_PROMPT,
                prompt.getOriginalPrompt()
            );

            CompletableFuture<double[]> embeddingFuture = openAiApi.embeddingsAsync(
                prompt.getOriginalPrompt()
            );

            CompletableFuture<Void> allOf = CompletableFuture.allOf(
                analysisFuture, embeddingFuture
            );

            // 45초 타임아웃 설정
            allOf.get(45, TimeUnit.SECONDS);

            String analysis = analysisFuture.get();
            double[] embedding = embeddingFuture.get();

            updatePromptWithResults(prompt, processedData, embedding);
        } catch (Exception e) {
            throw new PromptException("API 처리 중 오류 발생: " + e.getMessage());
        }
    }
}

1.3 알림 서비스의 비동기 처리

@Service
public class NotificationService {
    private final EmailService emailService;
    private final NCloudSmsService smsService;

    public void sendRenewalReminder(Subscription subscription) {
        String message = String.format(
            "안녕하세요. %s님의 구독이 3일 후인 %s에 갱신될 예정입니다.",
            subscription.getMemberId(),
            subscription.getNextPaymentDate().toLocalDate()
        );

        try {
            CompletableFuture<Void> emailFuture =
                emailService.sendEmail(subscription.getMemberId(), "구독 갱신 예정 안내", message);
            CompletableFuture<Void> smsFuture =
                smsService.sendSms(subscription.getMemberId(), message);

            CompletableFuture.allOf(emailFuture, smsFuture).join();
        } catch (Exception e) {
            log.error("알림 전송 실패", e);
        }
    }
}

프로젝트에서 CompletableFuture를 사용한 이유와 이점

  1. 병렬 처리 최적화
    • OpenAI API 호출의 병렬 처리
    • 전체 처리 시간 단축
  2. CompletableFuture<String> analysisFuture = openAiApi.chatAsync(...); CompletableFuture<double[]> embeddingFuture = openAiApi.embeddingsAsync(...); CompletableFuture.allOf(analysisFuture, embeddingFuture) .get(45, TimeUnit.SECONDS);
  3. 비동기 통신
    • 알림 발송의 비동기 처리
    • 메인 로직의 응답성 향상
  4. CompletableFuture<Void> emailFuture = emailService.sendEmail(...); CompletableFuture<Void> smsFuture = smsService.sendSms(...);

CompletableFuture의 예외 처리

  1. OpenAI API 호출
    • RateLimitException 처리
    • API 오류 예외 변환
  2. public CompletableFuture<String> chatAsync(String systemPrompt, String userPrompt) { return CompletableFuture.supplyAsync(() -> { try { if (!rateLimiter.tryAcquire()) { throw new RateLimitException("API 호출 한도 초과"); } return chat(systemPrompt, userPrompt); } catch (Exception e) { throw new CompletionException(e); } }); }
  3. 타임아웃 처리
  4. try { allOf.get(45, TimeUnit.SECONDS); } catch (TimeoutException e) { throw new PromptException("API 처리 시간 초과"); } catch (Exception e) { throw new PromptException("API 처리 중 오류 발생"); } ```"

CompletableFuture와 트랜잭션 관리 조합

  1. 트랜잭션 경계 설정
  2. @Transactional public void processPrompt(PromptMessage message) { try { // 비동기 작업 시작 CompletableFuture<String> future = openAiApi.chatAsync(...); // 트랜잭션 내에서 결과 처리 String result = future.get(45, TimeUnit.SECONDS); updatePromptWithResults(prompt, result); } catch (Exception e) { // 롤백 처리 throw new PromptException(e.getMessage()); } }
  3. 트랜잭션 격리
  4. // 비동기 작업은 별도 트랜잭션으로 처리 @Async public CompletableFuture<Void> sendEmail(...) { return CompletableFuture.runAsync(() -> { // 별도 트랜잭션에서 실행 }); } ```"

CompletableFuture의 성능 관리

  1. 타임아웃 설정
    • API 타임아웃 관리
    • 리소스 낭비 방지
  2. // OpenAI API 호출에 45초 타임아웃 적용 allOf.get(45, TimeUnit.SECONDS);
  3. 비동기 풀 관리
    • 적절한 스레드 풀 사용
    • 리소스 사용량 모니터링
  4. // 알림 발송 비동기 처리 CompletableFuture.allOf( emailFuture, smsFuture ).join();

CompletableFuture 사용 시 주의한 점

  1. 리소스 관리
    • 타임아웃 시 리소스 정리
    • 메모리 누수 방지
  2. try { CompletableFuture<String> future = openAiApi.chatAsync(...); return future.get(45, TimeUnit.SECONDS); } catch (TimeoutException e) { // 타임아웃 시 리소스 정리 future.cancel(true); throw new PromptException("처리 시간 초과"); }
  3. 예외 전파
    • 명확한 예외 처리
    • 적절한 예외 변환
  4. public CompletableFuture<String> chatAsync(...) { return CompletableFuture.supplyAsync(() -> { try { return chat(...); } catch (Exception e) { throw new CompletionException(e); } }); }

비동기 처리의 테스트

  1. 단위 테스트
    • 비동기 동작 검증
    • 타임아웃 테스트
  2. @Test public void testAsyncProcessing() { CompletableFuture<String> future = openAiApi.chatAsync(...); String result = future.get(5, TimeUnit.SECONDS); assertNotNull(result); }
  3. 통합 테스트
    • 전체 비동기 플로우 검증
    • 결과 데이터 검증
  4. @Test public void testParallelProcessing() { PromptMessage message = createTestMessage(); promptService.processPrompt(message); Prompt result = promptRepository.findById(message.getPromptId()); assertNotNull(result.getEmbeddingVector()); }

'기술 트렌드 & 새로운 학습' 카테고리의 다른 글

코사인 유사도 검색  (1) 2024.11.18
IVFFlat  (0) 2024.11.17
Rate Limiting  (0) 2024.11.13