Skip to content

Commit

Permalink
Optimize BlockSTM memory usage for delayed fields (aptos-labs#12580)
Browse files Browse the repository at this point in the history
This reduces memory footprint of amortized usage of delayed field with a single version from 3000 bytes to 600 bytes per delayed field.
Basically BTreeMap stores things in vectors of 5, and size of the value is large. This invalidates BTreeMap's cache optimization of storing them all together.
  • Loading branch information
igor-aptos authored Mar 19, 2024
1 parent bf9a0d5 commit bba6feb
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions aptos-move/mvhashmap/src/versioned_delayed_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
fmt::Debug,
hash::Hash,
iter::DoubleEndedIterator,
ops::Deref,
sync::atomic::Ordering,
};

Expand Down Expand Up @@ -58,7 +59,7 @@ enum VersionEntry<K: Clone> {
// that update a given aggregator, alongside the corresponding entries.
#[derive(Debug)]
struct VersionedValue<K: Clone> {
versioned_map: BTreeMap<TxnIndex, CachePadded<VersionEntry<K>>>,
versioned_map: BTreeMap<TxnIndex, Box<CachePadded<VersionEntry<K>>>>,

// The value of the given aggregator prior to the block execution. None implies that
// the aggregator did not exist prior to the block.
Expand Down Expand Up @@ -101,13 +102,13 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {

match self.versioned_map.entry(txn_idx) {
Entry::Occupied(mut o) => {
let bypass = match &**o.get() {
let bypass = match o.get().as_ref().deref() {
Value(_, maybe_apply) => maybe_apply.clone().map_or(NoBypass, Bypass),
Apply(apply) => Bypass(apply.clone()),
Estimate(_) => unreachable!("Entry already marked estimate"),
};

o.insert(CachePadded::new(Estimate(bypass)));
o.insert(Box::new(CachePadded::new(Estimate(bypass))));
},
Entry::Vacant(_) => unreachable!("Versioned entry must exist when marking as estimate"),
};
Expand All @@ -118,7 +119,10 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {
// Entries should only be deleted if the transaction that produced them is
// aborted and re-executed, but abort must have marked the entry as an Estimate.
assert_matches!(
&*deleted_entry.expect("Entry must exist to be removed"),
deleted_entry
.expect("Entry must exist to be removed")
.as_ref()
.deref(),
VersionEntry::Estimate(_),
"Removed entry must be an Estimate",
);
Expand All @@ -143,7 +147,7 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {

match self.versioned_map.entry(txn_idx) {
Entry::Occupied(mut o) => {
if !match (&**o.get(), &entry) {
if !match (o.get().as_ref().deref(), &entry) {
// These are the cases where the transaction behavior with respect to the
// aggregator may change (based on the information recorded in the Estimate).
(Estimate(Bypass(apply_l)), Apply(apply_r) | Value(_, Some(apply_r))) => {
Expand Down Expand Up @@ -171,10 +175,10 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {
// TODO[agg_v2](optimize): See if we want to invalidate, when we change read_estimate_deltas
self.read_estimate_deltas = false;
}
o.insert(CachePadded::new(entry));
o.insert(Box::new(CachePadded::new(entry)));
},
Entry::Vacant(v) => {
v.insert(CachePadded::new(entry));
v.insert(Box::new(CachePadded::new(entry)));
},
}
Ok(())
Expand All @@ -185,12 +189,12 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {

match self.versioned_map.entry(txn_idx) {
Entry::Occupied(mut o) => {
match &**o.get() {
match o.get().as_ref().deref() {
Value(v, _) => assert_eq!(v, &value),
Apply(_) => (),
_ => unreachable!("When inserting final value, it needs to be either be Apply or have the same value"),
};
o.insert(CachePadded::new(VersionEntry::Value(value, None)));
o.insert(Box::new(CachePadded::new(VersionEntry::Value(value, None))));
},
Entry::Vacant(_) => unreachable!("When inserting final value, it needs to be present"),
};
Expand All @@ -213,7 +217,7 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {
.clone()
.ok_or(MVDelayedFieldsError::NotFound)
},
|(_, entry)| match &**entry {
|(_, entry)| match entry.as_ref().deref() {
Value(v, _) => Ok(v.clone()),
Apply(_) => {
unreachable!("Apply entries may not exist for committed txn indices")
Expand All @@ -228,7 +232,7 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {
// Errors of not finding a value to resolve to take precedence over a DeltaApplicationError.
fn apply_aggregator_change_suffix(
&self,
iter: &mut dyn DoubleEndedIterator<Item = (&TxnIndex, &CachePadded<VersionEntry<K>>)>,
iter: &mut dyn DoubleEndedIterator<Item = (&TxnIndex, &Box<CachePadded<VersionEntry<K>>>)>,
suffix: &DelayedApplyEntry<K>,
) -> Result<VersionedRead<K>, PanicOr<MVDelayedFieldsError>> {
use DelayedApplyEntry::*;
Expand All @@ -242,7 +246,7 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {
};

while let Some((idx, entry)) = iter.next_back() {
let delta = match (&**entry, self.read_estimate_deltas) {
let delta = match (entry.as_ref().deref(), self.read_estimate_deltas) {
(Value(DelayedFieldValue::Aggregator(v), _), _) => {
// Apply accumulated delta to resolve the aggregator value.
return accumulator
Expand Down Expand Up @@ -309,7 +313,7 @@ impl<K: Copy + Clone + Debug + Eq> VersionedValue<K> {
.map(VersionedRead::Value)
},
// Consider the latest entry below the provided version.
|(idx, entry)| match (&**entry, self.read_estimate_deltas) {
|(idx, entry)| match (entry.as_ref().deref(), self.read_estimate_deltas) {
(Value(v, _), _) => Ok(VersionedRead::Value(v.clone())),
(Apply(apply), _) | (Estimate(Bypass(apply)), true) => {
apply.get_apply_base_id_option().map_or_else(
Expand Down Expand Up @@ -512,7 +516,7 @@ impl<K: Eq + Hash + Clone + Debug + Copy> VersionedDelayedFields<K> {
.get(&idx_to_commit)
.expect("Value in commit at that transaction version needs to be in the HashMap");

let new_entry = match &**entry_to_commit {
let new_entry = match entry_to_commit.as_ref().deref() {
VersionEntry::Value(_, None) => None,
// remove delta in the commit
VersionEntry::Value(v, Some(_)) => Some(v.clone()),
Expand Down Expand Up @@ -907,9 +911,11 @@ mod test {

// Marking an Estimate (first we confirm) as estimate is not allowed.
assert_matches!(
&**v.versioned_map
v.versioned_map
.get(&3)
.expect("Expecting an Estimate entry"),
.expect("Expecting an Estimate entry")
.as_ref()
.deref(),
VersionEntry::Estimate(EstimatedEntry::NoBypass)
);
v.mark_estimate(3);
Expand Down Expand Up @@ -949,7 +955,7 @@ mod test {
let val_bypass = v.versioned_map.get(&3);
assert_some!(val_bypass);
assert_matches!(
&**val_bypass.unwrap(),
val_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::Bypass(
DelayedApplyEntry::AggregatorDelta { .. }
))
Expand All @@ -961,7 +967,7 @@ mod test {
let delta_bypass = v.versioned_map.get(&4);
assert_some!(delta_bypass);
assert_matches!(
&**delta_bypass.unwrap(),
delta_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::Bypass(
DelayedApplyEntry::AggregatorDelta { .. }
))
Expand All @@ -973,7 +979,7 @@ mod test {
let val_no_bypass = v.versioned_map.get(&2);
assert_some!(val_no_bypass);
assert_matches!(
&**val_no_bypass.unwrap(),
val_no_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::NoBypass)
);
assert_err_eq!(v.read(5), PanicOr::Or(MVDelayedFieldsError::Dependency(2)));
Expand Down Expand Up @@ -1015,7 +1021,7 @@ mod test {
let val_no_bypass = v.versioned_map.get(&6);
assert_some!(val_no_bypass);
assert_matches!(
&**val_no_bypass.unwrap(),
val_no_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::NoBypass)
);
assert_err_eq!(v.read(7), PanicOr::Or(MVDelayedFieldsError::Dependency(6)));
Expand All @@ -1039,7 +1045,7 @@ mod test {
let snapshot_bypass = v.versioned_map.get(&8);
assert_some!(snapshot_bypass);
assert_matches!(
&**snapshot_bypass.unwrap(),
snapshot_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::Bypass(
DelayedApplyEntry::SnapshotDelta { .. }
))
Expand Down Expand Up @@ -1075,7 +1081,7 @@ mod test {
let val_no_bypass = v.versioned_map.get(&6);
assert_some!(val_no_bypass);
assert_matches!(
&**val_no_bypass.unwrap(),
val_no_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::NoBypass)
);
assert_err_eq!(v.read(7), PanicOr::Or(MVDelayedFieldsError::Dependency(6)));
Expand All @@ -1099,7 +1105,7 @@ mod test {
let snapshot_bypass = v.versioned_map.get(&8);
assert_some!(snapshot_bypass);
assert_matches!(
&**snapshot_bypass.unwrap(),
snapshot_bypass.unwrap().as_ref().deref(),
VersionEntry::Estimate(EstimatedEntry::Bypass(
DelayedApplyEntry::SnapshotDerived { .. }
))
Expand Down

0 comments on commit bba6feb

Please sign in to comment.