Skip to content

Commit

Permalink
fix: link_parent more realible
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Oct 27, 2024
1 parent b650cae commit 7e4bb89
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -239,7 +239,7 @@ impl<C: MetadataItem> TopicNextState<C> {
.await;
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -251,7 +251,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand Down
30 changes: 14 additions & 16 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use fluvio_stream_model::{
store::{MetadataStoreObject, LocalStore},
core::MetadataItem,
};
use tracing::{debug, trace};
use tracing::{debug, info, trace};
use async_trait::async_trait;

use crate::stores::partition::PartitionLocalStore;
Expand All @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore<u32>;

#[async_trait]
pub trait TopicMd<C: MetadataItem> {
async fn partitions_from_replicas(
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>>;
Expand All @@ -30,32 +30,32 @@ impl<C: MetadataItem> TopicMd<C> for TopicMetadata<C>
where
C: MetadataItem + Send + Sync,
{
/// get partitions from replica map
async fn partitions_from_replicas(
/// create new partitions from the replica map if it doesn't exists
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
let mut partitions = vec![];
let replica_map = &self.status.replica_map;
trace!(?replica_map, "creating new partitions for topic");
let store = partition_store.read().await;
for (idx, replicas) in replica_map.iter() {
let mirror = self.status.mirror_map.get(idx);

let replica_key = ReplicaKey::new(self.key(), *idx);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
let store = partition_store.read().await;
let partition = store.get(&replica_key);
if let Some(p) = partition {
partitions.push(p.inner().clone());
} else {
debug!(?replica_key, ?partition_spec, "creating new partition");
if !partition_store.contains_key(&replica_key).await {
info!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
drop(store);
partitions
}
}
Expand Down Expand Up @@ -250,12 +250,10 @@ mod test {
let topic = MetadataStoreObject::<TopicSpec, u32>::new(key, spec, status);
let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]);

let partitions = topic.partitions_from_replicas(&partition_store).await;
let partitions = topic.create_new_partitions(&partition_store).await;

assert_eq!(partitions.len(), 2);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32));
assert_eq!(partitions[0].spec.leader, 0);
assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[1].spec.leader, 1);
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[0].spec.leader, 1);
}
}
2 changes: 1 addition & 1 deletion crates/fluvio-stream-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-stream-dispatcher"
edition = "2021"
version = "0.13.5"
version = "0.13.6"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio Event Stream access"
repository = "https://github.com/infinyon/fluvio"
Expand Down
61 changes: 17 additions & 44 deletions crates/fluvio-stream-dispatcher/src/metadata/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,12 @@ cfg_if::cfg_if! {
trace!(?value, "apply");
let store = self.get_store::<S>()?;
value.ctx_mut().item_mut().id = value.key().to_string();
if let Some(owner) = value.ctx().item().owner() {
self.link_parent::<S>(owner, value.ctx().item()).await?;
let item = value.ctx().item().clone();
store.apply(value).await?;
if let Some(owner) = item.owner() {
self.link_parent::<S>(owner, &item).await?;
}
store.apply(value).await
Ok(())
}

async fn update_spec<S>(&self, metadata: LocalMetadataItem, spec: S) -> Result<()>
Expand Down Expand Up @@ -290,7 +292,6 @@ cfg_if::cfg_if! {
#[derive(Debug, Clone)]
struct SpecPointer {
inner: Arc<dyn Any + Send + Sync>,
revision: u64,
store_revision: u64,
path: PathBuf,
}
Expand Down Expand Up @@ -344,14 +345,11 @@ cfg_if::cfg_if! {
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
.ctx_mut()
.item_mut()
.put_child(S::LABEL, child.clone());
})
.await?;
let mut parent_obj = parent_store.retrieve_item::<S::Owner>(parent).await?;
let mut children_without_parent = child.clone();
children_without_parent.parent = None;
parent_obj.ctx_mut().item_mut().put_child(S::LABEL, children_without_parent);
parent_store.apply(parent_obj).await?;
Ok(())
}

Expand All @@ -362,14 +360,11 @@ cfg_if::cfg_if! {
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
.ctx_mut()
.item_mut()
.remove_child(S::LABEL, child);
})
.await?;
let mut parent_obj = parent_store.retrieve_item::<S::Owner>(parent).await?;
let mut children_without_parent = child.clone();
children_without_parent.parent = None;
parent_obj.ctx_mut().item_mut().remove_child(S::LABEL, &children_without_parent);
parent_store.apply(parent_obj).await?;
Ok(())
}

Expand Down Expand Up @@ -530,21 +525,6 @@ cfg_if::cfg_if! {
self.path.join(format!("{name}.yaml"))
}

async fn mut_in_place<S: Spec, F>(&self, key: &str, func: F) -> Result<()>
where
F: Fn(&mut LocalStoreObject<S>),
{
if let Some(spec) = self.data.write().get_mut(key) {
let mut obj = spec.downcast::<S>()?;
func(&mut obj);
spec.set(obj);
spec.flush::<S>()?;
Ok(())
} else {
anyhow::bail!("'{key}' not found");
}
}

async fn send_update(&self, mut update: SpecUpdate) {
let store_revision = self
.version
Expand All @@ -559,14 +539,12 @@ cfg_if::cfg_if! {

impl SpecPointer {
fn new<S: Spec, P: AsRef<Path>>(path: P, obj: LocalStoreObject<S>) -> Self {
let revision = obj.ctx().item().revision;
let inner = Arc::new(obj);
let path = path.as_ref().to_path_buf();
let store_revision = Default::default();
Self {
inner,
path,
revision,
store_revision,
}
}
Expand Down Expand Up @@ -600,11 +578,6 @@ cfg_if::cfg_if! {
serde_yaml::to_writer(std::fs::File::create(&self.path)?, &storage)?;
Ok(())
}

fn set<S: Spec>(&mut self, obj: LocalStoreObject<S>) {
self.revision = obj.ctx().item().revision;
self.inner = Arc::new(obj);
}
}

impl SpecUpdate {
Expand Down Expand Up @@ -1327,12 +1300,12 @@ spec:
1
);

assert!(parent_meta
assert_eq!(parent_meta
.children()
.unwrap()
.get(TestSpec::LABEL)
.expect("test spec children")
.contains(child.ctx().item()),);
.first().unwrap().id, child.ctx().item().id);

meta_store
.delete_item::<TestSpec>(child.ctx().item().clone())
Expand Down

0 comments on commit 7e4bb89

Please sign in to comment.