Skip to content

Commit

Permalink
close body
Browse files Browse the repository at this point in the history
  • Loading branch information
panshuai111 committed Jan 11, 2024
1 parent 0cb0016 commit c653ae0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/infra/search/storage/elasticsearch/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func createIndex(client *elasticsearch.Client, mapping string, indexName string)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.IsError() {
msg := resp.String()
if strings.Contains(resp.String(), "resource_already_exists_exception") {
Expand Down
20 changes: 10 additions & 10 deletions pkg/infra/search/storage/elasticsearch/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func (s *ESClient) insertObj(ctx context.Context, cluster string, obj runtime.Ob
Body: bytes.NewReader(body),
Index: s.indexName,
}
res, err := req.Do(ctx, s.client)
resp, err := req.Do(ctx, s.client)
if err != nil {
return err
}

if res.IsError() {
defer resp.Body.Close()
if resp.IsError() {
return &ESError{
StatusCode: res.StatusCode,
Message: res.String(),
StatusCode: resp.StatusCode,
Message: resp.String(),
}
}
return nil
Expand All @@ -98,15 +98,15 @@ func (s *ESClient) delete(ctx context.Context, body io.Reader) error {
Index: []string{s.indexName},
Body: body,
}
res, err := req.Do(ctx, s.client)
resp, err := req.Do(ctx, s.client)
if err != nil {
return err
}

if res.IsError() {
defer resp.Body.Close()
if resp.IsError() {
return &ESError{
StatusCode: res.StatusCode,
Message: res.String(),
StatusCode: resp.StatusCode,
Message: resp.String(),
}
}
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/infra/search/storage/elasticsearch/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,25 @@ func (s *ESClient) searchByQuery(ctx context.Context, query map[string]interface

func (s *ESClient) search(ctx context.Context, body io.Reader, pageSize, page int) (*SearchResponse, error) {
from := (page - 1) * pageSize
res, err := s.client.Search(
resp, err := s.client.Search(
s.client.Search.WithContext(ctx),
s.client.Search.WithIndex(s.indexName),
s.client.Search.WithBody(body),
s.client.Search.WithSize(pageSize),
s.client.Search.WithFrom(from),
)
defer res.Body.Close()
if err != nil {
return nil, err
}
if res.IsError() {
defer resp.Body.Close()
if resp.IsError() {
return nil, &ESError{
StatusCode: res.StatusCode,
Message: res.String(),
StatusCode: resp.StatusCode,
Message: resp.String(),
}
}
sr := &SearchResponse{}
if err := json.NewDecoder(res.Body).Decode(sr); err != nil {
if err := json.NewDecoder(resp.Body).Decode(sr); err != nil {
return nil, err
}
return sr, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *ResourceSyncer) processNextWorkItem(ctx context.Context) bool {
}

if err := s.sync(ctx, key); err != nil {
s.queue.AddAfter(key, 5*time.Second)
s.queue.AddRateLimited(key)
return errors.Wrapf(err, "error syncing '%s/%s', requeuing", s.source.SyncRule().Resource, key)
}

Expand Down

0 comments on commit c653ae0

Please sign in to comment.