기술 트렌드 & 새로운 학습
CompletableFuture
by DoRightting
2024. 11. 14.
1. 구현 사례
1.1 OpenAI API 비동기 처리
public class OpenAiApi {
public CompletableFuture<String> chatAsync(String systemPrompt, String userPrompt) {
return CompletableFuture.supplyAsync(() -> {
if (!rateLimiter.tryAcquire()) {
throw new RateLimitException("API 호출 한도 초과");
}
return chat(systemPrompt, userPrompt);
});
}
public CompletableFuture<double[]> embeddingsAsync(String text) {
return CompletableFuture.supplyAsync(() -> {
if (!rateLimiter.tryAcquire()) {
throw new RateLimitException("API 호출 한도 초과");
}
return embeddings(text);
});
}
}
1.2 프롬프트 서비스에서의 병렬 처리
@Service
public class PromptService {
@Transactional
public void processPrompt(PromptMessage message) {
try {
CompletableFuture<String> analysisFuture = openAiApi.chatAsync(
OpenAiApi.ANALYSIS_SYSTEM_PROMPT,
prompt.getOriginalPrompt()
);
CompletableFuture<double[]> embeddingFuture = openAiApi.embeddingsAsync(
prompt.getOriginalPrompt()
);
CompletableFuture<Void> allOf = CompletableFuture.allOf(
analysisFuture, embeddingFuture
);
// 45초 타임아웃 설정
allOf.get(45, TimeUnit.SECONDS);
String analysis = analysisFuture.get();
double[] embedding = embeddingFuture.get();
updatePromptWithResults(prompt, processedData, embedding);
} catch (Exception e) {
throw new PromptException("API 처리 중 오류 발생: " + e.getMessage());
}
}
}
1.3 알림 서비스의 비동기 처리
@Service
public class NotificationService {
private final EmailService emailService;
private final NCloudSmsService smsService;
public void sendRenewalReminder(Subscription subscription) {
String message = String.format(
"안녕하세요. %s님의 구독이 3일 후인 %s에 갱신될 예정입니다.",
subscription.getMemberId(),
subscription.getNextPaymentDate().toLocalDate()
);
try {
CompletableFuture<Void> emailFuture =
emailService.sendEmail(subscription.getMemberId(), "구독 갱신 예정 안내", message);
CompletableFuture<Void> smsFuture =
smsService.sendSms(subscription.getMemberId(), message);
CompletableFuture.allOf(emailFuture, smsFuture).join();
} catch (Exception e) {
log.error("알림 전송 실패", e);
}
}
}
프로젝트에서 CompletableFuture를 사용한 이유와 이점
- 병렬 처리 최적화
- OpenAI API 호출의 병렬 처리
- 전체 처리 시간 단축
- CompletableFuture<String> analysisFuture = openAiApi.chatAsync(...); CompletableFuture<double[]> embeddingFuture = openAiApi.embeddingsAsync(...); CompletableFuture.allOf(analysisFuture, embeddingFuture) .get(45, TimeUnit.SECONDS);
- 비동기 통신
- 알림 발송의 비동기 처리
- 메인 로직의 응답성 향상
- CompletableFuture<Void> emailFuture = emailService.sendEmail(...); CompletableFuture<Void> smsFuture = smsService.sendSms(...);
CompletableFuture의 예외 처리
- OpenAI API 호출
- RateLimitException 처리
- API 오류 예외 변환
- public CompletableFuture<String> chatAsync(String systemPrompt, String userPrompt) { return CompletableFuture.supplyAsync(() -> { try { if (!rateLimiter.tryAcquire()) { throw new RateLimitException("API 호출 한도 초과"); } return chat(systemPrompt, userPrompt); } catch (Exception e) { throw new CompletionException(e); } }); }
- 타임아웃 처리
- try { allOf.get(45, TimeUnit.SECONDS); } catch (TimeoutException e) { throw new PromptException("API 처리 시간 초과"); } catch (Exception e) { throw new PromptException("API 처리 중 오류 발생"); } ```"
CompletableFuture와 트랜잭션 관리 조합
- 트랜잭션 경계 설정
- @Transactional public void processPrompt(PromptMessage message) { try { // 비동기 작업 시작 CompletableFuture<String> future = openAiApi.chatAsync(...); // 트랜잭션 내에서 결과 처리 String result = future.get(45, TimeUnit.SECONDS); updatePromptWithResults(prompt, result); } catch (Exception e) { // 롤백 처리 throw new PromptException(e.getMessage()); } }
- 트랜잭션 격리
- // 비동기 작업은 별도 트랜잭션으로 처리 @Async public CompletableFuture<Void> sendEmail(...) { return CompletableFuture.runAsync(() -> { // 별도 트랜잭션에서 실행 }); } ```"
CompletableFuture의 성능 관리
- 타임아웃 설정
- // OpenAI API 호출에 45초 타임아웃 적용 allOf.get(45, TimeUnit.SECONDS);
- 비동기 풀 관리
- 적절한 스레드 풀 사용
- 리소스 사용량 모니터링
- // 알림 발송 비동기 처리 CompletableFuture.allOf( emailFuture, smsFuture ).join();
CompletableFuture 사용 시 주의한 점
- 리소스 관리
- try { CompletableFuture<String> future = openAiApi.chatAsync(...); return future.get(45, TimeUnit.SECONDS); } catch (TimeoutException e) { // 타임아웃 시 리소스 정리 future.cancel(true); throw new PromptException("처리 시간 초과"); }
- 예외 전파
- public CompletableFuture<String> chatAsync(...) { return CompletableFuture.supplyAsync(() -> { try { return chat(...); } catch (Exception e) { throw new CompletionException(e); } }); }
비동기 처리의 테스트
- 단위 테스트
- @Test public void testAsyncProcessing() { CompletableFuture<String> future = openAiApi.chatAsync(...); String result = future.get(5, TimeUnit.SECONDS); assertNotNull(result); }
- 통합 테스트
- @Test public void testParallelProcessing() { PromptMessage message = createTestMessage(); promptService.processPrompt(message); Prompt result = promptRepository.findById(message.getPromptId()); assertNotNull(result.getEmbeddingVector()); }