Skip to content

Commit

Permalink
Indexing completed jobs.
Browse files Browse the repository at this point in the history
Signed-off-by: jzonthemtn <[email protected]>
  • Loading branch information
jzonthemtn committed Nov 4, 2024
1 parent ca155be commit 964c000
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.action.ActionListener;
import org.opensearch.eval.judgments.clickmodel.coec.CoecClickModel;
import org.opensearch.eval.judgments.clickmodel.coec.CoecClickModelParameters;
import org.opensearch.eval.judgments.model.Judgment;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class SearchQualityEvaluationJobRunner implements ScheduledJobRunner {

Expand Down Expand Up @@ -100,24 +104,39 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC
final SearchQualityEvaluationJobParameter searchQualityEvaluationJobParameter = (SearchQualityEvaluationJobParameter) jobParameter;

final long startTime = System.currentTimeMillis();
final long judgments;

if(StringUtils.equalsIgnoreCase(searchQualityEvaluationJobParameter.getClickModel(), "coec")) {

LOGGER.info("Beginning implicit judgment generation using clicks-over-expected-clicks.");
final CoecClickModelParameters coecClickModelParameters = new CoecClickModelParameters(true, searchQualityEvaluationJobParameter.getMaxRank());
final CoecClickModel coecClickModel = new CoecClickModel(client, coecClickModelParameters);

coecClickModel.calculateJudgments();
judgments = coecClickModel.calculateJudgments();

} else {

// Invalid click model.
throw new IllegalArgumentException("Invalid click model: " + searchQualityEvaluationJobParameter.getClickModel());

}

LOGGER.info("Implicit judgment generation completed in {} ms", System.currentTimeMillis() - startTime);
final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("Implicit judgment generation completed in {} ms", elapsedTime);

final Map<String, Object> job = new HashMap<>();
job.put("name", searchQualityEvaluationJobParameter.getName());
job.put("click_model", searchQualityEvaluationJobParameter.getClickModel());
job.put("started", startTime);
job.put("duration", elapsedTime);
job.put("judgments", judgments);
job.put("invocation", "scheduled");
job.put("max_rank", searchQualityEvaluationJobParameter.getMaxRank());

final IndexRequest indexRequest = new IndexRequest().index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(UUID.randomUUID().toString()).source(job).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

lockService.release(lock,
ActionListener.wrap(released -> LOGGER.info("Released lock for job {}", jobParameter.getName()), exception -> {
throw new IllegalStateException("Failed to release lock.");
})
);
client.index(indexRequest).get();

}, exception -> { throw new IllegalStateException("Failed to acquire lock."); }));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class SearchQualityEvaluationPlugin extends Plugin implements ActionPlugi

private static final Logger LOGGER = LogManager.getLogger(SearchQualityEvaluationPlugin.class);

public static final String SCHEDULED_JOBS_INDEX_NAME = "search_quality_eval_scheduler";
public static final String SCHEDULED_JOBS_INDEX_NAME = "search_quality_eval_scheduled_jobs";
public static final String COMPLETED_JOBS_INDEX_NAME = "search_quality_eval_completed_jobs";

@Override
public Collection<Object> createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

public class SearchQualityEvaluationRestHandler extends BaseRestHandler {

Expand Down Expand Up @@ -69,26 +72,47 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

if (request.method().equals(RestRequest.Method.POST)) {

final long startTime = System.currentTimeMillis();
final String clickModel = request.param("click_model");
final int maxRank = Integer.parseInt(request.param("max_rank", "20"));
final long judgments;

if (StringUtils.equalsIgnoreCase(clickModel, "coec")) {

final CoecClickModelParameters coecClickModelParameters = new CoecClickModelParameters(true, maxRank);
final CoecClickModel coecClickModel = new CoecClickModel(client, coecClickModelParameters);

// TODO: Run this in a separate thread.
try {
coecClickModel.calculateJudgments();
judgments = coecClickModel.calculateJudgments();
} catch (Exception e) {
throw new RuntimeException(e);
}

final long elapsedTime = System.currentTimeMillis() - startTime;

final Map<String, Object> job = new HashMap<>();
job.put("name", "manual_generation");
job.put("click_model", clickModel);
job.put("started", startTime);
job.put("duration", elapsedTime);
job.put("judgments", judgments);
job.put("invocation", "on_demand");
job.put("max_rank", maxRank);

final IndexRequest indexRequest = new IndexRequest().index(SearchQualityEvaluationPlugin.COMPLETED_JOBS_INDEX_NAME)
.id(UUID.randomUUID().toString()).source(job).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

try {
client.index(indexRequest).get();
} catch (Exception e) {
throw new RuntimeException(e);
}

return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "Implicit judgment generation initiated."));

} else {

return restChannel -> restChannel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Invalid click_model."));

}

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ public abstract class ClickModel<T extends ClickModelParameters> {
public static final String INDEX_UBI_EVENTS = "ubi_events";
public static final String INDEX_UBI_QUERIES = "ubi_queries";

public abstract void calculateJudgments() throws Exception;
public abstract long calculateJudgments() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CoecClickModel(final Client client, final CoecClickModelParameters parame
}

@Override
public void calculateJudgments() throws Exception {
public long calculateJudgments() throws Exception {

final int maxRank = parameters.getMaxRank();

Expand All @@ -94,11 +94,11 @@ public void calculateJudgments() throws Exception {

// Generate and index the implicit judgments.
LOGGER.info("Beginning calculation of implicit judgments.");
calculateCoec(rankAggregatedClickThrough, clickthroughRates);
return calculateCoec(rankAggregatedClickThrough, clickthroughRates);

}

public void calculateCoec(final Map<Integer, Double> rankAggregatedClickThrough,
public long calculateCoec(final Map<Integer, Double> rankAggregatedClickThrough,
final Map<String, Set<ClickthroughRate>> clickthroughRates) throws Exception {

// Calculate the COEC.
Expand Down Expand Up @@ -158,7 +158,9 @@ public void calculateCoec(final Map<Integer, Double> rankAggregatedClickThrough,
openSearchHelper.indexJudgments(judgments);
}

LOGGER.info("Persisted number of judgments: {}", judgments.size());
LOGGER.debug("Persisted number of judgments: {}", judgments.size());

return judgments.size();

}

Expand Down

0 comments on commit 964c000

Please sign in to comment.