Skip to content

New integration tests #703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion experimental/agentic/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>serverlessworkflow-experimental-agentic</artifactId>
<name>ServelessWorkflow:: Experimental:: Agentic</name>
<name>Serverless Workflow :: Experimental :: Agentic</name>
<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,33 @@

class AgenticModel extends JavaModel {

AgenticModel(AgenticScope agenticScope) {
super(agenticScope);
private final AgenticScope agenticScope;

AgenticModel(AgenticScope agenticScope, Object object) {
super(object);
this.agenticScope = agenticScope;
}

@Override
public void setObject(Object obj) {
super.setObject(obj);
public AgenticScope getAgenticScope() {
return agenticScope;
}

@Override
public Collection<WorkflowModel> asCollection() {
throw new UnsupportedOperationException("Not supported yet.");
throw new UnsupportedOperationException("asCollection() is not supported yet.");
}

@Override
public Optional<Map<String, Object>> asMap() {
return Optional.of(((AgenticScope) object).state());
return Optional.of(this.agenticScope.state());
}

@Override
public <T> Optional<T> as(Class<T> clazz) {
if (AgenticScope.class.isAssignableFrom(clazz)) {
return Optional.of(clazz.cast(object));
return Optional.of(clazz.cast(this.agenticScope));
} else if (Map.class.isAssignableFrom(clazz)) {
return Optional.of(clazz.cast(((AgenticScope) object).state()));
return asMap().map(clazz::cast);
} else {
return super.as(clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,37 @@
import dev.langchain4j.agentic.scope.ResultWithAgenticScope;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.expressions.func.JavaModelCollection;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

class AgenticModelCollection extends JavaModelCollection {
public class AgenticModelCollection extends JavaModelCollection {

private final AgenticScope agenticScope;
private final AgenticScopeCloudEventsHandler ceHandler;

AgenticModelCollection(Collection<?> object, AgenticScope agenticScope) {
super(object);
this.agenticScope = agenticScope;
}

AgenticModelCollection(AgenticScope agenticScope) {
AgenticModelCollection(AgenticScope agenticScope, AgenticScopeCloudEventsHandler ceHandler) {
super(Collections.emptyList());
this.agenticScope = agenticScope;
this.ceHandler = ceHandler;
}

@Override
protected WorkflowModel nextItem(Object obj) {
return new AgenticModel((AgenticScope) obj);
public boolean add(WorkflowModel e) {
Optional<Map<String, Object>> asMap = e.asMap();
if (asMap.isPresent() && !asMap.get().isEmpty()) {
this.agenticScope.writeStates(asMap.get());
return super.add(e);
}

// Update the agenticScope with the event body, so agents can use the event data as input
Object value = e.asJavaObject();
if (!ceHandler.writeStateIfCloudEvent(this.agenticScope, value)) {
this.agenticScope.writeState(AgenticModelFactory.DEFAULT_AGENTIC_SCOPE_STATE_KEY, value);
}

// add to the collection
return super.add(e);
}

@Override
Expand All @@ -46,6 +58,8 @@ public <T> Optional<T> as(Class<T> clazz) {
return Optional.of(clazz.cast(agenticScope));
} else if (ResultWithAgenticScope.class.isAssignableFrom(clazz)) {
return Optional.of(clazz.cast(new ResultWithAgenticScope<>(agenticScope, object)));
} else if (Map.class.isAssignableFrom(clazz)) {
return Optional.of(clazz.cast(agenticScope.state()));
} else {
return super.as(clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,40 @@
import io.serverlessworkflow.impl.WorkflowModelCollection;
import io.serverlessworkflow.impl.WorkflowModelFactory;
import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor;
import io.serverlessworkflow.impl.expressions.func.JavaModel;
import java.time.OffsetDateTime;
import java.util.Map;

class AgenticModelFactory implements WorkflowModelFactory {

/**
* Applies any change to the model after running as task. We will always set it to a @AgenticScope
* object since @AgentExecutor is always adding the output to the agenticScope. We just have to
* make sure that agenticScope is always passed to the next input task.
*
* @param prev the global AgenticScope object getting updated by the workflow context
* @param obj the same AgenticScope object updated by the AgentExecutor
* @return the workflow context model holding the agenticScope object.
*/
static final String DEFAULT_AGENTIC_SCOPE_STATE_KEY = "input";
private final AgenticScopeRegistryAssessor scopeRegistryAssessor =
new AgenticScopeRegistryAssessor();
private final AgenticScopeCloudEventsHandler scopeCloudEventsHandler =
new AgenticScopeCloudEventsHandler();

@SuppressWarnings("unchecked")
private AgenticModel newAgenticModel(Object state) {
if (state == null) {
return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), null);
}

if (state instanceof Map) {
this.scopeRegistryAssessor.writeStates((Map<String, Object>) state);
} else {
this.scopeRegistryAssessor.writeState(DEFAULT_AGENTIC_SCOPE_STATE_KEY, state);
}

return new AgenticModel(this.scopeRegistryAssessor.getAgenticScope(), state);
}

@Override
public WorkflowModel fromAny(WorkflowModel prev, Object obj) {
// We ignore `obj` since it's already included in `prev` within the agenticScope instance
return prev;
// TODO: we shouldn't update the state if the previous task was an agent call since under the
// hood, the agent already updated it.
if (prev instanceof AgenticModel agenticModel) {
this.scopeRegistryAssessor.setAgenticScope(agenticModel.getAgenticScope());
}
return newAgenticModel(obj);
}

@Override
Expand All @@ -53,58 +68,55 @@ public WorkflowModel combine(Map<String, WorkflowModel> workflowVariables) {

@Override
public WorkflowModelCollection createCollection() {
throw new UnsupportedOperationException();
return new AgenticModelCollection(
this.scopeRegistryAssessor.getAgenticScope(), scopeCloudEventsHandler);
}

// TODO: all these methods can use agenticScope as long as we have access to the `outputName`

@Override
public WorkflowModel from(boolean value) {
return new JavaModel(value);
return newAgenticModel(value);
}

@Override
public WorkflowModel from(Number value) {
return new JavaModel(value);
return newAgenticModel(value);
}

@Override
public WorkflowModel from(String value) {
return new JavaModel(value);
return newAgenticModel(value);
}

@Override
public WorkflowModel from(CloudEvent ce) {
return new JavaModel(ce);
return from(scopeCloudEventsHandler.extractDataAsMap(ce));
}

@Override
public WorkflowModel from(CloudEventData ce) {
return new JavaModel(ce);
return from(scopeCloudEventsHandler.extractDataAsMap(ce));
}

@Override
public WorkflowModel from(OffsetDateTime value) {
return new JavaModel(value);
return newAgenticModel(value);
}

@Override
public WorkflowModel from(Map<String, Object> map) {
final AgenticScope agenticScope = new AgenticScopeRegistryAssessor().getAgenticScope();
agenticScope.writeStates(map);
return new AgenticModel(agenticScope);
return newAgenticModel(map);
}

@Override
public WorkflowModel fromNull() {
return new JavaModel(null);
return newAgenticModel(null);
}

@Override
public WorkflowModel fromOther(Object value) {
if (value instanceof AgenticScope) {
return new AgenticModel((AgenticScope) value);
if (value instanceof AgenticScope scope) {
return new AgenticModel(scope, scope.state());
}
return new JavaModel(value);
return newAgenticModel(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.expressions.agentic;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import dev.langchain4j.agentic.scope.AgenticScope;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import java.io.IOException;
import java.util.Map;

public final class AgenticScopeCloudEventsHandler {

private final ObjectMapper mapper = new ObjectMapper();

AgenticScopeCloudEventsHandler() {}

public void writeState(final AgenticScope scope, final CloudEvent cloudEvent) {
if (cloudEvent != null) {
writeState(scope, cloudEvent.getData());
}
}

public void writeState(final AgenticScope scope, final CloudEventData cloudEvent) {
scope.writeStates(extractDataAsMap(cloudEvent));
}

public boolean writeStateIfCloudEvent(final AgenticScope scope, final Object value) {
if (value instanceof CloudEvent) {
writeState(scope, (CloudEvent) value);
return true;
} else if (value instanceof CloudEventData) {
writeState(scope, (CloudEventData) value);
return true;
}
return false;
}

public Map<String, Object> extractDataAsMap(final CloudEventData ce) {
try {
if (ce != null) {
return mapper.readValue(ce.toBytes(), new TypeReference<>() {});
}
} catch (IOException e) {
throw new IllegalArgumentException("Unable to parse CloudEvent data as JSON", e);
}
return Map.of();
}

public Map<String, Object> extractDataAsMap(final CloudEvent ce) {
if (ce != null) {
return extractDataAsMap(ce.getData());
}
return Map.of();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package io.serverlessworkflow.impl.expressions.agentic.langchain4j;

import dev.langchain4j.agentic.internal.AgenticScopeOwner;
import dev.langchain4j.agentic.scope.AgenticScope;
import dev.langchain4j.agentic.scope.AgenticScopeRegistry;
import dev.langchain4j.agentic.scope.DefaultAgenticScope;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -27,7 +29,7 @@ public class AgenticScopeRegistryAssessor implements AgenticScopeOwner {
private final AtomicReference<AgenticScopeRegistry> agenticScopeRegistry =
new AtomicReference<>();
private final String agentId;
private DefaultAgenticScope agenticScope;
private AgenticScope agenticScope;
private Object memoryId;

public AgenticScopeRegistryAssessor(String agentId) {
Expand All @@ -44,7 +46,7 @@ public void setMemoryId(Object memoryId) {
this.memoryId = memoryId;
}

public DefaultAgenticScope getAgenticScope() {
public AgenticScope getAgenticScope() {
if (agenticScope != null) {
return agenticScope;
}
Expand All @@ -57,9 +59,21 @@ public DefaultAgenticScope getAgenticScope() {
return this.agenticScope;
}

public void setAgenticScope(AgenticScope agenticScope) {
this.agenticScope = Objects.requireNonNull(agenticScope, "AgenticScope cannot be null");
}

public void writeState(String key, Object value) {
this.getAgenticScope().writeState(key, value);
}

public void writeStates(Map<String, Object> states) {
this.getAgenticScope().writeStates(states);
}

@Override
public AgenticScopeOwner withAgenticScope(DefaultAgenticScope agenticScope) {
this.agenticScope = agenticScope;
this.setAgenticScope(agenticScope);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Optional<Number> asNumber() {

@Override
public Optional<Map<String, Object>> asMap() {

return object instanceof Map ? Optional.of((Map<String, Object>) object) : Optional.empty();
}

Expand Down
Loading