Events

Event-driven architecture for real-time monitoring, integrations, and reactive workflows

Aleph is built on an event-driven architecture. Every significant action — a tool call, a session starting, a cron job firing, a configuration change — emits a structured event. You can subscribe to these events in real time over WebSocket, enabling dashboards, monitoring systems, external integrations, and reactive automation pipelines.

Architecture

Events flow through two internal buses:

  1. Core Event Bus — Handles agent lifecycle events (AlephEvent). Built on tokio::sync::broadcast channels for low-latency in-process delivery. Every component of the agent loop publishes here: tool calls, session state changes, planning events, sub-agent activity.

  2. Daemon Event Bus — Handles system-level events (DaemonEvent). The perception daemon watches the environment — filesystem changes, process activity, system state, time triggers — and publishes observations here.

Both buses feed into the Gateway's event subscription system, which exposes them as a unified stream to external clients over WebSocket.

┌──────────────────────────────────────────────────────────────┐
│                      Internal Buses                           │
│                                                               │
│  Agent Loop ──▶ Core Event Bus ──┐                           │
│                                   ├──▶ Gateway Event Router  │
│  Daemon    ──▶ Daemon Event Bus ──┘         │                │
│                                              │                │
│                                     ┌────────┴────────┐      │
│                                     │  Pattern Matcher │      │
│                                     └────────┬────────┘      │
│                                              │                │
│                              ┌───────────────┼───────────┐   │
│                              ▼               ▼           ▼   │
│                         Client A        Client B    Client C  │
│                        stream.*        agent.*        *       │
└──────────────────────────────────────────────────────────────┘
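
The fan-out in the diagram can be modeled in a few lines. This is an illustrative in-process sketch, not Aleph's implementation: `EventRouter`, `subscribe`, `publish`, and `matches` are hypothetical names, and the matcher is simplified to whole-segment wildcards.

```typescript
// Illustrative model of the gateway's fan-out; all names here are hypothetical.
type Handler = (topic: string, data: unknown) => void;

// Simplified matcher: a bare "*" matches everything; otherwise each
// dot-separated segment must be equal to the topic's segment or be "*".
function matches(pattern: string, topic: string): boolean {
  if (pattern === "*") return true;
  const p = pattern.split(".");
  const t = topic.split(".");
  return p.length === t.length && p.every((seg, i) => seg === "*" || seg === t[i]);
}

class EventRouter {
  private clients = new Map<string, { patterns: string[]; handler: Handler }>();

  subscribe(clientId: string, patterns: string[], handler: Handler): void {
    this.clients.set(clientId, { patterns, handler });
  }

  // Both internal buses would call publish(). Delivery is fire-and-forget:
  // the router never waits for a client. Returns how many clients matched.
  publish(topic: string, data: unknown): number {
    let delivered = 0;
    this.clients.forEach(({ patterns, handler }) => {
      if (patterns.some((p) => matches(p, topic))) {
        handler(topic, data);
        delivered++;
      }
    });
    return delivered;
  }
}

// The three clients from the diagram.
const router = new EventRouter();
const seen: string[] = [];
router.subscribe("A", ["stream.*"], (t) => { seen.push(`A:${t}`); });
router.subscribe("B", ["agent.*"], (t) => { seen.push(`B:${t}`); });
router.subscribe("C", ["*"], (t) => { seen.push(`C:${t}`); });
router.publish("stream.chunk", { run_id: "r1", content: "hi" });
router.publish("agent.started", { run_id: "r1" });
```

Client C receives both events, while A and B each receive only the one in their namespace.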

Event Categories

Streaming Events (stream.*)

Emitted during agent response generation. These are the most frequent events and power real-time UIs.

| Topic | Description | Key Fields |
|---|---|---|
| stream.start | Response streaming begins | run_id |
| stream.chunk | Text chunk from the agent | run_id, content |
| stream.tool_start | Agent is invoking a tool | run_id, tool_name, args |
| stream.tool_end | Tool execution completed | run_id, tool_name, result, duration_ms |
| stream.thinking | Reasoning step (chain-of-thought) | run_id, step_type, content |
| stream.end | Response streaming finished | run_id |
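
A real-time UI typically stitches the chunks of one run back into a full response. The sketch below shows one way to do that using only the run_id and content fields listed above; `ResponseAssembler` and its methods are hypothetical names, not part of Aleph.

```typescript
// Hypothetical helper: collects stream.chunk content per run_id.
interface StreamEvent {
  topic: string;
  data: { run_id: string; content?: string };
}

class ResponseAssembler {
  private buffers = new Map<string, string[]>();
  private finished = new Map<string, string>();

  handle(event: StreamEvent): void {
    const { run_id, content } = event.data;
    switch (event.topic) {
      case "stream.start":
        this.buffers.set(run_id, []);
        break;
      case "stream.chunk": {
        // Tolerate a missed stream.start, since delivery is not guaranteed.
        const parts = this.buffers.get(run_id) ?? [];
        parts.push(content ?? "");
        this.buffers.set(run_id, parts);
        break;
      }
      case "stream.end":
        this.finished.set(run_id, (this.buffers.get(run_id) ?? []).join(""));
        this.buffers.delete(run_id);
        break;
    }
  }

  // Returns the full text once stream.end has been seen, otherwise undefined.
  result(runId: string): string | undefined {
    return this.finished.get(runId);
  }
}
```

Keying by run_id keeps interleaved runs separate, so chunks from concurrent sessions do not mix.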

Agent Events (agent.*)

Agent run lifecycle — start, completion, error, cancellation.

| Topic | Description | Key Fields |
|---|---|---|
| agent.started | Agent run has started | run_id, session_key |
| agent.completed | Agent run finished | run_id, session_key, tokens_used, duration_ms |
| agent.error | Agent run encountered an error | run_id, error |
| agent.cancelled | Agent run was cancelled | run_id |
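
Because every agent event carries a run_id, a consumer can keep a small state table of runs. The following is a sketch under that assumption only; `RunTracker` and `stateForTopic` are hypothetical names.

```typescript
type RunState = "running" | "completed" | "error" | "cancelled";

// Maps an agent.* topic to the run state it implies.
function stateForTopic(topic: string): RunState | undefined {
  switch (topic) {
    case "agent.started": return "running";
    case "agent.completed": return "completed";
    case "agent.error": return "error";
    case "agent.cancelled": return "cancelled";
    default: return undefined;
  }
}

class RunTracker {
  private states = new Map<string, RunState>();

  apply(topic: string, runId: string): void {
    const next = stateForTopic(topic);
    if (next === undefined) return; // not an agent lifecycle topic
    // A duplicate "started" must not reset a run that is already known.
    if (next === "running" && this.states.has(runId)) return;
    this.states.set(runId, next);
  }

  stateOf(runId: string): RunState | undefined {
    return this.states.get(runId);
  }

  // IDs of runs that have started but not yet reached a terminal state.
  active(): string[] {
    const ids: string[] = [];
    this.states.forEach((state, id) => {
      if (state === "running") ids.push(id);
    });
    return ids;
  }
}
```

A terminal event for an unknown run is still recorded, which keeps the table useful when the matching agent.started was missed.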

Session Events (session.*)

Session lifecycle changes.

| Topic | Description | Key Fields |
|---|---|---|
| session.created | New session created | session_key |
| session.deleted | Session was deleted | session_key |
| session.compacted | Session was compacted | session_key, tokens_before, tokens_after |
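
The tokens_before and tokens_after fields make it easy to report how much a compaction saved. A minimal sketch, with `compactionSavings` as a hypothetical helper name:

```typescript
interface SessionCompacted {
  session_key: string;
  tokens_before: number;
  tokens_after: number;
}

// Returns the absolute number of tokens removed and the fraction of the
// original context that was removed (0 when tokens_before is 0).
function compactionSavings(e: SessionCompacted): { saved: number; ratio: number } {
  const saved = e.tokens_before - e.tokens_after;
  const ratio = e.tokens_before > 0 ? saved / e.tokens_before : 0;
  return { saved, ratio };
}
```

For example, a session compacted from 8000 to 2000 tokens yields saved = 6000 and ratio = 0.75.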

Cron Events (cron.*)

Scheduled job execution events.

| Topic | Description | Key Fields |
|---|---|---|
| cron.triggered | Job execution started | job_id, run_id, trigger_source |
| cron.completed | Job finished successfully | job_id, run_id, duration_ms |
| cron.failed | Job failed after all retries | job_id, run_id, error |

Config Events (config.*)

Configuration changes.

| Topic | Description | Key Fields |
|---|---|---|
| config.changed | Configuration was modified | changed_keys, source |
| config.reloaded | Configuration reloaded from file | source |

Exec Events (exec.*)

Command execution approval flow.

| Topic | Description | Key Fields |
|---|---|---|
| exec.approval_requested | Approval needed for a command | id, command, risk_level |
| exec.approval_resolved | Approval decision made | id, decision |
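
The two exec events share an id, so a client can pair each decision with the request it answers. The sketch below assumes only the fields in the table; `PendingApprovals` is a hypothetical name, and the possible values of risk_level and decision are not specified here, so both are typed as plain strings.

```typescript
interface ApprovalRequested { id: string; command: string; risk_level: string; }
interface ApprovalResolved { id: string; decision: string; }

class PendingApprovals {
  private pending = new Map<string, ApprovalRequested>();

  onRequested(req: ApprovalRequested): void {
    this.pending.set(req.id, req);
  }

  // Returns the original request so the caller can log the command and the
  // decision together. undefined means the request was never seen, for
  // example because it was emitted while this client was disconnected.
  onResolved(res: ApprovalResolved): ApprovalRequested | undefined {
    const req = this.pending.get(res.id);
    this.pending.delete(res.id);
    return req;
  }

  // Requests still waiting for a decision.
  list(): ApprovalRequested[] {
    return Array.from(this.pending.values());
  }
}
```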

Daemon Events

The daemon emits its own event hierarchy for environmental awareness:

Raw events — direct observations from watchers:

  • TimeEvent — cron schedule matches, interval ticks, absolute time triggers
  • ProcessEvent — process started, stopped, CPU/memory threshold exceeded
  • FsEvent — file created, modified, deleted, renamed
  • SystemStateEvent — battery level, network status, display sleep, user activity, system load

Derived events — inferred from patterns in raw events:

  • ActivityChanged — user activity type changed (coding, browsing, meeting, etc.)
  • ProgrammingSessionStarted / ProgrammingSessionEnded — detected coding sessions
  • ResourcePressureChanged — system resource pressure level transitions
  • MeetingStateChanged — detected meeting start/end
  • IdleStateChanged — user went idle or returned

System events — daemon self-management:

  • WatcherStarted / WatcherStopped — watcher lifecycle
  • ConfigReloaded — daemon configuration changes
  • Error — daemon-level errors

Subscribing to Events

Via WebSocket (Gateway RPC)

WebSocket is the primary way to receive events. Connect to the gateway WebSocket and subscribe with glob patterns:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "events.subscribe",
  "params": {
    "patterns": ["stream.*", "agent.*", "cron.*"]
  }
}

Events arrive as JSON-RPC notifications (no id field):

{
  "jsonrpc": "2.0",
  "method": "event",
  "params": {
    "topic": "agent.completed",
    "data": {
      "run_id": "run-uuid-123",
      "session_key": "agent:main:main",
      "tokens_used": 1247,
      "duration_ms": 5000
    },
    "timestamp": 1706400000000
  }
}

Events are fire-and-forget. The server does not wait for acknowledgement. If the client disconnects, events during the disconnection are lost. There is no replay or catch-up mechanism — design your consumers to be tolerant of gaps.
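
Since nothing is replayed, a long-lived consumer usually reconnects with backoff and subscribes again after every reconnect. The sketch below illustrates that pattern. `SocketLike`, `backoffDelay`, `subscribeRequest`, and `connectWithResubscribe` are hypothetical names, the backoff constants are arbitrary, a thin adapter over a real WebSocket client is assumed, and it is an assumption that subscriptions do not survive a dropped connection.

```typescript
// Exponential backoff with a cap: baseMs * 2^attempt, never above maxMs.
function backoffDelay(attempt: number, baseMs: number = 500, maxMs: number = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Builds the events.subscribe request that is re-sent after each reconnect.
function subscribeRequest(id: number, patterns: string[]) {
  return { jsonrpc: "2.0" as const, id, method: "events.subscribe", params: { patterns } };
}

// Minimal socket surface so the sketch does not depend on a specific library.
interface SocketLike {
  send(text: string): void;
  onopen: (() => void) | null;
  onclose: (() => void) | null;
}

function connectWithResubscribe(
  open: () => SocketLike,
  patterns: string[],
  schedule: (fn: () => void, ms: number) => void,
): void {
  let attempt = 0;
  let nextId = 1;
  const connect = (): void => {
    const ws = open();
    ws.onopen = () => {
      attempt = 0; // a successful connection resets the backoff
      ws.send(JSON.stringify(subscribeRequest(nextId++, patterns)));
    };
    ws.onclose = () => {
      schedule(connect, backoffDelay(attempt++));
    };
  };
  connect();
}
```

In an application, `schedule` would typically be `(fn, ms) => setTimeout(fn, ms)`. Events emitted while the socket is down are still lost, so any state derived from events should be treated as best-effort.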

Managing Subscriptions

Subscriptions are additive. Calling events.subscribe multiple times adds new patterns without removing existing ones.

// Add more patterns
{ "jsonrpc": "2.0", "id": 2, "method": "events.subscribe", "params": { "patterns": ["config.*"] } }

// Remove specific patterns
{ "jsonrpc": "2.0", "id": 3, "method": "events.unsubscribe", "params": { "patterns": ["stream.*"] } }

// List active subscriptions
{ "jsonrpc": "2.0", "id": 4, "method": "events.list" }

See events.* RPC methods for the full API specification.
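
The additive behavior can be pictured as operations on a set of pattern strings. This is a client-side mental model only: `SubscriptionSet` is a hypothetical name, and it assumes that duplicate patterns collapse and that unsubscribing removes exact pattern strings, which the RPC specification should be consulted to confirm.

```typescript
class SubscriptionSet {
  private patterns = new Set<string>();

  // Adds patterns without touching the ones already present.
  subscribe(patterns: string[]): void {
    patterns.forEach((p) => { this.patterns.add(p); });
  }

  // Removes only the listed patterns (exact string match, by assumption).
  unsubscribe(patterns: string[]): void {
    patterns.forEach((p) => { this.patterns.delete(p); });
  }

  // Sorted for stable display.
  list(): string[] {
    return Array.from(this.patterns).sort();
  }
}
```

Keeping such a mirror on the client is also convenient for re-sending all patterns after a reconnect.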

Glob Pattern Matching

Event subscription patterns use glob-style matching against the topic string:

| Pattern | Matches |
|---|---|
| stream.* | All streaming events |
| agent.* | All agent lifecycle events |
| * | Everything |
| stream.chunk | Exact match only |
| *.completed | Any completed event across all namespaces |
| cron.* | All cron job events |

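Within a pattern, the * wildcard matches any sequence of non-dot characters, so it never crosses a segment boundary. The one exception is a pattern consisting of * alone, which subscribes to every event in the system.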
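
To make the matching rule concrete, the sketch below compiles a pattern into a regular expression. It is one possible reading of the rule described here, not the gateway's actual matcher; `globToRegExp` and `topicMatches` are hypothetical names.

```typescript
// Compiles a subscription pattern to a RegExp.
// "*" inside a pattern matches any run of non-dot characters;
// a pattern that is exactly "*" matches every topic.
function globToRegExp(pattern: string): RegExp {
  if (pattern === "*") return /^.*$/;
  const source = pattern
    .split("*")
    // Escape regex metacharacters in the literal parts (notably the dot).
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, "\\$&"))
    .join("[^.]*");
  return new RegExp(`^${source}$`);
}

function topicMatches(pattern: string, topic: string): boolean {
  return globToRegExp(pattern).test(topic);
}
```

With this reading, stream.* matches stream.tool_start but not agent.started, and *.completed matches both agent.completed and cron.completed.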

Event Payload Structure

Every event delivered over WebSocket follows this envelope:

interface EventNotification {
  jsonrpc: "2.0";
  method: "event";
  params: {
    topic: string;        // e.g. "agent.completed"
    data: Record<string, any>;  // event-specific payload
    timestamp: number;    // Unix timestamp in milliseconds
  };
}
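
A WebSocket connection also carries RPC responses, so clients need to tell event notifications apart from everything else. The sketch below validates an incoming message against the envelope above. `isEventNotification` and `parseEvent` are hypothetical helper names, and the data field is typed as `Record<string, unknown>` rather than `any` to force explicit checks downstream.

```typescript
interface EventNotification {
  jsonrpc: "2.0";
  method: "event";
  params: {
    topic: string;
    data: Record<string, unknown>;
    timestamp: number; // Unix timestamp in milliseconds
  };
}

// Runtime check that a parsed message has the shape of an event notification.
function isEventNotification(msg: unknown): msg is EventNotification {
  if (typeof msg !== "object" || msg === null) return false;
  const m = msg as Record<string, unknown>;
  // Notifications carry no id; a message with an id is an RPC response.
  if (m["jsonrpc"] !== "2.0" || m["method"] !== "event" || "id" in m) return false;
  const p = m["params"];
  if (typeof p !== "object" || p === null) return false;
  const params = p as Record<string, unknown>;
  return (
    typeof params["topic"] === "string" &&
    typeof params["timestamp"] === "number" &&
    typeof params["data"] === "object" &&
    params["data"] !== null
  );
}

// Parses raw WebSocket text; returns undefined for anything that is not an event.
function parseEvent(raw: string): EventNotification | undefined {
  try {
    const msg: unknown = JSON.parse(raw);
    return isEventNotification(msg) ? msg : undefined;
  } catch {
    return undefined;
  }
}
```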

Core Event Types (Internal)

Internally, Aleph uses a typed event system with sequence numbers for ordering:

pub struct TimestampedEvent {
    pub event: AlephEvent,
    pub timestamp: i64,      // millisecond precision
    pub sequence: u64,       // monotonically increasing
}

The AlephEvent enum covers all core event types: InputReceived, ToolCallStarted, ToolCallCompleted, ToolCallFailed, SessionCreated, SubAgentStarted, AiResponseGenerated, and more. The gateway maps these to the topic-based format for external consumption.

Building Reactive Integrations

Real-Time Dashboard

Subscribe to all events and render them in a web UI:

const ws = new WebSocket("ws://localhost:3001");

ws.onopen = () => {
  ws.send(JSON.stringify({
    jsonrpc: "2.0",
    id: 1,
    method: "events.subscribe",
    params: { patterns: ["*"] }
  }));
};

ws.onmessage = (msg) => {
  const event = JSON.parse(msg.data);
  if (event.method === "event") {
    const { topic, data, timestamp } = event.params;
    updateDashboard(topic, data, timestamp);
  }
};

Monitoring and Alerting

Watch for errors and failures across the system:

// Subscribe to failure events once the socket is open.
// "*.error" already covers agent.error; cron.failed needs its own pattern.
ws.send(JSON.stringify({
  jsonrpc: "2.0",
  id: 1,
  method: "events.subscribe",
  params: { patterns: ["*.error", "cron.failed"] }
}));

ws.onmessage = (msg) => {
  const event = JSON.parse(msg.data);
  if (event.method === "event") {
    sendAlert({
      channel: "#aleph-alerts",
      text: `[${event.params.topic}] ${JSON.stringify(event.params.data)}`
    });
  }
};

Tool Usage Analytics

Track which tools the agent uses most and how long they take:

ws.send(JSON.stringify({
  jsonrpc: "2.0",
  id: 1,
  method: "events.subscribe",
  params: { patterns: ["stream.tool_start", "stream.tool_end"] }
}));

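// In-flight tool calls, keyed by run and tool name so that different tools
// invoked within the same run do not overwrite each other.
const inFlight = new Map();

ws.onmessage = (msg) => {
  const event = JSON.parse(msg.data);
  if (event.method !== "event") return;

  const { topic, data } = event.params;
  const key = `${data.run_id}:${data.tool_name}`;

  if (topic === "stream.tool_start") {
    inFlight.set(key, Date.now());
  }

  if (topic === "stream.tool_end") {
    // Prefer the server-reported duration; fall back to local timing.
    const startedAt = inFlight.get(key);
    const localMs = startedAt !== undefined ? Date.now() - startedAt : undefined;
    const elapsed = data.duration_ms ?? localMs;
    if (elapsed !== undefined) recordMetric(data.tool_name, elapsed);
    inFlight.delete(key);
  }
};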

Cron Job Status Board

Monitor scheduled job health:

ws.send(JSON.stringify({
  jsonrpc: "2.0",
  id: 1,
  method: "events.subscribe",
  params: { patterns: ["cron.*"] }
}));

ws.onmessage = (msg) => {
  const event = JSON.parse(msg.data);
  if (event.method !== "event") return;

  const { topic, data } = event.params;

  switch (topic) {
    case "cron.triggered":
      setJobStatus(data.job_id, "running", data.trigger_source);
      break;
    case "cron.completed":
      setJobStatus(data.job_id, "completed", `${data.duration_ms}ms`);
      break;
    case "cron.failed":
      setJobStatus(data.job_id, "failed", data.error);
      alertOnFailure(data.job_id, data.error);
      break;
  }
};

Daemon Event-Driven Automation

The daemon's event system enables environment-aware automation. The daemon watches the world and emits events that can trigger agent actions:

Context-Aware Responses

The daemon's derived events let the agent adapt to what you are doing, for example by proactively offering help when a programming session starts:

  • ProgrammingSessionStarted — Detected IDE launch + file changes in a project directory
  • ActivityChanged — Transitions between coding, browsing, meetings, etc.
  • IdleStateChanged — User went idle (opportunity for background tasks)
  • ResourcePressureChanged — System running low on memory or battery

Time-Based Triggers

The daemon evaluates cron expressions and interval timers, emitting TimeEvent events:

  • Cron { expression } — Matched a cron schedule
  • Interval { seconds } — Interval timer tick
  • Absolute { time } — Reached a specific timestamp

These time events are what drive the cron job system internally — the daemon fires the trigger, and the cron scheduler picks it up for execution.

Design Principles

  1. Events are immutable facts — Once emitted, events are never modified. They represent something that happened.
  2. Fire-and-forget delivery — Publishers do not wait for consumers. This keeps the agent loop fast.
  3. No guaranteed delivery — Events can be lost if no subscriber is listening or if the buffer overflows. Design consumers accordingly.
  4. Typed internally, stringly externally — The core uses Rust enums for compile-time safety. The gateway maps them to dot-separated topic strings for language-agnostic consumption.
  5. Sequence ordering — Each event carries a monotonically increasing sequence number for causal ordering within a single Aleph instance.
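
Principles 3 and 5 interact: when a bounded buffer overflows, a slow reader can detect the loss as a gap in sequence numbers. The sketch below is a simplified TypeScript model of that behavior, not the Rust implementation. `BoundedBroadcast` and `readFrom` are hypothetical names, and the sequence number is part of the internal TimestampedEvent, whereas the WebSocket envelope documented above carries only a timestamp.

```typescript
interface Sequenced<T> {
  sequence: number; // monotonically increasing, starts at 0 in this model
  event: T;
}

class BoundedBroadcast<T> {
  private buffer: Sequenced<T>[] = [];
  private nextSequence = 0;
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Publishing never blocks: when the buffer is full, the oldest event is dropped.
  publish(event: T): void {
    this.buffer.push({ sequence: this.nextSequence++, event });
    if (this.buffer.length > this.capacity) this.buffer.shift();
  }

  // Returns retained events with sequence >= from, plus how many events in
  // the range the reader asked for were already dropped.
  readFrom(from: number): { events: Sequenced<T>[]; missed: number } {
    const first = this.buffer[0];
    const oldest = first !== undefined ? first.sequence : this.nextSequence;
    const missed = Math.max(0, oldest - from);
    return { events: this.buffer.filter((e) => e.sequence >= from), missed };
  }
}
```

With a capacity of 3 and five published events, a reader starting at sequence 0 receives sequences 2 through 4 and learns that 2 events were missed.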

See Also

  • events.* RPC Methods — Full RPC API specification for event subscriptions
  • Hooks — React to lifecycle events with shell commands and prompts
  • Cron Jobs — Schedule recurring tasks that emit cron events
  • Gateway Protocol — WebSocket transport and event delivery format
