From 68585daa781763e625ff14e5334d5b35c839ca0e Mon Sep 17 00:00:00 2001 From: polebug Date: Fri, 13 Dec 2024 17:58:03 +0800 Subject: [PATCH] chore: add retry logic for bluesky client --- .../component/federated/handler_handles.go | 8 +- provider/atproto/bluesky/client.go | 84 ++++++++++++++----- 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/internal/node/component/federated/handler_handles.go b/internal/node/component/federated/handler_handles.go index 5901ec67..d47ec35c 100644 --- a/internal/node/component/federated/handler_handles.go +++ b/internal/node/component/federated/handler_handles.go @@ -2,6 +2,7 @@ package federated import ( "context" + "fmt" "net/http" "github.com/creasty/defaults" @@ -37,8 +38,13 @@ func (c *Component) GetHandles(ctx echo.Context) error { cursor string ) - if request.Platform == federated.PlatformMastodon { + switch request.Platform { + case federated.PlatformMastodon: handles, err = c.getMastodonHandles(ctx.Request().Context(), request) + case federated.PlatformBluesky: + handles, err = c.getBlueskyHandles(ctx.Request().Context(), request) + default: + return response.BadRequestError(ctx, fmt.Errorf("unsupported platform")) } if err != nil { diff --git a/provider/atproto/bluesky/client.go b/provider/atproto/bluesky/client.go index c3e2b052..a82c5c02 100644 --- a/provider/atproto/bluesky/client.go +++ b/provider/atproto/bluesky/client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/avast/retry-go/v4" "net" "net/http" "strings" @@ -63,8 +64,13 @@ func (c *Client) SyncListRepos(ctx context.Context, cursor string, limit int64) } // Fetch the list of repos - results, err := atproto.SyncListRepos(ctx, client.Client, cursor, limit) - if err != nil { + var results *atproto.SyncListRepos_Output + + if err := c.retrySource(ctx, func(ctx context.Context) error { + results, err = atproto.SyncListRepos(ctx, client.Client, cursor, limit) + + return err + }); err != nil { zap.L().Error("sync list repos failed", zap.Error(err)) return nil, nil, fmt.Errorf("sync list repos failed: %w", err) @@ -131,17 +137,27 @@ func (c *Client) SyncGetRepo(ctx context.Context, repoData *atproto.SyncListRepo return nil, fmt.Errorf("create xrpc client: %w", err) } + var handle string + // Get user handle from profile - handle, err := c.GetHandle(ctx, client, did) - if err != nil { + if err := c.retrySource(ctx, func(ctx context.Context) error { + handle, err = c.GetHandle(ctx, client, did) + + return err + }); err != nil { zap.L().Error("get profile failed", zap.Error(err)) return nil, fmt.Errorf("get profile: %w", err) } + var repoBytes []byte + // Fetch repo data - repoBytes, err := atproto.SyncGetRepo(ctx, client.Client, did.String(), "") - if err != nil { + if err := c.retrySource(ctx, func(ctx context.Context) error { + repoBytes, err = atproto.SyncGetRepo(ctx, client.Client, did.String(), "") + + return err + }); err != nil { zap.L().Error("get repo failed", zap.Error(err)) return nil, fmt.Errorf("get repo: %w", err) @@ -296,26 +312,36 @@ func (c *Client) GetRecord(ctx context.Context, repo string, path string) (*at.M Rkey: rkey, } - // Retrieve the record - resp, err := atproto.RepoGetRecord(ctx, client.Client, "", collection, repo, rkey) - if err != nil || resp == nil { - // fallback to default client - defaultClient, err := c.GetXrpcClient(ctx, BskyEndpoint) - if err != nil { - zap.L().Error("get xrpc client failed", zap.Error(err)) + var resp *atproto.RepoGetRecord_Output - return nil, nil, fmt.Errorf("get xrpc client: %w", err) - } + // Retrieve the record + if err := c.retrySource(ctx, func(ctx context.Context) error { + resp, err = atproto.RepoGetRecord(ctx, client.Client, "", collection, repo, rkey) + if err != nil || resp == nil { + // fallback to default client + defaultClient, err := c.GetXrpcClient(ctx, BskyEndpoint) + if err != nil { + zap.L().Error("get xrpc client failed", zap.Error(err)) - resp, err = atproto.RepoGetRecord(ctx, defaultClient.Client, "", collection, repo, rkey) - if err != nil { - // Handle CID too short error gracefully - if strings.Contains(err.Error(), cid.ErrCidTooShort.Error()) || strings.Contains(err.Error(), "XRPC ERROR 400") { - return message, nil, nil + return fmt.Errorf("get xrpc client: %w", err) } - return nil, nil, fmt.Errorf("repo get record: %w", err) + resp, err = atproto.RepoGetRecord(ctx, defaultClient.Client, "", collection, repo, rkey) + if err != nil { + // Handle CID too short error gracefully + if strings.Contains(err.Error(), cid.ErrCidTooShort.Error()) || strings.Contains(err.Error(), "XRPC ERROR 400") { + return nil + } + + return fmt.Errorf("repo get record: %w", err) + } } + + return nil + }); err != nil { + zap.L().Error("repo get record failed", zap.Error(err)) + + return nil, nil, fmt.Errorf("repo get record: %w", err) } if resp != nil && resp.Value != nil { @@ -345,7 +371,7 @@ func (c *Client) GetHandle(ctx context.Context, client *XrpcClient, did syntax.D return resp.Handle, nil } - return "", nil + return "", fmt.Errorf("handle not found for DID: %s", did) } // ParseRecord processes different types of records and updates the message. @@ -604,6 +630,20 @@ func (c *Client) LookupDIDEndpoint(ctx context.Context, did syntax.DID) string { return BskyEndpoint } +func (c *Client) retrySource(ctx context.Context, Func func(ctx context.Context) error) error { + return retry.Do( + func() error { + return Func(ctx) + }, + retry.Attempts(0), + retry.Delay(5*time.Second), + retry.DelayType(retry.BackOffDelay), + retry.OnRetry(func(n uint, err error) { + zap.L().Warn("retry bluesky source", zap.Uint("retry", n), zap.Error(err)) + }), + ) +} + type Option func(client *Client) error func WithFilter(filter []string) Option {