RxJava에서는 옵저버블을 생성하는 다양한 방법을 제공한다. 기존 함수의 결과 값을 옵저버블로 발행하거나 리스트의 값을 하나씩 발행할 수도 있으며 옵저버블 생성을 구독이 발생할 때로 지연시킬 수도 있다. 심지어 아무것도 발행하지 않고 종료하거나, 에러를 발행하는 옵저버블을 생성할 수도 있는데 이번 포스트에서는 옵저버블을 생성하는 방법에 대해 알아본다.
create() 함수는 옵저버블 생성 시 가장 많이 사용되는 함수 중 하나다. 이 함수는 OnSubscribe 객체를 파라미터로 가지며 구독이 발생하면 이 객체의 call() 함수가 실행된다. 옵저버에게 아이템을 발행하기 위해서는 call() 함수 내부에서 onNext(), onError(), onCompleted()를 적절히 호출해야 한다. onError()와 onCompleted()는 동시에 호출할 수 없는 상호 배타적 관계로 정의할 수 있는데, onError()가 호출될 때는 onCompleted()를 호출하지 않아야 하고, onCompleted()가 호출될 때는 onError()를 호출하지 않아야 한다. 즉, 이 두 함수 중 하나가 호출된 이 후에는 옵저버의 어떠한 함수도 호출하지 않아야 한다.
Observable<Integer> source = Observable.create(subscriber -> {
try {
if (!subscriber.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
});
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: 1
Next: 2
Next: 3
Next: 4
Completed
defer()는 지연 초기화(Lazy Initializations)를 제공하는 함수다. 즉, 구독이 발생할 때 비로소 옵저버블을 생성한다. defer()의 파라미터로 Func0<R>을 가지는데 이 함수는 구독이 발생할 때마다 호출되기 때문에 매번 새로운 옵저버블 객체가 생성된다.
Observable<Integer> source = Observable.defer(() -> {
System.out.println("Create an observable");
return Observable.just(0);
});
source.delaySubscription(1, TimeUnit.SECONDS)
.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
source.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Create an observable
Next: 0
Completed
Create an observable
Next: 0
Completed
파라미터로 Callable을 갖는다. defer와 마찬가지로 구독이 발생할 때 Callable의 call() 함수가 호출되는 지연 초기화를 위한 함수다.
Observable<String> source = Observable.fromCallable(() -> "Hello World");
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: Hello World
Completed
특정 시간 간격을 주기로 0부터 증가하는 정수 값을 발행한다.
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS);
source.take(3).subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: 0
Next: 1
Next: 2
Completed
특정 범위 내의 정수 값을 순차적으로 발행하는 옵저버블을 생성한다. 파라미터로 시작 값과 개수를 갖는다.
Observable<Integer> source = Observable.range(1, 3);
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: 1
Next: 2
Next: 3
Completed
아이템을 N번 발행한다. 파라미터로 아무것도 넘기지 않으면 아이템을 무한히 발행한다. 스케줄러로 trampoline을 사용하고, 변경 가능하다.
Observable<Integer> source = Observable.just(0)
.repeat(3);
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: 0
Next: 0
Next: 0
Completed
특정 시간 이후에 숫자 0을 발행한다. 스케줄러로 computation을 사용하고, 변경 가능하다.
Observable<Long> source = Observable.timer(3, TimeUnit.SECONDS);
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: 0
Completed
just() 함수는 파라미터로 주어진 아이템을 옵저버블로 발행한다. 샘플 코드처럼 기존 함수의 반환 값을 옵저버블로 변환할 때 사용할 수도 있다. 1~10개까지의 아이템을 발행할 수 있도록 함수 오버로딩 되어있다.
Observable<String> source = Observable.just(getDevice(), getBrand());
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: vbox86p
Next: generic
Completed
이터러블, 배열의 아이템을 순차적으로 발행하는 옵저버블을 생성한다.
Observable<Integer> source = Observable.from(Lists.newArrayList(0, 1, 2));
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Next: 0
Next: 1
Next: 2
Completed
아무런 아이템을 발행하지 않고, 완료를 발행하는 옵저버블을 생성한다.
Observable<Object> source = Observable.empty();
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Completed
아무런 아이템도 발행하지 않고, 완료도 발행하지 않는 옵저버블을 생성한다.
Observable<Object> source = Observable.never();
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
아무것도 출력되지 않음
에러를 발행하는 옵저버블을 생성한다.
Observable<Object> source = Observable.error(new RuntimeException("Unknown error"));
source.subscribe(
i -> System.out.println("Next: " + i),
e -> System.err.println("Error: " + e),
() -> System.out.println("Completed"));
Error: java.lang.RuntimeException: Unknown error