Skip to content

Commit

Permalink
[#2966] feat(core): supports generate topic event (#2968)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
supports generate topic event

### Why are the changes needed?


Fix: #2966 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
existing tests

---------

Co-authored-by: Jerry Shao <[email protected]>
  • Loading branch information
FANNG1 and jerryshao authored Apr 16, 2024
1 parent 71c687d commit b04efd0
Show file tree
Hide file tree
Showing 20 changed files with 711 additions and 15 deletions.
15 changes: 9 additions & 6 deletions core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.datastrato.gravitino.catalog.TableDispatcher;
import com.datastrato.gravitino.catalog.TableEventDispatcher;
import com.datastrato.gravitino.catalog.TableOperationDispatcher;
import com.datastrato.gravitino.catalog.TopicDispatcher;
import com.datastrato.gravitino.catalog.TopicEventDispatcher;
import com.datastrato.gravitino.catalog.TopicOperationDispatcher;
import com.datastrato.gravitino.listener.EventBus;
import com.datastrato.gravitino.listener.EventListenerManager;
Expand Down Expand Up @@ -55,7 +57,7 @@ public class GravitinoEnv {

private FilesetDispatcher filesetDispatcher;

private TopicOperationDispatcher topicOperationDispatcher;
private TopicDispatcher topicDispatcher;

private MetalakeDispatcher metalakeDispatcher;

Expand Down Expand Up @@ -149,8 +151,9 @@ public void initialize(Config config) {
FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetOperationDispatcher);
this.topicOperationDispatcher =
TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicOperationDispatcher);

// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
Expand Down Expand Up @@ -225,12 +228,12 @@ public FilesetDispatcher filesetDispatcher() {
}

/**
* Get the TopicOperationDispatcher associated with the Gravitino environment.
* Get the TopicDispatcher associated with the Gravitino environment.
*
* @return The TopicOperationDispatcher instance.
* @return The TopicDispatcher instance.
*/
public TopicOperationDispatcher topicOperationDispatcher() {
return topicOperationDispatcher;
public TopicDispatcher topicDispatcher() {
return topicDispatcher;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog;

import com.datastrato.gravitino.messaging.TopicCatalog;

/**
* {@code TopicDispatcher} interface acts as a specialization of the {@link TopicCatalog} interface.
* This interface is designed to potentially add custom behaviors or operations related to
* dispatching or handling topic-related events or actions that are not covered by the standard
* {@code TopicCatalog} operations.
*/
public interface TopicDispatcher extends TopicCatalog {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.exceptions.NoSuchTopicException;
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
import com.datastrato.gravitino.listener.EventBus;
import com.datastrato.gravitino.listener.api.event.AlterTopicEvent;
import com.datastrato.gravitino.listener.api.event.AlterTopicFailureEvent;
import com.datastrato.gravitino.listener.api.event.CreateTopicEvent;
import com.datastrato.gravitino.listener.api.event.CreateTopicFailureEvent;
import com.datastrato.gravitino.listener.api.event.DropTopicEvent;
import com.datastrato.gravitino.listener.api.event.DropTopicFailureEvent;
import com.datastrato.gravitino.listener.api.event.ListTopicEvent;
import com.datastrato.gravitino.listener.api.event.ListTopicFailureEvent;
import com.datastrato.gravitino.listener.api.event.LoadTopicEvent;
import com.datastrato.gravitino.listener.api.event.LoadTopicFailureEvent;
import com.datastrato.gravitino.listener.api.info.TopicInfo;
import com.datastrato.gravitino.messaging.DataLayout;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.utils.PrincipalUtils;
import java.util.Map;

/**
* {@code TopicEventDispatcher} is a decorator for {@link TopicDispatcher} that not only delegates
* topic operations to the underlying catalog dispatcher but also dispatches corresponding events to
* an {@link EventBus} after each operation is completed. This allows for event-driven workflows or
* monitoring of topic operations.
*/
public class TopicEventDispatcher implements TopicDispatcher {
private final EventBus eventBus;
private final TopicDispatcher dispatcher;

/**
* Constructs a TopicEventDispatcher with a specified EventBus and TopicCatalog.
*
* @param eventBus The EventBus to which events will be dispatched.
* @param dispatcher The underlying {@link TopicDispatcher} that will perform the actual topic
* operations.
*/
public TopicEventDispatcher(EventBus eventBus, TopicDispatcher dispatcher) {
this.eventBus = eventBus;
this.dispatcher = dispatcher;
}

@Override
public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
throws NoSuchTopicException, IllegalArgumentException {
try {
Topic topic = dispatcher.alterTopic(ident, changes);
eventBus.dispatchEvent(
new AlterTopicEvent(
PrincipalUtils.getCurrentUserName(), ident, changes, new TopicInfo(topic)));
return topic;
} catch (Exception e) {
eventBus.dispatchEvent(
new AlterTopicFailureEvent(PrincipalUtils.getCurrentUserName(), ident, e, changes));
throw e;
}
}

@Override
public boolean dropTopic(NameIdentifier ident) {
try {
boolean isExists = dispatcher.dropTopic(ident);
eventBus.dispatchEvent(
new DropTopicEvent(PrincipalUtils.getCurrentUserName(), ident, isExists));
return isExists;
} catch (Exception e) {
eventBus.dispatchEvent(
new DropTopicFailureEvent(PrincipalUtils.getCurrentUserName(), ident, e));
throw e;
}
}

@Override
public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchTopicException {
try {
NameIdentifier[] nameIdentifiers = dispatcher.listTopics(namespace);
eventBus.dispatchEvent(new ListTopicEvent(PrincipalUtils.getCurrentUserName(), namespace));
return nameIdentifiers;
} catch (Exception e) {
eventBus.dispatchEvent(
new ListTopicFailureEvent(PrincipalUtils.getCurrentUserName(), namespace, e));
throw e;
}
}

@Override
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
try {
Topic topic = dispatcher.loadTopic(ident);
eventBus.dispatchEvent(
new LoadTopicEvent(PrincipalUtils.getCurrentUserName(), ident, new TopicInfo(topic)));
return topic;
} catch (Exception e) {
eventBus.dispatchEvent(
new LoadTopicFailureEvent(PrincipalUtils.getCurrentUserName(), ident, e));
throw e;
}
}

@Override
public boolean topicExists(NameIdentifier ident) {
return dispatcher.topicExists(ident);
}

@Override
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchTopicException, TopicAlreadyExistsException {
try {
Topic topic = dispatcher.createTopic(ident, comment, dataLayout, properties);
eventBus.dispatchEvent(
new CreateTopicEvent(PrincipalUtils.getCurrentUserName(), ident, new TopicInfo(topic)));
return topic;
} catch (Exception e) {
TopicInfo createTopicRequest = new TopicInfo(ident.name(), comment, properties, null);
eventBus.dispatchEvent(
new CreateTopicFailureEvent(
PrincipalUtils.getCurrentUserName(), ident, e, createTopicRequest));
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
import com.datastrato.gravitino.messaging.DataLayout;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.messaging.TopicCatalog;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.TopicEntity;
Expand All @@ -31,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicOperationDispatcher extends OperationDispatcher implements TopicCatalog {
public class TopicOperationDispatcher extends OperationDispatcher implements TopicDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(TopicOperationDispatcher.class);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.listener.api.event;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.annotation.DeveloperApi;
import com.datastrato.gravitino.listener.api.info.TopicInfo;
import com.datastrato.gravitino.messaging.TopicChange;

/** Represents an event fired when a topic is successfully altered. */
@DeveloperApi
public final class AlterTopicEvent extends TopicEvent {
private final TopicInfo updatedTopicInfo;
private final TopicChange[] topicChanges;

/**
* Constructs an instance of {@code AlterTopicEvent}, encapsulating the key details about the
* successful alteration of a topic.
*
* @param user The username of the individual responsible for initiating the topic alteration.
* @param identifier The unique identifier of the altered topic, serving as a clear reference
* point for the topic in question.
* @param topicChanges An array of {@link TopicChange} objects representing the specific changes
* applied to the topic during the alteration process.
* @param updatedTopicInfo The post-alteration state of the topic.
*/
public AlterTopicEvent(
String user,
NameIdentifier identifier,
TopicChange[] topicChanges,
TopicInfo updatedTopicInfo) {
super(user, identifier);
this.topicChanges = topicChanges.clone();
this.updatedTopicInfo = updatedTopicInfo;
}

/**
* Retrieves the updated state of the topic after the successful alteration.
*
* @return A {@link TopicInfo} instance encapsulating the details of the altered topic.
*/
public TopicInfo updatedTopicInfo() {
return updatedTopicInfo;
}

/**
* Retrieves the specific changes that were made to the topic during the alteration process.
*
* @return An array of {@link TopicChange} objects detailing each modification applied to the
* topic.
*/
public TopicChange[] topicChanges() {
return topicChanges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.listener.api.event;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.annotation.DeveloperApi;
import com.datastrato.gravitino.messaging.TopicChange;

/**
* Represents an event that is triggered when an attempt to alter a topic fails due to an exception.
*/
@DeveloperApi
public final class AlterTopicFailureEvent extends TopicFailureEvent {
private final TopicChange[] topicChanges;

/**
* Constructs an {@code AlterTopicFailureEvent} instance, capturing detailed information about the
* failed topic alteration attempt.
*
* @param user The user who initiated the topic alteration operation.
* @param identifier The identifier of the topic that was attempted to be altered.
* @param exception The exception that was thrown during the topic alteration operation.
* @param topicChanges The changes that were attempted on the topic.
*/
public AlterTopicFailureEvent(
String user, NameIdentifier identifier, Exception exception, TopicChange[] topicChanges) {
super(user, identifier, exception);
this.topicChanges = topicChanges.clone();
}

/**
* Retrieves the changes that were attempted on the topic.
*
* @return An array of {@link TopicChange} objects representing the attempted modifications to the
* topic.
*/
public TopicChange[] topicChanges() {
return topicChanges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.listener.api.event;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.annotation.DeveloperApi;
import com.datastrato.gravitino.listener.api.info.TopicInfo;

/** Represents an event triggered upon the successful creation of a topic. */
@DeveloperApi
public final class CreateTopicEvent extends TopicEvent {
private final TopicInfo createdTopicInfo;

/**
* Constructs an instance of {@code CreateTopicEvent}, capturing essential details about the
* successful creation of a topic.
*
* @param user The username of the individual who initiated the topic creation.
* @param identifier The unique identifier of the topic that was created.
* @param createdTopicInfo The final state of the topic post-creation.
*/
public CreateTopicEvent(String user, NameIdentifier identifier, TopicInfo createdTopicInfo) {
super(user, identifier);
this.createdTopicInfo = createdTopicInfo;
}

/**
* Retrieves the final state of the topic as it was returned to the user after successful
* creation.
*
* @return A {@link TopicInfo} instance encapsulating the comprehensive details of the newly
* created topic.
*/
public TopicInfo createdTopicInfo() {
return createdTopicInfo;
}
}
Loading

0 comments on commit b04efd0

Please sign in to comment.