ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • RxJava의 스케줄러
    프로그래밍 2018. 4. 18. 15:49

    Schedulers in RxJava

    http://www.baeldung.com/rxjava-schedulers 문서 번역임.
    RxJava에서 제공하는 Schedulers를 이해하는 데 좋은 자료더군요. 테스트 코드는 조금 오류가 있어서 코멘트 달았습니다..


     

    1. 개요 Overview

    이 문서는 RxJava의 여러가지 Schedulers에 대해 설명한다. Schedulers는 RxJava에서 옵저버블(Observable)의 subscribeOn 또는 observeOn 함수를 통해 멀티 스레딩 작업을 하는 데 사용한다. Schedulers는 옵저버블 연결에서 실행되는 작업들을 언제-어디서 실행할지 결정할 수 있게 해 준다.
    Schedulers 클래스 팩토리 함수를 통해 하나의 스케줄러를 생성할 수 있다.

     

    2. 기본 스레드 동작 (Default Threading Behavior)

    기본적으로, Rx는 싱글 스레드이고, 옵저버블에 구독 함수(subscribe)가 불리는 곳의 스레드를 사용하게 된다. 그리고 오퍼레이터로 연결된 영역에서도 동일한 스레드를 사용하게 된다. observeOn과 subscribeOn 함수는 스케줄러를 인수로 받고 있는데, 이름에서도 알 수 있듯이 함수들은 독립적인 실행 항목을 스케줄링하기 위해 사용한다.
    우리는 createWorker 함수를 이용해 스케줄러 구현체를 만들 수 있다. 이 함수는 Scheduler.Worker를 반환한다. 이 워커(Worker)는 하나의 스레드로 순차적으로 작업들을 받아 실행한다.

    이런 경우, 하나의 Worker는 하나의 스케줄러 자신이지만, 혼란스럽지 않도록 스케줄러라고 하지 않는다.

     

    2.1. 실행항목 스케줄링 Scheduling an Action

    새로운 워커를 만들고 실행들을 예약하는 방식으로 어떤 스케줄러이든지 작업들을 예약할 수 있다:

    
    Scheduler scheduler = Schedulers.immediate();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> result += "action");
      
    Assert.assertTrue(result.equals("action"));
    

    위 코드에서 액션은 워커가 연결된 스레드의 대기열에 포함되게 된다.

     

    2.2. 실행항목 취소 Canceling an Action

    워커는 Subscription의 확장 클래스이다. 워커에 unsubscribe 함수를 실행하면 큐(queue)를 비우게 할 것이고 대기하는 모든 일들이 취소되게 된다. 다음 예제에서 확인할 수 있다:

    
    Scheduler scheduler = Schedulers.newThread();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> {
        result += "First_Action";
        worker.unsubscribe();
    });
    worker.schedule(() -> result += "Second_Action");
    
    Assert.assertTrue(result.equals("First_Action"));
    

    두 번째 태스크는, 앞서 실행된 태스크가 모든 작업을 취소했기 때문에 절대 실행되지 않는다. 이미 실행 중인 항목들은 정지된다.

     

    3. Schedulers.newThread

    이 스케줄러는 subscribeOn 또는 observeOn을 통해 요청될 때마다 단순히 새로운 스레드를 만든다.
    이것은 정말 좋은 선택이 아니다. 스레드를 시작하는 비용뿐만 아니라 생성된 스레드를 재사용되지도 않는다.

    
    Observable.just("Hello")
      .observeOn(Schedulers.newThread())
      .doOnNext(s ->
        result1 += Thread.currentThread().getName()
      )
      .observeOn(Schedulers.newThread())
      .subscribe(s ->
        result2 += Thread.currentThread().getName()
      );
    Thread.sleep(500);
    Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
    Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));
    

    워커가 끝나면 스레드는 바로 소멸된다. 작업을 끝내는 데 오랜 시간이 걸리는- 그런 거친 작업들인 경우 아래와 같이 스케줄을 사용할 수 있다. 그런데 이런 경우는 극히 드물고 스레드가 전혀 재사용되지 않는다.

    
    Scheduler scheduler = Schedulers.newThread();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> {
        result += Thread.currentThread().getName() + "_Start";
        worker.schedule(() -> result += "_worker_");
        result += "_End";
    });
    Thread.sleep(3000);
    Assert.assertTrue(result.equals(
      "RxNewThreadScheduler-1_Start_End_worker_"));
    

    생성된 스케줄러 스레드의 워커에 작업을 예약할 때, 워커가 특정 스레드에 포함되어 있음을 알 수 있다.

     

    4. Schedulers.immediate

    Schedulers.immediate는 특별한 스케줄러이다. 비동기 방식이 아닌 블로킹 방식으로 실행이 완료되면 반환 처리하며, 요청한 스레드 안에서 작업을 수행한다:

    
    // immediate 는 2.x 버전에서 사라졌다.
    Scheduler scheduler = Schedulers.immediate();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> {
        result += Thread.currentThread().getName() + "_Start";
        worker.schedule(() -> result += "_worker_");
        result += "_End";
    });
    Thread.sleep(500);
    Assert.assertTrue(result.equals(
      "main_Start_worker__End"));
    

    사실, immediate Scheduler로 옵저버블을 구독하는 것은 일반적으로 스케줄러를 지정하지 않고 구독하는 형태와 동일하다:

    
    Observable.just("Hello")
      .subscribeOn(Schedulers.immediate())
      .subscribe(s ->
        result += Thread.currentThread().getName()
      );
    Thread.sleep(500);
    Assert.assertTrue(result.equals("main"));
    

     

    5. Schedulers.trampoline

    트램펄린 스케줄러는 immediate 스케줄러와 아주 비슷하다. 블록킹 방식으로 동일한 스레드에 작업들을 예약해놓기 때문이다. 그런데, 등록되는 작업은 이전에 예약한 모든 작업들이 완료되면 실행된다.

    
    Observable.just(2, 4, 6, 8)
      .subscribeOn(Schedulers.trampoline())
      .subscribe(i -> result += "" + i);
    Observable.just(1, 3, 5, 7, 9)
      .subscribeOn(Schedulers.trampoline())
      .subscribe(i -> result += "" + i);
    Thread.sleep(500);
    Assert.assertTrue(result.equals("246813579"));
    

    Immediate는 예약하는 작업을 바로 실행하는 반면, 트램펄린은 현재 작업이 끝날 때까지 기다린다. 트램폴린 워커는 처음 작업을 예약할 때 사용된 스레드를 사용하여 모든 작업들을 처리한다. 첫 번째 schedule 호출은 대기열이 비워질 때까지 차단되고 있다:

    
    Scheduler scheduler = Schedulers.trampoline();
    Scheduler.Worker worker = scheduler.createWorker();
    worker.schedule(() -> {
        result += Thread.currentThread().getName() + "Start";
        worker.schedule(() -> {
            result += "_middleStart";
            worker.schedule(() ->
                result += "_worker_"
            );
            result += "_middleEnd";
        });
        result += "_mainEnd";
    });
    Thread.sleep(500);
    Assert.assertTrue(result
      .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));
    

     

    6. Schedulers.from

    Schedulers는 java.util.concurrent의 Executors 보다 내부적으로 더 복잡하다 - 그래서 다른 개념으로 생각해야 된다.
    그런데 이 두 개는 콘셉트가 매우 비슷하기 때문에, 당연하게도 Executor를 스케줄러로 변경해줄 수 있는 'from' 생성 함수를 갖고 있는 wrapper가 있다.

    
    private ThreadFactory threadFactory(String pattern) {
        return new ThreadFactoryBuilder()
          .setNameFormat(pattern)
          .build();
    }
     
    @Test
    public void givenExecutors_whenSchedulerFrom_thenReturnElements() 
     throws InterruptedException {
      
        ExecutorService poolA = newFixedThreadPool(
          10, threadFactory("Sched-A-%d"));
        Scheduler schedulerA = Schedulers.from(poolA);
        ExecutorService poolB = newFixedThreadPool(
          10, threadFactory("Sched-B-%d"));
        Scheduler schedulerB = Schedulers.from(poolB);
     
        Observable<String> observable = Observable.create(subscriber -> {
          subscriber.onNext("Alfa");
          subscriber.onNext("Beta");
          subscriber.onCompleted();
        });;
     
        observable
          .subscribeOn(schedulerA)
          .subscribeOn(schedulerB)
          .subscribe(
            x -> result += Thread.currentThread().getName() + x + "_",
            Throwable::printStackTrace,
            () -> result += "_Completed"
          );
        Thread.sleep(2000);
        Assert.assertTrue(result.equals(
          "Sched-A-0Alfa_Sched-A-0Beta__Completed"));
    }
    

    스케줄러 B는 짧은 시간 동안 사용되지만, 모든 작업을 처리하는 스케줄러 A의 새 액션을 전혀 예약하지 못한다. 따라서, 여러 subscribeOn 함수는 무시될 뿐만 아니라 작은 부하를 발생시킨다.

     

    7. Schedulers.io

    이 스케줄러는 이미 시작된 스레드를 재활용되고 향후 요청을 처리할 수 있다는 점을 제외하면 newThread와 비슷하다. 이 구현은 java.util.concurrent의 ThreadPoolExecutor를 무제한 스레드 풀로 사용하는 것과 유사하게 동작한다. 새로운 워커가 요청될 때마다 새 스레드가 시작되거나(그리고 나중에 잠시 유휴 상태가 됨) 유휴 스레드가 재사용된다.

    
    Observable.just("io")
      .subscribeOn(Schedulers.io())
      .subscribe(i -> result += Thread.currentThread().getName());
      
    Assert.assertTrue(result.equals("RxIoScheduler-2"));
    

    어떤 종류든 무제한 리소스를 주의해야 한다. 웹서비스와 같이 느리거나 응답 없는 외부 의존성이 있는 경우 io 스케줄러는 엄청나게 많은 스레드를 시작할 수 있어서, 우리 자체 애플리케이션을 응답하지 않게 만든다.
    실제로는, Schedulers.io를 따르는 것이 거의 항상 올바른 선택이다.

     

    8. Schedulers.computation

    연산 스케줄러는 기본적으로 동시에 동작하는 스레드의 개수를, Runtime.getRuntime() 유틸리티 클래스의 availableProcessors() 값으로 제한한다.
    그래서 온전히 CPU 범위 안에서 처리되는 일들은 연산 스케줄러를 사용해야 한다. 그런 것들은 계산 성능을 필요로 하며 블록킹 코드가 없어야 한다.
    이 스케줄러는 모든 스레드 앞에 제한 없는 큐 하나를 사용한다. 그래서 작업이 예약될 때, 모든 코어가 점유된 상태이면 큐에 쌓이게 된다. 한편, 각각의 스레드 앞에 큐는 증가할 것이다:

    
    Observable.just("computation")
      .subscribeOn(Schedulers.computation())
      .subscribe(i -> result += Thread.currentThread().getName());
    Assert.assertTrue(result.equals("RxComputationScheduler-1"));
    

    만약 어떤 이유로 기본과 다른 스레드 개수가 필요하면, rx.scheduler.max-computation-threads 시스템 속성을 언제든 사용할 수 있다.
    적은 스레드를 사용함으로써 하나 이상의 CPU 코어를 유휴 상태로 만들 수 있다. 그러면 무거운 작업에서도 연산 스레드 풀이 서버를 포화시키지 않는다. 연산 스레드 개수가 코어보다 많을 수는 없다.

     

    9. Schedulers.test

    이 스케줄러는 테스트 목적을 위해 사용된다. 개발 코드에서는 볼 수 없어야 한다. 이것의 주요 장점은 임의적으로 시간을 조정할 수 있다는 것이다:

    
    List<String> letters = Arrays.asList("A", "B", "C");
    TestScheduler scheduler = Schedulers.test();
    TestSubscriber<String> subscriber = new TestSubscriber<>();
     
    Observable<Long> tick = Observable
      .interval(1, TimeUnit.SECONDS, scheduler);
     
    Observable.from(letters)
      .zipWith(tick, (string, index) -> index + "-" + string)
      .subscribeOn(scheduler)
      .subscribe(subscriber);
     
    subscriber.assertNoValues();
    subscriber.assertNotCompleted();
     
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    subscriber.assertNoErrors();
    subscriber.assertValueCount(1);
    subscriber.assertValues("0-A");
     
    scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
    subscriber.assertCompleted();
    subscriber.assertNoErrors();
    subscriber.assertValueCount(3);
    assertThat(
      subscriber.getOnNextEvents(), 
      hasItems("0-A", "1-B", "2-C"));
    

     

    10. 기본 스케줄러 Default Schedulers

    RxJava에서 몇몇 옵저버블 연산자는 그 연산자에서 사용할 스케줄러를 지정할 수 있는 형태를 갖고 있다. 그 외 것들은 어떤 스케줄러를 지정할 수 없으며 특정한 기본 스케줄러에서 동작한다.
    예로, 'delay' 연산자는 상위 이벤트들 받아 주어진 시간 후에 하위 스트림으로 보낸다. 말할 필요 없이, 이것은 그 시간 동안 원래 스레드를 잡고 있을 수 없기 때문에 반드시 다른 스케줄러를 사용해야 한다:

    
    ExecutorService poolA = newFixedThreadPool(
      10, threadFactory("Sched1-"));
    Scheduler schedulerA = Schedulers.from(poolA);
    Observable.just('A', 'B')
      .delay(1, TimeUnit.SECONDS, schedulerA)
      .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");
     
    Thread.sleep(2000);
    Assert.assertTrue(result.equals("Sched1-A Sched1-B "));
    

    위 코드에서 임의 스케줄러 A를 지정하지 않으면, 'delay' 아래 연산자들은 연산 스케줄러(computation scheduler)를 사용한다. 임의 스케줄러를 제공하는 다른 주요 연산자들에는 이것들이 있다. buffer, interval, range, timer, skip, take, timeout 등등. 이런 연산자들에 스케줄러를 제공하지 않으면, 연산 스케줄러가 이용된다. 연산 스케줄러는 대부분 상황에서 안전하다.

     

    11. 결론 Conclusion

    진정한 리액티브 애플리케이션에서 매우 적은 스레드와 스케줄러가 필요하다.
    스케줄러를 숙달하는 것은 RxJava를 사용하여 확장 가능하고 안전한 코드를 작성하는 데 필수적이다. subscribeOn과 observeOn의 차이는 모든 작업들이 우리가 기대하는 시기에 정확하게 실행돼야 하는 고부하 상황에서 특히 중요하다.

    마지막으로, 우리는 다운 스트림에 사용된 스케줄러가 스케줄러 업스트림에 의해 생성된 로드를 유지시킬 수 있어야 한다. 자세한 내용은 backpressure에 관한 문서에 있다.

    모든 예제와 짧은 코드의 구현은 GitHub 프로젝트에서 찾을 수 있다. 

    댓글 0

Designed by Tistory.