Stateful Agents¶
Building agents that hold per-tenant state across multiple tool calls — without reinventing the runtime.
A single @mesh.tool call is stateless from the mesh's perspective: a request arrives, a handler runs on a worker loop, a response goes out. That's the right shape for most tools. But some agents are not a function — they're a session: a debate that runs over many turns, a multi-stage workflow with checkpoints, an aggregator that accumulates inputs across calls. State has to survive between calls, survive a replica restart, and (for long-running work) keep advancing even when no client is actively waiting.
This page walks through the mesh-idiomatic decomposition for that class of agent: a state agent that owns durable storage, an orchestrator agent that runs the long-lived unit of work, and a client surface (browser via mesh.route, MCP-aware clients via the MCP transport) that talks to both. The pattern is small, but it pulls in several primitives — MeshJob, DDDI, the worker-pool topology — so the "why" matters as much as the "how."
The problem¶
"Stateful" in mesh has a specific meaning:
- Multiple tool calls touch shared state. A debate agent has turns: turn N reads the transcript through turn N-1, appends, and writes back. A multi-tenant aggregator has one logical bucket per tenant that tool calls from many clients land into.
- State outlives a single MCP request-response. The client closes the connection, comes back five minutes later, picks up where they left off.
- State survives replica restart. Pods get evicted, deployments roll, OOMKills happen. The in-flight unit of work has to resume on a peer (or on the same pod after restart) without rewinding to zero.
- Background work runs between user-driven calls. A driver loop polls an upstream every 10 seconds; a workflow stage waits on an external signal; a timer fires at a deadline. None of this is gated on a user tool call landing.
Loop topology (v2.2.4+)¶
mcp-mesh runs your agent across two event loops:
- Framework loop (uvicorn main): serves /health, /ready, /livez, and routes MCP protocol traffic. Always responsive.
- User loop (single, dedicated): runs your FastMCP/FastAPI lifespan startup, all @mesh.tool and @app.tool bodies, and lifespan exit. One loop for everything you write.
The two-loop split means a long-running tool body (LLM call, slow registry hop, multi-minute MeshJob) holds the user loop, but never the framework loop — your K8s liveness/readiness probes stay responsive.
What this fixes: loop-bound resources work as expected when created in lifespan startup. asyncpg.Pool, redis.asyncio.Redis, and aiohttp.ClientSession are all loop-affine — they bind to the asyncio loop that created them and fail if reused from a different one. Because v2.2.4 runs your lifespan AND your tools on the same user loop, the canonical FastMCP idiom is straightforward:
from contextlib import asynccontextmanager

import asyncpg
import mesh
from fastmcp import FastMCP

# Module-level: FastMCP's `lifespan` parameter receives a FastMCP server
# instance, not a FastAPI app, so `.state` isn't available. The canonical
# Python pattern for sharing a lifespan-bound resource with tools is a
# module-level global.
_pool = None


@asynccontextmanager
async def _lifespan(server):
    global _pool
    _pool = await asyncpg.create_pool(...)
    try:
        yield
    finally:
        if _pool is not None:
            await _pool.close()


app = FastMCP("my-agent", lifespan=_lifespan)


@app.tool()
@mesh.tool(capability="query")
async def query() -> dict:
    async with _pool.acquire() as conn:
        row = await conn.fetchrow("SELECT ...")
    # fetchrow returns an asyncpg Record (or None); convert to a plain dict.
    return dict(row) if row else {}
No per-loop dict workarounds, no WORKERS=1 ceremony, no surprises. The pool is created on the user loop in lifespan startup; tools use it on the same user loop; lifespan exit closes it on the same loop.
Opt-in N>1 worker pool: if a tool body does sync blocking work (e.g. time.sleep, a sync HTTP client, CPU-bound number crunching) and you need concurrent calls to absorb it instead of serializing on one loop, set MCP_MESH_TOOL_WORKERS=N (N>1). You get N worker loops, dispatched round-robin. Loop-bound resources created in lifespan still bind to one loop (worker-0); tools dispatched to worker-1..N-1 cannot access them. The supported pattern for that case is the per-loop dict cache (see src/runtime/python/_mcp_mesh/engine/unified_mcp_proxy.py for the SDK's own internal use of this pattern for httpx clients).
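A minimal sketch of a per-loop cache, using only the standard library. The helper name `get_for_current_loop` and its `factory` argument are illustrative assumptions, not part of the mesh SDK; `factory` is any `async def` that builds the loop-bound resource.

```python
import asyncio
import threading

_cache_lock = threading.Lock()
_per_loop: dict = {}  # event loop -> Task that builds that loop's resource


async def get_for_current_loop(factory):
    """Return the resource owned by the running loop, building it on first use."""
    loop = asyncio.get_running_loop()
    with _cache_lock:  # held only briefly; there is no await inside the lock
        task = _per_loop.get(loop)
        if task is None:
            # Created on the calling loop, so the resource binds to this loop.
            task = loop.create_task(factory())
            _per_loop[loop] = task
    # shield() keeps one cancelled caller from cancelling the shared build.
    return await asyncio.shield(task)
```

Each worker loop builds its own resource on first access, and concurrent callers on the same loop share one build. This sketch never evicts entries and caches a failed build, so a production version would need cleanup and retry handling.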
Better escape for sync-blocking: refactor the blocking call to await asyncio.to_thread(blocking_call). The blocking call runs on Python's default thread pool; the user loop stays free; you don't need N>1 workers.
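A small illustration of that refactor, assuming Python 3.9+ for `asyncio.to_thread`. The names `blocking_call` and `tool_body` are hypothetical; the heartbeat task exists only to show that the loop keeps running while the blocking call is in flight.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    time.sleep(seconds)  # stands in for a sync HTTP client or similar
    return "done"


async def tool_body(seconds: float = 0.2) -> dict:
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    try:
        # The blocking call runs on the default thread pool, not on the loop.
        result = await asyncio.to_thread(blocking_call, seconds)
    finally:
        hb.cancel()
        await asyncio.gather(hb, return_exceptions=True)
    return {"result": result, "ticks": ticks}
```

If the heartbeat counter advances while `blocking_call` sleeps, the loop was free to serve other tool calls during that time.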
The structural answer — the one that survives replica restart, scales horizontally, and composes with the rest of the mesh — is the three-agent decomposition that follows.
The mesh-idiomatic decomposition¶
Three agents. One owns durable state. One owns the long-running unit of work. One (or more) is the user-facing surface. Each is independently deployable, independently scalable, and binds to the rest of the mesh via plain DDDI.
State agent — stateless CRUD over durable storage¶
The state agent exposes @mesh.tool functions that read and write a Postgres (or Redis) database. It does no business logic. It does no long-running work. From mesh's perspective it's an ordinary CRUD agent — every tool call is short, idempotent where possible, and returns. The pool lives inside the agent process — created in lifespan startup, stored in a module-level global, and closed in lifespan exit. Since v2.2.4, lifespan and all tool bodies share the single user loop, so a module-level pool created in lifespan startup is the supported pattern. (FastMCP's lifespan receives a FastMCP server instance — there is no .state namespace as there is on a FastAPI app.)
Schema:
- An event log table — append-only record of every state mutation. Cheap to scan, replayable for audit / debugging.
- A state snapshot table — current state per logical key (tenant, session, workflow id). Read-optimized; reconstructed from the event log if it gets out of sync.
- A pending_inputs table — inbox for events that target a running workflow (covered in Handling external events below).
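One possible shape for these three tables, written as a DDL string the agent could apply at startup. The table and column names come from the queries used on this page; the column types, the `id` and `created_at` columns, and the `ensure_schema` helper are assumptions made for illustration.

```python
# Assumed Postgres schema; adjust types and indexes to your workload.
SCHEMA_DDL = """
CREATE TABLE IF NOT EXISTS event_log (
    id         BIGSERIAL PRIMARY KEY,
    tenant_id  TEXT NOT NULL,
    event_type TEXT NOT NULL,
    payload    JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS tenant_state (
    tenant_id TEXT PRIMARY KEY,
    state     JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE TABLE IF NOT EXISTS pending_inputs (
    id         BIGSERIAL PRIMARY KEY,
    tenant_id  TEXT NOT NULL,
    input_type TEXT NOT NULL,
    payload    JSONB NOT NULL,
    claimed    BOOLEAN NOT NULL DEFAULT FALSE
);
"""


async def ensure_schema(conn) -> None:
    """Apply the DDL through any connection exposing an awaitable execute(sql)."""
    await conn.execute(SCHEMA_DDL)
```

The primary key on `tenant_state.tenant_id` is what lets an `ON CONFLICT (tenant_id)` upsert work, and a JSONB `state` column is what the `||` merge operator expects.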
import json
import os
from contextlib import asynccontextmanager

import asyncpg
import mesh
from fastmcp import FastMCP

# Module-level: FastMCP's lifespan param is a FastMCP server instance,
# not a FastAPI app, so .state isn't available. The canonical Python
# pattern is a module-level global.
_pool = None


@asynccontextmanager
async def _lifespan(server):
    global _pool
    _pool = await asyncpg.create_pool(os.environ["DATABASE_URL"])
    try:
        yield
    finally:
        if _pool is not None:
            await _pool.close()


app = FastMCP("State Agent", lifespan=_lifespan)


@app.tool()
@mesh.tool(capability="read_state")
async def read_state(tenant_id: str) -> dict:
    async with _pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT state FROM tenant_state WHERE tenant_id=$1",
            tenant_id,
        )
    # Assumes `state` is a JSONB column. Without a custom type codec,
    # asyncpg returns JSON/JSONB values as text, so decode explicitly.
    return json.loads(row["state"]) if row else {}


@app.tool()
@mesh.tool(capability="append_event")
async def append_event(tenant_id: str, event_type: str, payload: dict) -> dict:
    # For the same reason, JSON/JSONB parameters are passed as strings.
    encoded = json.dumps(payload)
    async with _pool.acquire() as conn:
        # Log append and snapshot update commit or roll back together.
        async with conn.transaction():
            await conn.execute(
                "INSERT INTO event_log (tenant_id, event_type, payload) "
                "VALUES ($1, $2, $3)",
                tenant_id, event_type, encoded,
            )
            await conn.execute(
                "INSERT INTO tenant_state (tenant_id, state) "
                "VALUES ($1, $2) "
                "ON CONFLICT (tenant_id) DO UPDATE "
                "SET state = tenant_state.state || EXCLUDED.state",
                tenant_id, encoded,
            )
    return {"ok": True}


@mesh.agent(name="state-agent", http_port=9200, auto_run=True)
class StateAgent:
    pass
Deployment env: DATABASE_URL must be set; the lifespan hook above reads it to create the pool.
This agent scales horizontally if you front it with Postgres-side locking on the rows that need serialization — but for most state-agent shapes the bottleneck is the database, not the agent.
Orchestrator agent — @mesh.tool(task=True) MeshJob bodies¶
The orchestrator hosts the long-running unit of work. Each unit is one @mesh.tool(capability=..., task=True) body. Inside the body:
- Call into the state agent for every mutation (no in-process state).
- Drive an LLM via mesh.MeshLlmAgent.
- Emit progress with await job.update_progress(fraction, message).
- Persist after every meaningful step. If the pod dies, the next claim cycle reroutes the job (orphan reroute is mesh-managed; see Long-Running Jobs) and the new owner reads the latest snapshot from the state agent to know where to resume.
import mesh
from fastmcp import FastMCP
from mesh import MeshJob

app = FastMCP("Orchestrator")


@app.tool()
@mesh.tool(
    capability="run_workflow",
    task=True,
    dependencies=["read_state", "append_event"],
)
async def run_workflow(
    tenant_id: str,
    plan: list[str],
    job: MeshJob = None,
    read_state: mesh.McpMeshTool = None,
    append_event: mesh.McpMeshTool = None,
    llm: mesh.MeshLlmAgent = None,
) -> dict:
    if job is not None:
        await job.update_progress(0.0, "loading state")
    state = await read_state(tenant_id=tenant_id)
    completed = set(state.get("completed_stages", []))
    total = max(len(plan), 1)
    for i, stage in enumerate(plan):
        if stage in completed:
            continue  # resumed run skips finished stages
        # Real work for this stage.
        answer = await llm(f"Execute stage {stage} for tenant {tenant_id}")
        # Persist BEFORE acknowledging progress: on crash the state
        # agent is source of truth.
        await append_event(
            tenant_id=tenant_id,
            event_type="stage_completed",
            payload={"stage": stage, "result": answer,
                     "completed_stages": list(completed | {stage})},
        )
        completed.add(stage)
        if job is not None:
            await job.update_progress((i + 1) / total, f"{stage} done")
    result = {"tenant_id": tenant_id, "completed": list(completed)}
    if job is not None:
        await job.complete(result)
    return result


@mesh.agent(name="orchestrator", http_port=9201, auto_run=True)
class Orchestrator:
    pass
The orchestrator is stateless from a resource-binding standpoint — all loop-bound resources live across the boundary in the state agent's process. Whether the orchestrator runs at the default single-user loop or opts into MCP_MESH_TOOL_WORKERS=N for sync-blocking parallelism is independent of this decomposition.
Client surface — mesh.route for web, MCP for tools¶
Both the browser and MCP-aware clients (over the MCP transport) hit the same backend capabilities. A FastAPI route translates HTTP into a MeshJob submit + wait:
import mesh
from fastapi import FastAPI
from mesh import MeshJob
from pydantic import BaseModel

app = FastAPI()


class StartRequest(BaseModel):
    tenant_id: str
    plan: list[str]


@app.post("/api/workflows")
@mesh.route(dependencies=["run_workflow"])
async def start_workflow(
    body: StartRequest,
    run_workflow: MeshJob = None,
) -> dict:
    proxy = await run_workflow.submit(
        tenant_id=body.tenant_id, plan=body.plan, max_duration=3600,
    )
    return {"job_id": proxy.job_id}
The browser polls __mesh_job_status (or subscribes via SSE — see Streaming for the progress-notification path) for updates. An MCP client calls run_workflow directly through the mesh as a normal tool that returns the job_id.
Why this shape¶
Each piece of the decomposition pays for itself:
- In-memory state is a cache; Postgres is authoritative. When the orchestrator pod dies mid-stage, the next claim cycle hands the job to a peer. The peer reads the state-agent snapshot, sees which stages completed, and resumes. Lost in-flight CPU is bounded by the time between the last append_event and the crash, typically seconds.
- MeshJob owns the long-running task lifecycle. Orphan reroute on replica death, cancel propagation, progress streaming, per-attempt deadlines, retry-on-transient: all of that is the substrate's job, not yours. You write the body. See Long-Running Jobs for the full lifecycle.
- Tool calls are stateless from mesh's perspective. The orchestrator scales horizontally without coordination; the state agent scales horizontally with Postgres as the coordination point; the route agent scales horizontally trivially.
- Loop-bound resources stay inside one process. The asyncpg pool lives in the state agent: created in lifespan startup on the user loop, used by every tool body on the same user loop, closed in lifespan exit on the same loop. No cross-loop sharing. No cross-process sharing. The pool is invisible to the orchestrator and to the route agent.
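The resume behavior in the first bullet can be simulated without any mesh machinery. In this sketch `InMemoryStateStore` stands in for the state agent and `run_plan` for the orchestrator body; both names, and the `crash_after` knob, are hypothetical and exist only to make the crash reproducible.

```python
import asyncio


class InMemoryStateStore:
    """Stand-in for the state agent: one snapshot dict per tenant."""

    def __init__(self):
        self.snapshots = {}

    async def read_state(self, tenant_id):
        return dict(self.snapshots.get(tenant_id, {}))

    async def append_event(self, tenant_id, event_type, payload):
        self.snapshots.setdefault(tenant_id, {}).update(payload)


async def run_plan(store, tenant_id, plan, crash_after=None):
    """Run stages in order, skipping any the snapshot already lists."""
    state = await store.read_state(tenant_id)
    completed = list(state.get("completed_stages", []))
    executed = []
    for stage in plan:
        if stage in completed:
            continue
        executed.append(stage)  # the real work would happen here
        completed.append(stage)
        await store.append_event(
            tenant_id, "stage_completed", {"completed_stages": list(completed)}
        )
        if crash_after == stage:
            raise RuntimeError("simulated pod death")
    return executed


async def demo():
    store = InMemoryStateStore()
    plan = ["fetch", "analyze", "report"]
    try:
        await run_plan(store, "t1", plan, crash_after="fetch")
    except RuntimeError:
        pass  # the first owner is gone
    # A peer picks up the job and starts from the persisted snapshot.
    return await run_plan(store, "t1", plan)
```

The second run executes only the stages the first run never persisted, which is the property the decomposition is built to guarantee.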
Handling external events during a run¶
A common real-world wrinkle: while the workflow is running, the user wants to interact with it — submit input mid-stage, change a parameter, extend a deadline. The workflow body is sitting on an await inside an LLM call or a state-agent write. It needs a channel to receive these mid-flight signals.
Today's pattern: inbox-via-state-agent¶
The state agent grows a pending_inputs table. The route agent writes to it. The orchestrator polls it at iteration boundaries:
import json  # at the top of the state agent module


# State agent: inbox writer (called by the route agent on user input)
@app.tool()
@mesh.tool(capability="enqueue_input")
async def enqueue_input(tenant_id: str, input_type: str, payload: dict) -> dict:
    async with _pool.acquire() as conn:
        # Assumes `payload` is a JSON/JSONB column; without a custom codec
        # asyncpg expects such parameters as JSON text.
        await conn.execute(
            "INSERT INTO pending_inputs (tenant_id, input_type, payload, claimed) "
            "VALUES ($1, $2, $3, FALSE)",
            tenant_id, input_type, json.dumps(payload),
        )
    return {"ok": True}


# State agent: inbox reader (called by the orchestrator each iteration)
@app.tool()
@mesh.tool(capability="claim_inputs")
async def claim_inputs(tenant_id: str) -> list[dict]:
    async with _pool.acquire() as conn:
        # One UPDATE ... RETURNING statement claims and reads the rows atomically.
        rows = await conn.fetch(
            "UPDATE pending_inputs SET claimed=TRUE "
            "WHERE tenant_id=$1 AND NOT claimed RETURNING input_type, payload",
            tenant_id,
        )
    return [
        {"input_type": r["input_type"], "payload": json.loads(r["payload"])}
        for r in rows
    ]
The orchestrator polls inside the loop:
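A minimal sketch of such a boundary poll. It assumes `claim_inputs` is the injected state-agent tool (any awaitable taking `tenant_id` works) and that `run_stage` does the per-stage work; the input types `set_param` and `stop` are hypothetical examples, not a mesh convention.

```python
import asyncio


async def run_with_inbox(tenant_id, plan, claim_inputs, run_stage):
    """Drain the inbox before each stage and apply what it contains."""
    params = {}
    done = []
    for stage in plan:
        # Iteration boundary: one round-trip to the state agent.
        for item in await claim_inputs(tenant_id=tenant_id):
            if item["input_type"] == "set_param":  # hypothetical input type
                params.update(item["payload"])
            elif item["input_type"] == "stop":  # hypothetical input type
                return {"completed": done, "stopped": True, "params": params}
        await run_stage(stage, params)
        done.append(stage)
    return {"completed": done, "stopped": False, "params": params}
```

Inputs are only observed between stages, so an input that arrives mid-stage waits until the current stage finishes.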
The latency cost is one HTTP round-trip per poll. Against the cost of the LLM call or the work itself, that's noise.
Limitations of polling¶
Polling works when the workflow naturally cycles through iteration boundaries. It does not work for sub-iteration events: if your body is parked on a long asyncio.sleep() that runs until a deadline, or inside a single streaming LLM call that runs for two minutes, the user clicking "extend my deadline" lands in pending_inputs and sits there until the next boundary, which may be after the event has stopped being useful.
Sub-iteration events: mesh-managed event channel (shipped in v2.2)¶
The sub-iteration gap above closes with MeshJob event injection: a per-job, ordered, append-only event log that every running job carries. Anyone holding the job_id posts into the log; the running handler drains it inline. On the producer side, call proxy.send_event(event_type, payload) (or mesh.jobs.post_event(job_id, event_type, payload) from a caller that doesn't already have a proxy). On the consumer side, call await job.recv_event(types=[...], timeout_secs=...) inside the handler body. The surface has cross-runtime parity (Python / TypeScript / Java).
For long-lived observers that want to mirror events without consuming them, the stream subscription counterpart opens a non-destructive iterator with per-call cursor — multiple subscribers can mirror the same job's events independently. See Event injection and Stream subscription on the Jobs page for the canonical surface, cross-runtime examples, and the synthetic cancel-event pattern that gives handlers a graceful shutdown path.
The inbox-via-state-agent pattern above still has its place — it's the right tool when the workflow naturally syncs at iteration boundaries and the state agent's storage is also where you want the inbox to live. Reach for the event channel when sub-iteration latency matters: a handler parked on a long recv_event wakes the moment the event is posted, not on the next boundary.
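A rough sketch of the "extend my deadline" case on top of `recv_event`. Only the call shape `recv_event(types=[...], timeout_secs=...)` comes from this page. The return value (a dict with `type` and `payload`, or `None` on timeout), the `extend_deadline` event type, and the `extra_secs` field are assumptions, and `FakeJob` is a local stand-in rather than the real MeshJob.

```python
import asyncio


class FakeJob:
    """Local stand-in exposing only recv_event(); not the real MeshJob."""

    def __init__(self):
        self._events = asyncio.Queue()

    def post(self, event_type, payload):
        self._events.put_nowait({"type": event_type, "payload": payload})

    async def recv_event(self, types, timeout_secs):
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout_secs
        while (remaining := deadline - loop.time()) > 0:
            try:
                event = await asyncio.wait_for(self._events.get(), remaining)
            except asyncio.TimeoutError:
                return None  # assumed timeout behaviour
            if event["type"] in types:
                return event
        return None


async def wait_for_deadline(job, seconds):
    """Wait out a deadline, letting extend_deadline events push it back."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + seconds
    extensions = 0
    while (remaining := deadline - loop.time()) > 0:
        event = await job.recv_event(
            types=["extend_deadline"], timeout_secs=remaining
        )
        if event is not None:
            deadline += event["payload"]["extra_secs"]
            extensions += 1
    return extensions
```

The handler wakes as soon as an event is posted instead of at the next iteration boundary, which is the difference from the inbox pattern.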
Cancel and graceful shutdown¶
A long-running job needs a clean termination story. There are two cases:
- Consumer cancels via JobProxy.cancel(reason). The owner replica receives the cancel signal and (today) raises CancelledError into the running handler. Once #1032 lands, handlers that subscribe to events will see a synthetic {"type":"cancelled","reason":...} event instead, which they can handle inline rather than via exception. Either way: persist final state to the state agent in a finally block before the handler exits.
- Replica shutdown (SIGTERM during pod eviction / deployment rollout). The runtime's lifespan exit phase fires (fix shipped in #1029; the exit phase is now honored on SIGTERM rather than skipped), any registered cleanup runs, and the registry sees the agent disappear. In-flight jobs become orphans and reroute to a peer within ~5 seconds.
The contract for the handler body:
@app.tool()
@mesh.tool(capability="run_workflow", task=True, ...)
async def run_workflow(...):
    try:
        ...  # main loop
    except asyncio.CancelledError:
        # Persist what we've done so far so the next attempt can resume.
        await append_event(
            tenant_id=tenant_id, event_type="cancelled",
            payload={"completed_stages": list(completed)},
        )
        raise
    finally:
        # Anything that must run regardless of outcome: flush
        # metrics, close per-tenant resources, etc.
        ...
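A runnable reduction of that contract, with the mesh decorators stripped out. Here `do_stage` and `append_event` are injected awaitables and `run_with_cancel_contract` is a hypothetical name; in a real agent the body would be the decorated handler and `append_event` the state-agent tool.

```python
import asyncio


async def run_with_cancel_contract(stages, do_stage, append_event, tenant_id="t1"):
    completed = []
    try:
        for stage in stages:
            await do_stage(stage)
            completed.append(stage)
    except asyncio.CancelledError:
        # Record progress, then re-raise so the cancel is not swallowed.
        await append_event(
            tenant_id=tenant_id,
            event_type="cancelled",
            payload={"completed_stages": list(completed)},
        )
        raise
    return completed
```

Re-raising matters: swallowing `CancelledError` would make the task look as if it finished normally. A second cancel that arrives during the `append_event` await would interrupt it, so keep that write short.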
What NOT to do¶
A handful of anti-patterns this decomposition exists to prevent:
- Dedicated background-thread engine with loop.run_forever(). The pattern looks like: spawn a thread at agent startup, give it its own asyncio.new_event_loop(), run an engine on that loop, and bridge @mesh.tool handlers into it via asyncio.run_coroutine_threadsafe. This re-invents MeshJob (orphan reroute, cancel propagation, per-attempt deadlines) in application code, and breaks horizontal scaling because state pins the agent to one replica. It also drags in hand-rolled cross-thread plumbing (sys.modules['__main__'] lookups for DI-wired functions, dependency_kwargs.timeout knobs for long-poll). There IS a narrow class of agents where this pattern is the right answer (GPU contexts, real-time aggregators with sub-10ms latency budgets; see In-Process State), but it should never be the default reach.
- Loop-bound resource cached at module level when MCP_MESH_TOOL_WORKERS>1. At the default WORKERS=1, a module-level pool populated from FastMCP lifespan startup works (lifespan and tools share the single user loop). It's brittle the moment you opt into WORKERS=N to absorb sync-blocking work: the second worker hits "Future attached to a different loop" because the lifespan-created pool is bound to worker-0. The supported shape at the default is the FastMCP lifespan + module-global pattern (binds to the single user loop, used by all tools on the same loop); see Loop topology above. If you also need N>1 workers for sync-blocking parallelism, switch to a per-loop dict cache (each worker lazily builds its own resource on first access).
- Putting orchestration state on the orchestrator process. The orchestrator's job body should be a pure transformer: read state in, drive work, write state out. The moment it caches anything across tool calls, restart recovery breaks.
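The "attached to a different loop" failure in the second bullet can be reproduced with plain asyncio, which may help when recognizing it in logs. This sketch uses a bare future as a stand-in for a loop-bound resource such as a connection pool; `cross_loop_error` is a hypothetical helper name.

```python
import asyncio


def cross_loop_error() -> str:
    """Await a future owned by loop A from a task running on loop B."""
    loop_a = asyncio.new_event_loop()
    fut = loop_a.create_future()  # bound to loop_a, like a lifespan-created pool

    async def use_from_other_loop():
        try:
            await fut
        except RuntimeError as exc:
            return str(exc)
        return "no error"

    try:
        # asyncio.run() creates a fresh loop B for the coroutine.
        return asyncio.run(use_from_other_loop())
    finally:
        loop_a.close()
```

asyncio checks loop ownership when a task awaits a future, so the error surfaces at the first await that touches the foreign resource, not when the resource is created.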
See Also¶
- Long-Running Jobs: the MeshJob substrate (lifecycle, retries, cancel, orphan reroute, task=True opt-in)
- Streaming: progress notifications and chunked text responses; pairs with MeshJob for live updates
- In-Process State (Escape Hatch): when even MeshJob can't fit your shape, with documented caveats
- Loop Topology: two-loop model, default MCP_MESH_TOOL_WORKERS=1, when to opt into N>1
- meshctl man dependency-injection: DDDI and loop topology