Is your feature request related to a problem? Please describe.
The current Runner.run and Runner.run_streamed methods in src/agents/run.py execute an agent workflow to completion and return a final RunResult or RunResultStreaming. This design doesn't allow for real-time interruption of an ongoing run or the injection of new information that could alter the agent's subsequent actions. This is a limitation for dynamic use cases like on-call incident response, where an incident might be resolved mid-investigation or new critical findings might emerge.
Describe the solution you'd like
We propose enhancements to the Runner and its associated components to support a more dynamic run lifecycle:
Cancellable/Interruptible Runs:
Need: The Runner.run and Runner.run_streamed methods should return an object (e.g., an ActiveRun or Task-like handle) that exposes a method to signal cancellation (e.g., active_run.cancel()).
How it could work:
The while True loop within Runner.run (and the equivalent in _run_streamed_impl) would need to check an internal flag or an asyncio.Event set by the cancel() method on the returned handle.
Upon detecting a cancellation request, the loop would terminate gracefully, potentially allowing for a final cleanup span and then raising a specific RunCancelledException or returning a RunResult indicating cancellation.
For run_streamed, the _run_impl_task (an asyncio.Task) created in Runner.run_streamed seems like a natural candidate to be cancelled. The RunResultStreaming object could expose a method that cancels this underlying task.
Dynamically Add Information to an Active Run:
Need: A mechanism to inject new information (e.g., new user messages or tool-like observations) into an active run, so the agent can consider it in subsequent turns.
How it could work:
The object returned by run/run_streamed could also expose a method like active_run.add_information(item: TResponseInputItem) or active_run.update_history(items: list[TResponseInputItem]).
Internally, this method would need to append to a queue or a shared list that _run_single_turn (or its streaming equivalent) checks before preparing the input for model.get_response or model.stream_response.
Specifically, the input variable within _run_single_turn or _run_single_turn_streamed (which is currently built from original_input and generated_items or streamed_result.input and streamed_result.new_items) would incorporate this externally added information before the next model invocation.
A new type of RunItem or a way to flag these externally added items in traces might be beneficial for debugging.
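A rough sketch of the queue idea, under the assumption that input items are plain dicts (`InputItem` below stands in for `TResponseInputItem`). `ActiveRun`, `add_information`, and `build_turn_input` are hypothetical names for illustration; the real `_run_single_turn` builds its input differently.

```python
from collections import deque
from typing import Any

InputItem = dict[str, Any]  # stand-in for TResponseInputItem


class ActiveRun:
    """Hypothetical handle that buffers externally added items."""

    def __init__(self, original_input: list[InputItem]) -> None:
        self.history: list[InputItem] = list(original_input)
        self._pending: deque[InputItem] = deque()
        # Kept separately so traces could flag items that came from outside.
        self.injected: list[InputItem] = []

    def add_information(self, item: InputItem) -> None:
        # Called by external code while the run is in progress.
        self._pending.append(item)


def build_turn_input(run: ActiveRun) -> list[InputItem]:
    """Stand-in for the input assembly done before each model call."""
    # Drain everything queued since the previous turn, in arrival order.
    while run._pending:
        item = run._pending.popleft()
        run.history.append(item)
        run.injected.append(item)
    return list(run.history)


if __name__ == "__main__":
    run = ActiveRun([{"role": "user", "content": "Investigate the alert"}])
    run.add_information({"role": "user", "content": "New finding from Slack"})
    print(build_turn_input(run))
```

Draining only at turn boundaries means injected items never land in the middle of a tool call and its output, which the model would likely reject or misread.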
Why these features are important:
These features are critical for building agents that can adapt to rapidly changing external environments.
Interrupt Use Case (On-Call Triage):
A PagerDuty alert triggers an investigation run via Runner.run.
If an engineer resolves the incident manually, the external system should call active_run.cancel(). The agent run stops, preventing wasted resources and irrelevant notifications.
Add Information Use Case (On-Call Triage):
While an agent is investigating an incident, an engineer posts a crucial finding (e.g., "Load balancer X is confirmed to be the culprit") in a Slack thread.
The external system monitoring Slack calls active_run.add_information({"role": "user", "content": "New finding from Slack: Load balancer X is confirmed culprit."}).
In the agent's next turn, this new message is included in its input history, allowing it to adjust its investigation strategy.
Current Limitations in src/agents/run.py:
Runner.run is an async def that awaits the full completion of the internal loop.
Runner.run_streamed uses asyncio.create_task to schedule _run_streamed_impl, which then runs in the background. Although asyncio tasks can be cancelled, the RunResultStreaming object exposes no clean way to do so.
The input for each turn is constructed based on the initial input and items generated internally by the agent's previous turns. There's no current mechanism to inject external data into this sequence mid-run.
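For the streaming limitation, a thin wrapper around the background task might be enough. The sketch below is an assumption about shape only: `StreamingResultSketch` and `fake_run_streamed_impl` are invented for illustration and are not the SDK's `RunResultStreaming` or `_run_streamed_impl`. The only real APIs used are `asyncio.create_task`, `Task.cancel()`, and `asyncio.CancelledError`.

```python
import asyncio


class StreamingResultSketch:
    """Hypothetical result object that owns the background task."""

    def __init__(self, task: "asyncio.Task[str]") -> None:
        self._run_impl_task = task

    def cancel(self) -> bool:
        # Task.cancel() returns False if the task has already finished.
        return self._run_impl_task.cancel()

    async def wait(self) -> str:
        try:
            return await self._run_impl_task
        except asyncio.CancelledError:
            # Simplification: a real implementation should re-raise when
            # the *waiting* task was the one that got cancelled.
            return "cancelled"


async def fake_run_streamed_impl(events: list[str]) -> str:
    """Stand-in for the background run loop."""
    try:
        for turn in range(1000):
            events.append(f"turn {turn}")
            await asyncio.sleep(0.01)  # placeholder for a streamed turn
        return "completed"
    finally:
        # Runs on cancellation too, so cleanup (e.g. closing a span) fits here.
        events.append("cleanup")


async def demo() -> tuple[str, list[str]]:
    events: list[str] = []
    result = StreamingResultSketch(
        asyncio.create_task(fake_run_streamed_impl(events))
    )
    await asyncio.sleep(0.03)
    result.cancel()
    return await result.wait(), events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike a flag checked between turns, `Task.cancel()` interrupts the task at its current `await`, so any in-flight model call is abandoned rather than completed.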
These enhancements would significantly improve the SDK's applicability to real-time, interactive agent workflows.
Would love to discuss this. My first instinct is that we should explore this architecture:
We store the state of the run in RunState
We expose run_single_step, which takes the previous step, if any, and runs the next step. A step would be similar to what happens now - call model, run tools, etc. It returns a new state that also indicates what it would want to do next.
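To check my understanding of this architecture, here is a rough sketch. Only the names `RunState` and `run_single_step` come from the comment above; the fields, the `NextStep` enum, and the `drive` loop are my own assumptions, and the step body just fabricates an assistant message instead of calling a model.

```python
import asyncio
from dataclasses import dataclass, replace
from enum import Enum
from typing import Any


class NextStep(Enum):
    """Hypothetical marker for what the run wants to do next."""
    RUN_AGAIN = "run_again"
    FINAL_OUTPUT = "final_output"


@dataclass(frozen=True)
class RunState:
    """Assumed shape: an immutable snapshot of the run so far."""
    items: tuple[dict[str, Any], ...]
    turn: int = 0
    next_step: NextStep = NextStep.RUN_AGAIN


async def run_single_step(state: RunState) -> RunState:
    """Stand-in for one step (call model, run tools); returns a new state."""
    await asyncio.sleep(0)  # placeholder for the real async work
    turn = state.turn + 1
    reply = {"role": "assistant", "content": f"step {turn}"}
    done = turn >= 3  # arbitrary stopping rule for the sketch
    return replace(
        state,
        items=state.items + (reply,),
        turn=turn,
        next_step=NextStep.FINAL_OUTPUT if done else NextStep.RUN_AGAIN,
    )


async def drive(state: RunState, inject_at_turn: dict[int, dict[str, Any]]) -> RunState:
    """Caller-owned loop: the caller may edit state or stop between steps."""
    while state.next_step is NextStep.RUN_AGAIN:
        extra = inject_at_turn.get(state.turn)
        if extra is not None:
            state = replace(state, items=state.items + (extra,))
        state = await run_single_step(state)
    return state


if __name__ == "__main__":
    start = RunState(items=({"role": "user", "content": "Investigate"},))
    finding = {"role": "user", "content": "New finding from Slack"}
    print(asyncio.run(drive(start, {1: finding})))
```

With the loop owned by the caller, cancellation is simply not calling the next step, and adding information is an edit to the state between steps, so neither needs a dedicated method on the runner.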