From b376ee15482c6bef88cdb111699269ca2d4b26c7 Mon Sep 17 00:00:00 2001 From: Joe Hansche Date: Wed, 26 Apr 2017 17:21:02 -0700 Subject: [PATCH 1/2] Unit test for race condition in subscribe/connected --- .../com/parse/ParseLiveQueryClientImpl.java | 1 + .../com/parse/TestParseLiveQueryClient.java | 42 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java index 60d9eb9..ab78bea 100644 --- a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java +++ b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java @@ -88,6 +88,7 @@ public SubscriptionHandling subscribe(ParseQuery q Subscription subscription = new Subscription<>(requestId, query); subscriptions.append(requestId, subscription); + // TODO: differentiate between state=CONNECTED, vs received op=connected response if (inAnyState(WebSocketClient.State.CONNECTED)) { sendSubscription(subscription); } else if (userInitiatedDisconnect) { diff --git a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java index 9b54d4e..5d636d2 100644 --- a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java +++ b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java @@ -1,5 +1,6 @@ package com.parse; +import org.assertj.core.api.Assertions; import org.json.JSONException; import org.json.JSONObject; import org.junit.After; @@ -82,6 +83,42 @@ public void tearDown() throws Exception { ParsePlugins.reset(); } + @Test + public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Exception { + ParseQuery queryA = ParseQuery.getQuery("objA"); + ParseQuery queryB = ParseQuery.getQuery("objB"); + clearConnection(); + + // This will trigger connectIfNeeded(), which calls reconnect() + SubscriptionHandling subA = parseLiveQueryClient.subscribe(queryA); + + verify(webSocketClient, times(1)).open(); + verify(webSocketClient, never()).send(anyString()); + + // Now the socket is open + webSocketClientCallback.onOpen(); + when(webSocketClient.getState()).thenReturn(WebSocketClient.State.CONNECTED); + // and we send op=connect + verify(webSocketClient, times(1)).send(contains("\"op\":\"connect\"")); + + // Now if we subscribe to queryB, we SHOULD NOT send the subscribe yet, until we get op=connected + SubscriptionHandling subB = parseLiveQueryClient.subscribe(queryB); // TODO: fix this state + verify(webSocketClient, never()).send(contains("\"op\":\"subscribe\"")); + + // on op=connected, _then_ we should send both subscriptions + webSocketClientCallback.onMessage(createConnectedMessage().toString()); + verify(webSocketClient, times(2)).send(contains("\"op\":\"subscribe\"")); + + // 1. Subscribe to queryA + // - it is not connected yet, so it will trigger reconnect. + // 2. Socket opens & connects; initiate op=connect + // 3. subscribe to queryB + // - SOCKET is connected, but we haven't received op=connected yet. + // - BUG: it will call sendSubscription now + // 4. Server responds to #2 with op=connected + // 5. On op=connected, we replay pending subscriptions, including the one that was already sent in #3 + } + @Test public void testSubscribeWhenSubscribedToCallback() throws Exception { SubscriptionHandling.HandleSubscribeCallback subscribeMockCallback = mock(SubscriptionHandling.HandleSubscribeCallback.class); @@ -459,6 +496,11 @@ private void validateSameObject(SubscriptionHandling.HandleEventCallback Date: Thu, 27 Apr 2017 12:07:08 -0700 Subject: [PATCH 2/2] Resolve race condition by ensuring that `op=connected` has been received before sending a new subscribe event. Fixes parse-community/ParseLiveQuery-Android#46 --- .../com/parse/ParseLiveQueryClientImpl.java | 19 +++++++++++++++---- .../com/parse/TestParseLiveQueryClient.java | 14 +++----------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java index ab78bea..73380a8 100644 --- a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java +++ b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java @@ -38,6 +38,7 @@ private WebSocketClient webSocketClient; private int requestIdCount = 1; private boolean userInitiatedDisconnect = false; + private boolean hasReceivedConnected = false; /* package */ ParseLiveQueryClientImpl() { this(getDefaultUri()); @@ -88,8 +89,7 @@ public SubscriptionHandling subscribe(ParseQuery q Subscription subscription = new Subscription<>(requestId, query); subscriptions.append(requestId, subscription); - // TODO: differentiate between state=CONNECTED, vs received op=connected response - if (inAnyState(WebSocketClient.State.CONNECTED)) { + if (isConnected()) { sendSubscription(subscription); } else if (userInitiatedDisconnect) { Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions."); @@ -151,18 +151,21 @@ public void reconnect() { webSocketClient.close(); } + userInitiatedDisconnect = false; + hasReceivedConnected = false; webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri); webSocketClient.open(); - userInitiatedDisconnect = false; } @Override public void disconnect() { if (webSocketClient != null) { - userInitiatedDisconnect = true; webSocketClient.close(); webSocketClient = null; } + + userInitiatedDisconnect = true; + hasReceivedConnected = false; } @Override @@ -186,6 +189,10 @@ private WebSocketClient.State getWebSocketState() { return state == null ? WebSocketClient.State.NONE : state; } + private boolean isConnected() { + return hasReceivedConnected && inAnyState(WebSocketClient.State.CONNECTED); + } + private boolean inAnyState(WebSocketClient.State... states) { return Arrays.asList(states).contains(getWebSocketState()); } @@ -220,6 +227,7 @@ private void parseMessage(String message) throws LiveQueryException { switch (rawOperation) { case "connected": + hasReceivedConnected = true; dispatchConnected(); Log.v(LOG_TAG, "Connected, sending pending subscription"); for (int i = 0; i < subscriptions.size(); i++) { @@ -371,6 +379,7 @@ private WebSocketClient.WebSocketClientCallback getWebSocketClientCallback() { return new WebSocketClient.WebSocketClientCallback() { @Override public void onOpen() { + hasReceivedConnected = false; Log.v(LOG_TAG, "Socket opened"); ParseUser.getCurrentSessionTokenAsync().onSuccessTask(new Continuation>() { @Override @@ -406,12 +415,14 @@ public Void then(Task task) { @Override public void onClose() { Log.v(LOG_TAG, "Socket onClose"); + hasReceivedConnected = false; dispatchDisconnected(); } @Override public void onError(Throwable exception) { Log.e(LOG_TAG, "Socket onError", exception); + hasReceivedConnected = false; dispatchSocketError(exception); } diff --git a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java index 5d636d2..36534d8 100644 --- a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java +++ b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java @@ -84,7 +84,8 @@ public void tearDown() throws Exception { } @Test - public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Exception { + public void testSubscribeAfterSocketConnectBeforeConnectedOp() throws Exception { + // Bug: https://github.com/parse-community/ParseLiveQuery-Android/issues/46 ParseQuery queryA = ParseQuery.getQuery("objA"); ParseQuery queryB = ParseQuery.getQuery("objB"); clearConnection(); @@ -102,21 +103,12 @@ public void testSubscribeBetweenSocketConnectAndConnectResponse() throws Excepti verify(webSocketClient, times(1)).send(contains("\"op\":\"connect\"")); // Now if we subscribe to queryB, we SHOULD NOT send the subscribe yet, until we get op=connected - SubscriptionHandling subB = parseLiveQueryClient.subscribe(queryB); // TODO: fix this state + SubscriptionHandling subB = parseLiveQueryClient.subscribe(queryB); verify(webSocketClient, never()).send(contains("\"op\":\"subscribe\"")); // on op=connected, _then_ we should send both subscriptions webSocketClientCallback.onMessage(createConnectedMessage().toString()); verify(webSocketClient, times(2)).send(contains("\"op\":\"subscribe\"")); - - // 1. Subscribe to queryA - // - it is not connected yet, so it will trigger reconnect. - // 2. Socket opens & connects; initiate op=connect - // 3. subscribe to queryB - // - SOCKET is connected, but we haven't received op=connected yet. - // - BUG: it will call sendSubscription now - // 4. Server responds to #2 with op=connected - // 5. On op=connected, we replay pending subscriptions, including the one that was already sent in #3 } @Test