diff --git a/hhnk_threedi_tools/breaches/upload_files_ldo.py b/hhnk_threedi_tools/breaches/upload_files_ldo.py index 050dc2a..75302f8 100644 --- a/hhnk_threedi_tools/breaches/upload_files_ldo.py +++ b/hhnk_threedi_tools/breaches/upload_files_ldo.py @@ -50,6 +50,7 @@ # %% import json +import os import shutil import time import zipfile @@ -60,10 +61,10 @@ import requests from breaches import Breaches -LDO_API_URL = "https://www.overstromingsinformatie.nl/api/v1/" +LDO_API_URL = "https://www.overstromingsinformatie.nl/auth/" # Generate api key on de LDO_API_URL website. And place it in api_ldo_key.txt -LDO_API_KEY = Path("api_ldo_key.txt").read_text("utf8") +LDO_API_KEY = Path("api_ldo_key.txt").read_text() logger = hrt.logging.get_logger(__name__) @@ -87,23 +88,18 @@ # %% -class LDO_API: - def __init__(self, api_key=LDO_API_KEY, tenant=4, url=LDO_API_URL): - self.url = url +class LDO_API_AUTH: + def __init__(self, url_auth, api_key, tenant=4): + self.url_auth = url_auth self.api_key = api_key self.tenant = tenant # organisation, 4=hhnk. - # Authorisation goes through the auth endpoint - self.url_auth = self.url.replace("/api/", "/auth/") + self._token = None + self._refresh_token = None - self._refresh_token = None # set on calling self.access_token - self._access_token = None # Property - - # TODO FROM LDO_API_UPLOAD - self.headers_excel = { - "accept": "application/json", - "authorization": f"Bearer {self.access_token}", - } + @property # to get health url + def health(self): + return self.url_auth[:-5] + "/health/" # Health is not under v1. @property def token(self): @@ -140,158 +136,171 @@ def refresh_token(self): ).json()["access"] return self._refresh_token - def access_token(self): - """Get refresh token so we can interact with the api.""" - if self._refresh_token is None: - r = requests.post( - url=self.url_auth + "token/", - json={"tenant": self.tenant}, - auth=("__key__", self.api_key), - timeout=5, - ).json() - self._refresh_token = r["refresh"] - self._access_token = r["access"] - return self._access_token - def get_tenants(self): """Tenant / organisation which has an id and name. - Prints the tenants. The id can be used to get the token. + Prints the tenants. The id can be use to get the token. """ - tenant_url = self.url + "v1/tenants/" - tenants = requests.get( - url=tenant_url, auth=("__key__", self.api_key), timeout=5 - ).json() + tenant_url = self.url_auth + "v1/tenants/" + tenants = requests.get(url=tenant_url, auth=("__key__", self.api_key)).json() for tenant in tenants: logger.info(tenant) return tenants def test_api(self): - """Test api connection without requiring authorisation""" - health_url = self.url[:-4] + "/health/" # Health is not under v1. - response_health = requests.get(url=health_url, timeout=5) + """Test api connection""" + response_health = requests.get(url=self.health) assert response_health.status_code == 200 - def upload_excel(self, metadata_xlsx): - """Upload excel file of the scenario and retrieve id of the excel upload. - In case there is an error it will print the reason. - """ - url_excel_import = self.url + "excel-imports?mode=create" - with open(metadata_xlsx, "rb") as excel_files: + +# %% + + +class SelectFolder(hrt.Folder): + """ + An object to ease the accessibility, creation and checks of folders and + files that need to be uploaded to LDO. + + Main Folder object + ├── dem.tif + ├── results_3di.nc + """ + + def __init__(self, base, create=True): + self.path = base.path + self.scenario_name = base.name + self.zip_kb = None + super().__init__(base, create=create) + self.zipfile_location = None + + def copy_files(self): + # Folder location from where the scenarios are going to be copied + output_folder = ( + r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\output" + ) + + def select_folder(output_folder, scenario_name): + scenario_paths = [ + j for i in Path(output_folder).glob("*/") for j in list(i.glob("*/")) + ] + for scenario_path in scenario_paths: + if scenario_path.name == scenario_name: + return scenario_path + + scenario_folder = select_folder(output_folder, self.scenario_name) + breach = Breaches(scenario_folder) + raster_compress_path = os.path.join(breach.wss.path, "dem_clip.tif") + netcdf_path = os.path.join(breach.netcdf.path, "results_3di.nc") + + shutil.copy2(netcdf_path, self.path) + shutil.copy2(raster_compress_path, self.path) + + return print( + f"Scenario {self.scenario_name} has been copy in the folder structure" + ) + + # Create the zip file to uploaded. + def zip_files(self): + # Create name of the zip file + zip_name = self.scenario_name + ".zip" + zip_name = zip_name.replace(" ", "_") + + # Set the zip file path + folder_structure_path = Path(os.path.join(self.path)) + self.zipfile_location = Path(os.path.join(self.path, zip_name)) + + # Zip the folder to be uploaded + with zipfile.ZipFile(self.zipfile_location, "w") as zipf: + # Walk through the folder and add files to the zip file + for root, dirs, files in os.walk(folder_structure_path): + for file in files: + if file != f"{zip_name}": + # root = r'E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur' + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, folder_structure_path) + zipf.write( + file_path, + arcname=os.path.join(f"{self.scenario_name}", arcname), + ) + # Get zipfile size. + zp = zipfile.ZipFile(f"{self.zipfile_location}") + size = sum([zinfo.file_size for zinfo in zp.filelist]) + self.zip_kb = float(size) / 1000 # kB + print(f"zip file created with size {self.zip_kb} kb") + return self.zipfile_location + + +# %% + + +class LDO_API_UPLOAD: + def __init__(self, metadata_folder_path, refresh_token, scenario_name): + self.metadata_folder_path = metadata_folder_path + self.metadata_file = Path( + os.path.join(metadata_folder_path, scenario_name + ".xlsx") + ) + self.refresh_token = refresh_token + self.headers_excel = { + "accept": "application/json", + "authorization": f"Bearer {self.refresh_token}", + # 'content-type':'multiplart/form-data', + } + self.url_uploadfile = ( + "https://www.overstromingsinformatie.nl/api/v1/excel-imports" + ) + self.scenario_id = None + self.id_excel = None + + # UPLOAD EXCEL FILE OF THE SCENARIO and retrieve id of the excel upload. In case there is an + # error y will print the reason. + def upload_excel(self): + excel_import_url = self.url_uploadfile + "?mode=create" + excel_name = self.metadata_file.name + with open(self.metadata_file, "rb") as excel_files: excel_files = { "file": ( - metadata_xlsx.name, + f"{excel_name}", excel_files, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) } excel_response = requests.post( - url=url_excel_import, headers=self.headers_excel, files=excel_files + url=excel_import_url, headers=self.headers_excel, files=excel_files ) response_json = json.loads(excel_response.content.decode("utf-8")) if response_json.__contains__("message"): msg = response_json["detail"][0]["msg"] - logger.error("The excel file has an error") - raise ValueError(msg) + print(f"The excel file has a error, reason {msg}") else: status = response_json["status"] - excel_id = response_json["id"] - scenario_id = response_json["scenario_ids"][0] - logger.info( - f"The excel file is {status}, and has been uploaded with excel_id: {excel_id}, scenario_id: {scenario_id}" + self.id_excel = response_json["id"] + self.scenario_id = response_json["scenario_ids"][0] + print( + f"The excel file is {status}, and has been uploaded with id_excel number: {self.id_excel}" ) - return excel_id, scenario_id + return response_json - def upload_zip_file(self, zip_path, excel_id): - """Upload the zip file using the excel ID.""" + # Upload the zip file using the excel ID. + def upload_zip_files(self, zipfile_location): + # With this link the zip file is not going to be uploaded. + zip_name = zipfile_location.name file_import_url = ( - self.url + f"excel-imports/{excel_id}/files/{zip_path.name}/upload" + self.url_uploadfile + f"/{self.id_excel}/files/{zip_name}/upload" ) - logger.info(f"Uploading zip to {file_import_url}") # Create link to upload zip file response = requests.put(url=file_import_url, headers=self.headers_excel) upload_url = response.json()["url"] - # Upload data using link - with open(zip_path, "rb") as data: + # upload data using link + with open(f"{zipfile_location}", "rb") as data: r = requests.put(upload_url, data=data) - logger.info(f"status code: {r.status_code}") - logger.info(f"reason: {r.reason}") - logger.info(f"Finished uploading {zip_path.name}") - - -# %% - - -class LdoUploadFolder(hrt.Folder): - def __init__(self, base, scenario_results_path, create=True): - """ - Folder to store files that need to be uploaded to LDO. - This is a temporary dir that will be removed after a successful - upload - - Folder - ├── dem.tif - ├── results_3di.nc - ├── Folder.zip (zipped dem.tif + results_3di.nc) - - Parameters - ---------- - base : Union[str,Path] - output directory where upload files will be placed - scenario_results_path - directory where scenario results are stored. - """ - super().__init__(base, create=create) - self.scenario_results_path = scenario_results_path - self.zip_size = None - self.zip_path = None - - def _find_scenario_folder(self): - """Get Path to scenario results""" - scenario_paths = [ - j for i in self.scenario_results_path.glob("*/") for j in list(i.glob("*/")) - ] - for scenario_path in scenario_paths: - if scenario_path.name == self.name: - return scenario_path - raise FileNotFoundError( - f"{self.name} not found in {self.scenario_results_path}" - ) - - def copy_files(self): - """Copy NetCDF and DEM to the upload folder""" - scenario_folder = self._find_scenario_folder() - breach = Breaches(scenario_folder) - raster_compress_path = breach.wss.path.joinpath("dem_clip.tif") - netcdf_path = breach.netcdf.path.joinpath("results_3di.nc") - - shutil.copy2(raster_compress_path, self.path) - shutil.copy2(netcdf_path, self.path) - - logger.info(f"Scenario {self.name} has been copied in the folder structure") - - def zip_files(self): - """Zip files so they can be uploaded""" - zip_name = self.name.replace(" ", "_") + ".zip" - - # Set the zip file path - self.zip_path = self.path.joinpath(zip_name) - - # Zip the folder to be uploaded - with zipfile.ZipFile(self.zip_path, "w") as zipf: - for file in self.path.glob("*"): - if file.name != zip_name: - zipf.write(file, arcname=file.name) - - self.zip_size = round(self.zip_path.stat().st_size / 1024, 0) # KB - logger.info(f"Zip {zip_name} created with size {self.zip_size / 1024} MB") - return self.zip_path - - -# %% + print(r.status_code) + print(r.reason) + print("uploading") + print(file_import_url) + return print(f"the scenario {zip_name} has been uploaded") # %% @@ -299,76 +308,94 @@ def zip_files(self): # Set Paths from the data to be uploaded # Excel files per scenario. - base_path = Path( - r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024" - ) - metadata_folder = base_path.joinpath(r"ldo_structuur\metadata_per_scenario") + metadata_folder = r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur\metadata_per_scenario" # Excel file where the ID and size of the upload is going to be stored - id_scenarios = base_path.joinpath(r"ldo_structuur\scenarios_ids.xlsx") + id_scenarios = r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur\scenarios_ids.xlsx" # Folder location where the scenarios are going to be copied - ldo_structuur_path = base_path.joinpath("ldo_structuur") + ldo_structuur_path = r"E:\03.resultaten\Overstromingsberekeningenprimairedoorbraken2024\ldo_structuur" - # Folder where scenario results are stored. - scenario_results_path = base_path.joinpath("output") + # List the scenario name to be uploaded + scenario_names = os.listdir(metadata_folder) # data frame from the scenarios that are gonig to be uploaded. pd_scenarios = pd.read_excel(id_scenarios) # Select scenarios ids that area already uploaded to be skiped - scenarios_done = pd_scenarios.loc[ + scenario_done = pd_scenarios.loc[ pd_scenarios["ID_SCENARIO"] > 0, "Naam van het scenario" ].to_list() # Sleep time to not burn out the API - sleeptime = 420 # FIXME 7 minutes seems alot? + sleeptime = 420 - # Set API key - ldo_api = LDO_API(api_key=LDO_API_KEY) + # Create a list to delete scenarios already uploaded. + delete_file = [] # Loop over al the scenarios - scenarios = list(metadata_folder.glob("*.xlsx")) - for metadata_xlsx in scenarios: + for excel_file_name in scenario_names: # Set Scenario Name - scenario_name = metadata_xlsx.stem + scenario_name = excel_file_name[:-5] # %% # If the scenario is done the continue - if scenario_name in scenarios_done: + if scenario_name in scenario_done: continue else: # Set folder with scenario name to be uploaded to LDO - scenario_path = ldo_structuur_path.joinpath(scenario_name) + path = hrt.Folder(os.path.join(ldo_structuur_path, scenario_name)) - # Create folder with data to upload. - ldo_structuur = LdoUploadFolder( - scenario_path, scenario_results_path=scenario_results_path - ) + # Create Folder as Oboject + ldo_structuur = SelectFolder(path) + + # Copy file NetCDF and DEM inside the previous folder. ldo_structuur.copy_files() - zip_path = ldo_structuur.zip_files() + + # Zip the file and retrieve the path of its location + ldo_structuur.zip_files() + + # Set API key + ldo_api = LDO_API_AUTH(url_auth=LDO_API_URL, api_key=LDO_API_KEY) + + # Set UPLOAD as an object + ldo_upload = LDO_API_UPLOAD( + metadata_folder, ldo_api.refresh_token, scenario_name + ) + + # get metadata file of the scenario that is been uploaded + metadata_file = ldo_upload.metadata_file # Upload excel file from the scenario, and retrieve json infomration - excel_id, scenario_id = ldo_api.upload_excel(metadata_xlsx=metadata_xlsx) + excel_response = ldo_upload.upload_excel() + + # Get size of the zip folder. + zip_size = ldo_structuur.zip_kb + # store scenario id in the metadata + scenario_id = ldo_upload.scenario_id - # Upload zip file - ldo_api.upload_zip_file(zip_path=zip_path, excel_id=excel_id) + # Upload zip file using the + ldo_upload.upload_zip_files(ldo_structuur.zipfile_location) + time.sleep(sleeptime) # Save the id of upload from the scenario pd_scenarios.loc[ pd_scenarios["Naam van het scenario"] == scenario_name, "ID_SCENARIO" ] = scenario_id - # Save the size of the scenario in the metdata dataframe + # Save the size of the scenario in the metdata dataframe pd_scenarios.loc[ pd_scenarios["Naam van het scenario"] == scenario_name, "SIZE_KB" - ] = ldo_structuur.zip_size + ] = zip_size + + # REMOVE/DELETE ZIP AND FOLDER FROM THE SCENARIO THAT IS ALREADY UPLOADED. + delete_file.append(ldo_structuur.path) + + if len(delete_file) > 1: + previous_folder = delete_file.pop(0) + shutil.rmtree(previous_folder) - # Clear outputs - shutil.rmtree(ldo_structuur.path) + print(f"the scenario {scenario_name} has been uploaded") # Save the excel file. pd_scenarios.to_excel(id_scenarios, index=False, engine="openpyxl") - logger.info(f"Finished processing {scenario_name}") - time.sleep(sleeptime) - # %%