From 7c7ff4faabd947e430e924cfeeb8a0c17e57afb8 Mon Sep 17 00:00:00 2001 From: star <15031259256@163.com> Date: Wed, 19 Jul 2023 10:32:43 +0800 Subject: [PATCH] feat(controller): build dataset in server (#2497) --- client/scripts/sw-docker-entrypoint | 52 ++++- .../ai/starwhale/mlops/api/DatasetApi.java | 31 +++ .../mlops/api/DatasetBuildLogWsServer.java | 125 +++++++++++ .../mlops/api/DatasetController.java | 26 +++ .../starwhale/mlops/api/FileStorageApi.java | 59 +++++ .../mlops/api/FileStorageController.java | 66 ++++++ .../java/ai/starwhale/mlops/api/LogApi.java | 11 + .../ai/starwhale/mlops/api/LogController.java | 10 +- .../starwhale/mlops/api/TaskLogWsServer.java | 12 +- .../protocol/dataset/build/BuildRecordVo.java | 34 +++ .../dataset/build/DatasetBuildRequest.java | 42 ++++ .../filestorage/ApplySignedUrlRequest.java | 41 ++++ .../filestorage/FileDeleteRequest.java | 39 ++++ .../filestorage/SignedUrlResponse.java | 28 +++ .../configuration/RunTimeProperties.java | 9 +- .../security/DatasetBuildTokenValidator.java | 64 ++++++ .../mlops/domain/dataset/DatasetService.java | 210 +++++++++++++++++- .../build/BuildRecordGarbageCollector.java | 58 +++++ .../domain/dataset/build/BuildStatus.java | 27 +++ .../mlops/domain/dataset/build/BuildType.java | 27 +++ .../build/bo/CreateBuildRecordRequest.java | 33 +++ .../converter/BuildRecordVoConverter.java | 20 ++ .../dataset/build/log/BuildLogCollector.java | 95 ++++++++ .../build/mapper/BuildRecordMapper.java | 89 ++++++++ .../dataset/build/po/BuildRecordEntity.java | 45 ++++ .../filestorage/FileStorageService.java | 145 ++++++++++++ .../domain/system/SystemSettingService.java | 2 + .../mlops/schedule/k8s/JobEventHandler.java | 28 ++- .../mlops/schedule/k8s/K8sJobTemplate.java | 7 + .../mlops/schedule/k8s/PodEventHandler.java | 111 ++++++--- .../k8s/log/CancellableJobLogCollector.java} | 4 +- .../log/CancellableJobLogK8sCollector.java} | 12 +- ...CancellableJobLogK8sCollectorFactory.java} | 10 +- 
.../src/main/resources/application.yaml | 6 + .../V0_4_0_011__add_dataset_build_record.sql | 33 +++ .../mlops/api/LogControllerTest.java | 3 +- .../mlops/api/TaskLogWsServerTest.java | 16 +- .../domain/dataset/DatasetServiceTest.java | 182 ++++++++++++++- .../filestorage/FileStorageServiceTest.java | 108 +++++++++ .../domain/runtime/RuntimeServiceTest.java | 3 +- .../system/SystemSettingServiceTest.java | 5 +- .../schedule/k8s/JobEventHandlerTest.java | 7 +- .../schedule/k8s/K8sJobTemplateTest.java | 9 +- .../schedule/k8s/K8sTaskSchedulerTest.java | 16 +- .../schedule/k8s/PodEventHandlerTest.java | 10 +- ...ellableJobLogK8sCollectorFactoryTest.java} | 9 +- .../CancellableJobLogK8sCollectorTest.java} | 7 +- .../src/test/resources/template/job.yaml | 3 + 48 files changed, 1894 insertions(+), 95 deletions(-) create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/DatasetBuildLogWsServer.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageApi.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageController.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/BuildRecordVo.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/DatasetBuildRequest.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/ApplySignedUrlRequest.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/FileDeleteRequest.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/SignedUrlResponse.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/configuration/security/DatasetBuildTokenValidator.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildRecordGarbageCollector.java create mode 100644 
server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildStatus.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildType.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/bo/CreateBuildRecordRequest.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/converter/BuildRecordVoConverter.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/log/BuildLogCollector.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/mapper/BuildRecordMapper.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/po/BuildRecordEntity.java create mode 100644 server/controller/src/main/java/ai/starwhale/mlops/domain/filestorage/FileStorageService.java rename server/controller/src/main/java/ai/starwhale/mlops/{domain/task/status/watchers/log/CancellableTaskLogCollector.java => schedule/k8s/log/CancellableJobLogCollector.java} (86%) rename server/controller/src/main/java/ai/starwhale/mlops/{domain/task/status/watchers/log/CancellableTaskLogK8sCollector.java => schedule/k8s/log/CancellableJobLogK8sCollector.java} (80%) rename server/controller/src/main/java/ai/starwhale/mlops/{domain/task/status/watchers/log/CancellableTaskLogK8sCollectorFactory.java => schedule/k8s/log/CancellableJobLogK8sCollectorFactory.java} (71%) create mode 100644 server/controller/src/main/resources/db/migration/v0_4_0/V0_4_0_011__add_dataset_build_record.sql create mode 100644 server/controller/src/test/java/ai/starwhale/mlops/domain/filestorage/FileStorageServiceTest.java rename server/controller/src/test/java/ai/starwhale/mlops/{domain/task/log/CancellableTaskLogK8sCollectorFactoryTest.java => schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java} (86%) rename 
server/controller/src/test/java/ai/starwhale/mlops/{domain/task/log/CancellableTaskLogK8sCollectorTest.java => schedule/k8s/log/CancellableJobLogK8sCollectorTest.java} (90%) diff --git a/client/scripts/sw-docker-entrypoint b/client/scripts/sw-docker-entrypoint index 45c5eeecd8..c3926dd34e 100755 --- a/client/scripts/sw-docker-entrypoint +++ b/client/scripts/sw-docker-entrypoint @@ -191,7 +191,54 @@ run_code_server () { echo "-->[Preparing] run code-server done." } -welcome $1 +ds_build_and_upload () { + echo "-->[Preparing] Downloading files..." + BUILD_DIR=$DATASET_BUILD_NAME + mkdir -p "$BUILD_DIR" + cd "$BUILD_DIR" + + SIGNED_URLS=$(curl -X 'GET' "$SW_INSTANCE_URI/api/v1/filestorage/signedurl?pathPrefix=$DATASET_BUILD_DIR_PREFIX" -H 'accept: application/json' -H "Authorization: $SW_TOKEN" | jq ".data.signedUrls") + + for entry in $(echo "$SIGNED_URLS" | jq -r 'to_entries|map("\(.key)@\(.value)")|.[]'); do + IFS='@' read -r file signedurl <<< "$entry" + + filedir=$(dirname "$file") + if [ ! -d "$filedir" ]; then + mkdir -p "$filedir" + fi + echo "$file $signedurl" + done | xargs -I {} -n 2 -P 10 sh -c 'curl -o "$1" "$2"' sh + cd - + + cmd="swcli dataset build -n $DATASET_BUILD_NAME" + if [ "$DATASET_BUILD_TYPE" = "IMAGE" ]; then + cmd="$cmd -if $BUILD_DIR" + elif [ "$DATASET_BUILD_TYPE" = "VIDEO" ]; then + cmd="$cmd -vf $BUILD_DIR" + elif [ "$DATASET_BUILD_TYPE" = "AUDIO" ]; then + cmd="$cmd -af $BUILD_DIR" + elif [ "$DATASET_BUILD_TYPE" = "JSON" ]; then + cmd="$cmd -jf $BUILD_DIR" + if [ -n "$DATASET_BUILD_EXTRA" ]; then + cmd="$cmd --field-selector $DATASET_BUILD_EXTRA" + fi + elif [ "$DATASET_BUILD_TYPE" = "HANDLER" ]; then + cmd="$cmd -h $DATASET_BUILD_HANDLER" + elif [ "$DATASET_BUILD_TYPE" = "YAML" ]; then + cmd="$cmd -f $DATASET_BUILD_YAML" + else + echo "Unknown type: $DATASET_BUILD_TYPE" && exit 1 + fi + + echo "-->[Building] Start to build dataset: $DATASET_BUILD_NAME..." 
+ eval "$cmd" || exit 1 + + echo "-->[Uploading] Start to upload dataset: $DATASET_BUILD_NAME..." + swcli instance login --token "$SW_TOKEN" --alias server "$SW_INSTANCE_URI" + swcli dataset copy --patch "$DATASET_BUILD_NAME"/version/latest cloud://server/project/"$SW_PROJECT" || exit 1 +} + +welcome "$1" case "$1" in pre_config) pre_config @@ -206,6 +253,9 @@ case "$1" in prepare && install_code_server && run_code_server tail -f /var/log/dev.log ;; + dataset_build) + ds_build_and_upload + ;; *) prepare "starwhale" && exec "$@" ;; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetApi.java b/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetApi.java index 391efa01ae..ef364ebfd8 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetApi.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetApi.java @@ -25,10 +25,13 @@ import ai.starwhale.mlops.api.protocol.dataset.DatasetViewVo; import ai.starwhale.mlops.api.protocol.dataset.DatasetVo; import ai.starwhale.mlops.api.protocol.dataset.RevertDatasetRequest; +import ai.starwhale.mlops.api.protocol.dataset.build.BuildRecordVo; +import ai.starwhale.mlops.api.protocol.dataset.build.DatasetBuildRequest; import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataConsumptionRequest; import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataIndexDesc; import ai.starwhale.mlops.api.protocol.dataset.upload.DatasetUploadRequest; import ai.starwhale.mlops.api.protocol.upload.UploadResult; +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; import com.github.pagehelper.PageInfo; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -430,4 +433,32 @@ ResponseEntity headDataset( @PathVariable("versionUrl") String versionUrl); + @Operation(summary = "Build Dataset", description = "Build Dataset") + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")}) + 
@PostMapping("/project/{projectUrl}/dataset/{datasetName}/build") + @PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')") + ResponseEntity> buildDataset( + @Parameter(in = ParameterIn.PATH, required = true, schema = @Schema()) + @PathVariable(name = "projectUrl") + String projectUrl, + @Parameter(in = ParameterIn.PATH, required = true, schema = @Schema()) + @PathVariable(name = "datasetName") + String datasetName, + @Valid @RequestBody DatasetBuildRequest datasetBuildRequest); + + @Operation(summary = "List Build Records", description = "List Build Records") + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")}) + @GetMapping("/project/{projectUrl}/dataset/build/list") + @PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')") + ResponseEntity>> listBuildRecords( + @Parameter(in = ParameterIn.PATH, required = true, schema = @Schema()) + @PathVariable(name = "projectUrl") + String projectUrl, + @RequestParam(value = "status", required = false) + BuildStatus status, + @Valid @RequestParam(value = "pageNum", required = false, defaultValue = "1") + Integer pageNum, + @Valid @RequestParam(value = "pageSize", required = false, defaultValue = "10") + Integer pageSize); + } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetBuildLogWsServer.java b/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetBuildLogWsServer.java new file mode 100644 index 0000000000..be71f4702b --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetBuildLogWsServer.java @@ -0,0 +1,125 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.api; + +import ai.starwhale.mlops.common.IdConverter; +import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogCollector; +import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory; +import io.kubernetes.client.openapi.ApiException; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +@Slf4j +@ServerEndpoint("/api/v1/log/online/dataset/{name}/build/{id}") +public class DatasetBuildLogWsServer { + + private static final ExecutorService executorService = Executors.newCachedThreadPool(); + + private static IdConverter idConvertor; + + private static CancellableJobLogK8sCollectorFactory logCollectorFactory; + + private Session session; + + private String readerId; + + private Long id; + + private CancellableJobLogCollector logCollector; + + + @Autowired + public void setIdConvertor(IdConverter idConvertor) { + DatasetBuildLogWsServer.idConvertor = idConvertor; + } + + @Autowired + public void setLogCollectorFactory(CancellableJobLogK8sCollectorFactory factory) { + DatasetBuildLogWsServer.logCollectorFactory = factory; + 
} + + @OnOpen + public void onOpen(Session session, @PathParam("name") String name, @PathParam("id") String id) { + this.session = session; + this.readerId = session.getId(); + this.id = idConvertor.revert(id); + try { + logCollector = logCollectorFactory.make(String.format("%s-%s", name, id)); + } catch (IOException | ApiException e) { + log.error("make k8s log collector failed", e); + } + log.info("Build log ws opened. reader={}, task={}", readerId, id); + executorService.submit(() -> { + String line; + while (true) { + try { + if ((line = logCollector.readLine()) == null) { + break; + } + sendMessage(line); + } catch (IOException e) { + log.error("read k8s log failed", e); + break; + } + } + }); + } + + @OnClose + public void onClose() { + cancelLogCollector(); + log.info("Build log ws closed. reader={}, task={}", readerId, id); + } + + @OnMessage + public void onMessage(String message, Session session) { + + } + + @OnError + public void onError(Session session, Throwable error) { + cancelLogCollector(); + log.error("Build log ws error: reader={}, task={}, message={}", readerId, id, error.getMessage()); + } + + public void sendMessage(String message) { + try { + this.session.getBasicRemote().sendText(message); + } catch (IOException e) { + log.error("ws send message", e); + } + } + + private void cancelLogCollector() { + if (logCollector != null) { + logCollector.cancel(); + logCollector = null; + } + } +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetController.java b/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetController.java index a4e71ab17c..2a783b37ba 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetController.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/DatasetController.java @@ -24,6 +24,8 @@ import ai.starwhale.mlops.api.protocol.dataset.DatasetViewVo; import ai.starwhale.mlops.api.protocol.dataset.DatasetVo; import 
ai.starwhale.mlops.api.protocol.dataset.RevertDatasetRequest; +import ai.starwhale.mlops.api.protocol.dataset.build.BuildRecordVo; +import ai.starwhale.mlops.api.protocol.dataset.build.DatasetBuildRequest; import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataConsumptionRequest; import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataIndexDesc; import ai.starwhale.mlops.api.protocol.dataset.upload.DatasetUploadRequest; @@ -34,6 +36,8 @@ import ai.starwhale.mlops.domain.dataset.DatasetService; import ai.starwhale.mlops.domain.dataset.bo.DatasetQuery; import ai.starwhale.mlops.domain.dataset.bo.DatasetVersionQuery; +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; +import ai.starwhale.mlops.domain.dataset.build.bo.CreateBuildRecordRequest; import ai.starwhale.mlops.domain.dataset.dataloader.DataReadRequest; import ai.starwhale.mlops.domain.dataset.dataloader.ReadMode; import ai.starwhale.mlops.domain.dataset.objectstore.HashNamedDatasetObjectStoreFactory; @@ -356,4 +360,26 @@ public ResponseEntity headDataset(String projectUrl, String datasetUrl, Strin } } + @Override + public ResponseEntity> buildDataset( + String projectUrl, String datasetName, DatasetBuildRequest datasetBuildRequest) { + datasetService.build(CreateBuildRecordRequest.builder() + .datasetId(datasetBuildRequest.getDatasetId()) + .datasetName(datasetName) + .shared(datasetBuildRequest.getShared()) + .projectUrl(projectUrl) + .type(datasetBuildRequest.getType()) + .storagePath(datasetBuildRequest.getStoragePath()) + .build()); + return ResponseEntity.ok(Code.success.asResponse("success")); + } + + @Override + public ResponseEntity>> listBuildRecords( + String projectUrl, BuildStatus status, Integer pageNum, Integer pageSize) { + return ResponseEntity.ok(Code.success.asResponse( + datasetService.listBuildRecords(projectUrl, status, new PageParams(pageNum, pageSize)))); + } + + } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageApi.java 
b/server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageApi.java new file mode 100644 index 0000000000..9377c1038a --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageApi.java @@ -0,0 +1,59 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.api; + +import ai.starwhale.mlops.api.protocol.ResponseMessage; +import ai.starwhale.mlops.api.protocol.filestorage.ApplySignedUrlRequest; +import ai.starwhale.mlops.api.protocol.filestorage.FileDeleteRequest; +import ai.starwhale.mlops.api.protocol.filestorage.SignedUrlResponse; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; + +@Tag(name = "File storage", description = "File storage operations") +@Validated +public interface FileStorageApi { + + @Operation(summary = "Apply pathPrefix", description = "Apply pathPrefix") + @ApiResponses(value = 
{@ApiResponse(responseCode = "200", description = "ok")}) + @GetMapping("/filestorage/path/apply") + ResponseEntity> applyPathPrefix(String flag); + + @Operation(summary = "Apply signedUrls for put", description = "Apply signedUrls for put") + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")}) + @PutMapping("/filestorage/signedurl") + ResponseEntity> applySignedPutUrls( + @RequestBody ApplySignedUrlRequest applySignedUrlRequest); + + @Operation(summary = "Apply signedUrls for get", description = "Apply signedUrls for get") + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")}) + @GetMapping("/filestorage/signedurl") + ResponseEntity> applySignedGetUrls(String pathPrefix); + + @Operation(summary = "Delete path", description = "Delete path") + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")}) + @DeleteMapping("/filestorage/file") + ResponseEntity> deletePath(@RequestBody FileDeleteRequest request); + +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageController.java b/server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageController.java new file mode 100644 index 0000000000..df19db15f6 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/FileStorageController.java @@ -0,0 +1,66 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.api; + +import ai.starwhale.mlops.api.protocol.Code; +import ai.starwhale.mlops.api.protocol.ResponseMessage; +import ai.starwhale.mlops.api.protocol.filestorage.ApplySignedUrlRequest; +import ai.starwhale.mlops.api.protocol.filestorage.FileDeleteRequest; +import ai.starwhale.mlops.api.protocol.filestorage.SignedUrlResponse; +import ai.starwhale.mlops.domain.filestorage.FileStorageService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@RestController +@RequestMapping("${sw.controller.api-prefix}") +public class FileStorageController implements FileStorageApi { + + private final FileStorageService service; + + public FileStorageController(FileStorageService service) { + this.service = service; + } + + @Override + public ResponseEntity> applyPathPrefix(String flag) { + return ResponseEntity.ok(Code.success.asResponse(service.generatePathPrefix(flag))); + } + + @Override + public ResponseEntity> applySignedPutUrls( + ApplySignedUrlRequest applySignedUrlRequest) { + var path = applySignedUrlRequest.getPathPrefix(); + var signedUrls = service.generateSignedPutUrls(path, applySignedUrlRequest.getFiles()); + + return ResponseEntity.ok(Code.success.asResponse(new SignedUrlResponse(path, signedUrls))); + } + + @Override + public ResponseEntity> applySignedGetUrls(String pathPrefix) { + return ResponseEntity.ok(Code.success.asResponse( + new SignedUrlResponse(pathPrefix, service.generateSignedGetUrls(pathPrefix)))); + } + + @Override + public ResponseEntity> deletePath(FileDeleteRequest request) { + service.deleteFiles(request.getPathPrefix(), request.getFiles()); + return ResponseEntity.ok(Code.success.asResponse("success")); + } +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/LogApi.java 
b/server/controller/src/main/java/ai/starwhale/mlops/api/LogApi.java index 539e9f8b24..a7dfca52ea 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/LogApi.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/LogApi.java @@ -66,4 +66,15 @@ ResponseEntity logContent( schema = @Schema()) @PathVariable("fileName") String fileName); + + @Operation(summary = "Get the build log content") + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")}) + @GetMapping( + value = "/log/offline/dataset/{name}/build/{id}", + produces = MediaType.TEXT_PLAIN_VALUE) + ResponseEntity buildLogContent( + @Parameter(in = ParameterIn.PATH, description = "the name of a dataset", schema = @Schema()) + @PathVariable("name") String name, + @Parameter(in = ParameterIn.PATH, description = "the id of the build record", schema = @Schema()) + @PathVariable("id") Long id); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/LogController.java b/server/controller/src/main/java/ai/starwhale/mlops/api/LogController.java index bfd10ab520..db5be6ddd3 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/LogController.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/LogController.java @@ -18,6 +18,7 @@ import ai.starwhale.mlops.api.protocol.Code; import ai.starwhale.mlops.api.protocol.ResponseMessage; +import ai.starwhale.mlops.domain.dataset.DatasetService; import ai.starwhale.mlops.domain.task.TaskService; import java.util.List; import org.springframework.http.ResponseEntity; @@ -29,9 +30,11 @@ public class LogController implements LogApi { final TaskService taskService; + final DatasetService datasetService; - public LogController(TaskService taskService) { + public LogController(TaskService taskService, DatasetService datasetService) { this.taskService = taskService; + this.datasetService = datasetService; } @Override @@ -43,4 +46,9 @@ public ResponseEntity>> offlineLogs(Long taskId) { public 
ResponseEntity logContent(Long taskId, String fileName) { return ResponseEntity.ok(taskService.logContent(taskId, fileName)); } + + @Override + public ResponseEntity buildLogContent(String name, Long id) { + return ResponseEntity.ok(datasetService.buildLogContent(id)); + } } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/TaskLogWsServer.java b/server/controller/src/main/java/ai/starwhale/mlops/api/TaskLogWsServer.java index cd6484abee..ddb0de7bd0 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/api/TaskLogWsServer.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/TaskLogWsServer.java @@ -17,8 +17,8 @@ package ai.starwhale.mlops.api; import ai.starwhale.mlops.common.IdConverter; -import ai.starwhale.mlops.domain.task.status.watchers.log.CancellableTaskLogCollector; -import ai.starwhale.mlops.domain.task.status.watchers.log.CancellableTaskLogK8sCollectorFactory; +import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogCollector; +import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory; import io.kubernetes.client.openapi.ApiException; import java.io.IOException; import java.util.concurrent.ExecutorService; @@ -43,7 +43,7 @@ public class TaskLogWsServer { private static IdConverter idConvertor; - private static CancellableTaskLogK8sCollectorFactory logCollectorFactory; + private static CancellableJobLogK8sCollectorFactory logCollectorFactory; private Session session; @@ -51,7 +51,7 @@ public class TaskLogWsServer { private Long id; - private CancellableTaskLogCollector logCollector; + private CancellableJobLogCollector logCollector; @Autowired @@ -60,7 +60,7 @@ public void setIdConvertor(IdConverter idConvertor) { } @Autowired - public void setLogCollectorFactory(CancellableTaskLogK8sCollectorFactory factory) { + public void setLogCollectorFactory(CancellableJobLogK8sCollectorFactory factory) { TaskLogWsServer.logCollectorFactory = factory; } @@ -70,7 +70,7 @@ public void onOpen(Session 
session, @PathParam("taskId") String taskId) { this.readerId = session.getId(); this.id = idConvertor.revert(taskId); try { - logCollector = logCollectorFactory.make(this.id); + logCollector = logCollectorFactory.make(taskId); } catch (IOException | ApiException e) { log.error("make k8s log collector failed", e); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/BuildRecordVo.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/BuildRecordVo.java new file mode 100644 index 0000000000..97e5272b5a --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/BuildRecordVo.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.api.protocol.dataset.build; + +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; +import ai.starwhale.mlops.domain.dataset.build.BuildType; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class BuildRecordVo { + private String id; + private String datasetId; + private String projectId; + private String datasetName; + private BuildStatus status; + private BuildType type; + private Long createTime; +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/DatasetBuildRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/DatasetBuildRequest.java new file mode 100644 index 0000000000..d51222e572 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/dataset/build/DatasetBuildRequest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.api.protocol.dataset.build; + +import ai.starwhale.mlops.domain.dataset.build.BuildType; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.validation.annotation.Validated; + + +@Validated +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DatasetBuildRequest { + + private Long datasetId; + + @NotNull(message = "type can not be null") + private BuildType type; + + private Boolean shared = false; + + @NotNull(message = "storagePath can not be null") + private String storagePath; +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/ApplySignedUrlRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/ApplySignedUrlRequest.java new file mode 100644 index 0000000000..948c7bec70 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/ApplySignedUrlRequest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.api.protocol.filestorage; + +import java.util.Set; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.validation.annotation.Validated; + + +@Validated +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ApplySignedUrlRequest { + private String flag; + + @NotNull(message = "pathPrefix is required") + private String pathPrefix; + + @NotNull(message = "files is required") + private Set files; +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/FileDeleteRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/FileDeleteRequest.java new file mode 100644 index 0000000000..db73926dd1 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/api/protocol/filestorage/FileDeleteRequest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.api.protocol.filestorage;

import java.util.Set;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.validation.annotation.Validated;

/**
 * Request to delete a set of files under a common path prefix.
 */
@Validated
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class FileDeleteRequest {

    // Common prefix the files are stored under
    @NotNull(message = "pathPrefix is required")
    private String pathPrefix;

    // File names (relative to pathPrefix) to delete.
    // Was a raw Set — parameterized as Set<String> for type safety.
    @NotNull(message = "files is required")
    private Set<String> files;
}
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.api.protocol.filestorage;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Response carrying the pre-signed URL of each requested file.
 */
@Data
@AllArgsConstructor
public class SignedUrlResponse {

    // Prefix the signed files live under (echoes the request)
    private String pathPrefix;

    // file name -> signed URL. Was a raw Map — parameterized as
    // Map<String, String> for type safety; presumably keys match the
    // requested file names — confirm against FileStorageService.
    private Map<String, String> signedUrls;
}
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.configuration.security;

import ai.starwhale.mlops.common.util.JwtTokenUtil;
import ai.starwhale.mlops.domain.dataset.build.BuildStatus;
import ai.starwhale.mlops.domain.dataset.build.mapper.BuildRecordMapper;
import ai.starwhale.mlops.domain.user.bo.User;
import ai.starwhale.mlops.exception.SwValidationException;
import ai.starwhale.mlops.exception.SwValidationException.ValidSubject;
import io.jsonwebtoken.Claims;
import java.util.Map;
import org.springframework.stereotype.Component;

/**
 * Issues and validates JWT tokens bound to a dataset build record.
 *
 * <p>The token carries the build record id in the {@code buildRecordId} claim;
 * it is only accepted while that record's status is {@code BUILDING}, so a
 * build job's token expires (logically) as soon as the build finishes.
 */
@Component
public class DatasetBuildTokenValidator implements JwtClaimValidator {

    private static final String CLAIM_ID = "buildRecordId";

    final JwtTokenUtil jwtTokenUtil;
    final BuildRecordMapper mapper;

    public DatasetBuildTokenValidator(JwtTokenUtil jwtTokenUtil, BuildRecordMapper mapper) {
        this.jwtTokenUtil = jwtTokenUtil;
        this.mapper = mapper;
    }

    /**
     * Generate a bearer token for the given user, bound to one build record.
     *
     * @param owner         user the token acts as
     * @param buildRecordId id of the dataset build record (renamed from the
     *                      misleading {@code taskId} — the claim key makes the
     *                      intent explicit)
     * @return "Bearer &lt;jwt&gt;" string suitable for an Authorization header
     */
    public String getToken(User owner, Long buildRecordId) {
        String jwtToken = jwtTokenUtil.generateAccessToken(owner, Map.of(CLAIM_ID, buildRecordId));
        return String.format("Bearer %s", jwtToken);
    }

    /**
     * Accept tokens without the claim (not build tokens); otherwise require the
     * claim to reference a record that is currently BUILDING.
     *
     * @throws SwValidationException when the claim is malformed or the record
     *                               is missing / not in BUILDING state
     */
    @Override
    public void validClaims(Claims claims) throws SwValidationException {
        Object claimId = claims.get(CLAIM_ID);
        if (claimId == null) {
            return;
        }
        // The claim is deserialized as a JSON number; check the type explicitly
        // instead of the original cast + catch(ClassCastException) idiom.
        if (!(claimId instanceof Number)) {
            throw new SwValidationException(ValidSubject.USER, "dataset build record claim invalid");
        }
        long id = ((Number) claimId).longValue();
        var record = mapper.selectById(id);
        if (record == null || BuildStatus.BUILDING != record.getStatus()) {
            throw new SwValidationException(ValidSubject.USER, "dataset build record claim invalid");
        }
    }
}
ai.starwhale.mlops.domain.dataset.build.BuildStatus; +import ai.starwhale.mlops.domain.dataset.build.bo.CreateBuildRecordRequest; +import ai.starwhale.mlops.domain.dataset.build.mapper.BuildRecordMapper; +import ai.starwhale.mlops.domain.dataset.build.po.BuildRecordEntity; import ai.starwhale.mlops.domain.dataset.converter.DatasetVersionVoConverter; import ai.starwhale.mlops.domain.dataset.converter.DatasetVoConverter; import ai.starwhale.mlops.domain.dataset.dataloader.DataLoader; @@ -51,8 +60,10 @@ import ai.starwhale.mlops.domain.dataset.po.DatasetVersionViewEntity; import ai.starwhale.mlops.domain.project.ProjectService; import ai.starwhale.mlops.domain.project.bo.Project; +import ai.starwhale.mlops.domain.runtime.RuntimeResource; import ai.starwhale.mlops.domain.storage.StorageService; import ai.starwhale.mlops.domain.storage.UriAccessor; +import ai.starwhale.mlops.domain.system.SystemSettingService; import ai.starwhale.mlops.domain.trash.Trash; import ai.starwhale.mlops.domain.trash.Trash.Type; import ai.starwhale.mlops.domain.trash.TrashService; @@ -64,13 +75,26 @@ import ai.starwhale.mlops.exception.SwValidationException; import ai.starwhale.mlops.exception.SwValidationException.ValidSubject; import ai.starwhale.mlops.exception.api.StarwhaleApiException; +import ai.starwhale.mlops.schedule.k8s.ContainerOverwriteSpec; +import ai.starwhale.mlops.schedule.k8s.K8sClient; +import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate; +import ai.starwhale.mlops.schedule.k8s.ResourceOverwriteSpec; +import ai.starwhale.mlops.storage.StorageAccessService; import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import com.google.common.base.Joiner; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.models.V1EnvVar; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.text.MessageFormat; 
import java.util.ArrayList; import java.util.Collection; +import java.util.Date; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -79,8 +103,10 @@ import java.util.stream.Collectors; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; @Slf4j @@ -89,9 +115,11 @@ public class DatasetService { private final DatasetMapper datasetMapper; private final DatasetVersionMapper datasetVersionMapper; + private final BuildRecordMapper buildRecordMapper; private final DatasetVoConverter datasetVoConverter; private final DatasetVersionVoConverter versionConvertor; private final StorageService storageService; + private final StorageAccessService storageAccessService; private final ProjectService projectService; private final DatasetDao datasetDao; private final IdConverter idConvertor; @@ -100,20 +128,33 @@ public class DatasetService { private final UriAccessor uriAccessor; private final DataLoader dataLoader; private final TrashService trashService; + private final K8sClient k8sClient; + private final K8sJobTemplate k8sJobTemplate; + private final DatasetBuildTokenValidator datasetBuildTokenValidator; + private final SystemSettingService systemSettingService; + private final String instanceUri; @Setter private BundleManager bundleManager; public DatasetService(ProjectService projectService, DatasetMapper datasetMapper, - DatasetVersionMapper datasetVersionMapper, DatasetVoConverter datasetVoConverter, - DatasetVersionVoConverter versionConvertor, StorageService storageService, DatasetDao datasetDao, - IdConverter idConvertor, VersionAliasConverter versionAliasConvertor, UserService userService, - UriAccessor uriAccessor, DataLoader dataLoader, TrashService 
trashService) { + DatasetVersionMapper datasetVersionMapper, BuildRecordMapper buildRecordMapper, + DatasetVoConverter datasetVoConverter, DatasetVersionVoConverter versionConvertor, + StorageService storageService, StorageAccessService storageAccessService, + DatasetDao datasetDao, IdConverter idConvertor, VersionAliasConverter versionAliasConvertor, + UserService userService, UriAccessor uriAccessor, + DataLoader dataLoader, TrashService trashService, + K8sClient k8sClient, K8sJobTemplate k8sJobTemplate, + DatasetBuildTokenValidator datasetBuildTokenValidator, + SystemSettingService systemSettingService, + @Value("${sw.instance-uri}") String instanceUri) { this.projectService = projectService; this.datasetMapper = datasetMapper; this.datasetVersionMapper = datasetVersionMapper; + this.buildRecordMapper = buildRecordMapper; this.datasetVoConverter = datasetVoConverter; this.versionConvertor = versionConvertor; this.storageService = storageService; + this.storageAccessService = storageAccessService; this.datasetDao = datasetDao; this.idConvertor = idConvertor; this.versionAliasConvertor = versionAliasConvertor; @@ -121,6 +162,11 @@ public DatasetService(ProjectService projectService, DatasetMapper datasetMapper this.uriAccessor = uriAccessor; this.dataLoader = dataLoader; this.trashService = trashService; + this.k8sClient = k8sClient; + this.k8sJobTemplate = k8sJobTemplate; + this.datasetBuildTokenValidator = datasetBuildTokenValidator; + this.systemSettingService = systemSettingService; + this.instanceUri = instanceUri; this.bundleManager = new BundleManager( idConvertor, versionAliasConvertor, @@ -371,4 +417,160 @@ public Map signLinks(String project, String datasetName, Set 0) { + throw new SwValidationException(ValidSubject.DATASET, MessageFormat.format( + "The dataset:{0} in project:{1} is already in building.", + request.getDatasetName(), project.getName())); + } + var entity = BuildRecordEntity.builder() + .datasetId(request.getDatasetId()) + 
.projectId(project.getId()) + .shared(request.getShared()) + .datasetName(request.getDatasetName()) + .storagePath(request.getStoragePath()) + .type(request.getType()) + .status(BuildStatus.CREATED) + .createdTime(new Date()) + .build(); + var res = buildRecordMapper.insert(entity) > 0; + if (res) { + // start build + var user = userService.currentUserDetail(); + DockerImage image; + var runtimeProperties = systemSettingService.getRunTimeProperties(); + if (runtimeProperties != null && runtimeProperties.getDatasetBuild() != null + && StringUtils.hasText(runtimeProperties.getDatasetBuild().getImage())) { + image = new DockerImage(systemSettingService.getRunTimeProperties().getDatasetBuild().getImage()); + } else { + image = new DockerImage("docker-registry.starwhale.cn/star-whale/starwhale:latest"); + } + + var job = k8sJobTemplate.loadJob(K8sJobTemplate.WORKLOAD_TYPE_DATASET_BUILD); + + // record id to annotations + var info = Map.of("id", String.valueOf(entity.getId())); + k8sJobTemplate.updateAnnotations(job.getMetadata(), info); + k8sJobTemplate.updateAnnotations(job.getSpec().getTemplate().getMetadata(), info); + + Map ret = new HashMap<>(); + var envVars = List.of( + new V1EnvVar().name("SW_INSTANCE_URI").value(instanceUri), + new V1EnvVar().name("SW_PROJECT").value(project.getName()), + new V1EnvVar().name("SW_TOKEN").value(datasetBuildTokenValidator.getToken(user, entity.getId())), + new V1EnvVar().name("DATASET_BUILD_NAME").value(entity.getDatasetName()), + new V1EnvVar().name("DATASET_BUILD_TYPE").value(String.valueOf(entity.getType())), + new V1EnvVar().name("DATASET_BUILD_DIR_PREFIX").value(entity.getStoragePath()) + ); + String rp = null; + if (runtimeProperties != null && runtimeProperties.getDatasetBuild() != null) { + rp = systemSettingService.getRunTimeProperties().getDatasetBuild().getResourcePool(); + } + var pool = Objects.isNull(rp) ? 
null : systemSettingService.queryResourcePool(rp); + k8sJobTemplate.getContainersTemplates(job).forEach(templateContainer -> { + ContainerOverwriteSpec containerOverwriteSpec = new ContainerOverwriteSpec(templateContainer.getName()); + containerOverwriteSpec.setEnvs(envVars); + containerOverwriteSpec.setImage( + image.resolve(systemSettingService.getDockerSetting().getRegistryForPull())); + containerOverwriteSpec.setCmds(List.of("dataset_build")); + List resources = Objects.isNull(pool) ? List.of() + : pool.validateAndPatchResource(List.of( + RuntimeResource.builder().type(RESOURCE_CPU).build(), + RuntimeResource.builder().type(RESOURCE_MEMORY).build())); + log.info("using resource pool {}, patched resources {}", pool, resources); + containerOverwriteSpec.setResourceOverwriteSpec(new ResourceOverwriteSpec(resources)); + + ret.put(templateContainer.getName(), containerOverwriteSpec); + }); + Map nodeSelector = pool != null ? pool.getNodeSelector() : Map.of(); + var toleration = pool != null ? 
pool.getTolerations() : null; + k8sJobTemplate.renderJob(job, String.format("%s-%d", entity.getDatasetName(), entity.getId()), + "Never", 0, ret, nodeSelector, toleration, null); + + log.debug("deploying dataset build job to k8s :{}", JSONUtil.toJsonStr(job)); + try { + k8sClient.deployJob(job); + } catch (ApiException e) { + throw new SwProcessException(ErrorType.K8S, "deploy dataset build job failed", e); + } + } else { + throw new SwProcessException(ErrorType.DB, "create build record failed"); + } + } + + @Transactional + public boolean updateBuildStatus(Long id, BuildStatus status) { + var record = buildRecordMapper.selectById(id); + if (record == null) { + log.warn("build record:{} can't find when update status to {}.", id, status); + return false; + } + var res = buildRecordMapper.updateStatus(id, status) > 0; + if (res && status == BuildStatus.SUCCESS && record.getShared()) { + // update shared + var dataset = datasetMapper.findByName(record.getDatasetName(), record.getProjectId(), false); + if (dataset != null) { + var version = datasetVersionMapper.findByLatest(dataset.getId()); + if (version != null) { + datasetVersionMapper.updateShared(version.getId(), true); + } + } + } + return res; + } + + public PageInfo listBuildRecords(String projectUrl, BuildStatus status, PageParams pageParams) { + var project = projectService.findProject(projectUrl); + PageHelper.startPage(pageParams.getPageNum(), pageParams.getPageSize()); + var entities = buildRecordMapper.selectByStatus(project.getId(), status); + return PageUtil.toPageInfo(entities, entity -> BuildRecordVo.builder() + .id(String.valueOf(entity.getId())) + .datasetId(String.valueOf(entity.getDatasetId())) + .datasetName(entity.getDatasetName()) + .type(entity.getType()) + .status(entity.getStatus()) + .createTime(entity.getCreatedTime().getTime()) + .build() + ); + } + + public String buildLogContent(Long id) { + var record = buildRecordMapper.selectById(id); + if (StringUtils.hasText(record.getLogPath())) { 
+ try (InputStream inputStream = storageAccessService.get(record.getLogPath())) { + return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new SwProcessException(ErrorType.DB, + MessageFormat.format("read build log path failed {}", id), e); + } + } + return ""; + } } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildRecordGarbageCollector.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildRecordGarbageCollector.java new file mode 100644 index 0000000000..0a96dbb4ad --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildRecordGarbageCollector.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.domain.dataset.build;

import ai.starwhale.mlops.domain.dataset.build.mapper.BuildRecordMapper;
import ai.starwhale.mlops.domain.dataset.build.po.BuildRecordEntity;
import ai.starwhale.mlops.storage.StorageAccessService;
import java.io.IOException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Scheduled cleaner for finished dataset builds: deletes the uploaded source
 * files of SUCCESS/FAILED records and marks each record as cleaned.
 */
@Slf4j
@Component
public class BuildRecordGarbageCollector {

    private final BuildRecordMapper mapper;
    private final StorageAccessService storageAccessService;

    public BuildRecordGarbageCollector(BuildRecordMapper mapper, StorageAccessService storageAccessService) {
        this.mapper = mapper;
        this.storageAccessService = storageAccessService;
    }

    /**
     * Runs on the configured cron (default: daily at midnight). A record is
     * marked cleaned only after every file under its storage path has been
     * deleted; on an I/O failure it stays uncleaned and is retried next run.
     */
    @Scheduled(cron = "${sw.dataset.build.gc-rate:0 0 0 * * ?}")
    public void gc() {
        for (BuildRecordEntity record : mapper.selectFinishedAndUncleaned()) {
            try {
                // materialize first so the stream isn't consumed mid-delete
                var files = storageAccessService.list(record.getStoragePath())
                        .collect(Collectors.toList());
                for (var file : files) {
                    storageAccessService.delete(file);
                }
                mapper.updateCleaned(record.getId());
            } catch (IOException e) {
                // best-effort: log and move on to the next record
                log.warn("delete storage file path:{} failed", record.getStoragePath(), e);
            }
        }
    }
}
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.domain.dataset.build; + +public enum BuildStatus { + /** + * build status + */ + CREATED, + BUILDING, + SUCCESS, + FAILED +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildType.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildType.java new file mode 100644 index 0000000000..7d25ea5914 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/BuildType.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.domain.dataset.build; + +public enum BuildType { + /** + * build type + */ + IMAGE, + VIDEO, + AUDIO, + // TODO : add more build type (HANDLER,YAML,JSON,JSONL,CSV,) +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/bo/CreateBuildRecordRequest.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/bo/CreateBuildRecordRequest.java new file mode 100644 index 0000000000..ae9827ea31 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/bo/CreateBuildRecordRequest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.domain.dataset.build.bo; + +import ai.starwhale.mlops.domain.dataset.build.BuildType; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class CreateBuildRecordRequest { + private Long datasetId; + private String projectUrl; + private String datasetName; + private Boolean shared; + private BuildType type; + private String storagePath; + private String format; +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/converter/BuildRecordVoConverter.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/converter/BuildRecordVoConverter.java new file mode 100644 index 0000000000..0ce1e3715d --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/dataset/build/converter/BuildRecordVoConverter.java @@ -0,0 +1,20 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.domain.dataset.build.converter;

/**
 * Placeholder for a BuildRecordEntity -> BuildRecordVo converter.
 * NOTE(review): currently empty and unused — the mapping is done inline in
 * DatasetService.listBuildRecords; either move that mapping here or delete
 * this class.
 */
public class BuildRecordVoConverter {
}
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.domain.dataset.build.log;

import ai.starwhale.mlops.domain.dataset.build.mapper.BuildRecordMapper;
import ai.starwhale.mlops.exception.StarwhaleException;
import ai.starwhale.mlops.exception.SwProcessException;
import ai.starwhale.mlops.exception.SwProcessException.ErrorType;
import ai.starwhale.mlops.schedule.k8s.K8sClient;
import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate;
import ai.starwhale.mlops.storage.StorageAccessService;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Pod;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Collects the pod log of a dataset-build k8s job, uploads it to object
 * storage, and records the log path on the build record.
 */
@Slf4j
@Component
public class BuildLogCollector {

    // storage prefix for build logs (sw.dataset.build.log-path)
    final String buildLogPath;

    final StorageAccessService storageService;
    final BuildRecordMapper buildRecordMapper;
    final K8sClient k8sClient;
    // container names of the dataset-build job template; was a raw List,
    // parameterized as List<String> for type safety
    final List<String> containers;

    public BuildLogCollector(StorageAccessService storageService,
            BuildRecordMapper buildRecordMapper,
            K8sClient k8sClient,
            K8sJobTemplate k8sJobTemplate,
            @Value("${sw.dataset.build.log-path}") String buildLogPath) {
        this.storageService = storageService;
        this.buildRecordMapper = buildRecordMapper;
        this.k8sClient = k8sClient;
        this.containers = k8sJobTemplate.getJobContainerNames(
                k8sJobTemplate.loadJob(K8sJobTemplate.WORKLOAD_TYPE_DATASET_BUILD));
        this.buildLogPath = buildLogPath;
    }

    /**
     * Fetch and persist the log of the job named {@code jobName} for build
     * record {@code id}. No-op (with a warning) when the pod or the record
     * cannot be found.
     *
     * @throws SwProcessException on k8s API errors (INFRA) or upload failure (STORAGE)
     */
    public void collect(String jobName, Long id) throws StarwhaleException {
        log.debug("logging for dataset build {} begins...", id);
        try {
            V1Pod v1Pod = k8sClient.podOfJob(K8sClient.toV1LabelSelector(Map.of(
                    K8sJobTemplate.JOB_IDENTITY_LABEL, jobName)));
            if (null == v1Pod) {
                log.warn("pod not exists for dataset build {}", id);
                return;
            }
            // check the record before fetching the (potentially large) log —
            // the original fetched the log first and then bailed out
            var record = buildRecordMapper.selectById(id);
            if (null == record) {
                log.warn("record not exists for dataset build {}", id);
                return;
            }
            String buildLog = k8sClient.logOfPod(v1Pod, containers);
            // preview at most the first 100 chars; the original used
            // length() - 1, which silently dropped the last character
            log.debug("logs for dataset build {} collected {} ...", id,
                    StringUtils.hasText(buildLog) ? buildLog.substring(0, Math.min(buildLog.length(), 100)) : "");
            String logPath = resolveLogPath(String.format("%s-%s", record.getDatasetName(), record.getId()));
            log.debug("putting log to storage at path {}", logPath);
            storageService.put(logPath, buildLog.getBytes(StandardCharsets.UTF_8));
            buildRecordMapper.updateLogPath(id, logPath);
        } catch (ApiException e) {
            throw new SwProcessException(ErrorType.INFRA,
                    MessageFormat.format("k8s api exception {0}", e.getResponseBody()), e);
        } catch (IOException e) {
            throw new SwProcessException(ErrorType.STORAGE, "uploading log failed", e);
        }
    }

    // logs live at <buildLogPath>/<datasetName>-<recordId>.log
    private String resolveLogPath(String logName) {
        return String.format("%s/%s.log", buildLogPath, logName);
    }
}
/*
 * Copyright 2022 Starwhale, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.starwhale.mlops.domain.dataset.build.mapper;

import ai.starwhale.mlops.domain.dataset.build.BuildStatus;
import ai.starwhale.mlops.domain.dataset.build.po.BuildRecordEntity;
import java.util.List;
import java.util.Objects;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.jdbc.SQL;

/**
 * MyBatis mapper for the dataset_build_record table.
 */
@Mapper
public interface BuildRecordMapper {
    // columns the application supplies on insert; id is auto-generated and
    // cleaned defaults to 0 in the schema, so both are excluded here
    String COLUMNS_FOR_INSERT = "dataset_id, dataset_name, project_id, "
            + "type, status, storage_path, log_path, format, shared, created_time";
    // fix: the original select list omitted `cleaned`, so every selected
    // entity carried cleaned == null
    String COLUMNS_FOR_SELECT = "id, cleaned, " + COLUMNS_FOR_INSERT;

    @Select("SELECT " + COLUMNS_FOR_SELECT + " FROM dataset_build_record WHERE id = #{id}")
    BuildRecordEntity selectById(Long id);

    /** List records of a project filtered by status; either filter may be null. */
    @SelectProvider(value = SqlProvider.class, method = "listByStatus")
    List<BuildRecordEntity> selectByStatus(
            @Param("projectId") Long projectId, @Param("status") BuildStatus status);

    /**
     * Locking read of currently-BUILDING records for one dataset.
     * FOR UPDATE only holds its lock inside an enclosing transaction —
     * callers must run transactionally for the duplicate-build guard to work.
     */
    @Select("SELECT " + COLUMNS_FOR_SELECT + " FROM dataset_build_record "
            + "WHERE project_id = #{projectId} AND dataset_name = #{datasetName} AND status = 'BUILDING' "
            + "FOR UPDATE")
    List<BuildRecordEntity> selectBuildingInOneProjectForUpdate(
            @Param("projectId") Long projectId, @Param("datasetName") String datasetName);

    /** Finished (SUCCESS/FAILED) records whose source files were not cleaned yet. */
    @Select("SELECT " + COLUMNS_FOR_SELECT + " FROM dataset_build_record "
            + "WHERE (status = 'SUCCESS' or status = 'FAILED') AND cleaned = 0")
    List<BuildRecordEntity> selectFinishedAndUncleaned();

    @Update("UPDATE dataset_build_record set status = #{status} WHERE id = #{id}")
    int updateStatus(@Param("id") Long id, @Param("status") BuildStatus status);

    @Update("UPDATE dataset_build_record set log_path = #{path} WHERE id = #{id}")
    int updateLogPath(@Param("id") Long id, @Param("path") String logPath);

    /** Idempotent: the `cleaned = 0` guard makes repeated calls return 0. */
    @Update("UPDATE dataset_build_record set cleaned = 1 WHERE id = #{id} AND cleaned = 0")
    int updateCleaned(@Param("id") Long id);

    @Insert("INSERT INTO dataset_build_record (" + COLUMNS_FOR_INSERT + ") "
            + "VALUES ("
            + "#{datasetId}, #{datasetName}, #{projectId}, "
            + "#{type}, #{status}, #{storagePath}, #{logPath}, #{format}, #{shared}, #{createdTime}"
            + ")")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(BuildRecordEntity buildRecord);

    /** Dynamic SQL for selectByStatus: both filters are optional. */
    class SqlProvider {
        public String listByStatus(@Param("projectId") Long projectId, @Param("status") BuildStatus status) {
            return new SQL() {
                {
                    SELECT(COLUMNS_FOR_SELECT);
                    FROM("dataset_build_record");
                    if (Objects.nonNull(projectId)) {
                        WHERE("project_id = #{projectId}");
                    }
                    if (null != status) {
                        WHERE("status = #{status}");
                    }
                    ORDER_BY("id desc");
                }
            }.toString();
        }
    }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.starwhale.mlops.domain.dataset.build.po; + +import ai.starwhale.mlops.common.BaseEntity; +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; +import ai.starwhale.mlops.domain.dataset.build.BuildType; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +@Data +@SuperBuilder +@NoArgsConstructor +@AllArgsConstructor +@EqualsAndHashCode(callSuper = true) +public class BuildRecordEntity extends BaseEntity { + private Long id; + private Long datasetId; + private Long projectId; + private Boolean shared; + private Boolean cleaned; + private String datasetName; + private BuildStatus status; + private BuildType type; + private String storagePath; + private String logPath; + private String format; +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/filestorage/FileStorageService.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/filestorage/FileStorageService.java new file mode 100644 index 0000000000..72797165e1 --- /dev/null +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/filestorage/FileStorageService.java @@ -0,0 +1,145 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.domain.filestorage; + +import static org.springframework.http.MediaType.APPLICATION_OCTET_STREAM_VALUE; + +import ai.starwhale.mlops.exception.SwProcessException; +import ai.starwhale.mlops.exception.SwValidationException; +import ai.starwhale.mlops.storage.StorageAccessService; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.convert.DurationStyle; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + + +@Slf4j +@Service +public class FileStorageService { + private final StorageAccessService storageAccessService; + private final String dataRootPath; + private final Long urlExpirationTimeMillis; + + public FileStorageService(StorageAccessService storageAccessService, + @Value("${sw.filestorage.data-root-path:files}") String dataRootPath, + @Value("${sw.filestorage.url-expiration-time:24h}") String urlExpirationTime) { + this.storageAccessService = storageAccessService; + if (dataRootPath.endsWith("/")) { + this.dataRootPath = dataRootPath; + } else { + this.dataRootPath = dataRootPath + "/"; + } + this.urlExpirationTimeMillis = DurationStyle.detectAndParse(urlExpirationTime).toMillis(); + } + + /** + * generate path prefix + * + * @return path + */ + public String generatePathPrefix(String flag) { + if (!StringUtils.hasText(flag)) { + throw new SwValidationException(SwValidationException.ValidSubject.OBJECT_STORE, "flag can't be null"); + } + return String.format("%s%s/%s/", dataRootPath, flag, UUID.randomUUID()); + } + + /** + * validate path prefix + * + * @param pathPrefix path prefix + * @return true if valid, false if not + */ + public boolean validatePathPrefix(String pathPrefix) { + return pathPrefix.startsWith(dataRootPath); + } + + /** + * 
generate signed urls for files(use the special oss which parsed from path prefix) + * + * @param pathPrefix path prefix + * @param files files to be signed. + * @return signed urls for files, key is file name, value is signed url. + */ + public Map generateSignedPutUrls(String pathPrefix, Set files) { + if (!validatePathPrefix(pathPrefix)) { + throw new SwValidationException(SwValidationException.ValidSubject.OBJECT_STORE, "pathPrefix is invalid"); + } + Map signedUrls = new HashMap<>(); + for (String file : files) { + try { + var url = storageAccessService.signedPutUrl( + pathPrefix + file, APPLICATION_OCTET_STREAM_VALUE, urlExpirationTimeMillis); + signedUrls.put(file, url); + } catch (IOException e) { + log.error("generate signed put url error", e); + throw new SwProcessException(SwProcessException.ErrorType.STORAGE, e.getMessage()); + } + } + return signedUrls; + } + + public void deleteFiles(String pathPrefix, Set files) { + if (!validatePathPrefix(pathPrefix)) { + throw new SwValidationException(SwValidationException.ValidSubject.OBJECT_STORE, "path is invalid"); + } + for (String file : files) { + try { + storageAccessService.delete(pathPrefix + file); + } catch (IOException e) { + log.error("delete path:{} error.", pathPrefix + file, e); + throw new SwProcessException(SwProcessException.ErrorType.STORAGE, e.getMessage()); + } + } + } + + + /** + * get signed urls for files(use the special oss which parsed from path prefix) + * + * @param pathPrefix path prefix + * @return signed urls for files, key is file name, value is signed url. + */ + public Map generateSignedGetUrls(String pathPrefix) { + if (!validatePathPrefix(pathPrefix)) { + throw new SwValidationException(SwValidationException.ValidSubject.OBJECT_STORE, "pathPrefix is invalid"); + } + try { + return storageAccessService.list(pathPrefix) + .collect(Collectors.toMap( + filePath -> filePath.substring(pathPrefix.length() + (pathPrefix.endsWith("/") ? 
0 : 1)), + filePath -> { + try { + return storageAccessService.signedUrl(filePath, urlExpirationTimeMillis); + } catch (IOException e) { + log.error("file:{} generate signed get url error", filePath, e); + throw new SwProcessException(SwProcessException.ErrorType.STORAGE, e.getMessage()); + } + })); + } catch (IOException e) { + log.error("path:{} list files error", pathPrefix, e); + throw new SwProcessException(SwProcessException.ErrorType.STORAGE, e.getMessage()); + } + } +} diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/system/SystemSettingService.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/system/SystemSettingService.java index 77d6a81b98..146a5b0919 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/system/SystemSettingService.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/system/SystemSettingService.java @@ -34,6 +34,7 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature; import java.util.List; import java.util.stream.Collectors; +import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; @@ -42,6 +43,7 @@ import org.springframework.util.CollectionUtils; @Slf4j +@Data @Order(1) @Service public class SystemSettingService implements CommandLineRunner { diff --git a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/JobEventHandler.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/JobEventHandler.java index 179e826fc7..b5b2e731e2 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/JobEventHandler.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/JobEventHandler.java @@ -16,6 +16,8 @@ package ai.starwhale.mlops.schedule.k8s; +import ai.starwhale.mlops.domain.dataset.DatasetService; +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; import ai.starwhale.mlops.domain.runtime.RuntimeService; import 
ai.starwhale.mlops.domain.task.status.TaskStatus; import ai.starwhale.mlops.domain.task.status.TaskStatusMachine; @@ -42,17 +44,19 @@ public class JobEventHandler implements ResourceEventHandler { private final TaskModifyReceiver taskModifyReceiver; private final TaskStatusMachine taskStatusMachine; private final RuntimeService runtimeService; + private final DatasetService datasetService; private final K8sClient k8sClient; public JobEventHandler( TaskModifyReceiver taskModifyReceiver, TaskStatusMachine taskStatusMachine, RuntimeService runtimeService, - K8sClient k8sClient - ) { + DatasetService datasetService, + K8sClient k8sClient) { this.taskModifyReceiver = taskModifyReceiver; this.taskStatusMachine = taskStatusMachine; this.runtimeService = runtimeService; + this.datasetService = datasetService; this.k8sClient = k8sClient; } @@ -92,6 +96,9 @@ private void dispatch(V1Job job, String event) { case K8sJobTemplate.WORKLOAD_TYPE_EVAL: updateEvalTask(job, false); break; + case K8sJobTemplate.WORKLOAD_TYPE_DATASET_BUILD: + updateDatasetBuildRecord(job); + break; case K8sJobTemplate.WORKLOAD_TYPE_IMAGE_BUILDER: updateImageBuildTask(job); break; @@ -116,6 +123,23 @@ private void updateImageBuildTask(V1Job job) { } } + private void updateDatasetBuildRecord(V1Job job) { + V1JobStatus status = job.getStatus(); + if (status == null) { + return; + } + var jobName = jobName(job); + var buildId = Long.parseLong(job.getMetadata().getAnnotations().get("id")); + if (null != status.getSucceeded()) { + log.info("dataset:{} build success", jobName); + datasetService.updateBuildStatus(buildId, BuildStatus.SUCCESS); + } else if (null != status.getFailed()) { + log.info("dataset:{} build failed", jobName); + datasetService.updateBuildStatus(buildId, BuildStatus.FAILED); + } + + } + private void updateEvalTask(V1Job job, boolean onDelete) { V1JobStatus status = job.getStatus(); if (status == null) { diff --git 
a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java index 396c2250f1..ae6cf3d49e 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java @@ -65,6 +65,7 @@ public class K8sJobTemplate { public static final String LABEL_APP = "app"; public static final String LABEL_WORKLOAD_TYPE = "starwhale-workload-type"; public static final String WORKLOAD_TYPE_EVAL = "eval"; + public static final String WORKLOAD_TYPE_DATASET_BUILD = "dataset-build"; public static final String WORKLOAD_TYPE_ONLINE_EVAL = "online-eval"; public static final String WORKLOAD_TYPE_IMAGE_BUILDER = "image-builder"; public static final int ONLINE_EVAL_PORT_IN_POD = 8080; @@ -106,6 +107,7 @@ public V1Job loadJob(String type) { V1Job job; switch (type) { case WORKLOAD_TYPE_EVAL: + case WORKLOAD_TYPE_DATASET_BUILD: job = Yaml.loadAs(evalJobTemplate, V1Job.class); break; case WORKLOAD_TYPE_IMAGE_BUILDER: @@ -174,6 +176,11 @@ public void updateLabels(V1Job job, Map labels) { originLabels = originLabels == null ? new HashMap<>() : originLabels; originLabels.putAll(labels); job.getMetadata().labels(originLabels); + + var specLabels = job.getSpec().getTemplate().getMetadata().getLabels(); + specLabels = specLabels == null ? 
new HashMap<>() : specLabels; + specLabels.putAll(labels); + job.getSpec().getTemplate().getMetadata().labels(specLabels); } public void updateAnnotations(V1ObjectMeta meta, Map annotations) { diff --git a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/PodEventHandler.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/PodEventHandler.java index a923b7fb7f..bb567da6e1 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/PodEventHandler.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/PodEventHandler.java @@ -16,6 +16,9 @@ package ai.starwhale.mlops.schedule.k8s; +import ai.starwhale.mlops.domain.dataset.DatasetService; +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; +import ai.starwhale.mlops.domain.dataset.build.log.BuildLogCollector; import ai.starwhale.mlops.domain.job.cache.HotJobHolder; import ai.starwhale.mlops.domain.task.bo.Task; import ai.starwhale.mlops.domain.task.status.TaskStatus; @@ -36,14 +39,22 @@ public class PodEventHandler implements ResourceEventHandler { final TaskLogK8sCollector taskLogK8sCollector; + final BuildLogCollector buildLogCollector; final TaskModifyReceiver taskModifyReceiver; final HotJobHolder jobHolder; + final DatasetService datasetService; public PodEventHandler( - TaskLogK8sCollector taskLogK8sCollector, TaskModifyReceiver taskModifyReceiver, HotJobHolder jobHolder) { + TaskLogK8sCollector taskLogK8sCollector, + BuildLogCollector buildLogCollector, + TaskModifyReceiver taskModifyReceiver, + HotJobHolder jobHolder, + DatasetService datasetService) { this.taskLogK8sCollector = taskLogK8sCollector; + this.buildLogCollector = buildLogCollector; this.taskModifyReceiver = taskModifyReceiver; this.jobHolder = jobHolder; + this.datasetService = datasetService; } @Override @@ -52,29 +63,67 @@ public void onAdd(V1Pod obj) { @Override public void onUpdate(V1Pod oldObj, V1Pod newObj) { - // one task one k8s job - updateEvalTask(newObj); - 
collectLog(newObj); + var metaData = newObj.getMetadata(); + if (metaData == null) { + return; + } + var labels = metaData.getLabels(); + if (CollectionUtils.isEmpty(labels)) { + return; + } + var type = labels.get(K8sJobTemplate.JOB_TYPE_LABEL); + if (StringUtils.hasText(type)) { + log.debug("job({}) {} for {}.", type, "onUpdate", newObj.getMetadata().getLabels().get("job-name")); + switch (type) { + case K8sJobTemplate.WORKLOAD_TYPE_EVAL: + updateEvalTask(newObj); + collectLog(newObj, type); + break; + case K8sJobTemplate.WORKLOAD_TYPE_DATASET_BUILD: + updateDatasetBuild(newObj); + collectLog(newObj, type); + break; + default: + } + } + } @Override public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) { } - private Long getTaskId(V1Pod pod) { - String taskId = pod.getMetadata().getLabels().get("job-name"); - if (null == taskId || taskId.isBlank()) { - log.info("no task id found for pod {}", taskId); + private Long getJobNameAsId(V1Pod pod) { + String jobName = pod.getMetadata().getLabels().get("job-name"); + if (null == jobName || jobName.isBlank()) { + log.info("no id found for pod {}", jobName); return null; } - Long tid; + Long id; try { - tid = Long.valueOf(taskId); + id = Long.valueOf(jobName); } catch (Exception e) { - log.warn("task id is not number {}", taskId); - tid = null; + log.warn("id is not number {}", jobName); + id = null; + } + return id; + } + + private void updateDatasetBuild(V1Pod pod) { + if (null == pod.getStatus() || null == pod.getStatus().getPhase()) { + return; + } + var phase = pod.getStatus().getPhase(); + if (StringUtils.hasText(phase)) { + switch (phase) { + case "Running": + var id = Long.parseLong(pod.getMetadata().getAnnotations().get("id")); + datasetService.updateBuildStatus(id, BuildStatus.BUILDING); + break; + default: + + } } - return tid; } private void updateEvalTask(V1Pod pod) { @@ -87,8 +136,7 @@ private void updateEvalTask(V1Pod pod) { log.info("pod {} is being deleted", pod.getMetadata().getName()); return; 
} - - Long tid = getTaskId(pod); + Long tid = getJobNameAsId(pod); if (tid == null) { return; } @@ -136,7 +184,7 @@ private void updateEvalTask(V1Pod pod) { taskModifyReceiver.receive(List.of(report)); } - private void collectLog(V1Pod pod) { + private void collectLog(V1Pod pod, String type) { log.debug("collect log for pod {} status {}", pod.getMetadata().getName(), pod.getStatus()); if (null == pod.getStatus() || null == pod.getStatus().getContainerStatuses() @@ -149,16 +197,29 @@ private void collectLog(V1Pod pod) { if (pod.getMetadata() != null && pod.getMetadata().getDeletionTimestamp() != null) { return; } - Long tid = getTaskId(pod); - if (tid != null) { - Collection optionalTasks = jobHolder.tasksOfIds(List.of(tid)); - if (CollectionUtils.isEmpty(optionalTasks)) { - log.warn("no tasks found for pod {}", pod.getMetadata().getName()); - return; - } - Task task = optionalTasks.stream().findAny().get(); - taskLogK8sCollector.collect(task); + Long id; + switch (type) { + case K8sJobTemplate.WORKLOAD_TYPE_EVAL: + id = getJobNameAsId(pod); + if (id != null) { + Collection optionalTasks = jobHolder.tasksOfIds(List.of(id)); + if (CollectionUtils.isEmpty(optionalTasks)) { + log.warn("no tasks found for pod {}", pod.getMetadata().getName()); + return; + } + Task task = optionalTasks.stream().findAny().get(); + taskLogK8sCollector.collect(task); + } + break; + case K8sJobTemplate.WORKLOAD_TYPE_DATASET_BUILD: + String jobName = pod.getMetadata().getLabels().get("job-name"); + id = Long.parseLong(pod.getMetadata().getAnnotations().get("id")); + buildLogCollector.collect(jobName, id); + break; + default: } + + } } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogCollector.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogCollector.java similarity index 86% rename from 
server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogCollector.java rename to server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogCollector.java index 5bd9a789b9..9590ab023d 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogCollector.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogCollector.java @@ -14,11 +14,11 @@ * limitations under the License. */ -package ai.starwhale.mlops.domain.task.status.watchers.log; +package ai.starwhale.mlops.schedule.k8s.log; import java.io.IOException; -public interface CancellableTaskLogCollector { +public interface CancellableJobLogCollector { String readLine() throws IOException; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogK8sCollector.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java similarity index 80% rename from server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogK8sCollector.java rename to server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java index 017c7b6914..abc374c4a7 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogK8sCollector.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollector.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package ai.starwhale.mlops.domain.task.status.watchers.log; +package ai.starwhale.mlops.schedule.k8s.log; import ai.starwhale.mlops.schedule.k8s.K8sClient; import io.kubernetes.client.openapi.ApiException; @@ -25,23 +25,23 @@ import okhttp3.Call; import okhttp3.Response; -public class CancellableTaskLogK8sCollector implements CancellableTaskLogCollector { +public class CancellableJobLogK8sCollector implements CancellableJobLogCollector { public static final String WORKER_CONTAINER = "worker"; final K8sClient k8sClient; final Call call; final Response resp; final BufferedReader bufferedReader; - public CancellableTaskLogK8sCollector(K8sClient k8sClient, Long taskId) + public CancellableJobLogK8sCollector(K8sClient k8sClient, String jobName) throws IOException, ApiException { this.k8sClient = k8sClient; - call = k8sClient.readLog(getPodName(taskId), WORKER_CONTAINER, true); + call = k8sClient.readLog(getPodName(jobName), WORKER_CONTAINER, true); resp = call.execute(); bufferedReader = new BufferedReader(new InputStreamReader(resp.body().byteStream(), StandardCharsets.UTF_8)); } - private String getPodName(Long taskId) throws ApiException { - var podList = k8sClient.getPodsByJobName(taskId.toString()); + private String getPodName(String taskId) throws ApiException { + var podList = k8sClient.getPodsByJobName(taskId); if (podList.getItems().isEmpty()) { throw new ApiException("get empty pod list by job name " + taskId); } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogK8sCollectorFactory.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactory.java similarity index 71% rename from server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogK8sCollectorFactory.java rename to server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactory.java index 
8e7a187b95..eaa4f2a07f 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/task/status/watchers/log/CancellableTaskLogK8sCollectorFactory.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactory.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package ai.starwhale.mlops.domain.task.status.watchers.log; +package ai.starwhale.mlops.schedule.k8s.log; import ai.starwhale.mlops.schedule.k8s.K8sClient; import io.kubernetes.client.openapi.ApiException; @@ -22,14 +22,14 @@ import org.springframework.stereotype.Service; @Service -public class CancellableTaskLogK8sCollectorFactory { +public class CancellableJobLogK8sCollectorFactory { private final K8sClient k8sClient; - public CancellableTaskLogK8sCollectorFactory(K8sClient k8sClient) { + public CancellableJobLogK8sCollectorFactory(K8sClient k8sClient) { this.k8sClient = k8sClient; } - public CancellableTaskLogCollector make(Long taskId) throws IOException, ApiException { - return new CancellableTaskLogK8sCollector(this.k8sClient, taskId); + public CancellableJobLogCollector make(String jobName) throws IOException, ApiException { + return new CancellableJobLogK8sCollector(this.k8sClient, jobName); } } diff --git a/server/controller/src/main/resources/application.yaml b/server/controller/src/main/resources/application.yaml index 764a96c63f..4eac719559 100644 --- a/server/controller/src/main/resources/application.yaml +++ b/server/controller/src/main/resources/application.yaml @@ -23,6 +23,9 @@ sw: image-build: resource-pool: ${SW_IMAGE_BUILD_RESOURCE_POOL:default} image: ${SW_IMAGE_BUILD_IMAGE:gcr.io/kaniko-project/executor:latest} + dataset-build: + resource-pool: ${SW_DATASET_BUILD_RESOURCE_POOL:default} + image: ${SW_DATASET_BUILD_IMAGE:docker-registry.starwhale.cn/star-whale/starwhale:latest} pypi: index-url: ${SW_PYPI_INDEX_URL:} extra-index-url: ${SW_PYPI_EXTRA_INDEX_URL:} @@ -33,6 +36,9 @@ sw: issuer: ${SW_JWT_ISSUER:starwhale} 
expire-minutes: ${SW_JWT_TOKEN_EXPIRE_MINUTES:43200} dataset: + build: + log-path: ${DATASET_BUILD_LOG_PREFIX:/dataset-build/logs} + gc-rate: ${DATASET_BUILD_FILES_GC_RATE:0 0 0 * * ?} load: batch-size: ${DATASET_CONSUMPTION_BATCH_SIZE:50} task: diff --git a/server/controller/src/main/resources/db/migration/v0_4_0/V0_4_0_011__add_dataset_build_record.sql b/server/controller/src/main/resources/db/migration/v0_4_0/V0_4_0_011__add_dataset_build_record.sql new file mode 100644 index 0000000000..ca10b06e8a --- /dev/null +++ b/server/controller/src/main/resources/db/migration/v0_4_0/V0_4_0_011__add_dataset_build_record.sql @@ -0,0 +1,33 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +create table if not exists dataset_build_record +( + id bigint auto_increment primary key not null, + dataset_id bigint comment 'existence of dataset, it is a new dataset if it is null', + project_id bigint not null, + dataset_name varchar(255) not null comment 'should check dataset name when dataset id is null', + shared tinyint default 0 not null, + cleaned tinyint default 0 not null, + type varchar(64) not null comment 'image, video, audio, others(json, csv, txt ...etc)', + status varchar(32) not null comment 'created, building, failed, success', + storage_path varchar(255) not null, + log_path varchar(255), + format varchar(255) comment 'reserve for future use', + created_time datetime default CURRENT_TIMESTAMP not null, + modified_time datetime default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP +); diff --git a/server/controller/src/test/java/ai/starwhale/mlops/api/LogControllerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/api/LogControllerTest.java index 13d6453d26..fb4b9729a3 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/api/LogControllerTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/api/LogControllerTest.java @@ -27,6 +27,7 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import ai.starwhale.mlops.domain.dataset.DatasetService; import ai.starwhale.mlops.domain.task.TaskService; import java.util.List; import java.util.Objects; @@ -43,7 +44,7 @@ public class LogControllerTest { @BeforeEach public void setUp() { taskService = mock(TaskService.class); - logController = new LogController(taskService); + logController = new LogController(taskService, mock(DatasetService.class)); } @Test diff --git a/server/controller/src/test/java/ai/starwhale/mlops/api/TaskLogWsServerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/api/TaskLogWsServerTest.java index e54992c2ec..d1f7aa2a60 100644 --- 
a/server/controller/src/test/java/ai/starwhale/mlops/api/TaskLogWsServerTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/api/TaskLogWsServerTest.java @@ -22,8 +22,8 @@ import static org.mockito.Mockito.when; import ai.starwhale.mlops.common.IdConverter; -import ai.starwhale.mlops.domain.task.status.watchers.log.CancellableTaskLogK8sCollector; -import ai.starwhale.mlops.domain.task.status.watchers.log.CancellableTaskLogK8sCollectorFactory; +import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollector; +import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory; import io.kubernetes.client.openapi.ApiException; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -32,17 +32,17 @@ import org.junit.jupiter.api.Test; public class TaskLogWsServerTest { - private CancellableTaskLogK8sCollectorFactory factory; - private CancellableTaskLogK8sCollector logK8sCollector; + private CancellableJobLogK8sCollectorFactory factory; + private CancellableJobLogK8sCollector logK8sCollector; private IdConverter idConvertor; private Session session; @BeforeEach public void setup() { - factory = mock(CancellableTaskLogK8sCollectorFactory.class); + factory = mock(CancellableJobLogK8sCollectorFactory.class); idConvertor = mock(IdConverter.class); session = mock(Session.class); - logK8sCollector = mock(CancellableTaskLogK8sCollector.class); + logK8sCollector = mock(CancellableJobLogK8sCollector.class); } @Test @@ -52,12 +52,12 @@ public void testOpen() throws IOException, ApiException, InterruptedException { server.setLogCollectorFactory(factory); final Long taskId = 1L; - when(factory.make(taskId)).thenReturn(logK8sCollector); + when(factory.make(taskId.toString())).thenReturn(logK8sCollector); when(session.getId()).thenReturn("1"); when(idConvertor.revert(any())).thenReturn(taskId); when(logK8sCollector.readLine()).thenReturn("foo"); server.onOpen(session, "1"); - verify(factory).make(taskId); + 
verify(factory).make(taskId.toString()); TimeUnit.MILLISECONDS.sleep(500); verify(logK8sCollector).readLine(); } diff --git a/server/controller/src/test/java/ai/starwhale/mlops/domain/dataset/DatasetServiceTest.java b/server/controller/src/test/java/ai/starwhale/mlops/domain/dataset/DatasetServiceTest.java index 967391e0e3..aaa864a902 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/domain/dataset/DatasetServiceTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/domain/dataset/DatasetServiceTest.java @@ -16,6 +16,8 @@ package ai.starwhale.mlops.domain.dataset; +import static ai.starwhale.mlops.schedule.k8s.K8sJobTemplate.JOB_TYPE_LABEL; +import static ai.starwhale.mlops.schedule.k8s.K8sJobTemplate.WORKLOAD_TYPE_DATASET_BUILD; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.hasItem; @@ -25,11 +27,13 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.any; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.mock; @@ -37,6 +41,7 @@ import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -46,6 +51,7 @@ import ai.starwhale.mlops.common.IdConverter; import ai.starwhale.mlops.common.PageParams; import 
ai.starwhale.mlops.common.VersionAliasConverter; +import ai.starwhale.mlops.configuration.security.DatasetBuildTokenValidator; import ai.starwhale.mlops.domain.bundle.BundleException; import ai.starwhale.mlops.domain.bundle.BundleManager; import ai.starwhale.mlops.domain.bundle.BundleUrl; @@ -55,6 +61,11 @@ import ai.starwhale.mlops.domain.dataset.bo.DatasetQuery; import ai.starwhale.mlops.domain.dataset.bo.DatasetVersion; import ai.starwhale.mlops.domain.dataset.bo.DatasetVersionQuery; +import ai.starwhale.mlops.domain.dataset.build.BuildStatus; +import ai.starwhale.mlops.domain.dataset.build.BuildType; +import ai.starwhale.mlops.domain.dataset.build.bo.CreateBuildRecordRequest; +import ai.starwhale.mlops.domain.dataset.build.mapper.BuildRecordMapper; +import ai.starwhale.mlops.domain.dataset.build.po.BuildRecordEntity; import ai.starwhale.mlops.domain.dataset.converter.DatasetVersionVoConverter; import ai.starwhale.mlops.domain.dataset.converter.DatasetVoConverter; import ai.starwhale.mlops.domain.dataset.dataloader.DataLoader; @@ -67,11 +78,21 @@ import ai.starwhale.mlops.domain.project.bo.Project; import ai.starwhale.mlops.domain.storage.StorageService; import ai.starwhale.mlops.domain.storage.UriAccessor; +import ai.starwhale.mlops.domain.system.SystemSettingService; import ai.starwhale.mlops.domain.trash.TrashService; import ai.starwhale.mlops.domain.user.UserService; import ai.starwhale.mlops.domain.user.bo.User; import ai.starwhale.mlops.exception.SwNotFoundException; +import ai.starwhale.mlops.exception.SwProcessException; import ai.starwhale.mlops.exception.SwValidationException; +import ai.starwhale.mlops.schedule.k8s.K8sClient; +import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate; +import ai.starwhale.mlops.storage.StorageAccessService; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.models.V1Job; +import io.kubernetes.client.openapi.models.V1JobSpec; +import io.kubernetes.client.openapi.models.V1ObjectMeta; 
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec; import java.util.List; import java.util.Map; import java.util.Objects; @@ -88,6 +109,7 @@ public class DatasetServiceTest { private DatasetService service; private DatasetMapper datasetMapper; private DatasetVersionMapper datasetVersionMapper; + private BuildRecordMapper buildRecordMapper; private DatasetVoConverter datasetConvertor; private DatasetVersionVoConverter versionConvertor; private StorageService storageService; @@ -97,6 +119,10 @@ public class DatasetServiceTest { private UriAccessor uriAccessor; private DataLoader dataLoader; private TrashService trashService; + private K8sClient k8sClient; + private K8sJobTemplate k8sJobTemplate; + private DatasetBuildTokenValidator datasetBuildTokenValidator; + private SystemSettingService systemSettingService; @Setter private BundleManager bundleManager; @@ -105,6 +131,7 @@ public class DatasetServiceTest { public void setUp() { datasetMapper = mock(DatasetMapper.class); datasetVersionMapper = mock(DatasetVersionMapper.class); + buildRecordMapper = mock(BuildRecordMapper.class); datasetConvertor = mock(DatasetVoConverter.class); given(datasetConvertor.convert(any(DatasetEntity.class))) .willAnswer(invocation -> { @@ -145,22 +172,31 @@ public void setUp() { dataLoader = mock(DataLoader.class); trashService = mock(TrashService.class); + k8sClient = mock(K8sClient.class); + k8sJobTemplate = mock(K8sJobTemplate.class); + datasetBuildTokenValidator = mock(DatasetBuildTokenValidator.class); + systemSettingService = mock(SystemSettingService.class); service = new DatasetService( projectService, datasetMapper, datasetVersionMapper, + buildRecordMapper, datasetConvertor, versionConvertor, storageService, + mock(StorageAccessService.class), datasetDao, new IdConverter(), new VersionAliasConverter(), userService, uriAccessor, dataLoader, - trashService - ); + trashService, + k8sClient, + k8sJobTemplate, + datasetBuildTokenValidator, + systemSettingService, ""); 
bundleManager = mock(BundleManager.class); given(bundleManager.getBundleId(any(BundleUrl.class))) .willAnswer(invocation -> { @@ -416,17 +452,22 @@ public void testShareDatasetVersion() { projectService, mock(DatasetMapper.class), datasetVersionMapper, + mock(BuildRecordMapper.class), mock(DatasetVoConverter.class), mock(DatasetVersionVoConverter.class), mock(StorageService.class), + mock(StorageAccessService.class), datasetDao, new IdConverter(), versionAliasConverter, mock(UserService.class), mock(UriAccessor.class), mock(DataLoader.class), - mock(TrashService.class) - ); + mock(TrashService.class), + mock(K8sClient.class), + mock(K8sJobTemplate.class), + mock(DatasetBuildTokenValidator.class), + mock(SystemSettingService.class), ""); // public project when(projectService.getProjectVo("pub")).thenReturn(ProjectVo.builder().id("1").privacy("PUBLIC").build()); @@ -493,4 +534,137 @@ public void testListDatasetVersionView() { assertEquals("v3", res.get(3).getVersions().get(0).getAlias()); } + @Test + public void testStartBuild() throws ApiException { + var datasetName = "test-build-ds"; + var projectId = 1L; + given(projectService.findProject(String.valueOf(projectId))) + .willReturn(Project.builder().id(projectId).build()); + + // case1-1: patch and the dataset is not exist + given(datasetMapper.find(1L)).willReturn(null); + assertThrows(SwValidationException.class, () -> service.build(CreateBuildRecordRequest.builder() + .datasetId(1L) + .datasetName(datasetName) + .projectUrl(String.valueOf(projectId)) + .build()) + ); + // case1-2: patch and the dataset name in param is not right + given(datasetMapper.find(1L)).willReturn(DatasetEntity.builder().datasetName("already-dataset").build()); + assertThrows(SwValidationException.class, () -> service.build(CreateBuildRecordRequest.builder() + .datasetId(1L) + .datasetName(datasetName) + .projectUrl(String.valueOf(projectId)) + .build()) + ); + + // case2: create and already exist the same name dataset + 
given(datasetMapper.findByName(datasetName, projectId, true)) + .willReturn(DatasetEntity.builder().build()); + assertThrows(SwValidationException.class, () -> service.build(CreateBuildRecordRequest.builder() + .datasetId(null) + .datasetName(datasetName) + .projectUrl(String.valueOf(projectId)) + .build()) + ); + + // case3: create and not exist the same name, but already building a same name dataset + given(datasetMapper.findByName(datasetName, projectId, true)).willReturn(null); + given(buildRecordMapper.selectBuildingInOneProjectForUpdate(projectId, datasetName)) + .willReturn(List.of(BuildRecordEntity.builder().build())); + assertThrows(SwValidationException.class, () -> service.build(CreateBuildRecordRequest.builder() + .datasetId(null) + .datasetName(datasetName) + .projectUrl(String.valueOf(projectId)) + .build()) + ); + + // case4: insert to db failed + given(buildRecordMapper.selectBuildingInOneProjectForUpdate(projectId, datasetName)).willReturn(List.of()); + given(buildRecordMapper.insert(any())).willReturn(0); + assertThrows(SwProcessException.class, () -> service.build(CreateBuildRecordRequest.builder() + .datasetId(null) + .datasetName(datasetName) + .projectUrl(String.valueOf(projectId)) + .build()) + ); + + // case5: normal build + given(buildRecordMapper.insert(any())).willReturn(1); + V1Job v1Job = new V1Job(); + v1Job.setMetadata(new V1ObjectMeta() + .name("dataset_build-1").labels(Map.of(JOB_TYPE_LABEL, WORKLOAD_TYPE_DATASET_BUILD))); + v1Job.setSpec(new V1JobSpec().template(new V1PodTemplateSpec().metadata(new V1ObjectMeta()))); + given(k8sJobTemplate.loadJob(WORKLOAD_TYPE_DATASET_BUILD)).willReturn(v1Job); + + service.build(CreateBuildRecordRequest.builder() + .datasetId(null) + .datasetName(datasetName) + .shared(true) + .type(BuildType.IMAGE) + .projectUrl(String.valueOf(projectId)) + .storagePath("storage-path") + .build()); + verify(k8sJobTemplate, times(1)).loadJob(WORKLOAD_TYPE_DATASET_BUILD); + verify(k8sJobTemplate, 
times(2)).updateAnnotations(any(), any()); + verify(datasetBuildTokenValidator, times(1)).getToken(any(), any()); + verify(k8sJobTemplate, times(1)).renderJob( + any(), eq("test-build-ds-null"), eq("Never"), eq(0), any(), any(), isNull(), isNull()); + verify(k8sClient, times(1)).deployJob(any()); + } + + @Test + public void testUpdateBuildRecord() { + var recordId = 1L; + var projectId = 101L; + var datasetName = "test-build-ds"; + + // case1: not found + given(buildRecordMapper.selectById(recordId)).willReturn(null); + assertFalse(service.updateBuildStatus(recordId, BuildStatus.SUCCESS)); + + var record = BuildRecordEntity.builder() + .id(recordId).projectId(projectId).datasetName(datasetName).shared(false).build(); + given(buildRecordMapper.selectById(recordId)).willReturn(record); + + // case2: update failed + given(buildRecordMapper.updateStatus(recordId, BuildStatus.SUCCESS)).willReturn(0); + assertFalse(service.updateBuildStatus(recordId, BuildStatus.SUCCESS)); + + // update is ok + given(buildRecordMapper.updateStatus(eq(recordId), any())).willReturn(1); + // case3: update to failed and shared is false + service.updateBuildStatus(recordId, BuildStatus.FAILED); + verify(datasetMapper, times(0)).findByName(eq(datasetName), eq(projectId), eq(true)); + + record.setShared(true); + // case4: update to failed and shared is true + service.updateBuildStatus(recordId, BuildStatus.FAILED); + verify(datasetMapper, times(0)).findByName(eq(datasetName), eq(projectId), eq(true)); + + // case5: update to success and shared is true + given(datasetMapper.findByName(datasetName, projectId, false)) + .willReturn(DatasetEntity.builder().id(1000L).build()); + given(datasetVersionMapper.findByLatest(1000L)) + .willReturn(DatasetVersionEntity.builder().id(10000L).build()); + + service.updateBuildStatus(recordId, BuildStatus.SUCCESS); + + verify(datasetVersionMapper, times(1)).updateShared(eq(10000L), eq(true)); + } + + @Test + public void testListBuildRecord() { + Long project = 
1L; + given(projectService.findProject(String.valueOf(project))).willReturn(Project.builder().id(project).build()); + given(buildRecordMapper.selectByStatus(project, BuildStatus.SUCCESS)) + .willReturn(List.of(BuildRecordEntity.builder().id(10L).datasetName("ds").build())); + + var page = service.listBuildRecords( + String.valueOf(project), BuildStatus.SUCCESS, PageParams.builder().pageNum(1).pageSize(10).build()); + assertThat(page, allOf( + hasProperty("list", iterableWithSize(1)) + )); + } + } diff --git a/server/controller/src/test/java/ai/starwhale/mlops/domain/filestorage/FileStorageServiceTest.java b/server/controller/src/test/java/ai/starwhale/mlops/domain/filestorage/FileStorageServiceTest.java new file mode 100644 index 0000000000..32e812bb20 --- /dev/null +++ b/server/controller/src/test/java/ai/starwhale/mlops/domain/filestorage/FileStorageServiceTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.starwhale.mlops.domain.filestorage; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import ai.starwhale.mlops.exception.SwValidationException; +import ai.starwhale.mlops.storage.StorageAccessService; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + + +public class FileStorageServiceTest { + private final StorageAccessService storageAccessService = mock(StorageAccessService.class); + private FileStorageService fileStorageService; + + private static final String rootPath = "mock-files/"; + private static final String flag = "my-dir"; + private static final String uuidStr = "123e4567-e89b-12d3-a456-426655440000"; + + private static final String pathPrefix = rootPath + flag + "/" + uuidStr + "/"; + + @BeforeEach + public void init() { + fileStorageService = new FileStorageService( + storageAccessService, rootPath, "4h"); + } + + @Test + public void testApplyPath() { + var expect = UUID.fromString(uuidStr); + try (MockedStatic uuidMocked = mockStatic(UUID.class)) { + uuidMocked.when(UUID::randomUUID).thenReturn(expect); + assertEquals(pathPrefix, fileStorageService.generatePathPrefix(flag)); + } + } + + @Test + public void testGeneratePutUrl() throws IOException { + given(storageAccessService.signedPutUrl(any(), any(), any())).willReturn("signedUrl"); + assertThrows(SwValidationException.class, () -> + 
fileStorageService.generateSignedPutUrls("invalidPath", Set.of("a.txt", "b.txt", "c.txt"))); + + assertThat("signed put urls", + fileStorageService.generateSignedPutUrls(pathPrefix, Set.of("a.txt", "b.txt")), + is(Map.of( + "a.txt", "signedUrl", + "b.txt", "signedUrl" + )) + ); + + } + + @Test + public void testGenerateGetUrl() throws IOException { + given(storageAccessService.list(pathPrefix)) + .willReturn(List.of(pathPrefix + "a.txt", pathPrefix + "b.txt").stream()); + given(storageAccessService.signedUrl(any(), any())).willReturn("signedGetUrl"); + assertThrows(SwValidationException.class, () -> + fileStorageService.generateSignedGetUrls("invalidPath")); + + assertThat("signed get urls", fileStorageService.generateSignedGetUrls(pathPrefix), + is(Map.of( + "a.txt", "signedGetUrl", + "b.txt", "signedGetUrl" + )) + ); + } + + @Test + public void testDeleteFile() throws IOException { + assertThrows(SwValidationException.class, () -> + fileStorageService.deleteFiles("invalidPath", Set.of())); + + fileStorageService.deleteFiles(pathPrefix, Set.of("a.txt", "b.txt")); + verify(storageAccessService, times(2)).delete(any()); + } + +} diff --git a/server/controller/src/test/java/ai/starwhale/mlops/domain/runtime/RuntimeServiceTest.java b/server/controller/src/test/java/ai/starwhale/mlops/domain/runtime/RuntimeServiceTest.java index 95423ad881..32e570827f 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/domain/runtime/RuntimeServiceTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/domain/runtime/RuntimeServiceTest.java @@ -203,7 +203,8 @@ public void setUp() { runtimeTokenValidator, systemSettingService, new DockerSetting("localhost:8083", "localhost:8083", "admin", "admin123", false), - new RunTimeProperties("", new RunTimeProperties.ImageBuild("rc", ""), + new RunTimeProperties( + "", new RunTimeProperties.RunConfig("rc", ""), new RunTimeProperties.RunConfig("rc", ""), new RunTimeProperties.Pypi("https://pypi.io/simple", 
"https://edu.io/simple", "pypi.io", 1, 2), ""), "http://mock-controller"); bundleManager = mock(BundleManager.class); diff --git a/server/controller/src/test/java/ai/starwhale/mlops/domain/system/SystemSettingServiceTest.java b/server/controller/src/test/java/ai/starwhale/mlops/domain/system/SystemSettingServiceTest.java index f471d97630..3689bb5525 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/domain/system/SystemSettingServiceTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/domain/system/SystemSettingServiceTest.java @@ -120,7 +120,7 @@ public void setUp() throws Exception { systemSettingService = new SystemSettingService( systemSettingMapper, List.of(listener), - new RunTimeProperties("", new RunTimeProperties.ImageBuild(), + new RunTimeProperties("", new RunTimeProperties.RunConfig(), new RunTimeProperties.RunConfig(), new Pypi("url1", "url2", "host1", 11, 91), CONDARC), new DockerSetting("", "", "", "", false), userService); @@ -211,7 +211,8 @@ public void testStartWithoutData() throws Exception { SystemSettingService systemSettingService = new SystemSettingService( mock(SystemSettingMapper.class), List.of(listener), - new RunTimeProperties("", new RunTimeProperties.ImageBuild(), new Pypi("", "", "", 1, 2), ""), + new RunTimeProperties("", new RunTimeProperties.RunConfig(), new RunTimeProperties.RunConfig(), + new Pypi("", "", "", 1, 2), ""), new DockerSetting("abcd.com", "abcd2.com", "admin", "admin123", false), mock(UserService.class)); systemSettingService.run(); diff --git a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/JobEventHandlerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/JobEventHandlerTest.java index b6811c0d7c..59087b57fd 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/JobEventHandlerTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/JobEventHandlerTest.java @@ -26,6 +26,7 @@ import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import ai.starwhale.mlops.domain.dataset.DatasetService; import ai.starwhale.mlops.domain.runtime.RuntimeService; import ai.starwhale.mlops.domain.task.status.TaskStatus; import ai.starwhale.mlops.domain.task.status.TaskStatusMachine; @@ -49,6 +50,7 @@ public class JobEventHandlerTest { TaskModifyReceiver taskModifyReceiver; + DatasetService datasetService; JobEventHandler jobEventHandler; private final K8sClient k8sClient = mock(K8sClient.class); private final OffsetDateTime startTime = OffsetDateTime.now().minusMinutes(1); @@ -57,9 +59,10 @@ public class JobEventHandlerTest { @BeforeEach public void setUp() throws ApiException { taskModifyReceiver = mock(TaskModifyReceiver.class); + datasetService = mock(DatasetService.class); TaskStatusMachine taskStatusMachine = new TaskStatusMachine(); - jobEventHandler = - new JobEventHandler(taskModifyReceiver, taskStatusMachine, mock(RuntimeService.class), k8sClient); + jobEventHandler = new JobEventHandler( + taskModifyReceiver, taskStatusMachine, mock(RuntimeService.class), datasetService, k8sClient); var pod = new V1Pod().metadata(new V1ObjectMeta().name("1")); pod.setStatus(new V1PodStatus().startTime(startTime)); diff --git a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplateTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplateTest.java index a702ce69e5..1384f86ec0 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplateTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplateTest.java @@ -152,8 +152,13 @@ public void testDevInfoLabel() { job = k8sJobTemplate.loadJob(K8sJobTemplate.WORKLOAD_TYPE_EVAL); k8sJobTemplate.renderJob(job, "foo", "OnFailure", 10, specs, Map.of(), null, null); labels = job.getSpec().getTemplate().getMetadata().getLabels(); - assertThat(labels, is(Map.of(K8sJobTemplate.DEVICE_LABEL_NAME_PREFIX 
+ "nvidia.com/gpu", "true", - K8sJobTemplate.DEVICE_LABEL_NAME_PREFIX + "cpu", "true"))); + assertThat(labels, is(Map.of( + K8sJobTemplate.DEVICE_LABEL_NAME_PREFIX + "nvidia.com/gpu", "true", + K8sJobTemplate.DEVICE_LABEL_NAME_PREFIX + "cpu", "true", + "job-type", "eval", + "job-name", "foo", + "owner", "starwhale" + ))); } @Test diff --git a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sTaskSchedulerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sTaskSchedulerTest.java index 7b473a6849..37f7e64c71 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sTaskSchedulerTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/K8sTaskSchedulerTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.when; import ai.starwhale.mlops.configuration.RunTimeProperties; -import ai.starwhale.mlops.configuration.RunTimeProperties.ImageBuild; import ai.starwhale.mlops.configuration.RunTimeProperties.Pypi; +import ai.starwhale.mlops.configuration.RunTimeProperties.RunConfig; import ai.starwhale.mlops.configuration.security.TaskTokenValidator; import ai.starwhale.mlops.domain.dataset.bo.DataSet; import ai.starwhale.mlops.domain.job.JobType; @@ -97,7 +97,7 @@ private K8sTaskScheduler buildK8sScheduler(K8sClient k8sClient) throws IOExcepti TaskTokenValidator taskTokenValidator = mock(TaskTokenValidator.class); when(taskTokenValidator.getTaskToken(any(), any())).thenReturn("tt"); RunTimeProperties runTimeProperties = new RunTimeProperties( - "", new ImageBuild(), new Pypi("indexU", "extraU", "trustedH", 1, 2), CONDARC); + "", new RunConfig(), new RunConfig(), new Pypi("indexU", "extraU", "trustedH", 1, 2), CONDARC); StorageAccessService storageAccessService = mock(StorageAccessService.class); when(storageAccessService.list(eq("path_rt"))).thenReturn(Stream.of("path_rt")); when(storageAccessService.signedUrl(eq("path_rt"), any())).thenReturn("s3://bucket/path_rt"); @@ -129,8 +129,8 @@ 
public void testException() throws ApiException, IOException { public void testRenderWithoutGpuResource() throws IOException, ApiException { var client = mock(K8sClient.class); - var runTimeProperties = new RunTimeProperties("", new ImageBuild(), new Pypi("", "", "", 1, 2), - CONDARC); + var runTimeProperties = new RunTimeProperties( + "", new RunConfig(), new RunConfig(), new Pypi("", "", "", 1, 2), CONDARC); var k8sJobTemplate = new K8sJobTemplate("", "", "", ""); var scheduler = new K8sTaskScheduler( client, @@ -166,8 +166,8 @@ public void testRenderWithoutGpuResource() throws IOException, ApiException { public void testRenderWithDefaultGpuResourceInPool() throws IOException, ApiException { var client = mock(K8sClient.class); - var runTimeProperties = new RunTimeProperties("", new ImageBuild(), new Pypi("", "", "", 1, 2), - CONDARC); + var runTimeProperties = new RunTimeProperties( + "", new RunConfig(), new RunConfig(), new Pypi("", "", "", 1, 2), CONDARC); var k8sJobTemplate = new K8sJobTemplate("", "", "", ""); var scheduler = new K8sTaskScheduler( client, @@ -207,8 +207,8 @@ public void testRenderWithDefaultGpuResourceInPool() throws IOException, ApiExce public void testDevMode() throws IOException, ApiException { var client = mock(K8sClient.class); - var runTimeProperties = new RunTimeProperties("", new ImageBuild(), new Pypi("", "", "", 1, 2), - CONDARC); + var runTimeProperties = new RunTimeProperties( + "", new RunConfig(), new RunConfig(), new Pypi("", "", "", 1, 2), CONDARC); var k8sJobTemplate = new K8sJobTemplate("", "", "", ""); var scheduler = new K8sTaskScheduler( client, diff --git a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/PodEventHandlerTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/PodEventHandlerTest.java index 5a3a8ac276..1087ef5c26 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/PodEventHandlerTest.java +++ 
b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/PodEventHandlerTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import ai.starwhale.mlops.domain.dataset.DatasetService; +import ai.starwhale.mlops.domain.dataset.build.log.BuildLogCollector; import ai.starwhale.mlops.domain.job.cache.HotJobHolder; import ai.starwhale.mlops.domain.task.bo.Task; import ai.starwhale.mlops.domain.task.status.TaskStatus; @@ -49,6 +51,7 @@ public class PodEventHandlerTest { PodEventHandler podEventHandler; TaskLogK8sCollector taskLogK8sCollector; + BuildLogCollector buildLogCollector; TaskModifyReceiver taskModifyReceiver; HotJobHolder hotJobHolder; @@ -59,10 +62,13 @@ public class PodEventHandlerTest { public void setup() { hotJobHolder = mock(HotJobHolder.class); taskLogK8sCollector = mock(TaskLogK8sCollector.class); + buildLogCollector = mock(BuildLogCollector.class); taskModifyReceiver = mock(TaskModifyReceiver.class); - podEventHandler = new PodEventHandler(taskLogK8sCollector, taskModifyReceiver, hotJobHolder); + podEventHandler = new PodEventHandler( + taskLogK8sCollector, buildLogCollector, taskModifyReceiver, hotJobHolder, mock(DatasetService.class)); v1Pod = new V1Pod() - .metadata(new V1ObjectMeta().labels(Map.of("job-name", "3")).name("3-xxx")) + .metadata(new V1ObjectMeta() + .labels(Map.of("job-name", "3", "job-type", "eval")).name("3-xxx")) .status(new V1PodStatus() .containerStatuses(List.of( new V1ContainerStatus().state( diff --git a/server/controller/src/test/java/ai/starwhale/mlops/domain/task/log/CancellableTaskLogK8sCollectorFactoryTest.java b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java similarity index 86% rename from server/controller/src/test/java/ai/starwhale/mlops/domain/task/log/CancellableTaskLogK8sCollectorFactoryTest.java rename to 
server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java index 4f378012e2..a896824934 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/domain/task/log/CancellableTaskLogK8sCollectorFactoryTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorFactoryTest.java @@ -14,14 +14,13 @@ * limitations under the License. */ -package ai.starwhale.mlops.domain.task.log; +package ai.starwhale.mlops.schedule.k8s.log; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import ai.starwhale.mlops.domain.task.status.watchers.log.CancellableTaskLogK8sCollectorFactory; import ai.starwhale.mlops.schedule.k8s.K8sClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.V1ObjectMeta; @@ -36,7 +35,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class CancellableTaskLogK8sCollectorFactoryTest { +public class CancellableJobLogK8sCollectorFactoryTest { @Test public void testMake() throws IOException, ApiException { var k8sClient = mock(K8sClient.class); @@ -49,8 +48,8 @@ public void testMake() throws IOException, ApiException { when(resp.body()).thenReturn(respBody); when(call.execute()).thenReturn(resp); when(k8sClient.readLog(anyString(), anyString(), anyBoolean())).thenReturn(call); - var factory = new CancellableTaskLogK8sCollectorFactory(k8sClient); - var collector = factory.make(1L); + var factory = new CancellableJobLogK8sCollectorFactory(k8sClient); + var collector = factory.make("1"); Assertions.assertNotNull(collector); } } diff --git a/server/controller/src/test/java/ai/starwhale/mlops/domain/task/log/CancellableTaskLogK8sCollectorTest.java 
b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java similarity index 90% rename from server/controller/src/test/java/ai/starwhale/mlops/domain/task/log/CancellableTaskLogK8sCollectorTest.java rename to server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java index d59acd4881..3f309b5a99 100644 --- a/server/controller/src/test/java/ai/starwhale/mlops/domain/task/log/CancellableTaskLogK8sCollectorTest.java +++ b/server/controller/src/test/java/ai/starwhale/mlops/schedule/k8s/log/CancellableJobLogK8sCollectorTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package ai.starwhale.mlops.domain.task.log; +package ai.starwhale.mlops.schedule.k8s.log; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -24,7 +24,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import ai.starwhale.mlops.domain.task.status.watchers.log.CancellableTaskLogK8sCollector; import ai.starwhale.mlops.schedule.k8s.K8sClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.V1ObjectMeta; @@ -39,7 +38,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -public class CancellableTaskLogK8sCollectorTest { +public class CancellableJobLogK8sCollectorTest { K8sClient k8sClient; @BeforeEach @@ -62,7 +61,7 @@ public void testInitAndRead() throws IOException, ApiException { when(call.execute()).thenReturn(resp); when(k8sClient.readLog(anyString(), anyString(), anyBoolean())).thenReturn(call); - var ins = new CancellableTaskLogK8sCollector(k8sClient, 1L); + var ins = new CancellableJobLogK8sCollector(k8sClient, "1"); assertThat(ins.readLine(), is(line)); verify(k8sClient).getPodsByJobName("1"); diff --git a/server/controller/src/test/resources/template/job.yaml b/server/controller/src/test/resources/template/job.yaml index 
da5b3058f2..85edd24719 100644 --- a/server/controller/src/test/resources/template/job.yaml +++ b/server/controller/src/test/resources/template/job.yaml @@ -7,6 +7,9 @@ spec: parallelism: 1 completionMode: Indexed template: + metadata: + labels: + owner: starwhale spec: restartPolicy: Never containers: