Skip to content

Commit dd8e6e1

Browse files
committed
Refactor workflow to run until receive a finalized message
Signed-off-by: Ricardo Zanini <ricardozanini@gmail.com>
1 parent a60c1cc commit dd8e6e1

File tree

8 files changed

+352
-103
lines changed

8 files changed

+352
-103
lines changed

fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,22 @@ void chat_bot() {
7272
t ->
7373
t.listen(
7474
l ->
75-
l.until(
76-
message ->
77-
!message
78-
.getOrDefault("userInput", "")
79-
.toString()
80-
.isEmpty(),
81-
Map.class)
82-
.any(
83-
c ->
84-
c.with(event -> event.type("org.acme.chatbot.request")))
75+
l.to(
76+
to ->
77+
to.any(
78+
c ->
79+
c.with(
80+
event ->
81+
event.type(
82+
"org.acme.chatbot.request")))
83+
.until(
84+
until ->
85+
until.one(
86+
one ->
87+
one.with(
88+
e ->
89+
e.type(
90+
"org.acme.chatbot.finalize")))))
8591
.forEach(
8692
f ->
8793
f.tasks(
@@ -94,16 +100,7 @@ void chat_bot() {
94100
e ->
95101
e.type(
96102
"org.acme.chatbot.reply"))))))
97-
.emit(
98-
emit ->
99-
emit.when(
100-
message ->
101-
message
102-
.getOrDefault("userInput", "")
103-
.toString()
104-
.isEmpty(),
105-
Map.class)
106-
.event(e -> e.type("org.acme.chatbot.finished"))))
103+
.emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished"))))
107104
.build();
108105

109106
try (WorkflowApplication app = WorkflowApplication.builder().build()) {
@@ -132,15 +129,16 @@ void chat_bot() {
132129
// The workflow is just waiting for the event
133130
assertEquals(WorkflowStatus.WAITING, waitingInstance.status());
134131

135-
// Publish the event
132+
// Publish the events
136133
app.eventPublisher().publish(newMessageEvent("Hello World!"));
137134
CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS);
138135
assertNotNull(reply);
139136

140137
// Empty message completes the workflow
141-
app.eventPublisher().publish(newMessageEvent(""));
138+
app.eventPublisher().publish(newMessageEvent("", "org.acme.chatbot.finalize"));
142139
CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS);
143140
assertNotNull(finished);
141+
assertThat(finishedEvents).isEmpty();
144142

145143
assertThat(runningModel).isCompleted();
146144
assertEquals(WorkflowStatus.COMPLETED, waitingInstance.status());
@@ -223,9 +221,17 @@ void mixed_workflow() {
223221
}
224222

225223
private CloudEvent newMessageEvent(String message) {
224+
return newMessageEvent(message, null);
225+
}
226+
227+
private CloudEvent newMessageEvent(String message, String type) {
228+
if (type == null || type.isEmpty()) {
229+
type = "org.acme.chatbot.request";
230+
}
231+
226232
return new CloudEventBuilder()
227233
.withData(String.format("{\"userInput\": \"%s\"}", message).getBytes())
228-
.withType("org.acme.chatbot.request")
234+
.withType(type)
229235
.withId(UUID.randomUUID().toString())
230236
.withDataContentType("application/json")
231237
.withSource(URI.create("test://localhost"))
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.spec;
17+
18+
import io.serverlessworkflow.api.types.AllEventConsumptionStrategy;
19+
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
20+
import io.serverlessworkflow.api.types.OneEventConsumptionStrategy;
21+
import io.serverlessworkflow.api.types.Until;
22+
import io.serverlessworkflow.fluent.spec.spi.EventConsumptionStrategyFluent;
23+
import java.io.Serializable;
24+
import java.util.List;
25+
import java.util.function.Consumer;
26+
27+
public abstract class AbstractEventConsumptionStrategyBuilder<
28+
SELF extends EventConsumptionStrategyFluent<SELF, T>, T extends Serializable>
29+
implements EventConsumptionStrategyFluent<SELF, T> {
30+
31+
protected boolean oneSet, allSet, anySet;
32+
private Until until;
33+
34+
AbstractEventConsumptionStrategyBuilder() {}
35+
36+
@SuppressWarnings("unchecked")
37+
private SELF self() {
38+
return (SELF) this;
39+
}
40+
41+
public SELF one(Consumer<EventFilterBuilder> c) {
42+
ensureNoneSet();
43+
oneSet = true;
44+
EventFilterBuilder fb = new EventFilterBuilder();
45+
c.accept(fb);
46+
OneEventConsumptionStrategy strat = new OneEventConsumptionStrategy();
47+
strat.setOne(fb.build());
48+
this.setOne(strat);
49+
return this.self();
50+
}
51+
52+
abstract void setOne(OneEventConsumptionStrategy strategy);
53+
54+
public SELF all(Consumer<EventFilterBuilder> c) {
55+
ensureNoneSet();
56+
allSet = true;
57+
EventFilterBuilder fb = new EventFilterBuilder();
58+
c.accept(fb);
59+
AllEventConsumptionStrategy strat = new AllEventConsumptionStrategy();
60+
strat.setAll(List.of(fb.build()));
61+
this.setAll(strat);
62+
return this.self();
63+
}
64+
65+
abstract void setAll(AllEventConsumptionStrategy strategy);
66+
67+
public SELF any(Consumer<EventFilterBuilder> c) {
68+
ensureNoneSet();
69+
anySet = true;
70+
EventFilterBuilder fb = new EventFilterBuilder();
71+
c.accept(fb);
72+
AnyEventConsumptionStrategy strat = new AnyEventConsumptionStrategy();
73+
strat.setAny(List.of(fb.build()));
74+
this.setAny(strat);
75+
return this.self();
76+
}
77+
78+
abstract void setAny(AnyEventConsumptionStrategy strategy);
79+
80+
public SELF until(Consumer<EventConsumptionStrategyBuilder> c) {
81+
final EventConsumptionStrategyBuilder eventConsumptionStrategyBuilder =
82+
new EventConsumptionStrategyBuilder();
83+
c.accept(eventConsumptionStrategyBuilder);
84+
this.until = new Until().withAnyEventUntilConsumed(eventConsumptionStrategyBuilder.build());
85+
return this.self();
86+
}
87+
88+
public SELF until(String expression) {
89+
this.until = new Until().withAnyEventUntilCondition(expression);
90+
return this.self();
91+
}
92+
93+
private void ensureNoneSet() {
94+
if (oneSet || allSet || anySet) {
95+
throw new IllegalStateException("Only one consumption strategy can be configured");
96+
}
97+
}
98+
99+
public final T build() {
100+
if (!(oneSet || allSet || anySet)) {
101+
throw new IllegalStateException(
102+
"A consumption strategy (one, all, or any) must be configured");
103+
}
104+
105+
if (anySet) {
106+
this.setUntil(until);
107+
}
108+
return this.getEventConsumptionStrategy();
109+
}
110+
111+
abstract T getEventConsumptionStrategy();
112+
113+
abstract void setUntil(Until until);
114+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.spec;
17+
18+
import io.serverlessworkflow.api.types.AllEventConsumptionStrategy;
19+
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
20+
import io.serverlessworkflow.api.types.EventConsumptionStrategy;
21+
import io.serverlessworkflow.api.types.OneEventConsumptionStrategy;
22+
import io.serverlessworkflow.api.types.Until;
23+
24+
public class EventConsumptionStrategyBuilder
25+
extends AbstractEventConsumptionStrategyBuilder<
26+
EventConsumptionStrategyBuilder, EventConsumptionStrategy> {
27+
28+
private final EventConsumptionStrategy eventConsumptionStrategy = new EventConsumptionStrategy();
29+
30+
EventConsumptionStrategyBuilder() {}
31+
32+
@Override
33+
void setOne(OneEventConsumptionStrategy strategy) {
34+
eventConsumptionStrategy.setOneEventConsumptionStrategy(strategy);
35+
}
36+
37+
@Override
38+
void setAll(AllEventConsumptionStrategy strategy) {
39+
eventConsumptionStrategy.setAllEventConsumptionStrategy(strategy);
40+
}
41+
42+
@Override
43+
void setAny(AnyEventConsumptionStrategy strategy) {
44+
eventConsumptionStrategy.setAnyEventConsumptionStrategy(strategy);
45+
}
46+
47+
@Override
48+
EventConsumptionStrategy getEventConsumptionStrategy() {
49+
return this.eventConsumptionStrategy;
50+
}
51+
52+
@Override
53+
void setUntil(Until until) {
54+
this.eventConsumptionStrategy.getAnyEventConsumptionStrategy().setUntil(until);
55+
}
56+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.fluent.spec;
17+
18+
import io.serverlessworkflow.api.types.EventFilter;
19+
import io.serverlessworkflow.api.types.EventFilterCorrelate;
20+
import java.util.function.Consumer;
21+
22+
/** Builder for event filters used in consumption strategies. */
23+
public final class EventFilterBuilder {
24+
private final EventFilter filter = new EventFilter();
25+
private final EventFilterCorrelate correlate = new EventFilterCorrelate();
26+
27+
/** Predicate to match event properties. */
28+
public EventFilterBuilder with(Consumer<EventPropertiesBuilder> c) {
29+
EventPropertiesBuilder pb = new EventPropertiesBuilder();
30+
c.accept(pb);
31+
filter.setWith(pb.build());
32+
return this;
33+
}
34+
35+
/** Correlation property for the filter. */
36+
public EventFilterBuilder correlate(
37+
String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
38+
ListenTaskBuilder.CorrelatePropertyBuilder cpb =
39+
new ListenTaskBuilder.CorrelatePropertyBuilder();
40+
c.accept(cpb);
41+
correlate.withAdditionalProperty(key, cpb.build());
42+
return this;
43+
}
44+
45+
public EventFilter build() {
46+
filter.setCorrelate(correlate);
47+
return filter;
48+
}
49+
}

0 commit comments

Comments
 (0)