Events
Event-driven architecture for real-time monitoring, integrations, and reactive workflows
Aleph is built on an event-driven architecture. Every significant action — a tool call, a session starting, a cron job firing, a configuration change — emits a structured event. You can subscribe to these events in real time over WebSocket, enabling dashboards, monitoring systems, external integrations, and reactive automation pipelines.
Architecture
Events flow through two internal buses:
-
Core Event Bus — Handles agent lifecycle events (
AlephEvent). Built ontokio::sync::broadcastchannels for low-latency in-process delivery. Every component of the agent loop publishes here: tool calls, session state changes, planning events, sub-agent activity. -
Daemon Event Bus — Handles system-level events (
DaemonEvent). The perception daemon watches the environment — filesystem changes, process activity, system state, time triggers — and publishes observations here.
Both buses feed into the Gateway's event subscription system, which exposes them as a unified stream to external clients over WebSocket.
┌──────────────────────────────────────────────────────────────┐
│ Internal Buses │
│ │
│ Agent Loop ──▶ Core Event Bus ──┐ │
│ ├──▶ Gateway Event Router │
│ Daemon ──▶ Daemon Event Bus ──┘ │ │
│ │ │
│ ┌────────┴────────┐ │
│ │ Pattern Matcher │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ Client A Client B Client C │
│ stream.* agent.* * │
└──────────────────────────────────────────────────────────────┘Event Categories
Streaming Events (stream.*)
Emitted during agent response generation. These are the most frequent events and power real-time UIs.
| Topic | Description | Key Fields |
|---|---|---|
stream.start | Response streaming begins | run_id |
stream.chunk | Text chunk from the agent | run_id, content |
stream.tool_start | Agent is invoking a tool | run_id, tool_name, args |
stream.tool_end | Tool execution completed | run_id, tool_name, result, duration_ms |
stream.thinking | Reasoning step (chain-of-thought) | run_id, step_type, content |
stream.end | Response streaming finished | run_id |
Agent Events (agent.*)
Agent run lifecycle — start, completion, error, cancellation.
| Topic | Description | Key Fields |
|---|---|---|
agent.started | Agent run has started | run_id, session_key |
agent.completed | Agent run finished | run_id, session_key, tokens_used, duration_ms |
agent.error | Agent run encountered an error | run_id, error |
agent.cancelled | Agent run was cancelled | run_id |
Session Events (session.*)
Session lifecycle changes.
| Topic | Description | Key Fields |
|---|---|---|
session.created | New session created | session_key |
session.deleted | Session was deleted | session_key |
session.compacted | Session was compacted | session_key, tokens_before, tokens_after |
Cron Events (cron.*)
Scheduled job execution events.
| Topic | Description | Key Fields |
|---|---|---|
cron.triggered | Job execution started | job_id, run_id, trigger_source |
cron.completed | Job finished successfully | job_id, run_id, duration_ms |
cron.failed | Job failed after all retries | job_id, run_id, error |
Config Events (config.*)
Configuration changes.
| Topic | Description | Key Fields |
|---|---|---|
config.changed | Configuration was modified | changed_keys, source |
config.reloaded | Configuration reloaded from file | source |
Exec Events (exec.*)
Command execution approval flow.
| Topic | Description | Key Fields |
|---|---|---|
exec.approval_requested | Approval needed for a command | id, command, risk_level |
exec.approval_resolved | Approval decision made | id, decision |
Daemon Events
The daemon emits its own event hierarchy for environmental awareness:
Raw events — direct observations from watchers:
TimeEvent— cron schedule matches, interval ticks, absolute time triggersProcessEvent— process started, stopped, CPU/memory threshold exceededFsEvent— file created, modified, deleted, renamedSystemStateEvent— battery level, network status, display sleep, user activity, system load
Derived events — inferred from patterns in raw events:
ActivityChanged— user activity type changed (coding, browsing, meeting, etc.)ProgrammingSessionStarted/ProgrammingSessionEnded— detected coding sessionsResourcePressureChanged— system resource pressure level transitionsMeetingStateChanged— detected meeting start/endIdleStateChanged— user went idle or returned
System events — daemon self-management:
WatcherStarted/WatcherStopped— watcher lifecycleConfigReloaded— daemon configuration changesError— daemon-level errors
Subscribing to Events
Via WebSocket (Gateway RPC)
The primary way to receive events. Connect to the gateway WebSocket and subscribe with glob patterns:
{
"jsonrpc": "2.0",
"id": 1,
"method": "events.subscribe",
"params": {
"patterns": ["stream.*", "agent.*", "cron.*"]
}
}Events arrive as JSON-RPC notifications (no id field):
{
"jsonrpc": "2.0",
"method": "event",
"params": {
"topic": "agent.completed",
"data": {
"run_id": "run-uuid-123",
"session_key": "agent:main:main",
"tokens_used": 1247,
"duration_ms": 5000
},
"timestamp": 1706400000000
}
}Events are fire-and-forget. The server does not wait for acknowledgement. If the client disconnects, events during the disconnection are lost. There is no replay or catch-up mechanism — design your consumers to be tolerant of gaps.
Managing Subscriptions
Subscriptions are additive. Calling events.subscribe multiple times adds new patterns without removing existing ones.
// Add more patterns
{ "method": "events.subscribe", "params": { "patterns": ["config.*"] } }
// Remove specific patterns
{ "method": "events.unsubscribe", "params": { "patterns": ["stream.*"] } }
// List active subscriptions
{ "method": "events.list" }See events.* RPC methods for the full API specification.
Glob Pattern Matching
Event subscription patterns use glob-style matching against the topic string:
| Pattern | Matches |
|---|---|
stream.* | All streaming events |
agent.* | All agent lifecycle events |
* | Everything |
stream.chunk | Exact match only |
*.completed | Any completed event across all namespaces |
cron.* | All cron job events |
The * wildcard matches any sequence of non-dot characters. Use * alone to subscribe to every event in the system.
Event Payload Structure
Every event delivered over WebSocket follows this envelope:
interface EventNotification {
jsonrpc: "2.0";
method: "event";
params: {
topic: string; // e.g. "agent.completed"
data: Record<string, any>; // event-specific payload
timestamp: number; // Unix timestamp in milliseconds
};
}Core Event Types (Internal)
Internally, Aleph uses a typed event system with sequence numbers for ordering:
pub struct TimestampedEvent {
pub event: AlephEvent,
pub timestamp: i64, // millisecond precision
pub sequence: u64, // monotonically increasing
}The AlephEvent enum covers all core event types: InputReceived, ToolCallStarted, ToolCallCompleted, ToolCallFailed, SessionCreated, SubAgentStarted, AiResponseGenerated, and more. The gateway maps these to the topic-based format for external consumption.
Building Reactive Integrations
Real-Time Dashboard
Subscribe to all events and render them in a web UI:
const ws = new WebSocket("ws://localhost:3001");
ws.onopen = () => {
ws.send(JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "events.subscribe",
params: { patterns: ["*"] }
}));
};
ws.onmessage = (msg) => {
const event = JSON.parse(msg.data);
if (event.method === "event") {
const { topic, data, timestamp } = event.params;
updateDashboard(topic, data, timestamp);
}
};Monitoring and Alerting
Watch for errors and failures across the system:
// Subscribe to failure events
ws.send(JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "events.subscribe",
params: { patterns: ["agent.error", "cron.failed", "*.error"] }
}));
ws.onmessage = (msg) => {
const event = JSON.parse(msg.data);
if (event.method === "event") {
sendAlert({
channel: "#aleph-alerts",
text: `[${event.params.topic}] ${JSON.stringify(event.params.data)}`
});
}
};Tool Usage Analytics
Track which tools the agent uses most and how long they take:
ws.send(JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "events.subscribe",
params: { patterns: ["stream.tool_start", "stream.tool_end"] }
}));
const toolMetrics = {};
ws.onmessage = (msg) => {
const event = JSON.parse(msg.data);
if (event.method !== "event") return;
const { topic, data } = event.params;
if (topic === "stream.tool_start") {
toolMetrics[data.run_id] = {
tool: data.tool_name,
startedAt: Date.now()
};
}
if (topic === "stream.tool_end") {
const start = toolMetrics[data.run_id];
if (start) {
recordMetric(start.tool, Date.now() - start.startedAt);
delete toolMetrics[data.run_id];
}
}
};Cron Job Status Board
Monitor scheduled job health:
ws.send(JSON.stringify({
jsonrpc: "2.0",
id: 1,
method: "events.subscribe",
params: { patterns: ["cron.*"] }
}));
ws.onmessage = (msg) => {
const event = JSON.parse(msg.data);
if (event.method !== "event") return;
const { topic, data } = event.params;
switch (topic) {
case "cron.triggered":
setJobStatus(data.job_id, "running", data.trigger_source);
break;
case "cron.completed":
setJobStatus(data.job_id, "completed", `${data.duration_ms}ms`);
break;
case "cron.failed":
setJobStatus(data.job_id, "failed", data.error);
alertOnFailure(data.job_id, data.error);
break;
}
};Daemon Event-Driven Automation
The daemon's event system enables environment-aware automation. The daemon watches the world and emits events that can trigger agent actions:
Context-Aware Responses
The daemon detects when you start a programming session and can proactively offer help:
ProgrammingSessionStarted— Detected IDE launch + file changes in a project directoryActivityChanged— Transitions between coding, browsing, meetings, etc.IdleStateChanged— User went idle (opportunity for background tasks)ResourcePressureChanged— System running low on memory or battery
Time-Based Triggers
The daemon evaluates cron expressions and interval timers, emitting TimeEvent events:
Cron { expression }— Matched a cron scheduleInterval { seconds }— Interval timer tickAbsolute { time }— Reached a specific timestamp
These time events are what drive the cron job system internally — the daemon fires the trigger, and the cron scheduler picks it up for execution.
Design Principles
- Events are immutable facts — Once emitted, events are never modified. They represent something that happened.
- Fire-and-forget delivery — Publishers do not wait for consumers. This keeps the agent loop fast.
- No guaranteed delivery — Events can be lost if no subscriber is listening or if the buffer overflows. Design consumers accordingly.
- Typed internally, stringly externally — The core uses Rust enums for compile-time safety. The gateway maps them to dot-separated topic strings for language-agnostic consumption.
- Sequence ordering — Each event carries a monotonically increasing sequence number for causal ordering within a single Aleph instance.
See Also
- events.* RPC Methods — Full RPC API specification for event subscriptions
- Hooks — React to lifecycle events with shell commands and prompts
- Cron Jobs — Schedule recurring tasks that emit cron events
- Gateway Protocol — WebSocket transport and event delivery format