Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engines now return FileMeta with correct millisecond timestamps #565

Merged
32 changes: 31 additions & 1 deletion kernel/src/engine/default/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
sender
.send(Ok(FileMeta {
location,
last_modified: meta.last_modified.timestamp(),
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
}))
.ok();
Expand Down Expand Up @@ -216,6 +216,36 @@ mod tests {
assert_eq!(data[2], Bytes::from("el-da"));
}

#[tokio::test]
async fn test_file_meta_is_correct() {
let tmp = tempfile::tempdir().unwrap();
let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();

let data = Bytes::from("kernel-data");
let name = delta_path_for_version(1, "json");
tmp_store.put(&name, data.clone().into()).await.unwrap();

let url = Url::from_directory_path(tmp.path()).unwrap();
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
let store = Arc::new(LocalFileSystem::new());
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
let files: Vec<_> = engine
.get_file_system_client()
.list_from(&Url::parse("file://").unwrap())
.unwrap()
.try_collect()
.unwrap();

let object_meta = tmp_store.head(&name).await.unwrap();
let expected_location = url.join(name.as_ref()).unwrap();
let expected_file_meta = FileMeta {
location: expected_location,
// We assert that the timestamp is in milliseconds
last_modified: object_meta.last_modified.timestamp_millis(),
size: object_meta.size,
};
assert_eq!(files, vec![expected_file_meta]);
}
#[tokio::test]
async fn test_default_engine_listing() {
let tmp = tempfile::tempdir().unwrap();
Expand Down
53 changes: 43 additions & 10 deletions kernel/src/engine/sync/fs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ impl FileSystemClient for SyncFilesystemClient {
.sorted_by_key(|ent| ent.path())
.map(|ent| {
ent.metadata().map_err(Error::IOError).and_then(|metadata| {
let last_modified: u64 = metadata
let last_modified = metadata
.modified()
.map(
|modified| match modified.duration_since(SystemTime::UNIX_EPOCH) {
Ok(d) => d.as_secs(),
Err(_) => 0,
},
)
.ok()
.and_then(|modified| {
modified.duration_since(SystemTime::UNIX_EPOCH).ok()
})
.and_then(|modified| modified.as_millis().try_into().ok())
.unwrap_or(0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we converting Result to Option three times here? Why is that needed?

Suggested change
.ok()
.and_then(|modified| {
modified.duration_since(SystemTime::UNIX_EPOCH).ok()
})
.and_then(|modified| modified.as_millis().try_into().ok())
.unwrap_or(0);
.and_then(|modified| modified.duration_since(SystemTime::UNIX_EPOCH))
.and_then(|modified| modified.as_millis().try_into())
.unwrap_or(0);

Or do types not match in some annoying way?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, all this monadic chaining feels like a poor substitute for a missing helper method...

impl TryFrom<DirEntry> for FileMeta {
    type Error = Error;

    fn try_from(ent: DirEntry) -> DeltaResult<FileMeta> {
        let metadata = ent.metadata()?;
        let last_modified = metadata.modified()?.duration_since(SystemTime::UNIX_EPOCH)?;
        Ok(FileMeta {
            location: Url::from_file_path(ent.path())?,
            last_modified: last_modified.as_millis().try_into()?,
            size: metadata.len() as usize,
        })
    }
}

and then

            let it = all_ents
                .into_iter()
                .sorted_by_key(|ent| ent.path())
                .map(TryFrom::try_from);

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woah that's clean 👌 I'm doing a lot of map_err, and I'm wondering if we should do From conversions for common things like Url::from_* and TryInto 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea seems reasonable? make an issue for us to think about?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have the From conversion for url::ParseError, so ? should Just Work? The problem with TryInto is the error type is parametrized, so there's no way to know in advance what might be needed. We'd have to go case by case (when in doubt, try ? and hopefully there's already an impl From for it to use).


Url::from_file_path(ent.path())
.map(|location| FileMeta {
location,
last_modified: last_modified as i64,
last_modified,
size: metadata.len() as usize,
})
.map_err(|_| Error::Generic(format!("Invalid path: {:?}", ent.path())))
Expand Down Expand Up @@ -97,20 +97,53 @@ impl FileSystemClient for SyncFilesystemClient {

#[cfg(test)]
mod tests {
use std::fs::File;
use std::io::Write;
use std::os::unix::fs::MetadataExt;
use std::{fs::File, time::UNIX_EPOCH};

use bytes::{BufMut, BytesMut};
use itertools::Itertools;
use url::Url;

use super::SyncFilesystemClient;
use crate::FileSystemClient;
use crate::{FileMeta, FileSystemClient};

/// generate json filenames that follow the spec (numbered padded to 20 chars)
fn get_json_filename(index: usize) -> String {
format!("{index:020}.json")
}

#[test]
fn test_file_meta_is_correct() -> Result<(), Box<dyn std::error::Error>> {
let client = SyncFilesystemClient;
let tmp_dir = tempfile::tempdir().unwrap();

let path = tmp_dir.path().join(get_json_filename(1));
let mut f = File::create(path)?;
writeln!(f, "null")?;

let url_path = tmp_dir.path().join(get_json_filename(1));
let url = Url::from_file_path(url_path).unwrap();
let list: Vec<_> = client.list_from(&url)?.try_collect()?;

let metadata = f.metadata()?;
// We assert that the timestamp is in milliseconds
let expected_timestamp = metadata
.modified()?
.duration_since(UNIX_EPOCH)?
.as_millis()
.try_into()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the exact same code the iterator uses... so it's not actually testing for correctness but rather testing that the code is equivalent. Not sure what the better test would be? Maybe we write a file and manually set its mtime, then verify the FileMeta has the same timestamp?

(same criticism applies to the default client test)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe create the file then just assert that the timestamp we get back is within 10s or 60s or something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented a test that checks that the filemeta is from the last minute.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems good for now :)

let expected_size = metadata.size().try_into()?;
let expected_file_meta = FileMeta {
location: url,
last_modified: expected_timestamp,
size: expected_size,
};

assert_eq!(list, vec![expected_file_meta]);
Ok(())
}

#[test]
fn test_list_from() -> Result<(), Box<dyn std::error::Error>> {
let client = SyncFilesystemClient;
Expand Down
Loading