diff --git a/.github/workflows/ci-go.yml b/.github/workflows/ci-go.yml
index 6cd7c76..411ac22 100644
--- a/.github/workflows/ci-go.yml
+++ b/.github/workflows/ci-go.yml
@@ -19,5 +19,6 @@ jobs:
           args: --timeout=10m --disable-all -E goimports
       - uses: dominikh/staticcheck-action@v1.2.0
         with:
+          version: v0.4.7
           install-go: false
       - run: go test -v ./...
diff --git a/go.mod b/go.mod
index 1339e51..f3a37f9 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,11 @@
 module github.com/vkcom/statshouse-go
 
 go 1.19
+
+require github.com/stretchr/testify v1.9.0
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..60ce688
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,10 @@
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/statshouse.go b/statshouse.go
index 0c455d0..01617a6 100644
--- a/statshouse.go
+++ b/statshouse.go
@@ -9,8 +9,8 @@ package statshouse
 import (
 	"encoding/binary"
 	"log"
-	"math"
 	"net"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -35,6 +35,7 @@ const (
 	tsFieldsMask   = uint32(1 << 4)
 	batchHeaderLen = 3 * tlInt32Size // tag, fields_mask, # of batches
 	maxTags        = 16
+	maxEmptySendCount = 2 // before bucket detach
 )
 
 var (
@@ -56,20 +57,40 @@ func ConfigureNetwork(logf LoggerFunc, network string, statsHouseAddr string, de
 	globalClient.configure(logf, network, statsHouseAddr, defaultEnv)
 }
 
+// TrackBucketCount calls [*Client.TrackBucketCount] on the global [Client].
+func TrackBucketCount() {
+	globalClient.TrackBucketCount()
+}
+
+// BucketCount calls [*Client.BucketCount] on the global [Client].
+func BucketCount() int32 {
+	return globalClient.BucketCount()
+}
+
 // Close calls [*Client.Close] on the global [Client].
 // Make sure to call Close during app exit to avoid losing the last batch of metrics.
 func Close() error {
 	return globalClient.Close()
 }
 
-// Metric calls [*Client.Metric] on the global [Client].
-// It is valid to call Metric before [Configure].
+// GlobalMetricRef calls [*Client.MetricRef] on the global [Client].
+// It is valid to call GlobalMetricRef before [Configure].
+func GlobalMetricRef(metric string, tags Tags) MetricRef {
+	return globalClient.MetricRef(metric, tags)
+}
+
+// Deprecated: causes unnecessary memory allocation.
+// Use either [Client.MetricRef] or direct write func like [Client.Count].
 func Metric(metric string, tags Tags) *MetricRef {
 	return globalClient.Metric(metric, tags)
 }
 
-// MetricNamed calls [*Client.MetricNamed] on the global [Client].
-// It is valid to call MetricNamed before [Configure].
+// GlobalMetricNamedRef calls [*Client.MetricNamedRef] on the global [Client].
+// It is valid to call GlobalMetricNamedRef before [Configure].
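+//
+// A minimal usage sketch (metric and tag names here are illustrative):
+//
+//	r := statshouse.GlobalMetricNamedRef("requests_total", statshouse.NamedTags{{"status", "ok"}})
+//	r.Count(1)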
+func GlobalMetricNamedRef(metric string, tags NamedTags) MetricRef {
+	return globalClient.MetricNamedRef(metric, tags)
+}
+
+// Deprecated: causes unnecessary memory allocation.
+// Use either [Client.MetricNamedRef] or direct write func like [Client.Count].
 func MetricNamed(metric string, tags NamedTags) *MetricRef {
 	return globalClient.MetricNamed(metric, tags)
 }
@@ -93,17 +114,20 @@ func StopRegularMeasurement(id int) {
 // Specifying empty StatsHouse address will make the client silently discard all metrics.
 // If you get a compiler error after updating to a recent version of the library, pass statshouse.DefaultNetwork as the network parameter.
 func NewClient(logf LoggerFunc, network string, statsHouseAddr string, defaultEnv string) *Client {
+	if logf == nil {
+		logf = log.Printf
+	}
 	c := &Client{
 		logf:         logf,
 		addr:         statsHouseAddr,
 		network:      network,
 		packetBuf:    make([]byte, batchHeaderLen, maxPayloadSize), // can grow larger than maxPayloadSize if writing huge header
 		close:        make(chan chan struct{}),
-		cur:          &MetricRef{},
-		w:            map[metricKey]*MetricRef{},
-		wn:           map[metricKeyNamed]*MetricRef{},
+		w:            map[metricKey]*bucket{},
+		wn:           map[metricKeyNamed]*bucket{},
 		env:          defaultEnv,
 		regularFuncs: map[int]func(*Client){},
+		tsUnixSec:    uint32(time.Now().Unix()),
 	}
 	go c.run()
 	return c
@@ -127,51 +151,61 @@ type NamedTags [][2]string
 
-// Tags are used to call [*Client.Metric].
+// Tags are used to call [*Client.MetricRef].
 type Tags [maxTags]string
 
-type metricKeyValue struct {
-	k metricKey
-	v *MetricRef
-}
-
-type metricKeyValueNamed struct {
-	k metricKeyNamed
-	v *MetricRef
-}
-
 // Client manages metric aggregation and transport to a StatsHouse agent.
 type Client struct {
-	confMu  sync.Mutex
-	logf    LoggerFunc
-	network string
-	addr    string
-	conn    net.Conn
-
-	writeErrTime time.Time // we use it to reduce # of errors reported
-
-	closeOnce  sync.Once
-	closeErr   error
-	close      chan chan struct{}
-	packetBuf  []byte
-	batchCount int
-	cur        *MetricRef
-
-	mu  sync.RWMutex
-	w   map[metricKey]*MetricRef
-	r   []metricKeyValue
-	wn  map[metricKeyNamed]*MetricRef
-	rn  []metricKeyValueNamed
-	env string // if set, will be put into key0/env
+	// logging
+	logMu        sync.Mutex
+	logf         LoggerFunc
+	writeErrTime time.Time // to reduce # of errors reported
+
+	// transport
+	transportMu sync.RWMutex
+	env         string // if set, will be put into key0/env
+	network     string
+	addr        string
+	conn        net.Conn
+	packetBuf   []byte
+	batchCount  int
+
+	// data
+	mu        sync.RWMutex // taken after [bucket.mu]
+	w         map[metricKey]*bucket
+	r         []*bucket
+	wn        map[metricKeyNamed]*bucket
+	rn        []*bucket
+	tsUnixSec uint32
+
+	// external callbacks
 	regularFuncsMu sync.Mutex
 	regularFuncs   map[int]func(*Client)
 	nextRegularID  int
+
+	// shutdown
+	closeOnce sync.Once
+	closeErr  error
+	close     chan chan struct{}
+
+	// debug
+	bucketCount *atomic.Int32
 }
 
 // SetEnv changes the default environment associated with [Client].
 func (c *Client) SetEnv(env string) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	c.transportMu.Lock()
+	defer c.transportMu.Unlock()
 	c.env = env
 }
 
+// TrackBucketCount enables tracking of the number of live buckets, for debug purposes.
+// Call it before any metrics are written.
+func (c *Client) TrackBucketCount() {
+	c.bucketCount = &atomic.Int32{}
+}
+
+// BucketCount returns the number of live buckets, for debug purposes.
+// Panics if [Client.TrackBucketCount] wasn't called first.
+func (c *Client) BucketCount() int32 {
+	return c.bucketCount.Load()
+}
+
 // Close the [Client] and flush unsent metrics to the StatsHouse agent.
 // No data will be sent after Close has returned.
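+//
+// A typical shutdown pattern (sketch; the address and environment are illustrative):
+//
+//	c := statshouse.NewClient(nil, statshouse.DefaultNetwork, "127.0.0.1:13337", "production")
+//	defer c.Close()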
 func (c *Client) Close() error {
@@ -180,8 +214,8 @@
 	c.close <- ch
 	<-ch
 
-	c.confMu.Lock()
-	defer c.confMu.Unlock()
+	c.transportMu.Lock()
+	defer c.transportMu.Unlock()
 
 	if c.conn != nil {
 		c.closeErr = c.conn.Close()
 	}
@@ -225,14 +259,18 @@
 }
 
 func (c *Client) configure(logf LoggerFunc, network string, statsHouseAddr string, env string) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
+	// update logger first
+	c.logMu.Lock()
+	if logf != nil {
+		c.logf = logf
+	} else {
+		c.logf = log.Printf
+	}
+	c.logMu.Unlock()
+	// then transport
+	c.transportMu.Lock()
+	defer c.transportMu.Unlock()
 	c.env = env
-
-	c.confMu.Lock()
-	defer c.confMu.Unlock()
-
-	c.logf = logf
 	c.network = network
 	c.addr = statsHouseAddr
 	if c.conn != nil {
@@ -249,55 +287,49 @@
 }
 
 func (c *Client) getLog() LoggerFunc {
-	c.confMu.Lock()
-	defer c.confMu.Unlock()
+	c.logMu.Lock()
+	defer c.logMu.Unlock()
 	return c.logf // return func instead of calling it here to not alter the callstack information in the log
 }
 
-func tillNextHalfPeriod(now time.Time) time.Duration {
-	return now.Truncate(defaultSendPeriod).Add(defaultSendPeriod * 3 / 2).Sub(now)
-}
-
 func (c *Client) run() {
 	var regularCache []func(*Client)
-	tick := time.After(tillNextHalfPeriod(time.Now()))
+	// get a resettable timer
+	timer := time.NewTimer(time.Hour)
+	if !timer.Stop() {
+		<-timer.C
+	}
+	c.mu.Lock()
+	now := time.Unix(int64(c.tsUnixSec), 0)
+	c.mu.Unlock()
+	// loop
 	for {
+		d := now.Truncate(defaultSendPeriod).Add(defaultSendPeriod).Sub(now)
+		if d == 0 {
+			d = time.Second
+		}
+		timer.Reset(d)
 		select {
-		case now := <-tick:
+		case now = <-timer.C:
 			regularCache = c.callRegularFuncs(regularCache[:0])
-			c.send()
-			tick = time.After(tillNextHalfPeriod(now))
+			c.send(uint32(now.Unix()))
+			now = time.Now()
 		case ch := <-c.close:
-			c.send() // last send: we will lose all metrics produced "after"
+			c.send(0) // last send: we will lose all metrics produced "after"
 			close(ch)
 			return
 		}
 	}
 }
 
-func (c *Client) load() ([]metricKeyValue, []metricKeyValueNamed, string) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	return c.r, c.rn, c.env
-}
-
-func (c *Client) swapToCur(s *MetricRef) {
-	n := atomicLoadFloat64(&s.atomicCount)
-	for !atomicCASFloat64(&s.atomicCount, n, 0) {
-		n = atomicLoadFloat64(&s.atomicCount)
-	}
-	atomicStoreFloat64(&c.cur.atomicCount, n)
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	c.cur.value = append(c.cur.value[:0], s.value...)
-	c.cur.unique = append(c.cur.unique[:0], s.unique...)
-	c.cur.stop = append(c.cur.stop[:0], s.stop...)
-	s.value = s.value[:0]
-	s.unique = s.unique[:0]
-	s.stop = s.stop[:0]
+func (b *bucket) swapToSend(nowUnixSec uint32) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.tsUnixSec = nowUnixSec
+	b.count, b.countToSend = 0, b.count
+	b.value, b.valueToSend = b.valueToSend[:0], b.value
+	b.unique, b.uniqueToSend = b.uniqueToSend[:0], b.unique
+	b.stop, b.stopToSend = b.stopToSend[:0], b.stop
 }
 
 type metricKeyTransport struct {
@@ -316,48 +348,114 @@ func fillTag(k *metricKeyTransport, tagName string, tagValue string) {
 	k.hasEnv = k.hasEnv || tagName == "0" || tagName == "env" || tagName == "key0" // TODO - keep only "0", rest are legacy
 }
 
-func (c *Client) send() {
-	ss, ssn, env := c.load()
-	for _, s := range ss {
-		k := metricKeyTransport{name: s.k.name}
-		for i, v := range s.k.tags {
-			fillTag(&k, tagIDs[i], v)
-		}
-		if !k.hasEnv {
-			fillTag(&k, "0", env)
-		}
-
-		c.swapToCur(s.v)
-		if n := atomicLoadFloat64(&c.cur.atomicCount); n > 0 {
-			c.sendCounter(&k, "", n, 0)
-		}
-		c.sendValues(&k, "", 0, 0, c.cur.value)
-		c.sendUniques(&k, "", 0, 0, c.cur.unique)
-		for _, skey := range c.cur.stop {
-			c.sendCounter(&k, skey, 1, 0)
-		}
-	}
-	for _, s := range ssn {
-		k := metricKeyTransport{name: s.k.name}
-		for _, v := range s.k.tags {
-			fillTag(&k, v[0], v[1])
-		}
-		if !k.hasEnv {
-			fillTag(&k, "0", env)
-		}
-
-		c.swapToCur(s.v)
-		if n := atomicLoadFloat64(&c.cur.atomicCount); n > 0 {
-			c.sendCounter(&k, "", n, 0)
-		}
-		c.sendValues(&k, "", 0, 0, c.cur.value)
-		c.sendUniques(&k, "", 0, 0, c.cur.unique)
-		for _, skey := range c.cur.stop {
-			c.sendCounter(&k, skey, 1, 0)
-		}
-	}
+func (c *Client) send(nowUnixSec uint32) {
+	// load & switch second
+	var ss, ssn []*bucket
+	c.mu.Lock()
+	ss, ssn = c.r, c.rn
+	sendUnixSec := c.tsUnixSec
+	if nowUnixSec != 0 && sendUnixSec+1 != nowUnixSec {
+		c.getLog()("[statshouse] sending metrics takes too long!")
+	}
+	c.tsUnixSec = nowUnixSec
+	c.mu.Unlock()
+	// swap
+	for i := 0; i < len(ss); i++ {
+		ss[i].swapToSend(nowUnixSec)
+	}
+	for i := 0; i < len(ssn); i++ {
+		ssn[i].swapToSend(nowUnixSec)
+	}
+	// send
+	c.transportMu.Lock()
+	for i := 0; i < len(ss); i++ {
+		if ss[i].emptySend() {
+			continue
+		}
+		ss[i].send(c, sendUnixSec)
+	}
+	for i := 0; i < len(ssn); i++ {
+		if ssn[i].emptySend() {
+			continue
+		}
+		ssn[i].send(c, sendUnixSec)
+	}
+	c.flush()
+	c.transportMu.Unlock()
+	// remove unused & compact
+	i, n := 0, len(ss)
+	for i < n {
+		b := c.r[i]
+		if !b.emptySend() {
+			i++
+			b.emptySendCount = 0
+			continue
+		}
+		if b.emptySendCount < maxEmptySendCount {
+			i++
+			b.emptySendCount++
+			continue
+		} else {
+			b.emptySendCount = 0
+		}
+		// remove
+		b.mu.Lock()
+		c.mu.Lock()
+		n--
+		c.w[b.k] = nil // release bucket reference
+		delete(c.w, b.k)
+		c.r[i] = c.r[n]
+		c.r[n] = nil // release bucket reference
+		b.attached = false
+		c.mu.Unlock()
+		b.mu.Unlock()
+	}
+	if d := len(ss) - n; d != 0 {
+		c.mu.Lock()
+		for i := len(ss); i < len(c.r); i++ {
+			c.r[i-d] = c.r[i]
+			c.r[i] = nil // release bucket reference
+		}
+		c.r = c.r[:len(c.r)-d]
+		c.mu.Unlock()
+	}
+	// remove unused & compact (named)
+	i, n = 0, len(ssn)
+	for i < n {
+		b := c.rn[i]
+		if !b.emptySend() {
+			i++
+			b.emptySendCount = 0
+			continue
+		}
+		if b.emptySendCount < maxEmptySendCount {
+			i++
+			b.emptySendCount++
+			continue
+		} else {
+			b.emptySendCount = 0
+		}
+		// remove
+		b.mu.Lock()
+		c.mu.Lock()
+		n--
+		c.wn[b.kn] = nil // release bucket reference
+		delete(c.wn, b.kn)
+		c.rn[i] = c.rn[n]
+		c.rn[n] = nil // release bucket reference
+		b.attached = false
+		c.mu.Unlock()
+		b.mu.Unlock()
+	}
+	if d := len(ssn) - n; d != 0 {
+		c.mu.Lock()
+		for i := len(ssn); i < len(c.rn); i++ {
+			c.rn[i-d] = c.rn[i]
+			c.rn[i] = nil // release bucket reference
+		}
+		c.rn = c.rn[:len(c.rn)-d]
+		c.mu.Unlock()
+	}
-
-	c.flush()
 }
 
 func (c *Client) sendCounter(k *metricKeyTransport, skey string, counter float64, tsUnixSec uint32) {
@@ -419,9 +517,6 @@ func (c *Client) flush() {
 	c.packetBuf = c.packetBuf[:batchHeaderLen]
 	c.batchCount = 0
 
-	c.confMu.Lock()
-	defer c.confMu.Unlock()
-
 	if c.conn == nil && c.addr != "" {
 		conn, err := net.Dial(c.network, c.addr)
 		if err != nil {
@@ -436,7 +531,7 @@
 			now := time.Now()
 			if now.Sub(c.writeErrTime) > errorReportingPeriod {
 				c.writeErrTime = now
-				c.logf("[statshouse] failed to send data to statshouse: %v", err) // not using getLog() because confMu is already locked
+				c.getLog()("[statshouse] failed to send data to statshouse: %v", err)
 			}
 		}
 	}
@@ -499,51 +594,40 @@ func (c *Client) writeTag(tagName string, tagValue string) {
 	c.packetBuf = basictl.StringWriteTruncated(c.packetBuf, tagValue)
 }
 
-// Metric is the preferred way to access [MetricRef] to record observations.
-// Metric calls should be encapsulated in helper functions. Direct calls like
-//
-//	statshouse.Metric("packet_size", statshouse.Tags{1: "ok"}).Value(float64(len(pkg)))
-//
-// should be replaced with calls via higher-level helper functions:
-//
-//	RecordPacketSize(true, len(pkg))
-//
-//	func RecordPacketSize(ok bool, size int) {
-//		status := "fail"
-//		if ok {
-//			status = "ok"
-//		}
-//		statshouse.Metric("packet_size", statshouse.Tags{1: status}).Value(float64(size))
-//	}
-//
-// As an optimization, it is possible to save the result of Metric for later use:
-//
-//	var countPacketOK = statshouse.Metric("foo", statshouse.Tags{1: "ok"})
+// MetricRef is the preferred way to record observations. Save the result for later use:
 //
+//	var countPacketOK = statshouse.GlobalMetricRef("foo", statshouse.Tags{1: "ok"})
 //	countPacketOK.Count(1) // lowest overhead possible
-func (c *Client) Metric(metric string, tags Tags) *MetricRef {
+func (c *Client) MetricRef(metric string, tags Tags) MetricRef {
 	// We must do absolute minimum of work here
 	k := metricKey{name: metric, tags: tags}
 	c.mu.RLock()
 	e, ok := c.w[k]
 	c.mu.RUnlock()
 	if ok {
-		return e
+		return MetricRef{e}
 	}
-
 	c.mu.Lock()
+	defer c.mu.Unlock()
 	e, ok = c.w[k]
-	if !ok {
-		e = &MetricRef{}
-		c.w[k] = e
-		c.r = append(c.r, metricKeyValue{k: k, v: e})
+	if ok {
+		// re-check under the write lock: another goroutine may have just created it
+		return MetricRef{e}
+	}
+	b := &bucket{
+		c:         c,
+		k:         k,
+		tsUnixSec: c.tsUnixSec,
+		attached:  true,
 	}
-	c.mu.Unlock()
-	return e
+	if c.bucketCount != nil {
+		c.bucketCount.Add(1)
+		runtime.SetFinalizer(b, func(_ *bucket) {
+			c.bucketCount.Add(-1)
+		})
+	}
+	c.w[k] = b
+	c.r = append(c.r, b)
+	return MetricRef{b}
 }
 
-// MetricNamed is similar to [*Client.Metric] but slightly slower, and allows to specify tags by name.
-func (c *Client) MetricNamed(metric string, tags NamedTags) *MetricRef {
+// MetricNamedRef is similar to [*Client.MetricRef] but slightly slower, and lets you specify tags by name.
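+//
+// A minimal usage sketch (metric and tag names here are illustrative):
+//
+//	r := c.MetricNamedRef("request_size_bytes", statshouse.NamedTags{{"status", "ok"}})
+//	r.Value(512)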
+func (c *Client) MetricNamedRef(metric string, tags NamedTags) MetricRef {
 	// We must do absolute minimum of work here
 	k := metricKeyNamed{name: metric}
 	copy(k.tags[:], tags)
@@ -552,90 +636,341 @@ func (c *Client) MetricNamed(metric string, tags NamedTags) *MetricRef {
 	e, ok := c.wn[k]
 	c.mu.RUnlock()
 	if ok {
-		return e
+		return MetricRef{e}
 	}
-
 	c.mu.Lock()
+	defer c.mu.Unlock()
 	e, ok = c.wn[k]
-	if !ok {
-		e = &MetricRef{}
-		c.wn[k] = e
-		c.rn = append(c.rn, metricKeyValueNamed{k: k, v: e})
+	if ok {
+		// re-check under the write lock: another goroutine may have just created it
+		return MetricRef{e}
+	}
+	b := &bucket{
+		c:         c,
+		kn:        k,
+		tsUnixSec: c.tsUnixSec,
+		attached:  true,
 	}
-	c.mu.Unlock()
-	return e
+	if c.bucketCount != nil {
+		c.bucketCount.Add(1)
+		runtime.SetFinalizer(b, func(_ *bucket) {
+			c.bucketCount.Add(-1)
+		})
+	}
+	c.wn[k] = b
+	c.rn = append(c.rn, b)
+	return MetricRef{b}
+}
+
+// Deprecated: causes unnecessary memory allocation.
+// Use either [Client.MetricRef] or direct write func like [Client.Count].
+func (c *Client) Metric(metric string, tags Tags) *MetricRef {
+	v := c.MetricRef(metric, tags)
+	return &v
+}
+
+// Deprecated: causes unnecessary memory allocation.
+// Use either [Client.MetricNamedRef] or direct write func like [Client.Count].
+func (c *Client) MetricNamed(metric string, tags NamedTags) *MetricRef {
+	v := c.MetricNamedRef(metric, tags)
+	return &v
+}
 
-// MetricRef pointer is obtained via [*Client.Metric] or [*Client.MetricNamed]
-// and is used to record attributes of observed events.
+// MetricRef is obtained via [*Client.MetricRef] or [*Client.MetricNamedRef]
+// and is used to record attributes of observed events.
 type MetricRef struct {
-	// Place atomics first to ensure proper alignment, see https://pkg.go.dev/sync/atomic#pkg-note-BUG
-	atomicCount uint64
+	*bucket
+}
+
+type bucket struct {
+	// readonly
+	c  *Client
+	k  metricKey
+	kn metricKeyNamed
 
-	mu     sync.Mutex
-	value  []float64
-	unique []int64
-	stop   []string
+	mu        sync.Mutex // taken before [Client.mu]
+	tsUnixSec uint32
+	attached  bool
+	count     float64
+	value     []float64
+	unique    []int64
+	stop      []string
+
+	// accessed only by the "send" goroutine, not protected by mu
+	countToSend    float64
+	valueToSend    []float64
+	uniqueToSend   []int64
+	stopToSend     []string
+	emptySendCount int
 }
 
 // Count records the number of events or observations.
 func (m *MetricRef) Count(n float64) {
-	c := atomicLoadFloat64(&m.atomicCount)
-	for !atomicCASFloat64(&m.atomicCount, c, c+n) {
-		c = atomicLoadFloat64(&m.atomicCount)
-	}
+	m.CountHistoric(n, 0)
+}
+
+func (m *MetricRef) CountHistoric(n float64, tsUnixSec uint32) {
+	m.write(tsUnixSec, func(b *bucket) {
+		b.count += n
+	})
+}
+
+func (c *Client) Count(name string, tags Tags, n float64) {
+	m := c.MetricRef(name, tags)
+	m.Count(n)
+}
+
+func (c *Client) CountHistoric(name string, tags Tags, n float64, tsUnixSec uint32) {
+	m := c.MetricRef(name, tags)
+	m.CountHistoric(n, tsUnixSec)
+}
+
+func (c *Client) NamedCount(name string, tags NamedTags, n float64) {
+	m := c.MetricNamedRef(name, tags)
+	m.Count(n)
+}
+
+func (c *Client) NamedCountHistoric(name string, tags NamedTags, n float64, tsUnixSec uint32) {
+	m := c.MetricNamedRef(name, tags)
+	m.CountHistoric(n, tsUnixSec)
 }
 
 // Value records the observed value for distribution estimation.
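+//
+// A sketch of timing an operation (doWork is illustrative):
+//
+//	start := time.Now()
+//	doWork()
+//	m.Value(time.Since(start).Seconds())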
 func (m *MetricRef) Value(value float64) {
-	m.mu.Lock()
-	m.value = append(m.value, value)
-	m.mu.Unlock()
+	m.ValueHistoric(value, 0)
+}
+
+func (m *MetricRef) ValueHistoric(value float64, tsUnixSec uint32) {
+	m.write(tsUnixSec, func(b *bucket) {
+		b.value = append(b.value, value)
+	})
+}
+
+func (c *Client) Value(name string, tags Tags, value float64) {
+	m := c.MetricRef(name, tags)
+	m.Value(value)
+}
+
+func (c *Client) ValueHistoric(name string, tags Tags, value float64, tsUnixSec uint32) {
+	m := c.MetricRef(name, tags)
+	m.ValueHistoric(value, tsUnixSec)
+}
+
+func (c *Client) NamedValue(name string, tags NamedTags, value float64) {
+	m := c.MetricNamedRef(name, tags)
+	m.Value(value)
+}
+
+func (c *Client) NamedValueHistoric(name string, tags NamedTags, value float64, tsUnixSec uint32) {
+	m := c.MetricNamedRef(name, tags)
+	m.ValueHistoric(value, tsUnixSec)
 }
 
 // Values records the observed values for distribution estimation.
 func (m *MetricRef) Values(values []float64) {
-	m.mu.Lock()
-	m.value = append(m.value, values...)
-	m.mu.Unlock()
+	m.write(0, func(b *bucket) {
+		b.value = append(b.value, values...)
+	})
+}
+
+func (m *MetricRef) ValuesHistoric(values []float64, tsUnixSec uint32) {
+	m.write(tsUnixSec, func(b *bucket) {
+		b.value = append(b.value, values...)
+	})
+}
+
+func (c *Client) Values(name string, tags Tags, values []float64) {
+	m := c.MetricRef(name, tags)
+	m.Values(values)
+}
+
+func (c *Client) ValuesHistoric(name string, tags Tags, values []float64, tsUnixSec uint32) {
+	m := c.MetricRef(name, tags)
+	m.ValuesHistoric(values, tsUnixSec)
+}
+
+func (c *Client) NamedValues(name string, tags NamedTags, values []float64) {
+	m := c.MetricNamedRef(name, tags)
+	m.Values(values)
+}
+
+func (c *Client) NamedValuesHistoric(name string, tags NamedTags, values []float64, tsUnixSec uint32) {
+	m := c.MetricNamedRef(name, tags)
+	m.ValuesHistoric(values, tsUnixSec)
 }
 
 // Unique records the observed value for cardinality estimation.
 func (m *MetricRef) Unique(value int64) {
-	m.mu.Lock()
-	m.unique = append(m.unique, value)
-	m.mu.Unlock()
+	m.write(0, func(b *bucket) {
+		b.unique = append(b.unique, value)
+	})
+}
+
+func (m *MetricRef) UniqueHistoric(value int64, tsUnixSec uint32) {
+	m.write(tsUnixSec, func(b *bucket) {
+		b.unique = append(b.unique, value)
+	})
+}
+
+func (c *Client) Unique(name string, tags Tags, value int64) {
+	m := c.MetricRef(name, tags)
+	m.Unique(value)
+}
+
+func (c *Client) UniqueHistoric(name string, tags Tags, value int64, tsUnixSec uint32) {
+	m := c.MetricRef(name, tags)
+	m.UniqueHistoric(value, tsUnixSec)
+}
+
+func (c *Client) NamedUnique(name string, tags NamedTags, value int64) {
+	m := c.MetricNamedRef(name, tags)
+	m.Unique(value)
+}
+
+func (c *Client) NamedUniqueHistoric(name string, tags NamedTags, value int64, tsUnixSec uint32) {
+	m := c.MetricNamedRef(name, tags)
+	m.UniqueHistoric(value, tsUnixSec)
 }
 
 // Uniques records the observed values for cardinality estimation.
 func (m *MetricRef) Uniques(values []int64) {
-	m.mu.Lock()
-	m.unique = append(m.unique, values...)
-	m.mu.Unlock()
+	m.write(0, func(b *bucket) {
+		b.unique = append(b.unique, values...)
+	})
+}
+
+func (m *MetricRef) UniquesHistoric(values []int64, tsUnixSec uint32) {
+	m.write(tsUnixSec, func(b *bucket) {
+		b.unique = append(b.unique, values...)
+	})
+}
+
+func (c *Client) Uniques(name string, tags Tags, values []int64) {
+	m := c.MetricRef(name, tags)
+	m.Uniques(values)
+}
+
+func (c *Client) UniquesHistoric(name string, tags Tags, values []int64, tsUnixSec uint32) {
+	m := c.MetricRef(name, tags)
+	m.UniquesHistoric(values, tsUnixSec)
+}
+
+func (c *Client) NamedUniques(name string, tags NamedTags, values []int64) {
+	m := c.MetricNamedRef(name, tags)
+	m.Uniques(values)
+}
+
+func (c *Client) NamedUniquesHistoric(name string, tags NamedTags, values []int64, tsUnixSec uint32) {
+	m := c.MetricNamedRef(name, tags)
+	m.UniquesHistoric(values, tsUnixSec)
 }
 
 // StringTop records the observed value for popularity estimation.
 func (m *MetricRef) StringTop(value string) {
-	m.mu.Lock()
-	m.stop = append(m.stop, value)
-	m.mu.Unlock()
+	m.write(0, func(b *bucket) {
+		b.stop = append(b.stop, value)
+	})
 }
 
 // StringsTop records the observed values for popularity estimation.
func (m *MetricRef) StringsTop(values []string) {
-	m.mu.Lock()
-	m.stop = append(m.stop, values...)
-	m.mu.Unlock()
+	m.write(0, func(b *bucket) {
+		b.stop = append(b.stop, values...)
+	})
 }
 
-func atomicLoadFloat64(addr *uint64) float64 {
-	return math.Float64frombits(atomic.LoadUint64(addr))
+func (m *MetricRef) write(tsUnixSec uint32, fn func(*bucket)) {
+	m.bucket.mu.Lock()
+	tsZeroOrEqual := tsUnixSec == 0 || m.tsUnixSec <= tsUnixSec
+	if m.attached && tsZeroOrEqual {
+		// fast path
+		fn(m.bucket)
+		m.bucket.mu.Unlock()
+		return
+	}
+	c := m.c
+	if !tsZeroOrEqual {
+		// historic timestamp: send immediately, bypassing the aggregation bucket
+		b := bucket{k: m.k, kn: m.kn}
+		m.bucket.mu.Unlock()
+		fn(&b)
+		c.transportMu.Lock()
+		b.send(c, tsUnixSec)
+		c.transportMu.Unlock()
+		return
+	}
+	// attach then write
+	for i := 0; i < 2; i++ {
+		var b *bucket
+		var emptyMetricName bool
+		c.mu.Lock()
+		if m.k.name != "" {
+			if b = c.w[m.k]; b == nil {
+				c.w[m.k] = m.bucket
+				c.r = append(c.r, m.bucket)
+				m.bucket.tsUnixSec = c.tsUnixSec
+				m.bucket.attached = true // attach current
+			}
+		} else if m.kn.name != "" {
+			if b = c.wn[m.kn]; b == nil {
+				c.wn[m.kn] = m.bucket
+				c.rn = append(c.rn, m.bucket)
+				m.bucket.tsUnixSec = c.tsUnixSec
+				m.bucket.attached = true // attach current
+			}
+		} else {
+			emptyMetricName = true
+		}
+		c.mu.Unlock()
+		if emptyMetricName {
+			m.bucket.mu.Unlock()
+			c.getLog()("[statshouse] metric name is empty, discarding")
+			return
+		}
+		if m.bucket.attached { // attached current, write it exactly once
+			fn(m.bucket)
+			m.bucket.mu.Unlock()
+			return
+		}
+		m.bucket.mu.Unlock()
+		m.bucket = b
+		// now state is equivalent to what it was at function invocation, start over
+		b.mu.Lock()
+		if b.attached {
+			fn(b)
+			b.mu.Unlock()
+			return
+		}
+	}
+	m.bucket.mu.Unlock()
+	c.getLog()("[statshouse] safety counter limit reached, discarding")
 }
 
-func atomicStoreFloat64(addr *uint64, val float64) {
-	atomic.StoreUint64(addr, math.Float64bits(val))
+func (b *bucket) send(c *Client, tsUnixSec uint32) {
+	var k metricKeyTransport
+	if b.k.name != "" {
+		k.name = b.k.name
+		for i, v := range b.k.tags {
+			fillTag(&k, tagIDs[i], v)
+		}
+	} else {
+		k.name = b.kn.name
+		for _, v := range b.kn.tags {
+			fillTag(&k, v[0], v[1])
+		}
+	}
+	if !k.hasEnv {
+		fillTag(&k, "0", c.env)
+	}
+	if b.countToSend > 0 {
+		c.sendCounter(&k, "", b.countToSend, tsUnixSec)
+	}
+	c.sendValues(&k, "", 0, tsUnixSec, b.valueToSend)
+	c.sendUniques(&k, "", 0, tsUnixSec, b.uniqueToSend)
+	for _, skey := range b.stopToSend {
+		c.sendCounter(&k, skey, 1, tsUnixSec)
+	}
 }
 
-func atomicCASFloat64(addr *uint64, old float64, new float64) (swapped bool) {
-	return atomic.CompareAndSwapUint64(addr, math.Float64bits(old), math.Float64bits(new))
+func (b *bucket) emptySend() bool {
+	return b.countToSend == 0 &&
+		len(b.valueToSend) == 0 &&
+		len(b.uniqueToSend) == 0 &&
+		len(b.stopToSend) == 0
 }
diff --git a/statshouse_test.go b/statshouse_test.go
index 766aaa6..715c575 100644
--- a/statshouse_test.go
+++ b/statshouse_test.go
@@ -7,9 +7,15 @@
 package statshouse_test
 
 import (
+	"context"
+	"fmt"
+	"runtime"
+	"strconv"
 	"sync"
 	"testing"
+	"time"
 
+	"github.com/stretchr/testify/require"
 	"github.com/vkcom/statshouse-go"
 )
 
@@ -22,27 +28,70 @@ func TestCountRace(t *testing.T) {
 		go func() {
 			defer wg.Done()
 			for j := 0; j < 1000; j++ {
-				c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "world"}).Count(float64(j))
+				c.Count("test_stat", statshouse.Tags{1: "hello", 2: "world"}, float64(j))
 			}
 		}()
 	}
 	wg.Wait()
 }
 
+func TestBucketRelease(t *testing.T) {
+	var wg sync.WaitGroup
+	c := statshouse.NewClient(t.Logf, "udp", "" /* avoid sending anything */, "")
+	c.TrackBucketCount()
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+	defer cancel()
+	for i := 1; i <= 10; i++ {
+		wg.Add(2)
+		go func(i int) {
+			defer wg.Done()
+			name := fmt.Sprintf("test_metric%d", i)
+			var tags statshouse.NamedTags
+			for j := 0; j < i; j++ {
+				tags = append(tags, [2]string{strconv.Itoa(j), fmt.Sprintf("name%d", j)})
+			}
+			n := 0
+			for ; ctx.Err() == nil; n++ {
+				c.MetricNamed(name, tags).Count(float64(i))
+			}
+			t.Logf("send # %d", n)
+		}(i)
+		go func(i int) {
+			defer wg.Done()
+			name := fmt.Sprintf("test_metric%d", i)
+			var tags statshouse.Tags
+			for j := 0; j < i; j++ {
+				tags[j] = strconv.Itoa(j)
+			}
+			n := 0
+			for ; ctx.Err() == nil; n++ {
+				c.Metric(name, tags).Count(float64(i))
+			}
+			t.Logf("send # %d", n)
+		}(i)
+	}
+	wg.Wait()
+	runtime.GC()
+	for i := 0; i < 10 && c.BucketCount() != 0; i++ {
+		time.Sleep(time.Second)
+		runtime.GC()
+	}
+	require.Zero(t, c.BucketCount())
+}
+
 func BenchmarkValue2(b *testing.B) {
 	c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "")
+	defer c.Close()
 	b.ResetTimer()
-
 	for i := 0; i < b.N; i++ {
-		c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "world"}).Value(float64(i))
+		c.Value("test_stat", statshouse.Tags{1: "hello", 2: "world"}, float64(i))
 	}
 }
 
 func BenchmarkRawValue(b *testing.B) {
 	c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "")
-	s := c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "world"})
+	s := c.MetricRef("test_stat", statshouse.Tags{1: "hello", 2: "world"})
 	b.ResetTimer()
-
 	for i := 0; i < b.N; i++ {
 		s.Value(float64(i))
 	}
@@ -53,13 +102,13 @@
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "brave", 3: "new", 4: "world"}).Count(float64(i))
+		c.Count("test_stat", statshouse.Tags{1: "hello", 2: "brave", 3: "new", 4: "world"}, float64(i))
 	}
 }
 
 func BenchmarkRawCount(b *testing.B) {
 	c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "")
-	s := c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "brave", 3: "new", 4: "world"})
+	s := c.MetricRef("test_stat", statshouse.Tags{1: "hello", 2: "brave", 3: "new", 4: "world"})
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
@@ -72,13 +121,13 @@
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		c.MetricNamed("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}}).Value(float64(i))
+		c.NamedValue("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}}, float64(i))
 	}
 }
 
 func BenchmarkRawLabeledValue(b *testing.B) {
 	c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "")
-	s := c.MetricNamed("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}})
+	s := c.MetricNamedRef("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}})
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
@@ -91,13 +140,13 @@
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		c.MetricNamed("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}, {"hello1", "world"}, {"world1", "hello"}}).Count(float64(i))
+		c.NamedCount("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}, {"hello1", "world"}, {"world1", "hello"}}, float64(i))
 	}
 }
 
 func BenchmarkRawLabeledCount(b *testing.B) {
 	c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "")
-	s := c.MetricNamed("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}, {"hello1", "world"}, {"world1", "hello"}})
+	s := c.MetricNamedRef("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}, {"hello1", "world"}, {"world1", "hello"}})
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {