From ea90e02c391f942ddd180c865a2b7e33f7f4af01 Mon Sep 17 00:00:00 2001 From: pragnagopa Date: Tue, 16 Apr 2019 17:41:53 -0700 Subject: [PATCH 1/6] Take1 --- src/Eventing/EventSources.cs | 13 ++++ src/Eventing/IScriptEventManager.cs | 12 ++++ src/Eventing/Rpc/InboundEvent.cs | 17 ++++++ src/Eventing/Rpc/OutboundEvent.cs | 13 ++++ src/Eventing/Rpc/RpcChannelEvent.cs | 19 ++++++ src/Eventing/Rpc/RpcEvent.cs | 31 ++++++++++ src/Eventing/Rpc/RpcWriteEvent.cs | 14 +++++ src/Eventing/ScriptEvent.cs | 18 ++++++ src/Eventing/ScriptEventManager.cs | 51 ++++++++++++++++ src/Messaging/MessagingStream.cs | 23 +++++--- ...ft.Azure.Functions.PowerShellWorker.csproj | 1 + src/RequestProcessor.cs | 59 +++++++++++++------ src/Worker.cs | 2 +- 13 files changed, 246 insertions(+), 27 deletions(-) create mode 100644 src/Eventing/EventSources.cs create mode 100644 src/Eventing/IScriptEventManager.cs create mode 100644 src/Eventing/Rpc/InboundEvent.cs create mode 100644 src/Eventing/Rpc/OutboundEvent.cs create mode 100644 src/Eventing/Rpc/RpcChannelEvent.cs create mode 100644 src/Eventing/Rpc/RpcEvent.cs create mode 100644 src/Eventing/Rpc/RpcWriteEvent.cs create mode 100644 src/Eventing/ScriptEvent.cs create mode 100644 src/Eventing/ScriptEventManager.cs diff --git a/src/Eventing/EventSources.cs b/src/Eventing/EventSources.cs new file mode 100644 index 00000000..a392d8ea --- /dev/null +++ b/src/Eventing/EventSources.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal static class EventSources + { + public const string ScriptFiles = "ScriptFiles"; + public const string Rpc = "Rpc"; + public const string Worker = "Worker"; + public const string WorkerProcess = "WorkerProcess"; + } +} diff --git a/src/Eventing/IScriptEventManager.cs b/src/Eventing/IScriptEventManager.cs new file mode 100644 index 00000000..8490a756 --- /dev/null +++ b/src/Eventing/IScriptEventManager.cs @@ -0,0 +1,12 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal interface IScriptEventManager : IObservable + { + void Publish(ScriptEvent scriptEvent); + } +} diff --git a/src/Eventing/Rpc/InboundEvent.cs b/src/Eventing/Rpc/InboundEvent.cs new file mode 100644 index 00000000..35630798 --- /dev/null +++ b/src/Eventing/Rpc/InboundEvent.cs @@ -0,0 +1,17 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Grpc.Core; +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class InboundEvent : RpcEvent + { + public IAsyncStreamReader requestStream; + + public InboundEvent(string workerId, StreamingMessage message) : base(workerId, message, MessageOrigin.Worker) + { + } + } +} diff --git a/src/Eventing/Rpc/OutboundEvent.cs b/src/Eventing/Rpc/OutboundEvent.cs new file mode 100644 index 00000000..ded58db6 --- /dev/null +++ b/src/Eventing/Rpc/OutboundEvent.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class OutboundEvent : RpcEvent + { + public OutboundEvent(string workerId, StreamingMessage message) : base(workerId, message, MessageOrigin.Host) + { + } + } +} diff --git a/src/Eventing/Rpc/RpcChannelEvent.cs b/src/Eventing/Rpc/RpcChannelEvent.cs new file mode 100644 index 00000000..b4916939 --- /dev/null +++ b/src/Eventing/Rpc/RpcChannelEvent.cs @@ -0,0 +1,19 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class RpcChannelEvent : ScriptEvent + { + internal RpcChannelEvent(string workerId) + : base(nameof(RpcChannelEvent), EventSources.Worker) + { + WorkerId = workerId ?? throw new ArgumentNullException(nameof(workerId)); + } + + internal string WorkerId { get; private set; } + } +} \ No newline at end of file diff --git a/src/Eventing/Rpc/RpcEvent.cs b/src/Eventing/Rpc/RpcEvent.cs new file mode 100644 index 00000000..d4f4f863 --- /dev/null +++ b/src/Eventing/Rpc/RpcEvent.cs @@ -0,0 +1,31 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class RpcEvent : ScriptEvent + { + internal RpcEvent(string workerId, StreamingMessage message, MessageOrigin origin = MessageOrigin.Host) + : base(message.ContentCase.ToString(), EventSources.Rpc) + { + Message = message; + Origin = origin; + WorkerId = workerId; + } + + public enum MessageOrigin + { + Worker, + Host + } + + public MessageOrigin Origin { get; } + + public StreamingMessage.ContentOneofCase MessageType => Message.ContentCase; + + public string WorkerId { get; } + + public StreamingMessage Message { get; } + } +} diff --git a/src/Eventing/Rpc/RpcWriteEvent.cs b/src/Eventing/Rpc/RpcWriteEvent.cs new file mode 100644 index 00000000..8eb4b3c0 --- /dev/null +++ b/src/Eventing/Rpc/RpcWriteEvent.cs @@ -0,0 +1,14 @@ +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class RpcWriteEvent : ScriptEvent + { + public RpcWriteEvent(string workerId, string invocationId): base(nameof(RpcChannelEvent), EventSources.Worker) + { + InvocationId = invocationId; + WorkerId = workerId; + } + + public string InvocationId { get; } + public string WorkerId { get; } + } +} diff --git a/src/Eventing/ScriptEvent.cs b/src/Eventing/ScriptEvent.cs new file mode 100644 index 00000000..14b683c9 --- /dev/null +++ b/src/Eventing/ScriptEvent.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class ScriptEvent + { + public ScriptEvent(string name, string source) + { + Name = name; + Source = source; + } + + public string Name { get; } + + public string Source { get; } + } +} diff --git a/src/Eventing/ScriptEventManager.cs b/src/Eventing/ScriptEventManager.cs new file mode 100644 index 00000000..0a36bc8a --- /dev/null +++ b/src/Eventing/ScriptEventManager.cs @@ -0,0 +1,51 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Reactive.Subjects; + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal sealed class ScriptEventManager : IScriptEventManager, IDisposable + { + private readonly Subject _subject = new Subject(); + private bool _disposed = false; + + public void Publish(ScriptEvent scriptEvent) + { + ThrowIfDisposed(); + + _subject.OnNext(scriptEvent); + } + + public IDisposable Subscribe(IObserver observer) + { + ThrowIfDisposed(); + + return _subject.Subscribe(observer); + } + + private void ThrowIfDisposed() + { + if (_disposed) + { + throw new ObjectDisposedException(nameof(ScriptEventManager)); + } + } + + private void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _subject.Dispose(); + } + + _disposed = true; + } + } + + public void Dispose() => Dispose(true); + } +} diff --git a/src/Messaging/MessagingStream.cs b/src/Messaging/MessagingStream.cs index 750afde9..faffba91 100644 --- a/src/Messaging/MessagingStream.cs +++ b/src/Messaging/MessagingStream.cs @@ -4,6 +4,7 @@ // using System; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; @@ -15,7 +16,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging internal class MessagingStream { private readonly AsyncDuplexStreamingCall _call; - private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(initialCount: 1, maxCount: 1); + private BlockingCollection _blockingCollectionQueue = new BlockingCollection(); internal MessagingStream(string host, int port) { @@ -28,6 +29,11 @@ internal MessagingStream(string host, int port) /// internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current; + internal void AddToBlockingQueue(StreamingMessage streamingMessage) + { + _blockingCollectionQueue.Add(streamingMessage); + } + /// /// Move to the next message. /// @@ -43,15 +49,14 @@ internal MessagingStream(string host, int port) /// private async Task WriteImplAsync(StreamingMessage message) { - try - { - await _semaphoreSlim.WaitAsync(); - await _call.RequestStream.WriteAsync(message); - } - finally + var consumer = Task.Run(async () => { - _semaphoreSlim.Release(); - } + foreach (var rpcWriteMsg in _blockingCollectionQueue.GetConsumingEnumerable()) + { + await _call.RequestStream.WriteAsync(rpcWriteMsg); + } + }); + await consumer; } } } diff --git a/src/Microsoft.Azure.Functions.PowerShellWorker.csproj b/src/Microsoft.Azure.Functions.PowerShellWorker.csproj index 683b0e8a..749d003c 100644 --- a/src/Microsoft.Azure.Functions.PowerShellWorker.csproj +++ b/src/Microsoft.Azure.Functions.PowerShellWorker.csproj @@ -25,6 +25,7 @@ Licensed under the MIT license. See LICENSE file in the project root for full li + diff --git a/src/RequestProcessor.cs b/src/RequestProcessor.cs index cd9760f1..5676219b 100644 --- a/src/RequestProcessor.cs +++ b/src/RequestProcessor.cs @@ -15,6 +15,9 @@ using Microsoft.Azure.Functions.PowerShellWorker.DependencyManagement; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; using LogLevel = Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types.Level; +using System.Reactive.Linq; +using System.Reactive.Concurrency; +using System.Collections.Concurrent; namespace Microsoft.Azure.Functions.PowerShellWorker { @@ -32,15 +35,31 @@ internal class RequestProcessor // Indicate whether the FunctionApp has been initialized. private bool _isFunctionAppInitialized; + private IScriptEventManager _eventManager; + private IObservable _inboundWorkerEvents; + IDictionary _outboundEventSubscriptions = new Dictionary(); + private List _eventSubscriptions = new List(); + private string _workerId; + private BlockingCollection _blockingCollectionQueue = new BlockingCollection(); + private Dictionary> _requestHandlers = new Dictionary>(); - internal RequestProcessor(MessagingStream msgStream) + internal RequestProcessor(MessagingStream msgStream, string workerId) { _msgStream = msgStream; _powershellPool = new PowerShellManagerPool(msgStream); _functionLoader = new FunctionLoader(); _dependencyManager = new DependencyManager(); + _eventManager = new ScriptEventManager(); + _inboundWorkerEvents = _eventManager.OfType() + .ObserveOn(NewThreadScheduler.Default) + .Where(msg => msg.WorkerId == _workerId); + _workerId = workerId; + + _eventSubscriptions.Add(_inboundWorkerEvents + .ObserveOn(NewThreadScheduler.Default) + .Subscribe((msg) => InboundEventHandler(msg))); // Host sends capabilities/init data to worker _requestHandlers.Add(StreamingMessage.ContentOneofCase.WorkerInitRequest, ProcessWorkerInitRequest); @@ -68,27 +87,33 @@ internal RequestProcessor(MessagingStream msgStream) _requestHandlers.Add(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest, ProcessFunctionEnvironmentReloadRequest); } + internal void InboundEventHandler(InboundEvent serverMessage) + { + StreamingMessage response = null; + StreamingMessage request = serverMessage.Message; + if (_requestHandlers.TryGetValue(request.ContentCase, out Func requestFunc)) + { + response = requestFunc(request); + } + else + { + RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.UnsupportedMessage, request.ContentCase)); + } + + if (response != null) + { + _blockingCollectionQueue.Add(response); + _msgStream.Write(response); + } + } + internal async Task ProcessRequestLoop() { - StreamingMessage request, response; + StreamingMessage request; while (await _msgStream.MoveNext()) { request = _msgStream.GetCurrentMessage(); - - if (_requestHandlers.TryGetValue(request.ContentCase, out Func requestFunc)) - { - response = requestFunc(request); - } - else - { - RpcLogger.WriteSystemLog(string.Format(PowerShellWorkerStrings.UnsupportedMessage, request.ContentCase)); - continue; - } - - if (response != null) - { - _msgStream.Write(response); - } + _eventManager.Publish(new InboundEvent(_workerId, request)); } } diff --git a/src/Worker.cs b/src/Worker.cs index 75cd8051..c09ffa8c 100644 --- a/src/Worker.cs +++ b/src/Worker.cs @@ -30,7 +30,7 @@ public async static Task Main(string[] args) .WithNotParsed(err => Environment.Exit(1)); var msgStream = new MessagingStream(arguments.Host, arguments.Port); - var requestProcessor = new RequestProcessor(msgStream); + var requestProcessor = new RequestProcessor(msgStream, arguments.WorkerId); // Send StartStream message var startedMessage = new StreamingMessage() { From c14dbb62c422f2d391c17b1688d1e402c4b559d2 Mon Sep 17 00:00:00 2001 From: pragnagopa Date: Tue, 16 Apr 2019 17:44:43 -0700 Subject: [PATCH 2/6] take 2 --- src/RequestProcessor.cs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/RequestProcessor.cs b/src/RequestProcessor.cs index 5676219b..060a1ce4 100644 --- a/src/RequestProcessor.cs +++ b/src/RequestProcessor.cs @@ -52,13 +52,10 @@ internal RequestProcessor(MessagingStream msgStream, string workerId) _functionLoader = new FunctionLoader(); _dependencyManager = new DependencyManager(); _eventManager = new ScriptEventManager(); - _inboundWorkerEvents = _eventManager.OfType() - .ObserveOn(NewThreadScheduler.Default) - .Where(msg => msg.WorkerId == _workerId); _workerId = workerId; - - _eventSubscriptions.Add(_inboundWorkerEvents + _eventManager.OfType() .ObserveOn(NewThreadScheduler.Default) + .Where(msg => msg.WorkerId == _workerId) .Subscribe((msg) => InboundEventHandler(msg))); // Host sends capabilities/init data to worker From 196d0f93c45cef05c58b2ed41e49a17f1dfed678 Mon Sep 17 00:00:00 2001 From: pragnagopa Date: Tue, 16 Apr 2019 19:17:36 -0700 Subject: [PATCH 3/6] Take 3 --- src/Logging/RpcLogger.cs | 2 +- src/Messaging/MessagingStream.cs | 14 +++++++------- src/RequestProcessor.cs | 11 +++-------- src/Worker.cs | 9 +++++---- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/src/Logging/RpcLogger.cs b/src/Logging/RpcLogger.cs index 97e5eeef..422e55f2 100644 --- a/src/Logging/RpcLogger.cs +++ b/src/Logging/RpcLogger.cs @@ -55,7 +55,7 @@ public void Log(LogLevel logLevel, string message, Exception exception = null, b } }; - _msgStream.Write(logMessage); + _msgStream.AddToBlockingQueue(logMessage); } else { diff --git a/src/Messaging/MessagingStream.cs b/src/Messaging/MessagingStream.cs index faffba91..71a9afdc 100644 --- a/src/Messaging/MessagingStream.cs +++ b/src/Messaging/MessagingStream.cs @@ -39,15 +39,10 @@ internal void AddToBlockingQueue(StreamingMessage streamingMessage) /// internal async Task MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None); - /// - /// Write the outgoing message. - /// - internal void Write(StreamingMessage message) => WriteImplAsync(message).ConfigureAwait(false); - /// /// Take a message from the buffer and write to the gRPC channel. /// - private async Task WriteImplAsync(StreamingMessage message) + internal Task Write() { var consumer = Task.Run(async () => { @@ -56,7 +51,12 @@ private async Task WriteImplAsync(StreamingMessage message) await _call.RequestStream.WriteAsync(rpcWriteMsg); } }); - await consumer; + return consumer; + } + + internal Task Write(StreamingMessage msg) + { + return _call.RequestStream.WriteAsync(msg); } } } diff --git a/src/RequestProcessor.cs b/src/RequestProcessor.cs index 060a1ce4..2930ec54 100644 --- a/src/RequestProcessor.cs +++ b/src/RequestProcessor.cs @@ -36,11 +36,7 @@ internal class RequestProcessor private bool _isFunctionAppInitialized; private IScriptEventManager _eventManager; - private IObservable _inboundWorkerEvents; - IDictionary _outboundEventSubscriptions = new Dictionary(); - private List _eventSubscriptions = new List(); private string _workerId; - private BlockingCollection _blockingCollectionQueue = new BlockingCollection(); private Dictionary> _requestHandlers = new Dictionary>(); @@ -56,7 +52,7 @@ internal RequestProcessor(MessagingStream msgStream, string workerId) _eventManager.OfType() .ObserveOn(NewThreadScheduler.Default) .Where(msg => msg.WorkerId == _workerId) - .Subscribe((msg) => InboundEventHandler(msg))); + .Subscribe((msg) => InboundEventHandler(msg)); // Host sends capabilities/init data to worker _requestHandlers.Add(StreamingMessage.ContentOneofCase.WorkerInitRequest, ProcessWorkerInitRequest); @@ -99,8 +95,7 @@ internal void InboundEventHandler(InboundEvent serverMessage) if (response != null) { - _blockingCollectionQueue.Add(response); - _msgStream.Write(response); + _msgStream.AddToBlockingQueue(response); } } @@ -295,7 +290,7 @@ private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionIn _powershellPool.ReclaimUsedWorker(psManager); } - _msgStream.Write(response); + _msgStream.AddToBlockingQueue(response); } internal StreamingMessage ProcessInvocationCancelRequest(StreamingMessage request) diff --git a/src/Worker.cs b/src/Worker.cs index c09ffa8c..e68071f0 100644 --- a/src/Worker.cs +++ b/src/Worker.cs @@ -11,6 +11,7 @@ using Microsoft.Azure.Functions.PowerShellWorker.Messaging; using Microsoft.Azure.Functions.PowerShellWorker.Utility; using Microsoft.Azure.WebJobs.Script.Grpc.Messages; +using System.Threading; namespace Microsoft.Azure.Functions.PowerShellWorker { @@ -28,7 +29,6 @@ public async static Task Main(string[] args) Parser.Default.ParseArguments(args) .WithParsed(ops => arguments = ops) .WithNotParsed(err => Environment.Exit(1)); - var msgStream = new MessagingStream(arguments.Host, arguments.Port); var requestProcessor = new RequestProcessor(msgStream, arguments.WorkerId); @@ -37,9 +37,10 @@ public async static Task Main(string[] args) RequestId = arguments.RequestId, StartStream = new StartStream() { WorkerId = arguments.WorkerId } }; - - msgStream.Write(startedMessage); - await requestProcessor.ProcessRequestLoop(); + await msgStream.Write(startedMessage); + var writerTask = msgStream.Write(); + var readerTask = requestProcessor.ProcessRequestLoop(); + await Task.WhenAll(writerTask, readerTask); } } From bc2d616347ddfca16941282b6dde743b9fe670fb Mon Sep 17 00:00:00 2001 From: pragnagopa Date: Wed, 17 Apr 2019 11:49:29 -0700 Subject: [PATCH 4/6] async invocations --- src/Eventing/Rpc/RpcChannelEvent.cs | 19 -------- src/Eventing/Rpc/RpcEvent.cs | 31 ------------- src/Eventing/Rpc/RpcWriteEvent.cs | 14 ------ src/RequestProcessor.cs | 70 ++++++++++++----------------- 4 files changed, 28 insertions(+), 106 deletions(-) delete mode 100644 src/Eventing/Rpc/RpcChannelEvent.cs delete mode 100644 src/Eventing/Rpc/RpcEvent.cs delete mode 100644 src/Eventing/Rpc/RpcWriteEvent.cs diff --git a/src/Eventing/Rpc/RpcChannelEvent.cs b/src/Eventing/Rpc/RpcChannelEvent.cs deleted file mode 100644 index b4916939..00000000 --- a/src/Eventing/Rpc/RpcChannelEvent.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using Microsoft.Azure.WebJobs.Script.Grpc.Messages; - -namespace Microsoft.Azure.Functions.PowerShellWorker -{ - internal class RpcChannelEvent : ScriptEvent - { - internal RpcChannelEvent(string workerId) - : base(nameof(RpcChannelEvent), EventSources.Worker) - { - WorkerId = workerId ?? throw new ArgumentNullException(nameof(workerId)); - } - - internal string WorkerId { get; private set; } - } -} \ No newline at end of file diff --git a/src/Eventing/Rpc/RpcEvent.cs b/src/Eventing/Rpc/RpcEvent.cs deleted file mode 100644 index d4f4f863..00000000 --- a/src/Eventing/Rpc/RpcEvent.cs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. -using Microsoft.Azure.WebJobs.Script.Grpc.Messages; - -namespace Microsoft.Azure.Functions.PowerShellWorker -{ - internal class RpcEvent : ScriptEvent - { - internal RpcEvent(string workerId, StreamingMessage message, MessageOrigin origin = MessageOrigin.Host) - : base(message.ContentCase.ToString(), EventSources.Rpc) - { - Message = message; - Origin = origin; - WorkerId = workerId; - } - - public enum MessageOrigin - { - Worker, - Host - } - - public MessageOrigin Origin { get; } - - public StreamingMessage.ContentOneofCase MessageType => Message.ContentCase; - - public string WorkerId { get; } - - public StreamingMessage Message { get; } - } -} diff --git a/src/Eventing/Rpc/RpcWriteEvent.cs b/src/Eventing/Rpc/RpcWriteEvent.cs deleted file mode 100644 index 8eb4b3c0..00000000 --- a/src/Eventing/Rpc/RpcWriteEvent.cs +++ /dev/null @@ -1,14 +0,0 @@ -namespace Microsoft.Azure.Functions.PowerShellWorker -{ - internal class RpcWriteEvent : ScriptEvent - { - public RpcWriteEvent(string workerId, string invocationId): base(nameof(RpcChannelEvent), EventSources.Worker) - { - InvocationId = invocationId; - WorkerId = workerId; - } - - public string InvocationId { get; } - public string WorkerId { get; } - } -} diff --git a/src/RequestProcessor.cs b/src/RequestProcessor.cs index 2930ec54..0b3773c6 100644 --- a/src/RequestProcessor.cs +++ b/src/RequestProcessor.cs @@ -38,8 +38,8 @@ internal class RequestProcessor private IScriptEventManager _eventManager; private string _workerId; - private Dictionary> _requestHandlers = - new Dictionary>(); + private Dictionary>> _requestHandlers = + new Dictionary>>(); internal RequestProcessor(MessagingStream msgStream, string workerId) { @@ -80,13 +80,13 @@ internal RequestProcessor(MessagingStream msgStream, string workerId) _requestHandlers.Add(StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadRequest, ProcessFunctionEnvironmentReloadRequest); } - internal void InboundEventHandler(InboundEvent serverMessage) + internal async void InboundEventHandler(InboundEvent serverMessage) { StreamingMessage response = null; StreamingMessage request = serverMessage.Message; - if (_requestHandlers.TryGetValue(request.ContentCase, out Func requestFunc)) + if (_requestHandlers.TryGetValue(request.ContentCase, out Func> requestFunc)) { - response = requestFunc(request); + response = await requestFunc(request); } else { @@ -109,7 +109,7 @@ internal async Task ProcessRequestLoop() } } - internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request) + internal Task ProcessWorkerInitRequest(StreamingMessage request) { StreamingMessage response = NewStreamingMessageTemplate( request.RequestId, @@ -126,15 +126,15 @@ internal StreamingMessage ProcessWorkerInitRequest(StreamingMessage request) RemoteSessionNamedPipeServer.CreateCustomNamedPipeServer(pipeName); } - return response; + return Task.FromResult(response); } - internal StreamingMessage ProcessWorkerTerminateRequest(StreamingMessage request) + internal Task ProcessWorkerTerminateRequest(StreamingMessage request) { return null; } - internal StreamingMessage ProcessWorkerStatusRequest(StreamingMessage request) + internal Task ProcessWorkerStatusRequest(StreamingMessage request) { // WorkerStatusResponse type says that it is not used but this will create an empty one anyway to return to the host StreamingMessage response = NewStreamingMessageTemplate( @@ -142,10 +142,10 @@ internal StreamingMessage ProcessWorkerStatusRequest(StreamingMessage request) StreamingMessage.ContentOneofCase.WorkerStatusResponse, out StatusResult status); - return response; + return Task.FromResult(response); } - internal StreamingMessage ProcessFileChangeEventRequest(StreamingMessage request) + internal Task ProcessFileChangeEventRequest(StreamingMessage request) { return null; } @@ -156,7 +156,7 @@ internal StreamingMessage ProcessFileChangeEventRequest(StreamingMessage request /// concurrently as a FunctionApp doesn't include a lot functions in general. Having this step sequential /// will make the Runspace-level initialization easier and more predictable. /// - internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) + internal Task ProcessFunctionLoadRequest(StreamingMessage request) { FunctionLoadRequest functionLoadRequest = request.FunctionLoadRequest; @@ -173,7 +173,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) { status.Status = StatusResult.Types.Status.Failure; status.Exception = _initTerminatingError.ToRpcException(); - return response; + return Task.FromResult(response); } // Ideally, the initialization should happen when processing 'WorkerInitRequest', however, the 'WorkerInitRequest' @@ -194,7 +194,7 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) status.Status = StatusResult.Types.Status.Failure; status.Exception = e.ToRpcException(); - return response; + return Task.FromResult(response); } } @@ -209,14 +209,14 @@ internal StreamingMessage ProcessFunctionLoadRequest(StreamingMessage request) status.Exception = e.ToRpcException(); } - return response; + return Task.FromResult(response); } /// /// Method to process a InvocationRequest. /// This method checks out a worker from the pool, and then starts the actual invocation in a threadpool thread. /// - internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) + internal Task ProcessInvocationRequest(StreamingMessage request) { AzFunctionInfo functionInfo = null; PowerShellManager psManager = null; @@ -225,19 +225,7 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) { functionInfo = _functionLoader.GetFunctionInfo(request.InvocationRequest.FunctionId); psManager = _powershellPool.CheckoutIdleWorker(request, functionInfo); - - if (_powershellPool.UpperBound == 1) - { - // When the concurrency upper bound is 1, we can handle only one invocation at a time anyways, - // so it's better to just do it on the current thread to reduce the required synchronization. - ProcessInvocationRequestImpl(request, functionInfo, psManager); - } - else - { - // When the concurrency upper bound is more than 1, we have to handle the invocation in a worker - // thread, so multiple invocations can make progress at the same time, even though by time-sharing. - Task.Run(() => ProcessInvocationRequestImpl(request, functionInfo, psManager)); - } + return ProcessInvocationRequestImpl(request, functionInfo, psManager); } catch (Exception e) { @@ -252,17 +240,15 @@ internal StreamingMessage ProcessInvocationRequest(StreamingMessage request) status.Status = StatusResult.Types.Status.Failure; status.Exception = e.ToRpcException(); - return response; + return Task.FromResult(response); } - - return null; } /// /// Implementation method to actual invoke the corresponding function. /// InvocationRequest messages are processed in parallel when there are multiple PowerShellManager instances in the pool. /// - private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionInfo functionInfo, PowerShellManager psManager) + private async Task ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionInfo functionInfo, PowerShellManager psManager) { InvocationRequest invocationRequest = request.InvocationRequest; StreamingMessage response = NewStreamingMessageTemplate( @@ -275,8 +261,8 @@ private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionIn { // Invoke the function and return a hashtable of out binding data Hashtable results = functionInfo.Type == AzFunctionType.OrchestrationFunction - ? InvokeOrchestrationFunction(psManager, functionInfo, invocationRequest) - : InvokeSingleActivityFunction(psManager, functionInfo, invocationRequest); + ? await InvokeOrchestrationFunction(psManager, functionInfo, invocationRequest) + : await InvokeSingleActivityFunction(psManager, functionInfo, invocationRequest); BindOutputFromResult(response.InvocationResponse, functionInfo, results); } @@ -290,22 +276,22 @@ private void ProcessInvocationRequestImpl(StreamingMessage request, AzFunctionIn _powershellPool.ReclaimUsedWorker(psManager); } - _msgStream.AddToBlockingQueue(response); + return response; } - internal StreamingMessage ProcessInvocationCancelRequest(StreamingMessage request) + internal Task ProcessInvocationCancelRequest(StreamingMessage request) { return null; } - internal StreamingMessage ProcessFunctionEnvironmentReloadRequest(StreamingMessage request) + internal Task ProcessFunctionEnvironmentReloadRequest(StreamingMessage request) { StreamingMessage response = NewStreamingMessageTemplate( request.RequestId, StreamingMessage.ContentOneofCase.FunctionEnvironmentReloadResponse, out StatusResult status); - return response; + return Task.FromResult(response); } #region Helper_Methods @@ -376,7 +362,7 @@ private StreamingMessage NewStreamingMessageTemplate(string requestId, Streaming /// /// Invoke an orchestration function. /// - private Hashtable InvokeOrchestrationFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) + private Task InvokeOrchestrationFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) { throw new NotImplementedException(PowerShellWorkerStrings.DurableFunctionNotSupported); } @@ -384,7 +370,7 @@ private Hashtable InvokeOrchestrationFunction(PowerShellManager psManager, AzFun /// /// Invoke a regular function or an activity function. /// - private Hashtable InvokeSingleActivityFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) + private Task InvokeSingleActivityFunction(PowerShellManager psManager, AzFunctionInfo functionInfo, InvocationRequest invocationRequest) { const string InvocationId = "InvocationId"; const string FunctionDirectory = "FunctionDirectory"; @@ -420,7 +406,7 @@ private Hashtable InvokeSingleActivityFunction(PowerShellManager psManager, AzFu } } - return psManager.InvokeFunction(functionInfo, triggerMetadata, invocationRequest.InputData); + return Task.FromResult(psManager.InvokeFunction(functionInfo, triggerMetadata, invocationRequest.InputData)); } /// From 126e840c8030efba677addfc1e46712a2440b5ca Mon Sep 17 00:00:00 2001 From: pragnagopa Date: Wed, 17 Apr 2019 13:59:12 -0700 Subject: [PATCH 5/6] Add RpcEvent --- src/Eventing/Rpc/RpcEvent.cs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 src/Eventing/Rpc/RpcEvent.cs diff --git a/src/Eventing/Rpc/RpcEvent.cs b/src/Eventing/Rpc/RpcEvent.cs new file mode 100644 index 00000000..37b66c46 --- /dev/null +++ b/src/Eventing/Rpc/RpcEvent.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.Functions.PowerShellWorker +{ + internal class RpcEvent : ScriptEvent + { + internal RpcEvent(string workerId, StreamingMessage message, MessageOrigin origin = MessageOrigin.Host) + : base(message.ContentCase.ToString(), EventSources.Rpc) + { + Message = message; + Origin = origin; + WorkerId = workerId; + } + + public enum MessageOrigin + { + Worker, + Host + } + + public MessageOrigin Origin { get; } + + public StreamingMessage.ContentOneofCase MessageType => Message.ContentCase; + + public string WorkerId { get; } + + public StreamingMessage Message { get; } + } +} From febd00dfc305de91d874d0f5497eac3f0a6413f1 Mon Sep 17 00:00:00 2001 From: pragnagopa Date: Wed, 17 Apr 2019 14:00:42 -0700 Subject: [PATCH 6/6] fix build error --- src/Eventing/Rpc/RpcEvent.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Eventing/Rpc/RpcEvent.cs b/src/Eventing/Rpc/RpcEvent.cs index 37b66c46..d4f4f863 100644 --- a/src/Eventing/Rpc/RpcEvent.cs +++ b/src/Eventing/Rpc/RpcEvent.cs @@ -1,5 +1,6 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Azure.WebJobs.Script.Grpc.Messages; namespace Microsoft.Azure.Functions.PowerShellWorker {