From 25409fd0159f81d7c5ec859bd7aeb03040f3e5f0 Mon Sep 17 00:00:00 2001 From: jzonthemtn Date: Sun, 18 Feb 2024 18:06:20 -0500 Subject: [PATCH] #11 Making the events queue abstract to future support other backends. --- .../ubl/UserBehaviorLoggingPlugin.java | 7 ++-- .../UserBehaviorLoggingRestHandler.java | 4 +-- .../UserBehaviorLoggingSearchFilter.java | 23 ++++++------ .../org/opensearch/ubl/backends/Backend.java | 2 +- .../ubl/backends/OpenSearchBackend.java | 16 ++++----- .../ubl/events/AbstractEventManager.java | 31 ++++++++++++++++ .../java/org/opensearch/ubl/events/Event.java | 29 +++++++++++++++ ...nager.java => OpenSearchEventManager.java} | 36 ++++++++++--------- .../ubl/events/queues/EventQueue.java | 5 +-- .../ubl/events/queues/InternalQueue.java | 10 +++--- 10 files changed, 116 insertions(+), 47 deletions(-) create mode 100644 src/main/java/org/opensearch/ubl/events/AbstractEventManager.java create mode 100644 src/main/java/org/opensearch/ubl/events/Event.java rename src/main/java/org/opensearch/ubl/events/{EventManager.java => OpenSearchEventManager.java} (51%) diff --git a/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java b/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java index 8ce2eb6..47380fe 100644 --- a/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java +++ b/src/main/java/org/opensearch/ubl/UserBehaviorLoggingPlugin.java @@ -24,7 +24,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.ubl.events.EventManager; +import org.opensearch.ubl.events.OpenSearchEventManager; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; @@ -102,9 +102,10 @@ public Collection createComponents( LOGGER.info("Creating scheduled task"); - // TODO: Only start this if already initialized. + // TODO: Only start this if an OpenSearch store is already initialized. + // Otherwise, start it when a store is initialized. threadPool.scheduler().scheduleAtFixedRate(() -> { - EventManager.getInstance(client).process(); + OpenSearchEventManager.getInstance(client).process(); }, 0, 2000, TimeUnit.MILLISECONDS); return Collections.emptyList(); diff --git a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java index c6ada81..dc2d4e0 100644 --- a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java +++ b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingRestHandler.java @@ -72,8 +72,8 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod final String storeName = request.param("store"); LOGGER.info("Persisting event into {}", storeName); - final String event = request.content().utf8ToString(); - backend.persistEvent(storeName, event); + final String eventJson = request.content().utf8ToString(); + backend.persistEvent(storeName, eventJson); return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Event received")); diff --git a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java index ff9965f..336c0fc 100644 --- a/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java +++ b/src/main/java/org/opensearch/ubl/action/UserBehaviorLoggingSearchFilter.java @@ -16,8 +16,10 @@ import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilterChain; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.ubl.model.QueryResponse; import org.opensearch.ubl.backends.Backend; import org.opensearch.ubl.model.QueryRequest; @@ -67,20 +69,19 @@ public void onResponse(Response response) { //final Set indicesToLog = new HashSet<>(Arrays.asList(settings.get(SettingsConstants.INDEX_NAMES).split(","))); //if(indicesToLog.containsAll(indices)) { - // Create a UUID for this search request. - final String queryId = UUID.randomUUID().toString(); - - // The query will be empty when there is no query, e.g. /_search - final String query = searchRequest.source().toString(); + // Get all search hits from the response. + if (response instanceof SearchResponse) { - // Create a UUID for this search response. - final String queryResponseId = UUID.randomUUID().toString(); + // Create a UUID for this search request. + final String queryId = UUID.randomUUID().toString(); - final List queryResponseHitIds = new LinkedList<>(); + // The query will be empty when there is no query, e.g. /_search + final String query = searchRequest.source().toString(); - // Get all search hits from the response. - if (response instanceof SearchResponse) { + // Create a UUID for this search response. + final String queryResponseId = UUID.randomUUID().toString(); + final List queryResponseHitIds = new LinkedList<>(); final SearchResponse searchResponse = (SearchResponse) response; // Add each hit to the list of query responses. @@ -101,6 +102,8 @@ public void onResponse(Response response) { LOGGER.error("Unable to persist query.", ex); } + // TODO: Somehow return the queryId to the client. + } //} diff --git a/src/main/java/org/opensearch/ubl/backends/Backend.java b/src/main/java/org/opensearch/ubl/backends/Backend.java index 2c76064..9ed461d 100644 --- a/src/main/java/org/opensearch/ubl/backends/Backend.java +++ b/src/main/java/org/opensearch/ubl/backends/Backend.java @@ -20,7 +20,7 @@ public interface Backend { void delete(final String storeName, RestChannel channel); - void persistEvent(final String storeName, String event); + void persistEvent(final String storeName, String eventJson); void persistQuery(final String storeName, QueryRequest queryRequest, QueryResponse queryResponse) throws Exception; diff --git a/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java b/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java index 2956a93..1997ad9 100644 --- a/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java +++ b/src/main/java/org/opensearch/ubl/backends/OpenSearchBackend.java @@ -21,7 +21,8 @@ import org.opensearch.rest.RestChannel; import org.opensearch.rest.action.RestToXContentListener; import org.opensearch.ubl.SettingsConstants; -import org.opensearch.ubl.events.EventManager; +import org.opensearch.ubl.events.Event; +import org.opensearch.ubl.events.OpenSearchEventManager; import org.opensearch.ubl.model.QueryRequest; import org.opensearch.ubl.model.QueryResponse; @@ -90,16 +91,15 @@ public void delete(String storeName, RestChannel channel) { } @Override - public void persistEvent(String storeName, String event) { + public void persistEvent(String storeName, String eventJson) { // Add the event for indexing. LOGGER.info("Indexing event into {}", storeName); final String eventsIndexName = getEventsIndexName(storeName); - final IndexRequest indexRequest = new IndexRequest(eventsIndexName) - .source(event, XContentType.JSON); //return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel)); - EventManager.getInstance(client).addIndexRequest(indexRequest); + final Event event = new Event(eventsIndexName, eventJson); + OpenSearchEventManager.getInstance(client).add(event); } @@ -123,8 +123,8 @@ public void persistQuery(final String storeName, final QueryRequest queryRequest final IndexRequest indexRequest = new IndexRequest(queriesIndexName) .source(source, XContentType.JSON); - //return (channel) -> client.index(indexRequest, new RestToXContentListener<>(channel)); - EventManager.getInstance(client).addIndexRequest(indexRequest); + // TODO: Move this to the queue, too. + client.index(indexRequest); } @@ -148,7 +148,7 @@ private String getResourceFile(final String fileName) { Streams.copy(is, out); return out.toString(StandardCharsets.UTF_8); } catch (IOException e) { - throw new IllegalStateException("failed to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e); + throw new IllegalStateException("Unable to create index with resource [" + OpenSearchBackend.EVENTS_MAPPING_FILE + "]", e); } } diff --git a/src/main/java/org/opensearch/ubl/events/AbstractEventManager.java b/src/main/java/org/opensearch/ubl/events/AbstractEventManager.java new file mode 100644 index 0000000..88f8093 --- /dev/null +++ b/src/main/java/org/opensearch/ubl/events/AbstractEventManager.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ubl.events; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.ubl.events.queues.EventQueue; +import org.opensearch.ubl.events.queues.InternalQueue; + +public abstract class AbstractEventManager { + + private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class); + + protected final EventQueue eventQueue; + + public AbstractEventManager() { + this.eventQueue = new InternalQueue(); + } + + public abstract void process(); + + public abstract void add(Event event); + +} diff --git a/src/main/java/org/opensearch/ubl/events/Event.java b/src/main/java/org/opensearch/ubl/events/Event.java new file mode 100644 index 0000000..4f38e65 --- /dev/null +++ b/src/main/java/org/opensearch/ubl/events/Event.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ubl.events; + +public class Event { + + private final String indexName; + private final String event; + + public Event(String indexName, String event) { + this.indexName = indexName; + this.event = event; + } + + public String getIndexName() { + return indexName; + } + + public String getEvent() { + return event; + } + +} diff --git a/src/main/java/org/opensearch/ubl/events/EventManager.java b/src/main/java/org/opensearch/ubl/events/OpenSearchEventManager.java similarity index 51% rename from src/main/java/org/opensearch/ubl/events/EventManager.java rename to src/main/java/org/opensearch/ubl/events/OpenSearchEventManager.java index ec0cac8..80a71ef 100644 --- a/src/main/java/org/opensearch/ubl/events/EventManager.java +++ b/src/main/java/org/opensearch/ubl/events/OpenSearchEventManager.java @@ -13,22 +13,20 @@ import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.client.Client; -import org.opensearch.ubl.events.queues.EventQueue; -import org.opensearch.ubl.events.queues.InternalQueue; +import org.opensearch.common.xcontent.XContentType; -public class EventManager { +public class OpenSearchEventManager extends AbstractEventManager { - private static final Logger LOGGER = LogManager.getLogger(EventManager.class); + private static final Logger LOGGER = LogManager.getLogger(OpenSearchEventManager.class); - private final EventQueue eventQueue; private final Client client; - private static EventManager eventManager; + private static OpenSearchEventManager openSearchEventManager; - private EventManager(Client client) { + private OpenSearchEventManager(Client client) { this.client = client; - this.eventQueue = new InternalQueue(); } + @Override public void process() { if(eventQueue.size() > 0) { @@ -36,8 +34,13 @@ public void process() { final BulkRequest bulkRequest = new BulkRequest(); LOGGER.info("Bulk inserting " + eventQueue.size() + " search relevance events"); - for (final IndexRequest indexRequest : eventQueue.get()) { + for (final Event event : eventQueue.get()) { + + final IndexRequest indexRequest = new IndexRequest(event.getIndexName()) + .source(event.getEvent(), XContentType.JSON); + bulkRequest.add(indexRequest); + } eventQueue.clear(); @@ -47,15 +50,16 @@ public void process() { } - public static EventManager getInstance(Client client) { - if(eventManager == null) { - eventManager = new EventManager(client); - } - return eventManager; + @Override + public void add(Event event) { + eventQueue.add(event); } - public void addIndexRequest(IndexRequest request) { - eventQueue.add(request); + public static OpenSearchEventManager getInstance(Client client) { + if(openSearchEventManager == null) { + openSearchEventManager = new OpenSearchEventManager(client); + } + return openSearchEventManager; } } diff --git a/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java b/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java index c66c571..31afec3 100644 --- a/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java +++ b/src/main/java/org/opensearch/ubl/events/queues/EventQueue.java @@ -9,16 +9,17 @@ package org.opensearch.ubl.events.queues; import org.opensearch.action.index.IndexRequest; +import org.opensearch.ubl.events.Event; import java.util.List; public interface EventQueue { - void add(IndexRequest indexRequest); + void add(Event event); void clear(); - List get(); + List get(); int size(); diff --git a/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java b/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java index 279be1c..4266b78 100644 --- a/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java +++ b/src/main/java/org/opensearch/ubl/events/queues/InternalQueue.java @@ -8,18 +8,18 @@ package org.opensearch.ubl.events.queues; -import org.opensearch.action.index.IndexRequest; +import org.opensearch.ubl.events.Event; import java.util.LinkedList; import java.util.List; public class InternalQueue implements EventQueue { - private static final List indexRequests = new LinkedList<>(); + private static final List indexRequests = new LinkedList<>(); @Override - public void add(IndexRequest indexRequest) { - indexRequests.add(indexRequest); + public void add(Event event) { + indexRequests.add(event); } @Override @@ -28,7 +28,7 @@ public void clear() { } @Override - public List get() { + public List get() { return indexRequests; }