From 8918d2abeeac3be4ceacb980086e82838b8ecbae Mon Sep 17 00:00:00 2001 From: Eleonora Kiziv Date: Tue, 3 Mar 2020 20:09:31 -0500 Subject: [PATCH 1/3] Add remove_query rpc --- noria-server/src/controller/inner.rs | 32 +++++++++++++++++++++++ noria-server/src/controller/recipe/mod.rs | 2 +- noria/src/controller.rs | 17 ++++++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/noria-server/src/controller/inner.rs b/noria-server/src/controller/inner.rs index dc99f1f58..68021f49b 100644 --- a/noria-server/src/controller/inner.rs +++ b/noria-server/src/controller/inner.rs @@ -309,6 +309,12 @@ impl ControllerInner { self.remove_nodes(vec![args].as_slice()) .map(|r| json::to_string(&r).unwrap()) }), + (Method::POST, "/remove_query") => json::from_slice(&body) + .map_err(|_| StatusCode::BAD_REQUEST) + .map(|args| { + self.remove_query(authority, args) + .map(|r| json::to_string(&r).unwrap()) + }), _ => Err(StatusCode::NOT_FOUND), } } @@ -1349,6 +1355,32 @@ impl ControllerInner { }) } } + fn remove_query( + &mut self, + authority: &Arc, + qname: &str, + ) -> Result<(), String> { + let old = self.recipe.clone(); + self.recipe.remove_query(qname); + let updated = self.recipe.clone(); + let replaced = old.replace(updated).unwrap(); + + let activation_result = self.apply_recipe(replaced); + if authority + .read_modify_write(STATE_KEY, |state: Option| match state { + None => unreachable!(), + Some(ref state) if state.epoch > self.epoch => Err(()), + Some(mut state) => { + state.recipe_version = self.recipe.version(); + Ok(state) + } + }) + .is_err() + { + return Err("Failed to persist recipe extension".to_owned()); + } + Ok(()) + } } impl Drop for ControllerInner { diff --git a/noria-server/src/controller/recipe/mod.rs b/noria-server/src/controller/recipe/mod.rs index a3076c26a..c00c752a9 100644 --- a/noria-server/src/controller/recipe/mod.rs +++ b/noria-server/src/controller/recipe/mod.rs @@ -650,7 +650,7 @@ impl Recipe { self.prior.as_ref().map(|p| &**p) } - fn remove_query(&mut self, qname: &str) -> bool { + crate fn remove_query(&mut self, qname: &str) -> bool { let qid = self.aliases.get(qname).cloned(); if qid.is_none() { warn!(self.log, "Query {} not found in expressions", qname); diff --git a/noria/src/controller.rs b/noria/src/controller.rs index 3816fea7d..fc934a2c5 100644 --- a/noria/src/controller.rs +++ b/noria/src/controller.rs @@ -447,6 +447,16 @@ impl ControllerHandle { self.rpc("remove_node", view, "failed to remove node") } + /// Remove the given query view from the graph. + /// + /// `Self::poll_ready` must have returned `Async::Ready` before you call this method. + pub fn remove_query( + &mut self, + view: &str, + ) -> impl Future + Send { + self.rpc("remove_query", view, "failed to remove query") + } + /// Construct a synchronous interface to this controller instance using the given executor to /// execute all operations. /// @@ -636,4 +646,11 @@ where let fut = self.handle()?.remove_node(view); self.run(fut) } + /// Remove the given external view from the graph. + /// + /// See [`ControllerHandle::remove_node`]. + pub fn remove_query(&mut self, view: &str) -> Result<(), failure::Error> { + let fut = self.handle()?.remove_query(view); + self.run(fut) + } } From 873781d9498ea99f99469e322322dc03edb32fb6 Mon Sep 17 00:00:00 2001 From: Malte Schwarzkopf Date: Mon, 16 Mar 2020 09:16:31 -0400 Subject: [PATCH 2/3] Add --skip-controller to noria-zk This helps debug leftover state from an exited controller. --- noria-server/src/bin/zk.rs | 41 +++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/noria-server/src/bin/zk.rs b/noria-server/src/bin/zk.rs index c68f3a81e..498484c11 100644 --- a/noria-server/src/bin/zk.rs +++ b/noria-server/src/bin/zk.rs @@ -57,26 +57,41 @@ fn main() { .required_unless("clean") .help("Print current configuration to stdout."), ) + .arg( + Arg::with_name("skip-controller") + .long("--skip-controller") + .takes_value(false) + .help("Don't print current controller."), + ) .get_matches(); let deployment = matches.value_of("deployment").unwrap(); let zookeeper_addr = format!("{}/{}", matches.value_of("zookeeper").unwrap(), deployment); let clean = matches.is_present("clean"); let dump = matches.is_present("show"); + let skip_controller = matches.is_present("skip-controller"); let zk = ZooKeeper::connect(&zookeeper_addr, Duration::from_secs(1), EventWatcher).unwrap(); if dump { - let (ref current_ctrl, ref _stat) = match zk.get_data(CONTROLLER_KEY, false) { - Ok(data) => data, - Err(e) => match e { - ZkError::NoNode => { - println!("no current Soup controller in Zookeeper!"); - return; - } - _ => panic!("{:?}", e), - }, - }; + if !skip_controller { + let (ref current_ctrl, ref _stat) = match zk.get_data(CONTROLLER_KEY, false) { + Ok(data) => data, + Err(e) => match e { + ZkError::NoNode => { + println!("no current Soup controller in Zookeeper!"); + return; + } + _ => panic!("{:?}", e), + }, + }; + + let controller: Value = serde_json::from_slice(current_ctrl).unwrap(); + println!( + "Current Soup controller in Zookeeper:\n{}\n\n", + serde_json::to_string_pretty(&controller).unwrap() + ); + } let (ref current_data, ref _stat) = match zk.get_data(STATE_KEY, false) { Ok(data) => data, @@ -89,12 +104,6 @@ fn main() { }, }; - let controller: Value = serde_json::from_slice(current_ctrl).unwrap(); - println!( - "Current Soup controller in Zookeeper:\n{}\n\n", - serde_json::to_string_pretty(&controller).unwrap() - ); - let state: Value = serde_json::from_slice(current_data).unwrap(); println!( "Current Soup configuration in Zookeeper:\n{}", From d10fd5d3c3c34ce85e7122215e7b77e25a2bfbb4 Mon Sep 17 00:00:00 2001 From: Eleonora Kiziv Date: Tue, 31 Mar 2020 15:23:42 -0400 Subject: [PATCH 3/3] Updated remove_query to work with compound queries --- noria-server/src/controller/inner.rs | 24 +++++++++++-- noria-server/src/controller/recipe/mod.rs | 16 +++++---- noria-server/src/controller/sql/mod.rs | 41 ++++++++++++++++++----- noria-server/src/integration.rs | 35 +++++++++++++++++++ 4 files changed, 97 insertions(+), 19 deletions(-) diff --git a/noria-server/src/controller/inner.rs b/noria-server/src/controller/inner.rs index 68021f49b..642d11b27 100644 --- a/noria-server/src/controller/inner.rs +++ b/noria-server/src/controller/inner.rs @@ -1185,6 +1185,7 @@ impl ControllerInner { // This query leaf node has children -- typically, these are readers, but they can also // include egress nodes or other, dependent queries. let mut has_non_reader_children = false; + let mut non_readers = Vec::default(); let readers: Vec<_> = self .ingredients .neighbors_directed(leaf, petgraph::EdgeDirection::Outgoing) @@ -1192,6 +1193,7 @@ impl ControllerInner { if self.ingredients[*ni].is_reader() { true } else { + non_readers.push(*ni); has_non_reader_children = true; false } @@ -1204,8 +1206,22 @@ impl ControllerInner { "not removing node {} yet, as it still has non-reader children", leaf.index() ); - unreachable!(); + // deleting non-reader and its children + for nr in non_readers { + if !self.ingredients[nr].is_base() { + let mut children = Vec::default(); + self.ingredients + .neighbors_directed(nr, petgraph::EdgeDirection::Outgoing) + .for_each(|ni| children.push(ni)); + for child in children { + self.remove_leaf(child); + } + self.remove_leaf(nr); + } + } + //unreachable; } + // nodes can have only one reader attached assert!(readers.len() <= 1); debug!( @@ -1217,7 +1233,8 @@ impl ControllerInner { removals.push(readers[0]); leaf = readers[0]; } else { - unreachable!(); + //unreachable!(); + // } } @@ -1248,7 +1265,7 @@ impl ControllerInner { .neighbors_directed(parent, petgraph::EdgeDirection::Outgoing) .count() == 0 { - nodes.push(parent); + // nodes.push(parent); } } @@ -1372,6 +1389,7 @@ impl ControllerInner { Some(ref state) if state.epoch > self.epoch => Err(()), Some(mut state) => { state.recipe_version = self.recipe.version(); + // state.recipes.push(updated_recipe.clone()); Ok(state) } }) diff --git a/noria-server/src/controller/recipe/mod.rs b/noria-server/src/controller/recipe/mod.rs index c00c752a9..945035f38 100644 --- a/noria-server/src/controller/recipe/mod.rs +++ b/noria-server/src/controller/recipe/mod.rs @@ -462,7 +462,7 @@ impl Recipe { // them twice. self.inc.as_mut().unwrap().remove_base(&ctq.table.name); match self.prior.as_ref().unwrap().node_addr_for(&ctq.table.name) { - Ok(ni) => Some(ni), + Ok(ni) => Some(vec![ni]), Err(e) => { crit!( self.log, @@ -473,15 +473,17 @@ impl Recipe { } } } - _ => self - .inc - .as_mut() - .unwrap() - .remove_query(n.as_ref().unwrap(), mig), + _ => { + self + .inc + .as_mut() + .unwrap() + .remove_query(n.as_ref().unwrap(), mig) + } } }) + .flatten() .collect(); - Ok(result) } diff --git a/noria-server/src/controller/sql/mod.rs b/noria-server/src/controller/sql/mod.rs index ae86aeed7..5a19ce6c5 100644 --- a/noria-server/src/controller/sql/mod.rs +++ b/noria-server/src/controller/sql/mod.rs @@ -65,6 +65,8 @@ crate struct SqlIncorporator { /// Active universes mapped to the group they belong to. /// If an user universe, mapped to None. universes: HashMap, Vec>, + + compound_mir_queries: HashMap, } impl Default for SqlIncorporator { @@ -87,6 +89,7 @@ impl Default for SqlIncorporator { reuse_type: ReuseConfigType::Finkelstein, universes: HashMap::default(), + compound_mir_queries: HashMap::default(), } } } @@ -451,10 +454,17 @@ impl SqlIncorporator { .iter() .enumerate() .map(|(i, sq)| { - Ok(self - .add_select_query(&format!("{}_csq_{}", query_name, i), &sq.1, false, mig)? - .1 - .unwrap()) + let q_name = &format!("{}_csq_{}", query_name, i); + let res = self.add_select_query(q_name, &sq.1, false, mig)?.1; + if res.is_none() { + let hash = self.named_queries[q_name].clone(); + let query = self.mir_queries[&(hash, mig.universe())].clone(); + debug!(self.log, "Fetched a mir_query from before {:?}", query); + Ok(query) + } else { + Ok(res.unwrap()) + } + }) .collect(); @@ -468,6 +478,7 @@ impl SqlIncorporator { ); let qfp = mir_query_to_flow_parts(&mut combined_mir_query, &mut mig, None); + self.compound_mir_queries.insert(query_name.to_owned(), combined_mir_query.clone()); self.register_query(query_name, None, &combined_mir_query, mig.universe()); @@ -562,12 +573,21 @@ impl SqlIncorporator { Ok((qfp, mir)) } - pub(super) fn remove_query(&mut self, query_name: &str, mig: &Migration) -> Option { + pub(super) fn remove_query(&mut self, query_name: &str, mig: &Migration) -> Option> { let nodeid = self .leaf_addresses .remove(query_name) .expect("tried to remove unknown query"); + if self.compound_mir_queries.contains_key(query_name) { + let mir_query = self.compound_mir_queries.get(query_name).unwrap().clone(); + self.compound_mir_queries.remove_entry(query_name); + self.view_schemas.remove(query_name).unwrap(); + // potentially would need to remove subqueries from named_queries + self.mir_converter.remove_query(query_name, &mir_query); + return Some(vec![nodeid]) + } + let qg_hash = self .named_queries .remove(query_name) @@ -594,8 +614,8 @@ impl SqlIncorporator { self.query_graphs.remove(&qg_hash).unwrap(); self.view_schemas.remove(query_name).unwrap(); - // trigger reader node removal - Some(nodeid) + // the node has been removed + Some(vec![nodeid]) } else { // more than one query uses this leaf // don't remove node yet! @@ -660,8 +680,11 @@ impl SqlIncorporator { self.named_queries.insert(query_name.to_owned(), qg_hash); } None => { - self.base_mir_queries - .insert(query_name.to_owned(), mir.clone()); + if self.compound_mir_queries.contains_key(query_name) { + debug!(self.log, "Dealing with a compound query {:?}", query_name.to_owned()); + } else { + self.base_mir_queries.insert(query_name.to_owned(), mir.clone()); + } } } } diff --git a/noria-server/src/integration.rs b/noria-server/src/integration.rs index 61aa604a2..245eb42e5 100644 --- a/noria-server/src/integration.rs +++ b/noria-server/src/integration.rs @@ -2514,3 +2514,38 @@ fn correct_nested_view_schema() { ]; assert_eq!(q.schema(), Some(&expected_schema[..])); } + +#[test] +fn remove_compound_query() { + let mut g = start_simple("remove_compound_query"); + let sql = " + CREATE TABLE answers_a (email_key int, lec int, answer text, PRIMARY KEY (email_key));\ + CREATE TABLE answers_b (email_key int, lec int, answer text, PRIMARY KEY (email_key));\ + QUERY answers: SELECT email_key, answer FROM answers_a WHERE lec=? UNION SELECT email_key, answer FROM answers_b WHERE lec=?; + "; + g.install_recipe(sql).unwrap(); + let mut write = g.table("answers_a").unwrap().into_sync(); + let mut write2 = g.table("answers_b").unwrap().into_sync(); + // insert a new record + write.insert(vec![1.into(), 3.into(), "hello".into()]).unwrap(); + write2.insert(vec![2.into(), 3.into(), "goodbye".into()]).unwrap(); + + g.remove_query("answers"); + assert_eq!(g.outputs().unwrap().len(), 0); + + let r1_txt = "\ + CREATE TABLE answers_c (email_key int, lec int, answer text, PRIMARY KEY (lec));\ + QUERY answers: SELECT email_key, answer FROM answers_a WHERE lec=? UNION SELECT email_key, answer FROM answers_b WHERE lec=? UNION SELECT email_key, answer FROM answers_c WHERE lec=?;\ + "; + g.extend_recipe(r1_txt).unwrap(); + assert_eq!(g.outputs().unwrap().len(), 1); + g.remove_query("answers"); + + let r2_txt = "\ + CREATE TABLE answers_d (email_key int, lec int, answer text, PRIMARY KEY (lec));\ + QUERY answers: SELECT email_key, answer FROM answers_a WHERE lec=? UNION SELECT email_key, answer FROM answers_b WHERE lec=? UNION SELECT email_key, answer FROM answers_c WHERE lec=? UNION SELECT email_key, answer FROM answers_d WHERE lec=?;\ + "; + g.extend_recipe(r2_txt).unwrap(); + assert_eq!(g.outputs().unwrap().len(), 1); + +}