Skip to content

Commit

Permalink
queue: improve documentation and fix context key type safety
Browse files Browse the repository at this point in the history
  • Loading branch information
teilomillet committed Jan 2, 2025
1 parent 79323c8 commit 33a4420
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 32 deletions.
11 changes: 1 addition & 10 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,7 @@ Build a production-grade LLM gateway that makes deploying and managing LLM infra
Focus: Enhance reliability, scalability, and deployability for production environments.

### Performance & Operations
- [ ] Request queueing
- Queue size configuration with dynamic adjustment
- Priority queuing based on client tiers
- Queue metrics with Prometheus integration
- Backpressure handling with client feedback
- Queue persistence across restarts
- Queue cleanup and maintenance
- Timeout handling for queued requests
- Maximum queue time configuration

- [x] Request queueing
- [ ] QUIC Implementation
- Integration with quic-go library
- HTTP/3 support for improved latency
Expand Down
2 changes: 1 addition & 1 deletion cmd/hapax/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
version = flag.Bool("version", false, "Print version and exit")
)

const Version = "v0.0.22"
const Version = "v0.0.23"

func main() {
flag.Parse()
Expand Down
174 changes: 154 additions & 20 deletions server/middleware/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,90 @@ import (
"github.com/teilomillet/hapax/server/metrics"
)

// QueueState represents the persistent state of the queue
type QueueState struct {
MaxSize int64 `json:"max_size"`
QueueLength int `json:"queue_length"`
LastSaved time.Time `json:"last_saved"`
}
// queueContextKey is a custom type for queue-specific context keys to avoid collisions
type queueContextKey string

// Queue-specific context keys
const (
queuePositionKey queueContextKey = "queue_position"
)

// QueueMiddleware implements a request queue with built-in self-cleaning capabilities.
// Core Design:
// 1. Request Lifecycle:
// - Incoming requests are added to a FIFO queue if space is available
// - Each queued request gets a channel that signals its completion
// - When a request completes, its resources are automatically cleaned up
// - Queue position is passed to request context for tracking
//
// 2. Self-Cleaning Mechanisms:
// - Channel-based: Each request's done channel is closed on completion
// - Defer-based: Resources are released even if request panics
// - Queue-based: Completed requests are removed from queue automatically
// - Memory-based: Go's GC reclaims unused resources
//
// 3. Thread Safety:
// - RWMutex protects queue operations (add/remove)
// - Atomic operations for counters (maxSize, processing)
// - Channel-based synchronization for request completion
//
// 4. Health Monitoring:
// - Tracks active requests (queued vs processing)
// - Measures queue wait times and request duration
// - Counts errors (queue full, persistence failures)
// - Monitors queue size against configured maximum
//
// 5. State Persistence:
// - Periodic saves of queue state if configured
// - Atomic file operations prevent corruption
// - Automatic recovery on restart
type QueueMiddleware struct {
queue *queue.Queue[chan struct{}]
maxSize atomic.Int64
mu sync.RWMutex
processing int32
metrics *metrics.Metrics
statePath string
persistTicker *time.Ticker
done chan struct{}
queue *queue.Queue[chan struct{}] // FIFO queue holding channels that signal request completion
maxSize atomic.Int64 // Maximum queue size, updated atomically
mu sync.RWMutex // Protects queue operations
processing int32 // Count of requests being processed
metrics *metrics.Metrics // Prometheus metrics for monitoring
statePath string // Path for state persistence
persistTicker *time.Ticker // Timer for state saves
done chan struct{} // Signals shutdown
}

// QueueState represents the persistent state of the queue that can be saved and restored.
// This enables the queue to maintain its configuration across server restarts.
// Fields:
// - MaxSize: Maximum number of requests allowed in queue
// - QueueLength: Number of requests in queue at time of save
// - LastSaved: Timestamp of last successful save
type QueueState struct {
MaxSize int64 `json:"max_size"` // Maximum allowed queue size
QueueLength int `json:"queue_length"` // Current number of items in queue
LastSaved time.Time `json:"last_saved"` // Timestamp of last save
}

// QueueConfig defines the operational parameters for the queue middleware.
// These settings control the queue's behavior, capacity, and persistence strategy.
// Fields:
// - InitialSize: Starting maximum queue size if no saved state
// - Metrics: Prometheus metrics collector for monitoring
// - StatePath: File path for state persistence (empty = no persistence)
// - SaveInterval: Frequency of state saves (0 = no periodic saves)
type QueueConfig struct {
InitialSize int64
Metrics *metrics.Metrics
StatePath string // Path to store queue state
SaveInterval time.Duration // How often to save state (0 means no persistence)
InitialSize int64 // Starting maximum queue size
Metrics *metrics.Metrics // Metrics collector for monitoring
StatePath string // Path to store queue state, empty disables persistence
SaveInterval time.Duration // How often to save state (0 means no persistence)
}

// NewQueueMiddleware initializes a new queue middleware with the given configuration.
// Initialization Process:
// 1. Creates queue data structures and channels
// 2. Attempts to restore previous state if persistence enabled
// 3. Starts background state persistence if configured
// 4. Initializes metrics collection
//
// The queue begins accepting requests immediately after initialization.
// If state persistence is enabled, it will attempt to restore the previous
// configuration, falling back to InitialSize if no state exists.
func NewQueueMiddleware(cfg QueueConfig) *QueueMiddleware {
qm := &QueueMiddleware{
queue: queue.New[chan struct{}](),
Expand Down Expand Up @@ -66,6 +125,14 @@ func NewQueueMiddleware(cfg QueueConfig) *QueueMiddleware {
return qm
}

// loadState attempts to restore the queue's previous state from disk.
// Recovery Process:
// 1. Verifies storage directory exists/is accessible
// 2. Reads and validates stored state file
// 3. Restores previous queue configuration
//
// If the state file doesn't exist or is invalid, the queue will
// use its default configuration without returning an error.
func (qm *QueueMiddleware) loadState() error {
if qm.statePath == "" {
return nil
Expand All @@ -90,6 +157,14 @@ func (qm *QueueMiddleware) loadState() error {
return nil
}

// saveState persists the current queue state to disk atomically.
// Save Process:
// 1. Captures current queue metrics under read lock
// 2. Serializes state to temporary file
// 3. Atomically replaces old state file
//
// This method ensures state file consistency by using atomic
// file operations, preventing corruption during saves.
func (qm *QueueMiddleware) saveState() error {
if qm.statePath == "" {
return nil
Expand Down Expand Up @@ -121,6 +196,15 @@ func (qm *QueueMiddleware) saveState() error {
return os.Rename(tmpFile, qm.statePath)
}

// persistStateRoutine manages periodic state persistence.
// Operation:
// 1. Saves state at configured intervals
// 2. Handles persistence errors with metrics
// 3. Performs final save on shutdown
//
// This routine runs in the background until shutdown is signaled
// via the done channel. Errors during saves are recorded in metrics
// but don't stop the routine.
func (qm *QueueMiddleware) persistStateRoutine() {
for {
select {
Expand All @@ -142,6 +226,15 @@ func (qm *QueueMiddleware) persistStateRoutine() {
}
}

// Shutdown initiates a graceful shutdown of the queue middleware.
// Shutdown Process:
// 1. Signals shutdown via done channel
// 2. Stops periodic state persistence
// 3. Waits for queued requests to complete (with timeout)
// 4. Performs final state save
//
// The shutdown will timeout if requests don't complete within
// 5 seconds, recording a metric and returning an error.
func (qm *QueueMiddleware) Shutdown(ctx context.Context) error {
// Only close the done channel once
select {
Expand Down Expand Up @@ -183,28 +276,69 @@ func (qm *QueueMiddleware) Shutdown(ctx context.Context) error {
return nil
}

// SetMaxSize updates the maximum number of requests allowed in the queue.
// Update Process:
// 1. Atomically updates size limit
// 2. Triggers async state save if persistence enabled
//
// This is a thread-safe operation that takes effect immediately.
// New requests will be rejected if queue length reaches the new limit.
func (qm *QueueMiddleware) SetMaxSize(size int64) {
qm.maxSize.Store(size)
// Save state after size change
if qm.statePath != "" {
go qm.saveState() // Non-blocking save
go func() {
if err := qm.saveState(); err != nil && qm.metrics != nil {
qm.metrics.ErrorsTotal.WithLabelValues("queue_persistence").Inc()
}
}()
}
}

// GetQueueSize returns the current queue length.
// Thread-safe operation protected by mutex.
func (qm *QueueMiddleware) GetQueueSize() int {
qm.mu.RLock()
defer qm.mu.RUnlock()
return qm.queue.Length()
}

// GetMaxSize returns the current maximum queue size.
// Thread-safe operation using atomic load.
func (qm *QueueMiddleware) GetMaxSize() int64 {
return qm.maxSize.Load()
}

// GetProcessing returns the number of requests currently being processed.
// Thread-safe operation using atomic load.
func (qm *QueueMiddleware) GetProcessing() int32 {
return atomic.LoadInt32(&qm.processing)
}

// Handler manages the request lifecycle through the queue.
// Request Flow:
// 1. Queue Check:
// - Verifies space available in queue
// - Rejects request if queue full
//
// 2. Request Queuing:
// - Creates completion channel
// - Adds request to queue
// - Updates queue metrics
//
// 3. Request Processing:
// - Tracks processing state
// - Passes queue position to request context
// - Forwards request to next handler
//
// 4. Automatic Cleanup:
// - Removes request from queue
// - Updates metrics
// - Closes completion channel
// - Records timing metrics
//
// All operations are thread-safe and self-cleaning through
// defer blocks and channel-based synchronization.
func (qm *QueueMiddleware) Handler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
Expand Down Expand Up @@ -257,6 +391,6 @@ func (qm *QueueMiddleware) Handler(next http.Handler) http.Handler {
}
}()

next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), "queue_position", currentSize)))
next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), queuePositionKey, currentSize)))
})
}
2 changes: 1 addition & 1 deletion server/middleware/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestQueueMiddleware(t *testing.T) {
var mu sync.Mutex

handler := qm.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pos := r.Context().Value("queue_position").(int)
pos := r.Context().Value(queuePositionKey).(int)
mu.Lock()
positions = append(positions, pos)
mu.Unlock()
Expand Down

0 comments on commit 33a4420

Please sign in to comment.