Description
Problem
The ConnectableObservable (and ConnectableFlowable) implementors, publish and replay, in RxJava 1.x and 2.x are inconsistent in their terminal behavior.
When publish terminates, its ConnectableObservable will appear as fresh to new subscribers. The drawback is that such subscribers may hang, because connect may never be called again.
In contrast, replay will stay terminated along with any cached items, and new subscribers can still get those events. The drawback here is that a new connect will clear the internal storage and start consuming the main source without giving subscribers any chance to prepare and receive that stream of events from the start if the replay is bounded.
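The two terminal behaviors can be contrasted with a toy model in plain Java. ToyPublish and ToyReplay are hypothetical classes invented for this illustration; they only mimic the behavior described above for late subscribers and are not how RxJava implements publish or replay.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: forgets that it terminated, so it looks fresh afterwards.
class ToyPublish {
    private final List<List<String>> subscribers = new ArrayList<>();

    void subscribe(List<String> sink) {
        subscribers.add(sink);
    }

    // Delivers the items and a terminal marker to the current subscribers,
    // then drops them, leaving the instance in a fresh-looking state.
    void connect(List<String> items) {
        for (List<String> sink : subscribers) {
            sink.addAll(items);
            sink.add("onComplete");
        }
        subscribers.clear();
    }
}

// Toy model only: stays terminated and keeps its cached items.
class ToyReplay {
    private final List<String> cache = new ArrayList<>();

    // Late subscribers receive the cached items and the terminal marker.
    void subscribe(List<String> sink) {
        sink.addAll(cache);
    }

    // A new connect clears the cache before consuming the source again.
    void connect(List<String> items) {
        cache.clear();
        cache.addAll(items);
        cache.add("onComplete");
    }
}
```

In this model, a subscriber that arrives after ToyPublish has terminated receives nothing until someone calls connect again, while one that arrives after ToyReplay has terminated immediately receives the cached items and the terminal marker.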
Dealing with this inconsistency currently requires refCount to trigger a reset on an unofficial channel: casting the ConnectableObservable into Disposable if possible and disposing it when the count reaches zero again.
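The unofficial channel can be sketched roughly as follows. Disposable and Connectable here are local stand-in interfaces, and ResettableSource and RefCountSketch are hypothetical names; none of this is the actual RxJava source, only the cast-and-dispose pattern is taken from the description above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in for a disposable resource (not the RxJava interface).
interface Disposable {
    void dispose();
}

// Local stand-in for a connectable source.
interface Connectable {
    void connect();
}

// A toy source that opts in to being reset by also implementing Disposable.
class ResettableSource implements Connectable, Disposable {
    int connects;
    int resets;

    @Override
    public void connect() {
        connects++;
    }

    @Override
    public void dispose() {
        resets++;
    }
}

// Sketch of the refCount bookkeeping: connect on the first subscriber,
// reset through the Disposable cast when the last subscriber leaves.
class RefCountSketch {
    private final Connectable source;
    private final AtomicInteger subscribers = new AtomicInteger();

    RefCountSketch(Connectable source) {
        this.source = source;
    }

    void subscribe() {
        if (subscribers.incrementAndGet() == 1) {
            source.connect();
        }
    }

    void unsubscribe() {
        if (subscribers.decrementAndGet() == 0) {
            // The "unofficial channel": only works if the source opts in.
            if (source instanceof Disposable) {
                ((Disposable) source).dispose();
            }
        }
    }
}
```

A source that does not implement Disposable is silently left as it is, which is why this channel is fragile compared to an explicit method on the API.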
Suggested solution
I suggest changing the API to include an explicit reset() method and changing the logic to have 3 states:
fresh --> connect() --> running --> onComplete()/onError() --> terminated --> reset() --> fresh
and possibly:
terminated --> connect() --> running
In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached, at which point the state atomically changes to terminated. Consumers subscribing in this state will always receive the terminal event and, in the case of replay, the cached items as well.
A call to reset() will clear the internal storage of the ConnectableObservable and start out as fresh again, allowing new consumers to gather around and get all fresh events from the beginning.
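A minimal sketch of the three-state lifecycle, using compare-and-set for the atomic transitions. ConnState and ConnectableLifecycle are hypothetical names, and the sketch assumes that reset() only has an effect in the terminated state; the proposal does not say what reset() should do when fresh or running.

```java
import java.util.concurrent.atomic.AtomicReference;

// The three proposed states.
enum ConnState { FRESH, RUNNING, TERMINATED }

// Hypothetical model of the proposed lifecycle; not an RxJava class.
class ConnectableLifecycle {
    private final AtomicReference<ConnState> state =
            new AtomicReference<>(ConnState.FRESH);

    ConnState state() {
        return state.get();
    }

    // fresh --> running; returns false if the state was not fresh.
    boolean connect() {
        return state.compareAndSet(ConnState.FRESH, ConnState.RUNNING);
    }

    // running --> terminated; stands in for onComplete()/onError().
    boolean terminate() {
        return state.compareAndSet(ConnState.RUNNING, ConnState.TERMINATED);
    }

    // terminated --> fresh; a real implementation would also clear
    // any cached items here. Assumed to do nothing in other states.
    boolean reset() {
        return state.compareAndSet(ConnState.TERMINATED, ConnState.FRESH);
    }
}
```

Because each transition is a single compare-and-set, two racing callers cannot both move the state: exactly one connect() wins while the state is fresh, and a connect() in the terminated state fails until reset() has been called.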
It is possible to support calling connect in the terminated state to skip the fresh state. Preventing this transition, however, may be more involved: connect() would have to signal somehow that the transition is illegal, and there would need to be a soft way to check whether connect will succeed. Note that calling connect on a running ConnectableObservable is a no-op in 1.x and 2.x.
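One way the stricter option could look is sketched below: connect() throws once terminated, and a non-throwing tryConnect() serves as the soft check. Phase, StrictConnectable, and tryConnect() are hypothetical names, and using IllegalStateException as the signal is an assumption; nothing here is part of the RxJava API.

```java
import java.util.concurrent.atomic.AtomicReference;

enum Phase { FRESH, RUNNING, TERMINATED }

// Hypothetical sketch of a strict connect() plus a non-throwing probe.
class StrictConnectable {
    private final AtomicReference<Phase> phase =
            new AtomicReference<>(Phase.FRESH);

    // Soft check: attempts fresh --> running and reports whether
    // this call performed the transition. Never throws.
    boolean tryConnect() {
        return phase.compareAndSet(Phase.FRESH, Phase.RUNNING);
    }

    // Strict variant: no-op while running, error once terminated.
    void connect() {
        for (;;) {
            Phase p = phase.get();
            if (p == Phase.RUNNING) {
                return; // already connected: no-op
            }
            if (p == Phase.TERMINATED) {
                throw new IllegalStateException("terminated; call reset() first");
            }
            if (phase.compareAndSet(Phase.FRESH, Phase.RUNNING)) {
                return;
            }
            // Lost a race with another caller: re-read the state and retry.
        }
    }

    // running --> terminated; stands in for a terminal event.
    void terminate() {
        phase.compareAndSet(Phase.RUNNING, Phase.TERMINATED);
    }

    // terminated --> fresh.
    void reset() {
        phase.compareAndSet(Phase.TERMINATED, Phase.FRESH);
    }
}
```

Allowing the terminated --> connect() --> running shortcut instead would remove the need for both the exception and the probe, at the cost of new subscribers not getting a window to attach before events start flowing.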