ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • CompletableFuture 테스트
    Java 2024. 10. 26. 18:52

    https://baektothefuture.tistory.com/13

     

    Java - 병렬처리 (Future 인터페이스 활용)

    90만건 가까이 되는 데이터를 Elasticsearch에서 읽어 전처리 후 다시 Elasticsearch에 색인했었습니다.이 과정에서 약 3~4시간정도 소요가 되는데 이를 어떻게하면 빠르게 처리할 수 있을까 고민하다가

    baektothefuture.tistory.com

     


    이전글에서는 Future 인터페이스에 다루어 보았는데요 이번에는 CompletableFuture에 대해 알아보도록 하겠습니다.

    Future vs CompletableFuture

    출처 : https://11st-tech.github.io/2024/01/04/completablefuture/

    Future CompletableFuture
    Blocking non-blocking
    여러 연산을 함께 연결하기 어려움 여러 연산을 함께 연결
    여러 연산 결과를 결합하기 어려움 여러 연산 결과를 결합
    연산 성공 여부만 확인할 수 있고 예외처리 어려움 execeptionally(), handle()을 통한 예외 처리

     

    위의 표를 보면 Future 인터페이스의 한계점을 CompletablFuture을 통해 해결한 것을 볼 수 있습니다.

     

    주로 활용하는 메서드

    runAsync(): 반환값이 없는 비동기 작업을 실행합니다.

    supplyAsync(): 반환값이 있는 비동기 작업을 실행합니다.

    thenApply(): 이전 단계의 결과를 받아서 새로운 값을 반환합니다.

    thenAccept(): 이전 단계의 결과를 받아서 소비(consumer)합니다.

    thenRun(): 이전 단계의 결과를 사용하지 않고 후속 작업을 수행합니다.

     

     

    이에 대한 간단한 테스트를 해보았는데 CompletableFuture의 특성을 잘 살린(?) 코드는 아니지만 처음접하는 분들은 예시를 보며 조금이나마 감을 잡았으면 좋겠습니다.

    package main;
    
    import java.util.*;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class CompletableFutureExample {
        public static void main(String[] args) {
            List<Integer> job = IntStream.range(1, 11).boxed().collect(Collectors.toList());
    
            // jobList 생성 및 초기화
            List<List<Integer>> jobList = IntStream.range(1, 101)
                    .mapToObj(i -> new ArrayList<>(job)) // 각기 다른 복사본 추가
                    .collect(Collectors.toList());
    
            String version = System.getProperty("java.version");
            System.out.println("JAVA Version : " + version);
    
            try {
                System.out.println("=========== multi thread ==============");
                long start = System.currentTimeMillis();
    
                // CompletableFuture 리스트 생성
                List<CompletableFuture<List<Integer>>> futureList = new ArrayList<>();
    
                // 각 job에 대해 비동기 작업 수행
                for (List<Integer> ob : jobList) {
                    CompletableFuture<List<Integer>> future = CompletableFuture.supplyAsync(() -> {
                        System.out.println("현재 스레드 : " + Thread.currentThread().getName());
                        return working(ob);
                    }).thenApply(a -> {
                        System.out.println(Thread.currentThread().getName() + " 마지막 단계");
                        return lastWorking(a);
                    });
                    futureList.add(future);
                }
    
                // 모든 작업이 완료될 때까지 대기
                CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
                        futureList.toArray(new CompletableFuture[0])
                );
                combinedFuture.get(); // 또는 combinedFuture.join();
    
                long end = System.currentTimeMillis();
                long secDiff = (end - start);
    
                System.out.println("parallel : " + secDiff + "ms");
    
                // 작업 결과 수집 (필요한 경우)
                /*
                List<List<Integer>> results = futureList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());
                */
    
                System.out.println("=========== single thread ==============");
                long start2 = System.currentTimeMillis();
    
                for (List<Integer> ob : jobList) {
                    lastWorking(working(ob));
                }
    
                long end2 = System.currentTimeMillis();
                long secDiff2 = (end2 - start2);
                System.out.println("single thread : " + secDiff2 + "ms");
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static List<Integer> working(List<Integer> list) {
            try {
                Thread.sleep(2000);
                for (int i = 0; i < list.size(); i++) {
                    list.set(i, list.get(i) * 10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return list;
        }
    
        public static List<Integer> lastWorking(List<Integer> list) {
            try {
                Thread.sleep(2000);
                for (int i = 0; i < list.size(); i++) {
                    list.set(i, list.get(i) + 1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return list;
        }
    }

     

    위 코드의 결과값을 보면 아래와 같습니다.

    ..(생략)..
    ForkJoinPool.commonPool-worker-10 마지막 단계 ForkJoinPool.commonPool-worker-16 마지막 단계 ForkJoinPool.commonPool-worker-17 마지막 단계 ForkJoinPool.commonPool-worker-4 마지막 단계
    parallel : 24140ms
    =========== single thread ==============
    single thread : 401558ms
    Disconnected from the target VM, address: '127.0.0.1:50706', transport: 'socket' Process finished with exit code 0

     

    본래는 분석 프로세스를 잘게 나누어 병렬처리를 하려 했지만 사실 컨택스트 스위칭 비용만 늘어나 성능이 더 떨어질 뿐 유의미한 결과를 보지 못했습니다. 병렬프로세스라고 무조건 빠르다고 생각하고 적용하지 말고 프로세스 특성에 맞는지 잘 확인하고 적용하면 좋을것 같습니다...!

    'Java' 카테고리의 다른 글

    병렬 프로세스 최적화_CompletableFuture  (5) 2025.01.11
    Java - 병렬처리 (Future 인터페이스 활용)  (4) 2024.10.12
Designed by Tistory.