본문 바로가기
[개발] 언어/Java

Java 8 Parallel Stream

by Devsong26 2024. 9. 19.

병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림입니다.

따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있습니다.

 

멀티코어란?
하나의 물리적인 CPU(중앙 처리 장치) 안에 여러 개의 독립적인 처리 장치(코어)가 포함된 구조를 말합니다. 각 코어는 자체적인 연산 장치(ALU, 레지스터 등)를 가지고 있어서 동시에 여러 작업을 병렬로 처리할 수 있습니다.

 

 

ParallelStream은 parallelStream() 또는 stream().parallel() 메서드를 통해 사용할 수 있습니다.

List<Integer> list = List.of(1, 2, 3, 4, 5);
        
list.parallelStream();
list.stream().parallel();

 

 

예시 코드로 알아보는 병렬 스트림

순차 스트림

private void serialStream(long n){
    System.out.println("[serialStream] START");
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    final Long ignoreVal =
            LongStream.rangeClosed(1, n)
                    .reduce(0L, Long::sum);

    stopWatch.stop();
    System.out.println("Usage time(sec) >> " + stopWatch.getTotalTimeSeconds());
    System.out.println("[serialStream] END");
}

 

병렬 스트림

private void parallelStream(long n){
    System.out.println("[parallelStream] START");
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    final Long ignoreVal =
            LongStream.rangeClosed(1, n)
                    .parallel()
                    .reduce(0L, Long::sum);

    stopWatch.stop();
    System.out.println("Usage time(sec) >> " + stopWatch.getTotalTimeSeconds());
    System.out.println("[parallelStream] END");
}

 

시간을 보기 위한 예시 코드이므로 연산 결과 값은 무시합니다.

n이 10_000_000_000L일 경우 두 코드의 시간 성능 차이는 다음과 같습니다.

  • 순차 스트림: Usage time(sec) >> 16.7672049
  • 병렬 스트림: Usage time(sec) >> 0.7182073

멀티코어로 여러 개의 스레드를 통해 연산을 수행하는 병렬 스트림이 순차 스트림보다 더 빠른 수행시간을 보여줍니다.

 

 

 

병렬 스트림 효과적으로 사용하기

  • 자동 박싱과 언박싱은 병렬스트림의 성능을 크게 저하시킬 수 있으므로 이 때는 기본형 특화 스트림을 사용하는 것이 좋습니다.
  • limit(), findFirst()처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치뤄야 하기 때문에 비정렬된 상태에서의 limit()이나 findAny() 등의 메서드가 더 효율적입니다.
  • 처리해야 할 요소 수가 N이고 하나의 요소를 처리하는 데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리 비용은 N * Q로 예상할 수 있다. Q가 높아진다면 병렬 스트림으로 성능을 개선할 수 있습니다.
  • 소량의 데이터에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 만큼의 이득을 얻지 못하므로 병렬 스트림이 효과가 없습니다.
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있습니다. 예를 들어 SIZED 스트림(스트림의 크기가 정해진)은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있습니다만 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 스트림을 병렬 처리할 수 있을지 알 수 없게 됩니다.

 

 

 

스트림 소스와 분해성

자료구조별 스트림 소스 분해성(병렬스트림으로 분해하는 정도)에 대해서 정리된 표입니다.

소스 분해성
ArrayList 훌륭함
LinkedList 나쁨
IntStream.range 훌륭함
Stream.iterate 나쁨
HashSet 좋음
TreeSet 좋음

 

 

 

병렬 스트림에서 사용하는 스레드 풀 설정

병렬 스트림에서 사용하는 스레드 풀은 내부적으로 ForkJoinPool을 사용합니다. 

기본적으로 ForkJoinPool은 프로세서 수, 즉 아래 코드의 반환하는 값에 상응하는 스레드를 갖습니다.

Runtime.getRuntime().availableProcessors()

 

전역설정을 하는 방법도 있습니다만 이후 모든 병렬 스트림 연산에 영향을 주고, 자바 8 에서는 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없기 때문에 아래 코드 사용을 지양하고 ForkJoinPool의 기본값을 사용하도록 권장하고 있습니다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

 

 

 

포크/조인 프레임워크

포크/조인 프레임워크

 

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었습니다. 포크/조인 프레임워크에서는 서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현합니다.

 

ExecutorService란?
Java에서 비동기 작업을 실행하고 관리하기 위한 인터페이스입니다. 스레드를 생성하고 관리하는 복잡성을 줄이기 위해 사용되며, 스레드 풀을 관리하여 작업을 효율적으로 처리합니다.

 

 

포크/조인 프레임워크에서는 작업 훔치기라는 기법을 사용합니다. 작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드를 공정하게 분할합니다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리합니다. 스레드가 할일이 다 떨어지면 유휴 상태로 바뀌는 것이 아니라 다른 스레드 큐의 꼬리에서 작업을 훔쳐오며 모든 태스크(모든 큐)가 끝날 때까지 반복합니다. 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있습니다.

 

작업 훔치기 알고리즘

 

 

 

Custom ForkJoinPool

parellelStream을 사용하게 되면 매번 새로운 스레드 풀을 생성하지 않고 Common ForkJoinPool을 사용하며 별도의 설정이 없으면 여러 ParallelStream이 공유하게 됩니다. 병렬 스트림은 Thread pool을 global하게 공유하기 때문에 만약 A 메서드에서 쓰레드를 모두 점유하게 되면 다른 병렬 스레드는 요청이 처리되지 않고 대기하게 됩니다.

또한, blocking I/O가 발생하는 작업을 하게 되면 Thread pool 내부의 스레드들은 block 되며, 이때 Thread Pool을 공유하는 다른 쪽의 병렬 Stream은 스레드를 얻을 때까지 계속해서 기다리게 되면서 문제가 발생할 수 있습니다.

 

이런 문제점은 Custom ForkJoinPool을 사용해서 해결할 수 있습니다. 

 

별도의 스레드 풀 생성 시 정석은 실행 중인 CPU 코어 수를 기준으로 생성해야 합니다.

ForkJoinPool customForkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

 

물리적인 코어 수를 초과하여 생성할 경우, 생성은 되지만 스레드 관리 오버헤드와 스레드 간의 빈번한 컨텍스트 스위칭(Context-Switching) 등의 문제로 성능 저하가 발생할 수 있습니다.

 

Parallel Stream 별로 ForkJoinPool을 인스턴스화하여 사용하면 OOME(OutOfMemoryError)가 발생할 수 있습니다.

default로 사용되는 common ForkJoinPool은 정적(static)이기 때문에 메모리 누수가 발생하지 않지만, 

Custom한 ForkJoinPool 객체는 참조 해제되지 않거나, GC(Garbage Collection)으로 수집되지 않을 수 있습니다.

 

이 문제에 대한 해결 방법은 간단한데 Custom ForkJoinPool을 사용한 후 다음과 같이 스레드 풀을 명시적으로 종료하는 것입니다.

 

ForkJoinPool customForJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// do something..
customForkJoinPool.shutdown();

 

이렇게 하면 사용이 끝난 Custom ForkJoinPool이 메모리에 머무르는 것을 방지할 수 있습니다.

 

 

 

References

  • Book
    • 자바 8 인 액션

 

'[개발] 언어 > Java' 카테고리의 다른 글

Java Stream API의 patitioningBy에 대해서 알아보자.  (0) 2024.11.14
빈 스트림에 allMatch를 할 경우 true 반환  (0) 2024.10.27
Java Method  (1) 2023.11.11
Java Switch Case  (0) 2023.11.09
SOLID  (1) 2023.10.30