From b43f7bfde97d5873e3c6e94e1b1a93541d0cf0f3 Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 15 Jul 2020 16:19:44 -0700 Subject: [PATCH 1/8] WIP: rewind --- .../models/DurableOrchestrationClient.py | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index c96f370d..960dd539 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import List, Any, Awaitable +from typing import List, Any, Awaitable, Optional from time import time from asyncio import sleep from urllib.parse import urlparse, quote @@ -509,3 +509,41 @@ def _get_raise_event_url(self, instance_id, event_name, task_hub_name, connectio request_url += "?" + "&".join(query) return request_url + + async def rewind(self, + instance_id: str, + reason: str, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None): + id_placeholder = self._orchestration_bindings.management_urls.copy()["id"] + request_url: str = "" + if self._orchestration_bindings.rpc_base_url: ## RPCMANAGEMENT OPS?? + path = "instances/{instance_id}/rewind?reason={reason}" + query: List[str] = [] + if not (task_hub_name is None): + query.append(f"taskHub={task_hub_name}") + if not (connection_name is None): + query.append(f"connection={connection_name}") + if len(query) > 0: + path += "&" + "&".join(query) + + request_url = f"{self._orchestration_bindings.rpc_base_url}/" + path + else: + # TODO: double check this path is safe + request_url = self._orchestration_bindings.management_urls.\ + replace(id_placeholder, instance_id).\ + replace(self._reason_placeholder, reason) + + response = self._post_async_request(request_url, None) + status: int = response[0] + if status == 200: + return + elif status == 404: + ex_msg = f"No instance with ID {instance_id} found." + raise Exception(ex_msg) + elif status == 410: + ex_msg = "The rewind operation is only supported on failed orchestration instances." + raise Exception(ex_msg) + else: + ex_msg = response[1] + raise Exception(ex_msg) From 8806ccd4017f26333149603f81c72d8e0d1831a6 Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 11 Sep 2020 10:06:06 -0700 Subject: [PATCH 2/8] Added unit tests, fixed non-awaited coroutine --- .../models/DurableOrchestrationClient.py | 8 ++--- .../models/test_DurableOrchestrationClient.py | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 788ee06d..22e0ab20 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -555,7 +555,7 @@ async def rewind(self, id_placeholder = self._orchestration_bindings.management_urls.copy()["id"] request_url: str = "" if self._orchestration_bindings.rpc_base_url: ## RPCMANAGEMENT OPS?? - path = "instances/{instance_id}/rewind?reason={reason}" + path = f"instances/{instance_id}/rewind?reason={reason}" query: List[str] = [] if not (task_hub_name is None): query.append(f"taskHub={task_hub_name}") @@ -564,16 +564,16 @@ async def rewind(self, if len(query) > 0: path += "&" + "&".join(query) - request_url = f"{self._orchestration_bindings.rpc_base_url}/" + path + request_url = f"{self._orchestration_bindings.rpc_base_url}" + path else: # TODO: double check this path is safe request_url = self._orchestration_bindings.management_urls.\ replace(id_placeholder, instance_id).\ replace(self._reason_placeholder, reason) - response = self._post_async_request(request_url, None) + response = await self._post_async_request(request_url, None) status: int = response[0] - if status == 200: + if status == 200 or status == 202: return elif status == 404: ex_msg = f"No instance with ID {instance_id} found." diff --git a/tests/models/test_DurableOrchestrationClient.py b/tests/models/test_DurableOrchestrationClient.py index 49fc9ff3..b3a27f92 100644 --- a/tests/models/test_DurableOrchestrationClient.py +++ b/tests/models/test_DurableOrchestrationClient.py @@ -19,6 +19,9 @@ MESSAGE_500 = 'instance failed with unhandled exception' MESSAGE_501 = "well we didn't expect that" +INSTANCE_ID = "2e2568e7-a906-43bd-8364-c81733c5891e" +REASON = "Stuff" + TEST_ORCHESTRATOR = "MyDurableOrchestrator" EXCEPTION_ORCHESTRATOR_NOT_FOUND_EXMESSAGE = "The function doesn't exist,"\ " is disabled, or is not an orchestrator function. Additional info: "\ @@ -540,3 +543,36 @@ async def test_start_new_orchestrator_internal_exception(binding_string): with pytest.raises(Exception) as ex: await client.start_new(TEST_ORCHESTRATOR) ex.match(status_str) + +@pytest.mark.asyncio +async def test_rewind_works_under_200_and_200_http_codes(binding_string): + """Tests that the rewind API works as expected under 'successful' http codes: 200, 202""" + client = DurableOrchestrationClient(binding_string) + for code in [200, 202]: + mock_request = MockRequest( + expected_url=f"{RPC_BASE_URL}instances/{INSTANCE_ID}/rewind?reason={REASON}", + response=[202, ""]) + client._post_async_request = mock_request.post + result = await client.rewind(INSTANCE_ID, REASON) + assert result is None + +@pytest.mark.asyncio +async def test_rewind_throws_exception_during_404_410_and_500_errors(binding_string): + """Tests the behaviour of rewind under 'exception' http codes: 404, 410, 500""" + client = DurableOrchestrationClient(binding_string) + codes = [404, 410, 500] + exception_strs = [ + f"No instance with ID {INSTANCE_ID} found.", + "The rewind operation is only supported on failed orchestration instances.", + "Something went wrong" + ] + for http_code, expected_exception_str in zip(codes, exception_strs): + mock_request = MockRequest( + expected_url=f"{RPC_BASE_URL}instances/{INSTANCE_ID}/rewind?reason={REASON}", + response=[http_code, "Something went wrong"]) + client._post_async_request = mock_request.post + + with pytest.raises(Exception) as ex: + await client.rewind(INSTANCE_ID, REASON) + ex_message = str(ex.value) + assert ex_message == expected_exception_str From 369aea664bdc6aae6dfbac2eac667b4008ba1ebb Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 11 Sep 2020 10:22:08 -0700 Subject: [PATCH 3/8] added docstring --- .../models/DurableOrchestrationClient.py | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 22e0ab20..b3f263c3 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import List, Any, Optional, Dict, Union, Awaitable +from typing import List, Any, Optional, Dict, Union from time import time from asyncio import sleep from urllib.parse import urlparse, quote @@ -546,15 +546,34 @@ def _get_raise_event_url( request_url += "?" + "&".join(query) return request_url - - async def rewind(self, - instance_id: str, - reason: str, - task_hub_name: Optional[str] = None, - connection_name: Optional[str] = None): + + async def rewind(self, + instance_id: str, + reason: str, + task_hub_name: Optional[str] = None, + connection_name: Optional[str] = None): + """Return / "rewind" a failed orchestration instance to a prior "healthy" state. + + Parameters + ---------- + instance_id: str + The ID of the orchestration instance to rewind. + reason: str + The reason for rewinding the orchestration instance. + task_hub_name: Optional[str] + The TaskHub of the orchestration to rewind + connection_name: Optional[str] + Name of the application setting containing the storage + connection string to use. + + Raises + ------ + Exception: + In case of a failure, it reports the reason for the exception + """ id_placeholder = self._orchestration_bindings.management_urls.copy()["id"] request_url: str = "" - if self._orchestration_bindings.rpc_base_url: ## RPCMANAGEMENT OPS?? + if self._orchestration_bindings.rpc_base_url: path = f"instances/{instance_id}/rewind?reason={reason}" query: List[str] = [] if not (task_hub_name is None): @@ -563,14 +582,13 @@ async def rewind(self, query.append(f"connection={connection_name}") if len(query) > 0: path += "&" + "&".join(query) - + request_url = f"{self._orchestration_bindings.rpc_base_url}" + path else: - # TODO: double check this path is safe request_url = self._orchestration_bindings.management_urls.\ replace(id_placeholder, instance_id).\ replace(self._reason_placeholder, reason) - + response = await self._post_async_request(request_url, None) status: int = response[0] if status == 200 or status == 202: From 37a559dafa6df2c9de4a77e9cf197ae93079a777 Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 11 Sep 2020 15:27:55 -0700 Subject: [PATCH 4/8] dropping support for legacy endpoint --- tests/models/test_DurableOrchestrationClient.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/models/test_DurableOrchestrationClient.py b/tests/models/test_DurableOrchestrationClient.py index b3a27f92..ca9234ea 100644 --- a/tests/models/test_DurableOrchestrationClient.py +++ b/tests/models/test_DurableOrchestrationClient.py @@ -576,3 +576,18 @@ async def test_rewind_throws_exception_during_404_410_and_500_errors(binding_str await client.rewind(INSTANCE_ID, REASON) ex_message = str(ex.value) assert ex_message == expected_exception_str + +@pytest.mark.asyncio +async def test_rewind_with_no_rpc_endpoint(binding_string): + """Tests the behaviour of rewind without an RPC endpoint / under the legacy HTTP enepoin5.""" + client = DurableOrchestrationClient(binding_string) + mock_request = MockRequest( + expected_url=f"{RPC_BASE_URL}instances/{INSTANCE_ID}/rewind?reason={REASON}", + response=[-1, ""]) + client._post_async_request = mock_request.post + client._orchestration_bindings._rpc_base_url = None + expected_exception_str = "The Python SDK only supports RPC endpoints." + with pytest.raises(Exception) as ex: + await client.rewind(INSTANCE_ID, REASON) + ex_message = str(ex.value) + assert ex_message == expected_exception_str From f2f65af91ddadef2c5150059fd31190088fc74ec Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 11 Sep 2020 15:29:34 -0700 Subject: [PATCH 5/8] updating durable client --- azure/durable_functions/models/DurableOrchestrationClient.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index b3f263c3..407e65e5 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -571,7 +571,6 @@ async def rewind(self, Exception: In case of a failure, it reports the reason for the exception """ - id_placeholder = self._orchestration_bindings.management_urls.copy()["id"] request_url: str = "" if self._orchestration_bindings.rpc_base_url: path = f"instances/{instance_id}/rewind?reason={reason}" @@ -585,9 +584,7 @@ async def rewind(self, request_url = f"{self._orchestration_bindings.rpc_base_url}" + path else: - request_url = self._orchestration_bindings.management_urls.\ - replace(id_placeholder, instance_id).\ - replace(self._reason_placeholder, reason) + raise Exception("The Python SDK only supports RPC endpoints.") response = await self._post_async_request(request_url, None) status: int = response[0] From 5d3d37a8f694670eb2ec9dccd199be9335910183 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 14 Sep 2020 09:26:14 -0700 Subject: [PATCH 6/8] PR feedback --- tests/models/test_DurableOrchestrationClient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/models/test_DurableOrchestrationClient.py b/tests/models/test_DurableOrchestrationClient.py index ca9234ea..53c3be16 100644 --- a/tests/models/test_DurableOrchestrationClient.py +++ b/tests/models/test_DurableOrchestrationClient.py @@ -551,7 +551,7 @@ async def test_rewind_works_under_200_and_200_http_codes(binding_string): for code in [200, 202]: mock_request = MockRequest( expected_url=f"{RPC_BASE_URL}instances/{INSTANCE_ID}/rewind?reason={REASON}", - response=[202, ""]) + response=[code, ""]) client._post_async_request = mock_request.post result = await client.rewind(INSTANCE_ID, REASON) assert result is None @@ -579,7 +579,7 @@ async def test_rewind_throws_exception_during_404_410_and_500_errors(binding_str @pytest.mark.asyncio async def test_rewind_with_no_rpc_endpoint(binding_string): - """Tests the behaviour of rewind without an RPC endpoint / under the legacy HTTP enepoin5.""" + """Tests the behaviour of rewind without an RPC endpoint / under the legacy HTTP endpoint.""" client = DurableOrchestrationClient(binding_string) mock_request = MockRequest( expected_url=f"{RPC_BASE_URL}instances/{INSTANCE_ID}/rewind?reason={REASON}", From cffb677e1b9f65ddd77ffe726589701174d91052 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 14 Sep 2020 09:35:16 -0700 Subject: [PATCH 7/8] PR feedback: typo and bad refactor --- azure/durable_functions/models/DurableOrchestrationClient.py | 3 ++- tests/models/test_DurableOrchestrationClient.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 6bdebfd8..d6fbf6d6 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -584,7 +584,8 @@ async def rewind(self, request_url = f"{self._orchestration_bindings.rpc_base_url}" + path else: - raise Exception("The Python SDK only supports RPC endpoints.") + raise Exception("The Python SDK only supports RPC endpoints." \ + + "Please remove the `localRpcEnabled` setting from host.json") response = await self._post_async_request(request_url, None) status: int = response[0] diff --git a/tests/models/test_DurableOrchestrationClient.py b/tests/models/test_DurableOrchestrationClient.py index 4ebfed22..6a877568 100644 --- a/tests/models/test_DurableOrchestrationClient.py +++ b/tests/models/test_DurableOrchestrationClient.py @@ -586,7 +586,8 @@ async def test_rewind_with_no_rpc_endpoint(binding_string): response=[-1, ""]) client._post_async_request = mock_request.post client._orchestration_bindings._rpc_base_url = None - expected_exception_str = "The Python SDK only supports RPC endpoints." + expected_exception_str = "The Python SDK only supports RPC endpoints."\ + + "Please remove the `localRpcEnabled` setting from host.json" with pytest.raises(Exception) as ex: await client.rewind(INSTANCE_ID, REASON) ex_message = str(ex.value) From 05712348d35bb8f88deb18fd087bcf84f88424e1 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 14 Sep 2020 09:39:48 -0700 Subject: [PATCH 8/8] linting fix --- azure/durable_functions/models/DurableOrchestrationClient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index d6fbf6d6..a94f5722 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -584,8 +584,8 @@ async def rewind(self, request_url = f"{self._orchestration_bindings.rpc_base_url}" + path else: - raise Exception("The Python SDK only supports RPC endpoints." \ - + "Please remove the `localRpcEnabled` setting from host.json") + raise Exception("The Python SDK only supports RPC endpoints." + + "Please remove the `localRpcEnabled` setting from host.json") response = await self._post_async_request(request_url, None) status: int = response[0]