티스토리 뷰

리액티브 프로그래밍에서 퍼블리셔가 만들어내는 데이터의 흐름은 ColdHot 두가지로 나뉜다. 참고로 리액터에서는 퍼블리셔가 만들어내는 데이터의 흐름에 대해서 시퀀스라는 용어를 사용한다. 여기서 Cold PublisherHot Publisher 그리고 Cold SequenceHot Sequence는 거의 같은 의미라고 할 수 있다.


그렇다면 Cold SequenceHot Sequence는 대체 무슨 차이가 있는지 아래에서 알아보자.


Cold Sequence

Cold PublisherCold Sequence라는 데이터의 흐름을 만들어내는 퍼블리셔이다.

먼저 맨 위의 첫 번째 Subscriber의 구독이 발생하고, 그 뒤 Cold publusher는 1, 3, 5, 7이라는 데이터를 차례대로 emit한다. 그러면 Subscriberemit된 데이터를 차례대로 전달받게 된다.


다음 아래 시점에서 두 번째 Subscriber의 구독이 발생하게 되고, 첫 번째 구독이 발생하면서 Cold publusheremit한 1, 3, 5, 7의 데이터가 두 번째 구독이 발생한 경우에도 1, 3, 5, 7의 데이터가 emit되어서 Subscriber에게 차례대로 전달된다.


즉, Subscriber가 구독을 할 때 마다 타임라인에 처음부터 emit된 모든 데이터를 받을 수 있다. 이처럼 Subscriber의 구독 시점이 서로 다르더라도 구독을 할 때마다 퍼블리셔가 데이터를 emit하는 과정을 처음부터 다시 시작하는 데이터의 흐름을 바로 Cold Sequence라고 한다.


import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.util.Arrays;

@Slf4j
public class ColdSequenceExample {

    public static void main(String[] args) {
        Flux<String> coldFux = Flux.fromIterable(Arrays.asList("RED", "YELLOW", "PINK"))
                .map(String::toLowerCase);

        coldFux.subscribe(country -> log.info("# Subscriber1: {}", country));
        log.info("========================================================");
        coldFux.subscribe(country -> log.info("# Subscriber2: {}", country));
    }

}

  • 코드 결과를 보면 Subscriber1이 구독한 데이터를 Subscriber2가 다시 처음부터 구독하는 모습을 볼 수 있다.

Hot Sequence

첫 번째 Subscriber 같은 경우에는 1, 3, 5, 7의 모든 데이터를 전달받을 수 있다.


다음으로 3 이후에 두 번째 구독이 발생하게 되는데, 두 번째 Subscriber는 구독이 발생한 시점 이전에 emit된 1과 3의 데이터는 전달받지 못하고, 두 번째 구독이 발생한 시점 이후에 emit된 5와 7의 데이터만 전달 받을 수 있다.


그리고 5 이후에 세 번째 구독이 발생하게 되는데, 세 번째 Subscriber는 세 번쨰 구독이 발생한 시점 이전에 emit된 1, 3, 5의 데이터는 전달 받지 못하고, 세 번째 구독이 발생한 시점 이후에 emit된 7의 데이터만 전달 받을 수 있다.


즉, Subscriber가 구독한 시점의 타임라인부터 emit된 데이터를 받을 수 있다. Cold Publisher같은 경우에는 구독을 할 때마다 모든 데이터를 전달 받을 수 있었는데, Hot Publisher의 경우에는 구독한 시점에 타임라인부터 emit된 데이터를 전달 받을 수 있다.


import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.stream.Stream;

@Slf4j
public class HotSequenceExample {

    public static void main(String[] args) throws InterruptedException {
        Flux<String> concertFlux = Flux.fromStream(Stream.of("Singer A", "Singer B", "Singer C", "Singer D", "Singer E"))
                .delayElements(Duration.ofSeconds(1)).share(); // share() 원본 Flux를 여러 Subscriber가 공유한다.

        concertFlux.subscribe(singer -> log.info("# Subscriber1 is watching {}`s song.", singer));
        Thread.sleep(2500);

        concertFlux.subscribe(singer -> log.info("# Subscriber2 is watching {}`s song.", singer));
        Thread.sleep(3000);
    }

}

  • 코드 결과를 보면 Subscriber1는 모든 데이터를 전달 받지만, Subscriber2는 구독이 시작된 시점부터 데이터를 전달 받는 모습을 볼 수 있다.
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함