RxDart - Different Ways to Create Observables

This tutorial shows you several ways to create Observable in RxDart.

Just like ReactiveX implementation in other programming languages, we have to create an Observable. It acts as event source that will be executed when a subscriber subcribes to it. Below are different ways to create an Observable if you are using RxDart.

Observable.empty

It is used to create an Observable with no values, which means the stream emits no items.

Example:

  Observable.empty()
      .listen(print, onDone: () => print('Done'));

Output:

  Done

Observable.just

It creates an Observable with a single value. Even if the passed value is an Iterable such as an array, it will be treated as one item.

Example:

  Observable.just([1, 2, 3])
      .listen(print, onDone: () => print('Done'));

Output:

  [1, 2, 3]
  Done

Observable.range

It creates an Observable emitting a sequence of integers based on the given ranges. The first parameter is the start, while the second is the end, both are inclusive. If the second number is greater than the first one, it will create a backward sequence.

Example:

  Observable.range(2,5)
      .listen(print, onDone: () => print('Done'));

Output:

  2
  3
  4
  5
  Done

Observable.fromIterable

It creates an Observable from an Iterable. If the listener cancels the subscription, the iteration will stop. The stream will end immediately if an error occurs while iterating the data.

Example:

  Observable.fromIterable([1, 2, 3])
      .listen(print, onDone: () => print('Done'));

Output:

  1
  2
  3
  Done

Example with error:

  Observable.fromIterable([1, 2, 3, 4])
      .map((v) {
        if (v < 3) {
          return v;
        } else {
          throw Observable.error("An error");
        }
      })
      .listen(print, onDone: () => print('Done'));

Output:

  1
  2
  Unhandled exception:
  Instance of 'Observable'
  #0      main. (file:///home/ivan/code/coba-dart/src/rxdart-create-observable.dart:22:11)
  #1      _MapStream._handleData (dart:async/stream_pipe.dart:227:21)
  #2      _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:164:13)
  #3      _RootZone.runUnaryGuarded (dart:async/zone.dart:1314:10)
  #4      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:336:11)
  #5      _IterablePendingEvents.handleNext (dart:async/stream_impl.dart:537:18)
  #6      _PendingEvents.schedule. (dart:async/stream_impl.dart:667:7)
  #7      _microtaskLoop (dart:async/schedule_microtask.dart:41:21)
  #8      _startMicrotaskLoop (dart:async/schedule_microtask.dart:50:5)
  #9      _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:116:13)
  #10     _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:173:5)

In the above example, the first and second element have been received by the listener before the error thrown. As you can see`Done` is not printed.

Observable.fromFuture

It creates an Observable from a Future. It waits for the Future to complete at which time an event (data or error) will be emitted by the stream.

Example:

  Future<List> numbers = Future.value([1, 2, 3]);
  Observable.fromFuture(numbers)
      .listen(print, onDone: () => print('Done'));

Output:

  [1, 2, 3]
  Done

Observable.never

It is used to create a non-terminating Observable sequence. This is useful for creating an infinite duration. You can use it for testing purposes or combining it with other Observables as parameters.

Example:

  Observable.never()
      .listen(print, onDone: () => print('Done'));

Output:

  // 'Done' is never printed

That's how to create Observables in RxDart. After creating Observables, another thing you need to know is how to combine the result of multiple Observables.