From b543052384890a40c8c49dab38d2e3b34c5363b1 Mon Sep 17 00:00:00 2001
From: Jialei
Date: Wed, 27 Dec 2023 15:51:05 +0800
Subject: [PATCH] enhance(client): support checkpoint in remote datastore

Add a DataStoreApi HTTP client for /api/v1/project/datastore/checkpoint
and use it from RemoteDataStore to list, create and delete table
checkpoints instead of the previous no-op stubs.

* add_checkpoint() drops the `revision` argument and returns the
  Checkpoint built from the id in the create response (RemoteDataStore
  and the DataStore protocol).
* Checkpoint.created_at becomes Optional and defaults to None, because
  RemoteDataStore builds Checkpoint objects from the id alone.
* create_checkpoint() parses the reply as ResponseMessageCheckpointVo
  (code/message/data envelope), mirroring list_checkpoints(); CheckpointVo
  itself has no `data` attribute.
* Add the CreateCheckpointRequest, CheckpointVo,
  ResponseMessageCheckpointVo and ResponseMessageListCheckpointVo models.

---
Review note: the `index` blob ids below were not regenerated after the
review fixes (ResponseMessageCheckpointVo in create_checkpoint, default
for Checkpoint.created_at); re-run `git format-patch` to refresh them.

 client/starwhale/api/_impl/data_store.py      | 22 ++++++++----
 .../starwhale/base/client/api/data_store.py   | 41 ++++++++++++++++++++++
 client/starwhale/base/client/models/models.py | 24 +++++++++++++
 3 files changed, 80 insertions(+), 7 deletions(-)
 create mode 100644 client/starwhale/base/client/api/data_store.py

diff --git a/client/starwhale/api/_impl/data_store.py b/client/starwhale/api/_impl/data_store.py
index f9ff90cddf..b14de92522 100644
--- a/client/starwhale/api/_impl/data_store.py
+++ b/client/starwhale/api/_impl/data_store.py
@@ -59,7 +59,12 @@
 )
 from starwhale.utils.config import SWCliConfigMixed
 from starwhale.base.models.base import SwBaseModel
-from starwhale.base.client.models.models import ColumnSchemaDesc, KeyValuePairSchema
+from starwhale.base.client.models.models import (
+    ColumnSchemaDesc,
+    KeyValuePairSchema,
+    CreateCheckpointRequest,
+)
+from starwhale.base.client.api.data_store import DataStoreApi
 
 datastore_manifest_file_name = "manifest.json"
 datastore_max_dirty_records = int(os.getenv("DATASTORE_MAX_DIRTY_RECORDS", "10000"))
@@ -2591,6 +2596,7 @@ def __init__(self, instance_uri: str, token: str) -> None:
 
         self.instance_uri = instance_uri
         self.token = token
+        self.client = DataStoreApi(instance_uri, token)
 
     def __str__(self) -> str:
         return f"RemoteDataStore for {self.instance_uri}"
@@ -2710,13 +2716,15 @@ def delete_by_range(
         return ""
 
     def list_table_checkpoints(self, table_name: str) -> List[Checkpoint]:
-        return []
+        resp = self.client.list_checkpoints(table_name)
+        return [Checkpoint(revision=cp.id) for cp in resp]
 
-    def add_checkpoint(self, table_name: str, revision: str) -> None:
-        ...
+    def add_checkpoint(self, table_name: str) -> Checkpoint:
+        resp = self.client.create_checkpoint(CreateCheckpointRequest(table=table_name))
+        return Checkpoint(revision=resp.id)
 
     def remove_checkpoint(self, table_name: str, cp: Checkpoint) -> None:
-        ...
+        self.client.delete_checkpoint(table_name, cp.revision)
 
     def get_table_size(self, table_name: str, cp: Checkpoint | None = None) -> int:
         return 0
@@ -2724,7 +2732,7 @@ def get_table_size(self, table_name: str, cp: Checkpoint | None = None) -> int:
 
 class Checkpoint(SwBaseModel):
     revision: str
-    created_at: int  # timestamp in milliseconds
+    created_at: Optional[int] = None  # timestamp in milliseconds
 
 
 class DataStore(Protocol):
@@ -2777,7 +2785,7 @@ def delete_by_range(
     def list_table_checkpoints(self, table_name: str) -> List[Checkpoint]:
         ...
 
-    def add_checkpoint(self, table_name: str, revision: str) -> None:
+    def add_checkpoint(self, table_name: str) -> Checkpoint:
         ...
 
     def remove_checkpoint(self, table_name: str, cp: Checkpoint) -> None:
diff --git a/client/starwhale/base/client/api/data_store.py b/client/starwhale/base/client/api/data_store.py
new file mode 100644
index 0000000000..e248c05c51
--- /dev/null
+++ b/client/starwhale/base/client/api/data_store.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+from typing import List
+
+from starwhale.base.client.client import Client, TypeWrapper
+from starwhale.base.client.models.models import (
+    CheckpointVo,
+    CreateCheckpointRequest,
+    ResponseMessageCheckpointVo,
+    ResponseMessageListCheckpointVo,
+)
+
+_URI = "/api/v1/project/datastore/checkpoint"
+
+
+class DataStoreApi(Client):
+    def __init__(self, url: str, token: str) -> None:
+        super().__init__(url, token)
+
+    def list_checkpoints(self, table: str) -> List[CheckpointVo]:
+        return (
+            TypeWrapper(
+                ResponseMessageListCheckpointVo,
+                self.http_get(_URI, params={"table": table}),
+            )
+            .response()
+            .data
+        )
+
+    def create_checkpoint(self, req: CreateCheckpointRequest) -> CheckpointVo:
+        return (
+            TypeWrapper(
+                ResponseMessageCheckpointVo,
+                self.http_post(_URI, json=req),
+            )
+            .response()
+            .data
+        )
+
+    def delete_checkpoint(self, table: str, revision: str) -> None:
+        self.http_delete(_URI, params={"table": table, "id": revision})
diff --git a/client/starwhale/base/client/models/models.py b/client/starwhale/base/client/models/models.py
index e33bb3bbef..dbcaec41a0 100644
--- a/client/starwhale/base/client/models/models.py
+++ b/client/starwhale/base/client/models/models.py
@@ -423,6 +423,24 @@ class FlushRequest(SwBaseModel):
     pass
 
 
+class CreateCheckpointRequest(SwBaseModel):
+    table: str
+    user_data: Optional[str] = Field(None, alias='userData')
+
+
+class CheckpointVo(SwBaseModel):
+    id: str
+    created_time: int = Field(..., alias='createdTime')
+    row_count: int = Field(..., alias='rowCount')
+    user_data: Optional[str] = Field(None, alias='userData')
+
+
+class ResponseMessageCheckpointVo(SwBaseModel):
+    code: str
+    message: str
+    data: CheckpointVo
+
+
 class InitUploadBlobRequest(SwBaseModel):
     content_md5: str = Field(..., alias='contentMd5')
     content_length: int = Field(..., alias='contentLength')
@@ -1127,6 +1145,12 @@ class RuntimeSuggestionVo(SwBaseModel):
     runtimes: Optional[List[RuntimeVersionVo]] = None
 
 
+class ResponseMessageListCheckpointVo(SwBaseModel):
+    code: str
+    message: str
+    data: List[CheckpointVo]
+
+
 class UserRoleDeleteRequest(UserCheckPasswordRequest):
     pass
 