Skip to content

Commit

Permalink
Merge pull request #143 from kaleido-io/ffrestyhooksandmetrics
Browse files Browse the repository at this point in the history
ffresty metrics and hooks
  • Loading branch information
peterbroadhurst authored Jun 6, 2024
2 parents 30b1a08 + 4a8cca0 commit f0084b0
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 1 deletion.
66 changes: 65 additions & 1 deletion pkg/ffresty/ffresty.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"time"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/metric"
"github.com/sirupsen/logrus"
)

Expand All @@ -50,6 +52,12 @@ type Config struct {
HTTPConfig
}

var (
metricsManager metric.MetricsManager
onErrorHooks []resty.ErrorHook
onSuccessHooks []resty.SuccessHook
)

// HTTPConfig is all the optional configuration separate to the URL you wish to invoke.
// This is JSON serializable with docs, so you can embed it into API objects.
type HTTPConfig struct {
Expand Down Expand Up @@ -77,6 +85,48 @@ type HTTPConfig struct {
OnBeforeRequest func(req *resty.Request) error `json:"-"` // called before each request, even retry
}

func EnableClientMetrics(ctx context.Context, metricsRegistry metric.MetricsRegistry) error {
// create a metrics manager (if not already set)
if metricsManager == nil {
mm, err := metricsRegistry.NewMetricsManagerForSubsystem(ctx, "resty")
metricsManager = mm
if err != nil {
return err
}
metricsManager.NewCounterMetricWithLabels(ctx, "http_response", "HTTP response", []string{"status", "error", "host", "method"}, false)
metricsManager.NewCounterMetricWithLabels(ctx, "network_error", "Network error", []string{"host", "method"}, false)
}

// create hooks
onErrorMetricsHook := func(req *resty.Request, _ error) {
method := req.Method
u, _ := url.Parse(req.URL)
host := u.Host
// whilst there it is a possibility to get an response returned in the error here (and resty doc for OnError shows this) it seems to be a special case and the statuscode in such cases was not set.
// therefore we log all cases as network_error we may in future find reason to extract more detail from the error
metricsManager.IncCounterMetricWithLabels(ctx, "network_error", map[string]string{"host": host, "method": method}, nil)
}
RegisterGlobalOnError(onErrorMetricsHook)

onSuccessMetricsHook := func(_ *resty.Client, resp *resty.Response) {
method := resp.Request.Method
u, _ := url.Parse(resp.Request.URL)
host := u.Host
code := resp.RawResponse.StatusCode
metricsManager.IncCounterMetricWithLabels(ctx, "http_response", map[string]string{"status": fmt.Sprintf("%d", code), "error": "false", "host": host, "method": method}, nil)
}
RegisterGlobalOnSuccess(onSuccessMetricsHook)
return nil
}

func RegisterGlobalOnError(onError func(req *resty.Request, err error)) {
onErrorHooks = append(onErrorHooks, onError)
}

func RegisterGlobalOnSuccess(onSuccess func(c *resty.Client, resp *resty.Response)) {
onSuccessHooks = append(onSuccessHooks, onSuccess)
}

// OnAfterResponse when using SetDoNotParseResponse(true) for streaming binary replies,
// the caller should invoke ffresty.OnAfterResponse on the response manually.
// The middleware is disabled on this path :-(
Expand All @@ -96,6 +146,18 @@ func OnAfterResponse(c *resty.Client, resp *resty.Response) {
log.L(rCtx).Logf(level, "<== %s %s [%d] (%.2fms)", resp.Request.Method, resp.Request.URL, status, elapsed)
}

func OnError(req *resty.Request, err error) {
for _, hook := range onErrorHooks {
hook(req, err)
}
}

func OnSuccess(c *resty.Client, resp *resty.Response) {
for _, hook := range onSuccessHooks {
hook(c, resp)
}
}

// New creates a new Resty client, using static configuration (from the config file)
// from a given section in the static configuration
//
Expand Down Expand Up @@ -204,9 +266,11 @@ func NewWithConfig(ctx context.Context, ffrestyConfig Config) (client *resty.Cli
})

// Note that callers using SetNotParseResponse will need to invoke this themselves

client.OnAfterResponse(func(c *resty.Client, r *resty.Response) error { OnAfterResponse(c, r); return nil })

client.OnError(func(req *resty.Request, e error) { OnError(req, e) })
client.OnSuccess(func(c *resty.Client, r *resty.Response) { OnSuccess(c, r) })

for k, v := range ffrestyConfig.HTTPHeaders {
if vs, ok := v.(string); ok {
client.SetHeader(k, vs)
Expand Down
105 changes: 105 additions & 0 deletions pkg/ffresty/ffresty_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftls"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/metric"

"github.com/jarcoal/httpmock"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -512,3 +514,106 @@ func TestMTLSClientWithServer(t *testing.T) {
}
cancelCtx()
}

func TestEnableClientMetricsError(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("testerr")
//claim the "resty subsystem before resty can :/"
_, _ = mr.NewMetricsManagerForSubsystem(ctx, "resty")
err := EnableClientMetrics(ctx, mr)
assert.Error(t, err)
}

func TestEnableClientMetrics(t *testing.T) {

ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

}



func TestEnableClientMetricsIdempotent(t *testing.T) {
ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")
_ = EnableClientMetrics(ctx, mr)
err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)
}

func TestHooks(t *testing.T) {

ctx := context.Background()
mr := metric.NewPrometheusMetricsRegistry("test")

err := EnableClientMetrics(ctx, mr)
assert.NoError(t, err)

onErrorCount := 0
onSuccessCount := 0

customOnError := func(req *resty.Request, err error){
onErrorCount++
}

customOnSuccess := func(c *resty.Client, resp *resty.Response){
onSuccessCount++
}

RegisterGlobalOnError(customOnError)
RegisterGlobalOnSuccess(customOnSuccess)

resetConf()
utConf.Set(HTTPConfigURL, "http://localhost:12345")
utConf.Set(HTTPConfigRetryEnabled, false)

c, err := New(ctx, utConf)
assert.Nil(t, err)
httpmock.ActivateNonDefault(c.GetClient())
defer httpmock.DeactivateAndReset()

resText := strings.Builder{}
for i := 0; i < 512; i++ {
resText.WriteByte(byte('a' + (i % 26)))
}
httpmock.RegisterResponder("GET", "http://localhost:12345/testerr",
httpmock.NewErrorResponder(fmt.Errorf("pop")))

httpmock.RegisterResponder("GET", "http://localhost:12345/testerrhttp",
func(req *http.Request) (*http.Response, error) {
return httpmock.NewStringResponse(502, `{"Status": "Service Not Available"}`), fmt.Errorf("Service Not Available")
})

httpmock.RegisterResponder("GET", "http://localhost:12345/testok",
func(req *http.Request) (*http.Response, error) {
return httpmock.NewStringResponder(200, `{"some": "data"}`)(req)
})

resp, err := c.R().Get("/testerr")
err = WrapRestErr(ctx, resp, err, i18n.MsgConfigFailed)
assert.Error(t, err)

assert.Equal(t, onErrorCount, 1)
assert.Equal(t, onSuccessCount, 0)

resp, err = c.R().Get("/testerrhttp")
err = WrapRestErr(ctx, resp, err, i18n.MsgConfigFailed)
assert.Error(t, err)

assert.Equal(t, onErrorCount, 2)
assert.Equal(t, onSuccessCount, 0)

resp, err = c.R().Get("/testok")
assert.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode())
assert.Equal(t, `{"some": "data"}`, resp.String())

assert.Equal(t, 3, httpmock.GetTotalCallCount())

assert.Equal(t, onErrorCount, 2)
assert.Equal(t, onSuccessCount, 1)

}

0 comments on commit f0084b0

Please sign in to comment.