RxDart - Combining Multiple Observables

This tutorial shows you how to combine multiple Observables in RxDart.

Running several Observables at the same time lets you perform tasks concurrently. When you need to combine their results, RxDart provides several methods, and the right one depends on the behavior you expect. This page explains those methods with examples.

This tutorial uses the following Observables, which are created using the fromIterable factory method. You can also create Observables in other ways.

  import 'package:rxdart/rxdart.dart';

  Observable<String> o1 = Observable.fromIterable(['a1', 'b1', 'c1', 'd1']);
  Observable<String> o2 = Observable.fromIterable(['a2', 'b2', 'c2']);
  Observable<String> o3 = Observable.fromIterable(['a3', 'b3', 'c3', 'd3']);

Using concat, concatWith

To concatenate multiple streams, use the concat factory method. It waits for each stream to finish emitting all of its items before subscribing to the next one. Its only argument is the streams to combine (Iterable<Stream<T>> streams).

  Observable.concat([o1, o2, o3])
      .listen(print);

Output:

  a1
  b1
  c1
  d1
  a2
  b2
  c2
  a3
  b3
  c3
  d3

You can also concatenate an existing Observable with other streams by using the instance method concatWith, as in the example below.

  o1.concatWith([o2, o3])
      .listen(print);

The output is the same as with the factory method.
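The wait-then-subscribe behavior can be illustrated with a minimal sketch that uses only dart:async. The helper name concatSketch is hypothetical and this is not how RxDart implements concat; it only mirrors the ordering described in this section.

```dart
import 'dart:async';

// Hypothetical helper for illustration; not RxDart's implementation.
Stream<T> concatSketch<T>(Iterable<Stream<T>> streams) async* {
  for (final stream in streams) {
    // yield* forwards every item of this stream and only returns once the
    // stream is done, so the next stream is not listened to before that.
    yield* stream;
  }
}

Future<void> main() async {
  final result = await concatSketch<String>([
    Stream.fromIterable(['a1', 'b1']),
    Stream.fromIterable(['a2']),
    Stream.fromIterable(['a3', 'b3']),
  ]).toList();
  print(result); // [a1, b1, a2, a3, b3]
}
```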

Using merge, mergeWith

The factory method merge flattens the items from multiple streams into one Observable. It accepts an Iterable<Stream<T>> as its parameter. Unlike concat, it doesn't wait for one stream to finish before subscribing to the next one. It subscribes to all of the streams at the same time, so the order of items depends on when each item is emitted.

To make this easier to see, the streams in the example below add a delay between emissions. Example:

  Observable<String> o1 = Observable.fromIterable(['a1', 'b1', 'c1', 'd1'])
      .concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 50)));
  Observable<String> o2 = Observable.fromIterable(['a2', 'b2', 'c2'])
      .concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 80)));
  Observable<String> o3 = Observable.fromIterable(['a3', 'b3', 'c3', 'd3'])
      .concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 190)));
  Observable.merge([o1, o2, o3])
      .listen(print);

Output:

  a1
  a2
  b1
  c1
  b2
  a3
  d1
  c2
  b3
  c3
  d3

o1 emits a1 at 50ms. Before it emits b1 at 100ms, o2 emits a2 at 80ms, so a2 is printed before b1.

The same behavior is available as the instance method mergeWith, which also takes an Iterable<Stream<T>> as its parameter.

  o1.mergeWith([o2, o3])
      .listen(print);
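As a rough illustration of subscribing to every stream at once, here is a sketch built on a plain StreamController from dart:async. The names mergeSketch and timed are hypothetical, and the sketch leaves out details a real operator needs, such as cancelling the source subscriptions.

```dart
import 'dart:async';

// Hypothetical helper for illustration; not RxDart's implementation.
Stream<T> mergeSketch<T>(List<Stream<T>> streams) {
  final controller = StreamController<T>();
  var open = streams.length; // sources that have not completed yet
  controller.onListen = () {
    if (streams.isEmpty) controller.close();
    // Listen to every source right away and forward items as they arrive.
    for (final stream in streams) {
      stream.listen(controller.add, onError: controller.addError, onDone: () {
        open--;
        if (open == 0) controller.close(); // close once all sources are done
      });
    }
  };
  return controller.stream;
}

// Emits name1, name2, ... every [ms] milliseconds, [count] times in total.
Stream<String> timed(String name, int ms, int count) =>
    Stream<String>.periodic(
        Duration(milliseconds: ms), (i) => '$name${i + 1}').take(count);

Future<void> main() async {
  // a1 arrives at 100ms, b1 at 150ms, a2 at 200ms.
  final result =
      await mergeSketch<String>([timed('a', 100, 2), timed('b', 150, 1)])
          .toList();
  print(result); // [a1, b1, a2]
}
```

The items are interleaved by arrival time rather than grouped by source, which is the difference from concat.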

Using zip, zipWith

zip merges multiple streams into one by applying a zipper function each time every stream has produced an element at a given index. In other words, the zipper function can only produce the element at index i once all of the streams have produced their element at index i. As a result, the number of emitted elements equals the number of elements in the shortest stream.

To use zip, pass the streams (Iterable<Stream<T>> streams) as the first argument and a zipper function as the second argument. The zipper function takes one parameter (List<T> values), which is a List holding the values from all streams at a given index.

  Observable.zip(
      [
        o1,
        o2,
        o3,
      ],
      (values) => values
          .map((value) => value.toString())
          .reduce((acc, value) => acc + value),
  )
      .listen(print);

Output:

  a1a2a3
  b1b2b3
  c1c2c3
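The index-by-index pairing, and the reason the shortest stream decides the length of the result, can be sketched with StreamIterator from dart:async. zipSketch is a hypothetical name, and unlike RxDart's zip this sketch pulls from the streams one after another instead of listening to all of them at once.

```dart
import 'dart:async';

// Hypothetical helper for illustration; not RxDart's implementation.
Stream<R> zipSketch<T, R>(
    List<Stream<T>> streams, R Function(List<T> values) zipper) async* {
  final iterators = [for (final s in streams) StreamIterator<T>(s)];
  try {
    while (true) {
      final values = <T>[];
      for (final iterator in iterators) {
        // Stop as soon as any stream has no element at the current index.
        if (!await iterator.moveNext()) return;
        values.add(iterator.current);
      }
      yield zipper(values);
    }
  } finally {
    for (final iterator in iterators) {
      await iterator.cancel();
    }
  }
}

Future<void> main() async {
  final result = await zipSketch<String, String>([
    Stream.fromIterable(['a1', 'b1', 'c1', 'd1']),
    Stream.fromIterable(['a2', 'b2', 'c2']),
    Stream.fromIterable(['a3', 'b3', 'c3', 'd3']),
  ], (values) => values.join()).toList();
  print(result); // [a1a2a3, b1b2b3, c1c2c3]
}
```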

You can also use the instance method zipWith, which accepts a single stream (Stream<S>) as its first argument. Its zipper function is slightly different because it takes two parameters: the first is the value from the Observable that zipWith is called on, and the second is the value from the stream passed to zipWith.

  o1.zipWith(o2, (x, y) => x + y)
      .listen(print);

Output:

  a1a2
  b1b2
  c1c2

Other variants of zip have a numeric suffix: zip2, zip3, and so on up to zip9. The number is the number of streams to combine, so zip2 combines two streams, zip3 combines three streams, and so on. Each variant takes a different number of parameters, and its zipper function takes a matching number of parameters, as the examples below show.

  Observable.zip2(o1, o2, (v1, v2) => v1 + v2)
      .listen(print);

Output:

  a1a2
  b1b2
  c1c2

  Observable.zip3(o1, o2, o3, (v1, v2, v3) => v1 + v2 + v3)
      .listen(print);

Output:

  a1a2a3
  b1b2b3
  c1c2c3

Using combineLatest

combineLatest is a factory method that combines multiple streams into one Observable, emitting a new combined value every time any of the streams emits an item. The first parameter is the streams (Iterable<Stream<T>> streams). The second parameter is a combiner function which takes one argument (List<T> values) holding the latest value from each stream. The example below shows how it works.

  Observable<String> o1 = Observable.fromIterable(['a1', 'b1', 'c1', 'd1'])
      .concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 50)));
  Observable<String> o2 = Observable.fromIterable(['a2', 'b2', 'c2'])
      .concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 80)));
  Observable<String> o3 = Observable.fromIterable(['a3', 'b3', 'c3', 'd3'])
      .concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 190)));

  Observable.combineLatest([o1, o2, o3], (values) => values.join())
      .listen(print);

Output:

  c1b2a3
  d1b2a3
  d1c2a3
  d1c2b3
  d1c2c3
  d1c2d3

To emit the first combination, it has to wait until every stream has emitted at least one element. In the example above, that happens at 190ms, when the latest elements emitted by o1, o2, and o3 are c1, b2, and a3 respectively. At 200ms, o1 emits d1. At 240ms, o2 emits c2, and so on.
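The "remember the latest value of each stream" idea can be sketched with dart:async alone. combineLatestSketch and timed are hypothetical names, and the sketch assumes the result should close once all sources are done; it is not RxDart's implementation.

```dart
import 'dart:async';

// Hypothetical helper for illustration; not RxDart's implementation.
Stream<R> combineLatestSketch<T, R>(
    List<Stream<T>> streams, R Function(List<T> values) combiner) {
  final controller = StreamController<R>();
  final latest = List<T?>.filled(streams.length, null);
  final hasValue = List<bool>.filled(streams.length, false);
  var open = streams.length; // sources that have not completed yet
  controller.onListen = () {
    if (streams.isEmpty) controller.close();
    for (var i = 0; i < streams.length; i++) {
      streams[i].listen((value) {
        latest[i] = value;
        hasValue[i] = true;
        // Emit only after every source has produced at least one value.
        if (hasValue.every((seen) => seen)) {
          controller.add(combiner(List<T>.from(latest)));
        }
      }, onError: controller.addError, onDone: () {
        open--;
        if (open == 0) controller.close();
      });
    }
  };
  return controller.stream;
}

// Emits name1, name2, ... every [ms] milliseconds, [count] times in total.
Stream<String> timed(String name, int ms, int count) =>
    Stream<String>.periodic(
        Duration(milliseconds: ms), (i) => '$name${i + 1}').take(count);

Future<void> main() async {
  // x1 at 100ms, y1 at 150ms, x2 at 200ms.
  final result = await combineLatestSketch<String, String>(
      [timed('x', 100, 2), timed('y', 150, 1)],
      (values) => values.join()).toList();
  print(result); // [x1y1, x2y1]
}
```

Nothing is emitted at 100ms because y has not produced a value yet, which matches the waiting behavior described above.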

Using startWith, startWithMany

startWith prepends a single value to the beginning of the source, while startWithMany prepends multiple values. Both are instance methods that accept one parameter: a value (T startValue) or a list of values (List<T> startValues) respectively.

  o1.startWith('start1')
      .listen(print);

Output:

  start1
  a1
  b1
  c1
  d1

  o1.startWithMany(['start1-1', 'start1-2'])
      .listen(print);

Output:

  start1-1
  start1-2
  a1
  b1
  c1
  d1
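Prepending can also be pictured as a small async generator that yields the start values first and then forwards the source. startWithManySketch is a hypothetical name used only for this dart:async sketch.

```dart
import 'dart:async';

// Hypothetical helper for illustration; not RxDart's implementation.
Stream<T> startWithManySketch<T>(Stream<T> source, List<T> startValues) async* {
  // Emit the prepended values first, in order.
  for (final value in startValues) {
    yield value;
  }
  // Then forward everything from the source stream.
  yield* source;
}

Future<void> main() async {
  final result = await startWithManySketch<String>(
    Stream.fromIterable(['a1', 'b1']),
    ['start1-1', 'start1-2'],
  ).toList();
  print(result); // [start1-1, start1-2, a1, b1]
}
```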

That's how to combine Observables in RxDart.