Skip to content

Commit

Permalink
[ApiServerDB] JsonReceivedAck
Browse files Browse the repository at this point in the history
  • Loading branch information
GuyPerets106 committed Feb 18, 2024
1 parent ef0c6b5 commit 29a063e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src_erl/NerlnetApp/src/Init/jsonHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

%%%%%% Getting files in multipart format.
init(Req0, [ApplicationPid]) ->
io:format("@JsonHAndler: Got ~p~n" , [cowboy_req:parse_header(<<"content-type">>, Req0)]),
case cowboy_req:parse_header(<<"content-type">>, Req0) of
{<<"multipart">>, <<"form-data">>, _} ->
nerl_tools:deleteOldJson(?JSON_ADDR++?LOCAL_DC_FILE_NAME),
nerl_tools:deleteOldJson(?JSON_ADDR++?LOCAL_COMM_FILE_NAME),
%% get files from Req
% io:format("parsing json of req with body: ~p~n",[cowboy_req:read_body(Req0)]),
{_Req, Data} = nerl_tools:multipart(Req0, []), % multipart also save data to file %% Data = [FileName1, FileName2]
io:format("@JsonHandler: got here~n"),
ApplicationPid ! {jsonAddress,{lists:nth(1, Data),lists:nth(2, Data)}};
_Other ->
{ok,Body,_} = cowboy_req:read_body(Req0), %% shouldn't be here, files expected
Expand Down
3 changes: 2 additions & 1 deletion src_erl/NerlnetApp/src/MainServer/ackHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ init(Req0, [Who,Main_genserver_Pid]) ->
case Who of
dataReady -> gen_server:cast(Main_genserver_Pid, {sourceAckDataReady,Body}); %% when source data is ready
sourceDone -> gen_server:cast(Main_genserver_Pid, {sourceDone,Body}); %% when source finished casting
clientAck -> gen_server:cast(Main_genserver_Pid, {clientAck,Body}) %% when client received message (new state)
clientAck -> gen_server:cast(Main_genserver_Pid, {clientAck,Body}); %% when client received message (new state)
jsonReceived -> gen_server:cast(Main_genserver_Pid, {jsonReceived,Body}) %% when other devices got the json and ready to start
end,
Reply = io_lib:format("Body Received: ~p ~n ", [Body]),
Req = cowboy_req:reply(200,
Expand Down
27 changes: 26 additions & 1 deletion src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ handle_call(_Call, _From, State) ->
{ok, State :: #main_genserver_state{}} | {ok, State :: #main_genserver_state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).

init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) ->
init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph , DeviceName}) ->
nerl_tools:setup_logger(?MODULE),
inets:start(),
MyNameStr = atom_to_list(MyName),
Expand All @@ -61,6 +61,7 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) ->
MainServerEts = ets:new(main_server_ets , [set]),
put(main_server_ets, MainServerEts),
put(nerlnet_graph, NerlnetGraph),
put(device_name, DeviceName),
EtsStats = ets:new(stats , [set]),
put(etsStats, EtsStats), %% All entities including mainServer ets tables statistics
Entities = [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--[?API_SERVER_ATOM]],
Expand All @@ -71,6 +72,7 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) ->
ets:insert(MainServerEts , {workers_map , WorkersMap}),
ets:insert(MainServerEts , {clients_names_list , ClientsNames}),
ets:insert(MainServerEts , {counter_received_stats, 0}),
ets:insert(MainServerEts , {json_received_counter, 0}),
% Getting the router that main server is connected with
{MyRouterHost,MyRouterPort} = nerl_tools:getShortPath(MyName,hd(ClientsNames),get(nerlnet_graph)),
ets:insert(MainServerEts, {my_router,{MyRouterHost,MyRouterPort}}),
Expand All @@ -88,6 +90,29 @@ handle_cast({initCSV, _SourceName ,_SourceData}, State) ->
?LOG_ERROR("initCSV is only applicalble when main server is in idle state!"),
{noreply, State#main_genserver_state{}};

handle_cast({jsonReceived,Body}, State = #main_genserver_state{}) ->
StatsEts = get_entity_stats_ets(?MAIN_SERVER_ATOM),
stats:increment_messages_received(StatsEts),
{DeviceName , TotalNumberOfDevices} = binary_to_term(Body),
case TotalNumberOfDevices of
0 -> ack(atom_to_list(received_jsons_done)); %% if the exeperiment runs on a single device
_ -> ok
end,
MainServerDeviceName = get(device_name),
case DeviceName of
MainServerDeviceName -> ?LOG_NOTICE("Device ~p received the json files and is ready to start", [DeviceName]);
_OtherDevice ->
?LOG_NOTICE("Device ~p received the json files and is ready to start", [DeviceName]),
ets:update_counter(get(main_server_ets), json_received_counter, 1),
NumOfDevicesReady = ets:lookup_element(get(main_server_ets), json_received_counter, ?DATA_IDX),
if NumOfDevicesReady == TotalNumberOfDevices ->
ack(atom_to_list(received_jsons_done));
true -> ok
end
end,
stats:increment_messages_sent(StatsEts),
{noreply, State#main_genserver_state{}};

handle_cast({clientsPhaseUpdate , Phase}, State = #main_genserver_state{myName = MyName}) ->
?LOG_INFO("Received clientsPhaseUpdate message with phase ~p",[binary_to_list(Phase)]),
StatsEts = get_entity_stats_ets(?MAIN_SERVER_ATOM),
Expand Down
33 changes: 23 additions & 10 deletions src_erl/NerlnetApp/src/nerlnetApp_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ start(_StartType, _StartArgs) ->

waitForInit() ->
receive
{jsonAddress, MSG} -> {_ArchitectureAdderess,_CommunicationMapAdderess} = MSG; % TODO GUY this is the case for main server which spread the message using http direct requests to devices
{jsonAddress, MSG} -> io:format("@NERLNET_APP got here~n"),{_ArchitectureAdderess,_CommunicationMapAdderess} = MSG; % TODO GUY this is the case for main server which spread the message using http direct requests to devices
Other -> ?LOG_WARNING(?LOG_HEADER++"Got bad message: ~p,~ncontinue listening for init Json~n",[Other]), waitForInit()
after ?PYTHON_SERVER_WAITING_TIMEOUT_MS -> waitForInit()
end.
Expand Down Expand Up @@ -167,15 +167,27 @@ parseJsonAndStartNerlnet(ThisDeviceIP) ->
createSources(BatchSize, DefaultFrequency, ThisDeviceIP), % TODO extract all of this args from ETS

HostOfMainServer = ets:member(nerlnet_data, mainServer),
DevicesListWithoutMainServerDevice = [], % TODO extract this devices list
createMainServer(HostOfMainServer,BatchSize,ThisDeviceIP),
ThisDeviceName = maps:get(ThisDeviceIP , ets:lookup_element(nerlnet_data , ipv4_to_devices , ?DATA_IDX)),
DevicesMap = ets:lookup_element(nerlnet_data, devices_map , ?DATA_IDX), % format of key value pairs: DeviceName => {Host,Port}
DevicesListWithoutMainServerDevice = maps:to_list(maps:remove(ThisDeviceName, DevicesMap)), %% Form: [{device_atom , {IP,Port}} , ..]
createMainServer(HostOfMainServer,BatchSize,ThisDeviceIP , ThisDeviceName),
if
HostOfMainServer -> send_jsons_to_other_devices(DCJsonFileBytes, CommunicationMapFileBytes, DevicesListWithoutMainServerDevice);
true -> skip % rest of devices should send ready to main server
end.
HostOfMainServer ->
send_jsons_to_other_devices(DCJsonFileBytes, CommunicationMapFileBytes, DevicesListWithoutMainServerDevice);
true -> ok % Other devices get here and notify the main server they're ready
end,
NerlnetGraph = ets:lookup_element(nerlnet_data, communicationGraph, ?DATA_IDX),
{?MAIN_SERVER_ATOM , {MainServerIP , MainServerPort , _MainServerDeviceName}} = digraph:vertex(NerlnetGraph, ?MAIN_SERVER_ATOM),
URL = "http://" ++ MainServerIP ++ ":" ++ integer_to_list(MainServerPort) ++ "/jsonReceived",
httpc:request(post , {URL , [] , "application/x-www-form-urlencoded" , term_to_binary({ThisDeviceName , length(DevicesListWithoutMainServerDevice)})}, [], []).

send_jsons_to_other_devices(_DCJsonFileBytes, _CommunicationMapFileBytes, []) -> ?LOG_INFO("This experiment is running on a single device!",[]);
send_jsons_to_other_devices(DCJsonFileBytes, CommunicationMapFileBytes, DevicesList) ->
send_with_direct_http_cowboy_request_to_all_devices.
Fun = fun({DeviceName, {Host, Port}}) ->
?LOG_INFO("Sending jsons to ~p",[DeviceName]),
{ok, _} = httpc:request(post, {Host ++ ":" ++ integer_to_list(Port) ++ "/sendJsons", [], "application/json", term_to_binary({DCJsonFileBytes , CommunicationMapFileBytes})}, [], [])
end,
lists:foreach(Fun, DevicesList).

%% internal functions
port_validator(Port, EntityName) ->
Expand Down Expand Up @@ -279,8 +291,8 @@ createRouters(MapOfRouters, HostName) ->
end,
maps:foreach(Func, MapOfRouters). % iterates as key/values

createMainServer(false,_BatchSize,_HostName) -> none;
createMainServer(true,BatchSize,HostName) ->
createMainServer(false,_BatchSize,_HostName,_DeviceName) -> none;
createMainServer(true,BatchSize,HostName,DeviceName) ->
Name = mainServer,
{Port, _Args} = ets:lookup_element(nerlnet_data, mainServer, ?DATA_IDX),
port_validator(Port, Name),
Expand All @@ -291,12 +303,13 @@ createMainServer(true,BatchSize,HostName) ->
NerlnetGraph = ets:lookup_element(nerlnet_data, communicationGraph, ?DATA_IDX),
%%Create a gen_Server for maintaining Database for Main Server.
%% all http requests will be handled by Cowboy which updates main_genserver if necessary.
MainGenServer_Args= {Name,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}, %%TODO change from mainserverport to routerport . also make this a list of client
MainGenServer_Args= {Name,ClientsNames,BatchSize,WorkersMap,NerlnetGraph,DeviceName}, %%TODO change from mainserverport to routerport . also make this a list of client
MainGenServerPid = mainGenserver:start_link(MainGenServer_Args),

MainServerDispatcher = cowboy_router:compile([
{'_', [
%Nerlnet actions
{"/jsonReceived",[],ackHandler,[jsonReceived,MainGenServerPid]},
{"/updateCSV",[],initHandler,[MainGenServerPid]},
{"/lossFunction",[],actionHandler,[lossFunction,MainGenServerPid]},
{"/predictRes",[],actionHandler,[predictRes,MainGenServerPid]},
Expand Down

0 comments on commit 29a063e

Please sign in to comment.