Skip to content

Commit

Permalink
Pull request update/250206
Browse files Browse the repository at this point in the history
36fb4f7 OSN-618. Add a reason for the Terminated status of the runner
runset
f0a76d4 OSN-118. Fixed error when spot_settings is None
eb933db OSN-202. Updated states and reasons for runners
51a4bff OSN-118. Support spot_price parameter for runset
  • Loading branch information
tm-hystax authored Feb 6, 2025
2 parents 3a73eb9 + 36fb4f7 commit 480437a
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 34 deletions.
1 change: 0 additions & 1 deletion bulldozer/bulldozer_api/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import asyncio
from typing import Tuple
from enum import Enum
Expand Down
4 changes: 4 additions & 0 deletions bulldozer/bulldozer_worker/generator/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, seed,
key=None,
tags=None,
open_ingress=False,
spot_price=None,
):
if tags is None:
tags = dict()
Expand All @@ -63,6 +64,7 @@ def __init__(self, seed,
self.user_data = user_data
self.tags = tags
self.open_ingress = open_ingress
self.spot_price = spot_price

@property
def image(self):
Expand Down Expand Up @@ -102,6 +104,8 @@ def generate_payload(self, spot=False):
}
if self.user_data:
d.update({"user_data": self.user_data})
if self.spot_price:
d.update({"spot_price": self.spot_price})
return d


Expand Down
3 changes: 3 additions & 0 deletions bulldozer/bulldozer_worker/generator/templates/aws.tft
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ resource "aws_spot_instance_request" "optscale_instance_{{seed}}" {
{{#key}}
key_name = "{{key_name}}"
{{/key}}
{{#spot_price}}
spot_price = "{{spot_price}}"
{{/spot_price}}
instance_type = "{{instance_type}}"
vpc_security_group_ids = ["${aws_security_group.optscale_sg_{{seed}}.id}"]
root_block_device {
Expand Down
4 changes: 3 additions & 1 deletion bulldozer/bulldozer_worker/infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def start(
key=None,
tags=None,
open_ingress=False,
spot_price=None,
):
gen_cls = self.generator
gen = gen_cls(
Expand All @@ -223,7 +224,8 @@ def start(
user_data,
key,
tags,
open_ingress
open_ingress,
spot_price
)
template = gen.render()
path = os.path.join(os.path.dirname(self.path), self.seed)
Expand Down
31 changes: 20 additions & 11 deletions bulldozer/bulldozer_worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import logging

from bulldozer.bulldozer_worker.infra import Infra, InfraException
Expand Down Expand Up @@ -74,7 +73,7 @@ class RunsetState:


class TaskReason:
COMPLETED = "task completed successfully"
COMPLETED = "Task completed successfully"


class Base:
Expand Down Expand Up @@ -237,7 +236,7 @@ def check_destroy_conditions(self):
_, runner = self.bulldozer_cl.get_runner(runner_id)
# check for destroy flag set
if runner.get("destroy"):
raise DestroyFlagSet("Destroy flag is set")
raise DestroyFlagSet("Aborted - Destroy flag is set")

destroy_conditions = runner.get("destroy_conditions", {})
# check budget condition
Expand All @@ -250,7 +249,8 @@ def check_destroy_conditions(self):
"current (estimated): %f", runner_id, max_budget, cost)
if max_budget < cost:
raise BudgetExceeded(
f"Budget exceeded max: {max_budget}, current: {cost}")
f"Aborted - Budget exceeded max: {max_budget}, "
f"current: {cost}")
max_duration = destroy_conditions.get("max_duration")
if max_duration:
LOG.info("checking for max duration %d for runner %s",
Expand All @@ -263,7 +263,7 @@ def check_destroy_conditions(self):
runner_id, now, threshold)
if now > threshold:
raise TimeoutConditionExceeded(
f"Duration exceeded: current time: {now} "
f"Aborted - Duration exceeded: current time: {now} "
f"threshold: {threshold}"
)

Expand Down Expand Up @@ -366,15 +366,18 @@ def _exec(self):
}
)
infra.destroy()

self.bulldozer_cl.update_runner(
runner_id,
state=TaskState.DESTROYED,
destroyed_at=utcnow_timestamp()
)
except Exception as exc:
# basically exception
LOG.exception("Cleanup problem: %s", str(exc))
finally:
self.bulldozer_cl.update_runner(
runner_id,
state=TaskState.ERROR,
destroyed_at=utcnow_timestamp())
state=TaskState.ERROR)
finally:
self.update_reason()
self.message.ack()

Expand Down Expand Up @@ -402,6 +405,11 @@ def _exec(self):
tags = runner.get("tags", dict())
# opens ingress ports for runner instance
open_ingress = runner.get("open_ingress", False)
_, runset = self.bulldozer_cl.runset_get(runner["runset_id"])
spot_settings = runset.get("spot_settings")
spot_price = None
if spot_settings:
spot_price = spot_settings.get("spot_price")

if hp is not None and isinstance(hp, dict):
for k, v in hp.items():
Expand All @@ -413,7 +421,7 @@ def _exec(self):
state=TaskState.STARTING)
_, cloud_account = self.rest_cl.cloud_account_get(
cloud_account_id, True)
# TODO: get cloud type form cloud account to support multi-cloud
# TODO: get cloud type from cloud account to support multi-cloud
# Now only AWS is supported
c_type = "AWS"
if not self.body.get("type"):
Expand Down Expand Up @@ -448,6 +456,7 @@ def _exec(self):
key=None,
tags=tags,
open_ingress=open_ingress,
spot_price=spot_price
)

LOG.info("Created runner id=%s, instance=%s, ip=%s",
Expand Down Expand Up @@ -494,7 +503,7 @@ def _exec(self):
current_time, wait_time)
if current_time > wait_time:
# TODO: Do we need automatically destroy env?
raise ArceeWaitException("Arcee wait exceeded")
raise ArceeWaitException("Aborted - Arcee wait exceeded")
else:
self.update_run_info(run_id, runner)
self.bulldozer_cl.update_runner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { OPTSCALE_CAPABILITY } from "utils/constants";
import { getCloudResourceIdentifier } from "utils/resources";
import { CELL_EMPTY_VALUE } from "utils/tables";

const STATUSES = Object.freeze({
const STATE = Object.freeze({
STARTING_PREPARING: "starting preparing",
STARTING: "starting",
STARTED: "started",
Expand All @@ -27,6 +27,19 @@ const STATUSES = Object.freeze({
UNKNOWN: "unknown"
});

// Lookup from a runner state value (see STATE) to the translation id that the
// Executors table cell passes to <FormattedMessage id=... />. States missing
// from this map yield `undefined`, which the cell treats as "no translation".
// Note that the destroy* states are presented with "terminate*" ids.
const STATE_TRANSLATION_MAP = Object.freeze({
[STATE.STARTING_PREPARING]: "startPreparing",
[STATE.STARTING]: "starting",
[STATE.STARTED]: "started",
[STATE.DESTROYING_SCHEDULED]: "terminateScheduled",
[STATE.DESTROY_PREPARING]: "terminatePrepared",
[STATE.DESTROYING]: "terminating",
[STATE.DESTROYED]: "terminated",
[STATE.ERROR]: "error",
[STATE.WAITING_ARCEE]: "waitingOptscaleArcee",
[STATE.UNKNOWN]: "unknown"
});

const Executors = ({ executors, isLoading }) => {
const isFinOpsEnabled = useIsOptScaleCapabilityEnabled(OPTSCALE_CAPABILITY.FINOPS);

Expand All @@ -43,29 +56,15 @@ const Executors = ({ executors, isLoading }) => {
cell: ({
cell,
row: {
original: { reason: errorReason }
original: { reason }
}
}) => {
const status = cell.getValue();

const getStatusTranslationId = () =>
({
[STATUSES.STARTING_PREPARING]: "startPreparing",
[STATUSES.STARTING]: "starting",
[STATUSES.STARTED]: "started",
[STATUSES.DESTROYING_SCHEDULED]: "terminateScheduled",
[STATUSES.DESTROY_PREPARING]: "terminatePrepared",
[STATUSES.DESTROYING]: "terminating",
[STATUSES.DESTROYED]: "terminated",
[STATUSES.ERROR]: "error",
[STATUSES.WAITING_ARCEE]: "waitingOptscaleArcee",
[STATUSES.UNKNOWN]: "unknown"
})[status];
const state = cell.getValue();

const translationId = getStatusTranslationId();
const translationId = STATE_TRANSLATION_MAP[state];

return translationId ? (
<CaptionedCell caption={status === STATUSES.ERROR ? errorReason : undefined}>
<CaptionedCell caption={[STATE.ERROR, STATE.DESTROYED].includes(state) ? reason : undefined}>
<FormattedMessage id={translationId} />
</CaptionedCell>
) : (
Expand Down
16 changes: 16 additions & 0 deletions rest_api/rest_api_server/handlers/v2/infrastructure/runsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class RunsetsAsyncCollectionHandler(BaseAsyncCollectionHandler,
}
SPOT_VALIDATION_MAP = {
'tries': (check_int_attribute, False),
'spot_price': (check_float_attribute, False),
}

def _get_controller_class(self):
Expand Down Expand Up @@ -121,6 +122,11 @@ async def post(self, organization_id, template_id, **url_params):
description: Organization id
required: true
type: string
- name: template_id
in: path
description: Runset template id
required: true
type: string
- in: body
name: body
description: Template parameters
Expand Down Expand Up @@ -187,8 +193,18 @@ async def post(self, organization_id, template_id, **url_params):
type: object
description: Related runners spot settings
required: false
properties:
tries:
type: integer
description: number of spot instance tries
required: false
spot_price:
type: number
description: spot instance price
required: false
example:
tries: 4
spot_price: 1.1
responses:
201:
description: Returns created runset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def setUp(self, version='v2'):
},
'open_ingress': False,
'spot_settings': {
'tries': 3
'tries': 3,
'spot_price': 1.1
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid

from copy import deepcopy
from unittest.mock import patch

from rest_api.rest_api_server.tests.unittests.test_infrastructure_base import (
Expand Down Expand Up @@ -74,6 +74,14 @@ def test_create_next(self):
# Hardcoded based on duration and flavor cost (hourly cost is 0.175)
self.assertEqual(res.get('cost'), 0.0049)

def test_create_spot_price(self):
    """Check that a string-typed ``spot_price`` fails runset creation.

    Copies the valid runset payload, replaces
    ``spot_settings.spot_price`` with the string ``"123"`` and expects
    the create call to answer 400 with error code ``OE0466``.
    """
    # Work on a deep copy so the shared fixture payload stays untouched.
    payload = deepcopy(self.valid_runset)
    payload['spot_settings']['spot_price'] = "123"
    template_id = str(uuid.uuid4())
    code, resp = self.client.runset_create(
        self.organization_id, template_id, payload)
    self.assertEqual(code, 400)
    self.assertEqual(resp['error']['error_code'], 'OE0466')

def test_create_nonexisting(self):
code, _ = self.client.runset_create(
self.organization_id, str(uuid.uuid4()), self.valid_runset)
Expand Down

0 comments on commit 480437a

Please sign in to comment.