Skip to content

Commit

Permalink
simplify session task
Browse files Browse the repository at this point in the history
  • Loading branch information
marcmo authored and DmitryAstafyev committed Jan 15, 2024
1 parent 7f7af53 commit ed67816
Showing 1 changed file with 37 additions and 50 deletions.
87 changes: 37 additions & 50 deletions application/apps/indexer/session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
tracker,
tracker::OperationTrackerAPI,
};
use futures::Future;
use indexer_base::progress::Severity;
use log::{debug, error, warn};
use processor::{grabber::LineRange, search::filter::SearchFilter};
Expand Down Expand Up @@ -67,12 +68,9 @@ impl Session {
};
let destroyed = session.destroyed.clone();
let destroying = session.destroying.clone();
let (tx, rx): (
oneshot::Sender<JoinHandle<()>>,
oneshot::Receiver<JoinHandle<()>>,
) = oneshot::channel();
let (tx, rx) = oneshot::channel();
let handle = task::spawn(async move {
let self_handle = match rx.await {
let self_handle: JoinHandle<()> = match rx.await {
Ok(handle) => handle,
Err(_) => {
error!("Fail to get handle of session task");
Expand Down Expand Up @@ -101,51 +99,24 @@ impl Session {
},
async {
join!(
async {
operations::run(
rx_operations,
state_api.clone(),
tracker_api.clone(),
tx_callback_events.clone(),
)
.await;
},
async {
if let Err(err) =
state::run(rx_state_api, tx_callback_events_state).await
{
error!("State loop exits with error:: {:?}", err);
if let Err(err) = Session::send_stop_signal(
Uuid::new_v4(),
&tx_operations,
None,
&destroying,
)
.await
{
error!("Fail to send stop signal (on state fail):: {:?}", err);
}
}
},
async {
if let Err(err) = tracker::run(state_api.clone(), rx_tracker_api).await
{
error!("Tracker loop exits with error:: {:?}", err);
if let Err(err) = Session::send_stop_signal(
Uuid::new_v4(),
&tx_operations,
None,
&destroying,
)
.await
{
error!(
"Fail to send stop signal (on tracker fail):: {:?}",
err
);
}
}
},
operations::run(
rx_operations,
state_api.clone(),
tracker_api.clone(),
tx_callback_events.clone(),
),
Self::run(
&tx_operations,
&destroying,
"state",
state::run(rx_state_api, tx_callback_events_state)
),
Self::run(
&tx_operations,
&destroying,
"tracker",
tracker::run(state_api.clone(), rx_tracker_api)
),
);
destroyed.cancel();
debug!("Session is finished");
Expand All @@ -160,6 +131,22 @@ impl Session {
}
}

async fn run(
tx_operations: &UnboundedSender<Operation>,
destroying: &CancellationToken,
name: &str,
f: impl Future<Output = Result<(), crate::events::NativeError>> + Send + 'static,
) {
if let Err(err) = f.await {
error!("State loop exits with error:: {:?}", err);
if let Err(err) =
Session::send_stop_signal(Uuid::new_v4(), tx_operations, None, destroying).await
{
error!("Fail to send stop signal (on {} fail):: {:?}", name, err);
}
}
}

pub fn get_uuid(&self) -> Uuid {
self.uuid
}
Expand Down

0 comments on commit ed67816

Please sign in to comment.