Show Buttons
Share On Facebook
Share On Twitter
Share On Google Plus
Share On Linkdin
Share On Reddit
Share On Stumbleupon
Contact us
Hide Buttons

Rxjs Observable publish refcount vs share

The two com­mon ways to share data com­ing from an observ­able is via the publish().refCount() and share() meth­ods. At first glance, they might seem sim­i­lar but there one dif­fer­ence between the two tech­niques that may sig­nif­i­cantly impact your deci­sion to choose one of them.

With observable.publish().refCount(), once the observer com­pletes, it will not restart if a new sub­scriber is added after com­ple­tion. Whereas with observable.share(), if the under­ly­ing observer com­pletes and a new sub­scriber is added later, the observer effec­tively begins a new exe­cu­tion and starts emit­ting data.

Lets see some exam­ples to under­stand what that actu­ally means.

There are 2 sce­nar­ios to con­sider when deal­ing with shared observables.

Sce­nario 1

Late sub­scrip­tion after under­ly­ing observer com­pletes, but still has active subscribers

// Create a source that emits random values until 300 milliseconds
var source$ = Rx.Observable.interval(100).take(3).map(index => `${index}-${Math.floor(50 + Math.random() * 100)}` );

// Create a refcounted observable
var refcounted$ = source$.publish().refCount();

refcounted$.subscribe(x => console.log('A' + x));

// Simulate a late subscriber
setTimeout(() => {
  // By the time B subscribes, the source$ has already completed
  refcounted$.subscribe(x => console.log('B' + x));
}, 400);

//---- Output ----
A0-124
A1-123
A2-51
// Create a source that emits random values until 300 milliseconds
var source$ = Rx.Observable.interval(100).take(3).map(index => `${index}-${Math.floor(50 + Math.random() * 100)}` );

// Create a shared stream
var shared$ = source$.share();

shared$.subscribe(x => console.log('A' + x));

// Simulate a subscriber that arrives after a delay
setTimeout(() => {
  // By this time the underlying source has completed but
  // since it is shared, it will restart execution when this
  // subscriber arrives
  shared$.subscribe(x => console.log('B' + x));
}, 500);

// ---- Output ----
A0-122
A1-100
A2-89
B0-64
B1-51
B2-112

As you can see from the exam­ples, B receives a whole new set of val­ues. The dif­fer­ence is sub­tle, yet impor­tant. The rea­son for this dif­fer­ence is that observable.publish().refCount() uses only one under­ly­ing sub­ject. So if the source observer com­pletes, future sub­scribers will receive no data because a com­pleted sub­ject can­not restart. Whereas observable.share() uses a sub­ject fac­tory. So if the under­ly­ing observ­able com­pletes, a new sub­ject is cre­ated which resub­scribes to the under­ly­ing source, there­fore future sub­scribers will receive data via a new exe­cu­tion of the source.

Sce­nario 2

Late sub­scrip­tion after under­ly­ing observer com­pletes, but has no active subscribers

// Create a source that emits random values until 300 milliseconds
var source$ = Rx.Observable.interval(100).take(3).map(index => `${index}-${Math.floor(50 + Math.random() * 100)}` );

// Create a refcounted stream
var refcounted$ = source$.publish().refCount();

var subscription = refcounted$.subscribe(x => console.log('A' + x));
setTimeout(() => {
  // This effectively makes the subscriber count to 0.
  subscription.unsubscribe();
}, 150);

// Simulate a subscriber that arrives after a delay
setTimeout(() => {
  // The underlying source had a subscriber count of 0 at this time
  // but it was still not complete when B subscribed
  refcounted$.subscribe(x => console.log('B' + x));
}, 250);

// ---- Output ----
A0-120
B0-65
B1-143
B2-63

Notice that in the above case, even though the sub­scriber B joined late, only a few mil­lisec­onds before the source could com­plete, the source was re-executed and B received a whole new set of ran­dom val­ues. The same holds hap­pens even for shared() observables.

Con­clu­sion

  • A ref­counted observ­able will not trig­ger source exe­cu­tion once com­pleted for late subscribers.
  • A shared observ­able will re-execute the source after com­ple­tion for a late subscriber.
  • Both shared or ref­counted observ­ables re-execute the source if the sub­scriber count reaches 0 but a new sub­scriber arrives before the source gets a chance to complete.

Ref­er­ences

http://reactivex.io/documentation/operators/publish.html
https://github.com/ReactiveX/RxJS/issues/1363
https://github.com/ReactiveX/rxjs/issues/453#issuecomment-153806139

Ryan Sukale

Ryan is just a regular guy next door trying to manage his life and finances.

You may also like...