LangGraph Deep Dive: State Machines for Multi-Step Agent Workflows
The single most useful change in agent engineering between 2024 and 2026 was admitting that agents are state machines. LangGraph didn’t invent that idea — every production framework converged on it — but it shipped the cleanest API for expressing it, and the ecosystem aligned around the shape. By 2026, LangGraph is in production at Uber, JP Morgan, BlackRock, Cisco, LinkedIn, and Klarna, with roughly 90M monthly downloads of the LangChain ecosystem behind it.
This post is the deep dive: what the graph actually is, what the four primitives do, and how to use them without ending up with the kind of “graph that works only in the demo.”
The mental model
A LangGraph agent is a graph of nodes that read from and write to a shared state object. The runtime walks the graph: at each step it executes a node, the node returns a partial state update, the runtime merges it via reducers, and an edge function decides which node runs next.
The graph is typed (the state is a TypedDict or Pydantic model), persistent (the runtime checkpoints state after each node), and interruptible (a node can pause, persist, and resume later — that’s what enables human-in-the-loop). Nothing about it is hidden inside an executor. You wrote the graph; you can read the graph.
The four primitives
1. State
State is just a typed dict. Each field has a value type and, optionally, a reducer — a function that says how to combine the current value with an update. Reducers are how parallel branches merge without overwriting each other:
from typing import Annotated, TypedDict
from operator import add
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # reducer: append, dedupe
artifacts: Annotated[list[str], add] # reducer: list concat
step_count: int # default: overwrite
add_messages is the canonical reducer for chat history — it appends new messages and replaces ones with the same ID, which is what you want when a node retries a model call. For lists of strings, operator.add concatenates. For counters, default overwrite is fine because only one node writes.
This is the most important thing to get right. Forgetting the reducer on a list field means one branch’s writes silently overwrite the other’s. Most “the agent forgot what it just did” bugs trace back here.
2. Nodes
A node is a function. It takes the current state, returns a partial update:
async def plan(state: AgentState) -> dict:
response = await llm.ainvoke(state["messages"])
return {"messages": [response], "step_count": state["step_count"] + 1}
async def call_tool(state: AgentState) -> dict:
last = state["messages"][-1]
results = []
for call in last.tool_calls:
result = await TOOLS[call["name"]].ainvoke(call["args"])
results.append({"role": "tool", "content": str(result), "tool_call_id": call["id"]})
return {"messages": results}
Two rules. Return a partial update, never the whole state — that’s how reducers know what changed. And make nodes pure with respect to state — side effects (DB writes, external API calls) belong inside tools that are explicitly logged, not snuck into plan/route/summarize nodes where they’re invisible.
3. Edges
Edges decide which node runs next. Two flavors:
- Static edges — always go from
AtoB. Usegraph.add_edge("plan", "call_tool"). - Conditional edges — a function reads state and returns the name of the next node (or
END):
def route(state: AgentState) -> str:
last = state["messages"][-1]
if state["step_count"] > 10:
return "summarize"
if getattr(last, "tool_calls", None):
return "call_tool"
return END
graph.add_conditional_edges("plan", route, ["call_tool", "summarize", END])
Conditional edges are how you express “loop until the model stops calling tools, but cap at 10 steps.” They are also where you wire in dynamic interrupts — return "human_review" if the proposed action is destructive, return "call_tool" otherwise. Routing logic in a typed function is dramatically easier to reason about than the same logic buried inside a prompt.
4. Checkpointers
A checkpointer is a key-value store keyed by thread_id that persists the full state after every node. With a checkpointer attached, an agent run can be paused, killed, restarted, or branched, and it picks up exactly where it left off:
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(POSTGRES_URL)
app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-42-session-7"}}
result = await app.ainvoke({"messages": [user_msg]}, config=config)
In dev use MemorySaver (in-process). In prod use PostgresSaver or RedisSaver. The latency cost is one write per node — typically under 5ms — and the operational value is enormous: you can replay any user’s session step-by-step from the checkpoint table.
A complete graph
Putting the four primitives together, here is a real agent — a customer support triage that calls tools, loops, and ends:
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.graph.message import add_messages
from typing import Annotated, TypedDict
class State(TypedDict):
messages: Annotated[list, add_messages]
step_count: int
graph = StateGraph(State)
async def plan(state):
response = await llm.ainvoke(state["messages"])
return {"messages": [response], "step_count": state["step_count"] + 1}
async def call_tool(state):
last = state["messages"][-1]
results = []
for call in last.tool_calls:
out = await TOOLS[call["name"]].ainvoke(call["args"])
results.append({"role": "tool", "content": str(out), "tool_call_id": call["id"]})
return {"messages": results}
def route(state):
if state["step_count"] > 10:
return END
last = state["messages"][-1]
return "call_tool" if getattr(last, "tool_calls", None) else END
graph.add_node("plan", plan)
graph.add_node("call_tool", call_tool)
graph.add_edge("__start__", "plan")
graph.add_conditional_edges("plan", route, ["call_tool", END])
graph.add_edge("call_tool", "plan")
app = graph.compile(checkpointer=PostgresSaver.from_conn_string(POSTGRES_URL))
That’s 25 lines for a stateful, resumable, traceable, budget-bounded ReAct agent. Compare it to a bare AgentExecutor and you see why the ecosystem moved.
Patterns to use
A few patterns earn their keep across nearly every LangGraph agent I’ve shipped:
- Plan / Act split. Two nodes (
planproduces a tool call or final answer;call_toolruns it). The split makes the trace readable and the loop bound explicit. - Reducers, always. Annotate every list/dict field in state with a reducer at the start. Adding one later means rewriting nodes.
step_countin state. A single integer that every conditional edge checks. The cap is the difference between a 400 incident.- Tool errors as messages. Catch exceptions inside
call_tooland return them as tool-result messages with the error text. The model often recovers; an uncaught exception just crashes the agent. - Thread IDs that map to your domain.
thread_id = "ticket-{ticket_id}", not a UUID. You’ll be glad when you grep the checkpoint table during an incident.
Patterns to avoid
- God nodes. A single 200-line node that plans, calls tools, summarizes, and decides next steps. Split it. Each responsibility is its own node.
- Stateful mutation inside nodes. Modifying
state["x"]directly instead of returning a partial update. It seems to work in dev and silently breaks parallel branches in prod. - Tracing off in non-prod. You’ll want the trace from the staging incident more than the one from prod.
- Treating the graph as documentation. Generate the Mermaid diagram (
app.get_graph().draw_mermaid()) and commit it to the repo — when the graph drifts from the diagram, the diff catches it in code review.
Next week
The graph we just built loops a fixed model+tool node until done. That’s the easy case. Next post: cycles that converge, branches that fan out and merge, and how to insert a human into the loop without rewriting the graph each time.