Skip to content

Commit

Permalink
[Components] kafka - new source and action components
Browse files Browse the repository at this point in the history
jcortes committed Jan 4, 2025
1 parent ee3fcb0 commit 21510c3
Showing 9 changed files with 427 additions and 21 deletions.
86 changes: 86 additions & 0 deletions components/kafka/actions/create-topic/create-topic.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import app from "../../kafka.app.mjs";

export default {
key: "kafka-create-topic",
name: "Create Topic",
description: "Create a new Kafka topic by specifying the topic name, number of partitions, and replication factor. [See the documentation](https://github.com/tulios/kafkajs).",
version: "0.0.1",
type: "action",
props: {
app,
topic: {
type: "string",
label: "Topic Name",
description: "Name of the topic to create.",
},
numPartitions: {
type: "integer",
label: "Number Of Partitions",
description: "The number of partitions for the topic.",
optional: true,
},
replicationFactor: {
type: "integer",
label: "Replication Factor",
description: "This is the number of replicas for each partition in the topic. Remember that the replication factor cannot be larger than the number of brokers in the Kafka cluster.",
optional: true,
},
cleanupPolicy: {
type: "string",
label: "Cleanup Policy",
description: "The cleanup policy for the app topic.",
optional: true,
options: [
"delete",
"compact",
],
},
retentionTime: {
type: "integer",
label: "Retention Time",
description: "The number of milli seconds to keep the local log segment before it gets deleted.",
optional: true,
},
},
async run({ $ }) {
const {
app,
topic,
numPartitions,
replicationFactor,
cleanupPolicy,
retentionTime,
} = this;

const configEntries = [];

if (cleanupPolicy) {
configEntries.push({
name: "cleanup.policy",
value: cleanupPolicy,
});
}

if (retentionTime) {
configEntries.push({
name: "retention.ms",
value: String(retentionTime),
});
}

const success = await app.createTopics({
topics: [
{
topic,
numPartitions,
replicationFactor,
configEntries,
},
],
});
$.export("$summary", "Successfully created topic.");
return {
success,
};
},
};
36 changes: 36 additions & 0 deletions components/kafka/actions/delete-topic/delete-topic.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import app from "../../kafka.app.mjs";

export default {
key: "kafka-delete-topic",
name: "Delete Topic",
description: "Deletes a specified Kafka topic. Requires the topic name as input. [See the documentation](https://github.com/tulios/kafkajs).",
version: "0.0.1",
type: "action",
props: {
app,
topic: {
description: "The Kafka topic to delete.",
propDefinition: [
app,
"topic",
],
},
},
async run({ $ }) {
const {
app,
topic,
} = this;

await app.deleteTopics({
topics: [
topic,
],
});

$.export("$summary", "Successfully deleted topic.");
return {
success: true,
};
},
};
71 changes: 71 additions & 0 deletions components/kafka/actions/publish-message/publish-message.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import app from "../../kafka.app.mjs";

export default {
key: "kafka-publish-message",
name: "Publish Message",
description: "Sends a message to a specified Kafka topic. Requires specifying the topic, message key, and value. Optional properties include headers and partition. [See the documentation](https://github.com/tulios/kafkajs).",
version: "0.0.1",
type: "action",
props: {
app,
topic: {
description: "The topic to send the message to.",
propDefinition: [
app,
"topic",
],
},
messageKey: {
type: "string",
label: "Message Key",
description: "Key of the message.",
optional: true,
},
messageValue: {
type: "string",
label: "Message Value",
description: "Value of the message.",
},
partition: {
type: "integer",
label: "Partition",
description: "The specific partition to send the message to, optional.",
optional: true,
},
headers: {
type: "object",
label: "Headers",
description: "Optional headers you want to send along with the message.",
optional: true,
},
},
async run({ $ }) {
const {
app,
topic,
messageKey,
messageValue,
partition,
headers,
} = this;

const response = await app.sendMessages({
topic,
messages: [
{
key: messageKey,
value: messageValue,
partition,
headers,
},
],
});

if (!response?.length) {
throw new Error("Failed to publish message. Please see the kafka app logs for more information.");
}

$.export("$summary", "Successfully published message.");
return response;
},
};
9 changes: 9 additions & 0 deletions components/kafka/common/constants.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const API = {
ADMIN: "admin",
PRODUCER: "producer",
CONSUMER: "consumer",
};

export default {
API,
};
78 changes: 74 additions & 4 deletions components/kafka/kafka.app.mjs
Original file line number Diff line number Diff line change
@@ -1,11 +1,81 @@
import { Kafka } from "kafkajs";
import constants from "./common/constants.mjs";

export default {
type: "app",
app: "kafka",
propDefinitions: {},
propDefinitions: {
topic: {
type: "string",
label: "Topic",
description: "The topic to interact with.",
options() {
return this.listTopics();
},
},
},
methods: {
// this.$auth contains connected account data
authKeys() {
console.log(Object.keys(this.$auth));
getBrokers() {
const {
host,
port,
} = this.$auth;
return [
`${host}:${port}`,
];
},
getClient() {
return new Kafka({
clientId: "Pipedream",
brokers: this.getBrokers(),
});
},
getApiClient(api, config) {
return this.getClient()[api](config);
},
async withApi(fn, api = constants.API.ADMIN, config) {
const apiClient = this.getApiClient(api, config);
await apiClient.connect();
try {
return await fn(apiClient);
} finally {
await apiClient.disconnect();
}
},
listTopics() {
return this.withApi((admin) => admin.listTopics());
},
listGroups() {
return this.withApi((admin) => admin.listGroups());
},
createTopics(args = {}) {
return this.withApi((admin) => admin.createTopics(args));
},
deleteTopics(args = {}) {
return this.withApi((admin) => admin.deleteTopics(args));
},
deleteGroups(args = {}) {
return this.withApi((admin) => admin.deleteGroups(args));
},
sendMessages(args = {}) {
return this.withApi((producer) => producer.send(args), constants.API.PRODUCER);
},
async messageListener({
topic, fromBeginning = true, onMessage, groupId,
} = {}) {
const config = {
groupId,
};
const consumer = this.getApiClient(constants.API.CONSUMER, config);
await consumer.connect();
await consumer.subscribe({
topic,
fromBeginning,
});
await consumer.run({
eachMessage: onMessage,
});
return consumer;
},
},
};
7 changes: 5 additions & 2 deletions components/kafka/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/kafka",
"version": "0.0.1",
"version": "0.1.0",
"description": "Pipedream Kafka Components",
"main": "kafka.app.mjs",
"keywords": [
@@ -11,5 +11,8 @@
"author": "Pipedream <support@pipedream.com> (https://pipedream.com/)",
"publishConfig": {
"access": "public"
},
"dependencies": {
"kafkajs": "^2.2.4"
}
}
}
71 changes: 71 additions & 0 deletions components/kafka/sources/new-message/new-message.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
import app from "../../kafka.app.mjs";

export default {
key: "kafka-new-message",
name: "New Message",
description: "Emit new event when a message is published to a Kafka topic using a timer. [See the documentation](https://github.com/tulios/kafkajs).",
version: "0.0.1",
type: "source",
dedupe: "unique",
props: {
app,
timer: {
type: "$.interface.timer",
default: {
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
},
},
topic: {
description: "The topic to listen for new messages on.",
propDefinition: [
app,
"topic",
],
},
},
methods: {
delay(consumer, ms = 1000) {
return new Promise((resolve) => {
setTimeout(async () => {
await consumer.disconnect();
console.log("Consumer disconnected!!!");
resolve();
}, ms);
});
},
},
async run() {
console.log("Running ...");
const {
app,
topic,
delay,
} = this;

const GROUP_ID = "pipedream-group";

await app.deleteGroups([
GROUP_ID,
]);

const consumer = await app.messageListener({
topic,
groupId: GROUP_ID,
onMessage: (record) => {
const { message } = record;
this.$emit({
...record,
msgValue: message.value.toString(),
msgKey: message.key?.toString(),
}, {
id: `${message.key}-${message.offset}-${message.timestamp}`,
summary: `New Message ${message.timestamp}`,
ts: Date.parse(message.timestamp),
});
},
});

await delay(consumer);
},
};
58 changes: 58 additions & 0 deletions components/kafka/sources/new-topic-created/new-topic-created.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";
import app from "../../kafka.app.mjs";

export default {
key: "kafka-new-topic-created",
name: "New Topic Created",
description: "Emit new event when a new Kafka topic is created.",
version: "0.0.1",
type: "source",
dedupe: "unique",
props: {
app,
timer: {
type: "$.interface.timer",
default: {
intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
},
},
topicNamePattern: {
type: "string",
label: "Topic Name Pattern",
description: "Only emit events for topics that match this pattern. E.g., `^my-topic-\\d+$`",
optional: true,
},
},
methods: {
topicMatchesPattern(topic) {
const { topicNamePattern } = this;
if (!topicNamePattern) {
return true;
}
const pattern = new RegExp(topicNamePattern);
return pattern.test(topic);
},
emitTopics(topics) {
topics.forEach((topic) => {
this.$emit({
topic,
}, {
id: topic,
summary: `New Topic: ${topic}`,
ts: Date.now(),
});
});
},
},
async run() {
const {
app,
topicMatchesPattern,
emitTopics,
} = this;

const topics = await app.listTopics();
const topicsToEmit = topics.filter(topicMatchesPattern);
emitTopics(topicsToEmit);
},
};
32 changes: 17 additions & 15 deletions pnpm-lock.yaml

0 comments on commit 21510c3

Please sign in to comment.