diff --git a/README.md b/README.md index ed3ece1..89bf0d9 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ A lightweight HTTP server for Large Language Model (LLM) interactions, built with Go. ## Version -v0.0.14 +v0.0.15 ## Features diff --git a/server/provider/execution.go b/server/provider/execution.go index c3fbfda..e0cbeeb 100644 --- a/server/provider/execution.go +++ b/server/provider/execution.go @@ -5,114 +5,215 @@ import ( "fmt" "time" + "github.com/sony/gobreaker" "github.com/teilomillet/gollm" + "github.com/teilomillet/hapax/server/circuitbreaker" // Added import for custom circuit breaker "go.uber.org/zap" ) -// Execute runs an LLM operation with circuit breaker protection and retry limits +// result represents the outcome of an LLM operation +type result struct { + err error + status HealthStatus + name string +} + +// Execute coordinates provider execution with proper error handling func (m *Manager) Execute(ctx context.Context, operation func(llm gollm.LLM) error, prompt *gollm.Prompt) error { - // Create a consistent key based on the prompt content and role only - // Don't include anything that would make identical requests unique - key := fmt.Sprintf("%s-%s", prompt.Messages[0].Content, prompt.Messages[0].Role) + key := m.generateRequestKey(prompt) m.logger.Debug("Starting Execute", zap.String("key", key)) - type result struct { - err error - status HealthStatus - name string + v, err, shared := m.group.Do(key, func() (interface{}, error) { + return m.executeWithRetries(ctx, operation) + }) + + if err != nil { + m.logger.Debug("Execute failed", zap.Error(err)) + return err } - v, err, shared := m.group.Do(key, func() (interface{}, error) { - var lastErr error - retryCount := 0 - - for retryCount < maxProviderRetries { - retryCount++ - m.logger.Debug("provider attempt", zap.Int("retry", retryCount)) - - // Get provider preference list under read lock - m.mu.RLock() - preference := make([]string, len(m.cfg.ProviderPreference)) - copy(preference, m.cfg.ProviderPreference) - m.mu.RUnlock() - - for _, name := range preference { - // Check context cancellation - if err := ctx.Err(); err != nil { - return &result{err: fmt.Errorf("context cancelled: %w", err)}, nil - } - - // Get provider and health status under read lock - m.mu.RLock() - provider, exists := m.providers[name] - if !exists { - m.mu.RUnlock() - continue - } - status := m.GetHealthStatus(name) - breaker := m.breakers[name] - m.mu.RUnlock() - - if !status.Healthy || breaker == nil { - continue - } - - // Execute with circuit breaker - err := breaker.Execute(func() error { - return operation(provider) - }) - - if err != nil { - lastErr = err - m.logger.Debug("operation failed", - zap.String("provider", name), - zap.Error(err), - zap.Int("retry", retryCount)) - - return &result{ - err: err, - status: HealthStatus{ - Healthy: false, - LastCheck: time.Now(), - ErrorCount: status.ErrorCount + 1, - }, - name: name, - }, nil - } - - // Success case - return &result{ - err: nil, - status: HealthStatus{ - Healthy: true, - LastCheck: time.Now(), - ErrorCount: 0, - }, - name: name, - }, nil + m.handleRequestMetrics(shared) + return m.processResult(v.(*result)) +} + +func (m *Manager) executeWithRetries(ctx context.Context, operation func(llm gollm.LLM) error) (*result, error) { + preference := m.getProviderPreference() + if len(preference) == 0 { + return &result{ + err: fmt.Errorf("no providers configured"), + }, fmt.Errorf("no providers configured") + } + + var lastResult *result + + // Try each provider in sequence + 
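+	// Single pass over the preference list: return on the first success; after a
+	// failure, move on to the next provider only if this provider's breaker has
+	// opened (and another provider remains), otherwise surface the error immediately.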
for _, name := range preference {
+ provider, breaker, status := m.getProviderResources(name)
+ if provider == nil || breaker == nil || !status.Healthy {
+ continue
+ }
+
+ // Try the current provider
+ currentResult := m.executeOperation(ctx, operation, provider, breaker, status, name)
+ lastResult = currentResult
+
+ if currentResult.err == nil {
+ // Success case - return immediately
+ return currentResult, nil
+ }
+
+ // **Key Insight**
+ // =================
+ //
+ // The key insight is to understand the relationship between single-request behavior and cross-request state.
+ // The circuit breaker maintains state across requests, but each individual request needs clear, predictable behavior.
+
+ // **Request Flow**
+ // ===============
+ //
+ // When the first request comes in:
+ // 1. The breaker is closed (not open).
+ // 2. We hit the else clause.
+ // 3. We return the primary error immediately.
+ // 4. This failure gets recorded in the circuit breaker's state.
+
+ // For the second request:
+ // 1. The primary provider fails again.
+ // 2. This triggers the circuit breaker to open.
+ // 3. Because the breaker is now open, we hit the first condition.
+ // 4. The continue statement moves us to try the backup provider.
+ // 5. All of this happens within the same request.
+
+ // **Properties Maintained**
+ // =======================
+ //
+ // This pattern maintains two important properties:
+ // 1. **Isolation**: Each request has clear, predictable behavior.
+ // 2. **State Evolution**: The circuit breaker accumulates state across requests.
+
+ // Circuit Breaker Logic
+ if breaker.State() == gobreaker.StateOpen {
+ // If the circuit breaker is open, we check whether this is the last provider in the preference list.
+ // If it is, we return the primary error immediately.
+ if name == preference[len(preference)-1] {
+ return currentResult, currentResult.err // This gives us the immediate failure
+ }
+ // Continue to the next provider if we are not at the last one.
+ continue
+ } else {
+ // If the breaker is closed, we return the primary error immediately.
+ return currentResult, currentResult.err // This gives us the immediate failure
+ }
+ }
+
+ // Error Handling
+ // We always maintain a valid result structure to prevent nil pointer dereference.
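+ // lastResult is nil only when every provider in the preference list was
+ // skipped above (unknown provider, missing breaker, or unhealthy status),
+ // i.e. no operation was ever attempted.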
+ if lastResult == nil { + return &result{ + err: fmt.Errorf("no healthy provider available"), + }, fmt.Errorf("no healthy provider available") + } + + return lastResult, lastResult.err +} - if lastErr != nil { - return &result{err: fmt.Errorf("max retries (%d) exceeded, last error: %w", maxProviderRetries, lastErr)}, nil +// executeOperation handles a single operation attempt with proper resource cleanup +func (m *Manager) executeOperation( + ctx context.Context, + operation func(llm gollm.LLM) error, + provider gollm.LLM, + breaker *circuitbreaker.CircuitBreaker, + status HealthStatus, + name string) *result { + + start := time.Now() + + err := breaker.Execute(func() error { + // Always check context before executing operation + if err := ctx.Err(); err != nil { + return err } - return &result{err: fmt.Errorf("max retries (%d) exceeded, no healthy provider available", maxProviderRetries)}, nil + return operation(provider) }) + duration := time.Since(start) + breakerState := breaker.State() + breakerCounts := breaker.Counts() + if err != nil { - m.logger.Debug("Execute failed", zap.Error(err)) - return err + m.logger.Debug("operation failed", + zap.String("provider", name), + zap.Error(err), + zap.Duration("duration", duration), + zap.String("breaker_state", breakerState.String()), + zap.Uint32("consecutive_failures", breakerCounts.ConsecutiveFailures)) + + return &result{ + err: err, + status: HealthStatus{ + Healthy: false, + LastCheck: time.Now(), + ErrorCount: status.ErrorCount + 1, + ConsecutiveFails: int(breakerCounts.ConsecutiveFailures), + Latency: duration, + RequestCount: status.RequestCount + 1, + }, + name: name, + } } - // Update metrics for deduplicated requests + return &result{ + err: nil, + status: HealthStatus{ + Healthy: true, + LastCheck: time.Now(), + ErrorCount: 0, + ConsecutiveFails: 0, + Latency: duration, + RequestCount: status.RequestCount + 1, + }, + name: name, + } +} + +// generateRequestKey creates a consistent key based on the prompt content and role +func (m *Manager) generateRequestKey(prompt *gollm.Prompt) string { + return fmt.Sprintf("%s-%s", prompt.Messages[0].Content, prompt.Messages[0].Role) +} + +// getProviderPreference safely retrieves the current provider preference list +func (m *Manager) getProviderPreference() []string { + m.mu.RLock() + defer m.mu.RUnlock() + preference := make([]string, len(m.cfg.ProviderPreference)) + copy(preference, m.cfg.ProviderPreference) + return preference +} + +// getProviderResources safely retrieves provider-related resources +func (m *Manager) getProviderResources(name string) (gollm.LLM, *circuitbreaker.CircuitBreaker, HealthStatus) { + m.mu.RLock() + defer m.mu.RUnlock() + + provider, exists := m.providers[name] + if !exists { + return nil, nil, HealthStatus{} + } + + return provider, m.breakers[name], m.GetHealthStatus(name) +} + +// handleRequestMetrics updates metrics for deduplicated requests +func (m *Manager) handleRequestMetrics(shared bool) { if shared { m.deduplicatedRequests.Inc() } +} - // Update health status after singleflight completes - if r, ok := v.(*result); ok && r.name != "" { +// processResult handles the final result and updates provider health status +func (m *Manager) processResult(r *result) error { + if r.name != "" { m.UpdateHealthStatus(r.name, r.status) } - - return v.(*result).err + return r.err } diff --git a/server/provider/provider.go b/server/provider/provider.go index 7403c92..fb12dd1 100644 --- a/server/provider/provider.go +++ b/server/provider/provider.go @@ -15,10 +15,6 @@ 
import (
 "golang.org/x/sync/singleflight"
)
-// maxProviderRetries defines the maximum number of times we'll retry through the provider list
-// before giving up. This prevents infinite loops when all providers are unhealthy.
-const maxProviderRetries = 3
-
// Manager handles LLM provider management and selection
type Manager struct {
providers map[string]gollm.LLM
diff --git a/server/provider/provider_test.go b/server/provider/provider_test.go
index 3b22b44..2a240e2 100644
--- a/server/provider/provider_test.go
+++ b/server/provider/provider_test.go
@@ -110,10 +110,20 @@ func TestProviderHealth(t *testing.T) {
}
}
+// TestProviderFailover verifies the behavior of the circuit breaker
+// and the handling of failures across requests.
+// It ensures that the first request fails as expected, that the second request
+// triggers the circuit breaker and fails over to the backup provider,
+// and that a meaningful error is returned when no providers are available.
+
func TestProviderFailover(t *testing.T) {
+ // It exercises the full failover path: a primary failure that trips the circuit
+ // breaker, followed by a successful fallback to the backup provider.
+
logger, _ := zap.NewDevelopment()
logger.Info("Starting TestProviderFailover")
+ // Set up the primary and backup providers for the test
// Simple config with just two providers
cfg := &config.Config{
TestMode: true,
@@ -166,6 +176,36 @@ func TestProviderFailover(t *testing.T) {
Messages: []gollm.PromptMessage{{Role: "user", Content: "test"}},
}
+ // **Key Insight**
+ //
+ // The key insight is to understand the relationship between single-request behavior and cross-request state.
+ // The circuit breaker maintains state across requests, but each individual request needs clear, predictable behavior.
+ //
+ // **Single-Request Behavior**
+ //
+ // When the first request comes in:
+ // 1. The breaker is closed (not open).
+ // 2. We hit the else clause.
+ // 3. We return the primary error immediately.
+ // 4. This failure gets recorded in the circuit breaker's state.
+ //
+ // **Cross-Request State Evolution**
+ //
+ // For the second request:
+ // 1. The primary provider fails again.
+ // 2. This triggers the circuit breaker to open.
+ // 3. Because the breaker is now open, we hit the first condition.
+ // 4. The continue statement moves us to try the backup provider.
+ // 5. All of this happens within the same request.
+ //
+ // **Properties Maintained**
+ //
+ // This pattern maintains two important properties:
+ // 1. **Isolation**: Each request has clear, predictable behavior.
+ // 2. **State Evolution**: The circuit breaker accumulates state across requests.
+
+ // The first request should fail with the primary provider's error.
+ // This verifies that the circuit breaker records the failure correctly.
logger.Info("Executing first request (should fail)")
err = manager.Execute(ctx, func(llm gollm.LLM) error {
result, err := llm.Generate(ctx, prompt)
@@ -175,6 +215,10 @@ func TestProviderFailover(t *testing.T) {
require.Error(t, err)
logger.Info("First request completed", zap.Error(err))
+ // The second request should try the primary provider again, see its circuit
+ // breaker open after this second failure, and immediately fall back to the backup.
+ // If every provider fails, error handling must still return
+ // a meaningful error message.
logger.Info("Executing second request (should succeed using backup)")
err = manager.Execute(ctx, func(llm gollm.LLM) error {
result, err := llm.Generate(ctx, prompt)