Fork/Join

Fork/join framework은 ExecutorService의 구현체 중 하나로 Fork/join framework를 사용하면 다중 프로세서를 활용할 수 있다. 여러 재귀적으로(쪼개져 나온 작업으로 되돌아 갈 수 있는) 작은 작업으로 쪼개어 처리할 수 있는 작업 처리를 위해 설계 되었다. 애플리케이션 성능을 향상 시킬 수 있도록 모든 가용한 프로세싱 파워를 사용하는 것이 목적이다.

다른 ExecutorService 구현체처럼, fork/join framework은 여러 작업을 쓰레드 풀(pool)내의 워커(worker)쓰레드로 분배한다. Fork/join framework의 차별점은 work-stealing 알고리즘을 사용하여 일을 분배하도록 하는 것이며, 워커 쓰레드가 더이상 처리할 것이 없는 경우, 다른 작업 처리에 바쁜 다른 워커 쓰레드가 처리할 것을 “훔쳐”와서 처리하도록 한다.

Fork/join framework의 핵심은 ForkJoinPool 클래스이며, ForkJoinPool 클래스는 AbstractExecutorService 클래스를 상속/확장한다. ForkJoinPool은 핵심이 되는 work stealing 알고리즘을 구현하고 ForkJoinTask 프로세스들을 실행한다.

기본 사용법

Fork/join framework을 사용하기 위한 처 단계는 하나의 작업을 여러 작업으로 쪼개도록 하는 코드를 작성하는 것이다. 작성해야 할 코드는 아래와 같은 의사코드(pseudocode)와 유사할 것이다.

if (처리해야 할 작업이 충분이 작은가?)
	바로 작업을 처리한다.
else
	처리할 작업을 두개의 작업으로 쪼갠다.
	두개의 작업처리를 호출하고 결과를 기다린다. (이부분이 재귀적임)

위의 코드를 ForkJoinTask의 하위 클래스에 구현한다. 보통 보다 특화된 타입인 결과를 반환하는 RecursiveTask 혹은 RecursiveAction을 사용한다.

ForkJoinTask의 하위 구현체가 준비되면 처리할 모든 작업을 나타내는 객체를 생성하고 생성한 객체를 ForkJoinPool인스턴스의 invoke 메서드로 전달한다.

선명도 흐리기 예제

Fork/join framework이 어떻게 작동하는지에 대한 이해도를 높이기 위해, 아래의 예제를 살펴보자. 이미지를 흐기게 처리하길 원한다고 가정하자. 원본 이미지는 정수의 배열로 나타낼 수 있다. 각각의 정수값은 단일 픽셀(pixel)의 색상 값을 나타낸다. 흐림처리된 최종 이미지는 원본 이미지와 동일한 크기의 정소 배열로 나타낼 수 있다.

흐림처리는 원본 배열의 픽셀을 한번에 하나씩 처리해 나가면서 완료될 수 있다. 각각의 픽셀은 주변 픽셀의 색값의 평균값을 취하도록 한다.(빨강, 녹색, 그리고 파랑색의 평균값) 그리고 결과는 최종 배열에 저장한다. 한 이미지는 큰 배열로 표현되기때문에, 이 처리 과정은 오랜 시간이 걸릴 수 있다. 따라서 fork/join framework을 사용하는 알고리짐을 구현한 다중 프로세서 시스템 상의 동시처리의 이점을 활용한다.

import java.util.concurrent.RecursiveAction;

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLengh;
    private int[] mDestination;

    //Processing window size; should be odd.

    private int mBlurWidth = 15;

    public ForkBlur(int[] src, int start, int length, int[] dest) {
        this.mSource = src;
        this.mStart = start;
        this.mLengh = length;
        this.mDestination = dest;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth -1 ) / 2;
        for (int index = mStart; index < mStart + mLengh; index++) {
            // Calculate average;
            float rt = 0, gt = 0, bt= 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(
                        Math.max(mi + index, 0),
                        mSource.length -1
                );
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x0ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0ff0000) >> 8) / mBlurWidth;
                rt += (float) ((pixel & 0x0ff0000) >> 0) / mBlurWidth;
            }

            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                    (((int)rt) << 16) |
                    (((int)gt) <<  8) |
                    (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

    @Override
    protected void compute() {
        // TODO: implement
    }
}

이제 추상 메서드인 compute() 메서드를 구현할 것이다. compute() 메서드의 구현은 흐림 처리를 바로 할 것인지 혹은 두개의 작은 작업으로 쪼갤 것인지를 결정하여 수행한다. 배열 길이의 기준점(threshold)를 정하여 작업을 나눌지 말지를 결정하도록 한다.

 protected static int sThreshold = 100000;

    @Override
    protected void compute() {
        if (mLengh < sThreshold) {
            computeDirectly();
            return;
        }
        int split = mLengh / 2;
        invokeAll(
                new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLengh - split, mDestination),
        );

이전 메서드들이 RecursiveAction 클래스의 하위 클래스에 구현되어 있다면, ForkJoinPool에서 실행할 작업을 설정하는 것은 간단하며, 다음 단계들을 포함한다.

  1. 처리할 작업들을 나타내는 Task를 생성한다.

     // source image pixels are in src
     // destination image pixels are in dst
     ForkBlur fb = new ForkBlur(src, 0, src.legnth, dst)
    
  2. Task를 실행할 ForkJoinPool을 생성한다.

     ForkJoinPool pool = new ForkJoinPool()
    
  3. Task를 실행한다.

     pool.invoke(fb)
    

전체 소스 코드는 ForkBlur를 참조하면 된다.

표준 구현체들

Fork/join framework을 직접 사용하여 다중 프로세서 시스템상에서 작업들을 동시에 처리되도록 구현할 수 도 있지만, 이미 fork/join framework을 사용하여 구현되어 있는 기능들이 Java SE에 포함되어 있다. 예를 들어 java.util.Arrays 클래스의 parrallelSort 메서드가 그 예중 하나이다. parrallelSort 메서드의 결과는 일반 sort 메서드 호출과 동일하다. 하지만 큰 배열을 정렬하는 경우, 다중 프로세스를 지원하는 시스템 상에서 fork/join framework을 통한 동시 처리를 활용해서 주어진 배열을 정렬하게 되면 순차적인 정렬인 sort 메서드 보다 빠르게 결과를 얻을 수 있다. 보다 상세한 구현 내용은 Java API문서를 참고하면 된다.

fork/join framework을 사용한 또 다른 구현체들은 java.util.streams 패키지내의 메서드들에서 사용이 되었다.

기타

Pool 크기 제어

기본 ForkJoinPool의 크기(Thread의 개수)는 생성 가능한 최대값(0x7fff) 32767개와 실행 환경의 사용 가능한 프로세서의 갯수 중 작은 값을 사용한다. 즉, CPU의 코어가 4개이면 Pool size는 4가 된다.

전체 시스템에 동일한 ForkJoinPool 크기 적용

현재 실행되고 있는 모든 프로세스의 모든 ForkJoinPool에 적용이 된다. 따라서 사용을 신중히 고려해서 사용해야 한다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")

직접 크기를 지정하여 사용

ForkJoinPool 클래스를 보면 아래와 같이 parallelism을 인자로 받는 생성자를 확인할 수 있다. 이 생성자를 통해 ForkJoinPool을 생성하면 원하는 크기(단, 최대값 32767을 넘지 않는)를 설정할 수 있다.

ForkJoinPool(int parallelism)

추가로 읽어보기