diff --git a/Synapse.sln b/Synapse.sln index bb95ed2d9..5bd417f82 100644 --- a/Synapse.sln +++ b/Synapse.sln @@ -134,8 +134,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runtime.Kubernetes" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Core.Infrastructure.Containers.Docker", "src\core\Synapse.Core.Infrastructure.Containers.Docker\Synapse.Core.Infrastructure.Containers.Docker.csproj", "{DD6381BD-2C8B-4CE1-99B2-EC585DD818FA}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kubernetes", "kubernetes", "{B3F3DB1B-23E7-45FA-8934-448BFFB294E8}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Synapse.Core.Infrastructure.Containers.Kubernetes", "src\core\Synapse.Core.Infrastructure.Containers.Kubernetes\Synapse.Core.Infrastructure.Containers.Kubernetes.csproj", "{41C99069-BD99-4FD2-BF33-984CF03B53E8}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{35D495F4-D267-4A84-9479-DB3C1BE85434}" @@ -292,7 +290,6 @@ Global {8FF58403-9E13-4F58-864F-E6FBC877BF37} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {9B37AA4A-A342-4A41-A2A1-C8466825A70A} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527} {DD6381BD-2C8B-4CE1-99B2-EC585DD818FA} = {9E296C8A-4D78-4592-B046-11A3A953FD25} - {B3F3DB1B-23E7-45FA-8934-448BFFB294E8} = {562C91A3-6E91-4489-9D9D-064E7436D900} {41C99069-BD99-4FD2-BF33-984CF03B53E8} = {9E296C8A-4D78-4592-B046-11A3A953FD25} {AB30A91B-0158-411D-9BD3-36FFA441B3A2} = {35D495F4-D267-4A84-9479-DB3C1BE85434} {06404855-A5BE-4556-91BC-064630E95737} = {35D495F4-D267-4A84-9479-DB3C1BE85434} diff --git a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj index 345d385d8..784f3f5c3 100644 --- a/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj +++ b/src/api/Synapse.Api.Application/Synapse.Api.Application.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj b/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj index 9dfa1105e..708ba6acd 100644 --- a/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj +++ b/src/api/Synapse.Api.Client.Core/Synapse.Api.Client.Core.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj index 81297faa8..368116e68 100644 --- a/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj +++ b/src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -43,7 +43,7 @@ - + diff --git a/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs b/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs index 113c709a7..58ac491d7 100644 --- a/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs +++ b/src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs @@ -20,6 +20,7 @@ using Synapse.Api.Http.Controllers; using Synapse.Core.Api.Services; using System.Text.Json; +using System.Text.Json.Serialization; namespace Synapse.Api.Http; @@ -44,6 +45,7 @@ public static IServiceCollection AddSynapseHttpApi(this IServiceCollection servi .AddJsonOptions(options => { options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase; + options.JsonSerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull | JsonIgnoreCondition.WhenWritingDefault; }) .AddApplicationPart(typeof(WorkflowsController).Assembly); services.AddIdentityServer(options => diff --git a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj index 2a4eeb1ae..2b4e27c70 100644 --- a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj +++ b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj @@ -8,7 +8,7 @@ Library True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -43,8 +43,8 @@ - - + + diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj index b5c7e509f..6964cbd05 100644 --- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj +++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -27,7 +27,6 @@ Linux ..\..\.. false - false diff --git a/src/api/Synapse.Api.Server/appsettings.Development.json b/src/api/Synapse.Api.Server/appsettings.Development.json index 774c813cc..17aacb0c2 100644 --- a/src/api/Synapse.Api.Server/appsettings.Development.json +++ b/src/api/Synapse.Api.Server/appsettings.Development.json @@ -19,6 +19,6 @@ } }, "CloudEvents": { - "Endpoint": "https://webhook.site/a4aff725-0711-48b2-a9d2-5d1b806d04d0" + "Endpoint": "http://localhost:5151/api/events/pub" } } diff --git a/src/cli/Synapse.Cli/Synapse.Cli.csproj b/src/cli/Synapse.Cli/Synapse.Cli.csproj index 112b6d4e9..70307239d 100644 --- a/src/cli/Synapse.Cli/Synapse.Cli.csproj +++ b/src/cli/Synapse.Cli/Synapse.Cli.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -33,7 +33,7 @@ - + diff --git a/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj b/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj index 729b3fa1c..d954abc58 100644 --- a/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj +++ b/src/core/Synapse.Core.Infrastructure.Containers.Docker/Synapse.Core.Infrastructure.Containers.Docker.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj b/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj index 5237405c7..c2b604a53 100644 --- a/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj +++ b/src/core/Synapse.Core.Infrastructure.Containers.Kubernetes/Synapse.Core.Infrastructure.Containers.Kubernetes.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj index 72703a72c..ba4932732 100644 --- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj +++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -44,14 +44,14 @@ - - - - - - - - + + + + + + + + diff --git a/src/core/Synapse.Core/Resources/CorrelationSpec.cs b/src/core/Synapse.Core/Resources/CorrelationSpec.cs index 544ce215f..a74b66472 100644 --- a/src/core/Synapse.Core/Resources/CorrelationSpec.cs +++ b/src/core/Synapse.Core/Resources/CorrelationSpec.cs @@ -46,16 +46,22 @@ public record CorrelationSpec [DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)] public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!; + /// + /// Gets/sets a key/value mapping, if any, of the keys to use to correlate events + /// + [DataMember(Name = "keys", Order = 5), JsonPropertyName("keys"), JsonPropertyOrder(5), YamlMember(Alias = "keys", Order = 5)] + public virtual EquatableDictionary? Keys { get; set; } + /// /// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete /// - [DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)] + [DataMember(Name = "stream", Order = 6), JsonPropertyName("stream"), JsonPropertyOrder(6), YamlMember(Alias = "stream", Order = 6)] public virtual bool Stream { get; set; } /// /// Gets/sets an object used to configure the correlation's outcome /// - [DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)] + [DataMember(Name = "outcome", Order = 7), JsonPropertyName("outcome"), JsonPropertyOrder(7), YamlMember(Alias = "outcome", Order = 7)] public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!; } diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index a2405cc83..d892ab234 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -66,11 +66,11 @@ - - - + + + - + diff --git a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs index 852608349..4c430342a 100644 --- a/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs +++ b/src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs @@ -123,7 +123,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken { Id = Guid.NewGuid().ToString("N")[..12], Events = [new(filter.Key, e)], - Keys = CorrelationKeys == null ? new() : new(CorrelationKeys) + Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys) }; this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id); this.Logger.LogInformation("Event successfully correlated to context with id '{contextId}'", context.Id); @@ -152,7 +152,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken { Id = Guid.NewGuid().ToString("N")[..12], Events = [new(filter.Key, e)], - Keys = CorrelationKeys == null ? new() : new(CorrelationKeys) + Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys) }; await this.CreateOrUpdateContextAsync(context, cancellationToken).ConfigureAwait(false); this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id); @@ -289,7 +289,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil protected virtual async Task<(bool Succeeded, IDictionary? CorrelationKeys)> TryExtractCorrelationKeysAsync(CloudEvent e, IDictionary? keyDefinitions, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(e); - var correlationKeys = new Dictionary(); + var correlationKeys = this.Correlation.Resource.Spec.Keys ?? []; if (keyDefinitions == null || keyDefinitions.Count < 1) return (true, correlationKeys); foreach (var keyDefinition in keyDefinitions) { @@ -305,6 +305,7 @@ protected virtual async Task TryFilterEventAsync(EventFilterDefinition fil } else if (!keyDefinition.Value.Expect.Equals(correlationTerm, StringComparison.OrdinalIgnoreCase)) return (false, null); } + if (correlationKeys.ContainsKey(keyDefinition.Key) && correlationTerm != correlationKeys[keyDefinition.Key]) return (false, null); correlationKeys[keyDefinition.Key] = correlationTerm; } return (true, correlationKeys); @@ -361,7 +362,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte await this.Resources.PatchStatusAsync(new(PatchType.JsonPatch, patch), workflowInstance.GetName(), workflowInstance.GetNamespace(), null, false, cancellationToken).ConfigureAwait(false); break; case CorrelationOutcomeType.Start: - var input = this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? [] : await this.ExpressionEvaluator.EvaluateAsync>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false); + var input = (this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? new() { { "events", context.Events.Values } } : await this.ExpressionEvaluator.EvaluateAsync>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false)); workflowInstance = new() { Metadata = new() @@ -373,6 +374,13 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte { Definition = this.Correlation.Resource.Spec.Outcome.Start!.Workflow, Input = input + }, + Status = new() + { + Correlation = new() + { + Keys = context.Keys + } } }; await this.Resources.AddAsync(workflowInstance, false, cancellationToken).ConfigureAwait(false); diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj index 3c1628e30..abd22a906 100644 --- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj +++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -36,12 +36,12 @@ - - - - - - + + + + + + diff --git a/src/dashboard/Synapse.Dashboard/Components/HorizontalCollapsible/HorizontalCollapsible.razor b/src/dashboard/Synapse.Dashboard/Components/HorizontalCollapsible/HorizontalCollapsible.razor index 4ee9727f7..cf51e84ee 100644 --- a/src/dashboard/Synapse.Dashboard/Components/HorizontalCollapsible/HorizontalCollapsible.razor +++ b/src/dashboard/Synapse.Dashboard/Components/HorizontalCollapsible/HorizontalCollapsible.razor @@ -26,7 +26,7 @@ { } - + @Label @if (OnClose.HasDelegate) { @@ -57,6 +57,24 @@ await base.OnParametersSetAsync(); } + public async Task HideAsync() + { + isCollapsed = true; + if (OnHidden.HasDelegate) + { + await OnHidden.InvokeAsync(); + } + } + + public async Task ShowAsync() + { + isCollapsed = false; + if (OnShown.HasDelegate) + { + await OnShown.InvokeAsync(); + } + } + async Task OnToggleAsync() { isCollapsed = !isCollapsed; diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentState.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentState.cs index c03e0f5c1..910396cae 100644 --- a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentState.cs +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentState.cs @@ -22,6 +22,11 @@ public record NamespacedResourceManagementComponentState where TResource : Resource, new() { + /// + /// Gets/sets a boolean value that indicates whether to list s + /// + public bool ListNamespaces { get; set; } = true; + /// /// Gets a that contains all s /// diff --git a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs index 1b251c5df..9a4f2c4a5 100644 --- a/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs +++ b/src/dashboard/Synapse.Dashboard/Components/ResourceManagement/NamespacedResourceManagementComponentStore.cs @@ -30,7 +30,12 @@ public class NamespacedResourceManagementComponentStore(ILogg { /// - /// Gets an used to observe s + /// Gets an used to observe + /// + public IObservable ListNamespaces => this.Select(s => s.ListNamespaces).DistinctUntilChanged(); + + /// + /// Gets an used to observe s /// public IObservable?> Namespaces => this.Select(s => s.Namespaces).DistinctUntilChanged(); @@ -51,6 +56,27 @@ public class NamespacedResourceManagementComponentStore(ILogg ) .DistinctUntilChanged(); + /// + /// Sets the to true + /// + public void EnableNamespaceListing() + { + this.Reduce(state => state with + { + ListNamespaces = true + }); + } + /// + /// Sets the to false + /// + public void DisableNamespaceListing() + { + this.Reduce(state => state with + { + ListNamespaces = false + }); + } + /// /// Sets the /// @@ -85,8 +111,8 @@ public override async Task DeleteResourceAsync(TResource resource) /// public override async Task InitializeAsync() { - await this.ListNamespacesAsync().ConfigureAwait(false); await base.InitializeAsync(); + if (this.Get(state => state.ListNamespaces)) await this.ListNamespacesAsync().ConfigureAwait(false); } } diff --git a/src/dashboard/Synapse.Dashboard/Components/WorkflowDiagram/WorkflowDiagram.razor b/src/dashboard/Synapse.Dashboard/Components/WorkflowDiagram/WorkflowDiagram.razor index 5f1fba46b..cb7caf224 100644 --- a/src/dashboard/Synapse.Dashboard/Components/WorkflowDiagram/WorkflowDiagram.razor +++ b/src/dashboard/Synapse.Dashboard/Components/WorkflowDiagram/WorkflowDiagram.razor @@ -153,6 +153,10 @@ graph.CssClass = ""; isDirty = true; } + if (graph != null && this.Store.DagreGraph != null && e.GraphElement is INodeViewModel node) + { + await this.Store.DagreGraph.CenterAsync(node); + } if (OnMouseUp.HasDelegate) { await OnMouseUp.InvokeAsync(e); diff --git a/src/dashboard/Synapse.Dashboard/Pages/Workflows/Details/View.razor b/src/dashboard/Synapse.Dashboard/Pages/Workflows/Details/View.razor index b90d9730d..c884151d6 100644 --- a/src/dashboard/Synapse.Dashboard/Pages/Workflows/Details/View.razor +++ b/src/dashboard/Synapse.Dashboard/Pages/Workflows/Details/View.razor @@ -25,7 +25,7 @@ Workflow @($"{Name}.{@namespace}:{version}")
- + - + @if (workflowDefinition == null) @@ -70,7 +70,7 @@ } - + @if (workflowDefinition == null) @@ -106,7 +106,7 @@ @if (workflowInstance != null) { - +
@@ -152,6 +152,9 @@ WorkflowDefinition workflowDefinition = null!; WorkflowInstance? workflowInstance; ProblemDetails? problemDetails; + HorizontalCollapsible instancesListPanel = null!; + HorizontalCollapsible graphPanel = null!; + HorizontalCollapsible definitionPanel = null!; readonly IEnumerable columns = [ "Name", @@ -167,6 +170,7 @@ /// protected override async Task OnInitializedAsync() { + Store.DisableNamespaceListing(); UpdateBreadcrumb(); Store.WorkflowInstanceName.Subscribe(value => OnStateChanged(_ => instanceName = value), token: CancellationTokenSource.Token); Store.WorkflowDefinition.Where(value => value != null).Subscribe(value => OnStateChanged(_ => workflowDefinition = value!), token: CancellationTokenSource.Token); @@ -189,8 +193,9 @@ await base.OnInitializedAsync().ConfigureAwait(false); } + /// - protected override void OnParametersSet() + protected override async Task OnParametersSetAsync() { if (Version != version) { @@ -199,7 +204,14 @@ if (InstanceName != instanceName) { Store.SetWorkflowInstanceName(InstanceName); + if (!string.IsNullOrWhiteSpace(InstanceName)) + { + await instancesListPanel.HideAsync(); + await graphPanel.HideAsync(); + await definitionPanel.HideAsync(); + } } + await base.OnParametersSetAsync(); } /// @@ -263,8 +275,11 @@ void OnShowInstanceDetails(WorkflowInstance instance) => NavigationManager.NavigateTo($"/workflows/details/{@namespace}/{Name}/{version}/{instance.GetName()}"); - void OnCloseWorkflowInstance() + async Task OnCloseWorkflowInstanceAsync() { + await instancesListPanel.ShowAsync(); + await graphPanel.ShowAsync(); + await definitionPanel.ShowAsync(); NavigationManager.NavigateTo($"/workflows/details/{@namespace}/{Name}/{version}"); StateHasChanged(); } diff --git a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj index 4559655e9..f6a2a55eb 100644 --- a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj +++ b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj @@ -10,14 +10,14 @@ - + - + diff --git a/src/operator/Synapse.Operator/Synapse.Operator.csproj b/src/operator/Synapse.Operator/Synapse.Operator.csproj index d072a8852..e42903576 100644 --- a/src/operator/Synapse.Operator/Synapse.Operator.csproj +++ b/src/operator/Synapse.Operator/Synapse.Operator.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs index cb4ccfb00..1c5df3660 100644 --- a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs +++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs @@ -424,7 +424,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte { ArgumentNullException.ThrowIfNull(task); if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task)); - if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead"); + if (listenTask.Foreach != null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead"); if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return context; var @namespace = task.Workflow.Instance.GetNamespace()!; var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}"; @@ -449,6 +449,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte Source = new ResourceReference(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()), Lifetime = CorrelationLifetime.Ephemeral, Events = listenTask.Listen.To, + Keys = this.Instance.Status?.Correlation?.Keys, Expressions = task.Workflow.Definition.Evaluate ?? new(), Outcome = new() { @@ -511,6 +512,17 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte CompletedAt = DateTimeOffset.Now } }, cancellationToken).ConfigureAwait(false); + using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false); + this.Instance = await this.Api.WorkflowInstances.GetAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken).ConfigureAwait(false); + var originalInstance = this.Instance.Clone(); + foreach(var correlationKey in correlationContext.Keys) + { + this.Instance.Status!.Correlation!.Keys ??= []; + this.Instance.Status!.Correlation!.Keys[correlationKey.Key] = correlationKey.Value; + } + this.Instance.Status!.Correlation!.Contexts!.Remove(task.Instance.Reference.OriginalString); + var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance); + this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false); return correlationContext; } diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs index 3159cca90..5d4e71c7d 100644 --- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs @@ -18,6 +18,7 @@ using Neuroglia.AsyncApi.IO; using Neuroglia.AsyncApi.v3; using Neuroglia.Data.Expressions; +using System.Threading; namespace Synapse.Runner.Services.Executors; @@ -94,6 +95,11 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger protected uint? Offset { get; set; } + /// + /// Gets/sets a boolean indicating whether or not to keep consuming incoming messages + /// + protected bool KeepConsume { get; set; } = true; + /// /// Gets the path for the specified message /// @@ -234,7 +240,7 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation"); await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document); var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol); - await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); + var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false); if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation"); if(result.Messages == null) { @@ -244,24 +250,24 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken var observable = result.Messages; if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan())); if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value); - else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () => - { - var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false); - return (message, keepGoing); - })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); - else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () => - { - var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false)); - return (message, keepGoing); - })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message); if (this.AsyncApi.Subscription.Foreach == null) { - var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false); + var messages = await observable.ToAsyncEnumerable().TakeWhileAwait(async m => + { + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) return await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) return !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false); + return true; + }).ToListAsync(cancellationToken).ConfigureAwait(false); await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); } else { - this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync); + //todo: fix + this.Subscription = observable.TakeWhile(_ => this.KeepConsume).SelectMany(m => + { + OnStreamingMessageAsync(m).GetAwaiter().GetResult(); + return Observable.Return(m); + }).SubscribeAsync(_ => System.Threading.Tasks.Task.CompletedTask, OnStreamingErrorAsync, OnStreamingCompletedAsync); } } @@ -274,6 +280,11 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) { if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution"); if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation"); + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While) && !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false)) + { + this.KeepConsume = false; + return; + } if (this.AsyncApi.Subscription.Foreach?.Do != null) { var taskDefinition = new DoTaskDefinition() @@ -284,10 +295,15 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false) ] }; - var arguments = this.GetExpressionEvaluationArguments(); var messageData = message as object; + var offset = this.Offset ?? 0; + if (!this.Offset.HasValue) this.Offset = 0; + var arguments = this.GetExpressionEvaluationArguments(); + arguments ??= new Dictionary(); + arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!; + arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset; if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(fromExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); - else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Subscription.Foreach.Output.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); + else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync(this.AsyncApi.Subscription.Foreach.Output.As!, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); if (this.AsyncApi.Subscription.Foreach.Export?.As is string toExpression) { var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(toExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; @@ -295,19 +311,20 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message) } else if (this.AsyncApi.Subscription.Foreach.Export?.As != null) { - var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(this.AsyncApi.Subscription.Foreach.Export.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; + var context = (await this.Task.Workflow.Expressions.EvaluateAsync>(this.AsyncApi.Subscription.Foreach.Export.As!, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!; await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false); } - var offset = this.Offset ?? 0; - if (!this.Offset.HasValue) this.Offset = 0; - arguments ??= new Dictionary(); - arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!; - arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset; var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, this.GetPathFor(offset), this.Task.Input, null, this.Task, false, this.CancellationTokenSource!.Token).ConfigureAwait(false); var taskExecutor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false); await taskExecutor.ExecuteAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false); + if (this.Task.ContextData != taskExecutor.Task.ContextData) await this.Task.SetContextDataAsync(taskExecutor.Task.ContextData, this.CancellationTokenSource!.Token).ConfigureAwait(false); this.Offset++; } + if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until) && await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false)) + { + this.KeepConsume = false; + return; + } } /// diff --git a/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs index 902312732..3057ae383 100644 --- a/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs @@ -64,8 +64,8 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken var context = await this.Task.CorrelateAsync(cancellationToken).ConfigureAwait(false); var events = this.Task.Definition.Listen.Read switch { - EventReadMode.Data or EventReadMode.Raw => context.Events.Select(e => e.Value.Data), - EventReadMode.Envelope => context.Events.Select(e => e.Value.Data), + EventReadMode.Data or EventReadMode.Raw or null => context.Events.Select(e => e.Value.Data), + EventReadMode.Envelope => context.Events.Select(e => e.Value), _ => throw new NotSupportedException($"The specified event read mode '{this.Task.Definition.Listen.Read}' is not supported") }; await this.SetResultAsync(events, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false); @@ -115,7 +115,7 @@ protected virtual async Task OnStreamingEventAsync(IStreamedCloudEvent e) var arguments = this.GetExpressionEvaluationArguments(); var eventData = this.Task.Definition.Listen.Read switch { - EventReadMode.Data or EventReadMode.Raw => e.Event.Data, + EventReadMode.Data or EventReadMode.Raw or null => e.Event.Data, EventReadMode.Envelope => e.Event, _ => throw new NotSupportedException($"The specified event read mode '{this.Task.Definition.Listen.Read}' is not supported") }; diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index fa42a182e..4032ca3aa 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -8,7 +8,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors @@ -59,15 +59,15 @@ - + - - - - - + + + + + diff --git a/src/runner/Synapse.Runner/WorkflowOutputFormat.cs b/src/runner/Synapse.Runner/WorkflowOutputFormat.cs index b87090543..270230664 100644 --- a/src/runner/Synapse.Runner/WorkflowOutputFormat.cs +++ b/src/runner/Synapse.Runner/WorkflowOutputFormat.cs @@ -28,6 +28,6 @@ public enum WorkflowOutputFormat /// /// Indicates that the workflow output should be formatted to YAML /// - [EnumMember(Value = "json")] + [EnumMember(Value = "yaml")] Yaml } \ No newline at end of file diff --git a/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj b/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj index ff6dcb960..8cdc67a3a 100644 --- a/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj +++ b/src/runtime/Synapse.Runtime.Abstractions/Synapse.Runtime.Abstractions.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj b/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj index 483e7fae8..aa3669edf 100644 --- a/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj +++ b/src/runtime/Synapse.Runtime.Docker/Synapse.Runtime.Docker.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj b/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj index 038a9519c..933b081ee 100644 --- a/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj +++ b/src/runtime/Synapse.Runtime.Kubernetes/Synapse.Runtime.Kubernetes.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj b/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj index 59922664b..a79161d70 100644 --- a/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj +++ b/src/runtime/Synapse.Runtime.Native/Synapse.Runtime.Native.csproj @@ -7,7 +7,7 @@ en True 1.0.0 - alpha5.12 + $(VersionPrefix) $(VersionPrefix) The Synapse Authors diff --git a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj index 8d28aa017..f2805b3f1 100644 --- a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj +++ b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj @@ -10,14 +10,14 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + - + diff --git a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj index 33ad199ea..d4adf3a6b 100644 --- a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj +++ b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj @@ -10,20 +10,20 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + - - - - - + + + + +