[Improve][Zeta] Add check for submit duplicate job id (#7021)
Hisoka-X authored Jul 18, 2024
1 parent e44e8b9 commit d1b4a7f
Showing 15 changed files with 126 additions and 23 deletions.
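For orientation before the per-file hunks: the commit threads a new isStartWithSavePoint flag from the client submit request, through the generated Hazelcast codec and SubmitJobOperation, into CoordinatorService.submitJob, which now rejects a job id that already has job history unless the submission restores from a savepoint. A condensed, standalone sketch of that guard (not part of the commit; the guard class and the LongPredicate stand-in for getJobHistoryService().getJobMetrics(jobId) are illustrative only):

import java.util.function.LongPredicate;

// Illustrative sketch only: mirrors the duplicate-job-id guard added to CoordinatorService below.
final class DuplicateJobIdGuard {
    private final LongPredicate hasJobHistory; // stands in for getJobHistoryService().getJobMetrics(jobId) != null

    DuplicateJobIdGuard(LongPredicate hasJobHistory) {
        this.hasJobHistory = hasJobHistory;
    }

    void check(long jobId, boolean isStartWithSavePoint) {
        // A job id that already has history may only be reused when restoring from a savepoint.
        if (!isStartWithSavePoint && hasJobHistory.test(jobId)) {
            throw new IllegalStateException(
                    String.format(
                            "The job id %s has already been submitted and is not starting with a savepoint.",
                            jobId));
        }
    }
}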
@@ -73,7 +73,8 @@ private void submitJob(JobImmutableInformation jobImmutableInformation) {
jobImmutableInformation.getJobId(),
seaTunnelHazelcastClient
.getSerializationService()
- .toData(jobImmutableInformation));
+ .toData(jobImmutableInformation),
+ jobImmutableInformation.isStartWithSavePoint());
PassiveCompletableFuture<Void> submitJobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
submitJobFuture.join();
@@ -355,6 +355,55 @@ public void testSetJobId() throws ExecutionException, InterruptedException {
}
}

@Test
public void testSetJobIdDuplicate() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("testSetJobId");
long jobId = System.currentTimeMillis();
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
JobClient jobClient = seaTunnelClient.getJobClient();
try {
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(
filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Assertions.assertEquals(jobId, clientJobProxy.getJobId());

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"RUNNING", jobClient.getJobStatus(jobId)));
jobClient.cancelJob(jobId);
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"CANCELED", jobClient.getJobStatus(jobId)));

ClientJobExecutionEnvironment jobExecutionEnvWithSameJobId =
seaTunnelClient.createExecutionContext(
filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);
Exception exception =
Assertions.assertThrows(
Exception.class,
() -> jobExecutionEnvWithSameJobId.execute().waitForJobCompleteV2());
Assertions.assertEquals(
String.format(
"The job id %s has already been submitted and is not starting with a savepoint.",
jobId),
exception.getCause().getMessage());
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
}
}

@Test
public void testGetJobInfo() {
Common.setDeployMode(DeployMode.CLIENT);
@@ -25,10 +25,13 @@
import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BOOLEAN_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeBoolean;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeBoolean;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;

@@ -37,16 +40,19 @@
* to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
*/

@Generated("ebea440b36898863958c102f47603fee")
/** */
@Generated("9933654790f5fbe98d0ee1c248bc999b")
public final class SeaTunnelSubmitJobCodec {
// hex: 0xDE0200
public static final int REQUEST_MESSAGE_TYPE = 14549504;
// hex: 0xDE0201
public static final int RESPONSE_MESSAGE_TYPE = 14549505;
private static final int REQUEST_JOB_ID_FIELD_OFFSET =
PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
- private static final int REQUEST_INITIAL_FRAME_SIZE =
+ private static final int REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET =
REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
+ private static final int REQUEST_INITIAL_FRAME_SIZE =
+ REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE =
RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
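The new boolean is appended to the fixed-size part of the request frame directly after the long job id, so the initial frame grows by BOOLEAN_SIZE_IN_BYTES. A minimal sketch of the offset arithmetic implied by the constants above (standalone and illustrative; the base offset of 0 is a placeholder, since the real PARTITION_ID_FIELD_OFFSET comes from Hazelcast's ClientMessage constants):

public final class SubmitJobFrameLayoutSketch {
    // Standard fixed sizes, matching FixedSizeTypesCodec.
    static final int INT_SIZE_IN_BYTES = 4;
    static final int LONG_SIZE_IN_BYTES = 8;
    static final int BOOLEAN_SIZE_IN_BYTES = 1;

    public static void main(String[] args) {
        int partitionIdFieldOffset = 0; // placeholder; the real value is defined by ClientMessage
        int jobIdOffset = partitionIdFieldOffset + INT_SIZE_IN_BYTES;
        int isStartWithSavePointOffset = jobIdOffset + LONG_SIZE_IN_BYTES;
        int requestInitialFrameSize = isStartWithSavePointOffset + BOOLEAN_SIZE_IN_BYTES;
        // With the placeholder base: jobId at byte 4, the flag at byte 12,
        // and an initial frame one byte larger than before the flag existed.
        System.out.printf("jobId@%d flag@%d frameSize=%d%n",
                jobIdOffset, isStartWithSavePointOffset, requestInitialFrameSize);
    }
}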

@@ -57,10 +63,14 @@ public static class RequestParameters {
public long jobId;

public com.hazelcast.internal.serialization.Data jobImmutableInformation;

public boolean isStartWithSavePoint;
}

public static ClientMessage encodeRequest(
- long jobId, com.hazelcast.internal.serialization.Data jobImmutableInformation) {
+ long jobId,
+ com.hazelcast.internal.serialization.Data jobImmutableInformation,
+ boolean isStartWithSavePoint) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(false);
clientMessage.setOperationName("SeaTunnel.SubmitJob");
@@ -69,6 +79,10 @@ public static ClientMessage encodeRequest(
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
encodeBoolean(
initialFrame.content,
REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET,
isStartWithSavePoint);
clientMessage.add(initialFrame);
DataCodec.encode(clientMessage, jobImmutableInformation);
return clientMessage;
@@ -80,6 +94,8 @@ public static SeaTunnelSubmitJobCodec.RequestParameters decodeRequest(
RequestParameters request = new RequestParameters();
ClientMessage.Frame initialFrame = iterator.next();
request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
request.isStartWithSavePoint =
decodeBoolean(initialFrame.content, REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET);
request.jobImmutableInformation = DataCodec.decode(iterator);
return request;
}
@@ -58,6 +58,11 @@ methods:
nullable: false
since: 2.0
doc: ''
- name: isStartWithSavePoint
type: boolean
nullable: false
since: 2.0
doc: ''
response: {}

- id: 3
@@ -116,7 +116,7 @@ public class CoordinatorService {
* <p>This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node
* active
*/
IMap<Object, Object> runningJobStateIMap;
private IMap<Object, Object> runningJobStateIMap;

/**
* IMap key is one of jobId {@link
@@ -131,13 +131,13 @@ public class CoordinatorService {
* <p>This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master
* node active
*/
IMap<Object, Long[]> runningJobStateTimestampsIMap;
private IMap<Object, Long[]> runningJobStateTimestampsIMap;

/**
* key: job id; <br>
* value: job master;
*/
private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
private final Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();

/**
* IMap key is {@link PipelineLocation}
@@ -213,8 +212,7 @@ private JobEventProcessor createJobEventProcessor(
handlers.add(httpReportHandler);
}
logger.info("Loaded event handlers: " + handlers);
- JobEventProcessor eventProcessor = new JobEventProcessor(handlers);
- return eventProcessor;
+ return new JobEventProcessor(handlers);
}

public JobHistoryService getJobHistoryService() {
@@ -454,7 +453,8 @@ public ResourceManager getResourceManager() {
}

/** call by client to submit job */
public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
public PassiveCompletableFuture<Void> submitJob(
long jobId, Data jobImmutableInformation, boolean isStartWithSavePoint) {
CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();

// Check if the current jobID is already running. If so, complete the submission
@@ -485,6 +485,13 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
executorService.submit(
() -> {
try {
if (!isStartWithSavePoint
&& getJobHistoryService().getJobMetrics(jobId) != null) {
throw new JobException(
String.format(
"The job id %s has already been submitted and is not starting with a savepoint.",
jobId));
}
runningJobInfoIMap.put(
jobId,
new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
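Because the check runs inside the task handed to executorService, the JobException does not propagate synchronously; it completes the submit future exceptionally, and the client sees the message as the cause of the join failure, which is exactly what testSetJobIdDuplicate above asserts. A self-contained sketch of that shape (illustrative only; a plain CompletableFuture and IllegalStateException stand in for the SeaTunnel types):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncDuplicateJobRejectionSketch {
    public static void main(String[] args) {
        long jobId = 42L;
        CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(
                () -> {
                    try {
                        // Stand-in for the duplicate-id check shown above.
                        throw new IllegalStateException(
                                String.format(
                                        "The job id %s has already been submitted and is not starting with a savepoint.",
                                        jobId));
                    } catch (Exception e) {
                        jobSubmitFuture.completeExceptionally(e);
                    }
                });
        try {
            jobSubmitFuture.join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getMessage()); // the duplicate-id message arrives as the cause
        } finally {
            executorService.shutdown();
        }
    }
}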
@@ -31,12 +31,15 @@

public class SubmitJobOperation extends AbstractJobAsyncOperation {
private Data jobImmutableInformation;
private boolean isStartWithSavePoint;

public SubmitJobOperation() {}

- public SubmitJobOperation(long jobId, @NonNull Data jobImmutableInformation) {
+ public SubmitJobOperation(
+ long jobId, @NonNull Data jobImmutableInformation, boolean isStartWithSavePoint) {
super(jobId);
this.jobImmutableInformation = jobImmutableInformation;
this.isStartWithSavePoint = isStartWithSavePoint;
}

@Override
@@ -48,17 +51,21 @@ public int getClassId() {
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
IOUtil.writeData(out, jobImmutableInformation);
out.writeBoolean(isStartWithSavePoint);
}

@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
jobImmutableInformation = IOUtil.readData(in);
isStartWithSavePoint = in.readBoolean();
}

@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
SeaTunnelServer seaTunnelServer = getService();
- return seaTunnelServer.getCoordinatorService().submitJob(jobId, jobImmutableInformation);
+ return seaTunnelServer
+ .getCoordinatorService()
+ .submitJob(jobId, jobImmutableInformation, isStartWithSavePoint);
}
}
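One detail worth noting in the SubmitJobOperation change above: writeInternal appends the new boolean after the serialized job information, and readInternal reads the two back in the same order, which is what keeps the serialized form consistent between sender and receiver. A tiny ordering analogy using plain java.io streams (illustrative only; the operation actually uses Hazelcast's ObjectDataOutput/ObjectDataInput and IOUtil):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WriteReadOrderSketch {
    public static void main(String[] args) throws IOException {
        byte[] payload = {1, 2, 3};        // stands in for the serialized jobImmutableInformation
        boolean isStartWithSavePoint = false;

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(payload.length);  // loosely analogous to a length-prefixed IOUtil.writeData
            out.write(payload);
            out.writeBoolean(isStartWithSavePoint);
        }

        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            byte[] readPayload = new byte[in.readInt()];
            in.readFully(readPayload);
            boolean flag = in.readBoolean(); // read after the payload, mirroring the write order
            System.out.println("restored flag = " + flag);
        }
    }
}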
@@ -39,7 +39,10 @@ protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection conne

@Override
protected Operation prepareOperation() {
- return new SubmitJobOperation(parameters.jobId, parameters.jobImmutableInformation);
+ return new SubmitJobOperation(
+ parameters.jobId,
+ parameters.jobImmutableInformation,
+ parameters.isStartWithSavePoint);
}

@Override
@@ -137,7 +137,9 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
NodeEngineUtil.sendOperationToMasterNode(
getNode().nodeEngine,
new SubmitJobOperation(
- jobId, getNode().nodeEngine.toData(jobImmutableInformation)))
+ jobId,
+ getNode().nodeEngine.toData(jobImmutableInformation),
+ jobImmutableInformation.isStartWithSavePoint()))
.join();

} else {
@@ -231,7 +233,9 @@ private void submitJob(
.toData(jobImmutableInformation);
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
coordinatorService.submitJob(
- Long.parseLong(jobConfig.getJobContext().getJobId()), data);
+ Long.parseLong(jobConfig.getJobContext().getJobId()),
+ data,
+ jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}
@@ -113,7 +113,8 @@ protected void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobId, data);
+ server.getCoordinatorService()
+ .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}

@@ -282,7 +282,9 @@ public void testRestoreWhenMasterNodeSwitch() throws InterruptedException, IOExc

Data data = instance1.getSerializationService().toData(jobImmutableInformation);

- coordinatorService.submitJob(jobId, data).join();
+ coordinatorService
+ .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
+ .join();

// waiting for job status turn to running
await().atMost(20000, TimeUnit.MILLISECONDS)
@@ -113,7 +113,9 @@ public void testClearCoordinatorService() {
Data data =
coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);

- coordinatorService.submitJob(jobId, data).join();
+ coordinatorService
+ .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
+ .join();

// waiting for job status turn to running
await().atMost(10000, TimeUnit.MILLISECONDS)
@@ -174,7 +176,9 @@ public void testJobRestoreWhenMasterNodeSwitch() throws InterruptedException {

Data data = instance1.getSerializationService().toData(jobImmutableInformation);

- coordinatorService.submitJob(jobId, data).join();
+ coordinatorService
+ .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
+ .join();

// waiting for job status turn to running
await().atMost(20000, TimeUnit.MILLISECONDS)
@@ -77,7 +77,8 @@ private void startJob(Long jobid, String path) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobid, data);
+ server.getCoordinatorService()
+ .submitJob(jobid, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}
@@ -85,7 +85,8 @@ public void testTask() throws MalformedURLException {
jobImmutableInformation.getJobId(),
nodeEngine
.getSerializationService()
- .toData(jobImmutableInformation));
+ .toData(jobImmutableInformation),
+ jobImmutableInformation.isStartWithSavePoint());

Assertions.assertNotNull(voidPassiveCompletableFuture);
}
@@ -143,7 +143,8 @@ private void startJob(Long jobid, String path) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobid, data);
+ server.getCoordinatorService()
+ .submitJob(jobid, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}
@@ -306,7 +306,8 @@ private JobMaster newJobInstanceWithRunningState(long jobId, boolean restore)
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobId, data);
+ server.getCoordinatorService()
+ .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();

JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);