diff --git a/extras/pom.xml b/extras/pom.xml index 332f93c4bb..fe8d413f16 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -17,6 +17,7 @@ jdeferred registry rxjava + rxjava2 simple diff --git a/extras/rxjava2/pom.xml b/extras/rxjava2/pom.xml new file mode 100644 index 0000000000..1e1025f5e9 --- /dev/null +++ b/extras/rxjava2/pom.xml @@ -0,0 +1,18 @@ + + 4.0.0 + + async-http-client-extras-parent + org.asynchttpclient + 2.1.0-SNAPSHOT + + async-http-client-extras-rxjava2 + Asynchronous Http Client RxJava2 Extras + The Async Http Client RxJava2 Extras. + + + io.reactivex.rxjava2 + rxjava + 2.0.8 + + + diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/DefaultRxHttpClient.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/DefaultRxHttpClient.java new file mode 100644 index 0000000000..9cf1459e48 --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/DefaultRxHttpClient.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Future; +import java.util.function.Supplier; + +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Request; +import org.asynchttpclient.extras.rxjava2.maybe.MaybeAsyncHandlerBridge; +import org.asynchttpclient.extras.rxjava2.maybe.ProgressAsyncMaybeEmitterBridge; +import org.asynchttpclient.handler.ProgressAsyncHandler; + +import io.reactivex.Maybe; +import io.reactivex.MaybeEmitter; +import io.reactivex.disposables.Disposables; + +/** + * Straight forward default implementation of the {@code RxHttpClient} interface. + */ +public class DefaultRxHttpClient implements RxHttpClient { + + private final AsyncHttpClient asyncHttpClient; + + /** + * Returns a new {@code DefaultRxHttpClient} instance that uses the given {@code asyncHttpClient} under the hoods. + * + * @param asyncHttpClient + * the Async HTTP Client instance to be used + * + * @return a new {@code RxHttpClient} instance + * + * @throws NullPointerException + * if {@code asyncHttpClient} is {@code null} + */ + public DefaultRxHttpClient(AsyncHttpClient asyncHttpClient) { + this.asyncHttpClient = requireNonNull(asyncHttpClient); + } + + @Override + public Maybe prepare(Request request, Supplier> handlerSupplier) { + requireNonNull(request); + requireNonNull(handlerSupplier); + + return Maybe.create(emitter -> { + final AsyncHandler bridge = createBridge(emitter, handlerSupplier.get()); + final Future responseFuture = asyncHttpClient.executeRequest(request, bridge); + emitter.setDisposable(Disposables.fromFuture(responseFuture)); + }); + } + + /** + * Creates an {@code AsyncHandler} that bridges events from the given {@code handler} to the given {@code emitter} + * and cancellation/disposal in the other direction. + * + * @param + * the result type produced by {@code handler} and emitted by {@code emitter} + * + * @param emitter + * the RxJava emitter instance that receives results upon completion and will be queried for disposal + * during event processing + * @param handler + * the {@code AsyncHandler} instance that receives downstream events and produces the result that will be + * emitted upon request completion + * + * @return the bridge handler + */ + protected AsyncHandler createBridge(MaybeEmitter emitter, AsyncHandler handler) { + if (handler instanceof ProgressAsyncHandler) { + return new ProgressAsyncMaybeEmitterBridge<>(emitter, (ProgressAsyncHandler) handler); + } + + return new MaybeAsyncHandlerBridge<>(emitter, handler); + } +} diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/DisposedException.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/DisposedException.java new file mode 100644 index 0000000000..8113d12e8b --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/DisposedException.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2; + +import java.util.concurrent.CancellationException; + +/** + * Indicates that the HTTP request has been disposed asynchronously via RxJava. + */ +public class DisposedException extends CancellationException { + private static final long serialVersionUID = -5885577182105850384L; + + public DisposedException(String message) { + super(message); + } +} diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/RxHttpClient.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/RxHttpClient.java new file mode 100644 index 0000000000..766de8a764 --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/RxHttpClient.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2; + +import java.util.function.Supplier; + +import org.asynchttpclient.AsyncCompletionHandlerBase; +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; + +import io.reactivex.Maybe; + +/** + * Prepares HTTP requests by wrapping them into RxJava 2 {@code Maybe} instances. + * + * @see RxJava – Reactive Extensions for the JVM + */ +public interface RxHttpClient { + + /** + * Returns a new {@code RxHttpClient} instance that uses the given {@code asyncHttpClient} under the hoods. + * + * @param asyncHttpClient + * the Async HTTP Client instance to be used + * + * @return a new {@code RxHttpClient} instance + * + * @throws NullPointerException + * if {@code asyncHttpClient} is {@code null} + */ + static RxHttpClient create(AsyncHttpClient asyncHttpClient) { + return new DefaultRxHttpClient(asyncHttpClient); + } + + /** + * Prepares the given {@code request}. For each subscription to the returned {@code Maybe}, a new HTTP request will + * be executed and its response will be emitted. + * + * @param request + * the request that is to be executed + * + * @return a {@code Maybe} that executes {@code request} upon subscription and emits the response + * + * @throws NullPointerException + * if {@code request} is {@code null} + */ + default Maybe prepare(Request request) { + return prepare(request, AsyncCompletionHandlerBase::new); + } + + /** + * Prepares the given {@code request}. For each subscription to the returned {@code Maybe}, a new HTTP request will + * be executed and the results of {@code AsyncHandlers} obtained from {@code handlerSupplier} will be emitted. + * + * @param + * the result type produced by handlers produced by {@code handlerSupplier} and emitted by the returned + * {@code Maybe} instance + * + * @param request + * the request that is to be executed + * @param handlerSupplier + * supplies the desired {@code AsyncHandler} instances that are used to produce results + * + * @return a {@code Maybe} that executes {@code request} upon subscription and that emits the results produced by + * the supplied handers + * + * @throws NullPointerException + * if at least one of the parameters is {@code null} + */ + Maybe prepare(Request request, Supplier> handlerSupplier); +} diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java new file mode 100644 index 0000000000..1b8a344969 --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2.maybe; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.HttpResponseBodyPart; +import org.asynchttpclient.HttpResponseHeaders; +import org.asynchttpclient.HttpResponseStatus; +import org.asynchttpclient.extras.rxjava2.DisposedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.reactivex.MaybeEmitter; +import io.reactivex.exceptions.CompositeException; +import io.reactivex.exceptions.Exceptions; + +/** + * Abstract base class that bridges events between the {@code Maybe} reactive base type and {@code AsyncHandlers}. + * + * When an event is received, it's first checked if the Rx stream has been disposed asynchronously. If so, request + * processing is {@linkplain #disposed() aborted}, otherwise, the event is forwarded to the {@linkplain #delegate() + * wrapped handler}. + * + * When the request is {@link AsyncHandler#onCompleted() completed}, the result produced by the wrapped instance is + * forwarded to the {@code Maybe}: If the result is {@code null}, {@link MaybeEmitter#onComplete()} is invoked, + * {@link MaybeEmitter#onSuccess(T)} otherwise. + * + * Any errors during request processing are forwarded via {@link MaybeEmitter#onError(Throwable)}. + * + * @param + * the result type produced by the wrapped {@code AsyncHandler} and emitted via RxJava + */ +public abstract class AbstractMaybeAsyncHandlerBridge implements AsyncHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMaybeAsyncHandlerBridge.class); + + private static volatile DisposedException sharedDisposed; + + /** + * The Rx callback object that receives downstream events and will be queried for its + * {@link MaybeEmitter#isDisposed() disposed state} when Async HTTP Client callbacks are invoked. + */ + protected final MaybeEmitter emitter; + + /** + * Indicates if the delegate has already received a terminal event. + */ + private final AtomicBoolean delegateTerminated = new AtomicBoolean(); + + protected AbstractMaybeAsyncHandlerBridge(MaybeEmitter emitter) { + this.emitter = requireNonNull(emitter); + } + + @Override + public final State onBodyPartReceived(HttpResponseBodyPart content) throws Exception { + return emitter.isDisposed() ? disposed() : delegate().onBodyPartReceived(content); + } + + @Override + public final State onStatusReceived(HttpResponseStatus status) throws Exception { + return emitter.isDisposed() ? disposed() : delegate().onStatusReceived(status); + } + + @Override + public final State onHeadersReceived(HttpResponseHeaders headers) throws Exception { + return emitter.isDisposed() ? disposed() : delegate().onHeadersReceived(headers); + } + + /** + * {@inheritDoc} + * + *

+ * The value returned by the wrapped {@code AsyncHandler} won't be returned by this method, but emtited via RxJava. + *

+ * + * @return always {@code null} + */ + @Override + public final Void onCompleted() { + if (delegateTerminated.getAndSet(true)) { + return null; + } + + final T result; + try { + result = delegate().onCompleted(); + } catch (final Throwable t) { + emitOnError(t); + return null; + } + + if (!emitter.isDisposed()) { + if (result == null) { + emitter.onComplete(); + } else { + emitter.onSuccess(result); + } + } + + return null; + } + + /** + * {@inheritDoc} + * + *

+ * The exception will first be propagated to the wrapped {@code AsyncHandler}, then emitted via RxJava. If the + * invocation of the delegate itself throws an exception, both the original exception and the follow-up exception + * will be wrapped into RxJava's {@code CompositeException} and then be emitted. + *

+ */ + @Override + public final void onThrowable(Throwable t) { + if (delegateTerminated.getAndSet(true)) { + return; + } + + Throwable error = t; + try { + delegate().onThrowable(t); + } catch (final Throwable x) { + error = new CompositeException(Arrays.asList(t, x)); + } + + emitOnError(error); + } + + /** + * Called to indicate that request processing is to be aborted because the linked Rx stream has been disposed. If + * the {@link #delegate() delegate} didn't already receive a terminal event, + * {@code AsyncHandler#onThrowable(Throwable) onThrowable} will be called with a {@link DisposedException}. + * + * @return always {@link State#ABORT} + */ + protected final AsyncHandler.State disposed() { + if (!delegateTerminated.getAndSet(true)) { + + DisposedException disposed = sharedDisposed; + if (disposed == null) { + disposed = new DisposedException("Subscription has been disposed."); + final StackTraceElement[] stackTrace = disposed.getStackTrace(); + if (stackTrace.length > 0) { + disposed.setStackTrace(new StackTraceElement[] { stackTrace[0] }); + } + + sharedDisposed = disposed; + } + + delegate().onThrowable(disposed); + } + + return State.ABORT; + } + + /** + * @return the wrapped {@code AsyncHandler} instance to which calls are delegated + */ + protected abstract AsyncHandler delegate(); + + private void emitOnError(Throwable error) { + Exceptions.throwIfFatal(error); + if (!emitter.isDisposed()) { + emitter.onError(error); + } else { + LOGGER.debug("Not propagating onError after disposal: {}", error.getMessage(), error); + } + } +} diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeProgressAsyncHandlerBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeProgressAsyncHandlerBridge.java new file mode 100644 index 0000000000..c68a10c38e --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeProgressAsyncHandlerBridge.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2.maybe; + +import org.asynchttpclient.handler.ProgressAsyncHandler; + +import io.reactivex.MaybeEmitter; + +/** + * An extension to {@code AbstractMaybeAsyncHandlerBridge} for {@code ProgressAsyncHandlers}. + * + * @param + * the result type produced by the wrapped {@code ProgressAsyncHandler} and emitted via RxJava + */ +public abstract class AbstractMaybeProgressAsyncHandlerBridge extends AbstractMaybeAsyncHandlerBridge + implements ProgressAsyncHandler { + + protected AbstractMaybeProgressAsyncHandlerBridge(MaybeEmitter emitter) { + super(emitter); + } + + @Override + public final State onHeadersWritten() { + return emitter.isDisposed() ? disposed() : delegate().onHeadersWritten(); + } + + @Override + public final State onContentWritten() { + return emitter.isDisposed() ? disposed() : delegate().onContentWritten(); + } + + @Override + public final State onContentWriteProgress(long amount, long current, long total) { + return emitter.isDisposed() ? disposed() : delegate().onContentWriteProgress(amount, current, total); + } + + @Override + protected abstract ProgressAsyncHandler delegate(); + +} diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/MaybeAsyncHandlerBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/MaybeAsyncHandlerBridge.java new file mode 100644 index 0000000000..b4af729aa4 --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/MaybeAsyncHandlerBridge.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2.maybe; + +import static java.util.Objects.requireNonNull; + +import org.asynchttpclient.AsyncHandler; + +import io.reactivex.MaybeEmitter; + +public final class MaybeAsyncHandlerBridge extends AbstractMaybeAsyncHandlerBridge { + + private final AsyncHandler delegate; + + public MaybeAsyncHandlerBridge(MaybeEmitter emitter, AsyncHandler delegate) { + super(emitter); + this.delegate = requireNonNull(delegate); + } + + @Override + protected AsyncHandler delegate() { + return delegate; + } +} diff --git a/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/ProgressAsyncMaybeEmitterBridge.java b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/ProgressAsyncMaybeEmitterBridge.java new file mode 100644 index 0000000000..4e54a823d6 --- /dev/null +++ b/extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/ProgressAsyncMaybeEmitterBridge.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2.maybe; + +import static java.util.Objects.requireNonNull; + +import org.asynchttpclient.handler.ProgressAsyncHandler; + +import io.reactivex.MaybeEmitter; + +public final class ProgressAsyncMaybeEmitterBridge extends AbstractMaybeProgressAsyncHandlerBridge { + + private final ProgressAsyncHandler delegate; + + public ProgressAsyncMaybeEmitterBridge(MaybeEmitter emitter, ProgressAsyncHandler delegate) { + super(emitter); + this.delegate = requireNonNull(delegate); + } + + @Override + protected ProgressAsyncHandler delegate() { + return delegate; + } +} diff --git a/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/DefaultRxHttpClientTest.java b/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/DefaultRxHttpClientTest.java new file mode 100644 index 0000000000..77f0553739 --- /dev/null +++ b/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/DefaultRxHttpClientTest.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import java.util.function.Supplier; + +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Request; +import org.asynchttpclient.handler.ProgressAsyncHandler; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import io.reactivex.Maybe; +import io.reactivex.observers.TestObserver; + +public class DefaultRxHttpClientTest { + + @Mock + private AsyncHttpClient asyncHttpClient; + + @Mock + private Request request; + + @Mock + private Supplier> handlerSupplier; + + @Mock + private AsyncHandler handler; + + @Mock + private ProgressAsyncHandler progressHandler; + + @Captor + private ArgumentCaptor> handlerCaptor; + + @Mock + private ListenableFuture resposeFuture; + + @InjectMocks + private DefaultRxHttpClient underTest; + + @BeforeMethod(groups = "standalone") + public void initializeTest() { + underTest = null; // we want a fresh instance for each test + MockitoAnnotations.initMocks(this); + } + + @Test(groups = "standalone", expectedExceptions = NullPointerException.class) + public void rejectsNullClient() { + new DefaultRxHttpClient(null); + } + + @Test(groups = "standalone", expectedExceptions = NullPointerException.class) + public void rejectsNullRequest() { + underTest.prepare(null, handlerSupplier); + } + + @Test(groups = "standalone", expectedExceptions = NullPointerException.class) + public void rejectsNullHandlerSupplier() { + underTest.prepare(request, null); + } + + @Test(groups = "standalone") + public void emitsNullPointerExceptionWhenNullHandlerIsSupplied() { + // given + given(handlerSupplier.get()).willReturn(null); + final TestObserver subscriber = new TestObserver<>(); + + // when + underTest.prepare(request, handlerSupplier).subscribe(subscriber); + + // then + subscriber.assertTerminated(); + subscriber.assertNoValues(); + subscriber.assertError(NullPointerException.class); + then(handlerSupplier).should().get(); + verifyNoMoreInteractions(handlerSupplier); + } + + @Test(groups = "standalone") + public void usesVanillaAsyncHandler() throws Exception { + // given + given(handlerSupplier.get()).willReturn(handler); + + // when + underTest.prepare(request, handlerSupplier).subscribe(); + + // then + then(asyncHttpClient).should().executeRequest(eq(request), handlerCaptor.capture()); + final AsyncHandler bridge = handlerCaptor.getValue(); + assertThat(bridge, is(not(instanceOf(ProgressAsyncHandler.class)))); + } + + @Test(groups = "standalone") + public void usesProgressAsyncHandler() throws Exception { + given(handlerSupplier.get()).willReturn(progressHandler); + + // when + underTest.prepare(request, handlerSupplier).subscribe(); + + // then + then(asyncHttpClient).should().executeRequest(eq(request), handlerCaptor.capture()); + final AsyncHandler bridge = handlerCaptor.getValue(); + assertThat(bridge, is(instanceOf(ProgressAsyncHandler.class))); + } + + @Test(groups = "standalone") + public void callsSupplierForEachSubscription() throws Exception { + // given + given(handlerSupplier.get()).willReturn(handler); + final Maybe prepared = underTest.prepare(request, handlerSupplier); + + // when + prepared.subscribe(); + prepared.subscribe(); + + // then + then(handlerSupplier).should(times(2)).get(); + } + + @Test(groups = "standalone") + public void cancelsResponseFutureOnDispose() throws Exception { + given(handlerSupplier.get()).willReturn(handler); + given(asyncHttpClient.executeRequest(eq(request), any())).willReturn(resposeFuture); + + /* when */ underTest.prepare(request, handlerSupplier).subscribe().dispose(); + + // then + then(asyncHttpClient).should().executeRequest(eq(request), handlerCaptor.capture()); + final AsyncHandler bridge = handlerCaptor.getValue(); + then(resposeFuture).should().cancel(true); + verifyZeroInteractions(handler); + assertThat(bridge.onStatusReceived(null), is(AsyncHandler.State.ABORT)); + verify(handler).onThrowable(isA(DisposedException.class)); + verifyNoMoreInteractions(handler); + } +} diff --git a/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridgeTest.java b/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridgeTest.java new file mode 100644 index 0000000000..2e37df5202 --- /dev/null +++ b/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridgeTest.java @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2.maybe; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; + +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHandler.State; +import org.asynchttpclient.HttpResponseBodyPart; +import org.asynchttpclient.HttpResponseHeaders; +import org.asynchttpclient.HttpResponseStatus; +import org.asynchttpclient.extras.rxjava2.DisposedException; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import io.reactivex.MaybeEmitter; +import io.reactivex.exceptions.CompositeException; + +public class AbstractMaybeAsyncHandlerBridgeTest { + + @Mock + MaybeEmitter emitter; + + @Mock + AsyncHandler delegate; + + @Mock + private HttpResponseStatus status; + + @Mock + private HttpResponseHeaders headers; + + @Mock + private HttpResponseBodyPart bodyPart; + + @Captor + private ArgumentCaptor throwable; + + private AbstractMaybeAsyncHandlerBridge underTest; + + @BeforeMethod + public void initializeTest() { + MockitoAnnotations.initMocks(this); + underTest = new UnderTest(); + } + + @Test + public void forwardsEvents() throws Exception { + given(delegate.onCompleted()).willReturn(this); + + /* when */ underTest.onStatusReceived(status); + then(delegate).should().onStatusReceived(status); + + /* when */ underTest.onHeadersReceived(headers); + then(delegate).should().onHeadersReceived(headers); + + /* when */ underTest.onBodyPartReceived(bodyPart); + /* when */ underTest.onBodyPartReceived(bodyPart); + then(delegate).should(times(2)).onBodyPartReceived(bodyPart); + + /* when */ underTest.onCompleted(); + then(delegate).should().onCompleted(); + then(emitter).should().onSuccess(this); + /* then */ verifyNoMoreInteractions(delegate); + } + + @Test + public void wontCallOnCompleteTwice() throws Exception { + InOrder inOrder = Mockito.inOrder(emitter); + + /* when */ underTest.onCompleted(); + /* then */ inOrder.verify(emitter).onComplete(); + + /* when */ underTest.onCompleted(); + /* then */ inOrder.verify(emitter, never()).onComplete(); + } + + @Test + public void wontCallOnErrorTwice() throws Exception { + InOrder inOrder = Mockito.inOrder(emitter); + + /* when */ underTest.onThrowable(null); + /* then */ inOrder.verify(emitter).onError(null); + + /* when */ underTest.onThrowable(new RuntimeException("unwanted")); + /* then */ inOrder.verify(emitter, never()).onError(any()); + } + + @Test + public void wontCallOnErrorAfterOnComplete() throws Exception { + /* when */ underTest.onCompleted(); + then(emitter).should().onComplete(); + + /* when */ underTest.onThrowable(null); + then(emitter).should(never()).onError(any()); + } + + @Test + public void wontCallOnCompleteAfterOnError() throws Exception { + /* when */ underTest.onThrowable(null); + then(emitter).should().onError(null); + + /* when */ underTest.onCompleted(); + then(emitter).should(never()).onComplete(); + } + + @Test + public void wontCallOnCompleteAfterDisposal() throws Exception { + given(emitter.isDisposed()).willReturn(true); + /* when */ underTest.onCompleted(); + /* then */ verify(emitter, never()).onComplete(); + } + + @Test + public void wontCallOnErrorAfterDisposal() throws Exception { + given(emitter.isDisposed()).willReturn(true); + /* when */ underTest.onThrowable(new RuntimeException("ignored")); + /* then */ verify(emitter, never()).onError(any()); + } + + @Test + public void handlesExceptionsWhileCompleting() throws Exception { + /* given */ final Throwable x = new RuntimeException("mocked error in delegate onCompleted()"); + given(delegate.onCompleted()).willThrow(x); + /* when */ underTest.onCompleted(); + then(emitter).should().onError(x); + } + + @Test + public void handlesExceptionsWhileFailing() throws Exception { + // given + final Throwable initial = new RuntimeException("mocked error for onThrowable()"); + final Throwable followup = new RuntimeException("mocked error in delegate onThrowable()"); + willThrow(followup).given(delegate).onThrowable(initial); + + /* when */ underTest.onThrowable(initial); + + // then + then(emitter).should().onError(throwable.capture()); + final Throwable thrown = throwable.getValue(); + assertThat(thrown, is(instanceOf(CompositeException.class))); + assertThat(((CompositeException) thrown).getExceptions(), is(Arrays.asList(initial, followup))); + } + + @Test + public void cachesDisposedException() throws Exception { + // when + new UnderTest().disposed(); + new UnderTest().disposed(); + + // then + then(delegate).should(times(2)).onThrowable(throwable.capture()); + final List errors = throwable.getAllValues(); + final Throwable firstError = errors.get(0), secondError = errors.get(1); + assertThat(secondError, is(sameInstance(firstError))); + final StackTraceElement[] stackTrace = firstError.getStackTrace(); + assertThat(stackTrace.length, is(1)); + assertThat(stackTrace[0].getClassName(), is(AbstractMaybeAsyncHandlerBridge.class.getName())); + assertThat(stackTrace[0].getMethodName(), is("disposed")); + } + + @DataProvider + public Object[][] httpEvents() { + return new Object[][] { // + { named("onStatusReceived", () -> underTest.onStatusReceived(status)) }, // + { named("onHeadersReceived", () -> underTest.onHeadersReceived(headers)) }, // + { named("onBodyPartReceived", () -> underTest.onBodyPartReceived(bodyPart)) }, // + }; + } + + @Test(dataProvider = "httpEvents") + public void httpEventCallbacksCheckDisposal(Callable httpEvent) throws Exception { + given(emitter.isDisposed()).willReturn(true); + + /* when */ final AsyncHandler.State firstState = httpEvent.call(); + /* then */ assertThat(firstState, is(State.ABORT)); + then(delegate).should(only()).onThrowable(isA(DisposedException.class)); + + /* when */ final AsyncHandler.State secondState = httpEvent.call(); + /* then */ assertThat(secondState, is(State.ABORT)); + /* then */ verifyNoMoreInteractions(delegate); + } + + private final class UnderTest extends AbstractMaybeAsyncHandlerBridge { + UnderTest() { + super(AbstractMaybeAsyncHandlerBridgeTest.this.emitter); + } + + @Override + protected AsyncHandler delegate() { + return delegate; + } + } + + private static Callable named(String name, Callable callable) { + return new Callable() { + @Override + public String toString() { + return name; + } + + @Override + public T call() throws Exception { + return callable.call(); + } + }; + } +} diff --git a/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeProgressAsyncHandlerBridgeTest.java b/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeProgressAsyncHandlerBridgeTest.java new file mode 100644 index 0000000000..5f33906e5e --- /dev/null +++ b/extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeProgressAsyncHandlerBridgeTest.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2017 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.extras.rxjava2.maybe; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.util.concurrent.Callable; + +import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHandler.State; +import org.asynchttpclient.extras.rxjava2.DisposedException; +import org.asynchttpclient.handler.ProgressAsyncHandler; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import io.reactivex.MaybeEmitter; + +public class AbstractMaybeProgressAsyncHandlerBridgeTest { + + @Mock + MaybeEmitter emitter; + + @Mock + ProgressAsyncHandler delegate; + + private AbstractMaybeProgressAsyncHandlerBridge underTest; + + @BeforeMethod + public void initializeTest() { + MockitoAnnotations.initMocks(this); + underTest = new UnderTest(); + } + + @Test + public void forwardsEvents() throws Exception { + /* when */ underTest.onHeadersWritten(); + then(delegate).should().onHeadersWritten(); + + /* when */ underTest.onContentWriteProgress(40, 60, 100); + then(delegate).should().onContentWriteProgress(40, 60, 100); + + /* when */ underTest.onContentWritten(); + then(delegate).should().onContentWritten(); + } + + @DataProvider + public Object[][] httpEvents() { + return new Object[][] { // + { named("onHeadersWritten", () -> underTest.onHeadersWritten()) }, // + { named("onContentWriteProgress", () -> underTest.onContentWriteProgress(40, 60, 100)) }, // + { named("onContentWritten", () -> underTest.onContentWritten()) }, // + }; + } + + @Test(dataProvider = "httpEvents") + public void httpEventCallbacksCheckDisposal(Callable httpEvent) throws Exception { + given(emitter.isDisposed()).willReturn(true); + + /* when */ final AsyncHandler.State firstState = httpEvent.call(); + /* then */ assertThat(firstState, is(State.ABORT)); + then(delegate).should(only()).onThrowable(isA(DisposedException.class)); + + /* when */ final AsyncHandler.State secondState = httpEvent.call(); + /* then */ assertThat(secondState, is(State.ABORT)); + /* then */ verifyNoMoreInteractions(delegate); + } + + private final class UnderTest extends AbstractMaybeProgressAsyncHandlerBridge { + UnderTest() { + super(AbstractMaybeProgressAsyncHandlerBridgeTest.this.emitter); + } + + @Override + protected ProgressAsyncHandler delegate() { + return delegate; + } + + } + + private static Callable named(String name, Callable callable) { + return new Callable() { + @Override + public String toString() { + return name; + } + + @Override + public T call() throws Exception { + return callable.call(); + } + }; + } +}