75.5 [Spring WebFlux] Project Reactor (줄여서 Reactor)

2022. 8. 12. 00:42카테고리 없음

Project Reactor란?

 

 

Reactor란?

[리액티브 프로그래밍] 유닛에서 언급했다시피 Reactor는 리액티브 스트림즈 표준 사양을 구현한 구현체 중 하나입니다.


Reactor는 Spring 5 버전부터 지원하는 리액티브 스택에 포함되어 리액티브한 애플리케이션으로 동작하는데 있어 핵심적인 역할을 담당하는 리액티브 프로그래밍을 위한 라이브러리입니다.


Reactor 특징

Reactor의 특징은 Reactor 공식 페이지에 명확하게 드러나 있습니다.

Reactor 공식 페이지의 화면을 보면서 Reactor의 특징을 이해해 보겠습니다.

[그림 4-2] 화면으로 보는 Reactor 특징.

참고 자료 : https://projectreactor.io/


(1) 여러분도 잘 알고있다시피 Reactor는 리액티브 스트림즈(Reactive Streams)를 구현한 리액티브 라이브러리 입니다.


(2) [그림 4-2]에서 가장 많이 나오는 용어가 바로 Non-Blocking입니다. Non-Blocking은 리액티브 프로그래밍의 핵심적인 특징이며, Reactor 역시 완전한 Non-Blocking 통신을 지원합니다.


Non-Blocking의 실체가 무엇인지는 WebFlux 유닛에서 Spring MVC 기반 애플리케이션과 Spring WebFlux 기반 애플리케이션을 직접 실행시켜서 눈으로 확인해 볼 예정이니 궁금하더라도 조금만 기다려주세요.

지금은 요청 쓰레드가 차단이 되지 않는다 정도로만 기억하면 됩니다!


(3) Reactor는 Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입을 제공합니다. Mono[0|1]에서 0과 1의 의미는 0건 또는 1건의 데이터를 emit 할 수 있음을 의미합니다.

Flux[N]에서 N의 의미는 여러 건의 데이터를 emit할 수 있음을 의미합니다.


(4) 서비스들 간의 통신이 잦은 MSA(Microservice Architecture) 기반 애플리케이션들은 요청 쓰레드가 차단되는 Blocking 통신을 사용하기에는 무리가 있습니다. 따라서 기본적으로 Non-Blocking 통신을 완벽하게 지원하는 Reactor는 MSA 구조에 적합한 라이브러리라고 볼 수 있습니다.


(5) Backpressure는 여러분들에게 굉장히 생소한 용어일 수 있습니다. Backpressure에 대한 개념을 그림으로 이해해 봅시다.

[그림 4-3] Backpressure가 적용되지 않은 Publisher와 Subscriber

[그림 4-3] 리액티브 프로그래밍의 핵심 컴포넌트인 Publisher와 Subscriber의 인터랙션을 단순하게 표현한 그림입니다.


리액티브 프로그래밍에서는 끊임없이 들어오는 데이터를 적절하게 처리할 수 있어야 합니다.

[그림 4-3]과 같이 Publisher에서 끊임없이 들어오는 데이터를 emit하는 것과 달리 Subscriber의 처리 속도가 느리면 어떻게 될까요?


아직 처리되지 않고 대기하는 데이터가 지속적으로 쌓이는 것을 방치하게되면 오버플로우가 발생하게 되고 급기야는 시스템이 다운될 수 있습니다.


Backpressure란 [그림 4-4]에서 보다시피 Subscriber의 처리 속도가 Publihser의 emit 속도를 따라가지 못할 때 적절하게 제어하는 전략을 의미합니다.


Backpressure는 리액티브 프로그래밍에서 데이터를 적절하게 제어하기 위해 반드시 필요한 전략이며, Reactor는 이러한 Backpressure 전략을 잘 지원해주고 있습니다.



Hello Reactor로 알아보는 Reactor 구성 요소

 

package com.codestates.example;

import reactor.core.publisher.Mono;

// 리액티브 프로그래밍 기본 구조
public class HelloReactiveExample02 {
    public static void main(String[] args) {
        Mono
            .just("Hello, Reactive")
            .subscribe(message -> System.out.println(message));
    }
}
[코드 4-10] 리액티브 프로그래밍 기본 구조

코드 4-10은 [리액티브 프로그래밍] 유닛에서 살펴본 리액티브 프로그래밍 기본 구조를 이해하기 위한 예제 코드입니다.



코드 4-10에 Reactor의 코드를 조금 더 추가해서 Reactor의 기본 구성 요소를 살펴보겠습니다.

package com.codestates.example;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class HelloReactorExample {
    public static void main(String[] args) throws InterruptedException {
        Flux    // (1)
            .just("Hello", "Reactor")               // (2)
            .map(message -> message.toUpperCase())  // (3)
            .publishOn(Schedulers.parallel())       // (4)
            .subscribe(System.out::println,         // (5)
                    error -> System.out.println(error.getMessage()),  // (6)
                    () -> System.out.println("# onComplete"));        // (7)

        Thread.sleep(100L);
    }
}
[코드 4-11] Reactor의 기본 구성 요소 예제

코드 4-11의 코드는 짧은 코드이지만 Reactor에서 사용되는 핵심 구성 요소를 이해하기에 적절한 예제 코드입니다.

 

  • (1)은 Reactor Sequence의 시작점입니다. (1)에서 Flux로 시작한다는 것은 Reactor Sequence가 여러 건의 데이터를 처리함을 의미합니다.

 

  • (2) just() Operator는 원본 데이터 소스로부터(Original Data Source) 데이터를 emit하는 Publisher의 역할을 합니다.

 

  • (3) map() Operator는 Publisher로부터 전달 받은 데이터를 가공하는 Operator입니다. (3)에서는 just() Operator에서 emit된 영문 문자열을 대문자로 변환하고 있습니다.
  • Reactor에서는 map() Operator처럼 데이터를 가공 처리할 수 있는 수많은 Operator를 지원합니다.

 

  • (4) publishOn() Operator는 Reactor Sequence에서 쓰레드 관리자 역할을 하는 Scheduler를 지정하는 Operator입니다.즉, 코드 4-11에서는 Reactor Sequence 상에서 두 개의 쓰레드가 실행됩니다.
  • (4)와 같이 publishOn() Operator에 Scheduler를 지정하면 publishOn()을 기준으로 Downstream의 쓰레드Scheduler에서 지정한 유형의 쓰레드로 변경됩니다.

 

  • 코드 4-11에서 subscribe()는 파라미터로 총 세 개의 람다 표현식을 가지는데 (5)의 첫 번째 파라미터는 Publisher가 emit한 데이터를 전달 받아서 처리하는 역할을 합니다.

 

  • (6)의 두 번째 파라미터는 Reqctor Sequence 상에서 에러가 발생할 경우, 해당 에러를 전달 받아서 처리하는 역할을 합니다.

 

  • (7)의 세 번째 파라미터는 Reactor Sequence가 종료된 후 어떤 후처리를 하는 역할을 합니다.

 

subscribe() 메서드로 전달된 세 개의 람다 표현식은 Subscriber에게 전달되어 각각의 동작을 수행한다는 사실을 기억하세요!

 

HELLO
REACTOR
# onComplete

 

코드 4-11의 출력 결과를 보면 두 개의 문자열이 map() Operator를 거쳐 대문자로 변환된 후, subscribe()의 첫 번째 람다 표현식으로 전달되어, 출력 되었습니다.

 

“# onComplete” 문자열이 출력되었다는 것은 Reactor의 Sequence가 정상적으로 종료되었다는 의미입니다.

즉, subscribe()의 세 번째 파라미터인 람다 표현식은 Reactor Sequence가 정상적으로 종료되면 동작을 수행한다는 사실을 알 수 있습니다.

 

코드 4-11의 마지막 라인에 있는 Thread.sleep(100L);은 무슨 역할을 할까라고 궁금해 하는 분들이 있을 것 같습니다.

Reactor Sequence에 Scheduler를 지정하면 main 쓰레드 이외에 별도의 쓰레드가 하나 더 생깁니다.

Reactor에서 Scheduler로 지정한 쓰레드는 모두 데몬 쓰레드이기 때문에 주 쓰레드인 main 쓰레드가 종료되면 동시에 종료됩니다.

따라서 main 쓰레드를 Thread.sleep(100L)을 통해 0.1초 정도 동작을 지연시키면 그 0.1초 사이에 Scheduler로 지정한 데몬 쓰레드를 통해 Reactor Sequence가 정상 동작을 하게 됩니다.

 


 

핵심 포인트

  • Reactor는 리액티브 스트림즈 표준 사양을 구현한 구현체 중 하나이다.
  • Reactor는 완전한 Non-Blocking 통신을 지원한다.
  • Reactor는 기본적으로 Mono[0|1]Flux[N]이라는 두 가지 Publisher 타입을 제공한다.
  • Non-Blocking 통신을 완벽하게 지원하는 Reactor는 MSA 구조에 적합한 라이브러리이다.
  • BackpressureSubscriber의 처리 속도가 Publihser의 emit 속도를 따라가지 못할 때 적절하게 제어하는 전략을 의미한다.