diff --git a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java index 60d9eb9..73380a8 100644 --- a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java +++ b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java @@ -38,6 +38,7 @@ private WebSocketClient webSocketClient; private int requestIdCount = 1; private boolean userInitiatedDisconnect = false; + private boolean hasReceivedConnected = false; /* package */ ParseLiveQueryClientImpl() { this(getDefaultUri()); @@ -88,7 +89,7 @@ public SubscriptionHandling subscribe(ParseQuery q Subscription subscription = new Subscription<>(requestId, query); subscriptions.append(requestId, subscription); - if (inAnyState(WebSocketClient.State.CONNECTED)) { + if (isConnected()) { sendSubscription(subscription); } else if (userInitiatedDisconnect) { Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions."); @@ -150,18 +151,21 @@ public void reconnect() { webSocketClient.close(); } + userInitiatedDisconnect = false; + hasReceivedConnected = false; webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri); webSocketClient.open(); - userInitiatedDisconnect = false; } @Override public void disconnect() { if (webSocketClient != null) { - userInitiatedDisconnect = true; webSocketClient.close(); webSocketClient = null; } + + userInitiatedDisconnect = true; + hasReceivedConnected = false; } @Override @@ -185,6 +189,10 @@ private WebSocketClient.State getWebSocketState() { return state == null ? WebSocketClient.State.NONE : state; } + private boolean isConnected() { + return hasReceivedConnected && inAnyState(WebSocketClient.State.CONNECTED); + } + private boolean inAnyState(WebSocketClient.State... states) { return Arrays.asList(states).contains(getWebSocketState()); } @@ -219,6 +227,7 @@ private void parseMessage(String message) throws LiveQueryException { switch (rawOperation) { case "connected": + hasReceivedConnected = true; dispatchConnected(); Log.v(LOG_TAG, "Connected, sending pending subscription"); for (int i = 0; i < subscriptions.size(); i++) { @@ -370,6 +379,7 @@ private WebSocketClient.WebSocketClientCallback getWebSocketClientCallback() { return new WebSocketClient.WebSocketClientCallback() { @Override public void onOpen() { + hasReceivedConnected = false; Log.v(LOG_TAG, "Socket opened"); ParseUser.getCurrentSessionTokenAsync().onSuccessTask(new Continuation>() { @Override @@ -405,12 +415,14 @@ public Void then(Task task) { @Override public void onClose() { Log.v(LOG_TAG, "Socket onClose"); + hasReceivedConnected = false; dispatchDisconnected(); } @Override public void onError(Throwable exception) { Log.e(LOG_TAG, "Socket onError", exception); + hasReceivedConnected = false; dispatchSocketError(exception); } diff --git a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java index 9b54d4e..36534d8 100644 --- a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java +++ b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java @@ -1,5 +1,6 @@ package com.parse; +import org.assertj.core.api.Assertions; import org.json.JSONException; import org.json.JSONObject; import org.junit.After; @@ -82,6 +83,34 @@ public void tearDown() throws Exception { ParsePlugins.reset(); } + @Test + public void testSubscribeAfterSocketConnectBeforeConnectedOp() throws Exception { + // Bug: https://github.com/parse-community/ParseLiveQuery-Android/issues/46 + ParseQuery queryA = ParseQuery.getQuery("objA"); + ParseQuery queryB = ParseQuery.getQuery("objB"); + clearConnection(); + + // This will trigger connectIfNeeded(), which calls reconnect() + SubscriptionHandling subA = parseLiveQueryClient.subscribe(queryA); + + verify(webSocketClient, times(1)).open(); + verify(webSocketClient, never()).send(anyString()); + + // Now the socket is open + webSocketClientCallback.onOpen(); + when(webSocketClient.getState()).thenReturn(WebSocketClient.State.CONNECTED); + // and we send op=connect + verify(webSocketClient, times(1)).send(contains("\"op\":\"connect\"")); + + // Now if we subscribe to queryB, we SHOULD NOT send the subscribe yet, until we get op=connected + SubscriptionHandling subB = parseLiveQueryClient.subscribe(queryB); + verify(webSocketClient, never()).send(contains("\"op\":\"subscribe\"")); + + // on op=connected, _then_ we should send both subscriptions + webSocketClientCallback.onMessage(createConnectedMessage().toString()); + verify(webSocketClient, times(2)).send(contains("\"op\":\"subscribe\"")); + } + @Test public void testSubscribeWhenSubscribedToCallback() throws Exception { SubscriptionHandling.HandleSubscribeCallback subscribeMockCallback = mock(SubscriptionHandling.HandleSubscribeCallback.class); @@ -459,6 +488,11 @@ private void validateSameObject(SubscriptionHandling.HandleEventCallback