Skip to content

Commit

Permalink
bug(client): reset response state
Browse files Browse the repository at this point in the history
  • Loading branch information
sjrusso8 committed Mar 24, 2024
1 parent c039840 commit e5f9411
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ impl ChannelBuilder {
let spark_connnect_client = SparkConnectClient {
stub: client.clone(),
builder: self.clone(),
handler: ResponseHandler::default(),
analyzer: AnalyzeHandler::default(),
handler: ResponseHandler::new(),
analyzer: AnalyzeHandler::new(),
};

Ok(SparkSession::new(spark_connnect_client))
Expand Down Expand Up @@ -283,6 +283,42 @@ pub struct AnalyzeHandler {
pub get_storage_level: Option<spark::StorageLevel>,
}

impl ResponseHandler {
fn new() -> Self {
Self {
metrics: None,
observed_metrics: None,
schema: None,
batches: Vec::new(),
sql_command_result: None,
write_stream_operation_start_result: None,
streaming_query_command_result: None,
get_resources_command_result: None,
streaming_query_manager_command_result: None,
result_complete: None,
total_count: 0,
}
}
}

impl AnalyzeHandler {
fn new() -> Self {
Self {
schema: None,
explain: None,
tree_string: None,
is_local: None,
is_streaming: None,
input_files: None,
spark_version: None,
ddl_parse: None,
same_semantics: None,
semantic_hash: None,
get_storage_level: None,
}
}
}

#[derive(Clone, Debug)]
pub struct SparkConnectClient<T> {
stub: Arc<Mutex<SparkConnectServiceClient<T>>>,
Expand Down Expand Up @@ -341,6 +377,9 @@ where

drop(client);

// clear out any prior responses
self.handler = ResponseHandler::new();

while let Some(resp) = resp.message().await.map_err(|err| {
SparkError::IoError(
err.to_string(),
Expand All @@ -363,6 +402,9 @@ where

let mut client = self.stub.lock().await;

// clear out any prior responses
self.analyzer = AnalyzeHandler::new();

let resp = client.analyze_plan(req).await?.into_inner();

drop(client);
Expand Down

0 comments on commit e5f9411

Please sign in to comment.