Skip to content

Non-Deterministic workflow and Object reference not set #194

@janpiskur

Description

@janpiskur

Hi,

I am writing because I keep getting the two errors below, and they are hard to debug because the error information is so sparse. I hope you can help:

(1) Non-Deterministic workflow detected: TaskScheduledEvent: 0 TaskScheduled Hello
(2) Object reference not set to an instance of an object.

I can’t tell you exactly when these errors happen, because they seem to occur at random. Currently, I get the “Object reference not set to an instance of an object” error after my script has been running for about 12 minutes.

Basically, I have a Python script (Python version 3.8) that loads GPS data from a data lake into a pandas DataFrame. An algorithm then does some calculations on the GPS data (the NonMovingGPS class is not included here). The script works fine if the load isn’t too high, and it also works fine when it runs locally. In other words, these errors usually show up when I increase the load. So far, I have mitigated the error by scaling up my App Service plan, but I have now reached its limit. The error I get:

{
name: "DurableFunctionsOrchestrator",
instanceId: "7b066caec5a54a30869830b42348b2d1",
runtimeStatus: "Failed",
input: null,
customStatus: null,
output: "Object reference not set to an instance of an object.",
createdTime: "2020-09-14T13:11:27Z",
lastUpdatedTime: "2020-09-14T13:38:43Z"
}

My http start:

import logging

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    """HTTP starter: launch the orchestrator named in the route.

    The orchestrator name is taken from the ``functionName`` route
    parameter, and ``None`` is passed for the two remaining positional
    arguments of ``start_new``. The returned HTTP response is the one
    produced by ``create_check_status_response`` for the new instance.
    """
    durable_client = df.DurableOrchestrationClient(starter)
    orchestrator_name = req.route_params["functionName"]

    instance_id = await durable_client.start_new(orchestrator_name, None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

    status_response = durable_client.create_check_status_response(req, instance_id)
    return status_response

The function.json that belongs to the HTTP starter:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "route": "orchestrators/{functionName}",
      "methods": [
        "post",
        "get"
      ]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    },
    {
      "name": "starter",
      "type": "orchestrationClient",
      "direction": "in"
    }
  ]
}

My orchestrator:

import logging
import json

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Orchestrator generator: run the ``Hello`` activity once.

    Yields the task returned by ``context.call_activity`` for a fixed date
    range, then returns the value sent back in as a one-element list.
    """
    date_range = "2019-09-01:2019-09-03"
    hello_output = yield context.call_activity('Hello', date_range)
    return [hello_output]


# Entry point referenced by the orchestrator's function.json binding.
main = df.Orchestrator.create(orchestrator_function)

The function.json that belongs to the orchestrator:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

My activity:

from azure.storage.filedatalake import DataLakeServiceClient
from datetime import date, timedelta
import pandas as pd
import os
import re
import logging
import datetime
from io import BytesIO

from .shared_code.class_non_moving_gps import NonMovingGPS

def get_dates_between(sdate, edate):
    """Return every day from *sdate* through *edate* as ``YYYY-MM-DD`` strings.

    Both bounds are ``YYYY-MM-DD`` strings and both are included in the
    result. An end date earlier than the start date yields an empty list.
    """
    day_format = '%Y-%m-%d'
    first_day = datetime.datetime.strptime(sdate, day_format)
    last_day = datetime.datetime.strptime(edate, day_format)

    span_in_days = (last_day - first_day).days

    # "+ 1" makes the end date inclusive.
    return [
        (first_day + timedelta(days=offset)).strftime(day_format)
        for offset in range(span_in_days + 1)
    ]

def main(dates: str) -> str:
    """Activity: load vehicle positions for a date range and collect spots.

    Lists every path under ``vehicle-position/`` in the ``rps`` file system,
    downloads the gzip-compressed CSV files whose directory encodes a day
    inside the requested range, keeps the rows that have all four position
    columns, and passes the combined frame to ``NonMovingGPS`` once per
    distinct vehicle id.

    Args:
        dates: Inclusive range formatted as ``"YYYY-MM-DD:YYYY-MM-DD"``.

    Returns:
        A greeting string that embeds the string form of the combined
        per-vehicle result frame.

    Raises:
        IndexError: If *dates* contains no ``:`` separator.
        ValueError: If either half of *dates* is not a ``YYYY-MM-DD`` date.
    """
    position_columns = ['SentUtcTime', 'Latitude', 'Longitude', 'VehicleId']
    spot_columns = ['ParkDateTime', 'WakeupDateTime', 'DiffMin', 'Latitude', 'Longitude', 'VehicleNumber']

    token = dates.split(':')
    # A set gives constant-time membership tests for every listed path.
    wanted_days = set(get_dates_between(token[0], token[1]))

    service_client = DataLakeServiceClient.from_connection_string(conn_str="SECRET", file_system_name="rps")
    file_system_client = service_client.get_file_system_client(file_system="rps")
    paths = file_system_client.get_paths(path="vehicle-position/")

    # Compiled once instead of once per listed path.
    # Directory layout matched: vehicle-position/YYYY/YYYYMM/YYYYMMDD
    day_dir_pattern = re.compile(r'^vehicle-position\/(\d{4})\/(\d{6})\/(\d{8})')

    # Frames are collected in a list and concatenated once. Calling
    # DataFrame.append inside the loop recopied the accumulated data for
    # every file, and the method no longer exists in pandas 2.x.
    position_frames = []
    for path in paths:
        matched = day_dir_pattern.match(str(os.path.dirname(path.name)))
        if not matched:
            continue

        year = matched.group(1)
        month = matched.group(2)[4:6]
        day = matched.group(3)[6:8]
        if f"{year}-{month}-{day}" not in wanted_days:
            continue

        file_client = service_client.get_file_client(file_system="rps", file_path=path.name)
        content = file_client.download_file().readall()
        df_temp = pd.read_csv(BytesIO(content), compression='gzip')
        logging.info(f"{path.name}: {df_temp.shape}")

        # Keep only complete rows, restricted to the columns used downstream.
        df_temp = df_temp.dropna(subset=position_columns)[position_columns].reset_index(drop=True)
        df_temp['VehicleId'] = df_temp['VehicleId'].astype('int32')
        df_temp['SentUtcTime'] = pd.to_datetime(df_temp['SentUtcTime'])
        position_frames.append(df_temp)

    if position_frames:
        final_df = pd.concat(position_frames, ignore_index=True)
    else:
        # No matching file: keep the expected (empty) schema.
        final_df = pd.DataFrame(columns=position_columns)

    # NOTE(review): assumes loop_single_vehicle returns a DataFrame; its
    # definition is not visible from this file -- confirm.
    spot_frames = [
        NonMovingGPS().loop_single_vehicle(final_df, vehicle)
        for vehicle in final_df['VehicleId'].unique()
    ]

    # The previous code called df_spots.append(...) and discarded the return
    # value, so the reported frame was always empty.
    if spot_frames:
        df_spots = pd.concat(spot_frames, ignore_index=True)
    else:
        df_spots = pd.DataFrame(columns=spot_columns)

    return f"Hi {dates}! Here are your paths: {df_spots}"

The function.json that belongs to the activity:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "dates",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}

My host.json:

{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "hubName": "MyGarageTaskHub"
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  },
  "functionTimeout": "-1"
}

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions