tokio-prompt-orchestrator is a production-leaning orchestrator for multi-stage LLM (Large Language Model) pipelines built on Tokio. It uses bounded channels and backpressure management to keep resource use predictable, and adds pluggable workers and metrics hooks for flexibility and observability.
Architecture Overview
The orchestrator follows a well-defined five-stage pipeline architecture that operates with bounded channels to optimize resource management:
    PromptRequest → RAG(512) → Assemble(512) → Inference(1024) → Post(512) → Stream(256)
                       ↓             ↓                ↓              ↓            ↓
                   5ms delay       format        ModelWorker    join tokens     emit
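As a minimal illustration (the type and variable names below are hypothetical, not the crate's internals), each hand-off in the diagram corresponds to a bounded Tokio mpsc channel created with the capacity shown:

    use tokio::sync::mpsc;

    fn make_stage_channels() {
        // Capacities mirror the diagram above. Payloads are simplified to
        // Strings here; the real crate defines its own stage types.
        let (rag_tx, _rag_rx) = mpsc::channel::<String>(512);
        let (assemble_tx, _assemble_rx) = mpsc::channel::<String>(512);
        let (inference_tx, _inference_rx) = mpsc::channel::<String>(1024);
        let (post_tx, _post_rx) = mpsc::channel::<String>(512);
        let (stream_tx, _stream_rx) = mpsc::channel::<String>(256);
        let _ = (rag_tx, assemble_tx, inference_tx, post_tx, stream_tx);
    }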
Pipeline Stages
The orchestrator is built around the following key stages, each handing its output to the next over a bounded channel (a sketch of one such stage loop follows the list):
- RAG (Retrieval-Augmented Generation): Handles document retrieval and context injection.
- Assemble: Constructs prompts using templates and few-shot examples.
- Inference: Executes the LLM via the ModelWorker trait.
- Post: Performs post-processing tasks, including formatting, safety checks, and PII redaction.
- Stream: Manages output emission through various protocols such as SSE, WebSocket, and gRPC.
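Each stage can be pictured as an independent Tokio task that pulls from its inbound channel, does its work, and forwards the result downstream. The sketch below uses hypothetical payload types rather than the crate's real internals, but shows the general shape of a stage loop:

    use tokio::sync::mpsc::{Receiver, Sender};

    // Hypothetical payload types for illustration only.
    struct RetrievalJob { input: String }
    struct AssembledPrompt { prompt: String }

    // One pipeline stage: receive, transform, forward. The loop ends when the
    // upstream sender is dropped, which is what enables a clean draining shutdown.
    async fn assemble_stage(mut rx: Receiver<RetrievalJob>, tx: Sender<AssembledPrompt>) {
        while let Some(job) = rx.recv().await {
            let prompt = format!("Context + template for: {}", job.input);
            // `send` awaits when the downstream buffer is full, so a slow
            // Inference stage naturally backpressures this one.
            if tx.send(AssembledPrompt { prompt }).await.is_err() {
                break; // downstream stage has shut down
            }
        }
    }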
Key Features
- Bounded Channels: Configurable buffer sizes help prevent memory overflow, ensuring efficient memory use.
- Backpressure: Sheds work gracefully when a queue overflows, keeping the system stable under load (see the sketch after this list).
- Session Affinity: Uses hash-based sharding to keep a session's requests together; per-core pinning is planned for future development.
- Pluggable Workers: The ModelWorker trait allows integration with any inference backend.
- Metrics Hooks: Supports tracing, with the ability to switch to Prometheus/OTLP metrics through feature flags.
- Clean Shutdown: Gracefully drains the pipeline when the input channel is closed.
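The "graceful shedding on overload" behavior can be approximated with try_send: when a bounded channel's buffer is full, the request is rejected rather than blocking the caller. A minimal sketch, using a plain String payload instead of the crate's real request types:

    use tokio::sync::mpsc::{error::TrySendError, Sender};

    // Attempt to enqueue a prompt; shed it if the stage's buffer is already full.
    fn enqueue_or_shed(tx: &Sender<String>, prompt: String) -> bool {
        match tx.try_send(prompt) {
            Ok(()) => true,
            Err(TrySendError::Full(_dropped)) => {
                // Buffer at capacity: drop the request instead of blocking,
                // keeping upstream latency bounded. A metrics hook could count this.
                false
            }
            Err(TrySendError::Closed(_)) => false, // pipeline is shutting down
        }
    }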
Usage Example
Basic usage of the orchestrator is straightforward:
    use std::sync::Arc;
    use tokio_prompt_orchestrator::{EchoWorker, ModelWorker, PromptRequest, SessionId, spawn_pipeline};

    #[tokio::main]
    async fn main() {
        // Spawn the five-stage pipeline with the bundled echo worker.
        let worker: Arc<dyn ModelWorker> = Arc::new(EchoWorker::new());
        let handles = spawn_pipeline(worker);

        let request = PromptRequest {
            session: SessionId::new("session-1"),
            input: "Hello, world!".to_string(),
            meta: Default::default(),
        };

        // Send one request, then drop the sender; closing the input channel
        // triggers the graceful drain described above.
        handles.input_tx.send(request).await.unwrap();
        drop(handles.input_tx);

        // Give the pipeline a moment to finish processing before exiting.
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
Custom Model Worker Implementation
Custom inference logic can easily be integrated by defining a new worker:
    use async_trait::async_trait;
    use tokio_prompt_orchestrator::{ModelWorker, OrchestratorError};

    struct MyCustomWorker {
        // Your model state
    }

    #[async_trait]
    impl ModelWorker for MyCustomWorker {
        async fn infer(&self, prompt: &str) -> Result<Vec<String>, OrchestratorError> {
            // Your inference logic
            Ok(vec!["token1".to_string(), "token2".to_string()])
        }
    }
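Assuming the same spawn_pipeline entry point shown in the usage example above, the custom worker is wired in exactly like EchoWorker:

    use std::sync::Arc;
    use tokio_prompt_orchestrator::{ModelWorker, spawn_pipeline};

    #[tokio::main]
    async fn main() {
        // Any type implementing ModelWorker can be handed to the pipeline.
        let worker: Arc<dyn ModelWorker> = Arc::new(MyCustomWorker { /* your model state */ });
        let _handles = spawn_pipeline(worker);
    }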
Performance Considerations
Key performance aspects include:
- Channel Sizing: Buffer sizes are tuned per stage (note the larger buffer in front of Inference) to balance throughput against memory use.
- Session Affinity: Hash-based sharding keeps a session's requests on the same worker, reducing cache misses; per-core pinning is planned to improve this further (a sketch of the sharding idea follows this list).
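Hash-based session affinity can be pictured as mapping each session identifier onto a fixed shard so that all requests from one session hit the same worker and its warm caches. A minimal sketch using the standard library hasher (illustrative only, not the crate's actual sharding code):

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Map a session identifier to one of `num_shards` workers. Requests with
    // the same session id always land on the same shard, preserving locality.
    fn shard_for_session(session_id: &str, num_shards: usize) -> usize {
        let mut hasher = DefaultHasher::new();
        session_id.hash(&mut hasher);
        (hasher.finish() as usize) % num_shards
    }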
Future Roadmap
Planned enhancements include gRPC support, distributed scaling, and advanced features such as batch processing.
For more details, refer to the complete README.