Skip to content

Commit

Permalink
client: add wait() method to poll for operation result (#66)
Browse files Browse the repository at this point in the history
Poll interval is handled internally and adapts to job run time.

Optionally, the user can set a max wait time to stop polling even if the
operation is still pending.

Supersedes #64
  • Loading branch information
brutasse authored Dec 16, 2024
1 parent 2497543 commit 06f2e5d
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 60 deletions.
2 changes: 2 additions & 0 deletions docs/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Changelog
or 5xx HTTP statuses.
* Bump python version requirement to 3.8 due to the use of the walrus
operator.
* Add ``Client.wait(operation_id)`` to poll for the result of an asynchronous
API operation.

0.9.1 (2024-04-29)
~~~~~~~~~~~~~~~~~~
Expand Down
9 changes: 7 additions & 2 deletions docs/v2.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,10 @@ V2 API bindings
build/html/v2.html.

.. automodule:: exoscale.api.v2
:members: Client
:undoc-members:
:members:
:exclude-members: Client


.. autoclass:: exoscale.api.v2.Client
:members:
:inherited-members: wait
211 changes: 156 additions & 55 deletions exoscale/api/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,43 @@
>>> c.list_instances()
{'instances': []}
Waiting for an asynchronous operation to complete:
>>> from exoscale.api.v2 import Client
>>> c = Client("api-key", "api-secret", zone="ch-gva-2")
>>> version = c.list_sks_cluster_versions()["sks-cluster-versions"][0]
>>> operation = c.create_sks_cluster(
... cni="cilium",
... name="my-cluster",
... level="starter",
... version=version,
... )
>>> c.wait(operation["id"])
{'id': 'e2047130-b86e-11ef-83b3-0d8312b2c2d7',
'state': 'success',
'reference': {
'id': '8561ee34-09f0-42da-a765-abde807f944b',
'link': '/v2/sks-cluster/8561ee34-09f0-42da-a765-abde807f944b',
'command': 'get-sks-cluster'}}
"""

import copy
import json
import time
from itertools import chain
from pathlib import Path

from exoscale_auth import ExoscaleV2Auth

import requests

from .exceptions import (
ExoscaleAPIAuthException,
ExoscaleAPIClientException,
ExoscaleAPIServerException,
ExoscaleAPIAuthException,
)


import requests


with open(Path(__file__).parent.parent / "openapi.json", "r") as f:
API_SPEC = json.load(f)

Expand Down Expand Up @@ -96,6 +115,91 @@ def _get_ref(path):
return payload


_type_translations = {
"string": "str",
"integer": "int",
"object": "dict",
"array": "list",
"boolean": "bool",
"number": "float",
}


def _return_docstring(operation):
[status_code] = operation["responses"].keys()
[ctype] = operation["responses"][status_code]["content"].keys()
return_schema = operation["responses"][status_code]["content"][ctype][
"schema"
]
if "$ref" in return_schema:
ref = _get_ref(return_schema["$ref"])
if (
"properties" in ref
and ref["type"] == "object"
and "description" in ref
):
body = {}
for name, prop in ref["properties"].items():
if "$ref" in prop:
_ref = _get_ref(prop["$ref"])
item = _ref
else:
item = prop
typ = _type_translations[item["type"]]
desc = prop.get("description")
if "enum" in item:
choices = "``, ``".join(map(repr, item["enum"]))
desc += f". Values are ``{choices}``"
suffix = f": {desc}" if desc else ""
normalized_name = name.replace("-", "_")
body[normalized_name] = (
f"**{normalized_name}** ({typ}){suffix}."
)

doc = (
f"dict: {ref['description']}. A dictionnary with the following keys:"
+ "\n\n * ".join([""] + list(body.values()))
)
elif "description" in ref:
doc = f'{_type_translations[ref["type"]]}: {ref["description"]}.'
else:
doc = _type_translations[ref["type"]]
else:
doc = _type_translations[return_schema["type"]]
return doc


def _poll_interval(run_time):
"""
Returns the wait interval before next poll, given the current run time of a job.
We poll
- every 3 seconds for the first 30 seconds
- then increase linearly to reach 1 minute at 15 minutes of run time
- then every minute
"""
# y = a * x + b. Solve a and b for:
# 60 = a * 900 + b
# 3 = a * 30 + b
a = 57 / 870
b = 3 - 30 * a
min_wait = 3
max_wait = 60
interval = a * run_time + b
interval = max(min_wait, interval)
interval = min(max_wait, interval)
return interval


def _time():
return time.time()


def _sleep(start_time):
run_time = _time() - start_time
interval = _poll_interval(run_time)
return time.sleep(interval)


class BaseClient:
def __init__(self, key, secret, url=None, **kwargs):
if url is None:
Expand Down Expand Up @@ -123,6 +227,7 @@ def __init__(self, key, secret, url=None, **kwargs):
session.auth = ExoscaleV2Auth(key, secret)
self.session = session
self.key = key
self.WAIT_ABORT_ERRORS_COUNT = 5

def __repr__(self):
return (
Expand Down Expand Up @@ -178,59 +283,55 @@ def _call_operation(self, operation_id, parameters=None, body=None):

return response.json()


_type_translations = {
"string": "str",
"integer": "int",
"object": "dict",
"array": "list",
"boolean": "bool",
"number": "float",
}


def _return_docstring(operation):
[status_code] = operation["responses"].keys()
[ctype] = operation["responses"][status_code]["content"].keys()
return_schema = operation["responses"][status_code]["content"][ctype][
"schema"
]
if "$ref" in return_schema:
ref = _get_ref(return_schema["$ref"])
if (
"properties" in ref
and ref["type"] == "object"
and "description" in ref
):
body = {}
for name, prop in ref["properties"].items():
if "$ref" in prop:
_ref = _get_ref(prop["$ref"])
item = _ref
else:
item = prop
typ = _type_translations[item["type"]]
desc = prop.get("description")
if "enum" in item:
choices = "``, ``".join(map(repr, item["enum"]))
desc += f". Values are ``{choices}``"
suffix = f": {desc}" if desc else ""
normalized_name = name.replace("-", "_")
body[normalized_name] = (
f"**{normalized_name}** ({typ}){suffix}."
def wait(self, operation_id: str, max_wait_time: int = None):
"""
Wait for completion of an asynchronous operation.
Args:
operation_id (str)
max_wait_time (int): When set, stop waiting after this time in
seconds. Defaults to ``None``, which waits until operation
completion.
Returns:
{ret}
"""
start_time = _time()
subsequent_errors = 0
while True:
try:
result = self.get_operation(id=operation_id)
subsequent_errors = 0
except ExoscaleAPIServerException as e:
subsequent_errors += 1
if subsequent_errors >= self.WAIT_ABORT_ERRORS_COUNT:
raise ExoscaleAPIServerException(
"Server error while polling operation"
) from e
_sleep(start_time)
continue
state = result["state"]
if state == "success":
return result
elif state in {"failure", "timeout"}:
raise ExoscaleAPIServerException(
f"Operation error: {state}, {result['reason']}"
)
elif state == "pending":
run_time = _time() - start_time
if max_wait_time is not None and run_time > max_wait_time:
raise ExoscaleAPIClientException(
"Operation max wait time reached"
)
_sleep(start_time)
else:
raise ExoscaleAPIServerException(
f"Invalid operation state: {state}"
)

doc = (
f"dict: {ref['description']}. A dictionnary with the following keys:"
+ "\n\n * ".join([""] + list(body.values()))
)
elif "description" in ref:
doc = f'{_type_translations[ref["type"]]}: {ref["description"]}.'
else:
doc = _type_translations[ref["type"]]
else:
doc = _type_translations[return_schema["type"]]
return doc
wait.__doc__ = wait.__doc__.format(
ret=_return_docstring(BY_OPERATION["get-operation"]["operation"])
)


def _args_docstring(parameters, body):
Expand Down
Loading

0 comments on commit 06f2e5d

Please sign in to comment.