Skip to content

3.x: Sync up with 2.2.10 snapshot #6507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,329 changes: 1,329 additions & 0 deletions CHANGES.md

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions docs/What's-different-in-3.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
Table of contents

# Introduction
TBD.

### API signature changes

TBD.

- as() merged into to()
- some operators returning a more appropriate Single or Maybe
- functional interfaces throws widening to Throwable
- standard methods removed
- standard methods signature changes

### Standardized operators

(former experimental and beta operators from 2.x)

TBD.

### Operator behavior changes

TBD.

- connectable sources lifecycle-fixes


### Test support changes

TBD.

- methods removed from the test consumers
2 changes: 2 additions & 0 deletions gradle/javadoc_cleanup.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ task javadocCleanup(dependsOn: "javadoc") doLast {
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/subjects/ReplaySubject.html'));
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/processors/ReplayProcessor.html'));
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/plugins/RxJavaPlugins.html'));

fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/parallel/ParallelFlowable.html'));
}

def fixJavadocFile(file) {
Expand Down
108 changes: 100 additions & 8 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import io.reactivex.subscribers.*;

/**
* The Flowable class that implements the Reactive-Streams Pattern and offers factory methods,
* intermediate operators and the ability to consume reactive dataflows.
* The Flowable class that implements the <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
* Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
* <p>
* Reactive-Streams operates with {@code Publisher}s which {@code Flowable} extends. Many operators
* Reactive Streams operates with {@link Publisher}s which {@code Flowable} extends. Many operators
* therefore accept general {@code Publisher}s directly and allow direct interoperation with other
* Reactive-Streams implementations.
* Reactive Streams implementations.
* <p>
* The Flowable hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
Expand All @@ -51,11 +51,103 @@
* <p>
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
* <p>
* The {@code Flowable} follows the protocol
* <pre><code>
* onSubscribe onNext* (onError | onComplete)?
* </code></pre>
* where the stream can be disposed through the {@link Subscription} instance provided to consumers through
* {@link Subscriber#onSubscribe(Subscription)}.
* Unlike the {@code Observable.subscribe()} of version 1.x, {@link #subscribe(Subscriber)} does not allow external cancellation
* of a subscription and the {@link Subscriber} instance is expected to expose such capability if needed.
* <p>
* Flowables support backpressure and require {@link Subscriber}s to signal demand via {@link Subscription#request(long)}.
* <p>
* Example:
* <pre><code>
* Disposable d = Flowable.just("Hello world!")
* .delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableSubscriber&lt;String&gt;() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* request(1);
* }
* &#64;Override public void onNext(String t) {
* System.out.println(t);
* request(1);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
*
* Thread.sleep(500);
* // the sequence can now be cancelled via dispose()
* d.dispose();
* </code></pre>
* <p>
* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
* request amounts via {@link Subscription#request(long)}.
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
* All RxJava operators are implemented with these relaxed rules in mind.
* If the subscribing {@code Subscriber} does not implement this interface, for example, due to it being from another Reactive Streams compliant
* library, the Flowable will automatically apply a compliance wrapper around it.
* <p>
* {@code Flowable} is an abstract class, but it is not advised to implement sources and custom operators by extending the class directly due
* to the large amounts of <a href="https://github.com/reactive-streams/reactive-streams-jvm#specification">Reactive Streams</a>
* rules to be followed to the letter. See <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">the wiki</a> for
* some guidance if such custom implementations are necessary.
* <p>
* The recommended way of creating custom {@code Flowable}s is by using the {@link #create(FlowableOnSubscribe, BackpressureStrategy)} factory method:
* <pre><code>
* Flowable&lt;String&gt; source = Flowable.create(new FlowableOnSubscribe&lt;String&gt;() {
* &#64;Override
* public void subscribe(FlowableEmitter&lt;String&gt; emitter) throws Exception {
*
* // signal an item
* emitter.onNext("Hello");
*
* // could be some blocking operation
* Thread.sleep(1000);
*
* // the consumer might have cancelled the flow
* if (emitter.isCancelled() {
* return;
* }
*
* emitter.onNext("World");
*
* Thread.sleep(1000);
*
* // the end-of-sequence has to be signaled, otherwise the
* // consumers may never finish
* emitter.onComplete();
* }
* }, BackpressureStrategy.BUFFER);
*
* System.out.println("Subscribe!");
*
* source.subscribe(System.out::println);
*
* System.out.println("Done!");
* </code></pre>
* <p>
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
* where operators run is <i>orthogonal</i> to when the operators can work with data. This means that asynchrony and parallelism
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
* <p>
* For more information see the <a href="http://reactivex.io/documentation/Publisher.html">ReactiveX
* documentation</a>.
*
* @param <T>
* the type of the items emitted by the Flowable
* @see Observable
* @see ParallelFlowable
* @see io.reactivex.subscribers.DisposableSubscriber
*/
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
Expand Down Expand Up @@ -2199,7 +2291,7 @@ public static <T> Flowable<T> fromIterable(Iterable<? extends T> source) {
}

/**
* Converts an arbitrary Reactive-Streams Publisher into a Flowable if not already a
* Converts an arbitrary Reactive Streams Publisher into a Flowable if not already a
* Flowable.
* <p>
* The {@link Publisher} must follow the
Expand Down Expand Up @@ -4385,7 +4477,7 @@ public static Flowable<Long> timer(long delay, TimeUnit unit, Scheduler schedule

/**
* Create a Flowable by wrapping a Publisher <em>which has to be implemented according
* to the Reactive-Streams specification by handling backpressure and
* to the Reactive Streams specification by handling backpressure and
* cancellation correctly; no safeguards are provided by the Flowable itself</em>.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down Expand Up @@ -13569,7 +13661,7 @@ public final Flowable<T> retryWhen(
* Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber
* (if not already a SafeSubscriber) that
* deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the
* Reactive-Streams specification).
* Reactive Streams specification).
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior.</dd>
Expand Down Expand Up @@ -14792,7 +14884,7 @@ public final void subscribe(Subscriber<? super T> s) {
* If the {@link Flowable} rejects the subscription attempt or otherwise fails it will signal
* the error via {@link FlowableSubscriber#onError(Throwable)}.
* <p>
* This subscribe method relaxes the following Reactive-Streams rules:
* This subscribe method relaxes the following Reactive Streams rules:
* <ul>
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns.
* <b>FlowableSubscriber.onSubscribe should make sure a sync or async call triggered by request() is safe.</b></li>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
* for such non-backpressured flows, which {@code Observable} itself implements as well.
* <p>
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,13 +501,14 @@ public void onComplete() {
buffer = null;
}

queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
if (b != null) {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
}
w.dispose();
}

w.dispose();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,12 @@ public void onComplete() {
buffer = null;
}

queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
if (b != null) {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/processors/package-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

/**
* Classes representing so-called hot backpressure-aware sources, aka <strong>processors</strong>,
* that implement the {@link FlowableProcessor} class,
* that implement the {@link io.reactivex.processors.FlowableProcessor FlowableProcessor} class,
* the Reactive Streams {@link org.reactivestreams.Processor Processor} interface
* to allow forms of multicasting events to one or more subscribers as well as consuming another
* Reactive Streams {@link org.reactivestreams.Publisher Publisher}.
Expand All @@ -33,7 +33,7 @@
* </ul>
* <p>
* The non-backpressured variants of the {@code FlowableProcessor} class are called
* {@link io.reactivex.Subject}s and reside in the {@code io.reactivex.subjects} package.
* {@link io.reactivex.subjects.Subject}s and reside in the {@code io.reactivex.subjects} package.
* @see io.reactivex.subjects
*/
package io.reactivex.processors;
10 changes: 5 additions & 5 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public static Scheduler single() {
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
* before posting the actual task to the given executor.
* <p>
* Tasks submitted to the {@link Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
* Tasks submitted to the {@link io.reactivex.Scheduler.Worker Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
* {@link #from(Executor, boolean)} overload to enable task interruption via this wrapper.
* <p>
* If the provided executor supports the standard Java {@link ExecutorService} API,
Expand Down Expand Up @@ -332,7 +332,7 @@ public static Scheduler single() {
* }
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
* execute those tasks "unexpectedly".
* <p>
Expand All @@ -350,7 +350,7 @@ public static Scheduler from(@NonNull Executor executor) {
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
* calls to it.
* <p>
* The tasks scheduled by the returned {@link Scheduler} and its {@link Scheduler.Worker}
* The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.Scheduler.Worker Scheduler.Worker}
* can be optionally interrupted.
* <p>
* If the provided executor doesn't support any of the more specific standard Java executor
Expand Down Expand Up @@ -388,14 +388,14 @@ public static Scheduler from(@NonNull Executor executor) {
* }
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
* @param executor
* the executor to wrap
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link Scheduler.Worker} will
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.Scheduler.Worker Scheduler.Worker} will
* be interrupted when the task is disposed.
* @return the new Scheduler wrapping the Executor
* @since 2.2.6 - experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2769,4 +2769,19 @@ public void timedSizeBufferAlreadyCleared() {

sub.run();
}

@Test
public void bufferExactFailingSupplier() {
Flowable.empty()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
@Override
public List<Object> call() throws Exception {
throw new TestException();
}
}, false)
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertFailure(TestException.class)
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2136,4 +2136,19 @@ public ObservableSource<List<Object>> apply(Observable<Object> o)
}
});
}

@Test
public void bufferExactFailingSupplier() {
Observable.empty()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
@Override
public List<Object> call() throws Exception {
throw new TestException();
}
}, false)
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertFailure(TestException.class)
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
Expand Down