Skip to content

Commit 2bed8c1

Browse files
authored
3.x: Sync up with 2.2.10 snapshot (#6507)
* 3.x: Sync up with 2.2.10 snapshot * Add the Flowable javadoc changes too.
1 parent b95e3dc commit 2bed8c1

File tree

13 files changed

+1515
-29
lines changed

13 files changed

+1515
-29
lines changed

CHANGES.md

Lines changed: 1329 additions & 0 deletions
Large diffs are not rendered by default.

docs/What's-different-in-3.0.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
Table of contents
2+
3+
# Introduction
4+
TBD.
5+
6+
### API signature changes
7+
8+
TBD.
9+
10+
- as() merged into to()
11+
- some operators returning a more appropriate Single or Maybe
12+
- functional interfaces throws widening to Throwable
13+
- standard methods removed
14+
- standard methods signature changes
15+
16+
### Standardized operators
17+
18+
(former experimental and beta operators from 2.x)
19+
20+
TBD.
21+
22+
### Operator behavior changes
23+
24+
TBD.
25+
26+
- connectable sources lifecycle-fixes
27+
28+
29+
### Test support changes
30+
31+
TBD.
32+
33+
- methods removed from the test consumers

gradle/javadoc_cleanup.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ task javadocCleanup(dependsOn: "javadoc") doLast {
1212
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/subjects/ReplaySubject.html'));
1313
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/processors/ReplayProcessor.html'));
1414
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/plugins/RxJavaPlugins.html'));
15+
16+
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/parallel/ParallelFlowable.html'));
1517
}
1618

1719
def fixJavadocFile(file) {

src/main/java/io/reactivex/Flowable.java

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
import io.reactivex.subscribers.*;
3737

3838
/**
39-
* The Flowable class that implements the Reactive-Streams Pattern and offers factory methods,
40-
* intermediate operators and the ability to consume reactive dataflows.
39+
* The Flowable class that implements the <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
40+
* Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
4141
* <p>
42-
* Reactive-Streams operates with {@code Publisher}s which {@code Flowable} extends. Many operators
42+
* Reactive Streams operates with {@link Publisher}s which {@code Flowable} extends. Many operators
4343
* therefore accept general {@code Publisher}s directly and allow direct interoperation with other
44-
* Reactive-Streams implementations.
44+
* Reactive Streams implementations.
4545
* <p>
4646
* The Flowable hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
4747
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
@@ -51,11 +51,103 @@
5151
* <p>
5252
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
5353
* <p>
54+
* The {@code Flowable} follows the protocol
55+
* <pre><code>
56+
* onSubscribe onNext* (onError | onComplete)?
57+
* </code></pre>
58+
* where the stream can be disposed through the {@link Subscription} instance provided to consumers through
59+
* {@link Subscriber#onSubscribe(Subscription)}.
60+
* Unlike the {@code Observable.subscribe()} of version 1.x, {@link #subscribe(Subscriber)} does not allow external cancellation
61+
* of a subscription and the {@link Subscriber} instance is expected to expose such capability if needed.
62+
* <p>
63+
* Flowables support backpressure and require {@link Subscriber}s to signal demand via {@link Subscription#request(long)}.
64+
* <p>
65+
* Example:
66+
* <pre><code>
67+
* Disposable d = Flowable.just("Hello world!")
68+
* .delay(1, TimeUnit.SECONDS)
69+
* .subscribeWith(new DisposableSubscriber&lt;String&gt;() {
70+
* &#64;Override public void onStart() {
71+
* System.out.println("Start!");
72+
* request(1);
73+
* }
74+
* &#64;Override public void onNext(String t) {
75+
* System.out.println(t);
76+
* request(1);
77+
* }
78+
* &#64;Override public void onError(Throwable t) {
79+
* t.printStackTrace();
80+
* }
81+
* &#64;Override public void onComplete() {
82+
* System.out.println("Done!");
83+
* }
84+
* });
85+
*
86+
* Thread.sleep(500);
87+
* // the sequence can now be cancelled via dispose()
88+
* d.dispose();
89+
* </code></pre>
90+
* <p>
91+
* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
92+
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
93+
* request amounts via {@link Subscription#request(long)}.
94+
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
95+
* All RxJava operators are implemented with these relaxed rules in mind.
96+
* If the subscribing {@code Subscriber} does not implement this interface, for example, due to it being from another Reactive Streams compliant
97+
* library, the Flowable will automatically apply a compliance wrapper around it.
98+
* <p>
99+
* {@code Flowable} is an abstract class, but it is not advised to implement sources and custom operators by extending the class directly due
100+
* to the large amounts of <a href="https://github.com/reactive-streams/reactive-streams-jvm#specification">Reactive Streams</a>
101+
* rules to be followed to the letter. See <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">the wiki</a> for
102+
* some guidance if such custom implementations are necessary.
103+
* <p>
104+
* The recommended way of creating custom {@code Flowable}s is by using the {@link #create(FlowableOnSubscribe, BackpressureStrategy)} factory method:
105+
* <pre><code>
106+
* Flowable&lt;String&gt; source = Flowable.create(new FlowableOnSubscribe&lt;String&gt;() {
107+
* &#64;Override
108+
* public void subscribe(FlowableEmitter&lt;String&gt; emitter) throws Exception {
109+
*
110+
* // signal an item
111+
* emitter.onNext("Hello");
112+
*
113+
* // could be some blocking operation
114+
* Thread.sleep(1000);
115+
*
116+
* // the consumer might have cancelled the flow
117+
* if (emitter.isCancelled() {
118+
* return;
119+
* }
120+
*
121+
* emitter.onNext("World");
122+
*
123+
* Thread.sleep(1000);
124+
*
125+
* // the end-of-sequence has to be signaled, otherwise the
126+
* // consumers may never finish
127+
* emitter.onComplete();
128+
* }
129+
* }, BackpressureStrategy.BUFFER);
130+
*
131+
* System.out.println("Subscribe!");
132+
*
133+
* source.subscribe(System.out::println);
134+
*
135+
* System.out.println("Done!");
136+
* </code></pre>
137+
* <p>
138+
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
139+
* where operators run is <i>orthogonal</i> to when the operators can work with data. This means that asynchrony and parallelism
140+
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
141+
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
142+
* <p>
54143
* For more information see the <a href="http://reactivex.io/documentation/Publisher.html">ReactiveX
55144
* documentation</a>.
56145
*
57146
* @param <T>
58147
* the type of the items emitted by the Flowable
148+
* @see Observable
149+
* @see ParallelFlowable
150+
* @see io.reactivex.subscribers.DisposableSubscriber
59151
*/
60152
public abstract class Flowable<T> implements Publisher<T> {
61153
/** The default buffer size. */
@@ -2199,7 +2291,7 @@ public static <T> Flowable<T> fromIterable(Iterable<? extends T> source) {
21992291
}
22002292

22012293
/**
2202-
* Converts an arbitrary Reactive-Streams Publisher into a Flowable if not already a
2294+
* Converts an arbitrary Reactive Streams Publisher into a Flowable if not already a
22032295
* Flowable.
22042296
* <p>
22052297
* The {@link Publisher} must follow the
@@ -4385,7 +4477,7 @@ public static Flowable<Long> timer(long delay, TimeUnit unit, Scheduler schedule
43854477

43864478
/**
43874479
* Create a Flowable by wrapping a Publisher <em>which has to be implemented according
4388-
* to the Reactive-Streams specification by handling backpressure and
4480+
* to the Reactive Streams specification by handling backpressure and
43894481
* cancellation correctly; no safeguards are provided by the Flowable itself</em>.
43904482
* <dl>
43914483
* <dt><b>Backpressure:</b></dt>
@@ -13569,7 +13661,7 @@ public final Flowable<T> retryWhen(
1356913661
* Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber
1357013662
* (if not already a SafeSubscriber) that
1357113663
* deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the
13572-
* Reactive-Streams specification).
13664+
* Reactive Streams specification).
1357313665
* <dl>
1357413666
* <dt><b>Backpressure:</b></dt>
1357513667
* <dd>This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior.</dd>
@@ -14792,7 +14884,7 @@ public final void subscribe(Subscriber<? super T> s) {
1479214884
* If the {@link Flowable} rejects the subscription attempt or otherwise fails it will signal
1479314885
* the error via {@link FlowableSubscriber#onError(Throwable)}.
1479414886
* <p>
14795-
* This subscribe method relaxes the following Reactive-Streams rules:
14887+
* This subscribe method relaxes the following Reactive Streams rules:
1479614888
* <ul>
1479714889
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns.
1479814890
* <b>FlowableSubscriber.onSubscribe should make sure a sync or async call triggered by request() is safe.</b></li>

src/main/java/io/reactivex/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
4343
* for such non-backpressured flows, which {@code Observable} itself implements as well.
4444
* <p>
45-
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
45+
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
4646
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
4747
* overloads that allow setting their internal buffer size explicitly.
4848
* <p>

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -501,13 +501,14 @@ public void onComplete() {
501501
buffer = null;
502502
}
503503

504-
queue.offer(b);
505-
done = true;
506-
if (enter()) {
507-
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
504+
if (b != null) {
505+
queue.offer(b);
506+
done = true;
507+
if (enter()) {
508+
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
509+
}
510+
w.dispose();
508511
}
509-
510-
w.dispose();
511512
}
512513

513514
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableBufferTimed.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -504,10 +504,12 @@ public void onComplete() {
504504
buffer = null;
505505
}
506506

507-
queue.offer(b);
508-
done = true;
509-
if (enter()) {
510-
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
507+
if (b != null) {
508+
queue.offer(b);
509+
done = true;
510+
if (enter()) {
511+
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
512+
}
511513
}
512514
}
513515

src/main/java/io/reactivex/processors/package-info.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
/**
1818
* Classes representing so-called hot backpressure-aware sources, aka <strong>processors</strong>,
19-
* that implement the {@link FlowableProcessor} class,
19+
* that implement the {@link io.reactivex.processors.FlowableProcessor FlowableProcessor} class,
2020
* the Reactive Streams {@link org.reactivestreams.Processor Processor} interface
2121
* to allow forms of multicasting events to one or more subscribers as well as consuming another
2222
* Reactive Streams {@link org.reactivestreams.Publisher Publisher}.
@@ -33,7 +33,7 @@
3333
* </ul>
3434
* <p>
3535
* The non-backpressured variants of the {@code FlowableProcessor} class are called
36-
* {@link io.reactivex.Subject}s and reside in the {@code io.reactivex.subjects} package.
36+
* {@link io.reactivex.subjects.Subject}s and reside in the {@code io.reactivex.subjects} package.
3737
* @see io.reactivex.subjects
3838
*/
3939
package io.reactivex.processors;

src/main/java/io/reactivex/schedulers/Schedulers.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ public static Scheduler single() {
299299
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
300300
* before posting the actual task to the given executor.
301301
* <p>
302-
* Tasks submitted to the {@link Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
302+
* Tasks submitted to the {@link io.reactivex.Scheduler.Worker Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
303303
* {@link #from(Executor, boolean)} overload to enable task interruption via this wrapper.
304304
* <p>
305305
* If the provided executor supports the standard Java {@link ExecutorService} API,
@@ -332,7 +332,7 @@ public static Scheduler single() {
332332
* }
333333
* </code></pre>
334334
* <p>
335-
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
335+
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker Scheduler.Worker} instances, although
336336
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
337337
* execute those tasks "unexpectedly".
338338
* <p>
@@ -350,7 +350,7 @@ public static Scheduler from(@NonNull Executor executor) {
350350
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
351351
* calls to it.
352352
* <p>
353-
* The tasks scheduled by the returned {@link Scheduler} and its {@link Scheduler.Worker}
353+
* The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.Scheduler.Worker Scheduler.Worker}
354354
* can be optionally interrupted.
355355
* <p>
356356
* If the provided executor doesn't support any of the more specific standard Java executor
@@ -388,14 +388,14 @@ public static Scheduler from(@NonNull Executor executor) {
388388
* }
389389
* </code></pre>
390390
* <p>
391-
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
391+
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker Scheduler.Worker} instances, although
392392
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
393393
* execute those tasks "unexpectedly".
394394
* <p>
395395
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
396396
* @param executor
397397
* the executor to wrap
398-
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link Scheduler.Worker} will
398+
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.Scheduler.Worker Scheduler.Worker} will
399399
* be interrupted when the task is disposed.
400400
* @return the new Scheduler wrapping the Executor
401401
* @since 2.2.6 - experimental

src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2769,4 +2769,19 @@ public void timedSizeBufferAlreadyCleared() {
27692769

27702770
sub.run();
27712771
}
2772+
2773+
@Test
2774+
public void bufferExactFailingSupplier() {
2775+
Flowable.empty()
2776+
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
2777+
@Override
2778+
public List<Object> call() throws Exception {
2779+
throw new TestException();
2780+
}
2781+
}, false)
2782+
.test()
2783+
.awaitDone(1, TimeUnit.SECONDS)
2784+
.assertFailure(TestException.class)
2785+
;
2786+
}
27722787
}

0 commit comments

Comments
 (0)