ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 병렬 프로세스 최적화_CompletableFuture
    Java 2025. 1. 11. 15:58

    저희 회사는 SNS 데이터를 다룹니다.  

    데이터의 제목, 내용 길이가 매우 짧은 것부터 긴 것까지 다양한 형태의 데이터들을 처리하고 관리합니다.

     

    재색인 프로세스를 Airflow에 적용하기 위한 검토 과정에서 문제점을 발견하여 최적화를 진행해 보았는데요..!

    나름 성공적인 결과를 낸 것 같아 최적화 과정을 공유해드리려고 합니다.

     

    현재 프로세스는 Elasticsearch에서 데이터를 가져와 해당 문서를 분석하고

    다시 Elasticsearch로 재색인 하는 과정을 진행하고 있습니다. 

    그 중 시간이 가장 오래걸리는 텍스트 분석 과정에서 병렬처리를 하고 있는데요! 코드는 아래와 같습니다.

     

    기존 병렬 프로세스 코드

    CompletableFuture<List<RDataResultDTO>> analysisDataFuture = CompletableFuture.supplyAsync(() ->
    			dataList.parallelStream()
          // 연관어 + 감성어
          .map(data -> dao.getAnalyzedRDataDTO(data, searchDate))
          .collect(Collectors.toList())
    );

    time : 43~46 sec


    개선방안

    무작위로 들어오는 텍스트 길이(분석 대상)를 고려할 때, work stealing으로 인한 빈번한 Context Switching이 단일 스레드보다 성능이 저하되지는 않을지, 또는 스레드 수를 고정하는 것이 더 효율적일지에 대해 테스트를 진행했습니다.

    분석 결과, I/O 집중 작업보다 CPU 집중 작업의 비중이 높다는 것을 확인했으며, 스레드 풀을 미리 생성하여 고정된 스레드로 처리했을 때 성능이 향상되는 것을 검증했습니다.

     

    하지만 모니터링을 보며 의아했던 점은 worker Thread Running time 입니다.

    Worker Thread 실행 시간 : 전체 실행 시간의 약 23%

     

    모니터링을 보고 느낀점은 "일 편하게 하네?" 였습니다. 

     

    현재는 분석 단계에서만 병렬 프로세스를 활용하고 있습니다. 하지만 병렬 작업을 분석에서 색인 과정까지 확대하면 처리 속도를 개선하고 CPU를 더욱 효율적으로 사용할 수 있을 것으로 판단했습니다. CPU 사용률과 메모리 활용도가 높지 않은 상태였기에, 시스템에 추가 부하를 주어도 충분히 감당할 수 있을 것으로 보였습니다.

     

    변경사항

    • 리팩토링 (메서드 세분화)
    • CompletableFuture의 thenAccept를 활용한 비동기작업 동기처리 
    List<CompletableFuture<Void>> futuresList = new ArrayList<>();
    while (check.get()) {
    	List<RDataResultDTO> docs = getDocument(searchDate, instance, distribute);
        if(docs.isEmpty()) {
        	break;
        }
        int size = docs.size();
        lastSeq = docs.get(size - 1).getLo_seq_org();
        CompletableFuture<Void> pipeline = CompletableFuture.supplyAsync(() -> doAnalysis(docs),forkJoinPool)
        	.thenAccept(this::postAnalyzedData);
        futuresList.add(pipeline); // 모든 작업을 리스트에 저장
    }
    
    // 모든 CompletableFuture 작업이 완료될 때까지 대기
    CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0])).join();

     


    테스트 결과

    기존 대비 65% 속도 향상

    리팩토링 후 적용된 병렬처리 방식으로 처리 속도가 기존 40초대에서 16초로 65% 향상되었습니다. 

    열심히 일하는 스레드

     

    다만, 이 방식은 모든 task를 작업 Queue에 넣고 시작하기에 시스템 부하를 고려한 코드 설계가 필요합니다.

     

    텍스트 길이가 5000자에서 9000자인 21,220개의 문서를 34분간 색인해 보았을때

    힙 메모리가 지속적으로 증가하는 현상을 확인했으며, 최종적으로 메모리가 감소하긴 했으나 이를 안정적으로 관리하기 위한 코드 개선이 필요했습니다.

     

    추가된 코드

    
    // 가용 프로세서 확인
    int processors = Runtime.getRuntime().availableProcessors()-2;
    System.out.println("Processor : " + processors);
    // 작업 예정 Task 수 관리 변수
    int getQueuedTaskCount = processors*2; 
    
    while (check.get()) {
    	// 기존 코드
    	...
        // 추가
        if (futuresList.size() >= 50) { ///  task를 비워줌으로써 heap을 살짝 덜먹음
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
            allFutures.join();
            futuresList.clear();
        }
    }
    if (!futuresList.isEmpty()) {
        CompletableFuture<Void> allRemainingFutures = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
        allRemainingFutures.join();
    }

     


    CPU와 메모리 사용률을 조정 후 테스트 결과 (최종)

    - 왼쪽 (오리지널)  : Uptime: 6 min 15 sec / Heap Size: 3,302,490,112B
    - 오른쪽 (튜닝) : Uptime: 4 min 38 sec / Heap Size: 3,615,490,048B

     

    시스템 부하를 고려하지 않았을 때는 초반 Used Heap 사이즈가 3GB를 가볍게 넘었었지만 작업량 관리 코드를 추가함으로써 개선된 모습을 볼 수 있었습니다.

     


     

    지금까지는 현재 환경에 바로 적용할 수 있도록 설계를 하여 최적화를 진행했지만

    앞으로 도입될 Airflow, Kubernetes 환경을 감안하면 외부 Queue를 활용하여 확장성과 운용성을 높일 수 있는 설계도 고려해 보면 좋을것 같습니다..!

    'Java' 카테고리의 다른 글

    CompletableFuture 테스트  (2) 2024.10.26
    Java - 병렬처리 (Future 인터페이스 활용)  (4) 2024.10.12
Designed by Tistory.