From c294ae1b1e13711e9141bbcf4067c7effd908744 Mon Sep 17 00:00:00 2001 From: DylanKierans Date: Mon, 25 Mar 2024 11:12:46 +0100 Subject: [PATCH] Correcting auto-generated documentation for non-exported functions. Some internal functions are still exported unnecessarily --- R/RcppExports.R | 63 +----- R/r_fork_functions.R | 22 +- R/r_instrument_ll.R | 1 - man/finalize_otf2_client.Rd | 2 +- man/get_regionRef_array_slave.Rd | 2 +- man/get_regionRef_from_array_slave.Rd | 2 +- man/init_otf2_logger.Rd | 11 +- man/print_errnos.Rd | 2 +- src/RcppExports.cpp | 9 +- src/rTrace.cpp | 297 ++++++++++++++------------ 10 files changed, 190 insertions(+), 221 deletions(-) diff --git a/R/RcppExports.R b/R/RcppExports.R index 419881b..44d8c91 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -1,64 +1,14 @@ # Generated by using Rcpp::compileAttributes() -> do not edit by hand # Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393 -#' finalize_otf2_server -#' @return R_NilValue -NULL - -#' Initialize static otf2 {archive} objs -#' @param archivePath Path to the archive i.e. the directory where the anchor file is located. -#' @param archiveName Name of the archive. It is used to generate sub paths e.g. "archiveName.otf2" -#' @return R_NilValue -NULL - -#' Close static otf2 {archive} objs -NULL - -#' Initialize static otf2 {evt_writers} objs -NULL - -#' Close static otf2 {evt_writers} objs -NULL - -#' Enable or disable event measurement -#' @param evt_writer Event writer linked to proc -#' @param time Timestamp -#' @param measurementMode True to enable, else disable -NULL - -#' Init static otf2 {globaldefwriter} obj -NULL - -#' Finalize static otf2 {globaldefwriter} obj -#' Write clock information before ending tracing -NULL - -#' Define new id-value pair in globaldefwriter -#' @param stringRefValue String assigned to given id -#' @return NUM_STRINGREF -NULL - -#' globalDefWriter_WriteRegion -#' Define new region description in global writer -#' @param stringRef_RegionName Name to be associated with region -#' @return regionRef id/index for string -NULL - -#' Write the system tree including a definition for the location group to the global definition writer. -#' @param stringRef_name Name to be associated with SystemTreeNode (eg MyHost) -#' @param stringRef_class Class to be associated with SystemTreeNode (eg node) -NULL - -#' Write a definition for the location to the global definition writer. -NULL - #' Fork and initialize zeromq sockets for writing globalDef definitions #' @param max_nprocs Maximum number of R processes (ie evtWriters required) #' @param archivePath Path to otf2 archive #' @param archiveName Name of otf2 archive -#' @return R_NilValue -init_otf2_logger <- function(max_nprocs, archivePath = "./rTrace", archiveName = "rTrace") { - .Call('_rTrace_init_otf2_logger', PACKAGE = 'rTrace', max_nprocs, archivePath, archiveName) +#' @param flag_print_pids True to print pids of parent and child procs +#' @return <0 if error, 0 if R master, else >0 if child +init_otf2_logger <- function(max_nprocs, archivePath = "./rTrace", archiveName = "rTrace", flag_print_pids = FALSE) { + .Call('_rTrace_init_otf2_logger', PACKAGE = 'rTrace', max_nprocs, archivePath, archiveName, flag_print_pids) } #' assign_regionRef_array_master @@ -78,7 +28,7 @@ assign_regionRef_array_slave <- function(num_funcs) { } #' get_regionRef_from_array_slave -#' @param func_index Index of function to fet regionRef for +#' @param func_index Index of function to get regionRef for #' @return regionRef get_regionRef_from_array_slave <- function(func_index) { .Call('_rTrace_get_regionRef_from_array_slave', PACKAGE = 'rTrace', func_index) @@ -111,6 +61,7 @@ finalize_EvtWriter_client <- function() { } #' finalize_otf2_client +#' @description Send signal to server to stop collecting event information #' @return R_NilValue finalize_otf2_client <- function() { .Call('_rTrace_finalize_otf2_client', PACKAGE = 'rTrace') @@ -164,6 +115,7 @@ set_maxUsedLocationRef_client <- function(nprocs) { } #' print_errnos +#' @description Print error numbers relating to zmq sockets #' @return R_NilValue print_errnos <- function() { .Call('_rTrace_print_errnos', PACKAGE = 'rTrace') @@ -193,6 +145,7 @@ get_regionRef_array_master <- function(nprocs) { } #' get_regionRef_array_slave +#' @description Requests regionRef array from logger proc #' @param num_funcs Total number of functions in R namespace #' @return R_NilValue get_regionRef_array_slave <- function(num_funcs) { diff --git a/R/r_fork_functions.R b/R/r_fork_functions.R index a1ab732..71eb21b 100644 --- a/R/r_fork_functions.R +++ b/R/r_fork_functions.R @@ -196,7 +196,7 @@ get_fork_wrapper_expression <- function() { exit_exp <- expression( { on.exit({ ## DEBUGGING - print(paste0("makeForkCluster nnodes: ", nnodes)) + #print(paste0("makeForkCluster nnodes: ", nnodes)) # Set r proc IDs - note master=0 clusterApply(cl, 1:as.integer(nnodes), function(x){ set_locationRef(x); }) @@ -307,7 +307,6 @@ get_psock_wrapper_expression <- function() { # Save instrumentation state INSTRUMENTATION_ENABLED_BEFORE <- is_instrumentation_enabled() - ## DEBUGGING if (pkg.env$INSTRUMENTATION_ENABLED) { ## Append to depth counter pkg.env$FUNCTION_DEPTH <- pkg.env$FUNCTION_DEPTH + 1 @@ -336,13 +335,6 @@ get_psock_wrapper_expression <- function() { # WARNING: muster go after master_init_slave(), after importing function clusterApply(cl, 1:nnodes, function(x){ set_locationRef(x); }) - ## DEBUGGING - clusterEvalQ(cl, print(.packages())) - print(.packages()) - - # YOU ARE HERE #1 - ## DEBUGGING - Check if func_list are equal or nej - # Reopen sockets on all procs clusterEvalQ(cl, {open_EvtWriterSocket_client()}); @@ -393,10 +385,6 @@ master_init_slave <- function(cl) { pkg_cmd <- paste0(tmp, pkg_cmd) } - ## DEBUGGING - print("package_list: ") - print(pkg_cmd) - # Exports libraries parallel::clusterExport(cl, c("pkg_cmd"), envir=environment()) parallel::clusterEvalQ(cl, eval(parse(text = pkg_cmd))) @@ -419,13 +407,5 @@ master_init_slave <- function(cl) { user_func_list <- get_user_function_list() parallel::clusterExport(cl, names(user_func_list), envir=.GlobalEnv) - ## DEBUGGING - #print(names(user_func_list)) - - ## Assign regionRef_array on slave - #parallel::clusterEvalQ(cl, assign_regionRef_array_slave( - # sum(get_num_functions(flag_user_functions = T)) - #)) - } diff --git a/R/r_instrument_ll.R b/R/r_instrument_ll.R index 515e71a..7d286db 100644 --- a/R/r_instrument_ll.R +++ b/R/r_instrument_ll.R @@ -288,7 +288,6 @@ try_insert_instrumentation <- function(func_info, func_ptrs, env_is_locked, # Get new body for funcs of type: {fork_function, end_fork_function, default} body(func_ptr) <- get_new_function_body(func_ptr, func_name, regionRef) - ## DEBUGGING: Comment out to disable compiling for testing ## TODO: Add check for if compiled before, recompile #func_ptr <- compiler::cmpfun(func_ptr) diff --git a/man/finalize_otf2_client.Rd b/man/finalize_otf2_client.Rd index b81b0b5..0b91cdf 100644 --- a/man/finalize_otf2_client.Rd +++ b/man/finalize_otf2_client.Rd @@ -10,5 +10,5 @@ finalize_otf2_client() R_NilValue } \description{ -finalize_otf2_client +Send signal to server to stop collecting event information } diff --git a/man/get_regionRef_array_slave.Rd b/man/get_regionRef_array_slave.Rd index 3402299..a877922 100644 --- a/man/get_regionRef_array_slave.Rd +++ b/man/get_regionRef_array_slave.Rd @@ -13,5 +13,5 @@ get_regionRef_array_slave(num_funcs) R_NilValue } \description{ -get_regionRef_array_slave +Requests regionRef array from logger proc } diff --git a/man/get_regionRef_from_array_slave.Rd b/man/get_regionRef_from_array_slave.Rd index 7aaaafa..cb945ea 100644 --- a/man/get_regionRef_from_array_slave.Rd +++ b/man/get_regionRef_from_array_slave.Rd @@ -7,7 +7,7 @@ get_regionRef_from_array_slave(func_index) } \arguments{ -\item{func_index}{Index of function to fet regionRef for} +\item{func_index}{Index of function to get regionRef for} } \value{ regionRef diff --git a/man/init_otf2_logger.Rd b/man/init_otf2_logger.Rd index 6672bf6..30c7785 100644 --- a/man/init_otf2_logger.Rd +++ b/man/init_otf2_logger.Rd @@ -4,7 +4,12 @@ \alias{init_otf2_logger} \title{Fork and initialize zeromq sockets for writing globalDef definitions} \usage{ -init_otf2_logger(max_nprocs, archivePath = "./rTrace", archiveName = "rTrace") +init_otf2_logger( + max_nprocs, + archivePath = "./rTrace", + archiveName = "rTrace", + flag_print_pids = FALSE +) } \arguments{ \item{max_nprocs}{Maximum number of R processes (ie evtWriters required)} @@ -12,9 +17,11 @@ init_otf2_logger(max_nprocs, archivePath = "./rTrace", archiveName = "rTrace") \item{archivePath}{Path to otf2 archive} \item{archiveName}{Name of otf2 archive} + +\item{flag_print_pids}{True to print pids of parent and child procs} } \value{ -R_NilValue +<0 if error, 0 if R master, else >0 if child } \description{ Fork and initialize zeromq sockets for writing globalDef definitions diff --git a/man/print_errnos.Rd b/man/print_errnos.Rd index eb24804..a5e3b6f 100644 --- a/man/print_errnos.Rd +++ b/man/print_errnos.Rd @@ -10,5 +10,5 @@ print_errnos() R_NilValue } \description{ -print_errnos +Print error numbers relating to zmq sockets } diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index 824a3bd..d3048a0 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -11,15 +11,16 @@ Rcpp::Rostream& Rcpp::Rcerr = Rcpp::Rcpp_cerr_get(); #endif // init_otf2_logger -RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath, Rcpp::String archiveName); -RcppExport SEXP _rTrace_init_otf2_logger(SEXP max_nprocsSEXP, SEXP archivePathSEXP, SEXP archiveNameSEXP) { +RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath, Rcpp::String archiveName, bool flag_print_pids); +RcppExport SEXP _rTrace_init_otf2_logger(SEXP max_nprocsSEXP, SEXP archivePathSEXP, SEXP archiveNameSEXP, SEXP flag_print_pidsSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< int >::type max_nprocs(max_nprocsSEXP); Rcpp::traits::input_parameter< Rcpp::String >::type archivePath(archivePathSEXP); Rcpp::traits::input_parameter< Rcpp::String >::type archiveName(archiveNameSEXP); - rcpp_result_gen = Rcpp::wrap(init_otf2_logger(max_nprocs, archivePath, archiveName)); + Rcpp::traits::input_parameter< bool >::type flag_print_pids(flag_print_pidsSEXP); + rcpp_result_gen = Rcpp::wrap(init_otf2_logger(max_nprocs, archivePath, archiveName, flag_print_pids)); return rcpp_result_gen; END_RCPP } @@ -247,7 +248,7 @@ END_RCPP } static const R_CallMethodDef CallEntries[] = { - {"_rTrace_init_otf2_logger", (DL_FUNC) &_rTrace_init_otf2_logger, 3}, + {"_rTrace_init_otf2_logger", (DL_FUNC) &_rTrace_init_otf2_logger, 4}, {"_rTrace_assign_regionRef_array_master", (DL_FUNC) &_rTrace_assign_regionRef_array_master, 1}, {"_rTrace_assign_regionRef_array_slave", (DL_FUNC) &_rTrace_assign_regionRef_array_slave, 1}, {"_rTrace_get_regionRef_from_array_slave", (DL_FUNC) &_rTrace_get_regionRef_from_array_slave, 1}, diff --git a/src/rTrace.cpp b/src/rTrace.cpp index 1c870ba..f8631d3 100644 --- a/src/rTrace.cpp +++ b/src/rTrace.cpp @@ -38,6 +38,7 @@ using namespace Rcpp; +// Different events during entry collection phase typedef enum { ZMQ_OTF2_EVENT_ENTER, ZMQ_OTF2_EVENT_LEAVE, @@ -47,19 +48,21 @@ typedef enum { ZMQ_OTF2_SOCK_CLUSTER } zmq_otf2_datatypes; +// Struct used for majority of data transfer during event collection phase typedef struct Zmq_otf2_data { - OTF2_TimeStamp time; - OTF2_RegionRef regionRef; ///< Could probably generalize this datatype better, see set_maxUsedLocationRef() + OTF2_TimeStamp time; + OTF2_RegionRef regionRef; ///< Could probably generalize this datatype better, used for diverse int-like datatypes, see set_maxUsedLocationRef() pid_t pid; zmq_otf2_datatypes datatype; } Zmq_otf2_data; +// Struct used for defining globalDefWriter key-values typedef struct Zmq_otf2_defWriter { char func_name[MAX_FUNCTION_NAME_LEN]; int func_index; } Zmq_otf2_defWriter; - +// OTF2 objects for logger static OTF2_Archive* archive; static OTF2_GlobalDefWriter* global_def_writer; OTF2_TimeStamp epoch_start, epoch_end; // OTF2_GlobalDefWriter_WriteClockProperties @@ -72,15 +75,16 @@ static void *requester; ///* zmq socket - master(5555) and slaves(5559) (comm static void *pusher; ///* zmq socket - clients (comm with puller for EvtWriter) // Counters -static OTF2_StringRef NUM_STRINGREF=0; ///* Number of events recorded with WriteString +static const OTF2_StringRef OFFSET_NUM_STRINGREF=10; ///* Offset for NUM_STRINGREF to avoid overwriting +static OTF2_StringRef NUM_STRINGREF=OFFSET_NUM_STRINGREF; ///* Number of events recorded with WriteString, offset to avoid overwriting static OTF2_RegionRef NUM_REGIONREF=0; ///* Number of regions recorded with WriteRegion static OTF2_RegionRef *regionRef_array; ///* regionRef for each func_index on server static int NUM_FUNCS; ///* total num R functions to instrument - length(reigonRef_array) // IDs -static OTF2_LocationRef maxLocationRef=0; ///< Cap for max number of R procs -static OTF2_LocationRef maxUsedLocationRef=1; ///< Maximum number of used R procs 0 if child // [[Rcpp::export]] RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rTrace", - Rcpp::String archiveName = "rTrace" ) + Rcpp::String archiveName = "rTrace", bool flag_print_pids=false) { // TODO: Verify this acts as intended to save child proc signal(SIGHUP, signal_hup_handler); - // - //signal(SIGTERM, signal_term_handler); - pid_t child_pid = fork(); if (child_pid == (pid_t) -1 ){ // ERROR report_and_exit("Forking logger process", NULL); @@ -221,38 +216,42 @@ RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rT if (child_pid == 0) { // Child process // DEBUGGING - Rcout << "LOGGER PROC - pid: " << getpid() << ", child_pid:" << child_pid << "\n"; + if (flag_print_pids){ + Rcout << "LOGGER PROC - pid: " << getpid() << ", child_pid:" << child_pid << "\n"; + } IS_LOGGER = true; maxLocationRef = max_nprocs; // Open log file fp = fopen(log_filename, "w"); - if (fp==NULL){ report_and_exit("Opening file", NULL); } - fupdate(fp, "File opened\n"); + if (fp==NULL){ report_and_exit("Opening log file", NULL); } + fupdate_server(fp, "File opened\n"); // OTF2 Objs init_Archive_server(archivePath, archiveName); - fupdate(fp, "Init archive complete\n"); + fupdate_server(fp, "Init archive complete\n"); init_EvtWriters_server( ); - fupdate(fp, "Init evt_writers complete\n"); + fupdate_server(fp, "Init evt_writers complete\n"); init_GlobalDefWriter_server(); - fupdate(fp, "Init of otf2 objs complete\n"); + fupdate_server(fp, "Init of otf2 objs complete\n"); // Init zmq context context = zmq_ctx_new (); // Assign array for regionRefs of each func assign_regionRef_array_server(); + fupdate_server(fp, "assign_regionRef_array_server complete\n"); + // Server for logging GlobalDefWriter strings®ions globalDefWriter_server(); - fupdate(fp, "globalDefWriter_server complete\n"); + fupdate_server(fp, "globalDefWriter_server complete\n"); // Server listens for events - fupdate(fp, "evtWriter\n"); + fupdate_server(fp, "evtWriter\n"); run_evtWriters_server(); - fupdate(fp, "evtWriter complete\n"); + fupdate_server(fp, "evtWriter complete\n"); // Write definitions for proc structures globalDefWriter_WriteSystemTreeNode_server(0,0); // 1 system tree node @@ -265,7 +264,7 @@ RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rT finalize_Archive_server(); finalize_otf2_server(); free_regionRef_array_server(); - fupdate(fp, "COMPLETE!\n"); + fupdate_server(fp, "COMPLETE!\n"); if (fp!=NULL){fclose(fp);} return(1); @@ -273,7 +272,9 @@ RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rT } else { // DEBUGGING - Rcout << "MASTER PROC - pid: " << getpid() << ", child_pid:" << child_pid << "\n"; + if (flag_print_pids){ + Rcout << "MASTER PROC - pid: " << getpid() << ", child_pid:" << child_pid << "\n"; + } IS_LOGGER = false; context = zmq_ctx_new(); @@ -327,13 +328,15 @@ RcppExport SEXP assign_regionRef_array_slave(int num_funcs) { } //' get_regionRef_from_array_slave -//' @param func_index Index of function to fet regionRef for +//' @param func_index Index of function to get regionRef for //' @return regionRef // [[Rcpp::export]] RcppExport int get_regionRef_from_array_slave(int func_index) { return(regionRef_array[func_index-1]); // Fix offset in C } +// @name assign_regionRef_array_server +// @description Listen for num_funcs then alloc OTF2_RegionRef[] as required void assign_regionRef_array_server(){ void *regionRef_socket_server; int rc, zmq_ret; @@ -354,11 +357,10 @@ void assign_regionRef_array_server(){ zmq_close(regionRef_socket_server); // Close socket immediately (reopened later for events) - // DEBUGGING - fupdate(fp, "Finished assign_regionRef_array_server"); - } +// @name free_regionRef_array_server +// @description Free memory assigned for regionRef_array void free_regionRef_array_server(){ free(regionRef_array); } @@ -371,7 +373,9 @@ RcppExport SEXP free_regionRef_array_slave(){ return(R_NilValue); } -// Send 0 length signal to end this portion +// @name globalDefWriter_server +// @description Receive globalDef strings, and return with send regionRef +// Ends when recvs message of length 0 from client void globalDefWriter_server() { // Server int zmq_ret, rc; // Error check send/recvs, and sockets void *responder; // Recv global_defs, respond with regionRef @@ -384,7 +388,7 @@ void globalDefWriter_server() { // Server // DEBUGGING char fp_buffer[50]; snprintf(fp_buffer, 50, "(pid: %d) Listening for globalDefWriter\n", getpid()); - fupdate(fp, fp_buffer); + fupdate_server(fp, fp_buffer); // Receive globalDef strings, and return with send regionRef int iter=0; ///< Number of messages received @@ -417,7 +421,7 @@ void globalDefWriter_server() { // Server // DEBUGGING snprintf(fp_buffer, 50, "(pid: %d) Finished listening for globalDefWriter\n", getpid()); - fupdate(fp, fp_buffer); + fupdate_server(fp, fp_buffer); // Cleanup socket zmq_close(responder); @@ -439,7 +443,6 @@ RcppExport SEXP finalize_GlobalDefWriter_client() { // client } -// TODO - add func_index here! //' define_otf2_regionRef_client //' @param func_name Name of function to create event for //' @param func_index Global index of function in R namespace @@ -450,7 +453,7 @@ RcppExport int define_otf2_regionRef_client(Rcpp::String func_name, int func_ind OTF2_RegionRef regionRef; int zmq_ret; - // Send function index and name + // Send function index and name in Zmq_otf2_defWriter struct buffer.func_index = func_index; strncpy(buffer.func_name, func_name.get_cstring(), sizeof(buffer.func_name)); zmq_ret = zmq_send(requester, &buffer, sizeof(buffer), 0); // ZMQ ID: 1 @@ -491,6 +494,7 @@ RcppExport SEXP finalize_EvtWriter_client() { } //' finalize_otf2_client +//' @description Send signal to server to stop collecting event information //' @return R_NilValue // [[Rcpp::export]] RcppExport SEXP finalize_otf2_client() { @@ -531,8 +535,9 @@ RcppExport SEXP finalize_otf2_client() { } -//' finalize_otf2_server -//' @return R_NilValue +// @name finalize_otf2_server +// @description Signal to client end of server work (synchronization) +// @return R_NilValue void finalize_otf2_server() { int zmq_ret; void *syncer_server; @@ -553,10 +558,10 @@ void finalize_otf2_server() { } -//' Initialize static otf2 {archive} objs -//' @param archivePath Path to the archive i.e. the directory where the anchor file is located. -//' @param archiveName Name of the archive. It is used to generate sub paths e.g. "archiveName.otf2" -//' @return R_NilValue +// @name init_Archive_server +// @description Initialize static otf2 {archive} objs +// @param archivePath Path to the archive i.e. the directory where the anchor file is located. +// @param archiveName Name of the archive. It is used to generate sub paths e.g. "archiveName.otf2" void init_Archive_server(Rcpp::String archivePath="./rTrace", Rcpp::String archiveName="rTrace") { archive = OTF2_Archive_Open( archivePath.get_cstring(), @@ -579,7 +584,8 @@ void init_Archive_server(Rcpp::String archivePath="./rTrace", Rcpp::String archi } -//' Close static otf2 {archive} objs +// @name finalize_Archive_server +// @description Close static otf2 {archive} objs void finalize_Archive_server() { // DEBUGGING if (archive == NULL) { report_and_exit("finalize_Archive archive", NULL); } @@ -588,13 +594,13 @@ void finalize_Archive_server() { OTF2_Archive_Close( archive ); // Reset counters - NUM_STRINGREF = 0; + NUM_STRINGREF = OFFSET_NUM_STRINGREF; NUM_REGIONREF = 0; } -// TODO: Multiproc this! -//' Initialize static otf2 {evt_writers} objs +// @name init_EvtWriters_server +// @description Initialize static otf2 {evt_writers} objs void init_EvtWriters_server() { // DEBUGGING: OTF2_Archive_GetEvtWriter throwing error if (archive == NULL) { report_and_exit("init_EvtWriters_server archive", NULL); } @@ -602,11 +608,6 @@ void init_EvtWriters_server() { // Make sure maxLocationRef set if (maxLocationRef < 1){ report_and_exit("init_EvtWriters_server maxLocationRef", NULL); } - // DEBUGGING - char fp_buffer[50]; - snprintf(fp_buffer, 50, "maxLocationRef: %lu\n", maxLocationRef); - fupdate(fp, fp_buffer); - // Get a event writer for each location evt_writers = (OTF2_EvtWriter**)malloc(maxLocationRef*sizeof(*evt_writers)); if (evt_writers == NULL) { report_and_exit("init_EvtWriters_server evt_writers", NULL); } @@ -616,7 +617,8 @@ void init_EvtWriters_server() { } -//' Close static otf2 {evt_writers} objs +// @name finalize_EvtWriters_server +// @description Close static otf2 {evt_writers} objs void finalize_EvtWriters_server() { // DEBUGGING if (evt_writers == NULL) { report_and_exit("finalize_EvtWriters_server evt_writers", NULL); } @@ -634,10 +636,11 @@ void finalize_EvtWriters_server() { } -//' Enable or disable event measurement -//' @param evt_writer Event writer linked to proc -//' @param time Timestamp -//' @param measurementMode True to enable, else disable +// Enable or disable event measurement +// @name evtWriter_MeasurementOnOff_server +// @param evt_writer Event writer linked to proc +// @param time Timestamp +// @param measurementMode True to enable, else disable void evtWriter_MeasurementOnOff_server(OTF2_EvtWriter *evt_writer, OTF2_TimeStamp time, bool measurementMode) { if (measurementMode){ OTF2_EvtWriter_MeasurementOnOff(evt_writer, @@ -673,7 +676,8 @@ RcppExport SEXP evtWriter_MeasurementOnOff_client(bool measurementMode) { } -//' Init static otf2 {globaldefwriter} obj +// @name init_GlobalDefWriter_server +// @description Initialize static otf2 {globaldefwriter} obj void init_GlobalDefWriter_server() { // DEBUGGING if (archive == NULL) { report_and_exit("init_GlobalDefWriter archive", NULL); } @@ -689,8 +693,9 @@ void init_GlobalDefWriter_server() { globalDefWriter_WriteString_server(stringRefValue); } -//' Finalize static otf2 {globaldefwriter} obj -//' Write clock information before ending tracing +// @name finalize_GlobalDefWriter_server +// @description Finalize static otf2 {globaldefwriter} obj +// Write clock information before ending tracing void finalize_GlobalDefWriter_server() { #ifdef DUMMY_TIMESTEPS // We need to define the clock used for this trace and the overall timestamp range. @@ -714,9 +719,10 @@ void finalize_GlobalDefWriter_server() { } -//' Define new id-value pair in globaldefwriter -//' @param stringRefValue String assigned to given id -//' @return NUM_STRINGREF +// @name globalDefWriter_WriteString_server +// @description Define new id-value pair in globaldefwriter +// @param stringRefValue String assigned to given id +// @return NUM_STRINGREF OTF2_StringRef globalDefWriter_WriteString_server(Rcpp::String stringRefValue) { OTF2_GlobalDefWriter_WriteString(global_def_writer, NUM_STRINGREF, stringRefValue.get_cstring() ); @@ -724,10 +730,10 @@ OTF2_StringRef globalDefWriter_WriteString_server(Rcpp::String stringRefValue) } -//' globalDefWriter_WriteRegion -//' Define new region description in global writer -//' @param stringRef_RegionName Name to be associated with region -//' @return regionRef id/index for string +// @name globalDefWriter_WriteRegion +// @description Define new region description in global writer +// @param stringRef_RegionName Name to be associated with region +// @return regionRef id/index for string OTF2_RegionRef globalDefWriter_WriteRegion_server(OTF2_StringRef stringRef_RegionName) { OTF2_GlobalDefWriter_WriteRegion( global_def_writer, NUM_REGIONREF /* RegionRef */, @@ -746,12 +752,12 @@ OTF2_RegionRef globalDefWriter_WriteRegion_server(OTF2_StringRef stringRef_Regio // TODO: Get names from sys calls -//' Write the system tree including a definition for the location group to the global definition writer. -//' @param stringRef_name Name to be associated with SystemTreeNode (eg MyHost) -//' @param stringRef_class Class to be associated with SystemTreeNode (eg node) +// @name globalDefWriter_WriteSystemTreeNode_server +// @description Write the system tree including a definition for the location group to the global definition writer. +// @param stringRef_name Name to be associated with SystemTreeNode (eg MyHost) +// @param stringRef_class Class to be associated with SystemTreeNode (eg node) void globalDefWriter_WriteSystemTreeNode_server( OTF2_StringRef stringRef_name, OTF2_StringRef stringRef_class) { - // TODO: Get NodeName from syscall // Write the system tree incl definition for location group to global definition writer. OTF2_StringRef stringRef_SystemTreeNodeName = globalDefWriter_WriteString_server("MyHost"); OTF2_StringRef stringRef_SystemTreeNodeClass = globalDefWriter_WriteString_server("node"); @@ -762,7 +768,8 @@ void globalDefWriter_WriteSystemTreeNode_server( OTF2_StringRef stringRef_name, OTF2_UNDEFINED_SYSTEM_TREE_NODE /* parent */ ); } -// Write LocationGroup (ie proc) information +// @name globalDefWriter_WriteLocationGroups_server +// @description Write LocationGroup (ie proc) information void globalDefWriter_WriteLocationGroups_server() { // Do master process first @@ -786,13 +793,14 @@ void globalDefWriter_WriteLocationGroups_server() { } } -//' Write a definition for the location to the global definition writer. +// @name globalDefWriter_WriteLocations_server +// @description Write a definition for the location to the global definition writer. void globalDefWriter_WriteLocations_server() { OTF2_StringRef stringRef_LocationName = globalDefWriter_WriteString_server("Main thread"); char fp_buffer[100]; snprintf(fp_buffer, 100, "maxUsedLocationRef: %lu\n", maxUsedLocationRef); - fupdate(fp, fp_buffer); + fupdate_server(fp, fp_buffer); for (OTF2_LocationRef i=0; i maxUsedLocationRef){ maxUsedLocationRef = buffer.regionRef; } } else if (buffer.datatype == ZMQ_OTF2_SOCK_CLUSTER){ // ZMQ ID: 5f - // TODO put in function get_regionRef_array_server OTF2_RegionRef num_new_procs = buffer.regionRef; - for (OTF2_RegionRef i = 0; i< num_new_procs; ++i){ - int proc_id; - zmq_ret = zmq_recv(new_proc_rep, &proc_id, sizeof(proc_id), 0); // ZMQ ID: 6a - if (zmq_ret < 0 ) { - zmq_close(new_proc_rep); - zmq_close(puller); - report_and_exit("run_evtWriters_server new_proc_rep zmq_recv", NULL); - } - snprintf(fp_buffer, 50, "Received newproc signal from proc %d\n", proc_id); - fupdate(fp, fp_buffer); - - fupdate(fp, "Starting send regionRef_array"); - zmq_ret = zmq_send(new_proc_rep, regionRef_array, NUM_FUNCS*sizeof(*regionRef_array), 0); // ZMQ ID: 6b - if ( zmq_ret < 0 ){ - zmq_close(new_proc_rep); - zmq_close(puller); - report_and_exit("run_evtWriters_server new_proc_rep zmq_send", NULL); - } - fupdate(fp, "Finished send regionRef_array"); - + if (get_regionRef_array_server(num_new_procs, new_proc_rep) != 0){ + zmq_close(new_proc_rep); + zmq_close(puller); + report_and_exit("get_regionRef_array_server"); } } } else if (zmq_ret > 0) { // Unknown datatype @@ -981,19 +975,54 @@ void run_evtWriters_server(){ } snprintf(fp_buffer, 50, "(pid: %d) Finished listening for evtWriters\n", getpid()); - fupdate(fp, fp_buffer); + fupdate_server(fp, fp_buffer); // Cleanup socket zmq_close(puller); zmq_close(new_proc_rep); } +// @name get_regionRef_array_server +// @description Triggered by receiving ZMQ_OTF2_SOCK_CLUSTER +// @param num_new_procs Number of new procs spawned +// @param responder ZMQ_REP socket recv proc_id, send regionRef_array +// @return non-zero if error +int get_regionRef_array_server(OTF2_RegionRef num_new_procs, void *responder){ + int proc_id; + int zmq_ret; + char fp_buffer[50]; + + for (OTF2_RegionRef i = 0; i < num_new_procs; ++i){ + + zmq_ret = zmq_recv(responder, &proc_id, sizeof(proc_id), 0); // ZMQ ID: 6a + if (zmq_ret < 0 ) { return (-1); } + + // DEBUGGING + snprintf(fp_buffer, 50, "Received newproc signal from proc %d\n", proc_id); + fupdate_server(fp, fp_buffer); + fupdate_server(fp, "Starting send regionRef_array"); + + zmq_ret = zmq_send(responder, regionRef_array, NUM_FUNCS*sizeof(*regionRef_array), 0); // ZMQ ID: 6b + if ( zmq_ret < 0 ){ return (-2); } + + // DEBUGGING + fupdate_server(fp, "Finished send regionRef_array"); + } + return(0); + +} /////////////////////////////// // Helper functions /////////////////////////////// -// @TODO: Replace usage of this with more R-friendly version +//@TODO: Replace usage of this with more R-compliant exit strategy +//@TODO: Replace usage of this with more R-friendly error message strategy +// @name report_and_exit +// @description Print error to log file, close zmq sockets +// and context then exit +// @param msg Error message to display +// @param socket Additional non-global zmq socket to close void report_and_exit(const char* msg, void *socket){ // Close any open sockets if (pusher != NULL ) zmq_close(pusher); @@ -1001,7 +1030,7 @@ void report_and_exit(const char* msg, void *socket){ if (socket != NULL ) zmq_ctx_destroy(socket); if (context != NULL ) zmq_ctx_destroy(context); - if (IS_LOGGER){ + if (IS_LOGGER){ // Print to log file if (fp==NULL){ // Open log file @@ -1012,14 +1041,18 @@ void report_and_exit(const char* msg, void *socket){ fprintf(fp, "File closing\n"); fclose(fp); - } else { + } else { // Print to Rcout (recommend using logfile for makeCluster) + FILE *fp; + char filename[20]; + + snprintf(filename, 20 , "slave_error_%d.log", locationRef); Rcout << "[R proc id: " << locationRef << "] CLIENT ERROR: " << msg << "\n"; Rcout << "ERROR INFO - pid:" << getpid() << ", ppid: " << getppid() << ",sid: " << getsid(getpid()) << "\n"; Rcout << "ERROR INFO - Errno:" << errno << "\n"; + Rcout << "ERROR INFO - Output file: " << filename << "\n"; - FILE *fp; - fp = fopen("error.log","w"); + fp = fopen(filename,"w"); fprintf(fp, "[R proc id: %d] CLIENT ERROR: %s\n", locationRef, msg); fprintf(fp, " ERROR INFO - pid: %d , ppid: %d, sid: %d\n", getpid(), getppid(), getsid(getpid()) ); fprintf(fp, "ERROR INFO - Errno: %d\n", errno); @@ -1030,17 +1063,11 @@ void report_and_exit(const char* msg, void *socket){ } -// Attempt to throw error to parent process -//void report_and_exit(const char* msg, void *socket){ -// // Cleanup sockets and zmq -// if (socket!=NULL){ zmq_close(socket); } -// if (context!=NULL){ zmq_ctx_destroy(context); } -// -// //pid_t ppid = getppid(); -// kill(0, SIGTERM); -//} - -void fupdate(FILE *fp, const char* msg){ +// @name fupdate_server +// @description Write message to server log file +// @param fp File pointer to log file +// @param msg Message to write to log file +void fupdate_server(FILE *fp, const char* msg){ if (fp!=NULL){ fprintf(fp, "%s\n", msg); } else { @@ -1090,6 +1117,7 @@ RcppExport int set_maxUsedLocationRef_client(int nprocs) { } //' print_errnos +//' @description Print error numbers relating to zmq sockets //' @return R_NilValue // [[Rcpp::export]] RcppExport SEXP print_errnos() { @@ -1154,6 +1182,7 @@ RcppExport SEXP get_regionRef_array_master(const int nprocs){ } //' get_regionRef_array_slave +//' @description Requests regionRef array from logger proc //' @param num_funcs Total number of functions in R namespace //' @return R_NilValue // [[Rcpp::export]]