feat(storage): support non_pk_prefix_watermark state cleaning #19889
base: main
Conversation
.table_watermarks
    .iter()
    .filter_map(|(table_id, table_watermarks)| {
        if table_id_with_pk_prefix_watermark.contains(table_id) {
We already have a WaterMarkType defined in the version, why don't we just use that to filter out tables with a non-pk prefix watermark?
Also, if we filter out the non-pk prefix watermarks here, how can the compactor retrieve them? Based on the logic here, it seems that we rely on the fact that the non-pk prefix watermark is present in the compact task.
Good catch, we should filter the watermarks by WaterMarkType directly. The filtered results are only passed to the picker, while all relevant watermarks (pk or non-pk) are passed to the compactor.
@@ -42,10 +47,14 @@ pub struct SkipWatermarkIterator<I> {
}

impl<I: HummockIterator<Direction = Forward>> SkipWatermarkIterator<I> {
nit: since SkipWatermarkIterator is only used by the compactor, how about moving skip_watermark.rs into src/hummock/compactor?
Of course, I will propose a separate PR for it.
});
let watermark_col_in_pk =
    row.datum_at(*watermark_col_idx_in_pk);
cmp_datum(
IIUC, if cmp_datum returns Equal | Greater, then based on the logic in L360 the watermark will be advanced. I think this is incorrect for a non-pk prefix watermark, because the non-pk prefix watermark and the pk don't have the same ordering.
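To make the ordering concern concrete, here is a tiny self-contained example (purely illustrative, not code from this PR): with a non-pk prefix watermark, the iterator cannot treat Equal | Greater as a signal to advance past the watermark, because rows later in pk order may still pass it.

```rust
use std::cmp::Ordering;

fn main() {
    // Rows arrive in pk order (by `id`), but the watermark is defined on the non-pk
    // column `ts`, so key order says nothing about watermark order.
    let rows = [(1u32, 9i64), (2, 3), (3, 7)]; // (pk id, watermark column ts)
    let watermark_ts = 5i64;

    for (id, ts) in rows {
        // For a pk-prefix watermark, seeing Equal | Greater once would allow advancing
        // past the watermark and skipping further checks. Here that is unsound:
        // id=1 passes, id=2 is expired, id=3 passes again, so every row must be compared.
        let expired = ts.cmp(&watermark_ts) == Ordering::Less;
        println!("id={id}: expired={expired}");
    }
}
```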
let table_watermarks = version
    .latest_version()
    .table_watermarks
    .iter()
    .filter_map(|(table_id, table_watermarks)| {
        if matches!(
            table_watermarks.watermark_type,
            WatermarkSerdeType::PkPrefix,
        ) {
            Some((*table_id, table_watermarks.clone()))
        } else {
            None
        }
    })
    .collect();
Actually, why don't we do the filtering inside the picker instead, like in here, if the watermark type is part of TableWatermarks:
let table_watermarks =
We can avoid cloning the table watermark, which can be large given that it stores bytes from user data, with no harm.
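A rough sketch of the borrow-based alternative (the types and the table-id representation here are placeholders, not the picker's real API): filtering by watermark_type inside the picker lets it work on references, so the potentially large watermark bytes are never cloned.

```rust
use std::collections::BTreeMap;

#[derive(PartialEq, Eq)]
enum WatermarkSerdeType {
    PkPrefix,
    NonPkPrefix,
}

struct TableWatermarks {
    watermark_type: WatermarkSerdeType,
    watermarks: Vec<Vec<u8>>, // serialized user-data bytes, potentially large
}

/// Filter inside the picker: yield references, so no TableWatermarks is cloned.
fn pk_prefix_watermarks(
    all: &BTreeMap<u32, TableWatermarks>,
) -> impl Iterator<Item = (u32, &TableWatermarks)> {
    all.iter()
        .filter(|(_, w)| w.watermark_type == WatermarkSerdeType::PkPrefix)
        .map(|(table_id, w)| (*table_id, w))
}
```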
Any ideas on this comment? @Li0k
}
WatermarkSerdeType::Serde(_watermark) => {
    // do not skip the non-pk prefix watermark when vnode is the same
    return false;
I am afraid this is still incorrect based on the semantics of advance_watermark:
/// Return a flag indicating whether the current key will be filtered by the current watermark.
If we always return false when the table and vnode are the same here, that means none of the keys can be filtered by the watermark. Please carefully walk through the logic of advance_watermark, should_delete and advance_key_and_watermark. I am still concerned that the implementations of SkipWatermarkState and SkipWatermarkIterator rely on the assumption that the key ordering and the watermark ordering are the same, and we may still miss some changes.
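To restate the contract concern in code form, a simplified hypothetical sketch (not the actual SkipWatermarkState implementation):

```rust
/// Simplified sketch of the quoted contract: the return value says whether the *current*
/// key is filtered by the *current* watermark.
fn should_delete_sketch(same_table_and_vnode: bool, key_below_watermark: bool) -> bool {
    if !same_table_and_vnode {
        // The watermark does not cover this key at all.
        return false;
    }
    // Same table and vnode: the key must still be compared against the watermark value.
    // Returning false unconditionally here would mean no key is ever cleaned by this
    // watermark, which contradicts the documented contract quoted above.
    key_below_watermark
}
```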
@@ -52,6 +52,18 @@ impl OrderedRowSerde {
        }
    }

    #[must_use]
    pub fn index(&self, idx: usize) -> Cow<'_, Self> {
        if 1 == self.order_types.len() {
should we assert idx == 1 here?
No, row can be any length and index can be a generic function.
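For context, a hedged sketch of what such a generic index could look like (field names and types are placeholders; the real OrderedRowSerde differs): the single-column case can borrow self, while the general case builds a one-column serde.

```rust
use std::borrow::Cow;

#[derive(Clone)]
pub struct OrderedRowSerdeSketch {
    // Placeholder fields standing in for the real schema / order types.
    data_types: Vec<&'static str>,
    order_types: Vec<bool>, // true = ascending, purely illustrative
}

impl OrderedRowSerdeSketch {
    /// Return a serde for the single column at `idx`; if the serde already has exactly
    /// one column, no allocation is needed.
    #[must_use]
    pub fn index(&self, idx: usize) -> Cow<'_, Self> {
        if self.order_types.len() == 1 {
            // Only column 0 exists in this branch, so an assertion is optional here.
            debug_assert_eq!(idx, 0);
            Cow::Borrowed(self)
        } else {
            Cow::Owned(Self {
                data_types: vec![self.data_types[idx]],
                order_types: vec![self.order_types[idx]],
            })
        }
    }
}
```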
),
&self.compact_task.table_watermarks,
))
let combine_iter = {
Let's add a comment referencing the issue for state cleaning based on the NonPkPrefix watermark iterator as well.
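For example, the requested comment could sit right above the combined iterator, roughly like this (wording is only a suggestion; the issue number is the one referenced in the PR description):

```rust
// State cleaning based on the NonPkPrefix watermark: the combined iterator additionally
// wraps a NonPkPrefixSkipWatermarkIterator, which drops rows whose non-pk watermark
// column falls below the table watermark. See issue #18802.
let combine_iter = {
```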
return direction.datum_filter_by_watermark(
    watermark_col_in_pk,
    watermark,
    watermark_col_serde.get_order_types()[0],
Given that we use [0] here, does it mean that either the watermark column is always a single column or the order type for all watermark columns must be the same? Is this assumption checked and guaranteed somewhere?
As said above, I think this guarantee should come from clean_watermark_index_in_pk. If the assumption is broken, then using [0] should crash, which is fine with me.
If there are multiple watermark columns with different ordering types, I think this code won't crash but will generate incorrect results.
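One way to turn the implicit assumption into a crash instead of a silently wrong result (a hypothetical guard; bool stands in for the real OrderType) is to assert uniformity wherever the watermark serde is constructed:

```rust
/// Hypothetical guard: require a single watermark column, or at least a uniform order
/// type, before `[0]` is used for every comparison.
fn check_watermark_order_types(order_types: &[bool]) {
    assert!(
        !order_types.is_empty() && order_types.iter().all(|ot| *ot == order_types[0]),
        "watermark columns must share a single order type, got {:?}",
        order_types
    );
}
```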
fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool;
fn reset_watermark(&mut self);

// fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self;
Remove?
fn reset_watermark(&mut self);

// fn new(watermarks: BTreeMap<TableId, ReadTableWatermark>) -> Self;
// fn from_safe_epoch_watermarks(safe_epoch_watermarks: BTreeMap<u32, TableWatermarks>) -> Self;
Remove?
@@ -172,32 +165,30 @@ impl<I: HummockIterator<Direction = Forward>> HummockIterator for SkipWatermarkI
        self.inner.value_meta()
    }
}
pub struct SkipWatermarkState {

pub trait SkipWatermarkState: Send {
Please move this trait definition to the top of this file to make the code more readable. Also, can you add documentation for each method?
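For instance, the documented trait (method signatures taken from the diff above; the doc wording and the FullKey import path are assumptions) might read:

```rust
use risingwave_hummock_sdk::key::FullKey; // import path assumed

/// Per-table watermark state consulted by the skip-watermark iterators during compaction.
pub trait SkipWatermarkState: Send {
    /// Returns whether `key` is covered by the current watermark and should therefore be
    /// dropped from the output (i.e. the row is expired).
    fn should_delete(&mut self, key: &FullKey<&[u8]>) -> bool;

    /// Rewinds the internal watermark cursor so the state can be reused for another pass
    /// over the input keys.
    fn reset_watermark(&mut self);
}
```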
continue;
}
Ordering::Equal => {
    self.last_serde = self
I don't get why we need last_serde. It seems that we always retrieve it from compaction_catalog_agent_ref.watermark_serde.
I think you probably want to cache it per table id. You should do the switch when the table id changes instead.
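A rough sketch of the per-table caching suggested here (names and the u32 table id are placeholders): resolve the serde only when the table id changes instead of for every key.

```rust
use std::collections::HashMap;

/// Stand-in for the real watermark serde; its contents are irrelevant to the sketch.
struct WatermarkSerde;

/// Hypothetical per-table cache: the serde is fetched (e.g. from the compaction catalog
/// agent) only the first time a table id is seen, not re-derived for every key.
struct WatermarkSerdeCache {
    by_table: HashMap<u32, WatermarkSerde>,
}

impl WatermarkSerdeCache {
    fn serde_for(
        &mut self,
        table_id: u32,
        fetch: impl FnOnce() -> WatermarkSerde,
    ) -> &WatermarkSerde {
        // Switch on table id: repeated keys from the same table reuse the cached entry.
        self.by_table.entry(table_id).or_insert_with(fetch)
    }
}
```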
Rest LGTM
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
related to #18802
This PR supports non_pk_prefix_watermark state cleaning for Hummock. Since non_pk_prefix_watermark relies on catalogs, this introduces additional overhead. Therefore, this PR does not guarantee read filtering for non_pk_prefix_watermark and only handles expired data.
The changes are as follows:
- Watermarks of type non_pk_prefix_watermark are not added to ReadWatermarkIndex.
- The state table supports writing and serializing non_pk_prefix_watermark.
- The compaction catalog agent supports getting the watermark serde.
- The skip watermark iterator supports filtering non_pk_prefix_watermark.
- NonPkPrefixSkipWatermarkIterator filters table data of the NonPkPrefix watermark (a rough layering sketch follows at the end of this description).
Checklist
Documentation
Release note
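As a rough illustration of the iterator layering described in the change list above (everything here is a placeholder except the NonPkPrefixSkipWatermarkIterator name; the real iterators carry watermarks, serdes and full iterator state):

```rust
/// Placeholder stand-ins for the real hummock iterators; fields elided.
struct SkipWatermarkIterator<I> {
    inner: I,
    // pk-prefix watermarks from the compact task would live here
}

struct NonPkPrefixSkipWatermarkIterator<I> {
    inner: I,
    // non-pk-prefix watermarks plus the per-table watermark serde
    // (from the compaction catalog agent) would live here
}

/// Hypothetical wiring: the compactor input is wrapped once per watermark kind, so a row
/// dropped by either layer never reaches the SST builder.
fn wrap_compaction_input<I>(
    input: I,
) -> NonPkPrefixSkipWatermarkIterator<SkipWatermarkIterator<I>> {
    NonPkPrefixSkipWatermarkIterator {
        inner: SkipWatermarkIterator { inner: input },
    }
}
```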