From 721f21bf467c84bc73a79ff7af007b2a6aa29d6f Mon Sep 17 00:00:00 2001 From: DylanKierans Date: Mon, 18 Mar 2024 17:29:03 +0100 Subject: [PATCH] Tmp files for PSOCK cluster --- R/makePSOCKcluster.R | 70 ++++++++++++++++++++++++++++++++++++++++ R/r_instrument_ll.R | 12 +++---- src/makePSOCKcluster.cpp | 38 ++++++++++++++++++++++ 3 files changed, 114 insertions(+), 6 deletions(-) create mode 100644 R/makePSOCKcluster.R create mode 100644 src/makePSOCKcluster.cpp diff --git a/R/makePSOCKcluster.R b/R/makePSOCKcluster.R new file mode 100644 index 0000000..6d35fd5 --- /dev/null +++ b/R/makePSOCKcluster.R @@ -0,0 +1,70 @@ +# Functions for makePSOCKcluster, and potential makeSOCKcluster + +insert_instrumentation_on_new_proc <- function() +{ + flag_debug <- FALSE + + func_ptrs <- get_function_list() + num_func_ptrs <- sum(get_num_functions()) + + function_exception_list <- get_function_exception_list() + function_methods_exception_list <- get_function_methods(function_exception_list) + + assign_regionRef_array_client(num_function_ptrs) + open_otf2_regionRef_sockets() + + ## Starting new here + for (func_index in 1:num_func_ptrs){ + + func_ptr <- func_ptrs[[func_index]] + func_name <- names(func_ptrs)[[func_index]] + #package_name <- names(func_ptrs)[[func_index]] + env <- environment(func_ptrs[[func_index]]) + package_name <- environmentName(env) + + ## DEBUGGING - Display current function (before checks) + if (flag_debug) { + # print("#######################################") + # print(paste0("func_index: ", func_index)) + print(func_ptr) + print(func_name) + print(env) + print(paste0("package: ", package_name, ", function: ", func_name)) + } + + flag_user_function=FALSE + if (env==.GlobalEnv){flag_user_function=TRUE} + + #if (env == NULL){ print(paste0("NULL env, func_name: ", func_name)) } + print(paste0("func_name: ", func_name)) + + ## Test if function should be skipped + if ( skip_function(func_ptr, func_name, env, function_exception_list, function_methods_exception_list)) { + print(paste0("Skipping: ", func_name)) + next; # skip to next loop + } + + ## Get otf2 regionRef + regionRef <- get_regionReg_from_array_client(func_index) + if (pkg.env$PRINT_INSTRUMENTS) { + print(paste0("INSTRUMENTING: function `", func_name,"`", + ", regionRef: ", regionRef)) + } + + ## Wrap function with debug info + insert_instrumentation(func_ptr, func_name, func_index, + regionRef, package_name, + env_is_locked=!pkg.env$UNLOCK_ENVS, + flag_user_function=flag_user_function) + + } + close_otf2_regionRef_sockets() +} + + +#if (FALSE){ + + +# ## Label as instrumented in instrumentation dataframe +# pkg.env$PROFILE_INSTRUMENTATION_DF[["function_instrumented"]][func_global_index] <- TRUE +# diff --git a/R/r_instrument_ll.R b/R/r_instrument_ll.R index 8a3099c..4f644a2 100644 --- a/R/r_instrument_ll.R +++ b/R/r_instrument_ll.R @@ -435,6 +435,12 @@ skip_function <- function(func_ptr, func_name, env, return(TRUE) } + ## 3 - Skip if primitive function - DEBUGGING (some are problematic) + if ( is.primitive(func_ptr) ) { + if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` is PRIMITVE function")) + return(TRUE) + } + ## Skip if function not defined in current package if ( !exists(func_name, envir = env, inherits=T)) { if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` DOES NOT exist in package env: ", env)) @@ -455,12 +461,6 @@ skip_function <- function(func_ptr, func_name, env, return(TRUE) } - ## 3 - Skip if primitive function - DEBUGGING (some are problematic) - if ( is.primitive(func_ptr) ) { - if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` is PRIMITVE function")) - return(TRUE) - } - ## 4 - Skip if not language body - DEBUGGING (symbol in na.null() was causing issues) if ( typeof(body(func_ptr)) != "language" ) { if (pkg.env$PRINT_SKIPS) print(paste0("SKIPPING: function `", func_name, "` body is type: ", typeof(body(func_ptr)))) diff --git a/src/makePSOCKcluster.cpp b/src/makePSOCKcluster.cpp new file mode 100644 index 0000000..925742c --- /dev/null +++ b/src/makePSOCKcluster.cpp @@ -0,0 +1,38 @@ +void *context; +void *regionRef_socket_client; +void *regionRef_socket_server; +int *regionReg_array; + +//int *regionReg_array = malloc(num_func * sizeof(*region_ref_vs_func_index)); +//free(regionReg_array); + +// Confirm num_functions, then send all regionRef in order +void open_otf2_regionRef_sockets_server(){ + context = zmq_ctx_new (); + regionRef_socket_server = zmq_socket (context, ZMQ_PUSH); + int rc = zmq_bind (regionRef_socket_server, "tcp://*:5558"); + + int zmq_ret = zmq_send(regionRef_socket_server, regionReg_array, num_functions*sizeof(*region_ref_vs_func_index)); +} + +RcppExport SEXP open_otf2_regionRef_sockets_clients(){ + context = zmq_ctx_new (); + regionRef_socket_client = zmq_socket (context, ZMQ_PULL); + zmq_bind (regionRef_socket_client, "tcp://localhost:5558"); + + int zmq_ret = zmq_recv(); +} + +RcppExport SEXP assign_regionRef_array_client(int num_function_ptrs){ + regionReg_array = malloc(num_function_ptrs*sizeof(*regionRef_array)); + return(R_NilValue); +} + +RcppExport SEXP get_regionRef_from_array_client(int func_index){ + return(regionRef_array[func_index]); +} + +RcppExport SEXP free_regionRef_array_client(){ + free(regionReg_array); + return(R_NilValue); +} \ No newline at end of file