diff --git a/docs-website/generateDocsDir.ts b/docs-website/generateDocsDir.ts index 831da46b3e6b1..99a42087c67ba 100644 --- a/docs-website/generateDocsDir.ts +++ b/docs-website/generateDocsDir.ts @@ -156,6 +156,9 @@ function get_slug(filepath: string): string { const hardcoded_titles = { "README.md": "Introduction", "docs/demo.md": "Demo", + "docs/actions/README.md": "Introduction", + "docs/actions/concepts.md": "Concepts", + "docs/actions/quickstart.md": "Quickstart", }; // titles that have been hardcoded in sidebars.js // (for cases where doc is reference multiple times with different titles) @@ -519,6 +522,10 @@ function write_markdown_file( const autogenerated_sidebar_directories = [ "docs/generated/metamodel", "docs/generated/ingestion", + "docs/actions/actions", + "docs/actions/events", + "docs/actions/sources", + "docs/actions/guides", "metadata-ingestion/archived", ]; for (const filepath of markdown_files) { diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 213dddfef7c85..8ade4f5067a80 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -216,6 +216,55 @@ module.exports = { id: "docs/api/openapi/openapi-usage-guide", }, ], + Actions: [ + { + type: "doc", + id: "docs/actions/README", + label: "Introduction", + }, + { + label: "Quickstart", + type: "doc", + id: "docs/actions/quickstart", + }, + { + label: "Concepts", + type: "doc", + id: "docs/actions/concepts", + }, + { + Sources: [ + { + type: "autogenerated", + dirName: "docs/actions/sources", + }, + ], + }, + { + Events: [ + { + type: "autogenerated", + dirName: "docs/actions/events", + }, + ], + }, + { + Actions: [ + { + type: "autogenerated", + dirName: "docs/actions/actions", + }, + ], + }, + { + Guides: [ + { + type: "autogenerated", + dirName: "docs/actions/guides", + }, + ], + }, + ], "Usage Guides": [ "docs/policies", "docs/domains", diff --git a/docs/actions/README.md b/docs/actions/README.md new file mode 100644 index 0000000000000..fa0c6cb4b71ef --- /dev/null +++ b/docs/actions/README.md @@ -0,0 +1,253 @@ +# ⚡ DataHub Actions Framework + +Welcome to DataHub Actions! The Actions framework makes responding to realtime changes in your Metadata Graph easy, enabling you to seamlessly integrate [DataHub](https://github.com/datahub-project/datahub) into a broader events-based architecture. + +For a detailed introduction, check out the [original announcement](https://www.youtube.com/watch?v=7iwNxHgqxtg&t=2189s) of the DataHub Actions Framework at the DataHub April 2022 Town Hall. For a more in-depth look at use cases and concepts, check out [DataHub Actions Concepts](concepts.md). + +## Quickstart + +To get started right away, check out the [DataHub Actions Quickstart](quickstart.md) Guide. + + +## Prerequisites + +The DataHub Actions CLI commands are an extension of the base `datahub` CLI commands. We recommend +first installing the `datahub` CLI: + +```shell +python3 -m pip install --upgrade pip wheel setuptools +python3 -m pip install --upgrade acryl-datahub +datahub --version +``` + +> Note that the Actions Framework requires a version of `acryl-datahub` >= v0.8.34 + + +## Installation + +Next, simply install the `acryl-datahub-actions` package from PyPi: + +```shell +python3 -m pip install --upgrade pip wheel setuptools +python3 -m pip install --upgrade acryl-datahub-actions +datahub actions version +``` + + +## Configuring an Action + +Actions are configured using a YAML file, much in the same way DataHub ingestion sources are. 
An action configuration file consists of the following + +1. Action Pipeline Name (Should be unique and static) +2. Source Configurations +3. Transform + Filter Configurations +4. Action Configuration +5. Pipeline Options (Optional) +6. DataHub API configs (Optional - required for select actions) + +With each component being independently pluggable and configurable. + +```yml +# 1. Required: Action Pipeline Name +name: + +# 2. Required: Event Source - Where to source event from. +source: + type: + config: + # Event Source specific configs (map) + +# 3a. Optional: Filter to run on events (map) +filter: + event_type: + event: + # Filter event fields by exact-match + + +# 3b. Optional: Custom Transformers to run on events (array) +transform: + - type: + config: + # Transformer-specific configs (map) + +# 4. Required: Action - What action to take on events. +action: + type: + config: + # Action-specific configs (map) + +# 5. Optional: Additional pipeline options (error handling, etc) +options: + retry_count: 0 # The number of times to retry an Action with the same event. (If an exception is thrown). 0 by default. + failure_mode: "CONTINUE" # What to do when an event fails to be processed. Either 'CONTINUE' to make progress or 'THROW' to stop the pipeline. Either way, the failed event will be logged to a failed_events.log file. + failed_events_dir: "/tmp/datahub/actions" # The directory in which to write a failed_events.log file that tracks events which fail to be processed. Defaults to "/tmp/logs/datahub/actions". + +# 6. Optional: DataHub API configuration +datahub: + server: "http://localhost:8080" # Location of DataHub API + # token: # Required if Metadata Service Auth enabled +``` + +### Example: Hello World + +An simple configuration file for a "Hello World" action, which simply prints all events it receives, is + +```yml +# 1. Action Pipeline Name +name: "hello_world" +# 2. Event Source: Where to source event from. +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +# 3. Action: What action to take on events. +action: + type: "hello_world" +``` + +We can modify this configuration further to filter for specific events, by adding a "filter" block. + +```yml +# 1. Action Pipeline Name +name: "hello_world" + +# 2. Event Source - Where to source event from. +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} + +# 3. Filter - Filter events that reach the Action +filter: + event_type: "EntityChangeEvent_v1" + event: + category: "TAG" + operation: "ADD" + modifier: "urn:li:tag:pii" + +# 4. Action - What action to take on events. +action: + type: "hello_world" +``` + + +## Running an Action + +To run a new Action, just use the `actions` CLI command + +``` +datahub actions -c +``` + +Once the Action is running, you will see + +``` +Action Pipeline with name '' is now running. +``` + +### Running multiple Actions + +You can run multiple actions pipeline within the same command. Simply provide multiple +config files by restating the "-c" command line argument. + +For example, + +``` +datahub actions -c -c +``` + +### Running in debug mode + +Simply append the `--debug` flag to the CLI to run your action in debug mode. + +``` +datahub actions -c --debug +``` + +### Stopping an Action + +Just issue a Control-C as usual. 
You should see the Actions Pipeline shut down gracefully, with a small +summary of processing results. + +``` +Actions Pipeline with name ' +![Certified](https://img.shields.io/badge/support%20status-certified-brightgreen) + + +## Overview + +This Action executes ingestion recipes that are configured via the UI. + +### Capabilities + +- Executing `datahub ingest` command in a sub-process when an Execution Request command is received from DataHub. (Scheduled or manual ingestion run) +- Resolving secrets within an ingestion recipe from DataHub +- Reporting ingestion execution status to DataHub + +### Supported Events + +- `MetadataChangeLog_v1` + +Specifically, changes to the `dataHubExecutionRequestInput` and `dataHubExecutionRequestSignal` aspects of the `dataHubExecutionRequest` entity are required. + + +## Action Quickstart + +### Prerequisites + +#### DataHub Privileges + +This action must be executed as a privileged DataHub user (e.g. using Personal Access Tokens). Specifically, the user must have the `Manage Secrets` Platform Privilege, which allows for retrieval +of decrypted secrets for injection into an ingestion recipe. + +An access token generated from a privileged account must be configured in the `datahub` configuration +block of the YAML configuration, as shown in the example below. + +#### Connecting to Ingestion Sources + +In order for ingestion to run successfully, the process running the Actions must have +network connectivity to any source systems that are required for ingestion. + +For example, if the ingestion recipe is pulling from an internal DBMS, the actions container +must be able to resolve & connect to that DBMS system for the ingestion command to run successfully. + +### Install the Plugin(s) + +Run the following commands to install the relevant action plugin(s): + +`pip install 'acryl-datahub-actions[executor]'` + + +### Configure the Action Config + +Use the following config(s) to get started with this Action. + +```yml +name: "pipeline-name" +source: + # source configs +action: + type: "executor" +# Requires DataHub API configurations to report to DataHub +datahub: + server: "http://${GMS_HOST:-localhost}:${GMS_PORT:-8080}" + # token: # Must have "Manage Secrets" privilege +``` + +
+ View All Configuration Options + + | Field | Required | Default | Description | + | --- | :-: | :-: | --- | + | `executor_id` | ❌ | `default` | An executor ID assigned to the executor. This can be used to manage multiple distinct executors. | +
+ + +## Troubleshooting + +### Quitting the Actions Framework + +Currently, when you quit the Actions framework, any in-flight ingestion processing will continue to execute as a subprocess on your system. This means that there may be "orphaned" processes which +are never marked as "Succeeded" or "Failed" in the UI, even though they may have completed. + +To address this, simply "Cancel" the ingestion source on the UI once you've restarted the Ingestion Executor action. diff --git a/docs/actions/actions/hello_world.md b/docs/actions/actions/hello_world.md new file mode 100644 index 0000000000000..1614427ba359d --- /dev/null +++ b/docs/actions/actions/hello_world.md @@ -0,0 +1,59 @@ +# Hello World + + +![Certified](https://img.shields.io/badge/support%20status-certified-brightgreen) + + +## Overview + +This Action is an example action which simply prints all Events it receives as JSON. + +### Capabilities + +- Printing events that are received by the Action to the console. + +### Supported Events + +All event types, including + +- `EntityChangeEvent_v1` +- `MetadataChangeLog_v1` + + +## Action Quickstart + +### Prerequisites + +No prerequisites. This action comes pre-loaded with `acryl-datahub-actions`. + +### Install the Plugin(s) + +This action comes with the Actions Framework by default: + +`pip install 'acryl-datahub-actions'` + + +### Configure the Action Config + +Use the following config(s) to get started with this Action. + +```yml +name: "pipeline-name" +source: + # source configs +action: + type: "hello_world" +``` + +
+ View All Configuration Options + + | Field | Required | Default | Description | + | --- | :-: | :-: | --- | + | `to_upper` | ❌| `False` | Whether to print events in upper case. | +
+ + +## Troubleshooting + +N/A \ No newline at end of file diff --git a/docs/actions/concepts.md b/docs/actions/concepts.md new file mode 100644 index 0000000000000..cc27014848da1 --- /dev/null +++ b/docs/actions/concepts.md @@ -0,0 +1,101 @@ +# DataHub Actions Concepts + +The Actions framework includes pluggable components for filtering, transforming, and reacting to important DataHub, such as + +- Tag Additions / Removals +- Glossary Term Additions / Removals +- Schema Field Additions / Removals +- Owner Additions / Removals + +& more, in real time. + +DataHub Actions comes with open library of freely available Transformers, Actions, Events, and more. + +Finally, the framework is highly configurable & scalable. Notable highlights include: + +- **Distributed Actions**: Ability to scale-out processing for a single action. Support for running the same Action configuration across multiple nodes to load balance the traffic from the event stream. +- **At-least Once Delivery**: Native support for independent processing state for each Action via post-processing acking to achieve at-least once semantics. +- **Robust Error Handling**: Configurable failure policies featuring event-retry, dead letter queue, and failed-event continuation policy to achieve the guarantees required by your organization. + + +### Use Cases + +Real-time use cases broadly fall into the following categories: + +- **Notifications**: Generate organization-specific notifications when a change is made on DataHub. For example, send an email to the governance team when a "PII" tag is added to any data asset. +- **Workflow Integration**: Integrate DataHub into your organization's internal workflows. For example, create a Jira ticket when specific Tags or Terms are proposed on a Dataset. +- **Synchronization**: Syncing changes made in DataHub into a 3rd party system. For example, reflecting Tag additions in DataHub into Snowflake. +- **Auditing**: Audit who is making what changes on DataHub through time. + +and more! + +## Concepts + +The Actions Framework consists of a few core concepts-- + +- **Pipelines** +- **Events** and **Event Sources** +- **Transformers** +- **Actions** + +Each of these will be described in detail below. + +![](imgs/actions.png) +*In the Actions Framework, Events flow continuously from left-to-right.** + +### Pipelines + +A **Pipeline** is a continuously running process which performs the following functions: + +1. Polls events from a configured Event Source (described below) +2. Applies configured Transformation + Filtering to the Event +3. Executes the configured Action on the resulting Event + +in addition to handling initialization, errors, retries, logging, and more. + +Each Action Configuration file corresponds to a unique Pipeline. In practice, +each Pipeline has its very own Event Source, Transforms, and Actions. This makes it easy to maintain state for mission-critical Actions independently. + +Importantly, each Action must have a unique name. This serves as a stable identifier across Pipeline run which can be useful in saving the Pipeline's consumer state (ie. resiliency + reliability). For example, the Kafka Event Source (default) uses the pipeline name as the Kafka Consumer Group id. This enables you to easily scale-out your Actions by running multiple processes with the same exact configuration file. Each will simply become different consumers in the same consumer group, sharing traffic of the DataHub Events stream. 
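+
+As a concrete sketch (assuming the `hello_world.yaml` configuration file used in the Quickstart guide), scaling out a Pipeline is as simple as launching the same configuration more than once:
+
+```shell
+# Terminal 1: start the first instance of the Pipeline.
+datahub actions -c hello_world.yaml
+
+# Terminal 2: start a second instance with the exact same configuration file.
+# Because the pipeline name is unchanged, both processes join the same Kafka
+# consumer group and split the partitions of the DataHub event topics between them.
+datahub actions -c hello_world.yaml
+```
+
+With the default Kafka Event Source, each event is then delivered to only one of the running instances.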
+ +### Events + +**Events** are data objects representing changes that have occurred on DataHub. Strictly speaking, the only requirement that the Actions framework imposes is that these objects must be + +a. Convertible to JSON +b. Convertible from JSON + +So that in the event of processing failures, events can be written and read from a failed events file. + + +#### Event Types + +Each Event instance inside the framework corresponds to a single **Event Type**, which is common name (e.g. "EntityChangeEvent_v1") which can be used to understand the shape of the Event. This can be thought of as a "topic" or "stream" name. That being said, Events associated with a single type are not expected to change in backwards-breaking ways across versons. + +### Event Sources + +Events are produced to the framework by **Event Sources**. Event Sources may include their own guarantees, configurations, behaviors, and semantics. They usually produce a fixed set of Event Types. + +In addition to sourcing events, Event Sources are also responsible for acking the succesful processing of an event by implementing the `ack` method. This is invoked by the framework once the Event is guaranteed to have reached the configured Action successfully. + +### Transformers + +**Transformers** are pluggable components which take an Event as input, and produce an Event (or nothing) as output. This can be used to enrich the information of an Event prior to sending it to an Action. + +Multiple Transformers can be configured to run in sequence, filtering and transforming an event in multiple steps. + +Transformers can also be used to generate a completely new type of Event (i.e. registered at runtime via the Event Registry) which can subsequently serve as input to an Action. + +Transformers can be easily customized and plugged in to meet an organization's unqique requirements. For more information on developing a Transformer, check out [Developing a Transformer](guides/developing-a-transformer.md) + + +### Action + +**Actions** are pluggable components which take an Event as input and perform some business logic. Examples may be sending a Slack notification, logging to a file, +or creating a Jira ticket, etc. + +Each Pipeline can be configured to have a single Action which runs after the filtering and transformations have occurred. + +Actions can be easily customized and plugged in to meet an organization's unqique requirements. For more information on developing a Action, check out [Developing a Action](guides/developing-an-action.md) + + diff --git a/docs/actions/events/entity-change-event.md b/docs/actions/events/entity-change-event.md new file mode 100644 index 0000000000000..27277a97ad199 --- /dev/null +++ b/docs/actions/events/entity-change-event.md @@ -0,0 +1,352 @@ +# Entity Change Event V1 + +## Event Type + +`EntityChangeEvent_v1` + +## Overview + +This Event is emitted when certain changes are made to an entity (dataset, dashboard, chart, etc) on DataHub. + +## Event Structure + +Entity Change Events are generated in a variety of circumstances, but share a common set of fields. + +### Common Fields + +| Name | Type | Description | Optional | +|------------------|--------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------| +| entityUrn | String | The unique identifier for the Entity being changed. For example, a Dataset's urn. 
| False | +| entityType | String | The type of the entity being changed. Supported values include dataset, chart, dashboard, dataFlow (Pipeline), dataJob (Task), domain, tag, glossaryTerm, corpGroup, & corpUser. | False | +| category | String | The category of the change, related to the kind of operation that was performed. Examples include TAG, GLOSSARY_TERM, DOMAIN, LIFECYCLE, and more. | False | +| operation | String | The operation being performed on the entity given the category. For example, ADD ,REMOVE, MODIFY. For the set of valid operations, see the full catalog below. | False | +| modifier | String | The modifier that has been applied to the entity. The value depends on the category. An example includes the URN of a tag being applied to a Dataset or Schema Field. | True | +| parameters | Dict | Additional key-value parameters used to provide specific context. The precise contents depends on the category + operation of the event. See the catalog below for a full summary of the combinations. | True | +| auditStamp.actor | String | The urn of the actor who triggered the change. | False | +| auditStamp.time | Number | The timestamp in milliseconds corresponding to the event. | False | + + + +In following sections, we will provide sample events for each scenario in which Entity Change Events are fired. + + +### Add Tag Event + +This event is emitted when a Tag has been added to an entity on DataHub. + +#### Sample Event + +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "TAG", + "operation": "ADD", + "modifier": "urn:li:tag:PII", + "parameters": { + "tagUrn": "urn:li:tag:PII" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Remove Tag Event + +This event is emitted when a Tag has been removed from an entity on DataHub. +Header + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "TAG", + "operation": "REMOVE", + "modifier": "urn:li:tag:PII", + "parameters": { + "tagUrn": "urn:li:tag:PII" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Add Glossary Term Event + +This event is emitted when a Glossary Term has been added to an entity on DataHub. +Header + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "GLOSSARY_TERM", + "operation": "ADD", + "modifier": "urn:li:glossaryTerm:ExampleNode.ExampleTerm", + "parameters": { + "termUrn": "urn:li:glossaryTerm:ExampleNode.ExampleTerm" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Remove Glossary Term Event + +This event is emitted when a Glossary Term has been removed from an entity on DataHub. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "GLOSSARY_TERM", + "operation": "REMOVE", + "modifier": "urn:li:glossaryTerm:ExampleNode.ExampleTerm", + "parameters": { + "termUrn": "urn:li:glossaryTerm:ExampleNode.ExampleTerm" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Add Domain Event + +This event is emitted when Domain has been added to an entity on DataHub. 
+ +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "DOMAIN", + "operation": "ADD", + "modifier": "urn:li:domain:ExampleDomain", + "parameters": { + "domainUrn": "urn:li:domain:ExampleDomain" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Remove Domain Event + +This event is emitted when Domain has been removed from an entity on DataHub. +Header + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "DOMAIN", + "operation": "REMOVE", + "modifier": "urn:li:domain:ExampleDomain", + "parameters": { + "domainUrn": "urn:li:domain:ExampleDomain" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Add Owner Event + +This event is emitted when a new owner has been assigned to an entity on DataHub. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "OWNER", + "operation": "ADD", + "modifier": "urn:li:corpuser:jdoe", + "parameters": { + "ownerUrn": "urn:li:corpuser:jdoe", + "ownerType": "BUSINESS_OWNER" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Remove Owner Event + +This event is emitted when an existing owner has been removed from an entity on DataHub. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "OWNER", + "operation": "REMOVE", + "modifier": "urn:li:corpuser:jdoe", + "parameters": { + "ownerUrn": "urn:li:corpuser:jdoe", + "ownerType": "BUSINESS_OWNER" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Modify Deprecation Event + +This event is emitted when the deprecation status of an entity has been modified on DataHub. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "DEPRECATION", + "operation": "MODIFY", + "modifier": "DEPRECATED", + "parameters": { + "status": "DEPRECATED" + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Add Dataset Schema Field Event + +This event is emitted when a new field has been added to a Dataset Schema. + +#### Sample Event + +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "TECHNICAL_SCHEMA", + "operation": "ADD", + "modifier": "urn:li:schemaField:(urn:li:dataset:abc,newFieldName)", + "parameters": { + "fieldUrn": "urn:li:schemaField:(urn:li:dataset:abc,newFieldName)", + "fieldPath": "newFieldName", + "nullable": false + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Remove Dataset Schema Field Event + +This event is emitted when a new field has been remove from a Dataset Schema. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "TECHNICAL_SCHEMA", + "operation": "REMOVE", + "modifier": "urn:li:schemaField:(urn:li:dataset:abc,newFieldName)", + "parameters": { + "fieldUrn": "urn:li:schemaField:(urn:li:dataset:abc,newFieldName)", + "fieldPath": "newFieldName", + "nullable": false + }, + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Entity Create Event + +This event is emitted when a new entity has been created on DataHub. 
+Header + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "LIFECYCLE", + "operation": "CREATE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Entity Soft-Delete Event + +This event is emitted when a new entity has been soft-deleted on DataHub. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "LIFECYCLE", + "operation": "SOFT_DELETE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` + + +### Entity Hard-Delete Event + +This event is emitted when a new entity has been hard-deleted on DataHub. + +#### Sample Event +```json +{ + "entityUrn": "urn:li:dataset:abc", + "entityType": "dataset", + "category": "LIFECYCLE", + "operation": "HARD_DELETE", + "auditStamp": { + "actor": "urn:li:corpuser:jdoe", + "time": 1649953100653 + } +} +``` \ No newline at end of file diff --git a/docs/actions/events/metadata-change-log-event.md b/docs/actions/events/metadata-change-log-event.md new file mode 100644 index 0000000000000..cf2069cf2977b --- /dev/null +++ b/docs/actions/events/metadata-change-log-event.md @@ -0,0 +1,151 @@ +# Metadata Change Log Event V1 + +## Event Type + +`MetadataChangeLog_v1` + +## Overview + +This event is emitted when any aspect on DataHub Metadata Graph is changed. This includes creates, updates, and removals of both "versioned" aspects and "time-series" aspects. + +> Disclaimer: This event is quite powerful, but also quite low-level. Because it exposes the underlying metadata model directly, it is subject to more frequent structural and semantic changes than the higher level [Entity Change Event](entity-change-event.md). We recommend using that event instead to achieve your use case when possible. + +## Event Structure + +The fields include + +| Name | Type | Description | Optional | +|---------------------------------|--------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------| +| entityUrn | String | The unique identifier for the Entity being changed. For example, a Dataset's urn. | False | +| entityType | String | The type of the entity being changed. Supported values include dataset, chart, dashboard, dataFlow (Pipeline), dataJob (Task), domain, tag, glossaryTerm, corpGroup, & corpUser. | False | +| entityKeyAspect | Object | The key struct of the entity that was changed. Only present if the Metadata Change Proposal contained the raw key struct. | True | +| changeType | String | The change type. UPSERT or DELETE are currently supported. | False | +| aspectName | String | The entity aspect which was changed. | False | +| aspect | Object | The new aspect value. Null if the aspect was deleted. | True | +| aspect.contentType | String | The serialization type of the aspect itself. The only supported value is `application/json`. | False | +| aspect.value | String | The serialized aspect. This is a JSON-serialized representing the aspect document originally defined in PDL. See https://github.com/datahub-project/datahub/tree/master/metadata-models/src/main/pegasus/com/linkedin for more. | False | +| previousAspectValue | Object | The previous aspect value. Null if the aspect did not exist previously. 
| True | +| previousAspectValue.contentType | String | The serialization type of the aspect itself. The only supported value is `application/json` | False | +| previousAspectValue.value | String | The serialized aspect. This is a JSON-serialized representing the aspect document originally defined in PDL. See https://github.com/datahub-project/datahub/tree/master/metadata-models/src/main/pegasus/com/linkedin for more. | False | +| systemMetadata | Object | The new system metadata. This includes the the ingestion run-id, model registry and more. For the full structure, see https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl | True | +| previousSystemMetadata | Object | The previous system metadata. This includes the the ingestion run-id, model registry and more. For the full structure, see https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/mxe/SystemMetadata.pdl | True | +| created | Object | Audit stamp about who triggered the Metadata Change and when. | False | +| created.time | Number | The timestamp in milliseconds when the aspect change occurred. | False | +| created.actor | String | The URN of the actor (e.g. corpuser) that triggered the change. + + +### Sample Events + +#### Tag Change Event + +```json +{ + "entityType": "container", + "entityUrn": "urn:li:container:DATABASE", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "value": "{\"tags\":[{\"tag\":\"urn:li:tag:pii\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1651516475595, + "runId": "no-run-id-provided", + "registryName": "unknownRegistry", + "registryVersion": "0.0.0.0-dev", + "properties": null + }, + "previousAspectValue": null, + "previousSystemMetadata": null, + "created": { + "time": 1651516475594, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } +} +``` + +#### Glossary Term Change Event + +```json +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "glossaryTerms", + "aspect": { + "value": "{\"auditStamp\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516599479},\"terms\":[{\"urn\":\"urn:li:glossaryTerm:CustomerAccount\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1651516599486, + "runId": "no-run-id-provided", + "registryName": "unknownRegistry", + "registryVersion": "0.0.0.0-dev", + "properties": null + }, + "previousAspectValue": null, + "previousSystemMetadata": null, + "created": { + "time": 1651516599480, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } +} +``` + +#### Owner Change Event + +```json +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "value": "{\"owners\":[{\"type\":\"DATAOWNER\",\"owner\":\"urn:li:corpuser:datahub\"}],\"lastModified\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1651516640488}}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1651516640493, + "runId": "no-run-id-provided", + "registryName": "unknownRegistry", + "registryVersion": "0.0.0.0-dev", + "properties": null + }, + "previousAspectValue": { + "value": 
"{\"owners\":[{\"owner\":\"urn:li:corpuser:jdoe\",\"type\":\"DATAOWNER\"},{\"owner\":\"urn:li:corpuser:datahub\",\"type\":\"DATAOWNER\"}],\"lastModified\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1581407189000}}", + "contentType": "application/json" + }, + "previousSystemMetadata": { + "lastObserved": 1651516415088, + "runId": "file-2022_05_02-11_33_35", + "registryName": null, + "registryVersion": null, + "properties": null + }, + "created": { + "time": 1651516640490, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + } +} +``` +## FAQ + +### Where can I find all the aspects and their schemas? + +Great Question! All MetadataChangeLog events are based on the Metadata Model which is comprised of Entities, +Aspects, and Relationships which make up an enterprise Metadata Graph. We recommend checking out the following +resources to learn more about this: + +- [Intro to Metadata Model](https://datahubproject.io/docs/metadata-modeling/metadata-model) + +You can also find a comprehensive list of Entities + Aspects of the Metadata Model under the **Metadata Modeling > Entities** section of the [official DataHub docs](https://datahubproject.io/docs/). + + + diff --git a/docs/actions/guides/developing-a-transformer.md b/docs/actions/guides/developing-a-transformer.md new file mode 100644 index 0000000000000..a843dbc846cd5 --- /dev/null +++ b/docs/actions/guides/developing-a-transformer.md @@ -0,0 +1,133 @@ +# Developing a Transformer + +In this guide, we will outline each step to developing a custom Transformer for the DataHub Actions Framework. + +## Overview + +Developing a DataHub Actions Transformer is a matter of extending the `Transformer` base class in Python, installing your +Transformer to make it visible to the framework, and then configuring the framework to use the new Transformer. + + +## Step 1: Defining a Transformer + +To implement an Transformer, we'll need to extend the `Transformer` base class and override the following functions: + +- `create()` - This function is invoked to instantiate the action, with a free-form configuration dictionary + extracted from the Actions configuration file as input. +- `transform()` - This function is invoked when an Event is received. It should contain the core logic of the Transformer. + and will return the transformed Event, or `None` if the Event should be filtered. + +Let's start by defining a new implementation of Transformer called `CustomTransformer`. We'll keep it simple-- this Transformer will +print the configuration that is provided when it is created, and print any Events that it receives. + +```python +# custom_transformer.py +from datahub_actions.transform.transformer import Transformer +from datahub_actions.event.event import EventEnvelope +from datahub_actions.pipeline.pipeline_context import PipelineContext +from typing import Optional + +class CustomTransformer(Transformer): + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Transformer": + # Simply print the config_dict. + print(config_dict) + return cls(config_dict, ctx) + + def __init__(self, ctx: PipelineContext): + self.ctx = ctx + + def transform(self, event: EventEnvelope) -> Optional[EventEnvelope]: + # Simply print the received event. + print(event) + # And return the original event (no-op) + return event +``` + + +## Step 2: Installing the Transformer + +Now that we've defined the Transformer, we need to make it visible to the framework by making +it available in the Python runtime environment. 
+ +The easiest way to do this is to just place it in the same directory as your configuration file, in which case the module name is the same as the file +name - in this case it will be `custom_transformer`. + +### Advanced: Installing as a Package + +Alternatively, create a `setup.py` file in the same directory as the new Transformer to convert it into a package that pip can understand. + +``` +from setuptools import find_packages, setup + +setup( + name="custom_transformer_example", + version="1.0", + packages=find_packages(), + # if you don't already have DataHub Actions installed, add it under install_requires + # install_requires=["acryl-datahub-actions"] +) +``` + +Next, install the package + +```shell +pip install -e . +``` + +inside the module. (alt.`python setup.py`). + +Once we have done this, our class will be referencable via `custom_transformer_example.custom_transformer:CustomTransformer`. + + +## Step 3: Running the Action + +Now that we've defined our Transformer, we can create an Action configuration file that refers to the new Transformer. +We will need to provide the fully-qualified Python module & class name when doing so. + +*Example Configuration* + +```yaml +# custom_transformer_action.yaml +name: "custom_transformer_test" +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +transform: + - type: "custom_transformer_example.custom_transformer:CustomTransformer" + config: + # Some sample configuration which should be printed on create. + config1: value1 +action: + # Simply reuse the default hello_world action + type: "hello_world" +``` + +Next, run the `datahub actions` command as usual: + +```shell +datahub actions -c custom_transformer_action.yaml +``` + +If all is well, your Transformer should now be receiving & printing Events. + + +### (Optional) Step 4: Contributing the Transformer + +If your Transformer is generally applicable, you can raise a PR to include it in the core Transformer library +provided by DataHub. All Transformers will live under the `datahub_actions/plugin/transform` directory inside the +[datahub-actions](https://github.com/acryldata/datahub-actions) repository. + +Once you've added your new Transformer there, make sure that you make it discoverable by updating the `entry_points` section +of the `setup.py` file. This allows you to assign a globally unique name for you Transformer, so that people can use +it without defining the full module path. + +#### Prerequisites: + +Prerequisites to consideration for inclusion in the core Transformer library include + +- **Testing** Define unit tests for your Transformer +- **Deduplication** Confirm that no existing Transformer serves the same purpose, or can be easily extended to serve the same purpose \ No newline at end of file diff --git a/docs/actions/guides/developing-an-action.md b/docs/actions/guides/developing-an-action.md new file mode 100644 index 0000000000000..d37f574710a86 --- /dev/null +++ b/docs/actions/guides/developing-an-action.md @@ -0,0 +1,132 @@ +# Developing an Action + +In this guide, we will outline each step to developing a Action for the DataHub Actions Framework. + +## Overview + +Developing a DataHub Action is a matter of extending the `Action` base class in Python, installing your +Action to make it visible to the framework, and then configuring the framework to use the new Action. 
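+
+At a glance, the workflow might look like this (a sketch only; the file names match the examples used in the steps below):
+
+```shell
+# 1. Define an Action class by extending the Action base class (e.g. in custom_action.py).
+# 2. Place custom_action.py next to your Action configuration file, or install it as a package.
+# 3. Point the configuration file at the fully-qualified class name and run the pipeline:
+datahub actions -c custom_action.yaml
+```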
+ + +## Step 1: Defining an Action + +To implement an Action, we'll need to extend the `Action` base class and override the following functions: + +- `create()` - This function is invoked to instantiate the action, with a free-form configuration dictionary + extracted from the Actions configuration file as input. +- `act()` - This function is invoked when an Action is received. It should contain the core logic of the Action. +- `close()` - This function is invoked when the framework has issued a shutdown of the pipeline. It should be used + to cleanup any processes happening inside the Action. + +Let's start by defining a new implementation of Action called `CustomAction`. We'll keep it simple-- this Action will +print the configuration that is provided when it is created, and print any Events that it receives. + +```python +# custom_action.py +from datahub_actions.action.action import Action +from datahub_actions.event.event import EventEnvelope +from datahub_actions.pipeline.pipeline_context import PipelineContext + +class CustomAction(Action): + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action": + # Simply print the config_dict. + print(config_dict) + return cls(ctx) + + def __init__(self, ctx: PipelineContext): + self.ctx = ctx + + def act(self, event: EventEnvelope) -> None: + # Do something super important. + # For now, just print. :) + print(event) + + def close(self) -> None: + pass +``` + + +## Step 2: Installing the Action + +Now that we've defined the Action, we need to make it visible to the framework by making it +available in the Python runtime environment. + +The easiest way to do this is to just place it in the same directory as your configuration file, in which case the module name is the same as the file +name - in this case it will be `custom_action`. + +### Advanced: Installing as a Package + +Alternatively, create a `setup.py` file in the same directory as the new Action to convert it into a package that pip can understand. + +``` +from setuptools import find_packages, setup + +setup( + name="custom_action_example", + version="1.0", + packages=find_packages(), + # if you don't already have DataHub Actions installed, add it under install_requires + # install_requires=["acryl-datahub-actions"] +) +``` + +Next, install the package + +```shell +pip install -e . +``` + +inside the module. (alt.`python setup.py`). + +Once we have done this, our class will be referencable via `custom_action_example.custom_action:CustomAction`. + + +## Step 3: Running the Action + +Now that we've defined our Action, we can create an Action configuration file that refers to the new Action. +We will need to provide the fully-qualified Python module & class name when doing so. + +*Example Configuration* + +```yaml +# custom_action.yaml +name: "custom_action_test" +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +action: + type: "custom_action_example.custom_action:CustomAction" + config: + # Some sample configuration which should be printed on create. + config1: value1 +``` + +Next, run the `datahub actions` command as usual: + +```shell +datahub actions -c custom_action.yaml +``` + +If all is well, your Action should now be receiving & printing Events. + + +## (Optional) Step 4: Contributing the Action + +If your Action is generally applicable, you can raise a PR to include it in the core Action library +provided by DataHub. 
All Actions will live under the `datahub_actions/plugin/action` directory inside the +[datahub-actions](https://github.com/acryldata/datahub-actions) repository. + +Once you've added your new Action there, make sure that you make it discoverable by updating the `entry_points` section +of the `setup.py` file. This allows you to assign a globally unique name for you Action, so that people can use +it without defining the full module path. + +### Prerequisites: + +Prerequisites to consideration for inclusion in the core Actions library include + +- **Testing** Define unit tests for your Action +- **Deduplication** Confirm that no existing Action serves the same purpose, or can be easily extended to serve the same purpose diff --git a/docs/actions/imgs/actions.png b/docs/actions/imgs/actions.png new file mode 100644 index 0000000000000..a92d15adcf9c8 Binary files /dev/null and b/docs/actions/imgs/actions.png differ diff --git a/docs/actions/quickstart.md b/docs/actions/quickstart.md new file mode 100644 index 0000000000000..6b460d4b2b890 --- /dev/null +++ b/docs/actions/quickstart.md @@ -0,0 +1,169 @@ +# DataHub Actions Quickstart + + +## Prerequisites + +The DataHub Actions CLI commands are an extension of the base `datahub` CLI commands. We recommend +first installing the `datahub` CLI: + +```shell +python3 -m pip install --upgrade pip wheel setuptools +python3 -m pip install --upgrade acryl-datahub +datahub --version +``` + +> Note that the Actions Framework requires a version of `acryl-datahub` >= v0.8.34 + + +## Installation + +To install DataHub Actions, you need to install the `acryl-datahub-actions` package from PyPi + +```shell +python3 -m pip install --upgrade pip wheel setuptools +python3 -m pip install --upgrade acryl-datahub-actions + +# Verify the installation by checking the version. +datahub actions version +``` + +### Hello World + +DataHub ships with a "Hello World" Action which logs all events it receives to the console. +To run this action, simply create a new Action configuration file: + +```yaml +# hello_world.yaml +name: "hello_world" +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +action: + type: "hello_world" +``` + +and then run it using the `datahub actions` command: + +```shell +datahub actions -c hello_world.yaml +``` + +You should the see the following output if the Action has been started successfully: + +```shell +Action Pipeline with name 'hello_world' is now running. +``` + +Now, navigate to the instance of DataHub that you've connected to and perform an Action such as + +- Adding / removing a Tag +- Adding / removing a Glossary Term +- Adding / removing a Domain + +If all is well, you should see some events being logged to the console + +```shell +Hello world! Received event: +{ + "event_type": "EntityChangeEvent_v1", + "event": { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", + "category": "TAG", + "operation": "ADD", + "modifier": "urn:li:tag:pii", + "parameters": {}, + "auditStamp": { + "time": 1651082697703, + "actor": "urn:li:corpuser:datahub", + "impersonator": null + }, + "version": 0, + "source": null + }, + "meta": { + "kafka": { + "topic": "PlatformEvent_v1", + "offset": 1262, + "partition": 0 + } + } +} +``` +*An example of an event emitted when a 'pii' tag has been added to a Dataset.* + +Woohoo! You've successfully started using the Actions framework. 
Now, let's see how we can get fancy. + + +#### Filtering events + +If we know which Event types we'd like to consume, we can optionally add a `filter` configuration, which +will prevent events that do not match the filter from being forwarded to the action. + +```yaml +# hello_world.yaml +name: "hello_world" +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +filter: + event_type: "EntityChangeEvent_v1" +action: + type: "hello_world" +``` +*Filtering for events of type EntityChangeEvent_v1 only* + + +#### Advanced Filtering + +Beyond simply filtering by event type, we can also filter events by matching against the values of their fields. To do so, +use the `event` block. Each field provided will be compared against the real event's value. An event that matches +**all** of the fields will be forwarded to the action. + +```yaml +# hello_world.yaml +name: "hello_world" +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +filter: + event_type: "EntityChangeEvent_v1" + event: + category: "TAG" + operation: "ADD" + modifier: "urn:li:tag:pii" +action: + type: "hello_world" +``` +*This filter only matches events representing "PII" tag additions to an entity.* + +And more, we can achieve "OR" semantics on a particular field by providing an array of values. + +```yaml +# hello_world.yaml +name: "hello_world" +source: + type: "kafka" + config: + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} +filter: + event_type: "EntityChangeEvent_v1" + event: + category: "TAG" + operation: [ "ADD", "REMOVE" ] + modifier: "urn:li:tag:pii" +action: + type: "hello_world" +``` +*This filter only matches events representing "PII" tag additions to OR removals from an entity. How fancy!* diff --git a/docs/actions/sources/kafka-event-source.md b/docs/actions/sources/kafka-event-source.md new file mode 100644 index 0000000000000..80bc54beca785 --- /dev/null +++ b/docs/actions/sources/kafka-event-source.md @@ -0,0 +1,93 @@ +# Kafka Event Source + +## Overview + +The Kafka Event Source is the default Event Source used within the DataHub Actions Framework. + +Under the hood, the Kafka Event Source uses a Kafka Consumer to subscribe to the topics streaming +out of DataHub (MetadataChangeLog_v1, PlatformEvent_v1). Each Action is automatically placed into a unique +[consumer group](https://docs.confluent.io/platform/current/clients/consumer.html#consumer-groups) based on +the unique `name` provided inside the Action configuration file. + +This means that you can easily scale-out Actions processing by sharing the same Action configuration file across +multiple nodes or processes. As long as the `name` of the Action is the same, each instance of the Actions framework will subscribe as a member in the same Kafka Consumer Group, which allows for load balancing the +topic traffic across consumers which each consume independent [partitions](https://developer.confluent.io/learn-kafka/apache-kafka/partitions/#kafka-partitioning). + +Because the Kafka Event Source uses consumer groups by default, actions using this source will be **stateful**. +This means that Actions will keep track of their processing offsets of the upstream Kafka topics. 
If you +stop an Action and restart it sometime later, it will first "catch up" by processing the messages that the topic +has received since the Action last ran. Be mindful of this - if your Action is computationally expensive, it may be preferable to start consuming from the end of the log, instead of playing catch up. The easiest way to achieve this is to simply rename the Action inside the Action configuration file - this will create a new Kafka Consumer Group which will begin processing new messages at the end of the log (latest policy). + +### Processing Guarantees + +This event source implements an "ack" function which is invoked if and only if an event is successfully processed +by the Actions framework, meaning that the event made it through the Transformers and into the Action without +any errors. Under the hood, the "ack" method synchronously commits Kafka Consumer Offsets on behalf of the Action. This means that by default, the framework provides *at-least once* processing semantics. That is, in the unusual case that a failure occurs when attempting to commit offsets back to Kafka, that event may be replayed on restart of the Action. + +If you've configured your Action pipeline `failure_mode` to be `CONTINUE` (the default), then events which +fail to be processed will simply be logged to a `failed_events.log` file for further investigation (dead letter queue). The Kafka Event Source will continue to make progress against the underlying topics and continue to commit offsets even in the case of failed messages. + +If you've configured your Action pipeline `failure_mode` to be `THROW`, then events which fail to be processed result in an Action Pipeline error. This in turn terminates the pipeline before committing offsets back to Kafka. Thus the message will not be marked as "processed" by the Action consumer. + + +## Supported Events + +The Kafka Event Source produces + +- [Entity Change Event V1](../events/entity-change-event.md) +- [Metadata Change Log V1](../events/metadata-change-log-event.md) + + +## Configure the Event Source + +Use the following config(s) to get started with the Kafka Event Source. + +```yml +name: "pipeline-name" +source: + type: "kafka" + config: + # Connection-related configuration + connection: + bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092} + schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081} + # Dictionary of freeform consumer configs propagated to underlying Kafka Consumer + consumer_config: + #security.protocol: ${KAFKA_PROPERTIES_SECURITY_PROTOCOL:-PLAINTEXT} + #ssl.keystore.location: ${KAFKA_PROPERTIES_SSL_KEYSTORE_LOCATION:-/mnt/certs/keystore} + #ssl.truststore.location: ${KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION:-/mnt/certs/truststore} + #ssl.keystore.password: ${KAFKA_PROPERTIES_SSL_KEYSTORE_PASSWORD:-keystore_password} + #ssl.key.password: ${KAFKA_PROPERTIES_SSL_KEY_PASSWORD:-keystore_password} + #ssl.truststore.password: ${KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD:-truststore_password} + # Topic Routing - which topics to read from. + topic_routes: + mcl: ${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:-MetadataChangeLog_Versioned_v1} # Topic name for MetadataChangeLog_v1 events. + pe: ${PLATFORM_EVENT_TOPIC_NAME:-PlatformEvent_v1} # Topic name for PlatformEvent_v1 events. +action: + # action configs +``` + +
+ View All Configuration Options + + | Field | Required | Default | Description | + | --- | :-: | :-: | --- | + | `connection.bootstrap` | ✅ | N/A | The Kafka bootstrap URI, e.g. `localhost:9092`. | + | `connection.schema_registry_url` | ✅ | N/A | The URL for the Kafka schema registry, e.g. `http://localhost:8081` | + | `connection.consumer_config` | ❌ | {} | A set of key-value pairs that represents arbitrary Kafka Consumer configs | + | `topic_routes.mcl` | ❌ | `MetadataChangeLog_v1` | The name of the topic containing MetadataChangeLog events | + | `topic_routes.pe` | ❌ | `PlatformEvent_v1` | The name of the topic containing PlatformEvent events | +
+ + +## FAQ + +1. Is there a way to always start processing from the end of the topics on Actions start? + +Currently, the only way is to change the `name` of the Action in its configuration file. In the future, +we are hoping to add first-class support for configuring the action to be "stateless", ie only process +messages that are received while the Action is running. + +2. Is there a way to asynchronously commit offsets back to Kafka? + +Currently, all consumer offset commits are made synchronously for each message received. For now we've optimized for correctness over performance. If this commit policy does not accommodate your organization's needs, certainly reach out on [Slack](https://slack.datahubproject.io/). \ No newline at end of file