feat(controller): build dataset in server (#2497)
goldenxinxing authored Jul 19, 2023
1 parent 130d048 commit 7c7ff4f
Showing 48 changed files with 1,894 additions and 95 deletions.
52 changes: 51 additions & 1 deletion client/scripts/sw-docker-entrypoint
@@ -191,7 +191,54 @@ run_code_server () {
echo "-->[Preparing] run code-server done."
}

welcome $1
ds_build_and_upload () {
echo "-->[Preparing] Downloading files..."
BUILD_DIR=$DATASET_BUILD_NAME
mkdir -p "$BUILD_DIR"
cd "$BUILD_DIR"

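# Ask the server for signed download URLs of every file under the build prefix;
# the response maps relative file paths to time-limited URLs.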
SIGNED_URLS=$(curl -X 'GET' "$SW_INSTANCE_URI/api/v1/filestorage/signedurl?pathPrefix=$DATASET_BUILD_DIR_PREFIX" -H 'accept: application/json' -H "Authorization: $SW_TOKEN" | jq ".data.signedUrls")

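# Recreate each file's directory locally, then download the files up to 10 at a time in parallel.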
for entry in $(echo "$SIGNED_URLS" | jq -r 'to_entries|map("\(.key)@\(.value)")|.[]'); do
IFS='@' read -r file signedurl <<< "$entry"

filedir=$(dirname "$file")
if [ ! -d "$filedir" ]; then
mkdir -p "$filedir"
fi
echo "$file $signedurl"
done | xargs -n 2 -P 10 sh -c 'curl -o "$1" "$2"' sh
cd -

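# Assemble the swcli build command according to the requested source type.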
cmd="swcli dataset build -n $DATASET_BUILD_NAME"
if [ "$DATASET_BUILD_TYPE" = "IMAGE" ]; then
cmd="$cmd -if $BUILD_DIR"
elif [ "$DATASET_BUILD_TYPE" = "VIDEO" ]; then
cmd="$cmd -vf $BUILD_DIR"
elif [ "$DATASET_BUILD_TYPE" = "AUDIO" ]; then
cmd="$cmd -af $BUILD_DIR"
elif [ "$DATASET_BUILD_TYPE" = "JSON" ]; then
cmd="$cmd -jf $BUILD_DIR"
if [ -n "$DATASET_BUILD_EXTRA" ]; then
cmd="$cmd --field-selector $DATASET_BUILD_EXTRA"
fi
elif [ "$DATASET_BUILD_TYPE" = "HANDLER" ]; then
cmd="$cmd -h $DATASET_BUILD_HANDLER"
elif [ "$DATASET_BUILD_TYPE" = "YAML" ]; then
cmd="$cmd -f $DATASET_BUILD_YAML"
else
echo "Unknown type: $DATASET_BUILD_TYPE" && exit 1
fi

echo "-->[Building] Start to build dataset: $DATASET_BUILD_NAME..."
eval "$cmd" || exit 1

echo "-->[Uploading] Start to upload dataset: $DATASET_BUILD_NAME..."
swcli instance login --token "$SW_TOKEN" --alias server "$SW_INSTANCE_URI"
swcli dataset copy --patch "$DATASET_BUILD_NAME"/version/latest cloud://server/project/"$SW_PROJECT" || exit 1
}

welcome "$1"
case "$1" in
pre_config)
pre_config
@@ -206,6 +253,9 @@ case "$1" in
prepare && install_code_server && run_code_server
tail -f /var/log/dev.log
;;
dataset_build)
ds_build_and_upload
;;
*)
prepare "starwhale" && exec "$@"
;;
@@ -25,10 +25,13 @@
import ai.starwhale.mlops.api.protocol.dataset.DatasetViewVo;
import ai.starwhale.mlops.api.protocol.dataset.DatasetVo;
import ai.starwhale.mlops.api.protocol.dataset.RevertDatasetRequest;
import ai.starwhale.mlops.api.protocol.dataset.build.BuildRecordVo;
import ai.starwhale.mlops.api.protocol.dataset.build.DatasetBuildRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataConsumptionRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataIndexDesc;
import ai.starwhale.mlops.api.protocol.dataset.upload.DatasetUploadRequest;
import ai.starwhale.mlops.api.protocol.upload.UploadResult;
import ai.starwhale.mlops.domain.dataset.build.BuildStatus;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -430,4 +433,32 @@ ResponseEntity<?> headDataset(
@PathVariable("versionUrl")
String versionUrl);

@Operation(summary = "Build Dataset", description = "Build Dataset")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@PostMapping("/project/{projectUrl}/dataset/{datasetName}/build")
@PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')")
ResponseEntity<ResponseMessage<String>> buildDataset(
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
@PathVariable(name = "projectUrl")
String projectUrl,
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
@PathVariable(name = "datasetName")
String datasetName,
@Valid @RequestBody DatasetBuildRequest datasetBuildRequest);

@Operation(summary = "List Build Records", description = "List Build Records")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@GetMapping("/project/{projectUrl}/dataset/build/list")
@PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')")
ResponseEntity<ResponseMessage<PageInfo<BuildRecordVo>>> listBuildRecords(
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
@PathVariable(name = "projectUrl")
String projectUrl,
@RequestParam(value = "status", required = false)
BuildStatus status,
@Valid @RequestParam(value = "pageNum", required = false, defaultValue = "1")
Integer pageNum,
@Valid @RequestParam(value = "pageSize", required = false, defaultValue = "10")
Integer pageSize);

}
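For reference, a hypothetical call to the new build endpoint might look like the sketch below. The host, project, dataset name, token, and storage path are placeholders, the /api/v1 prefix mirrors the other endpoints in this commit, and the JSON field names (type, shared, storagePath) follow the controller implementation further down, so their exact types are assumptions. Per the @PreAuthorize annotation above, the caller needs OWNER or MAINTAINER rights.

# Hypothetical request: trigger a server-side IMAGE dataset build (placeholders throughout).
curl -X POST "$SW_INSTANCE_URI/api/v1/project/starwhale/dataset/mnist/build" \
  -H "Authorization: $SW_TOKEN" \
  -H 'Content-Type: application/json' \
  -d '{"type": "IMAGE", "shared": false, "storagePath": "dataset-build/mnist/"}'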
@@ -0,0 +1,125 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.api;

import ai.starwhale.mlops.common.IdConverter;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogCollector;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory;
import io.kubernetes.client.openapi.ApiException;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@ServerEndpoint("/api/v1/log/online/dataset/{name}/build/{id}")
public class DatasetBuildLogWsServer {

private static final ExecutorService executorService = Executors.newCachedThreadPool();

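// @ServerEndpoint instances are created per connection by the websocket container,
// so the Spring-injected collaborators below are held in static fields.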
private static IdConverter idConvertor;

private static CancellableJobLogK8sCollectorFactory logCollectorFactory;

private Session session;

private String readerId;

private Long id;

private CancellableJobLogCollector logCollector;


@Autowired
public void setIdConvertor(IdConverter idConvertor) {
DatasetBuildLogWsServer.idConvertor = idConvertor;
}

@Autowired
public void setLogCollectorFactory(CancellableJobLogK8sCollectorFactory factory) {
DatasetBuildLogWsServer.logCollectorFactory = factory;
}

@OnOpen
public void onOpen(Session session, @PathParam("name") String name, @PathParam("id") String id) {
this.session = session;
this.readerId = session.getId();
this.id = idConvertor.revert(id);
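// Attach a log collector to the Kubernetes job named "<datasetName>-<buildId>".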
try {
logCollector = logCollectorFactory.make(String.format("%s-%s", name, id));
} catch (IOException | ApiException e) {
log.error("make k8s log collector failed", e);
}
log.info("Build log ws opened. reader={}, task={}", readerId, id);
executorService.submit(() -> {
String line;
while (true) {
try {
if ((line = logCollector.readLine()) == null) {
break;
}
sendMessage(line);
} catch (IOException e) {
log.error("read k8s log failed", e);
break;
}
}
});
}

@OnClose
public void onClose() {
cancelLogCollector();
log.info("Build log ws closed. reader={}, task={}", readerId, id);
}

@OnMessage
public void onMessage(String message, Session session) {
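// No-op: log streaming is one-way; messages from the client are ignored.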

}

@OnError
public void onError(Session session, Throwable error) {
cancelLogCollector();
log.error("Task log ws error: reader={}, task={}, message={}", readerId, id, error.getMessage());
}

public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("ws send message", e);
}
}

private void cancelLogCollector() {
if (logCollector != null) {
logCollector.cancel();
logCollector = null;
}
}
}
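The endpoint above attaches to the Kubernetes job named "<datasetName>-<buildId>" and forwards each log line to the connected client. A minimal consumption sketch, assuming a generic websocket client such as websocat and placeholder host, dataset name, and build id:

# Hypothetical: stream the log of build record 42 for dataset "mnist".
websocat "ws://$SW_HOST/api/v1/log/online/dataset/mnist/build/42"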
@@ -24,6 +24,8 @@
import ai.starwhale.mlops.api.protocol.dataset.DatasetViewVo;
import ai.starwhale.mlops.api.protocol.dataset.DatasetVo;
import ai.starwhale.mlops.api.protocol.dataset.RevertDatasetRequest;
import ai.starwhale.mlops.api.protocol.dataset.build.BuildRecordVo;
import ai.starwhale.mlops.api.protocol.dataset.build.DatasetBuildRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataConsumptionRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataIndexDesc;
import ai.starwhale.mlops.api.protocol.dataset.upload.DatasetUploadRequest;
@@ -34,6 +36,8 @@
import ai.starwhale.mlops.domain.dataset.DatasetService;
import ai.starwhale.mlops.domain.dataset.bo.DatasetQuery;
import ai.starwhale.mlops.domain.dataset.bo.DatasetVersionQuery;
import ai.starwhale.mlops.domain.dataset.build.BuildStatus;
import ai.starwhale.mlops.domain.dataset.build.bo.CreateBuildRecordRequest;
import ai.starwhale.mlops.domain.dataset.dataloader.DataReadRequest;
import ai.starwhale.mlops.domain.dataset.dataloader.ReadMode;
import ai.starwhale.mlops.domain.dataset.objectstore.HashNamedDatasetObjectStoreFactory;
@@ -356,4 +360,26 @@ public ResponseEntity<?> headDataset(String projectUrl, String datasetUrl, Strin
}
}

@Override
public ResponseEntity<ResponseMessage<String>> buildDataset(
String projectUrl, String datasetName, DatasetBuildRequest datasetBuildRequest) {
datasetService.build(CreateBuildRecordRequest.builder()
.datasetId(datasetBuildRequest.getDatasetId())
.datasetName(datasetName)
.shared(datasetBuildRequest.getShared())
.projectUrl(projectUrl)
.type(datasetBuildRequest.getType())
.storagePath(datasetBuildRequest.getStoragePath())
.build());
return ResponseEntity.ok(Code.success.asResponse("success"));
}

@Override
public ResponseEntity<ResponseMessage<PageInfo<BuildRecordVo>>> listBuildRecords(
String projectUrl, BuildStatus status, Integer pageNum, Integer pageSize) {
return ResponseEntity.ok(Code.success.asResponse(
datasetService.listBuildRecords(projectUrl, status, new PageParams(pageNum, pageSize))));
}


}
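A matching query against the new list endpoint might look like this sketch; the project name and token are placeholders, and the BuildStatus filter is omitted because the enum values are not shown in this diff.

# Hypothetical request: page through the build records of a project.
curl "$SW_INSTANCE_URI/api/v1/project/starwhale/dataset/build/list?pageNum=1&pageSize=10" \
  -H "Authorization: $SW_TOKEN"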
@@ -0,0 +1,59 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.api;

import ai.starwhale.mlops.api.protocol.ResponseMessage;
import ai.starwhale.mlops.api.protocol.filestorage.ApplySignedUrlRequest;
import ai.starwhale.mlops.api.protocol.filestorage.FileDeleteRequest;
import ai.starwhale.mlops.api.protocol.filestorage.SignedUrlResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;

@Tag(name = "File storage", description = "File storage operations")
@Validated
public interface FileStorageApi {

@Operation(summary = "Apply pathPrefix", description = "Apply pathPrefix")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@GetMapping("/filestorage/path/apply")
ResponseEntity<ResponseMessage<String>> applyPathPrefix(String flag);

@Operation(summary = "Apply signedUrls for put", description = "Apply signedUrls for put")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@PutMapping("/filestorage/signedurl")
ResponseEntity<ResponseMessage<SignedUrlResponse>> applySignedPutUrls(
@RequestBody ApplySignedUrlRequest applySignedUrlRequest);

@Operation(summary = "Apply signedUrls for get", description = "Apply signedUrls for get")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@GetMapping("/filestorage/signedurl")
ResponseEntity<ResponseMessage<SignedUrlResponse>> applySignedGetUrls(String pathPrefix);

@Operation(summary = "Delete path", description = "Delete path")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@DeleteMapping("/filestorage/file")
ResponseEntity<ResponseMessage<String>> deletePath(@RequestBody FileDeleteRequest request);

}
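The entrypoint script at the top of this commit consumes applySignedGetUrls when it downloads the build sources; a trimmed sketch of that call, with a placeholder path prefix:

# Hypothetical: fetch signed download URLs for everything under a path prefix.
curl -s "$SW_INSTANCE_URI/api/v1/filestorage/signedurl?pathPrefix=dataset-build/mnist/" \
  -H 'accept: application/json' \
  -H "Authorization: $SW_TOKEN" | jq '.data.signedUrls'
# The response maps each relative file path to a time-limited download URL.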