From 87c21e2baa27ee6913394d39e5ab32031453d3c3 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Sun, 19 Jan 2025 20:55:25 +0800 Subject: [PATCH] fix(flow): deal with flow drop leftover (#5391) * fix: deal with flow drop leftover * chore: make it warn * chore: apply suggestion. Co-authored-by: Ruihang Xia * chore: review --------- Co-authored-by: dennis zhuang Co-authored-by: Ruihang Xia --- src/flow/src/adapter.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 373cc7e8917b..777dcbcdf88e 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -284,12 +284,29 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let (is_ts_placeholder, proto_schema) = self + let (is_ts_placeholder, proto_schema) = match self .try_fetch_existing_table(&table_name) .await? .context(UnexpectedSnafu { reason: format!("Table not found: {}", table_name.join(".")), - })?; + }) { + Ok(r) => r, + Err(e) => { + if self + .table_info_source + .get_opt_table_id_from_name(&table_name) + .await? + .is_none() + { + // deal with both flow&sink table no longer exists + // but some output is still in output buf + common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join(".")); + continue; + } else { + return Err(e); + } + } + }; let schema_len = proto_schema.len(); let total_rows = reqs.iter().map(|r| r.len()).sum::();