Skip to content

Commit

Permalink
use participation flags to score attestations for blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec committed Feb 24, 2024
1 parent d09bf3b commit 7ada74b
Showing 1 changed file with 140 additions and 43 deletions.
183 changes: 140 additions & 43 deletions beacon_chain/consensus_object_pools/attestation_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
import
# Status libraries
metrics,
chronicles, stew/byteutils,
chronicles,
stew/bitseqs as bs, # ssz_serialization defines BitSeqs too in bitseqs module
stew/byteutils,
# Internal
../spec/[
beaconstate, eth2_merkleization, forks, state_transition_epoch, validator],
../spec/[eth2_merkleization, forks],
"."/[spec_cache, blockchain_dag, block_quarantine],
../fork_choice/fork_choice,
../beacon_clock

from std/sequtils import keepItIf, maxIndex
from std/sequtils import keepItIf, mapIt, maxIndex
from stew/bitseqs import combine
from ../spec/beaconstate import check_attestation, dependent_root
from ../spec/state_transition_epoch import compute_unrealized_finality
from ../spec/validator import
get_beacon_committee, get_committee_count_per_slot, get_committee_indices

export blockchain_dag, fork_choice

Expand All @@ -32,19 +38,22 @@ const
type
OnAttestationCallback = proc(data: Attestation) {.gcsafe, raises: [].}

CommitteeParticipationFlags = array[3, bs.BitSeq]

Validation = object
## Validations collect a set of signatures for a distict attestation - in
## eth2, a single bit is used to keep track of which signatures have been
## added to the aggregate meaning that only non-overlapping aggregates may
## be further combined.
aggregation_bits: CommitteeValidatorsBits
committee_participation_flags: CommitteeParticipationFlags
aggregate_signature: AggregateSignature

AttestationEntry = object
## Each entry holds the known signatures for a particular, distinct vote
data: AttestationData
committee_len: int
singles: Table[int, CookedSig] ## \
singles: Table[int, (CommitteeParticipationFlags, CookedSig)] ## \
## On the attestation subnets, only attestations with a single vote are
## allowed - these can be collected separately to top up aggregates with -
## here we collect them by mapping index in committee to a vote
Expand Down Expand Up @@ -239,18 +248,24 @@ func updateAggregates(entry: var AttestationEntry) =
if entry.aggregates.len() == 0:
# If there are singles, we can create an aggregate from them that will
# represent our best knowledge about the current votes
for index_in_committee, signature in entry.singles:
for index_in_committee, foo in entry.singles:
let (flags, signature) = foo
if entry.aggregates.len() == 0:
# Create aggregate on first iteration..
let empty_committee_bits = bs.BitSeq.init(entry.committee_len)
entry.aggregates.add(
Validation(
aggregation_bits: CommitteeValidatorsBits.init(entry.committee_len),
committee_participation_flags:
[empty_committee_bits, empty_committee_bits,
empty_committee_bits],
aggregate_signature: AggregateSignature.init(signature)
))
else:
entry.aggregates[0].aggregate_signature.aggregate(signature)

entry.aggregates[0].aggregation_bits.setBit(index_in_committee)
entry.aggregates[0].committee_participation_flags = flags
else:
# There already exist aggregates - we'll try to top them up by adding
# singles to them - for example, it may happen that we're being asked to
Expand All @@ -259,10 +274,14 @@ func updateAggregates(entry: var AttestationEntry) =
# In theory, also aggregates could be combined but finding the best
# combination is hard, so we'll pragmatically use singles only here
var updated = false
for index_in_committee, signature in entry.singles:
for index_in_committee, foo in entry.singles:
let (flags, signature) = foo
for v in entry.aggregates.mitems():
if not v.aggregation_bits[index_in_committee]:
v.aggregation_bits.setBit(index_in_committee)
combine(v.committee_participation_flags[0], flags[0])
combine(v.committee_participation_flags[1], flags[1])
combine(v.committee_participation_flags[2], flags[2])
v.aggregate_signature.aggregate(signature)
updated = true

Expand Down Expand Up @@ -294,12 +313,12 @@ func covers(entry: AttestationEntry, bits: CommitteeValidatorsBits): bool =

proc addAttestation(entry: var AttestationEntry,
attestation: Attestation,
committee_participation_flags: CommitteeParticipationFlags,
signature: CookedSig): bool =
logScope:
attestation = shortLog(attestation)

let
singleIndex = oneIndex(attestation.aggregation_bits)
let singleIndex = oneIndex(attestation.aggregation_bits)

if singleIndex.isSome():
if singleIndex.get() in entry.singles:
Expand All @@ -313,7 +332,7 @@ proc addAttestation(entry: var AttestationEntry,
singles = entry.singles.len(),
aggregates = entry.aggregates.len()

entry.singles[singleIndex.get()] = signature
entry.singles[singleIndex.get()] = (committee_participation_flags, signature)
else:
# More than one vote in this attestation
if entry.covers(attestation.aggregation_bits):
Expand All @@ -326,6 +345,7 @@ proc addAttestation(entry: var AttestationEntry,

entry.aggregates.add(Validation(
aggregation_bits: attestation.aggregation_bits,
committee_participation_flags: committee_participation_flags,
aggregate_signature: AggregateSignature.init(signature)))

debug "Aggregate resolved",
Expand Down Expand Up @@ -358,13 +378,53 @@ proc addAttestation*(pool: var AttestationPool,
startingSlot = pool.startingSlot
return

let attestation_data_root = hash_tree_root(attestation.data)
let
attestation_data_root = hash_tree_root(attestation.data)
empty_committee_bits =
bs.BitSeq.init(attestation.aggregation_bits.len)
current_epoch = pool.dag.headState.get_current_epoch

template get_epoch_participation_flags(i: ValidatorIndex): auto =
withState(pool.dag.headState):
when consensusFork == ConsensusFork.Phase0:
# This is irrelevant when phase0, because the loop will just
# fill in from is_present properly; there's no actual
# participation flag information in phase0
7.ParticipationFlags
else:
if attestation.data.target.epoch == current_epoch:
forkyState.data.current_epoch_participation[i]
else:
forkyState.data.previous_epoch_participation[i]

var cmf = [
empty_committee_bits, empty_committee_bits, empty_committee_bits]
var attesting_index_count = 0

# attesting_indices will line up with attestation.aggregation_bits by
# construction via get_beacon_committee/get_attesting_indices
for (idx_in_committee, is_present) in attestation.aggregation_bits.pairs:
if attesting_index_count >= attesting_indices.len():
#doAssert false
#debugEcho "FOO3: ", attesting_indices, " ", attesting_index_count, " ", $attestation.aggregation_bits
break
if not is_present:
continue
let validator_index = attesting_indices[attesting_index_count]
let flags = get_epoch_participation_flags(validator_index)
if has_flag(flags, TIMELY_SOURCE_FLAG_INDEX):
cmf[TIMELY_SOURCE_FLAG_INDEX.int].setBit(idx_in_committee)
if has_flag(flags, TIMELY_TARGET_FLAG_INDEX):
cmf[TIMELY_TARGET_FLAG_INDEX.int].setBit(idx_in_committee)
if has_flag(flags, TIMELY_HEAD_FLAG_INDEX):
cmf[TIMELY_HEAD_FLAG_INDEX.int].setBit(idx_in_committee)
inc attesting_index_count

# TODO withValue is an abomination but hard to use anything else too without
# creating an unnecessary AttestationEntry on the hot path and avoiding
# multiple lookups
pool.candidates[candidateIdx.get()].withValue(attestation_data_root, entry) do:
if not addAttestation(entry[], attestation, signature):
if not addAttestation(entry[], attestation, cmf, signature):
return
do:
if not addAttestation(
Expand All @@ -373,7 +433,7 @@ proc addAttestation*(pool: var AttestationPool,
AttestationEntry(
data: attestation.data,
committee_len: attestation.aggregation_bits.len())),
attestation, signature):
attestation, cmf, signature):
return

pool.addForkChoiceVotes(
Expand Down Expand Up @@ -438,7 +498,8 @@ iterator attestations*(pool: AttestationPool, slot: Opt[Slot],
aggregation_bits: CommitteeValidatorsBits.init(entry.committee_len),
data: entry.data)

for index, signature in entry.singles:
for index, foo in entry.singles:
let (participation_flags, signature) = foo
singleAttestation.aggregation_bits.setBit(index)
singleAttestation.signature = signature.toValidatorSig()
yield singleAttestation
Expand All @@ -449,7 +510,7 @@ iterator attestations*(pool: AttestationPool, slot: Opt[Slot],

type
AttestationCacheKey = (Slot, uint64)
AttestationCache = Table[AttestationCacheKey, CommitteeValidatorsBits] ##\
AttestationCache = Table[AttestationCacheKey, CommitteeParticipationFlags] ##\
## Cache for quick lookup during beacon block construction of attestations
## which have already been included, and therefore should be skipped.

Expand All @@ -460,25 +521,40 @@ func getAttestationCacheKey(ad: AttestationData): AttestationCacheKey =

func add(
attCache: var AttestationCache, data: AttestationData,
aggregation_bits: CommitteeValidatorsBits) =
committee_participation_flags: CommitteeParticipationFlags) =
let key = data.getAttestationCacheKey()
attCache.withValue(key, v) do:
v[].incl(aggregation_bits)
doAssert committee_participation_flags.len() == v[].len(),
"committee participation flag length constructed to be consistent"

combine(v[0], committee_participation_flags[0])
combine(v[1], committee_participation_flags[1])
combine(v[2], committee_participation_flags[2])
do:
attCache[key] = aggregation_bits
attCache[key] = committee_participation_flags

func init(
T: type AttestationCache, state: phase0.HashedBeaconState, _: StateCache):
T =
# Load attestations that are scheduled for being given rewards for
# As long as it's consistent, it's not particularly important what the
# mapping from validation bits to more granular participation flags is
template allParticipation(a: untyped): CommitteeParticipationFlags =
# pretend all timely flags were hit
var committee_bits = bs.BitSeq.init(a.len)
for idx, is_present in a:
if is_present:
committee_bits.setBit(idx)
[committee_bits, committee_bits, committee_bits]

for i in 0..<state.data.previous_epoch_attestations.len():
result.add(
state.data.previous_epoch_attestations[i].data,
state.data.previous_epoch_attestations[i].aggregation_bits)
state.data.previous_epoch_attestations[i].aggregation_bits.allParticipation)
for i in 0..<state.data.current_epoch_attestations.len():
result.add(
state.data.current_epoch_attestations[i].data,
state.data.current_epoch_attestations[i].aggregation_bits)
state.data.current_epoch_attestations[i].aggregation_bits.allParticipation)

func init(
T: type AttestationCache,
Expand All @@ -497,42 +573,58 @@ func init(
state.data, epoch, cache)
for committee_index in get_committee_indices(committees_per_slot):
for slot in epoch.slots():
let committee = get_beacon_committee(
let
committee = get_beacon_committee(
state.data, slot, committee_index, cache)
var
validator_bits = CommitteeValidatorsBits.init(committee.len)
empty_committee_bits = bs.BitSeq.init(committee.len)
var committee_participation_flags =
[empty_committee_bits, empty_committee_bits, empty_committee_bits]
for index_in_committee, validator_index in committee:
if participation_bitmap[validator_index] != 0:
# If any flag got set, there was an attestation from this validator.
validator_bits[index_in_committee] = true
result[(slot, committee_index.uint64)] = validator_bits
let pf = participation_bitmap[validator_index]
for i in 0 ..< 3:
if has_flag(pf, i.TimelyFlag):
committee_participation_flags[i].setBit(index_in_committee)
result[(slot, committee_index.uint64)] = committee_participation_flags

# This treats all types of rewards as equivalent, which isn't ideal
update_attestation_pool_cache(
prev_epoch, state.data.previous_epoch_participation)
update_attestation_pool_cache(
cur_epoch, state.data.current_epoch_participation)

# from ssz_serialization
func countOnes(x: bs.BitSeq): int =
# Count the number of set bits
var res = 0
for w in words(x):
res += w.countOnes()
res

func countOverlap(a, b: bs.BitSeq): int =
var res = 0
for wa, wb in words(a, b):
res += countOnes(wa and wb)
res

func score(
attCache: var AttestationCache, data: AttestationData,
aggregation_bits: CommitteeValidatorsBits): int =
# The score of an attestation is loosely based on how many new votes it brings
# to the state - a more accurate score function would also look at inclusion
# distance and effective balance.
committee_participation_flags: CommitteeParticipationFlags): int =
# A more accurate score function would also look at effective balance, but
# nearly every active validator tends to have 32 ETH effective balance
# TODO cache not var, but `withValue` requires it
let
key = data.getAttestationCacheKey()
bitsScore = aggregation_bits.countOnes()
let foo = mapIt(committee_participation_flags, it.countOnes)

attCache.withValue(key, value):
doAssert aggregation_bits.len() == value[].len(),
attCache.withValue(data.getAttestationCacheKey(), value):
doAssert committee_participation_flags[0].len() == value[][0].len(),
"check_attestation ensures committee length"

# How many votes were in the attestation minues the votes that are the same
return bitsScore - aggregation_bits.countOverlap(value[])
# Marginal weights. There's a tradeoff in data representation efficiency
# for construction in attestation cache initialization and usage here in
# both Nim idiomatic code and more customized/lower-level constructions.
let bar = mapIt(0 ..< 3, foo[it] - countOverlap(committee_participation_flags[it], value[][it]))
return bar[0] * 14 + bar[1] * 26 + bar[2] * 14

# Not found in cache - fresh vote meaning all attestations count
bitsScore
countOnes(committee_participation_flags[0]) * 14 + countOnes(committee_participation_flags[1]) * 26 + countOnes(committee_participation_flags[2]) * 14

proc check_attestation_compatible*(
dag: ChainDAGRef,
Expand Down Expand Up @@ -606,8 +698,9 @@ proc getAttestationsForBlock*(pool: var AttestationPool,
state.data, attestation, {skipBlsValidation}, cache).isOk():
continue

# in principle, depends on effective balances, but nearly all are 32 ETH
let score = attCache.score(
entry.data, entry.aggregates[j].aggregation_bits)
entry.data, entry.aggregates[j].committee_participation_flags)
if score == 0:
# 0 score means the attestation would not bring any votes - discard
# it early
Expand Down Expand Up @@ -666,7 +759,8 @@ proc getAttestationsForBlock*(pool: var AttestationPool,

# Update cache so that the new votes are taken into account when updating
# the score below
attCache.add(entry[].data, entry[].aggregates[j].aggregation_bits)
attCache.add(
entry[].data, entry[].aggregates[j].committee_participation_flags)

entry[].data.getAttestationCacheKey

Expand All @@ -680,7 +774,7 @@ proc getAttestationsForBlock*(pool: var AttestationPool,

it.score = attCache.score(
it.entry[].data,
it.entry[].aggregates[it.validation].aggregation_bits)
it.entry[].aggregates[it.validation].committee_participation_flags)

candidates.keepItIf:
# Only keep candidates that might add coverage
Expand All @@ -704,6 +798,9 @@ proc getAttestationsForBlock*(pool: var AttestationPool,

func bestValidation(aggregates: openArray[Validation]): (int, int) =
# Look for best validation based on number of votes in the aggregate
# This is not really optimal after phase 0, but also is not directly
# rewarded, so lower priority to change, along with its only caller,
# getAggregatedAttestation().
doAssert aggregates.len() > 0,
"updateAggregates should have created at least one aggregate"
var
Expand Down

0 comments on commit 7ada74b

Please sign in to comment.