Skip to content

Commit

Permalink
fan out in message manager
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Oct 3, 2023
1 parent 02ee06b commit 40c568a
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 181 deletions.
146 changes: 67 additions & 79 deletions messages/block_hash_publisher/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ type destinationSenderInfo struct {
lastBlock uint64
}

func (d *destinationSenderInfo) shouldSend(blockTimestamp uint64, blockNumber uint64) bool {
if d.useTimeInterval {
interval := d.timeIntervalSeconds
if time.Unix(int64(blockTimestamp), 0).Sub(time.Unix(int64(d.lastTimeSent), 0)) < (time.Duration(interval) * time.Second) {
return false
}
} else {
interval := d.blockInterval
if blockNumber-d.lastBlock < uint64(interval) {
return false
}
}
return true
}

type messageManager struct {
destinations map[ids.ID]*destinationSenderInfo
logger logging.Logger
Expand Down Expand Up @@ -83,8 +98,6 @@ func NewMessageManager(
address: common.HexToAddress(destination.Address),
client: destinationClients[destinationID],
}
logger.Info("DEBUG DESTINATIONS", zap.String("address", destinations[destinationID].address.String()))

}

return &messageManager{
Expand All @@ -93,96 +106,71 @@ func NewMessageManager(
}, nil
}

// ShouldSendMessage returns true if the message should be sent to the destination chain
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) {
destination, ok := m.destinations[destinationChainID]
if !ok {
var destinationIDs []string
for id := range m.destinations {
destinationIDs = append(destinationIDs, id.String())
}
m.logger.Info(
"DEBUG",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("configuredDestinations", fmt.Sprintf("%#v", destinationIDs)),
)
return false, fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID)
}
if destination.useTimeInterval {
interval := destination.timeIntervalSeconds
if time.Unix(int64(warpMessageInfo.BlockTimestamp), 0).Sub(time.Unix(int64(destination.lastTimeSent), 0)) < (time.Duration(interval) * time.Second) {
return false, nil
}
} else {
interval := destination.blockInterval
if warpMessageInfo.BlockNumber-destination.lastBlock < uint64(interval) {
m.logger.Info(
"DEBUG",
zap.String("decision", "Not sending"),
zap.Int("blockNum", int(warpMessageInfo.BlockNumber)),
zap.Int("lastBlockNum", int(destination.lastBlock)),
)
return false, nil
// ShouldSendMessage returns true if the message should be sent to ANY of the configured destination chains
// This saves us from having to aggregate signatures in that case. Decisions about which destination chains to send to are made in SendMessage
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, _ ids.ID) (bool, error) {
for _, destination := range m.destinations {
if destination.shouldSend(warpMessageInfo.BlockTimestamp, warpMessageInfo.BlockNumber) {
return true, nil
}
}
// Set the last approved block/time here. We don't set the last sent block/time until the message is actually sent
destination.lastApprovedBlock = warpMessageInfo.BlockNumber
destination.lastApprovedTime = warpMessageInfo.BlockTimestamp
m.logger.Info(
"DEBUG",
zap.String("decision", "Sending"),
zap.Int("blockNum", int(warpMessageInfo.BlockNumber)),
zap.Int("lastBlockNum", int(destination.lastBlock)),
)
return true, nil
return false, nil
}

func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error {
func (m *messageManager) SendMessage(signedMessage *warp.Message, warpMessageInfo *vmtypes.WarpMessageInfo, _ ids.ID) error {
m.logger.Info(
"Sending message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
"DEBUG SENDING",
zap.String("destinationInfo", fmt.Sprintf("%#v", m.destinations)),
zap.Uint64("blockTimestampe", warpMessageInfo.BlockTimestamp),
zap.Uint64("blockNumber", warpMessageInfo.BlockNumber),
)
// Construct the transaction call data to call the receive cross chain message method of the receiver precompile.
callData, err := teleporter_block_hash.PackReceiveBlockHash(teleporter_block_hash.ReceiveBlockHashInput{
MessageIndex: uint32(0),
SourceChainID: signedMessage.SourceChainID,
})
if err != nil {
m.logger.Error(
"Failed packing receiveCrossChainMessage call data",
zap.Error(err),
zap.Uint32("messageIndex", 0),
zap.String("sourceChainID", signedMessage.SourceChainID.String()),
for destinationChainID, destination := range m.destinations {
if !destination.shouldSend(warpMessageInfo.BlockTimestamp, warpMessageInfo.BlockNumber) {
continue
}

m.logger.Info(
"Sending message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
return err
}
// Construct the transaction call data to call the receive cross chain message method of the receiver precompile.
callData, err := teleporter_block_hash.PackReceiveBlockHash(teleporter_block_hash.ReceiveBlockHashInput{
MessageIndex: uint32(0),
SourceChainID: signedMessage.SourceChainID,
})
if err != nil {
m.logger.Error(
"Failed packing receiveCrossChainMessage call data",
zap.Error(err),
zap.Uint32("messageIndex", 0),
zap.String("sourceChainID", signedMessage.SourceChainID.String()),
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
return err
}

// Get the correct destination client from the global map
destination, ok := m.destinations[destinationChainID]
if !ok {
return fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID)
}
err = destination.client.SendTx(signedMessage, destination.address.Hex(), publishBlockHashGasLimit, callData)
if err != nil {
m.logger.Error(
"Failed to send tx.",
// Get the correct destination client from the global map
err = destination.client.SendTx(signedMessage, destination.address.Hex(), publishBlockHashGasLimit, callData)
if err != nil {
m.logger.Error(
"Failed to send tx.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.Error(err),
)
return err
}

// Set the last sent block/time
destination.lastTimeSent = destination.lastApprovedTime
destination.lastBlock = destination.lastApprovedBlock
m.logger.Info(
"Sent message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
zap.Error(err),
)
return err
}

// Set the last sent block/time
destination.lastTimeSent = destination.lastApprovedTime
destination.lastBlock = destination.lastApprovedBlock
m.logger.Info(
"Sent message to destination chain",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("warpMessageID", signedMessage.ID().String()),
)
return nil
}
2 changes: 1 addition & 1 deletion messages/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type MessageManager interface {
ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error)
// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error
SendMessage(signedMessage *warp.Message, warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) error
}

// NewMessageManager constructs a MessageManager for a particular message protocol, defined by the message protocol address and config
Expand Down
5 changes: 3 additions & 2 deletions messages/teleporter/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func (m *messageManager) messageDelivered(

// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract,
// and dispatches transaction construction and broadcast to the destination client
func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error {
// TODO: Revisit the caching strategy so we can remove parsedVmPayload as a parameter
func (m *messageManager) SendMessage(signedMessage *warp.Message, warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) error {
teleporterMessage, ok := m.teleporterMessageCache.Get(signedMessage.ID())
if !ok {
m.logger.Debug(
Expand All @@ -215,7 +216,7 @@ func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayloa
zap.String("warpMessageID", signedMessage.ID().String()),
)
var err error
teleporterMessage, err = UnpackTeleporterMessage(parsedVmPayload)
teleporterMessage, err = UnpackTeleporterMessage(warpMessageInfo.WarpPayload)
if err != nil {
m.logger.Error(
"Failed unpacking teleporter message.",
Expand Down
21 changes: 6 additions & 15 deletions relayer/message_relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func newMessageRelayer(
}

func (r *messageRelayer) relayMessage(requestID uint32, messageManager messages.MessageManager) error {
// TODONOW: destinationChainID may be nil/empty, in which case it is an anycast message -> set by subscriber
// ShouldSendMessage makes decisions based ONLY on the message contents, not the relayer config.
// This means that it won't make any decisions about anycase messages based on the relayer config.
// SendMessage handles sending anycast messages to the correct destination chain, based on the relayer config
shouldSend, err := messageManager.ShouldSendMessage(r.warpMessageInfo, r.destinationChainID)
if err != nil {
r.logger.Error(
Expand Down Expand Up @@ -108,7 +112,8 @@ func (r *messageRelayer) relayMessage(requestID uint32, messageManager messages.
// create signed message latency (ms)
r.setCreateSignedMessageLatencyMS(float64(time.Since(startCreateSignedMessageTime).Milliseconds()))

err = messageManager.SendMessage(signedMessage, r.warpMessageInfo.WarpPayload, r.destinationChainID)
r.logger.Info("DEBUG: About to send message")
err = messageManager.SendMessage(signedMessage, r.warpMessageInfo, r.destinationChainID)
if err != nil {
r.logger.Error(
"Failed to send warp message",
Expand All @@ -119,7 +124,6 @@ func (r *messageRelayer) relayMessage(requestID uint32, messageManager messages.
}
r.logger.Info(
"Finished relaying message to destination chain",
zap.String("destinationChainID", r.destinationChainID.String()),
)
r.incSuccessfulRelayMessageCount()
return nil
Expand All @@ -129,7 +133,6 @@ func (r *messageRelayer) relayMessage(requestID uint32, messageManager messages.
func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, error) {
r.logger.Info(
"Starting relayer routine",
zap.String("destinationChainID", r.destinationChainID.String()),
)

sourceChainID := r.warpMessageInfo.WarpUnsignedMessage.SourceChainID
Expand Down Expand Up @@ -182,7 +185,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
if err != nil {
r.logger.Error(
"Failed to marshal request bytes",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.Error(err),
)
return nil, err
Expand All @@ -207,7 +209,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
r.logger.Debug(
"Relayer collecting signatures from peers.",
zap.Int("attempt", attempt),
zap.String("destinationChainID", r.destinationChainID.String()),
zap.Int("validatorSetSize", len(validatorSet)),
zap.Int("signatureMapSize", len(signatureMap)),
zap.Int("responsesExpected", responsesExpected),
Expand Down Expand Up @@ -318,7 +319,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
if err != nil {
r.logger.Error(
"Failed to aggregate signature.",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.Error(err),
)
return nil, err
Expand Down Expand Up @@ -347,7 +347,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
if signedMsg != nil {
r.logger.Info(
"Created signed message.",
zap.String("destinationChainID", r.destinationChainID.String()),
)
return signedMsg, nil
}
Expand All @@ -367,7 +366,6 @@ func (r *messageRelayer) createSignedMessage(requestID uint32) (*warp.Message, e
r.logger.Warn(
"Failed to collect a threshold of signatures",
zap.Int("attempts", maxRelayerQueryAttempts),
zap.String("destinationChainID", r.destinationChainID.String()),
)
return nil, errNotEnoughSignatures
}
Expand All @@ -383,7 +381,6 @@ func (r *messageRelayer) getCurrentCanonicalValidatorSet() ([]*warp.Validator, u
if err != nil {
r.logger.Error(
"failed to get validating subnet for destination chain",
zap.String("destinationChainID", r.destinationChainID.String()),
zap.Error(err),
)
return nil, 0, err
Expand Down Expand Up @@ -459,7 +456,6 @@ func (r *messageRelayer) isValidSignatureResponse(
r.logger.Debug(
"Response contained an empty signature",
zap.String("nodeID", response.NodeID().String()),
zap.String("destinationChainID", r.destinationChainID.String()),
)
return blsSignatureBuf{}, false
}
Expand All @@ -468,15 +464,13 @@ func (r *messageRelayer) isValidSignatureResponse(
if err != nil {
r.logger.Debug(
"Failed to create signature from response",
zap.String("destinationChainID", r.destinationChainID.String()),
)
return blsSignatureBuf{}, false
}

if !bls.Verify(pubKey, sig, r.warpMessageInfo.WarpUnsignedMessage.Bytes()) {
r.logger.Debug(
"Failed verification for signature",
zap.String("destinationChainID", r.destinationChainID.String()),
)
return blsSignatureBuf{}, false
}
Expand Down Expand Up @@ -518,15 +512,13 @@ func (r *messageRelayer) aggregateSignatures(signatureMap map[int]blsSignatureBu
func (r *messageRelayer) incSuccessfulRelayMessageCount() {
r.metrics.successfulRelayMessageCount.
WithLabelValues(
r.destinationChainID.String(),
r.relayer.sourceChainID.String(),
r.relayer.sourceSubnetID.String()).Inc()
}

func (r *messageRelayer) incFailedRelayMessageCount(failureReason string) {
r.metrics.failedRelayMessageCount.
WithLabelValues(
r.destinationChainID.String(),
r.relayer.sourceChainID.String(),
r.relayer.sourceSubnetID.String(),
failureReason).Inc()
Expand All @@ -535,7 +527,6 @@ func (r *messageRelayer) incFailedRelayMessageCount(failureReason string) {
func (r *messageRelayer) setCreateSignedMessageLatencyMS(latency float64) {
r.metrics.createSignedMessageLatencyMS.
WithLabelValues(
r.destinationChainID.String(),
r.relayer.sourceChainID.String(),
r.relayer.sourceSubnetID.String()).Set(latency)
}
6 changes: 3 additions & 3 deletions relayer/message_relayer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewMessageRelayerMetrics(registerer prometheus.Registerer) *MessageRelayerM
Name: "successful_relay_message_count",
Help: "Number of messages that relayed successfully",
},
[]string{"destination_chain_id", "source_chain_id", "source_subnet_id"},
[]string{"source_chain_id", "source_subnet_id"},
)
registerer.MustRegister(successfulRelayMessageCount)

Expand All @@ -26,7 +26,7 @@ func NewMessageRelayerMetrics(registerer prometheus.Registerer) *MessageRelayerM
Name: "create_signed_message_latency_ms",
Help: "Latency of creating a signed message in milliseconds",
},
[]string{"destination_chain_id", "source_chain_id", "source_subnet_id"},
[]string{"source_chain_id", "source_subnet_id"},
)
registerer.MustRegister(createSignedMessageLatencyMS)

Expand All @@ -35,7 +35,7 @@ func NewMessageRelayerMetrics(registerer prometheus.Registerer) *MessageRelayerM
Name: "failed_relay_message_count",
Help: "Number of messages that failed to relay",
},
[]string{"destination_chain_id", "source_chain_id", "source_subnet_id", "failure_reason"},
[]string{"source_chain_id", "source_subnet_id", "failure_reason"},
)
registerer.MustRegister(failedRelayMessageCount)

Expand Down
2 changes: 1 addition & 1 deletion relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (r *Relayer) RelayMessage(warpMessageInfo *vmtypes.WarpMessageInfo, metrics
zap.String("warpMessageID", warpMessageInfo.WarpUnsignedMessage.ID().String()),
)

// Check that the warp message is from a support message protocol contract address.
// Check that the warp message is from a supported message protocol contract address.
messageManager, supportedMessageProtocol := r.messageManagers[warpMessageInfo.SourceAddress]
if !supportedMessageProtocol {
// Do not return an error here because it is expected for there to be messages from other contracts
Expand Down
Loading

0 comments on commit 40c568a

Please sign in to comment.