How to stream runnables
PREREQUISITES
This guide assumes familiarity with the following concepts:
Chat models
LangChain Expression Language
Output parsers
The Runnable interface provides two general approaches to stream content: the sync stream and async astream methods, which stream the final output in chunks, and the async astream_events API, which additionally streams intermediate steps. Let's take a look at both approaches, and try to understand how to use them.
INFO
For a higher-level overview of streaming techniques in LangChain,
see this section of the conceptual guide.
Using Stream
All Runnable objects implement a sync method called stream and an
async variant called astream .
These methods are designed to stream the final output in chunks, yielding
each chunk as soon as it is available.
Streaming is only possible if all steps in the program know how to process
an input stream; i.e., process an input chunk one at a time, and yield a
corresponding output chunk.
The best place to start exploring streaming is with the single most important component in LLM apps: the LLMs themselves!
The key strategy for making the application feel more responsive is to show intermediate progress; that is, to stream the output from the model token by token.
We will show examples of streaming using a chat model. Choose one from
the options below:
import getpass
import os

from langchain_groq import ChatGroq

os.environ["GROQ_API_KEY"] = getpass.getpass()

model = ChatGroq(model="llama3-8b-8192")
chunks = []
for chunk in model.stream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|
chunks = []
async for chunk in model.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
chunks[0]
AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')
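Message chunks are additive by design: summing them reconstructs the response so far. A quick sketch (assuming at least five chunks were collected above; the exact content depends on the model run):

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

This returns a single AIMessageChunk whose content is the concatenation of the first five tokens, e.g. something like AIMessageChunk(content='The sky appears blue during', ...).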
Chains
Virtually all LLM applications involve more steps than just a call to a
language model.
Let's build a simple chain using the LangChain Expression Language (LCEL) that combines a prompt, model and a parser, and verify that streaming works.
We will use StrOutputParser to parse the output from the model. This is
a simple parser that extracts the content field from an
AIMessageChunk , giving us the token returned by the model.
TIP
LCEL is a declarative way to specify a "program" by chaining
together different LangChain primitives. Chains created using LCEL
benefit from an automatic implementation of stream and astream
allowing streaming of the final output. In fact, chains created with
LCEL implement the entire standard Runnable interface.
prompt = ChatPromptTemplate.from_template("tell me a
joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
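To verify streaming, we can iterate over the chain's async stream. A minimal sketch (the topic "parrot" is just an illustrative input):

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)

Each chunk prints as soon as it arrives, producing token-separated output along the lines of Here|'s| a| joke| ....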
Note that we're getting streaming output even though we're using parser at the end of the chain above. The parser operates on each streaming chunk individually.
NOTE
The LangChain Expression language allows you to separate the
construction of a chain from the mode in which it is used (e.g.,
sync/async, batch/streaming etc.). If this is not relevant to what
you're building, you can also rely on a standard imperative
programming approach by calling invoke, batch or stream on
each component individually, assigning the results to variables and
then using them downstream as you see fit.
Working with input streams
What if you wanted to stream JSON from the output as it was being generated? If you were to rely on json.loads to parse the partial JSON, the parsing would fail, as the partial JSON wouldn't be valid JSON. The parser needs to operate on the input stream and attempt to "auto-complete" the partial JSON into a valid state.
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67413}]}
{'countries': [{'name': 'France', 'population': 67413000}]}
{'countries': [{'name': 'France', 'population': 67413000}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}
Now, let's break streaming. We'll use the previous example and append an
extraction function at the end that extracts the country names from the
finalized JSON.
WARNING
Any steps in the chain that operate on finalized inputs rather than on
input streams can break streaming functionality via stream or
astream .
TIP
Later, we will discuss the astream_events API which streams
results from intermediate steps. This API will stream results from
intermediate steps even if the chain contains steps that only operate
on finalized inputs.
countries = inputs["countries"]
country_names = [
country.get("name") for country in countries if
isinstance(country, dict)
]
return country_names
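Appending this function to the chain and streaming shows the break: nothing is yielded until the whole JSON is finalized. A minimal sketch (the chain wiring here is reconstructed from context):

chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, end="|", flush=True)

Instead of token-by-token output, everything arrives in one final chunk, e.g. ['France', 'Spain', 'Japan']|.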
Generator Functions
Let's fix the streaming using a generator function that can operate on the
input stream.
TIP
A generator function (a function that uses yield ) allows writing
code that operates on input streams.
async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()
    async for input in input_stream:
        if not isinstance(input, dict) or "countries" not in input:
            continue
        for country in input["countries"]:
            name = country.get("name") if isinstance(country, dict) else None
            if name and name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)

chain = model | JsonOutputParser() | _extract_country_names_streaming
France|Spain|Japan|
NOTE
Because the code above is relying on JSON auto-completion, you
may see partial names of countries (e.g., Sp and Spain ), which is
not what one would want for an extraction result!
Non-streaming components
Some built-in components, like Retrievers, do not offer any streaming. What happens if we try to stream them? 🤨
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()
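Streaming from the retriever directly just yields the final result in a single chunk. A quick check (the query is illustrative):

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

This produces something like [[Document(page_content='harrison worked at kensho'), Document(page_content='harrison likes spicy food')]] — one chunk, all at once.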
🥹
This is OK! Not all components have to implement streaming -- in some cases streaming is either unnecessary, difficult, or just doesn't make sense.
TIP
An LCEL chain constructed using non-streaming components will
still be able to stream in a lot of cases, with streaming of partial
output starting after the last non-streaming step in the chain.
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
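Even with the non-streaming retriever at the front, the chain streams tokens once the model starts producing output. A minimal sketch (the question is illustrative):

for chunk in retrieval_chain.stream("where did harrison work?"):
    print(chunk, end="|", flush=True)

Partial output starts streaming right after the retrieval step completes, since every step downstream of the retriever supports streaming.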
Now that we've seen how stream and astream work, let's venture into
the world of streaming events. 🏞️
Using Stream Events
Event Streaming is a beta API. This API may change a bit based on
feedback.
NOTE
This guide demonstrates the V2 API and requires langchain-core >=
0.2. For the V1 API compatible with older versions of LangChain, see
here.
import langchain_core
langchain_core.__version__
For the astream_events API to work properly:
- Use async throughout the code to the extent possible (e.g., async tools etc.)
- Propagate callbacks if defining custom functions / runnables
- Whenever using runnables without LCEL, make sure to call .astream() on LLMs rather than .ainvoke to force the LLM to stream tokens
Let us know if anything doesn't work as expected! :)
Event Reference
Below is a reference table that shows some events that might be emitted
by the various Runnable objects.
NOTE
When streaming is implemented properly, the inputs to a runnable
will not be known until after the input stream has been entirely
consumed. This means that inputs will often be included only for end events rather than for start events.
| event | name |
| --- | --- |
| on_chain_start | format_docs |
| on_chain_end | format_docs |
| on_tool_start | some_tool |
| on_tool_end | some_tool |
| on_prompt_start | [template_name] |
| on_prompt_end | [template_name] |
Chat Model
Let's start off by looking at the events produced by a chat model.
events = []
async for event in model.astream_events("hello",
version="v2"):
events.append(event)
/home/eugene/src/langchain/libs/core/langchain_core/_api/be
LangChainBetaWarning: This API is in beta and may change in
warn_beta(
NOTE
Hey what's that funny version="v2" parameter in the API?! 😾
This is a beta API, and we're almost certainly going to make some
changes to it (in fact, we already have!)
Let's take a look at a few of the start events and a few of the end events.
events[:3]
[{'event': 'on_chat_model_start',
  'data': {'input': 'hello'},
  'name': 'ChatAnthropic',
  'tags': [],
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='Hello', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='!', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}}]
events[-2:]
[{'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}},
 {'event': 'on_chat_model_end',
  'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', id='run-a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3')},
  'run_id': 'a81e4c0f-fc36-4d33-93bc-1ac25b9bb2c3',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {}}]
Chain
Let's revisit the example chain that parsed streaming JSON to explore the
streaming events API.
chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
        version="v2",
    )
]
If you examine the first few events, you'll notice that there are 3 different start events rather than 2 start events.
events[:3]
[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
  'metadata': {}},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
  'name': 'ChatAnthropic',
  'tags': ['seq:step:1'],
  'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
  'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
  'name': 'ChatAnthropic',
  'tags': ['seq:step:1'],
  'metadata': {}}]
What do you think you'd see if you looked at the last 3 events? What about the middle?
Let's use this API to output the stream events from the model and the parser. We're ignoring start events, end events, and events from the chain.
num_events = 0
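The loop body consuming the events was elided above; a minimal sketch of what it might look like, filtering for chat model and parser stream events and truncating after 30 events:

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        # Raw token chunks emitted by the chat model
        print(f"Chat model chunk: {repr(event['data']['chunk'].content)}", flush=True)
    if kind == "on_parser_stream":
        # Progressively auto-completed JSON emitted by the parser
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break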
Because both the model and the parser support streaming, we see
streaming events from both components in real time! Kind of cool isn't it?
🦜
Filtering Events
Because this API produces so many events, it is useful to be able to filter
on events.
By Name
chain = model.with_config({"run_name": "model"}) |
JsonOutputParser().with_config(
{"run_name": "my_parser"}
)
max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
By Type
chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)
max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='countries', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='":', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content=' [', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n ', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='"', id='run-db246792-2a91-4eb3-a14b-29658947065d')}, 'run_id': 'db246792-2a91-4eb3-a14b-29658947065d', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {}}
...
By Tags
CAUTION
Tags are inherited by child components of a given runnable.
If you're using tags to filter, make sure that this is what you want.
chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})
max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    version="v2",
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
Non-streaming components
Remember how some components don't stream well because they don't
operate on input streams?
While such components can break streaming of the final output when
using astream , astream_events will still yield streaming events from
intermediate steps that support streaming!
countries = inputs["countries"]
country_names = [
country.get("name") for country in countries if
isinstance(country, dict)
]
return country_names
chain = (
model | JsonOutputParser() | _extract_country_names
) # This parser only works with OpenAI right now
Now, let's confirm that with astream_events we're still seeing streaming
output from the model and the parser.
num_events = 0
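As before, the consuming loop was elided; here is a sketch filtering for stream events, under the same assumptions as the earlier loop:

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    version="v2",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(f"Chat model chunk: {repr(event['data']['chunk'].content)}", flush=True)
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break

Even though _extract_country_names blocks streaming of the final output, the model and parser stream events still arrive in real time.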
Propagating Callbacks
CAUTION
If you're invoking runnables inside your tools, you need to propagate callbacks to the runnable; otherwise, no stream events will be generated.
NOTE
When using RunnableLambdas or the @chain decorator, callbacks are
propagated automatically behind the scenes.
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool

def reverse_word(word: str):
    return word[::-1]

reverse_word = RunnableLambda(reverse_word)

@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)

@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})
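To see the difference, we can stream events from each tool; since tools are runnables, astream_events works on them directly. A minimal sketch:

async for event in bad_tool.astream_events("hello", version="v2"):
    print(event["event"], event["name"])

async for event in correct_tool.astream_events("hello", version="v2"):
    print(event["event"], event["name"])

With bad_tool, the inner reverse_word run produces no child events; with correct_tool, you should additionally see on_chain_start / on_chain_end events for reverse_word.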
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2

reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")
Next steps
Now you've learned some ways to stream both final outputs and internal
steps with LangChain.
To learn more, check out the other how-to guides in this section, or the conceptual guide on LangChain Expression Language.