Skip to content

Commit

Permalink
feat: add file statistics (#3232)
Browse files Browse the repository at this point in the history
This PR adds a Python method to get the `size in bytes` and `number of
pages` of each column in a Lance file.

These statistics are calculated on demand.

#3221

---------

Co-authored-by: Weston Pace <[email protected]>
  • Loading branch information
broccoliSpicy and westonpace authored Dec 13, 2024
1 parent 99ae761 commit 679b93c
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 2 deletions.
8 changes: 8 additions & 0 deletions python/python/lance/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
LanceBufferDescriptor,
LanceColumnMetadata,
LanceFileMetadata,
LanceFileStatistics,
LancePageMetadata,
)
from .lance import (
Expand Down Expand Up @@ -146,6 +147,12 @@ def metadata(self) -> LanceFileMetadata:
"""
return self._reader.metadata()

def file_statistics(self) -> LanceFileStatistics:
    """
    Return file statistics of the file

    The statistics report, for each column, the number of pages and the
    total compressed size in bytes.  They are computed on demand from the
    file's metadata by the native reader.
    """
    return self._reader.file_statistics()

def read_global_buffer(self, index: int) -> bytes:
"""
Read a global buffer from the file at a given index
Expand Down Expand Up @@ -289,4 +296,5 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"LanceColumnMetadata",
"LancePageMetadata",
"LanceBufferDescriptor",
"LanceFileStatistics",
]
7 changes: 7 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class LanceFileMetadata:
global_buffers: List[LanceBufferDescriptor]
columns: List[LanceColumnMetadata]

class LanceFileStatistics:
    """File-level statistics returned by ``LanceFileReader.file_statistics``."""

    # One entry per column in the file, in column order.
    columns: List[LanceColumnStatistics]

class LanceColumnStatistics:
    """Summary statistics for a single column of a Lance file."""

    # The number of pages in the column.
    num_pages: int
    # Total data & metadata bytes in the column (compressed on-disk size).
    size_bytes: int

class _Session:
def size_bytes(self) -> int: ...

Expand Down
29 changes: 29 additions & 0 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
Expand Down Expand Up @@ -214,6 +215,34 @@ def test_metadata(tmp_path):
assert len(page.encoding) > 0


def test_file_stat(tmp_path):
    """Verify per-column page counts and byte sizes from file_statistics().

    Writes one int64 column and one fixed-size-list<float64, 8> column and
    checks the statistics the reader computes from the file metadata.
    """
    path = tmp_path / "foo.lance"
    schema = pa.schema(
        [pa.field("a", pa.int64()), pa.field("b", pa.list_(pa.float64(), 8))]
    )

    num_rows = 1_000_000

    data1 = pa.array(range(num_rows))

    # Create a fixed-size list of float64 with dimension 8.  Generate the
    # values vectorized (and seeded, so the test data is reproducible)
    # instead of building a million small Python lists one at a time.
    rng = np.random.default_rng(42)
    values = pa.array(rng.random(num_rows * 8))
    data2 = pa.FixedSizeListArray.from_arrays(values, 8)

    with LanceFileWriter(str(path), schema) as writer:
        writer.write_batch(pa.table({"a": data1, "b": data2}))
    reader = LanceFileReader(str(path))
    file_stat = reader.file_statistics()

    assert len(file_stat.columns) == 2

    # Column "a": 1M int64 values = 8 MB, packed into a single page.
    assert file_stat.columns[0].num_pages == 1
    assert file_stat.columns[0].size_bytes == 8_000_000

    # Column "b": 1M x 8 float64 values = 64 MB, split across two pages.
    assert file_stat.columns[1].num_pages == 2
    assert file_stat.columns[1].size_bytes == 64_000_000


def test_round_trip_parquet(tmp_path):
pq_path = tmp_path / "foo.parquet"
table = pa.table({"int": [1, 2], "list_str": [["x", "yz", "abc"], ["foo", "bar"]]})
Expand Down
123 changes: 122 additions & 1 deletion python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::{
v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions},
reader::{
BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions, FileStatistics,
},
writer::{FileWriter, FileWriterOptions},
},
version::LanceFileVersion,
Expand Down Expand Up @@ -113,6 +115,58 @@ impl LanceColumnMetadata {
}
}

/// Statistics summarize some of the file metadata for quick summary info
///
/// Python-exposed mirror of the reader's `FileStatistics`.
#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceFileStatistics {
    /// Statistics about each of the columns in the file
    columns: Vec<LanceColumnStatistics>,
}

#[pymethods]
impl LanceFileStatistics {
    /// Python `repr()`: lists each column's statistics in column order.
    fn __repr__(&self) -> String {
        let mut inner = String::new();
        for (idx, col) in self.columns.iter().enumerate() {
            if idx > 0 {
                inner.push_str(", ");
            }
            inner.push_str(&col.__repr__());
        }
        format!("FileStatistics(columns=[{}])", inner)
    }
}

/// Summary information describing a column
///
/// Python-exposed mirror of the reader's `ColumnStatistics`.
#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceColumnStatistics {
    /// The number of pages in the column
    num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    size_bytes: u64,
}

#[pymethods]
impl LanceColumnStatistics {
    /// Python `repr()`, e.g. `ColumnStatistics(num_pages=5, size_bytes=1024)`.
    fn __repr__(&self) -> String {
        let mut repr = String::from("ColumnStatistics(num_pages=");
        repr.push_str(&self.num_pages.to_string());
        repr.push_str(", size_bytes=");
        repr.push_str(&self.size_bytes.to_string());
        repr.push(')');
        repr
    }
}

impl LanceFileStatistics {
    /// Build the Python-facing statistics wrapper from the core reader's
    /// `FileStatistics`, copying each column's page count and byte size.
    fn new(inner: &FileStatistics) -> Self {
        let mut columns = Vec::with_capacity(inner.columns.len());
        for stat in &inner.columns {
            columns.push(LanceColumnStatistics {
                num_pages: stat.num_pages,
                size_bytes: stat.size_bytes,
            });
        }
        Self { columns }
    }
}

#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceFileMetadata {
Expand Down Expand Up @@ -445,6 +499,11 @@ impl LanceFileReader {
LanceFileMetadata::new(inner_meta, py)
}

/// Summarize per-column page counts and sizes for the open file.
pub fn file_statistics(&self) -> LanceFileStatistics {
    LanceFileStatistics::new(&self.inner.file_statistics())
}

pub fn read_global_buffer(&mut self, index: u32) -> PyResult<Vec<u8>> {
let buffer_bytes = RT
.runtime
Expand All @@ -453,3 +512,65 @@ impl LanceFileReader {
Ok(buffer_bytes.to_vec())
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for constructing column statistics in these tests.
    fn col_stats(num_pages: usize, size_bytes: u64) -> LanceColumnStatistics {
        LanceColumnStatistics {
            num_pages,
            size_bytes,
        }
    }

    #[test]
    fn test_lance_file_statistics_repr_empty() {
        // No columns -> an empty list inside the repr.
        let stats = LanceFileStatistics {
            columns: Vec::new(),
        };
        assert_eq!(stats.__repr__(), "FileStatistics(columns=[])");
    }

    #[test]
    fn test_lance_file_statistics_repr_single_column() {
        let stats = LanceFileStatistics {
            columns: vec![col_stats(5, 1024)],
        };
        assert_eq!(
            stats.__repr__(),
            "FileStatistics(columns=[ColumnStatistics(num_pages=5, size_bytes=1024)])"
        );
    }

    #[test]
    fn test_lance_file_statistics_repr_multiple_columns() {
        // Columns must appear in order, separated by ", ".
        let stats = LanceFileStatistics {
            columns: vec![col_stats(5, 1024), col_stats(3, 512)],
        };
        assert_eq!(
            stats.__repr__(),
            "FileStatistics(columns=[ColumnStatistics(num_pages=5, size_bytes=1024), ColumnStatistics(num_pages=3, size_bytes=512)])"
        );
    }

    #[test]
    fn test_lance_column_statistics_repr() {
        let column_stats = col_stats(10, 2048);
        assert_eq!(
            column_stats.__repr__(),
            "ColumnStatistics(num_pages=10, size_bytes=2048)"
        );
    }
}
3 changes: 2 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use dataset::MergeInsertBuilder;
use env_logger::Env;
use file::{
LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader,
LanceFileWriter, LancePageMetadata,
LanceFileStatistics, LanceFileWriter, LancePageMetadata,
};
use futures::StreamExt;
use lance_index::DatasetIndexExt;
Expand Down Expand Up @@ -120,6 +120,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LanceFileReader>()?;
m.add_class::<LanceFileWriter>()?;
m.add_class::<LanceFileMetadata>()?;
m.add_class::<LanceFileStatistics>()?;
m.add_class::<LanceColumnMetadata>()?;
m.add_class::<LancePageMetadata>()?;
m.add_class::<LanceBufferDescriptor>()?;
Expand Down
42 changes: 42 additions & 0 deletions rust/lance-file/src/v2/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ pub struct BufferDescriptor {
pub size: u64,
}

/// Statistics summarize some of the file metadata for quick summary info
///
/// Returned by `FileReader::file_statistics`; computed on demand from the
/// file metadata rather than stored in the file.
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
#[derive(Debug)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size, computed as the sum of the
    /// buffer sizes of every page in the column
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
Expand Down Expand Up @@ -313,6 +331,30 @@ impl FileReader {
&self.metadata
}

/// Summarize the file's metadata into per-column statistics.
///
/// Walks the already-loaded column metadata; performs no I/O.
pub fn file_statistics(&self) -> FileStatistics {
    let columns = self
        .metadata()
        .column_metadatas
        .iter()
        .map(|col_meta| {
            // A column's size is the sum of every buffer of every page.
            let size_bytes: u64 = col_meta
                .pages
                .iter()
                .flat_map(|page| page.buffer_sizes.iter())
                .sum();
            ColumnStatistics {
                num_pages: col_meta.pages.len(),
                size_bytes,
            }
        })
        .collect();

    FileStatistics { columns }
}

pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
self.scheduler
Expand Down

0 comments on commit 679b93c

Please sign in to comment.