From c7d41a811a5530c5e8023cb60543860cff95627e Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 5 Dec 2024 18:48:24 -0800 Subject: [PATCH 1/7] Fix timestamp millisecond and add tests to check --- kernel/src/engine/default/filesystem.rs | 32 ++++++++++++++- kernel/src/engine/sync/fs_client.rs | 53 ++++++++++++++++++++----- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index c114d0ea7..50d830c11 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -68,7 +68,7 @@ impl FileSystemClient for ObjectStoreFileSystemClient { sender .send(Ok(FileMeta { location, - last_modified: meta.last_modified.timestamp(), + last_modified: meta.last_modified.timestamp_millis(), size: meta.size, })) .ok(); @@ -216,6 +216,36 @@ mod tests { assert_eq!(data[2], Bytes::from("el-da")); } + #[tokio::test] + async fn test_file_meta_is_correct() { + let tmp = tempfile::tempdir().unwrap(); + let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap(); + + let data = Bytes::from("kernel-data"); + let name = delta_path_for_version(1, "json"); + tmp_store.put(&name, data.clone().into()).await.unwrap(); + + let url = Url::from_directory_path(tmp.path()).unwrap(); + let prefix = Path::from_url_path(url.path()).expect("Couldn't get path"); + let store = Arc::new(LocalFileSystem::new()); + let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); + let files: Vec<_> = engine + .get_file_system_client() + .list_from(&Url::parse("file://").unwrap()) + .unwrap() + .try_collect() + .unwrap(); + + let object_meta = tmp_store.head(&name).await.unwrap(); + let expected_location = url.join(name.as_ref()).unwrap(); + let expected_file_meta = FileMeta { + location: expected_location, + // We assert that the timestamp is in milliseconds + last_modified: object_meta.last_modified.timestamp_millis(), + size: object_meta.size, + }; + assert_eq!(files, vec![expected_file_meta]); + } #[tokio::test] async fn test_default_engine_listing() { let tmp = tempfile::tempdir().unwrap(); diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index 4632dc213..003d6b2f1 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -51,19 +51,19 @@ impl FileSystemClient for SyncFilesystemClient { .sorted_by_key(|ent| ent.path()) .map(|ent| { ent.metadata().map_err(Error::IOError).and_then(|metadata| { - let last_modified: u64 = metadata + let last_modified = metadata .modified() - .map( - |modified| match modified.duration_since(SystemTime::UNIX_EPOCH) { - Ok(d) => d.as_secs(), - Err(_) => 0, - }, - ) + .ok() + .and_then(|modified| { + modified.duration_since(SystemTime::UNIX_EPOCH).ok() + }) + .and_then(|modified| modified.as_millis().try_into().ok()) .unwrap_or(0); + Url::from_file_path(ent.path()) .map(|location| FileMeta { location, - last_modified: last_modified as i64, + last_modified, size: metadata.len() as usize, }) .map_err(|_| Error::Generic(format!("Invalid path: {:?}", ent.path()))) @@ -97,20 +97,53 @@ impl FileSystemClient for SyncFilesystemClient { #[cfg(test)] mod tests { - use std::fs::File; use std::io::Write; + use std::os::unix::fs::MetadataExt; + use std::{fs::File, time::UNIX_EPOCH}; use bytes::{BufMut, BytesMut}; + use itertools::Itertools; use url::Url; use super::SyncFilesystemClient; - use crate::FileSystemClient; + use crate::{FileMeta, FileSystemClient}; /// generate json filenames that follow the spec (numbered padded to 20 chars) fn get_json_filename(index: usize) -> String { format!("{index:020}.json") } + #[test] + fn test_file_meta_is_correct() -> Result<(), Box> { + let client = SyncFilesystemClient; + let tmp_dir = tempfile::tempdir().unwrap(); + + let path = tmp_dir.path().join(get_json_filename(1)); + let mut f = File::create(path)?; + writeln!(f, "null")?; + + let url_path = tmp_dir.path().join(get_json_filename(1)); + let url = Url::from_file_path(url_path).unwrap(); + let list: Vec<_> = client.list_from(&url)?.try_collect()?; + + let metadata = f.metadata()?; + // We assert that the timestamp is in milliseconds + let expected_timestamp = metadata + .modified()? + .duration_since(UNIX_EPOCH)? + .as_millis() + .try_into()?; + let expected_size = metadata.size().try_into()?; + let expected_file_meta = FileMeta { + location: url, + last_modified: expected_timestamp, + size: expected_size, + }; + + assert_eq!(list, vec![expected_file_meta]); + Ok(()) + } + #[test] fn test_list_from() -> Result<(), Box> { let client = SyncFilesystemClient; From dcc2d497d684da2247ccc917afc576391208be7d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Sat, 7 Dec 2024 16:35:34 -0800 Subject: [PATCH 2/7] Fix file meta tests --- kernel/src/engine/default/filesystem.rs | 33 ++++++++++++------------- kernel/src/engine/sync/fs_client.rs | 29 +++++++++------------- 2 files changed, 27 insertions(+), 35 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 50d830c11..99582f980 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -155,7 +155,9 @@ impl FileSystemClient for ObjectStoreFileSystemClient { #[cfg(test)] mod tests { use std::ops::Range; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; + use object_store::memory::InMemory; use object_store::{local::LocalFileSystem, ObjectStore}; use test_utils::delta_path_for_version; @@ -218,33 +220,30 @@ mod tests { #[tokio::test] async fn test_file_meta_is_correct() { - let tmp = tempfile::tempdir().unwrap(); - let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap(); + let store = Arc::new(InMemory::new()); + + let begin_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + // The [`FileMeta`]s must be greater than 2 minute ago + let allowed_time = begin_time - Duration::from_secs(120); let data = Bytes::from("kernel-data"); let name = delta_path_for_version(1, "json"); - tmp_store.put(&name, data.clone().into()).await.unwrap(); + store.put(&name, data.clone().into()).await.unwrap(); - let url = Url::from_directory_path(tmp.path()).unwrap(); - let prefix = Path::from_url_path(url.path()).expect("Couldn't get path"); - let store = Arc::new(LocalFileSystem::new()); - let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); + let table_root = Url::parse("memory:///").expect("valid url"); + let path = Path::from(format!("_delta_log/{}", name)); + let engine = DefaultEngine::new(store, path, Arc::new(TokioBackgroundExecutor::new())); let files: Vec<_> = engine .get_file_system_client() - .list_from(&Url::parse("file://").unwrap()) + .list_from(&table_root) .unwrap() .try_collect() .unwrap(); - let object_meta = tmp_store.head(&name).await.unwrap(); - let expected_location = url.join(name.as_ref()).unwrap(); - let expected_file_meta = FileMeta { - location: expected_location, - // We assert that the timestamp is in milliseconds - last_modified: object_meta.last_modified.timestamp_millis(), - size: object_meta.size, - }; - assert_eq!(files, vec![expected_file_meta]); + for meta in files.into_iter() { + let meta_time = Duration::from_millis(meta.last_modified.try_into().unwrap()); + assert!(allowed_time < meta_time); + } } #[tokio::test] async fn test_default_engine_listing() { diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index 003d6b2f1..597e5fa12 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -98,7 +98,7 @@ impl FileSystemClient for SyncFilesystemClient { #[cfg(test)] mod tests { use std::io::Write; - use std::os::unix::fs::MetadataExt; + use std::time::{Duration, SystemTime}; use std::{fs::File, time::UNIX_EPOCH}; use bytes::{BufMut, BytesMut}; @@ -106,7 +106,7 @@ mod tests { use url::Url; use super::SyncFilesystemClient; - use crate::{FileMeta, FileSystemClient}; + use crate::FileSystemClient; /// generate json filenames that follow the spec (numbered padded to 20 chars) fn get_json_filename(index: usize) -> String { @@ -118,29 +118,22 @@ mod tests { let client = SyncFilesystemClient; let tmp_dir = tempfile::tempdir().unwrap(); + let begin_time = SystemTime::now().duration_since(UNIX_EPOCH)?; + // The [`FileMeta`]s must be greater than 2 minute ago + let allowed_time = begin_time - Duration::from_secs(120); + let path = tmp_dir.path().join(get_json_filename(1)); let mut f = File::create(path)?; writeln!(f, "null")?; + f.flush()?; let url_path = tmp_dir.path().join(get_json_filename(1)); let url = Url::from_file_path(url_path).unwrap(); let list: Vec<_> = client.list_from(&url)?.try_collect()?; - - let metadata = f.metadata()?; - // We assert that the timestamp is in milliseconds - let expected_timestamp = metadata - .modified()? - .duration_since(UNIX_EPOCH)? - .as_millis() - .try_into()?; - let expected_size = metadata.size().try_into()?; - let expected_file_meta = FileMeta { - location: url, - last_modified: expected_timestamp, - size: expected_size, - }; - - assert_eq!(list, vec![expected_file_meta]); + for meta in list.iter() { + let meta_time = Duration::from_millis(meta.last_modified.try_into()?); + assert!(allowed_time < meta_time,); + } Ok(()) } From b3bde7679139e8956fcd9ccaf1517e80e97f8926 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Sat, 7 Dec 2024 16:36:42 -0800 Subject: [PATCH 3/7] make tests use 60 seconds instead --- kernel/src/engine/default/filesystem.rs | 4 ++-- kernel/src/engine/sync/fs_client.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 99582f980..10e94b43b 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -223,8 +223,8 @@ mod tests { let store = Arc::new(InMemory::new()); let begin_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - // The [`FileMeta`]s must be greater than 2 minute ago - let allowed_time = begin_time - Duration::from_secs(120); + // The [`FileMeta`]s must be greater than 1 minute ago + let allowed_time = begin_time - Duration::from_secs(60); let data = Bytes::from("kernel-data"); let name = delta_path_for_version(1, "json"); diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index 597e5fa12..47cef55db 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -119,8 +119,8 @@ mod tests { let tmp_dir = tempfile::tempdir().unwrap(); let begin_time = SystemTime::now().duration_since(UNIX_EPOCH)?; - // The [`FileMeta`]s must be greater than 2 minute ago - let allowed_time = begin_time - Duration::from_secs(120); + // The [`FileMeta`]s must be greater than 1 minute ago + let allowed_time = begin_time - Duration::from_secs(60); let path = tmp_dir.path().join(get_json_filename(1)); let mut f = File::create(path)?; From 97e61a66b4055e83d02eed9be31b4a231a021fa1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Sat, 7 Dec 2024 16:43:04 -0800 Subject: [PATCH 4/7] Move FileMeta construction to TryFrom --- kernel/src/engine/sync/fs_client.rs | 21 +-------------------- kernel/src/lib.rs | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index 47cef55db..ab30416fa 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -49,26 +49,7 @@ impl FileSystemClient for SyncFilesystemClient { let it = all_ents .into_iter() .sorted_by_key(|ent| ent.path()) - .map(|ent| { - ent.metadata().map_err(Error::IOError).and_then(|metadata| { - let last_modified = metadata - .modified() - .ok() - .and_then(|modified| { - modified.duration_since(SystemTime::UNIX_EPOCH).ok() - }) - .and_then(|modified| modified.as_millis().try_into().ok()) - .unwrap_or(0); - - Url::from_file_path(ent.path()) - .map(|location| FileMeta { - location, - last_modified, - size: metadata.len() as usize, - }) - .map_err(|_| Error::Generic(format!("Invalid path: {:?}", ent.path()))) - }) - }); + .map(TryFrom::try_from); Ok(Box::new(it)) } else { Err(Error::generic("Can only read local filesystem")) diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 110a822e7..e771379d1 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -59,7 +59,9 @@ )] use std::any::Any; +use std::fs::DirEntry; use std::sync::Arc; +use std::time::SystemTime; use std::{cmp::Ordering, ops::Range}; use bytes::Bytes; @@ -142,6 +144,31 @@ impl PartialOrd for FileMeta { } } +impl TryFrom for FileMeta { + type Error = Error; + + fn try_from(ent: DirEntry) -> DeltaResult { + let metadata = ent.metadata()?; + let last_modified = metadata + .modified()? + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|_| Error::generic("Failed to convert file timestamp to milliseconds"))?; + let location = Url::from_file_path(ent.path()) + .map_err(|_| Error::generic(format!("Invalid path: {:?}", ent.path())))?; + let last_modified = last_modified.as_millis().try_into().map_err(|_| { + Error::generic(format!( + "Failed to convert file modification time {:?} into i64", + last_modified.as_millis() + )) + })?; + Ok(FileMeta { + location, + last_modified, + size: metadata.len() as usize, + }) + } +} + impl FileMeta { /// Create a new instance of `FileMeta` pub fn new(location: Url, last_modified: i64, size: usize) -> Self { From 7103d277bb2ff26c58170c8914788e7cd6372e74 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Sat, 7 Dec 2024 16:50:18 -0800 Subject: [PATCH 5/7] remove compilation error --- kernel/src/engine/sync/fs_client.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index ab30416fa..9d9ff38f7 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -1,5 +1,3 @@ -use std::time::SystemTime; - use bytes::Bytes; use itertools::Itertools; use url::Url; From dd71d00a16601eff54cd1e3abddd7d657121c0bc Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 9 Dec 2024 09:55:25 -0800 Subject: [PATCH 6/7] Fixup tests --- kernel/src/engine/default/filesystem.rs | 14 ++++++++++---- kernel/src/engine/sync/fs_client.rs | 15 +++++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 10e94b43b..433c79eea 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -224,15 +224,15 @@ mod tests { let begin_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); // The [`FileMeta`]s must be greater than 1 minute ago - let allowed_time = begin_time - Duration::from_secs(60); + let allowed_begin_time = begin_time - Duration::from_secs(60); let data = Bytes::from("kernel-data"); let name = delta_path_for_version(1, "json"); store.put(&name, data.clone().into()).await.unwrap(); let table_root = Url::parse("memory:///").expect("valid url"); - let path = Path::from(format!("_delta_log/{}", name)); - let engine = DefaultEngine::new(store, path, Arc::new(TokioBackgroundExecutor::new())); + let prefix = Path::from_url_path(table_root.path()).expect("Couldn't get path"); + let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); let files: Vec<_> = engine .get_file_system_client() .list_from(&table_root) @@ -240,9 +240,15 @@ mod tests { .try_collect() .unwrap(); + let end_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + // The [`FileMeta`]s must be less than one minute in the future + let allowed_end_time = end_time + Duration::from_secs(60); + + assert!(!files.is_empty()); for meta in files.into_iter() { let meta_time = Duration::from_millis(meta.last_modified.try_into().unwrap()); - assert!(allowed_time < meta_time); + assert!(allowed_begin_time < meta_time); + assert!(meta_time < allowed_end_time); } } #[tokio::test] diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index 9d9ff38f7..ea7d91b5e 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -99,7 +99,7 @@ mod tests { let begin_time = SystemTime::now().duration_since(UNIX_EPOCH)?; // The [`FileMeta`]s must be greater than 1 minute ago - let allowed_time = begin_time - Duration::from_secs(60); + let allowed_begin_time = begin_time - Duration::from_secs(60); let path = tmp_dir.path().join(get_json_filename(1)); let mut f = File::create(path)?; @@ -108,10 +108,17 @@ mod tests { let url_path = tmp_dir.path().join(get_json_filename(1)); let url = Url::from_file_path(url_path).unwrap(); - let list: Vec<_> = client.list_from(&url)?.try_collect()?; - for meta in list.iter() { + let files: Vec<_> = client.list_from(&url)?.try_collect()?; + + let end_time = SystemTime::now().duration_since(UNIX_EPOCH)?; + // The [`FileMeta`]s must be less than one minute in the future + let allowed_end_time = end_time + Duration::from_secs(60); + + assert!(!files.is_empty()); + for meta in files.iter() { let meta_time = Duration::from_millis(meta.last_modified.try_into()?); - assert!(allowed_time < meta_time,); + assert!(allowed_begin_time < meta_time); + assert!(meta_time < allowed_end_time); } Ok(()) } From c765eaed9fdc88aebefa7804d67fa097f16f12eb Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 9 Dec 2024 10:38:30 -0800 Subject: [PATCH 7/7] change test to use absolute, bring down to 10s --- kernel/src/engine/default/filesystem.rs | 9 +-------- kernel/src/engine/sync/fs_client.rs | 9 +-------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 433c79eea..36b110774 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -223,8 +223,6 @@ mod tests { let store = Arc::new(InMemory::new()); let begin_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - // The [`FileMeta`]s must be greater than 1 minute ago - let allowed_begin_time = begin_time - Duration::from_secs(60); let data = Bytes::from("kernel-data"); let name = delta_path_for_version(1, "json"); @@ -240,15 +238,10 @@ mod tests { .try_collect() .unwrap(); - let end_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - // The [`FileMeta`]s must be less than one minute in the future - let allowed_end_time = end_time + Duration::from_secs(60); - assert!(!files.is_empty()); for meta in files.into_iter() { let meta_time = Duration::from_millis(meta.last_modified.try_into().unwrap()); - assert!(allowed_begin_time < meta_time); - assert!(meta_time < allowed_end_time); + assert!(meta_time.abs_diff(begin_time) < Duration::from_secs(10)); } } #[tokio::test] diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs index ea7d91b5e..a5c889da5 100644 --- a/kernel/src/engine/sync/fs_client.rs +++ b/kernel/src/engine/sync/fs_client.rs @@ -98,8 +98,6 @@ mod tests { let tmp_dir = tempfile::tempdir().unwrap(); let begin_time = SystemTime::now().duration_since(UNIX_EPOCH)?; - // The [`FileMeta`]s must be greater than 1 minute ago - let allowed_begin_time = begin_time - Duration::from_secs(60); let path = tmp_dir.path().join(get_json_filename(1)); let mut f = File::create(path)?; @@ -110,15 +108,10 @@ mod tests { let url = Url::from_file_path(url_path).unwrap(); let files: Vec<_> = client.list_from(&url)?.try_collect()?; - let end_time = SystemTime::now().duration_since(UNIX_EPOCH)?; - // The [`FileMeta`]s must be less than one minute in the future - let allowed_end_time = end_time + Duration::from_secs(60); - assert!(!files.is_empty()); for meta in files.iter() { let meta_time = Duration::from_millis(meta.last_modified.try_into()?); - assert!(allowed_begin_time < meta_time); - assert!(meta_time < allowed_end_time); + assert!(meta_time.abs_diff(begin_time) < Duration::from_secs(10)); } Ok(()) }