Skip to content

Commit

Permalink
Update add configuration for sparklyr connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Yo committed Apr 24, 2020
1 parent 6049d5a commit 6bf466c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
18 changes: 16 additions & 2 deletions src/config.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,28 @@
# ------------------------------------------------------------------------------

DEBUG = TRUE # to easily turn on/off the print statements
DISTRIBUTED = FALSE # to select either to run locally or on Gcloud Spark Server
DISTRIBUTED = TRUE # to select either to run locally or on Gcloud Spark Server

if(DISTRIBUTED) {
INPUT_FILENAME = "gs://rongcloud-bucket/data/ratings_sample.csv" # path to input data
OUTPUT_FILENAME = "gs://rongcloud-bucket/results/mean_ratings_sample.csv" # path to output data

SERVER_IP = "local" # IP of the Gcloud Spark Server
SPARK_LIB_PATH = "/usr/local/spark"
SPARK_LIB_PATH = "/usr/lib/spark" # "/usr/local/spark" in Ubuntu 16.04

SPARKLYR_SHELL_EXECUTOR_MEMORY = "10G" # TODO: Add description
SPARKLYR_SHELL_DRIVER_MEMORY = "10G" # TODO: Add description
SPARK_EXCUTOR_CORES = 8 # TODO: Add description
SPARK_EXCUTOR_MEMORY = "10" # TODO: Add description
SPARK_YARN_AM_CORES = 8 # TODO: Add description (16 available)
SPARK_YARN_AM_MEMORY = "10G" # TODO: Add description ("48G" available)
SPARK_EXCUTOR_INSTANCES = 2 # TODO: Add description
SPARK_DYNAMICALLOCATION_ENABLED = "false" # TODO: Add description
MAXIMIZERESOURCEALLOCATION = "true" # TODO: Add description
SPARK_DEFAULT_PARALLELISM = 16 # TODO: Add description

} else {
INPUT_FILENAME = "../data/ratings_sample.csv" # path to input data
OUTPUT_FILENAME = "../results/mean_ratings_sample.csv" # path to output data

}
21 changes: 19 additions & 2 deletions src/env.R
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
# ------------------------------------------------------------------------------
# Declaration of enverimental variables for main execution routine
# Setup of environment for execution of the main routine
#
# Author: Mario Loaiciga
# [email protected]
# ------------------------------------------------------------------------------

declare_libs_path <- function(spark_lib_path) {
declare_spark_lib_path <- function(spark_lib_path) {
# Declare path to the Spark library installation

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = spark_lib_path)
}
}

update_spark_connection_config <- function(sc_config) {
# Update the empty `sc_config` with user-defined configs

sc_config$'sparklyr.shell.executor-memory' <- SPARKLYR_SHELL_EXECUTOR_MEMORY
sc_config$'sparklyr.shell.driver-memory' <- SPARKLYR_SHELL_DRIVER_MEMORY
sc_config$spark.excutor.cores <- SPARK_EXCUTOR_CORES
sc_config$spark.excutor.memory <- SPARK_EXCUTOR_MEMORY
sc_config$spark.yarn.am.cores <- SPARK_YARN_AM_CORES
sc_config$spark.excutor.instances <- SPARK_EXCUTOR_INSTANCES
sc_config$spark.dynamicAllocation.enableds <- SPARK_DYNAMICALLOCATION_ENABLED
sc_config$maximizeResourceAllocation <- MAXIMIZERESOURCEALLOCATION
sc_config$spark.default.parallelism <- SPARK_DEFAULT_PARALLELISM

return (sc_config)

}
12 changes: 7 additions & 5 deletions src/main.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ source("env.R")
source("subroutines.R")

if(DISTRIBUTED) {
declare_libs_path(SPARK_LIB_PATH)
declare_spark_lib_path(SPARK_LIB_PATH)
library(sparklyr)
sc <- spark_connect(master = SERVER_IP)
sc_config <-spark_config()
sc_config <- update_spark_connection_config(sc_config)
sc <- spark_connect(master="local", config=sc_config)
}

# Data loading
start_time <- Sys.time()
print("INFO - Loading data...")
if(DISTRIBUTED) {
ratings_df = load_distributed_csv(INPUT_FILENAME, sc, "ratings")

} else {
ratings_df = load_csv(INPUT_FILENAME)

}

# Data processing
Expand All @@ -41,7 +43,7 @@ if(DISTRIBUTED) {

} else {
export_csv(mean_ratings_df, OUTPUT_FILENAME)

}

end_time <- Sys.time()
Expand Down

0 comments on commit 6bf466c

Please sign in to comment.