Skip to content

Commit

Permalink
chore: reduce mirror fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Sep 28, 2024
1 parent 1916abb commit c2cd74c
Showing 1 changed file with 36 additions and 20 deletions.
56 changes: 36 additions & 20 deletions crates/fluvio-sc/src/controllers/mirroring/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
if let Err(err) = self.inner_loop(&mut backoff).await {
error!("error with inner loop: {:#?}", err);
if let Err(err) = self
.update_status(MirrorPairStatus::DetailFailure(err.to_string()))
.update_status(MirrorPairStatus::DetailFailure(err.to_string()), None)
.await
{
error!("error updating status: {:#?}", err);
Expand All @@ -67,7 +67,7 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
#[instrument(skip(self))]
async fn inner_loop(&self, backoff: &mut ExponentialBackoff) -> Result<()> {
loop {
if let Some((home, _)) = self.get_mirror_home_cluster().await {
if let Some((home, status)) = self.get_mirror_home_cluster().await {
debug!("initializing listeners");
let home_config = self.build_home_client(&home).await?;
let mut stream = self.request_stream(&home, home_config).await?;
Expand All @@ -90,7 +90,11 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
}

info!("synced topics from home");
self.update_status(MirrorPairStatus::Succesful).await?;
self.update_status(
MirrorPairStatus::Succesful,
Some((home.clone(), status.clone())),
)
.await?;
backoff.reset();
}
Err(err) => {
Expand Down Expand Up @@ -170,21 +174,22 @@ impl<C: MetadataItem> RemoteMirrorController<C> {

// Get the mirror home
async fn get_mirror_home_cluster(&self) -> Option<(Home, MirrorStatus)> {
self.mirrors.store().read().await.values().find_map(|r| {
match r.spec().mirror_type.clone() {
MirrorType::Home(h) => Some((h, r.status.clone())),
_ => None,
}
})
let mirror_store = self.mirrors.store().read().await;
let home_and_status =
mirror_store
.values()
.find_map(|r| match r.spec().mirror_type.clone() {
MirrorType::Home(h) => Some((h, r.status.clone())),
_ => None,
});
drop(mirror_store);
home_and_status
}

// Delete topics that are not in the response
async fn delete_not_received_topics(&self, topics: Vec<String>, home_id: String) -> Result<()> {
let all_topics_keys = self
.topics
.store()
.read()
.await
let topic_store = self.topics.store().read().await;
let all_topics_keys = topic_store
.values()
.filter_map(|t| match t.spec().replicas() {
ReplicaSpec::Mirror(MirrorConfig::Remote(r)) => {
Expand All @@ -197,6 +202,7 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
_ => None,
})
.collect::<Vec<_>>();
drop(topic_store);

for topic_key in all_topics_keys.iter() {
if !topics.contains(topic_key) {
Expand Down Expand Up @@ -227,7 +233,8 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
// Check if the topic already exists
// If it does, update the replica spec
// If it doesn't, create a new topic with the replica spec
let mut remote_topic = if let Some(t) = self.topics.store().read().await.get(&topic.key) {
let topic_store = self.topics.store().read().await;
let mut remote_topic = if let Some(t) = topic_store.get(&topic.key) {
let mut topic_spec = t.spec.clone();
topic_spec.set_replicas(new_replica.clone());

Expand All @@ -245,6 +252,7 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
topic_spec.set_replicas(new_replica);
topic_spec
};
drop(topic_store);

if let Some(cleanup_policy) = topic.spec.get_clean_policy() {
remote_topic.set_cleanup_policy(cleanup_policy.clone())
Expand All @@ -266,11 +274,19 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
}

// Update the status of the mirror
async fn update_status(&self, pair_status: MirrorPairStatus) -> Result<()> {
let (home, status) = self
.get_mirror_home_cluster()
.await
.ok_or(anyhow!("no home cluster found"))?;
async fn update_status(
&self,
pair_status: MirrorPairStatus,
home_and_status: Option<(Home, MirrorStatus)>,
) -> Result<()> {
let (home, status) = match home_and_status {
Some(h) => h,
None => self
.get_mirror_home_cluster()
.await
.ok_or(anyhow!("no home cluster found"))?,
};

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_millis();
Expand Down

0 comments on commit c2cd74c

Please sign in to comment.