Description
Hi,
I'm writing because I'm getting the errors below, and they are hard to debug because the diagnostic information is so sparse, so I hope you can help:
(1) Non-Deterministic workflow detected: TaskScheduledEvent: 0 TaskScheduled Hello
(2) Object reference not set to an instance of an object.
I can’t tell you exactly when these errors happen because they seem to occur randomly. Currently, I get the “Object reference not set to an instance of an object” error after my script has been running for about 12 minutes.
Basically, I have a Python 3.8 script that loads GPS data from a data lake into a pandas DataFrame. An algorithm then does some calculations on the GPS data (the NonMovingGPS class is not included here). The script works fine when the load isn’t too high, and it also works fine locally. In other words, these errors usually show up when I increase the load. So far I have mitigated the error by scaling up my App Service plan, but I have now reached a limit. The error I get:
```json
{
  "name": "DurableFunctionsOrchestrator",
  "instanceId": "7b066caec5a54a30869830b42348b2d1",
  "runtimeStatus": "Failed",
  "input": null,
  "customStatus": null,
  "output": "Object reference not set to an instance of an object.",
  "createdTime": "2020-09-14T13:11:27Z",
  "lastUpdatedTime": "2020-09-14T13:38:43Z"
}
```
My http start:
```python
import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new(req.route_params["functionName"], None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)
```
The function.json that belongs to the HTTP start:
```json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "route": "orchestrators/{functionName}",
      "methods": [
        "post",
        "get"
      ]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    },
    {
      "name": "starter",
      "type": "orchestrationClient",
      "direction": "in"
    }
  ]
}
```
My orchestrator:
```python
import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('Hello', "2019-09-01:2019-09-03")
    return [result1]


main = df.Orchestrator.create(orchestrator_function)
```
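In case it matters for the load question: one way I could shrink each activity invocation would be to fan out one `Hello` call per day and join the results with `context.task_all`, instead of handing the whole range to a single activity. The range-splitting part is plain standard library (`split_range_by_day` is just an illustrative helper, not something in my code):

```python
from datetime import datetime, timedelta


def split_range_by_day(dates: str) -> list:
    """Split a 'YYYY-MM-DD:YYYY-MM-DD' range into one 'day:day' token per day."""
    start_s, end_s = dates.split(':')
    start = datetime.strptime(start_s, '%Y-%m-%d')
    end = datetime.strptime(end_s, '%Y-%m-%d')
    tokens = []
    for i in range((end - start).days + 1):
        day = (start + timedelta(days=i)).strftime('%Y-%m-%d')
        tokens.append(f"{day}:{day}")
    return tokens


print(split_range_by_day("2019-09-01:2019-09-03"))
# -> ['2019-09-01:2019-09-01', '2019-09-02:2019-09-02', '2019-09-03:2019-09-03']
```

The orchestrator would then do something like `yield context.task_all([context.call_activity('Hello', t) for t in split_range_by_day(dates)])`, so each activity only loads one day of blobs.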
The function.json that belongs to the orchestrator:
```json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}
```
My activity:
```python
from azure.storage.filedatalake import DataLakeServiceClient
from datetime import date, timedelta
import pandas as pd
import os
import re
import logging
import datetime
from io import BytesIO

from .shared_code.class_non_moving_gps import NonMovingGPS


def get_dates_between(sdate, edate):
    sdate = datetime.datetime.strptime(sdate, '%Y-%m-%d')
    edate = datetime.datetime.strptime(edate, '%Y-%m-%d')
    delta = edate - sdate
    days_list = []
    for i in range(delta.days + 1):
        day = sdate + timedelta(days=i)
        days_list.append(day.strftime("%Y-%m-%d"))
    return days_list


def main(dates: str) -> str:
    token = dates.split(':')
    positive_list = get_dates_between(token[0], token[1])
    service_client = DataLakeServiceClient.from_connection_string(conn_str="SECRET", file_system_name="rps")
    file_system_client = service_client.get_file_system_client(file_system="rps")
    paths = file_system_client.get_paths(path="vehicle-position/")
    paths_str = ''
    final_df = pd.DataFrame(columns=['SentUtcTime', 'Latitude', 'Longitude', 'VehicleId'])
    counter = 0
    for path in paths:
        p = re.compile(r'^vehicle-position\/(\d{4})\/(\d{6})\/(\d{8})')
        m = p.match(str(os.path.dirname(path.name)))
        if m:
            year = m.group(1)
            month = m.group(2)[4:6]
            date = m.group(3)[6:8]
            date_str = year + '-' + month + '-' + date
            if date_str in positive_list:
                paths_str = paths_str + path.name + '\n'
                file_client = service_client.get_file_client(file_system="rps", file_path=path.name)
                download = file_client.download_file()
                content = download.readall()
                stream = BytesIO(content)
                df_temp = pd.read_csv(stream, compression='gzip')
                logging.info(f"{path.name}: {df_temp.shape}")
                df_temp = df_temp[df_temp['Latitude'].notna() & df_temp['Longitude'].notna() & df_temp['VehicleId'].notna() & df_temp['SentUtcTime'].notna()][['SentUtcTime', 'Latitude', 'Longitude', 'VehicleId']].reset_index(drop=True)
                df_temp['VehicleId'] = df_temp['VehicleId'].astype('int32')
                df_temp['SentUtcTime'] = pd.to_datetime(df_temp['SentUtcTime'])
                final_df = final_df.append(df_temp, ignore_index=True)
                counter = counter + 1
                if counter > 0:
                    pass  # break  (left over from debugging with a single file)
    unique_vehicles = list(set(final_df.VehicleId.unique()))
    df_spots = pd.DataFrame(columns=['ParkDateTime', 'WakeupDateTime', 'DiffMin', 'Latitude', 'Longitude', 'VehicleNumber'])
    for vehicle in unique_vehicles:
        non_moving_gps = NonMovingGPS()
        tmp = non_moving_gps.loop_single_vehicle(final_df, vehicle)
        # DataFrame.append returns a new frame, so the result must be assigned back
        df_spots = df_spots.append(tmp, ignore_index=True)
    return f"Hi {dates}! Here are your paths: {df_spots}"
```
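For context, the download-and-parse step in the activity boils down to decompressing gzipped CSV bytes entirely in memory (`download.readall()` → `BytesIO` → `read_csv(..., compression='gzip')`). A standard-library-only sketch of just that step, with a made-up sample row, so the memory pattern is clear:

```python
import csv
import gzip
import io

# Build an in-memory gzipped CSV shaped like the data lake blobs (hypothetical sample).
raw = "SentUtcTime,Latitude,Longitude,VehicleId\n2019-09-01T10:00:00,55.7,12.6,42\n"
blob = gzip.compress(raw.encode("utf-8"))

# Mirrors download.readall() -> BytesIO -> parse: decompress and read the rows.
with gzip.open(io.BytesIO(blob), mode="rt", encoding="utf-8") as fh:
    rows = list(csv.DictReader(fh))

print(rows[0]["VehicleId"])  # -> 42
```

Every blob's full decompressed content lives in memory at once here, which is presumably why the failures scale with load.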
The function.json that belongs to the activity:
```json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "dates",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}
```
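As an aside, the date filter in the activity depends on that regex matching directory names of the form `vehicle-position/YYYY/YYYYMM/YYYYMMDD`. A quick standalone check of the pattern (the blob path is made up):

```python
import os
import re

p = re.compile(r'^vehicle-position\/(\d{4})\/(\d{6})\/(\d{8})')

# Hypothetical blob path shaped like the ones in the data lake.
path_name = "vehicle-position/2019/201909/20190901/part-0000.csv.gz"
m = p.match(os.path.dirname(path_name))

year = m.group(1)        # '2019'
month = m.group(2)[4:6]  # '09'
day = m.group(3)[6:8]    # '01'
print(year + '-' + month + '-' + day)  # -> 2019-09-01
```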
My host.json:
```json
{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "hubName": "MyGarageTaskHub"
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  },
  "functionTimeout": "-1"
}
```