From 9f619ad46dbd1efd7799bcbfdfbc4a4cfaae948a Mon Sep 17 00:00:00 2001 From: David Cook Date: Wed, 12 Jun 2024 16:16:32 -0500 Subject: [PATCH] Optionally delete stale distributed state (#55) * Optionally delete stale distributed state * Add unit test to confirm files are deleted * Review feedback --- README.md | 4 ++- caddyfile.go | 14 ++++++++ distributed.go | 14 ++++++++ distributed_test.go | 86 +++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 110 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index edc4a4c..2920a8c 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,8 @@ This is an HTTP handler module, so it can be used wherever `http.handlers` modul } "distributed": { "write_interval": "", - "read_interval": "" + "read_interval": "", + "purge_age": "" }, } ``` @@ -130,6 +131,7 @@ rate_limit { distributed { read_interval write_interval + purge_age } storage jitter diff --git a/caddyfile.go b/caddyfile.go index 1fa2c39..acba24c 100644 --- a/caddyfile.go +++ b/caddyfile.go @@ -46,6 +46,7 @@ func parseCaddyfile(helper httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, e // distributed { // read_interval // write_interval +// purge_age // } // storage // jitter @@ -150,6 +151,19 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { return d.Errf("invalid write interval '%s': %v", d.Val(), err) } h.Distributed.WriteInterval = caddy.Duration(interval) + + case "purge_age": + if !d.NextArg() { + return d.ArgErr() + } + if h.Distributed.PurgeAge != 0 { + return d.Errf("purge age already specified: %v", h.Distributed.PurgeAge) + } + age, err := caddy.ParseDuration(d.Val()) + if err != nil { + return d.Errf("invalid purge age '%s': %v", d.Val(), err) + } + h.Distributed.PurgeAge = caddy.Duration(age) } } diff --git a/distributed.go b/distributed.go index 6258880..d44eeb0 100644 --- a/distributed.go +++ b/distributed.go @@ -46,6 +46,10 @@ type DistributedRateLimiting struct { // Default: 5s ReadInterval caddy.Duration `json:"read_interval,omitempty"` + // How long to wait before deleting stale states from other instances. + // Default: never + PurgeAge caddy.Duration `json:"purge_age,omitempty"` + instanceID string otherStates []rlState @@ -153,6 +157,16 @@ func (h Handler) syncDistributedRead(ctx context.Context) error { continue } + if h.Distributed.PurgeAge != 0 && state.Timestamp.Before(now().Add(-time.Duration(h.Distributed.PurgeAge))) { + err = h.storage.Delete(ctx, instanceFile) + if err != nil { + h.logger.Error("cannot delete stale rate limiter state file", + zap.String("key", instanceFile), + zap.Error(err)) + } + continue + } + otherStates = append(otherStates, state) } diff --git a/distributed_test.go b/distributed_test.go index 9e39697..03ae172 100644 --- a/distributed_test.go +++ b/distributed_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "os" + "path" "strings" "testing" "time" @@ -26,18 +27,23 @@ import ( "github.com/caddyserver/caddy/v2/caddytest" "github.com/caddyserver/certmagic" "github.com/google/uuid" + "go.uber.org/zap" ) -func TestDistributed(t *testing.T) { - initTime() - window := 60 - maxEvents := 10 - +func ensureAppDataDir(t *testing.T) { // Make sure AppDataDir exists, because otherwise the caddytest.Tester won't // be able to generate an instance ID if err := os.MkdirAll(caddy.AppDataDir(), 0700); err != nil { t.Fatalf("failed to create app data dir %s: %s", caddy.AppDataDir(), err) } +} + +func TestDistributed(t *testing.T) { + initTime() + window := 60 + maxEvents := 10 + + ensureAppDataDir(t) testCases := []struct { name string @@ -85,7 +91,7 @@ func TestDistributed(t *testing.T) { t.Run(testCase.name, func(t *testing.T) { storageDir := t.TempDir() // Use a random UUID as the zone so that rate limits from multiple test runs - // collide with each other + // don't collide with each other zone := uuid.New().String() // To simulate a peer in a rate limiting cluster, constuct a @@ -158,7 +164,8 @@ func TestDistributed(t *testing.T) { }, "distributed": { "write_interval": "3600s", - "read_interval": "3600s" + "read_interval": "3600s", + "purge_age": "7200s" } }, { @@ -189,3 +196,68 @@ func TestDistributed(t *testing.T) { }) } } + +func TestPurgeDistributedState(t *testing.T) { + initTime() + ensureAppDataDir(t) + logger, err := zap.NewDevelopment() + if err != nil { + t.Fatalf("failed to create logger: %s", err) + } + + storageDir := t.TempDir() + storage := certmagic.FileStorage{ + Path: storageDir, + } + + // Seed the storage directory with a rate limit state file from another instance. + otherRlState := rlState{ + Timestamp: now(), + Zones: make(map[string]map[string]rlStateValue, 0), + } + if err := writeRateLimitState(context.Background(), otherRlState, "12345678-1234-1234-1234-123456789abc", &storage); err != nil { + t.Fatalf("failed to write state to storage: %s", err) + } + + handler := Handler{ + Distributed: &DistributedRateLimiting{ + instanceID: "99999999-9999-9999-9999-999999999999", + PurgeAge: caddy.Duration(time.Hour), + }, + storage: &storage, + logger: logger, + } + + // Perform initial read, and confirm it picks up the existing state file. + err = handler.syncDistributedRead(context.Background()) + if err != nil { + t.Fatalf("reading distributed state failed: %s", err) + } + if len(handler.Distributed.otherStates) != 1 { + t.Fatalf("did not read other states correctly: %v", handler.Distributed.otherStates) + } + dirEntries, err := os.ReadDir(path.Join(storageDir, "rate_limit", "instances")) + if err != nil { + t.Fatalf("couldn't list directory: %s", err) + } + if len(dirEntries) != 1 { + t.Fatalf("wrong number of files present in storage directory: %v", dirEntries) + } + + // Advance time and sync again. The old state file should be deleted now. + advanceTime(2 * 60 * 60) + err = handler.syncDistributedRead(context.Background()) + if err != nil { + t.Fatalf("reading distributed state failed: %s", err) + } + if len(handler.Distributed.otherStates) != 0 { + t.Fatalf("expected other state to be deleted: %v", handler.Distributed.otherStates) + } + dirEntries, err = os.ReadDir(path.Join(storageDir, "rate_limit", "instances")) + if err != nil { + t.Fatalf("couldn't list directory: %s", err) + } + if len(dirEntries) != 0 { + t.Fatalf("storage directory was not empty: %v", dirEntries) + } +}