From 69f7982ae18745e1049b91c806852aeef8f8e472 Mon Sep 17 00:00:00 2001 From: phvalguima Date: Wed, 22 Jan 2025 20:16:13 +0100 Subject: [PATCH] [DPE-6296] Pyright fixes + structured_config additions + break down of actions.py (#13) --- poetry.lock | 207 +++++------ src/benchmark/base_charm.py | 167 ++------- src/benchmark/core/models.py | 32 +- src/benchmark/core/pebble_workload_base.py | 12 +- src/benchmark/core/structured_config.py | 32 ++ src/benchmark/core/systemd_workload_base.py | 7 +- src/benchmark/core/workload_base.py | 4 +- src/benchmark/events/actions.py | 142 ++++++++ src/benchmark/events/db.py | 52 +-- src/benchmark/literals.py | 1 - src/benchmark/managers/collector.py | 53 --- src/benchmark/managers/config.py | 34 +- src/benchmark/managers/lifecycle.py | 361 +++++++++++++------- src/benchmark/wrapper/core.py | 18 +- src/benchmark/wrapper/main.py | 3 + src/benchmark/wrapper/process.py | 27 +- src/charm.py | 144 ++++---- src/models.py | 33 ++ src/wrapper.py | 4 +- tests/unit/test_lifecycle.py | 11 + tox.ini | 3 + 21 files changed, 756 insertions(+), 591 deletions(-) create mode 100644 src/benchmark/core/structured_config.py create mode 100644 src/benchmark/events/actions.py delete mode 100644 src/benchmark/managers/collector.py create mode 100644 src/models.py diff --git a/poetry.lock b/poetry.lock index 4e8b8f8..c087731 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. [[package]] name = "anyio" @@ -148,34 +148,34 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.35.98" +version = "1.36.3" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" groups = ["integration"] files = [ - {file = "boto3-1.35.98-py3-none-any.whl", hash = "sha256:d0224e1499d7189b47aa7f469d96522d98df6f5702fccb20a95a436582ebcd9d"}, - {file = "boto3-1.35.98.tar.gz", hash = "sha256:4b6274b4fe9d7113f978abea66a1f20c8a397c268c9d1b2a6c96b14a256da4a5"}, + {file = "boto3-1.36.3-py3-none-any.whl", hash = "sha256:f9843a5d06f501d66ada06f5a5417f671823af2cf319e36ceefa1bafaaaaa953"}, + {file = "boto3-1.36.3.tar.gz", hash = "sha256:53a5307f6a3526ee2f8590e3c45efa504a3ea4532c1bfe4926c0c19bf188d141"}, ] [package.dependencies] -botocore = ">=1.35.98,<1.36.0" +botocore = ">=1.36.3,<1.37.0" jmespath = ">=0.7.1,<2.0.0" -s3transfer = ">=0.10.0,<0.11.0" +s3transfer = ">=0.11.0,<0.12.0" [package.extras] crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.98" +version = "1.36.3" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" groups = ["integration"] files = [ - {file = "botocore-1.35.98-py3-none-any.whl", hash = "sha256:4f1c0b687488663a774ad3a5e81a5f94fae1bcada2364cfdc48482c4dbf794d5"}, - {file = "botocore-1.35.98.tar.gz", hash = "sha256:d11742b3824bdeac3c89eeeaf5132351af41823bbcef8fc15e95c8250b1de09c"}, + {file = "botocore-1.36.3-py3-none-any.whl", hash = "sha256:536ab828e6f90dbb000e3702ac45fd76642113ae2db1b7b1373ad24104e89255"}, + {file = "botocore-1.36.3.tar.gz", hash = "sha256:775b835e979da5c96548ed1a0b798101a145aec3cd46541d62e27dda5a94d7f8"}, ] [package.dependencies] @@ -184,18 +184,18 @@ python-dateutil = ">=2.1,<3.0.0" urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""} [package.extras] -crt = ["awscrt (==0.22.0)"] +crt = ["awscrt (==0.23.4)"] [[package]] name = "cachetools" -version = "5.5.0" +version = "5.5.1" description = "Extensible memoizing collections and decorators" optional = false python-versions = ">=3.7" groups = ["integration"] files = [ - {file = "cachetools-5.5.0-py3-none-any.whl", hash = "sha256:02134e8439cdc2ffb62023ce1debca2944c3f289d66bb17ead3ab3dede74b292"}, - {file = "cachetools-5.5.0.tar.gz", hash = "sha256:2cc24fb4cbe39633fb7badd9db9ca6295d766d9c2995f245725a46715d050f2a"}, + {file = "cachetools-5.5.1-py3-none-any.whl", hash = "sha256:b76651fdc3b24ead3c648bbdeeb940c1b04d365b38b4af66788f9ec4a81d42bb"}, + {file = "cachetools-5.5.1.tar.gz", hash = "sha256:70f238fbba50383ef62e55c6aff6d9673175fe59f7c6782c7a0b9e38f4a9df95"}, ] [[package]] @@ -410,14 +410,14 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""} [[package]] name = "codespell" -version = "2.3.0" -description = "Codespell" +version = "2.4.0" +description = "Fix common misspellings in text files" optional = false python-versions = ">=3.8" groups = ["lint"] files = [ - {file = "codespell-2.3.0-py3-none-any.whl", hash = "sha256:a9c7cef2501c9cfede2110fd6d4e5e62296920efe9abfb84648df866e47f58d1"}, - {file = "codespell-2.3.0.tar.gz", hash = "sha256:360c7d10f75e65f67bad720af7007e1060a5d395670ec11a7ed1fed9dd17471f"}, + {file = "codespell-2.4.0-py3-none-any.whl", hash = "sha256:b4c5b779f747dd481587aeecb5773301183f52b94b96ed51a28126d0482eec1d"}, + {file = "codespell-2.4.0.tar.gz", hash = "sha256:587d45b14707fb8ce51339ba4cce50ae0e98ce228ef61f3c5e160e34f681be58"}, ] [package.extras] @@ -1350,14 +1350,14 @@ twisted = ["twisted"] [[package]] name = "prompt-toolkit" -version = "3.0.48" +version = "3.0.50" description = "Library for building powerful interactive command lines in Python" optional = false -python-versions = ">=3.7.0" +python-versions = ">=3.8.0" groups = ["integration"] files = [ - {file = "prompt_toolkit-3.0.48-py3-none-any.whl", hash = "sha256:f49a827f90062e411f1ce1f854f2aedb3c23353244f8108b89283587397ac10e"}, - {file = "prompt_toolkit-3.0.48.tar.gz", hash = "sha256:d6623ab0477a80df74e646bdbc93621143f5caf104206aa29294d53de1a03d90"}, + {file = "prompt_toolkit-3.0.50-py3-none-any.whl", hash = "sha256:9b6427eb19e479d98acff65196a307c555eb567989e6d88ebbb1b509d9779198"}, + {file = "prompt_toolkit-3.0.50.tar.gz", hash = "sha256:544748f3860a2623ca5cd6d2795e7a14f3d0e1c3c9728359013f79877fc89bab"}, ] [package.dependencies] @@ -1594,14 +1594,14 @@ pytz = "*" [[package]] name = "pyright" -version = "1.1.391" +version = "1.1.392.post0" description = "Command line wrapper for pyright" optional = false python-versions = ">=3.7" groups = ["format", "lint"] files = [ - {file = "pyright-1.1.391-py3-none-any.whl", hash = 
"sha256:54fa186f8b3e8a55a44ebfa842636635688670c6896dcf6cf4a7fc75062f4d15"}, - {file = "pyright-1.1.391.tar.gz", hash = "sha256:66b2d42cdf5c3cbab05f2f4b76e8bec8aa78e679bfa0b6ad7b923d9e027cadb2"}, + {file = "pyright-1.1.392.post0-py3-none-any.whl", hash = "sha256:252f84458a46fa2f0fd4e2f91fc74f50b9ca52c757062e93f6c250c0d8329eb2"}, + {file = "pyright-1.1.392.post0.tar.gz", hash = "sha256:3b7f88de74a28dcfa90c7d90c782b6569a48c2be5f9d4add38472bdaac247ebd"}, ] [package.dependencies] @@ -1845,19 +1845,20 @@ files = [ [[package]] name = "referencing" -version = "0.35.1" +version = "0.36.1" description = "JSON Referencing + Python" optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" groups = ["main", "charm-libs"] files = [ - {file = "referencing-0.35.1-py3-none-any.whl", hash = "sha256:eda6d3234d62814d1c64e305c1331c9a3a6132da475ab6382eaa997b21ee75de"}, - {file = "referencing-0.35.1.tar.gz", hash = "sha256:25b42124a6c8b632a425174f24087783efb348a6f1e0008e63cd4466fedf703c"}, + {file = "referencing-0.36.1-py3-none-any.whl", hash = "sha256:363d9c65f080d0d70bc41c721dce3c7f3e77fc09f269cd5c8813da18069a6794"}, + {file = "referencing-0.36.1.tar.gz", hash = "sha256:ca2e6492769e3602957e9b831b94211599d2aade9477f5d44110d2530cf9aade"}, ] [package.dependencies] attrs = ">=22.2.0" rpds-py = ">=0.7.0" +typing-extensions = {version = ">=4.4.0", markers = "python_version < \"3.13\""} [[package]] name = "requests" @@ -2058,21 +2059,21 @@ files = [ [[package]] name = "s3transfer" -version = "0.10.4" +version = "0.11.1" description = "An Amazon S3 Transfer Manager" optional = false python-versions = ">=3.8" groups = ["integration"] files = [ - {file = "s3transfer-0.10.4-py3-none-any.whl", hash = "sha256:244a76a24355363a68164241438de1b72f8781664920260c48465896b712a41e"}, - {file = "s3transfer-0.10.4.tar.gz", hash = "sha256:29edc09801743c21eb5ecbc617a152df41d3c287f67b615f73e5f750583666a7"}, + {file = "s3transfer-0.11.1-py3-none-any.whl", hash = "sha256:8fa0aa48177be1f3425176dfe1ab85dcd3d962df603c3dbfc585e6bf857ef0ff"}, + {file = "s3transfer-0.11.1.tar.gz", hash = "sha256:3f25c900a367c8b7f7d8f9c34edc87e300bde424f779dc9f0a8ae4f9df9264f6"}, ] [package.dependencies] -botocore = ">=1.33.2,<2.0a.0" +botocore = ">=1.36.0,<2.0a.0" [package.extras] -crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"] +crt = ["botocore[crt] (>=1.36.0,<2.0a.0)"] [[package]] name = "shellcheck-py" @@ -2347,81 +2348,81 @@ test = ["websockets"] [[package]] name = "websockets" -version = "14.1" +version = "14.2" description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)" optional = false python-versions = ">=3.9" groups = ["integration"] files = [ - {file = "websockets-14.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:a0adf84bc2e7c86e8a202537b4fd50e6f7f0e4a6b6bf64d7ccb96c4cd3330b29"}, - {file = "websockets-14.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:90b5d9dfbb6d07a84ed3e696012610b6da074d97453bd01e0e30744b472c8179"}, - {file = "websockets-14.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:2177ee3901075167f01c5e335a6685e71b162a54a89a56001f1c3e9e3d2ad250"}, - {file = "websockets-14.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3f14a96a0034a27f9d47fd9788913924c89612225878f8078bb9d55f859272b0"}, - {file = "websockets-14.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1f874ba705deea77bcf64a9da42c1f5fc2466d8f14daf410bc7d4ceae0a9fcb0"}, - {file = 
"websockets-14.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9607b9a442392e690a57909c362811184ea429585a71061cd5d3c2b98065c199"}, - {file = "websockets-14.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:bea45f19b7ca000380fbd4e02552be86343080120d074b87f25593ce1700ad58"}, - {file = "websockets-14.1-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:219c8187b3ceeadbf2afcf0f25a4918d02da7b944d703b97d12fb01510869078"}, - {file = "websockets-14.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:ad2ab2547761d79926effe63de21479dfaf29834c50f98c4bf5b5480b5838434"}, - {file = "websockets-14.1-cp310-cp310-win32.whl", hash = "sha256:1288369a6a84e81b90da5dbed48610cd7e5d60af62df9851ed1d1d23a9069f10"}, - {file = "websockets-14.1-cp310-cp310-win_amd64.whl", hash = "sha256:e0744623852f1497d825a49a99bfbec9bea4f3f946df6eb9d8a2f0c37a2fec2e"}, - {file = "websockets-14.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:449d77d636f8d9c17952628cc7e3b8faf6e92a17ec581ec0c0256300717e1512"}, - {file = "websockets-14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:a35f704be14768cea9790d921c2c1cc4fc52700410b1c10948511039be824aac"}, - {file = "websockets-14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b1f3628a0510bd58968c0f60447e7a692933589b791a6b572fcef374053ca280"}, - {file = "websockets-14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3c3deac3748ec73ef24fc7be0b68220d14d47d6647d2f85b2771cb35ea847aa1"}, - {file = "websockets-14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7048eb4415d46368ef29d32133134c513f507fff7d953c18c91104738a68c3b3"}, - {file = "websockets-14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f6cf0ad281c979306a6a34242b371e90e891bce504509fb6bb5246bbbf31e7b6"}, - {file = "websockets-14.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:cc1fc87428c1d18b643479caa7b15db7d544652e5bf610513d4a3478dbe823d0"}, - {file = "websockets-14.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:f95ba34d71e2fa0c5d225bde3b3bdb152e957150100e75c86bc7f3964c450d89"}, - {file = "websockets-14.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:9481a6de29105d73cf4515f2bef8eb71e17ac184c19d0b9918a3701c6c9c4f23"}, - {file = "websockets-14.1-cp311-cp311-win32.whl", hash = "sha256:368a05465f49c5949e27afd6fbe0a77ce53082185bbb2ac096a3a8afaf4de52e"}, - {file = "websockets-14.1-cp311-cp311-win_amd64.whl", hash = "sha256:6d24fc337fc055c9e83414c94e1ee0dee902a486d19d2a7f0929e49d7d604b09"}, - {file = "websockets-14.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:ed907449fe5e021933e46a3e65d651f641975a768d0649fee59f10c2985529ed"}, - {file = "websockets-14.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:87e31011b5c14a33b29f17eb48932e63e1dcd3fa31d72209848652310d3d1f0d"}, - {file = "websockets-14.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:bc6ccf7d54c02ae47a48ddf9414c54d48af9c01076a2e1023e3b486b6e72c707"}, - {file = "websockets-14.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9777564c0a72a1d457f0848977a1cbe15cfa75fa2f67ce267441e465717dcf1a"}, - {file = "websockets-14.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a655bde548ca98f55b43711b0ceefd2a88a71af6350b0c168aa77562104f3f45"}, - {file = 
"websockets-14.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a3dfff83ca578cada2d19e665e9c8368e1598d4e787422a460ec70e531dbdd58"}, - {file = "websockets-14.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6a6c9bcf7cdc0fd41cc7b7944447982e8acfd9f0d560ea6d6845428ed0562058"}, - {file = "websockets-14.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:4b6caec8576e760f2c7dd878ba817653144d5f369200b6ddf9771d64385b84d4"}, - {file = "websockets-14.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:eb6d38971c800ff02e4a6afd791bbe3b923a9a57ca9aeab7314c21c84bf9ff05"}, - {file = "websockets-14.1-cp312-cp312-win32.whl", hash = "sha256:1d045cbe1358d76b24d5e20e7b1878efe578d9897a25c24e6006eef788c0fdf0"}, - {file = "websockets-14.1-cp312-cp312-win_amd64.whl", hash = "sha256:90f4c7a069c733d95c308380aae314f2cb45bd8a904fb03eb36d1a4983a4993f"}, - {file = "websockets-14.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:3630b670d5057cd9e08b9c4dab6493670e8e762a24c2c94ef312783870736ab9"}, - {file = "websockets-14.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:36ebd71db3b89e1f7b1a5deaa341a654852c3518ea7a8ddfdf69cc66acc2db1b"}, - {file = "websockets-14.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:5b918d288958dc3fa1c5a0b9aa3256cb2b2b84c54407f4813c45d52267600cd3"}, - {file = "websockets-14.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:00fe5da3f037041da1ee0cf8e308374e236883f9842c7c465aa65098b1c9af59"}, - {file = "websockets-14.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8149a0f5a72ca36720981418eeffeb5c2729ea55fa179091c81a0910a114a5d2"}, - {file = "websockets-14.1-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:77569d19a13015e840b81550922056acabc25e3f52782625bc6843cfa034e1da"}, - {file = "websockets-14.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:cf5201a04550136ef870aa60ad3d29d2a59e452a7f96b94193bee6d73b8ad9a9"}, - {file = "websockets-14.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:88cf9163ef674b5be5736a584c999e98daf3aabac6e536e43286eb74c126b9c7"}, - {file = "websockets-14.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:836bef7ae338a072e9d1863502026f01b14027250a4545672673057997d5c05a"}, - {file = "websockets-14.1-cp313-cp313-win32.whl", hash = "sha256:0d4290d559d68288da9f444089fd82490c8d2744309113fc26e2da6e48b65da6"}, - {file = "websockets-14.1-cp313-cp313-win_amd64.whl", hash = "sha256:8621a07991add373c3c5c2cf89e1d277e49dc82ed72c75e3afc74bd0acc446f0"}, - {file = "websockets-14.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:01bb2d4f0a6d04538d3c5dfd27c0643269656c28045a53439cbf1c004f90897a"}, - {file = "websockets-14.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:414ffe86f4d6f434a8c3b7913655a1a5383b617f9bf38720e7c0799fac3ab1c6"}, - {file = "websockets-14.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8fda642151d5affdee8a430bd85496f2e2517be3a2b9d2484d633d5712b15c56"}, - {file = "websockets-14.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cd7c11968bc3860d5c78577f0dbc535257ccec41750675d58d8dc66aa47fe52c"}, - {file = "websockets-14.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a032855dc7db987dff813583d04f4950d14326665d7e714d584560b140ae6b8b"}, - {file = 
"websockets-14.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b7e7ea2f782408c32d86b87a0d2c1fd8871b0399dd762364c731d86c86069a78"}, - {file = "websockets-14.1-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:39450e6215f7d9f6f7bc2a6da21d79374729f5d052333da4d5825af8a97e6735"}, - {file = "websockets-14.1-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:ceada5be22fa5a5a4cdeec74e761c2ee7db287208f54c718f2df4b7e200b8d4a"}, - {file = "websockets-14.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:3fc753451d471cff90b8f467a1fc0ae64031cf2d81b7b34e1811b7e2691bc4bc"}, - {file = "websockets-14.1-cp39-cp39-win32.whl", hash = "sha256:14839f54786987ccd9d03ed7f334baec0f02272e7ec4f6e9d427ff584aeea8b4"}, - {file = "websockets-14.1-cp39-cp39-win_amd64.whl", hash = "sha256:d9fd19ecc3a4d5ae82ddbfb30962cf6d874ff943e56e0c81f5169be2fda62979"}, - {file = "websockets-14.1-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:e5dc25a9dbd1a7f61eca4b7cb04e74ae4b963d658f9e4f9aad9cd00b688692c8"}, - {file = "websockets-14.1-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:04a97aca96ca2acedf0d1f332c861c5a4486fdcba7bcef35873820f940c4231e"}, - {file = "websockets-14.1-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:df174ece723b228d3e8734a6f2a6febbd413ddec39b3dc592f5a4aa0aff28098"}, - {file = "websockets-14.1-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:034feb9f4286476f273b9a245fb15f02c34d9586a5bc936aff108c3ba1b21beb"}, - {file = "websockets-14.1-pp310-pypy310_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:660c308dabd2b380807ab64b62985eaccf923a78ebc572bd485375b9ca2b7dc7"}, - {file = "websockets-14.1-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:5a42d3ecbb2db5080fc578314439b1d79eef71d323dc661aa616fb492436af5d"}, - {file = "websockets-14.1-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:ddaa4a390af911da6f680be8be4ff5aaf31c4c834c1a9147bc21cbcbca2d4370"}, - {file = "websockets-14.1-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:a4c805c6034206143fbabd2d259ec5e757f8b29d0a2f0bf3d2fe5d1f60147a4a"}, - {file = "websockets-14.1-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:205f672a6c2c671a86d33f6d47c9b35781a998728d2c7c2a3e1cf3333fcb62b7"}, - {file = "websockets-14.1-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5ef440054124728cc49b01c33469de06755e5a7a4e83ef61934ad95fc327fbb0"}, - {file = "websockets-14.1-pp39-pypy39_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e7591d6f440af7f73c4bd9404f3772bfee064e639d2b6cc8c94076e71b2471c1"}, - {file = "websockets-14.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:25225cc79cfebc95ba1d24cd3ab86aaa35bcd315d12fa4358939bd55e9bd74a5"}, - {file = "websockets-14.1-py3-none-any.whl", hash = "sha256:4d4fc827a20abe6d544a119896f6b78ee13fe81cbfef416f3f2ddf09a03f0e2e"}, - {file = "websockets-14.1.tar.gz", hash = "sha256:398b10c77d471c0aab20a845e7a60076b6390bfdaac7a6d2edb0d2c59d75e8d8"}, + {file = "websockets-14.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e8179f95323b9ab1c11723e5d91a89403903f7b001828161b480a7810b334885"}, + {file = "websockets-14.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = 
"sha256:0d8c3e2cdb38f31d8bd7d9d28908005f6fa9def3324edb9bf336d7e4266fd397"}, + {file = "websockets-14.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:714a9b682deb4339d39ffa674f7b674230227d981a37d5d174a4a83e3978a610"}, + {file = "websockets-14.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f2e53c72052f2596fb792a7acd9704cbc549bf70fcde8a99e899311455974ca3"}, + {file = "websockets-14.2-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e3fbd68850c837e57373d95c8fe352203a512b6e49eaae4c2f4088ef8cf21980"}, + {file = "websockets-14.2-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b27ece32f63150c268593d5fdb82819584831a83a3f5809b7521df0685cd5d8"}, + {file = "websockets-14.2-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:4daa0faea5424d8713142b33825fff03c736f781690d90652d2c8b053345b0e7"}, + {file = "websockets-14.2-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:bc63cee8596a6ec84d9753fd0fcfa0452ee12f317afe4beae6b157f0070c6c7f"}, + {file = "websockets-14.2-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:7a570862c325af2111343cc9b0257b7119b904823c675b22d4ac547163088d0d"}, + {file = "websockets-14.2-cp310-cp310-win32.whl", hash = "sha256:75862126b3d2d505e895893e3deac0a9339ce750bd27b4ba515f008b5acf832d"}, + {file = "websockets-14.2-cp310-cp310-win_amd64.whl", hash = "sha256:cc45afb9c9b2dc0852d5c8b5321759cf825f82a31bfaf506b65bf4668c96f8b2"}, + {file = "websockets-14.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:3bdc8c692c866ce5fefcaf07d2b55c91d6922ac397e031ef9b774e5b9ea42166"}, + {file = "websockets-14.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c93215fac5dadc63e51bcc6dceca72e72267c11def401d6668622b47675b097f"}, + {file = "websockets-14.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:1c9b6535c0e2cf8a6bf938064fb754aaceb1e6a4a51a80d884cd5db569886910"}, + {file = "websockets-14.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0a52a6d7cf6938e04e9dceb949d35fbdf58ac14deea26e685ab6368e73744e4c"}, + {file = "websockets-14.2-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9f05702e93203a6ff5226e21d9b40c037761b2cfb637187c9802c10f58e40473"}, + {file = "websockets-14.2-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:22441c81a6748a53bfcb98951d58d1af0661ab47a536af08920d129b4d1c3473"}, + {file = "websockets-14.2-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:efd9b868d78b194790e6236d9cbc46d68aba4b75b22497eb4ab64fa640c3af56"}, + {file = "websockets-14.2-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1a5a20d5843886d34ff8c57424cc65a1deda4375729cbca4cb6b3353f3ce4142"}, + {file = "websockets-14.2-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:34277a29f5303d54ec6468fb525d99c99938607bc96b8d72d675dee2b9f5bf1d"}, + {file = "websockets-14.2-cp311-cp311-win32.whl", hash = "sha256:02687db35dbc7d25fd541a602b5f8e451a238ffa033030b172ff86a93cb5dc2a"}, + {file = "websockets-14.2-cp311-cp311-win_amd64.whl", hash = "sha256:862e9967b46c07d4dcd2532e9e8e3c2825e004ffbf91a5ef9dde519ee2effb0b"}, + {file = "websockets-14.2-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:1f20522e624d7ffbdbe259c6b6a65d73c895045f76a93719aa10cd93b3de100c"}, + {file = "websockets-14.2-cp312-cp312-macosx_10_13_x86_64.whl", hash = 
"sha256:647b573f7d3ada919fd60e64d533409a79dcf1ea21daeb4542d1d996519ca967"}, + {file = "websockets-14.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:6af99a38e49f66be5a64b1e890208ad026cda49355661549c507152113049990"}, + {file = "websockets-14.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:091ab63dfc8cea748cc22c1db2814eadb77ccbf82829bac6b2fbe3401d548eda"}, + {file = "websockets-14.2-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b374e8953ad477d17e4851cdc66d83fdc2db88d9e73abf755c94510ebddceb95"}, + {file = "websockets-14.2-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a39d7eceeea35db85b85e1169011bb4321c32e673920ae9c1b6e0978590012a3"}, + {file = "websockets-14.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:0a6f3efd47ffd0d12080594f434faf1cd2549b31e54870b8470b28cc1d3817d9"}, + {file = "websockets-14.2-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:065ce275e7c4ffb42cb738dd6b20726ac26ac9ad0a2a48e33ca632351a737267"}, + {file = "websockets-14.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:e9d0e53530ba7b8b5e389c02282f9d2aa47581514bd6049d3a7cffe1385cf5fe"}, + {file = "websockets-14.2-cp312-cp312-win32.whl", hash = "sha256:20e6dd0984d7ca3037afcb4494e48c74ffb51e8013cac71cf607fffe11df7205"}, + {file = "websockets-14.2-cp312-cp312-win_amd64.whl", hash = "sha256:44bba1a956c2c9d268bdcdf234d5e5ff4c9b6dc3e300545cbe99af59dda9dcce"}, + {file = "websockets-14.2-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:6f1372e511c7409a542291bce92d6c83320e02c9cf392223272287ce55bc224e"}, + {file = "websockets-14.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:4da98b72009836179bb596a92297b1a61bb5a830c0e483a7d0766d45070a08ad"}, + {file = "websockets-14.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f8a86a269759026d2bde227652b87be79f8a734e582debf64c9d302faa1e9f03"}, + {file = "websockets-14.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:86cf1aaeca909bf6815ea714d5c5736c8d6dd3a13770e885aafe062ecbd04f1f"}, + {file = "websockets-14.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a9b0f6c3ba3b1240f602ebb3971d45b02cc12bd1845466dd783496b3b05783a5"}, + {file = "websockets-14.2-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:669c3e101c246aa85bc8534e495952e2ca208bd87994650b90a23d745902db9a"}, + {file = "websockets-14.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:eabdb28b972f3729348e632ab08f2a7b616c7e53d5414c12108c29972e655b20"}, + {file = "websockets-14.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:2066dc4cbcc19f32c12a5a0e8cc1b7ac734e5b64ac0a325ff8353451c4b15ef2"}, + {file = "websockets-14.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:ab95d357cd471df61873dadf66dd05dd4709cae001dd6342edafc8dc6382f307"}, + {file = "websockets-14.2-cp313-cp313-win32.whl", hash = "sha256:a9e72fb63e5f3feacdcf5b4ff53199ec8c18d66e325c34ee4c551ca748623bbc"}, + {file = "websockets-14.2-cp313-cp313-win_amd64.whl", hash = "sha256:b439ea828c4ba99bb3176dc8d9b933392a2413c0f6b149fdcba48393f573377f"}, + {file = "websockets-14.2-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:7cd5706caec1686c5d233bc76243ff64b1c0dc445339bd538f30547e787c11fe"}, + {file = "websockets-14.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:ec607328ce95a2f12b595f7ae4c5d71bf502212bddcea528290b35c286932b12"}, + {file = "websockets-14.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:da85651270c6bfb630136423037dd4975199e5d4114cae6d3066641adcc9d1c7"}, + {file = "websockets-14.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c3ecadc7ce90accf39903815697917643f5b7cfb73c96702318a096c00aa71f5"}, + {file = "websockets-14.2-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1979bee04af6a78608024bad6dfcc0cc930ce819f9e10342a29a05b5320355d0"}, + {file = "websockets-14.2-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2dddacad58e2614a24938a50b85969d56f88e620e3f897b7d80ac0d8a5800258"}, + {file = "websockets-14.2-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:89a71173caaf75fa71a09a5f614f450ba3ec84ad9fca47cb2422a860676716f0"}, + {file = "websockets-14.2-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:6af6a4b26eea4fc06c6818a6b962a952441e0e39548b44773502761ded8cc1d4"}, + {file = "websockets-14.2-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:80c8efa38957f20bba0117b48737993643204645e9ec45512579132508477cfc"}, + {file = "websockets-14.2-cp39-cp39-win32.whl", hash = "sha256:2e20c5f517e2163d76e2729104abc42639c41cf91f7b1839295be43302713661"}, + {file = "websockets-14.2-cp39-cp39-win_amd64.whl", hash = "sha256:b4c8cef610e8d7c70dea92e62b6814a8cd24fbd01d7103cc89308d2bfe1659ef"}, + {file = "websockets-14.2-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:d7d9cafbccba46e768be8a8ad4635fa3eae1ffac4c6e7cb4eb276ba41297ed29"}, + {file = "websockets-14.2-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:c76193c1c044bd1e9b3316dcc34b174bbf9664598791e6fb606d8d29000e070c"}, + {file = "websockets-14.2-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fd475a974d5352390baf865309fe37dec6831aafc3014ffac1eea99e84e83fc2"}, + {file = "websockets-14.2-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2c6c0097a41968b2e2b54ed3424739aab0b762ca92af2379f152c1aef0187e1c"}, + {file = "websockets-14.2-pp310-pypy310_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6d7ff794c8b36bc402f2e07c0b2ceb4a2424147ed4785ff03e2a7af03711d60a"}, + {file = "websockets-14.2-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:dec254fcabc7bd488dab64846f588fc5b6fe0d78f641180030f8ea27b76d72c3"}, + {file = "websockets-14.2-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:bbe03eb853e17fd5b15448328b4ec7fb2407d45fb0245036d06a3af251f8e48f"}, + {file = "websockets-14.2-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:a3c4aa3428b904d5404a0ed85f3644d37e2cb25996b7f096d77caeb0e96a3b42"}, + {file = "websockets-14.2-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:577a4cebf1ceaf0b65ffc42c54856214165fb8ceeba3935852fc33f6b0c55e7f"}, + {file = "websockets-14.2-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ad1c1d02357b7665e700eca43a31d52814ad9ad9b89b58118bdabc365454b574"}, + {file = "websockets-14.2-pp39-pypy39_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f390024a47d904613577df83ba700bd189eedc09c57af0a904e5c39624621270"}, + {file = "websockets-14.2-pp39-pypy39_pp73-win_amd64.whl", hash = 
"sha256:3c1426c021c38cf92b453cdf371228d3430acd775edee6bac5a4d577efc72365"}, + {file = "websockets-14.2-py3-none-any.whl", hash = "sha256:7a6ceec4ea84469f15cf15807a747e9efe57e369c384fa86e022b3bea679b79b"}, + {file = "websockets-14.2.tar.gz", hash = "sha256:5059ed9c54945efb321f097084b4c7e52c246f2c869815876a69d1efc4ad6eb5"}, ] [metadata] diff --git a/src/benchmark/base_charm.py b/src/benchmark/base_charm.py index fdce6e3..5a131f8 100644 --- a/src/benchmark/base_charm.py +++ b/src/benchmark/base_charm.py @@ -16,10 +16,9 @@ import logging import subprocess -from abc import ABC, abstractmethod from typing import Any -import ops +from charms.data_platform_libs.v0.data_models import TypedCharmBase from charms.grafana_agent.v0.cos_agent import COSAgentProvider from ops.charm import CharmEvents from ops.framework import EventBase, EventSource @@ -27,15 +26,16 @@ from benchmark.core.models import DPBenchmarkLifecycleState from benchmark.core.pebble_workload_base import DPBenchmarkPebbleWorkloadBase +from benchmark.core.structured_config import BenchmarkCharmConfig from benchmark.core.systemd_workload_base import DPBenchmarkSystemdWorkloadBase from benchmark.core.workload_base import WorkloadBase +from benchmark.events.actions import ActionsHandler from benchmark.events.db import DatabaseRelationHandler from benchmark.events.peer import PeerRelationHandler from benchmark.literals import ( COS_AGENT_RELATION, METRICS_PORT, PEER_RELATION, - DPBenchmarkLifecycleTransition, DPBenchmarkMissingOptionsError, ) from benchmark.managers.config import ConfigManager @@ -70,34 +70,22 @@ def workload_build(workload_params_template: str) -> WorkloadBase: return DPBenchmarkSystemdWorkloadBase(workload_params_template) -class DPBenchmarkCharmBase(ops.CharmBase, ABC): +class DPBenchmarkCharmBase(TypedCharmBase[BenchmarkCharmConfig]): """The base benchmark class.""" - on = DPBenchmarkEvents() # pyright: ignore [reportGeneralTypeIssues] + on = DPBenchmarkEvents() # pyright: ignore [reportAssignmentType] RESOURCE_DEB_NAME = "benchmark-deb" workload_params_template = "" + config_type = BenchmarkCharmConfig + def __init__(self, *args, db_relation_name: str, workload: WorkloadBase | None = None): super().__init__(*args) self.framework.observe(self.on.install, self._on_install) self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.update_status, self._on_update_status) - self.framework.observe(self.on.prepare_action, self.on_prepare_action) - self.framework.observe(self.on.run_action, self.on_run_action) - self.framework.observe(self.on.stop_action, self.on_stop_action) - self.framework.observe(self.on.cleanup_action, self.on_clean_action) - - self.framework.observe( - self.on.check_upload, - self._on_check_upload, - ) - self.framework.observe( - self.on.check_collect, - self._on_check_collect, - ) - self.database = DatabaseRelationHandler(self, db_relation_name) self.peers = PeerRelationHandler(self, PEER_RELATION) self.framework.observe(self.database.on.db_config_update, self._on_config_changed) @@ -119,8 +107,8 @@ def __init__(self, *args, db_relation_name: str, workload: WorkloadBase | None = self.config_manager = ConfigManager( workload=self.workload, - database=self.database.state, - peer=self.peers.peers(), + database_state=self.database.state, + peers=self.peers.peers(), config=self.config, labels=self.labels, ) @@ -129,11 +117,7 @@ def __init__(self, *args, db_relation_name: str, workload: WorkloadBase | None = self.peers.this_unit(), self.config_manager, ) - - 
@abstractmethod - def supported_workloads(self) -> list[str]: - """List of supported workloads.""" - ... + self.actions = ActionsHandler(self) ########################################################################### # @@ -146,28 +130,6 @@ def _on_install(self, event: EventBase) -> None: self.workload.install() self.peers.state.lifecycle = DPBenchmarkLifecycleState.UNSET - def _on_check_collect(self, event: EventBase) -> None: - """Check if the upload is finished.""" - if self.config_manager.is_collecting(): - # Nothing to do, upload is still in progress - event.defer() - return - - if self.unit.is_leader(): - self.peers.state.set(DPBenchmarkLifecycleState.UPLOADING) - # Raise we are running an upload and we will check the status later - self.on.check_upload.emit() - return - self.peers.state.set(DPBenchmarkLifecycleState.FINISHED) - - def _on_check_upload(self, event: EventBase) -> None: - """Check if the upload is finished.""" - if self.config_manager.is_uploading(): - # Nothing to do, upload is still in progress - event.defer() - return - self.peers.state.lifecycle = DPBenchmarkLifecycleState.FINISHED - def _on_update_status(self, event: EventBase | None = None) -> None: """Set status for the operator and finishes the service. @@ -176,7 +138,7 @@ def _on_update_status(self, event: EventBase | None = None) -> None: benchmark service and the benchmark status. """ try: - status = self.database.state.get() + status = self.database.state.model() except DPBenchmarkMissingOptionsError as e: self.unit.status = BlockedStatus(str(e)) return @@ -184,26 +146,12 @@ def _on_update_status(self, event: EventBase | None = None) -> None: self.unit.status = BlockedStatus("No database relation available") return - # We need to narrow the options of workload_name to the supported ones - if self.config.get("workload_name") not in self.supported_workloads(): - self.unit.status = BlockedStatus( - f"Unsupported workload: {self.config.get('workload_name')}" - ) - return - # Now, let's check if we need to update our lifecycle position - self._update_state() + self.update_state() self.unit.status = self.lifecycle.status def _on_config_changed(self, event: EventBase) -> None: """Config changed event.""" - # We need to narrow the options of workload_name to the supported ones - if self.config.get("workload_name") not in self.supported_workloads(): - self.unit.status = BlockedStatus( - f"Unsupported workload: {self.config.get('workload_name')}" - ) - return - if not self.config_manager.is_prepared(): # nothing to do: set the status and leave self._on_update_status() @@ -228,88 +176,6 @@ def scrape_config(self) -> list[dict[str, Any]]: } ] - ########################################################################### - # - # Action and Lifecycle Handlers - # - ########################################################################### - - def _preflight_checks(self) -> bool: - """Check if we have the necessary relations.""" - if len(self.peers.units()) > 0 and not bool(self.peers.state.get()): - return False - try: - return bool(self.database.state.get()) - except DPBenchmarkMissingOptionsError: - return False - - def on_prepare_action(self, event: EventBase) -> None: - """Process the prepare action.""" - if not self._preflight_checks(): - event.fail("Missing DB or S3 relations") - return - - if not (state := self.lifecycle.next(DPBenchmarkLifecycleTransition.PREPARE)): - event.fail("Failed to prepare the benchmark: already done") - return - - if state != DPBenchmarkLifecycleState.PREPARING: - event.fail( - "Another 
peer is already in prepare state. Wait or call clean action to reset." - ) - return - - # We process the special case of PREPARE, as explained in lifecycle.make_transition() - if not self.config_manager.prepare(): - event.fail("Failed to prepare the benchmark") - return - - self.lifecycle.make_transition(state) - self.unit.status = self.lifecycle.status - event.set_results({"message": "Benchmark is being prepared"}) - - def on_run_action(self, event: EventBase) -> None: - """Process the run action.""" - if not self._preflight_checks(): - event.fail("Missing DB or S3 relations") - return - - if not self._process_action_transition(DPBenchmarkLifecycleTransition.RUN): - event.fail("Failed to run the benchmark") - event.set_results({"message": "Benchmark has started"}) - - def on_stop_action(self, event: EventBase) -> None: - """Process the stop action.""" - if not self._preflight_checks(): - event.fail("Missing DB or S3 relations") - return - - if not self._process_action_transition(DPBenchmarkLifecycleTransition.STOP): - event.fail("Failed to stop the benchmark") - event.set_results({"message": "Benchmark has stopped"}) - - def on_clean_action(self, event: EventBase) -> None: - """Process the clean action.""" - if not self._preflight_checks(): - event.fail("Missing DB or S3 relations") - return - - if not self._process_action_transition(DPBenchmarkLifecycleTransition.CLEAN): - event.fail("Failed to clean the benchmark") - event.set_results({"message": "Benchmark is cleaning"}) - - def _process_action_transition(self, transition: DPBenchmarkLifecycleTransition) -> bool: - """Process the action.""" - # First, check if we have an update in our lifecycle state - self._update_state() - - if not (state := self.lifecycle.next(transition)): - return False - - self.lifecycle.make_transition(state) - self.unit.status = self.lifecycle.status - return True - ########################################################################### # # Helpers @@ -318,9 +184,14 @@ def _process_action_transition(self, transition: DPBenchmarkLifecycleTransition) def _unit_ip(self) -> str: """Current unit ip.""" - return self.model.get_binding(PEER_RELATION).network.bind_address + bind_address = None + if PEER_RELATION: + if binding := self.model.get_binding(PEER_RELATION): + bind_address = binding.network.bind_address + + return str(bind_address) if bind_address else "" - def _update_state(self) -> None: + def update_state(self) -> None: """Update the state of the charm.""" if (next_state := self.lifecycle.next(None)) and self.lifecycle.current() != next_state: self.lifecycle.make_transition(next_state) diff --git a/src/benchmark/core/models.py b/src/benchmark/core/models.py index 479b983..3f24a1b 100644 --- a/src/benchmark/core/models.py +++ b/src/benchmark/core/models.py @@ -9,7 +9,7 @@ """ import logging -from typing import Any, Optional +from typing import Any, MutableMapping, Optional from ops.model import Application, Relation, Unit from overrides import override @@ -17,7 +17,6 @@ from benchmark.literals import ( LIFECYCLE_KEY, - STOP_KEY, DPBenchmarkLifecycleState, DPBenchmarkMissingOptionsError, Scope, @@ -106,7 +105,6 @@ class DPBenchmarkWrapperOptionsModel(BaseModel): workload_name: str db_info: DPBenchmarkBaseDatabaseModel report_interval: int - workload_profile: str labels: str peers: str | None = None @@ -125,20 +123,18 @@ def __init__( self.scope = scope @property - def relation_data(self) -> dict[str, str]: + def relation_data(self) -> MutableMapping[str, str]: """Returns the relation data.""" if 
self.relation: return self.relation.data[self.component] return {} @property - def remote_data(self) -> dict[str, str]: + def remote_data(self) -> MutableMapping[str, str]: """Returns the remote relation data.""" - if not self.relation: + if not self.relation or self.scope != Scope.APP: return {} - if self.scope == Scope.APP: - return self.relation.data[self.relation.app] - return self.relation.data[self.relation.unit] + return self.relation.data[self.relation.app] def __bool__(self) -> bool: """Boolean evaluation based on the existence of self.relation.""" @@ -191,16 +187,6 @@ def lifecycle(self, status: DPBenchmarkLifecycleState | str) -> None: else: self.set({LIFECYCLE_KEY: status}) - @property - def stop(self) -> bool: - """Returns the value of the stop key.""" - return self.relation_data.get(STOP_KEY, False) - - @stop.setter - def stop(self, switch: bool) -> bool: - """Toggles the stop key value.""" - self.set({STOP_KEY: switch}) - class DatabaseState(RelationState): """State collection for the database relation.""" @@ -236,7 +222,7 @@ def tls_ca(self) -> str | None: return None return tls_ca - def get(self) -> DPBenchmarkBaseDatabaseModel | None: + def model(self) -> DPBenchmarkBaseDatabaseModel | None: """Returns the value of the key.""" if not self.relation or not (endpoints := self.remote_data.get("endpoints")): return None @@ -248,9 +234,9 @@ def get(self) -> DPBenchmarkBaseDatabaseModel | None: return DPBenchmarkBaseDatabaseModel( hosts=endpoints.split(), unix_socket=unix_socket, - username=self.data.get("username"), - password=self.data.get("password"), - db_name=self.remote_data.get(self.database_key), + username=self.data.get("username", ""), + password=self.data.get("password", ""), + db_name=self.remote_data.get(self.database_key, ""), tls=self.tls, tls_ca=self.tls_ca, ) diff --git a/src/benchmark/core/pebble_workload_base.py b/src/benchmark/core/pebble_workload_base.py index 725240f..6dec3d7 100644 --- a/src/benchmark/core/pebble_workload_base.py +++ b/src/benchmark/core/pebble_workload_base.py @@ -20,13 +20,9 @@ class DPBenchmarkPebbleTemplatePaths(WorkloadTemplatePaths): """Represents the benchmark service template paths.""" - def __init__(self): - super().__init__() - self.svc_name = "dpe_benchmark" - @property @override - def service(self) -> str | None: + def service(self) -> str: """The optional path to the service file managing the script.""" return f"/etc/systemd/system/{self.svc_name}.service" @@ -44,6 +40,12 @@ def templates(self) -> str: """The path to the workload template folder.""" return os.path.join(os.environ.get("CHARM_DIR", ""), "templates") + @property + @override + def results(self) -> str: + """The path to the results folder.""" + return "/root/.benchmark/charmed_parameters/results/" + @property @override def service_template(self) -> str: diff --git a/src/benchmark/core/structured_config.py b/src/benchmark/core/structured_config.py new file mode 100644 index 0000000..1a905b0 --- /dev/null +++ b/src/benchmark/core/structured_config.py @@ -0,0 +1,32 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. 
+
+"""Structured configuration for the benchmark charm."""
+
+import logging
+
+from charms.data_platform_libs.v0.data_models import BaseConfigModel
+from pydantic import Field, validator
+
+logger = logging.getLogger(__name__)
+
+
+class BenchmarkCharmConfig(BaseConfigModel):
+    """Manager for the structured configuration."""
+
+    test_name: str = Field(default="", validate_default=False)
+    parallel_processes: int = Field(default=1, validate_default=False, ge=1)
+    threads: int = Field(default=1, validate_default=False, ge=1)
+    duration: int = Field(default=0, validate_default=False, ge=0)
+    run_count: int = Field(default=0, validate_default=False, ge=0)
+    workload_name: str = Field(default="default", validate_default=True)
+    override_access_hostname: str = Field(default="", validate_default=False)
+    report_interval: int = Field(default=1, validate_default=False, ge=1)
+
+    @validator("*", pre=True)
+    @classmethod
+    def blank_string(cls, value):
+        """Check for empty strings."""
+        if value == "":
+            return None
+        return value
diff --git a/src/benchmark/core/systemd_workload_base.py b/src/benchmark/core/systemd_workload_base.py
index 329aeb2..bce0b14 100644
--- a/src/benchmark/core/systemd_workload_base.py
+++ b/src/benchmark/core/systemd_workload_base.py
@@ -33,7 +33,7 @@ def __init__(self):
 
     @property
     @override
-    def service(self) -> str | None:
+    def service(self) -> str:
         """The optional path to the service file managing the script."""
         return f"/etc/systemd/system/{self.svc_name}.service"
 
@@ -52,6 +52,7 @@ def workload_params(self) -> str:
         return "/root/.benchmark/charmed_parameters/" + self.svc_name + ".json"
 
     @property
+    @override
     def results(self) -> str:
         """The path to the results folder."""
         return "/root/.benchmark/charmed_parameters/results/"
@@ -98,7 +99,7 @@ def halt(self) -> bool:
     @override
     def reload(self) -> bool:
         """Reloads the script."""
-        daemon_reload()
+        return daemon_reload()
 
     @override
     def read(self, path: str) -> list[str]:
@@ -142,7 +143,7 @@ def exec(
         )
     except subprocess.CalledProcessError:
         return None
-    return output or ""
+    return output.stdout.decode() if output.stdout else None
 
     @override
     def is_active(self) -> bool:
diff --git a/src/benchmark/core/workload_base.py b/src/benchmark/core/workload_base.py
index 30a501a..e1dc679 100644
--- a/src/benchmark/core/workload_base.py
+++ b/src/benchmark/core/workload_base.py
@@ -22,13 +22,13 @@ def bin(self) -> str:
 
     @property
     @abstractmethod
-    def service(self) -> str | None:
+    def service(self) -> str:
         """The optional path to the service file managing the python wrapper."""
         ...
 
     @property
     @abstractmethod
-    def service_template(self) -> str | None:
+    def service_template(self) -> str:
         """The path to the service template file."""
         ...
diff --git a/src/benchmark/events/actions.py b/src/benchmark/events/actions.py
new file mode 100644
index 0000000..c65b77b
--- /dev/null
+++ b/src/benchmark/events/actions.py
@@ -0,0 +1,142 @@
+# Copyright 2025 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""This module handles the action events."""
+
+import logging
+
+from ops.charm import ActionEvent
+from ops.framework import EventBase
+
+from benchmark.base_charm import DPBenchmarkCharmBase
+from benchmark.core.models import DPBenchmarkLifecycleState
+from benchmark.literals import (
+    DPBenchmarkLifecycleTransition,
+    DPBenchmarkMissingOptionsError,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class ActionsHandler:
+    """Handle the actions for the benchmark charm."""
+
+    def __init__(self, charm: DPBenchmarkCharmBase):
+        """Initialize the class."""
+        self.charm = charm
+        self.database = charm.database
+        self.lifecycle = charm.lifecycle
+        self.framework = charm.framework
+        self.config_manager = charm.config_manager
+        self.peers = charm.peers
+        self.unit = charm.unit
+
+        self.framework.observe(self.charm.on.prepare_action, self.on_prepare_action)
+        self.framework.observe(self.charm.on.run_action, self.on_run_action)
+        self.framework.observe(self.charm.on.stop_action, self.on_stop_action)
+        self.framework.observe(self.charm.on.cleanup_action, self.on_clean_action)
+
+        self.framework.observe(
+            self.charm.on.check_upload,
+            self._on_check_upload,
+        )
+        self.framework.observe(
+            self.charm.on.check_collect,
+            self._on_check_collect,
+        )
+
+    def _on_check_collect(self, event: EventBase) -> None:
+        """Check if the collection is finished."""
+        if self.config_manager.is_collecting():
+            # Nothing to do, collection is still in progress
+            event.defer()
+            return
+
+        if self.unit.is_leader():
+            self.peers.state.lifecycle = DPBenchmarkLifecycleState.UPLOADING
+            # Flag that an upload is now running; we will check its status later
+            self.charm.on.check_upload.emit()
+            return
+        self.peers.state.lifecycle = DPBenchmarkLifecycleState.FINISHED
+
+    def _on_check_upload(self, event: EventBase) -> None:
+        """Check if the upload is finished."""
+        if self.config_manager.is_uploading():
+            # Nothing to do, upload is still in progress
+            event.defer()
+            return
+        self.peers.state.lifecycle = DPBenchmarkLifecycleState.FINISHED
+
+    def _preflight_checks(self) -> bool:
+        """Check if we have the necessary relations."""
+        try:
+            return bool(self.database.state.model())
+        except DPBenchmarkMissingOptionsError:
+            return False
+
+    def on_prepare_action(self, event: ActionEvent) -> None:
+        """Process the prepare action."""
+        if not self._preflight_checks():
+            event.fail("Missing DB or S3 relations")
+            return
+
+        if not (state := self.lifecycle.next(DPBenchmarkLifecycleTransition.PREPARE)):
+            event.fail("Failed to prepare the benchmark: already done")
+            return
+
+        if state != DPBenchmarkLifecycleState.PREPARING:
+            event.fail(
+                "Another peer is already in prepare state. Wait or call clean action to reset."
+            )
+            return
+
+        # We process the special case of PREPARE, as explained in lifecycle.make_transition()
+        if not self.config_manager.prepare():
+            event.fail("Failed to prepare the benchmark")
+            return
+
+        self.lifecycle.make_transition(state)
+        self.unit.status = self.lifecycle.status
+        event.set_results({"message": "Benchmark is being prepared"})
+
+    def on_run_action(self, event: ActionEvent) -> None:
+        """Process the run action."""
+        if not self._preflight_checks():
+            event.fail("Missing DB or S3 relations")
+            return
+
+        if not self._process_action_transition(DPBenchmarkLifecycleTransition.RUN):
+            return event.fail("Failed to run the benchmark")
+        event.set_results({"message": "Benchmark has started"})
+
+    def on_stop_action(self, event: ActionEvent) -> None:
+        """Process the stop action."""
+        if not self._preflight_checks():
+            event.fail("Missing DB or S3 relations")
+            return
+
+        if not self._process_action_transition(DPBenchmarkLifecycleTransition.STOP):
+            return event.fail("Failed to stop the benchmark")
+        event.set_results({"message": "Benchmark has stopped"})
+
+    def on_clean_action(self, event: ActionEvent) -> None:
+        """Process the clean action."""
+        if not self._preflight_checks():
+            event.fail("Missing DB or S3 relations")
+            return
+
+        if not self._process_action_transition(DPBenchmarkLifecycleTransition.CLEAN):
+            return event.fail("Failed to clean the benchmark")
+        event.set_results({"message": "Benchmark is cleaning"})
+
+    def _process_action_transition(self, transition: DPBenchmarkLifecycleTransition) -> bool:
+        """Process the action."""
+        # First, check if we have an update in our lifecycle state
+        self.charm.update_state()
+
+        if not (state := self.lifecycle.next(transition)):
+            return False
+
+        self.lifecycle.make_transition(state)
+        self.unit.status = self.lifecycle.status
+        return True
diff --git a/src/benchmark/events/db.py b/src/benchmark/events/db.py
index a6d8b7c..8b01106 100644
--- a/src/benchmark/events/db.py
+++ b/src/benchmark/events/db.py
@@ -9,11 +9,13 @@
 """
 
 import logging
+from abc import abstractmethod
 
 from charms.data_platform_libs.v0.data_interfaces import DatabaseRequires
 from ops.charm import CharmBase, CharmEvents
 from ops.framework import EventBase, EventSource
 
+from benchmark.core.models import DatabaseState
 from benchmark.events.handler import RelationHandler
 from benchmark.literals import DPBenchmarkMissingOptionsError
 
@@ -37,7 +39,7 @@ class DatabaseRelationHandler(RelationHandler):
     well as the current relation status.
""" - on = DatabaseHandlerEvents() # pyright: ignore [reportGeneralTypeIssues] + on = DatabaseHandlerEvents() # pyright: ignore [reportAssignmentType] def __init__( self, @@ -58,48 +60,10 @@ def __init__( self.charm.on[self.relation_name].relation_broken, self._on_endpoints_changed ) - # @property - # def username(self) -> str|None: - # """Returns the username to connect to the database.""" - # return (self._secret_user or {}).get("username") - - # @property - # def password(self) -> str|None: - # """Returns the password to connect to the database.""" - # return (self._secret_user or {}).get("password") - - # @property - # def tls(self) -> str|None: - # """Returns the TLS to connect to the database.""" - # tls = (self._secret_tls or {}).get("tls") - # if not tls or tls == "disabled": - # return None - # return tls - - # @property - # def tls_ca(self) -> str|None: - # """Returns the TLS CA to connect to the database.""" - # tls_ca = (self._secret_user or {}).get("tls_ca") - # if not tls_ca or tls_ca == "disabled": - # return None - # return tls_ca - - # @property - # def _secret_user(self) -> dict[str, str]|None: - # if not (secret_id := self.client.fetch_relation_data()[self.relation.id].get("secret-user")): - # return None - # return self.charm.framework.model.get_secret(id=secret_id).get_content() - - # @property - # def _secret_tls(self) -> dict[str, str]|None: - # if not (secret_id := self.client.fetch_relation_data()[self.relation.id].get("secret-tls")): - # return None - # return self.charm.framework.model.get_secret(id=secret_id).get_content() - - def _on_endpoints_changed(self, event: EventBase) -> None: + def _on_endpoints_changed(self, _: EventBase) -> None: """Handles the endpoints_changed event.""" try: - if self.state.get(): + if self.state.model(): self.on.db_config_update.emit() except DPBenchmarkMissingOptionsError as e: logger.warning(f"Missing options: {e}") @@ -109,3 +73,9 @@ def _on_endpoints_changed(self, event: EventBase) -> None: def client(self) -> DatabaseRequires: """Returns the data_interfaces client corresponding to the database.""" ... + + @property + @abstractmethod + def state(self) -> DatabaseState: + """Returns the state of the database.""" + ... diff --git a/src/benchmark/literals.py b/src/benchmark/literals.py index fa83cd1..7952a34 100644 --- a/src/benchmark/literals.py +++ b/src/benchmark/literals.py @@ -10,7 +10,6 @@ # Peer relation keys LIFECYCLE_KEY = "lifecycle" -STOP_KEY = "stop" class Substrate(str, Enum): diff --git a/src/benchmark/managers/collector.py b/src/benchmark/managers/collector.py deleted file mode 100644 index 94ebd92..0000000 --- a/src/benchmark/managers/collector.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2024 Canonical Ltd. -# See LICENSE file for licensing details. - -"""The collector class. - -This class runs all the collection tasks for a given result. 
-""" - -from benchmark.core.models import SosreportCLIArgsModel -from benchmark.core.workload_base import WorkloadBase - - -class CollectorManager: - """The collector manager class.""" - - def __init__( - self, - workload: WorkloadBase, - sosreport_config: SosreportCLIArgsModel | None = None, - ): - # TODO: we need a way to run "sos collect" - # For that, we will have to manage ssh keys between the peers - # E.G.: - # sudo sos collect \ - # -i ~/.local/share/juju/ssh/juju_id_rsa --ssh-user ubuntu --no-local \ - # --nodes "$NODES" \ - # --only-plugins systemd,logs,juju \ - # -k logs.all_logs=true \ - # --batch \ - # --clean \ - # --tmp-dir=/tmp/sos \ - # -z gzip -j 1 - self.workload = workload - if not sosreport_config: - if workload.is_running_on_k8s(): - self.sosreport_config = SosreportCLIArgsModel( - plugins=["systemd", "logs", "juju"], - ) - else: - self.sosreport_config = SosreportCLIArgsModel( - plugins=["logs", "juju"], - ) - self.sosreport_config = sosreport_config - - def install(self) -> bool: - """Installs the collector.""" - ... - - def collect_sosreport(self) -> bool: - """Collect the sosreport.""" - self.workload.exec( - command=["sosreport"] + str(self.sosreport_config).split(), - ) diff --git a/src/benchmark/managers/config.py b/src/benchmark/managers/config.py index 399b332..7387915 100644 --- a/src/benchmark/managers/config.py +++ b/src/benchmark/managers/config.py @@ -19,6 +19,7 @@ DatabaseState, DPBenchmarkWrapperOptionsModel, ) +from benchmark.core.structured_config import BenchmarkCharmConfig from benchmark.core.workload_base import WorkloadBase from benchmark.literals import DPBenchmarkLifecycleTransition @@ -34,7 +35,7 @@ def __init__( workload: WorkloadBase, database_state: DatabaseState, peers: list[str], - config: dict[str, Any], + config: BenchmarkCharmConfig, labels: str, ): self.workload = workload @@ -61,7 +62,7 @@ def is_cleaned(self) -> bool: @property def _test_name(self) -> str: """Return the test name.""" - return self.config.get("test_name") or "dpe-benchmark" + return self.config.test_name or "dpe-benchmark" @property def test_name(self) -> str: @@ -76,20 +77,19 @@ def get_execution_options( Raises: DPBenchmarkMissingOptionsError: If the database is not ready. """ - if not (db := self.database_state.get()): + if not (db := self.database_state.model()): # It means we are not yet ready. 
Return None
             # This check also serves to ensure we have only one valid relation at a time
             return None
         return DPBenchmarkWrapperOptionsModel(
             test_name=self.test_name,
-            parallel_processes=self.config.get("parallel_processes"),
-            threads=self.config.get("threads"),
-            duration=self.config.get("duration"),
-            run_count=self.config.get("run_count"),
+            parallel_processes=self.config.parallel_processes,
+            threads=self.config.threads,
+            duration=self.config.duration,
+            run_count=self.config.run_count,
             db_info=db,
-            workload_name=self.config.get("workload_name"),
-            report_interval=self.config.get("report_interval"),
-            workload_profile=self.config.get("workload_profile"),
+            workload_name=self.config.workload_name,
+            report_interval=self.config.report_interval,
             labels=self.labels,
             peers=",".join(self.peers),
         )
@@ -174,7 +174,7 @@ def is_failed(
 
     def _render_params(
         self,
-        dst_path: str | None = None,
+        dst_path: str,
     ) -> str | None:
         """Render the workload parameters."""
         return self._render(
@@ -190,7 +190,9 @@ def _render_service(
         dst_path: str | None = None,
     ) -> str | None:
         """Render the workload parameters."""
-        values = self.get_execution_options().dict() | {
+        if not (options := self.get_execution_options()):
+            return None
+        values = options.dict() | {
             "charm_root": os.environ.get("CHARM_DIR", ""),
             "command": transition.value,
         }
@@ -216,7 +218,9 @@ def _check(
                 "command": transition.value,
                 "target_hosts": values.db_info.hosts,
             }
-        compare_svc = "\n".join(self.workload.read(self.workload.paths.service)) == self._render(
+        compare_svc = "\n".join(
+            self.workload.read(self.workload.paths.service) or ""
+        ) == self._render(
             values=values,
             template_file=self.workload.paths.service_template,
             template_content=None,
@@ -239,7 +243,7 @@ def _render(
         template_file: str | None,
         template_content: str | None,
         dst_filepath: str | None = None,
-    ) -> str:
+    ) -> str | None:
         """Renders from a file or a string content and returns the final rendered value."""
         try:
             if template_file:
@@ -247,7 +251,7 @@ def _render(
                 template = template_env.get_template(template_file)
             else:
                 template_env = Environment(
-                    loader=DictLoader({"workload_params": template_content})
+                    loader=DictLoader({"workload_params": template_content or ""})
                 )
                 template = template_env.get_template("workload_params")
             content = template.render(values)
diff --git a/src/benchmark/managers/lifecycle.py b/src/benchmark/managers/lifecycle.py
index 7869807..785d4ac 100644
--- a/src/benchmark/managers/lifecycle.py
+++ b/src/benchmark/managers/lifecycle.py
@@ -3,6 +3,9 @@
 
 """The lifecycle manager class."""
 
+from abc import ABC, abstractmethod
+from typing import Optional
+
 from ops.model import (
     ActiveStatus,
     BlockedStatus,
@@ -26,7 +29,7 @@ class LifecycleManager:
     def __init__(
         self,
         peers: dict[Unit, PeerState],
-        this_unit: PeerState,
+        this_unit: Unit,
         config_manager: ConfigManager,
     ):
         self.peers = peers
@@ -101,127 +104,16 @@ def make_transition(self, new_state: DPBenchmarkLifecycleState) -> bool:  # noqa
         self.peers[self.this_unit].lifecycle = new_state.value
         return True
 
-    def next(  # noqa: C901
+    def next(
         self, transition: DPBenchmarkLifecycleTransition | None = None
     ) -> DPBenchmarkLifecycleState | None:
         """Return the next lifecycle state."""
-        # Changes that takes us to UNSET:
-        if transition == DPBenchmarkLifecycleTransition.CLEAN:
-            # Simplest case, we return to unset
-            return DPBenchmarkLifecycleState.UNSET
-
-        # Changes that takes us to STOPPED:
-        # Either we received a stop transition
-        if transition == DPBenchmarkLifecycleTransition.STOP:
-            return
DPBenchmarkLifecycleState.STOPPED - # OR one of our peers is in stopped state - if ( - self._compare_lifecycle_states( - self._peers_state(), - DPBenchmarkLifecycleState.STOPPED, - ) - == 0 - ): - return DPBenchmarkLifecycleState.STOPPED - - # FAILED takes precedence over all other states - # Changes that takes us to FAILED: - # Workload has failed and we were: - # - PREPARING - # - RUNNING - # - COLLECTING - # - UPLOADING - if ( - self.current() - in [ - DPBenchmarkLifecycleState.PREPARING, - DPBenchmarkLifecycleState.RUNNING, - DPBenchmarkLifecycleState.COLLECTING, - DPBenchmarkLifecycleState.UPLOADING, - ] - and self.config_manager.workload.is_failed() - ): - return DPBenchmarkLifecycleState.FAILED - - # Changes that takes us to PREPARING: - # We received a prepare signal and no one else is available yet or we failed previously - if transition == DPBenchmarkLifecycleTransition.PREPARE and self._peers_state() in [ - DPBenchmarkLifecycleState.UNSET, - DPBenchmarkLifecycleState.FAILED, - ]: - return DPBenchmarkLifecycleState.PREPARING - elif transition == DPBenchmarkLifecycleTransition.PREPARE: - # Failed to calculate a proper state as we have neighbors in more advanced state for now - return None - - # Changes that takes us to AVAILABLE: - # Either we were in preparing and we are finished - if ( - self.current() == DPBenchmarkLifecycleState.PREPARING - and self.config_manager.is_prepared() - ): - return DPBenchmarkLifecycleState.AVAILABLE - # OR highest peers state is AVAILABLE but no actions has happened - if ( - transition is None - and self._compare_lifecycle_states( - self._peers_state(), - DPBenchmarkLifecycleState.AVAILABLE, - ) - == 0 - ): - return DPBenchmarkLifecycleState.AVAILABLE - - # Changes that takes us to RUNNING: - # Either we receive a transition to running and we were in one of: - # - AVAILABLE - # - FAILED - # - STOPPED - # - FINISHED - if transition == DPBenchmarkLifecycleTransition.RUN and self.current() in [ - DPBenchmarkLifecycleState.AVAILABLE, - DPBenchmarkLifecycleState.FAILED, - DPBenchmarkLifecycleState.STOPPED, - DPBenchmarkLifecycleState.FINISHED, - ]: - return DPBenchmarkLifecycleState.RUNNING - # OR any other peer is beyond the >=RUNNING state - # and we are still AVAILABLE. 
-        if self._compare_lifecycle_states(
-            self._peers_state(),
-            DPBenchmarkLifecycleState.RUNNING,
-        ) == 0 and self.current() in [
-            DPBenchmarkLifecycleState.UNSET,
-            DPBenchmarkLifecycleState.AVAILABLE,
-        ]:
-            return DPBenchmarkLifecycleState.RUNNING
-
-        # Changes that takes us to COLLECTING:
-        # the workload is in collecting state
-        if self.config_manager.is_collecting():
-            return DPBenchmarkLifecycleState.COLLECTING
-
-        # Changes that takes us to UPLOADING:
-        # the workload is in uploading state
-        if self.config_manager.is_uploading():
-            return DPBenchmarkLifecycleState.UPLOADING
-
-        # Changes that takes us to FINISHED:
-        # Workload has finished and we were in one of:
-        # - RUNNING
-        # - UPLOADING
-        if (
-            self.current()
-            in [
-                DPBenchmarkLifecycleState.RUNNING,
-                DPBenchmarkLifecycleState.UPLOADING,
-            ]
-            and self.config_manager.workload.is_halted()
-        ):
-            return DPBenchmarkLifecycleState.FINISHED
-
-        # We are in an incongruent state OR the transition does not make sense
-        return None
+        lifecycle_state = _lifecycle_build(
+            self,
+            self.current(),
+        )
+        result = lifecycle_state.next(transition)
+        return result.state if result else None
 
     def _peers_state(self) -> DPBenchmarkLifecycleState | None:
         next_state = self.peers[self.this_unit].lifecycle
@@ -229,10 +121,20 @@ def _peers_state(self) -> DPBenchmarkLifecycleState | None:
             neighbor = self.peers[unit].lifecycle
             if neighbor is None:
                 continue
-            elif self._compare_lifecycle_states(neighbor, next_state) > 0:
+            elif not next_state or self._compare_lifecycle_states(neighbor, next_state) > 0:
                 next_state = neighbor
         return next_state or DPBenchmarkLifecycleState.UNSET
 
+    def check_all_peers_in_state(self, state: DPBenchmarkLifecycleState) -> bool:
+        """Check whether every peer unit reports the given lifecycle state.
+
+        Returns True only if all peers, including this unit, match ``state``.
+        """
+        for unit in self.peers.keys():
+            if state != self.peers[unit].lifecycle:
+                return False
+        return True
+
     @property
     def status(self) -> StatusBase:
         """Return the status of the benchmark."""
@@ -289,3 +191,222 @@ def _get_value(phase: DPBenchmarkLifecycleState) -> int:  # noqa: C901
                 return 8
 
         return _get_value(neighbor) - _get_value(this)
+
+
+class _LifecycleState(ABC):
+    """The lifecycle state represents a single state and encapsulates the transition logic."""
+
+    state: DPBenchmarkLifecycleState
+
+    def __init__(self, manager: LifecycleManager):
+        self.manager = manager
+
+    @abstractmethod
+    def next(
+        self, transition: Optional[DPBenchmarkLifecycleTransition] = None
+    ) -> Optional["_LifecycleState"]:
+        """Returns the next state given a transition request."""
+        ...
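The refactor above replaces the monolithic `next()` with a State pattern: each lifecycle phase owns its outgoing transitions, and the manager just delegates to whichever state object matches the persisted phase. A minimal sketch of the shape, with illustrative names rather than the charm's actual classes:

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional


class Phase(str, Enum):
    UNSET = "unset"
    RUNNING = "running"
    STOPPED = "stopped"


class State(ABC):
    """One lifecycle phase; it knows only its own outgoing transitions."""

    phase: Phase

    @abstractmethod
    def next(self, transition: Optional[str] = None) -> Optional["State"]:
        ...


class Unset(State):
    phase = Phase.UNSET

    def next(self, transition: Optional[str] = None) -> Optional[State]:
        # Only an explicit "run" request moves us forward.
        return Running() if transition == "run" else None


class Running(State):
    phase = Phase.RUNNING

    def next(self, transition: Optional[str] = None) -> Optional[State]:
        return Stopped() if transition == "stop" else None


class Stopped(State):
    phase = Phase.STOPPED

    def next(self, transition: Optional[str] = None) -> Optional[State]:
        return None  # terminal in this toy version


def build(phase: Phase) -> State:
    # Mirrors _lifecycle_build(): map the persisted phase back to a state object.
    return {Phase.UNSET: Unset, Phase.RUNNING: Running, Phase.STOPPED: Stopped}[phase]()


# The manager's next() collapses to: build the current state, ask it for a successor.
successor = build(Phase.UNSET).next("run")
assert successor is not None and successor.phase is Phase.RUNNING
```

Each guard now lives next to the state it belongs to, so adding a phase means one new class plus a factory entry instead of growing a `# noqa: C901` conditional.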
+ + +class _StoppedLifecycleState(_LifecycleState): + """The stopped lifecycle state.""" + + state = DPBenchmarkLifecycleState.STOPPED + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.CLEAN: + return _UnsetLifecycleState(self.manager) + + if self.manager.config_manager.is_running(): + return _RunningLifecycleState(self.manager) + + if transition == DPBenchmarkLifecycleTransition.RUN: + return _RunningLifecycleState(self.manager) + + if self.manager.config_manager.is_failed(): + return _FailedLifecycleState(self.manager) + + return None + + +class _FailedLifecycleState(_LifecycleState): + """The failed lifecycle state.""" + + state = DPBenchmarkLifecycleState.FAILED + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.CLEAN: + return _UnsetLifecycleState(self.manager) + + if self.manager.config_manager.is_running(): + return _RunningLifecycleState(self.manager) + + if transition == DPBenchmarkLifecycleTransition.RUN: + return _RunningLifecycleState(self.manager) + + return None + + +class _FinishedLifecycleState(_LifecycleState): + """The finished lifecycle state.""" + + state = DPBenchmarkLifecycleState.FINISHED + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.CLEAN: + return _UnsetLifecycleState(self.manager) + + if transition == DPBenchmarkLifecycleTransition.STOP: + return _StoppedLifecycleState(self.manager) + + if self.manager.config_manager.is_running(): + return _RunningLifecycleState(self.manager) + + if transition == DPBenchmarkLifecycleTransition.RUN: + return _RunningLifecycleState(self.manager) + + if self.manager.config_manager.is_failed(): + return _FailedLifecycleState(self.manager) + + return None + + +class _RunningLifecycleState(_LifecycleState): + """The running lifecycle state.""" + + state = DPBenchmarkLifecycleState.RUNNING + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.CLEAN: + return _UnsetLifecycleState(self.manager) + + if transition == DPBenchmarkLifecycleTransition.STOP: + return _StoppedLifecycleState(self.manager) + + if (peer_state := self.manager._peers_state()) and ( + self.manager._compare_lifecycle_states( + peer_state, + DPBenchmarkLifecycleState.STOPPED, + ) + == 0 + ): + return _StoppedLifecycleState(self.manager) + + if self.manager.config_manager.is_failed(): + return _FailedLifecycleState(self.manager) + + if not self.manager.config_manager.is_running(): + # TODO: Collect state should be implemented here instead + return _FinishedLifecycleState(self.manager) + + return None + + +class _AvailableLifecycleState(_LifecycleState): + """The available lifecycle state.""" + + state = DPBenchmarkLifecycleState.AVAILABLE + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.CLEAN: + return _UnsetLifecycleState(self.manager) + + if transition == DPBenchmarkLifecycleTransition.RUN: + return _RunningLifecycleState(self.manager) + + if (peer_state := self.manager._peers_state()) and ( + self.manager._compare_lifecycle_states( + peer_state, + DPBenchmarkLifecycleState.RUNNING, + ) + == 0 + ): + return 
_RunningLifecycleState(self.manager) + + return None + + +class _PreparingLifecycleState(_LifecycleState): + """The preparing lifecycle state.""" + + state = DPBenchmarkLifecycleState.PREPARING + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.CLEAN: + return _UnsetLifecycleState(self.manager) + + if self.manager.config_manager.is_failed(): + return _FailedLifecycleState(self.manager) + + if self.manager.config_manager.is_prepared(): + return _AvailableLifecycleState(self.manager) + + return None + + +class _UnsetLifecycleState(_LifecycleState): + """The unset lifecycle state.""" + + state = DPBenchmarkLifecycleState.UNSET + + def next( + self, transition: DPBenchmarkLifecycleTransition | None = None + ) -> Optional["_LifecycleState"]: + if transition == DPBenchmarkLifecycleTransition.PREPARE: + return _PreparingLifecycleState(self.manager) + + if (peer_state := self.manager._peers_state()) and ( + self.manager._compare_lifecycle_states( + peer_state, + DPBenchmarkLifecycleState.AVAILABLE, + ) + == 0 + ): + return _AvailableLifecycleState(self.manager) + + if (peer_state := self.manager._peers_state()) and ( + self.manager._compare_lifecycle_states( + peer_state, + DPBenchmarkLifecycleState.RUNNING, + ) + == 0 + ): + return _RunningLifecycleState(self.manager) + + return None + + +def _lifecycle_build( + manager: LifecycleManager, state: DPBenchmarkLifecycleState +) -> _LifecycleState: + """Build the lifecycle state.""" + match state: + case DPBenchmarkLifecycleState.UNSET: + return _UnsetLifecycleState(manager) + case DPBenchmarkLifecycleState.PREPARING: + return _PreparingLifecycleState(manager) + case DPBenchmarkLifecycleState.AVAILABLE: + return _AvailableLifecycleState(manager) + case DPBenchmarkLifecycleState.RUNNING: + return _RunningLifecycleState(manager) + case DPBenchmarkLifecycleState.FAILED: + return _FailedLifecycleState(manager) + case DPBenchmarkLifecycleState.FINISHED: + return _FinishedLifecycleState(manager) + case DPBenchmarkLifecycleState.STOPPED: + return _StoppedLifecycleState(manager) + case _: + raise ValueError("Unknown state") diff --git a/src/benchmark/wrapper/core.py b/src/benchmark/wrapper/core.py index d49918c..d967464 100644 --- a/src/benchmark/wrapper/core.py +++ b/src/benchmark/wrapper/core.py @@ -116,18 +116,18 @@ class KafkaBenchmarkSample(BaseModel): class KafkaBenchmarkSampleMatcher(Enum): """Hard-coded regexes to process the benchmark sample.""" - produce_rate: str = r"Pub rate\s+(.*?)\s+msg/s" - produce_throughput: str = r"Pub rate\s+\d+.\d+\s+msg/s\s+/\s+(.*?)\s+MB/s" - produce_error_rate: str = r"Pub err\s+(.*?)\s+err/s" - produce_latency_avg: str = r"Pub Latency \(ms\) avg:\s+(.*?)\s+" + produce_rate = r"Pub rate\s+(.*?)\s+msg/s" + produce_throughput = r"Pub rate\s+\d+.\d+\s+msg/s\s+/\s+(.*?)\s+MB/s" + produce_error_rate = r"Pub err\s+(.*?)\s+err/s" + produce_latency_avg = r"Pub Latency \(ms\) avg:\s+(.*?)\s+" # Match: Pub Latency (ms) avg: 1478.1 - 50%: 1312.6 - 99%: 4981.5 - 99.9%: 5104.7 - Max: 5110.5 # Generates: [('1478.1', '1312.6', '4981.5', '5104.7', '5110.5')] - produce_latency_percentiles: str = r"Pub Latency \(ms\) avg:\s+(.*?)\s+- 50%:\s+(.*?)\s+- 99%:\s+(.*?)\s+- 99.9%:\s+(.*?)\s+- Max:\s+(.*?)\s+" + produce_latency_percentiles = r"Pub Latency \(ms\) avg:\s+(.*?)\s+- 50%:\s+(.*?)\s+- 99%:\s+(.*?)\s+- 99.9%:\s+(.*?)\s+- Max:\s+(.*?)\s+" # Pub Delay Latency (us) avg: 21603452.9 - 50%: 21861759.0 - 99%: 23621631.0 - 99.9%: 
24160895.0 - Max: 24163839.0 # Generates: [('21603452.9', '21861759.0', '23621631.0', '24160895.0', '24163839.0')] - produce_latency_delay_percentiles: str = r"Pub Delay Latency \(us\) avg:\s+(.*?)\s+- 50%:\s+(.*?)\s+- 99%:\s+(.*?)\s+- 99.9%:\s+(.*?)\s+- Max:\s+(\d+\.\d+)" + produce_latency_delay_percentiles = r"Pub Delay Latency \(us\) avg:\s+(.*?)\s+- 50%:\s+(.*?)\s+- 99%:\s+(.*?)\s+- 99.9%:\s+(.*?)\s+- Max:\s+(\d+\.\d+)" - consume_rate: str = r"Cons rate\s+(.*?)\s+msg/s" - consume_throughput: str = r"Cons rate\s+\d+.\d+\s+msg/s\s+/\s+(.*?)\s+MB/s" - consume_backlog: str = r"Backlog:\s+(.*?)\s+K" + consume_rate = r"Cons rate\s+(.*?)\s+msg/s" + consume_throughput = r"Cons rate\s+\d+.\d+\s+msg/s\s+/\s+(.*?)\s+MB/s" + consume_backlog = r"Backlog:\s+(.*?)\s+K" diff --git a/src/benchmark/wrapper/main.py b/src/benchmark/wrapper/main.py index 675fbf9..41da76a 100755 --- a/src/benchmark/wrapper/main.py +++ b/src/benchmark/wrapper/main.py @@ -24,6 +24,9 @@ def run(self): """Prepares the workload and runs the benchmark.""" manager, _ = self.mapping.map(self.args.command) + if not manager: + raise ValueError("No manager found for the command") + logging.basicConfig(filename=self.args.log_file, encoding="utf-8", level=logging.INFO) def _exit(*args, **kwargs): diff --git a/src/benchmark/wrapper/process.py b/src/benchmark/wrapper/process.py index 5398158..5945db1 100644 --- a/src/benchmark/wrapper/process.py +++ b/src/benchmark/wrapper/process.py @@ -70,10 +70,12 @@ def start(self): cwd=self.model.cwd, ) # Now, let's make stdout a non-blocking file - os.set_blocking(self._proc.stdout.fileno(), False) + if self._proc: + if self._proc.stdout: + os.set_blocking(self._proc.stdout.fileno(), False) - self.model.pid = self._proc.pid - self.model.status = ProcessStatus.RUNNING + self.model.pid = self._proc.pid + self.model.status = ProcessStatus.RUNNING def status(self) -> ProcessStatus: """Return the status of the process.""" @@ -86,7 +88,9 @@ def status(self) -> ProcessStatus: stat = ProcessStatus.RUNNING elif self._proc.returncode != 0: stat = ProcessStatus.ERROR - self.model.status = stat + + if self.model: + self.model.status = stat return stat async def process( @@ -104,7 +108,7 @@ async def process( or (self.status() == ProcessStatus.RUNNING and self.args.duration == 0) ): to_wait = True - if self._proc: + if self._proc and self._proc.stdout: for line in iter(self._proc.stdout.readline, ""): if output := self.process_line(line): self.metrics.add(output) @@ -140,7 +144,8 @@ def stop(self): self._proc.kill() except Exception as e: logger.warning(f"Error stopping worker: {e}") - self.model.status = ProcessStatus.STOPPED + if self.model: + self.model.status = ProcessStatus.STOPPED @abstractmethod def process_line(self, line: str) -> BaseModel | None: @@ -202,11 +207,15 @@ def __init__(self, args: WorkloadCLIArgsModel, metrics: BenchmarkMetrics): self.manager = None self.metrics = metrics - def status(self) -> ProcessStatus: + def status(self) -> ProcessStatus | None: """Return the status of the benchmark.""" - return self.manager.status() + if self.manager: + return self.manager.status() + return None - def map(self, cmd: BenchmarkCommand) -> tuple[BenchmarkManager, list[BenchmarkProcess]]: + def map( + self, cmd: BenchmarkCommand + ) -> tuple[BenchmarkManager | None, list[BenchmarkProcess] | None]: """Processes high-level arguments into the benchmark manager and workers. Returns all the processes that will be running the benchmark. 
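The guards added to `process.py` above all sit around the same trick: the wrapper marks the child's stdout as non-blocking with `os.set_blocking()` so it can poll output between status checks without stalling. A self-contained sketch of that idea, independent of the wrapper's classes (the child command and the line handling are illustrative):

```python
import os
import subprocess
import time

# A child that emits a few lines with pauses between them.
proc = subprocess.Popen(
    ["python3", "-u", "-c", "import time\nfor i in range(3): print(i); time.sleep(0.2)"],
    stdout=subprocess.PIPE,
)

assert proc.stdout is not None
fd = proc.stdout.fileno()
os.set_blocking(fd, False)  # reads now return immediately instead of hanging

while True:
    try:
        chunk = os.read(fd, 4096)  # raises BlockingIOError when no data is ready
    except BlockingIOError:
        if proc.poll() is not None:
            break  # child exited and the pipe is drained
        time.sleep(0.05)  # yield instead of busy-waiting
        continue
    if not chunk:  # b"" means EOF: the child closed its end
        break
    for line in chunk.splitlines():  # a real reader would buffer partial lines
        print("got:", line.decode())

proc.wait()
```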
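Further up, the `KafkaBenchmarkSampleMatcher` hunk in `core.py` is a typing fix rather than a behavior change: under the typing spec that pyright follows, an annotated assignment such as `produce_rate: str = r"..."` inside an `Enum` body declares a plain attribute, not an enum member, so the annotations had to go. A small illustration of the corrected pattern (the sample line is invented for the demo):

```python
import re
from enum import Enum


class Matcher(Enum):
    # A bare assignment is unambiguously an enum member; an annotated one
    # (e.g. `produce_rate: str = ...`) is flagged by pyright as a
    # non-member attribute declaration.
    produce_rate = r"Pub rate\s+(.*?)\s+msg/s"


line = "Pub rate  1000.0 msg/s /  10.0 MB/s"
match = re.search(Matcher.produce_rate.value, line)
assert match and match.group(1) == "1000.0"
```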
diff --git a/src/charm.py b/src/charm.py index 5ca2913..40c5be1 100755 --- a/src/charm.py +++ b/src/charm.py @@ -33,9 +33,10 @@ from benchmark.core.models import ( DatabaseState, DPBenchmarkBaseDatabaseModel, - RelationState, ) +from benchmark.core.structured_config import BenchmarkCharmConfig from benchmark.core.workload_base import WorkloadBase +from benchmark.events.actions import ActionsHandler from benchmark.events.db import DatabaseRelationHandler from benchmark.events.peer import PeerRelationHandler from benchmark.literals import ( @@ -45,6 +46,7 @@ from benchmark.managers.config import ConfigManager from benchmark.managers.lifecycle import LifecycleManager from literals import CLIENT_RELATION_NAME, TOPIC_NAME +from models import KafkaBenchmarkCharmConfig # Log messages can be retrieved using juju debug-log logger = logging.getLogger(__name__) @@ -123,12 +125,13 @@ def __init__( ) self.database_key = "topic" - def get(self) -> DPBenchmarkBaseDatabaseModel | None: - """Returns the value of the key.""" + @override + def model(self) -> DPBenchmarkBaseDatabaseModel | None: + """Returns the database model.""" if not self.relation or not (endpoints := self.remote_data.get("endpoints")): return None - - dbmodel = super().get() + if not (dbmodel := super().model()): + return None return DPBenchmarkBaseDatabaseModel( hosts=endpoints.split(","), unix_socket=dbmodel.unix_socket, @@ -168,7 +171,7 @@ def __init__( @property @override - def state(self) -> RelationState: + def state(self) -> KafkaDatabaseState: """Returns the state of the database.""" if not ( self.relation and self.client and self.relation.id in self.client.fetch_relation_data() @@ -192,11 +195,13 @@ def client(self) -> Any: """Returns the data_interfaces client corresponding to the database.""" return self._internal_client - def bootstrap_servers(self) -> str | None: + def bootstrap_servers(self) -> list[str] | None: """Return the bootstrap servers.""" - return self.state.get().hosts + if not self.state or not (model := self.state.model()): + return None + return model.hosts - def tls(self) -> tuple[str, str] | None: + def tls(self) -> tuple[str | None, str | None]: """Return the TLS certificates.""" if not self.state.tls_ca: return self.state.tls, None @@ -224,7 +229,7 @@ def __init__( workload: WorkloadBase, database_state: DatabaseState, peers: list[str], - config: dict[str, Any], + config: BenchmarkCharmConfig, labels: str, ): super().__init__(workload, database_state, peers, config, labels) @@ -237,7 +242,9 @@ def _render_service( dst_path: str | None = None, ) -> str | None: """Render the workload parameters.""" - values = self.get_execution_options().dict() | { + if not (options := self.get_execution_options()): + return None + values = options.dict() | { "charm_root": os.environ.get("CHARM_DIR", ""), "command": transition.value, } @@ -282,18 +289,19 @@ def _check( def get_worker_params(self) -> dict[str, Any]: """Return the workload parameters.""" - db = self.database.state.get() + if not (db := self.database_state.model()): + return {} return { - "total_number_of_brokers": len(self.peer.units()) + 1, + "total_number_of_brokers": len(self.peers) + 1, # We cannot have quotes nor brackets in this string. 
# Therefore, we render the entire line instead "list_of_brokers_bootstrap": "bootstrap.servers={}".format( - ",".join(self.database.bootstrap_servers()) + ",".join(db.hosts) if db.hosts else "" ), "username": db.username, "password": db.password, - "threads": self.config.get("threads", 1) if self.config.get("threads") > 0 else 1, + "threads": self.config.threads if self.config.threads > 0 else 1, } def _render_worker_params( @@ -312,9 +320,9 @@ def _render_worker_params( def get_workload_params(self) -> dict[str, Any]: """Return the worker parameters.""" return { - "partitionsPerTopic": self.config.get("parallel_processes"), - "duration": int(self.config.get("duration") / 60) - if self.config.get("duration") > 0 + "partitionsPerTopic": self.config.parallel_processes, + "duration": int(self.config.duration / 60) + if self.config.duration > 0 else TEN_YEARS_IN_MINUTES, "charm_root": os.environ.get("CHARM_DIR", ""), } @@ -322,7 +330,7 @@ def get_workload_params(self) -> dict[str, Any]: @override def _render_params( self, - dst_path: str | None = None, + dst_path: str, ) -> str | None: """Render the workload parameters. @@ -344,12 +352,16 @@ def prepare(self) -> bool: # First, clean if a topic already existed self.clean() try: - topic = NewTopic( - name=self.database.state.get().db_name, - num_partitions=self.config.get("threads") * self.config.get("parallel_processes"), - replication_factor=self.client.replication_factor, - ) - self.client.create_topic(topic) + if model := self.database_state.model(): + topic = NewTopic( + name=model.db_name, + num_partitions=self.config.threads * self.config.parallel_processes, + replication_factor=self.client.replication_factor, + ) + self.client.create_topic(topic) + else: + logger.warning("No database model found") + return False except Exception as e: logger.debug(f"Error creating topic: {e}") @@ -360,16 +372,18 @@ def prepare(self) -> bool: def is_prepared(self) -> bool: """Checks if the benchmark service has passed its "prepare" status.""" try: - return self.database.state.get().db_name in self.client._admin_client.list_topics() + if model := self.database_state.model(): + return model.db_name in self.client._admin_client.list_topics() except Exception as e: logger.info(f"Error describing topic: {e}") - return False + return False @override def clean(self) -> bool: """Clean the benchmark service.""" try: - self.client.delete_topics([self.database.state.get().db_name]) + if model := self.database_state.model(): + self.client.delete_topics([model.db_name]) except Exception as e: logger.info(f"Error deleting topic: {e}") return self.is_cleaned() @@ -378,7 +392,9 @@ def clean(self) -> bool: def is_cleaned(self) -> bool: """Checks if the benchmark service has passed its "prepare" status.""" try: - return self.database.state.get().db_name not in self.client._admin_client.list_topics() + if not (model := self.database_state.model()): + return False + return model.db_name not in self.client._admin_client.list_topics() except Exception as e: logger.info(f"Error describing topic: {e}") return False @@ -386,25 +402,57 @@ def is_cleaned(self) -> bool: @cached_property def client(self) -> KafkaClient: """Return the Kafka client.""" - state = self.database.state.get() + if not (state := self.database_state.model()): + return KafkaClient( + servers=[], + username=None, + password=None, + security_protocol="SASL_PLAINTEXT", + replication_factor=1, + ) return KafkaClient( - servers=self.database.bootstrap_servers(), + servers=state.hosts or [], username=state.username, 
password=state.password,
             security_protocol="SASL_SSL" if (state.tls or state.tls_ca) else "SASL_PLAINTEXT",
             cafile_path=state.tls_ca,
             certfile_path=state.tls,
-            replication_factor=len(self.peer.units()) + 1,
+            replication_factor=len(self.peers) + 1,
         )
 
 
+class KafkaBenchmarkActionsHandler(ActionsHandler):
+    """Handle the actions for the benchmark charm."""
+
+    def __init__(self, charm: DPBenchmarkCharmBase):
+        """Initialize the class."""
+        super().__init__(charm)
+        self.config: BenchmarkCharmConfig = charm.config
+
+    @override
+    def _preflight_checks(self) -> bool:
+        """Check if we have the necessary relations.
+
+        In the Kafka case, we need the client relation to be able to connect to the database.
+        """
+        if int(self.config.parallel_processes) < 2:
+            logger.error("The number of parallel processes must be greater than 1.")
+            self.unit.status = BlockedStatus(
+                "The number of parallel processes must be greater than 1."
+            )
+            return False
+        return super()._preflight_checks()
+
+
 class KafkaBenchmarkOperator(DPBenchmarkCharmBase):
     """Charm the service."""
 
-    def __init__(self, *args):
-        self.workload_params_template = KAFKA_WORKLOAD_PARAMS_TEMPLATE
+    config_type = KafkaBenchmarkCharmConfig
 
+    def __init__(self, *args):
         super().__init__(*args, db_relation_name=CLIENT_RELATION_NAME)
+
+        self.workload_params_template = KAFKA_WORKLOAD_PARAMS_TEMPLATE
         self.labels = ",".join([self.model.name, self.unit.name.replace("/", "-")])
 
         self.database = KafkaDatabaseRelationHandler(
@@ -414,8 +462,8 @@ def __init__(self, *args):
         self.peer_handler = KafkaPeersRelationHandler(self, PEER_RELATION)
         self.config_manager = KafkaConfigManager(
             workload=self.workload,
-            database=self.database,
-            peer=self.peer_handler,
+            database_state=self.database.state,
+            peers=self.peer_handler.peers(),
             config=self.config,
             labels=self.labels,
         )
@@ -424,41 +472,23 @@ def __init__(self, *args):
             self.peers.this_unit(),
             self.config_manager,
         )
+        self.actions = KafkaBenchmarkActionsHandler(self)
 
         self.framework.observe(self.database.on.db_config_update, self._on_config_changed)
 
     @override
-    def _on_install(self, event: EventBase) -> None:
+    def _on_install(self, _: EventBase) -> None:
         """Install the charm."""
         apt.add_package("openjdk-18-jre", update_cache=True)
 
-    @override
-    def _preflight_checks(self) -> bool:
-        """Check if we have the necessary relations.
-
-        In kafka case, we need the client relation to be able to connect to the database.
-        """
-        if self.config.get("parallel_processes") < 2:
-            logger.error("The number of parallel processes must be greater than 1.")
-            self.unit.status = BlockedStatus(
-                "The number of parallel processes must be greater than 1."
-            )
-            return False
-        return super()._preflight_checks()
-
     @override
     def _on_config_changed(self, event):
         """Handle the config changed event."""
-        if not self._preflight_checks():
+        if not self.actions._preflight_checks():
             event.defer()
             return
         return super()._on_config_changed(event)
 
-    @override
-    def supported_workloads(self) -> list[str]:
-        """List of supported workloads."""
-        return ["default"]
-
 
 if __name__ == "__main__":
     ops.main(KafkaBenchmarkOperator)
diff --git a/src/models.py b/src/models.py
new file mode 100644
index 0000000..92f1651
--- /dev/null
+++ b/src/models.py
@@ -0,0 +1,33 @@
+# Copyright 2025 Canonical Ltd.
+# See LICENSE file for licensing details.
+ +"""Structured configuration for the Kafka charm.""" + +from pydantic import BaseModel, validator + +from benchmark.core.structured_config import BenchmarkCharmConfig + + +class WorkloadType(BaseModel): + """Workload type parameters.""" + + message_size: int + producer_rate: int + + +WorkloadTypeParameters = { + "default": WorkloadType(message_size=1024, producer_rate=100000), +} + + +class KafkaBenchmarkCharmConfig(BenchmarkCharmConfig): + """Manager for the structured configuration.""" + + @validator("workload_name") + @classmethod + def profile_values(cls, value: str) -> str: + """Check profile config option is valid.""" + if value not in WorkloadTypeParameters.keys(): + raise ValueError(f"Value not one of {str(WorkloadTypeParameters.keys())}") + + return value diff --git a/src/wrapper.py b/src/wrapper.py index bd8ed87..e083844 100755 --- a/src/wrapper.py +++ b/src/wrapper.py @@ -8,8 +8,8 @@ import os import re -from overrides import override from pydantic import BaseModel +from typing_extensions import override from benchmark.literals import BENCHMARK_WORKLOAD_PATH from benchmark.wrapper.core import ( @@ -137,7 +137,7 @@ def _map_run(self) -> tuple[BenchmarkManager | None, list[BenchmarkProcess] | No """Returns the mapping for the run phase.""" driver_path = os.path.join(BENCHMARK_WORKLOAD_PATH, "worker_params.yaml") workload_path = os.path.join(BENCHMARK_WORKLOAD_PATH, "dpe_benchmark.json") - processes = [ + processes: list[BenchmarkProcess] = [ KafkaBenchmarkProcess( model=ProcessModel( cmd=f"""sudo bin/benchmark-worker -p {peer.split(":")[1]} -sp {int(peer.split(":")[1]) + 1}""", diff --git a/tests/unit/test_lifecycle.py b/tests/unit/test_lifecycle.py index 0d28336..8b5e8d1 100644 --- a/tests/unit/test_lifecycle.py +++ b/tests/unit/test_lifecycle.py @@ -45,6 +45,17 @@ def test_next_state_clean(): assert lifecycle.next(DPBenchmarkLifecycleTransition.CLEAN) == DPBenchmarkLifecycleState.UNSET +def test_next_state_stop(): + lifecycle = lifecycle_factory(DPBenchmarkLifecycleState.STOPPED) + lifecycle.config_manager.is_running = MagicMock(return_value=False) + # Check the other condition + assert lifecycle.next(None) is None + + # Test now with the workload recovered + lifecycle.config_manager.is_running = MagicMock(return_value=True) + assert lifecycle.next(None) == DPBenchmarkLifecycleState.RUNNING + + def test_next_state_prepare(): lifecycle = lifecycle_factory(DPBenchmarkLifecycleState.UNSET) assert ( diff --git a/tox.ini b/tox.ini index a1e638d..b46e9c1 100644 --- a/tox.ini +++ b/tox.ini @@ -55,6 +55,9 @@ commands = poetry run ruff check {[vars]all_path} poetry run black --check --diff {[vars]all_path} + poetry install --with lint + poetry run pyright + [testenv:unit] description = Run unit tests set_env =
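Taken together with the new `structured_config.py`, this `models.py` is what lets the managers above swap `self.config.get("threads")` for typed attributes: pydantic parses the charm config once and rejects bad values up front. A rough standalone illustration of the same validator shape (field names and defaults here are invented, not the real `BenchmarkCharmConfig`):

```python
from pydantic import BaseModel, ValidationError, validator

WORKLOADS = {"default"}


class DemoConfig(BaseModel):
    """Stand-in for BenchmarkCharmConfig; the fields are illustrative."""

    workload_name: str = "default"
    parallel_processes: int = 1

    @validator("workload_name")
    @classmethod
    def profile_values(cls, value: str) -> str:
        """Reject workload names we have no parameters for."""
        if value not in WORKLOADS:
            raise ValueError(f"Value not one of {WORKLOADS}")
        return value


cfg = DemoConfig(workload_name="default", parallel_processes=2)
assert cfg.parallel_processes == 2  # typed access; no .get() or casting needed

try:
    DemoConfig(workload_name="no-such-profile")
except ValidationError as err:
    print(err)  # pydantic names the offending field and the reason
```

The `config_type = KafkaBenchmarkCharmConfig` assignment on the operator is presumably the hook that wires this parsed model into `self.config`, which is also why the new pyright step in `tox.ini` can type-check the attribute accesses.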