
Observer bus

The observer bus is how AgentOS turns every tool call into a live event you can watch from a browser. The HTTP page shows the two routes (/observer/history, /observer/stream); this page is the deep dive — what’s behind those routes, why it’s shaped the way it is, and how it survives restarts and slow consumers.

At the heart of the bus, inside the engine, is a single in-process tokio::sync::broadcast channel:

```rust
static OBSERVER_EVENTS: Lazy<broadcast::Sender<Value>> = Lazy::new(|| {
    let (tx, _) = broadcast::channel(256);
    tx
});
```

core/crates/engine/src/lib.rs:130-133. One sender, many receivers. Anything in the engine can call observer_events::emit(&event); every active subscriber sees a clone.
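
The emit path can be as thin as a send that ignores the no-subscriber case. A sketch, assuming only the OBSERVER_EVENTS static above (the real helper's body isn't reproduced here):

```rust
use serde_json::Value;

// Sketch only. broadcast::Sender::send errors solely when zero receivers
// exist, which fire-and-forget telemetry can safely ignore.
pub fn emit(event: &Value) {
    let _ = OBSERVER_EVENTS.send(event.clone());
}
```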

Why broadcast and not mpsc? mpsc is a queue: each item is consumed by exactly one receiver. The observer bus has the opposite shape — N independent consumers that all want the same stream. The CLI's agentos watch, the bridge's SSE endpoint, an mcp-watch debugger, and a future TUI can all subscribe simultaneously without stealing events from each other. Each subscribe() returns a fresh Receiver that starts at the channel's tail: it sees only events sent after the moment of subscription (lib.rs:135-137).
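
The fan-out shape is easy to see in a standalone toy (illustrative names, not AgentOS code):

```rust
use serde_json::{json, Value};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<Value>(256);

    // Two independent subscribers, e.g. a CLI watcher and an SSE bridge.
    let mut cli_rx = tx.subscribe();
    let mut sse_rx = tx.subscribe();

    tx.send(json!({ "phase": "started", "tool": "run" })).unwrap();

    // Each receiver gets its own clone; neither steals from the other.
    assert_eq!(cli_rx.recv().await.unwrap()["phase"], "started");
    assert_eq!(sse_rx.recv().await.unwrap()["phase"], "started");
}
```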

A single tool call (graph read, skill run, etc.) emits at three points:

| Phase | Site | When |
| --- | --- | --- |
| started | lib.rs:447 | Before dispatch, with start_body placeholder ("Searching…", "Loading…"). |
| completed | lib.rs:533 | On success, with latency_ms and the rendered result. |
| failed | lib.rs:547 | On error, with latency_ms and the error string. |

Display metadata (title, address, icon, body_markdown, accent_color) is computed by observer_display() (lib.rs:236-273) and embedded into every event so consumers don’t have to re-derive it. Skill colors are looked up from the skill manifest at emit time.
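
Condensed, the per-call pattern is emit, dispatch, emit again with the outcome. The signatures below are invented for illustration; only the three phases, the emit sites, and latency_ms come from the real code:

```rust
// Hypothetical condensation of the started/completed/failed pattern.
// `dispatch`, `ToolRequest`, and this `observer_event` signature are
// stand-ins; the actual emit sites are lib.rs:447, :533, and :547.
async fn call_tool_traced(req: &ToolRequest) -> Result<Value, String> {
    let t0 = std::time::Instant::now();
    emit(&observer_event("started", req, None, None));

    match dispatch(req).await {
        Ok(result) => {
            let ms = t0.elapsed().as_millis() as u64;
            emit(&observer_event("completed", req, Some(ms), Some(&result)));
            Ok(result)
        }
        Err(err) => {
            let ms = t0.elapsed().as_millis() as u64;
            emit(&observer_event("failed", req, Some(ms), None));
            Err(err)
        }
    }
}
```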

```mermaid
flowchart TD
    H["engine handler<br/>call_tool()<br/>lib.rs:430"] -->|"observer_event(...)"| BC["broadcast::Sender&lt;Value&gt; (cap 256)<br/>lib.rs:130"]
    BC -->|"fan-out (clone per subscriber)"| S1["observer.sock<br/>conn #1"]
    BC --> S2["observer.sock<br/>conn #2"]
    BC --> CLI["CLI watcher<br/>(subscriber)"]
    S1 -->|"JSON line + '\n' — write_observer_line lib.rs:390"| WB["web bridge<br/>tail + filter<br/>connect_observer() web-bridge/src/lib.rs:68"]
    WB -->|SSE frame| ES["EventSource<br/>(browser)"]
```

The observer connection handler (handle_observer_connection, lib.rs:1537) calls subscribe(), optionally drains a history prefix from the graph, then loops rx.recv() → write_observer_line().
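
In outline, with the history-drain helper invented for brevity (the real handler also parses the subscribe line, and turns Lagged into a synthetic event as shown later):

```rust
use tokio::net::unix::OwnedWriteHalf;
use tokio::sync::broadcast::error::RecvError;

// Skeleton of the per-connection loop. `observer_history_prefix` is a
// hypothetical stand-in for the graph query at lib.rs:1567-1572.
async fn observer_connection(mut write_half: OwnedWriteHalf, history_limit: usize) {
    let mut rx = OBSERVER_EVENTS.subscribe();

    // 1. Replay the durable prefix so a new subscriber has context.
    for event in observer_history_prefix(history_limit).await {
        if !write_observer_line(&mut write_half, &event).await {
            return;
        }
    }

    // 2. Tail the live channel until the peer hangs up.
    loop {
        match rx.recv().await {
            Ok(event) => {
                if !write_observer_line(&mut write_half, &event).await {
                    break;
                }
            }
            Err(RecvError::Lagged(_)) => continue, // real code emits a synthetic event
            Err(RecvError::Closed) => break,
        }
    }
}
```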

Every event is a JSON object. Two flavors share the wire — live events from observer_event() and history events from observer_history_event() — distinguished by phase.

| Phase | Tool examples | Notable fields |
| --- | --- | --- |
| started | read, search, run, boot, create, delete, accounts | request_id, skill?, operation, arguments, display |
| completed | same | adds latency_ms, result |
| failed | same | adds latency_ms, error |
| history | activity | replayed past activity from graph::observer_history; display.title is the activity summary |
| lagged | — | broadcast receiver fell behind; error carries the skipped count (lib.rs:1615-1631) |
| history_failed | — | observer history query failed during the prefix replay |

Every event carries ts, session_id, client, working_dir, and a display block (title, address, icon, body_markdown, accent_color). See observer_event() at lib.rs:275-306 for the exact schema.

A concrete completed payload for a Goodreads skill call:

```json
{
  "ts": "2026-04-14T11:42:08.317",
  "request_id": "req-918",
  "phase": "completed",
  "tool": "run",
  "skill": "goodreads",
  "operation": "get_book",
  "session_id": "claude-code-9f3b",
  "client": "Claude Code",
  "working_dir": "/Users/joe/dev/agentos",
  "arguments": {
    "skill": "goodreads",
    "tool": "get_book",
    "params": { "isbn": "9780140449136" }
  },
  "display": {
    "title": "goodreads · get_book",
    "address": "9780140449136",
    "icon": "⚡",
    "body_markdown": "## Meditations\n*Marcus Aurelius* · 254 pages",
    "accent_color": "#8a5a44"
  },
  "latency_ms": 412,
  "result": { "isbn13": "9780140449136", "title": "Meditations", "...": "..." },
  "error": null
}
```

The channel capacity is 256 events (lib.rs:131). Tokio’s broadcast is a ring: when the buffer fills and a new event is sent, the oldest event is dropped from the buffer for any receiver still pointing at it. That receiver’s next recv() returns RecvError::Lagged(n) carrying how many events it missed.

The observer connection handler turns that into a synthetic event rather than disconnecting:

```rust
Err(broadcast::error::RecvError::Lagged(skipped)) => {
    let notice = json!({
        "phase": "lagged",
        "error": format!("observer client skipped {} events", skipped),
        /* ... */
    });
    if !write_observer_line(&mut write_half, &notice).await { break; }
}
```

lib.rs:1615-1631. The slow consumer keeps its connection but learns it lost N events. Fast events are never blocked by slow consumers — broadcast is lossy by design, which is the right trade for telemetry.
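
The lossy semantics reproduce in isolation, independent of AgentOS:

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Tiny capacity so the overrun is trivial to trigger.
    let (tx, mut rx) = broadcast::channel::<u32>(2);

    // Five sends into a two-slot ring: the first three get overwritten.
    for i in 0..5 {
        tx.send(i).unwrap();
    }

    // The next recv() reports how far behind we fell...
    assert!(matches!(
        rx.recv().await,
        Err(broadcast::error::RecvError::Lagged(3))
    ));
    // ...then reading resumes at the oldest event still in the ring.
    assert_eq!(rx.recv().await.unwrap(), 3);
    assert_eq!(rx.recv().await.unwrap(), 4);
}
```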

The bridge has its own recovery story. connect_observer() retries up to 20 times with 250 ms between attempts (web-bridge/src/lib.rs:68-83) — enough to ride out an agentos restart (~10 s) while the restarting engine re-acquires its singleton lock and reopens observer.sock. After 20 attempts it gives up and the routes return 503 SERVICE_UNAVAILABLE.
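
The retry is a plain bounded loop, roughly this shape (constants from the source, everything else assumed):

```rust
use std::time::Duration;
use tokio::net::UnixStream;

// Assumed shape of connect_observer()'s retry: 20 attempts, 250 ms apart.
async fn connect_with_retry(path: &str) -> Option<UnixStream> {
    for _ in 0..20 {
        if let Ok(stream) = UnixStream::connect(path).await {
            return Some(stream);
        }
        tokio::time::sleep(Duration::from_millis(250)).await;
    }
    None // the caller maps this to 503 SERVICE_UNAVAILABLE
}
```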

Writes from the engine to a single observer socket are newline-delimited JSON: write_observer_line() writes the serialized event, then a \n, and flushes (lib.rs:390-405). One event per line, no framing layer needed — BufReader::read_line on the bridge side just works.
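
Assuming nothing beyond what the text states, the writer is serialize, append the newline, flush:

```rust
use serde_json::Value;
use tokio::io::AsyncWriteExt;
use tokio::net::unix::OwnedWriteHalf;

// Sketch of the line writer. Returns false once the peer is gone so the
// caller's loop can stop. serde_json escapes embedded newlines inside
// strings, so one event is always exactly one line on the wire.
async fn write_observer_line(write_half: &mut OwnedWriteHalf, event: &Value) -> bool {
    let mut line = event.to_string().into_bytes();
    line.push(b'\n');
    write_half.write_all(&line).await.is_ok() && write_half.flush().await.is_ok()
}
```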

The two routes serve different shapes of the same stream:

  • /observer/history — one-shot. Bridge connects, sends a subscribe line with history_limit, reads up to N lines, returns them as a JSON array, closes. No live tail (web-bridge/src/lib.rs:124-170; sketched after this list).
  • /observer/stream — SSE. Bridge subscribes with the same parameters, the engine first drains a history prefix from graph::observer_history (newest first, then reversed so the prefix is chronological — lib.rs:1567-1572), then switches to live tailing. The bridge yields each engine line as an SSE data: frame (web-bridge/src/lib.rs:205-236).
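
Condensed, the history route is connect, write one subscribe line, read up to N lines. The wire format of the subscribe line is assumed here; only the history_limit field is documented above:

```rust
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;

// Hypothetical condensation of the /observer/history handler
// (the real one is web-bridge/src/lib.rs:124-170).
async fn fetch_history(sock: &str, limit: usize) -> std::io::Result<Vec<serde_json::Value>> {
    let mut stream = UnixStream::connect(sock).await?;
    let subscribe = serde_json::json!({ "history_limit": limit }).to_string() + "\n";
    stream.write_all(subscribe.as_bytes()).await?;

    let mut lines = BufReader::new(stream).lines();
    let mut events = Vec::with_capacity(limit);
    while events.len() < limit {
        match lines.next_line().await? {
            Some(line) => events.push(serde_json::from_str(&line)?),
            None => break, // engine closed early; return what we have
        }
    }
    Ok(events)
}
```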

Last-Event-ID resumption is honored at the SSE protocol level via the prefix replay window. The “ring buffer” is really two layers stacked: the broadcast channel’s 256-event tail for in-flight events, and graph::observer_history for the durable record persisted in the graph. A reconnecting client gets up to 200 history items (lib.rs:1558) and seamlessly resumes live.

EventSource reconnects automatically. The engine’s observer.sock accepts new connections at any time, so a transient close — bridge restart, network blip — heals on its own as long as the engine is alive.

The engine exposes two distinct Unix sockets:

| Socket | Direction | Protocol | Multiplicity |
| --- | --- | --- | --- |
| ~/.agentos/engine.sock | bidirectional | JSON-RPC, request/response | many concurrent clients |
| ~/.agentos/observer.sock | server → client fan-out | newline-delimited JSON, no responses | many concurrent subscribers |

They could in principle share one socket. They don’t, for three reasons:

  1. Framing. Engine RPC pairs id ↔ response; observer is a free-running unidirectional stream. Multiplexing them on one socket means inventing a tag layer, and every consumer becomes a partial demuxer. Separate sockets, separate framing, no demuxer.
  2. Back-pressure. A slow observer subscriber must not stall RPC writes. With one shared socket, a stuck reader pauses the writer for both protocols; with two sockets the kernel’s per-socket buffers isolate them.
  3. Trust surface. Observer is read-only; engine is write-capable. Separate sockets let us tighten observer permissions independently if we ever want unprivileged subscribers (today both are user-private).

Same daemon, same flock — just two listening sockets.
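
Structurally that is just two listeners owned by one process. A minimal sketch, with the paths from the table above and the flock assumed to be held already:

```rust
use tokio::net::UnixListener;

// Sketch: one daemon, two sockets. The singleton flock is taken before
// either bind, so a second engine never races on these paths.
fn bind_both(home: &str) -> std::io::Result<(UnixListener, UnixListener)> {
    let rpc = UnixListener::bind(format!("{home}/.agentos/engine.sock"))?;
    let observer = UnixListener::bind(format!("{home}/.agentos/observer.sock"))?;
    Ok((rpc, observer))
}
```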

| Failure | What happens |
| --- | --- |
| Engine restart | observer.sock closes. Bridge sees EOF on its read_line, the SSE stream ends. Browser EventSource reconnects; the bridge re-runs connect_observer() with the 20×250 ms retry, picks up the new socket, replays the history prefix, resumes live. Total user-visible gap: ~10 s. |
| Bridge restart | Engine drops the bridge's broadcast Receiver (Closed). All EventSource connections terminate. Browser auto-reconnects to the new bridge process. |
| Skill emits faster than the channel drains | Broadcast ring overruns the slow subscriber. That subscriber gets RecvError::Lagged(n), which the engine surfaces as a synthetic phase: "lagged" event over the wire. Fast subscribers are unaffected. The graph's persistent activity log is unaffected — durability lives in the graph, not in the channel. |
| Bridge can't reach the engine | After 20 failed connect attempts, /observer/stream and /observer/history return 503. /healthz reports engine: false (web-bridge/src/lib.rs:99-103). |
| Observer socket exists but engine is wedged | Bridge connects, sends the subscribe line, reads zero lines; read_line returns Ok(0) and the SSE stream closes cleanly. No retry inside a single connection — the browser's EventSource will reconnect and try again. |