Skip to content

Commit

Permalink
feat: Added distinct_id to kafka headers (#27666)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jan 23, 2025
1 parent 9ccf678 commit 9e8bfb7
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 10 deletions.
7 changes: 5 additions & 2 deletions posthog/api/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,8 @@ def capture_internal(
if extra_headers is None:
extra_headers = []

headers = [("token", token), ("distinct_id", distinct_id), *extra_headers]

parsed_event = build_kafka_event_data(
distinct_id=distinct_id,
ip=ip,
Expand All @@ -871,7 +873,6 @@ def capture_internal(

if event["event"] in SESSION_RECORDING_EVENT_NAMES:
session_id = event["properties"]["$session_id"]
headers = [("token", token), *extra_headers]

overflowing = False
if token in settings.REPLAY_OVERFLOW_FORCED_TOKENS:
Expand Down Expand Up @@ -900,7 +901,9 @@ def capture_internal(
else:
kafka_partition_key = candidate_partition_key

return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)
return log_event(
parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical, headers=headers
)


def is_randomly_partitioned(candidate_partition_key: str) -> bool:
Expand Down
13 changes: 10 additions & 3 deletions posthog/api/test/test_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,10 @@ def _do_test_capture_with_likely_anonymous_ids(self, kafka_produce, expect_rando
topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC,
data=ANY,
key=None if expect_random_partitioning else ANY,
headers=None,
headers=[
("token", self.team.api_token),
("distinct_id", distinct_id),
],
)

if not expect_random_partitioning:
Expand Down Expand Up @@ -1916,10 +1919,11 @@ def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_prod
with self.settings(
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
):
self._send_august_2023_version_session_recording_event()
self._send_august_2023_version_session_recording_event(distinct_id="distinct_id123")

assert kafka_produce.mock_calls[0].kwargs["headers"] == [
("token", "token123"),
("distinct_id", "distinct_id123"),
(
# without setting a version in the URL the default is unknown
"lib_version",
Expand All @@ -1932,10 +1936,13 @@ def test_recording_ingestion_can_read_version_from_request(self, kafka_produce:
with self.settings(
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
):
self._send_august_2023_version_session_recording_event(query_params="ver=1.123.4")
self._send_august_2023_version_session_recording_event(
query_params="ver=1.123.4", distinct_id="distinct_id123"
)

assert kafka_produce.mock_calls[0].kwargs["headers"] == [
("token", "token123"),
("distinct_id", "distinct_id123"),
(
# the version is taken from the `ver` query parameter sent with the request
"lib_version",
Expand Down
19 changes: 14 additions & 5 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl KafkaSink {
let data_type = metadata.data_type;
let event_key = event.key();
let session_id = metadata.session_id.clone();
let distinct_id = event.distinct_id.clone();

drop(event); // Events can be EXTREMELY memory hungry

Expand Down Expand Up @@ -255,10 +256,17 @@ impl KafkaSink {
partition: None,
key: partition_key,
timestamp: None,
headers: Some(OwnedHeaders::new().insert(Header {
key: "token",
value: Some(&token),
})),
headers: Some(
OwnedHeaders::new()
.insert(Header {
key: "token",
value: Some(&token),
})
.insert(Header {
key: "distinct_id",
value: Some(&distinct_id),
}),
),
}) {
Ok(ack) => Ok(ack),
Err((e, _)) => match e.rdkafka_error_code() {
Expand Down Expand Up @@ -413,9 +421,10 @@ mod tests {
// We test different cases in a single test to amortize the startup cost of the producer.

let (cluster, sink) = start_on_mocked_sink(Some(3000000)).await;
let distinct_id = "test_distinct_id_123".to_string();
let event: CapturedEvent = CapturedEvent {
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
distinct_id: distinct_id.clone(),
ip: "".to_string(),
data: "".to_string(),
now: "".to_string(),
Expand Down

0 comments on commit 9e8bfb7

Please sign in to comment.