-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: metadata columns #14057
base: main
Are you sure you want to change the base?
feat: metadata columns #14057
Conversation
return metadata.qualified_field(i - self.inner.len()); | ||
} | ||
} | ||
self.inner.qualified_field(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better not to mix inner field and meta field?
maybe we need another method meta_field(&self, i: usize)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually implementing another method was my first attempt. but I found that I need to change a lot of code, because column index is used everywhere. that's why in currently implementation metadata column has index + len(fields).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't only where you need meta columns you need to change the code with meta_field
? Others code that call with field
remain the same.
The downside of the current approach is that whenever the schema is changed, the index of meta columns need to adjust too. I think this is error prone. Minimize the dependency of meta schema and schema is better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. it's error prone. Can we change the offsets of metadata columns, e.g. (-1 as usize) (-2 as usize) then there's no such problem. I see some databases use this trick.
Isn't only where you need meta columns you need to change the code with meta_field? Others code that call with field remain the same.
yes, we can. but many apis use Vec to represent columns. I have to change many structs and method defnitions to pass extra parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(-1 as usize)
how does this large offset work? We have vector instead of map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @jayzhan211 I pushed a commit, could you please review it again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay this approach looks good to me.
datafusion/common/src/dfschema.rs
Outdated
.collect() | ||
let mut fields: Vec<&Field> = self.inner.fields_with_unqualified_name(name); | ||
if let Some(schema) = self.metadata_schema() { | ||
fields.append(&mut schema.fields_with_unqualified_name(name)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fields.append(&mut schema.fields_with_unqualified_name(name)); | |
fields.append(schema.fields_with_unqualified_name(name)); |
datafusion/common/src/dfschema.rs
Outdated
let mut fields: Vec<(Option<&TableReference>, &Field)> = | ||
self.inner.qualified_fields_with_unqualified_name(name); | ||
if let Some(schema) = self.metadata_schema() { | ||
fields.append(&mut schema.qualified_fields_with_unqualified_name(name)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fields.append(&mut schema.qualified_fields_with_unqualified_name(name)); | |
fields.append(schema.qualified_fields_with_unqualified_name(name)); |
return ( | ||
Some(table_name.clone()), | ||
Arc::new( | ||
metadata.field(*i - METADATA_OFFSET).clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handle where i < METADATA_OFFSET
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, wait others to review this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @chenkovsky and @jayzhan211 -- this is a neat feature and I think has also been asked for before 💯
Also, I think the code is well structured and tested.
Before we merge this PR I think we need
- a test for more than one metadata column
- ensure this doesn't slow down planning (I will run benchmarks and report back)
I would strongly recommend we do in this PR (but could do as a follow on)
- More documentation (to help others and our future selves use it)
- Change the test to use
assert_batches_eq
&self.inner.schema | ||
} | ||
|
||
pub fn with_metadata_schema( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please document these APIs
@@ -55,6 +55,11 @@ pub trait TableProvider: Debug + Sync + Send { | |||
/// Get a reference to the schema for this table | |||
fn schema(&self) -> SchemaRef; | |||
|
|||
/// Get metadata columns of this table. | |||
fn metadata_columns(&self) -> Option<SchemaRef> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please document this better -- specifically:
- A link to the prior art (spark metadata columns)
- A brief summary of what metadata columns are used for and an example (you can copy the content from the spark docs)
datafusion/common/src/dfschema.rs
Outdated
metadata: Option<QualifiedSchema>, | ||
} | ||
|
||
pub const METADATA_OFFSET: usize = usize::MAX >> 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please document what this is and how it relates to DFSchema::inner
datafusion/common/src/dfschema.rs
Outdated
inner: QualifiedSchema, | ||
/// Stores functional dependencies in the schema. | ||
functional_dependencies: FunctionalDependencies, | ||
/// metadata columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide more documentation here to document what these are (perhaps adding a link to the higher level description you write on TableProvider::metadata_columns
)
pub const METADATA_OFFSET: usize = usize::MAX >> 1; | ||
|
||
#[derive(Debug, Clone, PartialEq, Eq)] | ||
pub struct QualifiedSchema { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please document what this struct is used for
} | ||
} | ||
|
||
pub fn metadata_schema(&self) -> &Option<QualifiedSchema> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add documentation -- imagine you are someone using this API and are not familar with metadata_schema or the content of this API. I think you would want a short summary of what this is and then a link to the full details
use datafusion_common::METADATA_OFFSET; | ||
use itertools::Itertools; | ||
|
||
/// A User, with an id and a bank account |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is is actually quite a cool example of using metadata index
Eventually I think it would be great to add an example in https://github.com/apache/datafusion/tree/main/datafusion-examples
.unwrap(); | ||
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); | ||
assert_eq!(batch.num_rows(), 2); | ||
let serializer = CsvSerializer::new().with_header(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To check the results, can you please use assert_batches_eq
instead of converting to CSV?
That is
- more consistent with the rest of the codebase
- easier to read
- easier to update
For example:
datafusion/datafusion/core/tests/sql/select.rs
Lines 69 to 95 in 167c11e
let expected = vec![ | |
"+----+----+", | |
"| c1 | c2 |", | |
"+----+----+", | |
"| 1 | 1 |", | |
"| 1 | 2 |", | |
"| 1 | 3 |", | |
"| 1 | 4 |", | |
"| 1 | 5 |", | |
"| 1 | 6 |", | |
"| 1 | 7 |", | |
"| 1 | 8 |", | |
"| 1 | 9 |", | |
"| 1 | 10 |", | |
"| 2 | 1 |", | |
"| 2 | 2 |", | |
"| 2 | 3 |", | |
"| 2 | 4 |", | |
"| 2 | 5 |", | |
"| 2 | 6 |", | |
"| 2 | 7 |", | |
"| 2 | 8 |", | |
"| 2 | 9 |", | |
"| 2 | 10 |", | |
"+----+----+", | |
]; | |
assert_batches_sorted_eq!(expected, &results); |
let all_batchs = df5.collect().await.unwrap(); | ||
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); | ||
let bytes = serializer.serialize(batch, true).unwrap(); | ||
assert_eq!(bytes, "1,2\n"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please also add a test for more than one metadata column?
Something other people have asked for in the past (whihc I can't find now) is the ability to know what file a particular row came from in a listing table that combines multiple files Update: I found it at #8906 To be clear I think this PR would enable selecting a subset of files, as described on #8906 (comment) |
We want this as well to hide "special" internal columns we create to speed up JSON columns. +1 for the feature! |
My only question is if "metadata" is the right name for these columns? Could it be "system" columns or something like that? |
Metadata column is the name I'm familiar with in other systems. For example, spark/databricks |
I guess the naming doesn't really hurt our use case so okay let's go with that if it means something in the domain in general 👍🏻 |
FWIW I ran the planning benchmarks on this branch and see no measurable difference. ✅
|
Can these metadata columns utilize normal column properties, like ordering equivalences, constantness, distinctness etc.? For example, AFAIU rowid is an ordered column, and if I sort the table by rowid, the SortExec would be removed? (it seems to me not yet at this point) Can we iterate over the design to support those capabilities, too? |
I with this PR a custom table provider that was ordered by row_id could communicate that information to avoid a SortExec From what I can tell, the metadata columns is only a notion in the Specifically, the |
What I mean is: When I print this query, there exists a SortExec for _rowid. But what I understand is _rowid should be a one-by-one increasing column? |
Maybe not, I use vec to store values in test, but if the inner datastructure is btree, the scan order is not always increasing. |
Here's what I think is a much simpler and more flexible change: #14362 |
datafusion-testing
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be reverted
@@ -105,11 +105,11 @@ use uuid::Uuid; | |||
/// # #[tokio::main] | |||
/// # async fn main() -> Result<()> { | |||
/// let state = SessionStateBuilder::new() | |||
/// .with_config(SessionConfig::new()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Revert?
Update here is we are very close to cutting the 45 release branch. See
Once we do that let's plan to have this PR ready to merge. Thanks again @chenkovsky and @adriangb |
Yes, thanks @adriangb, you also prepared nice UTs for scenario that I have not covered. I'm fixing these UTs now. |
Great let's fix those unit tests on your branch then we can look at the pros/cons of the approaches we've come up with. |
I don't know whether you agree, when make a design, every component should do one thing, and do it well. reuse metadata map violates this, it takes two roles. what makes things worse is that this map is mutable by user. but for metadata column or system column, we wish it's const for every data source. |
I have to disagree on that. Field metadata is a hook point to do these sorts of things without having to pipe major code changes throughout the entire codebase. I think this is the use case for field metadata.
Who do you consider the "user" in this scenario? I am a system implementer and a user of DataFusion. By design and necessity I edit metadata on fields (e.g. to indicate a UTF8 columns is JSON data). The users of the system I implement do not edit field metadata in my system. Maybe you're coming at it from a different perspective of "user" that I'm not understanding?
Maybe but I don't see how it's any different for a Ultimately I think using field metadata will result in a smaller change in terms of LOC, less new methods and other API changes in DataFusion, will be less likely to break DataFusion implementers code (e.g. because they make assumptions about field indexes being contiguous; I'd like to see some tests against |
@adriangb as I have said, it seems that you are thinking about this from database side, I'm talking about compute engine problem. the users i mean are big data engineer. changing metadata dict is very easy through dataframe api. compute engine should not make any assumption on input data. BTW, for _rowid save and load problem, you have a solution now?
If you have read the discussion history of this PR. in my initial implement, field index is contiguous. METADATA_OFFSET is not the key of this design. In my initial design, metadata columns are appended at the end of normal column array virtually. If everyone think METADATA_OFFSET is evil, It's easy to revert it back. that's also why I didn't implement metadata column support for other logical plan. I want to hear more ideas first. |
@@ -1330,7 +1330,7 @@ impl SessionStateBuilder { | |||
/// let url = Url::try_from("file://").unwrap(); | |||
/// let object_store = object_store::local::LocalFileSystem::new(); | |||
/// let state = SessionStateBuilder::new() | |||
/// .with_config(SessionConfig::new()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes in this file add no value and should be reverted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @Omega359 could you please review latested commits. I think these are already reverted before.
/// Gather the schema representating the metadata columns that this plan outputs. | ||
/// This is done by recursively traversing the plan and collecting the metadata columns that are output | ||
/// from inner nodes. | ||
/// See [TableProvider](../catalog/trait.TableProvider.html#method.metadata_columns) for more information on metadata columns in general. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't seem to be able to find that reference in this PR ... was this added elsewhere in a PR that has yet to be merged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry my fault . i will delete these lines. please ignore them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't seem to be able to find that reference in this PR ... was this added elsewhere in a PR that has yet to be merged?
@Omega359 also please review latest codes, it's removed already.
@adriangb feel free to correct me, I know maybe I'm wrong. it seems that schema adapater has no relationship with metadata column. metadata column only takes effect on logical plan and physical plan. after selected as recordbatch, no need to distinguish metadata column from normal column. |
@chenkovsky the main difference between the two approaches is how to transmit the information on which columns are system columns and which aren't. The approach in this PR does it explicitly by modifying How about we check with @alamb, @Omega359 and @jayzhan211 what they think sounds best? Either way I still think we should name these |
of course, I want to listen other's opinions. and I also think name is a small thing. changing to system column is also ok. besides _rowid save and load problem. before compare pros and cons, would you mind to add some tests about stopping system column propagation? I haven't seen them on your branch? |
have you seen these? datafusion/datafusion/core/tests/sql/system_columns.rs Lines 318 to 376 in af6e972
|
so for _row_id save load problem. so in your implementation "a system column stops being a system column once it's projected" ? for stopping system column propagation, have you tested other logical plans e.g. union intersect? |
@chenkovsky You mentioned that the issues of #14362 are 1) duplicated field issues 2) HashMap I think overall datafusion only care about the system columns that are generated by datafusion, other system columns from other engine should be considered normal columns, but since this is just based on my guess not from any practical experience, is there any concern of this assumption? For HashMap, I don't think it has performance issue since we only check boolean from it and we don't need to access it frequently given the field should be fixed once created. |
datafusion supports loading files, without such feature, of course, we don't need to take care about this. but with this feature we have to take care about this. and could you please also see the discussion in #14362. we have more different opinions. for example, system/metadata column propagation problem. for _rowid save load problem. currently, data engineers have to write a with clause in #14362 . when using dataframe api, data engineers also have to take more care about metadata dict in #14362. I haven't seen such behavior in other systems. It adds a lot of burden to data engineers. anyway, after #14362 adds more ut about stopping propagation, let's pros and cons. |
Which issue does this PR close?
Closes #13975.
Rationale for this change
many databases support pseudo columns, for example, file_path, file_name, file_size, rowid.
for pseudo columns, we don't want to get them by default, but we want to be able to use them explicitly.
for the database that supports rowid.
select * from tb
won't return rowid. but we can get rowid byselect rowid, * from tb
. spark has already supported metadata columns. this PR want to support it in datafusion.What changes are included in this PR?
Are these changes tested?
Unit test is added
Are there any user-facing changes?
No
For FFI table provider API, one function that returns metadata column is added.