Skip to content

Commit 24dd298

Browse files
michaelpeng36Michael Peng
andauthored
Added support for external events and the ability to cancel an awaited durable timer (#527)
Co-authored-by: Michael Peng <[email protected]>
1 parent 9c93c5a commit 24dd298

File tree

24 files changed

+476
-54
lines changed

24 files changed

+476
-54
lines changed

examples/durable/DurableApp/HumanInteractionOrchestrator/run.ps1

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,23 @@ Write-Host 'HumanInteractionOrchestrator: started.'
66

77
$output = @()
88

9-
$duration = $Context.Input.Duration
9+
$duration = New-TimeSpan -Seconds $Context.Input.Duration
10+
$managerId = $Context.Input.ManagerId
11+
$skipManagerId = $Context.Input.SkipManagerId
1012

11-
Invoke-ActivityFunction -FunctionName "RequestApproval"
13+
$output += Invoke-ActivityFunction -FunctionName "RequestApproval" -Input $managerId
1214

1315
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
1416
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
1517

1618
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
1719

1820
if ($approvalEvent -eq $firstEvent) {
19-
Stop-DurableTimerTask -TimerTask $durableTimeout
21+
Stop-DurableTimerTask -Task $durableTimeoutEvent
2022
$output += Invoke-ActivityFunction -FunctionName "ProcessApproval" -Input $approvalEvent
2123
}
2224
else {
23-
$output += Invoke-ActivityFunction -FunctionName "EscalateApproval"
25+
$output += Invoke-ActivityFunction -FunctionName "EscalateApproval" -Input $skipManagerId
2426
}
2527

2628
Write-Host 'HumanInteractionOrchestrator: finished.'

examples/durable/DurableApp/HumanInteractionStart/run.ps1

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ param($Request, $TriggerMetadata)
44

55
Write-Host 'HumanInteractionStart started'
66

7-
$OrchestratorInputs = @{ Duration = 10 }
7+
$OrchestratorInputs = @{ Duration = 45; ManagerId = 1; SkipManagerId = 2 }
88

99
$InstanceId = Start-NewOrchestration -FunctionName 'HumanInteractionOrchestrator' -InputObject $OrchestratorInputs
1010
Write-Host "Started orchestration with ID = '$InstanceId'"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
param($name)
22

3-
Write-Host "Approval requested."
3+
"Approval requested."

examples/durable/DurableApp/local.settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"Values": {
44
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
55
"FUNCTIONS_WORKER_RUNTIME": "powershell",
6+
"FUNCTIONS_WORKER_RUNTIME_VERSION": 7,
67
"PSWorkerInProcConcurrencyUpperBound": 10
78
}
89
}

src/Durable/ActionType.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,15 @@ internal enum ActionType
4444
/// Wait for an external event.
4545
/// </summary>
4646
WaitForExternalEvent = 6,
47+
48+
/// <summary>
49+
/// Call an entity function.
50+
/// </summary>
51+
CallEntity = 7,
52+
53+
/// <summary>
54+
/// Make an Http call.
55+
/// </summary>
56+
CallHttp = 8,
4757
}
4858
}

src/Durable/ActivityInvocationTask.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ public class ActivityInvocationTask : DurableTask
1717
{
1818
internal string FunctionName { get; }
1919

20-
private object FunctionInput { get; }
20+
private object Input { get; }
2121

2222
internal ActivityInvocationTask(string functionName, object functionInput)
2323
{
2424
FunctionName = functionName;
25-
FunctionInput = functionInput;
25+
Input = functionInput;
2626
}
2727

2828
internal override HistoryEvent GetScheduledHistoryEvent(OrchestrationContext context)
@@ -44,7 +44,7 @@ internal override HistoryEvent GetCompletedHistoryEvent(OrchestrationContext con
4444

4545
internal override OrchestrationAction CreateOrchestrationAction()
4646
{
47-
return new CallActivityAction(FunctionName, FunctionInput);
47+
return new CallActivityAction(FunctionName, Input);
4848
}
4949

5050
internal static void ValidateTask(ActivityInvocationTask task, IEnumerable<AzFunctionInfo> loadedFunctions)

src/Durable/CreateDurableTimerAction.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@ internal class CreateDurableTimerAction : OrchestrationAction
1616
/// The DateTime at which the timer will fire.
1717
/// </summary>
1818
public readonly DateTime FireAt;
19-
20-
public CreateDurableTimerAction (DateTime fireAt)
19+
20+
/// <summary>
21+
/// Indicates whether the timer has been canceled.
22+
/// </summary>
23+
public bool IsCanceled;
24+
25+
public CreateDurableTimerAction (DateTime fireAt, bool isCanceled = false)
2126
: base(ActionType.CreateTimer)
2227
{
2328
FireAt = fireAt;
29+
IsCanceled = isCanceled;
2430
}
2531
}
2632
}

src/Durable/DurableTaskHandler.cs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,26 @@ public void StopAndInitiateDurableTaskOrReplay(
3131
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context);
3232
var completedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent);
3333

34-
// Assume that the task scheduled must have completed if NoWait is not present and the orchestrator restarted
35-
if (scheduledHistoryEvent == null)
36-
{
37-
InitiateAndWaitForStop(context);
38-
}
39-
34+
// We must check if the task has been completed first, otherwise external events will always wait upon replays
4035
if (completedHistoryEvent != null)
4136
{
4237
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context);
43-
44-
scheduledHistoryEvent.IsProcessed = true;
45-
completedHistoryEvent.IsProcessed = true;
4638

4739
if (GetEventResult(completedHistoryEvent) != null)
4840
{
4941
output(GetEventResult(completedHistoryEvent));
5042
}
43+
44+
if (scheduledHistoryEvent != null)
45+
{
46+
scheduledHistoryEvent.IsProcessed = true;
47+
}
48+
49+
completedHistoryEvent.IsProcessed = true;
50+
}
51+
else if (scheduledHistoryEvent == null)
52+
{
53+
InitiateAndWaitForStop(context);
5154
}
5255
}
5356
}
@@ -69,7 +72,11 @@ public void WaitAll(
6972
break;
7073
}
7174

72-
scheduledHistoryEvent.IsProcessed = true;
75+
if (scheduledHistoryEvent != null)
76+
{
77+
scheduledHistoryEvent.IsProcessed = true;
78+
}
79+
7380
completedHistoryEvent.IsProcessed = true;
7481
completedEvents.Add(completedHistoryEvent);
7582
}
@@ -108,11 +115,13 @@ public void WaitAny(
108115
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context);
109116
var completedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent);
110117

118+
// We must mark this event as processed even if it has not completed; subsequent completed history events
119+
// corresponding to an awaited task will not have their IsProcessed value ever set to true.
111120
if (scheduledHistoryEvent != null)
112121
{
113122
scheduledHistoryEvent.IsProcessed = true;
114123
}
115-
124+
116125
if (completedHistoryEvent != null)
117126
{
118127
completedTasks.Add(task);
@@ -149,10 +158,17 @@ public void Stop()
149158

150159
private static object GetEventResult(HistoryEvent historyEvent)
151160
{
152-
// Output the result if and only if the history event is a completed activity function
153-
return historyEvent.EventType != HistoryEventType.TaskCompleted
154-
? null
155-
: TypeExtensions.ConvertFromJson(historyEvent.Result);
161+
// Output the result if and only if the history event is a completed activity function or a raised external event
162+
163+
if (historyEvent.EventType == HistoryEventType.TaskCompleted)
164+
{
165+
return TypeExtensions.ConvertFromJson(historyEvent.Result);
166+
}
167+
else if (historyEvent.EventType == HistoryEventType.EventRaised)
168+
{
169+
return TypeExtensions.ConvertFromJson(historyEvent.Input);
170+
}
171+
return null;
156172
}
157173

158174
private void InitiateAndWaitForStop(OrchestrationContext context)

src/Durable/DurableTimerTask.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
1414
// All DurableTimerTasks must be complete or canceled for the orchestration to complete
1515
public class DurableTimerTask : DurableTask
1616
{
17-
public DateTime FireAt { get; set; }
17+
internal DateTime FireAt { get; }
18+
19+
private CreateDurableTimerAction Action { get; }
1820

1921
// Only incomplete, uncanceled DurableTimerTasks should be created
2022
internal DurableTimerTask(
2123
DateTime fireAt)
2224
{
2325
FireAt = fireAt;
26+
Action = new CreateDurableTimerAction(FireAt);
2427
}
2528

2629
internal override HistoryEvent GetScheduledHistoryEvent(OrchestrationContext context)
@@ -42,7 +45,13 @@ internal override HistoryEvent GetCompletedHistoryEvent(OrchestrationContext con
4245

4346
internal override OrchestrationAction CreateOrchestrationAction()
4447
{
45-
return new CreateDurableTimerAction(FireAt);
48+
return Action;
49+
}
50+
51+
// Indicates that the task has been canceled; without this, the orchestration will not terminate until the timer has expired
52+
internal void Cancel()
53+
{
54+
Action.IsCanceled = true;
4655
}
4756
}
4857
}

src/Durable/ExternalEventAction.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
7+
{
8+
/// <summary>
9+
/// An orchestration action that represents listening for an external event
10+
/// </summary>
11+
internal class ExternalEventAction : OrchestrationAction
12+
{
13+
/// <summary>
14+
/// The external event name.
15+
/// </summary>
16+
public readonly string ExternalEventName;
17+
18+
/// <summary>
19+
/// Reason for the action. This field is necessary for the Durable extension to recognize the ExternalEventAction.
20+
/// </summary>
21+
public readonly string Reason = "ExternalEvent";
22+
23+
public ExternalEventAction(string externalEventName)
24+
: base(ActionType.WaitForExternalEvent)
25+
{
26+
ExternalEventName = externalEventName;
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)