The Coordinator
Overview
The Coordinator serves as the central control plane for the Strata runtime. Built using Tonic (gRPC) and Axum (HTTP), it manages the lifecycle of distributed workers, orchestrates data shard assignments, and maintains global synchronization via barriers.
It is designed for high-throughput environments, capable of handling over 10,000 requests per second with sub-50ms latency for barrier synchronization across hundreds of nodes.
Key Responsibilities
- Worker Lifecycle Management: Handles registration, health monitoring via heartbeats, and automatic failure detection.
- Data Orchestration: Provides the interface for dataset registration and consistent-hash-based shard assignment.
- Global Synchronization: Manages distributed barriers to ensure workers stay synchronized during training steps.
- State Persistence: Coordinates distributed checkpointing and interacts with the storage backends (S3/Local).
- Observability: Exposes a REST API to power the real-time monitoring dashboard.
Worker Lifecycle & Heartbeats
Workers interact with the Coordinator primarily through a gRPC stream. The lifecycle is state-driven:
| State | Description |
| :--- | :--- |
| Initializing | Worker has registered but is not yet ready for work. |
| Idle | Worker is active and awaiting a task or shard assignment. |
| LoadingData | Worker is currently fetching/sharding data. |
| Training | Worker is actively processing training batches. |
| Checkpointing | Worker is persisting state to the configured storage backend. |
| Recovering | Worker is restoring state after a failure. |
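The states above can be modeled as a small enum. The sketch below is illustrative only: the `WorkerState` name, the inclusion of a `Failed` variant (taken from the heartbeat description), and every transition rule in `can_transition_to` are assumptions, since the allowed transitions are not spelled out here.

```rust
/// Worker lifecycle states from the table above, plus `Failed`,
/// which the heartbeat section mentions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerState {
    Initializing,
    Idle,
    LoadingData,
    Training,
    Checkpointing,
    Recovering,
    Failed,
}

impl WorkerState {
    /// Hypothetical transition rules; the real Coordinator may allow
    /// a different set of transitions.
    pub fn can_transition_to(self, next: WorkerState) -> bool {
        use WorkerState::*;
        match (self, next) {
            // Any worker that has not already failed can be marked Failed.
            (current, Failed) => current != Failed,
            (Initializing, Idle) => true,
            (Idle, LoadingData) => true,
            (LoadingData, Training) => true,
            (Training, Checkpointing) | (Training, Idle) => true,
            (Checkpointing, Training) | (Checkpointing, Idle) => true,
            (Failed, Recovering) => true,
            (Recovering, Idle) => true,
            _ => false,
        }
    }
}
```

Encoding the rules in one `match` keeps illegal transitions (for example, Idle directly to Checkpointing) easy to reject in a single place.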
Heartbeat Mechanism
Workers must send heartbeats at regular intervals (default: 5 seconds). If a worker fails to send a heartbeat within the configured timeout, the Coordinator marks the worker as Failed and triggers a Shard Rebalance to redistribute its workload to healthy nodes.
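As a rough illustration of timeout-based failure detection, the sketch below tracks the last heartbeat per worker and reports those that exceed the timeout. `HeartbeatTracker`, `record`, and `reap_failed` are hypothetical names, not part of the Coordinator's API, and the rebalance step is left to the caller.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Tracks the most recent heartbeat per worker (hypothetical helper).
pub struct HeartbeatTracker {
    timeout: Duration,
    last_seen: HashMap<String, Instant>,
}

impl HeartbeatTracker {
    pub fn new(timeout: Duration) -> Self {
        Self { timeout, last_seen: HashMap::new() }
    }

    /// Record a heartbeat received from `worker_id` at time `now`.
    pub fn record(&mut self, worker_id: &str, now: Instant) {
        self.last_seen.insert(worker_id.to_string(), now);
    }

    /// Remove and return (sorted by ID) the workers whose last heartbeat
    /// is older than the timeout. A caller would mark these as Failed and
    /// trigger a shard rebalance.
    pub fn reap_failed(&mut self, now: Instant) -> Vec<String> {
        let timeout = self.timeout;
        let mut failed: Vec<String> = self
            .last_seen
            .iter()
            .filter(|(_, seen)| now.duration_since(**seen) > timeout)
            .map(|(id, _)| id.clone())
            .collect();
        failed.sort();
        for id in &failed {
            self.last_seen.remove(id);
        }
        failed
    }
}
```

Passing `now` explicitly, rather than calling `Instant::now()` inside the tracker, keeps the logic deterministic and easy to test.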
The gRPC Interface
The primary communication channel for workers is defined in coordinator.proto. Key RPCs include:
RegisterWorker
Initializes the connection between a node and the control plane.
- Input: WorkerConfig (ID, IP, GPU count, etc.)
- Output: WorkerInfo (Assigned ID, heartbeat intervals)
Heartbeat
Maintains the session and reports resource utilization.
- Input: HeartbeatRequest (Current step, epoch, CPU/GPU usage, current task)
- Output: HeartbeatResponse (Command to stop, pause, or reconfigure)
GetShards
Requests data assignments for a specific dataset and epoch.
- Input: ShardRequest (Dataset ID, Worker ID, Epoch)
- Output: ShardAssignment (List of shard IDs, start/end indices, file paths)
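Shard assignment is described as consistent-hash-based. The following is a minimal sketch of one common way to do that, a hash ring with virtual nodes; it is not the Coordinator's actual implementation. `HashRing`, `owner`, and the `vnodes` parameter are hypothetical, and `DefaultHasher` is used only for brevity (its output is not guaranteed to be stable across Rust releases).

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash_key<T: Hash>(key: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// A hash ring mapping points on a u64 circle to worker IDs (hypothetical).
pub struct HashRing {
    ring: BTreeMap<u64, String>,
}

impl HashRing {
    /// Place `vnodes` virtual nodes on the ring for each worker.
    pub fn new(workers: &[&str], vnodes: u32) -> Self {
        let mut ring = BTreeMap::new();
        for worker in workers {
            for v in 0..vnodes {
                ring.insert(hash_key(&(*worker, v)), (*worker).to_string());
            }
        }
        Self { ring }
    }

    /// The owner of a shard is the first ring point at or after the
    /// shard's hash, wrapping around to the start of the ring.
    pub fn owner(&self, shard_id: u64) -> Option<&str> {
        let h = hash_key(&shard_id);
        self.ring
            .range(h..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, worker)| worker.as_str())
    }
}
```

With this layout, removing a worker only reassigns the shards that worker owned; shards owned by the remaining workers keep their assignment, which is the property that makes rebalancing after a failure cheap.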
WaitAtBarrier
Synchronizes workers at specific training boundaries.
- Input: BarrierRequest (Barrier ID, Worker ID, Step)
- Output: BarrierResponse (Released status, arrival order)
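To illustrate the barrier semantics (block until everyone arrives, then report arrival order), here is a simplified in-process sketch using standard-library threads. The real Coordinator is asynchronous and serves this over gRPC; `StepBarrier` and its methods are hypothetical names, and the sketch handles a single barrier ID with a fixed participant count.

```rust
use std::sync::{Condvar, Mutex};

struct BarrierState {
    arrived: usize,
    generation: u64,
}

/// A reusable barrier that also reports arrival order (hypothetical).
pub struct StepBarrier {
    expected: usize,
    state: Mutex<BarrierState>,
    released: Condvar,
}

impl StepBarrier {
    pub fn new(expected: usize) -> Self {
        assert!(expected > 0, "a barrier needs at least one participant");
        Self {
            expected,
            state: Mutex::new(BarrierState { arrived: 0, generation: 0 }),
            released: Condvar::new(),
        }
    }

    /// Blocks until `expected` callers have arrived, then returns this
    /// caller's 0-based arrival order.
    pub fn wait(&self) -> usize {
        let mut st = self.state.lock().unwrap();
        let order = st.arrived;
        st.arrived += 1;
        if st.arrived == self.expected {
            // Last arrival: reset for the next step and release everyone.
            st.arrived = 0;
            st.generation += 1;
            self.released.notify_all();
        } else {
            // Wait until the generation changes; the loop guards against
            // spurious wakeups.
            let my_generation = st.generation;
            while st.generation == my_generation {
                st = self.released.wait(st).unwrap();
            }
        }
        order
    }
}
```

The generation counter lets the same barrier be reused for successive training steps without a late waker from one step being confused with the next.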
The Dashboard API (REST)
The Coordinator exposes an asynchronous HTTP API (defaulting to port 3000) for management and visualization.
| Endpoint | Method | Description |
| :--- | :--- | :--- |
| /api/status | GET | Returns coordinator uptime and version. |
| /api/workers | GET | Lists all registered workers and their current metrics. |
| /api/datasets | GET | Returns metadata for all registered training sets. |
| /api/checkpoints | GET | Provides a history of saved model states. |
| /api/tasks | POST | Dispatches a new training task to available workers. |
| /api/dashboard | GET | Aggregated state for the React frontend. |
Usage Example
To start the Coordinator within a Rust application:
```rust
use coordinator::{CoordinatorServer, CoordinatorService};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Initialize the service with a custom heartbeat timeout
    let service = CoordinatorService::with_config(
        checkpoint::CheckpointManagerConfig::default(),
        1000,                    // Max workers
        Duration::from_secs(30), // Heartbeat timeout
    )
    .await?;

    // 2. Start the gRPC + HTTP server
    // This hosts the gRPC endpoint for workers and the REST API for the dashboard
    let addr = "0.0.0.0:50051";
    println!("Coordinator listening on {}", addr);
    CoordinatorServer::new(addr, service).run().await?;

    Ok(())
}
```
Configuration
The Coordinator behavior is influenced by the following environment variables:
| Variable | Type | Default | Description |
| :--- | :--- | :--- | :--- |
| STORAGE_BACKEND | String | local | local or s3 for checkpoint storage. |
| CHECKPOINT_BUCKET | String | - | S3 bucket name (required if s3 is selected). |
| HEARTBEAT_INTERVAL | Integer | 5000 | Interval in milliseconds between worker check-ins. |
| RUST_LOG | String | info | Logging verbosity (debug, info, warn, error). |
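The table above could be read into a typed configuration roughly as follows. This is a sketch under assumptions: `EnvConfig`, `StorageBackend`, and `load_config` are hypothetical names, and the validation rules (rejecting unknown backends, requiring the bucket for s3) are inferred from the descriptions rather than taken from the Coordinator's code. The lookup is passed in as a closure so the parsing can be exercised without touching the process environment.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum StorageBackend {
    Local,
    S3 { bucket: String },
}

#[derive(Debug, PartialEq)]
pub struct EnvConfig {
    pub storage: StorageBackend,
    pub heartbeat_interval: Duration,
    pub log_level: String,
}

/// Build the config from a variable lookup, applying the documented defaults.
pub fn load_config(get: impl Fn(&str) -> Option<String>) -> Result<EnvConfig, String> {
    let backend = get("STORAGE_BACKEND").unwrap_or_else(|| "local".to_string());
    let storage = match backend.as_str() {
        "local" => StorageBackend::Local,
        "s3" => {
            // The bucket has no default and is required for S3.
            let bucket = get("CHECKPOINT_BUCKET")
                .ok_or("CHECKPOINT_BUCKET is required when STORAGE_BACKEND=s3")?;
            StorageBackend::S3 { bucket }
        }
        other => return Err(format!("unknown STORAGE_BACKEND: {other}")),
    };

    let heartbeat_ms = match get("HEARTBEAT_INTERVAL") {
        Some(raw) => raw
            .parse::<u64>()
            .map_err(|e| format!("invalid HEARTBEAT_INTERVAL {raw:?}: {e}"))?,
        None => 5000,
    };

    Ok(EnvConfig {
        storage,
        heartbeat_interval: Duration::from_millis(heartbeat_ms),
        log_level: get("RUST_LOG").unwrap_or_else(|| "info".to_string()),
    })
}
```

In a real binary the lookup would typically be `load_config(|key| std::env::var(key).ok())`.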