Skip to content

Commit

Permalink
Optionally delete stale distributed state (#55)
Browse files Browse the repository at this point in the history
* Optionally delete stale distributed state

* Add unit test to confirm files are deleted

* Review feedback
  • Loading branch information
divergentdave authored Jun 12, 2024
1 parent b188193 commit 9f619ad
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 8 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ This is an HTTP handler module, so it can be used wherever `http.handlers` modul
}
"distributed": {
"write_interval": "",
"read_interval": ""
"read_interval": "",
"purge_age": ""
},
}
```
Expand Down Expand Up @@ -130,6 +131,7 @@ rate_limit {
distributed {
read_interval <duration>
write_interval <duration>
purge_age <duration>
}
storage <module...>
jitter <percent>
Expand Down
14 changes: 14 additions & 0 deletions caddyfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func parseCaddyfile(helper httpcaddyfile.Helper) (caddyhttp.MiddlewareHandler, e
// distributed {
// read_interval <duration>
// write_interval <duration>
// purge_age <duration>
// }
// storage <module...>
// jitter <percent>
Expand Down Expand Up @@ -150,6 +151,19 @@ func (h *Handler) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
return d.Errf("invalid write interval '%s': %v", d.Val(), err)
}
h.Distributed.WriteInterval = caddy.Duration(interval)

case "purge_age":
if !d.NextArg() {
return d.ArgErr()
}
if h.Distributed.PurgeAge != 0 {
return d.Errf("purge age already specified: %v", h.Distributed.PurgeAge)
}
age, err := caddy.ParseDuration(d.Val())
if err != nil {
return d.Errf("invalid purge age '%s': %v", d.Val(), err)
}
h.Distributed.PurgeAge = caddy.Duration(age)
}
}

Expand Down
14 changes: 14 additions & 0 deletions distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type DistributedRateLimiting struct {
// Default: 5s
ReadInterval caddy.Duration `json:"read_interval,omitempty"`

// How long to wait before deleting stale states from other instances.
// Default: never
PurgeAge caddy.Duration `json:"purge_age,omitempty"`

instanceID string

otherStates []rlState
Expand Down Expand Up @@ -153,6 +157,16 @@ func (h Handler) syncDistributedRead(ctx context.Context) error {
continue
}

if h.Distributed.PurgeAge != 0 && state.Timestamp.Before(now().Add(-time.Duration(h.Distributed.PurgeAge))) {
err = h.storage.Delete(ctx, instanceFile)
if err != nil {
h.logger.Error("cannot delete stale rate limiter state file",
zap.String("key", instanceFile),
zap.Error(err))
}
continue
}

otherStates = append(otherStates, state)
}

Expand Down
86 changes: 79 additions & 7 deletions distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"os"
"path"
"strings"
"testing"
"time"
Expand All @@ -26,18 +27,23 @@ import (
"github.com/caddyserver/caddy/v2/caddytest"
"github.com/caddyserver/certmagic"
"github.com/google/uuid"
"go.uber.org/zap"
)

func TestDistributed(t *testing.T) {
initTime()
window := 60
maxEvents := 10

// ensureAppDataDir creates the directory returned by caddy.AppDataDir (and any
// missing parents) with mode 0700, and fails the calling test immediately if
// the directory cannot be created.
func ensureAppDataDir(t *testing.T) {
	// Mark this as a test helper so a failure is reported at the caller's line
	// instead of inside this function.
	t.Helper()

	// Make sure AppDataDir exists, because otherwise the caddytest.Tester won't
	// be able to generate an instance ID
	dir := caddy.AppDataDir()
	if err := os.MkdirAll(dir, 0700); err != nil {
		t.Fatalf("failed to create app data dir %s: %s", dir, err)
	}
}

func TestDistributed(t *testing.T) {
initTime()
window := 60
maxEvents := 10

ensureAppDataDir(t)

testCases := []struct {
name string
Expand Down Expand Up @@ -85,7 +91,7 @@ func TestDistributed(t *testing.T) {
t.Run(testCase.name, func(t *testing.T) {
storageDir := t.TempDir()
// Use a random UUID as the zone so that rate limits from multiple test runs
// collide with each other
// don't collide with each other
zone := uuid.New().String()

// To simulate a peer in a rate limiting cluster, construct a
Expand Down Expand Up @@ -158,7 +164,8 @@ func TestDistributed(t *testing.T) {
},
"distributed": {
"write_interval": "3600s",
"read_interval": "3600s"
"read_interval": "3600s",
"purge_age": "7200s"
}
},
{
Expand Down Expand Up @@ -189,3 +196,68 @@ func TestDistributed(t *testing.T) {
})
}
}

// TestPurgeDistributedState verifies the PurgeAge setting of distributed rate
// limiting: a state file written by another instance is loaded by
// syncDistributedRead while it is fresh, and is removed from storage (and no
// longer reported in otherStates) once the test clock has been advanced beyond
// the configured purge age.
func TestPurgeDistributedState(t *testing.T) {
	initTime()
	ensureAppDataDir(t)

	logger, err := zap.NewDevelopment()
	if err != nil {
		t.Fatalf("failed to create logger: %s", err)
	}

	ctx := context.Background()
	storageDir := t.TempDir()
	storage := certmagic.FileStorage{Path: storageDir}

	// Directory this test inspects to count the per-instance state files.
	instancesDir := path.Join(storageDir, "rate_limit", "instances")

	// listInstanceFiles returns the entries currently present in instancesDir,
	// aborting the test if the directory cannot be read.
	listInstanceFiles := func() []os.DirEntry {
		entries, readErr := os.ReadDir(instancesDir)
		if readErr != nil {
			t.Fatalf("couldn't list directory: %s", readErr)
		}
		return entries
	}

	// Plant a state file that appears to come from a different instance,
	// stamped with the current test time and holding no zones.
	peerState := rlState{
		Timestamp: now(),
		Zones:     map[string]map[string]rlStateValue{},
	}
	err = writeRateLimitState(ctx, peerState, "12345678-1234-1234-1234-123456789abc", &storage)
	if err != nil {
		t.Fatalf("failed to write state to storage: %s", err)
	}

	// The handler under test uses its own instance ID and purges peer states
	// older than one hour.
	distributed := &DistributedRateLimiting{
		instanceID: "99999999-9999-9999-9999-999999999999",
		PurgeAge:   caddy.Duration(time.Hour),
	}
	handler := Handler{
		Distributed: distributed,
		storage:     &storage,
		logger:      logger,
	}

	// First sync: the peer's state is still fresh, so it must be loaded and
	// its file must remain in storage.
	if syncErr := handler.syncDistributedRead(ctx); syncErr != nil {
		t.Fatalf("reading distributed state failed: %s", syncErr)
	}
	if loaded := len(handler.Distributed.otherStates); loaded != 1 {
		t.Fatalf("did not read other states correctly: %v", handler.Distributed.otherStates)
	}
	if present := listInstanceFiles(); len(present) != 1 {
		t.Fatalf("wrong number of files present in storage directory: %v", present)
	}

	// Advance the test clock (presumably by two hours, expressed in seconds)
	// and sync again. The old state file should be deleted now.
	advanceTime(2 * 60 * 60)
	if syncErr := handler.syncDistributedRead(ctx); syncErr != nil {
		t.Fatalf("reading distributed state failed: %s", syncErr)
	}
	if loaded := len(handler.Distributed.otherStates); loaded != 0 {
		t.Fatalf("expected other state to be deleted: %v", handler.Distributed.otherStates)
	}
	if remaining := listInstanceFiles(); len(remaining) != 0 {
		t.Fatalf("storage directory was not empty: %v", remaining)
	}
}

0 comments on commit 9f619ad

Please sign in to comment.