Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/telemetry: revamp #2996

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions internal/newtelemetry/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package newtelemetry

import (
"net/http"

"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

type ClientConfig struct {
// AgentlessURL is the full URL to the agentless telemetry endpoint.
// Defaults to https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry
AgentlessURL string

// AgentURL is the url to the agent without the path,
AgentURL string

// APIKey is the API key to use for sending telemetry, defaults to the env var DD_API_KEY.
APIKey string

// HTTPClient is the http client to use for sending telemetry, defaults to a http.DefaultClient copy.
HTTPClient http.RoundTripper
}

// MetricHandle can be used to submit different values for the same metric.
// MetricHandle is used to reduce lock contention when submitting metrics.
// This can also be used ephemerally to submit a single metric value like this:
//
// telemetry.Metric(telemetry.Appsec, "my-count", map[string]string{"tag1": "true", "tag2": "1.0"}).Submit(1.0)
type MetricHandle interface {
Submit(value float64)

flush()
}

// TelemetryLogger is the interface implementing the telemetry logs. Supports log deduplication. All methods are Thread-safe
// This is an interface for easier testing but all functions will be mirrored at the package level to call
// the global client.
type TelemetryLogger interface {
// WithTags creates a new Logger which will send a comma-separated list of tags with the next logs
WithTags(tags string) TelemetryLogger

// WithStackTrace creates a new Logger which will send a stack trace generated for each next log.
WithStackTrace() TelemetryLogger

// Error sends a telemetry log at the ERROR level
Error(text string)

// Warn sends a telemetry log at the WARN level
Warn(text string)

// Debug sends a telemetry log at the DEBUG level
Debug(text string)
}

// Integration is an integration that is configured to be traced.
type Integration struct {
Name string
Version string
AutoEnabled bool
Compatible bool
Error string
}

// Client constitutes all the functions available concurrently for the telemetry users. All methods are thread-safe
// This is an interface for easier testing but all functions will be mirrored at the package level to call
// the global client.
type Client interface {

// Count creates a new metric handle for the given parameters that can be used to submit values.
Count(namespace types.Namespace, name string, tags map[string]string) MetricHandle

// Rate creates a new metric handle for the given parameters that can be used to submit values.
Rate(namespace types.Namespace, name string, tags map[string]string) MetricHandle

// Gauge creates a new metric handle for the given parameters that can be used to submit values.
Gauge(namespace types.Namespace, name string, tags map[string]string) MetricHandle

// Distribution creates a new metric handle for the given parameters that can be used to submit values.
Distribution(namespace types.Namespace, name string, tags map[string]string) MetricHandle

// Logger returns an implementation of the Logger interface which sends telemetry logs.
Logger() TelemetryLogger

// ProductStarted declares a product to have started at the customer’s request
ProductStarted(product types.Namespace)

// ProductStopped declares a product to have being stopped by the customer
ProductStopped(product types.Namespace)

// ProductStartError declares that a product could not start because of the following error
ProductStartError(product types.Namespace, err error)

// AddAppConfig adds a key value pair to the app configuration and send the change to telemetry
// value has to be json serializable and the origin is the source of the change.
AddAppConfig(key string, value any, origin types.Origin)

// AddBulkAppConfig adds a list of key value pairs to the app configuration and sends the change to telemetry.
// Same as AddAppConfig but for multiple values.
AddBulkAppConfig(kvs map[string]any, origin types.Origin)

// MarkIntegrationAsLoaded marks an integration as loaded in the telemetry
MarkIntegrationAsLoaded(integration Integration)

// flush closes the client and flushes any remaining data.
flush()

// appStart sends the telemetry necessary to signal that the app is starting.
appStart() error

// appStop sends the telemetry necessary to signal that the app is stopping and calls Close()
appStop()
}
89 changes: 89 additions & 0 deletions internal/newtelemetry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package newtelemetry

import (
"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

// NewClient creates a new telemetry client with the given service, environment, and version and config.
func NewClient(service, env, version string, config ClientConfig) (Client, error) {
return nil, nil
}

type client struct{}

func (c client) MarkIntegrationAsLoaded(integration Integration) {
//TODO implement me
panic("implement me")
}

func (c client) Count(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Rate(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Gauge(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Distribution(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
//TODO implement me
panic("implement me")
}

func (c client) Logger() TelemetryLogger {
//TODO implement me
panic("implement me")
}

func (c client) ProductStarted(product types.Namespace) {
//TODO implement me
panic("implement me")
}

func (c client) ProductStopped(product types.Namespace) {
//TODO implement me
panic("implement me")
}

func (c client) ProductStartError(product types.Namespace, err error) {
//TODO implement me
panic("implement me")
}

func (c client) AddAppConfig(key string, value any, origin types.Origin) {
//TODO implement me
panic("implement me")
}

func (c client) AddBulkAppConfig(kvs map[string]any, origin types.Origin) {
//TODO implement me
panic("implement me")
}

func (c client) flush() {
//TODO implement me
panic("implement me")
}

func (c client) appStart() error {
//TODO implement me
panic("implement me")
}

func (c client) appStop() {
//TODO implement me
panic("implement me")
}

var _ Client = (*client)(nil)
41 changes: 41 additions & 0 deletions internal/newtelemetry/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package newtelemetry

import (
"net"
"net/http"
"time"
)

var (
// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
//orchestrion:ignore
defaultHTTPClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: 5 * time.Second,
}

// agentlessURL is the endpoint used to send telemetry in an agentless environment. It is
// also the default URL in case connecting to the agent URL fails.
agentlessURL = "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry"

// defaultHeartbeatInterval is the default interval at which the agent sends a heartbeat.
defaultHeartbeatInterval = 60.0
)
89 changes: 89 additions & 0 deletions internal/newtelemetry/globalclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package newtelemetry

import (
"sync/atomic"

"gopkg.in/DataDog/dd-trace-go.v1/internal/newtelemetry/types"
)

var globalClient atomic.Pointer[Client]

// StartApp starts the telemetry client with the given client send the app-started telemetry and sets it as the global client.
func StartApp(client Client) error {
if err := client.appStart(); err != nil {
return err
}
SwapClient(client)
return nil
}

// SwapClient swaps the global client with the given client and flush the old client.
func SwapClient(client Client) {
if oldClient := globalClient.Swap(&client); oldClient != nil && *oldClient != nil {
(*oldClient).flush()
}
}

// StopApp creates the app-stopped telemetry, adding to the queue and flush all the queue before stopping the client.
func StopApp() {
if client := globalClient.Swap(nil); client != nil && *client != nil {
(*client).appStop()
(*client).flush()
}
}

func Count(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
return (*globalClient.Load()).Count(namespace, name, tags)
}

// Rate creates a new metric handle for the given parameters that can be used to submit values.
func Rate(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
return (*globalClient.Load()).Rate(namespace, name, tags)
}

// Gauge creates a new metric handle for the given parameters that can be used to submit values.
func Gauge(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
return (*globalClient.Load()).Gauge(namespace, name, tags)
}

// Distribution creates a new metric handle for the given parameters that can be used to submit values.
func Distribution(namespace types.Namespace, name string, tags map[string]string) MetricHandle {
return (*globalClient.Load()).Distribution(namespace, name, tags)
}

// Logger returns an implementation of the TelemetryLogger interface which sends telemetry logs.
func Logger() TelemetryLogger {
return (*globalClient.Load()).Logger()
}

// ProductStarted declares a product to have started at the customer’s request
func ProductStarted(product types.Namespace) {
(*globalClient.Load()).ProductStarted(product)
}

// ProductStopped declares a product to have being stopped by the customer
func ProductStopped(product types.Namespace) {
(*globalClient.Load()).ProductStopped(product)
}

// ProductStartError declares that a product could not start because of the following error
func ProductStartError(product types.Namespace, err error) {
(*globalClient.Load()).ProductStartError(product, err)
}

// AddAppConfig adds a key value pair to the app configuration and send the change to telemetry
// value has to be json serializable and the origin is the source of the change.
func AddAppConfig(key string, value any, origin types.Origin) {
(*globalClient.Load()).AddAppConfig(key, value, origin)
}

// AddBulkAppConfig adds a list of key value pairs to the app configuration and sends the change to telemetry.
// Same as AddAppConfig but for multiple values.
func AddBulkAppConfig(kvs map[string]any, origin types.Origin) {
(*globalClient.Load()).AddBulkAppConfig(kvs, origin)
}
47 changes: 47 additions & 0 deletions internal/newtelemetry/hot_pointer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024 Datadog, Inc.

package newtelemetry

import (
"sync"
)

// hotPointer is a type that allows for atomic swapping of a value while keeping the other on standby to prevent allocations.
type hotPointer[T any] struct {
// value is the current value that can be extracted.
value *T
// standby is the value that will be swapped in.
standby *T
// writeMu is used to lock the value.
writeMu sync.Mutex
// swapMu is used to lock the swap.
swapMu sync.Mutex
}

// Lock take the lock and return the current value
func (hp *hotPointer[T]) Lock() *T {
hp.writeMu.Lock()
return hp.value
}

// Unlock unlocks the lock
func (hp *hotPointer[T]) Unlock() {
hp.writeMu.Unlock()
}

// StandbyValue returns the standby value WITHOUT locking. Which means it cannot be used concurrently.
func (hp *hotPointer[T]) StandbyValue() *T {
return hp.standby
}

// Swap swaps the current value with the standby value and return the standby value using the lock.
// the value returned is NOT protected by the lock.
func (hp *hotPointer[T]) Swap() *T {
hp.Lock()
defer hp.Unlock()
hp.value, hp.standby = hp.standby, hp.value
return hp.standby
}
Loading
Loading