Skip to content

3.x: ConnectableObservable redesign #5628

Closed
@akarnokd

Description

@akarnokd

Problem

The CompletableObservable (and CompletableFlowable) implementors, publish and replay in RxJava 1.x and 2.x are inconsistent in their terminal behavior.

When publish terminates, its CompletableObservable will appear as fresh to the new subscribers. This has the drawback that such subscribers may hang as connect may be never called again.

In contrast, replay will stay terminated along with any cached items and new subscribers can still get those events. The drawback here is that a new connect will clear the internal storage and start the consumption of the main source while not giving any chance to subscribers to prepare and receive that stream of events from the start if the replay is bounded.

Dealing with this inconsistency currently requires refCount to trigger a reset on an unofficial channel: casting the CompletableObserver into Disposable if possible and disposing it when the count reaches zero again.

Suggested solution

I suggest changing the API to include an explicit reset() method and changing the logic to have 3 states:

fresh --> connect() --> running --> onComplete()/onError() --> terminated --> reset() --> fresh

and possibly:

terminated --> connect() --> running

In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached, at which point the state atomically changes to terminated. Consumers subscribing in this state will always receive the terminal event, and in case of replay, the cached items as well.

A call to reset() will clear the internal storage of the ConnectableObservable and start out as fresh again, allowing new consumers to gather around and get all fresh events from the beginning.

It is possible to support the call to connect in the terminated state to skip the fresh state. Preventing this transition, however, may be more involved as connect() should communicate this to be illegal transition someway as well as the need for a soft way for checking if connect is to succeed or not. Note that calling connect on a running ConnectableObservable is a no-op in 1.x and 2.x.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions