Skip to content

Commit

Permalink
add recover to contract monitor (#1256)
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense authored Mar 3, 2023
1 parent 199e124 commit d756076
Showing 1 changed file with 101 additions and 93 deletions.
194 changes: 101 additions & 93 deletions storagemarket/contract_deal_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,106 +90,113 @@ func (c *ContractDealMonitor) Start(ctx context.Context) error {
}
return
case resp := <-responseCh:
err := func() error {
event := resp.Result.(map[string]interface{})
topicContractAddress := event["address"].(string)
topicDealProposalID := event["topics"].([]interface{})[1].(string)

// allowlist check
if len(c.cfg.AllowlistContracts) != 0 && !slices.Contains(c.cfg.AllowlistContracts, topicContractAddress) {
return fmt.Errorf("allowlist does not contain this contract address: %s", topicContractAddress)
}

res, err := c.getDealProposal(ctx, topicContractAddress, topicDealProposalID, fromEthAddr)
if err != nil {
return fmt.Errorf("eth call for get deal proposal failed: %w", err)
}

resParams, err := c.getExtraData(ctx, topicContractAddress, topicDealProposalID, fromEthAddr)
if err != nil {
return fmt.Errorf("eth call for extra data failed: %w", err)
}

var dpc market.DealProposal
err = dpc.UnmarshalCBOR(bytes.NewReader(res))
if err != nil {
return fmt.Errorf("cbor unmarshal failed: %w", err)
}

var pv1 types.ContractParamsVersion1
err = pv1.UnmarshalCBOR(bytes.NewReader(resParams))
if err != nil {
return fmt.Errorf("params cbor unmarshal failed: %w", err)
}

rootCidStr, err := dpc.Label.ToString()
if err != nil {
return fmt.Errorf("getting cid from label failed: %w", err)
}

rootCid, err := cid.Parse(rootCidStr)
if err != nil {
return fmt.Errorf("parsing cid failed: %w", err)
}

prop := market.DealProposal{
PieceCID: dpc.PieceCID,
PieceSize: dpc.PieceSize,
VerifiedDeal: dpc.VerifiedDeal,
Client: dpc.Client,
Provider: c.maddr,

Label: dpc.Label,
func() {
defer func() {
if r := recover(); r != nil {
log.Errorw("recovered from panic from handling eth_subscribe event", "recover", r)
}
}()

err := func() error {
event := resp.Result.(map[string]interface{})
topicContractAddress := event["address"].(string)
topicDealProposalID := event["topics"].([]interface{})[1].(string)

// allowlist check
if len(c.cfg.AllowlistContracts) != 0 && !slices.Contains(c.cfg.AllowlistContracts, topicContractAddress) {
return fmt.Errorf("allowlist does not contain this contract address: %s", topicContractAddress)
}

res, err := c.getDealProposal(ctx, topicContractAddress, topicDealProposalID, fromEthAddr)
if err != nil {
return fmt.Errorf("eth call for get deal proposal failed: %w", err)
}

resParams, err := c.getExtraData(ctx, topicContractAddress, topicDealProposalID, fromEthAddr)
if err != nil {
return fmt.Errorf("eth call for extra data failed: %w", err)
}

var dpc market.DealProposal
err = dpc.UnmarshalCBOR(bytes.NewReader(res))
if err != nil {
return fmt.Errorf("cbor unmarshal failed: %w", err)
}

var pv1 types.ContractParamsVersion1
err = pv1.UnmarshalCBOR(bytes.NewReader(resParams))
if err != nil {
return fmt.Errorf("params cbor unmarshal failed: %w", err)
}

rootCidStr, err := dpc.Label.ToString()
if err != nil {
return fmt.Errorf("getting cid from label failed: %w", err)
}

rootCid, err := cid.Parse(rootCidStr)
if err != nil {
return fmt.Errorf("parsing cid failed: %w", err)
}

prop := market.DealProposal{
PieceCID: dpc.PieceCID,
PieceSize: dpc.PieceSize,
VerifiedDeal: dpc.VerifiedDeal,
Client: dpc.Client,
Provider: c.maddr,

Label: dpc.Label,

StartEpoch: dpc.StartEpoch,
EndEpoch: dpc.EndEpoch,
StoragePricePerEpoch: dpc.StoragePricePerEpoch,

ProviderCollateral: dpc.ProviderCollateral,
ClientCollateral: dpc.ClientCollateral,
}

proposal := types.DealParams{
DealUUID: uuid.New(),
IsOffline: false,
ClientDealProposal: market.ClientDealProposal{
Proposal: prop,
// signature is garbage, but it still needs to serialize, so shouldnt be empty!!
ClientSignature: crypto.Signature{
Type: crypto.SigTypeBLS,
Data: []byte{0xde, 0xad},
},
},
DealDataRoot: rootCid,
Transfer: types.Transfer{
Type: "http",
Params: []byte(fmt.Sprintf(`{"URL":"%s"}`, pv1.LocationRef)),
Size: pv1.CarSize,
},
RemoveUnsealedCopy: pv1.RemoveUnsealedCopy,
SkipIPNIAnnounce: pv1.SkipIpniAnnounce,
}

StartEpoch: dpc.StartEpoch,
EndEpoch: dpc.EndEpoch,
StoragePricePerEpoch: dpc.StoragePricePerEpoch,
log.Infow("received contract deal proposal", "id", proposal.DealUUID, "client-peer", dpc.Client)

ProviderCollateral: dpc.ProviderCollateral,
ClientCollateral: dpc.ClientCollateral,
}
reason, err := c.prov.ExecuteDeal(context.Background(), &proposal, "")
if err != nil {
log.Warnw("contract deal proposal failed", "id", proposal.DealUUID, "err", err, "reason", reason.Reason)
return nil
}

proposal := types.DealParams{
DealUUID: uuid.New(),
IsOffline: false,
ClientDealProposal: market.ClientDealProposal{
Proposal: prop,
// signature is garbage, but it still needs to serialize, so shouldnt be empty!!
ClientSignature: crypto.Signature{
Type: crypto.SigTypeBLS,
Data: []byte{0xde, 0xad},
},
},
DealDataRoot: rootCid,
Transfer: types.Transfer{
Type: "http",
Params: []byte(fmt.Sprintf(`{"URL":"%s"}`, pv1.LocationRef)),
Size: pv1.CarSize,
},
//TODO: maybe add to pv1?? RemoveUnsealedCopy: paramsAndVersion.RemoveUnsealedCopy,
RemoveUnsealedCopy: false,
SkipIPNIAnnounce: pv1.SkipIpniAnnounce,
}
if reason.Accepted {
log.Infow("contract deal proposal accepted", "id", proposal.DealUUID)
} else {
log.Warnw("contract deal proposal rejected", "id", proposal.DealUUID, "err", err, "reason", reason.Reason)
}

log.Infow("received contract deal proposal", "id", proposal.DealUUID, "client-peer", dpc.Client)

reason, err := c.prov.ExecuteDeal(context.Background(), &proposal, "")
if err != nil {
log.Warnw("contract deal proposal failed", "id", proposal.DealUUID, "err", err, "reason", reason.Reason)
return nil
}()
if err != nil {
log.Errorw("handling DealProposalCreate event erred", "err", err)
}

if reason.Accepted {
log.Infow("contract deal proposal accepted", "id", proposal.DealUUID)
} else {
log.Warnw("contract deal proposal rejected", "id", proposal.DealUUID, "err", err, "reason", reason.Reason)
}

return nil
}()
if err != nil {
log.Errorw("handling DealProposalCreate event erred", "err", err)
}
}
}
}()
Expand Down Expand Up @@ -244,6 +251,7 @@ func lengthPrefixPointsTo(output []byte) (int, int, error) {
if size.Cmp(boutputLen) > 0 {
return 0, 0, fmt.Errorf("length insufficient %v require %v", boutputLen, size)
}

return int(boffset.Uint64()), int(lengthBig.Uint64()), nil
}

Expand Down

0 comments on commit d756076

Please sign in to comment.