diff --git a/plugins/inputs/firehose/firehose.go b/plugins/inputs/firehose/firehose.go index 166747257d74b..0ed708433a991 100644 --- a/plugins/inputs/firehose/firehose.go +++ b/plugins/inputs/firehose/firehose.go @@ -122,11 +122,19 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) { return } - requestID := req.Header.Get("x-amz-firehose-request-id") r := &request{req: req} + requestID := r.req.Header.Get("x-amz-firehose-request-id") + if requestID == "" { + r.res.statusCode = http.StatusBadRequest + f.Log.Errorf("x-amz-firehose-request-id header is not set") + if err := r.sendResponse(res); err != nil { + f.Log.Errorf("sending response failed: %v", err) + } + return + } if err := r.validate(); err != nil { - f.Log.Errorf("validation failed: %v", err) + f.Log.Errorf("validation failed for request %q: %v", requestID, err) if err = r.sendResponse(res); err != nil { f.Log.Errorf("sending response to request %q failed: %v", requestID, err) } @@ -134,7 +142,7 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) { } if err := r.authenticate(f.AccessKey); err != nil { - f.Log.Errorf("authentication failed: %v", err) + f.Log.Errorf("authentication failed for request %q: %v", requestID, err) if err = r.sendResponse(res); err != nil { f.Log.Errorf("sending response to request %q failed: %v", requestID, err) } @@ -150,6 +158,16 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) { return } + paramTags, err := r.extractParameterTags(f.ParameterTags) + if err != nil { + f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err) + r.res.statusCode = http.StatusBadRequest + if err = r.sendResponse(res); err != nil { + f.Log.Errorf("sending response to request %q failed: %v", requestID, err) + } + return + } + var metrics []telegraf.Metric for _, record := range records { m, err := f.parser.Parse(record) @@ -172,16 +190,6 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) { return } - paramTags, err := r.extractParameterTags(f.ParameterTags) - if err != nil { - f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err) - r.res.statusCode = http.StatusBadRequest - if err = r.sendResponse(res); err != nil { - f.Log.Errorf("sending response to request %q failed: %v", requestID, err) - } - return - } - for _, m := range metrics { for k, v := range paramTags { m.AddTag(k, v) diff --git a/plugins/inputs/firehose/firehose_request.go b/plugins/inputs/firehose/firehose_request.go index ea21cd80f97a2..66f37f77abcbe 100644 --- a/plugins/inputs/firehose/firehose_request.go +++ b/plugins/inputs/firehose/firehose_request.go @@ -54,23 +54,17 @@ func (r *request) authenticate(expected config.Secret) error { } if !match { r.res.statusCode = http.StatusUnauthorized - return fmt.Errorf("unauthorized request %s from %v", r.req.Header.Get("x-amz-firehose-request-id"), r.req.RemoteAddr) + return fmt.Errorf("unauthorized request from %v", r.req.RemoteAddr) } return nil } func (r *request) validate() error { - requestID := r.req.Header.Get("x-amz-firehose-request-id") - if requestID == "" { - r.res.statusCode = http.StatusBadRequest - return errors.New("x-amz-firehose-request-id header is not set") - } - // The maximum body size can be up to a maximum of 64 MiB. // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html if r.req.ContentLength > int64(64*1024*1024) { r.res.statusCode = http.StatusRequestEntityTooLarge - return fmt.Errorf("content length too large in request %s", requestID) + return errors.New("content length is too large") } switch r.req.Method { @@ -78,20 +72,20 @@ func (r *request) validate() error { // Do nothing, those methods are allowed default: r.res.statusCode = http.StatusMethodNotAllowed - return fmt.Errorf("method %q in request %q is not allowed", r.req.Method, requestID) + return fmt.Errorf("method %q is not allowed", r.req.Method) } contentType := r.req.Header.Get("content-type") if contentType != "application/json" { r.res.statusCode = http.StatusBadRequest - return fmt.Errorf("unaccepted content type, %s, in request %s", contentType, requestID) + return fmt.Errorf("content type %s is not allowed", contentType) } encoding := r.req.Header.Get("content-encoding") body, err := internal.NewStreamContentDecoder(encoding, r.req.Body) if err != nil { r.res.statusCode = http.StatusBadRequest - return fmt.Errorf("creating decoder for %q failed: %w", encoding, err) + return fmt.Errorf("creating %q decoder failed: %w", encoding, err) } defer r.req.Body.Close() @@ -100,9 +94,9 @@ func (r *request) validate() error { return fmt.Errorf("decode body failed: %w", err) } - if requestID != r.body.RequestID { + if r.body.RequestID != r.req.Header.Get("x-amz-firehose-request-id") { r.res.statusCode = http.StatusBadRequest - return errors.New("requestId in the body does not match the value of the request header, x-amz-firehose-request-id") + return errors.New("requestId in the body does not match x-amz-firehose-request-id request header") } return nil @@ -130,15 +124,15 @@ func (r *request) extractParameterTags(parameterTags []string) (map[string]strin } var parameters map[string]interface{} if err := json.Unmarshal([]byte(attributesHeader), ¶meters); err != nil { - return nil, fmt.Errorf("decode json data in x-amz-firehose-common-attributes header from request %q failed: %w", r.body.RequestID, err) + return nil, fmt.Errorf("decode json data in x-amz-firehose-common-attributes header failed: %w", err) } paramsRaw, ok := parameters["commonAttributes"] if !ok { - return nil, errors.New("commonAttributes key not found in json data in x-amz-firehose-common-attributes header from request %q") + return nil, errors.New("commonAttributes key not found in json data in x-amz-firehose-common-attributes header") } parameters, ok = paramsRaw.(map[string]interface{}) if !ok { - return nil, fmt.Errorf("parse parameters data from request %q failed", r.body.RequestID) + return nil, errors.New("parse parameters data failed") } for _, param := range parameterTags { if value, ok := parameters[param]; ok {