RxDart - Using ConnectableObservable Examples

This tutorial explains what ConnectableObservable is in RxDart and how to use it, along with its variations.

ConnectableObservable is a kind of Observable that can be listened to multiple times. While an ordinary Observable usually begins to emit items as soon as it is listened to, a ConnectableObservable begins to emit items only when its .connect() method is called. Because of that behavior, you should wait until all intended Observers are listening before calling .connect(), which triggers the start of item emission. However, there are variations that replay emitted items to new Observers.

ConnectableObservable broadcasts a single-subscription stream. Once all intended Observers are listening, calling .connect() starts the emission, and each item is delivered to every Observer.

Below are the steps for using ConnectableObservable:

  1. Convert an ordinary Observable into a ConnectableObservable using the .publish() method.
  2. Create some Observers that listen to the Observable.
  3. Call the .connect() method.

The code below shows an example.

  import 'package:rxdart/rxdart.dart';

  void main(List<String> arguments) {
    ConnectableObservable<int> numbers = Observable.range(1, 5).publish();

    numbers.listen((value) { print('Observer 1: $value'); });
    numbers.listen((value) { print('Observer 2: $value'); });
    numbers.connect();
  }

Output:

  Observer 1: 1
  Observer 2: 1
  Observer 1: 2
  Observer 2: 2
  Observer 1: 3
  Observer 2: 3
  Observer 1: 4
  Observer 2: 4
  Observer 1: 5
  Observer 2: 5
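
The ordering requirement described earlier (listen first, connect afterwards) can be illustrated with a small sketch. It assumes the same legacy RxDart version used throughout this tutorial (one that still provides the Observable class); the observer labels are only illustrative. An Observer that subscribes to a plain .publish() Observable after the source has finished emitting is not expected to receive the earlier items.

```dart
import 'package:rxdart/rxdart.dart';

void main() async {
  final ConnectableObservable<int> numbers = Observable.range(1, 5).publish();

  // This Observer is listening before connect(), so it receives the items.
  numbers.listen((value) => print('Early observer: $value'));
  numbers.connect();

  // Give the source time to finish emitting.
  await Future.delayed(const Duration(seconds: 1));

  // This Observer subscribes too late. A plain publish() does not replay,
  // so it should not see the items that were already emitted.
  numbers.listen((value) => print('Late observer: $value'));
}
```

If late Observers need the earlier items, one of the replaying variations described later in this tutorial is the better fit.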

Using .autoConnect()

autoConnect converts the ConnectableObservable into an Observable that connects automatically when the first Observer subscribes to it. It connects at most once. If you need to disconnect from the source, provide a connection callback and cancel the subscription it receives.

Signature:

  Observable<T> autoConnect({
    void Function(StreamSubscription<T> subscription) connection,
  });

Example:

  Observable<int> numbers = Observable.range(1, 5).publish().autoConnect();
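
To show how the connection callback can be used for disconnecting, here is a minimal sketch. It assumes the legacy Observable API used in this tutorial and uses Observable.periodic as a stand-in for a long-running source; the variable name sourceSubscription and the timings are hypothetical choices for illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

void main() async {
  // Holds the subscription to the source once autoConnect has connected.
  StreamSubscription<int> sourceSubscription;

  final Observable<int> ticks = Observable<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count,
  ).publish().autoConnect(
    connection: (subscription) {
      sourceSubscription = subscription;
    },
  );

  // The first listener triggers the connection to the source.
  ticks.listen((value) => print('Observer 1: $value'));

  await Future.delayed(const Duration(milliseconds: 350));

  // Cancelling the captured subscription disconnects from the source,
  // which stops emission for every Observer.
  await sourceSubscription?.cancel();
}
```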

Using .refCount()

refCount converts the ConnectableObservable into an Observable that stays connected as long as at least one subscription exists.

Signature:

  Observable<T> refCount();

Example:

  Observable<int> numbers = Observable.range(1, 5).publish().refCount(); 

Alternatively, you can use the shorthand .share():

  Observable<int> numbers = Observable.range(1, 5).share(); 
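
The following sketch illustrates the "stays connected as long as at least one subscription exists" behavior. It assumes the legacy Observable API, including its doOnListen and doOnCancel operators, which are used here only to log when the source is listened to and cancelled. The periodic source and the timings are illustrative assumptions.

```dart
import 'package:rxdart/rxdart.dart';

void main() async {
  final Observable<int> ticks = Observable<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count,
  )
      .doOnListen(() => print('Source: listened to'))
      .doOnCancel(() => print('Source: cancelled'))
      .share();

  // The first subscription is expected to connect to the source.
  final first = ticks.listen((value) => print('Observer 1: $value'));
  final second = ticks.listen((value) => print('Observer 2: $value'));

  await Future.delayed(const Duration(milliseconds: 250));

  // One subscription remains, so the source is expected to stay connected.
  await first.cancel();
  await Future.delayed(const Duration(milliseconds: 250));

  // Cancelling the last subscription is expected to cancel the source too.
  await second.cancel();
}
```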

ConnectableObservable Variations

PublishConnectableObservable

A ConnectableObservable that converts a single-subscription Stream into a broadcast Stream. This is the one used in the above example.

Observable's method for creating a PublishConnectableObservable:

  ConnectableObservable<T> publish() => PublishConnectableObservable<T>(this);

ReplayConnectableObservable

A ConnectableObservable that converts a single-subscription Stream into a broadcast Stream which replays all emitted items to new listeners. It provides synchronous access to the list of emitted items.

Observable's method for creating a ReplayConnectableObservable:

  ReplayConnectableObservable<T> publishReplay({int maxSize}) =>
      ReplayConnectableObservable<T>(this, maxSize: maxSize);

Example:

  import 'package:rxdart/rxdart.dart';

  void main() async {
    ConnectableObservable<int> numbers = Observable.range(1, 5).publishReplay();

    numbers.listen((value) { print('Observer 1: $value'); });
    numbers.connect();
    // Observer 2 subscribes after all items have been emitted.
    await Future.delayed(const Duration(seconds: 1));
    numbers.listen((value) { print('Observer 2: $value'); });
  }

Output:

  Observer 1: 1
  Observer 1: 2
  Observer 1: 3
  Observer 1: 4
  Observer 1: 5
  Observer 2: 1
  Observer 2: 2
  Observer 2: 3
  Observer 2: 4
  Observer 2: 5
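
The maxSize parameter and the synchronous access to emitted items mentioned above can be sketched as follows. This assumes the legacy Observable API and that ReplayConnectableObservable exposes the buffered items through a values getter; the buffer size of 2 is an arbitrary choice for illustration.

```dart
import 'package:rxdart/rxdart.dart';

void main() async {
  // Keep only the two most recent items for replay.
  final ReplayConnectableObservable<int> numbers =
      Observable.range(1, 5).publishReplay(maxSize: 2);

  numbers.listen((value) => print('Observer 1: $value'));
  numbers.connect();

  await Future.delayed(const Duration(seconds: 1));

  // Synchronous access to the items currently held in the replay buffer.
  print('Buffered items: ${numbers.values}');

  // A late Observer should receive only the buffered items, not all five.
  numbers.listen((value) => print('Observer 2: $value'));
}
```

Limiting maxSize is a reasonable choice when late Observers only need recent history, since an unbounded buffer keeps every emitted item in memory.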

ValueConnectableObservable

A ConnectableObservable that converts a single-subscription Stream into a broadcast Stream which replays only the latest value to new listeners. It also provides synchronous access to the latest value.

Observable's methods for creating a ValueConnectableObservable:

  ValueConnectableObservable<T> publishValue() =>
      ValueConnectableObservable<T>(this);

  ValueConnectableObservable<T> publishValueSeeded(T seedValue) =>
      ValueConnectableObservable<T>.seeded(this, seedValue);

Example:

  import 'package:rxdart/rxdart.dart';

  void main() async {
    ConnectableObservable<int> numbers = Observable.range(1, 5).publishValue();

    numbers.listen((value) { print('Observer 1: $value'); });
    numbers.connect();
    // Observer 2 subscribes after all items have been emitted.
    await Future.delayed(const Duration(seconds: 1));
    numbers.listen((value) { print('Observer 2: $value'); });
  }

Output:

  Observer 1: 1
  Observer 1: 2
  Observer 1: 3
  Observer 1: 4
  Observer 1: 5
  Observer 2: 5

The second example uses a seed value, which is emitted before any item from the source.

Example:

  import 'package:rxdart/rxdart.dart';

  void main() async {
    ConnectableObservable<int> numbers = Observable.range(1, 5).publishValueSeeded(0);

    numbers.listen((value) { print('Observer 1: $value'); });
    numbers.connect();
    // Observer 2 subscribes after all items have been emitted.
    await Future.delayed(const Duration(seconds: 1));
    numbers.listen((value) { print('Observer 2: $value'); });
  }

Output:

  Observer 1: 0
  Observer 1: 1
  Observer 1: 2
  Observer 1: 3
  Observer 1: 4
  Observer 1: 5
  Observer 2: 5
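
The synchronous access to the latest value can be sketched as below. This assumes the legacy Observable API and that ValueConnectableObservable exposes the latest item through a value getter; keeping the variable typed as ValueConnectableObservable (rather than ConnectableObservable) is what makes that getter available.

```dart
import 'package:rxdart/rxdart.dart';

void main() async {
  final ValueConnectableObservable<int> numbers =
      Observable.range(1, 5).publishValueSeeded(0);

  // Before connecting, the latest value should be the seed.
  print('Latest value before connect: ${numbers.value}');

  numbers.listen((value) => print('Observer 1: $value'));
  numbers.connect();

  await Future.delayed(const Duration(seconds: 1));

  // After the source has emitted, the getter should return the last item.
  print('Latest value after emission: ${numbers.value}');
}
```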