Skip to content

Commit

Permalink
chore: add retry logic for bluesky client
Browse files Browse the repository at this point in the history
  • Loading branch information
polebug committed Dec 13, 2024
1 parent bc65a7b commit 68585da
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 23 deletions.
8 changes: 7 additions & 1 deletion internal/node/component/federated/handler_handles.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package federated

import (
"context"
"fmt"
"net/http"

"github.com/creasty/defaults"
Expand Down Expand Up @@ -37,8 +38,13 @@ func (c *Component) GetHandles(ctx echo.Context) error {
cursor string
)

if request.Platform == federated.PlatformMastodon {
switch request.Platform {
case federated.PlatformMastodon:
handles, err = c.getMastodonHandles(ctx.Request().Context(), request)
case federated.PlatformBluesky:
handles, err = c.getBlueskyHandles(ctx.Request().Context(), request)
default:
return response.BadRequestError(ctx, fmt.Errorf("unsupported platform"))
}

if err != nil {
Expand Down
84 changes: 62 additions & 22 deletions provider/atproto/bluesky/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/avast/retry-go/v4"
"net"
"net/http"
"strings"
Expand Down Expand Up @@ -63,8 +64,13 @@ func (c *Client) SyncListRepos(ctx context.Context, cursor string, limit int64)
}

// Fetch the list of repos
results, err := atproto.SyncListRepos(ctx, client.Client, cursor, limit)
if err != nil {
var results *atproto.SyncListRepos_Output

if err := c.retrySource(ctx, func(ctx context.Context) error {
results, err = atproto.SyncListRepos(ctx, client.Client, cursor, limit)

return err
}); err != nil {
zap.L().Error("sync list repos failed", zap.Error(err))

return nil, nil, fmt.Errorf("sync list repos failed: %w", err)
Expand Down Expand Up @@ -131,17 +137,27 @@ func (c *Client) SyncGetRepo(ctx context.Context, repoData *atproto.SyncListRepo
return nil, fmt.Errorf("create xrpc client: %w", err)
}

var handle string

// Get user handle from profile
handle, err := c.GetHandle(ctx, client, did)
if err != nil {
if err := c.retrySource(ctx, func(ctx context.Context) error {
handle, err = c.GetHandle(ctx, client, did)

return err
}); err != nil {
zap.L().Error("get profile failed", zap.Error(err))

return nil, fmt.Errorf("get profile: %w", err)
}

var repoBytes []byte

// Fetch repo data
repoBytes, err := atproto.SyncGetRepo(ctx, client.Client, did.String(), "")
if err != nil {
if err := c.retrySource(ctx, func(ctx context.Context) error {
repoBytes, err = atproto.SyncGetRepo(ctx, client.Client, did.String(), "")

return err
}); err != nil {
zap.L().Error("get repo failed", zap.Error(err))

return nil, fmt.Errorf("get repo: %w", err)
Expand Down Expand Up @@ -296,26 +312,36 @@ func (c *Client) GetRecord(ctx context.Context, repo string, path string) (*at.M
Rkey: rkey,
}

// Retrieve the record
resp, err := atproto.RepoGetRecord(ctx, client.Client, "", collection, repo, rkey)
if err != nil || resp == nil {
// fallback to default client
defaultClient, err := c.GetXrpcClient(ctx, BskyEndpoint)
if err != nil {
zap.L().Error("get xrpc client failed", zap.Error(err))
var resp *atproto.RepoGetRecord_Output

return nil, nil, fmt.Errorf("get xrpc client: %w", err)
}
// Retrieve the record
if err := c.retrySource(ctx, func(ctx context.Context) error {
resp, err = atproto.RepoGetRecord(ctx, client.Client, "", collection, repo, rkey)
if err != nil || resp == nil {
// fallback to default client
defaultClient, err := c.GetXrpcClient(ctx, BskyEndpoint)
if err != nil {
zap.L().Error("get xrpc client failed", zap.Error(err))

resp, err = atproto.RepoGetRecord(ctx, defaultClient.Client, "", collection, repo, rkey)
if err != nil {
// Handle CID too short error gracefully
if strings.Contains(err.Error(), cid.ErrCidTooShort.Error()) || strings.Contains(err.Error(), "XRPC ERROR 400") {
return message, nil, nil
return fmt.Errorf("get xrpc client: %w", err)
}

return nil, nil, fmt.Errorf("repo get record: %w", err)
resp, err = atproto.RepoGetRecord(ctx, defaultClient.Client, "", collection, repo, rkey)
if err != nil {
// Handle CID too short error gracefully
if strings.Contains(err.Error(), cid.ErrCidTooShort.Error()) || strings.Contains(err.Error(), "XRPC ERROR 400") {
return nil
}

return fmt.Errorf("repo get record: %w", err)
}
}

return nil
}); err != nil {
zap.L().Error("repo get record failed", zap.Error(err))

return nil, nil, fmt.Errorf("repo get record: %w", err)
}

if resp != nil && resp.Value != nil {
Expand Down Expand Up @@ -345,7 +371,7 @@ func (c *Client) GetHandle(ctx context.Context, client *XrpcClient, did syntax.D
return resp.Handle, nil
}

return "", nil
return "", fmt.Errorf("handle not found for DID: %s", did)
}

// ParseRecord processes different types of records and updates the message.
Expand Down Expand Up @@ -604,6 +630,20 @@ func (c *Client) LookupDIDEndpoint(ctx context.Context, did syntax.DID) string {
return BskyEndpoint
}

func (c *Client) retrySource(ctx context.Context, Func func(ctx context.Context) error) error {
return retry.Do(
func() error {
return Func(ctx)
},
retry.Attempts(0),
retry.Delay(5*time.Second),
retry.DelayType(retry.BackOffDelay),
retry.OnRetry(func(n uint, err error) {
zap.L().Warn("retry bluesky source", zap.Uint("retry", n), zap.Error(err))
}),
)
}

type Option func(client *Client) error

func WithFilter(filter []string) Option {
Expand Down

0 comments on commit 68585da

Please sign in to comment.