Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use participation flags to score attestations for blocks #5468

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 140 additions & 43 deletions beacon_chain/consensus_object_pools/attestation_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
import
# Status libraries
metrics,
chronicles, stew/byteutils,
chronicles,
stew/bitseqs as bs, # ssz_serialization defines BitSeqs too in bitseqs module
stew/byteutils,
# Internal
../spec/[
beaconstate, eth2_merkleization, forks, state_transition_epoch, validator],
../spec/[eth2_merkleization, forks],
"."/[spec_cache, blockchain_dag, block_quarantine],
../fork_choice/fork_choice,
../beacon_clock

from std/sequtils import keepItIf, maxIndex
from std/sequtils import keepItIf, mapIt, maxIndex
from stew/bitseqs import combine
from ../spec/beaconstate import check_attestation, dependent_root
from ../spec/state_transition_epoch import compute_unrealized_finality
from ../spec/validator import
get_beacon_committee, get_committee_count_per_slot, get_committee_indices

export blockchain_dag, fork_choice

Expand All @@ -32,19 +38,22 @@ const
type
OnAttestationCallback = proc(data: Attestation) {.gcsafe, raises: [].}

CommitteeParticipationFlags = array[3, bs.BitSeq]

Validation = object
## Validations collect a set of signatures for a distict attestation - in
## eth2, a single bit is used to keep track of which signatures have been
## added to the aggregate meaning that only non-overlapping aggregates may
## be further combined.
aggregation_bits: CommitteeValidatorsBits
committee_participation_flags: CommitteeParticipationFlags
aggregate_signature: AggregateSignature

AttestationEntry = object
## Each entry holds the known signatures for a particular, distinct vote
data: AttestationData
committee_len: int
singles: Table[int, CookedSig] ## \
singles: Table[int, (CommitteeParticipationFlags, CookedSig)] ## \
## On the attestation subnets, only attestations with a single vote are
## allowed - these can be collected separately to top up aggregates with -
## here we collect them by mapping index in committee to a vote
Expand Down Expand Up @@ -239,18 +248,24 @@ func updateAggregates(entry: var AttestationEntry) =
if entry.aggregates.len() == 0:
# If there are singles, we can create an aggregate from them that will
# represent our best knowledge about the current votes
for index_in_committee, signature in entry.singles:
for index_in_committee, foo in entry.singles:
let (flags, signature) = foo
if entry.aggregates.len() == 0:
# Create aggregate on first iteration..
let empty_committee_bits = bs.BitSeq.init(entry.committee_len)
entry.aggregates.add(
Validation(
aggregation_bits: CommitteeValidatorsBits.init(entry.committee_len),
committee_participation_flags:
[empty_committee_bits, empty_committee_bits,
empty_committee_bits],
aggregate_signature: AggregateSignature.init(signature)
))
else:
entry.aggregates[0].aggregate_signature.aggregate(signature)

entry.aggregates[0].aggregation_bits.setBit(index_in_committee)
entry.aggregates[0].committee_participation_flags = flags
else:
# There already exist aggregates - we'll try to top them up by adding
# singles to them - for example, it may happen that we're being asked to
Expand All @@ -259,10 +274,14 @@ func updateAggregates(entry: var AttestationEntry) =
# In theory, also aggregates could be combined but finding the best
# combination is hard, so we'll pragmatically use singles only here
var updated = false
for index_in_committee, signature in entry.singles:
for index_in_committee, foo in entry.singles:
let (flags, signature) = foo
for v in entry.aggregates.mitems():
if not v.aggregation_bits[index_in_committee]:
v.aggregation_bits.setBit(index_in_committee)
combine(v.committee_participation_flags[0], flags[0])
combine(v.committee_participation_flags[1], flags[1])
combine(v.committee_participation_flags[2], flags[2])
v.aggregate_signature.aggregate(signature)
updated = true

Expand Down Expand Up @@ -294,12 +313,12 @@ func covers(entry: AttestationEntry, bits: CommitteeValidatorsBits): bool =

proc addAttestation(entry: var AttestationEntry,
attestation: Attestation,
committee_participation_flags: CommitteeParticipationFlags,
signature: CookedSig): bool =
logScope:
attestation = shortLog(attestation)

let
singleIndex = oneIndex(attestation.aggregation_bits)
let singleIndex = oneIndex(attestation.aggregation_bits)

if singleIndex.isSome():
if singleIndex.get() in entry.singles:
Expand All @@ -313,7 +332,7 @@ proc addAttestation(entry: var AttestationEntry,
singles = entry.singles.len(),
aggregates = entry.aggregates.len()

entry.singles[singleIndex.get()] = signature
entry.singles[singleIndex.get()] = (committee_participation_flags, signature)
else:
# More than one vote in this attestation
if entry.covers(attestation.aggregation_bits):
Expand All @@ -326,6 +345,7 @@ proc addAttestation(entry: var AttestationEntry,

entry.aggregates.add(Validation(
aggregation_bits: attestation.aggregation_bits,
committee_participation_flags: committee_participation_flags,
aggregate_signature: AggregateSignature.init(signature)))

debug "Aggregate resolved",
Expand Down Expand Up @@ -358,13 +378,53 @@ proc addAttestation*(pool: var AttestationPool,
startingSlot = pool.startingSlot
return

let attestation_data_root = hash_tree_root(attestation.data)
let
attestation_data_root = hash_tree_root(attestation.data)
empty_committee_bits =
bs.BitSeq.init(attestation.aggregation_bits.len)
current_epoch = pool.dag.headState.get_current_epoch

template get_epoch_participation_flags(i: ValidatorIndex): auto =
withState(pool.dag.headState):
when consensusFork == ConsensusFork.Phase0:
# This is irrelevant when phase0, because the loop will just
# fill in from is_present properly; there's no actual
# participation flag information in phase0
7.ParticipationFlags
else:
if attestation.data.target.epoch == current_epoch:
forkyState.data.current_epoch_participation[i]
else:
forkyState.data.previous_epoch_participation[i]

var cmf = [
empty_committee_bits, empty_committee_bits, empty_committee_bits]
var attesting_index_count = 0

# attesting_indices will line up with attestation.aggregation_bits by
# construction via get_beacon_committee/get_attesting_indices
for (idx_in_committee, is_present) in attestation.aggregation_bits.pairs:
if attesting_index_count >= attesting_indices.len():
#doAssert false
#debugEcho "FOO3: ", attesting_indices, " ", attesting_index_count, " ", $attestation.aggregation_bits
break
if not is_present:
continue
let validator_index = attesting_indices[attesting_index_count]
let flags = get_epoch_participation_flags(validator_index)
if has_flag(flags, TIMELY_SOURCE_FLAG_INDEX):
cmf[TIMELY_SOURCE_FLAG_INDEX.int].setBit(idx_in_committee)
if has_flag(flags, TIMELY_TARGET_FLAG_INDEX):
cmf[TIMELY_TARGET_FLAG_INDEX.int].setBit(idx_in_committee)
if has_flag(flags, TIMELY_HEAD_FLAG_INDEX):
cmf[TIMELY_HEAD_FLAG_INDEX.int].setBit(idx_in_committee)
inc attesting_index_count

# TODO withValue is an abomination but hard to use anything else too without
# creating an unnecessary AttestationEntry on the hot path and avoiding
# multiple lookups
pool.candidates[candidateIdx.get()].withValue(attestation_data_root, entry) do:
if not addAttestation(entry[], attestation, signature):
if not addAttestation(entry[], attestation, cmf, signature):
return
do:
if not addAttestation(
Expand All @@ -373,7 +433,7 @@ proc addAttestation*(pool: var AttestationPool,
AttestationEntry(
data: attestation.data,
committee_len: attestation.aggregation_bits.len())),
attestation, signature):
attestation, cmf, signature):
return

pool.addForkChoiceVotes(
Expand Down Expand Up @@ -438,7 +498,8 @@ iterator attestations*(pool: AttestationPool, slot: Opt[Slot],
aggregation_bits: CommitteeValidatorsBits.init(entry.committee_len),
data: entry.data)

for index, signature in entry.singles:
for index, foo in entry.singles:
let (participation_flags, signature) = foo
singleAttestation.aggregation_bits.setBit(index)
singleAttestation.signature = signature.toValidatorSig()
yield singleAttestation
Expand All @@ -449,7 +510,7 @@ iterator attestations*(pool: AttestationPool, slot: Opt[Slot],

type
AttestationCacheKey = (Slot, uint64)
AttestationCache = Table[AttestationCacheKey, CommitteeValidatorsBits] ##\
AttestationCache = Table[AttestationCacheKey, CommitteeParticipationFlags] ##\
## Cache for quick lookup during beacon block construction of attestations
## which have already been included, and therefore should be skipped.

Expand All @@ -460,25 +521,40 @@ func getAttestationCacheKey(ad: AttestationData): AttestationCacheKey =

func add(
attCache: var AttestationCache, data: AttestationData,
aggregation_bits: CommitteeValidatorsBits) =
committee_participation_flags: CommitteeParticipationFlags) =
let key = data.getAttestationCacheKey()
attCache.withValue(key, v) do:
v[].incl(aggregation_bits)
doAssert committee_participation_flags.len() == v[].len(),
"committee participation flag length constructed to be consistent"

combine(v[0], committee_participation_flags[0])
combine(v[1], committee_participation_flags[1])
combine(v[2], committee_participation_flags[2])
do:
attCache[key] = aggregation_bits
attCache[key] = committee_participation_flags

func init(
T: type AttestationCache, state: phase0.HashedBeaconState, _: StateCache):
T =
# Load attestations that are scheduled for being given rewards for
# As long as it's consistent, it's not particularly important what the
# mapping from validation bits to more granular participation flags is
template allParticipation(a: untyped): CommitteeParticipationFlags =
# pretend all timely flags were hit
var committee_bits = bs.BitSeq.init(a.len)
for idx, is_present in a:
if is_present:
committee_bits.setBit(idx)
[committee_bits, committee_bits, committee_bits]

for i in 0..<state.data.previous_epoch_attestations.len():
result.add(
state.data.previous_epoch_attestations[i].data,
state.data.previous_epoch_attestations[i].aggregation_bits)
state.data.previous_epoch_attestations[i].aggregation_bits.allParticipation)
for i in 0..<state.data.current_epoch_attestations.len():
result.add(
state.data.current_epoch_attestations[i].data,
state.data.current_epoch_attestations[i].aggregation_bits)
state.data.current_epoch_attestations[i].aggregation_bits.allParticipation)

func init(
T: type AttestationCache,
Expand All @@ -497,42 +573,58 @@ func init(
state.data, epoch, cache)
for committee_index in get_committee_indices(committees_per_slot):
for slot in epoch.slots():
let committee = get_beacon_committee(
let
committee = get_beacon_committee(
state.data, slot, committee_index, cache)
var
validator_bits = CommitteeValidatorsBits.init(committee.len)
empty_committee_bits = bs.BitSeq.init(committee.len)
var committee_participation_flags =
[empty_committee_bits, empty_committee_bits, empty_committee_bits]
for index_in_committee, validator_index in committee:
if participation_bitmap[validator_index] != 0:
# If any flag got set, there was an attestation from this validator.
validator_bits[index_in_committee] = true
result[(slot, committee_index.uint64)] = validator_bits
let pf = participation_bitmap[validator_index]
for i in 0 ..< 3:
if has_flag(pf, i.TimelyFlag):
committee_participation_flags[i].setBit(index_in_committee)
result[(slot, committee_index.uint64)] = committee_participation_flags

# This treats all types of rewards as equivalent, which isn't ideal
update_attestation_pool_cache(
prev_epoch, state.data.previous_epoch_participation)
update_attestation_pool_cache(
cur_epoch, state.data.current_epoch_participation)

# from ssz_serialization
func countOnes(x: bs.BitSeq): int =
# Count the number of set bits
var res = 0
for w in words(x):
res += w.countOnes()
res

func countOverlap(a, b: bs.BitSeq): int =
var res = 0
for wa, wb in words(a, b):
res += countOnes(wa and wb)
res

func score(
attCache: var AttestationCache, data: AttestationData,
aggregation_bits: CommitteeValidatorsBits): int =
# The score of an attestation is loosely based on how many new votes it brings
# to the state - a more accurate score function would also look at inclusion
# distance and effective balance.
committee_participation_flags: CommitteeParticipationFlags): int =
# A more accurate score function would also look at effective balance, but
# nearly every active validator tends to have 32 ETH effective balance
# TODO cache not var, but `withValue` requires it
let
key = data.getAttestationCacheKey()
bitsScore = aggregation_bits.countOnes()
let foo = mapIt(committee_participation_flags, it.countOnes)

attCache.withValue(key, value):
doAssert aggregation_bits.len() == value[].len(),
attCache.withValue(data.getAttestationCacheKey(), value):
doAssert committee_participation_flags[0].len() == value[][0].len(),
"check_attestation ensures committee length"

# How many votes were in the attestation minues the votes that are the same
return bitsScore - aggregation_bits.countOverlap(value[])
# Marginal weights. There's a tradeoff in data representation efficiency
# for construction in attestation cache initialization and usage here in
# both Nim idiomatic code and more customized/lower-level constructions.
let bar = mapIt(0 ..< 3, foo[it] - countOverlap(committee_participation_flags[it], value[][it]))
return bar[0] * 14 + bar[1] * 26 + bar[2] * 14

# Not found in cache - fresh vote meaning all attestations count
bitsScore
countOnes(committee_participation_flags[0]) * 14 + countOnes(committee_participation_flags[1]) * 26 + countOnes(committee_participation_flags[2]) * 14

proc check_attestation_compatible*(
dag: ChainDAGRef,
Expand Down Expand Up @@ -606,8 +698,9 @@ proc getAttestationsForBlock*(pool: var AttestationPool,
state.data, attestation, {skipBlsValidation}, cache).isOk():
continue

# in principle, depends on effective balances, but nearly all are 32 ETH
let score = attCache.score(
entry.data, entry.aggregates[j].aggregation_bits)
entry.data, entry.aggregates[j].committee_participation_flags)
if score == 0:
# 0 score means the attestation would not bring any votes - discard
# it early
Expand Down Expand Up @@ -666,7 +759,8 @@ proc getAttestationsForBlock*(pool: var AttestationPool,

# Update cache so that the new votes are taken into account when updating
# the score below
attCache.add(entry[].data, entry[].aggregates[j].aggregation_bits)
attCache.add(
entry[].data, entry[].aggregates[j].committee_participation_flags)

entry[].data.getAttestationCacheKey

Expand All @@ -680,7 +774,7 @@ proc getAttestationsForBlock*(pool: var AttestationPool,

it.score = attCache.score(
it.entry[].data,
it.entry[].aggregates[it.validation].aggregation_bits)
it.entry[].aggregates[it.validation].committee_participation_flags)

candidates.keepItIf:
# Only keep candidates that might add coverage
Expand All @@ -704,6 +798,9 @@ proc getAttestationsForBlock*(pool: var AttestationPool,

func bestValidation(aggregates: openArray[Validation]): (int, int) =
# Look for best validation based on number of votes in the aggregate
# This is not really optimal after phase 0, but also is not directly
# rewarded, so lower priority to change, along with its only caller,
# getAggregatedAttestation().
doAssert aggregates.len() > 0,
"updateAggregates should have created at least one aggregate"
var
Expand Down
Loading