diff --git a/proof/courier.go b/proof/courier.go index 5dbc274a8..908ba3c22 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -439,16 +439,19 @@ func (h *HashMailBox) RecvAck(ctx context.Context, sid streamID) error { return fmt.Errorf("unable to create read stream: %w", err) } + log.Debugf("Exec stream Recv for receiver ACK (sid=%x)", sid[:]) msg, err := readStream.Recv() if err != nil { - return err + return fmt.Errorf("failed on stream Recv (sid=%x): %w", sid[:], + err) } if bytes.Equal(msg.Msg, ackMsg) { + log.Debugf("Received ACK from sender (sid=%x)", sid[:]) return nil } - return fmt.Errorf("expected ack, got %x", msg.Msg) + return fmt.Errorf("expected ACK from hashmail service, got %x", msg.Msg) } // CleanUp attempts to tear down the mailbox as specified by the passed sid. @@ -867,13 +870,20 @@ func (h *HashMailCourier) ensureConnect(ctx context.Context) error { func (h *HashMailCourier) DeliverProof(ctx context.Context, recipient Recipient, proof *AnnotatedProof) error { - log.Infof("Attempting to deliver receiver proof for send of "+ - "asset_id=%v, amt=%v", recipient.AssetID, recipient.Amount) - - // Compute the stream IDs for the sender and receiver. + // Compute the stream IDs for the sender and receiver. Note that these + // stream IDs are derived from the recipient's script key only. Which + // means that stream IDs will be identical for multiple proofs sent to + // the same recipient. senderStreamID := deriveSenderStreamID(recipient) receiverStreamID := deriveReceiverStreamID(recipient) + log.Infof("Delivering proof to asset transfer receiver "+ + "(amt=%v, asset_id=%v, script_pub_key=%x, "+ + "sender_sid=%x, receiver_sid=%x)", + recipient.Amount, recipient.AssetID, + recipient.ScriptKey.SerializeCompressed(), senderStreamID, + receiverStreamID) + // Interact with the hashmail service using a backoff procedure to // ensure that we don't overwhelm the service with delivery attempts. deliveryExec := func() error { @@ -901,8 +911,7 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context, // TODO(roasbeef): do ecies here // (this ^ TODO relates to encrypting proofs for the receiver // before uploading to the courier) - log.Infof("Sending receiver proof via sid=%x", - senderStreamID) + log.Infof("Writing proof to mailbox (sid=%x)", senderStreamID) err = h.mailbox.WriteProof( ctx, senderStreamID, proof.Blob, ) @@ -911,10 +920,11 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context, "transfer receiver: %w", err) } - // Wait to receive the ACK from the remote party over - // their stream. - log.Infof("Waiting (%v) for receiver ACK via sid=%x", - h.cfg.ReceiverAckTimeout, receiverStreamID) + // Wait to receive ACK from proof transfer receiving peer over + // hashmail service. + log.Infof("Waiting for receiver ACK from hashmail service "+ + "(timeout=%v, sid=%x)", h.cfg.ReceiverAckTimeout, + receiverStreamID) ctxTimeout, cancel := context.WithTimeout( ctx, h.cfg.ReceiverAckTimeout, @@ -922,10 +932,15 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context, defer cancel() err = h.mailbox.RecvAck(ctxTimeout, receiverStreamID) if err != nil { - return fmt.Errorf("failed to receive ACK from "+ - "receiver within timeout: %w", err) + return fmt.Errorf("failed to retrieve proof transfer "+ + "receiver ACK within timeout (sid=%x): %w", + receiverStreamID, err) } + log.Infof("Retrieved proof transfer receiver ACK from "+ + "hashmail service (timeout=%v, sid=%x)", + h.cfg.ReceiverAckTimeout, receiverStreamID) + return nil } err := h.backoffHandle.Exec( @@ -937,12 +952,12 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context, "failed: %w", err) } - log.Infof("Received ACK from receiver! Cleaning up mailboxes...") - defer h.Close() - // Once we receive this ACK, we can clean up our mailbox and also the - // receiver's mailbox. + // If the backoff handler's exec routine completes successfully, we can + // remove our mailbox and the receiver's mailbox. + log.Infof("Removing sender and recipient mailboxes from hashmail " + + "service") if err := h.mailbox.CleanUp(ctx, senderStreamID); err != nil { return fmt.Errorf("failed to cleanup sender mailbox: %w", err) } diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 94fe60785..3fbb6790d 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -763,6 +763,57 @@ func (p *ChainPorter) updateAssetProofFile(ctx context.Context, }, nil } +// reportProofTransfers logs a summary of the transfer outputs that require +// proof delivery and those that do not. +func reportProofTransfers(notDeliveringOutputs []TransferOutput, + pendingDeliveryOutputs []TransferOutput) { + + log.Debugf("Count of transfer output(s) by proof delivery status: "+ + "(count_delivery_not_applicable=%d, count_pending_delivery=%d)", + len(notDeliveringOutputs), len(pendingDeliveryOutputs)) + + // Report the transfer outputs that do not require proof delivery. + if len(notDeliveringOutputs) > 0 { + logEntries := make([]string, 0, len(notDeliveringOutputs)) + for idx := range notDeliveringOutputs { + out := notDeliveringOutputs[idx] + key := out.ScriptKey.PubKey + + entry := fmt.Sprintf("transfer_output_position=%d, "+ + "proof_delivery_status=%v, "+ + "script_key=%x", out.Position, + out.ProofDeliveryComplete, + key.SerializeCompressed()) + logEntries = append(logEntries, entry) + } + + entriesJoin := strings.Join(logEntries, "\n") + log.Debugf("Transfer outputs that do not require proof "+ + "delivery:\n%v", entriesJoin) + } + + // Report the transfer outputs that require proof delivery. + if len(pendingDeliveryOutputs) > 0 { + logEntries := make([]string, 0, len(pendingDeliveryOutputs)) + for idx := range pendingDeliveryOutputs { + out := pendingDeliveryOutputs[idx] + key := out.ScriptKey.PubKey + + entry := fmt.Sprintf("transfer_output_position=%d, "+ + "proof_delivery_status=%v, "+ + "proof_courier_addr=%s, "+ + "script_key=%x", out.Position, + out.ProofDeliveryComplete, out.ProofCourierAddr, + key.SerializeCompressed()) + logEntries = append(logEntries, entry) + } + + entriesJoin := strings.Join(logEntries, "\n") + log.Debugf("Transfer outputs that require proof delivery:\n%v", + entriesJoin) + } +} + // transferReceiverProof retrieves the sender and receiver proofs from the // archive and then transfers the receiver's proof to the receiver. Upon // successful transfer, the asset parcel delivery is marked as complete. @@ -770,8 +821,14 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() - deliver := func(ctx context.Context, out TransferOutput) error { - key := out.ScriptKey.PubKey + // Classify transfer outputs into those that require proof delivery and + // those that do not. + var ( + notDeliveringOutputs []TransferOutput + pendingDeliveryOutputs []TransferOutput + ) + for idx := range pkg.OutboundPkg.Outputs { + out := pkg.OutboundPkg.Outputs[idx] // We'll first check to see if the proof should be delivered. shouldDeliverProof, err := out.ShouldDeliverProof() @@ -781,15 +838,20 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { } if !shouldDeliverProof { - log.Debugf("Transfer ouput proof does not require "+ - "delivery (transfer_output_position=%d, "+ - "proof_delivery_status=%v, "+ - "script_key=%x)", out.Position, - out.ProofDeliveryComplete, - key.SerializeCompressed()) - return nil + notDeliveringOutputs = append(notDeliveringOutputs, out) + continue } + pendingDeliveryOutputs = append(pendingDeliveryOutputs, out) + } + + // Log a summary of the transfer outputs that require proof delivery and + // those that do not. + reportProofTransfers(notDeliveringOutputs, pendingDeliveryOutputs) + + deliver := func(ctx context.Context, out TransferOutput) error { + key := out.ScriptKey.PubKey + // We just look for the full proof in the list of final proofs // by matching the content of the proof suffix. var receiverProof *proof.AnnotatedProof @@ -871,7 +933,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error { // If we have a non-interactive proof, then we'll launch several // goroutines to deliver the proof(s) to the receiver(s). instanceErrors, err := fn.ParSliceErrCollect( - ctx, pkg.OutboundPkg.Outputs, deliver, + ctx, pendingDeliveryOutputs, deliver, ) if err != nil { return fmt.Errorf("error delivering proof(s): %w", err)