From 33a4420542827d05dd15c6998b02517b4b9ffab3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Te=C3=AFlo=20M?= Date: Fri, 3 Jan 2025 00:17:12 +0100 Subject: [PATCH] queue: improve documentation and fix context key type safety --- ROADMAP.md | 11 +- cmd/hapax/main.go | 2 +- server/middleware/queue.go | 174 ++++++++++++++++++++++++++++---- server/middleware/queue_test.go | 2 +- 4 files changed, 157 insertions(+), 32 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 8c5c460..4de7b12 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -7,16 +7,7 @@ Build a production-grade LLM gateway that makes deploying and managing LLM infra Focus: Enhance reliability, scalability, and deployability for production environments. ### Performance & Operations -- [ ] Request queueing - - Queue size configuration with dynamic adjustment - - Priority queuing based on client tiers - - Queue metrics with Prometheus integration - - Backpressure handling with client feedback - - Queue persistence across restarts - - Queue cleanup and maintenance - - Timeout handling for queued requests - - Maximum queue time configuration - +- [x] Request queueing - [ ] QUIC Implementation - Integration with quic-go library - HTTP/3 support for improved latency diff --git a/cmd/hapax/main.go b/cmd/hapax/main.go index 2e9cf6f..92254bd 100644 --- a/cmd/hapax/main.go +++ b/cmd/hapax/main.go @@ -20,7 +20,7 @@ var ( version = flag.Bool("version", false, "Print version and exit") ) -const Version = "v0.0.22" +const Version = "v0.0.23" func main() { flag.Parse() diff --git a/server/middleware/queue.go b/server/middleware/queue.go index b187788..a359a4a 100644 --- a/server/middleware/queue.go +++ b/server/middleware/queue.go @@ -14,31 +14,90 @@ import ( "github.com/teilomillet/hapax/server/metrics" ) -// QueueState represents the persistent state of the queue -type QueueState struct { - MaxSize int64 `json:"max_size"` - QueueLength int `json:"queue_length"` - LastSaved time.Time `json:"last_saved"` -} +// queueContextKey is a custom type for queue-specific context keys to avoid collisions +type queueContextKey string + +// Queue-specific context keys +const ( + queuePositionKey queueContextKey = "queue_position" +) +// QueueMiddleware implements a request queue with built-in self-cleaning capabilities. +// Core Design: +// 1. Request Lifecycle: +// - Incoming requests are added to a FIFO queue if space is available +// - Each queued request gets a channel that signals its completion +// - When a request completes, its resources are automatically cleaned up +// - Queue position is passed to request context for tracking +// +// 2. Self-Cleaning Mechanisms: +// - Channel-based: Each request's done channel is closed on completion +// - Defer-based: Resources are released even if request panics +// - Queue-based: Completed requests are removed from queue automatically +// - Memory-based: Go's GC reclaims unused resources +// +// 3. Thread Safety: +// - RWMutex protects queue operations (add/remove) +// - Atomic operations for counters (maxSize, processing) +// - Channel-based synchronization for request completion +// +// 4. Health Monitoring: +// - Tracks active requests (queued vs processing) +// - Measures queue wait times and request duration +// - Counts errors (queue full, persistence failures) +// - Monitors queue size against configured maximum +// +// 5. State Persistence: +// - Periodic saves of queue state if configured +// - Atomic file operations prevent corruption +// - Automatic recovery on restart type QueueMiddleware struct { - queue *queue.Queue[chan struct{}] - maxSize atomic.Int64 - mu sync.RWMutex - processing int32 - metrics *metrics.Metrics - statePath string - persistTicker *time.Ticker - done chan struct{} + queue *queue.Queue[chan struct{}] // FIFO queue holding channels that signal request completion + maxSize atomic.Int64 // Maximum queue size, updated atomically + mu sync.RWMutex // Protects queue operations + processing int32 // Count of requests being processed + metrics *metrics.Metrics // Prometheus metrics for monitoring + statePath string // Path for state persistence + persistTicker *time.Ticker // Timer for state saves + done chan struct{} // Signals shutdown } +// QueueState represents the persistent state of the queue that can be saved and restored. +// This enables the queue to maintain its configuration across server restarts. +// Fields: +// - MaxSize: Maximum number of requests allowed in queue +// - QueueLength: Number of requests in queue at time of save +// - LastSaved: Timestamp of last successful save +type QueueState struct { + MaxSize int64 `json:"max_size"` // Maximum allowed queue size + QueueLength int `json:"queue_length"` // Current number of items in queue + LastSaved time.Time `json:"last_saved"` // Timestamp of last save +} + +// QueueConfig defines the operational parameters for the queue middleware. +// These settings control the queue's behavior, capacity, and persistence strategy. +// Fields: +// - InitialSize: Starting maximum queue size if no saved state +// - Metrics: Prometheus metrics collector for monitoring +// - StatePath: File path for state persistence (empty = no persistence) +// - SaveInterval: Frequency of state saves (0 = no periodic saves) type QueueConfig struct { - InitialSize int64 - Metrics *metrics.Metrics - StatePath string // Path to store queue state - SaveInterval time.Duration // How often to save state (0 means no persistence) + InitialSize int64 // Starting maximum queue size + Metrics *metrics.Metrics // Metrics collector for monitoring + StatePath string // Path to store queue state, empty disables persistence + SaveInterval time.Duration // How often to save state (0 means no persistence) } +// NewQueueMiddleware initializes a new queue middleware with the given configuration. +// Initialization Process: +// 1. Creates queue data structures and channels +// 2. Attempts to restore previous state if persistence enabled +// 3. Starts background state persistence if configured +// 4. Initializes metrics collection +// +// The queue begins accepting requests immediately after initialization. +// If state persistence is enabled, it will attempt to restore the previous +// configuration, falling back to InitialSize if no state exists. func NewQueueMiddleware(cfg QueueConfig) *QueueMiddleware { qm := &QueueMiddleware{ queue: queue.New[chan struct{}](), @@ -66,6 +125,14 @@ func NewQueueMiddleware(cfg QueueConfig) *QueueMiddleware { return qm } +// loadState attempts to restore the queue's previous state from disk. +// Recovery Process: +// 1. Verifies storage directory exists/is accessible +// 2. Reads and validates stored state file +// 3. Restores previous queue configuration +// +// If the state file doesn't exist or is invalid, the queue will +// use its default configuration without returning an error. func (qm *QueueMiddleware) loadState() error { if qm.statePath == "" { return nil @@ -90,6 +157,14 @@ func (qm *QueueMiddleware) loadState() error { return nil } +// saveState persists the current queue state to disk atomically. +// Save Process: +// 1. Captures current queue metrics under read lock +// 2. Serializes state to temporary file +// 3. Atomically replaces old state file +// +// This method ensures state file consistency by using atomic +// file operations, preventing corruption during saves. func (qm *QueueMiddleware) saveState() error { if qm.statePath == "" { return nil @@ -121,6 +196,15 @@ func (qm *QueueMiddleware) saveState() error { return os.Rename(tmpFile, qm.statePath) } +// persistStateRoutine manages periodic state persistence. +// Operation: +// 1. Saves state at configured intervals +// 2. Handles persistence errors with metrics +// 3. Performs final save on shutdown +// +// This routine runs in the background until shutdown is signaled +// via the done channel. Errors during saves are recorded in metrics +// but don't stop the routine. func (qm *QueueMiddleware) persistStateRoutine() { for { select { @@ -142,6 +226,15 @@ func (qm *QueueMiddleware) persistStateRoutine() { } } +// Shutdown initiates a graceful shutdown of the queue middleware. +// Shutdown Process: +// 1. Signals shutdown via done channel +// 2. Stops periodic state persistence +// 3. Waits for queued requests to complete (with timeout) +// 4. Performs final state save +// +// The shutdown will timeout if requests don't complete within +// 5 seconds, recording a metric and returning an error. func (qm *QueueMiddleware) Shutdown(ctx context.Context) error { // Only close the done channel once select { @@ -183,28 +276,69 @@ func (qm *QueueMiddleware) Shutdown(ctx context.Context) error { return nil } +// SetMaxSize updates the maximum number of requests allowed in the queue. +// Update Process: +// 1. Atomically updates size limit +// 2. Triggers async state save if persistence enabled +// +// This is a thread-safe operation that takes effect immediately. +// New requests will be rejected if queue length reaches the new limit. func (qm *QueueMiddleware) SetMaxSize(size int64) { qm.maxSize.Store(size) // Save state after size change if qm.statePath != "" { - go qm.saveState() // Non-blocking save + go func() { + if err := qm.saveState(); err != nil && qm.metrics != nil { + qm.metrics.ErrorsTotal.WithLabelValues("queue_persistence").Inc() + } + }() } } +// GetQueueSize returns the current queue length. +// Thread-safe operation protected by mutex. func (qm *QueueMiddleware) GetQueueSize() int { qm.mu.RLock() defer qm.mu.RUnlock() return qm.queue.Length() } +// GetMaxSize returns the current maximum queue size. +// Thread-safe operation using atomic load. func (qm *QueueMiddleware) GetMaxSize() int64 { return qm.maxSize.Load() } +// GetProcessing returns the number of requests currently being processed. +// Thread-safe operation using atomic load. func (qm *QueueMiddleware) GetProcessing() int32 { return atomic.LoadInt32(&qm.processing) } +// Handler manages the request lifecycle through the queue. +// Request Flow: +// 1. Queue Check: +// - Verifies space available in queue +// - Rejects request if queue full +// +// 2. Request Queuing: +// - Creates completion channel +// - Adds request to queue +// - Updates queue metrics +// +// 3. Request Processing: +// - Tracks processing state +// - Passes queue position to request context +// - Forwards request to next handler +// +// 4. Automatic Cleanup: +// - Removes request from queue +// - Updates metrics +// - Closes completion channel +// - Records timing metrics +// +// All operations are thread-safe and self-cleaning through +// defer blocks and channel-based synchronization. func (qm *QueueMiddleware) Handler(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() @@ -257,6 +391,6 @@ func (qm *QueueMiddleware) Handler(next http.Handler) http.Handler { } }() - next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), "queue_position", currentSize))) + next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), queuePositionKey, currentSize))) }) } diff --git a/server/middleware/queue_test.go b/server/middleware/queue_test.go index 266743c..b712fb5 100644 --- a/server/middleware/queue_test.go +++ b/server/middleware/queue_test.go @@ -230,7 +230,7 @@ func TestQueueMiddleware(t *testing.T) { var mu sync.Mutex handler := qm.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - pos := r.Context().Value("queue_position").(int) + pos := r.Context().Value(queuePositionKey).(int) mu.Lock() positions = append(positions, pos) mu.Unlock()