Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
zachschuermann committed Feb 25, 2025
1 parent 811cc2e commit 494a470
Showing 1 changed file with 10 additions and 17 deletions.
27 changes: 10 additions & 17 deletions kernel/src/engine/default/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,29 +217,28 @@ impl JsonOpener {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use crate::arrow::array::{AsArray, RecordBatch, StringArray};
use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use futures::future;
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOpts,
PutOptions, PutPayload, PutResult, Result,
};

use super::*;
use crate::{
actions::get_log_schema, engine::arrow_data::ArrowEngineData,
engine::default::executor::tokio::TokioBackgroundExecutor,
};

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, PutMultipartOpts,
PutOptions, PutPayload, PutResult, Result,
};

/// Store wrapper that wraps an inner store to purposefully delay GET requests of certain keys.
#[derive(Debug)]
struct SlowGetStore<T> {
Expand Down Expand Up @@ -455,19 +454,16 @@ mod tests {
];
let paths = paths.map(|p| std::fs::canonicalize(PathBuf::from(p)).unwrap());

// Convert &std::path::Path to object_store::path::Path
let path_string = paths[0].to_string_lossy().to_string();
let object_store_path = Path::from(path_string);
// for the first 000000.json, wait for 1 second
let key_map = (object_store_path, Duration::from_secs(1));

let store = Arc::new(SlowGetStore::new(
LocalFileSystem::new(),
vec![key_map].into_iter().collect(),
));

// Create a vector of futures, then await them all
use futures::future;

let file_futures: Vec<_> = paths
.iter()
.map(|path| {
Expand All @@ -484,11 +480,8 @@ mod tests {
}
})
.collect();

// Await all futures to get a Vec<FileMeta>
let files = future::join_all(file_futures).await;

// Same as before, but now files is a Vec<FileMeta>
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
let physical_schema = Arc::new(ArrowSchema::try_from(get_log_schema().as_ref()).unwrap());
let data: Vec<RecordBatch> = handler
Expand Down

0 comments on commit 494a470

Please sign in to comment.