Task Scheduling
Lane-based sub-agent scheduling with resource isolation, anti-starvation, recursion limits, cron jobs, and heartbeat monitoring.
The scheduler and tasks modules handle task scheduling in Aleph. The scheduler runs sub-agents under resource constraints, while the tasks module provides time-based (cron) and health-check (heartbeat) job scheduling.
Design Philosophy
The scheduling system follows three principles:
- Resource isolation — Each "lane" of execution has independent quotas and concurrency limits
- Anti-starvation — Long-waiting jobs receive priority boosts to prevent indefinite blocking
- Recursion safety — Sub-agent spawning is tracked and limited to prevent infinite recursion
Lane-Based Scheduler
The LaneScheduler provides structured concurrency for sub-agent execution:
┌─────────────────────────────────────────────┐
│ LaneScheduler                               │
├─────────────────────────────────────────────┤
│                                             │
│ Lane A (foreground)  max=3, quota=100       │
│   ├─ Run 1 (running)                        │
│   ├─ Run 2 (queued)                         │
│   └─ Run 3 (queued)                         │
│                                             │
│ Lane B (background)  max=1, quota=50        │
│   └─ Run 4 (running)                        │
│                                             │
│ Global limit: 5 concurrent runs             │
└─────────────────────────────────────────────┘
Lane
An isolated execution lane with its own quota and concurrency limit:
pub struct Lane {
    name: String,
    max_concurrent: usize,
    quota: LaneQuota,
}
LaneScheduler
pub struct LaneScheduler {
    lanes: Vec<Lane>,
    global_max: usize,
    recursion_tracker: RecursionTracker,
}
impl LaneScheduler {
    pub async fn schedule(&self, run: QueuedRun) -> Result<ScheduleGuard> { /* ... */ }
    pub fn stats(&self) -> SchedulerStats { /* ... */ }
}
ScheduleGuard uses RAII: when the guard is dropped, the run's permit is released automatically. Internally, the scheduler calls std::mem::forget(permit) and lets ScheduleGuard take over the release. This is correct but needs care: an early return between the forget call and the guard's construction would leak the permit.
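The guard pattern can be sketched with the standard library alone. This is a minimal, hypothetical sketch rather than Aleph's implementation: PermitPool, try_acquire, and free_permits are invented names, and an AtomicUsize counter stands in for the real semaphore. Because the permit is taken and wrapped in a guard within one expression, there is no gap in which an early return could leak it.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical permit pool standing in for the scheduler's semaphore.
pub struct PermitPool {
    available: Arc<AtomicUsize>,
}

/// RAII guard: returns its permit to the pool when dropped.
pub struct ScheduleGuard {
    available: Arc<AtomicUsize>,
}

impl Drop for ScheduleGuard {
    fn drop(&mut self) {
        self.available.fetch_add(1, Ordering::AcqRel);
    }
}

impl PermitPool {
    pub fn new(permits: usize) -> Self {
        Self { available: Arc::new(AtomicUsize::new(permits)) }
    }

    /// Takes a permit if one is free. The decrement and the guard
    /// construction happen in one expression, so nothing can return
    /// early in between and leak the permit.
    pub fn try_acquire(&self) -> Option<ScheduleGuard> {
        self.available
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
            .ok()
            .map(|_| ScheduleGuard { available: Arc::clone(&self.available) })
    }

    /// Number of permits currently free.
    pub fn free_permits(&self) -> usize {
        self.available.load(Ordering::Acquire)
    }
}
```

A run holds its guard for as long as it executes; dropping it, including during unwinding from a panic, returns the slot to the pool.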
Anti-Starvation
pub struct WaitTimeTracker;
impl WaitTimeTracker {
    pub fn sweep_anti_starvation(&self, lanes: &mut [LaneState]) { /* ... */ }
}
Jobs waiting longer than a threshold receive a priority boost. Note that boosts apply per lane, not per job, so jobs that join a boosted lane later also benefit.
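A per-lane sweep can be sketched as follows. This is a hypothetical illustration, not the real sweep_anti_starvation: the LaneWait type, its millisecond timestamps, the threshold_ms parameter, and the reset-when-empty rule are all assumptions. It shows why the boost is lane-wide: it is stored on the lane, so any run queued behind the starving one inherits it.

```rust
/// Hypothetical lane view for this sketch (the real LaneState differs).
pub struct LaneWait {
    /// Enqueue timestamps (ms) of queued runs, oldest first.
    pub queued_since_ms: Vec<i64>,
    /// Priority boost applied to the whole lane.
    pub boost: i8,
}

/// Boosts every lane whose oldest queued run has waited longer than
/// `threshold_ms`. The boost lives on the lane, not on the run, so runs
/// queued later in the same lane also benefit.
pub fn sweep_anti_starvation(lanes: &mut [LaneWait], now_ms: i64, threshold_ms: i64) {
    for lane in lanes.iter_mut() {
        let starving = lane
            .queued_since_ms
            .first()
            .map_or(false, |&since| now_ms.saturating_sub(since) > threshold_ms);
        if starving {
            lane.boost = lane.boost.saturating_add(1);
        } else if lane.queued_since_ms.is_empty() {
            // Nothing is waiting any more, so clear the boost.
            lane.boost = 0;
        }
    }
}
```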
Recursion Tracking
pub struct RecursionTracker {
    max_depth: usize,
}
Tracks sub-agent spawn depth. If a sub-agent tries to spawn another sub-agent beyond max_depth, the request is blocked.
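The depth check itself is small. In this hypothetical sketch, new, check_spawn, and SpawnError are invented names, and depth 0 is assumed to be the top-level agent; a spawn is refused when the child's depth would exceed max_depth.

```rust
#[derive(Debug, PartialEq)]
pub enum SpawnError {
    DepthExceeded { depth: usize, max_depth: usize },
}

pub struct RecursionTracker {
    max_depth: usize,
}

impl RecursionTracker {
    pub fn new(max_depth: usize) -> Self {
        Self { max_depth }
    }

    /// Returns the depth a child spawned at `parent_depth` would run at,
    /// or an error if that depth would go beyond `max_depth`.
    pub fn check_spawn(&self, parent_depth: usize) -> Result<usize, SpawnError> {
        let child_depth = parent_depth.saturating_add(1);
        if child_depth > self.max_depth {
            Err(SpawnError::DepthExceeded { depth: child_depth, max_depth: self.max_depth })
        } else {
            Ok(child_depth)
        }
    }
}
```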
LaneState
pub struct LaneState {
    queued: VecDeque<QueuedRun>,
    running: HashMap<RunId, RunningRun>,
}
Integer safety: wait-time calculations clamp with .max(0) so that clock skew cannot produce a negative wait, and clamp with .min(10) before the as i8 cast so the value cannot overflow an i8.
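Put together, the two clamps look like this. It is a hypothetical sketch: wait_boost and its step_ms "one boost level per step" parameter are invented, and the extra saturating_sub is an addition here, not something the text above states.

```rust
/// Converts a queued run's wait time into a priority boost (0..=10).
pub fn wait_boost(now_ms: i64, enqueued_at_ms: i64, step_ms: i64) -> i8 {
    // Clock skew can put enqueued_at in the "future"; .max(0) turns that
    // into a zero wait. saturating_sub also avoids overflow on extreme values.
    let wait_ms = now_ms.saturating_sub(enqueued_at_ms).max(0);
    // One boost level per elapsed step; .max(1) avoids dividing by zero.
    let levels = wait_ms / step_ms.max(1);
    // Clamp to 10 before narrowing, so the `as i8` cast cannot wrap.
    levels.min(10) as i8
}
```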
Cron Tasks
The tasks::cron module schedules recurring jobs:
pub struct CronService {
    db: Connection,
    clock: Arc<dyn Clock>,
}
impl CronService {
    pub async fn create_job(&self, config: JobConfig) -> Result<JobId> { /* ... */ }
    pub async fn list_jobs(&self) -> Result<Vec<Job>> { /* ... */ }
    pub async fn delete_job(&self, id: JobId) -> Result<()> { /* ... */ }
}
Features:
- Cron expression parsing (standard Unix cron syntax)
- Job execution history
- Timeout handling
- Three-phase concurrency: schedule → execute → cleanup (minimizes lock hold time)
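The three-phase pattern can be sketched with a std Mutex in place of the database. Everything here is hypothetical: the Job fields, the tick function, and the execute callback are invented for illustration. The point is that the lock is held only while selecting due jobs and while recording results, never while a job runs, so other callers can take the lock in the meantime.

```rust
use std::sync::Mutex;

/// Hypothetical job record for this sketch.
#[derive(Clone, Debug)]
pub struct Job {
    pub id: u32,
    pub next_run: i64,
    pub interval: i64,
    pub runs: u32,
    pub last_ok: Option<bool>,
}

/// One scheduler tick; returns how many jobs were executed.
pub fn tick(jobs: &Mutex<Vec<Job>>, now: i64, execute: impl Fn(&Job) -> bool) -> usize {
    // Phase 1: schedule. Snapshot due jobs while holding the lock.
    let due: Vec<Job> = {
        let guard = jobs.lock().unwrap_or_else(|e| e.into_inner());
        guard.iter().filter(|j| j.next_run <= now).cloned().collect()
    }; // lock released here

    // Phase 2: execute. No lock is held while jobs run.
    let results: Vec<(u32, bool)> = due.iter().map(|j| (j.id, execute(j))).collect();

    // Phase 3: cleanup. Re-acquire briefly to record outcomes.
    let mut guard = jobs.lock().unwrap_or_else(|e| e.into_inner());
    for (id, ok) in &results {
        if let Some(job) = guard.iter_mut().find(|j| j.id == *id) {
            job.runs += 1;
            job.last_ok = Some(*ok);
            job.next_run = now.saturating_add(job.interval);
        }
    }
    results.len()
}
```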
Clock abstraction: Uses a Clock trait with FakeClock for testability.
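A Clock trait with a controllable fake might look like the sketch below. The now() method matches the self.clock.now() call in the JobRun excerpt; the Unix-seconds unit and the SystemClock, new, advance, and set names are assumptions.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Source of "now" as Unix seconds (the unit is an assumption).
pub trait Clock: Send + Sync {
    fn now(&self) -> i64;
}

/// Production clock backed by the system time.
pub struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> i64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_secs() as i64)
            .unwrap_or(0) // system clock set before 1970
    }
}

/// Test clock that only moves when told to.
pub struct FakeClock {
    now: AtomicI64,
}

impl FakeClock {
    pub fn new(start: i64) -> Self {
        Self { now: AtomicI64::new(start) }
    }
    pub fn advance(&self, secs: i64) {
        self.now.fetch_add(secs, Ordering::SeqCst);
    }
    pub fn set(&self, t: i64) {
        self.now.store(t, Ordering::SeqCst);
    }
}

impl Clock for FakeClock {
    fn now(&self) -> i64 {
        self.now.load(Ordering::SeqCst)
    }
}
```

A test keeps one handle to the FakeClock and hands the service an Arc<dyn Clock> pointing at the same instance, so it can move time forward without sleeping.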
JobRun
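pub struct JobRun {
    started_at: i64,           // assumed Unix seconds (converted to ms below)
    completed_at: Option<i64>,
    clock: Arc<dyn Clock>,     // assumed field: the excerpt reads self.clock
}
impl JobRun {
    pub fn success(mut self) -> Self {
        let now = self.clock.now();
        // Seconds to milliseconds, without overflowing on extreme timestamps.
        let duration_ms = now
            .saturating_sub(self.started_at)
            .saturating_mul(1000);
        self.completed_at = Some(now);
        // ... (status and duration_ms recording elided)
        self
    }
}
Integer safety: saturating_sub and saturating_mul keep the arithmetic from overflowing on extreme or skewed timestamps. Because the timestamps are signed i64, saturating_sub does not clamp at zero: if the clock moves backwards, the duration comes out negative, so a .max(0) clamp is still needed wherever a non-negative duration is required.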
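The difference between saturating arithmetic and clamping is easy to check. duration_ms is a hypothetical helper that assumes Unix-second timestamps, as the * 1000 conversion in the excerpt suggests; it applies both protections.

```rust
/// Duration in milliseconds between two Unix-second timestamps.
/// saturating_* stops extreme values from overflowing; the final .max(0)
/// is what turns a backwards clock jump into 0 instead of a negative value.
pub fn duration_ms(started_at: i64, now: i64) -> i64 {
    now.saturating_sub(started_at).saturating_mul(1000).max(0)
}
```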
Heartbeat Tasks
The tasks::heartbeat module monitors external services:
pub struct HeartbeatService {
    intervals: Vec<HeartbeatInterval>,
}
impl HeartbeatService {
    pub async fn check_all(&self) -> Vec<HeartbeatResult> { /* ... */ }
}
Features:
- URL health checking
- Response time tracking
- Deduplication (avoids redundant checks)
- Configurable intervals and timeouts
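Interval-based due checks, deduplication, and response-time tracking can be sketched as follows. The Target type, due_urls, and timed_probe are hypothetical, and a closure stands in for the real HTTP request so the sketch needs no network access. A URL counts as due if it has never been checked or its interval has elapsed, and it is probed at most once per pass even when several targets share it.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

/// Hypothetical heartbeat target for this sketch.
pub struct Target {
    pub url: String,
    pub interval: Duration,
    pub last_checked: Option<Instant>,
}

/// URLs due for a check, each listed once (deduplication).
pub fn due_urls(targets: &[Target], now: Instant) -> Vec<String> {
    let mut seen = HashSet::new();
    targets
        .iter()
        .filter(|t| match t.last_checked {
            None => true,
            Some(at) => now.saturating_duration_since(at) >= t.interval,
        })
        .filter(|t| seen.insert(t.url.clone())) // false for repeated URLs
        .map(|t| t.url.clone())
        .collect()
}

/// Runs one probe and measures its response time.
pub fn timed_probe<F: FnOnce(&str) -> bool>(url: &str, probe: F) -> (bool, Duration) {
    let start = Instant::now();
    let ok = probe(url);
    (ok, start.elapsed())
}
```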
Task Infrastructure
The tasks::shared module provides cross-cutting infrastructure:
- Clock trait: abstracts time for testability
- Schedule: schedule parsing and next-run calculation
- Delivery: task result delivery mechanisms
- Store: persistence layer for task state
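Schedule parsing and next-run calculation can be illustrated on a single cron field. This is a hypothetical sketch covering only the minute field and the forms "*", "*/n", "a", "a-b", "a-b/n", and comma lists; real cron also has hour, day-of-month, month, and day-of-week fields plus names and special strings, all omitted here. parse_field and minutes_until_next are invented names, not Aleph's Schedule API.

```rust
/// Parses one cron field into the sorted set of matching values in
/// min..=max. Returns None for malformed or out-of-range input.
pub fn parse_field(field: &str, min: u32, max: u32) -> Option<Vec<u32>> {
    let mut out = Vec::new();
    for part in field.split(',') {
        // Optional "/step" suffix; a zero step is rejected.
        let (range, step) = match part.split_once('/') {
            Some((r, s)) => (r, Some(s.parse::<u32>().ok().filter(|&n| n > 0)?)),
            None => (part, None),
        };
        let (lo, hi) = if range == "*" {
            (min, max)
        } else if let Some((a, b)) = range.split_once('-') {
            (a.parse::<u32>().ok()?, b.parse::<u32>().ok()?)
        } else {
            // A bare value with a step ("5/15") is not handled by this sketch.
            if step.is_some() {
                return None;
            }
            let v = range.parse::<u32>().ok()?;
            (v, v)
        };
        if lo < min || hi > max || lo > hi {
            return None;
        }
        out.extend((lo..=hi).step_by(step.unwrap_or(1) as usize));
    }
    out.sort_unstable();
    out.dedup();
    Some(out)
}

/// Minutes from `now_minute` (0..=59) to the next matching minute strictly
/// after it, wrapping into the next hour. `matches` must be sorted.
pub fn minutes_until_next(matches: &[u32], now_minute: u32) -> Option<u32> {
    matches
        .iter()
        .find(|&&m| m > now_minute)
        .map(|&m| m - now_minute)
        .or_else(|| matches.first().map(|&m| m + 60 - now_minute))
}
```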
Safety Properties
- No underflow: all time calculations use saturating_sub/saturating_mul
- Semaphore safety: a was_running check prevents permit inflation
- No lock poisoning panics: locks are taken with unwrap_or_else(|e| e.into_inner()), which recovers the data from a poisoned mutex instead of panicking
- No SQL injection: all queries use parameterized params![]
- UTF-8 safe: truncate_string() uses char_indices(), so truncation never splits a multi-byte character
- No static mut: shared state uses crate::sync_primitives::Arc
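Two of these properties can be shown concretely. In this hypothetical sketch, the truncate_string signature (a limit counted in characters) is an assumption, and lock_recover is an invented helper wrapping the unwrap_or_else(|e| e.into_inner()) idiom. char_indices yields byte offsets that always fall on character boundaries, so slicing at one of them is always valid UTF-8.

```rust
use std::sync::{Mutex, MutexGuard};

/// Truncates to at most `max_chars` characters without splitting a
/// multi-byte UTF-8 sequence.
pub fn truncate_string(s: &str, max_chars: usize) -> &str {
    match s.char_indices().nth(max_chars) {
        // Byte offset of the first character past the limit.
        Some((byte_idx, _)) => &s[..byte_idx],
        None => s, // already short enough
    }
}

/// Locks a mutex, recovering the data if a previous holder panicked,
/// instead of turning the poison into a second panic.
pub fn lock_recover<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    m.lock().unwrap_or_else(|e| e.into_inner())
}
```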
Code Location
Scheduler:
- src/scheduler/mod.rs: Module entry point
- src/scheduler/lane_scheduler.rs: Main scheduler
- src/scheduler/lane.rs: Lane definition
- src/scheduler/lane_config.rs: Quota and config
- src/scheduler/lane_state.rs: Queue and running state
- src/scheduler/anti_starvation.rs: Priority boosting
- src/scheduler/recursion_tracker.rs: Depth tracking
Tasks:
- src/tasks/mod.rs: Module entry point
- src/tasks/cron/: Cron job scheduling
- src/tasks/heartbeat/: Health check monitoring
- src/tasks/shared/: Clock, schedule, delivery, store
See Also
- Agent Runtime — How sub-agents are spawned and managed
- Arena — Multi-agent collaboration workspace