diff --git a/.github/workflows/ci-go.yml b/.github/workflows/ci-go.yml index 6cd7c76..411ac22 100644 --- a/.github/workflows/ci-go.yml +++ b/.github/workflows/ci-go.yml @@ -19,5 +19,6 @@ jobs: args: --timeout=10m --disable-all -E goimports - uses: dominikh/staticcheck-action@v1.2.0 with: + version: v0.4.7 install-go: false - run: go test -v ./... diff --git a/go.mod b/go.mod index 1339e51..f3a37f9 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,11 @@ module github.com/vkcom/statshouse-go go 1.19 + +require github.com/stretchr/testify v1.9.0 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..60ce688 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/statshouse.go b/statshouse.go index 0c455d0..1f139db 100644 --- a/statshouse.go +++ b/statshouse.go @@ -9,8 +9,8 @@ package statshouse import ( "encoding/binary" "log" - "math" "net" + "runtime" "sync" "sync/atomic" "time" @@ -56,6 +56,14 @@ func ConfigureNetwork(logf LoggerFunc, network string, statsHouseAddr string, de globalClient.configure(logf, network, statsHouseAddr, defaultEnv) } +func TrackBucketCount() { + globalClient.TrackBucketCount() +} + +func BucketCount() { + globalClient.BucketCount() +} + // Close calls [*Client.Close] on the global [Client]. // Make sure to call Close during app exit to avoid losing the last batch of metrics. func Close() error { @@ -64,13 +72,23 @@ func Close() error { // Metric calls [*Client.Metric] on the global [Client]. // It is valid to call Metric before [Configure]. -func Metric(metric string, tags Tags) *MetricRef { +func MetricRef(metric string, tags Tags) metricRef { + return globalClient.MetricRef(metric, tags) +} + +// Deprecated: use either MetricRef or TODO. +func Metric(metric string, tags Tags) *metricRef { return globalClient.Metric(metric, tags) } // MetricNamed calls [*Client.MetricNamed] on the global [Client]. // It is valid to call MetricNamed before [Configure]. -func MetricNamed(metric string, tags NamedTags) *MetricRef { +func MetricNamedRef(metric string, tags NamedTags) metricRef { + return globalClient.MetricNamedRef(metric, tags) +} + +// Deprecated: use either MetricRef or TODO. +func MetricNamed(metric string, tags NamedTags) *metricRef { return globalClient.MetricNamed(metric, tags) } @@ -93,15 +111,17 @@ func StopRegularMeasurement(id int) { // Specifying empty StatsHouse address will make the client silently discard all metrics. 
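Note on the package-level wrappers introduced above: as written, BucketCount has no return type, so the count from the global Client is discarded. A minimal sketch of the presumably intended shape (not part of the diff), following the doc-comment style of the surrounding wrappers:

// TrackBucketCount calls [*Client.TrackBucketCount] on the global [Client].
func TrackBucketCount() {
	globalClient.TrackBucketCount()
}

// BucketCount calls [*Client.BucketCount] on the global [Client].
func BucketCount() int32 {
	return globalClient.BucketCount()
}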
// if you get compiler error after updating to recent version of library, pass statshouse.DefaultNetwork to network parameter func NewClient(logf LoggerFunc, network string, statsHouseAddr string, defaultEnv string) *Client { + if logf == nil { + logf = log.Printf + } c := &Client{ logf: logf, addr: statsHouseAddr, network: network, packetBuf: make([]byte, batchHeaderLen, maxPayloadSize), // can grow larger than maxPayloadSize if writing huge header close: make(chan chan struct{}), - cur: &MetricRef{}, - w: map[metricKey]*MetricRef{}, - wn: map[metricKeyNamed]*MetricRef{}, + w: map[metricKey]*bucket{}, + wn: map[metricKeyNamed]*bucket{}, env: defaultEnv, regularFuncs: map[int]func(*Client){}, } @@ -127,51 +147,59 @@ type NamedTags [][2]string // Tags are used to call [*Client.Metric]. type Tags [maxTags]string -type metricKeyValue struct { - k metricKey - v *MetricRef -} - -type metricKeyValueNamed struct { - k metricKeyNamed - v *MetricRef -} - // Client manages metric aggregation and transport to a StatsHouse agent. type Client struct { - confMu sync.Mutex - logf LoggerFunc - network string - addr string - conn net.Conn - - writeErrTime time.Time // we use it to reduce # of errors reported - - closeOnce sync.Once - closeErr error - close chan chan struct{} - packetBuf []byte - batchCount int - cur *MetricRef - - mu sync.RWMutex - w map[metricKey]*MetricRef - r []metricKeyValue - wn map[metricKeyNamed]*MetricRef - rn []metricKeyValueNamed - env string // if set, will be put into key0/env + // logging + logMu sync.Mutex + logf LoggerFunc + writeErrTime time.Time // to reduce # of errors reported + + // transport + transportMu sync.RWMutex + env string // if set, will be put into key0/env + network string + addr string + conn net.Conn + packetBuf []byte + batchCount int + + // data + mu sync.RWMutex + w map[metricKey]*bucket + r []*bucket + wn map[metricKeyNamed]*bucket + rn []*bucket + tsUnixSec uint32 + + // external callbacks regularFuncsMu sync.Mutex regularFuncs map[int]func(*Client) nextRegularID int + + // shutdown + closeOnce sync.Once + closeErr error + close chan chan struct{} + + // debug + bucketCount *atomic.Int32 } // SetEnv changes the default environment associated with [Client]. func (c *Client) SetEnv(env string) { - c.mu.Lock() - defer c.mu.Unlock() + c.transportMu.Lock() + defer c.transportMu.Unlock() c.env = env } +func (c *Client) TrackBucketCount() { + c.bucketCount = &atomic.Int32{} +} + +func (c *Client) BucketCount() int32 { + return c.bucketCount.Load() +} + // Close the [Client] and flush unsent metrics to the StatsHouse agent. // No data will be sent after Close has returned. 
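A hedged usage sketch of the reworked Client API above (nil logf now falls back to log.Printf, SetEnv takes transportMu, and TrackBucketCount enables the debug counter); the address, environment, and metric name are illustrative:

package main

import (
	"log"
	"runtime"

	"github.com/vkcom/statshouse-go"
)

func main() {
	// nil logf is allowed: NewClient substitutes log.Printf.
	c := statshouse.NewClient(nil, statshouse.DefaultNetwork, "127.0.0.1:13337", "production")
	c.TrackBucketCount() // enable the debug counter before any metric refs are created
	defer c.Close()

	c.SetEnv("staging") // the environment is written into tag 0 unless a metric sets it explicitly

	r := c.MetricRef("demo_metric", statshouse.Tags{1: "ok"})
	r.Count(1)

	runtime.GC()
	log.Printf("live buckets: %d", c.BucketCount()) // panics if TrackBucketCount was never called
}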
func (c *Client) Close() error { @@ -180,8 +208,8 @@ func (c *Client) Close() error { c.close <- ch <-ch - c.confMu.Lock() - defer c.confMu.Unlock() + c.transportMu.Lock() + defer c.transportMu.Unlock() if c.conn != nil { c.closeErr = c.conn.Close() @@ -225,14 +253,18 @@ func (c *Client) callRegularFuncs(regularCache []func(*Client)) []func(*Client) } func (c *Client) configure(logf LoggerFunc, network string, statsHouseAddr string, env string) { - c.mu.Lock() - defer c.mu.Unlock() + // update logger first + c.logMu.Lock() + if logf != nil { + c.logf = logf + } else { + c.logf = log.Printf + } + c.logMu.Unlock() + // then transport + c.transportMu.Lock() + defer c.transportMu.Unlock() c.env = env - - c.confMu.Lock() - defer c.confMu.Unlock() - - c.logf = logf c.network = network c.addr = statsHouseAddr if c.conn != nil { @@ -249,55 +281,40 @@ func (c *Client) configure(logf LoggerFunc, network string, statsHouseAddr strin } func (c *Client) getLog() LoggerFunc { - c.confMu.Lock() - defer c.confMu.Unlock() + c.logMu.Lock() + defer c.logMu.Unlock() return c.logf // return func instead of calling it here to not alter the callstack information in the log } -func tillNextHalfPeriod(now time.Time) time.Duration { - return now.Truncate(defaultSendPeriod).Add(defaultSendPeriod * 3 / 2).Sub(now) -} - func (c *Client) run() { var regularCache []func(*Client) - tick := time.After(tillNextHalfPeriod(time.Now())) - for { + // get a resettable timer + timer := time.NewTimer(time.Hour) + if !timer.Stop() { + <-timer.C + } + // loop + for now := time.Now(); ; { + timer.Reset(now.Truncate(defaultSendPeriod).Sub(now)) select { - case now := <-tick: + case now = <-timer.C: regularCache = c.callRegularFuncs(regularCache[:0]) - c.send() - tick = time.After(tillNextHalfPeriod(now)) + c.send(uint32(now.Unix())) case ch := <-c.close: - c.send() // last send: we will lose all metrics produced "after" + c.send(uint32(time.Now().Unix())) // last send: we will lose all metrics produced "after" close(ch) return } } } -func (c *Client) load() ([]metricKeyValue, []metricKeyValueNamed, string) { - c.mu.Lock() - defer c.mu.Unlock() - - return c.r, c.rn, c.env -} - -func (c *Client) swapToCur(s *MetricRef) { - n := atomicLoadFloat64(&s.atomicCount) - for !atomicCASFloat64(&s.atomicCount, n, 0) { - n = atomicLoadFloat64(&s.atomicCount) - } - atomicStoreFloat64(&c.cur.atomicCount, n) - - s.mu.Lock() - defer s.mu.Unlock() - - c.cur.value = append(c.cur.value[:0], s.value...) - c.cur.unique = append(c.cur.unique[:0], s.unique...) - c.cur.stop = append(c.cur.stop[:0], s.stop...) 
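The run loop now reuses a single timer instead of allocating a channel via time.After on every period. A standalone sketch of the stop/drain/reset idiom, assuming the intent is to wake on send-period boundaries; note the Add(sendPeriod): Truncate alone, as in the hunk above, yields a non-positive duration and the timer would fire immediately:

package main

import (
	"fmt"
	"time"
)

const sendPeriod = time.Second // stands in for defaultSendPeriod

func main() {
	// Create a timer, then stop and drain it so the loop below can Reset it safely.
	timer := time.NewTimer(time.Hour)
	if !timer.Stop() {
		<-timer.C
	}
	for now, n := time.Now(), 0; n < 3; n++ {
		// Sleep until the next period boundary.
		timer.Reset(now.Truncate(sendPeriod).Add(sendPeriod).Sub(now))
		now = <-timer.C
		fmt.Println("tick at", now.Format(time.RFC3339Nano))
	}
}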
- s.value = s.value[:0] - s.unique = s.unique[:0] - s.stop = s.stop[:0] +func (b *bucket) swapToSend() { + b.mu.Lock() + defer b.mu.Unlock() + b.count, b.countToSend = 0, b.count + b.value, b.valueToSend = b.valueToSend[:0], b.value + b.unique, b.uniqueToSend = b.uniqueToSend[:0], b.unique + b.stop, b.stopToSend = b.stopToSend[:0], b.stop } type metricKeyTransport struct { @@ -316,48 +333,100 @@ func fillTag(k *metricKeyTransport, tagName string, tagValue string) { k.hasEnv = k.hasEnv || tagName == "0" || tagName == "env" || tagName == "key0" // TODO - keep only "0", rest are legacy } -func (c *Client) send() { - ss, ssn, env := c.load() - for _, s := range ss { - k := metricKeyTransport{name: s.k.name} - for i, v := range s.k.tags { - fillTag(&k, tagIDs[i], v) +func (c *Client) send(tsUnixSec uint32) { + // load & switch second + var ss, ssn []*bucket + c.mu.Lock() + ss, ssn = c.r, c.rn + c.tsUnixSec = tsUnixSec + c.mu.Unlock() + // swap + for i := 0; i < len(ss); i++ { + ss[i].swapToSend() + } + for i := 0; i < len(ssn); i++ { + ssn[i].swapToSend() + } + // send + for i := 0; i < len(ss); i++ { + if ss[i].emptySend() { + continue } - if !k.hasEnv { - fillTag(&k, "0", env) + ss[i].send(c, tsUnixSec) + } + for i := 0; i < len(ssn); i++ { + if ssn[i].emptySend() { + continue } - - c.swapToCur(s.v) - if n := atomicLoadFloat64(&c.cur.atomicCount); n > 0 { - c.sendCounter(&k, "", n, 0) + ssn[i].send(c, tsUnixSec) + } + c.transportMu.Lock() + c.flush() + c.transportMu.Unlock() + // remove unused & compact + i, n := 0, len(ss) + for i < n { + b := c.r[i] + if !b.emptySend() { + i++ + continue } - c.sendValues(&k, "", 0, 0, c.cur.value) - c.sendUniques(&k, "", 0, 0, c.cur.unique) - for _, skey := range c.cur.stop { - c.sendCounter(&k, skey, 1, 0) + b.mu.Lock() + c.mu.Lock() + if b.empty() { + n-- + c.w[b.k] = nil // release bucket reference + delete(c.w, b.k) + c.r[i] = c.r[n] + c.r[n] = nil // release bucket reference + b.attached = false + } else { + i++ } + c.mu.Unlock() + b.mu.Unlock() } - for _, s := range ssn { - k := metricKeyTransport{name: s.k.name} - for _, v := range s.k.tags { - fillTag(&k, v[0], v[1]) + if d := len(ss) - n; d != 0 { + c.mu.Lock() + for i := len(ss); i < len(c.r); i++ { + c.r[i-d] = c.r[i] + c.r[i] = nil // release bucket reference } - if !k.hasEnv { - fillTag(&k, "0", env) + c.r = c.r[:len(c.r)-d] + c.mu.Unlock() + } + // remove unused & compact (named) + i, n = 0, len(ssn) + for i < n { + b := c.rn[i] + if !b.emptySend() { + i++ + continue } - - c.swapToCur(s.v) - if n := atomicLoadFloat64(&c.cur.atomicCount); n > 0 { - c.sendCounter(&k, "", n, 0) + b.mu.Lock() + c.mu.Lock() + if b.empty() { + n-- + c.wn[b.kn] = nil // release bucket reference + delete(c.wn, b.kn) + c.rn[i] = c.rn[n] + c.rn[n] = nil // release bucket reference + b.attached = false + } else { + i++ } - c.sendValues(&k, "", 0, 0, c.cur.value) - c.sendUniques(&k, "", 0, 0, c.cur.unique) - for _, skey := range c.cur.stop { - c.sendCounter(&k, skey, 1, 0) + c.mu.Unlock() + b.mu.Unlock() + } + if d := len(ssn) - n; d != 0 { + c.mu.Lock() + for i := len(ssn); i < len(c.rn); i++ { + c.rn[i-d] = c.rn[i] + c.rn[i] = nil // release bucket reference } + c.rn = c.rn[:len(c.rn)-d] + c.mu.Unlock() } - - c.flush() } func (c *Client) sendCounter(k *metricKeyTransport, skey string, counter float64, tsUnixSec uint32) { @@ -419,9 +488,6 @@ func (c *Client) flush() { c.packetBuf = c.packetBuf[:batchHeaderLen] c.batchCount = 0 - c.confMu.Lock() - defer c.confMu.Unlock() - if c.conn == nil && c.addr != "" { conn, err := 
net.Dial(c.network, c.addr) if err != nil { @@ -430,13 +496,13 @@ func (c *Client) flush() { } c.conn = conn } - if c.conn != nil && c.addr != "" { + if c.conn != nil { _, err := c.conn.Write(data) if err != nil { now := time.Now() if now.Sub(c.writeErrTime) > errorReportingPeriod { c.writeErrTime = now - c.logf("[statshouse] failed to send data to statshouse: %v", err) // not using getLog() because confMu is already locked + c.getLog()("[statshouse] failed to send data to statshouse: %v", err) } } } @@ -499,7 +565,7 @@ func (c *Client) writeTag(tagName string, tagValue string) { c.packetBuf = basictl.StringWriteTruncated(c.packetBuf, tagValue) } -// Metric is the preferred way to access [MetricRef] to record observations. +// Metric is the preferred way to access [metricRef] to record observations. // Metric calls should be encapsulated in helper functions. Direct calls like // // statshouse.Metric("packet_size", statshouse.Tags{1: "ok"}).Value(float64(len(pkg))) @@ -521,29 +587,27 @@ func (c *Client) writeTag(tagName string, tagValue string) { // var countPacketOK = statshouse.Metric("foo", statshouse.Tags{1: "ok"}) // // countPacketOK.Count(1) // lowest overhead possible -func (c *Client) Metric(metric string, tags Tags) *MetricRef { +func (c *Client) MetricRef(metric string, tags Tags) metricRef { // We must do absolute minimum of work here k := metricKey{name: metric, tags: tags} c.mu.RLock() e, ok := c.w[k] c.mu.RUnlock() if ok { - return e + return metricRef{e} } - - c.mu.Lock() - e, ok = c.w[k] - if !ok { - e = &MetricRef{} - c.w[k] = e - c.r = append(c.r, metricKeyValue{k: k, v: e}) + b := &bucket{c: c, k: k} + if c.bucketCount != nil { + c.bucketCount.Add(1) + runtime.SetFinalizer(b, func(_ *bucket) { + c.bucketCount.Add(-1) + }) } - c.mu.Unlock() - return e + return metricRef{b} } -// MetricNamed is similar to [*Client.Metric] but slightly slower, and allows to specify tags by name. -func (c *Client) MetricNamed(metric string, tags NamedTags) *MetricRef { +// MetricNamedRef is similar to [*Client.MetricRef] but slightly slower, and allows to specify tags by name. +func (c *Client) MetricNamedRef(metric string, tags NamedTags) metricRef { // We must do absolute minimum of work here k := metricKeyNamed{name: metric} copy(k.tags[:], tags) @@ -552,90 +616,251 @@ func (c *Client) MetricNamed(metric string, tags NamedTags) *MetricRef { e, ok := c.wn[k] c.mu.RUnlock() if ok { - return e + return metricRef{e} } - - c.mu.Lock() - e, ok = c.wn[k] - if !ok { - e = &MetricRef{} - c.wn[k] = e - c.rn = append(c.rn, metricKeyValueNamed{k: k, v: e}) + b := &bucket{c: c, kn: k} + if c.bucketCount != nil { + c.bucketCount.Add(1) + runtime.SetFinalizer(b, func(_ *bucket) { + c.bucketCount.Add(-1) + }) } - c.mu.Unlock() - return e + return metricRef{b} +} + +// Deprecated: use either MetricRef or TODO. +func (c *Client) Metric(metric string, tags Tags) *metricRef { + v := c.MetricRef(metric, tags) + return &v +} + +// Deprecated: use either MetricNameRef or TODO. +func (c *Client) MetricNamed(metric string, tags NamedTags) *metricRef { + v := c.MetricNamedRef(metric, tags) + return &v } -// MetricRef pointer is obtained via [*Client.Metric] or [*Client.MetricNamed] +// metricRef pointer is obtained via [*Client.Metric] or [*Client.MetricNamed] // and is used to record attributes of observed events. 
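MetricRef and MetricNamedRef above use runtime.SetFinalizer to account for buckets that become unreachable. A standalone sketch of that technique, assuming only the standard library: each tracked object bumps a counter and a finalizer decrements it once the GC collects the object, so the counter approximates the number of live objects:

package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

type tracked struct{ payload [64]byte }

func main() {
	var live atomic.Int32
	newTracked := func() *tracked {
		t := &tracked{}
		live.Add(1)
		runtime.SetFinalizer(t, func(*tracked) { live.Add(-1) })
		return t
	}
	for i := 0; i < 1000; i++ {
		_ = newTracked()
	}
	// Finalizers run asynchronously, so give the GC a few cycles,
	// as the new TestBucketRelease below does.
	for i := 0; i < 10 && live.Load() != 0; i++ {
		runtime.GC()
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("approximate live objects:", live.Load())
}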
-type MetricRef struct { - // Place atomics first to ensure proper alignment, see https://pkg.go.dev/sync/atomic#pkg-note-BUG - atomicCount uint64 +type metricRef struct { + *bucket +} + +type bucket struct { + // readonly + c *Client + k metricKey + kn metricKeyNamed - mu sync.Mutex + mu sync.Mutex // always taken before "c.mu" + count float64 value []float64 unique []int64 stop []string + + // values to send, access only by "send" goroutine, not protected + countToSend float64 + valueToSend []float64 + uniqueToSend []int64 + stopToSend []string + + tsUnixSec uint32 + attached bool } // Count records the number of events or observations. -func (m *MetricRef) Count(n float64) { - c := atomicLoadFloat64(&m.atomicCount) - for !atomicCASFloat64(&m.atomicCount, c, c+n) { - c = atomicLoadFloat64(&m.atomicCount) - } +func (m *metricRef) Count(n float64) { + m.write(0, func(b *bucket) { + b.count += n + }) +} + +func (m *metricRef) CountTS(n float64, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.count += n + }) } // Value records the observed value for distribution estimation. -func (m *MetricRef) Value(value float64) { - m.mu.Lock() - m.value = append(m.value, value) - m.mu.Unlock() +func (m *metricRef) Value(value float64) { + m.write(0, func(b *bucket) { + b.value = append(b.value, value) + }) +} + +func (m *metricRef) ValueTS(value float64, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.value = append(b.value, value) + }) } // Values records the observed values for distribution estimation. -func (m *MetricRef) Values(values []float64) { - m.mu.Lock() - m.value = append(m.value, values...) - m.mu.Unlock() +func (m *metricRef) Values(values []float64) { + m.write(0, func(b *bucket) { + b.value = append(b.value, values...) + }) +} + +func (m *metricRef) ValuesTS(values []float64, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.value = append(b.value, values...) + }) } // Unique records the observed value for cardinality estimation. -func (m *MetricRef) Unique(value int64) { - m.mu.Lock() - m.unique = append(m.unique, value) - m.mu.Unlock() +func (m *metricRef) Unique(value int64) { + m.write(0, func(b *bucket) { + b.unique = append(b.unique, value) + }) +} + +func (m *metricRef) UniqueTS(value int64, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.unique = append(b.unique, value) + }) } // Uniques records the observed values for cardinality estimation. -func (m *MetricRef) Uniques(values []int64) { - m.mu.Lock() - m.unique = append(m.unique, values...) - m.mu.Unlock() +func (m *metricRef) Uniques(values []int64) { + m.write(0, func(b *bucket) { + b.unique = append(b.unique, values...) + }) +} + +func (m *metricRef) UniquesTS(values []int64, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.unique = append(b.unique, values...) + }) } // StringTop records the observed value for popularity estimation. -func (m *MetricRef) StringTop(value string) { - m.mu.Lock() - m.stop = append(m.stop, value) - m.mu.Unlock() +func (m *metricRef) StringTop(value string) { + m.write(0, func(b *bucket) { + b.stop = append(b.stop, value) + }) +} + +func (m *metricRef) StringTopTS(value string, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.stop = append(b.stop, value) + }) } // StringsTop records the observed values for popularity estimation. -func (m *MetricRef) StringsTop(values []string) { - m.mu.Lock() - m.stop = append(m.stop, values...) 
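The new *TS variants accept an explicit unix timestamp instead of attributing the observation to the current aggregation second. A hedged usage sketch against the global client; the metric names, tags, and environment are illustrative, and the refs are stored in variables because the metricRef methods use pointer receivers:

package main

import (
	"time"

	"github.com/vkcom/statshouse-go"
)

func main() {
	statshouse.ConfigureNetwork(nil, statshouse.DefaultNetwork, "" /* empty address: metrics are discarded */, "dev")
	defer statshouse.Close()

	// Record observations one minute in the past.
	ts := uint32(time.Now().Add(-time.Minute).Unix())
	rt := statshouse.MetricRef("request_time", statshouse.Tags{1: "ok"})
	rt.ValueTS(0.042, ts)
	rq := statshouse.MetricRef("requests_total", statshouse.Tags{1: "ok"})
	rq.CountTS(1, ts)
}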
+func (m *metricRef) StringsTop(values []string) { + m.write(0, func(b *bucket) { + b.stop = append(b.stop, values...) + }) +} + +func (m *metricRef) StringsTopTS(values []string, tsUnixSec uint32) { + m.write(tsUnixSec, func(b *bucket) { + b.stop = append(b.stop, values...) + }) +} + +func (m *metricRef) write(tsUnixSec uint32, fn func(*bucket)) { + m.bucket.mu.Lock() + tsZeroOrEqual := tsUnixSec == 0 || tsUnixSec == m.tsUnixSec + if m.attached && tsZeroOrEqual { + // fast path + fn(m.bucket) + m.bucket.mu.Unlock() + return + } + c := m.c + if !tsZeroOrEqual { + m.bucket.mu.Unlock() + var b bucket + fn(&b) + b.send(c, tsUnixSec) + return + } + // attach then write + for i := 0; i < 2; i++ { + var b *bucket + var emptyKey bool + c.mu.Lock() + if m.k.name != "" { + if b = c.w[m.k]; b == nil { + fn(m.bucket) + c.w[m.k] = m.bucket + c.r = append(c.r, m.bucket) + m.bucket.attached = true // attach current + } + } else if m.kn.name != "" { + if b = c.wn[m.kn]; b == nil { + fn(m.bucket) + c.wn[m.kn] = m.bucket + c.rn = append(c.rn, m.bucket) + m.bucket.attached = true // attach current + } + } else { + emptyKey = true // should not happen! + } + c.mu.Unlock() + if emptyKey { + m.mu.Unlock() + c.getLog()("[statshouse] metric key is empty, discarding") + return + } + if m.bucket.attached { // attached current + fn(m.bucket) + m.mu.Unlock() + return + } + m.bucket.mu.Unlock() + m.bucket = b + // now state is equvalent to what it was at function invocation, start over + b.mu.Lock() + if b.attached { + fn(b) + b.mu.Unlock() + return + } + } m.mu.Unlock() + c.getLog()("[statshouse] safety counter limit reached, discarding") } -func atomicLoadFloat64(addr *uint64) float64 { - return math.Float64frombits(atomic.LoadUint64(addr)) +func (b *bucket) send(c *Client, tsUnixSec uint32) { + c.transportMu.Lock() + defer c.transportMu.Unlock() + var k metricKeyTransport + if b.k.name != "" { + k.name = b.k.name + for i, v := range b.k.tags { + fillTag(&k, tagIDs[i], v) + } + } else { + k.name = b.kn.name + for _, v := range b.kn.tags { + fillTag(&k, v[0], v[1]) + } + } + if !k.hasEnv { + fillTag(&k, "0", c.env) + } + if b.countToSend > 0 { + c.sendCounter(&k, "", b.countToSend, tsUnixSec) + } + c.sendValues(&k, "", 0, tsUnixSec, b.valueToSend) + c.sendUniques(&k, "", 0, tsUnixSec, b.uniqueToSend) + for _, skey := range b.stopToSend { + c.sendCounter(&k, skey, 1, tsUnixSec) + } } -func atomicStoreFloat64(addr *uint64, val float64) { - atomic.StoreUint64(addr, math.Float64bits(val)) +func (b *bucket) empty() bool { + return b.count == 0 && + len(b.value) == 0 && + len(b.unique) == 0 && + len(b.stop) == 0 } -func atomicCASFloat64(addr *uint64, old float64, new float64) (swapped bool) { - return atomic.CompareAndSwapUint64(addr, math.Float64bits(old), math.Float64bits(new)) +func (b *bucket) emptySend() bool { + return b.countToSend == 0 && + len(b.valueToSend) == 0 && + len(b.uniqueToSend) == 0 && + len(b.stopToSend) == 0 } diff --git a/statshouse_test.go b/statshouse_test.go index 766aaa6..fa8537c 100644 --- a/statshouse_test.go +++ b/statshouse_test.go @@ -7,9 +7,15 @@ package statshouse_test import ( + "context" + "fmt" + "runtime" + "strconv" "sync" "testing" + "time" + "github.com/stretchr/testify/require" "github.com/vkcom/statshouse-go" ) @@ -29,10 +35,53 @@ func TestCountRace(t *testing.T) { wg.Wait() } +func TestBucketRelease(t *testing.T) { + var wg sync.WaitGroup + c := statshouse.NewClient(t.Logf, "udp", "" /* avoid sending anything */, "") + c.TrackBucketCount() + ctx, cancel := 
context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + for i := 1; i <= 10; i++ { + wg.Add(2) + go func(i int) { + defer wg.Done() + name := fmt.Sprintf("test_metric%d", i) + var tags statshouse.NamedTags + for j := 0; j < i; j++ { + tags = append(tags, [2]string{strconv.Itoa(j), fmt.Sprintf("name%d", j)}) + } + n := 0 + for ; ctx.Err() == nil; n++ { + c.MetricNamed(name, tags).Count(float64(i)) + } + t.Logf("send # %d", n) + }(i) + go func(i int) { + defer wg.Done() + name := fmt.Sprintf("test_metric%d", i) + var tags statshouse.Tags + for j := 0; j < i; j++ { + tags[j] = strconv.Itoa(j) + } + n := 0 + for ; ctx.Err() == nil; n++ { + c.Metric(name, tags).Count(float64(i)) + } + t.Logf("send # %d", n) + }(i) + } + wg.Wait() + runtime.GC() + for i := 0; i < 10 && c.BucketCount() != 0; i++ { + time.Sleep(time.Second) + runtime.GC() + } + require.Zero(t, c.BucketCount()) +} + func BenchmarkValue2(b *testing.B) { c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "") b.ResetTimer() - for i := 0; i < b.N; i++ { c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "world"}).Value(float64(i)) } @@ -40,9 +89,8 @@ func BenchmarkValue2(b *testing.B) { func BenchmarkRawValue(b *testing.B) { c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "") - s := c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "world"}) + s := c.MetricRef("test_stat", statshouse.Tags{1: "hello", 2: "world"}) b.ResetTimer() - for i := 0; i < b.N; i++ { s.Value(float64(i)) } @@ -59,7 +107,7 @@ func BenchmarkCount4(b *testing.B) { func BenchmarkRawCount(b *testing.B) { c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "") - s := c.Metric("test_stat", statshouse.Tags{1: "hello", 2: "brave", 3: "new", 4: "world"}) + s := c.MetricRef("test_stat", statshouse.Tags{1: "hello", 2: "brave", 3: "new", 4: "world"}) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -78,7 +126,7 @@ func BenchmarkLabeledValue2(b *testing.B) { func BenchmarkRawLabeledValue(b *testing.B) { c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "") - s := c.MetricNamed("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}}) + s := c.MetricNamedRef("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}}) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -97,7 +145,7 @@ func BenchmarkLabeledCount4(b *testing.B) { func BenchmarkRawLabeledCount(b *testing.B) { c := statshouse.NewClient(b.Logf, "udp", "" /* avoid sending anything */, "") - s := c.MetricNamed("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}, {"hello1", "world"}, {"world1", "hello"}}) + s := c.MetricNamedRef("test_stat", statshouse.NamedTags{{"hello", "world"}, {"world", "hello"}, {"hello1", "world"}, {"world1", "hello"}}) b.ResetTimer() for i := 0; i < b.N; i++ {
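A short sketch of the hot-path pattern the benchmarks above exercise, mirroring the doc-comment guidance with the new MetricRef name: resolve the reference once and reuse it, avoiding the per-call map lookup. The metric name, tag, and environment are illustrative:

package main

import (
	"github.com/vkcom/statshouse-go"
)

// Resolved once at startup; creating refs before Configure is documented as valid.
var packetSizeOK = statshouse.MetricRef("packet_size", statshouse.Tags{1: "ok"})

func main() {
	statshouse.ConfigureNetwork(nil, statshouse.DefaultNetwork, "" /* empty address: discard */, "production")
	defer statshouse.Close()

	for i := 0; i < 100; i++ {
		packetSizeOK.Value(float64(100 + i))
	}
}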