Skip to content

Commit

Permalink
Use std::thread for Files driver task
Browse files Browse the repository at this point in the history
  • Loading branch information
vruello committed Apr 9, 2024
1 parent b1aa2da commit 5d7226e
Showing 1 changed file with 39 additions and 55 deletions.
94 changes: 39 additions & 55 deletions server/src/drivers/files.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
use async_trait::async_trait;
use log::{debug, info, warn};
use tokio::fs::OpenOptions;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tokio::sync::oneshot;

use crate::event::EventMetadata;
use crate::output::OutputDriver;
use anyhow::{anyhow, bail, Context, Result};
use common::subscription::FilesConfiguration;
use std::collections::HashMap;
use std::fs::{create_dir_all, File, OpenOptions};
use std::io::Write;
use std::net::IpAddr;
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;
use std::{path::PathBuf, str::FromStr};
use tokio::fs::{create_dir_all, File};
use tokio::io::AsyncWriteExt;

enum WriteFilesMessage {
Write(WriteMessage),
Stop,
}

#[derive(Debug)]
pub struct WriteFilesMessage {
pub struct WriteMessage {
path: PathBuf,
content: String,
resp: oneshot::Sender<Result<()>>,
}

async fn handle_message(
file_handles: &mut HashMap<PathBuf, File>,
message: &WriteFilesMessage,
) -> Result<()> {
fn handle_message(file_handles: &mut HashMap<PathBuf, File>, message: &WriteMessage) -> Result<()> {
let parent = message
.path
.parent()
Expand All @@ -40,14 +41,13 @@ async fn handle_message(
None => {
// Create directory (if it does not already exist)
debug!("Create directory {}", parent.display());
create_dir_all(parent).await?;
create_dir_all(parent)?;
// Open file
debug!("Open file {}", path.display());
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await
.with_context(|| format!("Failed to open file {}", path.display()))?;
// Insert it into file_buffers map
file_handles.insert(path.clone(), file);
Expand All @@ -57,60 +57,44 @@ async fn handle_message(
.ok_or_else(|| anyhow!("Could not find newly inserted File in file handles"))?
}
};
file.write_all(message.content.as_bytes()).await?;
file.write_all(message.content.as_bytes())?;
Ok(())
}

pub async fn run(mut task_rx: mpsc::Receiver<WriteFilesMessage>, task_ct: CancellationToken) {
info!("File output task started");
fn run(rx: Receiver<WriteFilesMessage>) {
info!("File output thread started");
let mut file_handles: HashMap<PathBuf, File> = HashMap::new();
loop {
tokio::select! {
Some(message) = task_rx.recv() => {
let result = handle_message(&mut file_handles, &message).await;
if let Err(e) = message
.resp
.send(result) {
warn!("Failed to send File write result because the receiver dropped. Result was: {:?}", e);
}
},
_ = task_ct.cancelled() => {
break;
}
};
while let Ok(WriteFilesMessage::Write(message)) = rx.recv() {
let result = handle_message(&mut file_handles, &message);
if let Err(e) = message.resp.send(result) {
warn!(
"Failed to send File write result because the receiver dropped. Result was: {:?}",
e
);
}
}
info!("Exiting File output task");
info!("Exiting File output thread");
}

pub struct OutputFiles {
config: FilesConfiguration,
task_tx: mpsc::Sender<WriteFilesMessage>,
task_ct: CancellationToken,
tx: mpsc::Sender<WriteFilesMessage>,
}

impl OutputFiles {
pub fn new(config: &FilesConfiguration) -> Self {
debug!(
"Initialize Files driver with config {:?}",
config
);
debug!("Initialize Files driver with config {:?}", config);
// Create a communication channel with the task responsible for file management
// TODO: Why 32?
let (task_tx, task_rx) = mpsc::channel(32);
let (tx, rx) = mpsc::channel();

// Use a CancellationToken to tell the task to end itself
let task_ct = CancellationToken::new();
let cloned_task_ct = task_ct.clone();

// Launch the task responsible for handling file system operations
tokio::spawn(async move {
run(task_rx, cloned_task_ct).await;
// Launch a dedicated thread responsible for handling file system operations
std::thread::spawn(move || {
run(rx);
});

OutputFiles {
config: config.clone(),
task_tx,
task_ct,
tx,
}
}

Expand Down Expand Up @@ -256,13 +240,11 @@ impl OutputDriver for OutputFiles {

// Create a oneshot channel to retrieve the result of the operation
let (tx, rx) = oneshot::channel();
self.task_tx
.send(WriteFilesMessage {
path,
content,
resp: tx,
})
.await?;
self.tx.send(WriteFilesMessage::Write(WriteMessage {
path,
content,
resp: tx,
}))?;

// Wait for the result
rx.await??;
Expand All @@ -273,7 +255,9 @@ impl OutputDriver for OutputFiles {

impl Drop for OutputFiles {
fn drop(&mut self) {
self.task_ct.cancel();
if let Err(e) = self.tx.send(WriteFilesMessage::Stop) {
warn!("Failed to send Stop message to Files handler thread: {}", e);
}
}
}

Expand Down

0 comments on commit 5d7226e

Please sign in to comment.