Skip to content

Commit

Permalink
feat(inputs/firehose): update
Browse files Browse the repository at this point in the history
  • Loading branch information
syedmhashim committed Nov 5, 2024
1 parent c6b6bcc commit 3241fdc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 29 deletions.
34 changes: 21 additions & 13 deletions plugins/inputs/firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,27 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
return
}

requestID := req.Header.Get("x-amz-firehose-request-id")
r := &request{req: req}
requestID := r.req.Header.Get("x-amz-firehose-request-id")
if requestID == "" {
r.res.statusCode = http.StatusBadRequest
f.Log.Errorf("x-amz-firehose-request-id header is not set")
if err := r.sendResponse(res); err != nil {
f.Log.Errorf("sending response failed: %v", err)
}
return
}

if err := r.validate(); err != nil {
f.Log.Errorf("validation failed: %v", err)
f.Log.Errorf("validation failed for request %q: %v", requestID, err)
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
return
}

if err := r.authenticate(f.AccessKey); err != nil {
f.Log.Errorf("authentication failed: %v", err)
f.Log.Errorf("authentication failed for request %q: %v", requestID, err)
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
Expand All @@ -150,6 +158,16 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
return
}

paramTags, err := r.extractParameterTags(f.ParameterTags)
if err != nil {
f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err)
r.res.statusCode = http.StatusBadRequest
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
return
}

var metrics []telegraf.Metric
for _, record := range records {
m, err := f.parser.Parse(record)
Expand All @@ -172,16 +190,6 @@ func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
return
}

paramTags, err := r.extractParameterTags(f.ParameterTags)
if err != nil {
f.Log.Errorf("extracting parameter tags for request %q failed: %v", requestID, err)
r.res.statusCode = http.StatusBadRequest
if err = r.sendResponse(res); err != nil {
f.Log.Errorf("sending response to request %q failed: %v", requestID, err)
}
return
}

for _, m := range metrics {
for k, v := range paramTags {
m.AddTag(k, v)
Expand Down
26 changes: 10 additions & 16 deletions plugins/inputs/firehose/firehose_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,44 +54,38 @@ func (r *request) authenticate(expected config.Secret) error {
}
if !match {
r.res.statusCode = http.StatusUnauthorized
return fmt.Errorf("unauthorized request %s from %v", r.req.Header.Get("x-amz-firehose-request-id"), r.req.RemoteAddr)
return fmt.Errorf("unauthorized request from %v", r.req.RemoteAddr)
}
return nil
}

func (r *request) validate() error {
requestID := r.req.Header.Get("x-amz-firehose-request-id")
if requestID == "" {
r.res.statusCode = http.StatusBadRequest
return errors.New("x-amz-firehose-request-id header is not set")
}

// The maximum body size can be up to a maximum of 64 MiB.
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
if r.req.ContentLength > int64(64*1024*1024) {
r.res.statusCode = http.StatusRequestEntityTooLarge
return fmt.Errorf("content length too large in request %s", requestID)
return errors.New("content length is too large")
}

switch r.req.Method {
case http.MethodPost, http.MethodPut:
// Do nothing, those methods are allowed
default:
r.res.statusCode = http.StatusMethodNotAllowed
return fmt.Errorf("method %q in request %q is not allowed", r.req.Method, requestID)
return fmt.Errorf("method %q is not allowed", r.req.Method)
}

contentType := r.req.Header.Get("content-type")
if contentType != "application/json" {
r.res.statusCode = http.StatusBadRequest
return fmt.Errorf("unaccepted content type, %s, in request %s", contentType, requestID)
return fmt.Errorf("content type %s is not allowed", contentType)
}

encoding := r.req.Header.Get("content-encoding")
body, err := internal.NewStreamContentDecoder(encoding, r.req.Body)
if err != nil {
r.res.statusCode = http.StatusBadRequest
return fmt.Errorf("creating decoder for %q failed: %w", encoding, err)
return fmt.Errorf("creating %q decoder failed: %w", encoding, err)
}
defer r.req.Body.Close()

Expand All @@ -100,9 +94,9 @@ func (r *request) validate() error {
return fmt.Errorf("decode body failed: %w", err)
}

if requestID != r.body.RequestID {
if r.body.RequestID != r.req.Header.Get("x-amz-firehose-request-id") {
r.res.statusCode = http.StatusBadRequest
return errors.New("requestId in the body does not match the value of the request header, x-amz-firehose-request-id")
return errors.New("requestId in the body does not match x-amz-firehose-request-id request header")
}

return nil
Expand Down Expand Up @@ -130,15 +124,15 @@ func (r *request) extractParameterTags(parameterTags []string) (map[string]strin
}
var parameters map[string]interface{}
if err := json.Unmarshal([]byte(attributesHeader), &parameters); err != nil {
return nil, fmt.Errorf("decode json data in x-amz-firehose-common-attributes header from request %q failed: %w", r.body.RequestID, err)
return nil, fmt.Errorf("decode json data in x-amz-firehose-common-attributes header failed: %w", err)
}
paramsRaw, ok := parameters["commonAttributes"]
if !ok {
return nil, errors.New("commonAttributes key not found in json data in x-amz-firehose-common-attributes header from request %q")
return nil, errors.New("commonAttributes key not found in json data in x-amz-firehose-common-attributes header")
}
parameters, ok = paramsRaw.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("parse parameters data from request %q failed", r.body.RequestID)
return nil, errors.New("parse parameters data failed")
}
for _, param := range parameterTags {
if value, ok := parameters[param]; ok {
Expand Down

0 comments on commit 3241fdc

Please sign in to comment.