Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "adapter: don't block the Coordinator on cluster status events (#31070) #31111

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3495,33 +3495,12 @@ impl Coordinator {
}

/// Publishes a notice message to all sessions.
///
/// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`]
/// so we keep it around.
#[allow(dead_code)]
pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) {
for meta in self.active_conns.values() {
let _ = meta.notice_tx.send(notice.clone());
}
}

/// Returns a closure that will publish a notice to all sessions that were active at the time
/// this method was called.
pub(crate) fn broadcast_notice_tx(
&self,
) -> Box<dyn FnOnce(AdapterNotice) -> () + Send + 'static> {
let senders: Vec<_> = self
.active_conns
.values()
.map(|meta| meta.notice_tx.clone())
.collect();
Box::new(move |notice| {
for tx in senders {
let _ = tx.send(notice.clone());
}
})
}

pub(crate) fn active_conns(&self) -> &BTreeMap<ConnectionId, ConnMeta> {
&self.active_conns
}
Expand Down
25 changes: 9 additions & 16 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,13 @@ impl Coordinator {
);

let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition];
// Returns a Future that completes when the Builtin Table write is completed.
let builtin_table_completion = self.builtin_table_update().defer(builtin_table_updates);

self.builtin_table_update()
.execute(builtin_table_updates)
.await
.0
.instrument(info_span!("coord::message_cluster_event::table_updates"))
.await;

let cluster = self.catalog().get_cluster(event.cluster_id);
let replica = cluster.replica(event.replica_id).expect("Replica exists");
Expand All @@ -773,24 +778,12 @@ impl Coordinator {
.get_cluster_replica_status(event.cluster_id, event.replica_id);

if old_replica_status != new_replica_status {
let notifier = self.broadcast_notice_tx();
let notice = AdapterNotice::ClusterReplicaStatusChanged {
self.broadcast_notice(AdapterNotice::ClusterReplicaStatusChanged {
cluster: cluster.name.clone(),
replica: replica.name.clone(),
status: new_replica_status,
time: event.time,
};
// In a separate task, so we don't block the Coordinator, wait for the builtin
// table update to complete, and then notify active sessions.
mz_ore::task::spawn(
|| format!("cluster_event-{}-{}", event.cluster_id, event.replica_id),
async move {
// Wait for the builtin table updates to complete.
builtin_table_completion.await;
// Notify all sessions that were active at the time the cluster status changed.
(notifier)(notice)
},
);
});
}
}
}
Expand Down
Loading