Skip to content

Commit

Permalink
op-heartbeat: handle heartbeat spamming (ethereum-optimism#4507)
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda authored Dec 15, 2022
1 parent 0669b2e commit 8d8219f
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 39 deletions.
69 changes: 63 additions & 6 deletions op-heartbeat/metrics.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,53 @@
package op_heartbeat

import (
"fmt"
"strconv"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/heartbeat"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/ethereum-optimism/optimism/op-node/heartbeat"
)

const MetricsNamespace = "op_heartbeat"
const (
MetricsNamespace = "op_heartbeat"
MinHeartbeatInterval = 10*time.Minute - 10*time.Second
UsersCacheSize = 10_000
)

type Metrics interface {
RecordHeartbeat(payload heartbeat.Payload)
RecordHeartbeat(payload heartbeat.Payload, ip string)
RecordVersion(version string)
}

type metrics struct {
heartbeats *prometheus.CounterVec
version *prometheus.GaugeVec
sameIP *prometheus.HistogramVec

// Groups heartbeats per unique IP, version and chain ID combination.
// string(IP ++ version ++ chainID) -> *heartbeatEntry
heartbeatUsers *lru.Cache
}

type heartbeatEntry struct {
// Count number of heartbeats per interval, atomically updated
Count uint64
// Changes once per heartbeat interval
Time time.Time
}

func NewMetrics(r *prometheus.Registry) Metrics {
lruCache, _ := lru.New(UsersCacheSize)
m := &metrics{
heartbeats: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: MetricsNamespace,
Name: "heartbeats",
Help: "Counts number of heartbeats by chain ID",
Help: "Counts number of heartbeats by chain ID, version and filtered to unique IPs",
}, []string{
"chain_id",
"version",
Expand All @@ -37,11 +59,21 @@ func NewMetrics(r *prometheus.Registry) Metrics {
}, []string{
"version",
}),
sameIP: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Name: "heartbeat_same_ip",
Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
Help: "Histogram of events within same heartbeat interval per unique IP, by chain ID and version",
}, []string{
"chain_id",
"version",
}),
heartbeatUsers: lruCache,
}
return m
}

func (m *metrics) RecordHeartbeat(payload heartbeat.Payload) {
func (m *metrics) RecordHeartbeat(payload heartbeat.Payload, ip string) {
var chainID string
if AllowedChainIDs[payload.ChainID] {
chainID = strconv.FormatUint(payload.ChainID, 10)
Expand All @@ -54,7 +86,32 @@ func (m *metrics) RecordHeartbeat(payload heartbeat.Payload) {
} else {
version = "unknown"
}
m.heartbeats.WithLabelValues(chainID, version).Inc()

key := fmt.Sprintf("%s;%s;%s", ip, version, chainID)
now := time.Now()
previous, ok, _ := m.heartbeatUsers.PeekOrAdd(key, &heartbeatEntry{Time: now, Count: 1})
if !ok {
// if it's a new entry, observe it and exit.
m.sameIP.WithLabelValues(chainID, version).Observe(1)
m.heartbeats.WithLabelValues(chainID, version).Inc()
return
}

entry := previous.(*heartbeatEntry)
if now.Sub(entry.Time) < MinHeartbeatInterval {
// if the span is still going, then add it up
atomic.AddUint64(&entry.Count, 1)
} else {
// if the span ended, then meter it, and reset it
m.sameIP.WithLabelValues(chainID, version).Observe(float64(atomic.LoadUint64(&entry.Count)))
entry.Time = now
atomic.StoreUint64(&entry.Count, 1)

m.heartbeats.WithLabelValues(chainID, version).Inc()
}

// always add, to keep LRU accurate
m.heartbeatUsers.Add(key, entry)
}

func (m *metrics) RecordVersion(version string) {
Expand Down
24 changes: 18 additions & 6 deletions op-heartbeat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"github.com/urfave/cli"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-node/heartbeat"
"github.com/ethereum-optimism/optimism/op-service/httputil"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli"
)

const HTTPMaxBodySize = 1024 * 1024
const (
HTTPMaxHeaderSize = 10 * 1024
HTTPMaxBodySize = 1024 * 1024
)

func Main(version string) func(ctx *cli.Context) error {
return func(cliCtx *cli.Context) error {
Expand Down Expand Up @@ -87,7 +93,7 @@ func Start(ctx context.Context, l log.Logger, cfg Config, version string) error

server := &http.Server{
Addr: net.JoinHostPort(cfg.HTTPAddr, strconv.Itoa(cfg.HTTPPort)),
MaxHeaderBytes: HTTPMaxBodySize,
MaxHeaderBytes: HTTPMaxHeaderSize,
Handler: mw,
WriteTimeout: 30 * time.Second,
IdleTimeout: time.Minute,
Expand All @@ -99,8 +105,14 @@ func Start(ctx context.Context, l log.Logger, cfg Config, version string) error

func Handler(l log.Logger, metrics Metrics) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ipStr := r.Header.Get("X-Forwarded-For")
// XFF can be a comma-separated list. Left-most is the original client.
if i := strings.Index(ipStr, ","); i >= 0 {
ipStr = ipStr[:i]
}

innerL := l.New(
"xff", r.Header.Get("X-Forwarded-For"),
"ip", ipStr,
"user_agent", r.Header.Get("User-Agent"),
"remote_addr", r.RemoteAddr,
)
Expand All @@ -122,7 +134,7 @@ func Handler(l log.Logger, metrics Metrics) http.HandlerFunc {
"chain_id", payload.ChainID,
)

metrics.RecordHeartbeat(payload)
metrics.RecordHeartbeat(payload, ipStr)

w.WriteHeader(204)
}
Expand Down
77 changes: 51 additions & 26 deletions op-heartbeat/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import (
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-node/heartbeat"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/require"

"github.com/ethereum-optimism/optimism/op-node/heartbeat"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)

func TestService(t *testing.T) {
Expand Down Expand Up @@ -45,66 +46,90 @@ func TestService(t *testing.T) {
}

tests := []struct {
name string
hb heartbeat.Payload
metricName string
metricValue int
name string
hbs []heartbeat.Payload
metric string
ip string
}{
{
"no whitelisted version",
heartbeat.Payload{
[]heartbeat.Payload{{
Version: "not_whitelisted",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 10,
},
`op_heartbeat_heartbeats{chain_id="10",version="unknown"}`,
1,
}},
`op_heartbeat_heartbeats{chain_id="10",version="unknown"} 1`,
"1.2.3.100",
},
{
"no whitelisted chain",
heartbeat.Payload{
[]heartbeat.Payload{{
Version: "v0.1.0-beta.1",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 999,
},
`op_heartbeat_heartbeats{chain_id="unknown",version="v0.1.0-beta.1"}`,
1,
}},
`op_heartbeat_heartbeats{chain_id="unknown",version="v0.1.0-beta.1"} 1`,
"1.2.3.101",
},
{
"both whitelisted",
heartbeat.Payload{
[]heartbeat.Payload{{
Version: "v0.1.0-beta.1",
Meta: "whatever",
Moniker: "whatever",
PeerID: "1X2398ug",
ChainID: 10,
}},
`op_heartbeat_heartbeats{chain_id="10",version="v0.1.0-beta.1"} 1`,
"1.2.3.102",
},
{
"spamming",
[]heartbeat.Payload{
{
Version: "v0.1.0-goerli-rehearsal.1",
Meta: "whatever",
Moniker: "alice",
PeerID: "1X2398ug",
ChainID: 10,
},
{
Version: "v0.1.0-goerli-rehearsal.1",
Meta: "whatever",
Moniker: "bob",
PeerID: "1X2398ug",
ChainID: 10,
},
},
`op_heartbeat_heartbeats{chain_id="10",version="v0.1.0-beta.1"}`,
1,
`op_heartbeat_heartbeat_same_ip_bucket{chain_id="10",version="v0.1.0-goerli-rehearsal.1",le="32"} 1`,
"1.2.3.103",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := json.Marshal(tt.hb)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://127.0.0.1:%d", httpPort), bytes.NewReader(data))
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, res.StatusCode, 204)
for _, hb := range tt.hbs {
data, err := json.Marshal(hb)
require.NoError(t, err)
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://127.0.0.1:%d", httpPort), bytes.NewReader(data))
require.NoError(t, err)
req.Header.Set("X-Forwarded-For", tt.ip)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
res.Body.Close()
require.Equal(t, res.StatusCode, 204)
}

metricsRes, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d", metricsPort))
require.NoError(t, err)
defer metricsRes.Body.Close()
require.NoError(t, err)
metricsBody, err := io.ReadAll(metricsRes.Body)
require.NoError(t, err)
require.Contains(t, string(metricsBody), fmt.Sprintf("%s %d", tt.metricName, tt.metricValue))
require.Contains(t, string(metricsBody), tt.metric)
})
}

Expand Down
3 changes: 2 additions & 1 deletion op-node/heartbeat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/ethereum/go-ethereum/log"
)

var SendInterval = 10 * time.Minute
// SendInterval determines the delay between requests. This must be larger than the MinHeartbeatInterval in the server.
const SendInterval = 10 * time.Minute

type Payload struct {
Version string `json:"version"`
Expand Down

0 comments on commit 8d8219f

Please sign in to comment.