Event System
Event-driven architecture with EventBus, typed events, filters, and handlers. Decouples subsystems via asynchronous event passing.
The event module implements an event-driven architecture that decouples Aleph's subsystems. Instead of direct method calls between modules, components publish events to a central bus and subscribe to the events they care about.
Design Philosophy
Event-driven architecture provides three benefits in Aleph:
- Decoupling — The Memory system doesn't need to know about the Gateway; both just use events
- Observability — Every significant action produces an event that can be logged, audited, or streamed to clients
- Extensibility — New features can subscribe to existing events without modifying publishers
The EventBus is a singleton (initialized via Lazy) that lives for the lifetime of the process. All event handling is asynchronous and non-blocking.
Architecture
┌─────────────────────────────────────────┐
│ EventBus (Singleton) │
│ ┌─────────────┐ ┌──────────────────┐ │
│ │ EventQueue │ │ HandlerRegistry │ │
│ └─────────────┘ └──────────────────┘ │
└─────────────────────────────────────────┘
│ ▲
│ publish(Event) │ dispatch(Event)
▼ │
┌──────────┐ ┌──────────────┐
│ Publisher │ │ Handler │
│ (any │ │ (implements │
│ module) │ │ EventHandler)│
└──────────┘ └──────────────┘Core Components
EventBus
The central event distribution system.
// Singleton access
let bus = event::bus();
bus.publish(SessionCreated { session_id: id }).await?;The EventBus:
- Maintains a typed event queue per event type
- Dispatches events to all registered handlers asynchronously
- Never blocks publishers — events are queued and processed concurrently
EventHandler Trait
#[async_trait]
pub trait EventHandler<E: Event>: Send + Sync {
async fn handle(&self, event: &E) -> Result<(), EventError>;
}Implementors receive events of a specific type and perform side effects:
- Logging handlers write to disk
- WebSocket handlers forward to connected clients
- Memory handlers update the knowledge graph
EventFilter
Selective subscription — handlers only receive events matching their filter.
pub enum EventFilter {
All, // Receive all events
Type(EventType), // Only specific event types
Source(String), // Only from specific source module
Custom(Box<dyn Fn(&Event) -> bool>),
}Example: The Gateway's WebSocket handler filters for TraceEvent types to stream agent execution traces to the Panel UI.
Event Types
Core Events
| Event | Description | Published By |
|---|---|---|
SessionCreated | New session initialized | SessionService |
MessageReceived | User message arrived | Gateway |
ToolExecuted | Tool finished execution | Engine |
MemoryUpdated | Fact added or invalidated | Memory |
ConfigReloaded | Configuration hot-reloaded | Config watcher |
Event Sequencing
Every event carries a monotonic sequence number:
pub struct EventMetadata {
pub sequence: u64, // Global ordering
pub timestamp: i64, // Unix millis
pub source: String, // Module name
}Sequence numbers enable:
- Replay of events for debugging
- Deterministic state reconstruction
- Ordering guarantees across distributed handlers
Permissions and Safety
EventPermission
Not all handlers can see all events. The Permission layer checks handler authorization:
pub enum EventPermission {
Owner, // Full access
Guest, // Only events for their session
System, // Internal events only
}Example: A guest's WebSocket connection only receives events for their own session, preventing cross-session information leakage.
EventQuestion
For sensitive operations, the EventBus can pause and ask for confirmation:
pub enum EventQuestion {
Allow, // Proceed with the event
Deny, // Drop the event
Modify(Event), // Replace with modified event
}Example: Before executing a bash_exec tool, the system publishes a ToolExecutionRequested event. If the Approval module denies it, the execution is cancelled before any harm is done.
Usage Example
use aleph::event::{EventBus, EventHandler, Event};
// Define an event
#[derive(Clone, Debug)]
pub struct MemoryCreated {
pub fact_id: String,
pub content: String,
}
impl Event for MemoryCreated {}
// Implement a handler
pub struct MemoryLogger;
#[async_trait]
impl EventHandler<MemoryCreated> for MemoryLogger {
async fn handle(&self, event: &MemoryCreated) -> Result<(), EventError> {
tracing::info!("Memory created: {}", event.fact_id);
Ok(())
}
}
// Register and publish
let bus = event::bus();
bus.register(MemoryLogger).await?;
bus.publish(MemoryCreated {
fact_id: "fact-123".into(),
content: "User prefers dark mode".into(),
}).await?;Code Location
src/event/mod.rs— EventBus and core typessrc/event/handler.rs— EventHandler traitsrc/event/filter.rs— EventFilter implementationssrc/event/permission.rs— Permission checkssrc/event/question.rs— Interactive confirmation
See Also
- Gateway — How events are forwarded to WebSocket clients
- Session Service — Session lifecycle events
- Memory System — Memory update events
Messages
How messages flow through Aleph from any interface to the agent and back, including message types, routing rules, session threading, and persistence.
Intent Detection
Three-layer intent classification and execution routing that determines whether user input triggers task execution or conversational response.