feat: wip
vgorkavenko committed Feb 3, 2025
1 parent 684afa5 commit f7d620f
Showing 4 changed files with 142 additions and 64 deletions.
159 changes: 114 additions & 45 deletions src/modules/csm/checkpoint.py
@@ -14,7 +14,7 @@
from src.types import BlockRoot, BlockStamp, EpochNumber, SlotNumber, ValidatorIndex
from src.utils.blockstamp import build_blockstamp
from src.utils.range import sequence
-from src.utils.slot import get_next_non_missed_slot, get_prev_non_missed_slot
+from src.utils.slot import get_prev_non_missed_slot
from src.utils.timeit import timeit
from src.utils.types import hex_str_to_bytes
from src.utils.web3converter import Web3Converter
@@ -24,6 +24,7 @@


class MinStepIsNotReached(Exception): ...
+class SlotOutOfRootsRange(Exception): ...


@dataclass
@@ -112,13 +113,25 @@ def _is_min_step_reached(self):
type AttestationCommittees = dict[tuple[Slot, CommitteeIndex], list[ValidatorDuty]]


+class SyncCommitteesCache(dict):
+
+    max_size = variables.CSM_ORACLE_MAX_CONCURRENCY
+
+    def __setitem__(self, committee_network_index: int, value: SyncCommittee | None):
+        if len(self) >= self.max_size:
+            self.pop(min(self))
+        super().__setitem__(committee_network_index, value)
+
+
+SYNC_COMMITTEE_CACHE = SyncCommitteesCache()


class FrameCheckpointProcessor:
cc: ConsensusClient
converter: Web3Converter

state: State
finalized_blockstamp: BlockStamp
-    current_sync_committee: SyncCommittee | None = None

def __init__(self, cc: ConsensusClient, state: State, converter: Web3Converter, finalized_blockstamp: BlockStamp):
self.cc = cc
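
The SyncCommitteesCache above is a plain dict with a bounded insert: once max_size entries exist, __setitem__ evicts the entry with the smallest key, i.e. the oldest sync committee period, before storing the new one. Sizing it to CSM_ORACLE_MAX_CONCURRENCY keeps at most one cached committee per worker thread. A minimal sketch of the eviction behaviour, with strings standing in for SyncCommittee objects and a hypothetical max_size of 2:

cache = SyncCommitteesCache()
cache.max_size = 2                 # hypothetical override just for the example
cache[10] = "committee-10"         # keys are sync committee period indices
cache[11] = "committee-11"
cache[12] = "committee-12"         # len == max_size, so min(self) == 10 is evicted
assert sorted(cache) == [11, 12]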
@@ -136,10 +149,10 @@ def exec(self, checkpoint: FrameCheckpoint) -> int:
return 0
block_roots = self._get_block_roots(checkpoint.slot)
duty_epochs_roots = {
-            duty_epoch: self._select_block_roots(duty_epoch, block_roots, checkpoint.slot)
+            duty_epoch: self._select_block_roots(block_roots, duty_epoch, checkpoint.slot)
for duty_epoch in unprocessed_epochs
}
-        self._process(unprocessed_epochs, duty_epochs_roots)
+        self._process(block_roots, checkpoint.slot, unprocessed_epochs, duty_epochs_roots)
self.state.commit()
return len(unprocessed_epochs)

@@ -155,7 +168,7 @@ def _get_block_roots(self, checkpoint_slot: SlotNumber):
return [br[i] if i == pivot_index or br[i] != br[i - 1] else None for i in range(len(br))]

def _select_block_roots(
-        self, duty_epoch: EpochNumber, block_roots: list[BlockRoot | None], checkpoint_slot: SlotNumber
+        self, block_roots: list[BlockRoot | None], duty_epoch: EpochNumber, checkpoint_slot: SlotNumber
) -> tuple[list[SlotBlockRoot], list[SlotBlockRoot]]:
roots_to_check = []
# To check duties in the current epoch you need to
@@ -165,25 +178,38 @@
self.converter.get_epoch_last_slot(EpochNumber(duty_epoch + 1)),
)
for slot_to_check in slots:
-            # From spec
-            # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#get_block_root_at_slot
-            if not slot_to_check < checkpoint_slot <= slot_to_check + SLOTS_PER_HISTORICAL_ROOT:
-                raise ValueError("Slot is out of the state block roots range")
-            roots_to_check.append((slot_to_check, block_roots[slot_to_check % SLOTS_PER_HISTORICAL_ROOT]))
+            block_root = self._select_block_root_by_slot(block_roots, checkpoint_slot, slot_to_check)
+            roots_to_check.append((slot_to_check, block_root))

duty_epoch_roots, next_epoch_roots = roots_to_check[:32], roots_to_check[32:]

return duty_epoch_roots, next_epoch_roots

+    @staticmethod
+    def _select_block_root_by_slot(block_roots: list[BlockRoot | None], checkpoint_slot: SlotNumber, root_slot: SlotNumber) -> BlockRoot | None:
+        # From spec
+        # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/beacon-chain.md#get_block_root_at_slot
+        if not root_slot < checkpoint_slot <= root_slot + SLOTS_PER_HISTORICAL_ROOT:
+            raise SlotOutOfRootsRange("Slot is out of the state block roots range")
+        return block_roots[root_slot % SLOTS_PER_HISTORICAL_ROOT]

def _process(
self,
+        checkpoint_block_roots: list[BlockRoot | None],
+        checkpoint_slot: SlotNumber,
unprocessed_epochs: list[EpochNumber],
duty_epochs_roots: dict[EpochNumber, tuple[list[SlotBlockRoot], list[SlotBlockRoot]]]
):
executor = ThreadPoolExecutor(max_workers=variables.CSM_ORACLE_MAX_CONCURRENCY)
try:
futures = {
-                executor.submit(self._check_duties, duty_epoch, *duty_epochs_roots[duty_epoch])
+                executor.submit(
+                    self._check_duties,
+                    checkpoint_block_roots,
+                    checkpoint_slot,
+                    duty_epoch,
+                    *duty_epochs_roots[duty_epoch]
+                )
for duty_epoch in unprocessed_epochs
}
for future in as_completed(futures):
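
The new _select_block_root_by_slot helper keeps the spec check from the old inline code: per get_block_root_at_slot in the phase0 spec, a state at checkpoint_slot stores block roots only for the SLOTS_PER_HISTORICAL_ROOT slots strictly before it, in a circular buffer indexed by slot % SLOTS_PER_HISTORICAL_ROOT. A toy check of that window, assuming the mainnet preset SLOTS_PER_HISTORICAL_ROOT = 8192:

SLOTS_PER_HISTORICAL_ROOT = 8192  # mainnet preset, assumed for the example

def in_roots_range(root_slot: int, checkpoint_slot: int) -> bool:
    return root_slot < checkpoint_slot <= root_slot + SLOTS_PER_HISTORICAL_ROOT

assert in_roots_range(100, 101)       # just behind the checkpoint state
assert not in_roots_range(100, 100)   # the state's own slot is not stored
assert not in_roots_range(100, 8293)  # already rotated out of the buffer

Raising the dedicated SlotOutOfRootsRange instead of a bare ValueError lets the proposer-duties code below treat "too old for this state" as a recoverable case and fall back to a CL request.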
@@ -199,22 +225,24 @@ def _process(
@timeit(lambda args, duration: logger.info({"msg": f"Epoch {args.duty_epoch} processed in {duration:.2f} seconds"}))
def _check_duties(
self,
+        checkpoint_block_roots: list[BlockRoot | None],
+        checkpoint_slot: SlotNumber,
duty_epoch: EpochNumber,
duty_epoch_roots: list[SlotBlockRoot],
next_epoch_roots: list[SlotBlockRoot],
):
logger.info({"msg": f"Processing epoch {duty_epoch}"})

att_committees = self._prepare_att_committees(EpochNumber(duty_epoch))
-        propose_duties = self._prepare_propose_duties(EpochNumber(duty_epoch), self.finalized_blockstamp)
+        propose_duties = self._prepare_propose_duties(EpochNumber(duty_epoch), checkpoint_block_roots, checkpoint_slot)
sync_committees = self._prepare_sync_committee(EpochNumber(duty_epoch), duty_epoch_roots)
for slot, root in [*duty_epoch_roots, *next_epoch_roots]:
missed_slot = root is None
if missed_slot:
continue
attestations, sync_aggregate = self.cc.get_block_attestations_and_sync(BlockRoot(root))
process_attestations(attestations, att_committees)
-            if root in duty_epoch_roots:
+            if (slot, root) in duty_epoch_roots:
propose_duties[slot].included = True
process_sync(slot, sync_aggregate, sync_committees)
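
The membership fix in _check_duties matters: duty_epoch_roots is a list of (slot, root) pairs, so the old test root in duty_epoch_roots compared a bare root against tuples and never matched, meaning the included flag was never set here. With hypothetical values:

duty_epoch_roots = [(64, "0xaa"), (65, "0xbb")]  # (slot, root) pairs
assert "0xaa" not in duty_epoch_roots            # old check: always False
assert (64, "0xaa") in duty_epoch_roots          # fixed check matches the pair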

@@ -269,60 +297,101 @@ def _prepare_att_committees(self, epoch: EpochNumber) -> AttestationCommittees:
)
)
def _prepare_sync_committee(
-        self, epoch: EpochNumber, block_roots: list[SlotBlockRoot]
+        self, epoch: EpochNumber, duty_block_roots: list[SlotBlockRoot]
) -> dict[SlotNumber, list[ValidatorDuty]]:
-        # TODO: should be under lock?
-        sync_committee_epochs = epoch % EPOCHS_PER_SYNC_COMMITTEE_PERIOD
-        # TODO: check real committee and from this func on border cases
-        if not self.current_sync_committee or sync_committee_epochs == 0:
-            # TODO: do better
-            epochs_range = EPOCHS_PER_SYNC_COMMITTEE_PERIOD - sync_committee_epochs
-            logger.info({"msg": f"Preparing Sync Committee for {epochs_range} epochs"})
-            first_slot_root, *_ = block_roots
-            slot, _ = first_slot_root
-            blockstamp = build_blockstamp(
-                get_next_non_missed_slot(
-                    self.cc,
-                    slot,
-                    self.finalized_blockstamp.slot_number
-                )
-            )
-            # TODO: can we use lru cache here?
-            self.current_sync_committee = self.cc.get_sync_committee(blockstamp, epoch)

+        sync_committee = self._get_cached_sync_committee(epoch)

duties = {}
-        for slot, root in block_roots:
+        for slot, root in duty_block_roots:
missed_slot = root is None
if missed_slot:
continue
duties[slot] = [
ValidatorDuty(index=ValidatorIndex(int(validator)), included=False)
-                for validator in self.current_sync_committee.validators
+                for validator in sync_committee.validators
]

return duties

+    def _get_cached_sync_committee(self, epoch: EpochNumber) -> SyncCommittee:
+        sync_committee_index = epoch // EPOCHS_PER_SYNC_COMMITTEE_PERIOD
+        with lock:
+            sync_committee = SYNC_COMMITTEE_CACHE.get(sync_committee_index)
+            if not sync_committee:
+                epochs_before_new_sync_committee = epoch % EPOCHS_PER_SYNC_COMMITTEE_PERIOD
+                epochs_range = EPOCHS_PER_SYNC_COMMITTEE_PERIOD - epochs_before_new_sync_committee
+                logger.info({"msg": f"Preparing cached Sync Committee for {epochs_range} epochs from {epoch} epoch"})
+                state_blockstamp = build_blockstamp(
+                    get_prev_non_missed_slot(
+                        self.cc,
+                        self.converter.get_epoch_first_slot(epoch),
+                        self.finalized_blockstamp.slot_number
+                    )
+                )
+                sync_committee = self.cc.get_sync_committee(state_blockstamp, epoch)
+                SYNC_COMMITTEE_CACHE[sync_committee_index] = sync_committee
+        return sync_committee

@timeit(
lambda args, duration: logger.info(
{"msg": f"Propose Duties for epoch {args.epoch} prepared in {duration:.2f} seconds"}
)
)
-    def _prepare_propose_duties(self, epoch: EpochNumber, blockstamp: BlockStamp) -> dict[SlotNumber, ValidatorDuty]:
+    def _prepare_propose_duties(
+        self,
+        epoch: EpochNumber,
+        checkpoint_block_roots: list[BlockRoot | None],
+        checkpoint_slot: SlotNumber
+    ) -> dict[SlotNumber, ValidatorDuty]:
duties = {}
-        dependent_slot = self.converter.get_epoch_last_slot(EpochNumber(epoch - 1))
-        # TODO: can we just take root from the state?
-        dependent_non_missed_slot = SlotNumber(int(
-            get_prev_non_missed_slot(
-                self.cc,
-                dependent_slot,
-                blockstamp.slot_number
-            ).message.slot)
-        )
-        for duty in self.cc.get_proposer_duties(epoch, dependent_non_missed_slot):
+        dependent_root = self._get_dependent_root_for_proposer_duties(epoch, checkpoint_block_roots, checkpoint_slot)
+        proposer_duties = self.cc.get_proposer_duties(epoch, dependent_root)
+        for duty in proposer_duties:
duties[SlotNumber(int(duty.slot))] = ValidatorDuty(
index=ValidatorIndex(int(duty.validator_index)), included=False
)
return duties

+    def _get_dependent_root_for_proposer_duties(
+        self,
+        epoch: EpochNumber,
+        checkpoint_block_roots: list[BlockRoot | None],
+        checkpoint_slot: SlotNumber
+    ) -> BlockRoot:
+        dependent_root = None
+        dependent_slot = self.converter.get_epoch_last_slot(EpochNumber(epoch - 1))
+        try:
+            while not dependent_root:
+                dependent_root = self._select_block_root_by_slot(
+                    checkpoint_block_roots, checkpoint_slot, dependent_slot
+                )
+                if dependent_root:
+                    logger.debug(
+                        {
+                            "msg": f"Got dependent root from state block roots for epoch {epoch}. "
+                                   f"{dependent_slot=} {dependent_root=}"
+                        }
+                    )
+                    break
+                dependent_slot -= 1
+        except SlotOutOfRootsRange:
+            dependent_non_missed_slot = SlotNumber(int(
+                get_prev_non_missed_slot(
+                    self.cc,
+                    dependent_slot,
+                    self.finalized_blockstamp.slot_number
+                ).message.slot)
+            )
+            dependent_root = self.cc.get_block_root(dependent_non_missed_slot).root
+            logger.debug(
+                {
+                    "msg": f"Got dependent root from CL for epoch {epoch}. "
+                           f"{dependent_non_missed_slot=} {dependent_root=}"
+                }
+            )
+        return dependent_root


def process_sync(slot: SlotNumber, sync_aggregate: SyncAggregate, committees: SyncCommittees) -> None:
committee = committees[slot]
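
Two mechanisms replace the old per-epoch lookups here. First, sync committees are cached per period: every epoch in the same EPOCHS_PER_SYNC_COMMITTEE_PERIOD window (256 epochs on mainnet) maps to one cache key, so the committee is fetched from the CL once per period. Second, the dependent root for proposer duties is derived locally by walking back from the last slot of the previous epoch through the state's block roots until a non-missed slot is found; only when the walk leaves the roots window does the oracle fall back to a CL request. A condensed, runnable sketch of that walk-back, with plain callables standing in for the helpers above and None marking a missed slot:

class SlotOutOfRootsRange(Exception): ...

def find_dependent_root(select_root_by_slot, fetch_root_from_cl, dependent_slot):
    # select_root_by_slot raises SlotOutOfRootsRange once the slot leaves
    # the state's roots window; None means the slot was missed.
    try:
        while True:
            root = select_root_by_slot(dependent_slot)
            if root is not None:
                return root               # last non-missed slot's root
            dependent_slot -= 1           # missed slot: keep walking back
    except SlotOutOfRootsRange:
        return fetch_root_from_cl(dependent_slot)  # out of range: ask the CL

# Toy run: slot 66 was missed, so the dependent root comes from slot 65.
roots = {64: "0xaa", 65: "0xbb", 66: None}
assert find_dependent_root(roots.get, lambda s: "0xcl", 66) == "0xbb"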
33 changes: 20 additions & 13 deletions src/modules/csm/csm.py
@@ -273,7 +273,10 @@ def _calculate_distribution_in_frame(
prop_network_perf = self.state.get_prop_network_aggr(frame).perf
sync_network_perf = self.state.get_sync_network_aggr(frame).perf

-        network_perf = 56/64 * att_network_perf + 8/64 * prop_network_perf + 2/64 * sync_network_perf
+        network_perf = 54/64 * att_network_perf + 8/64 * prop_network_perf + 2/64 * sync_network_perf

+        if network_perf > 1:
+            raise ValueError(f"Invalid network performance: {network_perf=}")

threshold = network_perf - self.w3.csm.oracle.perf_leeway_bp(blockstamp.block_hash) / TOTAL_BASIS_POINTS

@@ -283,9 +286,10 @@
log = FramePerfLog(blockstamp, frame, threshold)

for (_, no_id), validators in operators_to_validators.items():
+            # TODO: Do we need to check it later to have other data in logs?
+            log_operator = log.operators[no_id]

if no_id in stuck_operators:
-                log.operators[no_id].stuck = True
+                log_operator.stuck = True
continue

for v in validators:
@@ -296,39 +300,42 @@
if att_aggr is None:
# It's possible that the validator is not assigned to any duty, hence it's performance
# is not presented in the aggregates (e.g. exited, pending for activation etc).
-                    # TODO: do we need to check sync_aggr to strike the validator?
+                    # TODO: check `sync_aggr` to strike (in case of bad sync performance) after validator exit
continue

-                log_data = log.operators[no_id].validators[v.index]
+                log_validator = log_operator.validators[v.index]

if v.validator.slashed is True:
# It means that validator was active during the frame and got slashed and didn't meet the exit
# epoch, so we should not count such validator for operator's share.
-                    log_data.slashed = True
+                    log_validator.slashed = True
continue

performance = att_aggr.perf

if prop_aggr is not None and sync_aggr is not None:
-                    performance = 56/64 * att_aggr.perf + 8/64 * prop_aggr.perf + 2/64 * sync_aggr.perf
+                    performance = 54/64 * att_aggr.perf + 8/64 * prop_aggr.perf + 2/64 * sync_aggr.perf

if prop_aggr is not None and sync_aggr is None:
-                    performance = 56/62 * att_aggr.perf + 8/62 * prop_aggr.perf
+                    performance = 54/62 * att_aggr.perf + 8/62 * prop_aggr.perf

if prop_aggr is None and sync_aggr is not None:
-                    performance = 54 / 56 * att_aggr.perf + 2 / 56 * sync_aggr.perf
+                    performance = 54/56 * att_aggr.perf + 2/56 * sync_aggr.perf

+                if performance > 1:
+                    raise ValueError(f"Invalid performance: {performance=}")

if performance > threshold:
# Count of assigned attestations used as a metrics of time
# the validator was active in the current frame.
distribution[no_id] += att_aggr.assigned

-                log_data.performance = performance
-                log_data.attestations = att_aggr
+                log_validator.performance = performance
+                log_validator.attestations = att_aggr
if prop_aggr is not None:
-                    log_data.proposals = prop_aggr
+                    log_validator.proposals = prop_aggr
if sync_aggr is not None:
-                    log_data.sync_committee = sync_aggr
+                    log_validator.sync_committee = sync_aggr

# Calculate share of each CSM node operator.
shares = defaultdict[NodeOperatorId, int](int)
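
The weight change is not cosmetic. The old attestation weight made the terms sum to 66/64, so a flawless validator could score above 1.0; with 54/64 the weights sum to exactly 1 and appear to mirror the consensus-layer incentive split (attestation weights totalling 54, proposer 8, sync 2, over the WEIGHT_DENOMINATOR of 64). The fallback branches renormalize over whatever denominator remains when a duty type is absent, and the new > 1 guards turn any future weight mistake into a loud failure. A quick numeric check of all three branches, with hypothetical per-duty scores:

import math

assert 56/64 + 8/64 + 2/64 > 1           # old weights could exceed 1.0
assert 54/64 + 8/64 + 2/64 == 1.0        # fixed weights sum to one
assert math.isclose(54/62 + 8/62, 1.0)   # frame without sync duty
assert math.isclose(54/56 + 2/56, 1.0)   # frame without proposal duty

att_perf, prop_perf, sync_perf = 0.98, 1.0, 0.9
performance = 54/64 * att_perf + 8/64 * prop_perf + 2/64 * sync_perf
assert 0.0 <= performance <= 1.0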
12 changes: 7 additions & 5 deletions src/providers/consensus/client.py
@@ -175,16 +175,18 @@ def get_sync_committee(self, blockstamp: BlockStamp, epoch: EpochNumber) -> Sync
return SyncCommittee.from_response(**data)

@list_of_dataclasses(ProposerDuties.from_response)
-    def get_proposer_duties(self, epoch: EpochNumber, dependent_non_missed_slot: SlotNumber) -> list[ProposerDuties]:
+    def get_proposer_duties(self, epoch: EpochNumber, expected_dependent_root: BlockRoot) -> list[ProposerDuties]:
"""Spec: https://ethereum.github.io/beacon-APIs/#/Validator/getProposerDuties"""
# It is recommended by spec to use the dependent root to ensure the epoch is correct
proposer_data, proposer_meta = self._get(self.API_GET_PROPOSER_DUTIES, path_params=(epoch,))
if not isinstance(proposer_data, list):
raise ValueError("Expected list response from getProposerDuties")
-        dependent_root = self.get_block_root(dependent_non_missed_slot).root
-        if proposer_meta['dependent_root'] != dependent_root:
-            # TODO: better logging
-            raise ValueError("Dependent root mismatch in proposer duties response")
+        response_dependent_root = proposer_meta['dependent_root']
+        if response_dependent_root != expected_dependent_root:
+            raise ValueError(
+                "Dependent root for proposer duties request mismatch: "
+                f"{response_dependent_root=} is not {expected_dependent_root=}. Probably, CL node is not fully synced"
+            )
return proposer_data

@lru_cache(maxsize=1)
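
get_proposer_duties now takes the dependent root the caller already derived and checks it against the dependent_root field the Beacon API returns alongside the duties, which is exactly what that field is for: confirming the duties were computed on the expected chain. A minimal sketch of the pattern, with a made-up response shape:

def check_dependent_root(response_meta: dict, expected_dependent_root: str) -> None:
    response_dependent_root = response_meta["dependent_root"]
    if response_dependent_root != expected_dependent_root:
        raise ValueError(f"Dependent root mismatch: {response_dependent_root=} {expected_dependent_root=}")

check_dependent_root({"dependent_root": "0xabc"}, "0xabc")  # in sync: no error
try:
    check_dependent_root({"dependent_root": "0xabc"}, "0xdef")
except ValueError:
    pass  # a stale or unsynced CL node surfaces here, not as silently wrong duties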
2 changes: 1 addition & 1 deletion src/variables.py
@@ -28,7 +28,7 @@
CSM_MODULE_ADDRESS: Final = os.getenv('CSM_MODULE_ADDRESS')
FINALIZATION_BATCH_MAX_REQUEST_COUNT: Final = int(os.getenv('FINALIZATION_BATCH_MAX_REQUEST_COUNT', 1000))
EL_REQUESTS_BATCH_SIZE: Final = int(os.getenv('EL_REQUESTS_BATCH_SIZE', 500))
-CSM_ORACLE_MAX_CONCURRENCY: Final = int(os.getenv('CSM_ORACLE_MAX_CONCURRENCY', 2)) or None
+CSM_ORACLE_MAX_CONCURRENCY: Final = min(32, (os.cpu_count() or 1) + 4, int(os.getenv('CSM_ORACLE_MAX_CONCURRENCY', 2)))

# We add some gas to the transaction to be sure that we have enough gas to execute corner cases
# eg when we tried to submit a few reports in a single block
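
The new CSM_ORACLE_MAX_CONCURRENCY expression clamps the configured value to min(32, cpu_count + 4), the same sizing heuristic ThreadPoolExecutor has used for its default max_workers since Python 3.8, and unlike the old `... or None` form it can no longer evaluate to None. How the clamp behaves, assuming a hypothetical 8-CPU host:

def clamp_concurrency(env_value: str, cpus: int | None) -> int:
    return min(32, (cpus or 1) + 4, int(env_value))

assert clamp_concurrency("2", 8) == 2      # the default passes through
assert clamp_concurrency("100", 8) == 12   # capped at cpu_count + 4
assert clamp_concurrency("50", None) == 5  # unknown CPU count: capped at 1 + 4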
