From e3e98315ecae3de6295930821355cf34fcdb2575 Mon Sep 17 00:00:00 2001 From: Wietse van Gerwen <41104904+wvangerwen@users.noreply.github.com> Date: Mon, 10 Feb 2025 12:46:37 +0100 Subject: [PATCH] revert juan changes --- .../breaches/upload_files_ldo.py | 360 ++++++++---------- 1 file changed, 152 insertions(+), 208 deletions(-) diff --git a/hhnk_threedi_tools/breaches/upload_files_ldo.py b/hhnk_threedi_tools/breaches/upload_files_ldo.py index 1ce9b51..de22dd0 100644 --- a/hhnk_threedi_tools/breaches/upload_files_ldo.py +++ b/hhnk_threedi_tools/breaches/upload_files_ldo.py @@ -61,10 +61,10 @@ import requests from breaches import Breaches -LDO_API_URL = "https://www.overstromingsinformatie.nl/auth/" +LDO_API_URL = "https://www.overstromingsinformatie.nl/api/v1/" # Generate api key on de LDO_API_URL website. And place it in api_ldo_key.txt -LDO_API_KEY = Path("api_ldo_key.txt").read_text() +LDO_API_KEY = Path("api_ldo_key.txt").read_text("utf8") logger = hrt.logging.get_logger(__name__) @@ -88,203 +88,167 @@ # %% -class LDO_API_AUTH: - def __init__(self, url_auth, api_key, tenant=4): - self.url_auth = url_auth +class LDO_API: + def __init__(self, api_key=LDO_API_KEY, tenant=4, url=LDO_API_URL): + self.url = url self.api_key = api_key self.tenant = tenant # organisation, 4=hhnk. - self._token = None - self._refresh_token = None + # Authorisation goes through the auth endpoint + self.url_auth = self.url.replace("/api/", "/auth/") - @property # to get health url - def health(self): - return self.url_auth[:-5] + "/health/" # Health is not under v1. + self._refresh_token = None # set on calling self.access_token + self._access_token = None # Property - @property - def token(self): - """Token is required to get the refresh_token which is going to be used in in this website: - https://www.overstromingsinformatie.nl/api/v1/excel-imports?mode=create - That is different from LDO_API_URL - """ - if self._token is None: - token_url = self.url_auth + "v1/token/" - self._token = ( - requests.post( - url=token_url, - json={"tenant": self.tenant}, - auth=("__key__", self.api_key), - ) - ).json()["refresh"] - return self._token + # TODO FROM LDO_API_UPLOAD + self.headers_excel = { + "accept": "application/json", + "authorization": f"Bearer {self.access_token}", + } @property - def refresh_token(self): - """Get refresh token - If we do not use the refresh token, the api formo the website - "https://www.overstromingsinformatie.nl/api/v1/excel-imports?mode=create" - will not work. with out that refersh_token API does not work. - """ + def access_token(self): + """Get refresh token so we can interact with the api.""" if self._refresh_token is None: - url_refresh = self.url_auth + "v1/token/refresh/" - self._refresh_token = ( - requests.post( - url=url_refresh, - json={"refresh": self.token}, - auth=("__key__", self.api_key), - ) - ).json()["access"] - return self._refresh_token + r = requests.post( + url=self.url_auth + "token/", + json={"tenant": self.tenant}, + auth=("__key__", self.api_key), + timeout=5, + ).json() + self._refresh_token = r["refresh"] + self._access_token = r["access"] + return self._access_token def get_tenants(self): """Tenant / organisation which has an id and name. - Prints the tenants. The id can be use to get the token. + Prints the tenants. The id can be used to get the token. """ - tenant_url = self.url_auth + "v1/tenants/" - tenants = requests.get(url=tenant_url, auth=("__key__", self.api_key)).json() + tenant_url = self.url + "v1/tenants/" + tenants = requests.get(url=tenant_url, auth=("__key__", self.api_key), timeout=5).json() for tenant in tenants: logger.info(tenant) return tenants def test_api(self): - """Test api connection""" - response_health = requests.get(url=self.health) + """Test api connection without requiring authorisation""" + health_url = self.url[:-4] + "/health/" # Health is not under v1. + response_health = requests.get(url=health_url, timeout=5) assert response_health.status_code == 200 + def upload_excel(self, metadata_xlsx): + """Upload excel file of the scenario and retrieve id of the excel upload. + In case there is an error it will print the reason. + """ + url_excel_import = self.url + "excel-imports?mode=create" + with open(metadata_xlsx, "rb") as excel_files: + excel_files = { + "file": ( + metadata_xlsx.name, + excel_files, + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + } + excel_response = requests.post(url=url_excel_import, headers=self.headers_excel, files=excel_files) + response_json = json.loads(excel_response.content.decode("utf-8")) + if response_json.__contains__("message"): + msg = response_json["detail"][0]["msg"] + logger.error("The excel file has an error") + raise ValueError(msg) -# %% + else: + status = response_json["status"] + excel_id = response_json["id"] + scenario_id = response_json["scenario_ids"][0] + logger.info( + f"The excel file is {status}, and has been uploaded with excel_id: {excel_id}, scenario_id: {scenario_id}" + ) + return excel_id, scenario_id + + def upload_zip_file(self, zip_path, excel_id): + """Upload the zip file using the excel ID.""" + file_import_url = self.url + f"excel-imports/{excel_id}/files/{zip_path.name}/upload" + + logger.info(f"Uploading zip to {file_import_url}") + # Create link to upload zip file + response = requests.put(url=file_import_url, headers=self.headers_excel) + upload_url = response.json()["url"] + # Upload data using link + with open(zip_path, "rb") as data: + r = requests.put(upload_url, data=data) + logger.info(f"status code: {r.status_code}") + logger.info(f"reason: {r.reason}") + logger.info(f"Finished uploading {zip_path.name}") -class SelectFolder(hrt.Folder): - """ - An object to ease the accessibility, creation and checks of folders and - files that need to be uploaded to LDO. - Main Folder object - ├── dem.tif - ├── results_3di.nc - """ +# %% - def __init__(self, base, create=True): - self.path = base.path - self.scenario_name = base.name - self.zip_kb = None + +class LdoUploadFolder(hrt.Folder): + def __init__(self, base, scenario_results_path, create=True): + """ + Folder to store files that need to be uploaded to LDO. + This is a temporary dir that will be removed after a successful + upload + + Folder + ├── dem.tif + ├── results_3di.nc + ├── Folder.zip (zipped dem.tif + results_3di.nc) + + Parameters + ---------- + base : Union[str,Path] + output directory where upload files will be placed + scenario_results_path + directory where scenario results are stored. + """ super().__init__(base, create=create) - self.zipfile_location = None + self.scenario_results_path = scenario_results_path + self.zip_size = None + self.zip_path = None + + def _find_scenario_folder(self): + """Get Path to scenario results""" + scenario_paths = [j for i in self.scenario_results_path.glob("*/") for j in list(i.glob("*/"))] + for scenario_path in scenario_paths: + if scenario_path.name == self.name: + return scenario_path + raise FileNotFoundError(f"{self.name} not found in {self.scenario_results_path}") def copy_files(self): - # Folder location from where the scenarios are going to be copied - output_folder = r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\output" - - def select_folder(output_folder, scenario_name): - scenario_paths = [j for i in Path(output_folder).glob("*/") for j in list(i.glob("*/"))] - for scenario_path in scenario_paths: - if scenario_path.name == scenario_name: - return scenario_path - - scenario_folder = select_folder(output_folder, self.scenario_name) + """Copy NetCDF and DEM to the upload folder""" + scenario_folder = self._find_scenario_folder() breach = Breaches(scenario_folder) - raster_compress_path = os.path.join(breach.wss.path, "dem_clip.tif") - netcdf_path = os.path.join(breach.netcdf.path, "results_3di.nc") + raster_compress_path = breach.wss.path.joinpath("dem_clip.tif") + netcdf_path = breach.netcdf.path.joinpath("results_3di.nc") - shutil.copy2(netcdf_path, self.path) shutil.copy2(raster_compress_path, self.path) + shutil.copy2(netcdf_path, self.path) - return print(f"Scenario {self.scenario_name} has been copy in the folder structure") + logger.info(f"Scenario {self.name} has been copied in the folder structure") - # Create the zip file to uploaded. def zip_files(self): - # Create name of the zip file - zip_name = self.scenario_name + ".zip" - zip_name = zip_name.replace(" ", "_") + """Zip files so they can be uploaded""" + zip_name = self.name.replace(" ", "_") + ".zip" # Set the zip file path - folder_structure_path = Path(os.path.join(self.path)) - self.zipfile_location = Path(os.path.join(self.path, zip_name)) + self.zip_path = self.path.joinpath(zip_name) # Zip the folder to be uploaded - with zipfile.ZipFile(self.zipfile_location, "w") as zipf: - # Walk through the folder and add files to the zip file - for root, dirs, files in os.walk(folder_structure_path): - for file in files: - if file != f"{zip_name}": - # root = r'E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur' - file_path = os.path.join(root, file) - arcname = os.path.relpath(file_path, folder_structure_path) - zipf.write( - file_path, - arcname=os.path.join(f"{self.scenario_name}", arcname), - ) - # Get zipfile size. - zp = zipfile.ZipFile(f"{self.zipfile_location}") - size = sum([zinfo.file_size for zinfo in zp.filelist]) - self.zip_kb = float(size) / 1000 # kB - print(f"zip file created with size {self.zip_kb} kb") - return self.zipfile_location - + with zipfile.ZipFile(self.zip_path, "w") as zipf: + for file in self.path.glob("*"): + if file.name != zip_name: + zipf.write(file, arcname=file.name) -# %% - - -class LDO_API_UPLOAD: - def __init__(self, metadata_folder_path, refresh_token, scenario_name): - self.metadata_folder_path = metadata_folder_path - self.metadata_file = Path(os.path.join(metadata_folder_path, scenario_name + ".xlsx")) - self.refresh_token = refresh_token - self.headers_excel = { - "accept": "application/json", - "authorization": f"Bearer {self.refresh_token}", - # 'content-type':'multiplart/form-data', - } - self.url_uploadfile = "https://www.overstromingsinformatie.nl/api/v1/excel-imports" - self.scenario_id = None - self.id_excel = None - - # UPLOAD EXCEL FILE OF THE SCENARIO and retrieve id of the excel upload. In case there is an - # error y will print the reason. - def upload_excel(self): - excel_import_url = self.url_uploadfile + "?mode=create" - excel_name = self.metadata_file.name - with open(self.metadata_file, "rb") as excel_files: - excel_files = { - "file": ( - f"{excel_name}", - excel_files, - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - ) - } - excel_response = requests.post(url=excel_import_url, headers=self.headers_excel, files=excel_files) - response_json = json.loads(excel_response.content.decode("utf-8")) - if response_json.__contains__("message"): - msg = response_json["detail"][0]["msg"] - print(f"The excel file has a error, reason {msg}") - - else: - status = response_json["status"] - self.id_excel = response_json["id"] - self.scenario_id = response_json["scenario_ids"][0] - print(f"The excel file is {status}, and has been uploaded with id_excel number: {self.id_excel}") - return response_json + self.zip_size = round(self.zip_path.stat().st_size / 1024, 0) # KB + logger.info(f"Zip {zip_name} created with size {self.zip_size / 1024} MB") + return self.zip_path - # Upload the zip file using the excel ID. - def upload_zip_files(self, zipfile_location): - # With this link the zip file is not going to be uploaded. - zip_name = zipfile_location.name - file_import_url = self.url_uploadfile + f"/{self.id_excel}/files/{zip_name}/upload" - # Create link to upload zip file - response = requests.put(url=file_import_url, headers=self.headers_excel) - upload_url = response.json()["url"] - - # upload data using link - with open(f"{zipfile_location}", "rb") as data: - r = requests.put(upload_url, data=data) - print(r.status_code) - print(r.reason) - print("uploading") - print(file_import_url) - return print(f"the scenario {zip_name} has been uploaded") +# %% # %% @@ -292,88 +256,68 @@ def upload_zip_files(self, zipfile_location): # Set Paths from the data to be uploaded # Excel files per scenario. - metadata_folder = ( - r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur\metadata_per_scenario" - ) + base_path = Path(r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024") + metadata_folder = base_path.joinpath(r"ldo_structuur\metadata_per_scenario") # Excel file where the ID and size of the upload is going to be stored - id_scenarios = r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur\scenarios_ids.xlsx" + id_scenarios = base_path.joinpath(r"ldo_structuur\scenarios_ids.xlsx") # Folder location where the scenarios are going to be copied - ldo_structuur_path = r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur" + ldo_structuur_path = base_path.joinpath("ldo_structuur") - # List the scenario name to be uploaded - scenario_names = os.listdir(metadata_folder) + # Folder where scenario results are stored. + scenario_results_path = base_path.joinpath("output") # data frame from the scenarios that are gonig to be uploaded. pd_scenarios = pd.read_excel(id_scenarios) # Select scenarios ids that area already uploaded to be skiped - scenario_done = pd_scenarios.loc[pd_scenarios["ID_SCENARIO"] > 0, "Naam van het scenario"].to_list() + scenarios_done = pd_scenarios.loc[pd_scenarios["ID_SCENARIO"] > 0, "Naam van het scenario"].to_list() # Sleep time to not burn out the API - sleeptime = 420 + sleeptime = 420 # FIXME 7 minutes seems alot? - # Create a list to delete scenarios already uploaded. - delete_file = [] + # Set API key + ldo_api = LDO_API(api_key=LDO_API_KEY) # Loop over al the scenarios - for excel_file_name in scenario_names: + scenarios = list(metadata_folder.glob("*.xlsx")) + for metadata_xlsx in scenarios: # Set Scenario Name - scenario_name = excel_file_name[:-5] + scenario_name = metadata_xlsx.stem # %% # If the scenario is done the continue - if scenario_name in scenario_done: + if scenario_name in scenarios_done: continue else: # Set folder with scenario name to be uploaded to LDO - path = hrt.Folder(os.path.join(ldo_structuur_path, scenario_name)) - - # Create Folder as Oboject - ldo_structuur = SelectFolder(path) + scenario_path = ldo_structuur_path.joinpath(scenario_name) - # Copy file NetCDF and DEM inside the previous folder. + # Create folder with data to upload. + ldo_structuur = LdoUploadFolder(scenario_path, scenario_results_path=scenario_results_path) ldo_structuur.copy_files() - - # Zip the file and retrieve the path of its location - ldo_structuur.zip_files() - - # Set API key - ldo_api = LDO_API_AUTH(url_auth=LDO_API_URL, api_key=LDO_API_KEY) - - # Set UPLOAD as an object - ldo_upload = LDO_API_UPLOAD(metadata_folder, ldo_api.refresh_token, scenario_name) - - # get metadata file of the scenario that is been uploaded - metadata_file = ldo_upload.metadata_file + zip_path = ldo_structuur.zip_files() # Upload excel file from the scenario, and retrieve json infomration - excel_response = ldo_upload.upload_excel() + excel_id, scenario_id = ldo_api.upload_excel(metadata_xlsx=metadata_xlsx) - # Get size of the zip folder. - zip_size = ldo_structuur.zip_kb - # store scenario id in the metadata - scenario_id = ldo_upload.scenario_id - - # Upload zip file using the - ldo_upload.upload_zip_files(ldo_structuur.zipfile_location) - time.sleep(sleeptime) + # Upload zip file + ldo_api.upload_zip_file(zip_path=zip_path, excel_id=excel_id) # Save the id of upload from the scenario pd_scenarios.loc[pd_scenarios["Naam van het scenario"] == scenario_name, "ID_SCENARIO"] = scenario_id - # Save the size of the scenario in the metdata dataframe - pd_scenarios.loc[pd_scenarios["Naam van het scenario"] == scenario_name, "SIZE_KB"] = zip_size + # Save the size of the scenario in the metdata dataframe + pd_scenarios.loc[pd_scenarios["Naam van het scenario"] == scenario_name, "SIZE_KB"] = ( + ldo_structuur.zip_size + ) - # REMOVE/DELETE ZIP AND FOLDER FROM THE SCENARIO THAT IS ALREADY UPLOADED. - delete_file.append(ldo_structuur.path) - - if len(delete_file) > 1: - previous_folder = delete_file.pop(0) - shutil.rmtree(previous_folder) - - print(f"the scenario {scenario_name} has been uploaded") + # Clear outputs + shutil.rmtree(ldo_structuur.path) # Save the excel file. pd_scenarios.to_excel(id_scenarios, index=False, engine="openpyxl") + logger.info(f"Finished processing {scenario_name}") + time.sleep(sleeptime) + # %%