diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java index 8d66828b..fd11eae1 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java @@ -23,7 +23,7 @@ import io.serverlessworkflow.api.types.func.ForTaskFunction; import io.serverlessworkflow.api.types.func.TypedFunction; import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder; @@ -36,7 +36,7 @@ public class JavaForExecutorBuilder extends ForExecutorBuilder { protected JavaForExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, ForTask task, Workflow workflow, WorkflowApplication application, diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java index 118b4e9a..4ad24fc3 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java @@ -22,7 +22,7 @@ import io.serverlessworkflow.api.types.func.SwitchCaseFunction; import io.serverlessworkflow.api.types.func.TypedPredicate; import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder; import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; @@ -33,7 +33,7 @@ public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder { protected JavaSwitchExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, SwitchTask task, Workflow workflow, WorkflowApplication application, diff --git a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java index 8dfce9de..b9c7fa7b 100644 --- a/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java +++ b/experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java @@ -19,7 +19,7 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory; import io.serverlessworkflow.impl.executors.TaskExecutorBuilder; import io.serverlessworkflow.impl.resources.ResourceLoader; @@ -27,7 +27,7 @@ public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory { public TaskExecutorBuilder getTaskExecutor( - WorkflowPosition position, + WorkflowMutablePosition position, Task task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java index 5ad4934f..8f5a685a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java @@ -19,7 +19,7 @@ import java.util.Deque; import java.util.stream.Collectors; -public class QueueWorkflowPosition implements WorkflowPosition { +public class QueueWorkflowPosition implements WorkflowMutablePosition { private Deque queue; @@ -36,13 +36,13 @@ public QueueWorkflowPosition copy() { } @Override - public WorkflowPosition addIndex(int index) { + public WorkflowMutablePosition addIndex(int index) { queue.add(index); return this; } @Override - public WorkflowPosition addProperty(String prop) { + public WorkflowMutablePosition addProperty(String prop) { queue.add(prop); return this; } @@ -58,7 +58,7 @@ public String toString() { } @Override - public WorkflowPosition back() { + public WorkflowMutablePosition back() { queue.removeLast(); return this; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java index 18aaf8e4..d5ba53a8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java @@ -15,7 +15,7 @@ */ package io.serverlessworkflow.impl; -public class StringBufferWorkflowPosition implements WorkflowPosition { +public class StringBufferWorkflowPosition implements WorkflowMutablePosition { private StringBuilder sb; @@ -32,13 +32,13 @@ public StringBufferWorkflowPosition copy() { } @Override - public WorkflowPosition addIndex(int index) { + public WorkflowMutablePosition addIndex(int index) { sb.append('/').append(index); return this; } @Override - public WorkflowPosition addProperty(String prop) { + public WorkflowMutablePosition addProperty(String prop) { sb.append('/').append(prop); return this; } @@ -54,7 +54,7 @@ public String toString() { } @Override - public WorkflowPosition back() { + public WorkflowMutablePosition back() { int indexOf = sb.lastIndexOf("/"); if (indexOf != -1) { sb.substring(0, indexOf); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 91c4abab..6ede1194 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -22,7 +22,7 @@ import java.util.Map; import java.util.Optional; -public class TaskContext { +public class TaskContext implements TaskContextData { private final WorkflowModel rawInput; private final TaskBase task; @@ -81,14 +81,17 @@ public void input(WorkflowModel input) { this.output = input; } + @Override public WorkflowModel input() { return input; } + @Override public WorkflowModel rawInput() { return rawInput; } + @Override public TaskBase task() { return task; } @@ -99,6 +102,7 @@ public TaskContext rawOutput(WorkflowModel output) { return this; } + @Override public WorkflowModel rawOutput() { return rawOutput; } @@ -108,26 +112,32 @@ public TaskContext output(WorkflowModel output) { return this; } + @Override public WorkflowModel output() { return output; } + @Override public WorkflowPosition position() { return position; } + @Override public Map variables() { return contextVariables; } + @Override public Instant startedAt() { return startedAt; } + @Override public Optional parent() { return parentContext; } + @Override public String taskName() { return taskName; } @@ -137,6 +147,7 @@ public TaskContext completedAt(Instant instant) { return this; } + @Override public Instant completedAt() { return completedAt; } @@ -149,4 +160,17 @@ public TaskContext transition(TransitionInfo transition) { this.transition = transition; return this; } + + @Override + public String toString() { + return "TaskContext [position=" + + position + + ", startedAt=" + + startedAt + + ", taskName=" + + taskName + + ", completedAt=" + + completedAt + + "]"; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java new file mode 100644 index 00000000..e4e19b62 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContextData.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import io.serverlessworkflow.api.types.TaskBase; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; + +public interface TaskContextData { + + WorkflowModel input(); + + WorkflowModel rawInput(); + + TaskBase task(); + + WorkflowModel rawOutput(); + + WorkflowModel output(); + + WorkflowPosition position(); + + Map variables(); + + Instant startedAt(); + + Optional parent(); + + String taskName(); + + Instant completedAt(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 16063c60..96a2d47e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -26,6 +26,7 @@ import io.serverlessworkflow.impl.executors.TaskExecutorFactory; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.RuntimeDescriptor; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; import io.serverlessworkflow.impl.resources.StaticResource; @@ -33,12 +34,13 @@ import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Map; import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; public class WorkflowApplication implements AutoCloseable { @@ -127,7 +129,10 @@ public SchemaValidator getValidator(SchemaInline inline) { private TaskExecutorFactory taskFactory; private ExpressionFactory exprFactory; - private Collection listeners; + private Collection listeners = + ServiceLoader.load(WorkflowExecutionListener.class).stream() + .map(Provider::get) + .collect(Collectors.toList()); private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get(); private SchemaValidatorFactory schemaValidatorFactory; private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); @@ -141,9 +146,6 @@ public SchemaValidator getValidator(SchemaInline inline) { private Builder() {} public Builder withListener(WorkflowExecutionListener listener) { - if (listeners == null) { - listeners = new HashSet<>(); - } listeners.add(listener); return this; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java index 6960ca66..6243ab17 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java @@ -15,20 +15,26 @@ */ package io.serverlessworkflow.impl; -public class WorkflowContext { +public class WorkflowContext implements WorkflowContextData { private final WorkflowDefinition definition; - private final WorkflowInstance instance; + private final WorkflowMutableInstance instance; private WorkflowModel context; - WorkflowContext(WorkflowDefinition definition, WorkflowInstance instance) { + WorkflowContext(WorkflowDefinition definition, WorkflowMutableInstance instance) { this.definition = definition; this.instance = instance; } - public WorkflowInstance instance() { + @Override + public WorkflowInstanceData instanceData() { return instance; } + public WorkflowMutableInstance instance() { + return instance; + } + + @Override public WorkflowModel context() { return context; } @@ -37,7 +43,19 @@ public void context(WorkflowModel context) { this.context = context; } + @Override public WorkflowDefinition definition() { return definition; } + + @Override + public String toString() { + return "WorkflowContext [definition=" + + definition.workflow().getDocument().getName() + + ", instance=" + + instance + + ", context=" + + context + + "]"; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContextData.java similarity index 75% rename from impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContextData.java index c121bb41..dea7c9e3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContextData.java @@ -15,11 +15,11 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.api.types.TaskBase; +public interface WorkflowContextData { -public interface WorkflowExecutionListener { + WorkflowInstanceData instanceData(); - void onTaskStarted(WorkflowPosition currentPos, TaskBase task); + WorkflowModel context(); - void onTaskEnded(WorkflowPosition currentPos, TaskBase task); + WorkflowDefinitionData definition(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 404ecf07..92b01ed9 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -25,10 +25,9 @@ import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.nio.file.Path; -import java.util.Collection; import java.util.Optional; -public class WorkflowDefinition implements AutoCloseable { +public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData { private final Workflow workflow; private Optional inputSchemaValidator = Optional.empty(); @@ -73,7 +72,7 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, } public WorkflowInstance instance(Object input) { - return new WorkflowInstance(this, application.modelFactory().fromAny(input)); + return new WorkflowMutableInstance(this, application.modelFactory().fromAny(input)); } Optional inputSchemaValidator() { @@ -88,14 +87,11 @@ Optional inputFilter() { return inputFilter; } + @Override public Workflow workflow() { return workflow; } - public Collection listeners() { - return application.listeners(); - } - Optional outputFilter() { return outputFilter; } @@ -104,10 +100,7 @@ Optional outputSchemaValidator() { return outputSchemaValidator; } - public RuntimeDescriptorFactory runtimeDescriptorFactory() { - return application.runtimeDescriptorFactory(); - } - + @Override public WorkflowApplication application() { return application; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java new file mode 100644 index 00000000..8a0d5c0a --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinitionData.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import io.serverlessworkflow.api.types.Workflow; + +public interface WorkflowDefinitionData { + + Workflow workflow(); + + WorkflowApplication application(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index 88269082..e8b8e219 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -15,96 +15,8 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.impl.executors.TaskExecutorHelper; -import java.time.Instant; -import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; -public class WorkflowInstance { - private final AtomicReference status; - private final String id; - private final WorkflowModel input; - - private WorkflowContext workflowContext; - private WorkflowDefinition definition; - private Instant startedAt; - private Instant completedAt; - private volatile WorkflowModel output; - private CompletableFuture completableFuture; - - WorkflowInstance(WorkflowDefinition definition, WorkflowModel input) { - this.id = definition.application().idFactory().get(); - this.input = input; - this.definition = definition; - this.status = new AtomicReference<>(WorkflowStatus.PENDING); - definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); - } - - public CompletableFuture start() { - this.startedAt = Instant.now(); - this.workflowContext = new WorkflowContext(definition, this); - this.status.set(WorkflowStatus.RUNNING); - this.completableFuture = - TaskExecutorHelper.processTaskList( - definition.startTask(), - workflowContext, - Optional.empty(), - definition - .inputFilter() - .map(f -> f.apply(workflowContext, null, input)) - .orElse(input)) - .thenApply(this::whenCompleted); - return completableFuture; - } - - private WorkflowModel whenCompleted(WorkflowModel node) { - output = - workflowContext - .definition() - .outputFilter() - .map(f -> f.apply(workflowContext, null, node)) - .orElse(node); - workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output)); - status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED); - completedAt = Instant.now(); - return output; - } - - public String id() { - return id; - } - - public Instant startedAt() { - return startedAt; - } - - public Instant completedAt() { - return completedAt; - } - - public WorkflowModel input() { - return input; - } - - public WorkflowStatus status() { - return status.get(); - } - - public void status(WorkflowStatus state) { - this.status.set(state); - } - - public WorkflowModel output() { - return output; - } - - public T outputAs(Class clazz) { - return output - .as(clazz) - .orElseThrow( - () -> - new IllegalArgumentException( - "Output " + output + " cannot be converted to class " + clazz)); - } +public interface WorkflowInstance extends WorkflowInstanceData { + CompletableFuture start(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java new file mode 100644 index 00000000..846b70fd --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstanceData.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.time.Instant; + +public interface WorkflowInstanceData { + String id(); + + Instant startedAt(); + + Instant completedAt(); + + WorkflowModel input(); + + WorkflowStatus status(); + + WorkflowModel output(); + + T outputAs(Class clazz); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java new file mode 100644 index 00000000..cbc472c4 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutableInstance.java @@ -0,0 +1,149 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; + +import io.serverlessworkflow.impl.executors.TaskExecutorHelper; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +public class WorkflowMutableInstance implements WorkflowInstance { + + protected final AtomicReference status; + private final String id; + private final WorkflowModel input; + + private WorkflowContext workflowContext; + private Instant startedAt; + private Instant completedAt; + private volatile WorkflowModel output; + private CompletableFuture completableFuture; + + WorkflowMutableInstance(WorkflowDefinition definition, WorkflowModel input) { + this.id = definition.application().idFactory().get(); + this.input = input; + this.status = new AtomicReference<>(WorkflowStatus.PENDING); + definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); + this.workflowContext = new WorkflowContext(definition, this); + } + + @Override + public CompletableFuture start() { + this.startedAt = Instant.now(); + this.status.set(WorkflowStatus.RUNNING); + publishEvent( + workflowContext, l -> l.onWorkflowStarted(new WorkflowStartedEvent(workflowContext))); + this.completableFuture = + TaskExecutorHelper.processTaskList( + workflowContext.definition().startTask(), + workflowContext, + Optional.empty(), + workflowContext + .definition() + .inputFilter() + .map(f -> f.apply(workflowContext, null, input)) + .orElse(input)) + .whenComplete(this::whenFailed) + .thenApply(this::whenSuccess); + return completableFuture; + } + + private void whenFailed(WorkflowModel result, Throwable ex) { + completedAt = Instant.now(); + if (ex != null) { + status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.FAULTED); + publishEvent( + workflowContext, l -> l.onWorkflowFailed(new WorkflowFailedEvent(workflowContext, ex))); + } + } + + private WorkflowModel whenSuccess(WorkflowModel node) { + output = + workflowContext + .definition() + .outputFilter() + .map(f -> f.apply(workflowContext, null, node)) + .orElse(node); + workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output)); + status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED); + publishEvent( + workflowContext, l -> l.onWorkflowCompleted(new WorkflowCompletedEvent(workflowContext))); + return output; + } + + @Override + public String id() { + return id; + } + + @Override + public Instant startedAt() { + return startedAt; + } + + @Override + public Instant completedAt() { + return completedAt; + } + + @Override + public WorkflowModel input() { + return input; + } + + @Override + public WorkflowStatus status() { + return status.get(); + } + + @Override + public WorkflowModel output() { + return output; + } + + @Override + public T outputAs(Class clazz) { + return output + .as(clazz) + .orElseThrow( + () -> + new IllegalArgumentException( + "Output " + output + " cannot be converted to class " + clazz)); + } + + public void status(WorkflowStatus state) { + this.status.set(state); + } + + @Override + public String toString() { + return "WorkflowMutableInstance [status=" + + status + + ", id=" + + id + + ", startedAt=" + + startedAt + + ", completedAt=" + + completedAt + + "]"; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutablePosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutablePosition.java new file mode 100644 index 00000000..20d484d1 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowMutablePosition.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +public interface WorkflowMutablePosition extends WorkflowPosition { + WorkflowMutablePosition addProperty(String prop); + + WorkflowMutablePosition addIndex(int index); + + WorkflowMutablePosition back(); + + WorkflowMutablePosition copy(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java index 1c416100..610815cd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java @@ -19,13 +19,5 @@ public interface WorkflowPosition { String jsonPointer(); - WorkflowPosition addProperty(String prop); - - WorkflowPosition addIndex(int index); - - WorkflowPosition back(); - - WorkflowPosition copy(); - Object last(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java index 60fa5d6a..00fe9c01 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java @@ -18,4 +18,4 @@ import java.util.function.Supplier; @FunctionalInterface -public interface WorkflowPositionFactory extends Supplier {} +public interface WorkflowPositionFactory extends Supplier {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java index f2857ab0..2c1f46d0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CloudEventUtils.java @@ -16,11 +16,13 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import java.net.URI; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Date; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; public class CloudEventUtils { @@ -30,6 +32,10 @@ public static OffsetDateTime toOffset(Date date) { return date.toInstant().atOffset(ZoneOffset.UTC); } + public static String id() { + return UUID.randomUUID().toString(); + } + public static Map extensions(CloudEvent event) { Map result = new LinkedHashMap<>(); for (String name : event.getExtensionNames()) { @@ -37,4 +43,8 @@ public static Map extensions(CloudEvent event) { } return result; } + + public static URI source() { + return URI.create("reference-impl"); + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index d5a0f055..38e1630b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -17,6 +17,7 @@ import static io.serverlessworkflow.impl.WorkflowUtils.buildWorkflowFilter; import static io.serverlessworkflow.impl.WorkflowUtils.getSchemaValidator; +import static io.serverlessworkflow.impl.lifecycle.LifecycleEventsUtils.publishEvent; import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.FlowDirective; @@ -29,9 +30,13 @@ import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import io.serverlessworkflow.impl.resources.ResourceLoader; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.time.Instant; @@ -62,7 +67,7 @@ public abstract static class AbstractTaskExecutorBuilder private Optional inputSchemaValidator = Optional.empty(); private Optional outputSchemaValidator = Optional.empty(); private Optional contextSchemaValidator = Optional.empty(); - protected final WorkflowPosition position; + protected final WorkflowMutablePosition position; protected final T task; protected final String taskName; protected final WorkflowApplication application; @@ -72,7 +77,7 @@ public abstract static class AbstractTaskExecutorBuilder private TaskExecutor instance; protected AbstractTaskExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, T task, Workflow workflow, WorkflowApplication application, @@ -187,16 +192,24 @@ public CompletableFuture apply( completable .thenApply( t -> { - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskStarted(position, task)); + publishEvent( + workflowContext, + l -> l.onTaskStarted(new TaskStartedEvent(workflowContext, taskContext))); inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); inputProcessor.ifPresent( p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); return t; }) .thenCompose(t -> execute(workflowContext, t)) + .whenComplete( + (t, e) -> { + if (e != null) { + publishEvent( + workflowContext, + l -> + l.onTaskFailed(new TaskFailedEvent(workflowContext, taskContext, e))); + } + }) .thenApply( t -> { outputProcessor.ifPresent( @@ -208,10 +221,11 @@ public CompletableFuture apply( p.apply(workflowContext, t, workflowContext.context()))); contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); t.completedAt(Instant.now()); - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskEnded(position, task)); + publishEvent( + workflowContext, + l -> + l.onTaskCompleted( + new TaskCompletedEvent(workflowContext, taskContext))); return t; }), workflowContext); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java index 1dc07645..d65859be 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java @@ -21,7 +21,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; @@ -34,7 +34,7 @@ public static class CallTaskExecutorBuilder private CallableTask callable; protected CallTaskExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, T task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index bf63b5ed..a1ea9a1d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -20,7 +20,7 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.executors.CallTaskExecutor.CallTaskExecutorBuilder; import io.serverlessworkflow.impl.executors.DoExecutor.DoExecutorBuilder; import io.serverlessworkflow.impl.executors.EmitExecutor.EmitExecutorBuilder; @@ -50,7 +50,7 @@ protected DefaultTaskExecutorFactory() {} @Override public TaskExecutorBuilder getTaskExecutor( - WorkflowPosition position, + WorkflowMutablePosition position, Task task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java index 65e3469b..bc6e5091 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -21,7 +21,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -34,7 +34,7 @@ public static class DoExecutorBuilder extends RegularTaskExecutorBuilder private TaskExecutor taskExecutor; protected DoExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, DoTask task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java index 8ce6c730..e13cfe47 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/EmitExecutor.java @@ -29,7 +29,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.events.CloudEventUtils; @@ -39,7 +39,6 @@ import java.time.OffsetDateTime; import java.util.Map; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CompletableFuture; public class EmitExecutor extends RegularTaskExecutor { @@ -51,7 +50,7 @@ public static class EmitExecutorBuilder extends RegularTaskExecutorBuilder filter.apply(workflow, taskContext, taskContext.input())) - .orElse(UUID.randomUUID().toString())); + .orElse(CloudEventUtils.id())); ceBuilder.withSource( props .sourceFilter() .map(filter -> filter.apply(workflow, taskContext, taskContext.input())) .map(URI::create) - .orElse(URI.create("reference-impl"))); + .orElse(CloudEventUtils.source())); ceBuilder.withType( props .typeFilter() diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index c77b6761..65cb578f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -21,7 +21,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; @@ -44,7 +44,7 @@ public static class ForExecutorBuilder extends RegularTaskExecutorBuilder taskExecutor; protected ForExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, ForTask task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java index 1353db89..046b2807 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java @@ -22,7 +22,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.HashMap; @@ -48,7 +48,7 @@ public static class ForkExecutorBuilder extends RegularTaskExecutorBuilder internalExecute( WorkflowModelCollection output = workflow.definition().application().modelFactory().createCollection(); Collection registrations = new ArrayList<>(); - workflow.instance().status(WorkflowStatus.WAITING); + ((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING); return buildFuture( regBuilders, registrations, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java index ac213e7b..2e0b70d4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java @@ -27,7 +27,7 @@ import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.WorkflowValueResolver; import io.serverlessworkflow.impl.resources.ResourceLoader; @@ -49,7 +49,7 @@ public static class RaiseExecutorBuilder extends RegularTaskExecutorBuilder detailFilter; protected RaiseExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, RaiseTask task, Workflow workflow, WorkflowApplication application, @@ -127,6 +127,7 @@ protected RaiseExecutor(RaiseExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - throw new WorkflowException(errorBuilder.apply(workflow, taskContext)); + return CompletableFuture.failedFuture( + new WorkflowException(errorBuilder.apply(workflow, taskContext))); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java index c4a716c9..5dbcfd66 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java @@ -21,7 +21,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -41,7 +41,7 @@ public abstract static class RegularTaskExecutorBuilder private TransitionInfoBuilder transition; protected RegularTaskExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, T task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java index 9dc8c0a5..e4192acb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java @@ -24,7 +24,7 @@ import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.concurrent.CompletableFuture; @@ -38,7 +38,7 @@ public static class SetExecutorBuilder extends RegularTaskExecutorBuilder getTaskExecutor( - WorkflowPosition position, + WorkflowMutablePosition position, Task task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java index 646a16f4..0b66cf7d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java @@ -21,7 +21,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Iterator; @@ -58,7 +58,7 @@ public static boolean isActive(WorkflowStatus status) { } public static TaskExecutor createExecutorList( - WorkflowPosition position, + WorkflowMutablePosition position, List taskItems, Workflow workflow, WorkflowApplication application, @@ -75,7 +75,7 @@ public static TaskExecutor createExecutorList( } public static Map> createBranchList( - WorkflowPosition position, + WorkflowMutablePosition position, List taskItems, Workflow workflow, WorkflowApplication application, @@ -88,7 +88,7 @@ public static Map> createBranchList( } private static Map> createExecutorBuilderList( - WorkflowPosition position, + WorkflowMutablePosition position, List taskItems, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index 6a45ee13..5aa2dceb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -27,7 +27,7 @@ import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowPredicate; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.resources.ResourceLoader; @@ -54,7 +54,7 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder> catchTaskExecutor; protected TryExecutorBuilder( - WorkflowPosition position, + WorkflowMutablePosition position, TryTask task, Workflow workflow, WorkflowApplication application, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java index 64ecde23..9e46c1ed 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -22,7 +22,8 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowMutableInstance; +import io.serverlessworkflow.impl.WorkflowMutablePosition; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.time.Duration; @@ -37,7 +38,7 @@ public static class WaitExecutorBuilder extends RegularTaskExecutorBuilder internalExecute( WorkflowContext workflow, TaskContext taskContext) { - workflow.instance().status(WorkflowStatus.WAITING); + ((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING); return new CompletableFuture() .completeOnTimeout(taskContext.output(), millisToWait.toMillis(), TimeUnit.MILLISECONDS) .thenApply( diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java new file mode 100644 index 00000000..d7fb25db --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/LifecycleEventsUtils.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContext; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LifecycleEventsUtils { + + private LifecycleEventsUtils() {} + + private static final Logger logger = LoggerFactory.getLogger(LifecycleEventsUtils.class); + + public static void publishEvent( + WorkflowContext workflowContext, Consumer consumer) { + workflowContext + .definition() + .application() + .listeners() + .forEach( + v -> { + try { + consumer.accept(v); + } catch (Exception ex) { + logger.error("Error processing listener. Ignoring and going on", ex); + } + }); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskCancelledEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskCancelledEvent.java new file mode 100644 index 00000000..fe3431bb --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskCancelledEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskCancelledEvent extends TaskEvent { + + public TaskCancelledEvent(WorkflowContext workflow, TaskContext task) { + super(workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskCompletedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskCompletedEvent.java new file mode 100644 index 00000000..e7ab1084 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskCompletedEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskCompletedEvent extends TaskEvent { + + public TaskCompletedEvent(WorkflowContext workflow, TaskContext task) { + super(workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskEvent.java new file mode 100644 index 00000000..10aa7e74 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskEvent.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowContextData; + +public abstract class TaskEvent extends WorkflowEvent { + + protected final TaskContextData task; + + protected TaskEvent(WorkflowContextData workflow, TaskContextData task) { + super(workflow); + this.task = task; + } + + public TaskContextData taskContext() { + return task; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskFailedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskFailedEvent.java new file mode 100644 index 00000000..a2bcd422 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskFailedEvent.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskFailedEvent extends TaskEvent { + + private final Throwable cause; + + public TaskFailedEvent(WorkflowContext workflow, TaskContext task, Throwable cause) { + super(workflow, task); + this.cause = cause; + } + + public Throwable cause() { + return cause; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskResumedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskResumedEvent.java new file mode 100644 index 00000000..e9aaae8c --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskResumedEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskResumedEvent extends TaskEvent { + + public TaskResumedEvent(WorkflowContext workflow, TaskContext task) { + super(workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskRetriedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskRetriedEvent.java new file mode 100644 index 00000000..5d7690e9 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskRetriedEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskRetriedEvent extends TaskEvent { + + public TaskRetriedEvent(WorkflowContext workflow, TaskContext task) { + super(workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskStartedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskStartedEvent.java new file mode 100644 index 00000000..b03a6c4f --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskStartedEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskStartedEvent extends TaskEvent { + + public TaskStartedEvent(WorkflowContext workflow, TaskContext task) { + super(workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskSuspendedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskSuspendedEvent.java new file mode 100644 index 00000000..6145ae94 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TaskSuspendedEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; + +public class TaskSuspendedEvent extends TaskEvent { + + public TaskSuspendedEvent(WorkflowContext workflow, TaskContext task) { + super(workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCancelledEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCancelledEvent.java new file mode 100644 index 00000000..272377c3 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCancelledEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public class WorkflowCancelledEvent extends WorkflowEvent { + + public WorkflowCancelledEvent(WorkflowContextData workflow) { + super(workflow); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java new file mode 100644 index 00000000..727a28e7 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowCompletedEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public class WorkflowCompletedEvent extends WorkflowEvent { + + public WorkflowCompletedEvent(WorkflowContextData workflow) { + super(workflow); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowEvent.java new file mode 100644 index 00000000..e19ddfef --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowEvent.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; +import java.time.OffsetDateTime; + +public abstract class WorkflowEvent { + + protected final WorkflowContextData workflow; + protected final OffsetDateTime eventDate; + + public WorkflowEvent(WorkflowContextData workflow) { + this.workflow = workflow; + this.eventDate = OffsetDateTime.now(); + } + + public WorkflowContextData workflowContext() { + return workflow; + } + + public OffsetDateTime eventDate() { + return eventDate; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java new file mode 100644 index 00000000..8d89fac7 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +public interface WorkflowExecutionListener { + + default void onWorkflowStarted(WorkflowStartedEvent ev) {} + + default void onWorkflowSuspended(WorkflowSuspendedEvent ev) {} + + default void onWorkflowResumed(WorkflowResumedEvent ev) {} + + default void onWorkflowCompleted(WorkflowCompletedEvent ev) {} + + default void onWorkflowFailed(WorkflowFailedEvent ev) {} + + default void onWorkflowCancelled(WorkflowCancelledEvent ev) {} + + default void onTaskStarted(TaskStartedEvent ev) {} + + default void onTaskCompleted(TaskCompletedEvent ev) {} + + default void onTaskFailed(TaskFailedEvent ev) {} + + default void onTaskCancelled(TaskCancelledEvent ev) {} + + default void onTaskSuspended(TaskSuspendedEvent ev) {} + + default void onTaskResumed(TaskResumedEvent ev) {} + + default void onTaskRetried(TaskRetriedEvent ev) {} +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowFailedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowFailedEvent.java new file mode 100644 index 00000000..7388ebe5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowFailedEvent.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public class WorkflowFailedEvent extends WorkflowEvent { + + private final Throwable cause; + + public WorkflowFailedEvent(WorkflowContextData workflow, Throwable cause) { + super(workflow); + this.cause = cause; + } + + public Throwable cause() { + return cause; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowResumedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowResumedEvent.java new file mode 100644 index 00000000..53e97e04 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowResumedEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public class WorkflowResumedEvent extends WorkflowEvent { + + public WorkflowResumedEvent(WorkflowContextData workflow) { + super(workflow); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStartedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStartedEvent.java new file mode 100644 index 00000000..d4ac34f0 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowStartedEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public class WorkflowStartedEvent extends WorkflowEvent { + + public WorkflowStartedEvent(WorkflowContextData workflow) { + super(workflow); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowSuspendedEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowSuspendedEvent.java new file mode 100644 index 00000000..1975577a --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowSuspendedEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle; + +import io.serverlessworkflow.impl.WorkflowContextData; + +public class WorkflowSuspendedEvent extends WorkflowEvent { + + public WorkflowSuspendedEvent(WorkflowContextData workflow) { + super(workflow); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java new file mode 100644 index 00000000..b3a30305 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -0,0 +1,165 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref; +import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowErrorCEData.error; + +import io.cloudevents.CloudEventData; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.PojoCloudEventData; +import io.cloudevents.core.data.PojoCloudEventData.ToBytes; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.events.CloudEventUtils; +import io.serverlessworkflow.impl.events.EventPublisher; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskEvent; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import java.time.OffsetDateTime; + +public abstract class AbstractLifeCyclePublisher implements WorkflowExecutionListener { + + @Override + public void onTaskStarted(TaskStartedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), + this::convert)) + .withType("io.serverlessworkflow.task.started.v1") + .build()); + } + + @Override + public void onTaskCompleted(TaskCompletedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new TaskCompletedCEData( + id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)), + this::convert)) + .withType("io.serverlessworkflow.task.completed.v1") + .build()); + } + + @Override + public void onTaskFailed(TaskFailedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)), + this::convert)) + .withType("io.serverlessworkflow.task.faulted.v1") + .build()); + } + + @Override + public void onWorkflowStarted(WorkflowStartedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) + .withType("io.serverlessworkflow.workflow.started.v1") + .build()); + } + + @Override + public void onWorkflowCompleted(WorkflowCompletedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new WorkflowCompletedCEData(id(ev), ref(ev), ev.eventDate(), output(ev)), + this::convert)) + .withType("io.serverlessworkflow.workflow.completed.v1") + .build()); + } + + @Override + public void onWorkflowFailed(WorkflowFailedEvent ev) { + eventPublisher(ev) + .publish( + builder() + .withData( + cloudEventData( + new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)), + this::convert)) + .withType("io.serverlessworkflow.workflow.faulted.v1") + .build()); + } + + protected abstract byte[] convert(WorkflowStartedCEData data); + + protected abstract byte[] convert(WorkflowCompletedCEData data); + + protected abstract byte[] convert(TaskStartedCEData data); + + protected abstract byte[] convert(TaskCompletedCEData data); + + protected abstract byte[] convert(TaskFailedCEData data); + + protected abstract byte[] convert(WorkflowFailedCEData data); + + private static CloudEventData cloudEventData(T data, ToBytes toBytes) { + return PojoCloudEventData.wrap(data, toBytes); + } + + private static CloudEventBuilder builder() { + return CloudEventBuilder.v1() + .withId(CloudEventUtils.id()) + .withSource(CloudEventUtils.source()) + .withTime(OffsetDateTime.now()); + } + + private static String id(WorkflowEvent ev) { + return ev.workflowContext().instanceData().id(); + } + + private static String pos(TaskEvent ev) { + return ev.taskContext().position().jsonPointer(); + } + + private static Object output(WorkflowEvent ev) { + return from(ev.workflowContext().instanceData().output()); + } + + private static EventPublisher eventPublisher(WorkflowEvent ev) { + return ev.workflowContext().definition().application().eventPublisher(); + } + + private static Object output(TaskEvent ev) { + return from(ev.taskContext().output()); + } + + private static Object from(WorkflowModel model) { + return model.asJavaObject(); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java new file mode 100644 index 00000000..1480aa84 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskCompletedCEData( + String workflow, + String task, + WorkflowDefinitionCEData definition, + OffsetDateTime completedAt, + Object output) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java new file mode 100644 index 00000000..ca80846f --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskFailedCEData( + String workflow, + String task, + WorkflowDefinitionCEData definition, + OffsetDateTime faultedAt, + WorkflowErrorCEData error) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java new file mode 100644 index 00000000..b9d659fc --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record TaskStartedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime startedAt) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java new file mode 100644 index 00000000..f025f7a7 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowCompletedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime completedAt, Object output) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java new file mode 100644 index 00000000..ffdbce9d --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.api.types.Document; +import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; + +public record WorkflowDefinitionCEData(String namespace, String name, String version) { + + public static WorkflowDefinitionCEData ref(WorkflowEvent ev) { + Document document = ev.workflowContext().definition().workflow().getDocument(); + return new WorkflowDefinitionCEData( + document.getNamespace(), document.getName(), document.getVersion()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java new file mode 100644 index 00000000..ee277053 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowErrorCEData.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.CompletionException; + +public record WorkflowErrorCEData( + String type, Integer status, String instance, String title, String detail) { + + public static WorkflowErrorCEData error(TaskFailedEvent ev) { + return error(ev.cause()); + } + + public static WorkflowErrorCEData error(WorkflowFailedEvent ev) { + return error(ev.cause()); + } + + private static WorkflowErrorCEData error(Throwable cause) { + + if (cause instanceof CompletionException) { + cause = cause.getCause(); + } + return cause instanceof WorkflowException ex ? error(ex) : commonError(cause); + } + + private static WorkflowErrorCEData commonError(Throwable cause) { + StringWriter stackTrace = new StringWriter(); + try (PrintWriter writer = new PrintWriter(stackTrace)) { + cause.printStackTrace(writer); + return new WorkflowErrorCEData( + cause.getClass().getTypeName(), null, stackTrace.toString(), null, cause.getMessage()); + } + } + + private static WorkflowErrorCEData error(WorkflowException ex) { + WorkflowError error = ex.getWorflowError(); + return new WorkflowErrorCEData( + error.type(), error.status(), error.instance(), error.title(), error.details()); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java new file mode 100644 index 00000000..6c71c4bf --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java @@ -0,0 +1,24 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowFailedCEData( + String name, + WorkflowDefinitionCEData definition, + OffsetDateTime faultedAt, + WorkflowErrorCEData error) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java new file mode 100644 index 00000000..89ae2cda --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import java.time.OffsetDateTime; + +public record WorkflowStartedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime startedAt) {} diff --git a/impl/jackson/pom.xml b/impl/jackson/pom.xml index babc6904..8079cbab 100644 --- a/impl/jackson/pom.xml +++ b/impl/jackson/pom.xml @@ -53,5 +53,9 @@ logback-classic test + + org.mockito + mockito-core + \ No newline at end of file diff --git a/impl/jackson/src/main/java/io/serverlessworkflow/impl/events/json/JacksonLifeCyclePublisher.java b/impl/jackson/src/main/java/io/serverlessworkflow/impl/events/json/JacksonLifeCyclePublisher.java new file mode 100644 index 00000000..c785a273 --- /dev/null +++ b/impl/jackson/src/main/java/io/serverlessworkflow/impl/events/json/JacksonLifeCyclePublisher.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; +import java.io.UncheckedIOException; + +public class JacksonLifeCyclePublisher extends AbstractLifeCyclePublisher { + + @Override + protected byte[] convert(WorkflowStartedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(WorkflowCompletedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(TaskStartedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(TaskCompletedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(TaskFailedCEData data) { + return genericConvert(data); + } + + @Override + protected byte[] convert(WorkflowFailedCEData data) { + return genericConvert(data); + } + + protected byte[] genericConvert(T data) { + try { + return JsonUtils.mapper().writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/impl/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener b/impl/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener new file mode 100644 index 00000000..7324bcc0 --- /dev/null +++ b/impl/jackson/src/main/resources/META-INF/services/io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener @@ -0,0 +1 @@ +io.serverlessworkflow.impl.events.json.JacksonLifeCyclePublisher \ No newline at end of file diff --git a/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java b/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java new file mode 100644 index 00000000..53cb792a --- /dev/null +++ b/impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java @@ -0,0 +1,127 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.api.WorkflowReader; +import io.serverlessworkflow.impl.events.EventRegistrationBuilder; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowErrorCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class LifeCycleEventsTest { + + private static WorkflowApplication appl; + private static Collection publishedEvents; + + @BeforeAll + static void init() { + appl = WorkflowApplication.builder().build(); + appl.eventConsumer() + .listenToAll(appl) + .forEach( + v -> + appl.eventConsumer() + .register( + (EventRegistrationBuilder) v, ce -> publishedEvents.add((CloudEvent) ce))); + } + + @AfterAll + static void cleanup() { + appl.close(); + } + + @BeforeEach + void setup() { + publishedEvents = new ArrayList<>(); + } + + @AfterEach + void close() { + publishedEvents = new ArrayList<>(); + } + + @Test + void simpleWorkflow() throws IOException { + WorkflowModel model = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("simple-expression.yaml")) + .instance(Map.of()) + .start() + .join(); + WorkflowCompletedCEData workflowCompletedEvent = + assertPojoInCE( + "io.serverlessworkflow.workflow.completed.v1", WorkflowCompletedCEData.class); + assertThat(workflowCompletedEvent.output()).isEqualTo(model.asJavaObject()); + WorkflowStartedCEData workflowStartedEvent = + assertPojoInCE("io.serverlessworkflow.workflow.started.v1", WorkflowStartedCEData.class); + assertThat(workflowStartedEvent.startedAt()).isBefore(workflowCompletedEvent.completedAt()); + TaskCompletedCEData taskCompletedEvent = + assertPojoInCE("io.serverlessworkflow.task.completed.v1", TaskCompletedCEData.class); + assertThat(taskCompletedEvent.output()).isEqualTo(model.asJavaObject()); + assertThat(taskCompletedEvent.completedAt()).isBefore(workflowCompletedEvent.completedAt()); + TaskStartedCEData taskStartedEvent = + assertPojoInCE("io.serverlessworkflow.task.started.v1", TaskStartedCEData.class); + assertThat(taskStartedEvent.startedAt()).isAfter(workflowStartedEvent.startedAt()); + assertThat(taskStartedEvent.startedAt()).isBefore(taskCompletedEvent.completedAt()); + } + + @Test + void testError() throws IOException { + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("raise-inline.yaml")) + .instance(Map.of()) + .start(); + WorkflowErrorCEData error = + assertPojoInCE("io.serverlessworkflow.workflow.faulted.v1", WorkflowFailedCEData.class) + .error(); + assertThat(error.type()).isEqualTo("https://serverlessworkflow.io/errors/not-implemented"); + assertThat(error.title()).isEqualTo("Not Implemented"); + assertThat(error.status()).isEqualTo(500); + assertThat(error.detail()).contains("raise-not-implemented"); + } + + private T assertPojoInCE(String type, Class clazz) { + return assertPojoInCE(type, clazz, 1L); + } + + private T assertPojoInCE(String type, Class clazz, long count) { + assertThat(publishedEvents.stream().filter(ev -> ev.getType().equals(type)).count()) + .isEqualTo(count); + Optional event = + publishedEvents.stream().filter(ev -> ev.getType().equals(type)).findAny(); + assertThat(event) + .hasValueSatisfying(ce -> assertThat(ce.getData()).isInstanceOf(PojoCloudEventData.class)); + assertThat(event) + .hasValueSatisfying( + ce -> assertThat(((PojoCloudEventData) ce.getData()).getValue()).isInstanceOf(clazz)); + return clazz.cast(((PojoCloudEventData) event.orElseThrow().getData()).getValue()); + } +} diff --git a/impl/jackson/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/jackson/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index 1a338248..8a8e7816 100644 --- a/impl/jackson/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/jackson/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -90,7 +90,7 @@ private static Stream provideParameters() { Map.of("enabled", false), WorkflowDefinitionTest::checkDisableCondition), args( - "raise-inline copy.yaml", + "raise-inline.yaml", WorkflowDefinitionTest::checkWorkflowException, WorkflowException.class), args( diff --git a/impl/jackson/src/test/resources/raise-inline copy.yaml b/impl/jackson/src/test/resources/raise-inline.yaml similarity index 100% rename from impl/jackson/src/test/resources/raise-inline copy.yaml rename to impl/jackson/src/test/resources/raise-inline.yaml