Skip to content

Commit

Permalink
Implementation of task-oriented workflow within summon_familiar.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexzwanenburg committed Nov 5, 2024
1 parent f378d7f commit 862cb59
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 430 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ Collate:
'StringUtilities.R'
'TaskFeatureInfo.R'
'TaskMain.R'
'TaskVimp.R'
'TestDataCreators.R'
'TestFunctions.R'
'TrainS4Methods.R'
Expand Down
275 changes: 0 additions & 275 deletions R/DataPreProcessing.R
Original file line number Diff line number Diff line change
@@ -1,278 +1,3 @@
run_preprocessing <- function(
cl,
feature_info_list = NULL,
project_info,
settings,
file_paths,
message_indent = 0L,
verbose
) {

# Suppress NOTES due to non-standard evaluation in data.table
data_id <- run_id <- list_name <- complete <- NULL

# Determine how parallel processing takes place.
if (settings$prep$do_parallel %in% c("TRUE", "inner")) {
# Parallel processing in inner function, i.e. within each data subset.
cl_inner <- cl
cl_outer <- NULL

} else if (settings$prep$do_parallel %in% c("outer")) {
# Parallel processing in outer loop, i.e. over all data subsets.
cl_inner <- NULL
cl_outer <- cl

if (!is.null(cl_outer)) {
logger_message(
paste0(
"\nPre-processing: Load-balanced parallel processing is done in the outer loop. ",
"No progress can be displayed."
),
indent = message_indent,
verbose = verbose
)
}

} else {
# No parallel processing.
cl_inner <- cl_outer <- NULL
}

# Check if a feature info list was already created. This will typically
# generate a generic feature info list when called from summon_familiar.
if (is.null(feature_info_list)) {
feature_info_list <- .get_feature_info_data(
data = get_data_from_backend(),
file_paths = file_paths,
project_id = project_info$project_id,
outcome_type = settings$data$outcome_type
)
}

# TODO: Check if the generic contains all the required data -- particularly
# for externally provided feature information.

# Create a list of runs for which pre-processing information should be
# obtained. First find the data ids over which should be iterated.
data_id <- c(
.get_process_step_data_identifier(
project_info = project_info,
process_step = "vimp"
),
.get_process_step_data_identifier(
project_info = project_info,
process_step = "mb"
)
)

# Create a list of runs, with data_id and run_id.
run_list <- data.table::rbindlist(lapply(
unique(data_id),
function(data_id, project_info) {
# Find the current data identifier for pre-processing. This may or may not
# be data_id.
pre_process_data_id <- .get_preprocessing_iteration_identifiers(run = .get_run_list(
iteration_list = project_info$iter_list,
data_id = data_id,
run_id = 1L
))$data

# Find data and run ids.
iteration_list <- .get_run_list(
iteration_list = project_info$iter_list,
data_id = pre_process_data_id
)

# Iterate over the iteration list, extract the run-table and return it.
return(data.table::rbindlist(lapply(
iteration_list,
function(x) (tail(x$run_table, n = 1L))
)))
},
project_info = project_info
))

# Remove duplicates.
run_list <- unique(run_list)

# Add list names and check for completeness.
run_list[, ":="(
"list_name" = .get_feature_info_list_name(data_id = data_id, run_id = run_id),
"complete" = FALSE
)]

# Iterate over the runs and check which feature information lists are already
# fully complete.
run_list[, "complete" := feature_info_complete(feature_info_list[[list_name]]), by = "list_name"]

# Get all runs which are not (fully) complete, and add some additional data.
run_list <- run_list[complete == FALSE, ]

if (!is_empty(run_list)) {
# Set preprocessing run identifier and total number of datasets.
run_list[, ":="(
"preprocessing_run_id" = .I,
"n_preprocessing_runs" = nrow(run_list)
)]

# Iterate over data subsets for which parameters have not yet been set.
new_feature_info_list <- fam_mapply_lb(
cl = cl_outer,
assign = "data",
FUN = .run_preprocessing,
progress_bar = !is.null(cl_outer),
run = split(run_list, by = c("preprocessing_run_id")),
MoreArgs = list(
"cl" = cl_inner,
"feature_info_list" = feature_info_list,
"project_info" = project_info,
"settings" = settings,
"message_indent" = message_indent,
"verbose" = verbose & is.null(cl_outer)
)
)

# Set names of the new feature list.
names(new_feature_info_list) <- run_list$list_name

# Update lists with feature information.
feature_info_list[run_list$list_name] <- new_feature_info_list
}

# Save to file, if necessary.
if (!is.null(file_paths)) {
# Determine file name
feature_info_file <- .get_feature_info_file_name(
file_paths = file_paths,
project_id = project_info$project_id
)

# Write to file
saveRDS(feature_info_list, file = feature_info_file)
}

# Attach the feature info file to the backend.
.assign_feature_info_to_backend(feature_info_list = feature_info_list)

return(invisible(TRUE))
}



.run_preprocessing <- function(
cl = NULL,
run,
feature_info_list,
project_info,
settings,
message_indent,
verbose
) {

logger_message(
paste0(
"\nPre-processing: Starting preprocessing for run ",
run$preprocessing_run_id, " of ",
run$n_preprocessing_runs, "."
),
indent = message_indent,
verbose = verbose
)

# Selected feature information list.
template_feature_info <- combine_feature_info_list(
preferred = feature_info_list[[run$list_name]],
custom = feature_info_list[["custom"]],
generic = feature_info_list[["generic"]]
)

# Find pre-processing parameters
feature_info_list <- determine_preprocessing_parameters(
cl = cl,
feature_info_list = template_feature_info,
data_id = run$data_id,
run_id = run$run_id,
project_info = project_info,
settings = settings,
message_indent = message_indent + 1L,
verbose = verbose
)

return(feature_info_list)
}



determine_preprocessing_parameters <- function(
cl = NULL,
feature_info_list,
data_id,
run_id,
project_info,
settings,
message_indent,
verbose
) {

# Add workflow control info.
feature_info_list <- add_control_info(
feature_info_list = feature_info_list,
data_id = data_id,
run_id = run_id
)

# Add signature feature info.
feature_info_list <- add_signature_info(
feature_info_list = feature_info_list,
signature = settings$data$signature
)

# Add novelty feature info.
feature_info_list <- add_novelty_info(
feature_info_list = feature_info_list,
novelty_features = settings$data$novelty_features
)

# Find the run list.
run_list <- .get_run_list(
iteration_list = project_info$iter_list,
data_id = data_id,
run_id = run_id
)

# Select unique samples.
sample_identifiers <- .get_sample_identifiers(
run = run_list,
train_or_validate = "train"
)
sample_identifiers <- unique(sample_identifiers)

# Find currently available features.
available_features <- get_available_features(feature_info_list = feature_info_list)

# Create a dataObject.
data <- methods::new(
"dataObject",
data = get_data_from_backend(sample_identifiers = sample_identifiers),
preprocessing_level = "none",
outcome_type = settings$data$outcome_type
)

# Remove unavailable features from the data object.
data <- filter_features(data = data, available_features = available_features)

return(.determine_preprocessing_parameters(
cl = cl,
data = data,
feature_info_list = feature_info_list,
settings = settings,
message_indent = message_indent,
verbose = verbose
))
}



.get_feature_info_file_name <- function(file_paths, project_id) {
# Generate file name of pre-processing file
file_name <- paste0(project_id, "_feature_info.RDS")
Expand Down
6 changes: 3 additions & 3 deletions R/DataProcessing.R
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,19 @@
# Get the main data id for a step in the overall modelling process.

# Suppress NOTES due to non-standard evaluation in data.table
feat_sel <- model_building <- external_validation <- NULL
vimp <- train <- external_validation <- NULL

# Load experiment data table
experiment_table <- project_info$experiment_setup

if (process_step == "vimp") {
# Find row on where feature selection takes place and extract the main data
# id.
main_data_id <- experiment_table[feat_sel == TRUE, ]$main_data_id[1L]
main_data_id <- experiment_table[vimp == TRUE, ]$main_data_id[1L]

} else if (process_step %in% c("mb")) {
# Find row where model building takes place and extract the main data id.
main_data_id <- experiment_table[model_building == TRUE, ]$main_data_id[1L]
main_data_id <- experiment_table[train == TRUE, ]$main_data_id[1L]

} else if (process_step == "ev") {
# Check if external validation is present; otherwise return an illegal main
Expand Down
Loading

0 comments on commit 862cb59

Please sign in to comment.