Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hashmail logging cleanup #1176

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,19 @@ func (h *HashMailBox) RecvAck(ctx context.Context, sid streamID) error {
return fmt.Errorf("unable to create read stream: %w", err)
}

log.Debugf("Exec stream Recv for receiver ACK (sid=%x)", sid[:])
msg, err := readStream.Recv()
if err != nil {
return err
return fmt.Errorf("failed on stream Recv (sid=%x): %w", sid[:],
err)
}

if bytes.Equal(msg.Msg, ackMsg) {
log.Debugf("Received ACK from sender (sid=%x)", sid[:])
return nil
}

return fmt.Errorf("expected ack, got %x", msg.Msg)
return fmt.Errorf("expected ACK from hashmail service, got %x", msg.Msg)
}

// CleanUp attempts to tear down the mailbox as specified by the passed sid.
Expand Down Expand Up @@ -867,13 +870,20 @@ func (h *HashMailCourier) ensureConnect(ctx context.Context) error {
func (h *HashMailCourier) DeliverProof(ctx context.Context,
recipient Recipient, proof *AnnotatedProof) error {

log.Infof("Attempting to deliver receiver proof for send of "+
"asset_id=%v, amt=%v", recipient.AssetID, recipient.Amount)

// Compute the stream IDs for the sender and receiver.
// Compute the stream IDs for the sender and receiver. Note that these
// stream IDs are derived from the recipient's script key only. Which
// means that stream IDs will be identical for multiple proofs sent to
// the same recipient.
senderStreamID := deriveSenderStreamID(recipient)
receiverStreamID := deriveReceiverStreamID(recipient)

log.Infof("Delivering proof to asset transfer receiver "+
"(amt=%v, asset_id=%v, script_pub_key=%x, "+
"sender_sid=%x, receiver_sid=%x)",
recipient.Amount, recipient.AssetID,
recipient.ScriptKey.SerializeCompressed(), senderStreamID,
receiverStreamID)

// Interact with the hashmail service using a backoff procedure to
// ensure that we don't overwhelm the service with delivery attempts.
deliveryExec := func() error {
Expand Down Expand Up @@ -901,8 +911,7 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
// TODO(roasbeef): do ecies here
// (this ^ TODO relates to encrypting proofs for the receiver
// before uploading to the courier)
log.Infof("Sending receiver proof via sid=%x",
senderStreamID)
log.Infof("Writing proof to mailbox (sid=%x)", senderStreamID)
err = h.mailbox.WriteProof(
ctx, senderStreamID, proof.Blob,
)
Expand All @@ -911,21 +920,27 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
"transfer receiver: %w", err)
}

// Wait to receive the ACK from the remote party over
// their stream.
log.Infof("Waiting (%v) for receiver ACK via sid=%x",
h.cfg.ReceiverAckTimeout, receiverStreamID)
// Wait to receive ACK from proof transfer receiving peer over
// hashmail service.
log.Infof("Waiting for receiver ACK from hashmail service "+
"(timeout=%v, sid=%x)", h.cfg.ReceiverAckTimeout,
receiverStreamID)

ctxTimeout, cancel := context.WithTimeout(
ctx, h.cfg.ReceiverAckTimeout,
)
defer cancel()
err = h.mailbox.RecvAck(ctxTimeout, receiverStreamID)
if err != nil {
return fmt.Errorf("failed to receive ACK from "+
"receiver within timeout: %w", err)
return fmt.Errorf("failed to retrieve proof transfer "+
"receiver ACK within timeout (sid=%x): %w",
receiverStreamID, err)
}

log.Infof("Retrieved proof transfer receiver ACK from "+
"hashmail service (timeout=%v, sid=%x)",
h.cfg.ReceiverAckTimeout, receiverStreamID)

return nil
}
err := h.backoffHandle.Exec(
Expand All @@ -937,12 +952,12 @@ func (h *HashMailCourier) DeliverProof(ctx context.Context,
"failed: %w", err)
}

log.Infof("Received ACK from receiver! Cleaning up mailboxes...")

defer h.Close()

// Once we receive this ACK, we can clean up our mailbox and also the
// receiver's mailbox.
// If the backoff handler's exec routine completes successfully, we can
// remove our mailbox and the receiver's mailbox.
log.Infof("Removing sender and recipient mailboxes from hashmail " +
"service")
if err := h.mailbox.CleanUp(ctx, senderStreamID); err != nil {
return fmt.Errorf("failed to cleanup sender mailbox: %w", err)
}
Expand Down
82 changes: 72 additions & 10 deletions tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,15 +763,72 @@ func (p *ChainPorter) updateAssetProofFile(ctx context.Context,
}, nil
}

// reportProofTransfers logs a summary of the transfer outputs that require
// proof delivery and those that do not.
func reportProofTransfers(notDeliveringOutputs []TransferOutput,
pendingDeliveryOutputs []TransferOutput) {

log.Debugf("Count of transfer output(s) by proof delivery status: "+
"(count_delivery_not_applicable=%d, count_pending_delivery=%d)",
len(notDeliveringOutputs), len(pendingDeliveryOutputs))

// Report the transfer outputs that do not require proof delivery.
if len(notDeliveringOutputs) > 0 {
logEntries := make([]string, 0, len(notDeliveringOutputs))
for idx := range notDeliveringOutputs {
out := notDeliveringOutputs[idx]
key := out.ScriptKey.PubKey

entry := fmt.Sprintf("transfer_output_position=%d, "+
"proof_delivery_status=%v, "+
"script_key=%x", out.Position,
out.ProofDeliveryComplete,
key.SerializeCompressed())
logEntries = append(logEntries, entry)
}

entriesJoin := strings.Join(logEntries, "\n")
log.Debugf("Transfer outputs that do not require proof "+
"delivery:\n%v", entriesJoin)
}

// Report the transfer outputs that require proof delivery.
if len(pendingDeliveryOutputs) > 0 {
logEntries := make([]string, 0, len(pendingDeliveryOutputs))
for idx := range pendingDeliveryOutputs {
out := pendingDeliveryOutputs[idx]
key := out.ScriptKey.PubKey

entry := fmt.Sprintf("transfer_output_position=%d, "+
"proof_delivery_status=%v, "+
"proof_courier_addr=%s, "+
"script_key=%x", out.Position,
out.ProofDeliveryComplete, out.ProofCourierAddr,
key.SerializeCompressed())
logEntries = append(logEntries, entry)
}

entriesJoin := strings.Join(logEntries, "\n")
log.Debugf("Transfer outputs that require proof delivery:\n%v",
entriesJoin)
}
}

// transferReceiverProof retrieves the sender and receiver proofs from the
// archive and then transfers the receiver's proof to the receiver. Upon
// successful transfer, the asset parcel delivery is marked as complete.
func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
ctx, cancel := p.WithCtxQuitNoTimeout()
defer cancel()

deliver := func(ctx context.Context, out TransferOutput) error {
key := out.ScriptKey.PubKey
// Classify transfer outputs into those that require proof delivery and
// those that do not.
var (
notDeliveringOutputs []TransferOutput
pendingDeliveryOutputs []TransferOutput
)
for idx := range pkg.OutboundPkg.Outputs {
out := pkg.OutboundPkg.Outputs[idx]

// We'll first check to see if the proof should be delivered.
shouldDeliverProof, err := out.ShouldDeliverProof()
Expand All @@ -781,15 +838,20 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
}

if !shouldDeliverProof {
log.Debugf("Transfer ouput proof does not require "+
"delivery (transfer_output_position=%d, "+
"proof_delivery_status=%v, "+
"script_key=%x)", out.Position,
out.ProofDeliveryComplete,
key.SerializeCompressed())
return nil
notDeliveringOutputs = append(notDeliveringOutputs, out)
continue
}

pendingDeliveryOutputs = append(pendingDeliveryOutputs, out)
}

// Log a summary of the transfer outputs that require proof delivery and
// those that do not.
reportProofTransfers(notDeliveringOutputs, pendingDeliveryOutputs)

deliver := func(ctx context.Context, out TransferOutput) error {
key := out.ScriptKey.PubKey

// We just look for the full proof in the list of final proofs
// by matching the content of the proof suffix.
var receiverProof *proof.AnnotatedProof
Expand Down Expand Up @@ -871,7 +933,7 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
// If we have a non-interactive proof, then we'll launch several
// goroutines to deliver the proof(s) to the receiver(s).
instanceErrors, err := fn.ParSliceErrCollect(
ctx, pkg.OutboundPkg.Outputs, deliver,
ctx, pendingDeliveryOutputs, deliver,
)
if err != nil {
return fmt.Errorf("error delivering proof(s): %w", err)
Expand Down
Loading