From 2ece33306ceb063dd8e19dc57e76a6a55b7f0d03 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Sat, 18 Jan 2025 14:17:33 +0000 Subject: [PATCH] Revert "adapter: don't block the Coordinator on cluster status events (#31070)" This reverts commit bb372ed6e3aa6ca049ae6bd87018cae7bc269ae9. --- src/adapter/src/coord.rs | 21 -------------------- src/adapter/src/coord/message_handler.rs | 25 +++++++++--------------- 2 files changed, 9 insertions(+), 37 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 2f26cff656e02..75974e79352ac 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -3495,33 +3495,12 @@ impl Coordinator { } /// Publishes a notice message to all sessions. - /// - /// TODO(parkmycar): This code is dead, but is a nice parallel to [`Coordinator::broadcast_notice_tx`] - /// so we keep it around. - #[allow(dead_code)] pub(crate) fn broadcast_notice(&self, notice: AdapterNotice) { for meta in self.active_conns.values() { let _ = meta.notice_tx.send(notice.clone()); } } - /// Returns a closure that will publish a notice to all sessions that were active at the time - /// this method was called. - pub(crate) fn broadcast_notice_tx( - &self, - ) -> Box () + Send + 'static> { - let senders: Vec<_> = self - .active_conns - .values() - .map(|meta| meta.notice_tx.clone()) - .collect(); - Box::new(move |notice| { - for tx in senders { - let _ = tx.send(notice.clone()); - } - }) - } - pub(crate) fn active_conns(&self) -> &BTreeMap { &self.active_conns } diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 374479754c17f..e0f93c810f182 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -763,8 +763,13 @@ impl Coordinator { ); let builtin_table_updates = vec![builtin_table_retraction, builtin_table_addition]; - // Returns a Future that completes when the Builtin Table write is completed. - let builtin_table_completion = self.builtin_table_update().defer(builtin_table_updates); + + self.builtin_table_update() + .execute(builtin_table_updates) + .await + .0 + .instrument(info_span!("coord::message_cluster_event::table_updates")) + .await; let cluster = self.catalog().get_cluster(event.cluster_id); let replica = cluster.replica(event.replica_id).expect("Replica exists"); @@ -773,24 +778,12 @@ impl Coordinator { .get_cluster_replica_status(event.cluster_id, event.replica_id); if old_replica_status != new_replica_status { - let notifier = self.broadcast_notice_tx(); - let notice = AdapterNotice::ClusterReplicaStatusChanged { + self.broadcast_notice(AdapterNotice::ClusterReplicaStatusChanged { cluster: cluster.name.clone(), replica: replica.name.clone(), status: new_replica_status, time: event.time, - }; - // In a separate task, so we don't block the Coordinator, wait for the builtin - // table update to complete, and then notify active sessions. - mz_ore::task::spawn( - || format!("cluster_event-{}-{}", event.cluster_id, event.replica_id), - async move { - // Wait for the builtin table updates to complete. - builtin_table_completion.await; - // Notify all sessions that were active at the time the cluster status changed. - (notifier)(notice) - }, - ); + }); } } }