entorin

LangGraph

Wrap each node and drive the graph through human-in-the-loop (HITL) checkpoints. Substrate primitives flow through the same RunContext / EventBus / Ledger as every other path.

LangGraph owns its own state graph, scheduler, and checkpointer, so the control plane cannot wrap a single transport boundary the way the Claude / Codex paths do. Instead, the adapter exposes two seam helpers: instrument_node wraps a node callable, and run_with_hitl drives the graph through HITL. Everything else is plain control-plane primitives that the node body imports directly.

import uuid

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import END, START
from langgraph.graph import StateGraph
from langgraph.types import interrupt
from typing_extensions import TypedDict

from adapters.langgraph import instrument_node, run_with_hitl
from entorin.auth import Capability, Principal
from entorin.budget import MemoryLedger
from entorin.checkpoint import CAPABILITY_KIND as CKPT_CAP
from entorin.context import RunContext
from entorin.events import EventBus

# 1. The same control-plane primitives as every other path.
bus    = EventBus()
ledger = MemoryLedger()
principal = Principal(
    user_id="alice",
    caps=(Capability(kind=CKPT_CAP), ...),  # plus your model.* keys
)
ctx = RunContext(run_id=str(uuid.uuid4()), principal=principal)

class State(TypedDict, total=False):
    question: str
    draft: str
    answer: str

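# 2. Each node fn is a plain callable that uses control-plane primitives.
#    my_model is your entorin.model adapter; the imports for it, Message,
#    and TextBlock are omitted here.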
async def answer(state: State) -> State:
    response = await my_model.acomplete(ctx, [Message(role="user", content=state["question"])])
    return {"draft": "\n".join(b.text for b in response.content if isinstance(b, TextBlock))}

def confirm(state: State) -> State:
    interrupt({"draft": state["draft"]})  # pauses the graph
    return {"answer": state["draft"]}

# 3. Wrap each node so node.pre / node.post / entorin.node span fire,
#    and a budget gate ticks before the body runs.
builder = StateGraph(State)
builder.add_node(
    "answer",
    instrument_node(answer, node_name="answer", run_context=ctx, event_bus=bus, ledger=ledger),
)
builder.add_node(
    "confirm",
    instrument_node(confirm, node_name="confirm", run_context=ctx, event_bus=bus, ledger=ledger),
)
builder.add_edge(START, "answer").add_edge("answer", "confirm").add_edge("confirm", END)

# 4. Compile with a checkpointer (interrupt() requires one).
graph = builder.compile(checkpointer=InMemorySaver())

# 5. Drive the graph through HITL (call this from inside an async function).
#    Every interrupt() routes via your CheckpointTransport
#    (StdinCheckpointTransport, http_sse, your own).
final = await run_with_hitl(
    graph=graph,
    input={"question": "..."},
    config={"configurable": {"thread_id": ctx.run_id}},
    ctx=ctx, bus=bus,
    transport=my_checkpoint_transport,
)
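
For intuition about step 3, here is a minimal, standard-library-only sketch of what wrapping a node could look like. It is not the adapter's implementation: ToyBus, ToyLedger, BudgetExceeded, and wrap_node are hypothetical names, the order of the node.pre event relative to the budget tick is an assumption, and no span is emitted.

```python
import inspect
from typing import Any, Awaitable, Callable, Union

State = dict[str, Any]
NodeFn = Callable[[State], Union[State, Awaitable[State]]]


class ToyBus:
    """Hypothetical stand-in for an event bus: records (name, payload) pairs."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any]]] = []

    def emit(self, name: str, payload: dict[str, Any]) -> None:
        self.events.append((name, payload))


class BudgetExceeded(RuntimeError):
    """Raised by the toy ledger when the tick cap is reached."""


class ToyLedger:
    """Hypothetical stand-in for a ledger: caps the number of node runs."""

    def __init__(self, max_ticks: int) -> None:
        self.max_ticks = max_ticks
        self.ticks = 0

    def tick(self) -> None:
        if self.ticks >= self.max_ticks:
            raise BudgetExceeded(f"budget of {self.max_ticks} ticks exhausted")
        self.ticks += 1


def wrap_node(fn: NodeFn, *, node_name: str, bus: ToyBus, ledger: ToyLedger):
    """Return an async callable that brackets fn with events and a budget gate."""

    async def wrapped(state: State) -> State:
        bus.emit("node.pre", {"node": node_name})
        ledger.tick()  # the gate runs before the node body (assumed ordering)
        result = fn(state)
        if inspect.isawaitable(result):  # accept both sync and async node bodies
            result = await result
        bus.emit("node.post", {"node": node_name})
        return result

    return wrapped
```

Because the wrapper awaits only when the body returns an awaitable, the same helper covers an async node such as answer and a sync node such as confirm. If the gate raises, the body never runs and no node.post is recorded in this sketch.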

Outcomes

  • Each node emits node.pre / node.post and an entorin.node span; the control plane ticks the budget gate before the body runs.
  • LLM calls inside a node go through the same entorin.model adapters as the bare-loop path — the control plane emits llm.call.pre / llm.call.post and ticks the ledger.
  • interrupt() pauses the graph; run_with_hitl translates the pause into a checkpoint.request event, routes the prompt through the transport, then resumes via Command(resume=...) keyed to the firing interrupt.
  • A capability-denied principal triggers policy.violation at the control-plane boundary, never the LangGraph boundary.
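
The pause, route, and resume cycle described above can be illustrated with a toy analogy that uses only the standard library. In this sketch a Python generator stands in for the graph: yield plays the role of interrupt(), and send() plays the role of resuming with Command(resume=...). The names confirm_flow and drive, the callable transport, and the on_event callback are all hypothetical; the real run_with_hitl and CheckpointTransport interfaces may differ.

```python
from typing import Any, Callable, Generator

Prompt = dict[str, Any]
Flow = Generator[Prompt, str, dict[str, str]]


def confirm_flow(draft: str) -> Flow:
    # `yield` hands a prompt to the driver and waits for the human's reply,
    # much like interrupt() pauses a node until the graph is resumed.
    decision = yield {"draft": draft}
    return {"answer": draft if decision == "approve" else ""}


def drive(
    flow: Flow,
    transport: Callable[[Prompt], str],
    on_event: Callable[[str, Prompt], None],
) -> dict[str, str]:
    """Run the flow to completion, routing every pause through the transport."""
    try:
        prompt = next(flow)  # run until the first pause
        while True:
            on_event("checkpoint.request", prompt)  # announce the pause
            reply = transport(prompt)               # ask the human (or a stub)
            prompt = flow.send(reply)               # resume with the reply
    except StopIteration as done:
        return done.value  # the flow's return value is the final state
```

With an auto-approve transport such as lambda prompt: "approve", drive(confirm_flow("hello"), ...) returns {"answer": "hello"} after recording one checkpoint.request.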

Quirks worth knowing

  • run_with_hitl requires both a checkpointer at compile time and thread_id in config["configurable"]. Either omission raises ValueError at entry.
  • LangGraph checkpoint state I/O (BaseCheckpointSaver.put / get) is not observed by the control plane — that’s a LangGraph-internal concern; pick any saver implementation.
  • A runnable end-to-end demo with a stub model and an auto-approve transport: examples/langgraph_qa.py.
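
The first quirk amounts to a precondition check at entry. A minimal sketch of such a check follows, assuming the compiled graph exposes its saver as a checkpointer attribute; check_hitl_preconditions is a hypothetical helper for illustration, not part of the adapter, and the error messages are made up.

```python
from typing import Any, Mapping


def check_hitl_preconditions(graph: Any, config: Mapping[str, Any]) -> None:
    """Raise ValueError if the graph or config cannot support interrupt()."""
    # Assumption: a graph compiled without a saver has checkpointer set to None.
    if getattr(graph, "checkpointer", None) is None:
        raise ValueError("graph must be compiled with a checkpointer")

    configurable = config.get("configurable") or {}
    if not configurable.get("thread_id"):
        raise ValueError('config["configurable"]["thread_id"] is required')
```

Running a check like this in your own code before calling run_with_hitl can surface a missing saver or thread_id with a message that names the exact omission.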