Skip to content

Commit

Permalink
temporary: detect schema registry HTTP errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
hchienjo committed Feb 15, 2024
1 parent 6c34fe4 commit 2a59ca5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
5 changes: 4 additions & 1 deletion avroregistry/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"log"
"net/http"

"github.com/heetch/avro"
)
Expand Down Expand Up @@ -74,7 +76,8 @@ func (r decodingRegistry) DecodeSchemaID(msg []byte) (int64, []byte) {
//
// See https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id
func (r decodingRegistry) SchemaForID(ctx context.Context, id int64) (*avro.Type, error) {
req := r.r.newRequest(ctx, "GET", fmt.Sprintf("/schemas/ids/%d", id), nil)
req := r.r.newRequest(ctx, http.MethodGet, fmt.Sprintf("/schemas/ids/%d", id), nil)
log.Printf("=======> %d getting schema with ID: %s", id, req.URL.String())
var resp struct {
Schema string `json:"schema"`
}
Expand Down
33 changes: 28 additions & 5 deletions avroregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strconv"
"syscall"
"time"

"github.com/heetch/avro"
Expand Down Expand Up @@ -102,7 +106,7 @@ func (r *Registry) Register(ctx context.Context, subject string, schema *avro.Ty
if err != nil {
return 0, err
}
req := r.newRequest(ctx, "POST", fmt.Sprintf("/subjects/%s/versions", subject), bytes.NewReader(data))
req := r.newRequest(ctx, http.MethodPost, fmt.Sprintf("/subjects/%s/versions", subject), bytes.NewReader(data))
var resp struct {
ID int64 `json:"id"`
}
Expand All @@ -122,14 +126,14 @@ func (r *Registry) SetCompatibility(ctx context.Context, subject string, mode av
if err != nil {
return err
}
return r.doRequest(r.newRequest(ctx, "PUT", "/config/"+subject, bytes.NewReader(data)), nil)
return r.doRequest(r.newRequest(ctx, http.MethodPut, "/config/"+subject, bytes.NewReader(data)), nil)
}

// DeleteSubject deletes the given subject from the registry.
//
// See https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)
func (r *Registry) DeleteSubject(ctx context.Context, subject string) error {
return r.doRequest(r.newRequest(ctx, "DELETE", "/subjects/"+subject, nil), nil)
return r.doRequest(r.newRequest(ctx, http.MethodDelete, "/subjects/"+subject, nil), nil)
}

// Schema gets a specific version of the schema registered under this subject
Expand Down Expand Up @@ -166,7 +170,17 @@ func validateVersion(version string) error {
}

func (r *Registry) newRequest(ctx context.Context, method string, urlStr string, body io.Reader) *http.Request {
req, err := http.NewRequestWithContext(ctx, method, r.params.ServerURL+urlStr, body)
var url string
if method == http.MethodGet {
if u, ok := os.LookupEnv("OVERRIDE_REGISTRY"); ok {
url = u
} else {
url = r.params.ServerURL
}
} else {
url = r.params.ServerURL
}
req, err := http.NewRequestWithContext(ctx, method, url+urlStr, body)
if err != nil {
// Should never happen, as we've checked the URL for validity when
// creating the registry instance.
Expand All @@ -184,11 +198,20 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
if r.params.Username != "" {
req.SetBasicAuth(r.params.Username, r.params.Password)
}
log.Printf("===> foo request: %s: %s", req.Method, req.URL)
ctx := req.Context()
attempt := retry.StartWithCancel(r.params.RetryStrategy, nil, ctx.Done())
for attempt.Next() {
resp, err := http.DefaultClient.Do(req)
if err != nil {
var urlError *url.Error
if errors.As(err, &urlError) {
log.Printf("=====> net error: %v: temporary: %t, timeout: %t: detected: %t", urlError, urlError.Temporary(), urlError.Timeout(), isTemporaryError(err))
}
if errors.Is(err, syscall.ECONNREFUSED) {
log.Printf("===> connection refused detected")
err = errors.Join(err, fmt.Errorf("should be a retryable error this one"))

Check failure on line 213 in avroregistry/registry.go

View workflow job for this annotation

GitHub Actions / test

undefined: errors.Join (typecheck)

Check failure on line 213 in avroregistry/registry.go

View workflow job for this annotation

GitHub Actions / test

undefined: errors.Join) (typecheck)

Check failure on line 213 in avroregistry/registry.go

View workflow job for this annotation

GitHub Actions / test

undefined: errors.Join) (typecheck)
}
if !attempt.More() || !isTemporaryError(err) {
return err
}
Expand All @@ -201,7 +224,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
if !attempt.More() {
return err
}
if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 {
if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError {
// It's not a 5xx error. We want to retry on 5xx
// errors, because the Confluent Avro registry
// can occasionally return them as a matter of
Expand Down

0 comments on commit 2a59ca5

Please sign in to comment.