Commit

"Release version 0.0.15"
teilomillet committed Dec 21, 2024
1 parent a50e683 commit 783129e
Showing 4 changed files with 232 additions and 91 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -3,7 +3,7 @@
A lightweight HTTP server for Large Language Model (LLM) interactions, built with Go.

## Version
v0.0.14
v0.0.15

## Features

273 changes: 187 additions & 86 deletions server/provider/execution.go
@@ -5,114 +5,215 @@ import (
"fmt"
"time"

"github.com/sony/gobreaker"
"github.com/teilomillet/gollm"
"github.com/teilomillet/hapax/server/circuitbreaker" // Added import for custom circuit breaker
"go.uber.org/zap"
)

// Execute runs an LLM operation with circuit breaker protection and retry limits
// result represents the outcome of an LLM operation
type result struct {
err error
status HealthStatus
name string
}

// Execute coordinates provider execution with proper error handling
func (m *Manager) Execute(ctx context.Context, operation func(llm gollm.LLM) error, prompt *gollm.Prompt) error {
// Create a consistent key based on the prompt content and role only
// Don't include anything that would make identical requests unique
key := fmt.Sprintf("%s-%s", prompt.Messages[0].Content, prompt.Messages[0].Role)
key := m.generateRequestKey(prompt)
m.logger.Debug("Starting Execute", zap.String("key", key))

type result struct {
err error
status HealthStatus
name string
v, err, shared := m.group.Do(key, func() (interface{}, error) {
return m.executeWithRetries(ctx, operation)
})

if err != nil {
m.logger.Debug("Execute failed", zap.Error(err))
return err
}

v, err, shared := m.group.Do(key, func() (interface{}, error) {
var lastErr error
retryCount := 0

for retryCount < maxProviderRetries {
retryCount++
m.logger.Debug("provider attempt", zap.Int("retry", retryCount))

// Get provider preference list under read lock
m.mu.RLock()
preference := make([]string, len(m.cfg.ProviderPreference))
copy(preference, m.cfg.ProviderPreference)
m.mu.RUnlock()

for _, name := range preference {
// Check context cancellation
if err := ctx.Err(); err != nil {
return &result{err: fmt.Errorf("context cancelled: %w", err)}, nil
}

// Get provider and health status under read lock
m.mu.RLock()
provider, exists := m.providers[name]
if !exists {
m.mu.RUnlock()
continue
}
status := m.GetHealthStatus(name)
breaker := m.breakers[name]
m.mu.RUnlock()

if !status.Healthy || breaker == nil {
continue
}

// Execute with circuit breaker
err := breaker.Execute(func() error {
return operation(provider)
})

if err != nil {
lastErr = err
m.logger.Debug("operation failed",
zap.String("provider", name),
zap.Error(err),
zap.Int("retry", retryCount))

return &result{
err: err,
status: HealthStatus{
Healthy: false,
LastCheck: time.Now(),
ErrorCount: status.ErrorCount + 1,
},
name: name,
}, nil
}

// Success case
return &result{
err: nil,
status: HealthStatus{
Healthy: true,
LastCheck: time.Now(),
ErrorCount: 0,
},
name: name,
}, nil
m.handleRequestMetrics(shared)
return m.processResult(v.(*result))
}

func (m *Manager) executeWithRetries(ctx context.Context, operation func(llm gollm.LLM) error) (*result, error) {
preference := m.getProviderPreference()
if len(preference) == 0 {
return &result{
err: fmt.Errorf("no providers configured"),
}, fmt.Errorf("no providers configured")
}

var lastResult *result

// Try each provider in sequence
for _, name := range preference {
provider, breaker, status := m.getProviderResources(name)
if provider == nil || breaker == nil || !status.Healthy {
continue
}

// Try the current provider
currentResult := m.executeOperation(ctx, operation, provider, breaker, status, name)
lastResult = currentResult

if currentResult.err == nil {
// Success case - return immediately
return currentResult, nil
}

// **Key Insight**
// =================
//
// The key insight is to understand the relationship between single-request behavior and cross-request state.
// The circuit breaker maintains state across requests, but each individual request needs clear, predictable behavior.

// **Request Flow**
// ===============
//
// When the first request comes in:
// 1. The breaker is closed (not open).
// 2. We hit the else clause.
// 3. We return the primary error immediately.
// 4. This failure gets recorded in the circuit breaker's state.

// For the second request:
// 1. The primary provider fails again.
// 2. This triggers the circuit breaker to open.
// 3. Because the breaker is now open, we hit the first condition.
// 4. The continue statement moves us to try the backup provider.
// 5. All of this happens within the same request.

// **Properties Maintained**
// =======================
//
// This pattern maintains two important properties:
// 1. **Isolation**: Each request has clear, predictable behavior.
// 2. **State Evolution**: The circuit breaker accumulates state across requests.

// Circuit Breaker Logic
if breaker.State() == gobreaker.StateOpen {
// If the circuit breaker is open, we check if we're at the last provider in the preference list.
// If we are, we return the primary error immediately.
if name == preference[len(preference)-1] {
return currentResult, currentResult.err // This gives us the immediate failure
}
// Continue to the next provider if we are not at the last one.
continue
} else {
// If the breaker is closed, we return the primary error immediately.
return currentResult, currentResult.err // This gives us the immediate failure
}
}

// Error Handling
// We always maintain a valid result structure to prevent nil pointer dereference.
if lastResult == nil {
return &result{
err: fmt.Errorf("no healthy provider available"),
}, fmt.Errorf("no healthy provider available")
}

return lastResult, lastResult.err
}

if lastErr != nil {
return &result{err: fmt.Errorf("max retries (%d) exceeded, last error: %w", maxProviderRetries, lastErr)}, nil
// executeOperation handles a single operation attempt with proper resource cleanup
func (m *Manager) executeOperation(
ctx context.Context,
operation func(llm gollm.LLM) error,
provider gollm.LLM,
breaker *circuitbreaker.CircuitBreaker,
status HealthStatus,
name string) *result {

start := time.Now()

err := breaker.Execute(func() error {
// Always check context before executing operation
if err := ctx.Err(); err != nil {
return err
}
return &result{err: fmt.Errorf("max retries (%d) exceeded, no healthy provider available", maxProviderRetries)}, nil
return operation(provider)
})

duration := time.Since(start)
breakerState := breaker.State()
breakerCounts := breaker.Counts()

if err != nil {
m.logger.Debug("Execute failed", zap.Error(err))
return err
m.logger.Debug("operation failed",
zap.String("provider", name),
zap.Error(err),
zap.Duration("duration", duration),
zap.String("breaker_state", breakerState.String()),
zap.Uint32("consecutive_failures", breakerCounts.ConsecutiveFailures))

return &result{
err: err,
status: HealthStatus{
Healthy: false,
LastCheck: time.Now(),
ErrorCount: status.ErrorCount + 1,
ConsecutiveFails: int(breakerCounts.ConsecutiveFailures),
Latency: duration,
RequestCount: status.RequestCount + 1,
},
name: name,
}
}

// Update metrics for deduplicated requests
return &result{
err: nil,
status: HealthStatus{
Healthy: true,
LastCheck: time.Now(),
ErrorCount: 0,
ConsecutiveFails: 0,
Latency: duration,
RequestCount: status.RequestCount + 1,
},
name: name,
}
}

// generateRequestKey creates a consistent key based on the prompt content and role
func (m *Manager) generateRequestKey(prompt *gollm.Prompt) string {
return fmt.Sprintf("%s-%s", prompt.Messages[0].Content, prompt.Messages[0].Role)
}

// getProviderPreference safely retrieves the current provider preference list
func (m *Manager) getProviderPreference() []string {
m.mu.RLock()
defer m.mu.RUnlock()
preference := make([]string, len(m.cfg.ProviderPreference))
copy(preference, m.cfg.ProviderPreference)
return preference
}

// getProviderResources safely retrieves provider-related resources
func (m *Manager) getProviderResources(name string) (gollm.LLM, *circuitbreaker.CircuitBreaker, HealthStatus) {
m.mu.RLock()
defer m.mu.RUnlock()

provider, exists := m.providers[name]
if !exists {
return nil, nil, HealthStatus{}
}

return provider, m.breakers[name], m.GetHealthStatus(name)
}

// handleRequestMetrics updates metrics for deduplicated requests
func (m *Manager) handleRequestMetrics(shared bool) {
if shared {
m.deduplicatedRequests.Inc()
}
}

// Update health status after singleflight completes
if r, ok := v.(*result); ok && r.name != "" {
// processResult handles the final result and updates provider health status
func (m *Manager) processResult(r *result) error {
if r.name != "" {
m.UpdateHealthStatus(r.name, r.status)
}

return v.(*result).err
return r.err
}
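
As a rough illustration of the pattern the refactored execution.go adopts, the sketch below combines singleflight deduplication (keyed on prompt content and role, as in generateRequestKey) with per-provider circuit breakers that decide whether a failure falls through to the next provider within the same request. It calls sony/gobreaker directly rather than hapax's circuitbreaker wrapper, and the provider type, the execute function, the two-provider setup, and the default trip threshold are illustrative assumptions, not the project's actual API.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/sony/gobreaker"
	"golang.org/x/sync/singleflight"
)

// provider pairs an upstream call with its own circuit breaker. The names and
// fields here are illustrative placeholders, not hapax's Manager types.
type provider struct {
	name    string
	breaker *gobreaker.CircuitBreaker
	call    func(prompt string) (string, error)
}

// execute collapses identical concurrent requests via singleflight and walks
// the provider preference list once, using breaker state to decide whether a
// failure should fall through to the next provider in the same request.
func execute(group *singleflight.Group, providers []*provider, role, content string) (string, error) {
	// Key identical requests by content and role only, so concurrent
	// duplicates share one upstream call.
	key := fmt.Sprintf("%s-%s", content, role)

	v, err, shared := group.Do(key, func() (interface{}, error) {
		for i, p := range providers {
			out, err := p.breaker.Execute(func() (interface{}, error) {
				return p.call(content)
			})
			if err == nil {
				return out, nil
			}
			// Breaker has opened: this provider keeps failing across
			// requests, so fall through to the next one, unless it was
			// the last option.
			if p.breaker.State() == gobreaker.StateOpen && i < len(providers)-1 {
				continue
			}
			// Breaker still closed: surface the error immediately so the
			// breaker records the failure for future requests.
			return nil, err
		}
		return nil, errors.New("no providers configured")
	})
	_ = shared // shared == true means this call was deduplicated
	if err != nil {
		return "", err
	}
	return v.(string), nil
}

func main() {
	newBreaker := func(name string) *gobreaker.CircuitBreaker {
		return gobreaker.NewCircuitBreaker(gobreaker.Settings{Name: name})
	}
	providers := []*provider{
		{name: "primary", breaker: newBreaker("primary"),
			call: func(string) (string, error) { return "", errors.New("primary down") }},
		{name: "backup", breaker: newBreaker("backup"),
			call: func(string) (string, error) { return "backup response", nil }},
	}
	var group singleflight.Group
	// With gobreaker's default ReadyToTrip (ConsecutiveFailures > 5), the
	// primary breaker opens during the sixth failure; from that request on,
	// execute falls through to the backup provider within the same call.
	for i := 1; i <= 7; i++ {
		out, err := execute(&group, providers, "user", "test prompt")
		fmt.Printf("request %d: out=%q err=%v\n", i, out, err)
	}
}
```

Under gobreaker's default trip threshold, the first five requests surface the primary error; once the breaker opens during the sixth failure, that call and all later ones fall through to the backup, which mirrors the request flow described in the comments above.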
4 changes: 0 additions & 4 deletions server/provider/provider.go
@@ -15,10 +15,6 @@ import (
"golang.org/x/sync/singleflight"
)

// maxProviderRetries defines the maximum number of times we'll retry through the provider list
// before giving up. This prevents infinite loops when all providers are unhealthy.
const maxProviderRetries = 3

// Manager handles LLM provider management and selection
type Manager struct {
providers map[string]gollm.LLM
44 changes: 44 additions & 0 deletions server/provider/provider_test.go
@@ -110,10 +110,20 @@ func TestProviderHealth(t *testing.T) {
}
}

// TestProviderFailover verifies the behavior of the circuit breaker
// and the handling of failures across requests.
// It ensures that the first request fails as expected, the second request
// triggers the circuit breaker and attempts to use backup providers,
// and that proper error messages are returned when no providers are available.

func TestProviderFailover(t *testing.T) {
// This test case is crucial in verifying the circuit breaker behavior
// and its impact on the overall system reliability.

logger, _ := zap.NewDevelopment()
logger.Info("Starting TestProviderFailover")

// Setup test cases for primary and backup providers
// Simple config with just two providers
cfg := &config.Config{
TestMode: true,
@@ -166,6 +176,36 @@ func TestProviderFailover(t *testing.T) {
Messages: []gollm.PromptMessage{{Role: "user", Content: "test"}},
}

// **Key Insights**
//
// The key insight is to understand the relationship between single-request behavior and cross-request state.
// The circuit breaker maintains state across requests, but each individual request needs clear, predictable behavior.
//
// **Single-Request Behavior**
//
// When the first request comes in:
// 1. The breaker is closed (not open).
// 2. We hit the else clause.
// 3. We return the primary error immediately.
// 4. This failure gets recorded in the circuit breaker's state.
//
// **Cross-Request State Evolution**
//
// For the second request:
// 1. The primary provider fails again.
// 2. This triggers the circuit breaker to open.
// 3. Because the breaker is now open, we hit the first condition.
// 4. The continue statement moves us to try the backup provider.
// 5. All of this happens within the same request.
//
// **Properties Maintained**
//
// This pattern maintains two important properties:
// 1. **Isolation**: Each request has clear, predictable behavior.
// 2. **State Evolution**: The circuit breaker accumulates state across requests.

// First request should fail with the primary error
// This verifies that the circuit breaker records the failure correctly.
logger.Info("Executing first request (should fail)")
err = manager.Execute(ctx, func(llm gollm.LLM) error {
result, err := llm.Generate(ctx, prompt)
@@ -175,6 +215,10 @@
require.Error(t, err)
logger.Info("First request completed", zap.Error(err))

// Second request should try primary, detect circuit breaker opening,
// and immediately try backup providers.
// If all providers fail, we need to ensure proper error handling
// and meaningful error messages are returned.
logger.Info("Executing second request (should succeed using backup)")
err = manager.Execute(ctx, func(llm gollm.LLM) error {
result, err := llm.Generate(ctx, prompt)
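
The two-request expectation exercised by this test (the first request surfaces the primary error, the second trips the breaker and succeeds via the backup) implies a breaker that opens after very few failures. The sketch below shows one way to configure sony/gobreaker to trip on a single failure; the threshold, timeout, and logging are assumptions for illustration, not the settings used by hapax's circuitbreaker package.

```go
package main

import (
	"errors"
	"log"
	"time"

	"github.com/sony/gobreaker"
)

// newPrimaryBreaker returns a breaker that opens after one failure, so the
// next request immediately falls through to a backup provider. All values
// here are illustrative, not hapax's real defaults.
func newPrimaryBreaker() *gobreaker.CircuitBreaker {
	return gobreaker.NewCircuitBreaker(gobreaker.Settings{
		Name:        "primary",
		MaxRequests: 1,                // allow one probe request while half-open
		Timeout:     30 * time.Second, // stay open this long before probing again
		ReadyToTrip: func(c gobreaker.Counts) bool {
			return c.ConsecutiveFailures >= 1 // trip on the first failure
		},
		OnStateChange: func(name string, from, to gobreaker.State) {
			log.Printf("breaker %q: %s -> %s", name, from, to)
		},
	})
}

func main() {
	cb := newPrimaryBreaker()
	_, err := cb.Execute(func() (interface{}, error) {
		return nil, errors.New("simulated provider failure")
	})
	log.Printf("first call error: %v", err)
	log.Printf("state after one failure: %s", cb.State()) // "open"
}
```

With a configuration like this, the failover test's second request would find the primary breaker already open and move straight to the backup provider, all within a single Execute call.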
