diff --git a/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh b/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh
index 7aba956c..9384f06d 100644
--- a/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh
+++ b/deploy/test-build/cluster/ssh/ssh_command_wrapper.sh
@@ -23,6 +23,11 @@ if [ "${SSH_EXECUTE:0:3}" == "ID=" -o "${SSH_EXECUTE:0:6}" == "F7TID=" ]; then
     actual="${SSH_EXECUTE}"
 fi
 
+# Remove the SLURM_TIME_FORMAT=standard prefix if present
+if [[ "${actual:0:26}" == "SLURM_TIME_FORMAT=standard" ]]; then
+    actual="${actual#* }" # remove everything before the first space, including the space
+fi
+
 command="${actual%% *}" # remove all after first space
 
 case "$command" in
diff --git a/doc/openapi/firecrest-api.yaml b/doc/openapi/firecrest-api.yaml
index 71c57263..7ef4db26 100644
--- a/doc/openapi/firecrest-api.yaml
+++ b/doc/openapi/firecrest-api.yaml
@@ -1963,6 +1963,53 @@ paths:
               description: User does not have permissions to access machine
               schema:
                 type: integer
+  '/compute/reservations':
+    parameters:
+      - in: header
+        name: X-Machine-Name
+        description: The system name
+        required: true
+        schema:
+          type: string
+    get:
+      summary: Retrieves information about all reservations
+      description: Information about reservations in the scheduling queue.
+      tags:
+        - Compute
+      parameters:
+        - name: reservations
+          in: query
+          description: Comma-separated list of reservations to retrieve
+          schema:
+            type: array
+            items:
+              type: string
+      responses:
+        '200':
+          description: Task created
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Task-Creation-Success'
+        '400':
+          description: Task creation error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Task-Creation-Error'
+          headers:
+            X-Machine-Does-Not-Exist:
+              description: Machine does not exist
+              schema:
+                type: integer
+            X-Machine-Not-Available:
+              description: Machine is not available
+              schema:
+                type: integer
+            X-Permission-Denied:
+              description: User does not have permissions to access machine
+              schema:
+                type: integer
   '/storage/xfer-internal/rsync':
     parameters:
       - in: header
diff --git a/doc/openapi/firecrest-developers-api.yaml b/doc/openapi/firecrest-developers-api.yaml
index e19d42c4..282b30b6 100644
--- a/doc/openapi/firecrest-developers-api.yaml
+++ b/doc/openapi/firecrest-developers-api.yaml
@@ -1951,6 +1951,53 @@ paths:
               description: User does not have permissions to access machine
               schema:
                 type: integer
+  '/compute/reservations':
+    parameters:
+      - in: header
+        name: X-Machine-Name
+        description: The system name
+        required: true
+        schema:
+          type: string
+    get:
+      summary: Retrieves information about all reservations
+      description: Information about reservations in the scheduling queue.
+      tags:
+        - Compute
+      parameters:
+        - name: reservations
+          in: query
+          description: Comma-separated list of reservations to retrieve
+          schema:
+            type: array
+            items:
+              type: string
+      responses:
+        '200':
+          description: Task created
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Task-Creation-Success'
+        '400':
+          description: Task creation error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Task-Creation-Error'
+          headers:
+            X-Machine-Does-Not-Exist:
+              description: Machine does not exist
+              schema:
+                type: integer
+            X-Machine-Not-Available:
+              description: Machine is not available
+              schema:
+                type: integer
+            X-Permission-Denied:
+              description: User does not have permissions to access machine
+              schema:
+                type: integer
   '/storage/xfer-internal/rsync':
     parameters:
       - in: header
diff --git a/src/common/schedulers/__init__.py b/src/common/schedulers/__init__.py
index a158bbac..f585976a 100644
--- a/src/common/schedulers/__init__.py
+++ b/src/common/schedulers/__init__.py
@@ -176,12 +176,29 @@ def parse_partitions_output(self, output, partition_names=None):
         """
         pass
 
+    @abc.abstractmethod
+    def get_reservations(self, reservation_names=None):
+        """Return the scheduler command that lists the reservations of the system.
+        """
+        pass
+
+    @abc.abstractmethod
+    def parse_reservations_output(self, output, reservation_names=None):
+        """Parse the output of the reservations command. Should return records with:
+        * ReservationName
+        * State
+        * Nodes
+        * StartTime
+        * EndTime
+        * Features
+        """
+        pass
+
     @abc.abstractmethod
     def check_job_time(self, job_time):
-        """ Try to parse correctly the HH:MM:SS time format for the passed job_time argument. Accepted formats:
+        """Try to parse correctly the HH:MM:SS time format for the passed job_time argument. Accepted formats:
             * MM MM:SS
             * HH:MM:SS
             * DD-HH DD-HH:MM
             * DD-HH:MM:SS
         """
-        pass
diff --git a/src/common/schedulers/slurm.py b/src/common/schedulers/slurm.py
index f6c99847..cccd8a4a 100644
--- a/src/common/schedulers/slurm.py
+++ b/src/common/schedulers/slurm.py
@@ -277,11 +277,11 @@ def parse_partitions_output(self, output, partition_names=None):
             "Default",
         ]
         for part_descr in partitions_descriptions:
-            node_info = {}
+            part_info = {}
             for attr_name in attributes:
                 attr_match = re.search(rf'{attr_name}=(\S+)', part_descr)
                 if attr_match:
-                    node_info[attr_name] = attr_match.group(1)
+                    part_info[attr_name] = attr_match.group(1)
                 else:
                     raise ValueError(
                         f"Could not parse attribute '{attr_name}' in "
@@ -290,11 +290,47 @@ def parse_partitions_output(self, output, partition_names=None):
 
             if (
                 partitions_set is None or
-                node_info["PartitionName"] in partitions_set
+                part_info["PartitionName"] in partitions_set
             ):
-                partitions.append(node_info)
+                partitions.append(part_info)
+
+        return partitions
+
+    def get_reservations(self, reservation_names=None):
+        return "SLURM_TIME_FORMAT=standard scontrol -a show -o reservations"
+
+    def parse_reservations_output(self, output, reservation_names=None):
+        if output.strip() == "No reservations in the system":
+            return []
+
+        reservations_set = set(reservation_names) if reservation_names else None
+        reservations_descriptions = output.splitlines()
+        reservations = []
+        attribute_seps = {
+            "ReservationName": None,
+            "State": None,
+            "Nodes": None,
+            "StartTime": None,
+            "EndTime": None,
+            "Features": "&",
+        }
+        for res_descr in reservations_descriptions:
+            res_info = {}
+            for attr_name, sep in attribute_seps.items():
+                attr_match = re.search(rf'{attr_name}=(\S+)', res_descr)
+                if attr_match:
+                    attr = attr_match.group(1)
+                    res_info[attr_name] = attr.split(sep) if sep else attr
+                else:
+                    raise ValueError(
+                        f"Could not parse attribute '{attr_name}' in "
+                        f"'{res_descr}'"
+                    )
+
+            if reservations_set is None or res_info["ReservationName"] in reservations_set:
+                reservations.append(res_info)
 
-        return list(partitions)
+        return reservations
 
     def is_valid_accounting_time(self, sacct_time):
         # HH:MM[:SS] [AM|PM]
diff --git a/src/compute/compute.py b/src/compute/compute.py
index 5089bd55..407be53d 100644
--- a/src/compute/compute.py
+++ b/src/compute/compute.py
@@ -1041,6 +1041,34 @@ def partitions_task(headers, system_name, system_addr, action, task_id, partitio
     update_task(task_id, headers, async_task.SUCCESS, jobs, is_json=True)
 
 
+def reservations_task(headers, system_name, system_addr, action, task_id, reservations_list=None):
+    # exec remote command
+    resp = exec_remote_command(headers, system_name, system_addr, action)
+
+    # error code -2 is reported as the machine not being available
+    if resp["error"] == -2:
+        update_task(task_id, headers, async_task.ERROR, "Machine is not available")
+        return
+
+    # any other error: report its message (OPENSSH errors become a permissions message)
+    if resp["error"] != 0:
+        err_msg = resp["msg"]
+        if in_str(err_msg,"OPENSSH"):
+            err_msg = "User does not have permissions to access machine"
+        update_task(task_id, headers, async_task.ERROR, err_msg)
+        return
+
+    try:
+        reservations = scheduler.parse_reservations_output(resp["msg"], reservations_list)
+        app.logger.info(f"Number of reservations: {len(reservations)}")
+    except ValueError as e:
+        update_task(task_id, headers, async_task.ERROR, str(e))
+        return
+
+    # as it is a json data to be stored in Tasks, the is_json=True
+    update_task(task_id, headers, async_task.SUCCESS, reservations, is_json=True)
+
+
 # Job account information
 @app.route("/acct",methods=["GET"])
 @check_auth_header
@@ -1357,6 +1385,88 @@ def get_partitions():
         )
         return data, 400
 
+@app.route("/reservations", methods=["GET"])
+@check_auth_header
+def get_reservations():
+    try:
+        system_name = request.headers["X-Machine-Name"]
+    except KeyError:
+        app.logger.error("No machinename given")
+        return jsonify(description="No machine name given"), 400
+
+    # public endpoints from Kong to users
+    if system_name not in SYSTEMS_PUBLIC:
+        header = {"X-Machine-Does-Not-Exists": "Machine does not exists"}
+        return jsonify(description="Failed to retrieve reservations information", error="Machine does not exists"), 400, header
+
+    # select index in the list corresponding with machine name
+    system_idx = SYSTEMS_PUBLIC.index(system_name)
+    system_addr = SYSTEMS_INTERNAL_COMPUTE[system_idx]
+
+    [headers, ID] = get_tracing_headers(request)
+    # check if machine is accessible by user:
+    resp = exec_remote_command(
+        headers,
+        system_name,
+        system_addr,
+        f"ID={ID} true"
+    )
+
+    if resp["error"] != 0:
+        error_str = resp["msg"]
+        if resp["error"] == -2:
+            header = {"X-Machine-Not-Available": "Machine is not available"}
+            return jsonify(description="Failed to retrieve reservations information"), 400, header
+        if in_str(error_str, "Permission") or in_str(error_str, "OPENSSH"):
+            header = {"X-Permission-Denied": "User does not have permissions to access machine or path"}
+            return jsonify(description="Failed to retrieve reservations information"), 404, header
+
+    reservations = request.args.get("reservations", None)
+    reservations_list = None
+    if reservations is not None:
+        v = validate_input(reservations)
+        if v != "":
+            return jsonify(description="Failed to retrieve reservations information", error=f"reservations '{reservations}' {v}"), 400
+
+        try:
+            reservations_list = reservations.split(",")
+        except Exception:
+            return jsonify(description="Failed to retrieve reservations information", error="Reservations list wrong format"), 400
+
+    # In Slurm we are not actually using the reservation_names argument
+    # for the command but it can be used for other schedulers
+    sched_cmd = scheduler.get_reservations(reservations_list)
+    action = f"ID={ID} {sched_cmd}"
+
+    try:
+        # obtain new task from Tasks microservice
+        task_id = create_task(headers, service="compute", system=system_name)
+
+        # if error in creating task:
+        if task_id == -1:
+            return jsonify(description="Failed to retrieve reservations information", error='Error creating task'), 400
+
+        update_task(task_id, headers, async_task.QUEUED)
+
+        # asynchronous task creation
+        aTask = threading.Thread(target=reservations_task, name=ID,
+                                 args=(headers, system_name, system_addr, action, task_id, reservations_list))
+
+        aTask.start()
+        task_url = f"/tasks/{task_id}"
+
+        data = jsonify(
+            success="Task created", task_id=task_id, task_url=task_url
+        )
+        return data, 200
+
+    except Exception as e:
+        data = jsonify(
+            description="Failed to retrieve reservations information", error=str(e)
+        )
+        return data, 400
+
+
 @app.route("/status",methods=["GET"])
 @check_auth_header
 def status():
diff --git a/src/tests/automated_tests/integration/test_compute.py b/src/tests/automated_tests/integration/test_compute.py
index a8cb0066..3fbdaaee 100644
--- a/src/tests/automated_tests/integration/test_compute.py
+++ b/src/tests/automated_tests/integration/test_compute.py
@@ -235,6 +235,20 @@ def test_partitions_xfer(machine, headers):
     check_task_status(task_id, headers)
 
 
+@skipif_not_uses_gateway
+@pytest.mark.parametrize("machine", [SERVER_COMPUTE])
+def test_reservations(machine, headers):
+    url = f"{COMPUTE_URL}/reservations"
+    headers.update({"X-Machine-Name": machine})
+    resp = requests.get(url, headers=headers, verify=False)
+    print(resp.content)
+    assert resp.status_code == 200
+
+    # check the status of the reservations task
+    task_id = resp.json()["task_id"]
+    check_task_status(task_id, headers)
+
+
 if __name__ == '__main__':
     pytest.main()
 
diff --git a/src/tests/automated_tests/unit/test_unit_compute.py b/src/tests/automated_tests/unit/test_unit_compute.py
index 3ac737b1..6adea626 100644
--- a/src/tests/automated_tests/unit/test_unit_compute.py
+++ b/src/tests/automated_tests/unit/test_unit_compute.py
@@ -192,6 +192,17 @@ def test_partition_xfer(machine, expected_response_code, headers):
     assert resp.status_code == expected_response_code
 
 
+# Test get reservations information
+@skipif_not_uses_gateway
+@pytest.mark.parametrize("machine, expected_response_code", DATA)
+def test_reservations(machine, expected_response_code, headers):
+    url = f"{COMPUTE_URL}/reservations"
+    headers.update({"X-Machine-Name": machine})
+    resp = requests.get(url, headers=headers, verify=False)
+    print(resp.content)
+    assert resp.status_code == expected_response_code
+
+
 # Test get status of Jobs microservice
 @skipif_uses_gateway
 def test_status(headers):