diff --git a/dotnet/samples/GettingStartedWithProcesses/GettingStartedWithProcesses.csproj b/dotnet/samples/GettingStartedWithProcesses/GettingStartedWithProcesses.csproj index ee0d0ab58ef5..595ef9d24f6a 100644 --- a/dotnet/samples/GettingStartedWithProcesses/GettingStartedWithProcesses.csproj +++ b/dotnet/samples/GettingStartedWithProcesses/GettingStartedWithProcesses.csproj @@ -10,7 +10,8 @@ - $(NoWarn);CS8618,IDE0009,CA1051,CA1050,CA1707,CA1054,CA2007,VSTHRD111,CS1591,RCS1110,RCS1243,CA5394,SKEXP0001,SKEXP0010,SKEXP0020,SKEXP0040,SKEXP0050,SKEXP0060,SKEXP0070,SKEXP0080,SKEXP0101,SKEXP0110,OPENAI001 + $(NoWarn);CS8618,IDE0009,CA1051,CA1050,CA1707,CA1054,CA2007,VSTHRD111,CS1591,RCS1110,RCS1243,CA5394,SKEXP0001,SKEXP0010,SKEXP0020,SKEXP0040,SKEXP0050,SKEXP0060,SKEXP0070,SKEXP0080,SKEXP0101,SKEXP0110,OPENAI001 + Library 5ee045b0-aea3-4f08-8d31-32d1a6f8fed0 @@ -40,7 +41,7 @@ - + @@ -57,9 +58,9 @@ - - Always - + + + \ No newline at end of file diff --git a/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs b/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs new file mode 100644 index 000000000000..144d4ab79d3f --- /dev/null +++ b/dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs @@ -0,0 +1,236 @@ +// Copyright (c) Microsoft. All rights reserved. +using System.Text; +using Microsoft.SemanticKernel; +using Resources; + +namespace Step05; + +/// +/// Demonstrate usage of for a map-reduce operation. +/// +public class Step05_MapReduce : BaseTest +{ + // Target Open AI Services + protected override bool ForceOpenAI => true; + + /// + /// Factor to increase the scale of the content processed. + /// + private const int ScaleFactor = 100; + + private readonly string _sourceContent; + + public Step05_MapReduce(ITestOutputHelper output) + : base(output, redirectSystemConsoleOutput: true) + { + // Initialize the test content + StringBuilder content = new(); + + for (int count = 0; count < ScaleFactor; ++count) + { + content.AppendLine(EmbeddedResource.Read("Grimms-The-King-of-the-Golden-Mountain.txt")); + content.AppendLine(EmbeddedResource.Read("Grimms-The-Water-of-Life.txt")); + content.AppendLine(EmbeddedResource.Read("Grimms-The-White-Snake.txt")); + } + + this._sourceContent = content.ToString().ToUpperInvariant(); + } + + [Fact] + public async Task RunMapReduceAsync() + { + // Define the process + KernelProcess process = SetupMapReduceProcess(nameof(RunMapReduceAsync), "Start"); + + // Execute the process + Kernel kernel = new(); + using LocalKernelProcessContext localProcess = + await process.StartAsync( + kernel, + new KernelProcessEvent + { + Id = "Start", + Data = this._sourceContent, + }); + + // Display the results + Dictionary results = (Dictionary?)kernel.Data[ResultStep.ResultKey] ?? []; + foreach (var result in results) + { + Console.WriteLine($"{result.Key}: {result.Value}"); + } + } + + private KernelProcess SetupMapReduceProcess(string processName, string inputEventId) + { + ProcessBuilder process = new(processName); + + ProcessStepBuilder chunkStep = process.AddStepFromType(); + process + .OnInputEvent(inputEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(chunkStep)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + chunkStep + .OnEvent(ChunkStep.EventId) + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder resultStep = process.AddStepFromType(); + mapStep + .OnEvent(CountStep.EventId) + .SendEventTo(new ProcessFunctionTargetBuilder(resultStep)); + + return process.Build(); + } + + // Step for breaking the content into chunks + private sealed class ChunkStep : KernelProcessStep + { + public const string EventId = "ChunkComplete"; + + [KernelFunction] + public async ValueTask ChunkAsync(KernelProcessStepContext context, string content) + { + int chunkSize = content.Length / Environment.ProcessorCount; + string[] chunks = ChunkContent(content, chunkSize).ToArray(); + + await context.EmitEventAsync(new() { Id = EventId, Data = chunks }); + } + + private IEnumerable ChunkContent(string content, int chunkSize) + { + for (int index = 0; index < content.Length; index += chunkSize) + { + yield return content.Substring(index, Math.Min(chunkSize, content.Length - index)); + } + } + } + + // Step for counting the words in a chunk + private sealed class CountStep : KernelProcessStep + { + public const string EventId = "CountComplete"; + + [KernelFunction] + public async ValueTask ComputeAsync(KernelProcessStepContext context, string chunk) + { + Dictionary counts = []; + + string[] words = chunk.Split([' ', '\n', '\r', '.', ',', '’'], StringSplitOptions.RemoveEmptyEntries); + foreach (string word in words) + { + if (s_notInteresting.Contains(word)) + { + continue; + } + + counts.TryGetValue(word.Trim(), out int count); + counts[word] = ++count; + } + + await context.EmitEventAsync(new() { Id = EventId, Data = counts }); + } + } + + // Step for combining the results + private sealed class ResultStep : KernelProcessStep + { + public const string ResultKey = "WordCount"; + + [KernelFunction] + public async ValueTask ComputeAsync(KernelProcessStepContext context, IList> results, Kernel kernel) + { + Dictionary totals = []; + + foreach (Dictionary result in results) + { + foreach (KeyValuePair pair in result) + { + totals.TryGetValue(pair.Key, out int count); + totals[pair.Key] = count + pair.Value; + } + } + + var sorted = + from kvp in totals + orderby kvp.Value descending + select kvp; + + kernel.Data[ResultKey] = sorted.Take(10).ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + } + } + + // Uninteresting words to remove from content + private static readonly HashSet s_notInteresting = + [ + "A", + "ALL", + "AN", + "AND", + "AS", + "AT", + "BE", + "BEFORE", + "BUT", + "BY", + "CAME", + "COULD", + "FOR", + "GO", + "HAD", + "HAVE", + "HE", + "HER", + "HIM", + "HIMSELF", + "HIS", + "HOW", + "I", + "IF", + "IN", + "INTO", + "IS", + "IT", + "ME", + "MUST", + "MY", + "NO", + "NOT", + "NOW", + "OF", + "ON", + "ONCE", + "ONE", + "ONLY", + "OUT", + "S", + "SAID", + "SAW", + "SET", + "SHE", + "SHOULD", + "SO", + "THAT", + "THE", + "THEM", + "THEN", + "THEIR", + "THERE", + "THEY", + "THIS", + "TO", + "VERY", + "WAS", + "WENT", + "WERE", + "WHAT", + "WHEN", + "WHO", + "WILL", + "WITH", + "WOULD", + "UP", + "UPON", + "YOU", + ]; +} diff --git a/dotnet/samples/GettingStartedWithProcesses/Utilities/ProcessStateMetadataUtilities.cs b/dotnet/samples/GettingStartedWithProcesses/Utilities/ProcessStateMetadataUtilities.cs index 018b98a378e2..9d80f650a238 100644 --- a/dotnet/samples/GettingStartedWithProcesses/Utilities/ProcessStateMetadataUtilities.cs +++ b/dotnet/samples/GettingStartedWithProcesses/Utilities/ProcessStateMetadataUtilities.cs @@ -62,7 +62,7 @@ private static void StoreProcessStateLocally(KernelProcessStateMetadata processS throw new KernelException($"Filepath for process {processStateInfo.Name} does not have .json extension"); } - var content = JsonSerializer.Serialize(processStateInfo, s_jsonOptions); + string content = JsonSerializer.Serialize(processStateInfo, s_jsonOptions); Console.WriteLine($"Process State: \n{content}"); Console.WriteLine($"Saving Process State Locally: \n{Path.GetFullPath(fullFilepath)}"); File.WriteAllText(fullFilepath, content); diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessMap.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessMap.cs new file mode 100644 index 000000000000..f171b9527c77 --- /dev/null +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessMap.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft. All rights reserved. +using System.Collections.Generic; + +namespace Microsoft.SemanticKernel; + +/// +/// A serializable representation of a ProcessMap. +/// +public sealed record KernelProcessMap : KernelProcessStepInfo +{ + /// + /// The map operation. + /// + public KernelProcessStepInfo Operation { get; } + + /// + /// Creates a new instance of the class. + /// + /// The process state. + /// The map operation. + /// The edges for the map. + public KernelProcessMap(KernelProcessMapState state, KernelProcessStepInfo operation, Dictionary> edges) + : base(typeof(KernelProcessMap), state, edges) + { + Verify.NotNull(operation, nameof(operation)); + Verify.NotNullOrWhiteSpace(state.Name, $"{nameof(state)}.{nameof(KernelProcessMapState.Name)}"); + Verify.NotNullOrWhiteSpace(state.Id, $"{nameof(state)}.{nameof(KernelProcessMapState.Id)}"); + + this.Operation = operation; + } +} diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessMapState.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessMapState.cs new file mode 100644 index 000000000000..90d24fdfbabc --- /dev/null +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessMapState.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Runtime.Serialization; + +namespace Microsoft.SemanticKernel; + +/// +/// Represents the state of a . +/// +[DataContract] +public sealed record KernelProcessMapState : KernelProcessStepState +{ + /// + /// Initializes a new instance of the class. + /// + /// The name of the associated + /// version id of the process step state + /// The Id of the associated + public KernelProcessMapState(string name, string version, string id) + : base(name, version, id) + { + Verify.NotNullOrWhiteSpace(id, nameof(id)); + } +} diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessState.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessState.cs index b21148188da2..bffffe32356a 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessState.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessState.cs @@ -5,7 +5,7 @@ namespace Microsoft.SemanticKernel; /// -/// Represents the state of a process. +/// Represents the state of a . /// [DataContract] public sealed record KernelProcessState : KernelProcessStepState diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs index bf4aba1321db..6dfac0412d29 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepContext.cs @@ -42,6 +42,8 @@ public ValueTask EmitEventAsync( object? data = null, KernelProcessEventVisibility visibility = KernelProcessEventVisibility.Internal) { + Verify.NotNullOrWhiteSpace(eventId, nameof(eventId)); + return this._stepMessageChannel.EmitEventAsync( new KernelProcessEvent { diff --git a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepState.cs b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepState.cs index 9004d8f10b78..521b507905a6 100644 --- a/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepState.cs +++ b/dotnet/src/Experimental/Process.Abstractions/KernelProcessStepState.cs @@ -62,8 +62,8 @@ internal static void RegisterDerivedType(Type derivedType) /// The Id of the associated public KernelProcessStepState(string name, string version, string? id = null) { - Verify.NotNullOrWhiteSpace(name); - Verify.NotNullOrWhiteSpace(version); + Verify.NotNullOrWhiteSpace(name, nameof(name)); + Verify.NotNullOrWhiteSpace(version, nameof(version)); this.Id = id; this.Name = name; diff --git a/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessMapStateMetadata.cs b/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessMapStateMetadata.cs new file mode 100644 index 000000000000..ef835f474b93 --- /dev/null +++ b/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessMapStateMetadata.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. +using System.Runtime.Serialization; +using System.Text.Json.Serialization; + +namespace Microsoft.SemanticKernel.Process.Models; + +/// +/// Process state used for State Persistence serialization +/// +public sealed record class KernelProcessMapStateMetadata : KernelProcessStepStateMetadata +{ + /// + /// Process State of Steps if provided + /// + [DataMember] + [JsonPropertyName("operationState")] + public KernelProcessStepStateMetadata? OperationState { get; set; } +} diff --git a/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessStepStateMetadata.cs b/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessStepStateMetadata.cs index 7b92e5e4dd90..d873c819cdb7 100644 --- a/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessStepStateMetadata.cs +++ b/dotnet/src/Experimental/Process.Abstractions/Models/KernelProcessStepStateMetadata.cs @@ -11,6 +11,7 @@ namespace Microsoft.SemanticKernel.Process.Models; /// [JsonPolymorphic(TypeDiscriminatorPropertyName = "$type", UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FallBackToNearestAncestor)] [JsonDerivedType(typeof(KernelProcessStepStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Step))] +[JsonDerivedType(typeof(KernelProcessMapStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Map))] [JsonDerivedType(typeof(KernelProcessStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Process))] public record class KernelProcessStepStateMetadata { diff --git a/dotnet/src/Experimental/Process.Core/Internal/KernelProcessStateMetadataExtension.cs b/dotnet/src/Experimental/Process.Core/Internal/KernelProcessStateMetadataExtension.cs index 701cbd157dc9..11ff55608767 100644 --- a/dotnet/src/Experimental/Process.Core/Internal/KernelProcessStateMetadataExtension.cs +++ b/dotnet/src/Experimental/Process.Core/Internal/KernelProcessStateMetadataExtension.cs @@ -5,37 +5,38 @@ using Microsoft.SemanticKernel.Process.Models; namespace Microsoft.SemanticKernel.Process.Internal; + internal static class KernelProcessStateMetadataExtension { - public static List BuildWithStateMetadata(this List stepBuilders, KernelProcessStateMetadata? stateMetadata) + public static List BuildWithStateMetadata(this ProcessBuilder processBuilder, KernelProcessStateMetadata? stateMetadata) { List builtSteps = []; // 1- Validate StateMetadata: Migrate previous state versions if needed + sanitize state KernelProcessStateMetadata? sanitizedMetadata = null; if (stateMetadata != null) { - sanitizedMetadata = SanitizeProcessStateMetadata(stateMetadata, stepBuilders); + sanitizedMetadata = SanitizeProcessStateMetadata(stateMetadata, processBuilder.Steps); } // 2- Build steps info with validated stateMetadata - stepBuilders.ForEach(step => + foreach (ProcessStepBuilder step in processBuilder.Steps) { if (sanitizedMetadata != null && sanitizedMetadata.StepsState != null && sanitizedMetadata.StepsState.TryGetValue(step.Name, out var stepStateObject) && stepStateObject != null) { builtSteps.Add(step.BuildStep(stepStateObject)); - return; + continue; } builtSteps.Add(step.BuildStep()); - }); + } return builtSteps; } - private static KernelProcessStateMetadata SanitizeProcessStateMetadata(KernelProcessStateMetadata stateMetadata, List stepBuilders) + private static KernelProcessStateMetadata SanitizeProcessStateMetadata(KernelProcessStateMetadata stateMetadata, IReadOnlyList stepBuilders) { KernelProcessStateMetadata sanitizedStateMetadata = stateMetadata; - stepBuilders.ForEach(step => + foreach (ProcessStepBuilder step in stepBuilders) { // 1- find matching key name with exact match or by alias match string? stepKey = null; @@ -70,7 +71,12 @@ private static KernelProcessStateMetadata SanitizeProcessStateMetadata(KernelPro // version mismatch - check if migration logic in place if (step is ProcessBuilder subprocessBuilder) { - var sanitizedStepState = SanitizeProcessStateMetadata((KernelProcessStateMetadata)savedStateMetadata, subprocessBuilder.Steps.ToList()); + KernelProcessStateMetadata sanitizedStepState = SanitizeProcessStateMetadata((KernelProcessStateMetadata)savedStateMetadata, subprocessBuilder.Steps); + sanitizedStateMetadata.StepsState[step.Name] = sanitizedStepState; + } + else if (step is ProcessMapBuilder mapBuilder) + { + KernelProcessStateMetadata sanitizedStepState = SanitizeProcessStateMetadata((KernelProcessStateMetadata)savedStateMetadata, [mapBuilder.MapOperation]); sanitizedStateMetadata.StepsState[step.Name] = sanitizedStepState; } else if (false) @@ -92,7 +98,7 @@ private static KernelProcessStateMetadata SanitizeProcessStateMetadata(KernelPro } } } - }); + } return sanitizedStateMetadata; } diff --git a/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs index e8ed21744da1..e475017af6ba 100644 --- a/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs +++ b/dotnet/src/Experimental/Process.Core/ProcessBuilder.cs @@ -99,54 +99,67 @@ internal override Dictionary GetFunctionMetadata internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata? stateMetadata = null) { // The step is a, process so we can return the step info directly. - if (stateMetadata is KernelProcessStateMetadata processState) - { - return this.Build(processState); - } - - return this.Build(); + return this.Build(stateMetadata as KernelProcessStateMetadata); } - #region Public Interface - /// - /// A read-only collection of steps in the process. + /// Add the provided step builder to the process. /// - public IReadOnlyList Steps => this._steps.AsReadOnly(); + /// + /// Utilized by only. + /// + internal void AddStepFromBuilder(ProcessStepBuilder stepBuilder) + { + this._steps.Add(stepBuilder); + } /// /// Check to ensure stepName is not used yet in another step /// - /// - /// private bool StepNameAlreadyExists(string stepName) { return this._steps.Select(step => step.Name).Contains(stepName); } /// - /// Adds a step to the process. + /// Verify step is unique and add to the process. /// - /// The step Type. - /// The name of the step. This parameter is optional. - /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States - /// An instance of - public ProcessStepBuilder AddStepFromType(string? name = null, List? aliases = null) where TStep : KernelProcessStep + private TBuilder AddStep(TBuilder builder, IReadOnlyList? aliases) where TBuilder : ProcessStepBuilder { - var stepBuilder = new ProcessStepBuilder(name); - if (this.StepNameAlreadyExists(stepBuilder.Name)) + if (this.StepNameAlreadyExists(builder.Name)) { - throw new InvalidOperationException($"Step name {stepBuilder.Name} is already used, assign a different name for step"); + throw new InvalidOperationException($"Step name {builder.Name} is already used, assign a different name for step"); } if (aliases != null && aliases.Count > 0) { - stepBuilder.Aliases = aliases; + builder.Aliases = aliases; } - this._steps.Add(stepBuilder); + this._steps.Add(builder); - return stepBuilder; + return builder; + } + + #region Public Interface + + /// + /// A read-only collection of steps in the process. + /// + public IReadOnlyList Steps => this._steps.AsReadOnly(); + + /// + /// Adds a step to the process. + /// + /// The step Type. + /// The name of the step. This parameter is optional. + /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States + /// An instance of + public ProcessStepBuilder AddStepFromType(string? name = null, IReadOnlyList? aliases = null) where TStep : KernelProcessStep + { + ProcessStepBuilder stepBuilder = new(name); + + return this.AddStep(stepBuilder, aliases); } /// @@ -158,22 +171,11 @@ public ProcessStepBuilder AddStepFromType(string? name = null, ListThe name of the step. This parameter is optional. /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States /// An instance of - public ProcessStepBuilder AddStepFromType(TState initialState, string? name = null, List? aliases = null) where TStep : KernelProcessStep where TState : class, new() + public ProcessStepBuilder AddStepFromType(TState initialState, string? name = null, IReadOnlyList? aliases = null) where TStep : KernelProcessStep where TState : class, new() { - var stepBuilder = new ProcessStepBuilder(name, initialState: initialState); - if (this.StepNameAlreadyExists(stepBuilder.Name)) - { - throw new InvalidOperationException($"Step name {stepBuilder.Name} is already used, assign a different name for step"); - } - - if (aliases != null && aliases.Count > 0) - { - stepBuilder.Aliases = aliases; - } - - this._steps.Add(stepBuilder); + ProcessStepBuilder stepBuilder = new(name, initialState: initialState); - return stepBuilder; + return this.AddStep(stepBuilder, aliases); } /// @@ -182,21 +184,62 @@ public ProcessStepBuilder AddStepFromType(string? name = null, ListThe process to add as a step. /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States /// An instance of - public ProcessBuilder AddStepFromProcess(ProcessBuilder kernelProcess, List? aliases = null) + public ProcessBuilder AddStepFromProcess(ProcessBuilder kernelProcess, IReadOnlyList? aliases = null) { kernelProcess.HasParentProcess = true; - if (this.StepNameAlreadyExists(kernelProcess.Name)) - { - throw new InvalidOperationException($"Step name {kernelProcess.Name} is already used, assign a different name for step"); - } - if (aliases != null && aliases.Count > 0) - { - kernelProcess.Aliases = aliases; - } + return this.AddStep(kernelProcess, aliases); + } - this._steps.Add(kernelProcess); - return kernelProcess; + /// + /// Adds a step to the process. + /// + /// The step Type. + /// The name of the step. This parameter is optional. + /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States + /// An instance of + public ProcessMapBuilder AddMapStepFromType(string? name = null, IReadOnlyList? aliases = null) where TStep : KernelProcessStep + { + ProcessStepBuilder stepBuilder = new(name); + + ProcessMapBuilder mapBuilder = new(stepBuilder); + + return this.AddStep(mapBuilder, aliases); + } + + /// + /// Adds a step to the process and define it's initial user-defined state. + /// + /// The step Type. + /// The state Type. + /// The initial state of the step. + /// The name of the step. This parameter is optional. + /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States + /// An instance of + public ProcessMapBuilder AddMapStepFromType(TState initialState, string? name = null, IReadOnlyList? aliases = null) where TStep : KernelProcessStep where TState : class, new() + { + ProcessStepBuilder stepBuilder = new(name, initialState: initialState); + + ProcessMapBuilder mapBuilder = new(stepBuilder); + + return this.AddStep(mapBuilder, aliases); + } + + /// + /// Adds a map operation to the process that accepts an enumerable input parameter and + /// processes each individual parameter value by the specified map operation (TStep). + /// Results are coalesced into a result set of the same dimension as the input set. + /// + /// The target for the map operation + /// Aliases that have been used by previous versions of the step, used for supporting backward compatibility when reading old version Process States + /// An instance of + public ProcessMapBuilder AddMapStepFromProcess(ProcessBuilder process, IReadOnlyList? aliases = null) + { + process.HasParentProcess = true; + + ProcessMapBuilder mapBuilder = new(process); + + return this.AddStep(mapBuilder, aliases); } /// @@ -231,7 +274,7 @@ public ProcessEdgeBuilder OnError() /// public ProcessFunctionTargetBuilder WhereInputEventIs(string eventId) { - Verify.NotNullOrWhiteSpace(eventId); + Verify.NotNullOrWhiteSpace(eventId, nameof(eventId)); if (!this._externalEventTargetMap.TryGetValue(eventId, out var target)) { @@ -254,11 +297,12 @@ public KernelProcess Build(KernelProcessStateMetadata? stateMetadata = null) var builtEdges = this.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.Select(e => e.Build()).ToList()); // Build the steps and injecting initial state if any is provided - var builtSteps = this._steps.BuildWithStateMetadata(stateMetadata); + var builtSteps = this.BuildWithStateMetadata(stateMetadata); // Create the process - var state = new KernelProcessState(this.Name, version: this.Version, id: this.HasParentProcess ? this.Id : null); - var process = new KernelProcess(state, builtSteps, builtEdges); + KernelProcessState state = new(this.Name, version: this.Version, id: this.HasParentProcess ? this.Id : null); + KernelProcess process = new(state, builtSteps, builtEdges); + return process; } diff --git a/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs index 076912f318ec..6de3a24be770 100644 --- a/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs +++ b/dotnet/src/Experimental/Process.Core/ProcessEdgeBuilder.cs @@ -1,5 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. +using System; + namespace Microsoft.SemanticKernel; /// @@ -26,6 +28,9 @@ public sealed class ProcessEdgeBuilder /// The Id of the event. internal ProcessEdgeBuilder(ProcessBuilder source, string eventId) { + Verify.NotNull(source, nameof(source)); + Verify.NotNullOrWhiteSpace(eventId, nameof(eventId)); + this.Source = source; this.EventId = eventId; } @@ -35,6 +40,11 @@ internal ProcessEdgeBuilder(ProcessBuilder source, string eventId) /// public ProcessEdgeBuilder SendEventTo(ProcessFunctionTargetBuilder target) { + if (this.Target is not null) + { + throw new InvalidOperationException("An output target has already been set."); + } + this.Target = target; ProcessStepEdgeBuilder edgeBuilder = new(this.Source, this.EventId) { Target = this.Target }; this.Source.LinkTo(this.EventId, edgeBuilder); diff --git a/dotnet/src/Experimental/Process.Core/ProcessFunctionTargetBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessFunctionTargetBuilder.cs index 6c2d29f67cc9..ceb9cc290639 100644 --- a/dotnet/src/Experimental/Process.Core/ProcessFunctionTargetBuilder.cs +++ b/dotnet/src/Experimental/Process.Core/ProcessFunctionTargetBuilder.cs @@ -1,4 +1,5 @@ // Copyright (c) Microsoft. All rights reserved. +using System; namespace Microsoft.SemanticKernel; @@ -15,7 +16,8 @@ public sealed record ProcessFunctionTargetBuilder /// The parameter to target. public ProcessFunctionTargetBuilder(ProcessStepBuilder step, string? functionName = null, string? parameterName = null) { - Verify.NotNull(step); + Verify.NotNull(step, nameof(step)); + this.Step = step; // If the step is an EndStep, we don't need to resolve the function target. @@ -28,7 +30,10 @@ public ProcessFunctionTargetBuilder(ProcessStepBuilder step, string? functionNam // Make sure the function target is valid. var target = step.ResolveFunctionTarget(functionName, parameterName); - Verify.NotNull(target); + if (target == null) + { + throw new InvalidOperationException($"Failed to resolve function target for {step.GetType().Name}, {step.Name}: Function - {functionName ?? "any"} / Parameter - {parameterName ?? "any"}"); + } this.FunctionName = target.FunctionName!; this.ParameterName = target.ParameterName; diff --git a/dotnet/src/Experimental/Process.Core/ProcessMapBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessMapBuilder.cs new file mode 100644 index 000000000000..dc74b2534d95 --- /dev/null +++ b/dotnet/src/Experimental/Process.Core/ProcessMapBuilder.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.SemanticKernel.Process.Models; + +namespace Microsoft.SemanticKernel; + +/// +/// Provides functionality to define a step that maps an enumerable input for parallel processing +/// targeting the provided operation and provides the resulting value as an enumerable parameter +/// with equivalent dimension as the input. +/// +public sealed class ProcessMapBuilder : ProcessStepBuilder +{ + /// + /// Initializes a new instance of the class. + /// + /// The target of the map operation. May target a step or process + internal ProcessMapBuilder(ProcessStepBuilder mapOperation) + : base($"Map{mapOperation.Name}") + { + this.MapOperation = mapOperation; + } + + /// + /// Version of the map-step, used when saving the state of the step. + /// + public string Version { get; init; } = "v1"; + + /// + /// Retrieves the target for a given external event. The step associated with the target is the process itself (this). + /// + /// The Id of the event + /// An instance of + /// + public ProcessFunctionTargetBuilder WhereInputEventIs(string eventId) + { + Verify.NotNullOrWhiteSpace(eventId, nameof(eventId)); + + if (this.MapOperation is not ProcessBuilder process) + { + throw new KernelException("Map operation is not a process."); + } + + ProcessFunctionTargetBuilder operationTarget = process.WhereInputEventIs(eventId); + + return operationTarget with { Step = this, TargetEventId = eventId }; + } + + /// + /// The map operation that will be executed for each element in the input. + /// + internal ProcessStepBuilder MapOperation { get; } + + /// + /// + /// Never called as the map is a proxy for the map operation and does not have a function target. + /// + internal override Dictionary GetFunctionMetadataMap() + { + throw new NotImplementedException($"{nameof(ProcessMapBuilder)}.{nameof(GetFunctionMetadataMap)} should never be invoked"); + } + + /// + internal override KernelProcessFunctionTarget ResolveFunctionTarget(string? functionName, string? parameterName) + { + if (this.MapOperation is ProcessBuilder processOperation) + { + throw new KernelException($"Map operation is a process. Use {nameof(ProcessMapBuilder)}.{nameof(WhereInputEventIs)} to resolve target."); + } + + return this.MapOperation.ResolveFunctionTarget(functionName, parameterName); + } + + /// + internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata? stateMetadata = null) + { + KernelProcessMapStateMetadata? mapMetadata = stateMetadata as KernelProcessMapStateMetadata; + + // Build the edges first + var builtEdges = this.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.Select(e => e.Build()).ToList()); + + // Define the map state + KernelProcessMapState state = new(this.Name, this.Version, this.Id); + + return new KernelProcessMap(state, this.MapOperation.BuildStep(mapMetadata?.OperationState), builtEdges); + } +} diff --git a/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs index 70749a751da3..cc1530c953de 100644 --- a/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs +++ b/dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs @@ -20,7 +20,7 @@ public abstract class ProcessStepBuilder /// /// The unique identifier for the step. This may be null until the step is run within a process. /// - public string? Id { get; } + public string Id { get; } /// /// The name of the step. This is intended to be a human-readable name and is not required to be unique. @@ -30,7 +30,7 @@ public abstract class ProcessStepBuilder /// /// Alternative names that have been used to previous versions of the step /// - public List Aliases { get; set; } = []; + public IReadOnlyList Aliases { get; internal set; } = []; /// /// Define the behavior of the step when the event with the specified Id is fired. diff --git a/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs b/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs index 2e4afbfa51e9..f734959fb6fe 100644 --- a/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs +++ b/dotnet/src/Experimental/Process.Core/ProcessStepEdgeBuilder.cs @@ -20,7 +20,7 @@ public sealed class ProcessStepEdgeBuilder /// /// The source step of the edge. /// - internal ProcessStepBuilder Source { get; init; } + internal ProcessStepBuilder Source { get; } /// /// Initializes a new instance of the class. @@ -29,8 +29,8 @@ public sealed class ProcessStepEdgeBuilder /// The Id of the event. internal ProcessStepEdgeBuilder(ProcessStepBuilder source, string eventId) { - Verify.NotNull(source); - Verify.NotNullOrWhiteSpace(eventId); + Verify.NotNull(source, nameof(source)); + Verify.NotNullOrWhiteSpace(eventId, nameof(eventId)); this.Source = source; this.EventId = eventId; @@ -59,6 +59,11 @@ public ProcessStepEdgeBuilder SendEventTo(ProcessFunctionTargetBuilder target) throw new InvalidOperationException("An output target has already been set."); } + if (this.Source is ProcessMapBuilder && target.Step is ProcessMapBuilder) + { + throw new ArgumentException($"{nameof(ProcessMapBuilder)} may not target another {nameof(ProcessMapBuilder)}.", nameof(target)); + } + this.Target = target; this.Source.LinkTo(this.EventId, this); diff --git a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs index c716968a2962..187500e15dee 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/Controllers/ProcessTestController.cs @@ -42,7 +42,7 @@ public async Task StartProcessAsync(string processId, [FromBody] return this.BadRequest("Process already started"); } - var initialEvent = KernelProcessEventSerializer.ToKernelProcessEvent(request.InitialEvent); + KernelProcessEvent initialEvent = request.InitialEvent.ToKernelProcessEvent(); var kernelProcess = request.Process.ToKernelProcess(); var context = await kernelProcess.StartAsync(initialEvent); diff --git a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/ProcessStateTypeResolver.cs b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/ProcessStateTypeResolver.cs index c2880c92b334..ae4569eaa933 100644 --- a/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/ProcessStateTypeResolver.cs +++ b/dotnet/src/Experimental/Process.IntegrationTestHost.Dapr/ProcessStateTypeResolver.cs @@ -13,7 +13,12 @@ namespace SemanticKernel.Process.IntegrationTests; public class ProcessStateTypeResolver : DefaultJsonTypeInfoResolver where T : KernelProcessStep { private static readonly Type s_genericType = typeof(KernelProcessStep<>); - private readonly Dictionary _types = new() { { "", typeof(KernelProcessState) } }; + private readonly Dictionary _types = + new() + { + { "process", typeof(KernelProcessState) }, + { "map", typeof(KernelProcessMapState) }, + }; /// /// Initializes a new instance of the class. @@ -75,7 +80,8 @@ public override JsonTypeInfo GetTypeInfo(Type type, JsonSerializerOptions option UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FailSerialization, DerivedTypes = { - new JsonDerivedType(typeof(DaprProcessInfo), nameof(DaprProcessInfo)) + new JsonDerivedType(typeof(DaprProcessInfo), nameof(DaprProcessInfo)), + new JsonDerivedType(typeof(DaprMapInfo), nameof(DaprMapInfo)), } }; } diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Resources/ProcessMapTestResources.cs b/dotnet/src/Experimental/Process.IntegrationTests.Resources/ProcessMapTestResources.cs new file mode 100644 index 000000000000..758bd75b1d2d --- /dev/null +++ b/dotnet/src/Experimental/Process.IntegrationTests.Resources/ProcessMapTestResources.cs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.SemanticKernel; + +namespace SemanticKernel.Process.IntegrationTests; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member + +/// +/// A step that contains a map operation that emits two events. +/// +public sealed class ComputeStep : KernelProcessStep +{ + public const string SquareEventId = "SquareResult"; + public const string CubicEventId = "CubicResult"; + public const string ComputeFunction = "MapCompute"; + + [KernelFunction(ComputeFunction)] + public async ValueTask ComputeAsync(KernelProcessStepContext context, long value) + { + long square = value * value; + await context.EmitEventAsync(new() { Id = SquareEventId, Data = square }); + await context.EmitEventAsync(new() { Id = CubicEventId, Data = square * value }); + } +} + +/// +/// State for union step to capture results. +/// +public sealed record UnionState +{ + public long SquareResult { get; set; } + public long CubicResult { get; set; } +}; + +/// +/// The step that combines the results of the map operation. +/// +public sealed class UnionStep : KernelProcessStep +{ + public const string EventId = "MapUnion"; + public const string SumSquareFunction = "UnionSquare"; + public const string SumCubicFunction = "UnionCubic"; + + private UnionState _state = new(); + + public override ValueTask ActivateAsync(KernelProcessStepState state) + { + this._state = state.State ?? throw new InvalidDataException(); + + return ValueTask.CompletedTask; + } + + [KernelFunction(SumSquareFunction)] + public void SumSquare(IList values) + { + long sum = values.Sum(); + this._state.SquareResult = sum; + } + + [KernelFunction(SumCubicFunction)] + public void SumCubic(IList values) + { + long sum = values.Sum(); + this._state.CubicResult = sum; + } +} diff --git a/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessMapTests.cs b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessMapTests.cs new file mode 100644 index 000000000000..f110292672ea --- /dev/null +++ b/dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessMapTests.cs @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft. All rights reserved. + +#pragma warning disable IDE0005 // Using directive is unnecessary. +using System.Linq; +using System.Threading.Tasks; +using Microsoft.SemanticKernel; +using Microsoft.SemanticKernel.Process; +using Xunit; +#pragma warning restore IDE0005 // Using directive is unnecessary. + +namespace SemanticKernel.Process.IntegrationTests; + +/// +/// Integration test focusing on . +/// +[Collection(nameof(ProcessTestGroup))] +public class ProcessMapTests : IClassFixture +{ + private readonly ProcessTestFixture _fixture; + + /// + /// Initializes a new instance of the class. This is called by the test framework. + /// + public ProcessMapTests(ProcessTestFixture fixture) + { + this._fixture = fixture; + } + + /// + /// Tests a map-step with a step as the map-operation. + /// + [Fact] + public async Task TestMapWithStepAsync() + { + // Arrange + ProcessBuilder process = new(nameof(TestMapWithStepAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + mapStep + .OnEvent(ComputeStep.CubicEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumCubicFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + KernelProcessContext processContext = + await this._fixture.StartProcessAsync( + processInstance, + kernel, + new KernelProcessEvent() + { + Id = "Start", + Data = new int[] { 1, 2, 3, 4, 5 } + }); + + // Assert + KernelProcess processState = await processContext.GetStateAsync(); + KernelProcessStepState unionState = (KernelProcessStepState)processState.Steps.Where(s => s.State.Name == "Union").Single().State; + + Assert.NotNull(unionState?.State); + Assert.Equal(55L, unionState.State.SquareResult); + Assert.Equal(225L, unionState.State.CubicResult); + } + + /// + /// Tests a map-step with a process as the map-operation. + /// + [Fact] + public async Task TestMapWithProcessAsync() + { + // Arrange + ProcessBuilder process = new(nameof(TestMapWithStepAsync)); + + ProcessBuilder mapProcess = new("MapOperation"); + ProcessStepBuilder computeStep = mapProcess.AddStepFromType(); + mapProcess + .OnInputEvent("Anything") + .SendEventTo(new ProcessFunctionTargetBuilder(computeStep)); + + ProcessMapBuilder mapStep = process.AddMapStepFromProcess(mapProcess); + process + .OnInputEvent("Start") + .SendEventTo(mapStep.WhereInputEventIs("Anything")); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + mapStep + .OnEvent(ComputeStep.CubicEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumCubicFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + KernelProcessContext processContext = + await this._fixture.StartProcessAsync( + processInstance, + kernel, + new KernelProcessEvent() + { + Id = "Start", + Data = new int[] { 1, 2, 3, 4, 5 } + }); + + // Assert + KernelProcess processState = await processContext.GetStateAsync(); + KernelProcessStepState unionState = (KernelProcessStepState)processState.Steps.Where(s => s.State.Name == "Union").Single().State; + + Assert.NotNull(unionState?.State); + Assert.Equal(55L, unionState.State.SquareResult); + Assert.Equal(225L, unionState.State.CubicResult); + } +} diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs index d539bd77b889..b59dd70211f4 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessContext.cs @@ -1,5 +1,4 @@ // Copyright (c) Microsoft. All rights reserved. - using System; using System.Threading.Tasks; using Microsoft.SemanticKernel.Process; @@ -14,17 +13,22 @@ public sealed class LocalKernelProcessContext : KernelProcessContext, IDisposabl private readonly LocalProcess _localProcess; private readonly Kernel _kernel; - internal LocalKernelProcessContext(KernelProcess process, Kernel kernel) + internal LocalKernelProcessContext(KernelProcess process, Kernel kernel, ProcessEventProxy? eventProxy = null) { Verify.NotNull(process, nameof(process)); Verify.NotNull(kernel, nameof(kernel)); Verify.NotNullOrWhiteSpace(process.State?.Name); this._kernel = kernel; - this._localProcess = new LocalProcess(process, kernel); + this._localProcess = new LocalProcess( + process, + kernel) + { + EventProxy = eventProxy + }; } - internal Task StartWithEventAsync(KernelProcessEvent? initialEvent, Kernel? kernel = null) => + internal Task StartWithEventAsync(KernelProcessEvent initialEvent, Kernel? kernel = null) => this._localProcess.RunOnceAsync(initialEvent, kernel); /// diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs index 289661310c35..4904366c9d39 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalKernelProcessFactory.cs @@ -18,12 +18,9 @@ public static class LocalKernelProcessFactory /// An instance of that can be used to interrogate or stop the running process. public static async Task StartAsync(this KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent) { - Verify.NotNull(process); - Verify.NotNullOrWhiteSpace(process.State?.Name); - Verify.NotNull(kernel); - Verify.NotNull(initialEvent); + Verify.NotNull(initialEvent, nameof(initialEvent)); - var processContext = new LocalKernelProcessContext(process, kernel); + LocalKernelProcessContext processContext = new(process, kernel); await processContext.StartWithEventAsync(initialEvent).ConfigureAwait(false); return processContext; } diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs new file mode 100644 index 000000000000..9065ad94e19d --- /dev/null +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalMap.cs @@ -0,0 +1,135 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.SemanticKernel.Process.Internal; +using Microsoft.SemanticKernel.Process.Runtime; + +namespace Microsoft.SemanticKernel; + +internal sealed class LocalMap : LocalStep +{ + private readonly HashSet _mapEvents; + private readonly KernelProcessMap _map; + private readonly ILogger _logger; + + /// + /// Initializes a new instance of the class. + /// + /// The instance. + /// An instance of + internal LocalMap(KernelProcessMap map, Kernel kernel) + : base(map, kernel) + { + this._map = map; + this._logger = this._kernel.LoggerFactory?.CreateLogger(this._map.State.Name) ?? new NullLogger(); + this._mapEvents = [.. map.Edges.Keys.Select(key => key.Split(ProcessConstants.EventIdSeparator).Last())]; + } + + /// + internal override async Task HandleMessageAsync(ProcessMessage message) + { + // Initialize the current operation + (IEnumerable inputValues, KernelProcess mapOperation, string startEventId) = this._map.Initialize(message, this._logger); + + // Prepare state for map execution + int index = 0; + List<(Task Task, LocalKernelProcessContext ProcessContext, MapOperationContext Context)> mapOperations = []; + ConcurrentDictionary capturedEvents = []; + try + { + // Execute the map operation for each value + foreach (var value in inputValues) + { + ++index; + + KernelProcess process = mapOperation.CloneProcess(this._logger); + MapOperationContext context = new(this._mapEvents, capturedEvents); +#pragma warning disable CA2000 // Dispose objects before losing scope + LocalKernelProcessContext processContext = new(process, this._kernel, context.Filter); + Task processTask = + processContext.StartWithEventAsync( + new KernelProcessEvent + { + Id = startEventId, + Data = value + }); +#pragma warning restore CA2000 // Dispose objects before losing scope + + mapOperations.Add((processTask, processContext, context)); + } + + // Wait for all the map operations to complete + await Task.WhenAll(mapOperations.Select(p => p.Task)).ConfigureAwait(false); + + // Correlate the operation results to emit as the map result + Dictionary resultMap = []; + for (index = 0; index < mapOperations.Count; ++index) + { + foreach (KeyValuePair capturedEvent in capturedEvents) + { + string eventName = capturedEvent.Key; + Type resultType = capturedEvent.Value; + + mapOperations[index].Context.Results.TryGetValue(eventName, out object? result); + if (!resultMap.TryGetValue(eventName, out Array? results)) + { + results = Array.CreateInstance(resultType, mapOperations.Count); + resultMap[eventName] = results; + } + + results.SetValue(result, index); + } + } + + // Emit map results + foreach (string eventName in capturedEvents.Keys) + { + Array eventResult = resultMap[eventName]; + await this.EmitEventAsync(new() { Id = eventName, Data = eventResult }).ConfigureAwait(false); + } + } + finally + { + foreach (var operation in mapOperations) + { + operation.ProcessContext.Dispose(); + } + } + } + + /// + protected override ValueTask InitializeStepAsync() + { + // The map does not need any further initialization as it's already been initialized. + // Override the base method to prevent it from being called. + return default; + } + + private sealed record MapOperationContext(in HashSet EventTargets, in IDictionary CapturedEvents) + { + public ConcurrentDictionary Results { get; } = []; + + public bool Filter(ProcessEvent processEvent) + { + string eventName = processEvent.SourceId; + if (this.EventTargets.Contains(eventName)) + { + this.CapturedEvents.TryGetValue(eventName, out Type? resultType); + if (resultType is null || resultType == typeof(object)) + { + this.CapturedEvents[eventName] = processEvent.Data?.GetType() ?? typeof(object); + } + + this.Results[eventName] = processEvent.Data; + } + + return true; + } + } +} diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs index 7b4f239f8965..c35f2ed96c49 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs @@ -14,6 +14,8 @@ namespace Microsoft.SemanticKernel; +internal delegate bool ProcessEventProxy(ProcessEvent processEvent); + internal sealed class LocalProcess : LocalStep, IDisposable { private readonly JoinableTaskFactory _joinableTaskFactory; @@ -26,6 +28,7 @@ internal sealed class LocalProcess : LocalStep, IDisposable internal readonly KernelProcess _process; private readonly ILogger _logger; + private JoinableTask? _processTask; private CancellationTokenSource? _processCancelSource; @@ -34,9 +37,8 @@ internal sealed class LocalProcess : LocalStep, IDisposable /// /// The instance. /// An instance of - /// Optional. The Id of the parent process if one exists, otherwise null. - internal LocalProcess(KernelProcess process, Kernel kernel, string? parentProcessId = null) - : base(process, kernel, parentProcessId) + internal LocalProcess(KernelProcess process, Kernel kernel) + : base(process, kernel) { Verify.NotNull(process.Steps); @@ -72,9 +74,10 @@ internal async Task StartAsync(Kernel? kernel = null, bool keepAlive = true) /// Required. The to start the process with. /// Optional. A to use when executing the process. /// A - internal async Task RunOnceAsync(KernelProcessEvent? processEvent, Kernel? kernel = null) + internal async Task RunOnceAsync(KernelProcessEvent processEvent, Kernel? kernel = null) { Verify.NotNull(processEvent, nameof(processEvent)); + Verify.NotNullOrWhiteSpace(processEvent.Id, $"{nameof(processEvent)}.{nameof(KernelProcessEvent.Id)}"); await Task.Yield(); // Ensure that the process has an opportunity to run in a different synchronization context. await this._externalEventChannel.Writer.WriteAsync(processEvent).ConfigureAwait(false); @@ -175,30 +178,40 @@ private ValueTask InitializeProcessAsync() // The current step should already have a name. Verify.NotNull(step.State?.Name); - if (step is KernelProcess kernelStep) + if (step is KernelProcess processStep) { // The process will only have an Id if its already been executed. - if (string.IsNullOrWhiteSpace(kernelStep.State.Id)) + if (string.IsNullOrWhiteSpace(processStep.State.Id)) { - kernelStep = kernelStep with { State = kernelStep.State with { Id = Guid.NewGuid().ToString() } }; + processStep = processStep with { State = processStep.State with { Id = Guid.NewGuid().ToString() } }; } - var process = new LocalProcess( - process: kernelStep, - kernel: this._kernel, - parentProcessId: this.Id); - - localStep = process; + localStep = + new LocalProcess(processStep, this._kernel) + { + ParentProcessId = this.Id, + EventProxy = this.EventProxy, + }; + } + else if (step is KernelProcessMap mapStep) + { + localStep = + new LocalMap(mapStep, this._kernel) + { + ParentProcessId = this.Id, + }; } else { // The current step should already have an Id. Verify.NotNull(step.State?.Id); - localStep = new LocalStep( - stepInfo: step, - kernel: this._kernel, - parentProcessId: this.Id); + localStep = + new LocalStep(step, this._kernel) + { + ParentProcessId = this.Id, + EventProxy = this.EventProxy, + }; } this._steps.Add(localStep); diff --git a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs index 1b0183dc7753..2fe9287bafda 100644 --- a/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs +++ b/dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs @@ -1,5 +1,4 @@ // Copyright (c) Microsoft. All rights reserved. - using System; using System.Collections.Generic; using System.Linq; @@ -64,7 +63,7 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr /// /// The Id of the parent process if one exists. /// - protected string? ParentProcessId { get; } + internal string? ParentProcessId { get; init; } /// /// The name of the step. @@ -76,6 +75,11 @@ public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentPr /// internal string Id => this._stepInfo.State.Id!; + /// + /// An event proxy that can be used to intercept events emitted by the step. + /// + internal ProcessEventProxy? EventProxy { get; init; } + /// /// Retrieves all events that have been emitted by this step in the previous superstep. /// @@ -114,7 +118,14 @@ internal IEnumerable GetEdgeForEvent(string eventId) /// A public ValueTask EmitEventAsync(KernelProcessEvent processEvent) { - this.EmitEvent(ProcessEvent.Create(processEvent, this._eventNamespace)); + Verify.NotNullOrWhiteSpace(processEvent.Id, $"{nameof(processEvent)}.{nameof(KernelProcessEvent.Id)}"); + + ProcessEvent emitEvent = ProcessEvent.Create(processEvent, this._eventNamespace); + if (this.EventProxy?.Invoke(emitEvent) ?? true) + { + this.EmitEvent(emitEvent); + } + return default; } diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs b/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs index f3de5b7cfa32..3ed0e0c42cfb 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr.UnitTests/ProcessMessageSerializationTests.cs @@ -19,7 +19,7 @@ public class ProcessMessageSerializationTests /// with out an explicit type definition for /// [Fact] - public void VerifySerializeEventSingleTest() + public void VerifySerializeMessageSingleTest() { // Arrange, Act & Assert VerifyContainerSerialization([CreateMessage(new() { { "Data", 3 } })]); @@ -35,7 +35,7 @@ public void VerifySerializeEventSingleTest() /// with out varying types assigned to for /// [Fact] - public void VerifySerializeEventMixedTest() + public void VerifySerializeMessageMixedTest() { // Arrange, Act & Assert VerifyContainerSerialization( @@ -54,7 +54,7 @@ public void VerifySerializeEventMixedTest() /// with out varying types assigned to for /// [Fact] - public void VerifySerializeEventManyTest() + public void VerifySerializeMessageManyTest() { // Arrange, Act & Assert VerifyContainerSerialization( diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ActorStateKeys.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ActorStateKeys.cs index 21fafcb94dc2..8aa73d3566ac 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ActorStateKeys.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ActorStateKeys.cs @@ -16,8 +16,12 @@ internal static class ActorStateKeys public const string StepStateType = "kernelStepStateType"; public const string StepIncomingMessagesState = "incomingMessagesState"; + // MapActor keys + public const string MapInfoState = nameof(DaprMapInfo); + // ProcessActor keys public const string ProcessInfoState = nameof(DaprProcessInfo); + public const string EventProxyStepId = "processEventProxyId"; public const string StepActivatedState = "kernelStepActivated"; // MessageBufferActor keys diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/MapActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/MapActor.cs new file mode 100644 index 000000000000..bf4724800e8d --- /dev/null +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/MapActor.cs @@ -0,0 +1,179 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Dapr.Actors.Runtime; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.SemanticKernel.Process.Internal; +using Microsoft.SemanticKernel.Process.Runtime; +using Microsoft.SemanticKernel.Process.Serialization; + +namespace Microsoft.SemanticKernel; + +internal sealed class MapActor : StepActor, IMap +{ + private const string DaprProcessMapStateName = nameof(DaprMapInfo); + + private bool _isInitialized; + private HashSet _mapEvents = []; + private ILogger? _logger; + private KernelProcessMap? _map; + + internal DaprMapInfo? _mapInfo; + + /// + /// Initializes a new instance of the class. + /// + /// The Dapr host actor + /// An instance of + public MapActor(ActorHost host, Kernel kernel) + : base(host, kernel) + { + } + + #region Public Actor Methods + + public async Task InitializeMapAsync(DaprMapInfo mapInfo, string? parentProcessId) + { + // Only initialize once. This check is required as the actor can be re-activated from persisted state and + // this should not result in multiple initializations. + if (this._isInitialized) + { + return; + } + + this.InitializeMapActor(mapInfo, parentProcessId); + + this._isInitialized = true; + + // Save the state + await this.StateManager.AddStateAsync(DaprProcessMapStateName, mapInfo).ConfigureAwait(false); + await this.StateManager.AddStateAsync(ActorStateKeys.StepParentProcessId, parentProcessId).ConfigureAwait(false); + await this.StateManager.SaveStateAsync().ConfigureAwait(false); + } + + /// + /// When the process is used as a step within another process, this method will be called + /// rather than ToKernelProcessAsync when extracting the state. + /// + /// A where T is + public override Task ToDaprStepInfoAsync() => Task.FromResult(this._mapInfo!); + + protected override async Task OnActivateAsync() + { + var existingMapInfo = await this.StateManager.TryGetStateAsync(DaprProcessMapStateName).ConfigureAwait(false); + if (existingMapInfo.HasValue) + { + this.ParentProcessId = await this.StateManager.GetStateAsync(ActorStateKeys.StepParentProcessId).ConfigureAwait(false); + this.InitializeMapActor(existingMapInfo.Value, this.ParentProcessId); + } + } + + /// + /// The name of the step. + /// + protected override string Name => this._mapInfo?.State.Name ?? throw new KernelException("The Map must be initialized before accessing the Name property."); + + #endregion + + /// + /// Handles a that has been sent to the map. + /// + /// The message to map. + internal override async Task HandleMessageAsync(ProcessMessage message) + { + // Initialize the current operation + (IEnumerable inputValues, KernelProcess mapOperation, string startEventId) = this._map!.Initialize(message, this._logger); + + List mapOperations = []; + foreach (var value in inputValues) + { + KernelProcess mapProcess = mapOperation with { State = mapOperation.State with { Id = $"{this.Name}-{mapOperations.Count}-{Guid.NewGuid():N}" } }; + DaprKernelProcessContext processContext = new(mapProcess); + Task processTask = + processContext.StartWithEventAsync( + new KernelProcessEvent + { + Id = startEventId, + Data = value + }, + eventProxyStepId: this.Id); + + mapOperations.Add(processTask); + } + + // Wait for all the map operations to complete + await Task.WhenAll(mapOperations).ConfigureAwait(false); + + // Retrieve all proxied events from the map operations + IEventBuffer proxyBuffer = this.ProxyFactory.CreateActorProxy(this.Id, nameof(EventBufferActor)); + IList proxyEvents = await proxyBuffer.DequeueAllAsync().ConfigureAwait(false); + IList processEvents = proxyEvents.ToProcessEvents(); + + // Survey the events to determine the type of the results associated with each event proxied by the map + Dictionary capturedEvents = []; + foreach (ProcessEvent processEvent in processEvents) + { + string eventName = processEvent.SourceId; + if (this._mapEvents.Contains(eventName)) + { + capturedEvents.TryGetValue(eventName, out Type? resultType); + if (resultType is null || resultType == typeof(object)) + { + capturedEvents[eventName] = processEvent.Data?.GetType() ?? typeof(object); + } + } + } + + // Correlate the operation results to emit as the map result + Dictionary resultMap = []; + Dictionary resultCounts = []; + + foreach (ProcessEvent processEvent in processEvents) + { + string eventName = processEvent.SourceId; + if (capturedEvents.TryGetValue(eventName, out Type? resultType)) + { + if (!resultMap.TryGetValue(eventName, out Array? results)) + { + results = Array.CreateInstance(resultType, mapOperations.Count); + resultMap[eventName] = results; + } + + resultCounts.TryGetValue(eventName, out int resultIndex); // resultIndex defaults to 0 when not found + results.SetValue(processEvent.Data, resultIndex); + resultCounts[eventName] = resultIndex + 1; + } + } + + // Emit map results + foreach (string eventName in capturedEvents.Keys) + { + Array eventResult = resultMap[eventName]; + await this.EmitEventAsync(new KernelProcessEvent() { Id = eventName, Data = eventResult }).ConfigureAwait(false); + } + } + + private void InitializeMapActor(DaprMapInfo mapInfo, string? parentProcessId) + { + Verify.NotNull(mapInfo); + Verify.NotNull(mapInfo.Operation); + + this._mapInfo = mapInfo; + this._map = mapInfo.ToKernelProcessMap(); + this.ParentProcessId = parentProcessId; + this._logger = this._kernel.LoggerFactory?.CreateLogger(this._mapInfo.State.Name) ?? new NullLogger(); + this._outputEdges = this._mapInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList()); + this._eventNamespace = $"{this._mapInfo.State.Name}_{this._mapInfo.State.Id}"; + + // Capture the events that the map is interested in as hashtable for performant lookup + this._mapEvents = [.. this._mapInfo.Edges.Keys.Select(key => key.Split(ProcessConstants.EventIdSeparator).Last())]; + + this._isInitialized = true; + } + + private sealed record TypedResult(Type ResultType, Array Results); +} diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs index eae4e5e9db36..5651a7858dd0 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/ProcessActor.cs @@ -47,7 +47,7 @@ public ProcessActor(ActorHost host, Kernel kernel) #region Public Actor Methods - public async Task InitializeProcessAsync(DaprProcessInfo processInfo, string? parentProcessId) + public async Task InitializeProcessAsync(DaprProcessInfo processInfo, string? parentProcessId, string? eventProxyStepId = null) { Verify.NotNull(processInfo); Verify.NotNull(processInfo.Steps); @@ -60,12 +60,16 @@ public async Task InitializeProcessAsync(DaprProcessInfo processInfo, string? pa } // Initialize the process - await this.InitializeProcessActorAsync(processInfo, parentProcessId).ConfigureAwait(false); + await this.InitializeProcessActorAsync(processInfo, parentProcessId, eventProxyStepId).ConfigureAwait(false); // Save the state await this.StateManager.AddStateAsync(ActorStateKeys.ProcessInfoState, processInfo).ConfigureAwait(false); await this.StateManager.AddStateAsync(ActorStateKeys.StepParentProcessId, parentProcessId).ConfigureAwait(false); await this.StateManager.AddStateAsync(ActorStateKeys.StepActivatedState, true).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(eventProxyStepId)) + { + await this.StateManager.AddStateAsync(ActorStateKeys.EventProxyStepId, eventProxyStepId).ConfigureAwait(false); + } await this.StateManager.SaveStateAsync().ConfigureAwait(false); } @@ -168,7 +172,12 @@ protected override async Task OnActivateAsync() if (existingProcessInfo.HasValue) { this.ParentProcessId = await this.StateManager.GetStateAsync(ActorStateKeys.StepParentProcessId).ConfigureAwait(false); - await this.InitializeProcessActorAsync(existingProcessInfo.Value, this.ParentProcessId).ConfigureAwait(false); + string? eventProxyStepId = null; + if (await this.StateManager.ContainsStateAsync(ActorStateKeys.EventProxyStepId).ConfigureAwait(false)) + { + eventProxyStepId = await this.StateManager.GetStateAsync(ActorStateKeys.EventProxyStepId).ConfigureAwait(false); + } + await this.InitializeProcessActorAsync(existingProcessInfo.Value, this.ParentProcessId, eventProxyStepId).ConfigureAwait(false); } } @@ -221,7 +230,7 @@ protected override ValueTask ActivateStepAsync() return default; } - private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, string? parentProcessId) + private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, string? parentProcessId, string? eventProxyStepId) { Verify.NotNull(processInfo, nameof(processInfo)); Verify.NotNull(processInfo.Steps); @@ -230,6 +239,10 @@ private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, stri this._process = processInfo; this._stepsInfos = [.. this._process.Steps]; this._logger = this._kernel.LoggerFactory?.CreateLogger(this._process.State.Name) ?? new NullLogger(); + if (!string.IsNullOrWhiteSpace(eventProxyStepId)) + { + this.EventProxyStepId = new ActorId(eventProxyStepId); + } // Initialize the input and output edges for the process this._outputEdges = this._process.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList()); @@ -242,20 +255,28 @@ private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, stri // The current step should already have a name. Verify.NotNull(step.State?.Name); - if (step is DaprProcessInfo kernelStep) + if (step is DaprProcessInfo processStep) { // The process will only have an Id if its already been executed. - if (string.IsNullOrWhiteSpace(kernelStep.State.Id)) + if (string.IsNullOrWhiteSpace(processStep.State.Id)) { - kernelStep = kernelStep with { State = kernelStep.State with { Id = Guid.NewGuid().ToString() } }; + processStep = processStep with { State = processStep.State with { Id = Guid.NewGuid().ToString() } }; } // Initialize the step as a process. - var scopedProcessId = this.ScopedActorId(new ActorId(kernelStep.State.Id!)); + var scopedProcessId = this.ScopedActorId(new ActorId(processStep.State.Id!)); var processActor = this.ProxyFactory.CreateActorProxy(scopedProcessId, nameof(ProcessActor)); - await processActor.InitializeProcessAsync(kernelStep, this.Id.GetId()).ConfigureAwait(false); + await processActor.InitializeProcessAsync(processStep, this.Id.GetId(), eventProxyStepId).ConfigureAwait(false); stepActor = this.ProxyFactory.CreateActorProxy(scopedProcessId, nameof(ProcessActor)); } + else if (step is DaprMapInfo mapStep) + { + // Initialize the step as a map. + ActorId scopedMapId = this.ScopedActorId(new ActorId(mapStep.State.Id!)); + IMap mapActor = this.ProxyFactory.CreateActorProxy(scopedMapId, nameof(MapActor)); + await mapActor.InitializeMapAsync(mapStep, this.Id.GetId()).ConfigureAwait(false); + stepActor = this.ProxyFactory.CreateActorProxy(scopedMapId, nameof(MapActor)); + } else { // The current step should already have an Id. @@ -263,7 +284,7 @@ private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, stri var scopedStepId = this.ScopedActorId(new ActorId(step.State.Id!)); stepActor = this.ProxyFactory.CreateActorProxy(scopedStepId, nameof(StepActor)); - await stepActor.InitializeStepAsync(step, this.Id.GetId()).ConfigureAwait(false); + await stepActor.InitializeStepAsync(step, this.Id.GetId(), eventProxyStepId).ConfigureAwait(false); } this._steps.Add(stepActor); @@ -436,9 +457,9 @@ private async Task IsEndMessageSentAsync() } /// - /// Builds a from the current . + /// Builds a from the current . /// - /// An instance of + /// An instance of /// private async Task ToDaprProcessInfoAsync() { diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs index 289a32364a00..f5445bdf0afc 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs @@ -23,12 +23,12 @@ internal class StepActor : Actor, IStep, IKernelProcessMessageChannel private DaprStepInfo? _stepInfo; private ILogger? _logger; - private string? _eventNamespace; private Type? _innerStepType; private bool _isInitialized; protected readonly Kernel _kernel; + protected string? _eventNamespace; internal Queue _incomingMessages = new(); internal KernelProcessStepState? _stepState; @@ -39,6 +39,7 @@ internal class StepActor : Actor, IStep, IKernelProcessMessageChannel internal Dictionary?>? _initialInputs = []; internal string? ParentProcessId; + internal ActorId? EventProxyStepId; /// /// Represents a step in a process that is running in-process. @@ -59,8 +60,9 @@ public StepActor(ActorHost host, Kernel kernel) /// /// The instance describing the step. /// The Id of the parent process if one exists. + /// An optional identifier of an actor requesting to proxy events. /// A - public async Task InitializeStepAsync(DaprStepInfo stepInfo, string? parentProcessId) + public async Task InitializeStepAsync(DaprStepInfo stepInfo, string? parentProcessId, string? eventProxyStepId = null) { Verify.NotNull(stepInfo, nameof(stepInfo)); @@ -71,11 +73,15 @@ public async Task InitializeStepAsync(DaprStepInfo stepInfo, string? parentProce return; } - await this.Int_InitializeStepAsync(stepInfo, parentProcessId).ConfigureAwait(false); + this.InitializeStep(stepInfo, parentProcessId, eventProxyStepId); // Save initial state await this.StateManager.AddStateAsync(ActorStateKeys.StepInfoState, stepInfo).ConfigureAwait(false); await this.StateManager.AddStateAsync(ActorStateKeys.StepParentProcessId, parentProcessId).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(eventProxyStepId)) + { + await this.StateManager.AddStateAsync(ActorStateKeys.EventProxyStepId, eventProxyStepId).ConfigureAwait(false); + } await this.StateManager.SaveStateAsync().ConfigureAwait(false); } @@ -84,8 +90,8 @@ public async Task InitializeStepAsync(DaprStepInfo stepInfo, string? parentProce /// /// The instance describing the step. /// The Id of the parent process if one exists. - /// A - public Task Int_InitializeStepAsync(DaprStepInfo stepInfo, string? parentProcessId) + /// An optional identifier of an actor requesting to proxy events. + private void InitializeStep(DaprStepInfo stepInfo, string? parentProcessId, string? eventProxyStepId = null) { Verify.NotNull(stepInfo, nameof(stepInfo)); @@ -102,8 +108,13 @@ public Task Int_InitializeStepAsync(DaprStepInfo stepInfo, string? parentProcess this._logger = this._kernel.LoggerFactory?.CreateLogger(this._innerStepType) ?? new NullLogger(); this._outputEdges = this._stepInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList()); this._eventNamespace = $"{this._stepInfo.State.Name}_{this._stepInfo.State.Id}"; + + if (!string.IsNullOrWhiteSpace(eventProxyStepId)) + { + this.EventProxyStepId = new ActorId(eventProxyStepId); + } + this._isInitialized = true; - return Task.CompletedTask; } /// @@ -147,9 +158,9 @@ public async Task ProcessIncomingMessagesAsync() } /// - /// Extracts the current state of the step and returns it as a . + /// Extracts the current state of the step and returns it as a . /// - /// An instance of + /// An instance of public virtual async Task ToDaprStepInfoAsync() { // Lazy one-time initialization of the step before extracting state information. @@ -170,8 +181,13 @@ protected override async Task OnActivateAsync() if (existingStepInfo.HasValue) { // Initialize the step from persisted state - var parentProcessId = await this.StateManager.GetStateAsync(ActorStateKeys.StepParentProcessId).ConfigureAwait(false); - await this.Int_InitializeStepAsync(existingStepInfo.Value, parentProcessId).ConfigureAwait(false); + string? parentProcessId = await this.StateManager.GetStateAsync(ActorStateKeys.StepParentProcessId).ConfigureAwait(false); + string? eventProxyStepId = null; + if (await this.StateManager.ContainsStateAsync(ActorStateKeys.EventProxyStepId).ConfigureAwait(false)) + { + eventProxyStepId = await this.StateManager.GetStateAsync(ActorStateKeys.EventProxyStepId).ConfigureAwait(false); + } + this.InitializeStep(existingStepInfo.Value, parentProcessId, eventProxyStepId); // Load the persisted incoming messages var incomingMessages = await this.StateManager.TryGetStateAsync>(ActorStateKeys.StepIncomingMessagesState).ConfigureAwait(false); @@ -360,8 +376,13 @@ protected virtual async ValueTask ActivateStepAsync() this._stepState = stateObject; this._stepStateType = stateType; - methodInfo.Invoke(stepInstance, [stateObject]); + + ValueTask activateTask = + (ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]) ?? + throw new KernelException("The ActivateAsync method failed to complete.").Log(this._logger); + await stepInstance.ActivateAsync(stateObject).ConfigureAwait(false); + await activateTask.ConfigureAwait(false); } /// @@ -393,9 +414,15 @@ internal async ValueTask EmitEventAsync(ProcessEvent daprEvent) } } + if (this.EventProxyStepId != null) + { + IEventBuffer proxyBuffer = this.ProxyFactory.CreateActorProxy(this.EventProxyStepId, nameof(EventBufferActor)); + await proxyBuffer.EnqueueAsync(daprEvent.ToJson()).ConfigureAwait(false); + } + // Get the edges for the event and queue up the messages to be sent to the next steps. bool foundEdge = false; - foreach (var edge in this.GetEdgeForEvent(daprEvent.QualifiedId)) + foreach (KernelProcessEdge edge in this.GetEdgeForEvent(daprEvent.QualifiedId)) { ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, daprEvent.Data); ActorId scopedStepId = this.ScopedActorId(new ActorId(edge.OutputTarget.StepId)); diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs index fd055003146b..f09fa4f39222 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessContext.cs @@ -36,11 +36,11 @@ internal DaprKernelProcessContext(KernelProcess process) /// Starts the process with an initial event. /// /// The initial event. - /// - internal async Task StartWithEventAsync(KernelProcessEvent initialEvent) + /// An optional identifier of an actor requesting to proxy events. + internal async Task StartWithEventAsync(KernelProcessEvent initialEvent, ActorId? eventProxyStepId = null) { var daprProcess = DaprProcessInfo.FromKernelProcess(this._process); - await this._daprProcess.InitializeProcessAsync(daprProcess, null).ConfigureAwait(false); + await this._daprProcess.InitializeProcessAsync(daprProcess, null, eventProxyStepId?.GetId()).ConfigureAwait(false); await this._daprProcess.RunOnceAsync(initialEvent.ToJson()).ConfigureAwait(false); } diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessFactory.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessFactory.cs index 8e84d878d034..b1e5cb55d562 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessFactory.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprKernelProcessFactory.cs @@ -27,7 +27,7 @@ public static async Task StartAsync(this KernelProcess process = process with { State = process.State with { Id = processId } }; } - var processContext = new DaprKernelProcessContext(process); + DaprKernelProcessContext processContext = new(process); await processContext.StartWithEventAsync(initialEvent).ConfigureAwait(false); return processContext; } diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs new file mode 100644 index 000000000000..06b3193f6691 --- /dev/null +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprMapInfo.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft. All rights reserved. +using System.Runtime.Serialization; + +namespace Microsoft.SemanticKernel; + +/// +/// A serializable representation of a Dapr Map. +/// +[KnownType(typeof(KernelProcessEdge))] +[KnownType(typeof(KernelProcessMapState))] +[KnownType(typeof(KernelProcessStepState))] +[KnownType(typeof(KernelProcessStepState<>))] +public sealed record DaprMapInfo : DaprStepInfo +{ + /// + /// The map operation + /// + public required DaprStepInfo Operation { get; init; } + + /// + /// Initializes a new instance of the class from this instance of . + /// + /// An instance of + /// + public KernelProcessMap ToKernelProcessMap() + { + KernelProcessStepInfo processStepInfo = this.ToKernelProcessStepInfo(); + if (this.State is not KernelProcessMapState state) + { + throw new KernelException($"Unable to read state from map with name '{this.State.Name}' and Id '{this.State.Id}'."); + } + + KernelProcessStepInfo operationStep = + this.Operation is DaprProcessInfo processInfo + ? processInfo.ToKernelProcess() + : this.Operation.ToKernelProcessStepInfo(); + + return new KernelProcessMap(state, operationStep, this.Edges); + } + + /// + /// Initializes a new instance of the class from an instance of . + /// + /// The used to build the + /// An instance of + public static DaprMapInfo FromKernelProcessMap(KernelProcessMap processMap) + { + Verify.NotNull(processMap); + + DaprStepInfo operationInfo = + processMap.Operation is KernelProcess processOperation + ? DaprProcessInfo.FromKernelProcess(processOperation) + : DaprStepInfo.FromKernelStepInfo(processMap.Operation); + DaprStepInfo mapStepInfo = DaprStepInfo.FromKernelStepInfo(processMap); + + return new DaprMapInfo + { + InnerStepDotnetType = mapStepInfo.InnerStepDotnetType, + State = mapStepInfo.State, + Edges = mapStepInfo.Edges, + Operation = operationInfo, + }; + } +} diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs index 5086eedbbddb..e0ed2064b95f 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprProcessInfo.cs @@ -10,6 +10,7 @@ namespace Microsoft.SemanticKernel; /// [KnownType(typeof(KernelProcessEdge))] [KnownType(typeof(KernelProcessState))] +[KnownType(typeof(KernelProcessMapState))] [KnownType(typeof(KernelProcessStepState))] [KnownType(typeof(KernelProcessStepState<>))] public sealed record DaprProcessInfo : DaprStepInfo @@ -39,6 +40,10 @@ public KernelProcess ToKernelProcess() { steps.Add(processStep.ToKernelProcess()); } + else if (step is DaprMapInfo mapStep) + { + steps.Add(mapStep.ToKernelProcessMap()); + } else { steps.Add(step.ToKernelProcessStepInfo()); @@ -62,9 +67,13 @@ public static DaprProcessInfo FromKernelProcess(KernelProcess kernelProcess) foreach (var step in kernelProcess.Steps) { - if (step is KernelProcess kernelStep) + if (step is KernelProcess processStep) + { + daprSteps.Add(DaprProcessInfo.FromKernelProcess(processStep)); + } + else if (step is KernelProcessMap mapStep) { - daprSteps.Add(DaprProcessInfo.FromKernelProcess(kernelStep)); + daprSteps.Add(DaprMapInfo.FromKernelProcessMap(mapStep)); } else { diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs index ed096bf14955..777814770fdb 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/DaprStepInfo.cs @@ -14,7 +14,9 @@ namespace Microsoft.SemanticKernel; [KnownType(typeof(KernelProcessEdge))] [KnownType(typeof(KernelProcessStepState))] [KnownType(typeof(DaprProcessInfo))] +[KnownType(typeof(DaprMapInfo))] [JsonDerivedType(typeof(DaprProcessInfo))] +[JsonDerivedType(typeof(DaprMapInfo))] public record DaprStepInfo { /// diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IMap.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IMap.cs new file mode 100644 index 000000000000..483a6bc3502a --- /dev/null +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IMap.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft. All rights reserved. +using System.Threading.Tasks; +using Dapr.Actors; + +namespace Microsoft.SemanticKernel; + +/// +/// An interface that represents a step in a process. +/// +public interface IMap : IActor +{ + /// + /// Initializes the step with the provided step information. + /// + /// A + /// + Task InitializeMapAsync(DaprMapInfo mapInfo, string? parentProcessId); +} diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs index 59855e5ad45a..8a4ce788b51d 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IProcess.cs @@ -15,8 +15,9 @@ public interface IProcess : IActor, IStep /// /// Used to initialize the process. /// The parent Id of the process if one exists. + /// An optional identifier of an actor requesting to proxy events. /// A - Task InitializeProcessAsync(DaprProcessInfo processInfo, string? parentProcessId); + Task InitializeProcessAsync(DaprProcessInfo processInfo, string? parentProcessId, string? eventProxyStepId); /// /// Starts an initialized process. diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IStep.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IStep.cs index 61a9bfc5adaf..1cac1d945217 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IStep.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/Interfaces/IStep.cs @@ -15,7 +15,7 @@ public interface IStep : IActor /// /// A /// - Task InitializeStepAsync(DaprStepInfo stepInfo, string? parentProcessId); + Task InitializeStepAsync(DaprStepInfo stepInfo, string? parentProcessId, string? eventProxyStepId); /// /// Triggers the step to dequeue all pending messages and prepare for processing. diff --git a/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs b/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs index 6f1082ff20ca..52f86899d608 100644 --- a/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs +++ b/dotnet/src/Experimental/Process.Runtime.Dapr/KernelProcessDaprExtensions.cs @@ -16,10 +16,11 @@ public static class KernelProcessDaprExtensions public static void AddProcessActors(this ActorRuntimeOptions actorOptions) { // Register actor types and configure actor settings - actorOptions.Actors.RegisterActor(); - actorOptions.Actors.RegisterActor(); - actorOptions.Actors.RegisterActor(); - actorOptions.Actors.RegisterActor(); - actorOptions.Actors.RegisterActor(); + actorOptions.Actors.RegisterActor(); + actorOptions.Actors.RegisterActor(); + actorOptions.Actors.RegisterActor(); + actorOptions.Actors.RegisterActor(); + actorOptions.Actors.RegisterActor(); + actorOptions.Actors.RegisterActor(); } } diff --git a/dotnet/src/Experimental/Process.UnitTests/Core/ProcessMapBuilderTests.cs b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessMapBuilderTests.cs new file mode 100644 index 000000000000..cdc4bc737fb0 --- /dev/null +++ b/dotnet/src/Experimental/Process.UnitTests/Core/ProcessMapBuilderTests.cs @@ -0,0 +1,237 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Linq; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.SemanticKernel.Process.Core.UnitTests; + +/// +/// Unit tests for . +/// +public class ProcessMapBuilderTests +{ + /// + /// Verify initialization based on . + /// + [Fact] + public void ProcessMapBuilderFromStep() + { + // Arrange + ProcessStepBuilder step = new($"One{nameof(SimpleTestStep)}"); + + // Act + ProcessMapBuilder map = new(step); + + // Assert + Assert.NotNull(map.Id); + Assert.NotNull(map.Name); + Assert.Contains(nameof(SimpleTestStep), map.Name); + Assert.NotNull(map.MapOperation); + Assert.Equal(step, map.MapOperation); + } + + /// + /// Verify cannot be a function target. + /// + [Fact] + public void ProcessMapBuilderFromMap() + { + // Arrange + ProcessStepBuilder step = new($"One{nameof(SimpleTestStep)}"); + ProcessMapBuilder map1 = new(step); + ProcessMapBuilder map2 = new(step); + + // Act & Assert + Assert.Throws(() => map1.OnEvent("any").SendEventTo(new ProcessFunctionTargetBuilder(map2))); + } + + /// + /// Verify initialization based on . + /// + [Fact] + public void ProcessMapBuilderFromProcess() + { + // Arrange + ProcessBuilder process = new("MapOperation"); + ProcessStepBuilder step = process.AddStepFromType($"One{nameof(SimpleTestStep)}"); + process.OnInputEvent("ComputeMapValue").SendEventTo(new ProcessFunctionTargetBuilder(step)); + + // Act + ProcessMapBuilder map = new(process); + + // Assert + Assert.NotNull(map.Id); + Assert.NotNull(map.Name); + Assert.Contains(process.Name, map.Name); + Assert.NotNull(map.MapOperation); + Assert.Equal(process, map.MapOperation); + } + + /// + /// Verify is able to define targets / output edges. + /// + [Fact] + public void ProcessMapBuilderCanDefineTarget() + { + // Arrange + ProcessStepBuilder step = new($"One{nameof(SimpleTestStep)}"); + ProcessMapBuilder map = new(step); + + // Act + ProcessStepBuilder step2 = new($"Two{nameof(SimpleTestStep)}"); + map.OnEvent("Any").SendEventTo(new ProcessFunctionTargetBuilder(step2)); + + // Assert + Assert.Single(map.Edges); + Assert.Single(map.Edges.Single().Value); + Assert.NotNull(map.Edges.Single().Value[0].Target); + Assert.Equal(step2, map.Edges.Single().Value[0].Target!.Step); + + // Act + KernelProcessStepInfo processMap = map.BuildStep(); + + // Assert + Assert.NotNull(processMap); + Assert.Equal(processMap.Edges.Count, map.Edges.Count); + Assert.Equal(processMap.Edges.Single().Value.Count, map.Edges.First().Value.Count); + Assert.Equal(processMap.Edges.Single().Value.Single().OutputTarget!.StepId, map.Edges.Single().Value[0].Target!.Step.Id); + } + + /// + /// Verify always throws. + /// + [Fact] + public void ProcessMapBuilderGetFunctionMetadataMapThrows() + { + // Arrange + ProcessStepBuilder step = new($"One{nameof(SimpleTestStep)}"); + ProcessMapBuilder map = new(step); + + // Act + Assert.Throws(() => map.GetFunctionMetadataMap()); + } + + /// + /// Verify produces the + /// expected . + /// + [Fact] + public void ProcessMapBuilderWillBuild() + { + // Arrange + ProcessStepBuilder step = new($"One{nameof(SimpleTestStep)}"); + ProcessMapBuilder map = new(step); + + // Act + KernelProcessStepInfo processMap = map.BuildStep(); + + // Assert + Assert.NotNull(processMap); + Assert.IsType(processMap); + Assert.Equal(map.Name, processMap.State.Name); + Assert.Equal(map.Id, processMap.State.Id); + } + + /// + /// Verify throws an exception + /// if the target is a > without the having + /// defined. + /// While this state should not be achievable by external callers, the + /// underlying state contracts do permit this permutation. + /// + [Fact] + public void ProcessMapBuilderFailsBuildForMapTarget() + { + // Arrange + ProcessBuilder process = new(nameof(InvalidTestStep)); + ProcessStepBuilder step = process.AddStepFromType(); + ProcessFunctionTargetBuilder invalidTarget = new(new ProcessMapBuilder(step)); + + // Act & Assert + Assert.Throws(() => new ProcessMapBuilder(step).OnEvent("Test").SendEventTo(invalidTarget)); + } + + /// + /// Verify throws an exception + /// if the target is a > without the having + /// defined. + /// While this state should not be achievable by external callers, the + /// underlying state contracts do permit this permutation. + /// + [Fact] + public void ProcessMapBuilderFailsBuildForInvalidTarget() + { + // Arrange + ProcessBuilder process = new(nameof(InvalidTestStep)); + ProcessStepBuilder step = process.AddStepFromType(); + + // Act & Assert + Assert.Throws(() => step.OnEvent("Test").SendEventTo(new ProcessFunctionTargetBuilder(new ProcessMapBuilder(step), "missing"))); + } + + private sealed class SimpleTestStep : KernelProcessStep + { + private TestState? _state; + + public override ValueTask ActivateAsync(KernelProcessStepState state) + { + this._state = state.State; + + return ValueTask.CompletedTask; + } + + [KernelFunction] + public void TestFunction(Guid value) + { + Assert.NotNull(this._state); + } + } + + private sealed class InvalidTestStep : KernelProcessStep + { + private TestState? _state; + + public override ValueTask ActivateAsync(KernelProcessStepState state) + { + this._state = state.State; + + return ValueTask.CompletedTask; + } + + [KernelFunction] + public void TestFunction() + { + Assert.NotNull(this._state); + } + } + + private sealed class ComplexTestStep : KernelProcessStep + { + private TestState? _state; + + public override ValueTask ActivateAsync(KernelProcessStepState state) + { + this._state = state.State; + + return ValueTask.CompletedTask; + } + + [KernelFunction] + public void TestFunctionA(Guid value) + { + Assert.NotNull(this._state); + } + + [KernelFunction] + public void TestFunctionB(Guid value) + { + Assert.NotNull(this._state); + } + } + + private sealed class TestState + { + public Guid Value { get; set; } + } +} diff --git a/dotnet/src/Experimental/Process.UnitTests/KernelProcessMapTests.cs b/dotnet/src/Experimental/Process.UnitTests/KernelProcessMapTests.cs new file mode 100644 index 000000000000..a437c97cc957 --- /dev/null +++ b/dotnet/src/Experimental/Process.UnitTests/KernelProcessMapTests.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using Xunit; + +namespace Microsoft.SemanticKernel.Process.UnitTests; + +/// +/// Unit testing of . +/// +public class KernelProcessMapTests +{ + /// + /// Verify initialization. + /// + [Fact] + public void KernelProcessMapStateInitialization() + { + // Arrange + KernelProcessState processState = new("Operation", "vTest"); + KernelProcess process = new(processState, [], []); + KernelProcessMapState state = new(nameof(KernelProcessMapStateInitialization), "vTest", Guid.NewGuid().ToString()); + + // Act + KernelProcessMap map = new(state, process, []); + + // Assert + Assert.Equal(state, map.State); + Assert.Equivalent(process, map.Operation); + Assert.Empty(map.Edges); + } + + /// + /// Verify requires a name and id + /// + [Fact] + public void KernelProcessMapStateRequiredProperties() + { + // Act & Assert + Assert.Throws(() => new KernelProcessMapState(name: null!, "vTest", "testid")); + Assert.Throws(() => new KernelProcessMapState(name: "testname", null!, "testid")); + Assert.Throws(() => new KernelProcessMapState("testname", "vTest", null!)); + } +} diff --git a/dotnet/src/Experimental/Process.UnitTests/KernelProcessSerializationTests.cs b/dotnet/src/Experimental/Process.UnitTests/KernelProcessSerializationTests.cs new file mode 100644 index 000000000000..dbb21073ca00 --- /dev/null +++ b/dotnet/src/Experimental/Process.UnitTests/KernelProcessSerializationTests.cs @@ -0,0 +1,244 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Collections.Generic; +using System.Text.Json; +using System.Threading.Tasks; +using Microsoft.SemanticKernel.Process.Models; +using Xunit; + +namespace Microsoft.SemanticKernel.Process.UnitTests; + +/// +/// Unit testing of +/// and associated operations. +/// +public class KernelProcessSerializationTests +{ + private static readonly JsonSerializerOptions s_serializerOptions = new() { WriteIndented = true }; + + /// + /// Verify serialization of process with step. + /// + [Fact] + public void KernelProcessSerialization() + { + // Arrange + ProcessBuilder processBuilder = new(nameof(KernelProcessSerialization)); + processBuilder.AddStepFromType("SimpleStep"); + processBuilder.AddStepFromType(new StepState { Id = Guid.NewGuid() }, "StatefulStep"); + KernelProcess process = processBuilder.Build(); + + // Act + KernelProcessStateMetadata processState = process.ToProcessStateMetadata(); + + // Assert + AssertProcessState(process, processState); + + // Act + string json = JsonSerializer.Serialize(processState, s_serializerOptions); + KernelProcessStateMetadata? copyState = JsonSerializer.Deserialize(json); + + // Assert + Assert.NotNull(copyState); + AssertProcessState(process, copyState); + + // Arrange + ProcessBuilder anotherBuilder = new(nameof(KernelProcessSerialization)); + anotherBuilder.AddStepFromType("SimpleStep"); + anotherBuilder.AddStepFromType("StatefulStep"); + KernelProcess another = anotherBuilder.Build(copyState); + + AssertProcess(process, another); + } + + /// + /// Verify serialization of process with subprocess. + /// + [Fact] + public void KernelSubProcessSerialization() + { + // Arrange + ProcessBuilder processBuilder = new(nameof(KernelProcessSerialization)); + ProcessBuilder subProcessBuilder = new("subprocess"); + subProcessBuilder.AddStepFromType("SimpleStep"); + subProcessBuilder.AddStepFromType(new StepState { Id = Guid.NewGuid() }, "StatefulStep"); + processBuilder.AddStepFromProcess(subProcessBuilder); + KernelProcess process = processBuilder.Build(); + + // Act + KernelProcessStateMetadata processState = process.ToProcessStateMetadata(); + + // Assert + AssertProcessState(process, processState); + + // Act + string json = JsonSerializer.Serialize(processState, s_serializerOptions); + KernelProcessStateMetadata? copyState = JsonSerializer.Deserialize(json); + + // Assert + Assert.NotNull(copyState); + AssertProcessState(process, copyState); + + // Arrange + ProcessBuilder anotherBuilder = new(nameof(KernelProcessSerialization)); + ProcessBuilder anotherSubBuilder = new("subprocess"); + anotherSubBuilder.AddStepFromType("SimpleStep"); + anotherSubBuilder.AddStepFromType("StatefulStep"); + anotherBuilder.AddStepFromProcess(anotherSubBuilder); + KernelProcess another = anotherBuilder.Build(copyState); + + AssertProcess(process, another); + } + + /// + /// Verify serialization of process with map-step. + /// + [Fact] + public void KernelProcessMapSerialization() + { + ProcessBuilder processBuilder = new(nameof(KernelProcessSerialization)); + processBuilder.AddMapStepFromType(new StepState { Id = Guid.NewGuid() }, "StatefulStep"); + KernelProcess process = processBuilder.Build(); + + // Act + KernelProcessStateMetadata processState = process.ToProcessStateMetadata(); + + // Assert + AssertProcessState(process, processState); + + // Act + string json = JsonSerializer.Serialize(processState, s_serializerOptions); + KernelProcessStateMetadata? copyState = JsonSerializer.Deserialize(json); + + // Assert + Assert.NotNull(copyState); + AssertProcessState(process, copyState); + + // Arrange + ProcessBuilder anotherBuilder = new(nameof(KernelProcessSerialization)); + anotherBuilder.AddMapStepFromType("StatefulStep"); + KernelProcess another = anotherBuilder.Build(copyState); + + AssertProcess(process, another); + } + + private static void AssertProcess(KernelProcess expectedProcess, KernelProcess anotherProcess) + { + Assert.Equal(expectedProcess.State.Name, anotherProcess.State.Name); + Assert.Equal(expectedProcess.State.Version, anotherProcess.State.Version); + Assert.Equal(expectedProcess.Steps.Count, anotherProcess.Steps.Count); + + for (int index = 0; index < expectedProcess.Steps.Count; ++index) + { + AssertStep(expectedProcess.Steps[index], anotherProcess.Steps[index]); + } + } + + private static void AssertStep(KernelProcessStepInfo expectedStep, KernelProcessStepInfo actualStep) + { + Assert.Equal(expectedStep.InnerStepType, actualStep.InnerStepType); + Assert.Equal(expectedStep.State.Name, actualStep.State.Name); + Assert.Equal(expectedStep.State.Version, actualStep.State.Version); + + if (expectedStep is KernelProcessMap mapStep) + { + Assert.IsType(actualStep); + AssertStep(mapStep.Operation, ((KernelProcessMap)actualStep).Operation); + } + else if (expectedStep is KernelProcess subProcess) + { + Assert.IsType(actualStep); + AssertProcess(subProcess, (KernelProcess)actualStep); + } + else if (expectedStep.State is KernelProcessStepState stepState) + { + Assert.IsType>(actualStep.State); + KernelProcessStepState actualState = (KernelProcessStepState)actualStep.State; + Assert.NotNull(stepState.State); + Assert.NotNull(actualState.State); + Assert.Equal(stepState.State.Id, actualState.State.Id); + } + } + + private static void AssertProcessState(KernelProcess process, KernelProcessStateMetadata? savedProcess) + { + Assert.NotNull(savedProcess); + Assert.Equal(process.State.Id, savedProcess.Id); + Assert.Equal(process.State.Name, savedProcess.Name); + Assert.Equal(process.State.Version, savedProcess.VersionInfo); + Assert.NotNull(savedProcess.StepsState); + Assert.Equal(process.Steps.Count, savedProcess.StepsState.Count); + + foreach (KernelProcessStepInfo step in process.Steps) + { + AssertStepState(step, savedProcess.StepsState); + } + } + + private static void AssertStepState(KernelProcessStepInfo step, Dictionary savedSteps) + { + Assert.True(savedSteps.ContainsKey(step.State.Name)); + KernelProcessStepStateMetadata savedStep = savedSteps[step.State.Name]; + Assert.Equal(step.State.Id, savedStep.Id); + Assert.Equal(step.State.Name, savedStep.Name); + Assert.Equal(step.State.Version, savedStep.VersionInfo); + + if (step is KernelProcessMap mapStep) + { + Assert.IsType(savedStep); + KernelProcessMapStateMetadata mapState = (KernelProcessMapStateMetadata)savedStep; + Assert.NotNull(mapState.OperationState); + Assert.NotNull(mapState.OperationState.Name); + AssertStepState(mapStep.Operation, new() { { mapState.OperationState.Name, mapState.OperationState } }); + } + else if (step is KernelProcess subProcess) + { + Assert.IsType(savedStep); + AssertProcessState(subProcess, (KernelProcessStateMetadata)savedStep); + } + else if (step.State is KernelProcessStepState stepState) + { + Assert.NotNull(savedStep.State); + if (savedStep.State is JsonElement jsonState) + { + StepState? savedState = jsonState.Deserialize(); + Assert.NotNull(savedState); + Assert.NotNull(stepState.State); + Assert.Equal(stepState.State.Id, savedState.Id); + } + else + { + Assert.Equal(stepState.State, (StepState)savedStep.State); + } + } + } + + private sealed class SimpleStep : KernelProcessStep + { + [KernelFunction] + public void RunSimple() + { + } + } + + private sealed class StepState + { + public Guid Id { get; set; } = Guid.Empty; + } + + private sealed class StatefulStep : KernelProcessStep + { + private StepState? _state; + + public override ValueTask ActivateAsync(KernelProcessStepState state) + { + this._state = state.State; + return default; + } + + [KernelFunction] + public void RunStateful() + { + } + } +} diff --git a/dotnet/src/Experimental/Process.UnitTests/KernelProcessStateTests.cs b/dotnet/src/Experimental/Process.UnitTests/KernelProcessStateTests.cs index ea690281ed5d..c20509037a99 100644 --- a/dotnet/src/Experimental/Process.UnitTests/KernelProcessStateTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/KernelProcessStateTests.cs @@ -21,7 +21,7 @@ public void KernelProcessStateInitializationSetsPropertiesCorrectly() string id = "123"; // Act - var state = new KernelProcessState(name, "v1", id); + KernelProcessState state = new(name, "v1", id); // Assert Assert.Equal(name, state.Name); @@ -38,7 +38,7 @@ public void KernelProcessStateInitializationWithNullIdSucceeds() string name = "TestProcess"; // Act - var state = new KernelProcessState(name, version: "v1"); + KernelProcessState state = new(name, version: "v1"); // Assert Assert.Equal(name, state.Name); @@ -52,9 +52,7 @@ public void KernelProcessStateInitializationWithNullIdSucceeds() public void KernelProcessStateInitializationWithNullNameThrows() { // Act & Assert -#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. - var ex = Assert.Throws(() => new KernelProcessState(name: null, version: "v1")); -#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. + var ex = Assert.Throws(() => new KernelProcessState(name: null!, version: "v1")); } /// @@ -64,8 +62,6 @@ public void KernelProcessStateInitializationWithNullNameThrows() public void KernelProcessStateInitializationWithNullVersionThrows() { // Act & Assert -#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. - var ex = Assert.Throws(() => new KernelProcessState(name: "stateName", version: null)); -#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. + var ex = Assert.Throws(() => new KernelProcessState(name: "stateName", version: null!)); } } diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs new file mode 100644 index 000000000000..ca3d644d27be --- /dev/null +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalMapTests.cs @@ -0,0 +1,543 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Microsoft.SemanticKernel.Process.Runtime.Local.UnitTests; + +/// +/// Unit tests for the class. +/// +public class LocalMapTests +{ + /// + /// Validates the result as the first step in the process + /// and with a step as the map operation. + /// + [Fact] + public async Task ProcessMapResultAsFirstAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultAsFirstAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(55L, unionState.SquareResult); + } + + /// + /// Validates the filtering on a specific event (cubic, not square). + /// + [Fact] + public async Task ProcessMapResultFilterEventAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultFilterEventAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.CubicEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(225L, unionState.SquareResult); + } + + /// + /// Validates the result as the first step in the process + /// and with a step as the map operation. + /// + [Fact] + public async Task ProcessMapResultWithTransformAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultWithTransformAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(FormatStep.EventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.FormatFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal("[1]/[2]/[3]/[4]/[5]", unionState.FormatResult); + } + + /// + /// Validates the result when the operation step + /// contains multiple function targets. + /// + [Fact] + public async Task ProcessMapResultOperationTargetAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultOperationTargetAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep, ComplexStep.ComputeFunction)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComplexStep.ComputeEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(55L, unionState.SquareResult); + } + + /// + /// Validates the result as the second step in the process + /// and with a step as the map operation. + /// + [Fact] + public async Task ProcessMapResultAsTargetAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultOperationTargetAsync)); + + ProcessStepBuilder initStep = process.AddStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(initStep)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + initStep + .OnEvent(InitialStep.EventId) + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(55L, unionState.SquareResult); + } + + /// + /// Validates the result responding to multiple events + /// from a step as the map operation. + /// + [Fact] + public async Task ProcessMapResultMultiEventAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultMultiEventAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + mapStep + .OnEvent(ComputeStep.CubicEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumCubicFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(55L, unionState.SquareResult); + Assert.Equal(225L, unionState.CubicResult); + } + + /// + /// Validates the result with a sub-process as the map operation. + /// + [Fact] + public async Task ProcessMapResultProcessOperationAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultProcessOperationAsync)); + + ProcessBuilder mapProcess = new("MapOperation"); + ProcessStepBuilder computeStep = mapProcess.AddStepFromType(); + mapProcess + .OnInputEvent("Anything") + .SendEventTo(new ProcessFunctionTargetBuilder(computeStep)); + + ProcessMapBuilder mapStep = process.AddMapStepFromProcess(mapProcess); + + process + .OnInputEvent("Start") + .SendEventTo(mapStep.WhereInputEventIs("Anything")); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(55L, unionState.SquareResult); + } + + /// + /// Validates the result even when an invalid edge is + /// introduced to the map-operation. + /// + [Fact] + public async Task ProcessMapResultWithTargetInvalidAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultWithTargetInvalidAsync)); + + ProcessMapBuilder mapStep = process.AddMapStepFromType(); + process + .OnInputEvent("Start") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStep)); + + // CountStep is not part of the map operation, rather it has been defined on the "outer" process. + CountStep.Index = 0; // Reset static state (test hack) + ProcessStepBuilder countStep = process.AddStepFromType(); + mapStep.MapOperation + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(countStep)); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act & Assert + await Assert.ThrowsAsync(() => this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start")); + } + + /// + /// Validates the result an extra edge is + /// introduced to the map-operation. + /// + [Fact] + public async Task ProcessMapResultWithTargetExtraAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultProcessOperationAsync)); + + ProcessBuilder mapProcess = new("MapOperation"); + ProcessStepBuilder computeStep = mapProcess.AddStepFromType(); + mapProcess + .OnInputEvent("Anything") + .SendEventTo(new ProcessFunctionTargetBuilder(computeStep)); + + ProcessStepBuilder countStep = mapProcess.AddStepFromType(); + computeStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(countStep)); + + ProcessMapBuilder mapStep = process.AddMapStepFromProcess(mapProcess); + process + .OnInputEvent("Start") + .SendEventTo(mapStep.WhereInputEventIs("Anything")); + + ProcessStepBuilder unionStep = process.AddStepFromType("Union"); + mapStep + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStep, UnionStep.SumSquareFunction)); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, new int[] { 1, 2, 3, 4, 5 }, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(55L, unionState.SquareResult); + Assert.Equal(5, CountStep.Index); + } + + /// + /// Validates the result as for a nested map operation. + /// + [Fact] + public async Task ProcessMapResultForNestedMapAsync() + { + // Arrange + ProcessBuilder process = new(nameof(ProcessMapResultForNestedMapAsync)); + + ProcessBuilder mapProcess = new("MapOperation"); + ProcessMapBuilder mapStepInner = mapProcess.AddMapStepFromType(); + ProcessStepBuilder unionStepInner = mapProcess.AddStepFromType(); + mapStepInner + .OnEvent(ComputeStep.SquareEventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStepInner, UnionStep.SumSquareFunction)); + + mapProcess + .OnInputEvent("Anything") + .SendEventTo(new ProcessFunctionTargetBuilder(mapStepInner)); + + ProcessMapBuilder mapStepOuter = process.AddMapStepFromProcess(mapProcess); + ProcessStepBuilder unionStepOuter = process.AddStepFromType("Union"); + mapStepOuter + .OnEvent(UnionStep.EventId) + .SendEventTo(new ProcessFunctionTargetBuilder(unionStepOuter, UnionStep.SumSquareFunction)); + + process + .OnInputEvent("Start") + .SendEventTo(mapStepOuter.WhereInputEventIs("Anything")); + + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); + + // Act + int[][] input = + [ + [1, 2, 3, 4, 5], + [1, 2, 3, 4, 5], + [1, 2, 3, 4, 5], + ]; + using LocalKernelProcessContext processContext = await this.RunProcessAsync(kernel, processInstance, input, "Start"); + + // Assert + UnionState unionState = await GetUnionStateAsync(processContext); + Assert.Equal(165L, unionState.SquareResult); + } + + private async Task RunProcessAsync(Kernel kernel, KernelProcess process, object? input, string inputEvent) + { + return + await process.StartAsync( + kernel, + new KernelProcessEvent + { + Id = inputEvent, + Data = input, + }); + } + + private static async Task GetUnionStateAsync(LocalKernelProcessContext processContext) + { + KernelProcess processState = await processContext.GetStateAsync(); + KernelProcessStepState unionState = (KernelProcessStepState)processState.Steps.Single(s => s.State.Name == "Union").State; + Assert.NotNull(unionState); + Assert.NotNull(unionState.State); + return unionState.State; + } + + /// + /// A filler step used that emits the provided value as its output. + /// + private sealed class IncrementStep : KernelProcessStep + { + public const string EventId = "Bump"; + public const string IncrementFunction = "Increment"; + + [KernelFunction(IncrementFunction)] + public async ValueTask IncrementAsync(KernelProcessStepContext context, int count) + { + await context.EmitEventAsync(new() { Id = EventId, Data = count + 1, Visibility = KernelProcessEventVisibility.Public }); + } + } + + /// + /// A filler step used that emits the provided value as its output. + /// + private sealed class InitialStep : KernelProcessStep + { + public const string EventId = "Init"; + public const string InitFunction = "MapInit"; + + [KernelFunction(InitFunction)] + public async ValueTask InitAsync(KernelProcessStepContext context, object values) + { + await context.EmitEventAsync(new() { Id = EventId, Data = values, Visibility = KernelProcessEventVisibility.Public }); + } + } + + /// + /// A step that contains a map operation that emits two events. + /// + private sealed class ComputeStep : KernelProcessStep + { + public const string SquareEventId = "SquareResult"; + public const string CubicEventId = "CubicResult"; + public const string ComputeFunction = "MapCompute"; + + [KernelFunction(ComputeFunction)] + public async ValueTask ComputeAsync(KernelProcessStepContext context, long value) + { + long square = value * value; + await context.EmitEventAsync(new() { Id = SquareEventId, Data = square, Visibility = KernelProcessEventVisibility.Public }); + await context.EmitEventAsync(new() { Id = CubicEventId, Data = square * value, Visibility = KernelProcessEventVisibility.Public }); + } + } + + /// + /// A step that contains multiple functions, one of which is a map operation. + /// + private sealed class ComplexStep : KernelProcessStep + { + public const string ComputeEventId = "SquareResult"; + public const string ComputeFunction = "MapCompute"; + + public const string OtherEventId = "CubicResult"; + public const string OtherFunction = "Other"; + + [KernelFunction(ComputeFunction)] + public async ValueTask ComputeAsync(KernelProcessStepContext context, long value) + { + long square = value * value; + await context.EmitEventAsync(new() { Id = ComputeEventId, Data = square }); + } + + [KernelFunction(OtherFunction)] + public async ValueTask OtherAsync(KernelProcessStepContext context) + { + await context.EmitEventAsync(new() { Id = OtherEventId }); + } + } + + /// + /// A map operation that formats the input as a string. + /// + private sealed class FormatStep : KernelProcessStep + { + public const string EventId = "FormatResult"; + public const string FormatFunction = "MapCompute"; + + [KernelFunction(FormatFunction)] + public async ValueTask FormatAsync(KernelProcessStepContext context, object value) + { + await context.EmitEventAsync(new() { Id = EventId, Data = $"[{value}]" }); + } + } + + private sealed record UnionState + { + public long SquareResult { get; set; } + public long CubicResult { get; set; } + public string FormatResult { get; set; } = string.Empty; + }; + + /// + /// The step that combines the results of the map operation. + /// + private sealed class UnionStep : KernelProcessStep + { + public const string EventId = "MapUnion"; + public const string SumSquareFunction = "UnionSquare"; + public const string SumCubicFunction = "UnionCubic"; + public const string FormatFunction = "UnionFormat"; + + private UnionState _state = new(); + + public override ValueTask ActivateAsync(KernelProcessStepState state) + { + this._state = state.State ?? throw new InvalidDataException(); + + return ValueTask.CompletedTask; + } + + [KernelFunction(SumSquareFunction)] + public async ValueTask SumSquareAsync(KernelProcessStepContext context, IList values) + { + this._state.SquareResult = values.Sum(); + await context.EmitEventAsync(new() { Id = EventId, Data = this._state.SquareResult }); + } + + [KernelFunction(SumCubicFunction)] + public async ValueTask SumCubicAsync(KernelProcessStepContext context, IList values) + { + this._state.CubicResult = values.Sum(); + await context.EmitEventAsync(new() { Id = EventId, Data = this._state.CubicResult }); + } + + [KernelFunction(FormatFunction)] + public void FormatValues(IList values) + { + this._state.FormatResult = string.Join("/", values); + } + } + + /// + /// The step that counts how many times it has been invoked. + /// + private sealed class CountStep : KernelProcessStep + { + public const string CountFunction = nameof(Count); + + public static int Index = 0; + + [KernelFunction] + public void Count() + { + Interlocked.Increment(ref Index); + } + } +} diff --git a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs index a6f2777f8e5b..d90478f450c3 100644 --- a/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs +++ b/dotnet/src/Experimental/Process.UnitTests/Runtime.Local/LocalProcessTests.cs @@ -165,20 +165,28 @@ public async Task FunctionErrorHandlerTakesPrecedenceAsync() /// /// A class that represents a step for testing. /// - private sealed class TestStep : KernelProcessStep + [Fact] + public void ProcessWithSubprocessAndInvalidTargetThrows() { - /// - /// The name of the step. - /// - public static string Name => "TestStep"; + // Arrange + ProcessBuilder process = new(nameof(ProcessWithSubprocessAndInvalidTargetThrows)); + + ProcessBuilder subProcess = new("SubProcess"); + ProcessStepBuilder innerStep = subProcess.AddStepFromType("InnerStep"); + subProcess + .OnInputEvent("Go") + .SendEventTo(new ProcessFunctionTargetBuilder(innerStep)); + process + .OnInputEvent("Start") + .SendEventTo(subProcess.WhereInputEventIs("Go")); + + ProcessStepBuilder outerStep = process.AddStepFromType("OuterStep"); + innerStep + .OnEvent(TestStep.EventId) + .SendEventTo(new ProcessFunctionTargetBuilder(outerStep)); - /// - /// A method that represents a function for testing. - /// - [KernelFunction] - public void TestFunction() - { - } + KernelProcess processInstance = process.Build(); + Kernel kernel = new(); } /// @@ -221,9 +229,17 @@ public void FunctionErrorHandler(KernelProcessError exception, Kernel kernel) } /// - /// A class that represents a state for testing. + /// A class that represents a step for testing. /// - private sealed class TestState + private sealed class TestStep : KernelProcessStep { + public const string EventId = "Next"; + public const string Name = nameof(TestStep); + + [KernelFunction] + public async Task TestFunctionAsync(KernelProcessStepContext context) + { + await context.EmitEventAsync(new() { Id = EventId }); + } } } diff --git a/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs b/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs index e1f8957038cd..fb0c4764b081 100644 --- a/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs +++ b/dotnet/src/Experimental/Process.Utilities.UnitTests/CloneTests.cs @@ -24,7 +24,7 @@ public class CloneTests public void VerifyCloneStepStateTest() { // Arrange - KernelProcessStepState state = new(nameof(VerifyCloneStepStateTest), "test"); + KernelProcessStepState state = new(nameof(VerifyCloneStepStateTest), "v1", "test"); // Act KernelProcessStepState copy = state.Clone(typeof(KernelProcessStepState), null, NullLogger.Instance); @@ -56,7 +56,7 @@ public void VerifyCloneTypedStepStateTest() public void VerifyCloneSimpleStepTest() { // Arrange - KernelProcessStepInfo source = new(typeof(KernelProcessStep), new(nameof(VerifyCloneSimpleStepTest), "test"), []); + KernelProcessStepInfo source = new(typeof(KernelProcessStep), new(nameof(VerifyCloneSimpleStepTest), "v1", "test"), []); // Act KernelProcessStepInfo copy = source.Clone(NullLogger.Instance); @@ -89,7 +89,7 @@ public void VerifyCloneRealStepTest() public void VerifyCloneSingleProcessTest() { // Arrange - KernelProcessStepInfo step = new(typeof(KernelProcessStep), new(nameof(VerifyCloneSingleProcessTest), "teststep"), []); + KernelProcessStepInfo step = new(typeof(KernelProcessStep), new(nameof(VerifyCloneSingleProcessTest), "v1", "teststep"), []); KernelProcessState processState = new(nameof(VerifyCloneSingleProcessTest), "v1", "test"); KernelProcess source = new(processState, [step], CreateTestEdges()); @@ -118,6 +118,25 @@ public void VerifyCloneNestedProcessTest() VerifyProcess(source, copy); } + /// + /// Verify result of cloning a with a . + /// + [Fact] + public void VerifyCloneMapStepTest() + { + // Arrange + KernelProcessStepInfo step = new(typeof(KernelProcessStep), new(nameof(VerifyCloneNestedProcessTest), "v1", "teststep"), []); + KernelProcess mapOperation = new(new(nameof(VerifyCloneNestedProcessTest), "v1", "operation"), [step], CreateTestEdges()); + KernelProcessMap mapStep = new(new(nameof(VerifyCloneNestedProcessTest), "v1", "map"), mapOperation, CreateTestEdges()); + KernelProcess source = new(new(nameof(VerifyCloneNestedProcessTest), "v1", "outer"), [mapStep], []); + + // Act + KernelProcess copy = source.CloneProcess(NullLogger.Instance); + + // Assert + VerifyProcess(source, copy); + } + private static void VerifyProcess(KernelProcess expected, KernelProcess actual) { Assert.Equal(expected.State.Id, actual.State.Id); diff --git a/dotnet/src/InternalUtilities/process/Abstractions/KernelProcessStateMetadataFactory.cs b/dotnet/src/InternalUtilities/process/Abstractions/KernelProcessStateMetadataFactory.cs index 50c8f3011aa4..9a982701cb8b 100644 --- a/dotnet/src/InternalUtilities/process/Abstractions/KernelProcessStateMetadataFactory.cs +++ b/dotnet/src/InternalUtilities/process/Abstractions/KernelProcessStateMetadataFactory.cs @@ -1,38 +1,12 @@ // Copyright (c) Microsoft. All rights reserved. +using System; using Microsoft.SemanticKernel.Process.Models; namespace Microsoft.SemanticKernel.Process.Internal; + internal static class ProcessStateMetadataFactory { - /// - /// Captures Kernel Process Step State into - /// - /// - private static KernelProcessStepStateMetadata StepInfoToProcessStateMetadata(KernelProcessStepInfo stepInfo) - { - KernelProcessStepStateMetadata metadata = new() - { - Name = stepInfo.State.Name, - Id = stepInfo.State.Id, - VersionInfo = stepInfo.State.Version - }; - - if (stepInfo.InnerStepType.TryGetSubtypeOfStatefulStep(out var genericStateType) && genericStateType != null) - { - var userStateType = genericStateType.GetGenericArguments()[0]; - var stateOriginalType = typeof(KernelProcessStepState<>).MakeGenericType(userStateType); - - var innerState = stateOriginalType.GetProperty(nameof(KernelProcessStepState.State))?.GetValue(stepInfo.State); - if (innerState != null) - { - metadata.State = innerState; - } - } - - return metadata; - } - /// /// Captures Kernel Process State into /// @@ -47,17 +21,9 @@ public static KernelProcessStateMetadata KernelProcessToProcessStateMetadata(Ker StepsState = [], }; - foreach (var step in kernelProcess.Steps) + foreach (KernelProcessStepInfo step in kernelProcess.Steps) { - KernelProcessStateMetadata stepEventMetadata = new(); - if (step is KernelProcess stepSubprocess) - { - metadata.StepsState.Add(step.State.Name, KernelProcessToProcessStateMetadata(stepSubprocess)); - } - else - { - metadata.StepsState.Add(step.State.Name, StepInfoToProcessStateMetadata(step)); - } + metadata.StepsState.Add(step.State.Name, step.ToProcessStateMetadata()); } return metadata; @@ -69,7 +35,51 @@ public static KernelProcessStepStateMetadata ToProcessStateMetadata(this KernelP { return KernelProcessToProcessStateMetadata(subprocess); } + else if (stepInfo is KernelProcessMap stepMap) + { + return KernelProcessMapToProcessStateMetadata(stepMap); + } return StepInfoToProcessStateMetadata(stepInfo); } + + private static KernelProcessMapStateMetadata KernelProcessMapToProcessStateMetadata(KernelProcessMap stepMap) + { + return + new() + { + Name = stepMap.State.Name, + Id = stepMap.State.Id, + VersionInfo = stepMap.State.Version, + OperationState = ToProcessStateMetadata(stepMap.Operation), + }; + } + + /// + /// Captures Kernel Process Step State into + /// + /// + private static KernelProcessStepStateMetadata StepInfoToProcessStateMetadata(KernelProcessStepInfo stepInfo) + { + KernelProcessStepStateMetadata metadata = new() + { + Name = stepInfo.State.Name, + Id = stepInfo.State.Id, + VersionInfo = stepInfo.State.Version + }; + + if (stepInfo.InnerStepType.TryGetSubtypeOfStatefulStep(out Type? genericStateType) && genericStateType != null) + { + Type userStateType = genericStateType.GetGenericArguments()[0]; + Type stateOriginalType = typeof(KernelProcessStepState<>).MakeGenericType(userStateType); + + object? innerState = stateOriginalType.GetProperty(nameof(KernelProcessStepState.State))?.GetValue(stepInfo.State); + if (innerState != null) + { + metadata.State = innerState; + } + } + + return metadata; + } } diff --git a/dotnet/src/InternalUtilities/process/Abstractions/MapExtensions.cs b/dotnet/src/InternalUtilities/process/Abstractions/MapExtensions.cs new file mode 100644 index 000000000000..b42be3c87a9e --- /dev/null +++ b/dotnet/src/InternalUtilities/process/Abstractions/MapExtensions.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft. All rights reserved. +using System.Linq; +using Microsoft.Extensions.Logging; + +namespace Microsoft.SemanticKernel.Process.Internal; + +internal static class MapExtensions +{ + public static KernelProcessMap CloneMap(this KernelProcessMap map, ILogger logger) + { + KernelProcessMapState newState = new(map.State.Name, map.State.Version, map.State.Id!); + + KernelProcessMap copy = + new( + newState, + map.Operation.Clone(logger), + map.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList())); + + return copy; + } +} diff --git a/dotnet/src/InternalUtilities/process/Abstractions/ProcessConstants.cs b/dotnet/src/InternalUtilities/process/Abstractions/ProcessConstants.cs index 442848337952..a4ab69995b8c 100644 --- a/dotnet/src/InternalUtilities/process/Abstractions/ProcessConstants.cs +++ b/dotnet/src/InternalUtilities/process/Abstractions/ProcessConstants.cs @@ -13,18 +13,29 @@ internal static class ProcessConstants /// public const string EndStepName = "Microsoft.SemanticKernel.Process.EndStep"; + /// + /// Separator for qualified event ids. + /// + internal const char EventIdSeparator = '.'; + /// /// Version for state of internal steps /// public const string InternalStepsVersion = "v0"; + /// + /// EventId used internally as the input event for . + /// + public const string MapEventId = "StartMap"; + /// /// Enum containing the name of internal components. /// Used for serialization purposes. /// public enum SupportedComponents { - Step = 0, - Process = 1, + Step, + Process, + Map, } } diff --git a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs index b9a4c9d89922..ab74689a33db 100644 --- a/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs +++ b/dotnet/src/InternalUtilities/process/Abstractions/StepExtensions.cs @@ -16,6 +16,11 @@ public static KernelProcessStepInfo Clone(this KernelProcessStepInfo step, ILogg return subProcess.CloneProcess(logger); } + if (step is KernelProcessMap mapStep) + { + return mapStep.CloneMap(logger); + } + Type stateType = step.InnerStepType.ExtractStateType(out Type? userStateType, logger); KernelProcessStepState newState = step.State.Clone(stateType, userStateType, logger); diff --git a/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs b/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs new file mode 100644 index 000000000000..8c2e87f37d91 --- /dev/null +++ b/dotnet/src/InternalUtilities/process/Runtime/MapExtensions.cs @@ -0,0 +1,105 @@ +// Copyright (c) Microsoft. All rights reserved. +using System; +using System.Collections; +using System.Linq; +using System.Text.Json; +using Microsoft.Extensions.Logging; +using Microsoft.SemanticKernel.Process.Internal; + +namespace Microsoft.SemanticKernel.Process.Runtime; + +internal static class MapExtensions +{ + public static (IEnumerable, KernelProcess, string) Initialize(this KernelProcessMap map, ProcessMessage message, ILogger? logger) + { + IEnumerable inputValues = message.GetMapInput(logger); + KernelProcess mapOperation; + string startEventId; + + if (map.Operation is KernelProcess kernelProcess) + { + startEventId = DefineOperationEventId(kernelProcess, message); + mapOperation = kernelProcess; + } + else + { + startEventId = ProcessConstants.MapEventId; + string? parameterName = message.Values.SingleOrDefault(kvp => IsEqual(inputValues, kvp.Value)).Key; + string proxyId = Guid.NewGuid().ToString("N"); + mapOperation = + new KernelProcess( + new KernelProcessState($"Map{map.Operation.State.Name}", map.Operation.State.Version, proxyId), + [map.Operation], + new() { { ProcessConstants.MapEventId, [new KernelProcessEdge(proxyId, new KernelProcessFunctionTarget(map.Operation.State.Id!, message.FunctionName, parameterName))] } }); + } + + return (inputValues, mapOperation, startEventId); + } + + private static IEnumerable GetMapInput(this ProcessMessage message, ILogger? logger) + { + if (message.TargetEventData == null) + { + throw new KernelException($"Internal Map Error: Input data not present - {message.SourceId}/{message.DestinationId}.").Log(logger); + } + + Type valueType = message.TargetEventData.GetType(); + + return typeof(IEnumerable).IsAssignableFrom(valueType) && valueType.HasElementType ? + (IEnumerable)message.TargetEventData : + throw new KernelException($"Internal Map Error: Input parameter is not enumerable - {message.SourceId}/{message.DestinationId} [{valueType.FullName}].").Log(logger); + } + + private static string DefineOperationEventId(KernelProcess mapOperation, ProcessMessage message) + { + // Fails when zero or multiple candidate edges exist. No reason a map-operation should be irrational. + return + mapOperation.Edges.SingleOrDefault(kvp => kvp.Value.Any(e => e.OutputTarget.FunctionName == message.FunctionName)).Key ?? + throw new InvalidOperationException($"The map operation does not have an input edge that matches the message destination: {mapOperation.State.Name}/{mapOperation.State.Id}."); + } + + private static bool IsEqual(IEnumerable targetData, object? possibleValue) + { + // Short circuit for null candidate + if (possibleValue == null) + { + return false; + } + + // Object equality is valid for LocalRuntime + if (targetData == possibleValue) + { + return true; + } + + // DAPR runtime requires a deeper comparison + Type candidateType = possibleValue.GetType(); + + // Candidate must be enumerable with element type + if (!typeof(IEnumerable).IsAssignableFrom(candidateType) || + !candidateType.HasElementType) + { + return false; + } + + // Types much match + Type targetType = targetData.GetType(); + if (candidateType != targetData.GetType()) + { + return false; + } + + if (targetType.GetElementType() == candidateType.GetElementType()) + { + // Data has already been serialized to make get this far. + // Let's use serialization for equality check. + // Note: We aren't looking for equivalency. We are testing + // for a clone of the exact same data instances. + string targetDataJson = JsonSerializer.Serialize(targetData); + string possibleValueJson = JsonSerializer.Serialize(possibleValue); + return string.Equals(targetDataJson, possibleValueJson, StringComparison.Ordinal); + } + + return false; + } +} diff --git a/dotnet/src/InternalUtilities/process/Runtime/ProcessEvent.cs b/dotnet/src/InternalUtilities/process/Runtime/ProcessEvent.cs index 9715429c8609..530879b23af0 100644 --- a/dotnet/src/InternalUtilities/process/Runtime/ProcessEvent.cs +++ b/dotnet/src/InternalUtilities/process/Runtime/ProcessEvent.cs @@ -1,4 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. +using Microsoft.SemanticKernel.Process.Internal; + namespace Microsoft.SemanticKernel.Process.Runtime; /// @@ -34,7 +36,7 @@ public record ProcessEvent /// /// The Qualified Id of the event. /// - internal string QualifiedId => $"{this.Namespace}.{this.SourceId}"; + internal string QualifiedId => $"{this.Namespace}{ProcessConstants.EventIdSeparator}{this.SourceId}"; /// /// Creates a new from a .