Skip to content
This repository has been archived by the owner on May 22, 2023. It is now read-only.

Commit

Permalink
Merge pull request #740 from keep-network/mmmutex
Browse files Browse the repository at this point in the history
Guard execution of on-chain firewall calls with a mutex

This PR is a continuation of firewall fixes started in #737.

Keeping as a draft until properly tested.

This change is effectively reducing firewall on-chain concurrency to one
call but this is exactly what we need. The loop going through keeps is
always executing checks in a sequence. When two on-chain calls were
executed at the same time, they were always a duplicate of the same
call. Mutex should not slow anything down and should in fact speed up
the loop. Instead of executing the same check from 8 concurrent threads
and being rate-limited, we will execute 8 different checks at more or
less the same time.
  • Loading branch information
nkuba authored Mar 18, 2021
2 parents 954af08 + 2e8910d commit 057c0e7
Showing 1 changed file with 50 additions and 9 deletions.
59 changes: 50 additions & 9 deletions pkg/firewall/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,17 @@ type stakeOrActiveKeepPolicy struct {
func (soakp *stakeOrActiveKeepPolicy) Validate(
remotePeerPublicKey *ecdsa.PublicKey,
) error {
remotePeerNetworkPublicKey := coreKey.NetworkPublic(*remotePeerPublicKey)
remotePeerAddress := coreKey.NetworkPubKeyToEthAddress(&remotePeerNetworkPublicKey)

logger.Debugf("validating firewall rules for [%v]", remotePeerAddress)

// Validate minimum stake policy. If the remote peer has the minimum stake,
// we are fine and we should let to connect.
if err := soakp.minimumStakePolicy.Validate(remotePeerPublicKey); err == nil {
return nil
}

remotePeerNetworkPublicKey := coreKey.NetworkPublic(*remotePeerPublicKey)
remotePeerAddress := coreKey.NetworkPubKeyToEthAddress(&remotePeerNetworkPublicKey)

// Check if the remote peer has authorization on the factory.
// The authorization cannot be revoked.
// If peer has no authorization on the factory it means it has never
Expand All @@ -98,6 +100,11 @@ func (soakp *stakeOrActiveKeepPolicy) Validate(
func (soakp *stakeOrActiveKeepPolicy) validateAuthorization(
remotePeerAddress string,
) error {
logger.Debugf(
"validating authorization for [%v]",
remotePeerAddress,
)

// Before hitting ETH client, consult the in-memory time cache.
// If the caching time for the given entry elapsed or if that entry is
// not in the cache, we'll have to consult the chain and execute a call
Expand Down Expand Up @@ -138,6 +145,10 @@ func (soakp *stakeOrActiveKeepPolicy) validateAuthorization(
func (soakp *stakeOrActiveKeepPolicy) validateActiveKeepMembership(
remotePeerAddress string,
) error {
logger.Debugf(
"validating active keep membership for [%v]",
remotePeerAddress,
)

// First, check in the in-memory time cache to minimize hits to ETH client.
// If the Keep client with the given chain address is in the active members
Expand Down Expand Up @@ -245,14 +256,21 @@ func (soakp *stakeOrActiveKeepPolicy) getKeepAtIndex(
return common.HexToAddress(cachedAddress), nil
}

cache.mutex.Lock()
defer cache.mutex.Unlock()

cachedAddress, isCached = cache.address[index.String()]
if isCached {
return common.HexToAddress(cachedAddress), nil
}

logger.Debugf("fetching keep at index [%v] from the chain", index)
address, err := soakp.chain.GetKeepAtIndex(index)
if err != nil {
return common.Address{}, err
}

cache.mutex.Lock()
cache.address[index.String()] = address.Hex()
cache.mutex.Unlock()

return address, nil
}
Expand Down Expand Up @@ -280,6 +298,21 @@ func (soakp *stakeOrActiveKeepPolicy) isKeepActive(
return true, nil
}

cache.mutex.Lock()
defer cache.mutex.Unlock()

isInactive, isCached = cache.isInactive[keepAddress.String()]
if isCached && isInactive {
return false, nil
}
if cache.isActive.Has(keepAddress.String()) {
return true, nil
}

logger.Debugf(
"checking if keep with ID [%v] is active on the chain",
keepAddress.String(),
)
isActive, err := soakp.chain.IsActive(keepAddress)
if err != nil {
return false, err
Expand All @@ -288,9 +321,7 @@ func (soakp *stakeOrActiveKeepPolicy) isKeepActive(
if isActive {
cache.isActive.Add(keepAddress.String())
} else {
cache.mutex.Lock()
cache.isInactive[keepAddress.String()] = true
cache.mutex.Unlock()
}

return isActive, nil
Expand All @@ -311,6 +342,18 @@ func (soakp *stakeOrActiveKeepPolicy) getKeepMembers(
return members, nil
}

cache.mutex.Lock()
defer cache.mutex.Unlock()

members, areCached = cache.members[keepAddress.String()]
if areCached {
return members, nil
}

logger.Debugf(
"getting members of the keep with ID [%v] from the chain",
keepAddress.String(),
)
memberAddresses, err := soakp.chain.GetMembers(keepAddress)
if err != nil {
return nil, nil
Expand All @@ -321,9 +364,7 @@ func (soakp *stakeOrActiveKeepPolicy) getKeepMembers(
members[i] = member.String()
}

cache.mutex.Lock()
cache.members[keepAddress.String()] = members
cache.mutex.Unlock()

return members, nil
}
Expand Down

0 comments on commit 057c0e7

Please sign in to comment.