
Commit

Fixed rds file path error by modifying .Rgitignore
Insang Song committed Nov 17, 2023
1 parent d10c8bc commit 3d811f2
Showing 10 changed files with 374 additions and 269 deletions.
1 change: 0 additions & 1 deletion .Rbuildignore
@@ -1,4 +1,3 @@
rds$
/tools/*
/.github
scomps*.html$
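The dropped pattern matters because .Rbuildignore entries are Perl-style regular expressions matched against file paths relative to the package root, so a bare rds$ excludes every path ending in "rds", including .rds data files the package may need at build or check time. A minimal sketch of how such a pattern matches; the file names below are hypothetical, not files from this repository:

# Hypothetical file paths, used only to illustrate .Rbuildignore matching.
files <- c("tests/testdata/nc_points.rds", "R/scale_process.R", "README.md")
# .Rbuildignore lines are treated as Perl regexes against relative paths,
# so the old "rds$" entry would have excluded the .rds fixture:
files[grepl("rds$", files, perl = TRUE)]
#> [1] "tests/testdata/nc_points.rds"
# Removing that entry keeps .rds files in the built package, which is
# presumably the "rds file path error" the commit message refers to.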
1 change: 1 addition & 0 deletions NAMESPACE
@@ -27,6 +27,7 @@ export(sp_indexing)
export(switch_packbound)
export(validate_and_repair_vectors)
import(future)
import(future.apply)
import(progressr)
importFrom(dplyr,across)
importFrom(methods,is)
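The added import(future.apply) matches the refactored code below, which now calls future_lapply() without the future.apply:: prefix. Assuming the NAMESPACE is generated by roxygen2, as the @import tags in R/scale_process.R suggest, a minimal sketch of the tag block that produces this entry:

#' @import future
#' @import future.apply
#' @import progressr
NULL
# Running roxygen2::roxygenise() regenerates NAMESPACE with
# import(future.apply), so unqualified future_lapply() calls resolve.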
245 changes: 147 additions & 98 deletions R/scale_process.R
@@ -17,86 +17,109 @@
#' # distribute_process_grid()
#' @import future
#' @export
distribute_process_grid <- function(
grids,
grid_target_id = NULL,
fun,
...) {
if (is.character(grid_target_id) && !grepl(":", grid_target_id)) {
stop("Character grid_target_id should be in a form of 'startid:endid'.\n")
}
if (is.numeric(grid_target_id) && length(grid_target_id) != 2) {
stop("Numeric grid_target_id should be in a form of c(startid, endid).\n")
}
# subset using grids and grid_id
if (is.null(grid_target_id)) {
grid_target_ids <- unlist(grids[["CGRIDID"]])
}
if (is.character(grid_target_id)) {
grid_id_parsed <- strsplit(grid_target_id, ":", fixed = TRUE)[[1]]
grid_target_ids <- c(which(unique(grids[["CGRIDID"]]) == grid_id_parsed[1]),
which(unique(grids[["CGRIDID"]]) == grid_id_parsed[2]))
}
if (is.numeric(grid_target_id)) {
grid_target_ids <- unique(grids[["CGRIDID"]])[grid_target_id]
}
par_fun <- list(...)

grids_target <- grids[grid_target_ids %in% unlist(grids[["CGRIDID"]]),]
grids_target_list <- split(grids_target, unlist(grids_target[["CGRIDID"]]))

results_distributed <- future.apply::future_lapply(
grids_target_list,
\(x) {
sf::sf_use_s2(FALSE)

run_result <- tryCatch({
res <- fun(..., grid_ref = x)
cat(sprintf("Your input function %s was successfully run at CGRIDID: %s\n",
paste0(quote(fun)), as.character(unlist(x[["CGRIDID"]]))))
distribute_process_grid <-
function(
grids,
grid_target_id = NULL,
fun,
...) {
if (is.character(grid_target_id) && !grepl(":", grid_target_id)) {
stop("Character grid_target_id should be in a form of 'startid:endid'.\n")
}
if (is.numeric(grid_target_id) && length(grid_target_id) != 2) {
stop("Numeric grid_target_id should be in a form of c(startid, endid).\n")
}
# subset using grids and grid_id
if (is.null(grid_target_id)) {
grid_target_ids <- unlist(grids[["CGRIDID"]])
}
if (is.character(grid_target_id)) {
grid_id_parsed <- strsplit(grid_target_id, ":", fixed = TRUE)[[1]]
grid_target_ids <- c(which(unique(grids[["CGRIDID"]]) == grid_id_parsed[1]),
which(unique(grids[["CGRIDID"]]) == grid_id_parsed[2]))
}
if (is.numeric(grid_target_id)) {
grid_target_ids <- unique(grids[["CGRIDID"]])[grid_target_id]
}
par_fun <- list(...)

grids_target <- grids[grid_target_ids %in% unlist(grids[["CGRIDID"]]),]
grids_target_list <- split(grids_target, unlist(grids_target[["CGRIDID"]]))

results_distributed <- future.apply::future_lapply(
grids_target_list,
\(x) {
sf::sf_use_s2(FALSE)

run_result <- tryCatch({
res <- fun(..., grid_ref = x)
cat(sprintf("Your input function %s was
successfully run at CGRIDID: %s\n",
paste0(quote(fun)), as.character(unlist(x[["CGRIDID"]]))))

return(res)
},
error = function(e) return(data.frame(ID = NA)))

return(res)
return(run_result)
},
error = function(e) return(data.frame(ID = NA)))

return(run_result)
},
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <- do.call(dplyr::bind_rows, results_distributed)
results_distributed <- results_distributed[!is.na(results_distributed[["ID"]]),]

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])

return(results_distributed)
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <- do.call(dplyr::bind_rows, results_distributed)
results_distributed <- results_distributed[!is.na(results_distributed[["ID"]]),]

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])

return(results_distributed)
}
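A hedged usage sketch of the refactored distribute_process_grid(). computation_grids, point_data, and my_fun are hypothetical placeholders rather than objects defined in this commit; my_fun stands for any scomps function that accepts a grid_ref argument, since fun is invoked as fun(..., grid_ref = x) inside each worker:

# Hypothetical objects; adjust names to your own data and callback.
library(future)
plan(multicore, workers = 4L)

res <- distribute_process_grid(
  grids = computation_grids,   # polygons carrying a CGRIDID column
  grid_target_id = NULL,       # NULL runs every grid; "startid:endid" or
                               # c(startid, endid) restrict the run
  fun = my_fun,
  points = point_data,         # arguments matching ^(points|poly) and ^id
  id = "site_id"               # are reused in the post-processing step
)
# Grids where my_fun() errors return data.frame(ID = NA) inside the worker
# and are dropped after dplyr::bind_rows(), so res keeps successful runs only.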


#' @title Process a given function using a hierarchy in input data
#'
#' @description "Hierarchy" refers to a system, which divides the entire study region into multiple subregions. It is oftentimes reflected in an area code system (e.g., FIPS for US Census geographies, HUC-4, -6, -8, etc.). Currently only accepting \link[future]{multicore} setting (single node, single process, and multiple threads). For details of the terminology in \code{future} package, refer to \link[future]{plan}. This function assumes that users have one raster file and a sizable and spatially distributed target locations. Each thread will process ceiling(|Ng|/|Nt|) grids where |Ng| denotes the number of grids and |Nt| denotes the number of threads. Please be advised that accessing the same file simultaneously with multiple processes may result in errors.
#' @param regions sf/SpatVector object. Computational regions. Only polygons are accepted.
#' @param split_level character(nrow(regions)) or character(1). The regions will be split by the common level value. The level should be higher than the original data level. A field name with the higher level information is also accepted.
#' @description "Hierarchy" refers to a system,
#' which divides the entire study region into multiple subregions.
#' It is oftentimes reflected in an area code system
#' (e.g., FIPS for US Census geographies, HUC-4, -6, -8, etc.).
#' Currently only accepting \link[future]{multicore} setting
#' (single node, single process, and multiple threads).
#' For details of the terminology in \code{future} package,
#' refer to \link[future]{plan}.
#' This function assumes that users have one raster file and
#' a sizable set of spatially distributed target locations.
#' Each thread will process ceiling(|Ng|/|Nt|) grids where
#' |Ng| denotes the number of grids and |Nt| denotes
#' the number of threads. Please be advised that
#' accessing the same file simultaneously with
#' multiple processes may result in errors.
#' @param regions sf/SpatVector object.
#' Computational regions. Only polygons are accepted.
#' @param split_level character(nrow(regions)) or character(1).
#' The regions will be split by the common level value.
#' The level should be higher than the original data level.
#' A field name with the higher level information is also accepted.
#' @param fun function supported in scomps.
#' @param ... Arguments passed to the argument \code{fun}.
#' @return a data.frame object with computation results. For entries of the results, consult the function used in \code{fun} argument.
#' @return a data.frame object with computation results.
#' For entries of the results, consult the function used in
#' \code{fun} argument.
#' @author Insang Song \email{geoissong@@gmail.com}
#'
#' @examples
#'
#' @examples
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process_hierarchy()
#' @import future
#' @import future.apply
#' @import progressr
#' @export
distribute_process_hierarchy <- function(
regions,
regions,
split_level = NULL,
fun,
...) {
@@ -112,23 +135,30 @@ distribute_process_hierarchy <- function(
# pgrs <- progressr::progressor(along = seq_len(split_level))
regions_list <- split(regions, split_level)

results_distributed <- future.apply::future_lapply(
regions_list,
\(x) {
sf::sf_use_s2(FALSE)

run_result <- tryCatch({
res <- fun(..., grid_ref = x)
return(res)
},
error = function(e) return(data.frame(ID = NA)))
return(run_result)
},
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <-
future_lapply(
regions_list,
\(x) {
sf::sf_use_s2(FALSE)
run_result <-
tryCatch(
{
res <- fun(..., grid_ref = x)
return(res)
},
error =
function(e) {
return(data.frame(ID = NA))
})
return(run_result)
},
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr",
"scomps", "future"))
results_distributed <- do.call(dplyr::bind_rows, results_distributed)
results_distributed <- results_distributed[!is.na(results_distributed[["ID"]]),]

results_distributed <-
results_distributed[!is.na(results_distributed[["ID"]]), ]

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
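A hedged usage sketch of distribute_process_hierarchy(), assuming hypothetical census-tract polygons with a county-level FIPS field one level above the tract level. As with the grid variant, subregions whose callback errors yield data.frame(ID = NA) and are filtered out afterwards:

# Hypothetical objects; tract_polygons, point_data, and my_fun are placeholders.
library(future)
plan(multicore, workers = 4L)

res_county <- distribute_process_hierarchy(
  regions = tract_polygons,
  split_level = "county_fips",  # a field name, or a vector of length
                                # nrow(regions), defining the split
  fun = my_fun,
  points = point_data,
  id = "site_id"
)
# Each split-level group is dispatched to a worker as one unit via
# future_lapply(), and failed groups are removed from the bound result.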
@@ -144,11 +174,25 @@ distribute_process_hierarchy <- function(

#' @title Process a given function over multiple large rasters
#'
#' @description Large raster files usually exceed the memory capacity in size. Cropping a large raster into a small subset even consumes a lot of memory and adds processing time. This function leverages terra SpatRaster proxy to distribute computation jobs over multiple cores. It is assumed that users have multiple large raster files in their disk, then each file path is assigned to a thread. Each thread will directly read raster values from the disk using C++ pointers that operate in terra functions. For use, it is strongly recommended to use vector data with small and confined spatial extent for computation to avoid out-of-memory error. For this, users may need to make subsets of input vector objects in advance.
#' @param filenames character(n). A vector or list of full file paths of raster files. n is the total number of raster files.
#' @description Large raster files usually exceed memory capacity.
#' Even cropping a large raster into a small subset consumes
#' a lot of memory and adds processing time.
#' This function leverages terra SpatRaster proxy
#' to distribute computation jobs over multiple cores.
#' It is assumed that users have multiple large raster files
#' on their disk; each file path is then assigned to a thread.
#' Each thread will directly read raster values from
#' the disk using C++ pointers that operate in terra functions.
#' It is strongly recommended to use vector data with a
#' small, confined spatial extent for computation to avoid
#' out-of-memory errors. To do so, users may need
#' to subset the input vector objects in advance.
#' @param filenames character(n). A vector or list of
#' full file paths of raster files. n is the total number of raster files.
#' @param fun function supported in scomps.
#' @param ... Arguments passed to the argument \code{fun}.
#' @return a data.frame object with computation results. For entries of the results, consult the function used in \code{fun} argument.
#' @return a data.frame object with computation results.
#' For entries of the results, consult the function used in \code{fun} argument.
#' @author Insang Song \email{geoissong@@gmail.com}
#'
#' @examples
@@ -157,10 +201,11 @@ distribute_process_hierarchy <- function(
#' # Does not run ...
#' # distribute_process_multirasters()
#' @import future
#' @import future.apply
#' @import progressr
#' @export
distribute_process_multirasters <- function(
filenames,
filenames,
fun,
...) {
par_fun <- list(...)
@@ -170,23 +215,27 @@ distribute_process_multirasters <- function(
}

file_list <- split(filenames, filenames)
results_distributed <- future.apply::future_lapply(
file_list,
\(x) {
sf::sf_use_s2(FALSE)

run_result <- tryCatch({
res <- fun(...)
return(res)
},
error = function(e) return(data.frame(ID = NA)))
return(run_result)
},
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <-
future_lapply(
file_list,
\(x) {
sf::sf_use_s2(FALSE)

run_result <-
tryCatch({
res <- fun(...)
return(res)
},
error = function(e) return(data.frame(ID = NA)))
return(run_result)
},
future.seed = TRUE,
future.packages =
c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <- do.call(dplyr::bind_rows, results_distributed)
results_distributed <- results_distributed[!is.na(results_distributed[["ID"]]),]

results_distributed <-
results_distributed[!is.na(results_distributed[["ID"]]), ]

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
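A hedged usage sketch of distribute_process_multirasters(); the directory, file pattern, and callback below are hypothetical. Each file path becomes one element of the internal file_list, and the remaining ... arguments are forwarded to fun unchanged:

# Hypothetical paths and callback; adjust to your own data.
library(future)
plan(multicore, workers = 4L)

geotiff_paths <- list.files("/path/to/large_rasters", pattern = "\\.tif$",
                            full.names = TRUE)

res_rasters <- distribute_process_multirasters(
  filenames = geotiff_paths,  # one work unit per file path
  fun = my_fun,
  points = point_data,
  id = "site_id"
)
# As in the other distribute_process_* functions, failed units return
# data.frame(ID = NA) in the worker and are dropped from the final result.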
23 changes: 0 additions & 23 deletions man/clip_as_extent_ras2.Rd

This file was deleted.

32 changes: 0 additions & 32 deletions man/distribute_process.Rd

This file was deleted.


