From a2bb01acb5490fe763066ab057d807f030f51632 Mon Sep 17 00:00:00 2001 From: Holger Roth Date: Mon, 4 Mar 2024 11:29:38 -0500 Subject: [PATCH] add client executor launcher; upgrade flwr scripts to 1.7.0 versions --- .../flower/fedprox/flower_fedprox.ipynb | 379 +++++++++++------- .../app/config/config_fed_client.conf | 49 +-- .../jobs/flwr_cifar10/app/custom/client.py | 173 +++++--- .../app/custom/controller_launcher.py | 27 +- .../app/custom/executor_launcher.py | 89 ++++ .../jobs/flwr_cifar10/app/custom/server.py | 23 +- 6 files changed, 476 insertions(+), 264 deletions(-) create mode 100644 examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/executor_launcher.py diff --git a/examples/advanced/flower/fedprox/flower_fedprox.ipynb b/examples/advanced/flower/fedprox/flower_fedprox.ipynb index dc6dfb1dd7..c1062f1f34 100644 --- a/examples/advanced/flower/fedprox/flower_fedprox.ipynb +++ b/examples/advanced/flower/fedprox/flower_fedprox.ipynb @@ -10,7 +10,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "c8ee32ae-cee9-454e-9b68-3427a2b7e991", "metadata": {}, "outputs": [], @@ -20,10 +20,79 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "3479da05-40df-4d47-b942-825c10e03a8a", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "The following are the variables you can change in the template\n", + "\n", + "---------------------------------------------------------------------------------------------------------------------------------------\n", + " \n", + " job folder: ./jobs/_flwr_cifar10 \n", + " \n", + "---------------------------------------------------------------------------------------------------------------------------------------\n", + " file_name var_name value component \n", + 
"---------------------------------------------------------------------------------------------------------------------------------------\n", + " meta.conf app ['@ALL'] \n", + " meta.conf mandatory_clients [] \n", + " meta.conf min_clients 2 \n", + "\n", + " config_fed_client.conf app_config \n", + " config_fed_client.conf app_script client.py \n", + " config_fed_client.conf component_ids ['metric_relay'] ExternalConfigurator \n", + " config_fed_client.conf config_file_name client_api_config.json ExternalConfigurator \n", + " config_fed_client.conf evaluate_task_name evaluate \n", + " config_fed_client.conf event_type fed.analytix_log_stats MetricRelay \n", + " config_fed_client.conf external_execution_wait 5.0 \n", + " config_fed_client.conf fed_event True MetricRelay \n", + " config_fed_client.conf heartbeat_interval 5.0 MetricRelay \n", + " config_fed_client.conf heartbeat_timeout 30.0 MetricRelay \n", + " config_fed_client.conf last_result_transfer_timeout 300.0 \n", + " config_fed_client.conf launch_once True SubprocessLauncher \n", + " config_fed_client.conf mode PASSIVE CellPipe \n", + " config_fed_client.conf monitor_interval 0.01 \n", + " config_fed_client.conf params_exchange_format pytorch \n", + " config_fed_client.conf params_transfer_type DIFF \n", + " config_fed_client.conf pipe_channel_name metric MetricRelay \n", + " config_fed_client.conf read_interval 0.5 \n", + " config_fed_client.conf root_url {ROOT_URL} CellPipe \n", + " config_fed_client.conf script python3 custom/{app_script} {app_c SubprocessLauncher \n", + " config_fed_client.conf secure_mode {SECURE_MODE} CellPipe \n", + " config_fed_client.conf site_name {SITE_NAME} CellPipe \n", + " config_fed_client.conf token {JOB_ID} CellPipe \n", + " config_fed_client.conf train_with_evaluation True \n", + " config_fed_client.conf workers 4 \n", + " config_fed_client.conf workspace_dir {WORKSPACE} CellPipe \n", + "\n", + " config_fed_server.conf allow_empty_global_weights False ScatterAndGather \n", 
+ " config_fed_server.conf best_global_model_file_name best_FL_global_model.pt PTFileModelPersistor \n", + " config_fed_server.conf events ['fed.analytix_log_stats'] TBAnalyticsReceiver \n", + " config_fed_server.conf expected_data_kind WEIGHT_DIFF InTimeAccumulateWeightedAggregator \n", + " config_fed_server.conf global_model_file_name FL_global_model.pt PTFileModelPersistor \n", + " config_fed_server.conf ignore_result_error False ScatterAndGather \n", + " config_fed_server.conf key_metric accuracy IntimeModelSelector \n", + " config_fed_server.conf min_clients 2 ScatterAndGather \n", + " config_fed_server.conf model_class_path net.Net \n", + " config_fed_server.conf negate_key_metric False IntimeModelSelector \n", + " config_fed_server.conf num_rounds 2 ScatterAndGather \n", + " config_fed_server.conf persist_every_n_rounds 1 ScatterAndGather \n", + " config_fed_server.conf snapshot_every_n_rounds 1 ScatterAndGather \n", + " config_fed_server.conf start_round 0 ScatterAndGather \n", + " config_fed_server.conf task_check_period 0.5 ScatterAndGather \n", + " config_fed_server.conf train_timeout 0 ScatterAndGather \n", + " config_fed_server.conf validation_metric_name initial_metrics IntimeModelSelector \n", + " config_fed_server.conf wait_time_after_min_received 0 ScatterAndGather \n", + " config_fed_server.conf weigh_by_local_iter False IntimeModelSelector \n", + "\n", + "---------------------------------------------------------------------------------------------------------------------------------------\n" + ] + } + ], "source": [ "app_script=\"client.py\"\n", "\n", @@ -32,7 +101,17 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 3, + "id": "88e86dad-ba2d-454a-9315-631691673ea4", + "metadata": {}, + "outputs": [], + "source": [ + "#!pip install flwr flwr_datasets" + ] + }, + { + "cell_type": "code", + "execution_count": null, "id": "9e23fe35-0a67-4a37-b22e-314e6898cd59", "metadata": {}, "outputs": [ @@ -40,23 +119,27 @@ "name": 
"stdout", "output_type": "stream", "text": [ - "2024-03-01 17:06:12,108 - SimulatorRunner - INFO - Create the Simulator Server.\n", - "2024-03-01 17:06:12,111 - CoreCell - INFO - server: creating listener on tcp://0:50099\n", - "2024-03-01 17:06:12,127 - CoreCell - INFO - server: created backbone external listener for tcp://0:50099\n", - "2024-03-01 17:06:12,128 - ConnectorManager - INFO - 169400: Try start_listener Listener resources: {'secure': False, 'host': 'localhost'}\n", - "2024-03-01 17:06:12,130 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00002 PASSIVE tcp://0:34641] is starting\n", - "2024-03-01 17:06:12,632 - CoreCell - INFO - server: created backbone internal listener for tcp://localhost:34641\n", - "2024-03-01 17:06:12,635 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00001 PASSIVE tcp://0:50099] is starting\n", - "2024-03-01 17:06:12,711 - nvflare.fuel.hci.server.hci - INFO - Starting Admin Server localhost on Port 45881\n", - "2024-03-01 17:06:12,712 - SimulatorRunner - INFO - Deploy the Apps.\n", - "2024-03-01 17:06:12,719 - SimulatorRunner - INFO - Create the simulate clients.\n", - "2024-03-01 17:06:12,723 - ClientManager - INFO - Client: New client site-1@192.168.1.203 joined. Sent token: ffd2053b-063f-4471-ae66-ca6204ae9131. Total clients: 1\n", - "2024-03-01 17:06:12,725 - FederatedClient - INFO - Successfully registered client:site-1 for project simulator_server. 
Token:ffd2053b-063f-4471-ae66-ca6204ae9131 SSID:\n", - "2024-03-01 17:06:12,726 - SimulatorRunner - INFO - Set the client status ready.\n", - "2024-03-01 17:06:12,727 - SimulatorRunner - INFO - Deploy and start the Server App.\n", - "2024-03-01 17:06:12,730 - Cell - INFO - Register blob CB for channel='server_command', topic='*'\n", - "2024-03-01 17:06:12,732 - Cell - INFO - Register blob CB for channel='aux_communication', topic='*'\n", - "2024-03-01 17:06:12,732 - ServerCommandAgent - INFO - ServerCommandAgent cell register_request_cb: server.simulate_job\n" + "2024-03-04 11:28:44,582 - SimulatorRunner - INFO - Create the Simulator Server.\n", + "2024-03-04 11:28:44,586 - CoreCell - INFO - server: creating listener on tcp://0:53189\n", + "2024-03-04 11:28:44,600 - CoreCell - INFO - server: created backbone external listener for tcp://0:53189\n", + "2024-03-04 11:28:44,601 - ConnectorManager - INFO - 393835: Try start_listener Listener resources: {'secure': False, 'host': 'localhost'}\n", + "2024-03-04 11:28:44,603 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00002 PASSIVE tcp://0:60917] is starting\n", + "2024-03-04 11:28:45,105 - CoreCell - INFO - server: created backbone internal listener for tcp://localhost:60917\n", + "2024-03-04 11:28:45,108 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00001 PASSIVE tcp://0:53189] is starting\n", + "2024-03-04 11:28:45,183 - nvflare.fuel.hci.server.hci - INFO - Starting Admin Server localhost on Port 33759\n", + "2024-03-04 11:28:45,184 - SimulatorRunner - INFO - Deploy the Apps.\n", + "2024-03-04 11:28:45,190 - SimulatorRunner - INFO - Create the simulate clients.\n", + "2024-03-04 11:28:45,195 - ClientManager - INFO - Client: New client site-1@192.168.1.203 joined. Sent token: baa784ed-a3ca-47fd-967e-faff6cc0f297. Total clients: 1\n", + "2024-03-04 11:28:45,196 - FederatedClient - INFO - Successfully registered client:site-1 for project simulator_server. 
Token:baa784ed-a3ca-47fd-967e-faff6cc0f297 SSID:\n", + "2024-03-04 11:28:45,198 - ClientManager - INFO - Client: New client site-2@192.168.1.203 joined. Sent token: 0c47ef75-6105-47dc-b3a3-850832e77540. Total clients: 2\n", + "2024-03-04 11:28:45,199 - FederatedClient - INFO - Successfully registered client:site-2 for project simulator_server. Token:0c47ef75-6105-47dc-b3a3-850832e77540 SSID:\n", + "2024-03-04 11:28:45,201 - ClientManager - INFO - Client: New client site-3@192.168.1.203 joined. Sent token: 6f2e5c05-f2d1-4f6c-9170-c249b383ed8f. Total clients: 3\n", + "2024-03-04 11:28:45,202 - FederatedClient - INFO - Successfully registered client:site-3 for project simulator_server. Token:6f2e5c05-f2d1-4f6c-9170-c249b383ed8f SSID:\n", + "2024-03-04 11:28:45,203 - SimulatorRunner - INFO - Set the client status ready.\n", + "2024-03-04 11:28:45,204 - SimulatorRunner - INFO - Deploy and start the Server App.\n", + "2024-03-04 11:28:45,206 - Cell - INFO - Register blob CB for channel='server_command', topic='*'\n", + "2024-03-04 11:28:45,208 - Cell - INFO - Register blob CB for channel='aux_communication', topic='*'\n", + "2024-03-04 11:28:45,210 - ServerCommandAgent - INFO - ServerCommandAgent cell register_request_cb: server.simulate_job\n" ] }, { @@ -71,133 +154,145 @@ "name": "stdout", "output_type": "stream", "text": [ - "2024-03-01 17:06:14,131 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job]: Server runner starting ...\n", - "2024-03-01 17:06:14,133 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job]: starting workflow controller_launcher () ...\n", - "2024-03-01 17:06:14,135 - ControllerLauncher - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Initializing ModelController workflow.\n", - "2024-03-01 17:06:14,138 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Workflow controller_launcher () started\n", - "2024-03-01 17:06:14,139 - 
ControllerLauncher - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Beginning model controller run.\n", - "2024-03-01 17:06:14,140 - ControllerLauncher - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Start Controller Launcher.\n", - "2024-03-01 17:06:14,670 - SubprocessLauncher - INFO - Run Server code...\n", - "2024-03-01 17:06:14,743 - SimulatorClientRunner - INFO - Start the clients run simulation.\n", - "2024-03-01 17:06:15,747 - SimulatorClientRunner - INFO - Simulate Run client: site-1 on GPU group: None\n", - "2024-03-01 17:06:16,851 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00004 127.0.0.1:50099 <= 127.0.0.1:58144] is created: PID: 169400\n", - "2024-03-01 17:06:16,786 - ClientTaskWorker - INFO - ClientTaskWorker started to run\n", - "2024-03-01 17:06:16,848 - CoreCell - INFO - site-1.simulate_job: created backbone external connector to tcp://localhost:50099\n", - "2024-03-01 17:06:16,849 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00001 ACTIVE tcp://localhost:50099] is starting\n", - "2024-03-01 17:06:16,850 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00002 127.0.0.1:58144 => 127.0.0.1:50099] is created: PID: 169506\n", - "2024-03-01 17:06:18,207 - Cell - INFO - Register blob CB for channel='aux_communication', topic='*'\n", - "2024-03-01 17:06:18,729 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00005 127.0.0.1:50099 <= 127.0.0.1:42014] is created: PID: 169400\n", - "2024-03-01 17:06:18,713 - Cell - INFO - broadcast: channel='aux_communication', topic='__sync_runner__', targets=['server.simulate_job'], timeout=2.0\n", - "2024-03-01 17:06:18,726 - ClientRunner - INFO - [identity=site-1, run=simulate_job]: synced to Server Runner in 0.5141777992248535 seconds\n", - "2024-03-01 17:06:18,727 - CoreCell - INFO - site-1_simulate_job_passive: created backbone external connector to tcp://0:50099\n", - "2024-03-01 17:06:18,727 - 
nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00002 ACTIVE tcp://0:50099] is starting\n", - "2024-03-01 17:06:18,728 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00004 127.0.0.1:42014 => 127.0.0.1:50099] is created: PID: 169506\n", - "2024-03-01 17:06:18,729 - CellPipe - INFO - registered CellPipe request CB for cell_pipe.metric\n", - "2024-03-01 17:06:18,731 - CellPipe - INFO - registered CellPipe request CB for cell_pipe.task\n", - "2024-03-01 17:06:18,736 - ClientRunner - INFO - [identity=site-1, run=simulate_job]: client runner started\n", - "2024-03-01 17:06:18,736 - ClientTaskWorker - INFO - Initialize ClientRunner for client: site-1\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "2024-03-01 17:06:48,781 - MetricRelay - INFO - metric pipe status changed to _PEER_GONE_\n", - "100.0%3-01 17:06:49,513 - SubprocessLauncher - INFO - \n", - "2024-03-01 17:06:51,680 - SubprocessLauncher - INFO - WARNING flwr 2024-03-01 17:06:51,680 | logger.py:118 |\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - DEPRECATED FEATURE: flwr.client.start_numpy_client() is deprecated.\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - \tInstead, use `flwr.client.start_client()` by ensuring you first call the `.to_client()` method as shown below:\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - \tflwr.client.start_client(\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - \t\tserver_address=':',\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - \t\tclient=FlowerClient().to_client(), # <-- where FlowerClient is of type flwr.client.NumPyClient object\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - \t)\n", - "2024-03-01 17:06:51,681 - SubprocessLauncher - INFO - \tUsing `start_numpy_client()` is deprecated.\n", - "2024-03-01 17:06:51,682 - SubprocessLauncher - INFO - \n", - "2024-03-01 
17:06:51,682 - SubprocessLauncher - INFO - This is a deprecated feature. It will be removed\n", - "2024-03-01 17:06:51,682 - SubprocessLauncher - INFO - entirely in future versions of Flower.\n", - "2024-03-01 17:06:51,682 - SubprocessLauncher - INFO - \n", - "2024-03-01 17:06:51,686 - SubprocessLauncher - INFO - INFO flwr 2024-03-01 17:06:51,686 | grpc.py:52 | Opened insecure gRPC connection (no certificates were passed)\n", - "2024-03-01 17:06:51,688 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-01 17:06:51,688 | connection.py:55 | ChannelConnectivity.IDLE\n", - "2024-03-01 17:06:51,688 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-01 17:06:51,688 | connection.py:55 | ChannelConnectivity.CONNECTING\n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-01 17:06:51,691 | connection.py:220 | gRPC channel closed\n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz\n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - Extracting ./data/cifar-10-python.tar.gz to ./data\n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - Files already downloaded and verified\n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - Traceback (most recent call last):\n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - File \"/tmp/nvflare/flwr_cifar10/simulate_job/app_site-1/custom/client.py\", line 98, in \n", - "2024-03-01 17:06:51,691 - SubprocessLauncher - INFO - fl.client.start_numpy_client(server_address=\"0.0.0.0:8080\", client=FlowerClient(), insecure=True) # \"127.0.0.1:8080\"\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/flwr/client/app.py\", line 500, in start_numpy_client\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - start_client(\n", - "2024-03-01 17:06:51,692 - 
SubprocessLauncher - INFO - File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/flwr/client/app.py\", line 248, in start_client\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - _start_client_internal(\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/flwr/client/app.py\", line 361, in _start_client_internal\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - message = receive()\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/flwr/client/grpc_client/connection.py\", line 132, in receive\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - proto = next(server_message_iterator)\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/grpc/_channel.py\", line 426, in __next__\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - return self._next()\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/grpc/_channel.py\", line 826, in _next\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - raise self\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - \tstatus = StatusCode.UNAVAILABLE\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - \tdetails = \"failed to connect to all addresses; last error: UNKNOWN: ipv4:0.0.0.0:8080: Failed to connect to remote host: Connection refused\"\n", - "2024-03-01 
17:06:51,692 - SubprocessLauncher - INFO - \tdebug_error_string = \"UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:0.0.0.0:8080: Failed to connect to remote host: Connection refused {created_time:\"2024-03-01T17:06:51.689676657-05:00\", grpc_status:14}\"\n", - "2024-03-01 17:06:51,692 - SubprocessLauncher - INFO - >\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "2024-03-01 17:07:18,832 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00005 Not Connected] is closed PID: 169400\n", - "2024-03-01 17:07:18,829 - PTClientAPILauncherExecutor - INFO - pipe status changed to _PEER_GONE_\n", - "2024-03-01 17:07:18,832 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00004 Not Connected] is closed PID: 169506\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... [python3 custom/server.py ]\n", - "Running task ... 
[python3 custom/server.py ]\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "Process Process-20:\n", - "Traceback (most recent call last):\n", - " File \"/usr/lib/python3.10/multiprocessing/process.py\", line 314, in _bootstrap\n", - " self.run()\n", - " File \"/usr/lib/python3.10/multiprocessing/process.py\", line 108, in run\n", - " self._target(*self._args, **self._kwargs)\n", - " File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/nvflare/private/fed/app/simulator/simulator_runner.py\", line 383, in run_process\n", - " run_status = mpm.run(\n", - " File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/nvflare/fuel/f3/mpm.py\", line 154, in run\n", - " rc = main_func(**kwargs)\n", - " File \"/media/hroth/NVIDIA/home_old/hroth/Code2/flower/baselines/fedprox/.venv_fedprox/lib/python3.10/site-packages/nvflare/private/fed/app/simulator/simulator_runner.py\", line 430, in simulator_run_main\n", - " executor.shutdown()\n", - " File \"/usr/lib/python3.10/concurrent/futures/thread.py\", line 235, in shutdown\n", - " t.join()\n", - " File \"/usr/lib/python3.10/threading.py\", line 1096, in join\n", - " self._wait_for_tstate_lock()\n", - " File \"/usr/lib/python3.10/threading.py\", line 1116, in _wait_for_tstate_lock\n", - " if lock.acquire(block, timeout):\n", - "KeyboardInterrupt\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Simulator finished with run_status -9\n" + "2024-03-04 11:28:46,717 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job]: Server runner starting ...\n", + "2024-03-04 11:28:46,720 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job]: starting workflow controller_launcher () ...\n", + "2024-03-04 11:28:46,721 - ControllerLauncher - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Initializing 
ModelController workflow.\n", + "2024-03-04 11:28:46,723 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Workflow controller_launcher () started\n", + "2024-03-04 11:28:46,724 - ControllerLauncher - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Beginning model controller run.\n", + "2024-03-04 11:28:46,725 - ControllerLauncher - INFO - [identity=simulator_server, run=simulate_job, wf=controller_launcher]: Start Controller Launcher.\n", + "2024-03-04 11:28:47,213 - SimulatorClientRunner - INFO - Start the clients run simulation.\n", + "2024-03-04 11:28:47,300 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:47,300 | app.py:163 | Starting Flower server, config: ServerConfig(num_rounds=100, round_timeout=None)\n", + "2024-03-04 11:28:47,310 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:47,310 | app.py:176 | Flower ECE: gRPC server running (100 rounds), SSL is disabled\n", + "2024-03-04 11:28:47,312 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:47,310 | server.py:89 | Initializing global parameters\n", + "2024-03-04 11:28:47,313 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:47,310 | server.py:276 | Requesting initial parameters from one random client\n", + "2024-03-04 11:28:48,217 - SimulatorClientRunner - INFO - Simulate Run client: site-1 on GPU group: None\n", + "2024-03-04 11:28:48,220 - SimulatorClientRunner - INFO - Simulate Run client: site-2 on GPU group: None\n", + "2024-03-04 11:28:48,236 - SimulatorClientRunner - INFO - Simulate Run client: site-3 on GPU group: None\n", + "2024-03-04 11:28:49,327 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00006 127.0.0.1:53189 <= 127.0.0.1:44884] is created: PID: 393835\n", + "2024-03-04 11:28:49,330 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00007 127.0.0.1:53189 <= 127.0.0.1:44900] is created: PID: 393835\n", + "2024-03-04 11:28:49,339 - nvflare.fuel.f3.sfm.conn_manager - 
INFO - Connection [CN00008 127.0.0.1:53189 <= 127.0.0.1:44904] is created: PID: 393835\n", + "2024-03-04 11:28:49,261 - ClientTaskWorker - INFO - ClientTaskWorker started to run\n", + "2024-03-04 11:28:49,263 - ClientTaskWorker - INFO - ClientTaskWorker started to run\n", + "2024-03-04 11:28:49,271 - ClientTaskWorker - INFO - ClientTaskWorker started to run\n", + "2024-03-04 11:28:49,325 - CoreCell - INFO - site-2.simulate_job: created backbone external connector to tcp://localhost:53189\n", + "2024-03-04 11:28:49,325 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00001 ACTIVE tcp://localhost:53189] is starting\n", + "2024-03-04 11:28:49,326 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00002 127.0.0.1:44884 => 127.0.0.1:53189] is created: PID: 394012\n", + "2024-03-04 11:28:49,329 - CoreCell - INFO - site-1.simulate_job: created backbone external connector to tcp://localhost:53189\n", + "2024-03-04 11:28:49,329 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00001 ACTIVE tcp://localhost:53189] is starting\n", + "2024-03-04 11:28:49,330 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00002 127.0.0.1:44900 => 127.0.0.1:53189] is created: PID: 394009\n", + "2024-03-04 11:28:49,338 - CoreCell - INFO - site-3.simulate_job: created backbone external connector to tcp://localhost:53189\n", + "2024-03-04 11:28:49,338 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connector [CH00001 ACTIVE tcp://localhost:53189] is starting\n", + "2024-03-04 11:28:49,339 - nvflare.fuel.f3.sfm.conn_manager - INFO - Connection [CN00002 127.0.0.1:44904 => 127.0.0.1:53189] is created: PID: 394014\n", + "2024-03-04 11:28:50,864 - Cell - INFO - Register blob CB for channel='aux_communication', topic='*'\n", + "2024-03-04 11:28:50,864 - Cell - INFO - Register blob CB for channel='aux_communication', topic='*'\n", + "2024-03-04 11:28:50,887 - Cell - INFO - Register blob CB for channel='aux_communication', topic='*'\n", + "2024-03-04 11:28:51,369 - Cell - INFO - 
broadcast: channel='aux_communication', topic='__sync_runner__', targets=['server.simulate_job'], timeout=2.0\n", + "2024-03-04 11:28:51,370 - Cell - INFO - broadcast: channel='aux_communication', topic='__sync_runner__', targets=['server.simulate_job'], timeout=2.0\n", + "2024-03-04 11:28:51,382 - ClientRunner - INFO - [identity=site-1, run=simulate_job]: synced to Server Runner in 0.5141603946685791 seconds\n", + "2024-03-04 11:28:51,383 - ExecutorLauncher - INFO - [identity=site-1, run=simulate_job]: Start Executor Launcher.\n", + "2024-03-04 11:28:51,384 - ClientRunner - INFO - [identity=site-3, run=simulate_job]: synced to Server Runner in 0.5155651569366455 seconds\n", + "2024-03-04 11:28:51,384 - ExecutorLauncher - INFO - [identity=site-3, run=simulate_job]: Start Executor Launcher.\n", + "2024-03-04 11:28:51,392 - Cell - INFO - broadcast: channel='aux_communication', topic='__sync_runner__', targets=['server.simulate_job'], timeout=2.0\n", + "2024-03-04 11:28:51,399 - ClientRunner - INFO - [identity=site-2, run=simulate_job]: synced to Server Runner in 0.5077931880950928 seconds\n", + "2024-03-04 11:28:51,400 - ExecutorLauncher - INFO - [identity=site-2, run=simulate_job]: Start Executor Launcher.\n", + "2024-03-04 11:28:56,502 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:56,502 | server.py:280 | Received initial parameters from one random client\n", + "2024-03-04 11:28:56,503 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:56,502 | server.py:91 | Evaluating initial parameters\n", + "2024-03-04 11:28:56,504 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:56,502 | server.py:104 | FL starting\n", + "2024-03-04 11:28:56,506 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,506 | server.py:222 | fit_round 1: strategy sampled 2 clients (out of 2)\n", + "2024-03-04 11:28:56,493 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:56,493 | grpc.py:52 | Opened insecure gRPC connection (no certificates were passed)\n", + 
"2024-03-04 11:28:56,494 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,494 | connection.py:55 | ChannelConnectivity.IDLE\n", + "2024-03-04 11:28:56,496 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,496 | connection.py:55 | ChannelConnectivity.CONNECTING\n", + "2024-03-04 11:28:56,496 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,496 | connection.py:55 | ChannelConnectivity.READY\n", + "2024-03-04 11:28:56,503 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:56,503 | grpc.py:52 | Opened insecure gRPC connection (no certificates were passed)\n", + "2024-03-04 11:28:56,505 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,504 | connection.py:55 | ChannelConnectivity.IDLE\n", + "2024-03-04 11:28:56,505 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,505 | connection.py:55 | ChannelConnectivity.CONNECTING\n", + "2024-03-04 11:28:56,506 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,506 | connection.py:55 | ChannelConnectivity.READY\n", + "2024-03-04 11:28:56,515 - SubprocessLauncher - INFO - START FLOWER CLIENT [node_id=2]\n", + "2024-03-04 11:28:56,517 - SubprocessLauncher - INFO - START FLOWER CLIENT [node_id=2]\n", + "Running ... 
[python3 custom/server.py ]\n", + "2024-03-04 11:28:56,730 - SubprocessLauncher - INFO - INFO flwr 2024-03-04 11:28:56,730 | grpc.py:52 | Opened insecure gRPC connection (no certificates were passed)\n", + "2024-03-04 11:28:56,731 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,731 | connection.py:55 | ChannelConnectivity.IDLE\n", + "2024-03-04 11:28:56,732 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,732 | connection.py:55 | ChannelConnectivity.CONNECTING\n", + "2024-03-04 11:28:56,733 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:28:56,732 | connection.py:55 | ChannelConnectivity.READY\n", + "2024-03-04 11:29:00,110 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:00,110 | server.py:236 | fit_round 1 received 2 results and 0 failures\n", + "2024-03-04 11:29:00,115 - SubprocessLauncher - INFO - WARNING flwr 2024-03-04 11:29:00,115 | fedavg.py:250 | No fit_metrics_aggregation_fn provided\n", + "2024-03-04 11:29:00,117 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:00,115 | server.py:173 | evaluate_round 1: strategy sampled 3 clients (out of 3)\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 118.12it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 116.16it/s]\n", + "2024-03-04 11:29:00,143 - SubprocessLauncher - INFO - START FLOWER CLIENT [node_id=1]\n", + "2024-03-04 11:29:01,127 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:01,127 | server.py:187 | evaluate_round 1 received 3 results and 0 failures\n", + "2024-03-04 11:29:01,129 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:01,127 | server.py:222 | fit_round 2: strategy sampled 3 clients (out of 3)\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 125.51it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 122.42it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 106.83it/s]\n", + "2024-03-04 11:29:04,591 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:04,591 | server.py:236 | 
fit_round 2 received 3 results and 0 failures\n", + "2024-03-04 11:29:04,593 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:04,593 | server.py:173 | evaluate_round 2: strategy sampled 3 clients (out of 3)\n", + "Running ... [python3 custom/client.py ]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 127.10it/s]s]\n", + "Running ... [python3 custom/client.py ]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 122.68it/s]]\n", + "Running ... [python3 custom/client.py ]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 121.22it/s]\n", + "2024-03-04 11:29:05,378 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:05,378 | server.py:187 | evaluate_round 2 received 3 results and 0 failures\n", + "2024-03-04 11:29:05,379 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:05,378 | server.py:222 | fit_round 3: strategy sampled 3 clients (out of 3)\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 136.91it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 136.91it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 136.96it/s]\n", + "Running ... 
[python3 custom/server.py ]\n", + "2024-03-04 11:29:08,826 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:08,826 | server.py:236 | fit_round 3 received 3 results and 0 failures\n", + "2024-03-04 11:29:08,830 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:08,830 | server.py:173 | evaluate_round 3: strategy sampled 3 clients (out of 3)\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 122.74it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 122.49it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 121.17it/s]\n", + "2024-03-04 11:29:09,603 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:09,603 | server.py:187 | evaluate_round 3 received 3 results and 0 failures\n", + "2024-03-04 11:29:09,605 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:09,603 | server.py:222 | fit_round 4: strategy sampled 3 clients (out of 3)\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 140.08it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 138.56it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 138.07it/s]\n", + "2024-03-04 11:29:13,007 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:13,007 | server.py:236 | fit_round 4 received 3 results and 0 failures\n", + "2024-03-04 11:29:13,013 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:13,013 | server.py:173 | evaluate_round 4: strategy sampled 3 clients (out of 3)\n", + "Running ... [python3 custom/client.py ]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 127.84it/s]s]\n", + "Running ... [python3 custom/client.py ]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 124.49it/s]\n", + "Running ... 
[python3 custom/client.py ]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 123.10it/s]\n", + "2024-03-04 11:29:13,785 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:13,785 | server.py:187 | evaluate_round 4 received 3 results and 0 failures\n", + "2024-03-04 11:29:13,787 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:13,785 | server.py:222 | fit_round 5: strategy sampled 3 clients (out of 3)\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 138.08it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 137.92it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 138.98it/s]\n", + "Running ... [python3 custom/server.py ]\n", + "2024-03-04 11:29:17,181 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:17,180 | server.py:236 | fit_round 5 received 3 results and 0 failures\n", + "2024-03-04 11:29:17,185 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:17,185 | server.py:173 | evaluate_round 5: strategy sampled 3 clients (out of 3)\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 127.02it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 124.27it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 123.29it/s]\n", + "2024-03-04 11:29:17,971 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:17,970 | server.py:187 | evaluate_round 5 received 3 results and 0 failures\n", + "2024-03-04 11:29:17,972 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:17,971 | server.py:222 | fit_round 6: strategy sampled 3 clients (out of 3)\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 137.40it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 137.06it/s]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 134.70it/s]\n", + "2024-03-04 11:29:21,364 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:21,363 | server.py:236 | fit_round 6 received 3 results and 0 failures\n", + "2024-03-04 11:29:21,367 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:21,367 | 
server.py:173 | evaluate_round 6: strategy sampled 3 clients (out of 3)\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 128.11it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 123.80it/s]\n", + "Training: 100%|██████████| 417/417 [00:03<00:00, 123.35it/s]\n", + "2024-03-04 11:29:22,152 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:22,152 | server.py:187 | evaluate_round 6 received 3 results and 0 failures\n", + "2024-03-04 11:29:22,154 - SubprocessLauncher - INFO - DEBUG flwr 2024-03-04 11:29:22,152 | server.py:222 | fit_round 7: strategy sampled 3 clients (out of 3)\n", + "Running ... [python3 custom/client.py ]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 137.70it/s]\n", + "Running ... [python3 custom/client.py ]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 137.09it/s]\n", + "Running ... [python3 custom/client.py ]\n", + "Testing: 100%|██████████| 105/105 [00:00<00:00, 135.14it/s]\n" ] } ], "source": [ "from nvflare import SimulatorRunner \n", "\n", - "n_clients = 1\n", + "n_clients = 3\n", "\n", "simulator = SimulatorRunner(\n", " job_folder=f\"./jobs/flwr_cifar10\",\n", diff --git a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/config/config_fed_client.conf b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/config/config_fed_client.conf index c4d18548f2..c2d353fc48 100644 --- a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/config/config_fed_client.conf +++ b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/config/config_fed_client.conf @@ -8,14 +8,9 @@ "train" ] executor { - path = "nvflare.app_opt.pt.client_api_launcher_executor.PTClientAPILauncherExecutor" + path = "executor_launcher.ExecutorLauncher" args { launcher_id = "launcher" - pipe_id = "pipe" - heartbeat_timeout = 60 - params_exchange_format = "pytorch" - params_transfer_type = "DIFF" - train_with_evaluation = true } } } @@ -31,47 +26,5 @@ launch_once = true } } - { - id = "pipe" - path = 
"nvflare.fuel.utils.pipe.cell_pipe.CellPipe" - args { - mode = "PASSIVE" - site_name = "{SITE_NAME}" - token = "{JOB_ID}" - root_url = "{ROOT_URL}" - secure_mode = "{SECURE_MODE}" - workspace_dir = "{WORKSPACE}" - } - } - { - id = "metrics_pipe" - path = "nvflare.fuel.utils.pipe.cell_pipe.CellPipe" - args { - mode = "PASSIVE" - site_name = "{SITE_NAME}" - token = "{JOB_ID}" - root_url = "{ROOT_URL}" - secure_mode = "{SECURE_MODE}" - workspace_dir = "{WORKSPACE}" - } - } - { - id = "metric_relay" - path = "nvflare.app_common.widgets.metric_relay.MetricRelay" - args { - pipe_id = "metrics_pipe" - event_type = "fed.analytix_log_stats" - read_interval = 0.1 - } - } - { - id = "config_preparer" - path = "nvflare.app_common.widgets.external_configurator.ExternalConfigurator" - args { - component_ids = [ - "metric_relay" - ] - } - } ] } diff --git a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/client.py b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/client.py index 5d698d7ed3..c47855d9c5 100644 --- a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/client.py +++ b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/client.py @@ -1,98 +1,141 @@ -from collections import OrderedDict +import argparse import warnings +from collections import OrderedDict import flwr as fl +from flwr_datasets import FederatedDataset +import numpy as np import torch import torch.nn as nn import torch.nn.functional as F -from torchvision.transforms import Compose, ToTensor, Normalize from torch.utils.data import DataLoader -from torchvision.datasets import CIFAR10 +from torchvision.transforms import Compose, Normalize, ToTensor +from tqdm import tqdm + # ############################################################################# -# Regular PyTorch pipeline: nn.Module, train, test, and DataLoader +# 1. 
Regular PyTorch pipeline: nn.Module, train, test, and DataLoader # ############################################################################# warnings.filterwarnings("ignore", category=UserWarning) DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + class Net(nn.Module): - """Model (simple CNN adapted from 'PyTorch: A 60 Minute Blitz')""" - - def __init__(self) -> None: - super(Net, self).__init__() - self.conv1 = nn.Conv2d(3, 6, 5) - self.pool = nn.MaxPool2d(2, 2) - self.conv2 = nn.Conv2d(6, 16, 5) - self.fc1 = nn.Linear(16 * 5 * 5, 120) - self.fc2 = nn.Linear(120, 84) - self.fc3 = nn.Linear(84, 10) - - def forward(self, x: torch.Tensor) -> torch.Tensor: - x = self.pool(F.relu(self.conv1(x))) - x = self.pool(F.relu(self.conv2(x))) - x = x.view(-1, 16 * 5 * 5) - x = F.relu(self.fc1(x)) - x = F.relu(self.fc2(x)) - return self.fc3(x) + """Model (simple CNN adapted from 'PyTorch: A 60 Minute Blitz')""" + + def __init__(self) -> None: + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = x.view(-1, 16 * 5 * 5) + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + return self.fc3(x) + def train(net, trainloader, epochs): - """Train the model on the training set.""" - criterion = torch.nn.CrossEntropyLoss() - optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9) - for _ in range(epochs): - for images, labels in trainloader: - print("train...") - optimizer.zero_grad() - criterion(net(images.to(DEVICE)), labels.to(DEVICE)).backward() - optimizer.step() + """Train the model on the training set.""" + criterion = torch.nn.CrossEntropyLoss() + optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9) + for _ 
in range(epochs): + for batch in tqdm(trainloader, "Training"): + images = batch["img"] + labels = batch["label"] + optimizer.zero_grad() + criterion(net(images.to(DEVICE)), labels.to(DEVICE)).backward() + optimizer.step() + def test(net, testloader): - """Validate the model on the test set.""" - criterion = torch.nn.CrossEntropyLoss() - correct, total, loss = 0, 0, 0.0 - with torch.no_grad(): - for images, labels in testloader: - outputs = net(images.to(DEVICE)) - loss += criterion(outputs, labels.to(DEVICE)).item() - total += labels.size(0) - correct += (torch.max(outputs.data, 1)[1] == labels).sum().item() - return loss / len(testloader.dataset), correct / total - -def load_data(): - """Load CIFAR-10 (training and test set).""" - trf = Compose([ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) - trainset = CIFAR10("./data", train=True, download=True, transform=trf) - testset = CIFAR10("./data", train=False, download=True, transform=trf) - return DataLoader(trainset, batch_size=32, shuffle=True), DataLoader(testset) + """Validate the model on the test set.""" + criterion = torch.nn.CrossEntropyLoss() + correct, loss = 0, 0.0 + with torch.no_grad(): + for batch in tqdm(testloader, "Testing"): + images = batch["img"].to(DEVICE) + labels = batch["label"].to(DEVICE) + outputs = net(images) + loss += criterion(outputs, labels).item() + correct += (torch.max(outputs.data, 1)[1] == labels).sum().item() + accuracy = correct / len(testloader.dataset) + return loss, accuracy + + +def load_data(node_id): + """Load partition CIFAR10 data.""" + fds = FederatedDataset(dataset="cifar10", partitioners={"train": 3}) + partition = fds.load_partition(node_id) + # Divide data on each node: 80% train, 20% test + partition_train_test = partition.train_test_split(test_size=0.2) + pytorch_transforms = Compose( + [ToTensor(), Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))] + ) + + def apply_transforms(batch): + """Apply transforms to the partition from FederatedDataset.""" + 
batch["img"] = [pytorch_transforms(img) for img in batch["img"]] + return batch + + partition_train_test = partition_train_test.with_transform(apply_transforms) + trainloader = DataLoader(partition_train_test["train"], batch_size=32, shuffle=True) + testloader = DataLoader(partition_train_test["test"], batch_size=32) + return trainloader, testloader + # ############################################################################# -# Federating the pipeline with Flower +# 2. Federation of the pipeline with Flower # ############################################################################# +# Get node id +#parser = argparse.ArgumentParser(description="Flower") +#parser.add_argument( +# "--node-id", +# choices=[0, 1, 2], +# required=True, +# type=int, +# help="Partition of the dataset divided into 3 iid partitions created artificially.", +#) +#node_id = parser.parse_args().node_id +node_id = np.random.randint(0,3) +print(f"START FLOWER CLIENT [node_id={node_id}]") + # Load model and data (simple CNN, CIFAR-10) net = Net().to(DEVICE) -trainloader, testloader = load_data() +trainloader, testloader = load_data(node_id=node_id) + # Define Flower client class FlowerClient(fl.client.NumPyClient): - def get_parameters(self, config): - return [val.cpu().numpy() for _, val in net.state_dict().items()] + def get_parameters(self, config): + return [val.cpu().numpy() for _, val in net.state_dict().items()] + + def set_parameters(self, parameters): + params_dict = zip(net.state_dict().keys(), parameters) + state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict}) + net.load_state_dict(state_dict, strict=True) - def set_parameters(self, parameters): - params_dict = zip(net.state_dict().keys(), parameters) - state_dict = OrderedDict({k: torch.tensor(v) for k, v in params_dict}) - net.load_state_dict(state_dict, strict=True) + def fit(self, parameters, config): + self.set_parameters(parameters) + train(net, trainloader, epochs=1) + return 
self.get_parameters(config={}), len(trainloader.dataset), {} - def fit(self, parameters, config): - self.set_parameters(parameters) - train(net, trainloader, epochs=1) - return self.get_parameters(config={}), len(trainloader.dataset), {} + def evaluate(self, parameters, config): + self.set_parameters(parameters) + loss, accuracy = test(net, testloader) + return loss, len(testloader.dataset), {"accuracy": accuracy} - def evaluate(self, parameters, config): - self.set_parameters(parameters) - loss, accuracy = test(net, testloader) - return float(loss), len(testloader.dataset), {"accuracy": float(accuracy)} # Start Flower client -fl.client.start_numpy_client(server_address="0.0.0.0:8080", client=FlowerClient(), insecure=True) # "127.0.0.1:8080" +fl.client.start_client( + server_address="127.0.0.1:8080", + client=FlowerClient().to_client(), +) diff --git a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/controller_launcher.py b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/controller_launcher.py index dcdaff565c..17a775ac20 100644 --- a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/controller_launcher.py +++ b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/controller_launcher.py @@ -18,6 +18,7 @@ from nvflare.app_common.abstract.launcher import Launcher, LauncherRunStatus from nvflare.apis.fl_context import FLContext from nvflare.apis.shareable import Shareable +from nvflare.app_common.app_constant import AppConstants, ValidateType from nvflare.fuel.utils.validation_utils import check_object_type @@ -40,9 +41,14 @@ class ControllerLauncher(ModelController): - def run(self) """ - def __init__(self, launcher_id): + def __init__(self, + launcher_id, + task_name=AppConstants.TASK_TRAIN + ): super().__init__() self._launcher_id = launcher_id + self._task_name = task_name + self.is_initialized = False def _init_launcher(self, fl_ctx: FLContext): engine = fl_ctx.get_engine() @@ -51,18 +57,27 @@ def 
_init_launcher(self, fl_ctx: FLContext): raise RuntimeError(f"Launcher can not be found using {self._launcher_id}") check_object_type(self._launcher_id, launcher, Launcher) self.launcher = launcher + self.is_initialized = True def run(self): self.info("Start Controller Launcher.") - self._init_launcher(self.fl_ctx) + if not self.is_initialized: + self._init_launcher(self.fl_ctx) - #self.launcher.launch_task("train", shareable=Shareable(), fl_ctx=self.fl_ctx, abort_signal=self.abort_signal) self.launcher.initialize(fl_ctx=self.fl_ctx) while True: - time.sleep(10) - print(f"Running task ... [{self.launcher._script}]") - + time.sleep(10.0) + run_status = self.launcher.check_run_status(task_name=self._task_name, fl_ctx=self.fl_ctx) + if run_status == LauncherRunStatus.RUNNING: + print(f"Running ... [{self.launcher._script}]") + elif run_status == LauncherRunStatus.COMPLETE_SUCCESS: + print("run success") + break + else: + print(f"run failed or not start: {run_status}") + break + self.launcher.finalize(fl_ctx=self.fl_ctx) self.info("Stop Controller Launcher.") diff --git a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/executor_launcher.py b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/executor_launcher.py new file mode 100644 index 0000000000..b208a2446c --- /dev/null +++ b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/executor_launcher.py @@ -0,0 +1,89 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from nvflare.apis.dxo import MetaKey, from_shareable +from nvflare.apis.event_type import EventType +from nvflare.apis.executor import Executor +from nvflare.apis.fl_constant import ReturnCode +from nvflare.apis.fl_context import FLContext +from nvflare.apis.shareable import Shareable, make_reply +from nvflare.apis.signal import Signal +from nvflare.app_common.abstract.learner_spec import Learner +from nvflare.app_common.app_constant import AppConstants, ValidateType +from nvflare.security.logging import secure_format_exception +from nvflare.app_common.workflows.model_controller import ModelController +from nvflare.app_common.abstract.launcher import Launcher, LauncherRunStatus +from nvflare.apis.fl_context import FLContext +from nvflare.apis.shareable import Shareable +from nvflare.fuel.utils.validation_utils import check_object_type + + +class ExecutorLauncher(Executor): + def __init__( + self, + launcher_id="launcher", + task_name=AppConstants.TASK_TRAIN + ): + """Executor that drives an external script through a Launcher component. + + Args: + launcher_id (str, optional): id of the Launcher component looked up from the engine. Defaults to "launcher". + task_name (str, optional): task name passed to the launcher's run-status check. Defaults to AppConstants.TASK_TRAIN. + + The launcher itself is resolved later by _init_launcher, not in the constructor. 
+ """ + super().__init__() + self._launcher_id = launcher_id + self._task_name = task_name + self.is_initialized = False + + def _init_launcher(self, fl_ctx: FLContext): + engine = fl_ctx.get_engine() + launcher: Launcher = engine.get_component(self._launcher_id) + if launcher is None: + raise RuntimeError(f"Launcher can not be found using {self._launcher_id}") + check_object_type(self._launcher_id, launcher, Launcher) + self.launcher = launcher + self.is_initialized = True + + def handle_event(self, event_type: str, fl_ctx: FLContext): + if event_type == EventType.START_RUN: + if not self.is_initialized: + self._init_launcher(fl_ctx) + + self._launch_script(fl_ctx) + + def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable: + pass + + def _launch_script(self, fl_ctx: FLContext): + self.log_info(fl_ctx, "Start Executor Launcher.") + + self.launcher.initialize(fl_ctx=fl_ctx) + + while True: + time.sleep(10.0) + run_status = self.launcher.check_run_status(task_name=self._task_name, fl_ctx=fl_ctx) + if run_status == LauncherRunStatus.RUNNING: + print(f"Running ... 
[{self.launcher._script}]") + elif run_status == LauncherRunStatus.COMPLETE_SUCCESS: + print("run success") + break + else: + print(f"run failed or not start: {run_status}") + break + self.launcher.finalize(fl_ctx=fl_ctx) + self.log_info(fl_ctx, "Stop Executor Launcher.") diff --git a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/server.py b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/server.py index f739796ce6..93798b7dfc 100644 --- a/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/server.py +++ b/examples/advanced/flower/fedprox/jobs/flwr_cifar10/app/custom/server.py @@ -1,8 +1,25 @@ +from typing import List, Tuple + import flwr as fl +from flwr.common import Metrics + + +# Define metric aggregation function +def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics: + # Multiply accuracy of each client by number of examples used + accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics] + examples = [num_examples for num_examples, _ in metrics] + + # Aggregate and return custom metric (weighted average) + return {"accuracy": sum(accuracies) / sum(examples)} + + +# Define strategy +strategy = fl.server.strategy.FedAvg(evaluate_metrics_aggregation_fn=weighted_average) # Start Flower server -print("Running Server code...") fl.server.start_server( - server_address="0.0.0.0:8080", - config=fl.server.ServerConfig(num_rounds=3), + server_address="0.0.0.0:8080", + config=fl.server.ServerConfig(num_rounds=100), + strategy=strategy, )