Skip to content

Commit

Permalink
enhance(controller): job support auto release (#2352)
Browse files Browse the repository at this point in the history
  • Loading branch information
jialeicui authored Jun 15, 2023
1 parent 4633177 commit d6132ee
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public ResponseEntity<ResponseMessage<String>> createJob(
jobRequest.getType(),
jobRequest.getDevWay(),
jobRequest.isDevMode(),
jobRequest.getDevPassword());
jobRequest.getDevPassword(),
jobRequest.getTimeToLiveInSec());

return ResponseEntity.ok(Code.success.asResponse(idConvertor.convert(jobId)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,6 @@ public class JobRequest implements Serializable {

@JsonProperty("devWay")
private DevWay devWay = DevWay.VS_CODE;

private Long timeToLiveInSec;
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ private JobEntity convertFromFlatten(JobFlattenEntity flattenEntity) {
.devMode(flattenEntity.isDevMode())
.devWay(flattenEntity.getDevWay())
.devPassword(flattenEntity.getDevPassword())
.autoReleaseTime(flattenEntity.getAutoReleaseTime())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -174,7 +176,7 @@ public Long createJob(String projectUrl,
String modelVersionUrl, String datasetVersionUrls, String runtimeVersionUrl,
String comment, String resourcePool,
String handler, String stepSpecOverWrites, JobType type,
DevWay devWay, boolean devMode, String devPassword) {
DevWay devWay, boolean devMode, String devPassword, Long ttlInSec) {
User user = userService.currentUserDetail();
String jobUuid = IdUtil.simpleUUID();
var project = projectService.findProject(projectUrl);
Expand Down Expand Up @@ -257,6 +259,7 @@ public Long createJob(String projectUrl,
.devMode(devMode)
.devWay(devMode ? devWay : null)
.devPassword(devMode ? devPassword : null)
.autoReleaseTime(ttlInSec == null ? null : new Date(System.currentTimeMillis() + ttlInSec * 1000))
.build();

jobDao.addJob(jobEntity);
Expand All @@ -272,6 +275,32 @@ public Long createJob(String projectUrl,
return jobId;
}

/**
 * Cancels running jobs whose auto release time has passed.
 *
 * <p>Scheduled to run with an initial delay of 10 seconds and a fixed delay of
 * 10 seconds between runs. A job is released only when it has a non-null auto
 * release time that is strictly before the moment this pass started. A failure
 * to cancel one job is logged and does not prevent the remaining jobs from
 * being processed.
 */
@Transactional
@Scheduled(initialDelay = 10, fixedDelay = 10, timeUnit = TimeUnit.SECONDS)
public void gc() {
    List<Job> runningJobs = jobDao.findJobByStatusIn(List.of(JobStatus.RUNNING));
    if (CollectionUtils.isEmpty(runningJobs)) {
        log.debug("no running job");
        return;
    }

    // Take the timestamp once so that every job in this pass is compared
    // against the same cut-off instead of a moving "now".
    final Date now = new Date();
    boolean released = false;
    for (Job job : runningJobs) {
        Date releaseTime = job.getAutoReleaseTime();
        // jobs without an auto release time never expire
        if (releaseTime == null || !releaseTime.before(now)) {
            continue;
        }
        released = true;
        try {
            log.info("release job: {}", job.getId());
            cancelJob(job.getId().toString());
        } catch (Exception e) {
            // best effort: keep going so one bad job does not block the others
            log.error("failed to release job: {}", job.getId(), e);
        }
    }

    if (!released) {
        log.debug("no job to release");
    }
}

/**
* transactional jobStatus->TO_CANCEL; RUNNING/PREPARING/ASSIGNING->TO_CANCEL;CREATED/PAUSED/UNKNOWN->CANCELED
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class Job extends TimeConcern {
boolean devMode;
DevWay devWay;
String devPassword;
Date autoReleaseTime;

@Override
public boolean equals(Object o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public Job fromEntity(JobEntity jobEntity) {
.devMode(jobEntity.isDevMode())
.devWay(jobEntity.getDevWay())
.devPassword(jobEntity.getDevPassword())
.autoReleaseTime(jobEntity.getAutoReleaseTime())
.build();
} catch (JsonProcessingException e) {
throw new SwValidationException(ValidSubject.JOB, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class JobEntity extends BaseEntity implements BundleEntity {
private boolean devMode;
private DevWay devWay;
private String devPassword;
private Date autoReleaseTime;

@Override
public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,6 @@ public class JobFlattenEntity {

// don't sync it to datastore
private String devPassword;

private Date autoReleaseTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Add a nullable auto_release_time column to job_info.
-- NULL means the job has no auto release time configured.
ALTER TABLE job_info
ADD auto_release_time DATETIME NULL
8 changes: 6 additions & 2 deletions server/controller/src/main/resources/mapper/JobMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
j.dev_mode,
j.dev_way,
j.dev_password,
j.auto_release_time,
p.project_name,
p.is_deleted as project_is_deleted,
p.is_default as project_is_default,
Expand Down Expand Up @@ -163,7 +164,8 @@
resource_pool,
dev_mode,
dev_way,
dev_password)
dev_password,
auto_release_time)
values (#{job.jobUuid},
#{job.projectId},
#{job.modelVersionId},
Expand All @@ -177,7 +179,8 @@
#{job.resourcePool},
#{job.devMode},
#{job.devWay},
#{job.devPassword}
#{job.devPassword},
#{job.autoReleaseTime}
)
</insert>

Expand All @@ -202,6 +205,7 @@
<result property="devMode" column="dev_mode"/>
<result property="devWay" column="dev_way"/>
<result property="devPassword" column="dev_password"/>
<result property="autoReleaseTime" column="auto_release_time"/>
<association property="project" resultMap="projectResultMap"/>
<association property="modelVersion" resultMap="modelVersionResultMap"/>
<association property="owner" resultMap="userResultMap"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public void testListTasks() {
@Test
public void testCreatJob() {
given(jobService.createJob(anyString(), anyString(), anyString(), anyString(), anyString(),
anyString(), anyString(), any(), any(), eq(DevWay.VS_CODE), eq(false), anyString())).willReturn(1L);
anyString(), anyString(), any(), any(), eq(DevWay.VS_CODE), eq(false), anyString(),
any())).willReturn(1L);
JobRequest jobRequest = new JobRequest();
jobRequest.setHandler("eval");
jobRequest.setComment("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.BDDMockito.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.mock;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import ai.starwhale.mlops.api.protocol.job.JobVo;
import ai.starwhale.mlops.common.PageParams;
Expand Down Expand Up @@ -69,6 +73,7 @@
import ai.starwhale.mlops.exception.api.StarwhaleApiException;
import ai.starwhale.mlops.resulting.ResultQuerier;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -274,22 +279,22 @@ public void testCreateJob() {
.willReturn(DatasetVersion.builder().id(1L).versionName("a1s2d3f4g5h6").build());

assertThrows(StarwhaleApiException.class, () -> service.createJob("1", "3", "1", "2",
"", "1", "", "", JobType.EVALUATION, DevWay.VS_CODE, false, ""));
"", "1", "", "", JobType.EVALUATION, DevWay.VS_CODE, false, "", 1L));

assertThrows(StarwhaleApiException.class, () -> service.createJob("1", "3", "1", "2",
"", "1", "h", "s", JobType.EVALUATION, DevWay.VS_CODE, false, ""));
"", "1", "h", "s", JobType.EVALUATION, DevWay.VS_CODE, false, "", 1L));

assertThrows(SwValidationException.class, () -> service.createJob("1", "3", "1", "",
"", "1", "h", "s", JobType.EVALUATION, DevWay.VS_CODE, false, ""));
"", "1", "h", "s", JobType.EVALUATION, DevWay.VS_CODE, false, "", 1L));

var res = service.createJob("1", "3", "1", "2",
"", "1", "mnist.evaluator:MNISTInference.cmp", "", JobType.EVALUATION, DevWay.VS_CODE, false, "");
"", "1", "mnist.evaluator:MNISTInference.cmp", "", JobType.EVALUATION, DevWay.VS_CODE, false, "", 1L);
assertThat(res, is(1L));
verify(jobDao).addJob(argThat(jobFlattenEntity -> !jobFlattenEntity.isDevMode()
&& jobFlattenEntity.getDevWay() == null && jobFlattenEntity.getDevPassword() == null));

res = service.createJob("1", "3", "1", "2",
"", "1", "", overviewJobSpec, JobType.FINE_TUNE, DevWay.VS_CODE, true, "");
"", "1", "", overviewJobSpec, JobType.FINE_TUNE, DevWay.VS_CODE, true, "", 1L);
assertThat(res, is(1L));
verify(jobDao).addJob(argThat(jobFlattenEntity -> jobFlattenEntity.isDevMode()
&& jobFlattenEntity.getDevWay() == DevWay.VS_CODE && jobFlattenEntity.getDevPassword().equals("")));
Expand Down Expand Up @@ -365,4 +370,23 @@ public void testResumeJob() {
() -> service.resumeJob("2"));
}

/**
 * Verifies that gc() ignores running jobs without an auto release time and
 * cancels a running job whose auto release time has already passed.
 */
@Test
public void testAutoReleaseJob() {
    // nothing to do: the only running job has no auto release time
    when(jobDao.findJobByStatusIn(eq(List.of(JobStatus.RUNNING))))
            .thenReturn(List.of(Job.builder().id(1L).status(JobStatus.RUNNING).build()));

    var svc = spy(service);
    svc.gc();
    verify(svc, never()).cancelJob(any());

    // cancel job 2
    // Use a release time clearly in the past: gc() compares with a strict
    // "before now", so a timestamp taken in the same millisecond as the check
    // would not count as expired and the test would fail intermittently.
    var expiredAt = new Date(System.currentTimeMillis() - 1000L);
    var theJob = Job.builder().id(2L).autoReleaseTime(expiredAt).status(JobStatus.RUNNING).build();
    when(jobDao.findJobByStatusIn(eq(List.of(JobStatus.RUNNING)))).thenReturn(List.of(theJob));
    when(jobDao.getJobId(eq("2"))).thenReturn(theJob.getId());
    when(hotJobHolder.ofIds(eq(List.of(theJob.getId())))).thenReturn(List.of(theJob));

    svc.gc();
    verify(svc).cancelJob("2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void initData() {
.resultOutputPath("").type(JobType.EVALUATION)
.stepSpec("stepSpec2")
.devMode(true)
.autoReleaseTime(new Date(123 * 1000L))
.projectId(project.getId()).ownerId(user.getId()).build();
jobMapper.addJob(jobPaused);
jobMapper.addJob(jobCreated);
Expand Down Expand Up @@ -222,6 +223,7 @@ private void validateJob(JobEntity expectedJob, UserEntity user, ProjectEntity p
Assertions.assertEquals(expectedJob.getStepSpec(), jobEntity.getStepSpec());
Assertions.assertEquals(expectedJob.getComment(), jobEntity.getComment());
Assertions.assertEquals(expectedJob.isDevMode(), jobEntity.isDevMode());
Assertions.assertEquals(expectedJob.getAutoReleaseTime(), jobEntity.getAutoReleaseTime());
Assertions.assertNotNull(jobEntity.getCreatedTime());
final int milli500 = 5;
if (null != expectedJob.getFinishedTime()) {
Expand Down

0 comments on commit d6132ee

Please sign in to comment.