Skip to content

Commit

Permalink
feat(license): invalidate license when cpu exceeds limit instead of r…
Browse files Browse the repository at this point in the history
…ejecting new compute nodes from joining (#20276)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jan 25, 2025
1 parent fa4c463 commit 6390200
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 131 deletions.
2 changes: 1 addition & 1 deletion .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extend-exclude = [
"src/sqlparser/tests/testdata/",
"src/frontend/planner_test/tests/testdata",
"src/tests/sqlsmith/tests/freeze",
"src/license/src/manager.rs",
"src/license/**/*.rs", # JWT license key
"Cargo.lock",
"**/Cargo.toml",
"**/go.mod",
Expand Down
2 changes: 2 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ message MetaSnapshot {
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
repeated catalog.Secret secrets = 23;
uint64 compute_node_total_cpu_count = 24;
repeated common.WorkerNode nodes = 10;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
Expand Down Expand Up @@ -540,6 +541,7 @@ message SubscribeResponse {
FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27;
FragmentWorkerSlotMappings serving_worker_slot_mappings = 28;
catalog.Secret secret = 29;
uint64 compute_node_total_cpu_count = 30;
}
reserved 12;
reserved "parallel_unit_mapping";
Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ where
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Info::Recovery(_) => true,
Info::ComputeNodeTotalCpuCount(_) => true,
Info::StreamingWorkerSlotMapping(_) => {
notification.version
> info
Expand Down
5 changes: 5 additions & 0 deletions src/compute/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::license::LicenseManager;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
Expand Down Expand Up @@ -45,6 +46,9 @@ impl ObserverState for ComputeObserverNode {
panic!("error type notification");
}
},
Info::ComputeNodeTotalCpuCount(count) => {
LicenseManager::get().update_cpu_core_count(count as _);
}
_ => {
panic!("error type notification");
}
Expand All @@ -57,6 +61,7 @@ impl ObserverState for ComputeObserverNode {
unreachable!();
};
LocalSecretManager::global().init_secrets(snapshot.secrets);
LicenseManager::get().update_cpu_core_count(snapshot.compute_node_total_cpu_count as _);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use parking_lot::RwLock;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
use risingwave_common::catalog::CatalogVersion;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::license::LicenseManager;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
Expand Down Expand Up @@ -114,6 +115,9 @@ impl ObserverState for FrontendObserverNode {
Info::Recovery(_) => {
self.compute_client_pool.invalidate_all();
}
Info::ComputeNodeTotalCpuCount(count) => {
LicenseManager::get().update_cpu_core_count(count as _);
}
}
}

Expand Down Expand Up @@ -147,6 +151,7 @@ impl ObserverState for FrontendObserverNode {
session_params,
version,
secrets,
compute_node_total_cpu_count,
} = snapshot;

for db in databases {
Expand Down Expand Up @@ -208,6 +213,7 @@ impl ObserverState for FrontendObserverNode {
*self.session_params.write() =
serde_json::from_str(&session_params.unwrap().params).unwrap();
LocalSecretManager::global().init_secrets(secrets);
LicenseManager::get().update_cpu_core_count(compute_node_total_cpu_count as _);
}
}

Expand Down
66 changes: 17 additions & 49 deletions src/license/src/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU64;

use thiserror::Error;

use crate::{LicenseKeyError, LicenseManager};

/// The error type for CPU core limit exceeded as per the license key.
#[derive(Debug, Clone, Error)]
#[error("invalid license key")]
pub enum CpuCoreLimitExceeded {
#[error("cannot check CPU core limit due to license key error")]
LicenseKeyError(#[from] LicenseKeyError),

#[error(
"CPU core limit exceeded as per the license key, \
requesting {actual} while the maximum allowed is {limit}"
)]
Exceeded { limit: NonZeroU64, actual: u64 },
}

impl LicenseManager {
/// Check if the given CPU core count exceeds the limit as per the license key.
pub fn check_cpu_core_limit(&self, cpu_core_count: u64) -> Result<(), CpuCoreLimitExceeded> {
let license = self.license()?;

match license.cpu_core_limit {
Some(limit) if cpu_core_count > limit.get() => Err(CpuCoreLimitExceeded::Exceeded {
limit,
actual: cpu_core_count,
}),
_ => Ok(()),
}
}
}

// Tests below only work in debug mode.
#[cfg(debug_assertions)]
#[cfg(test)]
mod tests {
use expect_test::expect;
use thiserror_ext::AsReport as _;

use super::*;
use crate::{LicenseKey, TEST_PAID_LICENSE_KEY_CONTENT};
use crate::{Feature, LicenseKey, LicenseManager, TEST_PAID_LICENSE_KEY_CONTENT};

fn do_test(key: &str, cpu_core_count: u64, expect: expect_test::Expect) {
fn do_test(key: &str, cpu_core_count: usize, expect: expect_test::Expect) {
let manager = LicenseManager::new();
manager.refresh(LicenseKey(key));
manager.update_cpu_core_count(cpu_core_count);

match manager.check_cpu_core_limit(cpu_core_count) {
match Feature::TestPaid.check_available_with(&manager) {
Ok(_) => expect.assert_eq("ok"),
Err(error) => expect.assert_eq(&error.to_report_string()),
}
Expand All @@ -72,28 +37,31 @@ mod tests {
do_test(TEST_PAID_LICENSE_KEY_CONTENT, 114514, expect!["ok"]);
}

#[test]
fn test_no_license_key_no_limit() {
do_test("", 114514, expect!["ok"]);
}

#[test]
fn test_invalid_license_key() {
const KEY: &str = "invalid";

do_test(KEY, 0, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]);
do_test(KEY, 114514, expect!["cannot check CPU core limit due to license key error: invalid license key: InvalidToken"]);
do_test(
KEY,
0,
expect!["feature TestPaid is not available due to license error: invalid license key: InvalidToken"],
);
do_test(
KEY,
114514,
expect!["feature TestPaid is not available due to license error: invalid license key: InvalidToken"],
);
}

#[test]
fn test_limit() {
const KEY: &str =
"eyJhbGciOiJSUzUxMiIsInR5cCI6IkpXVCJ9.\
eyJzdWIiOiJmcmVlLXRlc3QtMzIiLCJpc3MiOiJwcm9kLnJpc2luZ3dhdmUuY29tIiwidGllciI6ImZyZWUiLCJleHAiOjE4NTI1NTk5OTksImlhdCI6MTcyMzcwMTk5NCwiY3B1X2NvcmVfbGltaXQiOjMyfQ.\
rsATtzlduLUkGQeXkOROtyGUpafdDhi18iKdYAzAldWQuO9KevNcnD8a6geCShZSGte65bI7oYtv7GHx8i66ge3B1SVsgGgYr10ebphPUNUQenYoN0mpD4Wn0prPStOgANzYZOI2ntMGAaeWStji1x67_iho6r0W9r6RX3kMvzFSbiObSIfvTdrMULeg-xeHc3bT_ErRhaXq7MAa2Oiq3lcK2sNgEvc9KYSP9YbhSik9CBkc8lcyeVoc48SSWEaBU-c8-Ge0fzjgWHI9KIsUV5Ihe66KEfs0PqdRoSWbgskYGzA3o8wHIbtJbJiPzra373kkFH9MGY0HOsw9QeJLGQ";
eyJzdWIiOiJwYWlkLXRlc3QtMzIiLCJpc3MiOiJ0ZXN0LnJpc2luZ3dhdmUuY29tIiwidGllciI6InBhaWQiLCJleHAiOjIxNTA0OTU5OTksImlhdCI6MTczNzYxMjQ5NSwiY3B1X2NvcmVfbGltaXQiOjMyfQ.\
SQpX2Dmon5Mb04VUbHyxsU7owJhcdLZHqUefxAXBwG5AqgKdpfS0XUePW5E4D-EfxtH_cWJiD4QDFsfdRUz88g_n_KvfNUObMW7NV5TUoRs_ImtS4ySugExNX3JzJi71QqgI8kugStQ7uOR9kZ_C-cCc_IG2CwwEmhhW1Ij0vX7qjhG5JNMit_bhxPY7Rh27ppgPTqWxJFTTsw-9B7O5WR_yIlaDjxVzk0ALm_j6DPB249gG3dkeK0rP0AK_ip2cK6iQdy8Cge7ATD6yUh4c_aR6GILDF6-vyB7QdWU6DdQS4KhdkPNWoe_Z9psotcXQJ7NhQ39hk8tdLzmTfGDDBA";

do_test(KEY, 31, expect!["ok"]);
do_test(KEY, 32, expect!["ok"]);
do_test(KEY, 33, expect!["CPU core limit exceeded as per the license key, requesting 33 while the maximum allowed is 32"]);
do_test(KEY, 33, expect!["feature TestPaid is not available due to license error: the license key is currently not effective because the CPU core in the cluster (33) exceeds the maximum allowed by the license key (32); consider removing some nodes or acquiring a new license key with a higher limit"]);
}
}
34 changes: 17 additions & 17 deletions src/license/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use thiserror::Error;

use super::{report_telemetry, License, LicenseKeyError, LicenseManager, Tier};
use super::{report_telemetry, LicenseError, LicenseManager, Tier};

/// Define all features that are available based on the tier of the license.
///
Expand Down Expand Up @@ -113,14 +113,17 @@ pub enum FeatureNotAvailable {
#[error("feature {feature:?} is not available due to license error")]
LicenseError {
feature: Feature,
source: LicenseKeyError,
source: LicenseError,
},
}

impl Feature {
/// Check whether the feature is available based on the current license.
pub fn check_available(self) -> Result<(), FeatureNotAvailable> {
let check_res = match LicenseManager::get().license() {
/// Check whether the feature is available based on the given license manager.
pub(crate) fn check_available_with(
self,
manager: &LicenseManager,
) -> Result<(), FeatureNotAvailable> {
let check_res = match manager.license() {
Ok(license) => {
if license.tier >= self.min_tier() {
Ok(())
Expand All @@ -131,22 +134,19 @@ impl Feature {
})
}
}
Err(error) => {
// If there's a license key error, we still try against the default license first
// to see if the feature is available for free.
if License::default().tier >= self.min_tier() {
Ok(())
} else {
Err(FeatureNotAvailable::LicenseError {
feature: self,
source: error,
})
}
}
Err(error) => Err(FeatureNotAvailable::LicenseError {
feature: self,
source: error,
}),
};

report_telemetry(&self, self.get_feature_name(), check_res.is_ok());

check_res
}

/// Check whether the feature is available based on the current license.
pub fn check_available(self) -> Result<(), FeatureNotAvailable> {
self.check_available_with(LicenseManager::get())
}
}
2 changes: 2 additions & 0 deletions src/license/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]

mod cpu;
mod feature;
mod key;
Expand Down
47 changes: 38 additions & 9 deletions src/license/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU64;
use std::num::NonZeroUsize;
use std::sync::{LazyLock, RwLock};

use jsonwebtoken::{Algorithm, DecodingKey, Validation};
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct License {
pub tier: Tier,

/// Maximum number of compute-node CPU cores allowed to use. Typically used for the paid tier.
pub cpu_core_limit: Option<NonZeroU64>,
pub cpu_core_limit: Option<NonZeroUsize>,

/// Expiration time in seconds since UNIX epoch.
///
Expand All @@ -106,11 +106,21 @@ impl Default for License {

/// The error type for invalid license key when verifying as JWT.
#[derive(Debug, Clone, Error)]
#[error("invalid license key")]
pub struct LicenseKeyError(#[source] jsonwebtoken::errors::Error);
pub enum LicenseError {
#[error("invalid license key")]
InvalidKey(#[source] jsonwebtoken::errors::Error),

#[error(
"the license key is currently not effective because the CPU core in the cluster \
({actual}) exceeds the maximum allowed by the license key ({limit}); \
consider removing some nodes or acquiring a new license key with a higher limit"
)]
CpuCoreLimitExceeded { limit: NonZeroUsize, actual: usize },
}

struct Inner {
license: Result<License, LicenseKeyError>,
license: Result<License, LicenseError>,
cached_cpu_core_count: usize,
}

/// The singleton license manager.
Expand All @@ -129,6 +139,7 @@ impl LicenseManager {
Self {
inner: RwLock::new(Inner {
license: Ok(License::default()),
cached_cpu_core_count: 0,
}),
}
}
Expand Down Expand Up @@ -162,7 +173,7 @@ impl LicenseManager {

inner.license = match jsonwebtoken::decode(license_key, &PUBLIC_KEY, &validation) {
Ok(data) => Ok(data.claims),
Err(error) => Err(LicenseKeyError(error)),
Err(error) => Err(LicenseError::InvalidKey(error)),
};

match &inner.license {
Expand All @@ -171,22 +182,40 @@ impl LicenseManager {
}
}

/// Update the cached CPU core count.
pub fn update_cpu_core_count(&self, cpu_core_count: usize) {
let mut inner = self.inner.write().unwrap();
inner.cached_cpu_core_count = cpu_core_count;
}

/// Get the current license if it is valid.
///
/// Since the license can expire, the returned license should not be cached by the caller.
///
/// Prefer calling [`crate::Feature::check_available`] to check the availability of a feature,
/// other than directly calling this method and checking the content of the license.
pub fn license(&self) -> Result<License, LicenseKeyError> {
let license = self.inner.read().unwrap().license.clone()?;
pub fn license(&self) -> Result<License, LicenseError> {
let inner = self.inner.read().unwrap();
let license = inner.license.clone()?;

// Check the expiration time additionally.
if license.exp < jsonwebtoken::get_current_timestamp() {
return Err(LicenseKeyError(
return Err(LicenseError::InvalidKey(
jsonwebtoken::errors::ErrorKind::ExpiredSignature.into(),
));
}

// Check the CPU core limit.
let actual_cpu_core = inner.cached_cpu_core_count;
if let Some(limit) = license.cpu_core_limit
&& actual_cpu_core > limit.get()
{
return Err(LicenseError::CpuCoreLimitExceeded {
limit,
actual: actual_cpu_core,
});
}

Ok(license)
}
}
Expand Down
Loading

0 comments on commit 6390200

Please sign in to comment.