Skip to content

Commit

Permalink
client: add wait() method to poll for operation result
Browse files Browse the repository at this point in the history
Poll interval is handled internally and adapts to job run time.

Optionally, the user can set a max wait time to stop polling even if the
operation is still pending.

Supersedes #64
  • Loading branch information
brutasse committed Dec 12, 2024
1 parent e789364 commit f0c8e52
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 54 deletions.
9 changes: 7 additions & 2 deletions docs/v2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@ V2 API bindings
build/html/v2.html.

.. automodule:: exoscale.api.v2
:members: Client
:undoc-members:
:members:
:exclude-members: Client


.. autoclass:: exoscale.api.v2.Client
:members:
:inherited-members: wait
186 changes: 135 additions & 51 deletions exoscale/api/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,24 @@
>>> c.list_instances()
{'instances': []}
Waiting for an asynchronous operation to complete:
>>> from exoscale.api.v2 import Client
>>> c = Client("api-key", "api-secret", zone="ch-gva-2")
>>> version = c.list_sks_cluster_versions()["sks-cluster-versions"][0]
>>> operation = c.create_sks_cluster(
... cni="cilium",
... name="my-cluster",
... level="starter",
... version=version,
... )
>>> c.wait(operation["id"])
{'id': 'e2047130-b86e-11ef-83b3-0d8312b2c2d7', 'state': 'success', 'reference': {'id': '8561ee34-09f0-42da-a765-abde807f944b', 'link': '/v2/sks-cluster/8561ee34-09f0-42da-a765-abde807f944b', 'command': 'get-sks-cluster'}}
"""

import copy
import json
import time
from itertools import chain
from pathlib import Path

Expand Down Expand Up @@ -96,6 +110,81 @@ def _get_ref(path):
return payload


_type_translations = {
"string": "str",
"integer": "int",
"object": "dict",
"array": "list",
"boolean": "bool",
"number": "float",
}


def _return_docstring(operation):
[status_code] = operation["responses"].keys()
[ctype] = operation["responses"][status_code]["content"].keys()
return_schema = operation["responses"][status_code]["content"][ctype][
"schema"
]
if "$ref" in return_schema:
ref = _get_ref(return_schema["$ref"])
if (
"properties" in ref
and ref["type"] == "object"
and "description" in ref
):
body = {}
for name, prop in ref["properties"].items():
if "$ref" in prop:
_ref = _get_ref(prop["$ref"])
item = _ref
else:
item = prop
typ = _type_translations[item["type"]]
desc = prop.get("description")
if "enum" in item:
choices = "``, ``".join(map(repr, item["enum"]))
desc += f". Values are ``{choices}``"
suffix = f": {desc}" if desc else ""
normalized_name = name.replace("-", "_")
body[normalized_name] = (
f"**{normalized_name}** ({typ}){suffix}."
)

doc = (
f"dict: {ref['description']}. A dictionnary with the following keys:"
+ "\n\n * ".join([""] + list(body.values()))
)
elif "description" in ref:
doc = f'{_type_translations[ref["type"]]}: {ref["description"]}.'
else:
doc = _type_translations[ref["type"]]
else:
doc = _type_translations[return_schema["type"]]
return doc


def _poll_interval(run_time):
"""
Returns the wait interval before next poll, given the current run time of a job.
We poll
- every 3 seconds for the first 30 seconds
- then increase linearly to reach 1 minute at 15 minutes of run time
- then every minute
"""
# y = a * x + b. Solve a and b for:
# 60 = a * 900 + b
# 3 = a * 30 + b
a = 57 / 870
b = 3 - 30 * a
min_wait = 3
max_wait = 60
interval = a * run_time + b
interval = max(min_wait, interval)
interval = min(max_wait, interval)
return interval


class BaseClient:
def __init__(self, key, secret, url=None, **kwargs):
if url is None:
Expand Down Expand Up @@ -123,6 +212,7 @@ def __init__(self, key, secret, url=None, **kwargs):
session.auth = ExoscaleV2Auth(key, secret)
self.session = session
self.key = key
self.WAIT_ABORT_ERRORS_COUNT = 5

def __repr__(self):
return (
Expand Down Expand Up @@ -178,59 +268,53 @@ def _call_operation(self, operation_id, parameters=None, body=None):

return response.json()


_type_translations = {
"string": "str",
"integer": "int",
"object": "dict",
"array": "list",
"boolean": "bool",
"number": "float",
}


def _return_docstring(operation):
[status_code] = operation["responses"].keys()
[ctype] = operation["responses"][status_code]["content"].keys()
return_schema = operation["responses"][status_code]["content"][ctype][
"schema"
]
if "$ref" in return_schema:
ref = _get_ref(return_schema["$ref"])
if (
"properties" in ref
and ref["type"] == "object"
and "description" in ref
):
body = {}
for name, prop in ref["properties"].items():
if "$ref" in prop:
_ref = _get_ref(prop["$ref"])
item = _ref
else:
item = prop
typ = _type_translations[item["type"]]
desc = prop.get("description")
if "enum" in item:
choices = "``, ``".join(map(repr, item["enum"]))
desc += f". Values are ``{choices}``"
suffix = f": {desc}" if desc else ""
normalized_name = name.replace("-", "_")
body[normalized_name] = (
f"**{normalized_name}** ({typ}){suffix}."
def wait(self, operation_id: str, max_wait_time: int = None):
"""
Wait for completion of an asynchronous operation.
Args:
operation_id (str)
max_wait_time (int): When set, stop waiting after this time in seconds. Defaults to ``None``, which waits until operation completion.
Returns:
{ret}
"""
start_time = time.time()
subsequent_errors = 0
while True:
try:
result = self.get_operation(id=operation_id)
subsequent_errors = 0
except ExoscaleAPIServerException as e:
subsequent_errors += 1
if subsequent_errors >= self.WAIT_ABORT_ERRORS_COUNT:
raise ExoscaleAPIServerException(
"Server error while polling operation"
) from e
continue
state = result["state"]
if state == "success":
return result
elif state in {"failure", "timeout"}:
raise ExoscaleAPIServerException(
f"Operation error: {state}, {result['reason']}"
)
elif state == "pending":
run_time = time.time() - start_time
if max_wait_time is not None and run_time > max_wait_time:
raise ExoscaleAPIClientException(
"Operation max wait time reached"
)
interval = _poll_interval(run_time)
time.sleep(interval)
else:
raise ExoscaleAPIServerException(
f"Invalid operation state: {state}"
)

doc = (
f"dict: {ref['description']}. A dictionnary with the following keys:"
+ "\n\n * ".join([""] + list(body.values()))
)
elif "description" in ref:
doc = f'{_type_translations[ref["type"]]}: {ref["description"]}.'
else:
doc = _type_translations[ref["type"]]
else:
doc = _type_translations[return_schema["type"]]
return doc
wait.__doc__ = wait.__doc__.format(
ret=_return_docstring(BY_OPERATION["get-operation"]["operation"])
)


def _args_docstring(parameters, body):
Expand Down
59 changes: 58 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from exoscale.api.v2 import Client
from exoscale.api.v2 import Client, _poll_interval
from exoscale.api.exceptions import (
ExoscaleAPIClientException,
ExoscaleAPIServerException,
Expand Down Expand Up @@ -71,5 +71,62 @@ def test_client_error_handling(requests_mock):
)


def test_wait_interval():
assert _poll_interval(25) == 3
assert 3 < _poll_interval(33) < 4
assert 8 < _poll_interval(120) < 9
assert 40 < _poll_interval(600) < 41
assert _poll_interval(1000) == 60
assert _poll_interval(999999) == 60


def test_operation_poll_failure(requests_mock):
requests_mock.get(
"https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa
status_code=200,
text='{"id": "4c5547c0-b870-11ef-83b3-0d8312b2c2d7", "state": "failure", "reason": "unknown", "reference": {"id": "97d7426f-8b25-4591-91d5-4a19e9a1d61a", "link": "/v2/sks-cluster/97d7426f-8b25-4591-91d5-4a19e9a1d61a", "command": "get-sks-cluster"}}', # noqa
)

client = Client(key="EXOtest", secret="sdsd")
try:
client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7")
except ExoscaleAPIServerException as e:
assert "Operation error: failure" in str(e)
else:
assert False, "exception not raised"


def test_operation_abort_on_500(requests_mock):
requests_mock.get(
"https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa
status_code=500,
text='{"message": "server error"}',
)

client = Client(key="EXOtest", secret="sdsd")
try:
client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7")
except ExoscaleAPIServerException as e:
assert "Server error while polling operation" in str(e)
else:
assert False, "exception not raised"


def test_operation_invalid_state(requests_mock):
requests_mock.get(
"https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa
status_code=200,
text='{"id": "4c5547c0-b870-11ef-83b3-0d8312b2c2d7", "state": "weird", "reference": {"id": "97d7426f-8b25-4591-91d5-4a19e9a1d61a", "link": "/v2/sks-cluster/97d7426f-8b25-4591-91d5-4a19e9a1d61a", "command": "get-sks-cluster"}}', # noqa
)

client = Client(key="EXOtest", secret="sdsd")
try:
client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7")
except ExoscaleAPIServerException as e:
assert "Invalid operation state: weird" in str(e)
else:
assert False, "exception not raised"


if __name__ == "__main__":
pytest.main()

0 comments on commit f0c8e52

Please sign in to comment.