Skip to content

Commit

Permalink
feat: Implement ExecutionLog on fs 🚀 (#1356)
Browse files Browse the repository at this point in the history
* 🚀 Interface placeholders

* feat: introduce log sink

* api implementation (#3)

* feat: implement log reader (#5)

* feat: implement log reader

* feat: implement write schemas (#6)

---------

Co-authored-by: VG <[email protected]>
Co-authored-by: VG <[email protected]>

* fix compilation

* feat: write schemas properly

* temp: schema and proto bug fix

* path bug fix

* file not created

* chore: fix log reader

* chore: allow cache_dir config

* chore: include args in example

* chore: fix cache_dir

* feat: mofiy schema

* chore: fix clippy

* chore: fix fmt

* chore: fix admin test

* chore: ignore conflict resolution tests

* chore: add buffered capacity

---------

Co-authored-by: VG <[email protected]>
Co-authored-by: VG <[email protected]>
  • Loading branch information
3 people authored Apr 3, 2023
1 parent ea8217c commit a4f8746
Show file tree
Hide file tree
Showing 44 changed files with 1,067 additions and 1,017 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-admin/src/services/application_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ impl AppService {
.map_err(|op| ErrorResponse {
message: op.to_string(),
})?;

let applications: Vec<AppResponse> = results
.iter()
.map(|result| {
Expand Down
15 changes: 9 additions & 6 deletions dozer-admin/src/tests/applications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@ mod grpc_service {

use crate::services::application_service::AppService;
use crate::tests::utils::database_url_for_test_env;
use crate::tests::utils::{establish_test_connection, get_setup_ids};
use crate::tests::utils::establish_test_connection;
use dozer_types::grpc_types::admin::{
AppResponse, CreateAppRequest, ListAppRequest, ListAppResponse, UpdateAppRequest,
};

#[test]
pub fn list_create_update() {
let test_db_connection = database_url_for_test_env();
let db_pool = establish_test_connection(test_db_connection);
let setup_ids = get_setup_ids();

let application_service = AppService::new(db_pool);

let config = generate_connection("Postgres");
let postgres_config = generate_connection("Postgres");
let config = Config {
app_name: "new_app_name".to_owned(),
connections: vec![config],
home_dir: "dozer".to_owned(),
cache_dir: "dozer".to_owned(),
connections: vec![postgres_config],
..Default::default()
};
let request = CreateAppRequest {
Expand All @@ -38,8 +41,8 @@ mod grpc_service {
offset: Some(0),
})
.unwrap();
assert!(!result.apps.is_empty());
assert_eq!(result.apps[0].id, setup_ids.app_id);

assert_eq!(result.apps.len(), 1);

let mut updated_config = config;
updated_config.app_name = "updated_app_name".to_owned();
Expand Down
42 changes: 3 additions & 39 deletions dozer-admin/src/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use diesel::{r2d2::ConnectionManager, RunQueryDsl, SqliteConnection};
use diesel::{r2d2::ConnectionManager, SqliteConnection};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use dozer_orchestrator::cli::generate_connection;
use dozer_types::models::app_config::Config;
Expand All @@ -18,8 +18,7 @@ struct TestConnectionCustomizer;

impl<E> CustomizeConnection<SqliteConnection, E> for TestConnectionCustomizer {
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), E> {
let setup_ids = get_setup_ids();
prepare_test_db(conn, setup_ids);
prepare_test_db(conn);
Ok(())
}
fn on_release(&self, conn: SqliteConnection) {
Expand All @@ -36,34 +35,10 @@ pub fn establish_test_connection(database_url: String) -> DbPool {
.expect("Failed to create DB pool.")
}

fn prepare_test_db(connection: &mut SqliteConnection, config_id: TestConfigId) {
fn prepare_test_db(connection: &mut SqliteConnection) {
run_migrations(connection).unwrap();
setup_data(connection, config_id)
}

pub fn get_setup_ids() -> TestConfigId {
let connection_ids: Vec<String> = vec![
"9cd38b34-3100-4b61-99fb-ca3626b90f59".to_owned(),
"2afd6d1f-f739-4f02-9683-b469011936a4".to_owned(),
"dc5d0a89-7b7a-4ab1-88a0-f23ec5c73482".to_owned(),
"67df73b7-a322-4ff7-86b4-d7a5b12416d9".to_owned(),
"7a82ead6-bfd2-4336-805c-a7058dfac3a6".to_owned(),
];

let source_ids: Vec<String> = vec![
"ebec89f4-80c7-4519-99d3-94cf55669c2b".to_owned(),
"0ea2cb76-1103-476d-935c-fe5f745bad53".to_owned(),
"28732cb6-7a68-4e34-99f4-99e356daa06d".to_owned(),
"bce87d76-93dc-42af-bffa-d47743f4c7fa".to_owned(),
"d0356a18-77f5-479f-a690-536d086707d8".to_owned(),
];
TestConfigId {
app_id: "a04376da-3af3-4051-a725-ed0073b3b598".to_owned(),
connection_ids,
source_ids,
api_ids: vec!["de3052fc-affb-46f8-b8c1-0ac69ee91a4f".to_owned()],
}
}
pub fn database_url_for_test_env() -> String {
String::from(":memory:")
}
Expand All @@ -76,17 +51,6 @@ pub fn get_sample_config() -> String {
serde_yaml::to_string(&config).unwrap()
}

fn setup_data(connection: &mut SqliteConnection, config_id: TestConfigId) {
// let generated_app_id = uuid::Uuid::new_v4().to_string();
// create app
insert_apps(connection, config_id.app_id, get_sample_config());
}

fn insert_apps(connection: &mut SqliteConnection, app_id: String, config: String) {
diesel::sql_query(format!("INSERT INTO apps (id, name, config, created_at, updated_at) VALUES('{app_id}', \'app_name\', '{config}', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP);"))
.execute(connection)
.unwrap();
}
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

fn run_migrations(
Expand Down
1 change: 1 addition & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tower = "0.4.13"
hyper = "0.14.24"
tower-http = {version = "0.3.5", features = ["full"]}
arc-swap = "1.6.0"
dozer-core = { version = "0.1.14", path = "../dozer-core" }

[dev-dependencies]
tempdir = "0.3.7"
26 changes: 26 additions & 0 deletions dozer-api/examples/reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::{env, path::Path};

use dozer_api::LogReader;
use futures_util::StreamExt;

#[tokio::main]
async fn main() {
let args: Vec<String> = env::args().collect();

let mut path = ".dozer/pipeline/logs/trips";
if args.len() == 2 {
path = &args[1];
};
let log_reader = LogReader::new(Path::new(path), "logs", 0, None).unwrap();

tokio::pin!(log_reader);

let mut counter = 0;
while let Some(_op) = log_reader.next().await {
counter += 1;

if counter > 100000 {
break;
}
}
}
135 changes: 135 additions & 0 deletions dozer-api/src/cache_builder/log_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::{
fs::{File, OpenOptions},
io::{BufReader, Read},
path::Path,
thread::sleep,
time::Duration,
};

use dozer_cache::errors::{CacheError, LogError};
use dozer_core::executor::ExecutorOperation;
use dozer_types::{
bincode,
indicatif::{MultiProgress, ProgressBar, ProgressStyle},
log::trace,
};
use futures_util::{FutureExt, Stream};

pub struct LogReader {
reader: BufReader<File>,
name: String,
pos: u64,
pb: ProgressBar,
count: u64,
}
const SLEEP_TIME_MS: u16 = 300;
impl LogReader {
pub fn new(
path: &Path,
name: &str,
pos: u64,
multi_pb: Option<MultiProgress>,
) -> Result<Self, CacheError> {
let file = OpenOptions::new()
.read(true)
.open(path)
.map_err(|_| CacheError::LogFileNotFound(path.to_path_buf()))?;

let mut reader = BufReader::new(file);

reader
.seek_relative(pos as i64)
.map_err(|e| CacheError::LogError(LogError::SeekError(name.to_string(), pos, e)))?;

let pb = attach_progress(multi_pb);
pb.set_message(format!("reader: {}", name));
Ok(Self {
reader,
name: name.to_string(),
pos,
pb,
count: 0,
})
}
async fn next_op(&mut self) -> ExecutorOperation {
loop {
let msg = read_msg(&mut self.reader);
match msg {
Ok((msg, len)) => {
self.pos += len;
self.count += 1;
self.pb.set_position(self.count);
return msg;
}
Err(e) => {
trace!(
"Error reading log : {}, Going to sleep : {} ms, Error : {:?}",
self.name,
SLEEP_TIME_MS,
e
);

// go to sleep for a bit
sleep(Duration::from_millis(SLEEP_TIME_MS.into()));
}
}
}
}
}

impl Stream for LogReader {
type Item = ExecutorOperation;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
match Box::pin(this.next_op()).poll_unpin(cx) {
std::task::Poll::Ready(msg) => std::task::Poll::Ready(Some(msg)),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}

fn read_msg(reader: &mut BufReader<File>) -> Result<(ExecutorOperation, u64), LogError> {
let mut buf = [0; 8];
reader.read_exact(&mut buf).map_err(LogError::ReadError)?;
let len = u64::from_le_bytes(buf);

let buf = read_n(reader, len);
let msg = bincode::deserialize(&buf).map_err(LogError::DeserializationError)?;
Ok((msg, len + 8))
}
fn read_n<R>(reader: R, bytes_to_read: u64) -> Vec<u8>
where
R: Read,
{
let mut buf = vec![];
let mut chunk = reader.take(bytes_to_read);

let n = chunk.read_to_end(&mut buf).expect("Didn't read enough");
assert_eq!(bytes_to_read as usize, n);
buf
}

fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
let pb = ProgressBar::new_spinner();
multi_pb.as_ref().map(|m| m.add(pb.clone()));
pb.set_style(
ProgressStyle::with_template("{spinner:.blue} {msg}: {pos}: {per_sec}")
.unwrap()
// For more spinners check out the cli-spinners project:
// https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
.tick_strings(&[
"▹▹▹▹▹",
"▸▹▹▹▹",
"▹▸▹▹▹",
"▹▹▸▹▹",
"▹▹▹▸▹",
"▹▹▹▹▸",
"▪▪▪▪▪",
]),
);
pb
}
Loading

0 comments on commit a4f8746

Please sign in to comment.