diff --git a/.chloggen/remove-jaegersampling.yaml b/.chloggen/remove-jaegersampling.yaml new file mode 100644 index 000000000000..082213eec43f --- /dev/null +++ b/.chloggen/remove-jaegersampling.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: extension/jaegerremotesampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: remove dependency on jaeger internal code + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36976] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/jaegerremotesampling/extension.go b/extension/jaegerremotesampling/extension.go index 74f7d9467b99..ba9bbbc738da 100644 --- a/extension/jaegerremotesampling/extension.go +++ b/extension/jaegerremotesampling/extension.go @@ -7,13 +7,15 @@ import ( "context" "fmt" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" - "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/server/grpc" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/server/http" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/filesource" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/remotesource" ) var _ extension.Extension = (*jrsExtension)(nil) @@ -24,7 +26,7 @@ type jrsExtension struct { httpServer component.Component grpcServer component.Component - samplingStore samplingstrategy.Provider + samplingStore source.Source closers []func() error } @@ -44,11 +46,11 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error // - local file // we can then use a simplified logic here to assign the appropriate store if jrse.cfg.Source.File != "" { - opts := static.Options{ + opts := filesource.Options{ StrategiesFile: jrse.cfg.Source.File, ReloadInterval: jrse.cfg.Source.ReloadInterval, } - ss, err := static.NewProvider(opts, jrse.telemetry.Logger) + ss, err := filesource.NewFileSource(opts, jrse.telemetry.Logger) if err != nil { return fmt.Errorf("failed to create the local file strategy store: %w", err) } @@ -64,7 +66,7 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error return fmt.Errorf("failed to create the remote strategy store: %w", err) } jrse.closers = append(jrse.closers, conn.Close) - remoteStore, closer := internal.NewRemoteStrategyStore( + remoteStore, closer := remotesource.NewRemoteSource( conn, jrse.cfg.Source.Remote, jrse.cfg.Source.ReloadInterval, @@ -74,7 +76,7 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error } if jrse.cfg.HTTPServerConfig != nil { - httpServer, err := internal.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerConfig, jrse.samplingStore) + httpServer, err := http.NewHTTP(jrse.telemetry, *jrse.cfg.HTTPServerConfig, jrse.samplingStore) if err != nil { return fmt.Errorf("error while creating the HTTP server: %w", err) } @@ -86,7 +88,7 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error } if jrse.cfg.GRPCServerConfig != nil { - grpcServer, err := internal.NewGRPC(jrse.telemetry, *jrse.cfg.GRPCServerConfig, jrse.samplingStore) + grpcServer, err := grpc.NewGRPC(jrse.telemetry, *jrse.cfg.GRPCServerConfig, jrse.samplingStore) if err != nil { return fmt.Errorf("error while creating the gRPC server: %w", err) } diff --git a/extension/jaegerremotesampling/go.mod b/extension/jaegerremotesampling/go.mod index f6225948128e..b68c879d5637 100644 --- a/extension/jaegerremotesampling/go.mod +++ b/extension/jaegerremotesampling/go.mod @@ -26,7 +26,6 @@ require ( ) require ( - github.com/apache/thrift v0.21.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect @@ -38,28 +37,16 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect - github.com/hashicorp/hcl v1.0.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.2 // indirect - github.com/magiconair/properties v1.8.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect - github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.3 // indirect - github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rs/cors v1.11.1 // indirect - github.com/sagikazarmark/locafero v0.4.0 // indirect - github.com/sagikazarmark/slog-shim v0.1.0 // indirect - github.com/sourcegraph/conc v0.3.0 // indirect - github.com/spf13/afero v1.11.0 // indirect - github.com/spf13/cast v1.6.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.19.0 // indirect - github.com/subosito/gotenv v1.6.0 // indirect go.opentelemetry.io/collector/client v1.23.0 // indirect go.opentelemetry.io/collector/config/configauth v0.117.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.23.0 // indirect @@ -75,13 +62,11 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/protobuf v1.36.2 // indirect - gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/extension/jaegerremotesampling/go.sum b/extension/jaegerremotesampling/go.sum index 8249f59d7cb5..1dabf1878bd2 100644 --- a/extension/jaegerremotesampling/go.sum +++ b/extension/jaegerremotesampling/go.sum @@ -1,21 +1,9 @@ -github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= -github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= -github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= -github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= -github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= -github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -39,10 +27,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= -github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= -github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= -github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jaegertracing/jaeger v1.64.0 h1:fc45qwaBZpBE2DeXwJlm8hK1q2PACrzGKXKgIcHLPMw= github.com/jaegertracing/jaeger v1.64.0/go.mod h1:9yVeL3z2Q9AGrW7j7TTeqohUy7Bbp9XWXgUDG3rVdO0= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= @@ -63,12 +47,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= -github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= -github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c h1:cqn374mizHuIWj+OSJCajGr/phAmuMug9qIX3l9CflE= -github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -77,55 +57,16 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I= github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= -github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= -github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= -github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= -github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.61.0 h1:3gv/GThfX0cV2lpO7gkTUwZru38mxevy90Bj8YFSRQQ= -github.com/prometheus/common v0.61.0/go.mod h1:zr29OCN/2BsJRaFwG8QOBr41D6kkchKbpeNH7pAjb/s= -github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= -github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= -github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= -github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= -github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= -github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= -github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= -github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= -github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= -github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= -github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= -github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= -github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM= -github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= -github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= -github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector v0.115.0 h1:qUZ0bTeNBudMxNQ7FJKS//TxTjeJ7tfU/z22mcFavWU= @@ -181,8 +122,6 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 h1:DheMAlT go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0/go.mod h1:wZcGmeVO9nzP67aYSLDqXNWK87EZWhi7JWj1v7ZXf94= go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= -go.opentelemetry.io/otel/exporters/prometheus v0.54.0 h1:rFwzp68QMgtzu9PgP3jm9XaMICI6TsofWWPcBDKwlsU= -go.opentelemetry.io/otel/exporters/prometheus v0.54.0/go.mod h1:QyjcV9qDP6VeK5qPyKETvNjmaaEc7+gqjh4SS0ZYzDU= go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= @@ -200,8 +139,6 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -239,8 +176,5 @@ google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojt gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= -gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extension/jaegerremotesampling/internal/internal_test.go b/extension/jaegerremotesampling/internal/internal_test.go deleted file mode 100644 index 5304740b4171..000000000000 --- a/extension/jaegerremotesampling/internal/internal_test.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal - -import ( - "context" - - "github.com/jaegertracing/jaeger/proto-gen/api_v2" -) - -type mockCfgMgr struct { - getSamplingStrategyFunc func(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) -} - -func (m *mockCfgMgr) Close() error { - return nil -} - -func (m *mockCfgMgr) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { - if m.getSamplingStrategyFunc != nil { - return m.getSamplingStrategyFunc(ctx, serviceName) - } - return &api_v2.SamplingStrategyResponse{}, nil -} diff --git a/extension/jaegerremotesampling/internal/mocks/mock_source.go b/extension/jaegerremotesampling/internal/mocks/mock_source.go new file mode 100644 index 000000000000..66934f31fe8f --- /dev/null +++ b/extension/jaegerremotesampling/internal/mocks/mock_source.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package mocks // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/mocks" + +import ( + "context" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +type MockCfgMgr struct { + GetSamplingStrategyFunc func(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) +} + +func (m *MockCfgMgr) Close() error { + return nil +} + +func (m *MockCfgMgr) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { + if m.GetSamplingStrategyFunc != nil { + return m.GetSamplingStrategyFunc(ctx, serviceName) + } + return &api_v2.SamplingStrategyResponse{}, nil +} diff --git a/extension/jaegerremotesampling/internal/grpc.go b/extension/jaegerremotesampling/internal/server/grpc/grpc.go similarity index 80% rename from extension/jaegerremotesampling/internal/grpc.go rename to extension/jaegerremotesampling/internal/server/grpc/grpc.go index 5b614f90fae4..3094d90b7435 100644 --- a/extension/jaegerremotesampling/internal/grpc.go +++ b/extension/jaegerremotesampling/internal/server/grpc/grpc.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal" +package grpc // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/server/grpc" import ( "context" @@ -9,8 +9,6 @@ import ( "fmt" "net" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -18,11 +16,16 @@ import ( "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" ) var _ component.Component = (*SamplingGRPCServer)(nil) -var errGRPCServerNotRunning = errors.New("gRPC server is not running") +var ( + errMissingStrategyStore = errors.New("the strategy store has not been provided") + errGRPCServerNotRunning = errors.New("gRPC server is not running") +) type grpcServer interface { Serve(lis net.Listener) error @@ -34,7 +37,7 @@ type grpcServer interface { func NewGRPC( telemetry component.TelemetrySettings, settings configgrpc.ServerConfig, - strategyStore samplingstrategy.Provider, + strategyStore source.Source, ) (*SamplingGRPCServer, error) { if strategyStore == nil { return nil, errMissingStrategyStore @@ -51,7 +54,7 @@ func NewGRPC( type SamplingGRPCServer struct { telemetry component.TelemetrySettings settings configgrpc.ServerConfig - strategyStore samplingstrategy.Provider + strategyStore source.Source grpcServer grpcServer } @@ -64,7 +67,7 @@ func (s *SamplingGRPCServer) Start(ctx context.Context, host component.Host) err reflection.Register(server) s.grpcServer = server - api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(s.strategyStore)) + api_v2.RegisterSamplingManagerServer(server, NewGRPCHandler(s.strategyStore)) healthServer := health.NewServer() healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) diff --git a/extension/jaegerremotesampling/internal/server/grpc/grpc_handler.go b/extension/jaegerremotesampling/internal/server/grpc/grpc_handler.go new file mode 100644 index 000000000000..230d67ebc5aa --- /dev/null +++ b/extension/jaegerremotesampling/internal/server/grpc/grpc_handler.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package grpc // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/server/grpc" + +import ( + "context" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" +) + +// GRPCHandler is sampling strategy handler for gRPC. +type GRPCHandler struct { + samplingProvider source.Source +} + +// NewGRPCHandler creates a handler that controls sampling strategies for services. +func NewGRPCHandler(provider source.Source) GRPCHandler { + return GRPCHandler{ + samplingProvider: provider, + } +} + +// GetSamplingStrategy returns sampling decision from store. +func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) { + return s.samplingProvider.GetSamplingStrategy(ctx, param.GetServiceName()) +} diff --git a/extension/jaegerremotesampling/internal/server/grpc/grpc_handler_test.go b/extension/jaegerremotesampling/internal/server/grpc/grpc_handler_test.go new file mode 100644 index 000000000000..a6197a620705 --- /dev/null +++ b/extension/jaegerremotesampling/internal/server/grpc/grpc_handler_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package grpc + +import ( + "context" + "errors" + "testing" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockSamplingStore struct{} + +func (mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { + if serviceName == "error" { + return nil, errors.New("some error") + } else if serviceName == "nil" { + return nil, nil + } + return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil +} + +func (mockSamplingStore) Close() error { + return nil +} + +func TestNewGRPCHandler(t *testing.T) { + tests := []struct { + req *api_v2.SamplingStrategyParameters + resp *api_v2.SamplingStrategyResponse + err string + }{ + {req: &api_v2.SamplingStrategyParameters{ServiceName: "error"}, err: "some error"}, + {req: &api_v2.SamplingStrategyParameters{ServiceName: "nil"}, resp: nil}, + {req: &api_v2.SamplingStrategyParameters{ServiceName: "foo"}, resp: &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}}, + } + h := NewGRPCHandler(mockSamplingStore{}) + for _, test := range tests { + resp, err := h.GetSamplingStrategy(context.Background(), test.req) + if test.err != "" { + require.EqualError(t, err, test.err) + require.Nil(t, resp) + } else { + require.NoError(t, err) + assert.Equal(t, test.resp, resp) + } + } +} diff --git a/extension/jaegerremotesampling/internal/grpc_test.go b/extension/jaegerremotesampling/internal/server/grpc/grpc_test.go similarity index 94% rename from extension/jaegerremotesampling/internal/grpc_test.go rename to extension/jaegerremotesampling/internal/server/grpc/grpc_test.go index 40cc01de9998..92437c45fb87 100644 --- a/extension/jaegerremotesampling/internal/grpc_test.go +++ b/extension/jaegerremotesampling/internal/server/grpc/grpc_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package grpc import ( "context" @@ -14,6 +14,8 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confignet" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/mocks" ) func TestMissingClientConfigManagerGRPC(t *testing.T) { @@ -30,7 +32,7 @@ func TestStartAndStopGRPC(t *testing.T) { Transport: confignet.TransportTypeTCP, }, } - s, err := NewGRPC(componenttest.NewNopTelemetrySettings(), srvSettings, &mockCfgMgr{}) + s, err := NewGRPC(componenttest.NewNopTelemetrySettings(), srvSettings, &mocks.MockCfgMgr{}) require.NoError(t, err) require.NotNil(t, s) diff --git a/extension/jaegerremotesampling/internal/http.go b/extension/jaegerremotesampling/internal/server/http/http.go similarity index 88% rename from extension/jaegerremotesampling/internal/http.go rename to extension/jaegerremotesampling/internal/server/http/http.go index 6ab9f75cba78..7786ff675322 100644 --- a/extension/jaegerremotesampling/internal/http.go +++ b/extension/jaegerremotesampling/internal/server/http/http.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal" +package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/server/http" import ( "context" @@ -12,10 +12,11 @@ import ( "net/http" "sync" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confighttp" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" ) var errMissingStrategyStore = errors.New("the strategy store has not been provided") @@ -25,14 +26,14 @@ var _ component.Component = (*SamplingHTTPServer)(nil) type SamplingHTTPServer struct { telemetry component.TelemetrySettings settings confighttp.ServerConfig - strategyStore samplingstrategy.Provider + strategyStore source.Source mux *http.ServeMux srv *http.Server shutdownWG *sync.WaitGroup } -func NewHTTP(telemetry component.TelemetrySettings, settings confighttp.ServerConfig, strategyStore samplingstrategy.Provider) (*SamplingHTTPServer, error) { +func NewHTTP(telemetry component.TelemetrySettings, settings confighttp.ServerConfig, strategyStore source.Source) (*SamplingHTTPServer, error) { if strategyStore == nil { return nil, errMissingStrategyStore } diff --git a/extension/jaegerremotesampling/internal/http_test.go b/extension/jaegerremotesampling/internal/server/http/http_test.go similarity index 86% rename from extension/jaegerremotesampling/internal/http_test.go rename to extension/jaegerremotesampling/internal/server/http/http_test.go index 075073d82215..cdbac983f811 100644 --- a/extension/jaegerremotesampling/internal/http_test.go +++ b/extension/jaegerremotesampling/internal/server/http/http_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package http import ( "context" @@ -18,6 +18,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/mocks" ) func TestMissingClientConfigManagerHTTP(t *testing.T) { @@ -31,7 +33,7 @@ func TestStartAndStopHTTP(t *testing.T) { srvSettings := confighttp.ServerConfig{ Endpoint: "127.0.0.1:0", } - s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), srvSettings, &mockCfgMgr{}) + s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), srvSettings, &mocks.MockCfgMgr{}) require.NoError(t, err) require.NotNil(t, s) @@ -53,8 +55,8 @@ func TestEndpointsAreWired(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { // prepare - s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.ServerConfig{}, &mockCfgMgr{ - getSamplingStrategyFunc: func(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) { + s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.ServerConfig{}, &mocks.MockCfgMgr{ + GetSamplingStrategyFunc: func(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) { return &api_v2.SamplingStrategyResponse{ ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ SamplingRate: 1, @@ -87,7 +89,7 @@ func TestEndpointsAreWired(t *testing.T) { func TestServiceNameIsRequired(t *testing.T) { // prepare - s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.ServerConfig{}, &mockCfgMgr{}) + s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.ServerConfig{}, &mocks.MockCfgMgr{}) require.NoError(t, err) require.NotNil(t, s) @@ -105,12 +107,12 @@ func TestServiceNameIsRequired(t *testing.T) { } func TestErrorFromClientConfigManager(t *testing.T) { - s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.ServerConfig{}, &mockCfgMgr{}) + s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.ServerConfig{}, &mocks.MockCfgMgr{}) require.NoError(t, err) require.NotNil(t, s) - s.strategyStore = &mockCfgMgr{ - getSamplingStrategyFunc: func(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) { + s.strategyStore = &mocks.MockCfgMgr{ + GetSamplingStrategyFunc: func(_ context.Context, _ string) (*api_v2.SamplingStrategyResponse, error) { return nil, errors.New("some error") }, } diff --git a/extension/jaegerremotesampling/internal/source/filesource/constants.go b/extension/jaegerremotesampling/internal/source/filesource/constants.go new file mode 100644 index 000000000000..96f6ada52263 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/constants.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package filesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/filesource" + +import ( + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +const ( + // samplerTypeProbabilistic is the type of sampler that samples traces + // with a certain fixed probability. + samplerTypeProbabilistic = "probabilistic" + + // samplerTypeRateLimiting is the type of sampler that samples + // only up to a fixed number of traces per second. + samplerTypeRateLimiting = "ratelimiting" + + // defaultSamplingProbability is the default sampling probability the + // Strategy Store will use if none is provided. + defaultSamplingProbability = 0.001 +) + +// defaultStrategy is the default sampling strategy the Strategy Store will return +// if none is provided. +func defaultStrategyResponse() *api_v2.SamplingStrategyResponse { + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: defaultSamplingProbability, + }, + } +} + +func defaultStrategies() *storedStrategies { + s := &storedStrategies{ + serviceStrategies: make(map[string]*api_v2.SamplingStrategyResponse), + } + s.defaultStrategy = defaultStrategyResponse() + return s +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/filesource.go b/extension/jaegerremotesampling/internal/source/filesource/filesource.go new file mode 100644 index 000000000000..f0334c3cbe1d --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/filesource.go @@ -0,0 +1,371 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package filesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/filesource" + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "path/filepath" + "sync/atomic" + "time" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" +) + +// null represents "null" JSON value and +// it un-marshals to nil pointer. +var nullJSON = []byte("null") + +type samplingProvider struct { + logger *zap.Logger + + storedStrategies atomic.Value // holds *storedStrategies + + cancelFunc context.CancelFunc + + options Options +} + +type storedStrategies struct { + defaultStrategy *api_v2.SamplingStrategyResponse + serviceStrategies map[string]*api_v2.SamplingStrategyResponse +} + +type strategyLoader func() ([]byte, error) + +// NewFileSource creates a strategy store that holds static sampling strategies. +func NewFileSource(options Options, logger *zap.Logger) (source.Source, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) + h := &samplingProvider{ + logger: logger, + cancelFunc: cancelFunc, + options: options, + } + h.storedStrategies.Store(defaultStrategies()) + + if options.StrategiesFile == "" { + h.logger.Info("No sampling strategies source provided, using defaults") + return h, nil + } + + loadFn := h.samplingStrategyLoader(options.StrategiesFile) + strategies, err := loadStrategies(loadFn) + if err != nil { + return nil, err + } else if strategies == nil { + h.logger.Info("No sampling strategies found or URL is unavailable, using defaults") + return h, nil + } + + if !h.options.IncludeDefaultOpStrategies { + h.logger.Warn("Default operations level strategies will not be included for Ratelimiting service strategies." + + "This behavior will be changed in future releases. " + + "Cf. https://github.com/jaegertracing/jaeger/issues/5270") + h.parseStrategiesDeprecated(strategies) + } else { + h.parseStrategies(strategies) + } + + if options.ReloadInterval > 0 { + go h.autoUpdateStrategies(ctx, options.ReloadInterval, loadFn) + } + return h, nil +} + +// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy. +func (h *samplingProvider) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { + storedStrategies := h.storedStrategies.Load().(*storedStrategies) + serviceStrategies := storedStrategies.serviceStrategies + if strategy, ok := serviceStrategies[serviceName]; ok { + return strategy, nil + } + h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName)) + return storedStrategies.defaultStrategy, nil +} + +// Close stops updating the strategies +func (h *samplingProvider) Close() error { + h.cancelFunc() + return nil +} + +func (h *samplingProvider) downloadSamplingStrategies(samplingURL string) ([]byte, error) { + h.logger.Info("Downloading sampling strategies", zap.String("url", samplingURL)) + + ctx, cx := context.WithTimeout(context.Background(), time.Second) + defer cx() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, samplingURL, nil) + if err != nil { + return nil, fmt.Errorf("cannot construct HTTP request: %w", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to download sampling strategies: %w", err) + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err = buf.ReadFrom(resp.Body); err != nil { + return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err) + } + + if resp.StatusCode == http.StatusServiceUnavailable { + return nullJSON, nil + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf( + "receiving %s while downloading strategies file: %s", + resp.Status, + buf.String(), + ) + } + + return buf.Bytes(), nil +} + +func isURL(str string) bool { + u, err := url.Parse(str) + return err == nil && u.Scheme != "" && u.Host != "" +} + +func (h *samplingProvider) samplingStrategyLoader(strategiesFile string) strategyLoader { + if isURL(strategiesFile) { + return func() ([]byte, error) { + return h.downloadSamplingStrategies(strategiesFile) + } + } + + return func() ([]byte, error) { + h.logger.Info("Loading sampling strategies", zap.String("filename", strategiesFile)) + currBytes, err := os.ReadFile(filepath.Clean(strategiesFile)) + if err != nil { + return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err) + } + return currBytes, nil + } +} + +func (h *samplingProvider) autoUpdateStrategies(ctx context.Context, interval time.Duration, loader strategyLoader) { + lastValue := string(nullJSON) + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + lastValue = h.reloadSamplingStrategy(loader, lastValue) + case <-ctx.Done(): + return + } + } +} + +func (h *samplingProvider) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { + newValue, err := loadFn() + if err != nil { + h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) + return lastValue + } + if lastValue == string(newValue) { + return lastValue + } + if err := h.updateSamplingStrategy(newValue); err != nil { + h.logger.Error("failed to update sampling strategies", zap.Error(err)) + return lastValue + } + return string(newValue) +} + +func (h *samplingProvider) updateSamplingStrategy(dataBytes []byte) error { + var strategies strategies + if err := json.Unmarshal(dataBytes, &strategies); err != nil { + return fmt.Errorf("failed to unmarshal sampling strategies: %w", err) + } + h.parseStrategies(&strategies) + h.logger.Info("Updated sampling strategies:" + string(dataBytes)) + return nil +} + +// TODO good candidate for a global util function +func loadStrategies(loadFn strategyLoader) (*strategies, error) { + strategyBytes, err := loadFn() + if err != nil { + return nil, err + } + + var strategies *strategies + if err := json.Unmarshal(strategyBytes, &strategies); err != nil { + return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) + } + return strategies, nil +} + +func (h *samplingProvider) parseStrategiesDeprecated(strategies *strategies) { + newStore := defaultStrategies() + if strategies.DefaultStrategy != nil { + newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) + } + + merge := true + if newStore.defaultStrategy.OperationSampling == nil || + newStore.defaultStrategy.OperationSampling.PerOperationStrategies == nil { + merge = false + } + + for _, s := range strategies.ServiceStrategies { + newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s) + + // Merge with the default operation strategies, because only merging with + // the default strategy has no effect on service strategies (the default strategy + // is not merged with and only used as a fallback). + opS := newStore.serviceStrategies[s.Service].OperationSampling + if opS == nil { + if newStore.defaultStrategy.OperationSampling == nil || + newStore.serviceStrategies[s.Service].ProbabilisticSampling == nil { + continue + } + // Service has no per-operation strategies, so just reference the default settings and change default samplingRate. + newOpS := *newStore.defaultStrategy.OperationSampling + newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate + newStore.serviceStrategies[s.Service].OperationSampling = &newOpS + continue + } + if merge { + opS.PerOperationStrategies = mergePerOperationSamplingStrategies( + opS.PerOperationStrategies, + newStore.defaultStrategy.OperationSampling.PerOperationStrategies) + } + } + h.storedStrategies.Store(newStore) +} + +func (h *samplingProvider) parseStrategies(strategies *strategies) { + newStore := defaultStrategies() + if strategies.DefaultStrategy != nil { + newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) + } + + for _, s := range strategies.ServiceStrategies { + newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s) + + // Config for this service may not have per-operation strategies, + // but if the default strategy has them they should still apply. + + if newStore.defaultStrategy.OperationSampling == nil { + // Default strategy doens't have them either, nothing to do. + continue + } + + opS := newStore.serviceStrategies[s.Service].OperationSampling + if opS == nil { + // Service does not have its own per-operation rules, so copy (by value) from the default strategy. + newOpS := *newStore.defaultStrategy.OperationSampling + + // If the service's own default is probabilistic, then its sampling rate should take precedence. + if newStore.serviceStrategies[s.Service].ProbabilisticSampling != nil { + newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate + } + newStore.serviceStrategies[s.Service].OperationSampling = &newOpS + continue + } + + // If the service did have its own per-operation strategies, then merge them with the default ones. + opS.PerOperationStrategies = mergePerOperationSamplingStrategies( + opS.PerOperationStrategies, + newStore.defaultStrategy.OperationSampling.PerOperationStrategies) + } + h.storedStrategies.Store(newStore) +} + +// mergePerOperationSamplingStrategies merges two operation strategies a and b, where a takes precedence over b. +func mergePerOperationSamplingStrategies( + a, b []*api_v2.OperationSamplingStrategy, +) []*api_v2.OperationSamplingStrategy { + m := make(map[string]bool) + for _, aOp := range a { + m[aOp.Operation] = true + } + for _, bOp := range b { + if m[bOp.Operation] { + continue + } + a = append(a, bOp) + } + return a +} + +func (h *samplingProvider) parseServiceStrategies(strategy *serviceStrategy) *api_v2.SamplingStrategyResponse { + resp := h.parseStrategy(&strategy.strategy) + if len(strategy.OperationStrategies) == 0 { + return resp + } + opS := &api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: defaultSamplingProbability, + } + if resp.StrategyType == api_v2.SamplingStrategyType_PROBABILISTIC { + opS.DefaultSamplingProbability = resp.ProbabilisticSampling.SamplingRate + } + for _, operationStrategy := range strategy.OperationStrategies { + s, ok := h.parseOperationStrategy(operationStrategy, opS) + if !ok { + continue + } + + opS.PerOperationStrategies = append(opS.PerOperationStrategies, + &api_v2.OperationSamplingStrategy{ + Operation: operationStrategy.Operation, + ProbabilisticSampling: s.ProbabilisticSampling, + }) + } + resp.OperationSampling = opS + return resp +} + +func (h *samplingProvider) parseOperationStrategy( + strategy *operationStrategy, + parent *api_v2.PerOperationSamplingStrategies, +) (s *api_v2.SamplingStrategyResponse, ok bool) { + s = h.parseStrategy(&strategy.strategy) + if s.StrategyType == api_v2.SamplingStrategyType_RATE_LIMITING { + // TODO OperationSamplingStrategy only supports probabilistic sampling + h.logger.Warn( + fmt.Sprintf( + "Operation strategies only supports probabilistic sampling at the moment,"+ + "'%s' defaulting to probabilistic sampling with probability %f", + strategy.Operation, parent.DefaultSamplingProbability), + zap.Any("strategy", strategy)) + return nil, false + } + return s, true +} + +func (h *samplingProvider) parseStrategy(strategy *strategy) *api_v2.SamplingStrategyResponse { + switch strategy.Type { + case samplerTypeProbabilistic: + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: strategy.Param, + }, + } + case samplerTypeRateLimiting: + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_RATE_LIMITING, + RateLimitingSampling: &api_v2.RateLimitingSamplingStrategy{ + MaxTracesPerSecond: int32(strategy.Param), + }, + } + default: + h.logger.Warn("Failed to parse sampling strategy", zap.Any("strategy", strategy)) + return defaultStrategyResponse() + } +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/filesource_test.go b/extension/jaegerremotesampling/internal/source/filesource/filesource_test.go new file mode 100644 index 000000000000..fae69d1e913b --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/filesource_test.go @@ -0,0 +1,588 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package filesource + +import ( + "bytes" + "context" + "encoding/gob" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +const snapshotLocation = "./fixtures/" + +// Snapshots can be regenerated via: +// +// REGENERATE_SNAPSHOTS=true go test -v ./plugin/sampling/strategyprovider/static/provider_test.go +var regenerateSnapshots = os.Getenv("REGENERATE_SNAPSHOTS") == "true" + +// strategiesJSON returns the strategy with +// a given probability. +func strategiesJSON(probability float32) string { + strategy := fmt.Sprintf(` + { + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": %.1f + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 5 + } + ] + } + `, + probability, + ) + return strategy +} + +func deepCopy(s *api_v2.SamplingStrategyResponse) (*api_v2.SamplingStrategyResponse, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + dec := gob.NewDecoder(&buf) + err := enc.Encode(*s) + if err != nil { + return nil, err + } + var copyValue api_v2.SamplingStrategyResponse + err = dec.Decode(©Value) + return ©Value, err +} + +// Returns strategies in JSON format. Used for testing +// URL option for sampling strategies. +func mockStrategyServer(t *testing.T) (*httptest.Server, *atomic.Pointer[string]) { + var strategy atomic.Pointer[string] + value := strategiesJSON(0.8) + strategy.Store(&value) + f := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/bad-content": + _, err := w.Write([]byte("bad-content")) + assert.NoError(t, err) + return + + case "/bad-status": + w.WriteHeader(http.StatusNotFound) + return + + case "/service-unavailable": + w.WriteHeader(http.StatusServiceUnavailable) + return + + default: + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + _, err := w.Write([]byte(*strategy.Load())) + assert.NoError(t, err) + } + } + mockserver := httptest.NewServer(http.HandlerFunc(f)) + t.Cleanup(func() { + mockserver.Close() + }) + return mockserver, &strategy +} + +func TestStrategyStoreWithFile(t *testing.T) { + _, err := NewFileSource(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop()) + require.ErrorContains(t, err, "failed to read strategies file fileNotFound.json") + + _, err = NewFileSource(Options{StrategiesFile: "fixtures/bad_strategies.json"}, zap.NewNop()) + require.EqualError(t, err, + "failed to unmarshal strategies: json: cannot unmarshal string into Go value of type filesource.strategies") + + // Test default strategy + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + provider, err := NewFileSource(Options{}, logger) + require.NoError(t, err) + message := logs.FilterMessage("No sampling strategies source provided, using defaults") + assert.Equal(t, 1, message.Len(), "Expected No sampling strategies provided log message") + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.001), *s) + + // Test reading strategies from a file + provider, err = NewFileSource(Options{StrategiesFile: "fixtures/strategies.json"}, logger) + require.NoError(t, err) + s, err = provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + s, err = provider.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 5), *s) + + s, err = provider.GetSamplingStrategy(context.Background(), "default") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.5), *s) +} + +func TestStrategyStoreWithURL(t *testing.T) { + // Test default strategy when URL is temporarily unavailable. + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + mockServer, _ := mockStrategyServer(t) + provider, err := NewFileSource(Options{StrategiesFile: mockServer.URL + "/service-unavailable"}, logger) + require.NoError(t, err) + message := logs.FilterMessage("No sampling strategies found or URL is unavailable, using defaults") + assert.Equal(t, 1, message.Len(), "Expected No sampling strategies found log message.") + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.001), *s) + + // Test downloading strategies from a URL. + provider, err = NewFileSource(Options{StrategiesFile: mockServer.URL}, logger) + require.NoError(t, err) + + s, err = provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + s, err = provider.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 5), *s) +} + +func TestPerOperationSamplingStrategies(t *testing.T) { + tests := []struct { + options Options + }{ + {Options{StrategiesFile: "fixtures/operation_strategies.json"}}, + {Options{ + StrategiesFile: "fixtures/operation_strategies.json", + IncludeDefaultOpStrategies: true, + }}, + } + + for _, tc := range tests { + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + provider, err := NewFileSource(tc.options, logger) + message := logs.FilterMessage("Operation strategies only supports probabilistic sampling at the moment," + + "'op2' defaulting to probabilistic sampling with probability 0.800000") + assert.Equal(t, 1, message.Len(), "Expected Operation strategies only supports probabilistic sampling, op2 change to probability 0.8 log message") + message = logs.FilterMessage("Operation strategies only supports probabilistic sampling at the moment," + + "'op4' defaulting to probabilistic sampling with probability 0.001000") + assert.Equal(t, 1, message.Len(), "Expected Operation strategies only supports probabilistic sampling, op2 change to probability 0.8 log message") + require.NoError(t, err) + + expected := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8) + + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.Equal(t, api_v2.SamplingStrategyType_PROBABILISTIC, s.StrategyType) + assert.Equal(t, *expected.ProbabilisticSampling, *s.ProbabilisticSampling) + + require.NotNil(t, s.OperationSampling) + opSampling := s.OperationSampling + assert.InDelta(t, 0.8, opSampling.DefaultSamplingProbability, 0.01) + require.Len(t, opSampling.PerOperationStrategies, 4) + + assert.Equal(t, "op6", opSampling.PerOperationStrategies[0].Operation) + assert.InDelta(t, 0.5, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op1", opSampling.PerOperationStrategies[1].Operation) + assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op0", opSampling.PerOperationStrategies[2].Operation) + assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[2].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op7", opSampling.PerOperationStrategies[3].Operation) + assert.InDelta(t, 1.0, opSampling.PerOperationStrategies[3].ProbabilisticSampling.SamplingRate, 0.01) + + expected = makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 5) + + s, err = provider.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.Equal(t, api_v2.SamplingStrategyType_RATE_LIMITING, s.StrategyType) + assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling) + + require.NotNil(t, s.OperationSampling) + opSampling = s.OperationSampling + assert.InDelta(t, 0.001, opSampling.DefaultSamplingProbability, 1e-4) + require.Len(t, opSampling.PerOperationStrategies, 5) + assert.Equal(t, "op3", opSampling.PerOperationStrategies[0].Operation) + assert.InDelta(t, 0.3, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op5", opSampling.PerOperationStrategies[1].Operation) + assert.InDelta(t, 0.4, opSampling.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op0", opSampling.PerOperationStrategies[2].Operation) + assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[2].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op6", opSampling.PerOperationStrategies[3].Operation) + assert.InDelta(t, 0.0, opSampling.PerOperationStrategies[3].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op7", opSampling.PerOperationStrategies[4].Operation) + assert.InDelta(t, 1.0, opSampling.PerOperationStrategies[4].ProbabilisticSampling.SamplingRate, 0.01) + + s, err = provider.GetSamplingStrategy(context.Background(), "default") + require.NoError(t, err) + expectedRsp := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.5) + expectedRsp.OperationSampling = &api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: 0.5, + PerOperationStrategies: []*api_v2.OperationSamplingStrategy{ + { + Operation: "op0", + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: 0.2, + }, + }, + { + Operation: "op6", + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: 0, + }, + }, + { + Operation: "op7", + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: 1, + }, + }, + }, + } + assert.EqualValues(t, expectedRsp, *s) + } +} + +func TestMissingServiceSamplingStrategyTypes(t *testing.T) { + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + provider, err := NewFileSource(Options{StrategiesFile: "fixtures/missing-service-types.json"}, logger) + message := logs.FilterMessage("Failed to parse sampling strategy") + assert.Equal(t, "Failed to parse sampling strategy", message.All()[0].Message) + require.NoError(t, err) + + expected := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, defaultSamplingProbability) + + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.Equal(t, api_v2.SamplingStrategyType_PROBABILISTIC, s.StrategyType) + assert.Equal(t, *expected.ProbabilisticSampling, *s.ProbabilisticSampling) + + require.NotNil(t, s.OperationSampling) + opSampling := s.OperationSampling + assert.InDelta(t, defaultSamplingProbability, opSampling.DefaultSamplingProbability, 1e-4) + require.Len(t, opSampling.PerOperationStrategies, 1) + assert.Equal(t, "op1", opSampling.PerOperationStrategies[0].Operation) + assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.001) + + expected = makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, defaultSamplingProbability) + + s, err = provider.GetSamplingStrategy(context.Background(), "bar") + require.NoError(t, err) + assert.Equal(t, api_v2.SamplingStrategyType_PROBABILISTIC, s.StrategyType) + assert.Equal(t, *expected.ProbabilisticSampling, *s.ProbabilisticSampling) + + require.NotNil(t, s.OperationSampling) + opSampling = s.OperationSampling + assert.InDelta(t, 0.001, opSampling.DefaultSamplingProbability, 1e-4) + require.Len(t, opSampling.PerOperationStrategies, 2) + assert.Equal(t, "op3", opSampling.PerOperationStrategies[0].Operation) + assert.InDelta(t, 0.3, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.01) + assert.Equal(t, "op5", opSampling.PerOperationStrategies[1].Operation) + assert.InDelta(t, 0.4, opSampling.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate, 0.01) + + s, err = provider.GetSamplingStrategy(context.Background(), "default") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.5), *s) +} + +func TestParseStrategy(t *testing.T) { + tests := []struct { + strategy serviceStrategy + expected api_v2.SamplingStrategyResponse + }{ + { + strategy: serviceStrategy{ + Service: "svc", + strategy: strategy{Type: "probabilistic", Param: 0.2}, + }, + expected: makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.2), + }, + { + strategy: serviceStrategy{ + Service: "svc", + strategy: strategy{Type: "ratelimiting", Param: 3.5}, + }, + expected: makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 3), + }, + } + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + provider := &samplingProvider{logger: logger} + for _, test := range tests { + tt := test + t.Run("", func(t *testing.T) { + assert.EqualValues(t, tt.expected, *provider.parseStrategy(&tt.strategy.strategy)) + }) + } + assert.Empty(t, logs.Len()) + + // Test nonexistent strategy type + actual := *provider.parseStrategy(&strategy{Type: "blah", Param: 3.5}) + expected := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, defaultSamplingProbability) + assert.EqualValues(t, expected, actual) + message := logs.FilterMessage("Failed to parse sampling strategy") + assert.Equal(t, 1, message.Len(), "Expected Failed to parse sampling strategy log message.") +} + +func makeResponse(samplerType api_v2.SamplingStrategyType, param float64) (resp api_v2.SamplingStrategyResponse) { + resp.StrategyType = samplerType + if samplerType == api_v2.SamplingStrategyType_PROBABILISTIC { + resp.ProbabilisticSampling = &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: param, + } + } else if samplerType == api_v2.SamplingStrategyType_RATE_LIMITING { + resp.RateLimitingSampling = &api_v2.RateLimitingSamplingStrategy{ + MaxTracesPerSecond: int32(param), + } + } + return resp +} + +func TestDeepCopy(t *testing.T) { + s := &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: 0.5, + }, + } + cp, err := deepCopy(s) + require.NoError(t, err) + assert.NotSame(t, cp, s) + assert.EqualValues(t, cp, s) +} + +func TestAutoUpdateStrategyWithFile(t *testing.T) { + tempFile, _ := os.CreateTemp("", "for_go_test_*.json") + require.NoError(t, tempFile.Close()) + defer func() { + require.NoError(t, os.Remove(tempFile.Name())) + }() + + // copy known fixture content into temp file which we can later overwrite + srcFile, dstFile := "fixtures/strategies.json", tempFile.Name() + srcBytes, err := os.ReadFile(srcFile) + require.NoError(t, err) + require.NoError(t, os.WriteFile(dstFile, srcBytes, 0o600)) + + ss, err := NewFileSource(Options{ + StrategiesFile: dstFile, + ReloadInterval: time.Millisecond * 10, + }, zap.NewNop()) + require.NoError(t, err) + provider := ss.(*samplingProvider) + defer provider.Close() + + // confirm baseline value + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading is a no-op + value := provider.reloadSamplingStrategy(provider.samplingStrategyLoader(dstFile), string(srcBytes)) + assert.Equal(t, string(srcBytes), value) + + // update file with new probability of 0.9 + newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1) + require.NoError(t, os.WriteFile(dstFile, []byte(newStr), 0o600)) + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.9), *s) +} + +func TestAutoUpdateStrategyWithURL(t *testing.T) { + mockServer, mockStrategy := mockStrategyServer(t) + ss, err := NewFileSource(Options{ + StrategiesFile: mockServer.URL, + ReloadInterval: 10 * time.Millisecond, + }, zap.NewNop()) + require.NoError(t, err) + provider := ss.(*samplingProvider) + defer provider.Close() + + // confirm baseline value + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading in no-op + value := provider.reloadSamplingStrategy( + provider.samplingStrategyLoader(mockServer.URL), + *mockStrategy.Load(), + ) + assert.Equal(t, *mockStrategy.Load(), value) + + // update original strategies with new probability of 0.9 + { + v09 := strategiesJSON(0.9) + mockStrategy.Store(&v09) + } + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.9), *s) +} + +func TestAutoUpdateStrategyErrors(t *testing.T) { + tempFile, _ := os.CreateTemp("", "for_go_test_*.json") + require.NoError(t, tempFile.Close()) + defer func() { + _ = os.Remove(tempFile.Name()) + }() + + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + + s, err := NewFileSource(Options{ + StrategiesFile: "fixtures/strategies.json", + ReloadInterval: time.Hour, + }, logger) + require.NoError(t, err) + provider := s.(*samplingProvider) + defer provider.Close() + + // check invalid file path or read failure + assert.Equal(t, "blah", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(tempFile.Name()+"bad-path"), "blah")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 1) + + // check bad file content + require.NoError(t, os.WriteFile(tempFile.Name(), []byte("bad value"), 0o600)) + assert.Equal(t, "blah", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(tempFile.Name()), "blah")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1) + + // check invalid url + assert.Equal(t, "duh", provider.reloadSamplingStrategy(provider.samplingStrategyLoader("bad-url"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 2) + + // check status code other than 200 + mockServer, _ := mockStrategyServer(t) + assert.Equal(t, "duh", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(mockServer.URL+"/bad-status"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 3) + + // check bad content from url + assert.Equal(t, "duh", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(mockServer.URL+"/bad-content"), "duh")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2) +} + +func TestServiceNoPerOperationStrategies(t *testing.T) { + // given setup of strategy provider with no specific per operation sampling strategies + // and option "sampling.strategies.bugfix-5270=true" + provider, err := NewFileSource(Options{ + StrategiesFile: "fixtures/service_no_per_operation.json", + IncludeDefaultOpStrategies: true, + }, zap.NewNop()) + require.NoError(t, err) + + for _, service := range []string{"ServiceA", "ServiceB"} { + t.Run(service, func(t *testing.T) { + strategy, err := provider.GetSamplingStrategy(context.Background(), service) + require.NoError(t, err) + strategyJSON, err := json.MarshalIndent(strategy, "", " ") + require.NoError(t, err) + + testName := strings.ReplaceAll(t.Name(), "/", "_") + snapshotFile := filepath.Join(snapshotLocation, testName+".json") + expectedServiceResponse, err := os.ReadFile(snapshotFile) + require.NoError(t, err) + + assert.JSONEq(t, string(expectedServiceResponse), string(strategyJSON), + "comparing against stored snapshot. Use REGENERATE_SNAPSHOTS=true to rebuild snapshots.") + + if regenerateSnapshots { + err = os.WriteFile(snapshotFile, strategyJSON, 0o600) + require.NoError(t, err) + } + }) + } +} + +func TestServiceNoPerOperationStrategiesDeprecatedBehavior(t *testing.T) { + // test case to be removed along with removal of strategy_store.parseStrategies_deprecated, + // see https://github.com/jaegertracing/jaeger/issues/5270 for more details + + // given setup of strategy provider with no specific per operation sampling strategies + provider, err := NewFileSource(Options{ + StrategiesFile: "fixtures/service_no_per_operation.json", + }, zap.NewNop()) + require.NoError(t, err) + + for _, service := range []string{"ServiceA", "ServiceB"} { + t.Run(service, func(t *testing.T) { + strategy, err := provider.GetSamplingStrategy(context.Background(), service) + require.NoError(t, err) + strategyJSON, err := json.MarshalIndent(strategy, "", " ") + require.NoError(t, err) + + testName := strings.ReplaceAll(t.Name(), "/", "_") + snapshotFile := filepath.Join(snapshotLocation, testName+".json") + expectedServiceResponse, err := os.ReadFile(snapshotFile) + require.NoError(t, err) + + assert.JSONEq(t, string(expectedServiceResponse), string(strategyJSON), + "comparing against stored snapshot. Use REGENERATE_SNAPSHOTS=true to rebuild snapshots.") + + if regenerateSnapshots { + err = os.WriteFile(snapshotFile, strategyJSON, 0o600) + require.NoError(t, err) + } + }) + } +} + +func TestSamplingStrategyLoader(t *testing.T) { + provider := &samplingProvider{logger: zap.NewNop()} + // invalid file path + loader := provider.samplingStrategyLoader("not-exists") + _, err := loader() + require.ErrorContains(t, err, "failed to read strategies file not-exists") + + // status code other than 200 + mockServer, _ := mockStrategyServer(t) + loader = provider.samplingStrategyLoader(mockServer.URL + "/bad-status") + _, err = loader() + require.ErrorContains(t, err, "receiving 404 Not Found while downloading strategies file") + + // should download content from URL + loader = provider.samplingStrategyLoader(mockServer.URL + "/bad-content") + content, err := loader() + require.NoError(t, err) + assert.Equal(t, "bad-content", string(content)) +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json new file mode 100644 index 000000000000..6834df079eb6 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json @@ -0,0 +1,16 @@ +{ + "probabilisticSampling": { + "samplingRate": 1 + }, + "operationSampling": { + "defaultSamplingProbability": 1, + "perOperationStrategies": [ + { + "operation": "/health", + "probabilisticSampling": { + "samplingRate": 0.1 + } + } + ] + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json new file mode 100644 index 000000000000..56e51c78391f --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json @@ -0,0 +1,6 @@ +{ + "strategyType": 1, + "rateLimitingSampling": { + "maxTracesPerSecond": 3 + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json new file mode 100644 index 000000000000..6834df079eb6 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json @@ -0,0 +1,16 @@ +{ + "probabilisticSampling": { + "samplingRate": 1 + }, + "operationSampling": { + "defaultSamplingProbability": 1, + "perOperationStrategies": [ + { + "operation": "/health", + "probabilisticSampling": { + "samplingRate": 0.1 + } + } + ] + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json new file mode 100644 index 000000000000..cc28f904fefa --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json @@ -0,0 +1,17 @@ +{ + "strategyType": 1, + "rateLimitingSampling": { + "maxTracesPerSecond": 3 + }, + "operationSampling": { + "defaultSamplingProbability": 0.2, + "perOperationStrategies": [ + { + "operation": "/health", + "probabilisticSampling": { + "samplingRate": 0.1 + } + } + ] + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/bad_strategies.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/bad_strategies.json new file mode 100644 index 000000000000..209a97341c53 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/bad_strategies.json @@ -0,0 +1 @@ +"nonsense" diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/missing-service-types.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/missing-service-types.json new file mode 100644 index 000000000000..0d3d5f2a3c00 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/missing-service-types.json @@ -0,0 +1,33 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "operation_strategies": [ + { + "operation": "op1", + "type": "probabilistic", + "param": 0.2 + } + ] + }, + { + "service": "bar", + "operation_strategies": [ + { + "operation": "op3", + "type": "probabilistic", + "param": 0.3 + }, + { + "operation": "op5", + "type": "probabilistic", + "param": 0.4 + } + ] + } + ] +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/operation_strategies.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/operation_strategies.json new file mode 100644 index 000000000000..8a1b7677aab1 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/operation_strategies.json @@ -0,0 +1,74 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.5, + "operation_strategies": [ + { + "operation": "op0", + "type": "probabilistic", + "param": 0.2 + }, + { + "operation": "op6", + "type": "probabilistic", + "param": 0 + }, + { + "operation": "spam", + "type": "ratelimiting", + "param": 1 + }, + { + "operation": "op7", + "type": "probabilistic", + "param": 1 + } + ] + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8, + "operation_strategies": [ + { + "operation": "op6", + "type": "probabilistic", + "param": 0.5 + }, + { + "operation": "op1", + "type": "probabilistic", + "param": 0.2 + }, + { + "operation": "op2", + "type": "ratelimiting", + "param": 10 + } + ] + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 5, + "operation_strategies": [ + { + "operation": "op3", + "type": "probabilistic", + "param": 0.3 + }, + { + "operation": "op4", + "type": "ratelimiting", + "param": 100 + }, + { + "operation": "op5", + "type": "probabilistic", + "param": 0.4 + } + ] + } + ] +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/service_no_per_operation.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/service_no_per_operation.json new file mode 100644 index 000000000000..29b50d9f4d3f --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/service_no_per_operation.json @@ -0,0 +1,25 @@ +{ + "service_strategies": [ + { + "service": "ServiceA", + "type": "probabilistic", + "param": 1.0 + }, + { + "service": "ServiceB", + "type": "ratelimiting", + "param": 3 + } + ], + "default_strategy": { + "type": "probabilistic", + "param": 0.2, + "operation_strategies": [ + { + "operation": "/health", + "type": "probabilistic", + "param": 0.1 + } + ] + } +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/fixtures/strategies.json b/extension/jaegerremotesampling/internal/source/filesource/fixtures/strategies.json new file mode 100644 index 000000000000..e81d43984963 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/fixtures/strategies.json @@ -0,0 +1,18 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8 + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 5 + } + ] +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/model.go b/extension/jaegerremotesampling/internal/source/filesource/model.go new file mode 100644 index 000000000000..90f70d9087a0 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/model.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package filesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/filesource" + +// strategy defines a sampling strategy. Type can be "probabilistic" or "ratelimiting" +// and Param will represent "sampling probability" and "max traces per second" respectively. +type strategy struct { + Type string `json:"type"` + Param float64 `json:"param"` +} + +// operationStrategy defines an operation specific sampling strategy. +type operationStrategy struct { + Operation string `json:"operation"` + strategy +} + +// serviceStrategy defines a service specific sampling strategy. +type serviceStrategy struct { + Service string `json:"service"` + OperationStrategies []*operationStrategy `json:"operation_strategies"` + strategy +} + +// strategies holds a default sampling strategy and service specific sampling strategies. +type strategies struct { + DefaultStrategy *serviceStrategy `json:"default_strategy"` + ServiceStrategies []*serviceStrategy `json:"service_strategies"` +} diff --git a/extension/jaegerremotesampling/internal/source/filesource/options.go b/extension/jaegerremotesampling/internal/source/filesource/options.go new file mode 100644 index 000000000000..f0c8ab9d2cd2 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/filesource/options.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package filesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/filesource" + +import ( + "time" +) + +// Options holds configuration for the static sampling strategy store. +type Options struct { + // StrategiesFile is the path for the sampling strategies file in JSON format + StrategiesFile string + // ReloadInterval is the time interval to check and reload sampling strategies file + ReloadInterval time.Duration + // Flag for enabling possibly breaking change which includes default operations level + // strategies when calculating Ratelimiting type service level strategy + // more information https://github.com/jaegertracing/jaeger/issues/5270 + IncludeDefaultOpStrategies bool +} diff --git a/extension/jaegerremotesampling/internal/source/interface.go b/extension/jaegerremotesampling/internal/source/interface.go new file mode 100644 index 000000000000..69743ee822ec --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/interface.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package source // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" + +import ( + "context" + "io" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +// Source keeps track of service specific sampling strategies. +type Source interface { + // Close() from io.Closer stops the processor from calculating probabilities. + io.Closer + + // GetSamplingStrategy retrieves the sampling strategy for the specified service. + GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) +} diff --git a/extension/jaegerremotesampling/internal/source/remotesource/manager.go b/extension/jaegerremotesampling/internal/source/remotesource/manager.go new file mode 100644 index 000000000000..7fd11b15b4bc --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/remotesource/manager.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/remotesource" + +import ( + "context" + "fmt" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "google.golang.org/grpc" +) + +// ConfigManagerProxy returns sampling decisions from collector over gRPC. +type ConfigManagerProxy struct { + client api_v2.SamplingManagerClient +} + +// NewConfigManager creates gRPC sampling manager. +func NewConfigManager(conn *grpc.ClientConn) *ConfigManagerProxy { + return &ConfigManagerProxy{ + client: api_v2.NewSamplingManagerClient(conn), + } +} + +// GetSamplingStrategy returns sampling strategies from collector. +func (s *ConfigManagerProxy) GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { + resp, err := s.client.GetSamplingStrategy(ctx, &api_v2.SamplingStrategyParameters{ServiceName: serviceName}) + if err != nil { + return nil, fmt.Errorf("failed to get sampling strategy: %w", err) + } + return resp, nil +} diff --git a/extension/jaegerremotesampling/internal/source/remotesource/manager_test.go b/extension/jaegerremotesampling/internal/source/remotesource/manager_test.go new file mode 100644 index 000000000000..f85df51cbc41 --- /dev/null +++ b/extension/jaegerremotesampling/internal/source/remotesource/manager_test.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// Copyright (c) 2018 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesource + +import ( + "context" + "net" + "testing" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestSamplingManager_GetSamplingStrategy(t *testing.T) { + s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { + api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{}) + }) + conn, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + require.NoError(t, err) + defer s.GracefulStop() + manager := NewConfigManager(conn) + resp, err := manager.GetSamplingStrategy(context.Background(), "any") + require.NoError(t, err) + assert.Equal(t, &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, resp) +} + +func TestSamplingManager_GetSamplingStrategy_error(t *testing.T) { + conn, err := grpc.NewClient("foo", grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { require.NoError(t, conn.Close()) }) + require.NoError(t, err) + manager := NewConfigManager(conn) + resp, err := manager.GetSamplingStrategy(context.Background(), "any") + require.Nil(t, resp) + assert.ErrorContains(t, err, "failed to get sampling strategy") +} + +type mockSamplingHandler struct{} + +func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) { + return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil +} + +func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server)) (*grpc.Server, net.Addr) { + server := grpc.NewServer() + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + beforeServe(server) + go func() { + err := server.Serve(lis) + assert.NoError(t, err) + }() + return server, lis.Addr() +} diff --git a/extension/jaegerremotesampling/internal/remote_strategy_cache.go b/extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_cache.go similarity index 96% rename from extension/jaegerremotesampling/internal/remote_strategy_cache.go rename to extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_cache.go index cb6375c2ce0b..4dc29c912ec4 100644 --- a/extension/jaegerremotesampling/internal/remote_strategy_cache.go +++ b/extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_cache.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal" +package remotesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/remotesource" import ( "context" diff --git a/extension/jaegerremotesampling/internal/remote_strategy_cache_test.go b/extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_cache_test.go similarity index 99% rename from extension/jaegerremotesampling/internal/remote_strategy_cache_test.go rename to extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_cache_test.go index 816c333e02c9..e4d3e7f2a082 100644 --- a/extension/jaegerremotesampling/internal/remote_strategy_cache_test.go +++ b/extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_cache_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package remotesource import ( "context" diff --git a/extension/jaegerremotesampling/internal/remote_strategy_store.go b/extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_store.go similarity index 77% rename from extension/jaegerremotesampling/internal/remote_strategy_store.go rename to extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_store.go index 68ce1be8843b..a38c5c728236 100644 --- a/extension/jaegerremotesampling/internal/remote_strategy_store.go +++ b/extension/jaegerremotesampling/internal/source/remotesource/remote_strategy_store.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal" +package remotesource // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source/remotesource" import ( "context" @@ -9,30 +9,30 @@ import ( "io" "time" - grpcstore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configopaque" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal/source" ) type grpcRemoteStrategyStore struct { headerAdditions map[string]configopaque.String - delegate *grpcstore.ConfigManagerProxy + delegate *ConfigManagerProxy cache serviceStrategyCache } -// NewRemoteStrategyStore returns a StrategyStore that delegates to the configured Jaeger gRPC endpoint, making +// NewRemoteSource returns a StrategyStore that delegates to the configured Jaeger gRPC endpoint, making // extension-configured enhancements (header additions only for now) to the gRPC context of every outbound gRPC call. // Note: it would be nice to expand the configuration surface to include an optional TTL-based caching behavior // for service-specific outbound GetSamplingStrategy calls. -func NewRemoteStrategyStore( +func NewRemoteSource( conn *grpc.ClientConn, grpcClientSettings *configgrpc.ClientConfig, reloadInterval time.Duration, -) (samplingstrategy.Provider, io.Closer) { +) (source.Source, io.Closer) { cache := newNoopStrategyCache() if reloadInterval > 0 { cache = newServiceStrategyCache(reloadInterval) @@ -40,7 +40,7 @@ func NewRemoteStrategyStore( return &grpcRemoteStrategyStore{ headerAdditions: grpcClientSettings.Headers, - delegate: grpcstore.NewConfigManager(conn), + delegate: NewConfigManager(conn), cache: cache, }, cache }