@@ -152,6 +152,9 @@
WorkflowDefinition workflowDefinition = null!;
WorkflowInstance? workflowInstance;
ProblemDetails? problemDetails;
+ HorizontalCollapsible instancesListPanel = null!;
+ HorizontalCollapsible graphPanel = null!;
+ HorizontalCollapsible definitionPanel = null!;
readonly IEnumerable
columns =
[
"Name",
@@ -167,6 +170,7 @@
///
protected override async Task OnInitializedAsync()
{
+ Store.DisableNamespaceListing();
UpdateBreadcrumb();
Store.WorkflowInstanceName.Subscribe(value => OnStateChanged(_ => instanceName = value), token: CancellationTokenSource.Token);
Store.WorkflowDefinition.Where(value => value != null).Subscribe(value => OnStateChanged(_ => workflowDefinition = value!), token: CancellationTokenSource.Token);
@@ -189,8 +193,9 @@
await base.OnInitializedAsync().ConfigureAwait(false);
}
+
///
- protected override void OnParametersSet()
+ protected override async Task OnParametersSetAsync()
{
if (Version != version)
{
@@ -199,7 +204,14 @@
if (InstanceName != instanceName)
{
Store.SetWorkflowInstanceName(InstanceName);
+ if (!string.IsNullOrWhiteSpace(InstanceName))
+ {
+ await instancesListPanel.HideAsync();
+ await graphPanel.HideAsync();
+ await definitionPanel.HideAsync();
+ }
}
+ await base.OnParametersSetAsync();
}
///
@@ -263,8 +275,11 @@
void OnShowInstanceDetails(WorkflowInstance instance) => NavigationManager.NavigateTo($"/workflows/details/{@namespace}/{Name}/{version}/{instance.GetName()}");
- void OnCloseWorkflowInstance()
+ async Task OnCloseWorkflowInstanceAsync()
{
+ await instancesListPanel.ShowAsync();
+ await graphPanel.ShowAsync();
+ await definitionPanel.ShowAsync();
NavigationManager.NavigateTo($"/workflows/details/{@namespace}/{Name}/{version}");
StateHasChanged();
}
diff --git a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
index 4559655e9..f6a2a55eb 100644
--- a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
+++ b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
@@ -10,14 +10,14 @@
-
+
-
+
diff --git a/src/operator/Synapse.Operator/Synapse.Operator.csproj b/src/operator/Synapse.Operator/Synapse.Operator.csproj
index d072a8852..e42903576 100644
--- a/src/operator/Synapse.Operator/Synapse.Operator.csproj
+++ b/src/operator/Synapse.Operator/Synapse.Operator.csproj
@@ -8,7 +8,7 @@
en
True
1.0.0
- alpha5.12
+
$(VersionPrefix)
$(VersionPrefix)
The Synapse Authors
diff --git a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
index cb4ccfb00..1c5df3660 100644
--- a/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
+++ b/src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs
@@ -424,7 +424,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte
{
ArgumentNullException.ThrowIfNull(task);
if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task));
- if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead");
+ if (listenTask.Foreach != null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead");
if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return context;
var @namespace = task.Workflow.Instance.GetNamespace()!;
var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}";
@@ -449,6 +449,7 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte
Source = new ResourceReference(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()),
Lifetime = CorrelationLifetime.Ephemeral,
Events = listenTask.Listen.To,
+ Keys = this.Instance.Status?.Correlation?.Keys,
Expressions = task.Workflow.Definition.Evaluate ?? new(),
Outcome = new()
{
@@ -511,6 +512,17 @@ public virtual async Task CorrelateAsync(ITaskExecutionConte
CompletedAt = DateTimeOffset.Now
}
}, cancellationToken).ConfigureAwait(false);
+ using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
+ this.Instance = await this.Api.WorkflowInstances.GetAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
+ var originalInstance = this.Instance.Clone();
+ foreach(var correlationKey in correlationContext.Keys)
+ {
+ this.Instance.Status!.Correlation!.Keys ??= [];
+ this.Instance.Status!.Correlation!.Keys[correlationKey.Key] = correlationKey.Value;
+ }
+ this.Instance.Status!.Correlation!.Contexts!.Remove(task.Instance.Reference.OriginalString);
+ var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance);
+ this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false);
return correlationContext;
}
diff --git a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
index 3159cca90..5d4e71c7d 100644
--- a/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
+++ b/src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs
@@ -18,6 +18,7 @@
using Neuroglia.AsyncApi.IO;
using Neuroglia.AsyncApi.v3;
using Neuroglia.Data.Expressions;
+using System.Threading;
namespace Synapse.Runner.Services.Executors;
@@ -94,6 +95,11 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger
protected uint? Offset { get; set; }
+ ///
+ /// Gets/sets a boolean indicating whether or not to keep consuming incoming messages
+ ///
+ protected bool KeepConsume { get; set; } = true;
+
///
/// Gets the path for the specified message
///
@@ -234,7 +240,7 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document);
var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol);
- await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
+ var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation");
if(result.Messages == null)
{
@@ -244,24 +250,24 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
var observable = result.Messages;
if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan()));
if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value);
- else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () =>
- {
- var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false);
- return (message, keepGoing);
- })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
- else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () =>
- {
- var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false));
- return (message, keepGoing);
- })).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
if (this.AsyncApi.Subscription.Foreach == null)
{
- var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false);
+ var messages = await observable.ToAsyncEnumerable().TakeWhileAwait(async m =>
+ {
+ if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) return await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
+ if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) return !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
+ return true;
+ }).ToListAsync(cancellationToken).ConfigureAwait(false);
await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
}
else
{
- this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync);
+ //todo: fix
+ this.Subscription = observable.TakeWhile(_ => this.KeepConsume).SelectMany(m =>
+ {
+ OnStreamingMessageAsync(m).GetAwaiter().GetResult();
+ return Observable.Return(m);
+ }).SubscribeAsync(_ => System.Threading.Tasks.Task.CompletedTask, OnStreamingErrorAsync, OnStreamingCompletedAsync);
}
}
@@ -274,6 +280,11 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
{
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
+ if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While) && !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false))
+ {
+ this.KeepConsume = false;
+ return;
+ }
if (this.AsyncApi.Subscription.Foreach?.Do != null)
{
var taskDefinition = new DoTaskDefinition()
@@ -284,10 +295,15 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false)
]
};
- var arguments = this.GetExpressionEvaluationArguments();
var messageData = message as object;
+ var offset = this.Offset ?? 0;
+ if (!this.Offset.HasValue) this.Offset = 0;
+ var arguments = this.GetExpressionEvaluationArguments();
+ arguments ??= new Dictionary();
+ arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!;
+ arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset;
if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync