Skip to content

feat(multiagent): Add __call__ implementation to MultiAgentBase #645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mkmeral
Copy link
Contributor

@mkmeral mkmeral commented Aug 10, 2025

Description

This PR adds a concrete implementation of the __call__ method to the MultiAgentBase abstract class, enabling synchronous access to async multi-agent workflows. Previously, users had to manually handle async execution when working with custom nodes that extend MultiAgentBase. Now, they can use the familiar synchronous call syntax node(task) alongside the existing async await node.invoke_async(task).

Key Changes:

  • Added __call__ method to MultiAgentBase that delegates to invoke_async() using ThreadPoolExecutor
  • Added missing imports (asyncio, ThreadPoolExecutor) to support the implementation
  • Properly passes kwargs from synchronous calls to the underlying async implementation
  • Maintains full backward compatibility - existing code continues to work unchanged

Benefits:

  • Simplified API: Users can now call custom nodes synchronously without async/await boilerplate
  • Consistency: Matches the pattern used by other Strands components (Agent, Graph, Swarm)
  • Documentation Alignment: The existing documentation examples now work out-of-the-box
  • Flexibility: Supports both sync and async usage patterns based on user preference

Related Issues

Documentation PR

Type of Change

New feature

Testing

How have you tested the change?

  • Unit Tests: Added comprehensive tests in test_base.py covering:

    • Abstract class behavior with the new __call__ method
    • Proper delegation to invoke_async with kwargs passing
    • Error handling and status propagation
  • Documentation Example Tests: Created test_custom_node_documentation.py with:

    • Tests that verify the exact FunctionNode example from documentation works
    • Both async and sync execution paths
    • Error handling scenarios
    • Kwargs parameter passing
  • Integration Example: Created a complete working example (example_mixed_graph.py) demonstrating:

    • Mixed graphs with both AI agents and function nodes
    • Real-world workflow with validation, research, analysis, and formatting
    • Side-by-side async vs sync execution comparison

Example Usage

The following example demonstrates the new functionality:

from strands.multiagent.base import MultiAgentBase, MultiAgentResult, NodeResult, Status
from strands.agent.agent_result import AgentResult

class FunctionNode(MultiAgentBase):
    """Execute deterministic Python functions as graph nodes."""

    def __init__(self, func, name: str = None):
        super().__init__()
        self.func = func
        self.name = name or func.__name__

    async def invoke_async(self, task, **kwargs):
        # Execute function and create AgentResult
        result = self.func(task if isinstance(task, str) else str(task))
        
        agent_result = AgentResult(
            stop_reason="end_turn",
            message={"role": "assistant", "content": [{"text": str(result)}]},
            state={},
            metrics={},
        )
        
        return MultiAgentResult(
            status=Status.COMPLETED,
            results={self.name: NodeResult(result=agent_result, status=Status.COMPLETED)},
        )

def validate_data(data):
    if not data.strip():
        raise ValueError("Empty input")
    return f"✅ Validated: {data[:50]}..."

# Create custom node
validator = FunctionNode(func=validate_data, name="validator")

# Both of these now work:
result_async = await validator.invoke_async("test data")  # Existing async way
result_sync = validator("test data")                      # New sync way!

Complete Mixed Graph Example

The PR includes a comprehensive example (example_mixed_graph.py) showing:

def create_mixed_graph():
    """Create a graph mixing AI agents and function nodes."""
    
    # AI agents for creative/analytical tasks
    researcher = Agent(name="researcher", system_prompt="You are a research specialist...")
    analyzer = Agent(name="analyzer", system_prompt="You are an analysis expert...")
    
    # Function nodes for deterministic processing
    validator = FunctionNode(func=validate_content, name="validator")
    keyword_extractor = FunctionNode(func=extract_keywords, name="keyword_extractor")
    formatter = FunctionNode(func=format_output, name="formatter")
    
    # Build the graph with mixed node types
    builder = GraphBuilder()
    builder.add_node(validator, "validator")
    builder.add_node(researcher, "researcher")
    builder.add_node(keyword_extractor, "keywords")
    builder.add_node(analyzer, "analyzer")
    builder.add_node(formatter, "formatter")
    
    # Define workflow: validation → parallel research & keywords → analysis → formatting
    builder.add_edge("validator", "researcher")
    builder.add_edge("validator", "keywords")
    builder.add_edge("researcher", "analyzer")
    builder.add_edge("keywords", "analyzer")
    builder.add_edge("analyzer", "formatter")
    builder.set_entry_point("validator")
    
    return builder.build()

# Both execution modes now work seamlessly:
graph = create_mixed_graph()

# Async execution
result_async = await graph.invoke_async(task)

# Sync execution (enabled by this PR)
result_sync = graph(task)  # Uses __call__ method internally

Verification Steps

Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare
  • All existing tests pass (91/91 passing)
  • New tests added and passing (4 additional tests)
  • Documentation example verified to work correctly

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@mkmeral mkmeral enabled auto-merge (squash) August 12, 2025 16:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant