Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a check that can check a request-reply service #625

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions monitor/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package monitor

import (
"regexp"
"time"

"github.com/nats-io/nats.go"
)

type CheckRequestOptions struct {
// Subject is the subject to send the request to
Subject string `json:"subject" yaml:"subject"`
// Payload is the payload to send to the service
Payload string `json:"payload" yaml:"payload"`
// Header is headers to send with the request
Header map[string]string `json:"header" yaml:"header"`
//HeaderMatch to send in the payload
HeaderMatch map[string]string `json:"headers" yaml:"headers"`
// ResponseMatch applies regular expression match against the payload
ResponseMatch string `json:"response_match" yaml:"response_match"`
// ResponseTimeWarn warns when the response takes longer than a certain time
ResponseTimeWarn time.Duration `json:"response_time_warn" yaml:"response_time_warn"`
// ResponseTimeCritical logs critical when the response takes longer than a certain time
ResponseTimeCritical time.Duration `json:"response_time_crit" yaml:"response_time_crit"`
}

func CheckRequest(server string, nopts []nats.Option, check *Result, timeout time.Duration, opts CheckRequestOptions) error {
nc, err := nats.Connect(server, nopts...)
if check.CriticalIfErr(err, "could not load info: %v", err) {
return nil
}

if opts.Subject == "" {
check.Critical("no subject specified")
return nil
}

msg := nats.NewMsg(opts.Subject)
msg.Data = []byte(opts.Payload)
for k, v := range opts.Header {
msg.Header.Add(k, v)
}

start := time.Now()
resp, err := nc.RequestMsg(msg, timeout)
since := time.Since(start)

check.Pd(&PerfDataItem{
Help: "How long the request took",
Name: "time",
Value: float64(since.Round(time.Millisecond).Seconds()),
Warn: opts.ResponseTimeWarn.Seconds(),
Crit: opts.ResponseTimeCritical.Seconds(),
Unit: "s",
})
if check.CriticalIfErr(err, "could not send request: %v", err) {
return nil
}

if opts.ResponseMatch != "" {
re, err := regexp.Compile(opts.ResponseMatch)
if check.CriticalIfErr(err, "content regex compile failed: %v", err) {
return nil
}

if !re.Match(resp.Data) {
check.Critical("response does not match regexp")
}
}

for k, v := range opts.HeaderMatch {
rv := resp.Header.Get(k)
if rv != v {
check.Critical("invalid header %q = %q", k, rv)
}
}

if opts.ResponseTimeCritical > 0 && since > opts.ResponseTimeCritical {
check.Critical("response took %v", since.Round(time.Millisecond))
} else if opts.ResponseTimeWarn > 0 && since > opts.ResponseTimeWarn {
check.Warn("response took %v", since.Round(time.Millisecond))
}

check.OkIfNoWarningsOrCriticals("Valid response")

return nil
}
138 changes: 138 additions & 0 deletions monitor/request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package monitor_test

import (
"regexp"
"testing"
"time"

"github.com/nats-io/jsm.go/monitor"
"github.com/nats-io/nats-server/v2/server"
"github.com/nats-io/nats.go"
)

func TestCheckRequest(t *testing.T) {
t.Run("Body match", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}

_, err := nc.Subscribe("test", func(msg *nats.Msg) {
msg.Respond([]byte("test payload"))
})
assertNoError(t, err)

assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseMatch: "no match",
}))
assertListIsEmpty(t, check.OKs)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.Criticals, "response does not match regexp")

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseMatch: ".+payload",
}))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.OKs, "Valid response")
})
})

t.Run("Headers", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}

_, err := nc.Subscribe("test", func(msg *nats.Msg) {
rmsg := nats.NewMsg(msg.Reply)
rmsg.Header.Add("test", "test header")
msg.RespondMsg(rmsg)
})
assertNoError(t, err)

assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
HeaderMatch: map[string]string{"test": "no match", "other": "header"},
}))
assertListIsEmpty(t, check.OKs)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.Criticals, `invalid header "other" = ""`, `invalid header "test" = "test header"`)

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
HeaderMatch: map[string]string{"test": "test header"},
}))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.Warnings)
assertListEquals(t, check.OKs, "Valid response")
})
})

t.Run("Response Time", func(t *testing.T) {
withJetStream(t, func(srv *server.Server, nc *nats.Conn) {
check := &monitor.Result{}
_, err := nc.Subscribe("test", func(msg *nats.Msg) {
time.Sleep(500 * time.Millisecond)
msg.Respond([]byte("test payload"))
})
assertNoError(t, err)

assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseTimeWarn: 20 * time.Millisecond,
ResponseTimeCritical: time.Second,
}))
assertListIsEmpty(t, check.Criticals)
assertListIsEmpty(t, check.OKs)
if len(check.Warnings) != 1 {
t.Fatalf("expected 1 warning, got %d", len(check.Warnings))
}
m, err := regexp.MatchString("^response took \\d+ms", check.Warnings[0])
assertNoError(t, err)
if !m {
t.Fatalf("warning not match %s", check.Warnings[0])
}

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseTimeWarn: 20 * time.Millisecond,
ResponseTimeCritical: 400 * time.Millisecond,
}))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.OKs)
if len(check.Criticals) != 1 {
t.Fatalf("expected 1 warning, got %d", len(check.Criticals))
}
m, err = regexp.MatchString("^response took \\d+ms", check.Criticals[0])
assertNoError(t, err)
if !m {
t.Fatalf("warning not match %s", check.Criticals[0])
}

check = &monitor.Result{}
assertNoError(t, monitor.CheckRequest(srv.ClientURL(), nil, check, time.Second, monitor.CheckRequestOptions{
Subject: "test",
ResponseTimeWarn: 800 * time.Millisecond,
ResponseTimeCritical: 1000 * time.Millisecond,
}))
assertListIsEmpty(t, check.Warnings)
assertListIsEmpty(t, check.Criticals)
assertListEquals(t, check.OKs, "Valid response")
})
})
}
6 changes: 6 additions & 0 deletions monitor/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (r *Result) Ok(format string, a ...any) {
r.OKs = append(r.OKs, fmt.Sprintf(format, a...))
}

func (r *Result) OkIfNoWarningsOrCriticals(format string, a ...any) {
if len(r.Warnings) == 0 && len(r.Criticals) == 0 {
r.Ok(format, a...)
}
}

func (r *Result) CriticalExitIfErr(err error, format string, a ...any) bool {
if err == nil {
return false
Expand Down
12 changes: 5 additions & 7 deletions monitor/stream_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,10 @@ func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts

since := time.Since(ts)

if opts.AgeWarning > 0 || opts.AgeCritical > 0 {
if opts.AgeCritical > 0 && since > secondsToDuration(opts.AgeCritical) {
check.Critical("%v old", since.Round(time.Millisecond))
} else if opts.AgeWarning > 0 && since > secondsToDuration(opts.AgeWarning) {
check.Warn("%v old", time.Since(ts).Round(time.Millisecond))
}
if opts.AgeCritical > 0 && since > secondsToDuration(opts.AgeCritical) {
check.Critical("%v old", since.Round(time.Millisecond))
} else if opts.AgeWarning > 0 && since > secondsToDuration(opts.AgeWarning) {
check.Warn("%v old", time.Since(ts).Round(time.Millisecond))
}

if opts.Content != "" {
Expand All @@ -104,7 +102,7 @@ func CheckStreamMessage(server string, nopts []nats.Option, check *Result, opts
}
}

check.Ok("Valid message on %s > %s", opts.StreamName, opts.Subject)
check.OkIfNoWarningsOrCriticals("Valid message on %s > %s", opts.StreamName, opts.Subject)

return nil
}