Skip to content

Introduce AHC RxJava 2.x extras #1391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extras/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<module>jdeferred</module>
<module>registry</module>
<module>rxjava</module>
<module>rxjava2</module>
<module>simple</module>
</modules>

Expand Down
18 changes: 18 additions & 0 deletions extras/rxjava2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>async-http-client-extras-parent</artifactId>
<groupId>org.asynchttpclient</groupId>
<version>2.1.0-SNAPSHOT</version>
</parent>
<artifactId>async-http-client-extras-rxjava2</artifactId>
<name>Asynchronous Http Client RxJava2 Extras</name>
<description>The Async Http Client RxJava2 Extras.</description>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.8</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.extras.rxjava2;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Request;
import org.asynchttpclient.extras.rxjava2.maybe.MaybeAsyncHandlerBridge;
import org.asynchttpclient.extras.rxjava2.maybe.ProgressAsyncMaybeEmitterBridge;
import org.asynchttpclient.handler.ProgressAsyncHandler;

import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.disposables.Disposables;

/**
* Straight forward default implementation of the {@code RxHttpClient} interface.
*/
public class DefaultRxHttpClient implements RxHttpClient {

private final AsyncHttpClient asyncHttpClient;

/**
* Returns a new {@code DefaultRxHttpClient} instance that uses the given {@code asyncHttpClient} under the hoods.
*
* @param asyncHttpClient
* the Async HTTP Client instance to be used
*
* @return a new {@code RxHttpClient} instance
*
* @throws NullPointerException
* if {@code asyncHttpClient} is {@code null}
*/
public DefaultRxHttpClient(AsyncHttpClient asyncHttpClient) {
this.asyncHttpClient = requireNonNull(asyncHttpClient);
}

@Override
public <T> Maybe<T> prepare(Request request, Supplier<? extends AsyncHandler<T>> handlerSupplier) {
requireNonNull(request);
requireNonNull(handlerSupplier);

return Maybe.create(emitter -> {
final AsyncHandler<?> bridge = createBridge(emitter, handlerSupplier.get());
final Future<?> responseFuture = asyncHttpClient.executeRequest(request, bridge);
emitter.setDisposable(Disposables.fromFuture(responseFuture));
});
}

/**
* Creates an {@code AsyncHandler} that bridges events from the given {@code handler} to the given {@code emitter}
* and cancellation/disposal in the other direction.
*
* @param <T>
* the result type produced by {@code handler} and emitted by {@code emitter}
*
* @param emitter
* the RxJava emitter instance that receives results upon completion and will be queried for disposal
* during event processing
* @param handler
* the {@code AsyncHandler} instance that receives downstream events and produces the result that will be
* emitted upon request completion
*
* @return the bridge handler
*/
protected <T> AsyncHandler<?> createBridge(MaybeEmitter<T> emitter, AsyncHandler<T> handler) {
if (handler instanceof ProgressAsyncHandler) {
return new ProgressAsyncMaybeEmitterBridge<>(emitter, (ProgressAsyncHandler<? extends T>) handler);
}

return new MaybeAsyncHandlerBridge<>(emitter, handler);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.extras.rxjava2;

import java.util.concurrent.CancellationException;

/**
* Indicates that the HTTP request has been disposed asynchronously via RxJava.
*/
public class DisposedException extends CancellationException {
private static final long serialVersionUID = -5885577182105850384L;

public DisposedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.extras.rxjava2;

import java.util.function.Supplier;

import org.asynchttpclient.AsyncCompletionHandlerBase;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;

import io.reactivex.Maybe;

/**
* Prepares HTTP requests by wrapping them into RxJava 2 {@code Maybe} instances.
*
* @see <a href="https://github.com/ReactiveX/RxJava">RxJava – Reactive Extensions for the JVM</a>
*/
public interface RxHttpClient {

/**
* Returns a new {@code RxHttpClient} instance that uses the given {@code asyncHttpClient} under the hoods.
*
* @param asyncHttpClient
* the Async HTTP Client instance to be used
*
* @return a new {@code RxHttpClient} instance
*
* @throws NullPointerException
* if {@code asyncHttpClient} is {@code null}
*/
static RxHttpClient create(AsyncHttpClient asyncHttpClient) {
return new DefaultRxHttpClient(asyncHttpClient);
}

/**
* Prepares the given {@code request}. For each subscription to the returned {@code Maybe}, a new HTTP request will
* be executed and its response will be emitted.
*
* @param request
* the request that is to be executed
*
* @return a {@code Maybe} that executes {@code request} upon subscription and emits the response
*
* @throws NullPointerException
* if {@code request} is {@code null}
*/
default Maybe<Response> prepare(Request request) {
return prepare(request, AsyncCompletionHandlerBase::new);
}

/**
* Prepares the given {@code request}. For each subscription to the returned {@code Maybe}, a new HTTP request will
* be executed and the results of {@code AsyncHandlers} obtained from {@code handlerSupplier} will be emitted.
*
* @param <T>
* the result type produced by handlers produced by {@code handlerSupplier} and emitted by the returned
* {@code Maybe} instance
*
* @param request
* the request that is to be executed
* @param handlerSupplier
* supplies the desired {@code AsyncHandler} instances that are used to produce results
*
* @return a {@code Maybe} that executes {@code request} upon subscription and that emits the results produced by
* the supplied handers
*
* @throws NullPointerException
* if at least one of the parameters is {@code null}
*/
<T> Maybe<T> prepare(Request request, Supplier<? extends AsyncHandler<T>> handlerSupplier);
}
Loading