From 964c000ca02fec0bae905b5434a5f089fff124c9 Mon Sep 17 00:00:00 2001 From: jzonthemtn Date: Mon, 4 Nov 2024 12:22:17 -0500 Subject: [PATCH] Indexing completed jobs. Signed-off-by: jzonthemtn --- .../SearchQualityEvaluationJobRunner.java | 37 ++++++++++++++----- .../eval/SearchQualityEvaluationPlugin.java | 3 +- .../SearchQualityEvaluationRestHandler.java | 30 +++++++++++++-- .../eval/judgments/clickmodel/ClickModel.java | 2 +- .../clickmodel/coec/CoecClickModel.java | 10 +++-- 5 files changed, 64 insertions(+), 18 deletions(-) diff --git a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationJobRunner.java b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationJobRunner.java index 31fccca..c92027b 100644 --- a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationJobRunner.java +++ b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationJobRunner.java @@ -11,19 +11,23 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.action.ActionListener; import org.opensearch.eval.judgments.clickmodel.coec.CoecClickModel; import org.opensearch.eval.judgments.clickmodel.coec.CoecClickModelParameters; -import org.opensearch.eval.judgments.model.Judgment; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.threadpool.ThreadPool; -import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; public class SearchQualityEvaluationJobRunner implements ScheduledJobRunner { @@ -100,6 +104,7 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC final SearchQualityEvaluationJobParameter searchQualityEvaluationJobParameter = (SearchQualityEvaluationJobParameter) jobParameter; final long startTime = System.currentTimeMillis(); + final long judgments; if(StringUtils.equalsIgnoreCase(searchQualityEvaluationJobParameter.getClickModel(), "coec")) { @@ -107,17 +112,31 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC final CoecClickModelParameters coecClickModelParameters = new CoecClickModelParameters(true, searchQualityEvaluationJobParameter.getMaxRank()); final CoecClickModel coecClickModel = new CoecClickModel(client, coecClickModelParameters); - coecClickModel.calculateJudgments(); + judgments = coecClickModel.calculateJudgments(); + + } else { + + // Invalid click model. + throw new IllegalArgumentException("Invalid click model: " + searchQualityEvaluationJobParameter.getClickModel()); } - LOGGER.info("Implicit judgment generation completed in {} ms", System.currentTimeMillis() - startTime); + final long elapsedTime = System.currentTimeMillis() - startTime; + LOGGER.info("Implicit judgment generation completed in {} ms", elapsedTime); + + final Map job = new HashMap<>(); + job.put("name", searchQualityEvaluationJobParameter.getName()); + job.put("click_model", searchQualityEvaluationJobParameter.getClickModel()); + job.put("started", startTime); + job.put("duration", elapsedTime); + job.put("judgments", judgments); + job.put("invocation", "scheduled"); + job.put("max_rank", searchQualityEvaluationJobParameter.getMaxRank()); + + final IndexRequest indexRequest = new IndexRequest().index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME) + .id(UUID.randomUUID().toString()).source(job).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - lockService.release(lock, - ActionListener.wrap(released -> LOGGER.info("Released lock for job {}", jobParameter.getName()), exception -> { - throw new IllegalStateException("Failed to release lock."); - }) - ); + client.index(indexRequest).get(); }, exception -> { throw new IllegalStateException("Failed to acquire lock."); })); } diff --git a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationPlugin.java b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationPlugin.java index 2236055..1ee68cc 100644 --- a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationPlugin.java +++ b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationPlugin.java @@ -48,7 +48,8 @@ public class SearchQualityEvaluationPlugin extends Plugin implements ActionPlugi private static final Logger LOGGER = LogManager.getLogger(SearchQualityEvaluationPlugin.class); - public static final String SCHEDULED_JOBS_INDEX_NAME = "search_quality_eval_scheduler"; + public static final String SCHEDULED_JOBS_INDEX_NAME = "search_quality_eval_scheduled_jobs"; + public static final String COMPLETED_JOBS_INDEX_NAME = "search_quality_eval_completed_jobs"; @Override public Collection createComponents( diff --git a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationRestHandler.java b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationRestHandler.java index a84608a..a92fe98 100644 --- a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationRestHandler.java +++ b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/SearchQualityEvaluationRestHandler.java @@ -31,8 +31,11 @@ import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutionException; public class SearchQualityEvaluationRestHandler extends BaseRestHandler { @@ -69,16 +72,39 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli if (request.method().equals(RestRequest.Method.POST)) { + final long startTime = System.currentTimeMillis(); final String clickModel = request.param("click_model"); final int maxRank = Integer.parseInt(request.param("max_rank", "20")); + final long judgments; if (StringUtils.equalsIgnoreCase(clickModel, "coec")) { final CoecClickModelParameters coecClickModelParameters = new CoecClickModelParameters(true, maxRank); final CoecClickModel coecClickModel = new CoecClickModel(client, coecClickModelParameters); + // TODO: Run this in a separate thread. try { - coecClickModel.calculateJudgments(); + judgments = coecClickModel.calculateJudgments(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + final long elapsedTime = System.currentTimeMillis() - startTime; + + final Map job = new HashMap<>(); + job.put("name", "manual_generation"); + job.put("click_model", clickModel); + job.put("started", startTime); + job.put("duration", elapsedTime); + job.put("judgments", judgments); + job.put("invocation", "on_demand"); + job.put("max_rank", maxRank); + + final IndexRequest indexRequest = new IndexRequest().index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME) + .id(UUID.randomUUID().toString()).source(job).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + try { + client.index(indexRequest).get(); } catch (Exception e) { throw new RuntimeException(e); } @@ -86,9 +112,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "Implicit judgment generation initiated.")); } else { - return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Invalid click_model.")); - } } else { diff --git a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/ClickModel.java b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/ClickModel.java index cef9f49..d0d7779 100644 --- a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/ClickModel.java +++ b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/ClickModel.java @@ -13,6 +13,6 @@ public abstract class ClickModel { public static final String INDEX_UBI_EVENTS = "ubi_events"; public static final String INDEX_UBI_QUERIES = "ubi_queries"; - public abstract void calculateJudgments() throws Exception; + public abstract long calculateJudgments() throws Exception; } diff --git a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/coec/CoecClickModel.java b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/coec/CoecClickModel.java index cdab8ac..624a75e 100644 --- a/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/coec/CoecClickModel.java +++ b/opensearch-search-quality-evaluation-plugin/src/main/java/org/opensearch/eval/judgments/clickmodel/coec/CoecClickModel.java @@ -76,7 +76,7 @@ public CoecClickModel(final Client client, final CoecClickModelParameters parame } @Override - public void calculateJudgments() throws Exception { + public long calculateJudgments() throws Exception { final int maxRank = parameters.getMaxRank(); @@ -94,11 +94,11 @@ public void calculateJudgments() throws Exception { // Generate and index the implicit judgments. LOGGER.info("Beginning calculation of implicit judgments."); - calculateCoec(rankAggregatedClickThrough, clickthroughRates); + return calculateCoec(rankAggregatedClickThrough, clickthroughRates); } - public void calculateCoec(final Map rankAggregatedClickThrough, + public long calculateCoec(final Map rankAggregatedClickThrough, final Map> clickthroughRates) throws Exception { // Calculate the COEC. @@ -158,7 +158,9 @@ public void calculateCoec(final Map rankAggregatedClickThrough, openSearchHelper.indexJudgments(judgments); } - LOGGER.info("Persisted number of judgments: {}", judgments.size()); + LOGGER.debug("Persisted number of judgments: {}", judgments.size()); + + return judgments.size(); }