diff --git a/DESCRIPTION b/DESCRIPTION index 4980600..fcec84f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -13,4 +13,4 @@ Roxygen: list(markdown = TRUE) RoxygenNote: 7.2.3 URL: https://github.com/DylanKierans/rTrace BugReports: https://github.com/DylanKierans/rTrace/issues -SystemRequirements: otf-trace, libotf-trace-dev, zeromq +SystemRequirements: otf-trace, libotf-trace-dev, zeromq, papi, perf diff --git a/R/rTrace.R b/R/rTrace.R index a53a860..ccd6a9a 100644 --- a/R/rTrace.R +++ b/R/rTrace.R @@ -45,45 +45,63 @@ library("Rcpp") #' \item{PRINT_INSTRUMENTS}{} #' \item{PRINT_SKIPS}{} #' \item{PRINT_FUNC_INDEXES}{} +#' \item{COLLECT_METRICS}{} +#' \item{RTRACE_VARS}{} #' } #' pkg.env <- new.env(parent = emptyenv()) ### SECTION - Init section for instrumentation ### # @name INSTRUMENTATION_INIT -# @description Checked when instrumenting functions to ensure init() has been called +# @description Boolean - Checked when instrumenting functions to ensure init() has been called pkg.env$INSTRUMENTATION_INIT <- FALSE # @name INSTRUMENTATION_ENABLED -# @description Current status of instrumentation +# @description Boolean - Current status of instrumentation pkg.env$INSTRUMENTATION_ENABLED <- FALSE # @name INSTRUMENTATION_STATUS_SAVED -# @description Saved status of instrumentation +# @description Boolean - Saved status of instrumentation pkg.env$INSTRUMENTATION_STATUS_SAVED <- FALSE ### SECTION - Instrument Flags ### # @name MAX_FUNCTION_DEPTH -# @description Max depth of functions to creat instrumententation events for +# @description int - Max depth of functions to creat instrumententation events for pkg.env$MAX_FUNCTION_DEPTH <- 10 # @name FUNCTION_DEPTH -# @description Current instrumentation depth +# @description int - Current instrumentation depth pkg.env$FUNCTION_DEPTH <- 0 # @name UNLOCK_ENVS -# @description Keep package envs unlocked when instrumenting functions +# @description Boolean - Keep package envs unlocked when instrumenting functions pkg.env$UNLOCK_ENVS <- 
TRUE # Not sure if this is safe to set TRUE, but should be quicker! ### SECTION - Output Flags ### # @name PRINT_INSTRUMENTS -# @description Print which functions are being instrumented +# @description Boolean - Print which functions are being instrumented pkg.env$PRINT_INSTRUMENTS <- FALSE # @name PRINT_SKIPS -# @description Print which functions are being skipped due to exception +# @description Boolean - Print which functions are being skipped due to exception pkg.env$PRINT_SKIPS <- FALSE # @name PRINT_FUNC_INDEXES -# @description Print function indexes when called (only intended for verbose debugging) +# @description Boolean - Print function indexes when called (only intended for verbose debugging) pkg.env$PRINT_FUNC_INDEXES <- FALSE + + +# @name COLLECT_METRICS +# @description Boolean - Enable collection of Hardware Performance Metrics (HWPC) +pkg.env$COLLECT_METRICS <- FALSE + + +# @name RTRACE_VARS +# @description List of all global rTrace variables +pkg.env$RTRACE_VARS <- c( "INSTRUMENTATION_INIT", "INSTRUMENTATION_ENABLED", + "INSTRUMENTATION_STATUS_SAVED", "MAX_FUNCTION_DEPTH", + "FUNCTION_DEPTH", "UNLOCK_ENVS", "PRINT_INSTRUMENTS", + "PRINT_SKIPS", "PRINT_FUNC_INDEXES", + "COLLECT_METRICS" + ) + diff --git a/R/r_fork_functions.R b/R/r_fork_functions.R index da4b8b1..214549d 100644 --- a/R/r_fork_functions.R +++ b/R/r_fork_functions.R @@ -397,17 +397,12 @@ master_init_slave <- function(cl) { parallel::clusterEvalQ(cl, eval(parse(text = pkg_cmd))) # Export rTrace variables - vars <- c( "INSTRUMENTATION_INIT", "INSTRUMENTATION_ENABLED", - "INSTRUMENTATION_STATUS_SAVED", "MAX_FUNCTION_DEPTH", - "FUNCTION_DEPTH", "UNLOCK_ENVS", "PRINT_INSTRUMENTS", - "PRINT_SKIPS", "PRINT_FUNC_INDEXES" - ) - parallel::clusterExport(cl, c("vars"), envir=environment()) - parallel::clusterExport(cl, vars, envir=pkg.env) + parallel::clusterExport(cl, c("RTRACE_VARS"), envir=pkg.env) + parallel::clusterExport(cl, pkg.env$RTRACE_VARS, envir=pkg.env) parallel::clusterEvalQ(cl, { 
FUNCTION_DEPTH <- 0 # Reset function depth unlock_envs("rTrace") - for(n in vars) { assign(n, get(n, .GlobalEnv), pkg.env) } + for(n in RTRACE_VARS) { assign(n, get(n, .GlobalEnv), pkg.env) } lock_envs("rTrace") }) diff --git a/R/r_instrument_hl.R b/R/r_instrument_hl.R index b5e602c..283ab55 100644 --- a/R/r_instrument_hl.R +++ b/R/r_instrument_hl.R @@ -54,18 +54,27 @@ is_instrumentation_enabled <- function() { } +## YOU ARE HERE #' instrumentation_init #' @description Create otf2 objs for instrumentation, and initiate global vars #' @param flag_user_functions Boolean - TRUE to include user functions in dataframe -#' @param verbose_wrapping Boolean - Print info about skipping or instrumenting each function. Produces large amount of info to stdout +#' @param collect_metrics Boolean - Enable papi/perf metric collection with instrumentation +#' @param verbose_wrapping Boolean - Print info about skipping or instrumenting each function. Produces large amount of info to stdout. Intended for developers #' @export -instrumentation_init <- function(flag_user_functions=T, verbose_wrapping=F) +instrumentation_init <- function(flag_user_functions=TRUE, collect_metrics=FALSE, verbose_wrapping=FALSE) { ## Update package vars pkg.env$PRINT_INSTRUMENTS <- verbose_wrapping pkg.env$PRINT_SKIPS <- verbose_wrapping pkg.env$INSTRUMENTATION_INIT <- TRUE + ### YOU ARE HERE + ## Interface to pmpmeas + pkg.env$COLLECT_METRICS <- collect_metrics + if (collect_metrics){ + pmpmeas_init() + } + ## Initiate new proc - close R if not Master ret <- init_otf2_logger(parallelly::availableCores()) # Master R proc returns 0 if (ret != 0){ quit(save="no"); } # Unintended fork R proc for otf2 logger @@ -147,4 +156,4 @@ instrumentation_wrapper <- function(func, ...) 
instrumentation_finalize() ret -} \ No newline at end of file +} diff --git a/src/Makevars b/src/Makevars index 9f8fde7..79cfb95 100644 --- a/src/Makevars +++ b/src/Makevars @@ -6,5 +6,9 @@ OTF2_PKG_LIBS=-L${OTF2_ROOT}/lib -lotf2 ZEROMQ_PKG_CPPFLAGS=-I${ZEROMQ_ROOT}/include ZEROMQ_PKG_LIBS=-L${ZEROMQ_ROOT}/lib -lzmq -PKG_CPPFLAGS=-I. ${OTF2_PKG_CPPFLAGS} ${ZEROMQ_PKG_CPPFLAGS} -PKG_LIBS=-L. ${OTF2_PKG_LIBS} ${ZEROMQ_PKG_LIBS} +# pmpmeas - papi +PMPMEAS_PKG_CPPFLAGS=`pkg-config --cflags papi` +PMPMEAS_PKG_LIBS=`pkg-config --libs papi` + +PKG_CPPFLAGS=-I. ${OTF2_PKG_CPPFLAGS} ${ZEROMQ_PKG_CPPFLAGS} ${PMPMEAS_PKG_CPPFLAGS} +PKG_LIBS=-L. ${OTF2_PKG_LIBS} ${ZEROMQ_PKG_LIBS} ${PMPMEAS_PKG_LIBS} diff --git a/src/rTrace.cpp b/src/rTrace.cpp index 141fa04..f762db5 100644 --- a/src/rTrace.cpp +++ b/src/rTrace.cpp @@ -1,7 +1,7 @@ // @file rTrace.cpp // @brief Rcpp functions for underlying otf2 library -// @date 2024-03-23 -// @version 0.02 +// @date 2024-07-02 +// @version 0.03 // @author D.Kierans (dylanki@kth.se) // @note Location ~= thread, LocationGroup ~= Process, SystemTree ~= Node // @note errno 98 indicates socket alreayd in use @@ -18,6 +18,10 @@ // @todo Test removal of regionRef on client end. 
// Would reduce synchronization time when spawning new procs, but may increase // evtWrite time on server due to random memory access +// +// @note Use pmpmeas_init to parse environment variables +// @note New object Meas for each meas_type, contains multiple metrics via _cnt +// Names and values stored in papi/perf interface (include/papiinf.hh) #include "Rcpp.h" #include @@ -32,6 +36,10 @@ #include #include +// PMPMEAS +#include "pmpmeas.h" +#include "meas.h" + //#define DEBUG /* Uncomment to enable verbose debug info */ //#define DUMMY_TIMESTEPS /* Uncomment for 1s timestep for each subsequent event call */ #define MAX_FUNCTION_NAME_LEN 40 // Max length of R function @@ -87,6 +95,12 @@ static OTF2_LocationRef maxLocationRef=0; ///* Cap for max number of R procs static OTF2_LocationRef maxUsedLocationRef=1; ///* Maximum number of used R procs 0 if child // [[Rcpp::export]] RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rTrace", - Rcpp::String archiveName = "rTrace", bool flag_print_pids=false) + Rcpp::String archiveName = "rTrace", bool collect_metrics=false, + bool flag_print_pids=false) { // TODO: Verify this acts as intended to save child proc signal(SIGHUP, signal_hup_handler); + // YOU ARE HERE + // Set COLLECT_METRICS global on server and client before fork + if (collect_metrics){ + #ifndef _COLLECT_METRICS + Rcpp::stop("rTrace not build for metric collection. 
Rebuild or run with `collect_metrics=false`"); + #endif + pmpmeas_init(); + pmpmeas_read_init(&pmpmeas_vals, &pmpmeas_n); + } + COLLECT_METRICS = collect_metrics; + pid_t child_pid = fork(); if (child_pid == (pid_t) -1 ){ // ERROR report_and_exit("Forking logger process", NULL); @@ -240,12 +282,16 @@ RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rT // Init zmq context context = zmq_ctx_new (); + if (COLLECT_METRICS){ + // Server creates metrics from pmpmeas objects + globalDefWriter_metrics_server(); + } + // Assign array for regionRefs of each func assign_regionRef_array_server(); fupdate_server(fp, "assign_regionRef_array_server complete\n"); - - // Server for logging GlobalDefWriter strings&regions + // Server listens for GlobalDefWriter strings&regions globalDefWriter_server(); fupdate_server(fp, "globalDefWriter_server complete\n"); @@ -268,6 +314,11 @@ RcppExport int init_otf2_logger(int max_nprocs, Rcpp::String archivePath = "./rT fupdate_server(fp, "COMPLETE!\n"); if (fp!=NULL){fclose(fp);} + // Clean up pmpmeas metric_array memory + if (COLLECT_METRICS){ + pmpmeas_read_finalize(); + } + return(1); // exit(0); } else { @@ -345,15 +396,15 @@ void assign_regionRef_array_server(){ // Bind socket to recv num_funcs regionRef_socket_server = zmq_socket(context, ZMQ_PULL); // ZMQ_PULL rc = zmq_bind(regionRef_socket_server, "tcp://*:5554"); - if (rc!=0){ report_and_exit("globalDefWriter_server regionRef_socket_server", regionRef_socket_server); } + if (rc!=0){ report_and_exit("assign_regionRef_array_server regionRef_socket_server", regionRef_socket_server); } // Assign regionRef array of length num_funcs zmq_ret = zmq_recv(regionRef_socket_server, &NUM_FUNCS, sizeof(NUM_FUNCS), 0); // ZMQ ID: 0 - if ( zmq_ret <= 0 ) { report_and_exit("globalDefWriter_server pull recv", regionRef_socket_server); } + if ( zmq_ret <= 0 ) { report_and_exit("assign_regionRef_array_server pull recv", regionRef_socket_server); } // Assign and reset all values to 
-1 for debugging regionRef_array = (OTF2_RegionRef*) malloc(NUM_FUNCS*sizeof(*regionRef_array)); - if (regionRef_array==NULL){ report_and_exit("globalDefWriter_server regionRef_array", regionRef_socket_server); } + if (regionRef_array==NULL){ report_and_exit("assign_regionRef_array_server regionRef_array", regionRef_socket_server); } for (int i=0; i::iterator m = pmpmeas_match_lst.begin(); m != pmpmeas_match_lst.end(); m++){ + switch( (*m)->_type() ) + { + case (MeasType::PAPI): + metricType = OTF2_METRIC_TYPE_PAPI; + break; + case (MeasType::PERF): + metricType = OTF2_METRIC_TYPE_PERF; + break; + default: // TIME or unrecognized + continue; + break; + } + + // Cycle through each metric of given type, and create metric + for ( int i=0; i<(*m)->ncnt(); i++ ){ + char *ename = (*m)->ename(i); + Rcpp::String stringEname(ename); + + stringRef_name = globalDefWriter_WriteString_server(stringEname); + + ret = OTF2_GlobalDefWriter_WriteMetricMember(globalDefWriter, + NUM_METRICS++ /* MetricMemberRef */, + stringRef_name, + 0 /*stringRef_description*/, + metricType, + OTF2_METRIC_ACCUMULATED_START /* placeholder */, + OTF2_TYPE_INT64, + OTF2_BASE_DECIMAL, + 0 /* exponent */, + 0 /*stringRef_unit*/); + + if (ret != OTF2_SUCCESS){ + report_and_exit("globalDefWriter_metrics_server WriteMetricMember"); + } + } + } + + OTF2_MetricMemberRef *metricMembers; + + metricMembers = (OTF2_MetricMemberRef*)malloc(NUM_METRICS*sizeof(*metricMembers)); + typeIDs = (OTF2_Type*)malloc(NUM_METRICS*sizeof(*typeIDs)); + + for (OTF2_MetricMemberRef i=0; i maxUsedLocationRef){ maxUsedLocationRef = buffer.regionRef; } + } else if (buffer.datatype == ZMQ_OTF2_SOCK_CLUSTER_ON){ // ZMQ ID: 5f nprocs = buffer.regionRef; if (get_regionRef_array_server(nprocs, new_proc_rep) != 0){ @@ -981,6 +1181,7 @@ void run_evtWriters_server(bool flag_log){ OTF2_EvtWriter_Enter( evt_writers[i], NULL /* attributeList */, buffer.time, slaveActive_regionRef /* region */ ); } + } else if (buffer.datatype == 
ZMQ_OTF2_SOCK_CLUSTER_OFF){ // ZMQ ID: 5g for (int i=1; i<=nprocs; ++i){ OTF2_EvtWriter_Leave( evt_writers[i], NULL /* attributeList */, @@ -988,11 +1189,13 @@ void run_evtWriters_server(bool flag_log){ } nprocs=1; } + } else if (zmq_ret > 0) { // Unknown datatype zmq_close(new_proc_rep); zmq_close(puller); report_and_exit("run_evtWriters_server puller unknown data", NULL); } + } snprintf(fp_buffer, 50, "(pid: %d) Finished listening for evtWriters\n", getpid()); @@ -1001,6 +1204,7 @@ void run_evtWriters_server(bool flag_log){ // Cleanup socket zmq_close(puller); zmq_close(new_proc_rep); + } // @name get_regionRef_array_server