Skip to content

Commit

Permalink
Merge pull request #398 from leondavi/api_load_indicator
Browse files Browse the repository at this point in the history
Using zlib to compress data
  • Loading branch information
leondavi authored Dec 1, 2024
2 parents 0916ba0 + 8409569 commit ea3df3c
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"connectionsMap":
{
"r1":["mainServer", "c1", "s1", "r2"],
"r2":["s2", "r3"],
"r3":["s3","r4"],
"r4":["s4", "r1"]
}
}
146 changes: 146 additions & 0 deletions inputJsonsFiles/DistributedConfig/dc_paper_test_5d_4r_4s_1c_1w.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
{
"nerlnetSettings": {
"frequency": "100",
"batchSize": "50"
},
"mainServer": {
"port": "8080",
"args": ""
},
"apiServer": {
"port": "8081",
"args": ""
},
"devices": [
{
"name": "NerlNist-MS",
"ipv4": "10.0.0.46",
"entities": "mainServer,apiServer,c1"
},
{
"name": "NerlNist-1",
"ipv4": "10.0.0.36",
"entities": "s1,r1"
},
{
"name": "NerlNist-2",
"ipv4": "10.0.0.37",
"entities": "s2,r2"
},
{
"name": "NerlNist-4",
"ipv4": "10.0.0.40",
"entities": "s3,r3"
},
{
"name": "NerlNist-7",
"ipv4": "10.0.0.42",
"entities": "s4,r4"
}
],
"routers": [
{
"name": "r1",
"port": "8090",
"policy": "0"
},
{
"name": "r2",
"port": "8091",
"policy": "0"
},
{
"name": "r3",
"port": "8092",
"policy": "0"
},
{
"name": "r4",
"port": "8093",
"policy": "0"
}
],
"sources": [
{
"name": "s1",
"port": "8086",
"frequency": "30",
"policy": "0",
"epochs": "1",
"type": "0"
},
{
"name": "s2",
"port": "8087",
"frequency": "30",
"policy": "0",
"epochs": "1",
"type": "0"
},
{
"name": "s3",
"port": "8088",
"frequency": "30",
"policy": "0",
"epochs": "1",
"type": "0"
},
{
"name": "s4",
"port": "8089",
"frequency": "30",
"policy": "0",
"epochs": "1",
"type": "0"
}
],
"clients": [
{
"name": "c1",
"port": "8082",
"workers": "w1"
}
],
"workers": [
{
"name": "w1",
"model_sha": "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0"
}
],
"model_sha": {
"9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0": {
"modelType": "0",
"_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |",
"modelArgs": "",
"_doc_modelArgs": "Extra arguments to model",
"layersSizes": "28x28x1k5x5x1x6p0s1t1,28x28x6k2x2p0s2,14x14x6k4x4x6x12p0s1t0,1,32,10",
        "_doc_layersSizes": "List of positive integers [L0, L1, ..., LN]",
"layerTypesList": "2,4,2,9,3,5",
        "_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Recurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |",
"layers_functions": "6,2,6,1,6,4",
"_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |",
"_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |",
"_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |",
"_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |",
"lossMethod": "6",
"lossArgs": "",
"_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |",
"lr": "0.01",
        "_doc_lr": "Positive float",
"epochs": "1",
        "_doc_epochs": "Positive Integer",
"optimizer": "5",
"_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |",
"optimizerArgs": "none",
"_doc_optimizerArgs": "String",
"infraType": "0",
"_doc_infraType": " opennn:0 | wolfengine:1 |",
"distributedSystemType": "0",
"_doc_distributedSystemType": " none:0 | FedClientAvg:1 | FedServerAvg:2 | FedClientWeightedAvgClassification:3 | FedServerWeightedAvgClassification:4 | FedClientAE:5 | FedServerAE:6 | tiles:7 |",
"distributedSystemArgs": "none",
"_doc_distributedSystemArgs": "String",
"distributedSystemToken": "none",
"_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server"
}
}
}
70 changes: 70 additions & 0 deletions inputJsonsFiles/experimentsFlow/exp_paper_test_5d_4r_4s_1c_1w.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"experimentName": "mnist_rr",
"experimentType": "classification",
"batchSize": 50,
"csvFilePath": "/tmp/nerlnet/data/NerlnetData-master/nerlnet/mnist_norm/mnist_train_255_norm.csv",
"numOfFeatures": "784",
"numOfLabels": "10",
"headersNames": "0,1,2,3,4,5,6,7,8,9",
"Phases":
[
{
"phaseName": "training_phase",
"phaseType": "training",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "0",
"numOfBatches": "50",
"workers": "w1",
"nerltensorType": "float"
},
{
"sourceName": "s2",
"startingSample": "10000",
"numOfBatches": "50",
"workers": "w1",
"nerltensorType": "float"
},
{
"sourceName": "s3",
"startingSample": "20000",
"numOfBatches": "50",
"workers": "w1",
"nerltensorType": "float"
},
{
"sourceName": "s4",
"startingSample": "30000",
"numOfBatches": "50",
"workers": "w1",
"nerltensorType": "float"
}
]
},
{
"phaseName": "prediction_phase",
"phaseType": "prediction",
"sourcePieces":
[
{
"sourceName": "s3",
"startingSample": "40000",
"numOfBatches": "50",
"workers": "w1",
"nerltensorType": "float"
},
{
"sourceName": "s4",
"startingSample": "50000",
"numOfBatches": "50",
"workers": "w1",
"nerltensorType": "float"
}
]
}
]
}


8 changes: 3 additions & 5 deletions src_erl/NerlnetApp/src/MainServer/initHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ init(Req0, [Main_genServer_Pid]) ->

%Bindings also can be accessed as once, giving a map of all bindings of Req0:
{_,Body,_} = cowboy_req:read_body(Req0, #{length => ?DATA_LEN}), %read up to X MB (default was 8MB)
Decoded_body = binary_to_list(Body),
[Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data] = string:split(Decoded_body, "#", all),
DecodedBody = binary_to_list(zlib:uncompress(Body)),
[Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data] = string:split(DecodedBody, "#", all),
gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data}),
%[Source|WorkersAndInput] = re:split(binary_to_list(Body), "#", [{return, list}]),
%{Workers,SourceData} = getWorkerInput(WorkersAndInput,[]),

Reply = io_lib:format("Body Received: ~p, Decoded Body = ~p ~n State:~p~n", [Body,Decoded_body, Main_genServer_Pid]),

Req = cowboy_req:reply(200,
#{<<"content-type">> => <<"text/plain">>},
Reply,
"OK",
Req0),
{ok, Req, Main_genServer_Pid}.

Expand Down
4 changes: 2 additions & 2 deletions src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, Phase, NumO
{TotalSourcesInt, _Rest} = string:to_integer(TotalSources),
% MessageBody = WorkersList ++ "#" ++ NumOfBatches ++ "#" ++ NerlTensorType ++ "#" ++ Data,
WorkersListSeperated = string:split(WorkersList, ",", all),
MessageBody = {WorkersListSeperated, Phase, NumOfBatches, NerlTensorType, Data},
MessageBody = {WorkersListSeperated, Phase, NumOfBatches, NerlTensorType, zlib:compress(list_to_binary(Data))},
nerl_tools:http_router_request(RouterHost,RouterPort, [SourceName], ActionStr, MessageBody), % update the source with its data
UpdatedSourceWaitingList = SourcesWaitingList++[list_to_atom(SourceName)],
{SourcesDataReadyCtr, NewTotalSources} =
Expand Down Expand Up @@ -241,7 +241,7 @@ handle_cast({sourceDone,Body}, State = #main_genserver_state{myName = MyName, so
update_clients_phase(PhaseAtom, MyName),
ListOfClients = ets:lookup_element(get(main_server_ets), clients_names_list, ?DATA_IDX),
stats:increment_messages_sent(StatsEts),
NextState = State#main_genserver_state{state = idle, sourcesCastingList = UpdatedSourcesCastingList, clientsWaitingList = ListOfClients};
NextState = State#main_genserver_state{state = idle, sourcesCastingList = UpdatedSourcesCastingList, clientsWaitingList = ListOfClients, total_sources = 0};
_ -> NextState = State#main_genserver_state{state = casting, sourcesCastingList = UpdatedSourcesCastingList}
end,
{noreply, NextState};
Expand Down
3 changes: 2 additions & 1 deletion src_erl/NerlnetApp/src/Source/castingHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ init(Req0, [Action,Source_StateM_Pid]) ->
case Action of
csv -> {_ , Body} = nerl_tools:read_all_data(Req0 , <<>>),
{WorkersList, Phase, NumOfBatches, NerlTensorType, Data} = binary_to_term(Body),
gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_atom(Phase), list_to_integer(NumOfBatches), NerlTensorType , Data});
UncompressedData = binary_to_list(zlib:uncompress(Data)),
gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_atom(Phase), list_to_integer(NumOfBatches), NerlTensorType , UncompressedData});
startCasting -> {_,Body,_} = cowboy_req:read_body(Req0),
gen_statem:cast(Source_StateM_Pid, {startCasting,Body});
statistics -> gen_statem:cast(Source_StateM_Pid, {statistics});
Expand Down
1 change: 1 addition & 0 deletions src_py/apiServer/apiServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def send_data_to_sources(self, csv_dataset: CsvDataSet, experiment_phase: Experi
for source_piece_inst in sources_pieces_list:
source_generated_csv_path = csv_dataset.generate_source_piece_ds_csv_file(source_piece_inst, experiment_phase.get_phase_type())
source_files_to_send.append(source_generated_csv_path)
LOG_INFO("Done generating source pieces")

events_sync_inst.set_event_wait(EventSync.UPDATE_CSV)
self.transmitter.update_csv(source_files_to_send, sources_pieces_list)
Expand Down
30 changes: 15 additions & 15 deletions src_py/apiServer/hf_repo_ids.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,35 @@
"name": "ForestCover",
"description": "Dataset for AEC"
},
{
"id": "Nerlnet/MNist_sorted",
"idx": 4,
"name": "Tiles_Mnist_sorted",
"description": "Mnist tiled,sorted by parts - tiled"
},
{
"id": "Nerlnet/KDD14",
"idx": 5,
"idx": 4,
"name": "KDD14",
"description": "KDD14 Anomaly Detection Dataset"
},
{
"id": "Nerlnet/forest_cover_tiles",
"idx": 6,
"name": "mnist_normalized",
"description": "MNist Dataset for CNN experiments - 255 normalized"
},
{
"id": "Nerlnet/cifar10",
"idx": 7,
"idx": 5,
"name": "cifar10",
"description": "cifar10"
},
{
"id": "Nerlnet/cifar10_tiles",
"idx": 8,
"idx": 6,
"name": "cifar10_tiles",
"description": "cifar10 tiles"
},
{
"id": "Nerlnet/Mnist_normalized",
"idx": 7,
"name": "mnist_norm",
"description": "mnist_normalized"
},
{
"id": "Nerlnet/mnist_tiles",
"idx": 8,
"name": "mnist_tiles",
"description": "mnist_tiles"
}
]
}
8 changes: 6 additions & 2 deletions src_py/apiServer/transmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import globalVars as globe
import sys
import os
import zlib
from definitions import *
from logger import *
from experiment_flow import *
Expand Down Expand Up @@ -79,16 +80,19 @@ def update_csv(self, csv_files: list, source_pieces: list):
with open(csv_file, 'r') as file:
csvfile = file.read()
data_str = f'{index + 1}#{total_sources}#{source_name}#{target_workers}#{phase_type}#{num_of_batches}#{nerltensor_type}#{csvfile}'
data_zip = zlib.compress(data_str.encode())
try:
response = requests.post(self.updateCSVAddress, data = data_str)
if not response.ok:
response = requests.post(self.updateCSVAddress, data = data_zip)
if not response.ok: # If Code =/= 200
LOG_ERROR(f"Failed to update {csv_file} to Main Server")
except ConnectionRefusedError:
LOG_ERROR(f"Connection Refused Error: failed to connect to {self.updateCSVAddress}")
raise ConnectionRefusedError
except ConnectionError:
LOG_ERROR(f"Connection Error: failed to connect to {self.updateCSVAddress}")
raise ConnectionError
LOG_INFO(f'{((index+1)/total_sources)*100:.2f}% Sent')
LOG_INFO(f'Data Transmission To Sources Is Completed!')

def start_casting(self, experiment_phase : ExperimentPhase):
dataStr = f"{experiment_phase.get_sources_str_list()}"
Expand Down

0 comments on commit ea3df3c

Please sign in to comment.