# Pretty-print every message accumulated in the graph result.
for msg in result["messages"]:
    msg.pretty_print()
Copy
================================ Human Message ================================Hi================================== Ai Message ==================================Hello!
from typing_extensions import Annotated


def add(left, right):
    """Can also import `add` from the `operator` built-in."""
    return left + right


class State(TypedDict):
    # Annotating with a reducer (`add`) makes updates append rather than overwrite.
    messages: Annotated[list[AnyMessage], add]
    extra_field: int
from langgraph.graph import START

# Build the graph step by step; each builder method returns the builder,
# so this is equivalent to the fluent one-liner form.
builder = StateGraph(State)
builder.add_node(node)
builder.add_edge(START, "node")
graph = builder.compile()

result = graph.invoke({"messages": [HumanMessage("Hi")]})
for msg in result["messages"]:
    msg.pretty_print()
Copy
================================ Human Message ================================Hi================================== Ai Message ==================================Hello!
================================ Human Message ================================Hi================================== Ai Message ==================================Hello!
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict


# Define the schema for the input
class InputState(TypedDict):
    question: str


# Define the schema for the output
class OutputState(TypedDict):
    answer: str


# Define the overall schema, combining both input and output
class OverallState(InputState, OutputState):
    pass


# Define the node that processes the input and generates an answer
def answer_node(state: InputState):
    # Example answer and an extra key
    return {"answer": "bye", "question": state["question"]}


# Build the graph with input and output schemas specified
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node)  # Add the answer node
builder.add_edge(START, "answer_node")  # Define the starting edge
builder.add_edge("answer_node", END)  # Define the ending edge
graph = builder.compile()  # Compile the graph

# Invoke the graph with an input and print the result
print(graph.invoke({"question": "hi"}))
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict


# The overall state of the graph (this is the public state shared across nodes)
class OverallState(TypedDict):
    a: str


# Output from node_1 contains private data that is not part of the overall state
class Node1Output(TypedDict):
    private_data: str


# The private data is only shared between node_1 and node_2
def node_1(state: OverallState) -> Node1Output:
    output = {"private_data": "set by node_1"}
    print(f"Entered node `node_1`:\n\tInput: {state}.\n\tReturned: {output}")
    return output


# Node 2 input only requests the private data available after node_1
class Node2Input(TypedDict):
    private_data: str


def node_2(state: Node2Input) -> OverallState:
    output = {"a": "set by node_2"}
    print(f"Entered node `node_2`:\n\tInput: {state}.\n\tReturned: {output}")
    return output


# Node 3 only has access to the overall state (no access to private data from node_1)
def node_3(state: OverallState) -> OverallState:
    output = {"a": "set by node_3"}
    print(f"Entered node `node_3`:\n\tInput: {state}.\n\tReturned: {output}")
    return output


# Connect nodes in a sequence
# node_2 accepts private data from node_1, whereas
# node_3 does not see the private data.
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()

# Invoke the graph with the initial state
response = graph.invoke(
    {
        "a": "set at start",
    }
)

print()
print(f"Output of graph invocation: {response}")
Copy
Entered node `node_1`: Input: {'a': 'set at start'}. Returned: {'private_data': 'set by node_1'}Entered node `node_2`: Input: {'private_data': 'set by node_1'}. Returned: {'a': 'set by node_2'}Entered node `node_3`: Input: {'a': 'set by node_2'}. Returned: {'a': 'set by node_3'}Output of graph invocation: {'a': 'set by node_3'}
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel


# The overall state of the graph (this is the public state shared across nodes)
class OverallState(BaseModel):
    a: str


def node(state: OverallState):
    return {"a": "goodbye"}


# Build the state graph
builder = StateGraph(OverallState)
builder.add_node(node)  # node is the only node
builder.add_edge(START, "node")  # Start the graph with node
builder.add_edge("node", END)  # End the graph after node
graph = builder.compile()

# Test the graph with valid input
graph.invoke({"a": "hello"})
使用无效输入测试图
Copy
# Pydantic validates the input before the graph runs, so a wrongly-typed
# value raises rather than silently passing through.
try:
    graph.invoke({"a": 123})  # Should be a string
except Exception as e:
    print("An exception was raised because `a` is an integer rather than a string.")
    print(e)
Copy
An exception was raised because `a` is an integer rather than a string.1 validation error for OverallStatea Input should be a valid string [type=string_type, input_value=123, input_type=int] For further information visit https://errors.pydantic.dev/2.9/v/string_type
请参见下方了解 Pydantic 模型状态的其他功能:
串行化行为
使用 Pydantic 模型作为状态模式时,了解串行化形式如何工作是很重要的,特别是当:
传递 Pydantic 对象作为输入
从图中接收输出
使用嵌套的 Pydantic 模型
我们来看一下这些场景。
Copy
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel


class NestedModel(BaseModel):
    value: str


class ComplexState(BaseModel):
    text: str
    count: int
    nested: NestedModel


def process_node(state: ComplexState):
    # Node receives a validated Pydantic object
    print(f"Input state type: {type(state)}")
    print(f"Nested type: {type(state.nested)}")
    # Return a dictionary update
    return {"text": state.text + " processed", "count": state.count + 1}


# Build the graph
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# Create a Pydantic instance for input
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"Input object type: {type(input_state)}")

# Invoke graph with a Pydantic instance
result = graph.invoke(input_state)
print(f"Output type: {type(result)}")
print(f"Output content: {result}")

# Convert back to Pydantic model if needed
output_model = ComplexState(**result)
print(f"Converted back to Pydantic: {type(output_model)}")
================================ Human Message ================================hi================================== Ai Message ==================================Ciao! Come posso aiutarti oggi?
Parallel execution of nodes is essential to speed up overall graph operation. LangGraph offers native support for parallel execution of nodes, which can significantly enhance the performance of graph-based workflows. This parallelization is achieved through fan-out and fan-in mechanisms, utilizing both standard edges and conditional_edges. Below are some examples showing how to create branching dataflows that work for you.
In this example, we fan out from Node A to B and C and then fan in to D. With our state, we specify the reducer add operation. This will combine or accumulate values for the specific key in the State, rather than simply overwriting the existing value. For lists, this means concatenating the new list with the existing list. See the above section on state reducers for more detail on updating state with reducers.
Copy
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


# Fan out from a -> (b, c), fan in (b, c) -> d.
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
Copy
from IPython.display import Image, display

# Render the compiled graph as a Mermaid PNG inline.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
With the reducer, you can see that the values added in each node are accumulated.
Adding "A" to []Adding "B" to ['A']Adding "C" to ['A']Adding "D" to ['A', 'B', 'C']
In the above example, nodes "b" and "c" are executed concurrently in the same superstep. Because they are in the same step, node "d" executes after both "b" and "c" are finished.Importantly, updates from a parallel superstep may not be ordered consistently. If you need a consistent, predetermined ordering of updates from a parallel superstep, you should write the outputs to a separate field in the state together with a value with which to order them.
Exception handling?
LangGraph executes nodes within supersteps, meaning that while parallel branches are executed in parallel, the entire superstep is transactional. If any of these branches raises an exception, none of the updates are applied to the state (the entire superstep errors).Importantly, when using a checkpointer, results from successful nodes within a superstep are saved, and don’t repeat when resumed.If you have error-prone code (perhaps you want to handle flaky API calls), LangGraph provides two ways to address this:
You can write regular python code within your node to catch and handle exceptions.
You can set a retry_policy to direct the graph to retry nodes that raise certain types of exceptions. Only failing branches are retried, so you needn’t worry about performing redundant work.
Together, these let you perform parallel execution and fully control exception handling.
Set max concurrency
You can control the maximum number of concurrent tasks by setting max_concurrency in the configuration when invoking the graph.
Deferring node execution is useful when you want to delay the execution of a node until all other pending tasks are completed. This is particularly relevant when branches have different lengths, which is common in workflows like map-reduce flows.The above example showed how to fan-out and fan-in when each path was only one step. But what if one branch had more than one step? Let’s add a node "b_2" in the "b" branch:
Copy
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}


def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
# defer=True delays d until all pending branch tasks (b -> b_2 and c) finish.
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
Copy
from IPython.display import Image, display

# Visualize the compiled graph.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
Copy
# Run the graph starting from an empty aggregate list.
graph.invoke({"aggregate": []})
Copy
Adding "A" to []Adding "B" to ['A']Adding "C" to ['A']Adding "B_2" to ['A', 'B', 'C']Adding "D" to ['A', 'B', 'C', 'B_2']
import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # Add a key to the state. We will set this key to determine
    # how we branch.
    which: str


def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"], "which": "c"}


def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)


def conditional_edge(state: State) -> Literal["b", "c"]:
    # Fill in arbitrary logic here that uses the state
    # to determine the next node
    return state["which"]


builder.add_conditional_edges("a", conditional_edge)
graph = builder.compile()
Copy
from IPython.display import Image, display

# Show the branch structure as a diagram.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
Copy
# Invoke with an empty aggregate and show the final state.
result = graph.invoke({"aggregate": []})
print(result)
Copy
Adding "A" to []Adding "C" to ['A']{'aggregate': ['A', 'C'], 'which': 'c'}
import operator
from typing_extensions import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send


class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    # Jokes generated in parallel are accumulated via operator.add.
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str


def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}


def generate_joke(state: OverallState):
    # NOTE(review): this node is invoked via Send with {"subject": s}, so it
    # reads state["subject"] even though the annotation says OverallState.
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice.",
    }
    return {"jokes": [joke_map[state["subject"]]]}


def continue_to_jokes(state: OverallState):
    # Fan out: one generate_joke task per subject.
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}


builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()
Copy
from IPython.display import Image, display

# Render the map-reduce graph.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
Copy
# Call the graph: here we call it to generate a list of jokes
for step in graph.stream({"topic": "animals"}):
    print(step)
Copy
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}{'generate_joke': {'jokes': ['Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}{'best_joke': {'best_selected_joke': 'penguins'}}
When creating a graph with a loop, we require a mechanism for terminating execution. This is most commonly done by adding a conditional edge that routes to the END node once we reach some termination condition.You can also set the graph recursion limit when invoking or streaming the graph. The recursion limit sets the number of supersteps that the graph is allowed to execute before it raises an error. Read more about the concept of recursion limits here.Let’s consider a simple graph with a loop to better understand how these mechanisms work.
To return the last value of your state instead of receiving a recursion limit
error, see the next section.
When creating a loop, you can include a conditional edge that specifies a termination condition:
Copy
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)


def route(state: State) -> Literal["b", END]:
    # End the run once the termination condition holds; otherwise loop via b.
    if termination_condition(state):
        return END
    else:
        return "b"


builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
To control the recursion limit, specify "recursion_limit" in the config. This will raise a GraphRecursionError, which you can catch and handle:
Copy
from langgraph.errors import GraphRecursionError

# Cap the run at 3 supersteps; exceeding it raises GraphRecursionError.
try:
    graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
    print("Recursion Error")
Let’s define a graph with a simple loop. Note that we use a conditional edge to implement a termination condition.
Copy
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}


# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)


# Define edges
def route(state: State) -> Literal["b", END]:
    # Keep looping until the aggregate reaches the threshold length.
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END


builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
Copy
from IPython.display import Image, display

# Visualize the loop structure.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
This architecture is similar to a ReAct agent in which node "a" is a tool-calling model, and node "b" represents the tools.In our route conditional edge, we specify that we should end after the "aggregate" list in the state passes a threshold length.Invoking the graph, we see that we alternate between nodes "a" and "b" before terminating once we reach the termination condition.
Copy
# Run the looping graph from an empty aggregate.
graph.invoke({"aggregate": []})
Copy
Node A sees []Node B sees ['A']Node A sees ['A', 'B']Node B sees ['A', 'B', 'A']Node A sees ['A', 'B', 'A', 'B']Node B sees ['A', 'B', 'A', 'B', 'A']Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
In some applications, we may not have a guarantee that we will reach a given termination condition. In these cases, we can set the graph’s recursion limit. This will raise a GraphRecursionError after a given number of supersteps. We can then catch and handle this exception:
Node A sees []Node B sees ['A']Node C sees ['A', 'B']Node D sees ['A', 'B']Node A sees ['A', 'B', 'C', 'D']Recursion Error
Extended example: return state on hitting recursion limit
Instead of raising GraphRecursionError, we can introduce a new key to the state that keeps track of the number of steps remaining until reaching the recursion limit. We can then use this key to determine if we should end the run.LangGraph implements a special RemainingSteps annotation. Under the hood, it creates a ManagedValue channel — a state channel that will exist for the duration of our graph run and no longer.
Copy
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps


class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # Managed channel: counts down supersteps left before the recursion limit.
    remaining_steps: RemainingSteps


def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}


# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)


# Define edges
def route(state: State) -> Literal["b", END]:
    # Stop before we hit the recursion limit instead of raising.
    if state["remaining_steps"] <= 2:
        return END
    else:
        return "b"


builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

# Test it out
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
Copy
Node A sees []Node B sees ['A']Node A sees ['A', 'B']{'aggregate': ['A', 'B', 'A']}
Extended example: loops with branches
To better understand how the recursion limit works, let’s consider a more complex example. Below we implement a loop, but one step fans out into two nodes:
Copy
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    aggregate: Annotated[list, operator.add]


def a(state: State):
    print(f'Node A sees {state["aggregate"]}')
    return {"aggregate": ["A"]}


def b(state: State):
    print(f'Node B sees {state["aggregate"]}')
    return {"aggregate": ["B"]}


def c(state: State):
    print(f'Node C sees {state["aggregate"]}')
    return {"aggregate": ["C"]}


def d(state: State):
    print(f'Node D sees {state["aggregate"]}')
    return {"aggregate": ["D"]}


# Define nodes
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)


# Define edges
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END


builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
# b fans out into c and d, which both feed back into a.
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
Copy
from IPython.display import Image, display

# Show the loop-with-branches topology.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
This graph looks complex, but can be conceptualized as loop of supersteps:
Node A
Node B
Nodes C and D
Node A
…
We have a loop of four supersteps, where nodes C and D are executed concurrently.Invoking the graph as before, we see that we complete two full “laps” before hitting the termination condition:
Copy
# Run with no recursion limit override; terminates via the route condition.
result = graph.invoke({"aggregate": []})
Copy
Node A sees []Node B sees ['A']Node D sees ['A', 'B']Node C sees ['A', 'B']Node A sees ['A', 'B', 'C', 'D']Node B sees ['A', 'B', 'C', 'D', 'A']Node D sees ['A', 'B', 'C', 'D', 'A', 'B']Node C sees ['A', 'B', 'C', 'D', 'A', 'B']Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']
However, if we set the recursion limit to four, we only complete one lap because each lap is four supersteps:
Copy
from langgraph.errors import GraphRecursionError

# A limit of 4 allows only one full lap of the 4-superstep loop.
try:
    result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Copy
Node A sees []Node B sees ['A']Node C sees ['A', 'B']Node D sees ['A', 'B']Node A sees ['A', 'B', 'C', 'D']Recursion Error
Using the async programming paradigm can produce significant performance improvements when running IO-bound code concurrently (e.g., making concurrent API requests to a chat model provider).To convert a sync implementation of the graph to an async implementation, you will need to:
Update nodes to use async def instead of def.
Update the code inside to use await appropriately.
Invoke the graph with .ainvoke or .astream as desired.
Because many LangChain objects implement the Runnable Protocol which has async variants of all the sync methods it’s typically fairly quick to upgrade a sync graph to an async graph.See example below. To demonstrate async invocations of underlying LLMs, we will include a chat model:
from langchain.chat_models import init_chat_model

# Follow the steps here to configure your credentials:
# https://docs.aws.amazon.com/bedrock/latest/userguide/getting-started.html
model = init_chat_model(
    "anthropic.claude-3-5-sonnet-20240620-v1:0",
    model_provider="bedrock_converse",
)
Combine control flow and state updates with Command
It can be useful to combine control flow (edges) and state updates (nodes). For example, you might want to BOTH perform state updates AND decide which node to go to next in the SAME node. LangGraph provides a way to do so by returning a Command object from node functions:
Copy
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    """Update state AND choose the next node in a single return value."""
    return Command(
        # state update
        update={"foo": "bar"},
        # control flow
        goto="my_other_node",
    )
We show an end-to-end example below. Let’s create a simple graph with 3 nodes: A, B and C. We will first execute node A, and then decide whether to go to Node B or Node C next based on the output of node A.
Copy
import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command


# Define graph state
class State(TypedDict):
    foo: str


# Define the nodes
def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["b", "c"])
    # this is a replacement for a conditional edge function
    if value == "b":
        goto = "node_b"
    else:
        goto = "node_c"
    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        # this is the state update
        update={"foo": value},
        # this is a replacement for an edge
        goto=goto,
    )


def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}


def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}
We can now create the StateGraph with the above nodes. Notice that the graph doesn’t have conditional edges for routing! This is because control flow is defined with Command inside node_a.
Copy
builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# NOTE: there are no edges between nodes A, B and C!
graph = builder.compile()
You might have noticed that we used
Command as
a return type annotation, e.g. Command[Literal["node_b", "node_c"]]. This is
necessary for the graph rendering and tells LangGraph that node_a can
navigate to node_b and node_c.
Copy
from IPython.display import display, Image

# Render the Command-routed graph.
png_bytes = graph.get_graph().draw_mermaid_png()
display(Image(png_bytes))
If we run the graph multiple times, we’d see it take different paths (A -> B or A -> C) based on the random choice in node A.
If you are using subgraphs, you might want to navigate from a node within a subgraph to a different subgraph (i.e. a different node in the parent graph). To do so, you can specify graph=Command.PARENT in Command:
Copy
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    """Route to a node in the PARENT graph while updating shared state."""
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT,
    )
Let’s demonstrate this using the above example. We’ll do so by changing nodeA in the above example into a single-node graph that we’ll add as a subgraph to our parent graph.
State updates with Command.PARENT When you send updates from a subgraph
node to a parent graph node for a key that’s shared by both parent and
subgraph state schemas, you must
define a reducer for the key
you’re updating in the parent graph state. See the example below.
Copy
import operator
from typing_extensions import Annotated


class State(TypedDict):
    # NOTE: we define a reducer here
    foo: Annotated[str, operator.add]


def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    # this is a replacement for a conditional edge function
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"
    # note how Command allows you to BOTH update the graph state AND route to the next node
    return Command(
        update={"foo": value},
        # this tells LangGraph to navigate to node_b or node_c in the parent graph
        # NOTE: this will navigate to the closest parent graph relative to the subgraph
        goto=goto,
        graph=Command.PARENT,
    )


subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()


def node_b(state: State):
    print("Called B")
    # NOTE: since we've defined a reducer, we don't need to manually append
    # new characters to existing 'foo' value. instead, reducer will append these
    # automatically (via operator.add)
    return {"foo": "b"}


def node_c(state: State):
    print("Called C")
    return {"foo": "c"}


builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)
graph = builder.compile()
A common use case is updating graph state from inside a tool. For example, in a customer support application you might want to look up customer information based on their account number or ID in the beginning of the conversation. To update the graph state from the tool, you can return Command(update={"my_custom_key": "foo", "messages": [...]}) from the tool:
Copy
@tool
def lookup_user_info(
    tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig
):
    """Use this to look up user information to better assist them with their questions."""
    user_info = get_user_info(config.get("configurable", {}).get("user_id"))
    return Command(
        update={
            # update the state keys
            "user_info": user_info,
            # update the message history
            "messages": [
                ToolMessage(
                    "Successfully looked up user information",
                    tool_call_id=tool_call_id,
                )
            ],
        }
    )
You MUST include messages (or any state key used for the message history) in
Command.update when returning
Command
from a tool and the list of messages in messages MUST contain a
ToolMessage. This is necessary for the resulting message history to be valid
(LLM providers require AI messages with tool calls to be followed by the tool
result messages).
If you are using tools that update state via Command, we recommend using prebuilt ToolNode which automatically handles tools returning Command objects and propagates them to the graph state. If you’re writing a custom node that calls tools, you would need to manually propagate Command objects returned by the tools as the update from the node.
Here we demonstrate how to visualize the graphs you create.You can visualize any arbitrary Graph, including StateGraph.Let’s have some fun by drawing fractals :).
Copy
import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


class State(TypedDict):
    messages: Annotated[list, add_messages]


class MyNode:
    """Callable node that records which node was invoked."""

    def __init__(self, name: str):
        self.name = name

    def __call__(self, state: State):
        return {"messages": [("assistant", f"Called node {self.name}")]}


def route(state) -> Literal["entry_node", END]:
    # Terminate once enough messages have accumulated; otherwise restart.
    if len(state["messages"]) > 10:
        return END
    return "entry_node"


def add_fractal_nodes(builder, current_node, level, max_level):
    """Recursively attach a random tree of nodes under current_node."""
    if level > max_level:
        return

    # Number of nodes to create at this level
    num_nodes = random.randint(1, 3)  # Adjust randomness as needed
    for i in range(num_nodes):
        nm = ["A", "B", "C"][i]
        node_name = f"node_{current_node}_{nm}"
        builder.add_node(node_name, MyNode(node_name))
        builder.add_edge(current_node, node_name)

        # Recursively add more nodes
        r = random.random()
        if r > 0.2 and level + 1 < max_level:
            add_fractal_nodes(builder, node_name, level + 1, max_level)
        elif r > 0.05:
            builder.add_conditional_edges(node_name, route, node_name)
        else:
            # End
            builder.add_edge(node_name, END)


def build_fractal_graph(max_level: int):
    """Compile a randomly generated fractal graph of depth max_level."""
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)

    add_fractal_nodes(builder, entry_point, 1, max_level)

    # Optional: set a finish point if required
    builder.add_edge(entry_point, END)  # or any specific node

    return builder.compile()


app = build_fractal_graph(3)
If preferred, we could render the Graph into a .png. Here we could use three options:
Using Mermaid.ink API (does not require additional packages)
Using Mermaid + Pyppeteer (requires pip install pyppeteer)
Using graphviz (which requires pip install graphviz)
Using Mermaid.Ink: by default, draw_mermaid_png() uses Mermaid.Ink’s API to generate the diagram.
Copy
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

# Default rendering uses the Mermaid.Ink API.
png_bytes = app.get_graph().draw_mermaid_png()
display(Image(png_bytes))
Using Mermaid + Pyppeteer
Copy
import nest_asyncio

nest_asyncio.apply()  # Required for Jupyter Notebook to run async functions

# Render locally via Pyppeteer with custom styling instead of the remote API.
display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)
Using Graphviz
Copy
# draw_png() requires pygraphviz; fall back to a helpful message if missing.
try:
    display(Image(app.get_graph().draw_png()))
except ImportError:
    print(
        "You likely need to install dependencies for pygraphviz, see more here https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
    )