Description
I am using the fan-out/fan-in pattern for a two-step calculation. The first step, "CalculateRouteProfile",
calls a third-party API that returns data used in the second step, "CalculateCo2FromRouteProfile".
The calculation is triggered by an HTTPStarter.
When running this for about 15'000 inputs, most of the calculations finish, but some never seem to.
The ones that do not finish stay stuck in the "Running" state.
Example of a finished calculation: c553159a6c994173bf6c4381376954dc. Examples of unfinished
calculations are listed under "Orchestration instance ID(s)" below.
Expected behavior
All ~15'000 inputs are processed.
Actual behavior
Out of 754 orchestrators, all but 14 eventually complete. The remaining 14 stay in the "Running" state.
Relevant source code snippets
import logging

import azure.durable_functions as df
from azure.durable_functions.models.RetryOptions import RetryOptions

MAX_ROUTES_PER_POST_REQUEST = 100
FAN_OUT_CHUNK_SIZE = 25


def chunks(lst, n):
    """Yield successive n-sized chunks from list."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def orchestrator_function(context: df.DurableOrchestrationContext):
    """ """
    list_of_route_params = context.get_input()
    logging.info(
        f"client input in orchestrator function: {len(list_of_route_params)} items."
    )
    if len(list_of_route_params) > 0:
        logging.info(
            f"first item: {list_of_route_params[0]}."
        )
    custom_status = {
        "msg": "just started",
        "total_number_of_routes": len(list_of_route_params),
        "current_chunk": None,
        "routes_in_current_chunk": None
    }
    context.set_custom_status(custom_status)
    out = []  # this will contain our final output
    # We need to avoid that one call immediately try to calculate thousands of
    # routes without limits. So we split the total list of input routes into
    # chunks, with FAN_OUT_CHUNK_SIZE as the limit per chunk.
    chunk_list = list(chunks(list_of_route_params, FAN_OUT_CHUNK_SIZE))
    logging.info(
        (f"I have {len(chunk_list)} chunks with chunk size {FAN_OUT_CHUNK_SIZE} "
         f"for calculating {len(list_of_route_params)} route items")
    )
    custom_status["number_of_chunks"] = len(chunk_list)
    context.set_custom_status(custom_status)
    for idx, chunks_of_route_params in enumerate(chunk_list):
        # this is a chained "fan-in-fan-out" call: first get the profiles,
        # then calculate the profile and the emissions
        custom_status["msg"] = "processing route profiles"
        custom_status["routes_in_current_chunk"] = len(chunks_of_route_params)
        custom_status["current_chunk"] = idx
        context.set_custom_status(custom_status)
        parallel_tasks = []
        retry_options = RetryOptions(1000, 3)
        for route_params in chunks_of_route_params:
            # first calculate the routes including the profile
            parallel_tasks.append(
                context.call_activity_with_retry("CalculateRouteProfile",
                                                 retry_options, route_params)
            )
        chunk_profiles = yield context.task_all(parallel_tasks)
        custom_status["msg"] = "processing from route profiles"
        context.set_custom_status(custom_status)
        parallel_tasks = []
        for profile in chunk_profiles:
            # calculate emission based on the profile
            parallel_tasks.append(
                context.call_activity(
                    "CalculateCo2FromRouteProfile",
                    {
                        "profile": profile["serialized_profile"],
                        "route_params": profile["route_params"]
                    },
                )
            )
        out += yield context.task_all(parallel_tasks)
    custom_status["msg"] = "done with processing"
    custom_status["routes_in_current_chunk"] = len(chunk_list[-1])
    custom_status["current_chunk"] = len(chunk_list)
    context.set_custom_status(custom_status)
    return out


main = df.Orchestrator.create(orchestrator_function)
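To get a feel for how much work a single orchestration instance schedules, the chunking logic above can be replayed outside Azure. This is only a minimal sketch and not part of the deployed app: `orchestration_workload` is a hypothetical helper, and it assumes each route results in exactly one "CalculateRouteProfile" call and one "CalculateCo2FromRouteProfile" call, with no retries.

```python
FAN_OUT_CHUNK_SIZE = 25  # same value as in the orchestrator above


def chunks(lst, n):
    """Yield successive n-sized chunks from list."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def orchestration_workload(num_routes, chunk_size=FAN_OUT_CHUNK_SIZE):
    """Hypothetical helper: count what one orchestration instance schedules.

    Assumption: no activity is retried, so each route costs two activity calls.
    """
    chunk_list = list(chunks(list(range(num_routes)), chunk_size))
    return {
        "chunks": len(chunk_list),
        # two task_all fan-ins per chunk: profiles first, then CO2
        "task_all_yields": 2 * len(chunk_list),
        "activity_calls": 2 * num_routes,
    }


print(orchestration_workload(100))
print(orchestration_workload(15000))
```

Every `yield` is a point where the orchestrator is suspended and later replayed from its history, so the number of fan-ins per instance grows linearly with the number of routes passed to that instance.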
host.json:
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  },
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 3,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }
}
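As a rough illustration of what the `maxConcurrentActivityFunctions` limit means for one chunk of 25 routes, the sketch below estimates how many sequential "waves" of activities are needed. It assumes a single host instance (the limit applies per instance, so scale-out changes the picture) and roughly uniform activity durations; `activity_waves` is a hypothetical helper, not part of any Azure SDK.

```python
import json
import math

# Only the durableTask part of the host.json shown above.
HOST_JSON = """
{
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 3,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }
}
"""


def activity_waves(tasks_in_chunk, max_concurrent_activities):
    """Hypothetical helper: sequential waves needed on ONE host instance.

    Assumption: all activities take about the same time and no retries occur.
    """
    return math.ceil(tasks_in_chunk / max_concurrent_activities)


durable_task = json.loads(HOST_JSON)["extensions"]["durableTask"]
waves = activity_waves(25, durable_task["maxConcurrentActivityFunctions"])
print(f"waves per fan-out of 25 activities: {waves}")
```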
Known workarounds
Using small batches of input (around 100 routes) seems to work reliably.
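The workaround can be sketched on the client side roughly as follows. This is a minimal illustration, not the code actually used: `batches` and `submit_in_batches` are hypothetical helpers, and `submit` stands in for whatever call posts one batch to the HTTP starter (here replaced by a fake so the snippet runs without Azure).

```python
from typing import Callable, Iterator, List, Sequence

BATCH_SIZE = 100  # batch size reported above to work reliably


def batches(routes: Sequence, size: int = BATCH_SIZE) -> Iterator[List]:
    """Yield consecutive batches of at most `size` routes."""
    for start in range(0, len(routes), size):
        yield list(routes[start:start + size])


def submit_in_batches(routes: Sequence, submit: Callable[[List], str],
                      size: int = BATCH_SIZE) -> List[str]:
    """Call submit(batch) once per batch and collect whatever it returns."""
    return [submit(batch) for batch in batches(routes, size)]


# Fake input and a fake submit function, for illustration only.
fake_routes = [{"route_id": i} for i in range(250)]
results = submit_in_batches(
    fake_routes,
    submit=lambda batch: f"submitted {len(batch)} routes",
)
print(results)
```

Submitting the batches one after another, rather than all at once, keeps each orchestration instance small; whether that is what avoids the stuck instances is not established here.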
App Details
- Durable Functions extension version (e.g. v1.8.3): 1.3.0 (just called "Azure Functions" in VS Code)
- Azure Functions runtime version (1.0 or 2.0): 2.0
- Programming language used: Python
If deployed to Azure
- Timeframe issue observed: "createdTime": "2021-03-31T15:29:19Z"
- Function App name: co2-route-calculator-dev
- Function name(s): CalculateCo2FromRouteProfile, CalculateRouteProfile
- Azure region: Switzerland North
- Orchestration instance ID(s): a9ee8c72713644619b3e1d1eaeed7811, 1d986a195d1f4708be368e86c9e5cd06, adacc094cf934a68a5ae45f4562ec961, ...
- Azure storage account name: co2routecalculatordev
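For anyone trying to reproduce this, stuck instances can be picked out of the status payloads returned by the Durable Functions status query endpoints. The sketch below is a hypothetical helper working on already-downloaded JSON; it assumes each entry carries the `instanceId`, `runtimeStatus` and `lastUpdatedTime` fields, and the one-hour idle threshold and sample entries are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(timestamp):
    """Parse timestamps of the form 2021-03-31T15:29:19Z as UTC."""
    parsed = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
    return parsed.replace(tzinfo=timezone.utc)


def find_stuck(statuses, now, max_idle=timedelta(hours=1)):
    """Return IDs of instances still "Running" but idle longer than max_idle.

    Assumption: max_idle is an arbitrary threshold chosen for this sketch.
    """
    return [
        status["instanceId"]
        for status in statuses
        if status["runtimeStatus"] == "Running"
        and now - parse_utc(status["lastUpdatedTime"]) > max_idle
    ]


# Illustrative entries only, not real data from the affected app.
sample_statuses = [
    {"instanceId": "example-completed", "runtimeStatus": "Completed",
     "lastUpdatedTime": "2021-03-31T15:40:00Z"},
    {"instanceId": "example-stuck", "runtimeStatus": "Running",
     "lastUpdatedTime": "2021-03-31T15:45:00Z"},
    {"instanceId": "example-active", "runtimeStatus": "Running",
     "lastUpdatedTime": "2021-03-31T17:55:00Z"},
]
now = datetime(2021, 3, 31, 18, 0, 0, tzinfo=timezone.utc)
print(find_stuck(sample_statuses, now))
```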