티스토리 뷰
리액터에서 스케줄러는 쓰레드를 관리하는 쓰레드 관리자 역할을 하는데, 구독시점에 데이터가 emit
되는 영역과 emit
된 데이터를 operator
로 가공 처리하는 영역을 분리해서 손쉽게 멀티스레딩을 가능하게 한다.
또한, 리액터에서의 스케줄러는 크게 operator chain
에서 스케줄러를 전환하는 역할을 하는 전용 operator와 스케줄러를 통해 생성되는 쓰레드 실행 모델을 지정하는 부분으로 구성이 되어 있다.
이 두가지 구성요소 중, 스케줄러를 전환해주는 operator에 대해 자세히 살펴보자.
스케줄러를 전환해주는 operator
리엑터에서 지원하는 스케줄러를 위한 전용 operator
는 크게 세가지가 있다.
publishOn()
: operator chain에서 Downstream Operator의 실행을 위한 쓰레드를 지정한다.subscribeOn()
: 최상위 Upstream Publisher의 실행을 위한 쓰레드를 지정한다. 즉, 원본 데이터 소스를 emit하기 위한 스케줄러를 지정한다.parallel()
: Downstream에 대한 데이터 처리를 병렬로 분할 처리하기 위한 쓰레드를 지정한다.
publishOn() 및 subscribeOn()의 동작 방식 1
publishOn
이나 subscribeOn
과 같은 operator
를 특별히 사용하지 않으면 operator chain
에서 최초 실행되는 쓰레드는 subscribe method
가 호출되는 영역에 있는 쓰레드가 된다.
위 그림에서는 publishOn
이나 subscribeOn
과 같은 operator
가 별도로 추가되어 있지 않은 상태이다. 때문에 구독이 발생하는 시점에 operator chain
이 실행되면 모든 operator
들이 전부 메인 쓰레드에서 실행된다.
SchedulerOperator 예제 코드 1
public class SchedulerOperatorExample1 {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.filter(data -> data > 3)
.map(data -> data * 10)
.subscribe(Logger::onNext);
}
}
- 실행 결과를 보면 메인 쓰레드에서 실행된 것을 확인할 수 있다.
publishOn() 및 subscribeOn()의 동작 방식 2
operator chain
에서 publishOn
이 호출되면 publishOn
호출 이후의 operator chain
은 다음 publishOn
을 만나기전까지 publishOn
에서 지정한 쓰레드(여기서는 A쓰레드)에서 실행된다.
mpa operator
는 두 번째 publishOn
이 추가되었기 때문에 B쓰레드에서 실행된다.
SchedulerOperator 예제 코드 2
public class SchedulerOperatorExample2 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.doOnNext(data -> Logger.doOnNext("fromArray", data))
.publishOn(Schedulers.parallel())
.filter(data -> data > 3)
.doOnNext(data -> Logger.doOnNext("filter", data))
.map(data -> data * 10)
.doOnNext(data -> Logger.doOnNext("map", data))
.subscribe(Logger::onNext);
Thread.sleep(500);
}
}
- 실행 결과를 보면
fromArray
에서emit
된 1, 3, 5 ,7은 메인 쓰레드에서 모두 실행되었고,publishOn
이 추가된 시점 이후에는parallel-1
이라는 쓰레드에서 실행이 된 것을 확인할 수 있다.
publishOn() 및 subscribeOn()의 동작 방식 3
subscribeOn
은 최상위 Upstream Publisher의 실행 쓰레드를 subscribe()
호출 scope의 쓰레드에서 subscribeOn
에서 지정한 쓰레드로 바꾼다.
SchedulerOperator 예제 코드 3
public class SchedulerOperatorExample3 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(data -> Logger.doOnNext("fromArray", data))
.filter(data -> data > 3)
.doOnNext(data -> Logger.doOnNext("filter", data))
.map(data -> data * 10)
.doOnNext(data -> Logger.doOnNext("map", data))
.subscribe(Logger::onNext);
Thread.sleep(500);
}
}
- 실행 결과를 보면
SubscribeOn
를 추가했기 때문에 Upstream Publisher와 동일한 쓰레드로 실행되는 것을 확인할 수 있다.
publishOn() 및 subscribeOn()의 동작 방식 4
subscribeOn
과 publishOn
이 같이 있다면, publishOn
을 만나기 전까지의 Upstream Operator 체인은 subscribeOn
에서 지정한 쓰레드에서 실행되고, publishOn
을 만날때마다 publishOn
아래의 operator chain
Downstream은 publishOn
에서 지정한 쓰레드에서 실행된다.
SchedulerOperator 예제 코드 4
public class SchedulerOperatorExample4 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7})
.subscribeOn(Schedulers.boundedElastic())
.filter(data -> data > 3)
.doOnNext(data -> Logger.doOnNext("filter", data))
.publishOn(Schedulers.parallel())
.map(data -> data * 10)
.doOnNext(data -> Logger.doOnNext("map", data))
.subscribe(Logger::onNext);
Thread.sleep(500);
}
}
- 실행 결과를 보면
publishOn
이 사용된filter
는parallel-1
에서 실행된 것을 확인할 수 있다.
ParallerFlux의 동작 방식
컴퓨터 시스템에서 워크로드라는 의미는 처리해야 될 어떤 작업량을 의미하고, 리액터에서 리액트 시퀀스상의 처리해야 될 작업량은 바로 퍼블리셔가 emit
하는 데이터들을 처리하기 위한 작업량을 의미한다.
Flex
의 operator
중 하나인 Paraller Operator
를 사용해서 처리해야 될 워크로드를 병렬로 처리하겠다는 정의를 하게 되는데, 단순히 Paraller Operator
만 추가했다고 해서 실제 병렬 작업이 실행 되는 것은 아니고, Paraller Operator
가 Return
값으로 반환하는 ParallerFlux
타입에서 지원하는 Run on Operator
를 사용해서 스케줄러를 지정하는 시점에 병렬 작업이 시작된다.
그리고 워크로드를 병렬로 처리하기 위해서 워크로드를 분할한 뒤, Rail
이라는 논리적인 작업 단위를 통해서 분할된 워크로드가 처리된다.
ParallerFlux 예제 코드 1
public class ParallelExample1 {
public static void main(String[] args) {
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15})
.parallel()
.subscribe(Logger::onNext);
}
}
- 실행 결과를 보면
Paraller Operator
만 추가된 상태이기 때문에 메인 쓰레드만 동작하고, 병렬 처리가 되지 않은 모습을 확인할 수 있다.
ParallerFlux 예제 코드 2
public class ParallelExample2 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15})
.parallel()
.runOn(Schedulers.parallel())
.subscribe(Logger::onNext);
Thread.sleep(100);
}
}
- 실행 결과를 보면 1부터 8까지의 실행 쓰레드가 데이터를
emit
하는 모습을 볼 수 있다. - 8개의 쓰레드는
CPU
에서 지원하는 논리적인 코어와 연관이 있다.
ParallerFlux 예제 코드 3
public class ParallelExample3 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel()
.runOn(Schedulers.parallel())
.subscribe(Logger::onNext);
Thread.sleep(100);
}
}
ParallerFlux
예제 코드 2와 마찬가지로 1부터 8까지의 총 8개의 쓰레드를 이용해서 병렬로 작업을 처리한 모습을 확인할 수 있다.
ParallerFlux 예제 코드 4
public class ParallelExample4 {
public static void main(String[] args) throws InterruptedException {
Flux.fromArray(new Integer[]{1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(Logger::onNext);
Thread.sleep(100);
}
}
Paraller Operator
의 파라미터로 쓰레드의 개수(여기서는 4)를 지정하면 컴퓨터에서 사용할 수 있는 논리적인 코어의 개수 범위 안에서 파라미터로 지정한 숫자만큼의 쓰레드를 사용해서 병렬 작업을 처리한다.
Paraller Operator
를 사용할 때 주의할 점으로는 Paraller Operator
와 Run on Operator
를 함께 사용해야만 병렬로 작업을 처리할 수 있다는 사실이다.
'Spring' 카테고리의 다른 글
[Spring Webflux] DispatcherHandler (0) | 2024.05.06 |
---|---|
[Spring Webflux] 컨텍스트 (Context) (0) | 2024.01.28 |
[Spring Webflux] 백프레셔 (Backpressure) (2) | 2024.01.15 |
[Spring Webflux] Cold Sequence & Hot Sequence (0) | 2024.01.07 |
[Spring] 메트릭(metric)을 직접 등록해보자 (0) | 2023.04.17 |
- Total
- Today
- Yesterday
- 김영한
- 코테
- MySQL
- 인프런
- webflux
- 노마드
- 구현
- 그리디
- Spring
- 노마드코더
- kotlin
- 스프링 부트
- Algorithm
- 리팩토링
- Real MySQL
- 자료구조
- 데이터베이스
- 코틀린
- spring boot
- 북클럽
- 릿코드
- 정렬
- leetcode
- 알고리즘
- 스프링
- 스프링부트
- mysql 8.0
- 백준
- 문자열
- 파이썬
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |