-
병렬 프로세스 최적화_CompletableFutureJava 2025. 1. 11. 15:58
저희 회사는 SNS 데이터를 다룹니다.
데이터의 제목, 내용 길이가 매우 짧은 것부터 긴 것까지 다양한 형태의 데이터들을 처리하고 관리합니다.
재색인 프로세스를 Airflow에 적용하기 위한 검토 과정에서 문제점을 발견하여 최적화를 진행해 보았는데요..!
나름 성공적인 결과를 낸 것 같아 최적화 과정을 공유해드리려고 합니다.
현재 프로세스는 Elasticsearch에서 데이터를 가져와 해당 문서를 분석하고
다시 Elasticsearch로 재색인 하는 과정을 진행하고 있습니다.
그 중 시간이 가장 오래걸리는 텍스트 분석 과정에서 병렬처리를 하고 있는데요! 코드는 아래와 같습니다.
기존 병렬 프로세스 코드
CompletableFuture<List<RDataResultDTO>> analysisDataFuture = CompletableFuture.supplyAsync(() -> dataList.parallelStream() // 연관어 + 감성어 .map(data -> dao.getAnalyzedRDataDTO(data, searchDate)) .collect(Collectors.toList()) );time : 43~46 sec
개선방안
무작위로 들어오는 텍스트 길이(분석 대상)를 고려할 때, work stealing으로 인한 빈번한 Context Switching이 단일 스레드보다 성능이 저하되지는 않을지, 또는 스레드 수를 고정하는 것이 더 효율적일지에 대해 테스트를 진행했습니다.
분석 결과, I/O 집중 작업보다 CPU 집중 작업의 비중이 높다는 것을 확인했으며, 스레드 풀을 미리 생성하여 고정된 스레드로 처리했을 때 성능이 향상되는 것을 검증했습니다.
하지만 모니터링을 보며 의아했던 점은 worker Thread Running time 입니다.

Worker Thread 실행 시간 : 전체 실행 시간의 약 23%
모니터링을 보고 느낀점은 "일 편하게 하네?" 였습니다.
현재는 분석 단계에서만 병렬 프로세스를 활용하고 있습니다. 하지만 병렬 작업을 분석에서 색인 과정까지 확대하면 처리 속도를 개선하고 CPU를 더욱 효율적으로 사용할 수 있을 것으로 판단했습니다. CPU 사용률과 메모리 활용도가 높지 않은 상태였기에, 시스템에 추가 부하를 주어도 충분히 감당할 수 있을 것으로 보였습니다.
변경사항
- 리팩토링 (메서드 세분화)
- CompletableFuture의 thenAccept를 활용한 비동기작업 동기처리
List<CompletableFuture<Void>> futuresList = new ArrayList<>(); while (check.get()) { List<RDataResultDTO> docs = getDocument(searchDate, instance, distribute); if(docs.isEmpty()) { break; } int size = docs.size(); lastSeq = docs.get(size - 1).getLo_seq_org(); CompletableFuture<Void> pipeline = CompletableFuture.supplyAsync(() -> doAnalysis(docs),forkJoinPool) .thenAccept(this::postAnalyzedData); futuresList.add(pipeline); // 모든 작업을 리스트에 저장 } // 모든 CompletableFuture 작업이 완료될 때까지 대기 CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0])).join();
테스트 결과
기존 대비 65% 속도 향상
리팩토링 후 적용된 병렬처리 방식으로 처리 속도가 기존 40초대에서 16초로 65% 향상되었습니다.

열심히 일하는 스레드 다만, 이 방식은 모든 task를 작업 Queue에 넣고 시작하기에 시스템 부하를 고려한 코드 설계가 필요합니다.
텍스트 길이가 5000자에서 9000자인 21,220개의 문서를 34분간 색인해 보았을때

힙 메모리가 지속적으로 증가하는 현상을 확인했으며, 최종적으로 메모리가 감소하긴 했으나 이를 안정적으로 관리하기 위한 코드 개선이 필요했습니다.
추가된 코드
// 가용 프로세서 확인 int processors = Runtime.getRuntime().availableProcessors()-2; System.out.println("Processor : " + processors); // 작업 예정 Task 수 관리 변수 int getQueuedTaskCount = processors*2; while (check.get()) { // 기존 코드 ... // 추가 if (futuresList.size() >= 50) { /// task를 비워줌으로써 heap을 살짝 덜먹음 CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0])); allFutures.join(); futuresList.clear(); } } if (!futuresList.isEmpty()) { CompletableFuture<Void> allRemainingFutures = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0])); allRemainingFutures.join(); }
CPU와 메모리 사용률을 조정 후 테스트 결과 (최종)
- 왼쪽 (오리지널) : Uptime: 6 min 15 sec / Heap Size: 3,302,490,112B
- 오른쪽 (튜닝) : Uptime: 4 min 38 sec / Heap Size: 3,615,490,048B
시스템 부하를 고려하지 않았을 때는 초반 Used Heap 사이즈가 3GB를 가볍게 넘었었지만 작업량 관리 코드를 추가함으로써 개선된 모습을 볼 수 있었습니다.
지금까지는 현재 환경에 바로 적용할 수 있도록 설계를 하여 최적화를 진행했지만
앞으로 도입될 Airflow, Kubernetes 환경을 감안하면 외부 Queue를 활용하여 확장성과 운용성을 높일 수 있는 설계도 고려해 보면 좋을것 같습니다..!
'Java' 카테고리의 다른 글
CompletableFuture 테스트 (2) 2024.10.26 Java - 병렬처리 (Future 인터페이스 활용) (4) 2024.10.12