Skip to content

Commit

Permalink
lookup only the last job but not all records to improve performance
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Dec 26, 2024
1 parent 571cf78 commit ff484b6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ public Map<String, List<TaskRunStatus>> listMVRefreshedTaskRunStatus(String dbNa
.forEach(addResult);

// history
taskRunManager.getTaskRunHistory().lookupHistoryByTaskNames(dbName, taskNames)
taskRunManager.getTaskRunHistory().lookupLastJobOfTasks(dbName, taskNames)
.stream()
.filter(taskRunFilter)
.forEach(addResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ public List<TaskRunStatus> lookupHistoryByTaskNames(String dbName, Set<String> t
return result;
}

/**
* Return the list of task runs belong to the LAST JOB:
* Each task run has a `startTaskRunId` as JobId, a job may have multiple task runs.
*/
public List<TaskRunStatus> lookupLastJobOfTasks(String dbName, Set<String> taskNames) {
List<TaskRunStatus> result = getInMemoryHistory().stream()
.filter(x -> x.matchByTaskName(dbName, taskNames))
.collect(Collectors.toList());
if (isEnableArchiveHistory()) {
result.addAll(historyTable.lookupLastJobOfTasks(dbName, taskNames));
}
return result;
}

public List<TaskRunStatus> lookupHistory(TGetTasksParams params) {
List<TaskRunStatus> result = getInMemoryHistory().stream()
.filter(x -> x.match(params))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.util.Strings;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;

import java.io.StringWriter;
import java.text.MessageFormat;
import java.time.ZoneId;
import java.util.Collections;
Expand Down Expand Up @@ -92,6 +95,13 @@ public class TaskRunHistoryTable {
private static final String LOOKUP =
"SELECT history_content_json " + "FROM " + TABLE_FULL_NAME + " WHERE ";

private static final VelocityEngine DEFAULT_VELOCITY_ENGINE;

static {
DEFAULT_VELOCITY_ENGINE = new VelocityEngine();
DEFAULT_VELOCITY_ENGINE.setProperty(VelocityEngine.RUNTIME_LOG_REFERENCE_LOG_INVALID, false);
}

private static final TableKeeper KEEPER =
new TableKeeper(DATABASE_NAME, TABLE_NAME, CREATE_TABLE,
() -> Math.max(1, Config.task_runs_ttl_second / 3600 / 24));
Expand Down Expand Up @@ -226,4 +236,48 @@ public List<TaskRunStatus> lookupByTaskNames(String dbName, Set<String> taskName
Collections.sort(result, TaskRunStatus.COMPARATOR_BY_CREATE_TIME_DESC);
return result;
}

public List<TaskRunStatus> lookupLastJobOfTasks(String dbName, Set<String> taskNames) {
final String template =
"WITH MaxStartRunID AS (" +
" SELECT " +
" task_name, " +
" cast(history_content_json->'startTaskRunId' as string) start_run_id" +
" FROM $tableName " +
" WHERE (task_name, create_time) IN (" +
" SELECT task_name, MAX(create_time)" +
" FROM $tableName" +
" WHERE $taskFilter" +
" GROUP BY task_name" +
" )" +
" )" +
" SELECT t.history_content_json" +
" FROM $tableName t" +
" JOIN MaxStartRunID msr" +
" ON t.task_name = msr.task_name" +
" AND cast(t.history_content_json->'startTaskRunId' as string) = msr.start_run_id" +
" ORDER BY t.task_name, t.create_time DESC";

List<String> predicates = Lists.newArrayList("TRUE");
if (StringUtils.isNotEmpty(dbName)) {
predicates.add(" get_json_string(" + CONTENT_COLUMN + ", 'dbName') = "
+ Strings.quote(ClusterNamespace.getFullName(dbName)));
}
if (CollectionUtils.isNotEmpty(taskNames)) {
String values = taskNames.stream().sorted().map(Strings::quote).collect(Collectors.joining(","));
predicates.add(" task_name IN (" + values + ")");
}
String where = Joiner.on(" AND ").join(predicates);

VelocityContext context = new VelocityContext();
context.put("tableName", TABLE_FULL_NAME);
context.put("taskFilter", where);

StringWriter sw = new StringWriter();
DEFAULT_VELOCITY_ENGINE.evaluate(context, sw, "", template);
String sql = sw.toString();

List<TResultBatch> batch = RepoExecutor.getInstance().executeDQL(sql);
return TaskRunStatus.fromResultBatch(batch);
}
}

0 comments on commit ff484b6

Please sign in to comment.