티스토리 뷰

백프레셔는 Publisher로부터 Subscriber에게 끊임없이 전달되는 데이터를 안정적으로 처리하기 위한 처리 방식이다. 이러한 백프레셔를 좀 더 쉽게 이해하기 위해서는 PublisherSubscriber가 어떤 방식으로 데이터를 주고 받는지 이해하는 것이 좋다.


Publisher와 Subscriber간의 프로세스

리액티브 프로그래밍은 PublisherSubscriber간의 Interaction이라고 볼 수 있다.

  1. 가장 먼저 Subscriber에서 subscribe() 메서드를 호출하면서 구독을 시작한다.
  2. Publisher에서는 구독이 정상적으로 이루어졌음을 onSubscribe signalsubscriber에게 알려준다.
  3. Subscriber에서 데이터를 전달받기 위해 request signalPublisher에게 전송한다.
  4. Publisher는 전달받은 request signal에 해당되는 onNext signalSubscriber에게 전송하면서 데이터를 emit한다.
  5. SubscriberPublisher로부터 전달받은 데이터의 처리가 끝나면 다시 request signal을 전송하면서 데이터를 요청한다.
  6. Publisher는 마찬가지로 onNext signalSubscriber에게 전송하면서 데이터를 emit한다.
  7. Publisher쪽에서 더이상 emit할 데이터가 없으면 마지막으로 onComplete signalSubscriber에게 전달한다.

Backpressure

백프레셔는 Publisher에서 emit되는 데이터를 Subscriber쪽에서 안정적으로 처리하기 위한 제어기능이다.

만약 Publisher쪽에서 데이터를 하나 emit했는데, Subscriber쪽에서 처리하는 속도가 상당히 느리다고 가정해보자.
Publisher는 계속해서 데이터를 emit하고 있지만, Subscriber는 아직 1번 데이터를 처리하고 있다면 최악의 경우에는 처리하지 못한 데이터들이 쌓여 시스템 자체가 다운될 수 있다. 이처럼 리액티브 프로그래밍에서 들어오는 데이터를 안정적으로 처리하기 위해서는 백프레셔라는 기능이 필요하다.


Backpressure 전략

리액터에서는 백프레셔를 처리하는 방법으로는 Subscriber가 적절히 처리할 수 있는 수준의 데이터 갯수를 Publisher에게 요청하는 방식과 함께 다양한 전략을 제공한다.

Backpressure DROP 전략

DROP 전략이란, 버퍼가 가득 찼을 때 들어오는 데이터, Publisher로부터 emit된 데이터를 버퍼가 비워질 때까지 하나씩 DROP되는 전략을 말한다.


public class BackPressureStrategyDropExample {
    public static void main(String[] args) {
        Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureDrop(dropped -> Logger.info("dropped: {}", dropped))
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            TimeUtils.sleep(5);
                            Logger.onNext(data);
                        },
                        Logger::onError
                );
        TimeUtils.sleep(2000);
    }
}

  • 버퍼가 가득찬 256번째 데이터부터 dropped되는 모습을 확인할 수 있다.

Backpressure LATEST 전략

LATEST 전략이란, 버퍼가 가득찬 상태에서는 데이터가 폐기 되었다가 버퍼가 비워지는 시점에 emit된 데이터 중에서 가장 최근에 emit된 데이터부터 버퍼 안으로 들어오는 전략을 말한다.


public class BackPressureStrategyLatestExample {
    public static void main(String[] args) {
        Flux
                .interval(Duration.ofMillis(1L))
                .onBackpressureLatest()
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            TimeUtils.sleep(5);
                            Logger.onNext(data);
                        },
                        Logger::onError
                );
        TimeUtils.sleep(2000);
    }
}

  • 256번 데이터부터 폐기 되었다가 버퍼가 비워지는 시점에 가장 최근에 emit된 1186번째 데이터부터 전달되는 모습을 확인할 수 있다.

Backpressure BUFFER DROP-LATEST 전략

BUFFER DROP-LATEST 전략이란, 버퍼가 가득차서 오버플로우가 발생했을 때, 가장 최근에 버퍼 안으로 들어온 데이터부터 DROP되는 전략을 말한다.


public class BackPressureStrategyBufferDropLatestExample {
    public static void main(String[] args) {
        Flux
                .interval(Duration.ofMillis(1L))
                .doOnNext(data -> Logger.info("emitted by original Flux: {}", data))
                .onBackpressureBuffer(2,
                        dropped -> Logger.info("Overflow & dropped: {}", dropped),
                        BufferOverflowStrategy.DROP_LATEST
                )
                .doOnNext(data -> Logger.info("emitted by Buffer: {}", data))
                .publishOn(Schedulers.parallel(), false, 1)
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            TimeUtils.sleep(1000);
                            Logger.onNext(data);
                        },
                        Logger::onError
                );
        TimeUtils.sleep(2000);
    }
}

  • 오버플로우가 발생하자 가장 최근에 버퍼 안으로 들어온 5번 데이터가 DROP되는 모습을 확인할 수 있다.

Backpressure BUFFER DROP-OLDEST 전략

BUFFER DROP-OLDEST 전략이란, 버퍼가 가득차서 오버플로우가 발생했을 때, 버퍼안의 데이터 중에서 가장 먼저 들어온(가장 오래된) 데이터부터 DROP되는 전략을 말한다.


public class BackPressureStrategyBufferDropOldestExample {
    public static void main(String[] args) {
        Flux
                .interval(Duration.ofMillis(1L))
                .doOnNext(data -> Logger.info("emitted by original Flux: {}", data))
                .onBackpressureBuffer(2,
                        dropped -> Logger.info("Overflow & dropped: {}", dropped),
                        BufferOverflowStrategy.DROP_OLDEST
                )
                .doOnNext(data -> Logger.info("emitted by Buffer: {}", data))
                .publishOn(Schedulers.parallel(), false, 1)
                .publishOn(Schedulers.parallel())
                .subscribe(data -> {
                            TimeUtils.sleep(1000);
                            Logger.onNext(data);
                        },
                        Logger::onError
                );
        TimeUtils.sleep(2000);
    }
}

  • 버퍼안의 데이터 0, 1이 emit되고, 오버플로우가 발생하자 가장 먼저 들어온 2번 데이터가 DROP되는 모습을 확인할 수 있다.
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함