diff --git a/src_erl/NerlnetApp/src/Init/jsonHandler.erl b/src_erl/NerlnetApp/src/Init/jsonHandler.erl index b870c346..b58d3137 100644 --- a/src_erl/NerlnetApp/src/Init/jsonHandler.erl +++ b/src_erl/NerlnetApp/src/Init/jsonHandler.erl @@ -19,6 +19,7 @@ %%%%%% Getting files in multipart format. init(Req0, [ApplicationPid]) -> + io:format("@JsonHAndler: Got ~p~n" , [cowboy_req:parse_header(<<"content-type">>, Req0)]), case cowboy_req:parse_header(<<"content-type">>, Req0) of {<<"multipart">>, <<"form-data">>, _} -> nerl_tools:deleteOldJson(?JSON_ADDR++?LOCAL_DC_FILE_NAME), @@ -26,6 +27,7 @@ init(Req0, [ApplicationPid]) -> %% get files from Req % io:format("parsing json of req with body: ~p~n",[cowboy_req:read_body(Req0)]), {_Req, Data} = nerl_tools:multipart(Req0, []), % multipart also save data to file %% Data = [FileName1, FileName2] + io:format("@JsonHandler: got here~n"), ApplicationPid ! {jsonAddress,{lists:nth(1, Data),lists:nth(2, Data)}}; _Other -> {ok,Body,_} = cowboy_req:read_body(Req0), %% shouldn't be here, files expected diff --git a/src_erl/NerlnetApp/src/MainServer/ackHandler.erl b/src_erl/NerlnetApp/src/MainServer/ackHandler.erl index ef842fa2..13e36f6a 100644 --- a/src_erl/NerlnetApp/src/MainServer/ackHandler.erl +++ b/src_erl/NerlnetApp/src/MainServer/ackHandler.erl @@ -20,7 +20,8 @@ init(Req0, [Who,Main_genserver_Pid]) -> case Who of dataReady -> gen_server:cast(Main_genserver_Pid, {sourceAckDataReady,Body}); %% when source data is ready sourceDone -> gen_server:cast(Main_genserver_Pid, {sourceDone,Body}); %% when source finished casting - clientAck -> gen_server:cast(Main_genserver_Pid, {clientAck,Body}) %% when client received message (new state) + clientAck -> gen_server:cast(Main_genserver_Pid, {clientAck,Body}); %% when client received message (new state) + jsonReceived -> gen_server:cast(Main_genserver_Pid, {jsonReceived,Body}) %% when other devices got the json and ready to start end, Reply = io_lib:format("Body Received: ~p ~n ", [Body]), Req = cowboy_req:reply(200, diff --git a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl index c688ac0b..49815f8d 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl +++ b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl @@ -50,7 +50,7 @@ handle_call(_Call, _From, State) -> {ok, State :: #main_genserver_state{}} | {ok, State :: #main_genserver_state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) -> +init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph , DeviceName}) -> nerl_tools:setup_logger(?MODULE), inets:start(), MyNameStr = atom_to_list(MyName), @@ -61,6 +61,7 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) -> MainServerEts = ets:new(main_server_ets , [set]), put(main_server_ets, MainServerEts), put(nerlnet_graph, NerlnetGraph), + put(device_name, DeviceName), EtsStats = ets:new(stats , [set]), put(etsStats, EtsStats), %% All entities including mainServer ets tables statistics Entities = [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--[?API_SERVER_ATOM]], @@ -71,6 +72,7 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}) -> ets:insert(MainServerEts , {workers_map , WorkersMap}), ets:insert(MainServerEts , {clients_names_list , ClientsNames}), ets:insert(MainServerEts , {counter_received_stats, 0}), + ets:insert(MainServerEts , {json_received_counter, 0}), % Getting the router that main server is connected with {MyRouterHost,MyRouterPort} = nerl_tools:getShortPath(MyName,hd(ClientsNames),get(nerlnet_graph)), ets:insert(MainServerEts, {my_router,{MyRouterHost,MyRouterPort}}), @@ -88,6 +90,29 @@ handle_cast({initCSV, _SourceName ,_SourceData}, State) -> ?LOG_ERROR("initCSV is only applicalble when main server is in idle state!"), {noreply, State#main_genserver_state{}}; +handle_cast({jsonReceived,Body}, State = #main_genserver_state{}) -> + StatsEts = get_entity_stats_ets(?MAIN_SERVER_ATOM), + stats:increment_messages_received(StatsEts), + {DeviceName , TotalNumberOfDevices} = binary_to_term(Body), + case TotalNumberOfDevices of + 0 -> ack(atom_to_list(received_jsons_done)); %% if the exeperiment runs on a single device + _ -> ok + end, + MainServerDeviceName = get(device_name), + case DeviceName of + MainServerDeviceName -> ?LOG_NOTICE("Device ~p received the json files and is ready to start", [DeviceName]); + _OtherDevice -> + ?LOG_NOTICE("Device ~p received the json files and is ready to start", [DeviceName]), + ets:update_counter(get(main_server_ets), json_received_counter, 1), + NumOfDevicesReady = ets:lookup_element(get(main_server_ets), json_received_counter, ?DATA_IDX), + if NumOfDevicesReady == TotalNumberOfDevices -> + ack(atom_to_list(received_jsons_done)); + true -> ok + end + end, + stats:increment_messages_sent(StatsEts), + {noreply, State#main_genserver_state{}}; + handle_cast({clientsPhaseUpdate , Phase}, State = #main_genserver_state{myName = MyName}) -> ?LOG_INFO("Received clientsPhaseUpdate message with phase ~p",[binary_to_list(Phase)]), StatsEts = get_entity_stats_ets(?MAIN_SERVER_ATOM), diff --git a/src_erl/NerlnetApp/src/nerlnetApp_app.erl b/src_erl/NerlnetApp/src/nerlnetApp_app.erl index 4251aa4f..5a8da4fc 100644 --- a/src_erl/NerlnetApp/src/nerlnetApp_app.erl +++ b/src_erl/NerlnetApp/src/nerlnetApp_app.erl @@ -107,7 +107,7 @@ start(_StartType, _StartArgs) -> waitForInit() -> receive - {jsonAddress, MSG} -> {_ArchitectureAdderess,_CommunicationMapAdderess} = MSG; % TODO GUY this is the case for main server which spread the message using http direct requests to devices + {jsonAddress, MSG} -> io:format("@NERLNET_APP got here~n"),{_ArchitectureAdderess,_CommunicationMapAdderess} = MSG; % TODO GUY this is the case for main server which spread the message using http direct requests to devices Other -> ?LOG_WARNING(?LOG_HEADER++"Got bad message: ~p,~ncontinue listening for init Json~n",[Other]), waitForInit() after ?PYTHON_SERVER_WAITING_TIMEOUT_MS -> waitForInit() end. @@ -167,15 +167,27 @@ parseJsonAndStartNerlnet(ThisDeviceIP) -> createSources(BatchSize, DefaultFrequency, ThisDeviceIP), % TODO extract all of this args from ETS HostOfMainServer = ets:member(nerlnet_data, mainServer), - DevicesListWithoutMainServerDevice = [], % TODO extract this devices list - createMainServer(HostOfMainServer,BatchSize,ThisDeviceIP), + ThisDeviceName = maps:get(ThisDeviceIP , ets:lookup_element(nerlnet_data , ipv4_to_devices , ?DATA_IDX)), + DevicesMap = ets:lookup_element(nerlnet_data, devices_map , ?DATA_IDX), % format of key value pairs: DeviceName => {Host,Port} + DevicesListWithoutMainServerDevice = maps:to_list(maps:remove(ThisDeviceName, DevicesMap)), %% Form: [{device_atom , {IP,Port}} , ..] + createMainServer(HostOfMainServer,BatchSize,ThisDeviceIP , ThisDeviceName), if - HostOfMainServer -> send_jsons_to_other_devices(DCJsonFileBytes, CommunicationMapFileBytes, DevicesListWithoutMainServerDevice); - true -> skip % rest of devices should send ready to main server - end. + HostOfMainServer -> + send_jsons_to_other_devices(DCJsonFileBytes, CommunicationMapFileBytes, DevicesListWithoutMainServerDevice); + true -> ok % Other devices get here and notify the main server they're ready + end, + NerlnetGraph = ets:lookup_element(nerlnet_data, communicationGraph, ?DATA_IDX), + {?MAIN_SERVER_ATOM , {MainServerIP , MainServerPort , _MainServerDeviceName}} = digraph:vertex(NerlnetGraph, ?MAIN_SERVER_ATOM), + URL = "http://" ++ MainServerIP ++ ":" ++ integer_to_list(MainServerPort) ++ "/jsonReceived", + httpc:request(post , {URL , [] , "application/x-www-form-urlencoded" , term_to_binary({ThisDeviceName , length(DevicesListWithoutMainServerDevice)})}, [], []). +send_jsons_to_other_devices(_DCJsonFileBytes, _CommunicationMapFileBytes, []) -> ?LOG_INFO("This experiment is running on a single device!",[]); send_jsons_to_other_devices(DCJsonFileBytes, CommunicationMapFileBytes, DevicesList) -> - send_with_direct_http_cowboy_request_to_all_devices. + Fun = fun({DeviceName, {Host, Port}}) -> + ?LOG_INFO("Sending jsons to ~p",[DeviceName]), + {ok, _} = httpc:request(post, {Host ++ ":" ++ integer_to_list(Port) ++ "/sendJsons", [], "application/json", term_to_binary({DCJsonFileBytes , CommunicationMapFileBytes})}, [], []) + end, + lists:foreach(Fun, DevicesList). %% internal functions port_validator(Port, EntityName) -> @@ -279,8 +291,8 @@ createRouters(MapOfRouters, HostName) -> end, maps:foreach(Func, MapOfRouters). % iterates as key/values -createMainServer(false,_BatchSize,_HostName) -> none; -createMainServer(true,BatchSize,HostName) -> +createMainServer(false,_BatchSize,_HostName,_DeviceName) -> none; +createMainServer(true,BatchSize,HostName,DeviceName) -> Name = mainServer, {Port, _Args} = ets:lookup_element(nerlnet_data, mainServer, ?DATA_IDX), port_validator(Port, Name), @@ -291,12 +303,13 @@ createMainServer(true,BatchSize,HostName) -> NerlnetGraph = ets:lookup_element(nerlnet_data, communicationGraph, ?DATA_IDX), %%Create a gen_Server for maintaining Database for Main Server. %% all http requests will be handled by Cowboy which updates main_genserver if necessary. - MainGenServer_Args= {Name,ClientsNames,BatchSize,WorkersMap,NerlnetGraph}, %%TODO change from mainserverport to routerport . also make this a list of client + MainGenServer_Args= {Name,ClientsNames,BatchSize,WorkersMap,NerlnetGraph,DeviceName}, %%TODO change from mainserverport to routerport . also make this a list of client MainGenServerPid = mainGenserver:start_link(MainGenServer_Args), MainServerDispatcher = cowboy_router:compile([ {'_', [ %Nerlnet actions + {"/jsonReceived",[],ackHandler,[jsonReceived,MainGenServerPid]}, {"/updateCSV",[],initHandler,[MainGenServerPid]}, {"/lossFunction",[],actionHandler,[lossFunction,MainGenServerPid]}, {"/predictRes",[],actionHandler,[predictRes,MainGenServerPid]},