
LangGraph Patterns: Cycles, Branches, and Human-in-the-Loop


The basic LangGraph loop from last week is the smallest useful agent. The next three patterns — bounded cycles, fan-out / fan-in branches, and human-in-the-loop interrupts — are what separate a demo from a system you’d let touch production data.

This post walks each one with the code, the failure modes, and the rule of thumb for when to reach for it.

Pattern 1: Bounded cycles

Every agent loops. The question is when it stops. Three conditions deserve explicit treatment in the graph:

  1. The model says it’s done — emitted a final answer, no tool calls.
  2. Step budget exceeded — total node executions exceeded the cap.
  3. Cost budget exceeded — total tokens or dollars exceeded the cap.

The mistake is checking only (1) and discovering (2) and (3) on the bill.

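from typing import Annotated, TypedDict

from langgraph.graph import END
from langgraph.graph.message import add_messages

# `llm` (chat model) and `graph` (StateGraph builder) are assumed to exist already.
class State(TypedDict):
    messages: Annotated[list, add_messages]
    step_count: int
    tokens_used: int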

MAX_STEPS = 12
MAX_TOKENS = 200_000

async def plan(state):
    response = await llm.ainvoke(state["messages"])
    # usage_metadata can be missing for some providers; count 0 rather than crash
    tokens = (response.usage_metadata or {}).get("total_tokens", 0)
    return {
        "messages": [response],
        "step_count": state["step_count"] + 1,
        "tokens_used": state["tokens_used"] + tokens,
    }

def route(state):
    if state["step_count"] >= MAX_STEPS:
        return "force_finish"
    if state["tokens_used"] >= MAX_TOKENS:
        return "force_finish"
    last = state["messages"][-1]
    return "call_tool" if getattr(last, "tool_calls", None) else END

async def force_finish(state):
    """Last-ditch summarization when budget is exhausted."""
    summary = await llm.ainvoke(state["messages"] + [
        {"role": "user", "content": "Summarize progress and stop. Be honest if incomplete."}
    ])
    return {"messages": [summary]}

graph.add_node("force_finish", force_finish)
graph.add_conditional_edges("plan", route, ["call_tool", "force_finish", END])
graph.add_edge("force_finish", END)

Two design choices are worth calling out. First, force_finish is its own node, not a silent kill: the user gets a coherent (if incomplete) answer instead of a timeout. Second, the snippet above hard-codes MAX_STEPS and MAX_TOKENS as module constants for brevity, but in a real deployment the caps belong in state, not in global constants, so that different agent personas get different caps and the cap travels with the run.
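A framework-free sketch of routing with the caps carried in state. The field names max_steps, max_tokens, and has_tool_calls, and the persona table in new_run, are illustrative assumptions, not part of the graph above; the routing order (budgets first, model intent second) mirrors route.

```python
from typing import TypedDict


class BudgetState(TypedDict):
    step_count: int
    tokens_used: int
    max_steps: int        # per-run cap, fixed when the run is created
    max_tokens: int       # per-run cap, fixed when the run is created
    has_tool_calls: bool  # stand-in for inspecting the last message


def route_with_budget(state: BudgetState) -> str:
    """Pick the next node name, checking both budgets before the model's intent."""
    if state["step_count"] >= state["max_steps"]:
        return "force_finish"
    if state["tokens_used"] >= state["max_tokens"]:
        return "force_finish"
    return "call_tool" if state["has_tool_calls"] else "end"


def new_run(persona: str) -> BudgetState:
    """Build the initial state for a persona (hypothetical caps per persona)."""
    caps = {"triage": (12, 200_000), "research": (40, 1_000_000)}
    max_steps, max_tokens = caps[persona]
    return BudgetState(
        step_count=0,
        tokens_used=0,
        max_steps=max_steps,
        max_tokens=max_tokens,
        has_tool_calls=True,
    )
```

Because the caps are ordinary state keys, they are checkpointed with the run, so a resumed run keeps the limits it started with.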

The LangGraph runtime also enforces a recursion limit (default 25 steps, adjustable per run through the recursion_limit config key) and raises GraphRecursionError when a run exceeds it. Treat that as your safety net, not your stop condition; it exists to catch graph bugs (an edge that points back to itself), not to manage budget.

Pattern 2: Fan-out / fan-in

Some tasks are embarrassingly parallel. Summarizing five documents, scoring a candidate against ten criteria, querying three data sources — each work item is independent. A serial loop wastes wall-clock time; fan-out runs them concurrently and merges results back.

LangGraph expresses this with the Send API:

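import operator
from typing import Annotated, TypedDict

from langgraph.graph.message import add_messages
from langgraph.types import Send   # langgraph.constants.Send is the older import path

class State(TypedDict):
    documents: list[str]
    summaries: Annotated[list[str], operator.add]   # reducer is critical
    messages: Annotated[list, add_messages]         # written by aggregate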

async def summarize_one(state: dict) -> dict:
    """Runs once per document, in parallel."""
    summary = await llm.ainvoke([
        {"role": "user", "content": f"Summarize:\n\n{state['document']}"}
    ])
    return {"summaries": [summary.content]}

def fan_out(state: State):
    """Conditional edge that emits one Send per document."""
    return [Send("summarize_one", {"document": doc}) for doc in state["documents"]]

graph.add_node("summarize_one", summarize_one)
graph.add_conditional_edges("__start__", fan_out, ["summarize_one"])
graph.add_edge("summarize_one", "aggregate")

Two things make this pattern work:

  • The reducer on summaries (operator.add for list concat) is what lets every parallel branch’s return value get merged instead of colliding. Without it, the key accepts only one write per step, so LangGraph rejects the concurrent updates with an InvalidUpdateError instead of merging them.
  • The Send object carries only what that branch needs — the document, not the whole document list. The branch runs as if it were its own tiny graph; the merge happens automatically.
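The reducer is an ordinary two-argument function that gets folded over the branch outputs. The sketch below imitates that fold in plain Python to show what ends up in summaries; apply_reducer is an illustrative helper, not a LangGraph function.

```python
import operator
from functools import reduce


def apply_reducer(existing: list[str], branch_updates: list[list[str]]) -> list[str]:
    """Fold each branch's partial update into the existing value with operator.add."""
    return reduce(operator.add, branch_updates, existing)


# Each parallel branch returns a one-element list for the "summaries" key.
branch_updates = [["summary of doc A"], ["summary of doc B"], ["summary of doc C"]]
merged = apply_reducer([], branch_updates)
```

This is also why each branch returns a one-element list and not a bare string: operator.add needs list operands on both sides.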

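When you also need a fan-in node that synthesizes the parallel results, make it the destination of the branch node, which is what the add_edge("summarize_one", "aggregate") line above does. The aggregate node then runs once, after all branches have finished: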

async def aggregate(state: State):
    combined = "\n\n".join(state["summaries"])
    final = await llm.ainvoke([
        {"role": "user", "content": f"Write an overview from these summaries:\n{combined}"}
    ])
    return {"messages": [final]}

graph.add_node("aggregate", aggregate)
graph.add_edge("aggregate", END)

Failure modes worth knowing. Parallel branches that mutate shared external state (the same DB row, the same file) need explicit locking — LangGraph has no idea your tools aren’t independent. And the cost of fan-out scales linearly with the number of branches; cap the fan-out width in code, not in the prompt.
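One way to cap the width in code, sketched without the framework. MAX_FAN_OUT and plan_fan_out are hypothetical names; in the graph above, each returned payload would be wrapped as Send("summarize_one", payload) inside fan_out.

```python
MAX_FAN_OUT = 8  # assumed cap; tune to your rate limits and cost budget


def plan_fan_out(
    documents: list[str], max_width: int = MAX_FAN_OUT
) -> tuple[list[dict], list[str]]:
    """Return (payloads to dispatch now, documents deferred past the cap)."""
    if max_width < 1:
        raise ValueError("max_width must be at least 1")
    payloads = [{"document": doc} for doc in documents[:max_width]]
    deferred = documents[max_width:]
    return payloads, deferred
```

Returning the deferred documents explicitly keeps the truncation visible, so the caller can log it, queue a second pass, or tell the user the result is partial.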

Pattern 3: Human-in-the-Loop

This is the pattern that pays for adopting a graph framework in the first place. A node calls interrupt(), the runtime persists state to the checkpointer, the invoke call returns control to your application, and a human-supplied value later resumes the graph at that node, possibly hours later and possibly from a different process.

The shape:

from langgraph.types import interrupt, Command

async def propose_action(state: State):
    """Agent has decided to do something destructive. Ask a human first."""
    proposed = state["pending_action"]
    decision = interrupt({
        "kind": "approval_required",
        "action": proposed,
        "context": state["reasoning_trace"][-3:],
    })
    return {"approval": decision, "messages": [{"role": "system",
              "content": f"Human {decision['outcome']} the action."}]}

def gate(state):
    if state["approval"]["outcome"] == "approved":
        return "execute"
    return "explain_decline"

graph.add_node("propose_action", propose_action)
graph.add_conditional_edges("propose_action", gate, ["execute", "explain_decline"])

In your application, the flow is two phases:

THREAD_ID = "ticket-9421"
config = {"configurable": {"thread_id": THREAD_ID}}

# Phase 1: run until interrupt (the function names here are illustrative)
async def start_run(user_input):
    result = await app.ainvoke({"messages": [user_input]}, config=config)
    if "__interrupt__" in result:
        payload = result["__interrupt__"][0].value
        await notify_approver(payload)             # Slack, email, dashboard...
        return {"status": "awaiting_approval", "thread_id": THREAD_ID}
    return {"status": "done", "result": result}

# Phase 2 (separate request, after human decides):
async def resume_run(decision):
    # e.g. decision = {"outcome": "approved", "approver": "alice@co.com"}
    return await app.ainvoke(Command(resume=decision), config=config)

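The interrupt does not block the process. The agent’s full state is persisted by the checkpointer; the calling code returns; the human’s reply comes in through whatever channel you wire up. When you re-invoke with Command(resume=...), the runtime loads the checkpoint and re-runs the interrupted node from its first line, and this time interrupt() returns the resume value instead of pausing. Keep any code that precedes interrupt() in that node free of side effects, or make it idempotent, because it executes twice.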

Three rules that make this reliable in practice:

  1. Make the interrupt payload self-contained. The approver UI should not need to call your DB to render the request. Put the proposed action, the relevant context, and the trace excerpt in the payload.
  2. Idempotent execute nodes. If the human approves twice (race condition, retry, network hiccup), the action should run once. The simplest mechanism is a request_id derived from (thread_id, step_count).
  3. A timeout policy. Interrupts can hang forever. Add a scheduled job that resumes long-paused threads with decision = {"outcome": "timed_out"} and routes them to a graceful explain-decline path.
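Rules 2 and 3 can be sketched in plain Python. Everything here is illustrative: request_id follows the (thread_id, step_count) derivation suggested above, OnceExecutor keeps its ledger in memory (a real system would more likely rely on a unique constraint in a shared store), and stale_threads assumes you record when each thread paused.

```python
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Callable


def request_id(thread_id: str, step_count: int) -> str:
    """Deterministic key: the same thread and step always map to the same id."""
    return hashlib.sha256(f"{thread_id}:{step_count}".encode()).hexdigest()[:16]


class OnceExecutor:
    """Runs an action at most once per request id and replays the stored result."""

    def __init__(self) -> None:
        self._results: dict[str, str] = {}

    def run(self, thread_id: str, step_count: int, action: Callable[[], str]) -> str:
        key = request_id(thread_id, step_count)
        if key not in self._results:
            self._results[key] = action()
        return self._results[key]


def stale_threads(
    paused_at: dict[str, datetime], now: datetime, max_wait: timedelta
) -> list[str]:
    """Thread ids that have waited longer than max_wait for a human decision."""
    return sorted(tid for tid, ts in paused_at.items() if now - ts > max_wait)
```

A scheduled job could call stale_threads and resume each returned thread with {"outcome": "timed_out"}, which the gate then routes to explain_decline because the outcome is not "approved".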

This is also where static breakpoints belong: interrupt_before=["execute"] in the compile call makes every run pause before that node, which is useful for staging and for debugging.

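from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# Inside your async startup code: from_conn_string is a context manager,
# and the async saver matches the ainvoke calls above.
async with AsyncPostgresSaver.from_conn_string(POSTGRES_URL) as checkpointer:
    await checkpointer.setup()   # creates the checkpoint tables on first run
    app = graph.compile(checkpointer=checkpointer, interrupt_before=["execute"])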

When to reach for which

Situation                                  Pattern
Bounded ReAct loop                         Cycle + step/token budget
Parallel work over a list                  Fan-out / fan-in with Send
Destructive action behind approval         Interrupt + Command resume
User can intervene mid-conversation        Interrupt at the top of the loop
Conditional path based on early result     Conditional edge from the relevant node
“Subagent” that may or may not run         Fan-out of size 0 or 1

A real production agent uses all three. The triage agent in our running example loops with a budget, fans out to summarize the user’s last five tickets in parallel, and interrupts before issuing a refund. Each pattern is a few lines once you’ve internalized the primitives.

What the graph looks like when you’re done

[Figure: the finished graph. Labels shown: START, plan, route, call_tool, force_finish (budget cap), parallel fan-out (Send API) to summarize₁, summarize₂, summarize₃, aggregate (fan-in), propose_action, interrupt → human approval, execute (if approved), END.]

Dashed lines are the budget escape (force_finish) and the interrupt edge (human approval). Solid lines are the happy path. The whole graph is a couple hundred lines of code, fully typed, fully traced, and resumable from any node. That’s the production shape.

Next week we move to multi-agent orchestration with CrewAI, where the unit of composition stops being the node and starts being the role.
