diff --git a/node/config/def.go b/node/config/def.go index b8a216a62..911177af0 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -98,7 +98,7 @@ func DefaultBoost() *Boost { DealProposalLogDuration: Duration(time.Hour * 24), RetrievalLogDuration: Duration(time.Hour * 24), - StalledRetrievalTimeout: Duration(time.Minute * 30), + StalledRetrievalTimeout: Duration(time.Second * 30), RetrievalPricing: &lotus_config.RetrievalPricing{ Strategy: RetrievalPricingDefaultMode, diff --git a/retrievalmarket/rtvllog/retrieval_log.go b/retrievalmarket/rtvllog/retrieval_log.go index c1f27e350..3711bf577 100644 --- a/retrievalmarket/rtvllog/retrieval_log.go +++ b/retrievalmarket/rtvllog/retrieval_log.go @@ -2,6 +2,7 @@ package rtvllog import ( "context" + "errors" "sync" "time" @@ -295,22 +296,36 @@ func (r *RetrievalLog) gcRetrievals(ctx context.Context) { continue } + var wg sync.WaitGroup for _, row := range rows { - chid := datatransfer.ChannelID{Initiator: row.PeerID, Responder: row.LocalPeerID, ID: row.TransferID} - // Try to cancel via unpaid graphsync first - err := r.gsur.CancelTransfer(ctx, row.TransferID, &row.PeerID) - - if err != nil { - // Attempt to terminate legacy, paid retrievals if we didnt cancel a free retrieval - err = r.dataTransfer.CloseDataTransferChannel(ctx, chid) - } - - if err != nil { - log.Debugw("error canceling retrieval", "dealID", row.DealID, "err", err) - } else { - log.Infof("Canceled retrieval %s, older than %s", row.DealID, r.stalledTimeout) + if row.TransferID <= 0 { + continue } + wg.Add(1) + go func(s RetrievalDealState) { + // Don't wait for more than 5 seconds for the cancel + // message to be sent when cancelling an unpaid retrieval + unpaidRtrvCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + defer wg.Done() + + // Try to cancel an unpaid retrieval with the given transfer id first + err := r.gsur.CancelTransfer(unpaidRtrvCtx, s.TransferID, &s.PeerID) + if err != nil && errors.Is(err, server.ErrRetrievalNotFound) { + // Couldn't find an unpaid retrieval with that id, try + // to cancel a legacy, paid retrieval + chid := datatransfer.ChannelID{Initiator: s.PeerID, Responder: s.LocalPeerID, ID: s.TransferID} + err = r.dataTransfer.CloseDataTransferChannel(ctx, chid) + } + + if err != nil { + log.Debugw("error canceling retrieval", "dealID", s.DealID, "err", err) + } else { + log.Infof("Canceled retrieval %s, older than %s", s.DealID, r.stalledTimeout) + } + }(row) } + wg.Wait() } } } diff --git a/retrievalmarket/server/gsunpaidretrieval.go b/retrievalmarket/server/gsunpaidretrieval.go index 46a42ce58..76d591e8f 100644 --- a/retrievalmarket/server/gsunpaidretrieval.go +++ b/retrievalmarket/server/gsunpaidretrieval.go @@ -30,6 +30,7 @@ import ( ) var log = logging.Logger("boostgs") +var ErrRetrievalNotFound = fmt.Errorf("no transfer found") var incomingReqExtensions = []graphsync.ExtensionName{ extension.ExtensionIncomingRequest1_1, @@ -175,7 +176,7 @@ func (g *GraphsyncUnpaidRetrieval) CancelTransfer(ctx context.Context, id datatr if state == nil { g.activeRetrievalsLk.Unlock() - return fmt.Errorf("no transfer with id %d", id) + return fmt.Errorf("failed to cancel with id %d: %w", id, ErrRetrievalNotFound) } rcpt := state.cs.recipient