-
Notifications
You must be signed in to change notification settings - Fork 58
Description
Hi,
I have a simple program where events are sent to an orchestrator using raise_event, the events on the message queues are never getting removed after wait_for_external_event is called . Any reason why?
async def getCurrentOrchestrationInstance(orchestrator_name: str, instance_id: str, client: df.DurableOrchestrationClient):
existing_instance = await client.get_status(instance_id)
if existing_instance.runtime_status in [df.OrchestrationRuntimeStatus.Completed,
df.OrchestrationRuntimeStatus.Failed,
df.OrchestrationRuntimeStatus.Terminated,
None]:
instance_id = await client.start_new(orchestrator_name, instance_id)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
return instance_id
else:
return instance_id
@app.function_name(name="DataProcessor")
@app.event_hub_message_trigger(arg_name="eventHubEvent", event_hub_name="test",
connection="EventHubConnectionStringTrigger")
@app.durable_client_input(client_name="client")
async def data_processor(eventHubEvent: func.EventHubEvent, client: df.DurableOrchestrationClient):
eventhub_event_body = eventHubEvent.get_body()
event=json.loads(eventhub_event_body)
instance_id = "HaulCycleOrchestrator"
instance_id = await getCurrentOrchestrationInstance("Orchestrator", instance_id, client)
await client.raise_event(instance_id, "get_input", event)
Orchestrator
@app.function_name(name="Orchestrator")
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext):
cache=context.get_input()
while True:
wait_event = yield context.wait_for_external_event('get_input')
logging.info("wait_event received")