Users
diff --git a/frontend/src/scenes/pipeline/destinations/newDestinationsLogic.tsx b/frontend/src/scenes/pipeline/destinations/newDestinationsLogic.tsx
index e4b6bd8db6c24..285665e5aef79 100644
--- a/frontend/src/scenes/pipeline/destinations/newDestinationsLogic.tsx
+++ b/frontend/src/scenes/pipeline/destinations/newDestinationsLogic.tsx
@@ -63,7 +63,7 @@ export const newDestinationsLogic = kea([
const destinationTypes = siteDesinationsEnabled
? props.types
: props.types.filter((type) => type !== 'site_destination')
- const templates = await api.hogFunctions.listTemplates(destinationTypes)
+ const templates = await api.hogFunctions.listTemplates({ types: destinationTypes })
return templates.results.reduce((acc, template) => {
acc[template.id] = template
return acc
diff --git a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
index cfa5dc06d463c..c94ac3d6681f5 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx
@@ -6,7 +6,6 @@ import {
LemonDropdown,
LemonInput,
LemonLabel,
- LemonSelect,
LemonSwitch,
LemonTag,
LemonTextArea,
@@ -42,9 +41,23 @@ const EVENT_THRESHOLD_ALERT_LEVEL = 8000
export interface HogFunctionConfigurationProps {
templateId?: string | null
id?: string | null
+
+ displayOptions?: {
+ showFilters?: boolean
+ showExpectedVolume?: boolean
+ showStatus?: boolean
+ showEnabled?: boolean
+ showTesting?: boolean
+ canEditSource?: boolean
+ showPersonsCount?: boolean
+ }
}
-export function HogFunctionConfiguration({ templateId, id }: HogFunctionConfigurationProps): JSX.Element {
+export function HogFunctionConfiguration({
+ templateId,
+ id,
+ displayOptions = {},
+}: HogFunctionConfigurationProps): JSX.Element {
const logicProps = { templateId, id }
const logic = hogFunctionConfigurationLogic(logicProps)
const {
@@ -66,9 +79,7 @@ export function HogFunctionConfiguration({ templateId, id }: HogFunctionConfigur
personsCountLoading,
personsListQuery,
template,
- subTemplate,
templateHasChanged,
- forcedSubTemplateId,
type,
} = useValues(logic)
const {
@@ -80,7 +91,6 @@ export function HogFunctionConfiguration({ templateId, id }: HogFunctionConfigur
duplicateFromTemplate,
setConfigurationValue,
deleteHogFunction,
- setSubTemplateId,
} = useActions(logic)
if (loading && !loaded) {
@@ -152,13 +162,24 @@ export function HogFunctionConfiguration({ templateId, id }: HogFunctionConfigur
return
}
- const showFilters = ['destination', 'site_destination', 'broadcast', 'transformation'].includes(type)
- const showExpectedVolume = ['destination', 'site_destination'].includes(type)
- const showStatus = ['destination', 'email', 'transformation'].includes(type)
- const showEnabled = ['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type)
- const canEditSource = ['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type)
- const showPersonsCount = ['broadcast'].includes(type)
- const showTesting = ['destination', 'transformation', 'broadcast', 'email'].includes(type)
+ const showFilters =
+ displayOptions.showFilters ??
+ ['destination', 'internal_destination', 'site_destination', 'broadcast', 'transformation'].includes(type)
+ const showExpectedVolume = displayOptions.showExpectedVolume ?? ['destination', 'site_destination'].includes(type)
+ const showStatus =
+ displayOptions.showStatus ?? ['destination', 'internal_destination', 'email', 'transformation'].includes(type)
+ const showEnabled =
+ displayOptions.showEnabled ??
+ ['destination', 'internal_destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(
+ type
+ )
+ const canEditSource =
+ displayOptions.canEditSource ??
+ ['destination', 'email', 'site_destination', 'site_app', 'transformation'].includes(type)
+ const showPersonsCount = displayOptions.showPersonsCount ?? ['broadcast'].includes(type)
+ const showTesting =
+ displayOptions.showTesting ??
+ ['destination', 'internal_destination', 'transformation', 'broadcast', 'email'].includes(type)
return (
@@ -359,41 +380,6 @@ export function HogFunctionConfiguration({ templateId, id }: HogFunctionConfigur
- {!forcedSubTemplateId && template?.sub_templates && (
- <>
-
-
-
Choose template
-
({
- value: subTemplate.id,
- label: subTemplate.name,
- labelInMenu: (
-
-
{subTemplate.name}
-
- {subTemplate.description}
-
-
- ),
- })),
- ]}
- value={subTemplate?.id}
- onChange={(value) => {
- setSubTemplateId(value)
- }}
- />
-
-
- >
- )}
-
{
if (!filters) {
@@ -74,6 +75,10 @@ export function HogFunctionFilters(): JSX.Element {
)
}
+ if (type === 'internal_destination') {
+ return
+ }
+
const showMasking = type === 'destination'
const showDropEvents = type === 'transformation'
diff --git a/frontend/src/scenes/pipeline/hogfunctions/filters/HogFunctionFiltersInternal.tsx b/frontend/src/scenes/pipeline/hogfunctions/filters/HogFunctionFiltersInternal.tsx
new file mode 100644
index 0000000000000..7195bc320cf40
--- /dev/null
+++ b/frontend/src/scenes/pipeline/hogfunctions/filters/HogFunctionFiltersInternal.tsx
@@ -0,0 +1,49 @@
+import { LemonSelect } from '@posthog/lemon-ui'
+import { LemonField } from 'lib/lemon-ui/LemonField'
+
+import { HogFunctionFiltersType } from '~/types'
+
+// NOTE: This is all a bit WIP and will be improved upon over time
+// TODO: Make this more advanced with sub type filtering etc.
+// TODO: Make it possible for the renderer to limit the options based on the type
+const FILTER_OPTIONS = [
+ {
+ label: 'Team activity',
+ value: '$activity_log_entry_created',
+ },
+]
+
+const getSimpleFilterValue = (value?: HogFunctionFiltersType): string | undefined => {
+ return value?.events?.[0]?.id
+}
+
+const setSimpleFilterValue = (value: string): HogFunctionFiltersType => {
+ return {
+ events: [
+ {
+ name: FILTER_OPTIONS.find((option) => option.value === value)?.label,
+ id: value,
+ type: 'events',
+ },
+ ],
+ }
+}
+
+export function HogFunctionFiltersInternal(): JSX.Element {
+ return (
+
+
+ {({ value, onChange }) => (
+ <>
+ onChange(setSimpleFilterValue(value))}
+ placeholder="Select a filter"
+ />
+ >
+ )}
+
+
+ )
+}
diff --git a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.test.ts b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.test.ts
index 3c9bef43c45d8..0f93034551c59 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.test.ts
+++ b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.test.ts
@@ -23,7 +23,7 @@ import { hogFunctionConfigurationLogic } from './hogFunctionConfigurationLogic'
const HOG_TEMPLATE: HogFunctionTemplateType = {
sub_templates: [
{
- id: 'early_access_feature_enrollment',
+ id: 'early-access-feature-enrollment',
name: 'HTTP Webhook on feature enrollment',
description: null,
filters: {
@@ -38,7 +38,7 @@ const HOG_TEMPLATE: HogFunctionTemplateType = {
inputs: null,
},
{
- id: 'survey_response',
+ id: 'survey-response',
name: 'HTTP Webhook on survey response',
description: null,
filters: {
diff --git a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx
index d38b39ce21c59..f7312f19e8640 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx
@@ -33,8 +33,6 @@ import {
HogFunctionInputType,
HogFunctionInvocationGlobals,
HogFunctionMappingType,
- HogFunctionSubTemplateIdType,
- HogFunctionSubTemplateType,
HogFunctionTemplateType,
HogFunctionType,
HogFunctionTypeType,
@@ -49,7 +47,6 @@ import type { hogFunctionConfigurationLogicType } from './hogFunctionConfigurati
export interface HogFunctionConfigurationLogicProps {
templateId?: string | null
- subTemplateId?: string | null
id?: string | null
}
@@ -116,19 +113,11 @@ export function sanitizeConfiguration(data: HogFunctionConfigurationType): HogFu
return payload
}
-const templateToConfiguration = (
- template: HogFunctionTemplateType,
- subTemplate?: HogFunctionSubTemplateType | null
-): HogFunctionConfigurationType => {
- function getInputs(
- inputs_schema?: HogFunctionInputSchemaType[] | null,
- subTemplate?: HogFunctionSubTemplateType | null
- ): Record {
+const templateToConfiguration = (template: HogFunctionTemplateType): HogFunctionConfigurationType => {
+ function getInputs(inputs_schema?: HogFunctionInputSchemaType[] | null): Record {
const inputs: Record = {}
inputs_schema?.forEach((schema) => {
- if (typeof subTemplate?.inputs?.[schema.key] !== 'undefined') {
- inputs[schema.key] = { value: subTemplate.inputs[schema.key] }
- } else if (schema.default !== undefined) {
+ if (schema.default !== undefined) {
inputs[schema.key] = { value: schema.default }
}
})
@@ -149,11 +138,11 @@ const templateToConfiguration = (
return {
type: template.type ?? 'destination',
- name: subTemplate?.name ?? template.name,
- description: subTemplate?.name ?? template.description,
+ name: template.name,
+ description: template.description,
inputs_schema: template.inputs_schema,
- filters: subTemplate?.filters ?? template.filters,
- mappings: (subTemplate?.mappings ?? template.mappings)?.map(
+ filters: template.filters,
+ mappings: template.mappings?.map(
(mapping): HogFunctionMappingType => ({
...mapping,
inputs: getMappingInputs(mapping.inputs_schema),
@@ -161,7 +150,7 @@ const templateToConfiguration = (
),
hog: template.hog,
icon_url: template.icon_url,
- inputs: getInputs(template.inputs_schema, subTemplate),
+ inputs: getInputs(template.inputs_schema),
enabled: template.type !== 'broadcast',
}
}
@@ -226,7 +215,6 @@ export const hogFunctionConfigurationLogic = kea ({ sparklineQuery } as { sparklineQuery: TrendsQuery }),
personsCountQueryChanged: (personsCountQuery: ActorsQuery) =>
({ personsCountQuery } as { personsCountQuery: ActorsQuery }),
- setSubTemplateId: (subTemplateId: HogFunctionSubTemplateIdType | null) => ({ subTemplateId }),
loadSampleGlobals: true,
setUnsavedConfiguration: (configuration: HogFunctionConfigurationType | null) => ({ configuration }),
persistForUnload: true,
@@ -254,12 +242,6 @@ export const hogFunctionConfigurationLogic = kea true,
},
],
- subTemplateId: [
- null as HogFunctionSubTemplateIdType | null,
- {
- setSubTemplateId: (_, { subTemplateId }) => subTemplateId,
- },
- ],
unsavedConfiguration: [
null as { timestamp: number; configuration: HogFunctionConfigurationType } | null,
@@ -467,6 +449,10 @@ export const hogFunctionConfigurationLogic = kea (hogFunction ?? template)?.type === 'site_destination',
],
defaultFormState: [
- (s) => [s.template, s.hogFunction, s.subTemplate],
- (template, hogFunction, subTemplate): HogFunctionConfigurationType | null => {
+ (s) => [s.template, s.hogFunction],
+ (template, hogFunction): HogFunctionConfigurationType | null => {
if (template) {
- return templateToConfiguration(template, subTemplate)
+ return templateToConfiguration(template)
}
return hogFunction ?? null
},
@@ -843,18 +829,6 @@ export const hogFunctionConfigurationLogic = kea [s.template, s.subTemplateId],
- (template, subTemplateId) => {
- if (!template || !subTemplateId) {
- return null
- }
-
- const subTemplate = template.sub_templates?.find((st) => st.id === subTemplateId)
- return subTemplate
- },
- ],
- forcedSubTemplateId: [() => [router.selectors.searchParams], ({ sub_template }) => !!sub_template],
mappingTemplates: [
(s) => [s.hogFunction, s.template],
(hogFunction, template) => template?.mapping_templates ?? hogFunction?.template?.mapping_templates ?? [],
@@ -966,7 +940,7 @@ export const hogFunctionConfigurationLogic = kea {
const template = values.hogFunction?.template ?? values.template
if (template) {
- const config = templateToConfiguration(template, values.subTemplate)
+ const config = templateToConfiguration(template)
const inputs = config.inputs ?? {}
@@ -1014,10 +988,6 @@ export const hogFunctionConfigurationLogic = kea {
- actions.resetToTemplate()
- },
-
persistForUnload: () => {
actions.setUnsavedConfiguration(values.configuration)
},
@@ -1030,9 +1000,6 @@ export const hogFunctionConfigurationLogic = kea([
actions.setTestInvocationValue('globals', JSON.stringify(sampleGlobals, null, 2))
},
})),
+
forms(({ props, actions, values }) => ({
testInvocation: {
defaults: {
diff --git a/frontend/src/scenes/pipeline/hogfunctions/list/HogFunctionTemplateList.tsx b/frontend/src/scenes/pipeline/hogfunctions/list/HogFunctionTemplateList.tsx
index d86cd44b5634e..7f9fd156672ac 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/list/HogFunctionTemplateList.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/list/HogFunctionTemplateList.tsx
@@ -20,11 +20,11 @@ export function HogFunctionTemplateList({
)
const { loadHogFunctionTemplates, setFilters, resetFilters } = useActions(hogFunctionTemplateListLogic(props))
- useEffect(() => loadHogFunctionTemplates(), [])
+ useEffect(() => loadHogFunctionTemplates(), [props.type, props.subTemplateId])
return (
<>
-
+
{!props.forceFilters?.search && (
setShowNewDestination(false)}>
diff --git a/frontend/src/scenes/pipeline/hogfunctions/list/hogFunctionTemplateListLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/list/hogFunctionTemplateListLogic.tsx
index c3397c10d4c50..e8f71ecca6454 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/list/hogFunctionTemplateListLogic.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/list/hogFunctionTemplateListLogic.tsx
@@ -8,7 +8,7 @@ import { objectsEqual } from 'lib/utils'
import { hogFunctionNewUrl } from 'scenes/pipeline/hogfunctions/urls'
import { pipelineAccessLogic } from 'scenes/pipeline/pipelineAccessLogic'
-import { HogFunctionTemplateType, HogFunctionTypeType } from '~/types'
+import { HogFunctionSubTemplateIdType, HogFunctionTemplateType, HogFunctionTypeType } from '~/types'
import type { hogFunctionTemplateListLogicType } from './hogFunctionTemplateListLogicType'
@@ -18,11 +18,11 @@ export interface Fuse extends FuseClass {}
export type HogFunctionTemplateListFilters = {
search?: string
filters?: Record
- subTemplateId?: string
}
export type HogFunctionTemplateListLogicProps = {
type: HogFunctionTypeType
+ subTemplateId?: HogFunctionSubTemplateIdType
defaultFilters?: HogFunctionTemplateListFilters
forceFilters?: HogFunctionTemplateListFilters
syncFiltersWithUrl?: boolean
@@ -30,7 +30,12 @@ export type HogFunctionTemplateListLogicProps = {
export const hogFunctionTemplateListLogic = kea([
props({} as HogFunctionTemplateListLogicProps),
- key((props) => `${props.syncFiltersWithUrl ? 'scene' : 'default'}/${props.type ?? 'destination'}`),
+ key(
+ (props) =>
+ `${props.syncFiltersWithUrl ? 'scene' : 'default'}/${props.type ?? 'destination'}/${
+ props.subTemplateId ?? ''
+ }`
+ ),
path((id) => ['scenes', 'pipeline', 'destinationsLogic', id]),
connect({
values: [pipelineAccessLogic, ['canEnableNewDestinations'], featureFlagLogic, ['featureFlags']],
@@ -55,41 +60,22 @@ export const hogFunctionTemplateListLogic = kea ({
- rawTemplates: [
+ templates: [
[] as HogFunctionTemplateType[],
{
loadHogFunctionTemplates: async () => {
- return (await api.hogFunctions.listTemplates(props.type)).results
+ return (
+ await api.hogFunctions.listTemplates({
+ types: [props.type],
+ sub_template_id: props.subTemplateId,
+ })
+ ).results
},
},
],
})),
selectors({
- loading: [(s) => [s.rawTemplatesLoading], (x) => x],
- templates: [
- (s) => [s.rawTemplates, s.filters],
- (rawTemplates, { subTemplateId }): HogFunctionTemplateType[] => {
- if (!subTemplateId) {
- return rawTemplates
- }
- const templates: HogFunctionTemplateType[] = []
- // We want to pull out the sub templates and return the template but with overrides applied
-
- rawTemplates.forEach((template) => {
- const subTemplate = template.sub_templates?.find((subTemplate) => subTemplate.id === subTemplateId)
-
- if (subTemplate) {
- templates.push({
- ...template,
- name: subTemplate.name,
- description: subTemplate.description ?? template.description,
- })
- }
- })
-
- return templates
- },
- ],
+ loading: [(s) => [s.templatesLoading], (x) => x],
templatesFuse: [
(s) => [s.templates],
(hogFunctionTemplates): Fuse => {
@@ -123,13 +109,9 @@ export const hogFunctionTemplateListLogic = kea string) => {
return (template: HogFunctionTemplateType) => {
// Add the filters to the url and the template id
- const subTemplateId = filters.subTemplateId
-
return combineUrl(
hogFunctionNewUrl(template.type, template.id),
- {
- sub_template: subTemplateId,
- },
+ {},
{
configuration: {
filters: filters.filters,
diff --git a/frontend/src/scenes/surveys/SurveyView.tsx b/frontend/src/scenes/surveys/SurveyView.tsx
index f064b83899bde..267879c405c43 100644
--- a/frontend/src/scenes/surveys/SurveyView.tsx
+++ b/frontend/src/scenes/surveys/SurveyView.tsx
@@ -279,7 +279,7 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
content: (
-
Display mode
+
Display mode
{survey.type === SurveyType.API
? survey.type.toUpperCase()
@@ -287,9 +287,9 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
{survey.questions[0].question && (
<>
-
Type
+
Type
{SurveyQuestionLabel[survey.questions[0].type]}
-
+
{pluralize(
survey.questions.length,
'Question',
@@ -304,20 +304,20 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
)}
{survey.questions[0].type === SurveyQuestionType.Link && (
<>
- Link url
+ Link url
{survey.questions[0].link}
>
)}
{survey.start_date && (
- Start date
+ Start date
)}
{survey.end_date && (
- End date
+ End date
)}
@@ -328,7 +328,7 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
survey.iteration_count > 0 &&
survey.iteration_frequency_days > 0 ? (
- Schedule
+ Schedule
Repeats every {survey.iteration_frequency_days}{' '}
{pluralize(
@@ -345,7 +345,7 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
{surveyUsesLimit && (
<>
-
Completion conditions
+
Completion conditions
The survey will be stopped once {survey.responses_limit} {' '}
responses are received.
@@ -354,7 +354,7 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
)}
{surveyUsesAdaptiveLimit && (
<>
- Completion conditions
+ Completion conditions
Survey response collection is limited to receive{' '}
{survey.response_sampling_limit} responses every{' '}
@@ -370,10 +370,10 @@ export function SurveyView({ id }: { id: string }): JSX.Element {
targetingFlagFilters={targetingFlagFilters}
/>
-
+
{survey.type === SurveyType.API && (
-
-
+
+
Learn how to set up API surveys{' '}
Get notified whenever a survey result is submitted
{surveyNPSScore}
- Latest NPS Score
+ Latest NPS Score
>
)}
@@ -544,7 +544,7 @@ export function SurveyResult({ disableEventsTable }: { disableEventsTable?: bool
}
})}
>
-
+
{tab === SurveysTabs.Settings && (
<>
-
+
These settings apply to new surveys in this organization.
@@ -165,7 +165,7 @@ export function Surveys(): JSX.Element {
)}
-
+
Get notified whenever a survey result is submitted
-
+
=10'}
+ /zod@3.24.1:
+ resolution: {integrity: sha512-muH7gBL9sI1nciMZV67X5fTKKBLtwpZ5VBp1vsOQzj1MhrBZ4wlVCm3gedKZWLp0Oyel8sIGfeiz54Su+OVT+A==}
+ dev: false
+
file:../rust/cyclotron-node:
resolution: {directory: ../rust/cyclotron-node, type: directory}
name: '@posthog/cyclotron'
diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts
index 6a9d30af15ff4..9cefda83bb90d 100644
--- a/plugin-server/src/capabilities.ts
+++ b/plugin-server/src/capabilities.ts
@@ -24,6 +24,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
+ cdpInternalEvents: true,
cdpFunctionCallbacks: true,
cdpCyclotronWorker: true,
syncInlinePlugins: true,
@@ -98,6 +99,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
...sharedCapabilities,
}
+ case PluginServerMode.cdp_internal_events:
+ return {
+ cdpInternalEvents: true,
+ ...sharedCapabilities,
+ }
case PluginServerMode.cdp_function_callbacks:
return {
cdpFunctionCallbacks: true,
diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts
index f738b559a0523..a219cd6242864 100644
--- a/plugin-server/src/cdp/cdp-consumers.ts
+++ b/plugin-server/src/cdp/cdp-consumers.ts
@@ -7,6 +7,7 @@ import { buildIntegerMatcher } from '../config/config'
import {
KAFKA_APP_METRICS_2,
KAFKA_CDP_FUNCTION_CALLBACKS,
+ KAFKA_CDP_INTERNAL_EVENTS,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_LOG_ENTRIES,
@@ -37,6 +38,7 @@ import { HogFunctionManager } from './hog-function-manager'
import { HogMasker } from './hog-masker'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { CdpRedis, createCdpRedisPool } from './redis'
+import { CdpInternalEventSchema } from './schema'
import {
HogFunctionInvocation,
HogFunctionInvocationGlobals,
@@ -46,9 +48,11 @@ import {
HogFunctionLogEntrySerialized,
HogFunctionMessageToProduce,
HogFunctionType,
+ HogFunctionTypeType,
HogHooksFetchResponse,
} from './types'
import {
+ convertInternalEventToHogFunctionInvocationGlobals,
convertToCaptureEvent,
convertToHogFunctionInvocationGlobals,
createInvocation,
@@ -81,6 +85,12 @@ const counterFunctionInvocation = new Counter({
labelNames: ['outcome'], // One of 'failed', 'succeeded', 'overflowed', 'disabled', 'filtered'
})
+const counterParseError = new Counter({
+ name: 'cdp_function_parse_error',
+ help: 'A function invocation was parsed with an error',
+ labelNames: ['error'],
+})
+
const gaugeBatchUtilization = new Gauge({
name: 'cdp_cyclotron_batch_utilization',
help: 'Indicates how big batches are we are processing compared to the max batch size. Useful as a scaling metric',
@@ -110,6 +120,7 @@ abstract class CdpConsumerBase {
messagesToProduce: HogFunctionMessageToProduce[] = []
redis: CdpRedis
+ protected hogTypes: HogFunctionTypeType[] = []
protected kafkaProducer?: KafkaProducerWrapper
protected abstract name: string
@@ -363,7 +374,7 @@ abstract class CdpConsumerBase {
public async start(): Promise {
// NOTE: This is only for starting shared services
await Promise.all([
- this.hogFunctionManager.start(),
+ this.hogFunctionManager.start(this.hogTypes),
createKafkaProducerWrapper(this.hub).then((producer) => {
this.kafkaProducer = producer
this.kafkaProducer.producer.connect()
@@ -397,6 +408,10 @@ abstract class CdpConsumerBase {
*/
export class CdpProcessedEventsConsumer extends CdpConsumerBase {
protected name = 'CdpProcessedEventsConsumer'
+ protected topic = KAFKA_EVENTS_JSON
+ protected groupId = 'cdp-processed-events-consumer'
+ protected hogTypes: HogFunctionTypeType[] = ['destination']
+
private cyclotronMatcher: ValueMatcher
private cyclotronManager?: CyclotronManager
@@ -559,8 +574,8 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
}
// This consumer always parses from kafka
- public async _handleKafkaBatch(messages: Message[]): Promise {
- const invocationGlobals = await this.runWithHeartbeat(() =>
+ public async _parseKafkaBatch(messages: Message[]): Promise {
+ return await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: async () => {
@@ -596,16 +611,17 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
},
})
)
-
- await this.processBatch(invocationGlobals)
}
public async start(): Promise {
await super.start()
await this.startKafkaConsumer({
- topic: KAFKA_EVENTS_JSON,
- groupId: 'cdp-processed-events-consumer',
- handleBatch: (messages) => this._handleKafkaBatch(messages),
+ topic: this.topic,
+ groupId: this.groupId,
+ handleBatch: async (messages) => {
+ const invocationGlobals = await this._parseKafkaBatch(messages)
+ await this.processBatch(invocationGlobals)
+ },
})
const shardDepthLimit = this.hub.CYCLOTRON_SHARD_DEPTH_LIMIT ?? 1000000
@@ -618,11 +634,66 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
}
}
+/**
+ * This consumer handles incoming events from the main clickhouse topic
+ * Currently it produces to both kafka and Cyclotron based on the team
+ */
+export class CdpInternalEventsConsumer extends CdpProcessedEventsConsumer {
+ protected name = 'CdpInternalEventsConsumer'
+ protected topic = KAFKA_CDP_INTERNAL_EVENTS
+ protected groupId = 'cdp-internal-events-consumer'
+ protected hogTypes: HogFunctionTypeType[] = ['internal_destination']
+
+ // This consumer always parses from kafka
+ public async _parseKafkaBatch(messages: Message[]): Promise {
+ return await this.runWithHeartbeat(() =>
+ runInstrumentedFunction({
+ statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
+ func: async () => {
+ const events: HogFunctionInvocationGlobals[] = []
+ await Promise.all(
+ messages.map(async (message) => {
+ try {
+ const kafkaEvent = JSON.parse(message.value!.toString()) as unknown
+ // This is the input stream from elsewhere so we want to do some proper validation
+ const event = CdpInternalEventSchema.parse(kafkaEvent)
+
+ if (!this.hogFunctionManager.teamHasHogDestinations(event.team_id)) {
+ // No need to continue if the team doesn't have any functions
+ return
+ }
+
+ const team = await this.hub.teamManager.fetchTeam(event.team_id)
+ if (!team) {
+ return
+ }
+ events.push(
+ convertInternalEventToHogFunctionInvocationGlobals(
+ event,
+ team,
+ this.hub.SITE_URL ?? 'http://localhost:8000'
+ )
+ )
+ } catch (e) {
+ status.error('Error parsing message', e)
+ counterParseError.labels({ error: e.message }).inc()
+ }
+ })
+ )
+
+ return events
+ },
+ })
+ )
+ }
+}
+
/**
* This consumer only deals with kafka messages and will eventually be replaced by the Cyclotron worker
*/
export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
protected name = 'CdpFunctionCallbackConsumer'
+ protected hogTypes: HogFunctionTypeType[] = ['destination', 'internal_destination']
public async processBatch(invocations: HogFunctionInvocation[]): Promise {
if (!invocations.length) {
@@ -658,8 +729,8 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
await this.produceQueuedMessages()
}
- public async _handleKafkaBatch(messages: Message[]): Promise {
- const events = await this.runWithHeartbeat(() =>
+ public async _parseKafkaBatch(messages: Message[]): Promise {
+ return await this.runWithHeartbeat(() =>
runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.parseKafkaMessages`,
func: async () => {
@@ -727,8 +798,6 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
},
})
)
-
- await this.processBatch(events)
}
public async start(): Promise {
@@ -736,7 +805,10 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
await this.startKafkaConsumer({
topic: KAFKA_CDP_FUNCTION_CALLBACKS,
groupId: 'cdp-function-callback-consumer',
- handleBatch: (messages) => this._handleKafkaBatch(messages),
+ handleBatch: async (messages) => {
+ const invocations = await this._parseKafkaBatch(messages)
+ await this.processBatch(invocations)
+ },
})
}
}
@@ -749,6 +821,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
private cyclotronWorker?: CyclotronWorker
private runningWorker: Promise | undefined
protected queue: 'hog' | 'fetch' = 'hog'
+ protected hogTypes: HogFunctionTypeType[] = ['destination', 'internal_destination']
public async processBatch(invocations: HogFunctionInvocation[]): Promise {
if (!invocations.length) {
diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts
index 1b843ca04c513..e45536dc947da 100644
--- a/plugin-server/src/cdp/hog-executor.ts
+++ b/plugin-server/src/cdp/hog-executor.ts
@@ -126,7 +126,7 @@ export class HogExecutor {
nonMatchingFunctions: HogFunctionType[]
erroredFunctions: [HogFunctionType, string][]
} {
- const allFunctionsForTeam = this.hogFunctionManager.getTeamHogDestinations(globals.project.id)
+ const allFunctionsForTeam = this.hogFunctionManager.getTeamHogFunctions(globals.project.id)
const filtersGlobals = convertToHogFunctionFilterGlobal(globals)
const nonMatchingFunctions: HogFunctionType[] = []
@@ -333,39 +333,39 @@ export class HogExecutor {
// We need to pass these in but they don't actually do anything as it is a sync exec
fetch: async () => Promise.resolve(),
},
- importBytecode: (module) => {
- // TODO: more than one hardcoded module
- if (module === 'provider/email') {
- const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
- if (!provider) {
- throw new Error('No email provider configured')
- }
- try {
- const providerGlobals = this.buildHogFunctionGlobals({
- id: '',
- teamId: invocation.teamId,
- hogFunction: provider,
- globals: {} as any,
- queue: 'hog',
- timings: [],
- priority: 0,
- } satisfies HogFunctionInvocation)
-
- return {
- bytecode: provider.bytecode,
- globals: providerGlobals,
- }
- } catch (e) {
- result.logs.push({
- level: 'error',
- timestamp: DateTime.now(),
- message: `Error building inputs: ${e}`,
- })
- throw e
- }
- }
- throw new Error(`Can't import unknown module: ${module}`)
- },
+ // importBytecode: (module) => {
+ // // TODO: more than one hardcoded module
+ // if (module === 'provider/email') {
+ // const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
+ // if (!provider) {
+ // throw new Error('No email provider configured')
+ // }
+ // try {
+ // const providerGlobals = this.buildHogFunctionGlobals({
+ // id: '',
+ // teamId: invocation.teamId,
+ // hogFunction: provider,
+ // globals: {} as any,
+ // queue: 'hog',
+ // timings: [],
+ // priority: 0,
+ // } satisfies HogFunctionInvocation)
+
+ // return {
+ // bytecode: provider.bytecode,
+ // globals: providerGlobals,
+ // }
+ // } catch (e) {
+ // result.logs.push({
+ // level: 'error',
+ // timestamp: DateTime.now(),
+ // message: `Error building inputs: ${e}`,
+ // })
+ // throw e
+ // }
+ // }
+ // throw new Error(`Can't import unknown module: ${module}`)
+ // },
functions: {
print: (...args) => {
hogLogs++
diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts
index c53ff71952ec2..aea3ffb9b10e5 100644
--- a/plugin-server/src/cdp/hog-function-manager.ts
+++ b/plugin-server/src/cdp/hog-function-manager.ts
@@ -5,7 +5,7 @@ import { Hub, Team } from '../types'
import { PostgresUse } from '../utils/db/postgres'
import { PubSub } from '../utils/pubsub'
import { status } from '../utils/status'
-import { HogFunctionType, IntegrationType } from './types'
+import { HogFunctionType, HogFunctionTypeType, IntegrationType } from './types'
type HogFunctionCache = {
functions: Record
@@ -26,14 +26,13 @@ const HOG_FUNCTION_FIELDS = [
'type',
]
-const RELOAD_HOG_FUNCTION_TYPES = ['destination', 'email']
-
export class HogFunctionManager {
private started: boolean
private ready: boolean
private cache: HogFunctionCache
private pubSub: PubSub
private refreshJob?: schedule.Job
+ private hogTypes: HogFunctionTypeType[] = []
constructor(private hub: Hub) {
this.started = false
@@ -60,7 +59,8 @@ export class HogFunctionManager {
})
}
- public async start(): Promise {
+ public async start(hogTypes: HogFunctionTypeType[]): Promise {
+ this.hogTypes = hogTypes
// TRICKY - when running with individual capabilities, this won't run twice but locally or as a complete service it will...
if (this.started) {
return
@@ -96,14 +96,6 @@ export class HogFunctionManager {
.filter((x) => !!x) as HogFunctionType[]
}
- public getTeamHogDestinations(teamId: Team['id']): HogFunctionType[] {
- return this.getTeamHogFunctions(teamId).filter((x) => x.type === 'destination' || !x.type)
- }
-
- public getTeamHogEmailProvider(teamId: Team['id']): HogFunctionType | undefined {
- return this.getTeamHogFunctions(teamId).find((x) => x.type === 'email')
- }
-
public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
@@ -124,7 +116,7 @@ export class HogFunctionManager {
}
public teamHasHogDestinations(teamId: Team['id']): boolean {
- return !!Object.keys(this.getTeamHogDestinations(teamId)).length
+ return !!Object.keys(this.getTeamHogFunctions(teamId)).length
}
public async reloadAllHogFunctions(): Promise {
@@ -134,9 +126,9 @@ export class HogFunctionManager {
`
SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
FROM posthog_hogfunction
- WHERE deleted = FALSE AND enabled = TRUE AND (type is NULL or type = ANY($1))
+ WHERE deleted = FALSE AND enabled = TRUE AND type = ANY($1)
`,
- [RELOAD_HOG_FUNCTION_TYPES],
+ [this.hogTypes],
'fetchAllHogFunctions'
)
).rows
@@ -167,8 +159,8 @@ export class HogFunctionManager {
PostgresUse.COMMON_READ,
`SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
FROM posthog_hogfunction
- WHERE id = ANY($1) AND deleted = FALSE AND enabled = TRUE`,
- [ids],
+ WHERE id = ANY($1) AND deleted = FALSE AND enabled = TRUE AND type = ANY($2)`,
+ [ids, this.hogTypes],
'fetchEnabledHogFunctions'
)
).rows
@@ -218,6 +210,11 @@ export class HogFunctionManager {
items.forEach((item) => {
const encryptedInputsString = item.encrypted_inputs as string | undefined
+ if (!Array.isArray(item.inputs_schema)) {
+ // NOTE: The sql lib can sometimes return an empty object instead of an empty array
+ item.inputs_schema = []
+ }
+
if (encryptedInputsString) {
try {
const decrypted = this.hub.encryptedFields.decrypt(encryptedInputsString || '')
diff --git a/plugin-server/src/cdp/schema.ts b/plugin-server/src/cdp/schema.ts
new file mode 100644
index 0000000000000..35dbf01e5e3f3
--- /dev/null
+++ b/plugin-server/src/cdp/schema.ts
@@ -0,0 +1,27 @@
+import { z } from 'zod'
+
+export const CdpInternalEventSchema = z.object({
+ team_id: z.number(),
+ event: z.object({
+ uuid: z.string(),
+ event: z.string(),
+ // In this context distinct_id should be whatever we want to use if doing follow up things (like tracking a standard event)
+ distinct_id: z.string(),
+ properties: z.record(z.any()),
+ timestamp: z.string(),
+ url: z.string().optional().nullable(),
+ }),
+ // Person may be a event-style person or an org member
+ person: z
+ .object({
+ id: z.string(),
+ properties: z.record(z.any()),
+ name: z.string().optional().nullable(),
+ url: z.string().optional().nullable(),
+ })
+ .optional()
+ .nullable(),
+})
+
+// Infer the TypeScript type
+export type CdpInternalEvent = z.infer
diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts
index dfe0464a1f9ec..9f7b8bba7433f 100644
--- a/plugin-server/src/cdp/types.ts
+++ b/plugin-server/src/cdp/types.ts
@@ -275,7 +275,16 @@ export type HogFunctionInputSchemaType = {
requiredScopes?: string
}
-export type HogFunctionTypeType = 'destination' | 'email' | 'sms' | 'push' | 'activity' | 'alert' | 'broadcast'
+export type HogFunctionTypeType =
+ | 'destination'
+ | 'transformation'
+ | 'internal_destination'
+ | 'email'
+ | 'sms'
+ | 'push'
+ | 'activity'
+ | 'alert'
+ | 'broadcast'
export type HogFunctionType = {
id: string
diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts
index f4c09b602a514..73909ec9e5a7b 100644
--- a/plugin-server/src/cdp/utils.ts
+++ b/plugin-server/src/cdp/utils.ts
@@ -11,6 +11,7 @@ import { RawClickHouseEvent, Team, TimestampFormat } from '../types'
import { safeClickhouseString } from '../utils/db/utils'
import { status } from '../utils/status'
import { castTimestampOrNow, clickHouseTimestampToISO, UUIDT } from '../utils/utils'
+import { CdpInternalEvent } from './schema'
import {
HogFunctionCapturedEvent,
HogFunctionFilterGlobals,
@@ -90,6 +91,47 @@ export function convertToHogFunctionInvocationGlobals(
return context
}
+export function convertInternalEventToHogFunctionInvocationGlobals(
+ data: CdpInternalEvent,
+ team: Team,
+ siteUrl: string
+): HogFunctionInvocationGlobals {
+ const projectUrl = `${siteUrl}/project/${team.id}`
+
+ let person: HogFunctionInvocationGlobals['person']
+
+ if (data.person) {
+ const personDisplayName = getPersonDisplayName(team, data.event.distinct_id, data.person.properties)
+
+ person = {
+ id: data.person.id,
+ properties: data.person.properties,
+ name: personDisplayName,
+ url: data.person.url ?? '',
+ }
+ }
+
+ const context: HogFunctionInvocationGlobals = {
+ project: {
+ id: team.id,
+ name: team.name,
+ url: projectUrl,
+ },
+ event: {
+ uuid: data.event.uuid,
+ event: data.event.event,
+ elements_chain: '', // Not applicable but left here for compatibility
+ distinct_id: data.event.distinct_id,
+ properties: data.event.properties,
+ timestamp: data.event.timestamp,
+ url: data.event.url ?? '',
+ },
+ person,
+ }
+
+ return context
+}
+
function getElementsChainHref(elementsChain: string): string {
// Adapted from SQL: extract(elements_chain, '(?::|\")href="(.*?)"'),
const hrefRegex = new RE2(/(?::|")href="(.*?)"/)
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index 8610bf8f0b819..79959a951e9a7 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -45,6 +45,7 @@ export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
// CDP topics
export const KAFKA_CDP_FUNCTION_CALLBACKS = `${prefix}cdp_function_callbacks${suffix}`
export const KAFKA_CDP_FUNCTION_OVERFLOW = `${prefix}cdp_function_overflow${suffix}`
+export const KAFKA_CDP_INTERNAL_EVENTS = `${prefix}cdp_internal_events${suffix}`
// Error tracking topics
export const KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = `${prefix}exception_symbolification_events${suffix}`
diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts
index 2f1082f2aa5b4..66f4eac0ea0f7 100644
--- a/plugin-server/src/kafka/batch-consumer.ts
+++ b/plugin-server/src/kafka/batch-consumer.ts
@@ -249,6 +249,8 @@ export const startBatchConsumer = async ({
let batchesProcessed = 0
const statusLogInterval = setInterval(() => {
status.info('🔁', 'main_loop', {
+ groupId,
+ topic,
messagesPerSecond: messagesProcessed / (STATUS_LOG_INTERVAL_MS / 1000),
batchesProcessed: batchesProcessed,
lastHeartbeatTime: new Date(lastHeartbeatTime).toISOString(),
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index d61f3bb5e0510..ac482ca21a6fa 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -15,6 +15,7 @@ import {
CdpCyclotronWorker,
CdpCyclotronWorkerFetch,
CdpFunctionCallbackConsumer,
+ CdpInternalEventsConsumer,
CdpProcessedEventsConsumer,
} from '../cdp/cdp-consumers'
import { defaultConfig } from '../config/config'
@@ -451,6 +452,13 @@ export async function startPluginsServer(
services.push(consumer.service)
}
+ if (capabilities.cdpInternalEvents) {
+ const hub = await setupHub()
+ const consumer = new CdpInternalEventsConsumer(hub)
+ await consumer.start()
+ services.push(consumer.service)
+ }
+
if (capabilities.cdpFunctionCallbacks) {
const hub = await setupHub()
const consumer = new CdpFunctionCallbackConsumer(hub)
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index 1263a896d04a3..390f7d8d3a5a5 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -84,6 +84,7 @@ export enum PluginServerMode {
recordings_blob_ingestion = 'recordings-blob-ingestion',
recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
cdp_processed_events = 'cdp-processed-events',
+ cdp_internal_events = 'cdp-internal-events',
cdp_function_callbacks = 'cdp-function-callbacks',
cdp_cyclotron_worker = 'cdp-cyclotron-worker',
functional_tests = 'functional-tests',
@@ -358,6 +359,7 @@ export interface PluginServerCapabilities {
sessionRecordingBlobIngestion?: boolean
sessionRecordingBlobOverflowIngestion?: boolean
cdpProcessedEvents?: boolean
+ cdpInternalEvents?: boolean
cdpFunctionCallbacks?: boolean
cdpCyclotronWorker?: boolean
appManagementSingleton?: boolean
diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-events-consumer.test.ts
similarity index 95%
rename from plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
rename to plugin-server/tests/cdp/cdp-events-consumer.test.ts
index c559a4240fca4..db400a56672b3 100644
--- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts
+++ b/plugin-server/tests/cdp/cdp-events-consumer.test.ts
@@ -1,4 +1,4 @@
-import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers'
+import { CdpInternalEventsConsumer, CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers'
import { HogWatcherState } from '../../src/cdp/hog-watcher'
import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types'
import { Hub, Team } from '../../src/types'
@@ -74,13 +74,22 @@ const decodeAllKafkaMessages = (): any[] => {
return mockProducer.produce.mock.calls.map((x) => decodeKafkaMessage(x[0]))
}
-describe('CDP Processed Events Consumer', () => {
- let processor: CdpProcessedEventsConsumer
+/**
+ * NOTE: The internal and normal events consumers are very similar so we can test them together
+ */
+describe.each([
+ [CdpProcessedEventsConsumer.name, CdpProcessedEventsConsumer, 'destination' as const],
+ [CdpInternalEventsConsumer.name, CdpInternalEventsConsumer, 'internal_destination' as const],
+])('%s', (_name, Consumer, hogType) => {
+ let processor: CdpProcessedEventsConsumer | CdpInternalEventsConsumer
let hub: Hub
let team: Team
const insertHogFunction = async (hogFunction: Partial) => {
- const item = await _insertHogFunction(hub.postgres, team.id, hogFunction)
+ const item = await _insertHogFunction(hub.postgres, team.id, {
+ ...hogFunction,
+ type: hogType,
+ })
// Trigger the reload that django would do
await processor.hogFunctionManager.reloadAllHogFunctions()
return item
@@ -91,7 +100,7 @@ describe('CDP Processed Events Consumer', () => {
hub = await createHub()
team = await getFirstTeam(hub)
- processor = new CdpProcessedEventsConsumer(hub)
+ processor = new Consumer(hub)
await processor.start()
mockFetch.mockClear()
diff --git a/plugin-server/tests/cdp/cdp-internal-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-internal-events-consumer.test.ts
new file mode 100644
index 0000000000000..995b2eeae2667
--- /dev/null
+++ b/plugin-server/tests/cdp/cdp-internal-events-consumer.test.ts
@@ -0,0 +1,99 @@
+import { CdpInternalEventsConsumer } from '../../src/cdp/cdp-consumers'
+import { HogFunctionType } from '../../src/cdp/types'
+import { Hub, Team } from '../../src/types'
+import { closeHub, createHub } from '../../src/utils/db/hub'
+import { getFirstTeam, resetTestDatabase } from '../helpers/sql'
+import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
+import { createInternalEvent, createKafkaMessage, insertHogFunction as _insertHogFunction } from './fixtures'
+
+describe('CDP Internal Events Consumer', () => {
+ let processor: CdpInternalEventsConsumer
+ let hub: Hub
+ let team: Team
+
+ const insertHogFunction = async (hogFunction: Partial) => {
+ const item = await _insertHogFunction(hub.postgres, team.id, hogFunction)
+ // Trigger the reload that django would do
+ await processor.hogFunctionManager.reloadAllHogFunctions()
+ return item
+ }
+
+ beforeEach(async () => {
+ await resetTestDatabase()
+ hub = await createHub()
+ team = await getFirstTeam(hub)
+
+ processor = new CdpInternalEventsConsumer(hub)
+ // Speed hack as we don't need all of kafka to be started for this test
+ await processor.hogFunctionManager.start(processor['hogTypes'])
+ })
+
+ afterEach(async () => {
+ jest.setTimeout(1000)
+ await closeHub(hub)
+ })
+
+ afterAll(() => {
+ jest.useRealTimers()
+ })
+
+ describe('_handleKafkaBatch', () => {
+ it('should ignore invalid message', async () => {
+ const events = await processor._parseKafkaBatch([createKafkaMessage({})])
+ expect(events).toHaveLength(0)
+ })
+
+ it('should ignore message with no team', async () => {
+ const events = await processor._parseKafkaBatch([createKafkaMessage(createInternalEvent(999999, {}))])
+ expect(events).toHaveLength(0)
+ })
+
+ describe('with an existing team and hog function', () => {
+ beforeEach(async () => {
+ await insertHogFunction({
+ ...HOG_EXAMPLES.simple_fetch,
+ ...HOG_INPUTS_EXAMPLES.simple_fetch,
+ ...HOG_FILTERS_EXAMPLES.no_filters,
+ type: 'internal_destination',
+ })
+ })
+
+ it('should ignore invalid payloads', async () => {
+ const events = await processor._parseKafkaBatch([
+ createKafkaMessage(
+ createInternalEvent(team.id, {
+ event: 'WRONG' as any,
+ })
+ ),
+ ])
+ expect(events).toHaveLength(0)
+ })
+
+ it('should parse a valid message with an existing team and hog function ', async () => {
+ const event = createInternalEvent(team.id, {})
+ event.event.timestamp = '2024-12-18T15:06:23.545Z'
+ event.event.uuid = 'b6da2f33-ba54-4550-9773-50d3278ad61f'
+
+ const events = await processor._parseKafkaBatch([createKafkaMessage(event)])
+ expect(events).toHaveLength(1)
+ expect(events[0]).toEqual({
+ event: {
+ distinct_id: 'distinct_id',
+ elements_chain: '',
+ event: '$pageview',
+ properties: {},
+ timestamp: '2024-12-18T15:06:23.545Z',
+ url: '',
+ uuid: 'b6da2f33-ba54-4550-9773-50d3278ad61f',
+ },
+ person: undefined,
+ project: {
+ id: 2,
+ name: 'TEST PROJECT',
+ url: 'http://localhost:8000/project/2',
+ },
+ })
+ })
+ })
+ })
+})
diff --git a/plugin-server/tests/cdp/fixtures.ts b/plugin-server/tests/cdp/fixtures.ts
index e34920fdd981e..79c56798866db 100644
--- a/plugin-server/tests/cdp/fixtures.ts
+++ b/plugin-server/tests/cdp/fixtures.ts
@@ -1,6 +1,7 @@
import { randomUUID } from 'crypto'
import { Message } from 'node-rdkafka'
+import { CdpInternalEvent } from '../../src/cdp/schema'
import {
HogFunctionInvocation,
HogFunctionInvocationGlobals,
@@ -60,7 +61,7 @@ export const createIncomingEvent = (teamId: number, data: Partial = {}): Message => {
+export const createKafkaMessage = (event: any, overrides: Partial = {}): Message => {
return {
partition: 1,
topic: 'test',
@@ -72,6 +73,20 @@ export const createMessage = (event: RawClickHouseEvent, overrides: Partial): CdpInternalEvent => {
+ return {
+ team_id: teamId,
+ event: {
+ timestamp: new Date().toISOString(),
+ properties: {},
+ uuid: randomUUID(),
+ event: '$pageview',
+ distinct_id: 'distinct_id',
+ },
+ ...data,
+ }
+}
+
export const insertHogFunction = async (
postgres: PostgresRouter,
team_id: Team['id'],
diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts
index aeacc1067d0f4..99feb53d62207 100644
--- a/plugin-server/tests/cdp/hog-executor.test.ts
+++ b/plugin-server/tests/cdp/hog-executor.test.ts
@@ -48,9 +48,8 @@ describe('Hog Executor', () => {
const mockFunctionManager = {
reloadAllHogFunctions: jest.fn(),
- getTeamHogDestinations: jest.fn(),
+ getTeamHogFunctions: jest.fn(),
getTeamHogFunction: jest.fn(),
- getTeamHogEmailProvider: jest.fn(),
}
beforeEach(async () => {
@@ -70,7 +69,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.no_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([hogFunction])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([hogFunction])
mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction)
})
@@ -254,7 +253,7 @@ describe('Hog Executor', () => {
})
})
- describe('email provider functions', () => {
+ describe.skip('email provider functions', () => {
let hogFunction: HogFunctionType
let providerFunction: HogFunctionType
beforeEach(() => {
@@ -270,9 +269,9 @@ describe('Hog Executor', () => {
...HOG_INPUTS_EXAMPLES.email,
...HOG_FILTERS_EXAMPLES.no_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([hogFunction, providerFunction])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([hogFunction, providerFunction])
mockFunctionManager.getTeamHogFunction.mockReturnValue(hogFunction)
- mockFunctionManager.getTeamHogEmailProvider.mockReturnValue(providerFunction)
+ // mockFunctionManager.getTeamHogEmailProvider.mockReturnValue(providerFunction)
})
it('can execute an invocation', () => {
@@ -326,7 +325,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.pageview_or_autocapture_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const resultsShouldntMatch = executor.findMatchingFunctions(createHogExecutionGlobals({ groups: {} }))
expect(resultsShouldntMatch.matchingFunctions).toHaveLength(0)
@@ -356,7 +355,7 @@ describe('Hog Executor', () => {
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.broken_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const resultsShouldMatch = executor.findMatchingFunctions(
createHogExecutionGlobals({
groups: {},
@@ -388,7 +387,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.elements_text_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const elementsChain = (buttonText: string) =>
`span.LemonButton__content:attr__class="LemonButton__content"nth-child="2"nth-of-type="2"text="${buttonText}";span.LemonButton__chrome:attr__class="LemonButton__chrome"nth-child="1"nth-of-type="1";button.LemonButton.LemonButton--has-icon.LemonButton--secondary.LemonButton--status-default:attr__class="LemonButton LemonButton--secondary LemonButton--status-default LemonButton--has-icon"attr__type="button"nth-child="1"nth-of-type="1"text="${buttonText}";div.flex.gap-4.items-center:attr__class="flex gap-4 items-center"nth-child="1"nth-of-type="1";div.flex.flex-wrap.gap-4.justify-between:attr__class="flex gap-4 justify-between flex-wrap"nth-child="3"nth-of-type="3";div.flex.flex-1.flex-col.gap-4.h-full.relative.w-full:attr__class="relative w-full flex flex-col gap-4 flex-1 h-full"nth-child="1"nth-of-type="1";div.LemonTabs__content:attr__class="LemonTabs__content"nth-child="2"nth-of-type="1";div.LemonTabs.LemonTabs--medium:attr__class="LemonTabs LemonTabs--medium"attr__style="--lemon-tabs-slider-width: 48px; --lemon-tabs-slider-offset: 0px;"nth-child="1"nth-of-type="1";div.Navigation3000__scene:attr__class="Navigation3000__scene"nth-child="2"nth-of-type="2";main:nth-child="2"nth-of-type="1";div.Navigation3000:attr__class="Navigation3000"nth-child="1"nth-of-type="1";div:attr__id="root"attr_id="root"nth-child="3"nth-of-type="1";body.overflow-hidden:attr__class="overflow-hidden"attr__theme="light"nth-child="2"nth-of-type="1"`
@@ -438,7 +437,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.elements_href_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const elementsChain = (link: string) =>
`span.LemonButton__content:attr__class="LemonButton__content"attr__href="${link}"href="${link}"nth-child="2"nth-of-type="2"text="Activity";span.LemonButton__chrome:attr__class="LemonButton__chrome"nth-child="1"nth-of-type="1";a.LemonButton.LemonButton--full-width.LemonButton--has-icon.LemonButton--secondary.LemonButton--status-alt.Link.NavbarButton:attr__class="Link LemonButton LemonButton--secondary LemonButton--status-alt LemonButton--full-width LemonButton--has-icon NavbarButton"attr__data-attr="menu-item-activity"attr__href="${link}"href="${link}"nth-child="1"nth-of-type="1"text="Activity";li.w-full:attr__class="w-full"nth-child="6"nth-of-type="6";ul:nth-child="1"nth-of-type="1";div.Navbar3000__top.ScrollableShadows__inner:attr__class="ScrollableShadows__inner Navbar3000__top"nth-child="1"nth-of-type="1";div.ScrollableShadows.ScrollableShadows--vertical:attr__class="ScrollableShadows ScrollableShadows--vertical"nth-child="1"nth-of-type="1";div.Navbar3000__content:attr__class="Navbar3000__content"nth-child="1"nth-of-type="1";nav.Navbar3000:attr__class="Navbar3000"nth-child="1"nth-of-type="1";div.Navigation3000:attr__class="Navigation3000"nth-child="1"nth-of-type="1";div:attr__id="root"attr_id="root"nth-child="3"nth-of-type="1";body.overflow-hidden:attr__class="overflow-hidden"attr__theme="light"nth-child="2"nth-of-type="1"`
@@ -488,7 +487,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.elements_tag_and_id_filter,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const elementsChain = (id: string) =>
`a.Link.font-semibold.text-text-3000.text-xl:attr__class="Link font-semibold text-xl text-text-3000"attr__href="/project/1/dashboard/1"attr__id="${id}"attr_id="${id}"href="/project/1/dashboard/1"nth-child="1"nth-of-type="1"text="My App Dashboard";div.ProjectHomepage__dashboardheader__title:attr__class="ProjectHomepage__dashboardheader__title"nth-child="1"nth-of-type="1";div.ProjectHomepage__dashboardheader:attr__class="ProjectHomepage__dashboardheader"nth-child="2"nth-of-type="2";div.ProjectHomepage:attr__class="ProjectHomepage"nth-child="1"nth-of-type="1";div.Navigation3000__scene:attr__class="Navigation3000__scene"nth-child="2"nth-of-type="2";main:nth-child="2"nth-of-type="1";div.Navigation3000:attr__class="Navigation3000"nth-child="1"nth-of-type="1";div:attr__id="root"attr_id="root"nth-child="3"nth-of-type="1";body.overflow-hidden:attr__class="overflow-hidden"attr__theme="light"nth-child="2"nth-of-type="1"`
@@ -579,7 +578,7 @@ describe('Hog Executor', () => {
...HOG_FILTERS_EXAMPLES.no_filters,
})
- mockFunctionManager.getTeamHogDestinations.mockReturnValue([fn])
+ mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
const result = executor.execute(createInvocation(fn))
expect(result.error).toContain('Execution timed out after 0.1 seconds. Performed ')
diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts
index d5d5b575dd3ec..752927c3d53dd 100644
--- a/plugin-server/tests/cdp/hog-function-manager.test.ts
+++ b/plugin-server/tests/cdp/hog-function-manager.test.ts
@@ -62,22 +62,31 @@ describe('HogFunctionManager', () => {
hogFunctions.push(
await insertHogFunction(hub.postgres, teamId1, {
- name: 'Email Provider team 1',
- type: 'email',
- inputs_schema: [
- {
- type: 'email',
- key: 'message',
- },
- ],
- inputs: {
- email: {
- value: { from: 'me@a.com', to: 'you@b.com', subject: 'subject', html: 'text' },
- },
- },
+ name: 'Test Hog Function team 1 - transformation',
+ type: 'transformation',
+ inputs_schema: [],
+ inputs: {},
})
)
+ // hogFunctions.push(
+ // await insertHogFunction(hub.postgres, teamId1, {
+ // name: 'Email Provider team 1',
+ // type: 'email',
+ // inputs_schema: [
+ // {
+ // type: 'email',
+ // key: 'message',
+ // },
+ // ],
+ // inputs: {
+ // email: {
+ // value: { from: 'me@a.com', to: 'you@b.com', subject: 'subject', html: 'text' },
+ // },
+ // },
+ // })
+ // )
+
hogFunctions.push(
await insertHogFunction(hub.postgres, teamId2, {
name: 'Test Hog Function team 2',
@@ -98,7 +107,7 @@ describe('HogFunctionManager', () => {
})
)
- await manager.start()
+ await manager.start(['destination'])
})
afterEach(async () => {
@@ -107,7 +116,7 @@ describe('HogFunctionManager', () => {
})
it('returns the hog functions', async () => {
- let items = manager.getTeamHogDestinations(teamId1)
+ let items = manager.getTeamHogFunctions(teamId1)
expect(items).toEqual([
{
@@ -142,13 +151,6 @@ describe('HogFunctionManager', () => {
},
])
- const allFunctions = manager.getTeamHogFunctions(teamId1)
- expect(allFunctions.length).toEqual(2)
- expect(allFunctions.map((f) => f.type).sort()).toEqual(['destination', 'email'])
-
- const emailProvider = manager.getTeamHogEmailProvider(teamId1)
- expect(emailProvider.type).toEqual('email')
-
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_hogfunction SET name='Test Hog Function team 1 updated' WHERE id = $1`,
@@ -159,7 +161,7 @@ describe('HogFunctionManager', () => {
// This is normally dispatched by django
await manager.reloadHogFunctions(teamId1, [hogFunctions[0].id])
- items = manager.getTeamHogDestinations(teamId1)
+ items = manager.getTeamHogFunctions(teamId1)
expect(items).toMatchObject([
{
@@ -169,8 +171,21 @@ describe('HogFunctionManager', () => {
])
})
+ it('filters hog functions by type', async () => {
+ manager['hogTypes'] = ['transformation']
+ await manager.reloadAllHogFunctions()
+ expect(manager.getTeamHogFunctions(teamId1).length).toEqual(1)
+ expect(manager.getTeamHogFunctions(teamId1)[0].type).toEqual('transformation')
+
+ manager['hogTypes'] = ['transformation', 'destination']
+ await manager.reloadAllHogFunctions()
+ expect(manager.getTeamHogFunctions(teamId1).length).toEqual(2)
+ expect(manager.getTeamHogFunctions(teamId1)[0].type).toEqual('destination')
+ expect(manager.getTeamHogFunctions(teamId1)[1].type).toEqual('transformation')
+ })
+
it('removes disabled functions', async () => {
- let items = manager.getTeamHogDestinations(teamId1)
+ let items = manager.getTeamHogFunctions(teamId1)
expect(items).toMatchObject([
{
@@ -188,14 +203,14 @@ describe('HogFunctionManager', () => {
// This is normally dispatched by django
await manager.reloadHogFunctions(teamId1, [hogFunctions[0].id])
- items = manager.getTeamHogDestinations(teamId1)
+ items = manager.getTeamHogFunctions(teamId1)
expect(items).toEqual([])
})
it('enriches integration inputs if found and belonging to the team', () => {
- const function1Inputs = manager.getTeamHogDestinations(teamId1)[0].inputs
- const function2Inputs = manager.getTeamHogDestinations(teamId2)[0].inputs
+ const function1Inputs = manager.getTeamHogFunctions(teamId1)[0].inputs
+ const function2Inputs = manager.getTeamHogFunctions(teamId2)[0].inputs
// Only the right team gets the integration inputs enriched
expect(function1Inputs).toEqual({
diff --git a/posthog/api/hog_function_template.py b/posthog/api/hog_function_template.py
index 38641031167ad..1f8151e161bec 100644
--- a/posthog/api/hog_function_template.py
+++ b/posthog/api/hog_function_template.py
@@ -5,7 +5,7 @@
from rest_framework.response import Response
from rest_framework.exceptions import NotFound
-from posthog.cdp.templates import HOG_FUNCTION_TEMPLATES
+from posthog.cdp.templates import HOG_FUNCTION_SUB_TEMPLATES, HOG_FUNCTION_TEMPLATES, ALL_HOG_FUNCTION_TEMPLATES_BY_ID
from posthog.cdp.templates.hog_function_template import (
HogFunctionMapping,
HogFunctionMappingTemplate,
@@ -51,17 +51,33 @@ class PublicHogFunctionTemplateViewSet(viewsets.GenericViewSet):
def list(self, request: Request, *args, **kwargs):
types = ["destination"]
+
+ sub_template_id = request.GET.get("sub_template_id")
+
if "type" in request.GET:
types = [self.request.GET.get("type", "destination")]
elif "types" in request.GET:
types = self.request.GET.get("types", "destination").split(",")
- templates = [item for item in HOG_FUNCTION_TEMPLATES if item.type in types]
- page = self.paginate_queryset(templates)
+
+ templates_list = HOG_FUNCTION_SUB_TEMPLATES if sub_template_id else HOG_FUNCTION_TEMPLATES
+
+ matching_templates = []
+
+ for template in templates_list:
+ if template.type not in types:
+ continue
+
+ if sub_template_id and sub_template_id not in template.id:
+ continue
+
+ matching_templates.append(template)
+
+ page = self.paginate_queryset(matching_templates)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
def retrieve(self, request: Request, *args, **kwargs):
- item = next((item for item in HOG_FUNCTION_TEMPLATES if item.id == kwargs["pk"]), None)
+ item = ALL_HOG_FUNCTION_TEMPLATES_BY_ID.get(kwargs["pk"], None)
if not item:
raise NotFound(f"Template with id {kwargs['pk']} not found.")
diff --git a/posthog/api/test/test_hog_function_templates.py b/posthog/api/test/test_hog_function_templates.py
index 7a9b5150f5acd..4a34e36f88235 100644
--- a/posthog/api/test/test_hog_function_templates.py
+++ b/posthog/api/test/test_hog_function_templates.py
@@ -1,6 +1,8 @@
from unittest.mock import ANY
+from inline_snapshot import snapshot
from rest_framework import status
+from posthog.cdp.templates.hog_function_template import derive_sub_templates
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, QueryMatchingTest
from posthog.cdp.templates.slack.template_slack import template
@@ -23,6 +25,22 @@
}
+class TestHogFunctionTemplatesMixin(APIBaseTest):
+ def test_derive_sub_templates(self):
+ # One sanity check test (rather than all of them)
+ sub_templates = derive_sub_templates([template])
+
+ # check overridden params
+ assert sub_templates[0].inputs_schema[-1]["key"] == "text"
+ assert sub_templates[0].inputs_schema[-1]["default"] == snapshot(
+ "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'"
+ )
+ assert sub_templates[0].filters == snapshot(
+ {"events": [{"id": "$feature_enrollment_update", "type": "events"}]}
+ )
+ assert sub_templates[0].type == "destination"
+
+
class TestHogFunctionTemplates(ClickhouseTestMixin, APIBaseTest, QueryMatchingTest):
def test_list_function_templates(self):
response = self.client.get("/api/projects/@current/hog_function_templates/")
@@ -48,6 +66,29 @@ def test_filter_function_templates(self):
response5 = self.client.get("/api/projects/@current/hog_function_templates/?types=site_destination,destination")
assert len(response5.json()["results"]) > 0
+ def test_filter_sub_templates(self):
+ response1 = self.client.get(
+ "/api/projects/@current/hog_function_templates/?type=internal_destination&sub_template_id=activity-log"
+ )
+ assert response1.status_code == status.HTTP_200_OK, response1.json()
+ assert len(response1.json()["results"]) > 0
+
+ template = response1.json()["results"][0]
+
+ assert template["sub_templates"] is None
+ assert template["type"] == "internal_destination"
+ assert template["id"] == "template-slack-activity-log"
+
+ def test_retrieve_function_template(self):
+ response = self.client.get("/api/projects/@current/hog_function_templates/template-slack")
+ assert response.status_code == status.HTTP_200_OK, response.json()
+ assert response.json()["id"] == "template-slack"
+
+ def test_retrieve_function_sub_template(self):
+ response = self.client.get("/api/projects/@current/hog_function_templates/template-slack-activity-log")
+ assert response.status_code == status.HTTP_200_OK, response.json()
+ assert response.json()["id"] == "template-slack-activity-log"
+
def test_public_list_function_templates(self):
self.client.logout()
response = self.client.get("/api/public_hog_function_templates/")
diff --git a/posthog/cdp/internal_events.py b/posthog/cdp/internal_events.py
new file mode 100644
index 0000000000000..ba945ede3f2ff
--- /dev/null
+++ b/posthog/cdp/internal_events.py
@@ -0,0 +1,72 @@
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional
+import uuid
+
+import structlog
+from posthog.kafka_client.client import KafkaProducer
+from posthog.kafka_client.topics import KAFKA_CDP_INTERNAL_EVENTS
+from rest_framework_dataclasses.serializers import DataclassSerializer
+
+logger = structlog.get_logger(__name__)
+
+
+@dataclass
+class InternalEventEvent:
+ event: str
+ distinct_id: str
+ properties: dict
+ timestamp: Optional[str] = None
+ url: Optional[str] = None
+ uuid: Optional[str] = None
+
+
+@dataclass
+class InternalEventPerson:
+ id: str
+ properties: dict
+ name: Optional[str] = None
+ url: Optional[str] = None
+
+
+@dataclass
+class InternalEvent:
+ team_id: int
+ event: InternalEventEvent
+ person: Optional[InternalEventPerson] = None
+
+
+class InternalEventSerializer(DataclassSerializer):
+ class Meta:
+ dataclass = InternalEvent
+
+
+def internal_event_to_dict(data: InternalEvent) -> dict:
+ return InternalEventSerializer(data).data
+
+
+def create_internal_event(
+ team_id: int, event: InternalEventEvent, person: Optional[InternalEventPerson] = None
+) -> InternalEvent:
+ data = InternalEvent(team_id=team_id, event=event, person=person)
+
+ if data.event.uuid is None:
+ data.event.uuid = str(uuid.uuid4())
+ if data.event.timestamp is None:
+ data.event.timestamp = datetime.now().isoformat()
+
+ return data
+
+
+def produce_internal_event(team_id: int, event: InternalEventEvent, person: Optional[InternalEventPerson] = None):
+ data = create_internal_event(team_id, event, person)
+ serialized_data = internal_event_to_dict(data)
+ kafka_topic = KAFKA_CDP_INTERNAL_EVENTS
+
+ try:
+ producer = KafkaProducer()
+ future = producer.produce(topic=kafka_topic, data=serialized_data, key=data.event.uuid)
+ future.get()
+ except Exception as e:
+ logger.exception("Failed to produce internal event", data=serialized_data, error=e)
+ raise
diff --git a/posthog/cdp/templates/__init__.py b/posthog/cdp/templates/__init__.py
index fd2f988a8df72..6d4a24e6d1a27 100644
--- a/posthog/cdp/templates/__init__.py
+++ b/posthog/cdp/templates/__init__.py
@@ -1,3 +1,4 @@
+from posthog.cdp.templates.hog_function_template import derive_sub_templates
from .webhook.template_webhook import template as webhook
from .slack.template_slack import template as slack
from .hubspot.template_hubspot import template_event as hubspot_event, template as hubspot, TemplateHubspotMigrator
@@ -51,6 +52,7 @@
from .snapchat_ads.template_pixel import template_snapchat_pixel as snapchat_pixel
from ._transformations.template_pass_through import template as pass_through_transformation
+
HOG_FUNCTION_TEMPLATES = [
_broadcast,
blank_site_destination,
@@ -107,7 +109,12 @@
]
+# This is a list of sub templates that are generated by merging the subtemplate with it's template
+HOG_FUNCTION_SUB_TEMPLATES = derive_sub_templates(HOG_FUNCTION_TEMPLATES)
+
HOG_FUNCTION_TEMPLATES_BY_ID = {template.id: template for template in HOG_FUNCTION_TEMPLATES}
+HOG_FUNCTION_SUB_TEMPLATES_BY_ID = {template.id: template for template in HOG_FUNCTION_SUB_TEMPLATES}
+ALL_HOG_FUNCTION_TEMPLATES_BY_ID = {**HOG_FUNCTION_TEMPLATES_BY_ID, **HOG_FUNCTION_SUB_TEMPLATES_BY_ID}
HOG_FUNCTION_MIGRATORS = {
TemplateCustomerioMigrator.plugin_url: TemplateCustomerioMigrator,
@@ -123,4 +130,4 @@
TemplateAvoMigrator.plugin_url: TemplateAvoMigrator,
}
-__all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID"]
+__all__ = ["HOG_FUNCTION_TEMPLATES", "HOG_FUNCTION_TEMPLATES_BY_ID", "ALL_HOG_FUNCTION_TEMPLATES_BY_ID"]
diff --git a/posthog/cdp/templates/discord/template_discord.py b/posthog/cdp/templates/discord/template_discord.py
index fb8cb2bf50c64..9e3111ec88817 100644
--- a/posthog/cdp/templates/discord/template_discord.py
+++ b/posthog/cdp/templates/discord/template_discord.py
@@ -1,5 +1,16 @@
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionSubTemplate, SUB_TEMPLATE_COMMON
+COMMON_INPUTS_SCHEMA = [
+ {
+ "key": "webhookUrl",
+ "type": "string",
+ "label": "Webhook URL",
+ "description": "See this page on how to generate a Webhook URL: https://support.discord.com/hc/en-us/articles/228383668-Intro-to-Webhooks",
+ "secret": False,
+ "required": True,
+ },
+]
+
template: HogFunctionTemplate = HogFunctionTemplate(
status="free",
type="destination",
@@ -48,20 +59,37 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="Post to Discord on feature enrollment",
description="Posts a message to Discord when a user enrolls or un-enrolls in an early access feature",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
- inputs={
- "content": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'"
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
+ input_schema_overrides={
+ "content": {
+ "default": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ }
},
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="Post to Discord on survey response",
description="Posts a message to Discord when a user responds to a survey",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
- inputs={"content": "**{person.name}** responded to survey **{event.properties.$survey_name}**"},
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ input_schema_overrides={
+ "content": {
+ "default": "**{person.name}** responded to survey **{event.properties.$survey_name}**",
+ }
+ },
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ type="internal_destination",
+ name="Post to Discord on team activity",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ input_schema_overrides={
+ "content": {
+ "default": "**{person.name}** {event.properties.activity} {event.properties.scope} {event.properties.item_id}",
+ }
+ },
),
],
)
diff --git a/posthog/cdp/templates/hog_function_template.py b/posthog/cdp/templates/hog_function_template.py
index 0ebfc1f1c37dc..f76deacc3d4e4 100644
--- a/posthog/cdp/templates/hog_function_template.py
+++ b/posthog/cdp/templates/hog_function_template.py
@@ -8,10 +8,25 @@
PluginConfig = None
-SubTemplateId = Literal["early_access_feature_enrollment", "survey_response"]
+SubTemplateId = Literal["early-access-feature-enrollment", "survey-response", "activity-log"]
SUB_TEMPLATE_ID: tuple[SubTemplateId, ...] = get_args(SubTemplateId)
+HogFunctionTemplateType = Literal[
+ "destination",
+ "internal_destination",
+ "site_destination",
+ "site_app",
+ "transformation",
+ "shared",
+ "email",
+ "sms",
+ "push",
+ "broadcast",
+ "activity",
+ "alert",
+]
+
@dataclasses.dataclass(frozen=True)
class HogFunctionSubTemplate:
@@ -20,7 +35,8 @@ class HogFunctionSubTemplate:
description: Optional[str] = None
filters: Optional[dict] = None
masking: Optional[dict] = None
- inputs: Optional[dict] = None
+ input_schema_overrides: Optional[dict[str, dict]] = None
+ type: Optional[HogFunctionTemplateType] = None
@dataclasses.dataclass(frozen=True)
@@ -42,19 +58,7 @@ class HogFunctionMappingTemplate:
@dataclasses.dataclass(frozen=True)
class HogFunctionTemplate:
status: Literal["alpha", "beta", "stable", "free", "client-side"]
- type: Literal[
- "destination",
- "site_destination",
- "site_app",
- "transformation",
- "shared",
- "email",
- "sms",
- "push",
- "broadcast",
- "activity",
- "alert",
- ]
+ type: HogFunctionTemplateType
id: str
name: str
description: str
@@ -78,9 +82,41 @@ def migrate(cls, obj: PluginConfig) -> dict:
raise NotImplementedError()
+def derive_sub_templates(templates: list[HogFunctionTemplate]) -> list[HogFunctionTemplate]:
+ sub_templates = []
+ for template in templates:
+ for sub_template in template.sub_templates or []:
+ merged_id = f"{template.id}-{sub_template.id}"
+ template_params = dataclasses.asdict(template)
+ sub_template_params = dataclasses.asdict(sub_template)
+
+ # Override inputs_schema if set
+ input_schema_overrides = sub_template_params.pop("input_schema_overrides")
+ if input_schema_overrides:
+ new_input_schema = []
+ for schema in template_params["inputs_schema"]:
+ if schema["key"] in input_schema_overrides:
+ schema.update(input_schema_overrides[schema["key"]])
+ new_input_schema.append(schema)
+ template_params["inputs_schema"] = new_input_schema
+
+ # Get rid of the sub_templates from the template
+ template_params.pop("sub_templates")
+ # Update with the sub template params if not none
+ for key, value in sub_template_params.items():
+ if value is not None:
+ template_params[key] = value
+
+ template_params["id"] = merged_id
+ merged_template = HogFunctionTemplate(**template_params)
+ sub_templates.append(merged_template)
+
+ return sub_templates
+
+
SUB_TEMPLATE_COMMON: dict[SubTemplateId, HogFunctionSubTemplate] = {
- "survey_response": HogFunctionSubTemplate(
- id="survey_response",
+ "survey-response": HogFunctionSubTemplate(
+ id="survey-response",
name="Survey Response",
filters={
"events": [
@@ -99,9 +135,15 @@ def migrate(cls, obj: PluginConfig) -> dict:
]
},
),
- "early_access_feature_enrollment": HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ "early-access-feature-enrollment": HogFunctionSubTemplate(
+ id="early-access-feature-enrollment",
name="Early Access Feature Enrollment",
filters={"events": [{"id": "$feature_enrollment_update", "type": "events"}]},
),
+ "activity-log": HogFunctionSubTemplate(
+ id="activity-log",
+ name="Team Activity",
+ type="internal_destination",
+ filters={"events": [{"id": "$activity_log_entry_created", "type": "events"}]},
+ ),
}
diff --git a/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py b/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py
index e647dde19f411..a6eb7063a52e6 100644
--- a/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py
+++ b/posthog/cdp/templates/microsoft_teams/template_microsoft_teams.py
@@ -1,5 +1,6 @@
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionSubTemplate, SUB_TEMPLATE_COMMON
+
template: HogFunctionTemplate = HogFunctionTemplate(
status="free",
type="destination",
@@ -66,20 +67,37 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="Post to Microsoft Teams on feature enrollment",
description="Posts a message to Microsoft Teams when a user enrolls or un-enrolls in an early access feature",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
- inputs={
- "text": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'"
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
+ input_schema_overrides={
+ "text": {
+ "default": "**{person.name}** {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ }
},
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="Post to Microsoft Teams on survey response",
description="Posts a message to Microsoft Teams when a user responds to a survey",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
- inputs={"text": "**{person.name}** responded to survey **{event.properties.$survey_name}**"},
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ input_schema_overrides={
+ "text": {
+ "default": "**{person.name}** responded to survey **{event.properties.$survey_name}**",
+ }
+ },
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ type="internal_destination",
+ name="Post to Microsoft Teams on team activity",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ input_schema_overrides={
+ "text": {
+ "default": "**{person.name}** {event.properties.activity} {event.properties.scope} {event.properties.item_id}",
+ }
+ },
),
],
)
diff --git a/posthog/cdp/templates/slack/template_slack.py b/posthog/cdp/templates/slack/template_slack.py
index 8cfb5a84101de..3454c18381797 100644
--- a/posthog/cdp/templates/slack/template_slack.py
+++ b/posthog/cdp/templates/slack/template_slack.py
@@ -1,5 +1,6 @@
from posthog.cdp.templates.hog_function_template import HogFunctionTemplate, HogFunctionSubTemplate, SUB_TEMPLATE_COMMON
+
template: HogFunctionTemplate = HogFunctionTemplate(
status="free",
type="destination",
@@ -108,65 +109,95 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="Post to Slack on feature enrollment",
- description="Posts a message to Slack when a user enrolls or un-enrolls in an early access feature",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
- inputs={
- "text": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
- "blocks": [
- {
- "text": {
- "text": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
- "type": "mrkdwn",
- },
- "type": "section",
- },
- {
- "type": "actions",
- "elements": [
- {
- "url": "{person.url}",
- "text": {"text": "View Person in PostHog", "type": "plain_text"},
- "type": "button",
+ # description="Posts a message to Slack when a user enrolls or un-enrolls in an early access feature",
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
+ input_schema_overrides={
+ "blocks": {
+ "default": [
+ {
+ "text": {
+ "text": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ "type": "mrkdwn",
},
- # NOTE: It would be nice to have a link to the EAF but the event needs more info
- ],
- },
- ],
+ "type": "section",
+ },
+ {
+ "type": "actions",
+ "elements": [
+ {
+ "url": "{person.url}",
+ "text": {"text": "View Person in PostHog", "type": "plain_text"},
+ "type": "button",
+ },
+ ],
+ },
+ ],
+ },
+ "text": {
+ "default": "*{person.name}* {event.properties.$feature_enrollment ? 'enrolled in' : 'un-enrolled from'} the early access feature for '{event.properties.$feature_flag}'",
+ },
},
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="Post to Slack on survey response",
description="Posts a message to Slack when a user responds to a survey",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
- inputs={
- "text": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
- "blocks": [
- {
- "text": {
- "text": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
- "type": "mrkdwn",
- },
- "type": "section",
- },
- {
- "type": "actions",
- "elements": [
- {
- "url": "{project.url}/surveys/{event.properties.$survey_id}",
- "text": {"text": "View Survey", "type": "plain_text"},
- "type": "button",
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ input_schema_overrides={
+ "blocks": {
+ "default": [
+ {
+ "text": {
+ "text": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
+ "type": "mrkdwn",
},
- {
- "url": "{person.url}",
- "text": {"text": "View Person", "type": "plain_text"},
- "type": "button",
+ "type": "section",
+ },
+ {
+ "type": "actions",
+ "elements": [
+ {
+ "url": "{project.url}/surveys/{event.properties.$survey_id}",
+ "text": {"text": "View Survey", "type": "plain_text"},
+ "type": "button",
+ },
+ {
+ "url": "{person.url}",
+ "text": {"text": "View Person", "type": "plain_text"},
+ "type": "button",
+ },
+ ],
+ },
+ ],
+ },
+ "text": {
+ "default": "*{person.name}* responded to survey *{event.properties.$survey_name}*",
+ },
+ },
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ name="Post to Slack on team activity",
+ description="",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ type="internal_destination",
+ input_schema_overrides={
+ "blocks": {
+ "default": [
+ {
+ "text": {
+ "text": "*{person.properties.email}* {event.properties.activity} {event.properties.scope} {event.properties.item_id} ",
+ "type": "mrkdwn",
},
- ],
- },
- ],
+ "type": "section",
+ }
+ ],
+ },
+ "text": {
+ "default": "*{person.properties.email}* {event.properties.activity} {event.properties.scope} {event.properties.item_id}",
+ },
},
),
],
diff --git a/posthog/cdp/templates/webhook/template_webhook.py b/posthog/cdp/templates/webhook/template_webhook.py
index 49e350736de51..45789df2b9fac 100644
--- a/posthog/cdp/templates/webhook/template_webhook.py
+++ b/posthog/cdp/templates/webhook/template_webhook.py
@@ -92,14 +92,20 @@
],
sub_templates=[
HogFunctionSubTemplate(
- id="early_access_feature_enrollment",
+ id="early-access-feature-enrollment",
name="HTTP Webhook on feature enrollment",
- filters=SUB_TEMPLATE_COMMON["early_access_feature_enrollment"].filters,
+ filters=SUB_TEMPLATE_COMMON["early-access-feature-enrollment"].filters,
),
HogFunctionSubTemplate(
- id="survey_response",
+ id="survey-response",
name="HTTP Webhook on survey response",
- filters=SUB_TEMPLATE_COMMON["survey_response"].filters,
+ filters=SUB_TEMPLATE_COMMON["survey-response"].filters,
+ ),
+ HogFunctionSubTemplate(
+ id="activity-log",
+ name="HTTP Webhook on team activity",
+ filters=SUB_TEMPLATE_COMMON["activity-log"].filters,
+ type="internal_destination",
),
],
)
diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py
index fa58d40c5fa36..3ed04cfc78d38 100644
--- a/posthog/kafka_client/topics.py
+++ b/posthog/kafka_client/topics.py
@@ -35,3 +35,5 @@
KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS = f"{KAFKA_PREFIX}exception_symbolification_events{SUFFIX}"
KAFKA_ERROR_TRACKING_ISSUE_FINGERPRINT = f"{KAFKA_PREFIX}clickhouse_error_tracking_issue_fingerprint{SUFFIX}"
+
+KAFKA_CDP_INTERNAL_EVENTS = f"{KAFKA_PREFIX}cdp_internal_events{SUFFIX}"
diff --git a/posthog/models/activity_logging/activity_log.py b/posthog/models/activity_logging/activity_log.py
index 7cf9595e64983..567ea9d6d7f85 100644
--- a/posthog/models/activity_logging/activity_log.py
+++ b/posthog/models/activity_logging/activity_log.py
@@ -5,6 +5,9 @@
from typing import Any, Literal, Optional, Union
from uuid import UUID
+from django.db.models.signals import post_save
+from django.dispatch.dispatcher import receiver
+from sentry_sdk import capture_exception
import structlog
from django.core.paginator import Paginator
from django.core.exceptions import ObjectDoesNotExist
@@ -498,3 +501,37 @@ def load_all_activity(scope_list: list[ActivityScope], team_id: int, limit: int
)
return get_activity_page(activity_query, limit, page)
+
+
+@receiver(post_save, sender=ActivityLog)
+def activity_log_created(sender, instance: "ActivityLog", created, **kwargs):
+ from posthog.cdp.internal_events import InternalEventEvent, InternalEventPerson, produce_internal_event
+ from posthog.api.activity_log import ActivityLogSerializer
+ from posthog.api.shared import UserBasicSerializer
+
+ try:
+ serialized_data = ActivityLogSerializer(instance).data
+ # TODO: Move this into the producer to support dataclasses
+ serialized_data["detail"] = dataclasses.asdict(serialized_data["detail"])
+ user_data = UserBasicSerializer(instance.user).data if instance.user else None
+
+ if created and instance.team_id is not None:
+ produce_internal_event(
+ team_id=instance.team_id,
+ event=InternalEventEvent(
+ event="$activity_log_entry_created",
+ distinct_id=user_data["distinct_id"] if user_data else f"team_{instance.team_id}",
+ properties=serialized_data,
+ ),
+ person=InternalEventPerson(
+ id=user_data["id"],
+ properties=user_data,
+ )
+ if user_data
+ else None,
+ )
+ except Exception as e:
+ # We don't want to hard fail here.
+ logger.exception("Failed to produce internal event", data=serialized_data, error=e)
+ capture_exception(e)
+ return
diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py
index 8328973bc0a2e..a715f10b86b7b 100644
--- a/posthog/models/hog_functions/hog_function.py
+++ b/posthog/models/hog_functions/hog_function.py
@@ -36,6 +36,7 @@ class HogFunctionState(enum.Enum):
class HogFunctionType(models.TextChoices):
DESTINATION = "destination"
SITE_DESTINATION = "site_destination"
+ INTERNAL_DESTINATION = "internal_destination"
SITE_APP = "site_app"
TRANSFORMATION = "transformation"
EMAIL = "email"
@@ -46,8 +47,13 @@ class HogFunctionType(models.TextChoices):
BROADCAST = "broadcast"
-TYPES_THAT_RELOAD_PLUGIN_SERVER = (HogFunctionType.DESTINATION, HogFunctionType.EMAIL, HogFunctionType.TRANSFORMATION)
-TYPES_WITH_COMPILED_FILTERS = (HogFunctionType.DESTINATION,)
+TYPES_THAT_RELOAD_PLUGIN_SERVER = (
+ HogFunctionType.DESTINATION,
+ HogFunctionType.EMAIL,
+ HogFunctionType.TRANSFORMATION,
+ HogFunctionType.INTERNAL_DESTINATION,
+)
+TYPES_WITH_COMPILED_FILTERS = (HogFunctionType.DESTINATION, HogFunctionType.INTERNAL_DESTINATION)
TYPES_WITH_TRANSPILED_FILTERS = (HogFunctionType.SITE_DESTINATION, HogFunctionType.SITE_APP)
TYPES_WITH_JAVASCRIPT_SOURCE = (HogFunctionType.SITE_DESTINATION, HogFunctionType.SITE_APP)
@@ -88,9 +94,9 @@ class Meta:
@property
def template(self) -> Optional[HogFunctionTemplate]:
- from posthog.cdp.templates import HOG_FUNCTION_TEMPLATES_BY_ID
+ from posthog.cdp.templates import ALL_HOG_FUNCTION_TEMPLATES_BY_ID
- return HOG_FUNCTION_TEMPLATES_BY_ID.get(self.template_id, None)
+ return ALL_HOG_FUNCTION_TEMPLATES_BY_ID.get(self.template_id, None)
@property
def filter_action_ids(self) -> list[int]: