이전 글에서 살펴본 future는 하나의 데이터(결괏값)를 then()에서 전달받았다. 반면에 stream은 연속된 데이터를 listen()을 통해서 비동기적으로 처리할 수 있다. 예를 들면 실시간으로 데이터를 처리할 때 future는 이미지 파일 하나를 다운로드하여 보여줄 때 적합하다면 stream은 동영상(연속된 이미지)을 보여주는 데 사용할 수 있다. stream은 이름처럼 흔히 말하는 스트리밍(streaming) 서비스의 동작 방식과 다를 바 없는것이다.
stream 동작 방식
이해를 돕기 위해 다음과 같은 가상의 동영상 스트리밍 환경이 있다고 가정하자.
- 실시간 스트리밍을 위한 동영상 파일은 여러 장의 이미지 파일로 구성됨
- 해당 파일은 서버에 존재하며 서버는 한 번에 이미지 파일 한 장씩 전송
- 클라이언트(future or stream)에서는 한 번에 이미지 파일 한 장씩 수신
- 각 이미지 파일은 최소 2초 이내에 가져와야 원활한 재생이 가능
(엉뚱한 조건이지만 개념을 구체화하기 위한 극적인 장치이다.)
위 조건을 기반으로 future와 stream의 데이터 처리 과정을 그림으로 나타내면 다음과 같다.
future는 서버에 데이터를 요청하면 image01 하나만 수신하고 해당 파일만을 결괏값으로 가져오고 끝난다. 타입이 Future<image>라고 생각하면 된다. 하지만 stream은 image01을 수신하면 listen()에서 처리한 후 끝나는 게 아니다. 이어서 image02를 수신하면 바로 listen()에서 처리할 수 있도록 전달한다. 이런 식으로 동영상의 끝까지 처리가 가능하다. listen()에서는 수신한 이미지를 연속으로 화면에 보여주는 처리를 하면 된다.
다시 정리하여 좀 더 간략하게 표현하면 다음과 같다.
future는 서버에 데이터를 요청한 후 수신한 결괏값에 대해서 then()으로 전달한다.
future
stream은 구독자 패턴(또는 관찰자 패턴, Observer pattern)이다. 구독자(listen)가 관찰 대상(stream)을 구독하여 관찰 대상에 변화가 발생하면 구독자에게 그 변화를 알려준다.
stream
서버에서 데이터를 받아오는 동작 중에 image01이 수신되면(=stream 변화 발생) 곧바로 listen()(=구독자)에게 전달하고 대기한다. 그러다 또 어느 타이밍에 image02가 수신되면 listen에게 변화를 알려주는 것을 반복하는 것이다.
'future에서 for문으로 List에 image들을 전부 넣어서 결괏값으로 넘겨주면 되잖아!'라고 할 수 있지만 여기서는 실시간 스트리밍이 목적이다. List로 저장하여 넘겨주는 것은 실시간이 아니라 동영상 파일을 다운로드한 후 재생하는 것과 같다. 즉 다음 그림과 같이 처리된다.
stream 예제 코드
1) stream을 만드는 다양한 방법
stream의 동작 방식에 대한 감이 잡혔다면 실제로 stream을 생성하는 다양한 방법에 대해서 살펴보자.
Line 3 : 하나의 데이터에 대한 이벤트를 발생하는 stream을 생성한다. 100이란 정수형 데이터를 넘겨주면 곧바로 listen()에서 출력하는 단순한 동작이다.
특정 주기로 반복적으로 이벤트를 발생하는 stream을 생성할 수도 있다.
Line 3 : stream을 생성한다. Stream.periodic()은 특정 주기로 반복적으로 이벤트를 발생하는 stream을 만드는 것이다. 첫 번째 인자는 Duration() 객체이고, 두 번째 인자는 이벤트에서 발생한 값을 계산하는 함수이다. Duration은 1초 설정하여 1초 간격으로 설정했고 계산 함수는 디폴트인 카운트 함수를 사용하도록 했다. 0부터 시작하여 1초에 1씩 증가한다. take()는 몇 회까지 반복할지 정해주는 역할을 한다.
Line 4 : listen()은 stream의 변화를 관찰하여 변화가 있을 때, 즉 새로운 데이터 입력 시 해당 데이터를 출력해준다. var stream = Stream.periodic(Duration(seconds: 1), (x) => x).take(5).listen(print); 이렇게 쭉 이어서 listen을 사용할 수도 있다.
Line 8~14 : 실행 결과이다. stream에 1초에 한 번씩 데이터가 들어오면 listen()에서 그때마다 출력을 한 것이다.
periodic 외에도 fromIterable을 통해서 List와 같은 형태의 데이터를 다룰 수 있다. 또한 future를 다루려면 fromFuture를 사용하면 된다. 간단한 예시는 다음과 같다.
Line 3~6 : periodic의 listen을 조금 수정했다. builder 형식으로 바꾸고 print 부분도 변경되었다.
Line 8~9 : List<dynamic> 타입의 데이터에서 값을 받아서 처리하고 있다. 각 요소를 순차적으로 가져와서 출력한다.
Line 11~12 : future를 처리하는 stream이다. getData()를 보면 3초 후에 'after 3 seconds'라는 문자열을 결괏값으로 가진다. 따라서 periodic의 결과와 함께 보면 3초에 출력되는 것을 확인할 수 있다.
2) StreamController 사용하기
비동기 함수에 의해서 전달되는 형태가 아니라 stream에 이벤트를 직접 지정해주고 싶다면 StreamController를 사용하자. StreamController으로 stream을 만들고 이벤트를 채워 넣으면 된다.
Line 6 : StreamController를 생성한다. StreamController은 멤버로 stream을 포함하고 있다.
Line 7 : StreamController으로 만든 stream에 대한 구독을 위한 listen을 등록한다.
Line 9~12 : add()를 통해 이벤트를 추가한다. 각 이벤트가 발생하면 listen에서 출력으로 처리한다.
Line 13 : stream을 닫는다.
기본적으로 하나의 stream에 대한 구독자(listen)는 1개만 등록할 수 있다. 만약 2개 이상 등록하고 싶으면 어떻게 해야 할까? broadcast를 사용하면 된다. stream : 제 방송 듣고 싶은 사람은 모두 들으세요~! 이런 의미다.
Line 8 : broadcast가 아닌 stream에 listen을 2개 등록하면 에러가 발생한다.
Line 10 : StreamController를 만들 때 broadcast로 생성하였다.
Line 11~12 : 하나의 stream에 listen()을 2개 등록하였다. broadcast라 가능하다.
3) async*, yield 사용하기
제너레이터(Generator) 함수는 반복 가능한 함수이다. 보통 함수는 return을 맞이하면 종료된다. 하지만 제너레이터 함수는 return 대신에 yield를 사용한다. 제너레이터 함수를 만드는 방법은 비동기 함수와 비슷하게 함수명 뒤에 async* 라는 키워드를 붙인다. 이러한 제너레이터 함수의 리턴 타입은 Stream이다. 다시 말하면 Stream 함수를 만들기 위해서 async*를 사용하는 것이다.
Line 6 : 함수를 통해 stream을 생성하였다.
Line 12~16 : 제너레이터 함수인 getData()를 구현했다. 함수의 타입이 Stream<int>인 것에 주목할 필요가 있다. 따라서 반복적으로 생성되는 데이터를 stream으로 전달하여 listen에서 처리가 가능하다.
초반에 예시로 언급했던 가상의 동영상 스트리밍 환경에 대한 동작을 처리하는 가상의 코드는 대충 다음과 같을 것이다. (가상의 환경에 대한 가상의 코드라니...)
Line 6 : 서버에 데이터를 요청하는 Stream 함수를 통해서 stream을 만든다.
Line 12~17 : 서버에 데이터를 요청하면 4개의 이미지가 약 1초 간격으로 하나씩 전달되는 상황을 표현한 예시 코드이다.
실제 서버가 없는 상황이라 서버에 요청할 수는 없다. 따라서 실제라면 서버에 데이터를 요청하면 4개의 이미지가 약 1초 간격으로 하나씩 전달되는 어떤 restful api가 있어야 하고 거기서 전달받은 결과를 yield로 stream에 넣어줘야 할 것이다.
여기까지 stream의 기본적인 부분에 대해서 살펴봤다. 어떻게 사용하느냐에 따라 다양한 방식의 형태가 나올 것이라 모든 케이스를 다룰 수는 없다. 하지만 이외에도 여러 가지 속성, 메서드를 활용할 수 있고 future처럼 에러 처리도 가능하다. (이건 추후에.. 과연..)