diff --git a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java index 5cf4f45..0339194 100644 --- a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java +++ b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java @@ -88,19 +88,21 @@ public void unsubscribe(final ParseQuery query, final SubscriptionHandling su @Override public void reconnect() { - if (webSocketClient != null) { - webSocketClient.close(); - } - this.webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri); - this.webSocketClient.open(); + disconnectAsync().continueWith(new Continuation() { + @Override + public Void then(Task task) throws Exception { + webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri); + webSocketClient.open(); + return null; + } + }); userInitiatedDisconnect = false; } @Override public void disconnect() { if (webSocketClient != null) { - webSocketClient.close(); - webSocketClient = null; + disconnectAsync(); userInitiatedDisconnect = true; } } @@ -131,6 +133,20 @@ public Void call() throws Exception { }, taskExecutor); } + private Task disconnectAsync() { + return Task.call(new Callable() { + @Override + public Void call() throws Exception { + if (webSocketClient != null) { + webSocketClient.close(); + webSocketClient = null; + } + + return null; + } + }, taskExecutor); + } + private void parseMessage(String message) throws LiveQueryException { try { JSONObject jsonObject = new JSONObject(message); @@ -227,7 +243,7 @@ private void sendSubscription(final Subscription subscription) { public Void then(Task task) throws Exception { String sessionToken = task.getResult(); SubscribeClientOperation op = new SubscribeClientOperation<>(subscription.getRequestId(), subscription.getQueryState(), sessionToken); - + // dispatch errors sendOperationAsync(op).continueWith(new Continuation() { public Void then(Task task) { diff --git a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java index b1bce6e..fc60c45 100644 --- a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java +++ b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java @@ -13,6 +13,8 @@ import org.robolectric.annotation.Config; import java.net.URI; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.Executor; import bolts.Task; @@ -23,8 +25,8 @@ import static org.mockito.AdditionalMatchers.and; import static org.mockito.AdditionalMatchers.not; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.contains; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -37,6 +39,7 @@ @Config(constants = BuildConfig.class, sdk = 21) public class TestParseLiveQueryClient { + private PauseableExecutor executor; private WebSocketClient webSocketClient; private WebSocketClient.WebSocketClientCallback webSocketClientCallback; private ParseLiveQueryClient parseLiveQueryClient; @@ -64,6 +67,8 @@ public Task answer(InvocationOnMock invocation) throws Throwable { }); ParseCorePlugins.getInstance().registerCurrentUserController(currentUserController); + executor = new PauseableExecutor(); + parseLiveQueryClient = ParseLiveQueryClient.Factory.getClient(new URI(""), new WebSocketClientFactory() { @Override public WebSocketClient createInstance(WebSocketClient.WebSocketClientCallback webSocketClientCallback, URI hostUrl) { @@ -71,12 +76,7 @@ public WebSocketClient createInstance(WebSocketClient.WebSocketClientCallback we webSocketClient = mock(WebSocketClient.class); return webSocketClient; } - }, new Executor() { - @Override - public void execute(Runnable command) { - command.run(); - } - }); + }, executor); reconnect(); } @@ -345,6 +345,16 @@ public void testEmptySessionTokenOnSubscribe() { contains("\"sessionToken\":\"the token\""))); } + @Test + public void testDisconnectOnBackgroundThread() throws Exception { + executor.pause(); + + parseLiveQueryClient.disconnect(); + verify(webSocketClient, never()).close(); + assertTrue(executor.advanceOne()); + verify(webSocketClient, times(1)).close(); + } + private SubscriptionHandling createSubscription(ParseQuery parseQuery, SubscriptionHandling.HandleSubscribeCallback subscribeMockCallback) throws Exception { SubscriptionHandling subscriptionHandling = parseLiveQueryClient.subscribe(parseQuery).handleSubscribe(subscribeMockCallback); @@ -443,4 +453,39 @@ private static JSONObject createObjectDeleteMessage(int requestId, ParseObject p jsonObject.put("object", PointerEncoder.get().encodeRelatedObject(parseObject)); return jsonObject; } + + private static class PauseableExecutor implements Executor { + private boolean isPaused = false; + private final Queue queue = new LinkedList<>(); + + void pause() { + isPaused = true; + } + + void unpause() { + if (isPaused) { + isPaused = false; + + //noinspection StatementWithEmptyBody + while (advanceOne()) { + // keep going + } + } + } + + boolean advanceOne() { + Runnable next = queue.poll(); + if (next != null) next.run(); + return next != null; + } + + @Override + public void execute(Runnable runnable) { + if (isPaused) { + queue.add(runnable); + } else { + runnable.run(); + } + } + } }