diff --git a/crates/fluvio-stream-dispatcher/src/metadata/local.rs b/crates/fluvio-stream-dispatcher/src/metadata/local.rs index 973d62c5cd..d14ac07da3 100644 --- a/crates/fluvio-stream-dispatcher/src/metadata/local.rs +++ b/crates/fluvio-stream-dispatcher/src/metadata/local.rs @@ -1326,6 +1326,69 @@ spec: drop(meta_folder) } + #[fluvio_future::test] + async fn test_parent_linking_with_multiple_children_and_do_not_add_children_to_parents_with_stale_version() { + // given + let meta_folder = tempfile::tempdir().expect("temp dir created"); + let meta_store = LocalMetadataStorage::new(&meta_folder); + let (mut parent, mut children) = test_parent_with_children(4); + + let child = children.remove(0); + let child2 = children.remove(0); + let child3 = children.remove(0); + let child4 = children.remove(0); + + // only parent without children + parent.ctx_mut().item_mut().set_children(Default::default()); + meta_store + .apply(parent.clone()) + .await + .expect("applied parent"); + + // when applying 4 children and one parent + let (r1, r2, r3, r4, r5) = tokio::join!( + meta_store.apply(child.clone()), + meta_store.apply(parent.clone()), + meta_store.apply(child2.clone()), + meta_store.apply(child3.clone()), + meta_store.apply(child4.clone()) + ); + + r1.expect("applied child"); + // parent is old, should not be accepted + // it was updated by the child 1 + assert_eq!(r2.unwrap_err().to_string(), "attempt to update by stale value: current version: 1, proposed: 0"); + r3.expect("applied child"); + r4.expect("applied child"); + r5.expect("applied child"); + + let parent_meta = meta_store + .retrieve_items::(&NameSpace::All) + .await + .expect("items") + .items + .remove(0) + .ctx_owned() + .into_inner(); + + + // then + assert_eq!(parent_meta.children().unwrap().len(), 1); + let children = parent_meta + .children() + .unwrap() + .get(TestSpec::LABEL) + .expect("test spec children"); + + assert_eq!(children.len(), 4); + assert!(children.iter().any(|c| c.id == child.ctx.item().id)); + assert!(children.iter().any(|c| c.id == child2.ctx.item().id)); + assert!(children.iter().any(|c| c.id == child3.ctx.item().id)); + assert!(children.iter().any(|c| c.id == child4.ctx.item().id)); + drop(meta_folder) + } + + #[fluvio_future::test] async fn test_parent_is_not_existed() { //given