Orchestrations stuck in running state #280

@multipleofzero

Description

I am using the fan-out/fan-in pattern for a two-step calculation. The first step, "CalculateRouteProfile",
calls a third-party API that returns data used in the second step, "CalculateCo2FromRouteProfile".

The calculation is triggered by an HTTPStarter.

When running this for about 15'000 inputs, some of the calculations never seem to finish (most
of them do, though). The ones that do not finish are stuck in the "Running" state.

Example of a finished calculation: c553159a6c994173bf6c4381376954dc. Examples of not finished
calculations follow below.

Expected behavior

All ~15'000 inputs are processed.

Actual behavior

Out of 754 orchestrators, all but 14 eventually complete. The remaining 14 stay in the "Running" state.

Relevant source code snippets

import logging
import azure.durable_functions as df
from azure.durable_functions.models.RetryOptions import RetryOptions


MAX_ROUTES_PER_POST_REQUEST = 100
FAN_OUT_CHUNK_SIZE = 25


def chunks(lst, n):
    """Yield successive n-sized chunks from list."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def orchestrator_function(context: df.DurableOrchestrationContext):
    """Fan out route-profile calculations in chunks, then compute CO2 per profile."""
    list_of_route_params = context.get_input()
    logging.info(
        f"client input in orchestrator function: {len(list_of_route_params)} items."
    )
    if len(list_of_route_params) > 0:
        logging.info(
            f"first item: {list_of_route_params[0]}."
        )
    custom_status = {
        "msg": "just started",
        "total_number_of_routes": len(list_of_route_params),
        "current_chunk": None,
        "routes_in_current_chunk": None
    }
    context.set_custom_status(custom_status)

    out = []  # this will contain our final output
    # We need to avoid one call immediately trying to calculate thousands of
    # routes without limits. So we split the total list of input routes into
    # chunks, with FAN_OUT_CHUNK_SIZE as the limit per chunk.
    chunk_list = list(chunks(list_of_route_params, FAN_OUT_CHUNK_SIZE))
    logging.info(
        (f"I have {len(chunk_list)} chunks with chunk size {FAN_OUT_CHUNK_SIZE} "
         f"for calculating {len(list_of_route_params)} route items")
    )
    custom_status["number_of_chunks"] = len(chunk_list)
    context.set_custom_status(custom_status)
    for idx, chunks_of_route_params in enumerate(chunk_list):
        # this is a chained fan-out/fan-in call: first get the route profiles,
        # then calculate the emissions from those profiles

        custom_status["msg"] = "processing route profiles"
        custom_status["routes_in_current_chunk"] = len(chunks_of_route_params)
        custom_status["current_chunk"] = idx
        context.set_custom_status(custom_status)

        parallel_tasks = []
        # up to 3 attempts in total, with a 1000 ms wait before the first retry
        retry_options = RetryOptions(1000, 3)
        for route_params in chunks_of_route_params:
            # first calculate the routes including the profile
            parallel_tasks.append(
                context.call_activity_with_retry("CalculateRouteProfile",
                                                 retry_options, route_params)
            )
        chunk_profiles = yield context.task_all(parallel_tasks)

        custom_status["msg"] = "processing from route profiles"
        context.set_custom_status(custom_status)
        parallel_tasks = []
        for profile in chunk_profiles:
            # calculate emission based on the profile
            parallel_tasks.append(
                context.call_activity(
                    "CalculateCo2FromRouteProfile",
                    {
                        "profile": profile["serialized_profile"],
                        "route_params": profile["route_params"]
                    },
                )
            )
        out += yield context.task_all(parallel_tasks)

    custom_status["msg"] = "done with processing"
    # guard against empty input: chunk_list[-1] would raise IndexError
    custom_status["routes_in_current_chunk"] = (
        len(chunk_list[-1]) if chunk_list else 0
    )
    custom_status["current_chunk"] = len(chunk_list)
    context.set_custom_status(custom_status)

    return out


main = df.Orchestrator.create(orchestrator_function)

host.json:

{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingSettings": {
                "isEnabled": true,
                "excludedTypes": "Request"
            }
        }
    },
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[2.*, 3.0.0)"
    },
    "extensions": {
        "durableTask": {
            "maxConcurrentActivityFunctions": 3,
            "maxConcurrentOrchestratorFunctions": 1
        }
    }
}

Known workarounds

Using small batches of input (around 100 routes) seems to work reliably.
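
A minimal sketch of this workaround, assuming the caller splits the full input into batches before posting each batch to the HTTP starter. The names `batches`, `BATCH_SIZE`, and the `route_id` field are illustrative only and are not part of the deployed app; the batch size of 100 is taken from the "around 100 routes" figure above.

```python
from typing import Any, Iterator, List

BATCH_SIZE = 100  # assumption: "around 100 routes" per orchestration


def batches(routes: List[Any], size: int = BATCH_SIZE) -> Iterator[List[Any]]:
    """Yield successive batches of at most `size` routes."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(routes), size):
        yield routes[start:start + size]


# Hypothetical input: 250 placeholder routes. In practice, each batch
# would be sent as the input of its own orchestration instance.
routes = [{"route_id": i} for i in range(250)]
batch_sizes = [len(batch) for batch in batches(routes)]
print(batch_sizes)
```

Each batch then becomes the input of a separate orchestration, so no single orchestrator has to track more than about 100 routes.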

App Details

  • Durable Functions extension version (e.g. v1.8.3): 1.3.0 (just called "Azure Functions" in VS Code)
  • Azure Functions runtime version (1.0 or 2.0): 2.0
  • Programming language used: Python

If deployed to Azure

  • Timeframe issue observed: "createdTime": "2021-03-31T15:29:19Z"
  • Function App name: co2-route-calculator-dev
  • Function name(s): CalculateCo2FromRouteProfile, CalculateRouteProfile
  • Azure region: Switzerland North
  • Orchestration instance ID(s): a9ee8c72713644619b3e1d1eaeed7811, 1d986a195d1f4708be368e86c9e5cd06, adacc094cf934a68a5ae45f4562ec961, ...
  • Azure storage account name: co2routecalculatordev
