From 2efcc5524ae247a5897a39ab07bc9ca94cd6cf12 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Tue, 4 Feb 2025 10:19:42 +0100 Subject: [PATCH] Adds a check that can check a request-reply service Signed-off-by: R.I.Pienaar --- monitor/request.go | 100 +++++++++++++++++++++++++++++ monitor/request_test.go | 138 ++++++++++++++++++++++++++++++++++++++++ monitor/result.go | 6 ++ monitor/stream_msg.go | 12 ++-- 4 files changed, 249 insertions(+), 7 deletions(-) create mode 100644 monitor/request.go create mode 100644 monitor/request_test.go diff --git a/monitor/request.go b/monitor/request.go new file mode 100644 index 0000000..1817fd7 --- /dev/null +++ b/monitor/request.go @@ -0,0 +1,100 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor + +import ( + "regexp" + "time" + + "github.com/nats-io/nats.go" +) + +type CheckRequestOptions struct { + // Subject is the subject to send the request to + Subject string `json:"subject" yaml:"subject"` + // Payload is the payload to send to the service + Payload string `json:"payload" yaml:"payload"` + // Header is headers to send with the request + Header map[string]string `json:"header" yaml:"header"` + //HeaderMatch to send in the payload + HeaderMatch map[string]string `json:"headers" yaml:"headers"` + // ResponseMatch applies regular expression match against the payload + ResponseMatch string `json:"response_match" yaml:"response_match"` + // ResponseTimeWarn warns when the response takes longer than a certain time + ResponseTimeWarn time.Duration `json:"response_time_warn" yaml:"response_time_warn"` + // ResponseTimeCritical logs critical when the response takes longer than a certain time + ResponseTimeCritical time.Duration `json:"response_time_crit" yaml:"response_time_crit"` +} + +func CheckRequest(server string, nopts []nats.Option, check *Result, timeout time.Duration, opts CheckRequestOptions) error { + nc, err := nats.Connect(server, nopts...) + if check.CriticalIfErr(err, "could not load info: %v", err) { + return nil + } + + if opts.Subject == "" { + check.Critical("no subject specified") + return nil + } + + msg := nats.NewMsg(opts.Subject) + msg.Data = []byte(opts.Payload) + for k, v := range opts.Header { + msg.Header.Add(k, v) + } + + start := time.Now() + resp, err := nc.RequestMsg(msg, timeout) + since := time.Since(start) + + check.Pd(&PerfDataItem{ + Help: "How long the request took", + Name: "time", + Value: float64(since.Round(time.Millisecond).Seconds()), + Warn: opts.ResponseTimeWarn.Seconds(), + Crit: opts.ResponseTimeCritical.Seconds(), + Unit: "s", + }) + if check.CriticalIfErr(err, "could not send request: %v", err) { + return nil + } + + if opts.ResponseMatch != "" { + re, err := regexp.Compile(opts.ResponseMatch) + if check.CriticalIfErr(err, "content regex compile failed: %v", err) { + return nil + } + + if !re.Match(resp.Data) { + check.Critical("response does not match regexp") + } + } + + for k, v := range opts.HeaderMatch { + rv := resp.Header.Get(k) + if rv != v { + check.Critical("invalid header %q = %q", k, rv) + } + } + + if opts.ResponseTimeCritical > 0 && since > opts.ResponseTimeCritical { + check.Critical("response took %v", since.Round(time.Millisecond)) + } else if opts.ResponseTimeWarn > 0 && since > opts.ResponseTimeWarn { + check.Warn("response took %v", since.Round(time.Millisecond)) + } + + check.OkIfNoWarningsOrCriticals("Valid response") + + return nil +} diff --git a/monitor/request_test.go b/monitor/request_test.go new file mode 100644 index 0000000..f002ea8 --- /dev/null +++ b/monitor/request_test.go @@ -0,0 +1,138 @@ +// Copyright 2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor_test + +import ( + "regexp" + "testing" + "time" + + "github.com/nats-io/jsm.go/monitor" + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +func TestCheckRequest(t *testing.T) { + t.Run("Body match", func(t *testing.T) { + withJetStream(t, func(srv *server.Server, nc *nats.Conn) { + check := &monitor.Result{} + + _, err := nc.Subscribe("test", func(msg *nats.Msg) { + msg.Respond([]byte("test payload")) + }) + assertNoError(t, err) + + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + ResponseMatch: "no match", + })) + assertListIsEmpty(t, check.OKs) + assertListIsEmpty(t, check.Warnings) + assertListEquals(t, check.Criticals, "response does not match regexp") + + check = &monitor.Result{} + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + ResponseMatch: ".+payload", + })) + assertListIsEmpty(t, check.Criticals) + assertListIsEmpty(t, check.Warnings) + assertListEquals(t, check.OKs, "Valid response") + }) + }) + + t.Run("Headers", func(t *testing.T) { + withJetStream(t, func(srv *server.Server, nc *nats.Conn) { + check := &monitor.Result{} + + _, err := nc.Subscribe("test", func(msg *nats.Msg) { + rmsg := nats.NewMsg(msg.Reply) + rmsg.Header.Add("test", "test header") + msg.RespondMsg(rmsg) + }) + assertNoError(t, err) + + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + HeaderMatch: map[string]string{"test": "no match", "other": "header"}, + })) + assertListIsEmpty(t, check.OKs) + assertListIsEmpty(t, check.Warnings) + assertListEquals(t, check.Criticals, `invalid header "other" = ""`, `invalid header "test" = "test header"`) + + check = &monitor.Result{} + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + HeaderMatch: map[string]string{"test": "test header"}, + })) + assertListIsEmpty(t, check.Criticals) + assertListIsEmpty(t, check.Warnings) + assertListEquals(t, check.OKs, "Valid response") + }) + }) + + t.Run("Response Time", func(t *testing.T) { + withJetStream(t, func(srv *server.Server, nc *nats.Conn) { + check := &monitor.Result{} + _, err := nc.Subscribe("test", func(msg *nats.Msg) { + time.Sleep(500 * time.Millisecond) + msg.Respond([]byte("test payload")) + }) + assertNoError(t, err) + + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + ResponseTimeWarn: 20 * time.Millisecond, + ResponseTimeCritical: time.Second, + })) + assertListIsEmpty(t, check.Criticals) + assertListIsEmpty(t, check.OKs) + if len(check.Warnings) != 1 { + t.Fatalf("expected 1 warning, got %d", len(check.Warnings)) + } + m, err := regexp.MatchString("^response took \\d+ms", check.Warnings[0]) + assertNoError(t, err) + if !m { + t.Fatalf("warning not match %s", check.Warnings[0]) + } + + check = &monitor.Result{} + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + ResponseTimeWarn: 20 * time.Millisecond, + ResponseTimeCritical: 400 * time.Millisecond, + })) + assertListIsEmpty(t, check.Warnings) + assertListIsEmpty(t, check.OKs) + if len(check.Criticals) != 1 { + t.Fatalf("expected 1 warning, got %d", len(check.Criticals)) + } + m, err = regexp.MatchString("^response took \\d+ms", check.Criticals[0]) + assertNoError(t, err) + if !m { + t.Fatalf("warning not match %s", check.Criticals[0]) + } + + check = &monitor.Result{} + assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{ + Subject: "test", + ResponseTimeWarn: 800 * time.Millisecond, + ResponseTimeCritical: 1000 * time.Millisecond, + })) + assertListIsEmpty(t, check.Warnings) + assertListIsEmpty(t, check.Criticals) + assertListEquals(t, check.OKs, "Valid response") + }) + }) +} diff --git a/monitor/result.go b/monitor/result.go index 525e4dd..317b148 100644 --- a/monitor/result.go +++ b/monitor/result.go @@ -75,6 +75,12 @@ func (r *Result) Ok(format string, a ...any) { r.OKs = append(r.OKs, fmt.Sprintf(format, a...)) } +func (r *Result) OkIfNoWarningsOrCriticals(format string, a ...any) { + if len(r.Warnings) == 0 && len(r.Criticals) == 0 { + r.Ok(format, a...) + } +} + func (r *Result) CriticalExitIfErr(err error, format string, a ...any) bool { if err == nil { return false diff --git a/monitor/stream_msg.go b/monitor/stream_msg.go index a2602ea..526610c 100644 --- a/monitor/stream_msg.go +++ b/monitor/stream_msg.go @@ -85,12 +85,10 @@ func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts since := time.Since(ts) - if opts.AgeWarning > 0 || opts.AgeCritical > 0 { - if opts.AgeCritical > 0 && since > secondsToDuration(opts.AgeCritical) { - check.Critical("%v old", since.Round(time.Millisecond)) - } else if opts.AgeWarning > 0 && since > secondsToDuration(opts.AgeWarning) { - check.Warn("%v old", time.Since(ts).Round(time.Millisecond)) - } + if opts.AgeCritical > 0 && since > secondsToDuration(opts.AgeCritical) { + check.Critical("%v old", since.Round(time.Millisecond)) + } else if opts.AgeWarning > 0 && since > secondsToDuration(opts.AgeWarning) { + check.Warn("%v old", time.Since(ts).Round(time.Millisecond)) } if opts.Content != "" { @@ -104,7 +102,7 @@ func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts } } - check.Ok("Valid message on %s > %s", opts.StreamName, opts.Subject) + check.OkIfNoWarningsOrCriticals("Valid message on %s > %s", opts.StreamName, opts.Subject) return nil }