티스토리 뷰

리액터에서 스케줄러는 쓰레드를 관리하는 쓰레드 관리자 역할을 하는데, 구독시점에 데이터가 emit되는 영역과 emit된 데이터를 operator로 가공 처리하는 영역을 분리해서 손쉽게 멀티스레딩을 가능하게 한다.


또한, 리액터에서의 스케줄러는 크게 operator chain에서 스케줄러를 전환하는 역할을 하는 전용 operator와 스케줄러를 통해 생성되는 쓰레드 실행 모델을 지정하는 부분으로 구성이 되어 있다.


이 두가지 구성요소 중, 스케줄러를 전환해주는 operator에 대해 자세히 살펴보자.


스케줄러를 전환해주는 operator

리엑터에서 지원하는 스케줄러를 위한 전용 operator는 크게 세가지가 있다.

  • publishOn(): operator chain에서 Downstream Operator의 실행을 위한 쓰레드를 지정한다.
  • subscribeOn(): 최상위 Upstream Publisher의 실행을 위한 쓰레드를 지정한다. 즉, 원본 데이터 소스를 emit하기 위한 스케줄러를 지정한다.
  • parallel(): Downstream에 대한 데이터 처리를 병렬로 분할 처리하기 위한 쓰레드를 지정한다.

publishOn() 및 subscribeOn()의 동작 방식 1

publishOn이나 subscribeOn과 같은 operator를 특별히 사용하지 않으면 operator chain에서 최초 실행되는 쓰레드는 subscribe method가 호출되는 영역에 있는 쓰레드가 된다.


위 그림에서는 publishOn이나 subscribeOn과 같은 operator가 별도로 추가되어 있지 않은 상태이다. 때문에 구독이 발생하는 시점에 operator chain이 실행되면 모든 operator들이 전부 메인 쓰레드에서 실행된다.


SchedulerOperator 예제 코드 1

public class SchedulerOperatorExample1 {
    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{1, 3, 5, 7})
                .filter(data -> data > 3)
                .map(data -> data * 10)
                .subscribe(Logger::onNext);
    }
}

  • 실행 결과를 보면 메인 쓰레드에서 실행된 것을 확인할 수 있다.

publishOn() 및 subscribeOn()의 동작 방식 2

operator chain에서 publishOn이 호출되면 publishOn 호출 이후의 operator chain은 다음 publishOn을 만나기전까지 publishOn에서 지정한 쓰레드(여기서는 A쓰레드)에서 실행된다.


mpa operator는 두 번째 publishOn이 추가되었기 때문에 B쓰레드에서 실행된다.


SchedulerOperator 예제 코드 2

public class SchedulerOperatorExample2 {
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[]{1, 3, 5, 7})
                .doOnNext(data -> Logger.doOnNext("fromArray", data))
                .publishOn(Schedulers.parallel())
                .filter(data -> data > 3)
                .doOnNext(data -> Logger.doOnNext("filter", data))
                .map(data -> data * 10)
                .doOnNext(data -> Logger.doOnNext("map", data))
                .subscribe(Logger::onNext);

        Thread.sleep(500);
    }
}

  • 실행 결과를 보면 fromArray에서 emit된 1, 3, 5 ,7은 메인 쓰레드에서 모두 실행되었고, publishOn이 추가된 시점 이후에는 parallel-1이라는 쓰레드에서 실행이 된 것을 확인할 수 있다.

publishOn() 및 subscribeOn()의 동작 방식 3

subscribeOn은 최상위 Upstream Publisher의 실행 쓰레드를 subscribe() 호출 scope의 쓰레드에서 subscribeOn에서 지정한 쓰레드로 바꾼다.


SchedulerOperator 예제 코드 3

public class SchedulerOperatorExample3 {
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[]{1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic())
                .doOnNext(data -> Logger.doOnNext("fromArray", data))
                .filter(data -> data > 3)
                .doOnNext(data -> Logger.doOnNext("filter", data))
                .map(data -> data * 10)
                .doOnNext(data -> Logger.doOnNext("map", data))
                .subscribe(Logger::onNext);

        Thread.sleep(500);
    }
}

  • 실행 결과를 보면 SubscribeOn를 추가했기 때문에 Upstream Publisher와 동일한 쓰레드로 실행되는 것을 확인할 수 있다.

publishOn() 및 subscribeOn()의 동작 방식 4

subscribeOnpublishOn이 같이 있다면, publishOn을 만나기 전까지의 Upstream Operator 체인은 subscribeOn에서 지정한 쓰레드에서 실행되고, publishOn을 만날때마다 publishOn 아래의 operator chain Downstream은 publishOn에서 지정한 쓰레드에서 실행된다.


SchedulerOperator 예제 코드 4

public class SchedulerOperatorExample4 {
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[]{1, 3, 5, 7})
                .subscribeOn(Schedulers.boundedElastic())
                .filter(data -> data > 3)
                .doOnNext(data -> Logger.doOnNext("filter", data))
                .publishOn(Schedulers.parallel())
                .map(data -> data * 10)
                .doOnNext(data -> Logger.doOnNext("map", data))
                .subscribe(Logger::onNext);

        Thread.sleep(500);
    }
}

  • 실행 결과를 보면 publishOn이 사용된 filterparallel-1에서 실행된 것을 확인할 수 있다.

ParallerFlux의 동작 방식

컴퓨터 시스템에서 워크로드라는 의미는 처리해야 될 어떤 작업량을 의미하고, 리액터에서 리액트 시퀀스상의 처리해야 될 작업량은 바로 퍼블리셔가 emit하는 데이터들을 처리하기 위한 작업량을 의미한다.


Flexoperator 중 하나인 Paraller Operator를 사용해서 처리해야 될 워크로드를 병렬로 처리하겠다는 정의를 하게 되는데, 단순히 Paraller Operator만 추가했다고 해서 실제 병렬 작업이 실행 되는 것은 아니고, Paraller OperatorReturn 값으로 반환하는 ParallerFlux 타입에서 지원하는 Run on Operator를 사용해서 스케줄러를 지정하는 시점에 병렬 작업이 시작된다.


그리고 워크로드를 병렬로 처리하기 위해서 워크로드를 분할한 뒤, Rail이라는 논리적인 작업 단위를 통해서 분할된 워크로드가 처리된다.


ParallerFlux 예제 코드 1

public class ParallelExample1 {
    public static void main(String[] args) {
        Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15})
                .parallel()
                .subscribe(Logger::onNext);
    }
}

  • 실행 결과를 보면 Paraller Operator만 추가된 상태이기 때문에 메인 쓰레드만 동작하고, 병렬 처리가 되지 않은 모습을 확인할 수 있다.

ParallerFlux 예제 코드 2

public class ParallelExample2 {
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15})
                .parallel()
                .runOn(Schedulers.parallel())
                .subscribe(Logger::onNext);

        Thread.sleep(100);
    }
}

  • 실행 결과를 보면 1부터 8까지의 실행 쓰레드가 데이터를 emit하는 모습을 볼 수 있다.
  • 8개의 쓰레드는 CPU에서 지원하는 논리적인 코어와 연관이 있다.

ParallerFlux 예제 코드 3

public class ParallelExample3 {
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
                .parallel()
                .runOn(Schedulers.parallel())
                .subscribe(Logger::onNext);

        Thread.sleep(100);
    }
}

  • ParallerFlux 예제 코드 2와 마찬가지로 1부터 8까지의 총 8개의 쓰레드를 이용해서 병렬로 작업을 처리한 모습을 확인할 수 있다.

ParallerFlux 예제 코드 4

public class ParallelExample4 {
    public static void main(String[] args) throws InterruptedException {
        Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
                .parallel(4)
                .runOn(Schedulers.parallel())
                .subscribe(Logger::onNext);

        Thread.sleep(100);
    }
}

  • Paraller Operator의 파라미터로 쓰레드의 개수(여기서는 4)를 지정하면 컴퓨터에서 사용할 수 있는 논리적인 코어의 개수 범위 안에서 파라미터로 지정한 숫자만큼의 쓰레드를 사용해서 병렬 작업을 처리한다.

Paraller Operator를 사용할 때 주의할 점으로는 Paraller OperatorRun on Operator를 함께 사용해야만 병렬로 작업을 처리할 수 있다는 사실이다.

댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함