
Task Scheduling

Lane-based sub-agent scheduling with resource isolation, anti-starvation, recursion limits, cron jobs, and heartbeat monitoring.

The scheduler and tasks modules handle task execution scheduling in Aleph. The scheduler manages sub-agent execution with resource constraints, while the tasks module provides time-based (cron) and health-check (heartbeat) job scheduling.

Design Philosophy

The scheduling system follows three principles:

  1. Resource isolation — Each "lane" of execution has independent quotas and concurrency limits
  2. Anti-starvation — Long-waiting jobs receive priority boosts to prevent indefinite blocking
  3. Recursion safety — Sub-agent spawning is tracked and limited to prevent infinite recursion

Lane-Based Scheduler

The LaneScheduler provides structured concurrency for sub-agent execution:

┌─────────────────────────────────────────────┐
│           LaneScheduler                     │
├─────────────────────────────────────────────┤
│                                             │
│  Lane A (foreground)  max=3, quota=100      │
│    ├─ Run 1 (running)                       │
│    ├─ Run 2 (queued)                        │
│    └─ Run 3 (queued)                        │
│                                             │
│  Lane B (background)  max=1, quota=50       │
│    └─ Run 4 (running)                       │
│                                             │
│  Global limit: 5 concurrent runs            │
└─────────────────────────────────────────────┘

Lane

An isolated execution lane with its own quota and concurrency limit:

pub struct Lane {
    name: String,
    max_concurrent: usize,
    quota: LaneQuota,
}

LaneScheduler

pub struct LaneScheduler {
    lanes: Vec<Lane>,
    global_max: usize,
    recursion_tracker: RecursionTracker,
}

impl LaneScheduler {
    pub async fn schedule(
        &self,
        run: QueuedRun,
    ) -> Result<ScheduleGuard> { /* ... */ }

    pub fn stats(&self) -> SchedulerStats { /* ... */ }
}

ScheduleGuard uses RAII: when the guard is dropped, the run's permit is released automatically. Internally, the scheduler calls std::mem::forget(permit) on the acquired permit and makes ScheduleGuard responsible for returning it. This is correct but fragile: an early return between the forget call and construction of the guard would leak the permit for good.
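The RAII half of this pattern can be illustrated with the standard library alone. This is a minimal sketch, not the scheduler's code: a hypothetical Permits type (a Mutex<usize> counter standing in for the real semaphore) and a hypothetical try_schedule method that takes a permit and wraps it in the guard in one step, so there is no window in which an early return could leak it.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a semaphore: a shared count of free permits.
#[derive(Clone)]
pub struct Permits {
    free: Arc<Mutex<usize>>,
}

impl Permits {
    pub fn new(n: usize) -> Self {
        Permits { free: Arc::new(Mutex::new(n)) }
    }

    /// Takes one permit if available and immediately hands it to a guard,
    /// so no code path holds a permit without an owner that will return it.
    pub fn try_schedule(&self) -> Option<ScheduleGuard> {
        let mut free = self.free.lock().unwrap_or_else(|e| e.into_inner());
        if *free == 0 {
            return None; // nothing was taken, so nothing can leak
        }
        *free -= 1;
        Some(ScheduleGuard { permits: self.clone() })
    }

    pub fn available(&self) -> usize {
        *self.free.lock().unwrap_or_else(|e| e.into_inner())
    }
}

/// Returns its permit exactly once, when dropped.
pub struct ScheduleGuard {
    permits: Permits,
}

impl Drop for ScheduleGuard {
    fn drop(&mut self) {
        let mut free = self.permits.free.lock().unwrap_or_else(|e| e.into_inner());
        *free += 1;
    }
}
```

With two permits, a third try_schedule returns None, and dropping either guard makes one permit available again. The real scheduler's forget-then-guard sequence has to preserve the same invariant by hand.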

Anti-Starvation

pub struct WaitTimeTracker;

impl WaitTimeTracker {
    pub fn sweep_anti_starvation(
        &self,
        lanes: &mut [LaneState],
    ) { /* ... */ }
}

Jobs that have waited longer than a threshold receive a priority boost. Boosts are applied per lane rather than per job, so a job that enters a boosted lane also benefits, even if it has barely waited.
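A per-lane sweep could look like the sketch below. It assumes a hypothetical lane-wide priority_boost field on LaneState, a hypothetical enqueued_at timestamp (Unix seconds) on QueuedRun, and a hypothetical BOOST_STEP_SECS threshold; none of these names come from the source. The boost is derived from the oldest queued run and clamped the way the LaneState notes describe: .max(0) on the wait, then .min(10) before the cast to i8.

```rust
use std::collections::VecDeque;

/// Hypothetical queued run: only the enqueue time (Unix seconds) matters here.
pub struct QueuedRun {
    pub enqueued_at: i64,
}

/// Hypothetical per-lane state carrying a lane-wide priority boost.
pub struct LaneState {
    pub queued: VecDeque<QueuedRun>,
    pub priority_boost: i8,
}

/// Hypothetical threshold: one boost level per 30 seconds of waiting.
pub const BOOST_STEP_SECS: i64 = 30;

pub fn sweep_anti_starvation(lanes: &mut [LaneState], now: i64) {
    for lane in lanes.iter_mut() {
        // The front of the FIFO queue is the longest-waiting run.
        let Some(oldest) = lane.queued.front() else {
            lane.priority_boost = 0; // empty lane: nothing is starving
            continue;
        };
        // Clamp at 0 so a clock that jumped backwards cannot yield a negative wait.
        let wait = now.saturating_sub(oldest.enqueued_at).max(0);
        // Cap at 10 before narrowing so the cast to i8 cannot overflow.
        lane.priority_boost = (wait / BOOST_STEP_SECS).min(10) as i8;
    }
}
```

Because the boost is stored on the lane, a run enqueued moments before the sweep shares the boost earned by the oldest run in the same lane, which is exactly the per-lane caveat above.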

Recursion Tracking

pub struct RecursionTracker {
    max_depth: usize,
}

Tracks sub-agent spawn depth. If a sub-agent tries to spawn another sub-agent beyond max_depth, the request is blocked.
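A depth check of this kind can be sketched as follows. Only max_depth appears in the source; the depths map, RunId alias, SpawnError enum, and the register_root/try_spawn methods are hypothetical. The sketch treats top-level runs as depth 0 and rejects a child whose depth would exceed max_depth.

```rust
use std::collections::HashMap;

/// Hypothetical run identifier.
pub type RunId = u64;

#[derive(Debug, PartialEq)]
pub enum SpawnError {
    UnknownParent(RunId),
    DepthExceeded { depth: usize, max: usize },
}

pub struct RecursionTracker {
    max_depth: usize,
    // Depth of every run the tracker knows about; top-level runs are depth 0.
    depths: HashMap<RunId, usize>,
}

impl RecursionTracker {
    pub fn new(max_depth: usize) -> Self {
        RecursionTracker { max_depth, depths: HashMap::new() }
    }

    pub fn register_root(&mut self, id: RunId) {
        self.depths.insert(id, 0);
    }

    /// Records `child` as spawned by `parent`, or rejects the spawn if the
    /// child would sit deeper than `max_depth`.
    pub fn try_spawn(&mut self, parent: RunId, child: RunId) -> Result<usize, SpawnError> {
        let parent_depth = *self
            .depths
            .get(&parent)
            .ok_or(SpawnError::UnknownParent(parent))?;
        let depth = parent_depth + 1;
        if depth > self.max_depth {
            return Err(SpawnError::DepthExceeded { depth, max: self.max_depth });
        }
        self.depths.insert(child, depth);
        Ok(depth)
    }
}
```

With max_depth = 2, a root may spawn a child, the child may spawn a grandchild, and the grandchild's own spawn attempt is refused.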

LaneState

pub struct LaneState {
    queued: VecDeque<QueuedRun>,
    running: HashMap<RunId, RunningRun>,
}
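The queued/running split can also protect the permit count. In this sketch a plain free_permits counter stands in for the lane's semaphore, and enqueue, start_next, and complete are hypothetical methods; the fields of QueuedRun and RunningRun are assumptions too. complete returns a permit only when the run was actually present in running, the kind of was_running check listed under Safety Properties.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical run identifier and run records.
pub type RunId = u64;
pub struct QueuedRun { pub id: RunId }
pub struct RunningRun { pub started_at: i64 }

pub struct LaneState {
    queued: VecDeque<QueuedRun>,
    running: HashMap<RunId, RunningRun>,
    free_permits: usize, // hypothetical stand-in for the lane's semaphore
}

impl LaneState {
    pub fn new(max_concurrent: usize) -> Self {
        LaneState { queued: VecDeque::new(), running: HashMap::new(), free_permits: max_concurrent }
    }

    pub fn enqueue(&mut self, id: RunId) {
        self.queued.push_back(QueuedRun { id });
    }

    /// Moves the oldest queued run into `running` if a permit is free.
    pub fn start_next(&mut self, now: i64) -> Option<RunId> {
        if self.free_permits == 0 {
            return None;
        }
        let run = self.queued.pop_front()?;
        self.free_permits -= 1;
        self.running.insert(run.id, RunningRun { started_at: now });
        Some(run.id)
    }

    /// Releases a permit only if `id` was actually running, so a duplicate
    /// or stale completion is a no-op instead of inflating the permit count.
    pub fn complete(&mut self, id: RunId) -> bool {
        let was_running = self.running.remove(&id).is_some();
        if was_running {
            self.free_permits += 1;
        }
        was_running
    }

    pub fn free_permits(&self) -> usize {
        self.free_permits
    }
}
```

Completing the same run twice leaves the lane with its configured number of permits rather than one extra.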

Integer safety: wait-time calculations clamp with .max(0) so that clock skew cannot produce a negative wait, and cap with .min(10) before the as i8 cast so the value cannot overflow i8.


Cron Tasks

The tasks::cron module schedules recurring jobs:

pub struct CronService {
    db: Connection,
    clock: Arc<dyn Clock>,
}

impl CronService {
    pub async fn create_job(
        &self,
        config: JobConfig,
    ) -> Result<JobId> { /* ... */ }

    pub async fn list_jobs(&self) -> Result<Vec<Job>> { /* ... */ }

    pub async fn delete_job(
        &self,
        id: JobId,
    ) -> Result<()> { /* ... */ }
}

Features:

  • Cron expression parsing (standard Unix cron syntax)
  • Job execution history
  • Timeout handling
  • Three-phase concurrency: schedule → execute → cleanup (minimizes lock hold time)
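The three-phase pattern can be sketched with an in-memory Mutex<Vec<Job>> in place of the database connection. Job, Scheduler, tick, and runs are hypothetical names, and timestamps are assumed to be Unix seconds. The lock is held only while picking due jobs and while recording results, never while a job executes.

```rust
use std::sync::Mutex;

/// Hypothetical job record: next due time and interval in Unix seconds.
pub struct Job { pub id: u32, pub next_run: i64, pub interval: i64, pub runs: u32 }

pub struct Scheduler { jobs: Mutex<Vec<Job>> }

impl Scheduler {
    pub fn new(jobs: Vec<Job>) -> Self {
        Scheduler { jobs: Mutex::new(jobs) }
    }

    pub fn tick<F: Fn(u32) -> bool>(&self, now: i64, execute: F) -> Vec<(u32, bool)> {
        // Phase 1 (schedule): pick due jobs and advance next_run so a
        // concurrent tick will not pick them again, then release the lock.
        let mut jobs = self.jobs.lock().unwrap_or_else(|e| e.into_inner());
        let due: Vec<u32> = jobs
            .iter_mut()
            .filter(|j| j.next_run <= now)
            .map(|j| {
                j.next_run = now + j.interval;
                j.id
            })
            .collect();
        drop(jobs);

        // Phase 2 (execute): run the jobs with no lock held.
        let results: Vec<(u32, bool)> = due.into_iter().map(|id| (id, execute(id))).collect();

        // Phase 3 (cleanup): re-acquire briefly to record outcomes.
        let mut jobs = self.jobs.lock().unwrap_or_else(|e| e.into_inner());
        for (id, ok) in &results {
            if let Some(job) = jobs.iter_mut().find(|j| j.id == *id) {
                if *ok {
                    job.runs += 1;
                }
            }
        }
        results
    }

    pub fn runs(&self, id: u32) -> Option<u32> {
        let jobs = self.jobs.lock().unwrap_or_else(|e| e.into_inner());
        jobs.iter().find(|j| j.id == id).map(|j| j.runs)
    }
}
```

A slow job in phase 2 therefore never blocks another caller that only needs to list or create jobs.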

Clock abstraction: time is read through a Clock trait, so tests can substitute a FakeClock and control time deterministically.
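A Clock trait with a fake implementation might look like this. The source confirms only the Clock and FakeClock names and a now() call; the i64 Unix-seconds return type, SystemClock, the advance/set methods, and the is_due helper are assumptions made for the sketch.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

pub trait Clock: Send + Sync {
    /// Current time as Unix seconds (assumed unit).
    fn now(&self) -> i64;
}

/// Production clock backed by the system time.
pub struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs() as i64)
            .unwrap_or(0) // a clock set before 1970 reports 0 instead of panicking
    }
}

/// Test clock that only moves when told to.
pub struct FakeClock {
    now: AtomicI64,
}

impl FakeClock {
    pub fn new(start: i64) -> Self {
        FakeClock { now: AtomicI64::new(start) }
    }
    pub fn advance(&self, secs: i64) {
        self.now.fetch_add(secs, Ordering::SeqCst);
    }
    pub fn set(&self, t: i64) {
        self.now.store(t, Ordering::SeqCst);
    }
}

impl Clock for FakeClock {
    fn now(&self) -> i64 {
        self.now.load(Ordering::SeqCst)
    }
}

/// Hypothetical code under test, depending on `Arc<dyn Clock>` like CronService.
pub fn is_due(clock: &Arc<dyn Clock>, next_run: i64) -> bool {
    clock.now() >= next_run
}
```

A test keeps an Arc<FakeClock>, hands a clone to the service as Arc<dyn Clock>, and advances time explicitly instead of sleeping. Setting the fake clock backwards is also an easy way to exercise the clock-skew handling described below.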

JobRun

pub struct JobRun {
    started_at: i64,
    completed_at: Option<i64>,
}

impl JobRun {
    pub fn success(self) -> Self {
        let now = self.clock.now();
        let duration_ms = now
            .saturating_sub(self.started_at)
            .saturating_mul(1000);
        // ...
    }
}

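Integer safety: saturating_sub and saturating_mul clamp at the i64 bounds instead of overflowing, so extreme timestamps cannot wrap the duration or panic in debug builds. Because started_at is a signed i64, they do not clamp at zero: a clock that moves backwards still yields a negative duration unless the difference is also clamped with .max(0).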
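The difference between saturating and clamping on signed timestamps is easy to check in isolation. Both functions are hypothetical helpers; the Unix-seconds unit is inferred from the multiplication by 1000 in JobRun::success.

```rust
/// Duration in milliseconds between two timestamps in Unix seconds,
/// using only saturating arithmetic (as in `JobRun::success`).
pub fn duration_ms_saturating(started_at: i64, now: i64) -> i64 {
    now.saturating_sub(started_at).saturating_mul(1000)
}

/// Same calculation, but a backwards clock jump is also clamped to zero.
pub fn duration_ms_clamped(started_at: i64, now: i64) -> i64 {
    now.saturating_sub(started_at).max(0).saturating_mul(1000)
}
```

With started_at = 100 and now = 90, the saturating version returns -10000 while the clamped version returns 0. At the extremes (i64::MIN to i64::MAX) both saturate to i64::MAX instead of wrapping.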


Heartbeat Tasks

The tasks::heartbeat module monitors external services:

pub struct HeartbeatService {
    intervals: Vec<HeartbeatInterval>,
}

impl HeartbeatService {
    pub async fn check_all(
        &self,
    ) -> Vec<HeartbeatResult> { /* ... */ }
}

Features:

  • URL health checking
  • Response time tracking
  • Deduplication (avoids redundant checks)
  • Configurable intervals and timeouts
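Deduplication can be as simple as remembering when each URL was last checked. In this sketch HeartbeatInterval is given hypothetical url and every_secs fields, and Deduper and due are hypothetical names; no HTTP request is made, the function only decides which URLs to check. It assumes every_secs is positive.

```rust
use std::collections::HashMap;

/// Hypothetical heartbeat target: a URL and how often to check it.
pub struct HeartbeatInterval {
    pub url: String,
    pub every_secs: i64,
}

/// Remembers when each URL was last checked so that overlapping entries
/// for the same URL do not trigger redundant requests.
#[derive(Default)]
pub struct Deduper {
    last_checked: HashMap<String, i64>,
}

impl Deduper {
    /// Returns the URLs that are due at `now`, each at most once,
    /// and records them as checked.
    pub fn due(&mut self, intervals: &[HeartbeatInterval], now: i64) -> Vec<String> {
        let mut out = Vec::new();
        for iv in intervals {
            let is_due = match self.last_checked.get(&iv.url) {
                Some(&last) => now.saturating_sub(last) >= iv.every_secs,
                None => true, // never checked before
            };
            if is_due {
                self.last_checked.insert(iv.url.clone(), now);
                out.push(iv.url.clone());
            }
        }
        out
    }
}
```

If two entries point at the same URL with different intervals, the first one that becomes due triggers the check and the other is satisfied by it within the same pass.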

Task Infrastructure

The tasks::shared module provides cross-cutting infrastructure:

  • Clock trait — Abstracts time for testability
  • Schedule — Schedule parsing and next-run calculation
  • Delivery — Task result delivery mechanisms
  • Store — Persistence layer for task state
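Schedule parsing and next-run calculation can be illustrated at the level of a single cron field. parse_field and next_value are hypothetical helpers, not the Schedule API; they handle only the forms listed in the doc comment, and a full next-run calculation would also need to carry across all five fields and handle day-of-month/day-of-week interaction, which is omitted here.

```rust
/// Parses one cron field (e.g. minutes) into the sorted allowed values in
/// `min..=max`. Supports `*`, `*/step`, `a`, `a-b`, `a-b/step`, and comma lists.
pub fn parse_field(field: &str, min: u32, max: u32) -> Result<Vec<u32>, String> {
    let mut values = Vec::new();
    for part in field.split(',') {
        // Split off an optional "/step" suffix.
        let (range, step) = match part.split_once('/') {
            Some((r, s)) => (r, Some(parse_num(s)?)),
            None => (part, None),
        };
        if step == Some(0) {
            return Err(format!("{part:?}: step must be positive"));
        }
        // Resolve the range part to inclusive bounds.
        let (lo, hi) = if range == "*" {
            (min, max)
        } else if let Some((a, b)) = range.split_once('-') {
            (parse_num(a)?, parse_num(b)?)
        } else if step.is_none() {
            let v = parse_num(range)?;
            (v, v)
        } else {
            // A bare number with a step (e.g. "5/15") is rejected in this sketch.
            return Err(format!("{part:?}: a step needs '*' or a range"));
        };
        if lo < min || hi > max || lo > hi {
            return Err(format!("{part:?} is outside {min}-{max}"));
        }
        values.extend((lo..=hi).step_by(step.unwrap_or(1) as usize));
    }
    values.sort_unstable();
    values.dedup();
    Ok(values)
}

fn parse_num(s: &str) -> Result<u32, String> {
    s.parse::<u32>().map_err(|e| format!("bad number {s:?}: {e}"))
}

/// Next allowed value at or after `current`. If none remains, wraps to the
/// first value and reports `true` so the caller can carry into the next unit
/// (e.g. the next hour). `allowed` must be non-empty; parse_field ensures this.
pub fn next_value(allowed: &[u32], current: u32) -> (u32, bool) {
    match allowed.iter().find(|&&v| v >= current) {
        Some(&v) => (v, false),
        None => (allowed[0], true),
    }
}
```

For the minute field "*/15", the next match at minute 16 is 30; at minute 50 the result wraps to 0 with a carry into the next hour.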

Safety Properties

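  • No overflow or underflow — time calculations use saturating_sub/saturating_mul, with explicit clamping (.max(0), .min(10)) where a result must stay non-negative or fit in i8
  • Semaphore safety — a was_running check ensures a permit is released only for a run that was actually running, preventing permit inflation
  • No lock-poisoning panics — locks are acquired with unwrap_or_else(|e| e.into_inner()), which recovers the guard from a poisoned mutex instead of panicking
  • No SQL injection — all queries use parameterized params![]
  • UTF-8 safety — truncate_string() uses char_indices() so it never splits a multi-byte character
  • No static mut — shared state uses crate::sync_primitives::Arc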
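Two of these properties can be shown with the standard library alone. The truncate_string signature below (a character limit, returning a borrowed slice) is an assumption, and lock_recovering and poisoned_counter_demo are hypothetical helpers written only for illustration.

```rust
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

/// Truncates to at most `max_chars` characters without splitting a
/// multi-byte UTF-8 sequence (byte slicing such as `&s[..n]` could panic).
pub fn truncate_string(s: &str, max_chars: usize) -> &str {
    match s.char_indices().nth(max_chars) {
        Some((byte_idx, _)) => &s[..byte_idx],
        None => s, // already short enough
    }
}

/// Locks a mutex, recovering the data even if a previous holder panicked.
pub fn lock_recovering<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    m.lock().unwrap_or_else(|e| e.into_inner())
}

/// Poisons a mutex by panicking while holding it, then shows it is still usable.
pub fn poisoned_counter_demo() -> u32 {
    let counter = Arc::new(Mutex::new(1u32));
    let c = Arc::clone(&counter);
    let _ = thread::spawn(move || {
        let _guard = c.lock().unwrap();
        panic!("worker failed while holding the lock");
    })
    .join();
    // A plain `.lock().unwrap()` would panic here because the mutex is poisoned.
    let mut n = lock_recovering(&counter);
    *n += 1;
    *n
}
```

Recovering from poisoning trades a crash for the risk of seeing data left half-updated by the panicking thread, so it suits state that stays valid after any partial update, such as simple counters.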

Code Location

Scheduler:

  • src/scheduler/mod.rs — Module entry point
  • src/scheduler/lane_scheduler.rs — Main scheduler
  • src/scheduler/lane.rs — Lane definition
  • src/scheduler/lane_config.rs — Quota and config
  • src/scheduler/lane_state.rs — Queue and running state
  • src/scheduler/anti_starvation.rs — Priority boosting
  • src/scheduler/recursion_tracker.rs — Depth tracking

Tasks:

  • src/tasks/mod.rs — Module entry point
  • src/tasks/cron/ — Cron job scheduling
  • src/tasks/heartbeat/ — Health check monitoring
  • src/tasks/shared/ — Clock, schedule, delivery, store

See Also

  • Agent Runtime — How sub-agents are spawned and managed
  • Arena — Multi-agent collaboration workspace
