Description
Issue found in version 2.2.15.
The Zip, CombineLatest, and Amb operators of the Observable class each have an overload that accepts an Iterable<ObservableSource> (Zip and CombineLatest additionally take a Function<Array<Any>, R>, where R is a type parameter). The problem is that the classes implementing these operators internally copy the elements of the Iterable into an Observable[]. This causes an ArrayStoreException if one supplies any custom ObservableSource implementation that doesn't subclass Observable.
The equivalent functions for the other stream types don't have this restriction. For example, the Zip implementation in Maybe copies the source values into a MaybeSource[] and avoids this problem.
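The failure mode can be illustrated without RxJava. The following is a minimal sketch, assuming Kotlin on the JVM; Source, Base, Custom, and copyIntoBaseArray are hypothetical stand-ins for ObservableSource, Observable, a custom source, and the internal copy step, not RxJava types. It relies on JVM arrays checking their runtime element type on every store.

```kotlin
// Hypothetical stand-ins, not RxJava types:
// Source ~ ObservableSource, Base ~ Observable, Custom ~ a custom source.
interface Source
open class Base : Source
class Custom : Source // implements the interface without subclassing Base

// Copies the sources into an array whose runtime element type is Base,
// mimicking an Observable[] used where an ObservableSource[] is needed.
fun copyIntoBaseArray(sources: List<Source>): Result<Array<Source?>> = runCatching {
    @Suppress("UNCHECKED_CAST")
    val slots = arrayOfNulls<Base>(sources.size) as Array<Source?>
    // The JVM verifies the element type on each store.
    sources.forEachIndexed { i, s -> slots[i] = s }
    slots
}

fun main() {
    // Only Base instances: the copy succeeds.
    println(copyIntoBaseArray(listOf(Base())).isSuccess)
    // A Custom instance cannot be stored in a Base[]: the copy fails.
    println(copyIntoBaseArray(listOf(Base(), Custom())).exceptionOrNull())
}
```

Allocating the array with the interface as its element type (the analogue of a MaybeSource[]) would accept both kinds of element.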
Example Kotlin code that reproduces the issue:
import io.reactivex.Observable
import io.reactivex.ObservableSource

val sourceA = ObservableSource<String> { Observable.just("A").subscribe(it) }
val sourceB = ObservableSource<String> { Observable.just("b", "B").subscribe(it) }

val combined =
    Observable.combineLatest(
        listOf(sourceA, sourceB),
        { (it[0] as String) + (it[1] as String) }
    )

fun main() {
    combined.subscribe(::println)
}
A simple workaround is to wrap each ObservableSource in an Observable. To fix the above example, pass listOf(sourceA, sourceB).map { Observable.wrap(it) } to combineLatest.
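For reference, here is a sketch of the reproduction with the workaround applied, assuming RxJava 2.x is on the classpath. It only adds the Observable.wrap mapping described above; everything else is taken from the original example.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource

val sourceA = ObservableSource<String> { Observable.just("A").subscribe(it) }
val sourceB = ObservableSource<String> { Observable.just("b", "B").subscribe(it) }

val combined =
    Observable.combineLatest(
        // Wrap each custom source so every element is an Observable
        // and fits into the internal Observable[].
        listOf(sourceA, sourceB).map { Observable.wrap(it) },
        { (it[0] as String) + (it[1] as String) }
    )

fun main() {
    combined.subscribe(::println)
}
```

Observable.wrap returns its argument unchanged when it is already an Observable, so the mapping is also safe for lists that mix custom sources with regular Observable instances.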