-
CompletableFuture 테스트Java 2024. 10. 26. 18:52
https://baektothefuture.tistory.com/13
Java - 병렬처리 (Future 인터페이스 활용)
90만건 가까이 되는 데이터를 Elasticsearch에서 읽어 전처리 후 다시 Elasticsearch에 색인했었습니다.이 과정에서 약 3~4시간정도 소요가 되는데 이를 어떻게하면 빠르게 처리할 수 있을까 고민하다가
baektothefuture.tistory.com
이전글에서는 Future 인터페이스에 다루어 보았는데요 이번에는 CompletableFuture에 대해 알아보도록 하겠습니다.
Future vs CompletableFuture출처 : https://11st-tech.github.io/2024/01/04/completablefuture/
Future CompletableFuture Blocking non-blocking 여러 연산을 함께 연결하기 어려움 여러 연산을 함께 연결 여러 연산 결과를 결합하기 어려움 여러 연산 결과를 결합 연산 성공 여부만 확인할 수 있고 예외처리 어려움 execeptionally(), handle()을 통한 예외 처리 위의 표를 보면 Future 인터페이스의 한계점을 CompletablFuture을 통해 해결한 것을 볼 수 있습니다.
주로 활용하는 메서드
runAsync(): 반환값이 없는 비동기 작업을 실행합니다.
supplyAsync(): 반환값이 있는 비동기 작업을 실행합니다.
thenApply(): 이전 단계의 결과를 받아서 새로운 값을 반환합니다.
thenAccept(): 이전 단계의 결과를 받아서 소비(consumer)합니다.
thenRun(): 이전 단계의 결과를 사용하지 않고 후속 작업을 수행합니다.
이에 대한 간단한 테스트를 해보았는데 CompletableFuture의 특성을 잘 살린(?) 코드는 아니지만 처음접하는 분들은 예시를 보며 조금이나마 감을 잡았으면 좋겠습니다.
package main; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; public class CompletableFutureExample { public static void main(String[] args) { List<Integer> job = IntStream.range(1, 11).boxed().collect(Collectors.toList()); // jobList 생성 및 초기화 List<List<Integer>> jobList = IntStream.range(1, 101) .mapToObj(i -> new ArrayList<>(job)) // 각기 다른 복사본 추가 .collect(Collectors.toList()); String version = System.getProperty("java.version"); System.out.println("JAVA Version : " + version); try { System.out.println("=========== multi thread =============="); long start = System.currentTimeMillis(); // CompletableFuture 리스트 생성 List<CompletableFuture<List<Integer>>> futureList = new ArrayList<>(); // 각 job에 대해 비동기 작업 수행 for (List<Integer> ob : jobList) { CompletableFuture<List<Integer>> future = CompletableFuture.supplyAsync(() -> { System.out.println("현재 스레드 : " + Thread.currentThread().getName()); return working(ob); }).thenApply(a -> { System.out.println(Thread.currentThread().getName() + " 마지막 단계"); return lastWorking(a); }); futureList.add(future); } // 모든 작업이 완료될 때까지 대기 CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( futureList.toArray(new CompletableFuture[0]) ); combinedFuture.get(); // 또는 combinedFuture.join(); long end = System.currentTimeMillis(); long secDiff = (end - start); System.out.println("parallel : " + secDiff + "ms"); // 작업 결과 수집 (필요한 경우) /* List<List<Integer>> results = futureList.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); */ System.out.println("=========== single thread =============="); long start2 = System.currentTimeMillis(); for (List<Integer> ob : jobList) { lastWorking(working(ob)); } long end2 = System.currentTimeMillis(); long secDiff2 = (end2 - start2); System.out.println("single thread : " + secDiff2 + "ms"); } catch (Exception e) { e.printStackTrace(); } } public static List<Integer> working(List<Integer> list) { try { Thread.sleep(2000); for (int i = 0; i < list.size(); i++) { list.set(i, list.get(i) * 10); } } catch (InterruptedException e) { e.printStackTrace(); } return list; } public static List<Integer> lastWorking(List<Integer> list) { try { Thread.sleep(2000); for (int i = 0; i < list.size(); i++) { list.set(i, list.get(i) + 1); } } catch (InterruptedException e) { e.printStackTrace(); } return list; } }위 코드의 결과값을 보면 아래와 같습니다.
..(생략)..
ForkJoinPool.commonPool-worker-10 마지막 단계 ForkJoinPool.commonPool-worker-16 마지막 단계 ForkJoinPool.commonPool-worker-17 마지막 단계 ForkJoinPool.commonPool-worker-4 마지막 단계
parallel : 24140ms
=========== single thread ==============
single thread : 401558ms
Disconnected from the target VM, address: '127.0.0.1:50706', transport: 'socket' Process finished with exit code 0본래는 분석 프로세스를 잘게 나누어 병렬처리를 하려 했지만 사실 컨택스트 스위칭 비용만 늘어나 성능이 더 떨어질 뿐 유의미한 결과를 보지 못했습니다. 병렬프로세스라고 무조건 빠르다고 생각하고 적용하지 말고 프로세스 특성에 맞는지 잘 확인하고 적용하면 좋을것 같습니다...!
'Java' 카테고리의 다른 글
병렬 프로세스 최적화_CompletableFuture (5) 2025.01.11 Java - 병렬처리 (Future 인터페이스 활용) (4) 2024.10.12