brunch

매거진 ReactiveX

You can make anything
by writing

C.S.Lewis

by 한로니 Jun 28. 2016

RxJava, Observable 생성하기

RxJava에서는 옵저버블을 생성하는 다양한 방법을 제공한다. 기존 함수의 결과 값을 옵저버블로 발행하거나 리스트의 값을 하나씩 발행할 수도 있으며 옵저버블 생성을 구독이 발생할 때로 지연시킬 수도 있다. 심지어 아무것도 발행하지 않고 종료하거나, 에러를 발행하는 옵저버블을 생성할 수도 있는데 이번 포스트에서는 옵저버블을 생성하는 방법에 대해 알아본다.


Observable.create()

create() 함수는 옵저버블 생성 시 가장 많이 사용되는 함수 중 하나다. 이 함수는 OnSubscribe 객체를 파라미터로 가지며 구독이 발생하면 이 객체의 call() 함수가 실행된다. 옵저버에게 아이템을 발행하기 위해서는 call() 함수 내부에서 onNext(), onError(), onCompleted()를 적절히 호출해야 한다. onError()와 onCompleted()는 동시에 호출할 수 없는 상호 배타적 관계로 정의할 수 있는데, onError()가 호출될 때는 onCompleted()를 호출하지 않아야 하고, onCompleted()가 호출될 때는 onError()를 호출하지 않아야 한다. 즉, 이 두 함수 중 하나가 호출된 이 후에는 옵저버의 어떠한 함수도 호출하지 않아야 한다.

샘플 코드

Observable<Integer> source = Observable.create(subscriber -> {

    try {

        if (!subscriber.isUnsubscribed()) {

            for (int i = 1; i < 5; i++) {

                subscriber.onNext(i);

            }

            subscriber.onCompleted();

        }

    } catch (Exception e) {

        subscriber.onError(e);

    }

});

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: 1

Next: 2

Next: 3

Next: 4

Completed


Observable.defer()

defer()는 지연 초기화(Lazy Initializations)를 제공하는 함수다. 즉, 구독이 발생할 때 비로소 옵저버블을 생성한다. defer()의 파라미터로 Func0<R>을 가지는데 이 함수는 구독이 발생할 때마다 호출되기 때문에 매번 새로운 옵저버블 객체가 생성된다.

샘플 코드

Observable<Integer> source = Observable.defer(() -> {

    System.out.println("Create an observable");

    return Observable.just(0);

});


source.delaySubscription(1, TimeUnit.SECONDS)

    .subscribe(

        i -> System.out.println("Next: " + i),

        e -> System.err.println("Error: " + e),

        () -> System.out.println("Completed"));


source.delaySubscription(3, TimeUnit.SECONDS)

    .subscribe(

        i -> System.out.println("Next: " + i),

        e -> System.err.println("Error: " + e),

        () -> System.out.println("Completed"));


결과

Create an observable

Next: 0

Completed


Create an observable

Next: 0

Completed


Observable.fromCallable()

파라미터로 Callable을 갖는다. defer와 마찬가지로 구독이 발생할 때 Callable의 call() 함수가 호출되는 지연 초기화를 위한 함수다. 

샘플 코드

Observable<String> source = Observable.fromCallable(() -> "Hello World");

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: Hello World

Completed


Observable.interval()

특정 시간 간격을 주기로 0부터 증가하는 정수 값을 발행한다.

샘플 코드

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS);

source.take(3).subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: 0

Next: 1

Next: 2

Completed


Observable.range()

특정 범위 내의 정수 값을 순차적으로 발행하는 옵저버블을 생성한다. 파라미터로 시작 값과 개수를 갖는다. 

샘플 코드

Observable<Integer> source = Observable.range(1, 3);

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: 1

Next: 2

Next: 3

Completed


Observable.repeat()

아이템을 N번 발행한다. 파라미터로 아무것도 넘기지 않으면 아이템을 무한히 발행한다. 스케줄러로 trampoline을 사용하고, 변경 가능하다. 

샘플 코드

Observable<Integer> source = Observable.just(0)

    .repeat(3);

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: 0

Next: 0

Next: 0

Completed


Observalbe.timer()

특정 시간 이후에 숫자 0을 발행한다. 스케줄러로 computation을 사용하고, 변경 가능하다.

샘플 코드

Observable<Long> source = Observable.timer(3, TimeUnit.SECONDS);

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: 0

Completed


Observable.just()

just() 함수는 파라미터로 주어진 아이템을 옵저버블로 발행한다. 샘플 코드처럼 기존 함수의 반환 값을 옵저버블로 변환할 때 사용할 수도 있다. 1~10개까지의 아이템을 발행할 수 있도록 함수 오버로딩 되어있다. 

샘플 코드

Observable<String> source = Observable.just(getDevice(), getBrand());

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: vbox86p

Next: generic

Completed


Observable.from()

이터러블, 배열의 아이템을 순차적으로 발행하는 옵저버블을 생성한다.

샘플 코드

Observable<Integer> source = Observable.from(Lists.newArrayList(0, 1, 2));

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Next: 0

Next: 1

Next: 2

Completed


Observable.empty()

아무런 아이템을 발행하지 않고, 완료를 발행하는 옵저버블을 생성한다.

샘플 코드

Observable<Object> source = Observable.empty();

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Completed


Observable.never()

아무런 아이템도 발행하지 않고, 완료도 발행하지 않는 옵저버블을 생성한다.

샘플 코드

Observable<Object> source = Observable.never();

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

아무것도 출력되지 않음


Observable.throw()

에러를 발행하는 옵저버블을 생성한다.

샘플 코드

Observable<Object> source = Observable.error(new RuntimeException("Unknown error"));

source.subscribe(

    i -> System.out.println("Next: " + i),

    e -> System.err.println("Error: " + e),

    () -> System.out.println("Completed"));


결과

Error: java.lang.RuntimeException: Unknown error

매거진의 이전글 RxJava, Observable과 생명주기

작품 선택

키워드 선택 0 / 3 0

댓글여부

afliean
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari