diff --git a/src/code.cloudfoundry.org/go.mod b/src/code.cloudfoundry.org/go.mod
index f14c0874..ed0a22bd 100644
--- a/src/code.cloudfoundry.org/go.mod
+++ b/src/code.cloudfoundry.org/go.mod
@@ -5,21 +5,21 @@ go 1.20
 replace github.com/nats-io/gnatsd => github.com/nats-io/gnatsd v1.4.1
 
 require (
-	code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231017144728-583bfb3f8b2c
+	code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231025144627-414cbe44463b
 	code.cloudfoundry.org/lager/v3 v3.0.2
 	code.cloudfoundry.org/tlsconfig v0.0.0-20231017135636-f0e44068c22f
 	github.com/nats-io/gnatsd v1.4.1
 	github.com/nats-io/go-nats v1.7.2
-	github.com/nats-io/nats-server/v2 v2.10.3
+	github.com/nats-io/nats-server/v2 v2.10.4
 	github.com/onsi/ginkgo/v2 v2.13.0
-	github.com/onsi/gomega v1.28.1
+	github.com/onsi/gomega v1.29.0
 	github.com/tedsuo/ifrit v0.0.0-20230516164442-7862c310ad26
 )
 
 require (
 	filippo.io/edwards25519 v1.0.0 // indirect
 	github.com/fsnotify/fsnotify v1.5.4 // indirect
-	github.com/go-logr/logr v1.2.4 // indirect
+	github.com/go-logr/logr v1.3.0 // indirect
 	github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/go-cmp v0.6.0 // indirect
diff --git a/src/code.cloudfoundry.org/go.sum b/src/code.cloudfoundry.org/go.sum
index cc2b3f18..4729af94 100644
--- a/src/code.cloudfoundry.org/go.sum
+++ b/src/code.cloudfoundry.org/go.sum
@@ -592,8 +592,8 @@ cloud.google.com/go/workflows v1.7.0/go.mod h1:JhSrZuVZWuiDfKEFxU0/F1PQjmpnpcoIS
 cloud.google.com/go/workflows v1.8.0/go.mod h1:ysGhmEajwZxGn1OhGOGKsTXc5PyxOc0vfKf5Af+to4M=
 cloud.google.com/go/workflows v1.9.0/go.mod h1:ZGkj1aFIOd9c8Gerkjjq7OW7I5+l6cSvT3ujaO/WwSA=
 cloud.google.com/go/workflows v1.10.0/go.mod h1:fZ8LmRmZQWacon9UCX1r/g/DfAXx5VcPALq2CxzdePw=
-code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231017144728-583bfb3f8b2c h1:Rx3ltkKS1ZdmRm8wcoMZrt4wWXbQfI/XbnsPoXOnbW8=
-code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231017144728-583bfb3f8b2c/go.mod h1:zbF4Byg2o0Pu8tyYfMCOor7AhpulhMRFlBnqcrZ3Exk=
+code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231025144627-414cbe44463b h1:4T/8CBetJ0k33J299u+kOIH8Nf7zjH3Mm8xcLjkOQkg=
+code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231025144627-414cbe44463b/go.mod h1:hu/v+WmVSzD6dRVEH8kyLY15WyFhdOF4sOI5umsmbJk=
 code.cloudfoundry.org/lager/v3 v3.0.2 h1:H0dcQY+814G1Ea0e5K/AMaMpcr+Pe5Iv+AALJEwrP9U=
 code.cloudfoundry.org/lager/v3 v3.0.2/go.mod h1:zA6tOIWhr5uZUez+PGpdfBHDWQOfhOrr0cgKDagZPwk=
 code.cloudfoundry.org/tlsconfig v0.0.0-20231017135636-f0e44068c22f h1:5OUq3fp3kg9ztdzVX7V7SvSZ06rKWhG3CybVmXsj8O8=
@@ -682,8 +682,9 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2
 github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U=
 github.com/go-latex/latex v0.0.0-20210823091927-c0d11ff05a81/go.mod h1:SX0U8uGpxhq9o2S/CELCSUxEWWAuoCUcVCQWv7G2OCk=
 github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
-github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
 github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
+github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
 github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
 github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M=
 github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
@@ -836,9 +837,9 @@ github.com/nats-io/go-nats v1.7.2 h1:cJujlwCYR8iMz5ofZSD/p2WLW8FabhkQ2lIEVbSvNSA
 github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
 github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
 github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
-github.com/nats-io/nats-server/v2 v2.10.3 h1:nk2QVLpJUh3/AhZCJlQdTfj2oeLDvWnn1Z6XzGlNFm0=
-github.com/nats-io/nats-server/v2 v2.10.3/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM=
-github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
+github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus=
+github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns=
+github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
 github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
 github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
 github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
@@ -880,8 +881,8 @@ github.com/onsi/gomega v1.27.1/go.mod h1:aHX5xOykVYzWOV4WqQy0sy8BQptgukenXpCXfad
 github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw=
 github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ=
 github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
-github.com/onsi/gomega v1.28.1 h1:MijcGUbfYuznzK/5R4CPNoUP/9Xvuo20sXfEm6XxoTA=
-github.com/onsi/gomega v1.28.1/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
+github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
+github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
 github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA=
 github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY=
 github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
@@ -981,7 +982,6 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0
 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
 golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
-golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
 golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
diff --git a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/README.md b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/README.md
index ab593118..a8c29bfb 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/README.md
+++ b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/README.md
@@ -1,6 +1,7 @@
 # A minimal logging API for Go
 
 [![Go Reference](https://pkg.go.dev/badge/github.com/go-logr/logr.svg)](https://pkg.go.dev/github.com/go-logr/logr)
+[![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/go-logr/logr/badge)](https://securityscorecards.dev/viewer/?platform=github.com&org=go-logr&repo=logr)
 
 logr offers an(other) opinion on how Go programs and libraries can do logging
 without becoming coupled to a particular logging implementation. This is not
@@ -73,6 +74,29 @@ received:
 If the Go standard library had defined an interface for logging, this project
 probably would not be needed. Alas, here we are.
 
+When the Go developers started developing such an interface with
+[slog](https://github.com/golang/go/issues/56345), they adopted some of the
+logr design but also left out some parts and changed others:
+
+| Feature | logr | slog |
+|---------|------|------|
+| High-level API | `Logger` (passed by value) | `Logger` (passed by [pointer](https://github.com/golang/go/issues/59126)) |
+| Low-level API | `LogSink` | `Handler` |
+| Stack unwinding | done by `LogSink` | done by `Logger` |
+| Skipping helper functions | `WithCallDepth`, `WithCallStackHelper` | [not supported by Logger](https://github.com/golang/go/issues/59145) |
+| Generating a value for logging on demand | `Marshaler` | `LogValuer` |
+| Log levels | >= 0, higher meaning "less important" | positive and negative, with 0 for "info" and higher meaning "more important" |
+| Error log entries | always logged, don't have a verbosity level | normal log entries with level >= `LevelError` |
+| Passing logger via context | `NewContext`, `FromContext` | no API |
+| Adding a name to a logger | `WithName` | no API |
+| Modify verbosity of log entries in a call chain | `V` | no API |
+| Grouping of key/value pairs | not supported | `WithGroup`, `GroupValue` |
+
+The high-level slog API is explicitly meant to be one of many different APIs
+that can be layered on top of a shared `slog.Handler`. logr is one such
+alternative API, with [interoperability](#slog-interoperability) provided by
+the [`slogr`](slogr) package.
+
 ### Inspiration
 
 Before you consider this package, please read [this blog post by the
@@ -118,6 +142,91 @@ There are implementations for the following logging libraries:
 - **github.com/go-kit/log**: [gokitlogr](https://github.com/tonglil/gokitlogr) (also compatible with github.com/go-kit/kit/log since v0.12.0)
 - **bytes.Buffer** (writing to a buffer): [bufrlogr](https://github.com/tonglil/buflogr) (useful for ensuring values were logged, like during testing)
 
+## slog interoperability
+
+Interoperability goes both ways, using the `logr.Logger` API with a `slog.Handler`
+and using the `slog.Logger` API with a `logr.LogSink`. [slogr](./slogr) provides `NewLogr` and
+`NewSlogHandler` API calls to convert between a `logr.Logger` and a `slog.Handler`.
+As usual, `slog.New` can be used to wrap such a `slog.Handler` in the high-level
+slog API. `slogr` itself leaves that to the caller.
+
+## Using a `logr.Sink` as backend for slog
+
+Ideally, a logr sink implementation should support both logr and slog by
+implementing both the normal logr interface(s) and `slogr.SlogSink`. Because
+of a conflict in the parameters of the common `Enabled` method, it is [not
+possible to implement both slog.Handler and logr.Sink in the same
+type](https://github.com/golang/go/issues/59110).
+
+If both are supported, log calls can go from the high-level APIs to the backend
+without the need to convert parameters. `NewLogr` and `NewSlogHandler` can
+convert back and forth without adding additional wrappers, with one exception:
+when `Logger.V` was used to adjust the verbosity for a `slog.Handler`, then
+`NewSlogHandler` has to use a wrapper which adjusts the verbosity for future
+log calls.
+
+Such an implementation should also support values that implement specific
+interfaces from both packages for logging (`logr.Marshaler`, `slog.LogValuer`,
+`slog.GroupValue`). logr does not convert those.
+
+Not supporting slog has several drawbacks:
+- Recording source code locations works correctly if the handler gets called
+  through `slog.Logger`, but may be wrong in other cases. That's because a
+  `logr.Sink` does its own stack unwinding instead of using the program counter
+  provided by the high-level API.
+- slog levels <= 0 can be mapped to logr levels by negating the level without a
+  loss of information. But all slog levels > 0 (e.g. `slog.LevelWarning` as
+  used by `slog.Logger.Warn`) must be mapped to 0 before calling the sink
+  because logr does not support "more important than info" levels.
+- The slog group concept is supported by prefixing each key in a key/value
+  pair with the group names, separated by a dot. For structured output like
+  JSON it would be better to group the key/value pairs inside an object.
+- Special slog values and interfaces don't work as expected.
+- The overhead is likely to be higher.
+
+These drawbacks are severe enough that applications using a mixture of slog and
+logr should switch to a different backend.
+
+## Using a `slog.Handler` as backend for logr
+
+Using a plain `slog.Handler` without support for logr works better than the
+other direction:
+- All logr verbosity levels can be mapped 1:1 to their corresponding slog level
+  by negating them.
+- Stack unwinding is done by the `slogr.SlogSink` and the resulting program
+  counter is passed to the `slog.Handler`.
+- Names added via `Logger.WithName` are gathered and recorded in an additional
+  attribute with `logger` as key and the names separated by slash as value.
+- `Logger.Error` is turned into a log record with `slog.LevelError` as level
+  and an additional attribute with `err` as key, if an error was provided.
+
+The main drawback is that `logr.Marshaler` will not be supported. Types should
+ideally support both `logr.Marshaler` and `slog.Valuer`. If compatibility
+with logr implementations without slog support is not important, then
+`slog.Valuer` is sufficient.
+
+## Context support for slog
+
+Storing a logger in a `context.Context` is not supported by
+slog. `logr.NewContext` and `logr.FromContext` can be used with slog like this
+to fill this gap:
+
+    func HandlerFromContext(ctx context.Context) slog.Handler {
+        logger, err := logr.FromContext(ctx)
+        if err == nil {
+            return slogr.NewSlogHandler(logger)
+        }
+        return slog.Default().Handler()
+    }
+
+    func ContextWithHandler(ctx context.Context, handler slog.Handler) context.Context {
+        return logr.NewContext(ctx, slogr.NewLogr(handler))
+    }
+
+The downside is that storing and retrieving a `slog.Handler` needs more
+allocations compared to using a `logr.Logger`. Therefore the recommendation is
+to use the `logr.Logger` API in code which uses contextual logging.
+
 ## FAQ
 
 ### Conceptual
@@ -241,7 +350,9 @@ Otherwise, you can start out with `0` as "you always want to see this",
 
 Then gradually choose levels in between as you need them, working your way
 down from 10 (for debug and trace style logs) and up from 1 (for chattier
-info-type logs.)
+info-type logs). For reference, slog pre-defines -4 for debug logs
+(corresponds to 4 in logr), which matches what is
+[recommended for Kubernetes](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-instrumentation/logging.md#what-method-to-use).
 
 #### How do I choose my keys?
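The verbosity advice in the README above maps cleanly onto slog's fixed levels. A minimal sketch of the mapping using funcr, the reference LogSink that ships in this same vendored package -- the output function and the threshold value are illustrative choices, not part of the upstream change:

    package main

    import (
        "fmt"

        "github.com/go-logr/logr/funcr"
    )

    func main() {
        // Lines logged at V(level) <= Verbosity are emitted; higher V is dropped.
        log := funcr.New(func(prefix, args string) {
            fmt.Println(prefix, args)
        }, funcr.Options{Verbosity: 4})

        log.Info("always shown")      // V(0), comparable to slog's Info (0)
        log.V(4).Info("debug detail") // logr 4 corresponds to slog's LevelDebug (-4)
        log.V(9).Info("trace detail") // dropped: 9 > Verbosity
    }
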
diff --git a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/SECURITY.md b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/SECURITY.md
new file mode 100644
index 00000000..1ca756fc
--- /dev/null
+++ b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/SECURITY.md
@@ -0,0 +1,18 @@
+# Security Policy
+
+If you have discovered a security vulnerability in this project, please report it
+privately. **Do not disclose it as a public issue.** This gives us time to work with you
+to fix the issue before public exposure, reducing the chance that the exploit will be
+used before a patch is released.
+
+You may submit the report in the following ways:
+
+- send an email to go-logr-security@googlegroups.com
+- send us a [private vulnerability report](https://github.com/go-logr/logr/security/advisories/new)
+
+Please provide the following information in your report:
+
+- A description of the vulnerability and its impact
+- How to reproduce the issue
+
+We ask that you give us 90 days to work on a fix before public exposure.
diff --git a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/funcr/funcr.go b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/funcr/funcr.go
index e52f0cd0..12e5807c 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/funcr/funcr.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/funcr/funcr.go
@@ -116,17 +116,17 @@ type Options struct {
 	// Equivalent hooks are offered for key-value pairs saved via
 	// logr.Logger.WithValues or Formatter.AddValues (see RenderValuesHook) and
 	// for user-provided pairs (see RenderArgsHook).
-	RenderBuiltinsHook func(kvList []interface{}) []interface{}
+	RenderBuiltinsHook func(kvList []any) []any
 
 	// RenderValuesHook is the same as RenderBuiltinsHook, except that it is
 	// only called for key-value pairs saved via logr.Logger.WithValues. See
 	// RenderBuiltinsHook for more details.
-	RenderValuesHook func(kvList []interface{}) []interface{}
+	RenderValuesHook func(kvList []any) []any
 
 	// RenderArgsHook is the same as RenderBuiltinsHook, except that it is only
 	// called for key-value pairs passed directly to Info and Error. See
 	// RenderBuiltinsHook for more details.
-	RenderArgsHook func(kvList []interface{}) []interface{}
+	RenderArgsHook func(kvList []any) []any
 
 	// MaxLogDepth tells funcr how many levels of nested fields (e.g. a struct
 	// that contains a struct, etc.) it may log. Every time it finds a struct,
@@ -163,7 +163,7 @@ func (l fnlogger) WithName(name string) logr.LogSink {
 	return &l
 }
 
-func (l fnlogger) WithValues(kvList ...interface{}) logr.LogSink {
+func (l fnlogger) WithValues(kvList ...any) logr.LogSink {
 	l.Formatter.AddValues(kvList)
 	return &l
 }
@@ -173,12 +173,12 @@ func (l fnlogger) WithCallDepth(depth int) logr.LogSink {
 	return &l
 }
 
-func (l fnlogger) Info(level int, msg string, kvList ...interface{}) {
+func (l fnlogger) Info(level int, msg string, kvList ...any) {
 	prefix, args := l.FormatInfo(level, msg, kvList)
 	l.write(prefix, args)
 }
 
-func (l fnlogger) Error(err error, msg string, kvList ...interface{}) {
+func (l fnlogger) Error(err error, msg string, kvList ...any) {
 	prefix, args := l.FormatError(err, msg, kvList)
 	l.write(prefix, args)
 }
@@ -229,7 +229,7 @@ func newFormatter(opts Options, outfmt outputFormat) Formatter {
 type Formatter struct {
 	outputFormat outputFormat
 	prefix       string
-	values       []interface{}
+	values       []any
 	valuesStr    string
 	depth        int
 	opts         *Options
@@ -246,10 +246,10 @@ const (
 )
 
 // PseudoStruct is a list of key-value pairs that gets logged as a struct.
-type PseudoStruct []interface{}
+type PseudoStruct []any
 
 // render produces a log line, ready to use.
-func (f Formatter) render(builtins, args []interface{}) string {
+func (f Formatter) render(builtins, args []any) string {
 	// Empirically bytes.Buffer is faster than strings.Builder for this.
 	buf := bytes.NewBuffer(make([]byte, 0, 1024))
 	if f.outputFormat == outputJSON {
@@ -292,7 +292,7 @@ func (f Formatter) render(builtins, args []interface{}) string {
 // This function returns a potentially modified version of kvList, which
 // ensures that there is a value for every key (adding a value if needed) and
 // that each key is a string (substituting a key if needed).
-func (f Formatter) flatten(buf *bytes.Buffer, kvList []interface{}, continuing bool, escapeKeys bool) []interface{} {
+func (f Formatter) flatten(buf *bytes.Buffer, kvList []any, continuing bool, escapeKeys bool) []any {
 	// This logic overlaps with sanitize() but saves one type-cast per key,
 	// which can be measurable.
 	if len(kvList)%2 != 0 {
@@ -334,7 +334,7 @@ func (f Formatter) flatten(buf *bytes.Buffer, kvList []interface{}, continuing b
 	return kvList
 }
 
-func (f Formatter) pretty(value interface{}) string {
+func (f Formatter) pretty(value any) string {
 	return f.prettyWithFlags(value, 0, 0)
 }
 
@@ -343,7 +343,7 @@ const (
 )
 
 // TODO: This is not fast. Most of the overhead goes here.
-func (f Formatter) prettyWithFlags(value interface{}, flags uint32, depth int) string {
+func (f Formatter) prettyWithFlags(value any, flags uint32, depth int) string {
 	if depth > f.opts.MaxLogDepth {
 		return `"<max-log-depth-exceeded>"`
 	}
@@ -614,7 +614,7 @@ func isEmpty(v reflect.Value) bool {
 	return false
 }
 
-func invokeMarshaler(m logr.Marshaler) (ret interface{}) {
+func invokeMarshaler(m logr.Marshaler) (ret any) {
 	defer func() {
 		if r := recover(); r != nil {
 			ret = fmt.Sprintf("<panic: %s>", r)
@@ -675,12 +675,12 @@ func (f Formatter) caller() Caller {
 
 const noValue = "<no-value>"
 
-func (f Formatter) nonStringKey(v interface{}) string {
+func (f Formatter) nonStringKey(v any) string {
 	return fmt.Sprintf("<non-string-key: %s>", f.snippet(v))
 }
 
 // snippet produces a short snippet string of an arbitrary value.
-func (f Formatter) snippet(v interface{}) string {
+func (f Formatter) snippet(v any) string {
 	const snipLen = 16
 
 	snip := f.pretty(v)
@@ -693,7 +693,7 @@ func (f Formatter) snippet(v interface{}) string {
 // sanitize ensures that a list of key-value pairs has a value for every key
 // (adding a value if needed) and that each key is a string (substituting a key
 // if needed).
-func (f Formatter) sanitize(kvList []interface{}) []interface{} {
+func (f Formatter) sanitize(kvList []any) []any {
 	if len(kvList)%2 != 0 {
 		kvList = append(kvList, noValue)
 	}
@@ -727,8 +727,8 @@ func (f Formatter) GetDepth() int {
 // FormatInfo renders an Info log message into strings. The prefix will be
 // empty when no names were set (via AddNames), or when the output is
 // configured for JSON.
-func (f Formatter) FormatInfo(level int, msg string, kvList []interface{}) (prefix, argsStr string) {
-	args := make([]interface{}, 0, 64) // using a constant here impacts perf
+func (f Formatter) FormatInfo(level int, msg string, kvList []any) (prefix, argsStr string) {
+	args := make([]any, 0, 64) // using a constant here impacts perf
 	prefix = f.prefix
 	if f.outputFormat == outputJSON {
 		args = append(args, "logger", prefix)
@@ -745,10 +745,10 @@ func (f Formatter) FormatInfo(level int, msg string, kvList []interface{}) (pref
 }
 
 // FormatError renders an Error log message into strings. The prefix will be
-// empty when no names were set (via AddNames), or when the output is
+// empty when no names were set (via AddNames), or when the output is
 // configured for JSON.
-func (f Formatter) FormatError(err error, msg string, kvList []interface{}) (prefix, argsStr string) {
-	args := make([]interface{}, 0, 64) // using a constant here impacts perf
+func (f Formatter) FormatError(err error, msg string, kvList []any) (prefix, argsStr string) {
+	args := make([]any, 0, 64) // using a constant here impacts perf
 	prefix = f.prefix
 	if f.outputFormat == outputJSON {
 		args = append(args, "logger", prefix)
@@ -761,12 +761,12 @@ func (f Formatter) FormatError(err error, msg string, kvList []interface{}) (pre
 		args = append(args, "caller", f.caller())
 	}
 	args = append(args, "msg", msg)
-	var loggableErr interface{}
+	var loggableErr any
 	if err != nil {
 		loggableErr = err.Error()
 	}
 	args = append(args, "error", loggableErr)
-	return f.prefix, f.render(args, kvList)
+	return prefix, f.render(args, kvList)
 }
 
 // AddName appends the specified name.  funcr uses '/' characters to separate
@@ -781,7 +781,7 @@ func (f *Formatter) AddName(name string) {
 
 // AddValues adds key-value pairs to the set of saved values to be logged with
 // each log line.
-func (f *Formatter) AddValues(kvList []interface{}) {
+func (f *Formatter) AddValues(kvList []any) {
 	// Three slice args forces a copy.
 	n := len(f.values)
 	f.values = append(f.values[:n:n], kvList...)
diff --git a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/logr.go b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/logr.go
index e027aea3..2a5075a1 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/logr.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/go-logr/logr/logr.go
@@ -127,9 +127,9 @@ limitations under the License.
 // such a value can call its methods without having to check whether the
 // instance is ready for use.
 //
-// Calling methods with the null logger (Logger{}) as instance will crash
-// because it has no LogSink. Therefore this null logger should never be passed
-// around. For cases where passing a logger is optional, a pointer to Logger
+// The zero logger (= Logger{}) is identical to Discard() and discards all log
+// entries. Code that receives a Logger by value can simply call it, the methods
+// will never crash. For cases where passing a logger is optional, a pointer to Logger
 // should be used.
 //
 // # Key Naming Conventions
@@ -258,6 +258,12 @@ type Logger struct {
 // Enabled tests whether this Logger is enabled. For example, commandline
 // flags might be used to set the logging verbosity and disable some info logs.
 func (l Logger) Enabled() bool {
+	// Some implementations of LogSink look at the caller in Enabled (e.g.
+	// different verbosity levels per package or file), but we only pass one
+	// CallDepth in (via Init). This means that all calls from Logger to the
+	// LogSink's Enabled, Info, and Error methods must have the same number of
+	// frames. In other words, Logger methods can't call other Logger methods
+	// which call these LogSink methods unless we do it the same in all paths.
 	return l.sink != nil && l.sink.Enabled(l.level)
 }
 
@@ -267,11 +273,11 @@ func (l Logger) Enabled() bool {
 // line. The key/value pairs can then be used to add additional variable
 // information. The key/value pairs must alternate string keys and arbitrary
 // values.
-func (l Logger) Info(msg string, keysAndValues ...interface{}) {
+func (l Logger) Info(msg string, keysAndValues ...any) {
 	if l.sink == nil {
 		return
 	}
-	if l.Enabled() {
+	if l.sink.Enabled(l.level) { // see comment in Enabled
 		if withHelper, ok := l.sink.(CallStackHelperLogSink); ok {
 			withHelper.GetCallStackHelper()()
 		}
@@ -289,7 +295,7 @@ func (l Logger) Info(msg string, keysAndValues ...interface{}) {
 // while the err argument should be used to attach the actual error that
 // triggered this log line, if present. The err parameter is optional
 // and nil may be passed instead of an error instance.
-func (l Logger) Error(err error, msg string, keysAndValues ...interface{}) {
+func (l Logger) Error(err error, msg string, keysAndValues ...any) {
 	if l.sink == nil {
 		return
 	}
@@ -314,9 +320,16 @@ func (l Logger) V(level int) Logger {
 	return l
 }
 
+// GetV returns the verbosity level of the logger. If the logger's LogSink is
+// nil as in the Discard logger, this will always return 0.
+func (l Logger) GetV() int {
+	// 0 if l.sink nil because of the if check in V above.
+	return l.level
+}
+
 // WithValues returns a new Logger instance with additional key/value pairs.
 // See Info for documentation on how key/value pairs work.
-func (l Logger) WithValues(keysAndValues ...interface{}) Logger {
+func (l Logger) WithValues(keysAndValues ...any) Logger {
 	if l.sink == nil {
 		return l
 	}
@@ -467,15 +480,15 @@ type LogSink interface {
 	// The level argument is provided for optional logging. This method will
 	// only be called when Enabled(level) is true. See Logger.Info for more
 	// details.
-	Info(level int, msg string, keysAndValues ...interface{})
+	Info(level int, msg string, keysAndValues ...any)
 
 	// Error logs an error, with the given message and key/value pairs as
 	// context. See Logger.Error for more details.
-	Error(err error, msg string, keysAndValues ...interface{})
+	Error(err error, msg string, keysAndValues ...any)
 
 	// WithValues returns a new LogSink with additional key/value pairs. See
 	// Logger.WithValues for more details.
-	WithValues(keysAndValues ...interface{}) LogSink
+	WithValues(keysAndValues ...any) LogSink
 
 	// WithName returns a new LogSink with the specified name appended. See
 	// Logger.WithName for more details.
@@ -546,5 +559,5 @@ type Marshaler interface {
 	//   with exported fields
 	//
 	// It may return any value of any type.
-	MarshalLog() interface{}
+	MarshalLog() any
 }
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/README.md b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/README.md
index 47216304..ce81b254 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/README.md
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/README.md
@@ -24,7 +24,7 @@
 
 If you are interested in contributing to NATS, read about our...
 
-- [Contributing guide](https://nats.io/community/#contribute)
+- [Contributing guide](./CONTRIBUTING.md)
 - [Report issues or propose Pull Requests](https://github.com/nats-io)
 
 [License-Url]: https://www.apache.org/licenses/LICENSE-2.0
@@ -37,8 +37,8 @@ If you are interested in contributing to NATS, read about our...
 [Fossa-Image]: https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fnats-server.svg?type=shield
 [Build-Status-Url]: https://travis-ci.com/github/nats-io/nats-server
 [Build-Status-Image]: https://travis-ci.com/nats-io/nats-server.svg?branch=main
-[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.10.3
-[Release-image]: https://img.shields.io/badge/release-v2.10.3-1eb0fc.svg
+[Release-Url]: https://github.com/nats-io/nats-server/releases/tag/v2.10.4
+[Release-image]: https://img.shields.io/badge/release-v2.10.4-1eb0fc.svg
 [Coverage-Url]: https://coveralls.io/r/nats-io/nats-server?branch=main
 [Coverage-image]: https://coveralls.io/repos/github/nats-io/nats-server/badge.svg?branch=main
 [ReportCard-Url]: https://goreportcard.com/report/nats-io/nats-server
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/client.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/client.go
index e3364c8a..6aba4395 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/client.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/client.go
@@ -141,6 +141,7 @@ const (
 	expectConnect          // Marks if this connection is expected to send a CONNECT
 	connectProcessFinished // Marks if this connection has finished the connect process.
 	compressionNegotiated  // Marks if this connection has negotiated compression level with remote.
+	didTLSFirst            // Marks if this connection requested and was accepted doing the TLS handshake first (prior to INFO).
 )
 
 // set the flag (would be equivalent to set the boolean to true)
@@ -2800,6 +2801,11 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
 	)
 
 	acc.mu.RLock()
+	// If this is from a service import, ignore.
+	if sub.si {
+		acc.mu.RUnlock()
+		return nil
+	}
 	subj := string(sub.subject)
 	if len(acc.imports.streams) > 0 {
 		tokens = tokenizeSubjectIntoSlice(tsa[:0], subj)
@@ -5226,7 +5232,7 @@ func (c *client) reconnect() {
 
 	// Check for a solicited route. If it was, start up a reconnect unless
 	// we are already connected to the other end.
-	if c.isSolicitedRoute() || retryImplicit {
+	if didSolicit := c.isSolicitedRoute(); didSolicit || retryImplicit {
 		srv.mu.Lock()
 		defer srv.mu.Unlock()
@@ -5236,7 +5242,7 @@ func (c *client) reconnect() {
 		rtype := c.route.routeType
 		rurl := c.route.url
 		accName := string(c.route.accName)
-		checkRID := accName == _EMPTY_ && srv.routesPoolSize <= 1 && rid != _EMPTY_
+		checkRID := accName == _EMPTY_ && srv.getOpts().Cluster.PoolSize < 1 && rid != _EMPTY_
 		c.mu.Unlock()
 
 		// It is possible that the server is being shutdown.
@@ -5246,6 +5252,14 @@ func (c *client) reconnect() {
 		}
 
 		if checkRID && srv.routes[rid] != nil {
+			// This is the case of "no pool". Make sure that the registered one
+			// is upgraded to solicited if the connection trying to reconnect
+			// was a solicited one.
+			if didSolicit {
+				if remote := srv.routes[rid][0]; remote != nil {
+					upgradeRouteToSolicited(remote, rurl, rtype)
+				}
+			}
 			srv.Debugf("Not attempting reconnect for solicited route, already connected to %q", rid)
 			return
 		} else if rid == srv.info.ID {
@@ -5305,10 +5319,10 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
 	// Check our cache.
 	if pac, ok = c.in.pacache[string(c.pa.pacache)]; ok {
 		// Check the genid to see if it's still valid.
-		// sl could be swapped out on reload so need to lock.
-		pac.acc.mu.RLock()
+		// Since v2.10.0, the config reload of accounts has been fixed
+		// and an account's sublist pointer should not change, so no need to
+		// lock to access it.
 		sl := pac.acc.sl
-		pac.acc.mu.RUnlock()
 
 		if genid := atomic.LoadUint64(&sl.genid); genid != pac.genid {
 			ok = false
@@ -5320,15 +5334,15 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
 	}
 
 	if !ok {
-		// Match correct account and sublist.
-		if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
-			return nil, nil
+		if c.kind == ROUTER && len(c.route.accName) > 0 {
+			acc = c.acc
+		} else {
+			// Match correct account and sublist.
+			if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil {
+				return nil, nil
+			}
 		}
-
-		// sl could be swapped out on reload so need to lock.
-		acc.mu.RLock()
 		sl := acc.sl
-		acc.mu.RUnlock()
 
 		// Match against the account sublist.
 		r = sl.Match(string(c.pa.subject))
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/const.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/const.go
index 1342d4dd..603a3ae4 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/const.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/const.go
@@ -41,7 +41,7 @@ var (
 const (
 	// VERSION is the current version for the server.
-	VERSION = "2.10.3"
+	VERSION = "2.10.4"
 
 	// PROTO is the currently supported protocol.
 	// 0 was the original
@@ -82,6 +82,12 @@ const (
 	// TLS_TIMEOUT is the TLS wait time.
 	TLS_TIMEOUT = 2 * time.Second
 
+	// DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY is the default amount of
+	// time for the server to wait for the TLS handshake with a client to
+	// be initiated before falling back to sending the INFO protocol first.
+	// See TLSHandshakeFirst and TLSHandshakeFirstFallback options.
+	DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY = 50 * time.Millisecond
+
 	// AUTH_TIMEOUT is the authorization wait time.
 	AUTH_TIMEOUT = 2 * time.Second
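The new constant above encodes a wait-then-fallback handshake policy: give the client a short window to start TLS on its own, otherwise send INFO first as before. A sketch of that pattern as a hypothetical standalone helper -- the server's actual implementation lives in its connection setup and is not shown in this diff:

    package sketch

    import (
        "bufio"
        "net"
        "time"
    )

    // sawTLSFirst reports whether the client sent the first byte of a TLS
    // handshake within the fallback delay. The returned reader must be used
    // for all further reads so the peeked byte is not lost.
    func sawTLSFirst(conn net.Conn, delay time.Duration) (bool, *bufio.Reader) {
        br := bufio.NewReader(conn)
        _ = conn.SetReadDeadline(time.Now().Add(delay))
        defer conn.SetReadDeadline(time.Time{}) // clear the deadline afterwards

        b, err := br.Peek(1) // Peek does not consume the byte
        // 0x16 is the TLS record content type for handshake messages.
        return err == nil && b[0] == 0x16, br
    }
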
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/consumer.go
index 94e000a4..1f96e772 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/consumer.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/consumer.go
@@ -272,6 +272,10 @@ var (
 	AckNext = []byte("+NXT")
 	// Terminate delivery of the message.
 	AckTerm = []byte("+TERM")
+
+	// reasons to supply when terminating messages using limits
+	ackTermLimitsReason        = "Message deleted by stream limits"
+	ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
 )
 
 // Calculate accurate replicas for the consumer config with the parent stream config.
@@ -292,6 +296,7 @@ type consumer struct {
 	// This will be checked in checkPending to abort processing
 	// and let ack be processed in priority.
 	awl    int64
+	leader atomic.Bool
 	mu     sync.RWMutex
 	js     *jetStream
 	mset   *stream
@@ -1131,19 +1136,21 @@ func (o *consumer) isLeader() bool {
 
 func (o *consumer) setLeader(isLeader bool) {
 	o.mu.RLock()
-	mset := o.mset
-	isRunning := o.ackSub != nil
+	mset, closed := o.mset, o.closed
+	movingToClustered := o.node != nil && o.pch == nil
+	wasLeader := o.leader.Swap(isLeader)
 	o.mu.RUnlock()
 
 	// If we are here we have a change in leader status.
 	if isLeader {
-		if mset == nil {
+		if closed || mset == nil {
 			return
 		}
-		if isRunning {
+
+		if wasLeader {
 			// If we detect we are scaling up, make sure to create clustered routines and channels.
-			o.mu.Lock()
-			if o.node != nil && o.pch == nil {
+			if movingToClustered {
+				o.mu.Lock()
 				// We are moving from R1 to clustered.
 				o.pch = make(chan struct{}, 1)
 				go o.loopAndForwardProposals(o.qch)
@@ -1153,8 +1160,8 @@ func (o *consumer) setLeader(isLeader bool) {
 				default:
 				}
 			}
+			o.mu.Unlock()
 		}
-		o.mu.Unlock()
 		return
 	}
@@ -1988,8 +1995,12 @@ func (o *consumer) processAck(subject, reply string, hdr int, rmsg []byte) {
 		o.processNak(sseq, dseq, dc, msg)
 	case bytes.Equal(msg, AckProgress):
 		o.progressUpdate(sseq)
-	case bytes.Equal(msg, AckTerm):
-		o.processTerm(sseq, dseq, dc)
+	case bytes.HasPrefix(msg, AckTerm):
+		var reason string
+		if buf := msg[len(AckTerm):]; len(buf) > 0 {
+			reason = string(bytes.TrimSpace(buf))
+		}
+		o.processTerm(sseq, dseq, dc, reason)
 	}
 
 	// Ack the ack if requested.
@@ -2316,7 +2327,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
 }
 
 // Process a TERM
-func (o *consumer) processTerm(sseq, dseq, dc uint64) {
+func (o *consumer) processTerm(sseq, dseq, dc uint64, reason string) {
 	// Treat like an ack to suppress redelivery.
 	o.processAckMsg(sseq, dseq, dc, false)
 
@@ -2335,6 +2346,7 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64) {
 		ConsumerSeq: dseq,
 		StreamSeq:   sseq,
 		Deliveries:  dc,
+		Reason:      reason,
 		Domain:      o.srv.getOpts().JetStreamDomain,
 	}
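With the change above, a `+TERM` ack may carry an optional free-form reason after the token; parsing is a prefix match plus a trim, and the reason is propagated into the delivery-terminated advisory. The same rule in isolation (the helper name and example reason are illustrative):

    package main

    import (
        "bytes"
        "fmt"
    )

    var ackTerm = []byte("+TERM")

    // parseTermReason extracts the optional reason following "+TERM".
    func parseTermReason(msg []byte) (ok bool, reason string) {
        if !bytes.HasPrefix(msg, ackTerm) {
            return false, ""
        }
        if buf := msg[len(ackTerm):]; len(buf) > 0 {
            reason = string(bytes.TrimSpace(buf))
        }
        return true, reason
    }

    func main() {
        fmt.Println(parseTermReason([]byte("+TERM")))               // true ""
        fmt.Println(parseTermReason([]byte("+TERM quota exceeded"))) // true "quota exceeded"
    }
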
@@ -2543,7 +2555,11 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
 	// If we are replicated and we are not the leader we need to pull certain data from our store.
 	if rg != nil && rg.node != nil && !o.isLeader() && o.store != nil {
-		state, _ := o.store.BorrowState()
+		state, err := o.store.BorrowState()
+		if err != nil {
+			o.mu.Unlock()
+			return nil
+		}
 		info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
 		info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
 		info.NumAckPending = len(state.Pending)
@@ -3643,7 +3659,7 @@ func (o *consumer) checkAckFloor() {
 			o.mu.RUnlock()
 			// If it was pending for us, get rid of it.
 			if isPending {
-				o.processTerm(seq, p.Sequence, rdc)
+				o.processTerm(seq, p.Sequence, rdc, ackTermLimitsReason)
 			}
 		}
 	} else if numPending > 0 {
@@ -3668,7 +3684,7 @@ func (o *consumer) checkAckFloor() {
 
 	for i := 0; i < len(toTerm); i += 3 {
 		seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
-		o.processTerm(seq, dseq, rdc)
+		o.processTerm(seq, dseq, rdc, ackTermLimitsReason)
 	}
 }
 
@@ -4641,16 +4657,6 @@ type lastSeqSkipList struct {
 	seqs   []uint64
 }
 
-// Will create a skip list for us from a store's subjects state.
-func createLastSeqSkipList(mss map[string]SimpleState) []uint64 {
-	seqs := make([]uint64, 0, len(mss))
-	for _, ss := range mss {
-		seqs = append(seqs, ss.Last)
-	}
-	sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
-	return seqs
-}
-
 // Let's us know we have a skip list, which is for deliver last per subject and we are just starting.
 // Lock should be held.
 func (o *consumer) hasSkipListPending() bool {
@@ -4681,37 +4687,49 @@ func (o *consumer) selectStartingSeqNo() {
 			}
 		}
 	} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
-		if o.subjf == nil {
-			if mss := o.mset.store.SubjectsState(o.cfg.FilterSubject); len(mss) > 0 {
-				o.lss = &lastSeqSkipList{
-					resume: state.LastSeq,
-					seqs:   createLastSeqSkipList(mss),
-				}
-				o.sseq = o.lss.seqs[0]
+		// If our parent stream is set to max msgs per subject of 1 this is just
+		// a normal consumer at this point. We can avoid any heavy lifting.
+		if o.mset.cfg.MaxMsgsPer == 1 {
+			o.sseq = state.FirstSeq
+		} else {
+			// A threshold for when we switch from get last msg to subjects state.
+			const numSubjectsThresh = 256
+			lss := &lastSeqSkipList{resume: state.LastSeq}
+			var filters []string
+			if o.subjf == nil {
+				filters = append(filters, o.cfg.FilterSubject)
 			} else {
-				// If no mapping info just set to last.
-				o.sseq = state.LastSeq
+				for _, filter := range o.subjf {
+					filters = append(filters, filter.subject)
+				}
 			}
-			return
-		}
-		lss := &lastSeqSkipList{
-			resume: state.LastSeq,
-		}
-		for _, filter := range o.subjf {
-			if mss := o.mset.store.SubjectsState(filter.subject); len(mss) > 0 {
-				lss.seqs = append(lss.seqs, createLastSeqSkipList(mss)...)
+			for _, filter := range filters {
+				if st := o.mset.store.SubjectsTotals(filter); len(st) < numSubjectsThresh {
+					var smv StoreMsg
+					for subj := range st {
+						if sm, err := o.mset.store.LoadLastMsg(subj, &smv); err == nil {
+							lss.seqs = append(lss.seqs, sm.seq)
+						}
+					}
+				} else if mss := o.mset.store.SubjectsState(filter); len(mss) > 0 {
+					for _, ss := range mss {
+						lss.seqs = append(lss.seqs, ss.Last)
+					}
+				}
 			}
-		}
-		if len(lss.seqs) == 0 {
-			o.sseq = state.LastSeq
-		}
-		// Sort the skip list
-		sort.Slice(lss.seqs, func(i, j int) bool {
-			return lss.seqs[j] > lss.seqs[i]
-		})
-		o.lss = lss
-		if len(o.lss.seqs) != 0 {
-			o.sseq = o.lss.seqs[0]
+			// Sort the skip list if needed.
+			if len(lss.seqs) > 1 {
+				sort.Slice(lss.seqs, func(i, j int) bool {
+					return lss.seqs[j] > lss.seqs[i]
+				})
+			}
+			if len(lss.seqs) == 0 {
+				o.sseq = state.LastSeq
+			} else {
+				o.sseq = lss.seqs[0]
+			}
+			// Assign skip list.
+			o.lss = lss
 		}
 	} else if o.cfg.OptStartTime != nil {
 		// If we are here we are time based.
@@ -5144,11 +5162,10 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	o.mu.Unlock()
 
 	// If it was pending process it like an ack.
-	// TODO(dlc) - we could do a term here instead with a reason to generate the advisory.
 	if wasPending {
 		// We could have lock for stream so do this in a go routine.
 		// TODO(dlc) - We should do this with ipq vs naked go routines.
-		go o.processTerm(sseq, p.Sequence, rdc)
+		go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
 	}
 }
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/events.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/events.go
index 0f761a47..601ed85a 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/events.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/events.go
@@ -1598,14 +1598,16 @@ func (s *Server) shutdownEventing() {
 	s.mu.Lock()
 	clearTimer(&s.sys.sweeper)
 	clearTimer(&s.sys.stmr)
-	sys := s.sys
+	rc := s.sys.resetCh
+	s.sys.resetCh = nil
+	wg := &s.sys.wg
 	s.mu.Unlock()
 
 	// We will queue up a shutdown event and wait for the
 	// internal send loop to exit.
 	s.sendShutdownEvent()
-	sys.wg.Wait()
-	close(sys.resetCh)
+	wg.Wait()
+	close(rc)
 
 	s.mu.Lock()
 	defer s.mu.Unlock()
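The shutdownEventing rewrite above is a standard Go shutdown idiom: snapshot the shared fields under the lock, nil out the channel so later callers cannot reuse it, then wait and close outside the lock. A self-contained sketch of the idiom (type and field names here are illustrative, not the server's):

    package main

    import "sync"

    type system struct {
        mu      sync.Mutex
        wg      sync.WaitGroup
        resetCh chan struct{}
    }

    func (s *system) shutdown() {
        // Snapshot under the lock; nil the field so others see it is gone.
        s.mu.Lock()
        rc := s.resetCh
        s.resetCh = nil
        wg := &s.wg
        s.mu.Unlock()

        // Wait for in-flight workers and close the channel outside the lock,
        // so Wait never blocks other lock holders and close cannot race a send.
        wg.Wait()
        if rc != nil {
            close(rc)
        }
    }

    func main() {
        s := &system{resetCh: make(chan struct{})}
        s.shutdown()
    }
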
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/filestore.go
index 4238ed21..3157e678 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/filestore.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/filestore.go
@@ -176,6 +176,7 @@ type fileStore struct {
 	blks []*msgBlock
 	bim  map[uint32]*msgBlock
 	psim map[string]*psi
+	tsl  int
 	hh   hash.Hash64
 	qch  chan struct{}
 	fch  chan struct{}
@@ -428,7 +429,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
 		prior := fs.state
 		// Reset anything that could have been set from above.
 		fs.state = StreamState{}
-		fs.psim = make(map[string]*psi)
+		fs.psim, fs.tsl = make(map[string]*psi), 0
 		fs.bim = make(map[uint32]*msgBlock)
 		fs.blks = nil
 		fs.tombs = nil
@@ -467,7 +468,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
 	// Check if we have any left over tombstones to process.
 	if len(fs.tombs) > 0 {
 		for _, seq := range fs.tombs {
-			fs.removeMsg(seq, false, false, false)
+			fs.removeMsg(seq, false, true, false)
 			fs.removeFromLostData(seq)
 		}
 		// Not needed after this phase.
@@ -937,6 +938,16 @@ func (mb *msgBlock) ensureLastChecksumLoaded() {
 	copy(mb.lchk[0:], mb.lastChecksum())
 }
 
+// Perform a recover but do not update PSIM.
+// Lock should be held.
+func (fs *fileStore) recoverMsgBlockNoSubjectUpdates(index uint32) (*msgBlock, error) {
+	psim := fs.psim
+	fs.psim = nil
+	mb, err := fs.recoverMsgBlock(index)
+	fs.psim = psim
+	return mb, err
+}
+
 // Lock held on entry
 func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) {
 	mb := fs.initMsgBlock(index)
@@ -1218,6 +1229,8 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
 	mb.clearCacheAndOffset()
 
 	buf, err := mb.loadBlock(nil)
+	defer recycleMsgBlockBuf(buf)
+
 	if err != nil || len(buf) == 0 {
 		var ld *LostStreamData
 		// No data to rebuild from here.
@@ -1553,7 +1566,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
 
 	// Check for per subject info.
 	if numSubjects := int(readU64()); numSubjects > 0 {
-		fs.psim = make(map[string]*psi, numSubjects)
+		fs.psim, fs.tsl = make(map[string]*psi, numSubjects), 0
 		for i := 0; i < numSubjects; i++ {
 			if lsubj := int(readU64()); lsubj > 0 {
 				if bi+lsubj > len(buf) {
@@ -1570,6 +1583,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
 					psi.lblk = psi.fblk
 				}
 				fs.psim[subj] = psi
+				fs.tsl += len(subj)
 			}
 		}
 	}
@@ -1646,18 +1660,18 @@ func (fs *fileStore) recoverFullState() (rerr error) {
 		return errPriorState
 	}
 	if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
-		// Remove the last message block since we will re-process below.
+		// Remove the last message block since recover will add in the new one.
 		fs.removeMsgBlockFromList(mb)
+		if nmb, err := fs.recoverMsgBlockNoSubjectUpdates(mb.index); err != nil && !os.IsNotExist(err) {
+			os.Remove(fn)
+			return errCorruptState
+		} else if nmb != nil {
+			fs.adjustAccounting(mb, nmb)
+		}
 	}
 
 	// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
-	// If we did not match we re-process the last block.
-	start := blkIndex
-	if matched {
-		start++
-	}
-
-	for bi := start; ; bi++ {
+	for bi := blkIndex + 1; ; bi++ {
 		nmb, err := fs.recoverMsgBlock(bi)
 		if err != nil {
 			if os.IsNotExist(err) {
@@ -1668,13 +1682,6 @@ func (fs *fileStore) recoverFullState() (rerr error) {
 			return err
 		}
 		if nmb != nil {
-			// Check if we have to account for a partial message block.
-			if !matched && mb != nil && mb.index == nmb.index {
-				if err := fs.adjustAccounting(mb, nmb); err != nil {
-					fs.warn("Stream state could not adjust accounting")
-					return err
-				}
-			}
 			// Update top level accounting.
 			if fs.state.FirstSeq == 0 || nmb.first.seq < fs.state.FirstSeq {
 				fs.state.FirstSeq = nmb.first.seq
@@ -1691,9 +1698,9 @@ func (fs *fileStore) recoverFullState() (rerr error) {
 }
 
 // adjustAccounting will be called when a stream state was only partially accounted for
-// with a message block, e.g. additional records were added after the stream state.
+// within a message block, e.g. additional records were added after the stream state.
 // Lock should be held.
-func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
+func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) {
 	nmb.mu.Lock()
 	defer nmb.mu.Unlock()
@@ -1703,23 +1710,29 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
 	}
 	nmb.ensurePerSubjectInfoLoaded()
 
-	// Walk all the original mb's sequences that were included in the stream state.
+	// Walk only new messages and update accounting at fs level. Any messages that should have
+	// triggered limits exceeded will be handled after the recovery and prior to the stream
+	// being available to the system.
 	var smv StoreMsg
-	for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
-		// If we had already declared it deleted we can move on since you can not undelete.
-		if mb.dmap.Exists(seq) {
-			continue
-		}
-		// Lookup the message.
+	for seq := mb.last.seq + 1; seq <= nmb.last.seq; seq++ {
+		// Lookup the message. If it errors it will be deleted, so we can skip.
 		sm, err := nmb.cacheLookup(seq, &smv)
 		if err != nil {
-			return err
+			continue
 		}
 		// Since we found it we just need to adjust fs totals and psim.
-		fs.state.Msgs--
-		fs.state.Bytes -= fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
+		fs.state.Msgs++
+		fs.state.Bytes += fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
 		if len(sm.subj) > 0 && fs.psim != nil {
-			fs.removePerSubject(sm.subj)
+			if info, ok := fs.psim[sm.subj]; ok {
+				info.total++
+				if nmb.index > info.lblk {
+					info.lblk = nmb.index
+				}
+			} else {
+				fs.psim[sm.subj] = &psi{total: 1, fblk: nmb.index, lblk: nmb.index}
+				fs.tsl += len(sm.subj)
+			}
 		}
 	}
 
@@ -1729,7 +1742,15 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
 		nmb.first = mb.first
 	}
 
-	return nil
+	// Update top level accounting.
+	if fs.state.FirstSeq == 0 || nmb.first.seq < fs.state.FirstSeq {
+		fs.state.FirstSeq = nmb.first.seq
+		fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
+	}
+	if nmb.last.seq > fs.state.LastSeq {
+		fs.state.LastSeq = nmb.last.seq
+		fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
+	}
 }
 
 // Grabs last checksum for the named block file.
@@ -2041,7 +2062,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
 		lmb.writeTombstone(last.seq, last.ts)
 	}
 	// Clear any global subject state.
-	fs.psim = make(map[string]*psi)
+	fs.psim, fs.tsl = make(map[string]*psi), 0
 }
 
 // If we purged anything, make sure we kick flush state loop.
@@ -2405,7 +2426,7 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 	fs.mu.RLock()
 	defer fs.mu.RUnlock()
 
-	if fs.state.Msgs == 0 {
+	if fs.state.Msgs == 0 || fs.noTrackSubjects() {
 		return nil
 	}
 
@@ -2432,8 +2453,12 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 		}
 
 		mb.mu.Lock()
-		// Make sure we have fss loaded.
-		mb.ensurePerSubjectInfoLoaded()
+		var shouldExpire bool
+		if mb.fss == nil {
+			// Make sure we have fss loaded.
+			mb.loadMsgsWithLock()
+			shouldExpire = true
+		}
 		for subj, ss := range mb.fss {
 			if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
 				if ss.firstNeedsUpdate {
@@ -2449,6 +2474,10 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
 				}
 			}
 		}
+		if shouldExpire {
+			// Expire this cache before moving on.
+			mb.tryForceExpireCacheLocked()
+		}
 		mb.mu.Unlock()
 
 		if mb == stop {
@@ -2940,6 +2969,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
 		}
 	} else {
 		fs.psim[subj] = &psi{total: 1, fblk: index, lblk: index}
+		fs.tsl += len(subj)
 	}
 }
@@ -3214,7 +3244,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
 	if numMsgs != fs.state.Msgs {
 		fs.warn("Detected skew in subject-based total (%d) vs raw total (%d), rebuilding", numMsgs, fs.state.Msgs)
 		// Clear any global subject state.
-		fs.psim = make(map[string]*psi)
+		fs.psim, fs.tsl = make(map[string]*psi), 0
 		for _, mb := range fs.blks {
 			ld, _, err := mb.rebuildState()
 			if err != nil && ld != nil {
@@ -3323,6 +3353,7 @@ func (fs *fileStore) removePerSubject(subj string) {
 			info.fblk = info.lblk
 		} else if info.total == 0 {
 			delete(fs.psim, subj)
+			fs.tsl -= len(subj)
 		}
 	}
 }
@@ -4268,7 +4299,7 @@ func (fs *fileStore) checkMsgs() *LostStreamData {
 	fs.checkAndFlushAllBlocks()
 
 	// Clear any global subject state.
-	fs.psim = make(map[string]*psi)
+	fs.psim, fs.tsl = make(map[string]*psi), 0
 
 	for _, mb := range fs.blks {
 		// Make sure encryption loaded if needed for the block.
@@ -5985,7 +6016,7 @@ func compareFn(subject string) func(string, string) bool {
 // Will return the number of purged messages.
 func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
 	if subject == _EMPTY_ || subject == fwcs {
-		if keep == 0 && (sequence == 0 || sequence == 1) {
+		if keep == 0 && sequence == 0 {
 			return fs.Purge()
 		}
 		if sequence > 1 {
@@ -6141,7 +6172,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
 	fs.lmb = nil
 	fs.bim = make(map[uint32]*msgBlock)
 	// Clear any per subject tracking.
-	fs.psim = make(map[string]*psi)
+	fs.psim, fs.tsl = make(map[string]*psi), 0
 	// Mark dirty
 	fs.dirty++
@@ -6414,7 +6445,7 @@ func (fs *fileStore) reset() error {
 	fs.blks, fs.lmb = nil, nil
 
 	// Reset subject mappings.
-	fs.psim = make(map[string]*psi)
+	fs.psim, fs.tsl = make(map[string]*psi), 0
 	fs.bim = make(map[uint32]*msgBlock)
 
 	// If we purged anything, make sure we kick flush state loop.
@@ -6701,7 +6732,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
 // Lock should be held.
 func (fs *fileStore) resetGlobalPerSubjectInfo() {
 	// Clear any global subject state.
-	fs.psim = make(map[string]*psi)
+	fs.psim, fs.tsl = make(map[string]*psi), 0
 	for _, mb := range fs.blks {
 		fs.populateGlobalPerSubjectInfo(mb)
 	}
@@ -6725,7 +6756,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
 	if err := mb.loadMsgsWithLock(); err != nil {
 		return err
 	}
-	// indexCaceheBuf can produce fss now, so if non-nil we are good.
+	// indexCacheBuf can produce fss now, so if non-nil we are good.
 	if mb.fss != nil {
 		return nil
 	}
@@ -6799,6 +6830,7 @@ func (fs *fileStore) populateGlobalPerSubjectInfo(mb *msgBlock) {
 			}
 		} else {
 			fs.psim[subj] = &psi{total: ss.Msgs, fblk: mb.index, lblk: mb.index}
+			fs.tsl += len(subj)
 		}
 	}
 }
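The writeFullState hunk below replaces a fixed 32 KiB scratch buffer with a computed upper bound, using the new fs.tsl total-subject-length counter so the encode buffer is allocated exactly once. A sketch of the sizing idea with encoding/binary (the data and field layout here are illustrative, not the store's on-disk format):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        subjects := map[string]uint64{"orders.new": 10, "orders.paid": 3}

        // Total subject length, analogous to the new fs.tsl counter.
        tsl := 0
        for subj := range subjects {
            tsl += len(subj)
        }

        // Upper bound: every varint is at most MaxVarintLen64 bytes.
        sz := binary.MaxVarintLen64 + tsl + len(subjects)*2*binary.MaxVarintLen64
        buf := make([]byte, 0, sz) // one allocation; appends never regrow it

        buf = binary.AppendUvarint(buf, uint64(len(subjects)))
        for subj, total := range subjects {
            buf = binary.AppendUvarint(buf, uint64(len(subj)))
            buf = append(buf, subj...)
            buf = binary.AppendUvarint(buf, total)
        }
        fmt.Println(len(buf), "<=", cap(buf))
    }
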
@@ -6949,10 +6981,19 @@ func (fs *fileStore) writeFullState() error {
 		return nil
 	}
 
-	var _buf [32 * 1024]byte
-	_buf[0], _buf[1] = fullStateMagic, fullStateVersion
-	buf := _buf[:hdrLen]
+	// For calculating size.
+	numSubjects := len(fs.psim)
+
+	// Calculate an estimate of the upper bound on the size to avoid multiple allocations.
+	sz := 2 + // Magic and Version
+		(binary.MaxVarintLen64 * 6) + // FS data
+		binary.MaxVarintLen64 + fs.tsl + // NumSubjects + total subject length
+		numSubjects*(binary.MaxVarintLen64*4) + // psi record
+		len(fs.blks)*((binary.MaxVarintLen64*6)+512) + // msg blocks, 512 is est for dmap
+		binary.MaxVarintLen64 + 8 // last index + checksum
+
+	buf := make([]byte, hdrLen, sz)
+	buf[0], buf[1] = fullStateMagic, fullStateVersion
 	buf = binary.AppendUvarint(buf, fs.state.Msgs)
 	buf = binary.AppendUvarint(buf, fs.state.Bytes)
 	buf = binary.AppendUvarint(buf, fs.state.FirstSeq)
@@ -6961,9 +7002,7 @@ func (fs *fileStore) writeFullState() error {
 	buf = binary.AppendVarint(buf, timestampNormalized(fs.state.FirstTime))
 	buf = binary.AppendUvarint(buf, fs.state.LastSeq)
 	buf = binary.AppendVarint(buf, timestampNormalized(fs.state.LastTime))
 
 	// Do per subject information map if applicable.
-	numSubjects := len(fs.psim)
 	buf = binary.AppendUvarint(buf, uint64(numSubjects))
-
 	if numSubjects > 0 {
 		for subj, psi := range fs.psim {
 			buf = binary.AppendUvarint(buf, uint64(len(subj)))
@@ -6985,6 +7024,7 @@ func (fs *fileStore) writeFullState() error {
 
 	// Use basetime to save some space.
 	baseTime := timestampNormalized(fs.state.FirstTime)
+	var scratch [8 * 1024]byte
 
 	for _, mb := range fs.blks {
 		mb.mu.RLock()
@@ -6998,7 +7038,6 @@ func (fs *fileStore) writeFullState() error {
 		numDeleted := mb.dmap.Size()
 		buf = binary.AppendUvarint(buf, uint64(numDeleted))
 		if numDeleted > 0 {
-			var scratch [8 * 1024]byte
 			dmap, _ := mb.dmap.Encode(scratch[:0])
 			buf = append(buf, dmap...)
 		}
@@ -7038,6 +7077,10 @@ func (fs *fileStore) writeFullState() error {
 	// Release lock.
 	fs.mu.Unlock()
 
+	if cap(buf) > sz {
+		fs.warn("WriteFullState reallocated from %d to %d", sz, cap(buf))
+	}
+
 	// Write to a tmp file and rename.
 	const tmpPre = streamStreamStateFile + tsep
 	f, err := os.CreateTemp(filepath.Join(fs.fcfg.StoreDir, msgDir), tmpPre)
@@ -7317,6 +7360,9 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer
 		}
 	}
 
+	// Write out full state as well before proceeding.
+	fs.writeFullState()
+
 	pr, pw := net.Pipe()
 
 	// Set a write deadline here to protect ourselves.
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
index 756e75a5..e029f4cd 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go
@@ -1038,6 +1038,10 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
 	}
 
 	s.mu.RLock()
+	if s.sys == nil {
+		s.mu.RUnlock()
+		return ErrServerNotRunning
+	}
 	sendq := s.sys.sendq
 	s.mu.RUnlock()
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go
index 7a3b9203..c4163c85 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_api.go
@@ -4120,7 +4120,9 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
 	}
 
 	for _, o := range obs[offset:] {
-		resp.Consumers = append(resp.Consumers, o.info())
+		if cinfo := o.info(); cinfo != nil {
+			resp.Consumers = append(resp.Consumers, cinfo)
+		}
 		if len(resp.Consumers) >= JSApiListLimit {
 			break
 		}
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
index 67278f20..b5436fac 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go
@@ -2016,7 +2016,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
 	if storage == FileStorage {
 		fs, err := newFileStoreWithCreated(
 			FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s},
-			StreamConfig{Name: rg.Name, Storage: FileStorage},
+			StreamConfig{Name: rg.Name, Storage: FileStorage, Metadata: labels},
 			time.Now().UTC(),
 			s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name),
 			s.jsKeyGen(s.getOpts().JetStreamOldKey, rg.Name),
@@ -6472,7 +6472,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
 	s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
 		var si StreamInfo
 		if err := json.Unmarshal(msg, &si); err != nil {
-			s.Warnf("Error unmarshaling clustered stream info response:%v", err)
+			s.Warnf("Error unmarshalling clustered stream info response:%v", err)
 			return
 		}
 		select {
@@ -8121,7 +8121,7 @@ func (mset *stream) handleClusterSyncRequest(sub *subscription, c *client, _ *Ac
 func (js *jetStream) offlineClusterInfo(rg *raftGroup) *ClusterInfo {
 	s := js.srv
 
-	ci := &ClusterInfo{Name: s.ClusterName()}
+	ci := &ClusterInfo{Name: s.ClusterName(), RaftGroup: rg.Name}
 	for _, peer := range rg.Peers {
 		if sir, ok := s.nodeToInfo.Load(peer); ok && sir != nil {
 			si := sir.(nodeInfo)
@@ -8150,8 +8150,9 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
 	n := rg.node
 
 	ci := &ClusterInfo{
-		Name:   s.cachedClusterName(),
-		Leader: s.serverNameForNode(n.GroupLeader()),
+		Name:      s.cachedClusterName(),
+		Leader:    s.serverNameForNode(n.GroupLeader()),
+		RaftGroup: rg.Name,
 	}
 
 	now := time.Now()
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go
index 2e1c7313..ab7c6191 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/jetstream_events.go
@@ -134,6 +134,7 @@ type JSConsumerDeliveryTerminatedAdvisory struct {
 	ConsumerSeq uint64 `json:"consumer_seq"`
 	StreamSeq   uint64 `json:"stream_seq"`
 	Deliveries  uint64 `json:"deliveries"`
+	Reason      string `json:"reason,omitempty"`
 	Domain      string `json:"domain,omitempty"`
 }
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/memstore.go
index 3ffaacaf..0d037be6 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/memstore.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/memstore.go
@@ -427,9 +427,17 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
 	// We will adjust from the totals above by scanning what we need to exclude.
 	ss.First = first
 	var adjust uint64
+	var tss *SimpleState
+
 	for seq := ms.state.FirstSeq; seq < first; seq++ {
 		if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && isMatch(sm.subj) {
-			adjust++
+			if lastPerSubject {
+				tss = ms.fss[sm.subj]
+			}
+			// If we are last per subject, make sure to only adjust if all messages are before our first.
+			if tss == nil || tss.Last < first {
+				adjust++
+			}
 			if seen != nil {
 				seen[sm.subj] = true
 			}
@@ -515,8 +523,7 @@ func (ms *memStore) SubjectsTotals(filterSubject string) map[string]uint64 {
 
 // NumPending will return the number of pending messages matching the filter subject starting at sequence.
 func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) {
-	// This needs to be a write lock, as filteredStateLocked can
-	// mutate the per-subject state.
+	// This needs to be a write lock, as filteredStateLocked can mutate the per-subject state.
 	ms.mu.Lock()
 	defer ms.mu.Unlock()
 
@@ -629,7 +636,7 @@ func (ms *memStore) expireMsgs() {
 // Will return the number of purged messages.
 func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
 	if subject == _EMPTY_ || subject == fwcs {
-		if keep == 0 && (sequence == 0 || sequence == 1) {
+		if keep == 0 && sequence == 0 {
 			return ms.Purge()
 		}
 		if sequence > 1 {
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/monitor.go
index 66f5e81a..073c468e 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/monitor.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/monitor.go
@@ -130,6 +130,7 @@ type ConnInfo struct {
 	TLSVersion     string         `json:"tls_version,omitempty"`
 	TLSCipher      string         `json:"tls_cipher_suite,omitempty"`
 	TLSPeerCerts   []*TLSPeerCert `json:"tls_peer_certs,omitempty"`
+	TLSFirst       bool           `json:"tls_first,omitempty"`
 	AuthorizedUser string         `json:"authorized_user,omitempty"`
 	Account        string         `json:"account,omitempty"`
 	Subs           []string       `json:"subscriptions_list,omitempty"`
@@ -568,6 +569,7 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool)
 		if auth && len(cs.PeerCertificates) > 0 {
 			ci.TLSPeerCerts = makePeerCerts(cs.PeerCertificates)
 		}
+		ci.TLSFirst = client.flags.isSet(didTLSFirst)
 	}
 }
diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go
index 5e5f8dab..73027220 100644
--- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go
+++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/mqtt.go
@@ -85,6 +85,7 @@ const (
 	mqttConnAckRCServerUnavailable = byte(0x3)
 	mqttConnAckRCBadUserOrPassword = byte(0x4)
 	mqttConnAckRCNotAuthorized     = byte(0x5)
+	mqttConnAckRCQoS2WillRejected  = byte(0x10)
 
 	// Maximum payload size of a control packet
 	mqttMaxPayloadSize = 0xFFFFFFF
@@ -345,6 +346,14 @@ type mqtt struct {
 	asm  *mqttAccountSessionManager // quick reference to account session manager, immutable after processConnect()
 	sess *mqttSession               // quick reference to session, immutable after processConnect()
 	cid  string                     // client ID
+
+	// rejectQoS2Pub tells the MQTT client to not accept QoS2 PUBLISH, instead
+	// error and terminate the connection.
+	rejectQoS2Pub bool
+
+	// downgradeQOS2Sub tells the MQTT client to downgrade QoS2 SUBSCRIBE
+	// requests to QoS1.
+	downgradeQoS2Sub bool
 }
 
 type mqttPending struct {
@@ -475,7 +484,11 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {
 	}
 
 	now := time.Now()
-	c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: &mqtt{}, ws: ws}
+	mqtt := &mqtt{
+		rejectQoS2Pub:    opts.MQTT.rejectQoS2Pub,
+		downgradeQoS2Sub: opts.MQTT.downgradeQoS2Sub,
+	}
+	c := &client{srv: s, nc: conn, mpay: maxPay, msubs: maxSubs, start: now, last: now, mqtt: mqtt, ws: ws}
 	c.headers = true
 	c.mqtt.pp = &mqttPublish{}
 	// MQTT clients don't send NATS CONNECT protocols. So make it an "echo"
- if o.ServerName == _EMPTY_ { + // We have to force the server name to be explicitly set and be unique when + // in cluster mode. + if o.ServerName == _EMPTY_ && (o.Cluster.Port != 0 || o.Gateway.Port != 0) { return errMQTTServerNameMustBeSet } // If there is a NoAuthUser, we need to have Users defined and @@ -2029,9 +2042,6 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc // Add/Replace this message from the retained messages map. // If a message for this topic already existed, the existing record is updated // with the provided information. -// This function will return the stream sequence of the record before its update, -// or 0 if the record was added instead of updated. -// // Lock not held on entry. func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetainedMsgRef) { as.mu.Lock() @@ -2040,7 +2050,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai as.retmsgs = make(map[string]*mqttRetainedMsgRef) as.sl = NewSublistWithCache() } else { - // Check if we already had one. If so, update the existing one. + // Check if we already had one retained message. If so, update the existing one. if erm, exists := as.retmsgs[key]; exists { // If the new sequence is below the floor or the existing one, // then ignore the new one. @@ -2057,6 +2067,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rm *mqttRetai erm.sub = &subscription{subject: []byte(key)} as.sl.Insert(erm.sub) } + return } } rm.sub = &subscription{subject: []byte(key)} @@ -2228,6 +2239,10 @@ func (as *mqttAccountSessionManager) processSubs(sess *mqttSession, c *client, if f.qos > 2 { f.qos = 2 } + if c.mqtt.downgradeQoS2Sub && f.qos == 2 { + c.Warnf("Downgrading subscription QoS2 to QoS1 for %q, as configured", f.filter) + f.qos = 1 + } subject := f.filter sid := subject @@ -2331,6 +2346,10 @@ func (as *mqttAccountSessionManager) serializeRetainedMsgsForSub(sess *mqttSessi if qos > sub.mqtt.qos { qos = sub.mqtt.qos } + if c.mqtt.rejectQoS2Pub && qos == 2 { + c.Warnf("Rejecting retained message with QoS2 for subscription %q, as configured", sub.subject) + continue + } if qos > 0 { pi = sess.trackPublishRetained() @@ -3003,6 +3022,10 @@ func (c *client) mqttParseConnect(r *mqttReader, pl int, hasMappings bool) (byte hasWill = true } + if c.mqtt.rejectQoS2Pub && hasWill && wqos == 2 { + return mqttConnAckRCQoS2WillRejected, nil, fmt.Errorf("server does not accept QoS2 for Will messages") + } + // Spec [MQTT-3.1.2-19] hasUser := cp.flags&mqttConnFlagUsernameFlag != 0 // Spec [MQTT-3.1.2-21] @@ -3387,6 +3410,11 @@ func (c *client) mqttParsePub(r *mqttReader, pl int, pp *mqttPublish, hasMapping if qos > 2 { return fmt.Errorf("QoS=%v is invalid in MQTT", qos) } + + if c.mqtt.rejectQoS2Pub && qos == 2 { + return fmt.Errorf("QoS=2 is disabled for PUBLISH messages") + } + // Keep track of where we are when starting to read the variable header start := r.pos diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/opts.go index 039f982c..ff46afb0 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -327,11 +327,23 @@ type Options struct { TLSConfig *tls.Config `json:"-"` TLSPinnedCerts PinnedCertSet `json:"-"` TLSRateLimit int64 `json:"-"` - AllowNonTLS bool `json:"-"` - WriteDeadline time.Duration `json:"-"`
- MaxClosedClients int `json:"-"` - LameDuckDuration time.Duration `json:"-"` - LameDuckGracePeriod time.Duration `json:"-"` + // When set to true, the server will perform the TLS handshake before + // sending the INFO protocol. For clients that are not configured + // with a similar option, their connection will fail with some sort + // of timeout or EOF error since they are expecting to receive an + // INFO protocol first. + TLSHandshakeFirst bool `json:"-"` + // If TLSHandshakeFirst is true and this value is strictly positive, + // the server will wait for that amount of time for the TLS handshake + // to start before falling back to previous behavior of sending the + // INFO protocol first. It allows for a mix of newer clients that can + // require a TLS handshake first, and older clients that can't. + TLSHandshakeFirstFallback time.Duration `json:"-"` + AllowNonTLS bool `json:"-"` + WriteDeadline time.Duration `json:"-"` + MaxClosedClients int `json:"-"` + LameDuckDuration time.Duration `json:"-"` + LameDuckGracePeriod time.Duration `json:"-"` // MaxTracedMsgLen is the maximum printable length for traced messages. MaxTracedMsgLen int `json:"-"` @@ -550,6 +562,14 @@ type MQTTOpts struct { // Snapshot of configured TLS options. tlsConfigOpts *TLSConfigOpts + + // rejectQoS2Pub tells the MQTT client to not accept QoS2 PUBLISH, instead + // error and terminate the connection. + rejectQoS2Pub bool + + // downgradeQOS2Sub tells the MQTT client to downgrade QoS2 SUBSCRIBE + // requests to QoS1. + downgradeQoS2Sub bool } type netResolver interface { @@ -638,7 +658,8 @@ type TLSConfigOpts struct { Insecure bool Map bool TLSCheckKnownURLs bool - HandshakeFirst bool // Indicate that the TLS handshake should occur first, before sending the INFO protocol + HandshakeFirst bool // Indicate that the TLS handshake should occur first, before sending the INFO protocol. + FallbackDelay time.Duration // Where supported, indicates how long to wait for the handshake before falling back to sending the INFO protocol first. Timeout float64 RateLimit int64 Ciphers []uint16 @@ -1072,6 +1093,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error o.TLSMap = tc.Map o.TLSPinnedCerts = tc.PinnedCerts o.TLSRateLimit = tc.RateLimit + o.TLSHandshakeFirst = tc.HandshakeFirst + o.TLSHandshakeFirstFallback = tc.FallbackDelay // Need to keep track of path of the original TLS config // and certs path for OCSP Stapling monitoring. @@ -4312,7 +4335,30 @@ func parseTLS(v interface{}, isClientCtx bool) (t *TLSConfigOpts, retErr error) } tc.CertMatch = certMatch case "handshake_first", "first", "immediate": - tc.HandshakeFirst = mv.(bool) + switch mv := mv.(type) { + case bool: + tc.HandshakeFirst = mv + case string: + switch strings.ToLower(mv) { + case "true", "on": + tc.HandshakeFirst = true + case "false", "off": + tc.HandshakeFirst = false + case "auto", "auto_fallback": + tc.HandshakeFirst = true + tc.FallbackDelay = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY + default: + // Check to see if this is a duration.
+ if dur, err := time.ParseDuration(mv); err == nil { + tc.HandshakeFirst = true + tc.FallbackDelay = dur + break + } + return nil, &configErr{tk, fmt.Sprintf("field %q's value %q is invalid", mk, mv)} + } + default: + return nil, &configErr{tk, fmt.Sprintf("field %q should be a boolean or a string, got %T", mk, mv)} + } case "ocsp_peer": switch vv := mv.(type) { case bool: @@ -4593,6 +4639,11 @@ func parseMQTT(v interface{}, o *Options, errors *[]error, warnings *[]error) er case "consumer_inactive_threshold", "consumer_auto_cleanup": o.MQTT.ConsumerInactiveThreshold = parseDuration("consumer_inactive_threshold", tk, mv, errors, warnings) + case "reject_qos2_publish": + o.MQTT.rejectQoS2Pub = mv.(bool) + case "downgrade_qos2_subscribe": + o.MQTT.downgradeQoS2Sub = mv.(bool) + default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/raft.go index 7baa949d..b0f30786 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -133,8 +133,7 @@ type raft struct { wtype StorageType track bool werr error - state RaftState - isLeader atomic.Bool + state atomic.Int32 // RaftState hh hash.Hash64 snapfile string csz int @@ -382,7 +381,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe wal: cfg.Log, wtype: cfg.Log.Type(), track: cfg.Track, - state: Follower, csz: ps.clusterSize, qn: ps.clusterSize/2 + 1, hash: hash, @@ -653,12 +651,11 @@ func (s *Server) transferRaftLeaders() bool { // Propose will propose a new entry to the group. // This should only be called on the leader. func (n *raft) Propose(data []byte) error { - n.RLock() - if state := n.state; state != Leader { - n.RUnlock() + if state := n.State(); state != Leader { n.debug("Proposal ignored, not leader (state: %v)", state) return errNotLeader } + n.RLock() // Error if we had a previous write error. if werr := n.werr; werr != nil { n.RUnlock() @@ -674,12 +671,11 @@ func (n *raft) Propose(data []byte) error { // ProposeDirect will propose entries directly. // This should only be called on the leader. func (n *raft) ProposeDirect(entries []*Entry) error { - n.RLock() - if n.state != Leader { - n.RUnlock() - n.debug("Direct proposal ignored, not leader (state: %v)", n.state) + if state := n.State(); state != Leader { + n.debug("Direct proposal ignored, not leader (state: %v)", state) return errNotLeader } + n.RLock() // Error if we had a previous write error. if werr := n.werr; werr != nil { n.RUnlock() @@ -706,11 +702,10 @@ func (n *raft) ForwardProposal(entry []byte) error { // ProposeAddPeer is called to add a peer to the group. func (n *raft) ProposeAddPeer(peer string) error { - n.RLock() - if n.state != Leader { - n.RUnlock() + if n.State() != Leader { return errNotLeader } + n.RLock() // Error if we had a previous write error. if werr := n.werr; werr != nil { n.RUnlock() @@ -742,7 +737,7 @@ func (n *raft) doRemovePeerAsLeader(peer string) { func (n *raft) ProposeRemovePeer(peer string) error { n.RLock() prop, subj := n.prop, n.rpsubj - isLeader := n.state == Leader + isLeader := n.State() == Leader werr := n.werr n.RUnlock() @@ -794,11 +789,10 @@ func (n *raft) AdjustBootClusterSize(csz int) error { // AdjustClusterSize will change the cluster set size. // Must be the leader. 
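Taken together, the option parsing above suggests a server config shaped roughly like the following sketch; the key names come straight from the parser, while the file layout, paths, and values are assumptions for illustration only:

    package main

    import "fmt"

    // exampleConf is an illustrative nats-server config fragment, not a
    // tested configuration.
    const exampleConf = `
    tls {
        cert_file: "./certs/server-cert.pem"
        key_file:  "./certs/server-key.pem"
        # Accepts a boolean, "auto"/"auto_fallback", or a duration such as
        # "300ms"; "auto" and duration forms enable the INFO-first fallback.
        handshake_first: auto
    }
    mqtt {
        port: 1883
        # Fail QoS2 PUBLISH packets (and QoS2 Will messages in CONNECT).
        reject_qos2_publish: true
        # Accept QoS2 SUBSCRIBE but grant QoS1 instead.
        downgrade_qos2_subscribe: true
    }
    `

    func main() { fmt.Print(exampleConf) }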
func (n *raft) AdjustClusterSize(csz int) error { - n.Lock() - if n.state != Leader { - n.Unlock() + if n.State() != Leader { return errNotLeader } + n.Lock() // Same floor as bootstrap. if csz < 2 { csz = 2 } @@ -816,14 +810,15 @@ // PauseApply will allow us to pause processing of append entries onto our // external apply chan. func (n *raft) PauseApply() error { + if n.State() == Leader { + return errAlreadyLeader + } + n.Lock() defer n.Unlock() - if n.state == Leader { - return errAlreadyLeader - } // If we are currently a candidate make sure we step down. - if n.state == Candidate { + if n.State() == Candidate { n.stepdown.push(noLeader) } @@ -944,12 +939,12 @@ func (n *raft) SendSnapshot(data []byte) error { // all of the log entries up to and including index. This should not be called with // entries that have been applied to the FSM but have not been applied to the raft state. func (n *raft) InstallSnapshot(data []byte) error { - n.Lock() - if n.state == Closed { - n.Unlock() + if n.State() == Closed { return errNodeClosed } + n.Lock() + if werr := n.werr; werr != nil { n.Unlock() return werr @@ -1164,7 +1159,7 @@ func (n *raft) Leader() bool { if n == nil { return false } - return n.isLeader.Load() + return n.State() == Leader } func (n *raft) isCatchingUp() bool { @@ -1178,7 +1173,7 @@ // Lock should be held. func (n *raft) isCurrent(includeForwardProgress bool) bool { // Check if we are closed. - if n.state == Closed { + if n.State() == Closed { n.debug("Not current, node is closed") return false } @@ -1190,7 +1185,7 @@ } // Make sure we are the leader or we know we have heard from the leader recently. - if n.state == Leader { + if n.State() == Leader { return true } @@ -1300,7 +1295,7 @@ func (n *raft) StepDown(preferred ...string) error { return errTooManyPrefs } - if n.state != Leader { + if n.State() != Leader { n.Unlock() return errNotLeader } @@ -1394,7 +1389,7 @@ func randCampaignTimeout() time.Duration { // Lock should be held. func (n *raft) campaign() error { n.debug("Starting campaign") - if n.state == Leader { + if n.State() == Leader { return errAlreadyLeader } n.resetElect(randCampaignTimeout()) @@ -1405,7 +1400,7 @@ // Lock should be held. func (n *raft) xferCampaign() error { n.debug("Starting transfer campaign") - if n.state == Leader { + if n.State() == Leader { n.lxfer = false return errAlreadyLeader } @@ -1415,9 +1410,7 @@ // State returns the current state for this node. func (n *raft) State() RaftState { - n.RLock() - defer n.RUnlock() - return n.state + return RaftState(n.state.Load()) } // Progress returns the current index, commit and applied values. @@ -1478,7 +1471,7 @@ func (n *raft) UpdateKnownPeers(knownPeers []string) { // Process like peer state update. ps := &peerState{knownPeers, len(knownPeers), n.extSt} n.processPeerState(ps) - isLeader := n.state == Leader + isLeader := n.State() == Leader n.Unlock() // If we are the leader send this update out as well. @@ -1507,7 +1500,14 @@ func (n *raft) Delete() { func (n *raft) shutdown(shouldDelete bool) { n.Lock() - if n.state == Closed { + + // Returned swap value is the previous state. It looks counter-intuitive + // to do this atomic operation with the lock held, but we have to do so in + // order to make sure that switchState() is not already running. If it is
+ // then it can potentially update the n.state back to a non-closed state, + // allowing shutdown() to be called again. If that happens then the below + // close(n.quit) will panic from trying to close an already-closed channel. + if n.state.Swap(int32(Closed)) == int32(Closed) { n.Unlock() return } @@ -1525,7 +1525,6 @@ } c.closeConnection(InternalClient) } - n.state = Closed s, g, wal := n.s, n.group, n.wal // Delete our peer state and vote state and any snapshots. @@ -1788,7 +1787,7 @@ } func (n *raft) processAppendEntries() { } func (n *raft) runAsFollower() { - for { + for n.State() == Follower { elect := n.electTimer() select { @@ -2156,11 +2155,11 @@ } func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, } func (n *raft) runAsLeader() { - n.RLock() - if n.state == Closed { - n.RUnlock() + if n.State() == Closed { return } + + n.RLock() psubj, rpsubj := n.psubj, n.rpsubj n.RUnlock() @@ -2193,7 +2192,7 @@ lq := time.NewTicker(lostQuorumCheck) defer lq.Stop() - for { + for n.State() == Leader { select { case <-n.s.quitCh: n.shutdown(false) @@ -2541,7 +2540,7 @@ func (n *raft) loadEntry(index uint64) (*appendEntry, error) { // applyCommit will update our commit index and apply the entry to the apply chan. // lock should be held. func (n *raft) applyCommit(index uint64) error { - if n.state == Closed { + if n.State() == Closed { return errNodeClosed } if index <= n.commit { @@ -2551,7 +2550,7 @@ original := n.commit n.commit = index - if n.state == Leader { + if n.State() == Leader { delete(n.acks, index) } @@ -2568,7 +2567,7 @@ if ae, err = n.loadEntry(index); err != nil { if err != ErrStoreClosed && err != ErrStoreEOF { n.warn("Got an error loading %d index: %v - will reset", index, err) - if n.state == Leader { + if n.State() == Leader { n.stepdown.push(n.selectNextLeader()) } // Reset and cancel any catchup. @@ -2596,7 +2595,7 @@ case EntrySnapshot: committed = append(committed, e) case EntryPeerState: - if n.state != Leader { + if n.State() != Leader { if ps, err := decodePeerState(e.Data); err == nil { n.processPeerState(ps) } @@ -2646,7 +2645,7 @@ } // If this is us and we are the leader we should attempt to stepdown. - if peer == n.id && n.state == Leader { + if peer == n.id && n.State() == Leader { n.stepdown.push(n.selectNextLeader()) } @@ -2674,12 +2673,12 @@ // Used to track a success response and apply entries. func (n *raft) trackResponse(ar *appendEntryResponse) { - n.Lock() - if n.state == Closed { - n.Unlock() + if n.State() == Closed { return } + n.Lock() + // Update peer's last index. if ps := n.peers[ar.peer]; ps != nil && ar.index > ps.li { ps.li = ar.index @@ -2736,7 +2735,7 @@ func (n *raft) adjustClusterSizeAndQuorum() { n.lsut = time.Now() } else if ncsz < pcsz { n.debug("Decreasing our clustersize: %d -> %d", pcsz, ncsz) - if n.state == Leader { + if n.State() == Leader { go n.sendHeartbeat() } } @@ -2749,7 +2748,7 @@ func (n *raft) trackPeer(peer string) error { if n.removed != nil { _, isRemoved = n.removed[peer] } - if n.state == Leader { + if n.State() == Leader { if lp, ok := n.peers[peer]; !ok || !lp.kp { // Check if this peer had been removed previously.
needPeerAdd = !isRemoved @@ -2780,7 +2779,7 @@ func (n *raft) runAsCandidate() { // We vote for ourselves. votes := 1 - for { + for n.State() == Candidate { elect := n.electTimer() select { case <-n.entry.ch: @@ -2962,7 +2961,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Just return if closed or we had previous write error. - if n.state == Closed || n.werr != nil { + if n.State() == Closed || n.werr != nil { n.Unlock() return } @@ -2972,7 +2971,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { arbuf := scratch[:] // Are we receiving from another leader. - if n.state == Leader { + if n.State() == Leader { // If we are the same we should step down to break the tie. if ae.term >= n.term { n.term = ae.term @@ -2993,7 +2992,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // If we received an append entry as a candidate we should convert to a follower. - if n.state == Candidate { + if n.State() == Candidate { n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader) if n.term < ae.term { n.term = ae.term @@ -3059,13 +3058,13 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if isNew { n.writeTermVote() } - if n.state != Follower { - n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.state, ae.leader) + if n.State() != Follower { + n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader) n.stepdown.push(ae.leader) } } - if isNew && n.leader != ae.leader && n.state == Follower { + if isNew && n.leader != ae.leader && n.State() == Follower { n.debug("AppendEntry updating leader to %q", ae.leader) n.updateLeader(ae.leader) n.writeTermVote() @@ -3333,7 +3332,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error { // Sanity checking for now. if index := ae.pindex + 1; index != seq { n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex) - if n.state == Leader { + if n.State() == Leader { n.stepdown.push(n.selectNextLeader()) } // Reset and cancel any catchup. @@ -3564,7 +3563,7 @@ func (n *raft) readTermVote() (term uint64, voted string, err error) { // Lock should be held. func (n *raft) setWriteErrLocked(err error) { // Check if we are closed already. - if n.state == Closed { + if n.State() == Closed { return } // Ignore if already set. @@ -3598,9 +3597,7 @@ func (n *raft) setWriteErrLocked(err error) { // Helper to check if we are closed when we do not hold a lock already. func (n *raft) isClosed() bool { - n.RLock() - defer n.RUnlock() - return n.state == Closed + return n.State() == Closed } // Capture our write error if any and hold. @@ -3746,9 +3743,9 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { // If this is a higher term go ahead and stepdown. if vr.term > n.term { - if n.state != Follower { + if n.State() != Follower { n.debug("Stepping down from %s, detected higher term: %d vs %d", - strings.ToLower(n.state.String()), vr.term, n.term) + strings.ToLower(n.State().String()), vr.term, n.term) n.stepdown.push(noLeader) n.term = vr.term } @@ -3787,7 +3784,7 @@ func (n *raft) handleVoteRequest(sub *subscription, c *client, _ *Account, subje func (n *raft) requestVote() { n.Lock() - if n.state != Candidate { + if n.State() != Candidate { n.Unlock() return } @@ -3829,9 +3826,6 @@ func (n *raft) quorumNeeded() int { // Lock should be held. 
func (n *raft) updateLeadChange(isLeader bool) { - // Update our atomic about being the leader. - n.isLeader.Store(isLeader) - // We don't care about values that have not been consumed (transitory states), // so we dequeue any state that is pending and push the new one. for { @@ -3851,25 +3845,25 @@ func (n *raft) updateLeadChange(isLeader bool) { // Lock should be held. func (n *raft) switchState(state RaftState) { - if n.state == Closed { + if n.State() == Closed { return } // Reset the election timer. n.resetElectionTimeout() - if n.state == Leader && state != Leader { + if n.State() == Leader && state != Leader { n.updateLeadChange(false) // Drain the response queue. n.resp.drain() - } else if state == Leader && n.state != Leader { + } else if state == Leader && n.State() != Leader { if len(n.pae) > 0 { n.pae = make(map[uint64]*appendEntry) } n.updateLeadChange(true) } - n.state = state + n.state.Store(int32(state)) n.writeTermVote() } @@ -3879,11 +3873,13 @@ const ( ) func (n *raft) switchToFollower(leader string) { - n.Lock() - defer n.Unlock() - if n.state == Closed { + if n.State() == Closed { return } + + n.Lock() + defer n.Unlock() + n.debug("Switching to follower") n.lxfer = false @@ -3892,17 +3888,19 @@ func (n *raft) switchToFollower(leader string) { } func (n *raft) switchToCandidate() { - n.Lock() - defer n.Unlock() - if n.state == Closed { + if n.State() == Closed { return } + + n.Lock() + defer n.Unlock() + // If we are catching up or are in observer mode we can not switch. if n.observer || n.paused { return } - if n.state != Candidate { + if n.State() != Candidate { n.debug("Switching to candidate") } else { if n.lostQuorumLocked() && time.Since(n.llqrt) > 20*time.Second { @@ -3919,11 +3917,12 @@ func (n *raft) switchToCandidate() { } func (n *raft) switchToLeader() { - n.Lock() - if n.state == Closed { - n.Unlock() + if n.State() == Closed { return } + + n.Lock() + n.debug("Switching to leader") var state StreamState diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/reload.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/reload.go index 23988171..4e1c2f71 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/reload.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/reload.go @@ -266,6 +266,28 @@ func (t *tlsPinnedCertOption) Apply(server *Server) { server.Noticef("Reloaded: %d pinned_certs", len(t.newValue)) } +// tlsHandshakeFirst implements the option interface for the tls `handshake first` setting. +type tlsHandshakeFirst struct { + noopOption + newValue bool +} + +// Apply is a no-op because the timeout will be reloaded after options are applied. +func (t *tlsHandshakeFirst) Apply(server *Server) { + server.Noticef("Reloaded: Client TLS handshake first: %v", t.newValue) +} + +// tlsHandshakeFirstFallback implements the option interface for the tls `handshake first fallback delay` setting. +type tlsHandshakeFirstFallback struct { + noopOption + newValue time.Duration +} + +// Apply is a no-op because the timeout will be reloaded after options are applied. +func (t *tlsHandshakeFirstFallback) Apply(server *Server) { + server.Noticef("Reloaded: Client TLS handshake first fallback delay: %v", t.newValue) +} + // authOption is a base struct that provides default option behaviors. 
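The raft edits above all follow one pattern: the node state moves into an atomic.Int32 so hot paths and loop conditions can consult State() without taking the node lock, and shutdown() uses Swap so only one caller can ever observe the transition into Closed and close the quit channel. A reduced sketch of that pattern (types and names simplified, not the server's):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    type RaftState int32

    const (
        Follower RaftState = iota
        Leader
        Closed
    )

    type node struct {
        mu    sync.Mutex
        state atomic.Int32 // holds a RaftState
        quit  chan struct{}
    }

    func (n *node) State() RaftState { return RaftState(n.state.Load()) }

    // shutdown is idempotent: Swap returns the previous state, so only the
    // first caller to move the state to Closed closes the channel.
    func (n *node) shutdown() {
        n.mu.Lock()
        defer n.mu.Unlock()
        if n.state.Swap(int32(Closed)) == int32(Closed) {
            return
        }
        close(n.quit)
    }

    func main() {
        n := &node{quit: make(chan struct{})}
        n.shutdown()
        n.shutdown() // second call sees Closed and returns; no double-close panic
        fmt.Println(n.State() == Closed)
    }

Checking State() before taking the lock, as Propose and friends now do, keeps the common not-leader rejection path lock-free.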
type authOption struct { noopOption @@ -1222,6 +1244,10 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &tlsTimeoutOption{newValue: newValue.(float64)}) case "tlspinnedcerts": diffOpts = append(diffOpts, &tlsPinnedCertOption{newValue: newValue.(PinnedCertSet)}) + case "tlshandshakefirst": + diffOpts = append(diffOpts, &tlsHandshakeFirst{newValue: newValue.(bool)}) + case "tlshandshakefirstfallback": + diffOpts = append(diffOpts, &tlsHandshakeFirstFallback{newValue: newValue.(time.Duration)}) case "username": diffOpts = append(diffOpts, &usernameOption{}) case "password": diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/route.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/route.go index 493f03e4..4cada7c8 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/route.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/route.go @@ -1071,9 +1071,30 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { // in the server's opts.Routes, false otherwise. // Server lock is assumed to be held by caller. func (s *Server) hasThisRouteConfigured(info *Info) bool { - urlToCheckExplicit := strings.ToLower(net.JoinHostPort(info.Host, strconv.Itoa(info.Port))) - for _, ri := range s.getOpts().Routes { - if strings.ToLower(ri.Host) == urlToCheckExplicit { + routes := s.getOpts().Routes + if len(routes) == 0 { + return false + } + // This could possibly be a 0.0.0.0 host so we will also construct a second + // url with the host section of the `info.IP` (if present). + sPort := strconv.Itoa(info.Port) + urlOne := strings.ToLower(net.JoinHostPort(info.Host, sPort)) + var urlTwo string + if info.IP != _EMPTY_ { + if u, _ := url.Parse(info.IP); u != nil { + urlTwo = strings.ToLower(net.JoinHostPort(u.Hostname(), sPort)) + // Ignore if same as the first + if urlTwo == urlOne { + urlTwo = _EMPTY_ + } + } + } + for _, ri := range routes { + rHost := strings.ToLower(ri.Host) + if rHost == urlOne { + return true + } + if urlTwo != _EMPTY_ && rHost == urlTwo { return true } } @@ -1860,6 +1881,17 @@ const ( func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string) bool { id := info.ID + var acc *Account + if accName != _EMPTY_ { + var err error + acc, err = s.LookupAccount(accName) + if err != nil { + c.sendErrAndErr(fmt.Sprintf("Unable to lookup account %q: %v", accName, err)) + c.closeConnection(MissingAccount) + return false + } + } + s.mu.Lock() if !s.isRunning() || s.routesReject { s.mu.Unlock() @@ -1935,6 +1967,9 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string if c.last.IsZero() { c.last = time.Now() } + if acc != nil { + c.acc = acc + } c.mu.Unlock() // Store this route with key being the route id hash + account name @@ -1984,6 +2019,21 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string s.mu.Unlock() return false } + // Look if there is a solicited route in the pool. If there is one, + // they should all be, so stop at the first. + if url, rtype, hasSolicited := hasSolicitedRoute(conns); hasSolicited { + upgradeRouteToSolicited(c, url, rtype) + } + } else { + // If we solicit, upgrade to solicited all non-solicited routes that + // we may have registered.
+ c.mu.Lock() url := c.route.url rtype := c.route.routeType c.mu.Unlock() for _, r := range conns { upgradeRouteToSolicited(r, url, rtype) } } // For all cases (solicited and not) we need to count how many connections // we already have, and for solicited route, we will find a free spot in @@ -2109,6 +2159,41 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string return !exists } +func hasSolicitedRoute(conns []*client) (*url.URL, RouteType, bool) { + var url *url.URL + var rtype RouteType + for _, r := range conns { + if r == nil { + continue + } + r.mu.Lock() + if r.route.didSolicit { + url = r.route.url + rtype = r.route.routeType + } + r.mu.Unlock() + if url != nil { + return url, rtype, true + } + } + return nil, 0, false +} + +func upgradeRouteToSolicited(r *client, url *url.URL, rtype RouteType) { + if r == nil { + return + } + r.mu.Lock() + if !r.route.didSolicit { + r.route.didSolicit = true + r.route.url = url + } + if rtype == Explicit { + r.route.routeType = Explicit + } + r.mu.Unlock() +} + func handleDuplicateRoute(remote, c *client, setNoReconnect bool) { // We used to clear some fields when closing a duplicate connection // to prevent sending INFO protocols for the remotes to update diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/server.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/server.go index d1d0d109..b1138903 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -2573,6 +2573,9 @@ func (s *Server) AcceptLoop(clr chan struct{}) { // Alert of TLS enabled. if opts.TLSConfig != nil { s.Noticef("TLS required for client connections") + if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 { + s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect") + } } // If server was started with RANDOM_PORT (-1), opts.Port would be equal @@ -3041,10 +3044,37 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { c.Debugf("Client connection created") - // Send our information. - // Need to be sent in place since writeLoop cannot be started until - // TLS handshake is done (if applicable). - c.sendProtoNow(c.generateClientInfoJSON(info)) + // Save info.TLSRequired value since we may need to change it back and forth. + orgInfoTLSReq := info.TLSRequired + + var tlsFirstFallback time.Duration + // Check if we should do TLS first. + tlsFirst := opts.TLSConfig != nil && opts.TLSHandshakeFirst + if tlsFirst { + // Make sure info.TLSRequired is set to true (it could be false + // if AllowNonTLS is enabled). + info.TLSRequired = true + // Get the fallback delay value if applicable. + if f := opts.TLSHandshakeFirstFallback; f > 0 { + tlsFirstFallback = f + } else if inProcess { + // For in-process connection, we will always have a fallback + // delay. It allows support for non-TLS, TLS and "TLS First" + // in-process clients to successfully connect. + tlsFirstFallback = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY + } + } + + // Decide if we are going to require TLS or not and generate INFO json. + tlsRequired := info.TLSRequired + infoBytes := c.generateClientInfoJSON(info) + + // Send our information, except if TLS and TLSHandshakeFirst is requested. + if !tlsFirst { + // Need to be sent in place since writeLoop cannot be started until + // TLS handshake is done (if applicable).
+ c.sendProtoNow(infoBytes) + } // Unlock to register c.mu.Unlock() @@ -3077,20 +3107,50 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { } s.clients[c.cid] = c - tlsRequired := info.TLSRequired s.mu.Unlock() // Re-Grab lock c.mu.Lock() - // Connection could have been closed while sending the INFO proto. isClosed := c.isClosed() - var pre []byte + // We need first to check for "TLS First" fallback delay. + if !isClosed && tlsFirstFallback > 0 { + // We wait and see if we are getting any data. Since we did not send + // the INFO protocol yet, only clients that use TLS first should be + // sending data (the TLS handshake). We don't really check the content: + // if it is a rogue agent and not an actual client performing the + // TLS handshake, the error will be detected when performing the + // handshake on our side. + pre = make([]byte, 4) + c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback)) + n, _ := io.ReadFull(c.nc, pre[:]) + c.nc.SetReadDeadline(time.Time{}) + // If we get any data (regardless of possible timeout), we will proceed + // with the TLS handshake. + if n > 0 { + pre = pre[:n] + } else { + // We did not get anything so we will send the INFO protocol. + pre = nil + + // Restore the original info.TLSRequired value if it is + // different than the current value and regenerate infoBytes. + if orgInfoTLSReq != info.TLSRequired { + info.TLSRequired = orgInfoTLSReq + infoBytes = c.generateClientInfoJSON(info) + } + c.sendProtoNow(infoBytes) + // Set the boolean to false for the rest of the function. + tlsFirst = false + // Check closed status again + isClosed = c.isClosed() + } + } // If we have both TLS and non-TLS allowed we need to see which // one the client wants. We'll always allow this for in-process // connections. - if !isClosed && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) { + if !isClosed && !tlsFirst && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) { pre = make([]byte, 4) c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout))) n, _ := io.ReadFull(c.nc, pre[:]) @@ -3125,12 +3185,18 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client { } } - // If connection is marked as closed, bail out. + // Now, send the INFO if it was delayed + if !isClosed && tlsFirst { + c.flags.set(didTLSFirst) + c.sendProtoNow(infoBytes) + // Check closed status + isClosed = c.isClosed() + } + + // Connection could have been closed while sending the INFO proto. if isClosed { c.mu.Unlock() - // Connection could have been closed due to TLS timeout or while trying - // to send the INFO protocol. We need to call closeConnection() to make - // sure that proper cleanup is done. + // We need to call closeConnection() to make sure that proper cleanup is done.
c.closeConnection(WriteError) return nil } diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/signal_windows.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/signal_windows.go index d55589fe..b262bc0b 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/signal_windows.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/signal_windows.go @@ -34,10 +34,15 @@ func (s *Server) handleSignals() { signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { - for sig := range c { - s.Debugf("Trapped %q signal", sig) - s.Shutdown() - os.Exit(0) + for { + select { + case sig := <-c: + s.Debugf("Trapped %q signal", sig) + s.Shutdown() + os.Exit(0) + case <-s.quitCh: + return + } } }() } diff --git a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/stream.go index 096c850e..ea34bf55 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/src/code.cloudfoundry.org/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -160,9 +160,10 @@ type StreamAlternate struct { // ClusterInfo shows information about the underlying set of servers // that make up the stream or consumer. type ClusterInfo struct { - Name string `json:"name,omitempty"` - Leader string `json:"leader,omitempty"` - Replicas []*PeerInfo `json:"replicas,omitempty"` + Name string `json:"name,omitempty"` + RaftGroup string `json:"raft_group,omitempty"` + Leader string `json:"leader,omitempty"` + Replicas []*PeerInfo `json:"replicas,omitempty"` } // PeerInfo shows information about all the peers in the cluster that @@ -2126,6 +2127,14 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) // Signal the caller that we have captured the above fields. ready.Done() + // Make sure we have valid ipq for msgs. + if msgs == nil { + mset.mu.Lock() + mset.cancelMirrorConsumer() + mset.mu.Unlock() + return + } + t := time.NewTicker(sourceHealthCheckInterval) defer t.Stop() diff --git a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/CHANGELOG.md b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/CHANGELOG.md index 4f512a43..4fc45f29 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/CHANGELOG.md +++ b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/CHANGELOG.md @@ -1,3 +1,8 @@ +## 1.29.0 + +### Features +- MatchError can now take an optional func(error) bool + description [2b39142] + ## 1.28.1 ### Maintenance diff --git a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/gomega_dsl.go b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/gomega_dsl.go index 0625053e..ba082146 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/gomega_dsl.go +++ b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/gomega_dsl.go @@ -22,7 +22,7 @@ import ( "github.com/onsi/gomega/types" ) -const GOMEGA_VERSION = "1.28.1" +const GOMEGA_VERSION = "1.29.0" const nilGomegaPanic = `You are trying to make an assertion, but haven't registered Gomega's fail handler. If you're using Ginkgo then you probably forgot to put your assertion in an It(). 
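Stepping back to the server.go changes above: the TLS-first fallback works by peeking at the socket under a read deadline before any INFO is sent. Bytes arriving early are treated as the start of a client-first TLS handshake, while a quiet socket falls back to the old INFO-first flow. A stripped-down sketch of just that sniffing step (function name and demo values are illustrative):

    package main

    import (
        "fmt"
        "io"
        "net"
        "time"
    )

    // sniffTLSFirst waits up to d for the client to send anything. It returns
    // any bytes already read (to be replayed into the TLS handshake) and
    // whether the client spoke first.
    func sniffTLSFirst(nc net.Conn, d time.Duration) ([]byte, bool) {
        pre := make([]byte, 4)
        nc.SetReadDeadline(time.Now().Add(d))
        n, _ := io.ReadFull(nc, pre)
        nc.SetReadDeadline(time.Time{})
        if n > 0 {
            return pre[:n], true // client initiated the handshake
        }
        return nil, false // quiet client: send INFO first as before
    }

    func main() {
        srv, cli := net.Pipe()
        go cli.Write([]byte{0x16, 0x03, 0x01, 0x00}) // looks like a TLS record header
        pre, first := sniffTLSFirst(srv, 100*time.Millisecond)
        fmt.Println(first, pre)
    }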
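In the same spirit, the signal_windows.go change replaces a bare range over the signal channel with a select that also watches the server quit channel, so the goroutine exits on shutdown instead of leaking. The shape of that fix in isolation (the quitCh plumbing here is illustrative; the real code hangs off *Server):

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"
    )

    func handleSignals(quitCh <-chan struct{}) {
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM)
        go func() {
            for {
                select {
                case sig := <-c:
                    fmt.Printf("trapped %q signal, shutting down\n", sig)
                    os.Exit(0)
                case <-quitCh:
                    // Server is shutting down on its own: return so this
                    // goroutine does not leak.
                    return
                }
            }
        }()
    }

    func main() {
        quit := make(chan struct{})
        handleSignals(quit)
        close(quit) // simulate server shutdown
        time.Sleep(10 * time.Millisecond)
    }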
diff --git a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers.go b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers.go index 88f10043..cd3f431d 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers.go +++ b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers.go @@ -88,19 +88,44 @@ func Succeed() types.GomegaMatcher { } // MatchError succeeds if actual is a non-nil error that matches the passed in -// string, error, or matcher. +// string, error, function, or matcher. // // These are valid use-cases: // -// Expect(err).Should(MatchError("an error")) //asserts that err.Error() == "an error" -// Expect(err).Should(MatchError(SomeError)) //asserts that err == SomeError (via reflect.DeepEqual) -// Expect(err).Should(MatchError(ContainSubstring("sprocket not found"))) // asserts that err.Error() contains substring "sprocket not found" +// When passed a string: +// +// Expect(err).To(MatchError("an error")) +// +// asserts that err.Error() == "an error" +// +// When passed an error: +// +// Expect(err).To(MatchError(SomeError)) +// +// First checks if errors.Is(err, SomeError). +// If that fails then it checks if reflect.DeepEqual(err, SomeError) repeatedly for err and any errors wrapped by err +// +// When passed a matcher: +// +// Expect(err).To(MatchError(ContainSubstring("sprocket not found"))) +// +// the matcher is passed err.Error(). In this case it asserts that err.Error() contains substring "sprocket not found" +// +// When passed a func(err) bool and a description: +// +// Expect(err).To(MatchError(os.IsNotExist, "IsNotExist")) +// +// the function is passed err and matches if the return value is true. The description is required to allow Gomega +// to print a useful error message. // // It is an error for err to be nil or an object that does not implement the // Error interface -func MatchError(expected interface{}) types.GomegaMatcher { +// +// The optional second argument is a description of the error function, if used. This is required when passing a function but is ignored in all other cases. 
+func MatchError(expected interface{}, functionErrorDescription ...any) types.GomegaMatcher { return &matchers.MatchErrorMatcher{ - Expected: expected, + Expected: expected, + FuncErrDescription: functionErrorDescription, } } diff --git a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers/match_error_matcher.go b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers/match_error_matcher.go index 827475ea..c539dd38 100644 --- a/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers/match_error_matcher.go +++ b/src/code.cloudfoundry.org/vendor/github.com/onsi/gomega/matchers/match_error_matcher.go @@ -9,10 +9,14 @@ import ( ) type MatchErrorMatcher struct { - Expected interface{} + Expected any + FuncErrDescription []any + isFunc bool } -func (matcher *MatchErrorMatcher) Match(actual interface{}) (success bool, err error) { +func (matcher *MatchErrorMatcher) Match(actual any) (success bool, err error) { + matcher.isFunc = false + if isNil(actual) { return false, fmt.Errorf("Expected an error, got nil") } @@ -42,6 +46,17 @@ func (matcher *MatchErrorMatcher) Match(actual interface{}) (success bool, err e return actualErr.Error() == expected, nil } + v := reflect.ValueOf(expected) + t := v.Type() + errorInterface := reflect.TypeOf((*error)(nil)).Elem() + if t.Kind() == reflect.Func && t.NumIn() == 1 && t.In(0).Implements(errorInterface) && t.NumOut() == 1 && t.Out(0).Kind() == reflect.Bool { + if len(matcher.FuncErrDescription) == 0 { + return false, fmt.Errorf("MatchError requires an additional description when passed a function") + } + matcher.isFunc = true + return v.Call([]reflect.Value{reflect.ValueOf(actualErr)})[0].Bool(), nil + } + var subMatcher omegaMatcher var hasSubMatcher bool if expected != nil { @@ -57,9 +72,15 @@ } func (matcher *MatchErrorMatcher) FailureMessage(actual interface{}) (message string) { + if matcher.isFunc { + return format.Message(actual, fmt.Sprintf("to match error function %s", matcher.FuncErrDescription[0])) + } return format.Message(actual, "to match error", matcher.Expected) } func (matcher *MatchErrorMatcher) NegatedFailureMessage(actual interface{}) (message string) { + if matcher.isFunc { + return format.Message(actual, fmt.Sprintf("not to match error function %s", matcher.FuncErrDescription[0])) + } return format.Message(actual, "not to match error", matcher.Expected) } diff --git a/src/code.cloudfoundry.org/vendor/modules.txt b/src/code.cloudfoundry.org/vendor/modules.txt index 4d893515..bcc95d4b 100644 --- a/src/code.cloudfoundry.org/vendor/modules.txt +++ b/src/code.cloudfoundry.org/vendor/modules.txt @@ -1,4 +1,4 @@ -# code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231017144728-583bfb3f8b2c +# code.cloudfoundry.org/cf-networking-helpers v0.0.0-20231025144627-414cbe44463b ## explicit; go 1.20 code.cloudfoundry.org/cf-networking-helpers/certauthority code.cloudfoundry.org/cf-networking-helpers/portauthority @@ -16,8 +16,8 @@ filippo.io/edwards25519 filippo.io/edwards25519/field # github.com/fsnotify/fsnotify v1.5.4 ## explicit; go 1.16 -# github.com/go-logr/logr v1.2.4 -## explicit; go 1.16 +# github.com/go-logr/logr v1.3.0 +## explicit; go 1.18 github.com/go-logr/logr github.com/go-logr/logr/funcr # github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 @@ -58,7 +58,7 @@ github.com/nats-io/go-nats/util # github.com/nats-io/jwt/v2 v2.5.2 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.10.3
+# github.com/nats-io/nats-server/v2 v2.10.4 ## explicit; go 1.20 github.com/nats-io/nats-server/v2 github.com/nats-io/nats-server/v2/conf @@ -98,7 +98,7 @@ github.com/onsi/ginkgo/v2/internal/parallel_support github.com/onsi/ginkgo/v2/internal/testingtproxy github.com/onsi/ginkgo/v2/reporters github.com/onsi/ginkgo/v2/types -# github.com/onsi/gomega v1.28.1 +# github.com/onsi/gomega v1.29.0 ## explicit; go 1.18 github.com/onsi/gomega github.com/onsi/gomega/format
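One user-visible effect of the gomega 1.29.0 bump for this repo's tests: MatchError now accepts the func(error) bool form documented in the vendored sources above. A small usage sketch (test name is illustrative):

    package example_test

    import (
        "os"
        "testing"

        . "github.com/onsi/gomega"
    )

    func TestOpenMissingFile(t *testing.T) {
        g := NewWithT(t)
        _, err := os.Open("no/such/file")
        // The description is required when passing a function, so Gomega
        // can print a useful failure message.
        g.Expect(err).To(MatchError(os.IsNotExist, "os.IsNotExist"))
    }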