Skip to content

Commit

Permalink
feat(storage): Splitting table change log from HummockVersion on CN s…
Browse files Browse the repository at this point in the history
…ide (#20050)
  • Loading branch information
Li0k authored Jan 20, 2025
1 parent 323d8a2 commit bf8f076
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 57 deletions.
10 changes: 10 additions & 0 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ impl<T> TableChangeLogCommon<T> {
.flat_map(|epoch_change_log| epoch_change_log.epochs.iter())
.cloned()
}

pub(crate) fn change_log_into_iter(self) -> impl Iterator<Item = EpochNewChangeLogCommon<T>> {
self.0.into_iter()
}

pub(crate) fn change_log_iter_mut(
&mut self,
) -> impl Iterator<Item = &mut EpochNewChangeLogCommon<T>> {
self.0.iter_mut()
}
}

pub type TableChangeLog = TableChangeLogCommon<SstableInfo>;
Expand Down
8 changes: 4 additions & 4 deletions src/storage/hummock_sdk/src/compact_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ impl CompactTask {
}

impl From<PbCompactTask> for CompactTask {
#[expect(deprecated)]
fn from(pb_compact_task: PbCompactTask) -> Self {
#[expect(deprecated)]
Self {
input_ssts: pb_compact_task
.input_ssts
Expand Down Expand Up @@ -168,8 +168,8 @@ impl From<PbCompactTask> for CompactTask {
}

impl From<&PbCompactTask> for CompactTask {
#[expect(deprecated)]
fn from(pb_compact_task: &PbCompactTask) -> Self {
#[expect(deprecated)]
Self {
input_ssts: pb_compact_task
.input_ssts
Expand Down Expand Up @@ -221,8 +221,8 @@ impl From<&PbCompactTask> for CompactTask {
}

impl From<CompactTask> for PbCompactTask {
#[expect(deprecated)]
fn from(compact_task: CompactTask) -> Self {
#[expect(deprecated)]
Self {
input_ssts: compact_task
.input_ssts
Expand Down Expand Up @@ -272,8 +272,8 @@ impl From<CompactTask> for PbCompactTask {
}

impl From<&CompactTask> for PbCompactTask {
#[expect(deprecated)]
fn from(compact_task: &CompactTask) -> Self {
#[expect(deprecated)]
Self {
input_ssts: compact_task
.input_ssts
Expand Down
30 changes: 19 additions & 11 deletions src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::level::{Level, LevelCommon, Levels, OverlappingLevel};
use crate::sstable_info::SstableInfo;
use crate::table_watermark::{ReadTableWatermark, TableWatermarks};
use crate::version::{
GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDelta,
GroupDelta, GroupDeltaCommon, HummockVersion, HummockVersionCommon, HummockVersionDeltaCommon,
HummockVersionStateTableInfo, IntraLevelDelta, IntraLevelDeltaCommon, ObjectIdReader,
SstableIdReader,
};
Expand All @@ -50,7 +50,7 @@ pub struct SstDeltaInfo {

pub type BranchedSstInfo = HashMap<CompactionGroupId, Vec<HummockSstableId>>;

impl HummockVersion {
impl<L> HummockVersionCommon<SstableInfo, L> {
pub fn get_compaction_group_levels(&self, compaction_group_id: CompactionGroupId) -> &Levels {
self.levels
.get(&compaction_group_id)
Expand Down Expand Up @@ -187,7 +187,7 @@ pub fn safe_epoch_read_table_watermarks_impl(
.collect()
}

impl HummockVersion {
impl<L: Clone> HummockVersionCommon<SstableInfo, L> {
pub fn count_new_ssts_in_group_split(
&self,
parent_group_id: CompactionGroupId,
Expand Down Expand Up @@ -356,7 +356,10 @@ impl HummockVersion {
.all(|level| !level.table_infos.is_empty()));
}

pub fn build_sst_delta_infos(&self, version_delta: &HummockVersionDelta) -> Vec<SstDeltaInfo> {
pub fn build_sst_delta_infos(
&self,
version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
) -> Vec<SstDeltaInfo> {
let mut infos = vec![];

// Skip trivial move delta for refiller
Expand Down Expand Up @@ -459,7 +462,10 @@ impl HummockVersion {
infos
}

pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) {
pub fn apply_version_delta(
&mut self,
version_delta: &HummockVersionDeltaCommon<SstableInfo, L>,
) {
assert_eq!(self.id, version_delta.prev_id);

let (changed_table_info, mut is_commit_epoch) = self.state_table_info.apply_delta(
Expand Down Expand Up @@ -934,12 +940,6 @@ impl<T> HummockVersionCommon<T>
where
T: SstableIdReader + ObjectIdReader,
{
pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
self.levels
.values()
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
}

pub fn get_object_ids(&self) -> HashSet<HummockSstableObjectId> {
self.get_sst_infos().map(|s| s.object_id()).collect()
}
Expand Down Expand Up @@ -1094,6 +1094,14 @@ impl Levels {
}
}

impl<T, L> HummockVersionCommon<T, L> {
pub fn get_combined_levels(&self) -> impl Iterator<Item = &'_ LevelCommon<T>> + '_ {
self.levels
.values()
.flat_map(|level| level.l0.sub_levels.iter().rev().chain(level.levels.iter()))
}
}

pub fn build_initial_compaction_group_levels(
group_id: CompactionGroupId,
compaction_config: &CompactionConfig,
Expand Down
115 changes: 103 additions & 12 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use risingwave_pb::hummock::{
};
use tracing::warn;

use crate::change_log::{ChangeLogDeltaCommon, TableChangeLogCommon};
use crate::change_log::{
ChangeLogDeltaCommon, EpochNewChangeLogCommon, TableChangeLog, TableChangeLogCommon,
};
use crate::compaction_group::hummock_version_ext::build_initial_compaction_group_levels;
use crate::compaction_group::StaticCompactionGroupId;
use crate::level::LevelsCommon;
Expand Down Expand Up @@ -217,18 +219,20 @@ impl HummockVersionStateTableInfo {
}

#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionCommon<T> {
pub struct HummockVersionCommon<T, L = T> {
pub id: HummockVersionId,
pub levels: HashMap<CompactionGroupId, LevelsCommon<T>>,
#[deprecated]
pub(crate) max_committed_epoch: u64,
pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
pub table_change_log: HashMap<TableId, TableChangeLogCommon<T>>,
pub table_change_log: HashMap<TableId, TableChangeLogCommon<L>>,
pub state_table_info: HummockVersionStateTableInfo,
}

pub type HummockVersion = HummockVersionCommon<SstableInfo>;

pub type LocalHummockVersion = HummockVersionCommon<SstableInfo, ()>;

impl Default for HummockVersion {
fn default() -> Self {
HummockVersion::from(&PbHummockVersion::default())
Expand Down Expand Up @@ -433,13 +437,6 @@ impl HummockVersion {
}
}

pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
self.state_table_info
.info()
.get(&table_id)
.map(|info| info.committed_epoch)
}

pub fn create_init_version(default_compaction_config: Arc<CompactionConfig>) -> HummockVersion {
#[expect(deprecated)]
let mut init_version = HummockVersion {
Expand Down Expand Up @@ -476,10 +473,41 @@ impl HummockVersion {
state_table_info_delta: Default::default(),
}
}

pub fn split_change_log(mut self) -> (LocalHummockVersion, HashMap<TableId, TableChangeLog>) {
let table_change_log = {
let mut table_change_log = HashMap::new();
for (table_id, log) in &mut self.table_change_log {
let change_log_iter =
log.change_log_iter_mut()
.map(|item| EpochNewChangeLogCommon {
new_value: std::mem::take(&mut item.new_value),
old_value: std::mem::take(&mut item.old_value),
epochs: item.epochs.clone(),
});
table_change_log.insert(*table_id, TableChangeLogCommon::new(change_log_iter));
}

table_change_log
};

let local_version = LocalHummockVersion::from(self);

(local_version, table_change_log)
}
}

impl<T, L> HummockVersionCommon<T, L> {
pub fn table_committed_epoch(&self, table_id: TableId) -> Option<u64> {
self.state_table_info
.info()
.get(&table_id)
.map(|info| info.committed_epoch)
}
}

#[derive(Debug, PartialEq, Clone)]
pub struct HummockVersionDeltaCommon<T> {
pub struct HummockVersionDeltaCommon<T, L = T> {
pub id: HummockVersionId,
pub prev_id: HummockVersionId,
pub group_deltas: HashMap<CompactionGroupId, GroupDeltasCommon<T>>,
Expand All @@ -488,12 +516,14 @@ pub struct HummockVersionDeltaCommon<T> {
pub trivial_move: bool,
pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
pub removed_table_ids: HashSet<TableId>,
pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<T>>,
pub change_log_delta: HashMap<TableId, ChangeLogDeltaCommon<L>>,
pub state_table_info_delta: HashMap<TableId, StateTableInfoDelta>,
}

pub type HummockVersionDelta = HummockVersionDeltaCommon<SstableInfo>;

pub type LocalHummockVersionDelta = HummockVersionDeltaCommon<SstableInfo, ()>;

impl Default for HummockVersionDelta {
fn default() -> Self {
HummockVersionDelta::from(&PbHummockVersionDelta::default())
Expand Down Expand Up @@ -1095,3 +1125,64 @@ where
self.into()
}
}

impl From<HummockVersionDelta> for LocalHummockVersionDelta {
#[expect(deprecated)]
fn from(delta: HummockVersionDelta) -> Self {
Self {
id: delta.id,
prev_id: delta.prev_id,
group_deltas: delta.group_deltas,
max_committed_epoch: delta.max_committed_epoch,
trivial_move: delta.trivial_move,
new_table_watermarks: delta.new_table_watermarks,
removed_table_ids: delta.removed_table_ids,
change_log_delta: delta
.change_log_delta
.into_iter()
.map(|(k, v)| {
(
k,
ChangeLogDeltaCommon {
truncate_epoch: v.truncate_epoch,
new_log: EpochNewChangeLogCommon {
epochs: v.new_log.epochs,
new_value: Vec::new(),
old_value: Vec::new(),
},
},
)
})
.collect(),
state_table_info_delta: delta.state_table_info_delta,
}
}
}

impl From<HummockVersion> for LocalHummockVersion {
#[expect(deprecated)]
fn from(version: HummockVersion) -> Self {
Self {
id: version.id,
levels: version.levels,
max_committed_epoch: version.max_committed_epoch,
table_watermarks: version.table_watermarks,
table_change_log: version
.table_change_log
.into_iter()
.map(|(k, v)| {
let epoch_new_change_logs: Vec<EpochNewChangeLogCommon<()>> = v
.change_log_into_iter()
.map(|epoch_new_change_log| EpochNewChangeLogCommon {
epochs: epoch_new_change_log.epochs,
new_value: Vec::new(),
old_value: Vec::new(),
})
.collect();
(k, TableChangeLogCommon::new(epoch_new_change_logs))
})
.collect(),
state_table_info: version.state_table_info,
}
}
}
Loading

0 comments on commit bf8f076

Please sign in to comment.