Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unique connection counting #9074

Merged
merged 14 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
"hash-sum": "^2.0.0",
"helmet": "^6.0.0",
"http-errors": "^2.0.0",
"hyperloglog-lite": "^1.0.2",
"ip-address": "^10.0.1",
"joi": "^17.13.3",
"js-sha256": "^0.11.0",
Expand Down
2 changes: 2 additions & 0 deletions src/lib/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import { OnboardingStore } from '../features/onboarding/onboarding-store';
import { createOnboardingReadModel } from '../features/onboarding/createOnboardingReadModel';
import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store';
import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model';
import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store';

export const createStores = (
config: IUnleashConfig,
Expand Down Expand Up @@ -185,6 +186,7 @@ export const createStores = (
),
userUnsubscribeStore: new UserUnsubscribeStore(db),
userSubscriptionsReadModel: new UserSubscriptionsReadModel(db),
uniqueConnectionStore: new UniqueConnectionStore(db),
};
};

Expand Down
7 changes: 7 additions & 0 deletions src/lib/features/scheduler/schedule-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export const scheduleServices = async (
frontendApiService,
clientMetricsServiceV2,
integrationEventsService,
uniqueConnectionService,
} = services;

schedulerService.schedule(
Expand Down Expand Up @@ -179,4 +180,10 @@ export const scheduleServices = async (
minutesToMilliseconds(15),
'cleanUpIntegrationEvents',
);

schedulerService.schedule(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync in-memory HyperLogLogs with DB

uniqueConnectionService.sync.bind(uniqueConnectionService),
minutesToMilliseconds(10),
'uniqueConnectionService',
);
};
27 changes: 27 additions & 0 deletions src/lib/features/unique-connection/fake-unique-connection-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { IUniqueConnectionStore } from '../../types';
import type {
TimedUniqueConnections,
UniqueConnections,
} from './unique-connection-store-type';

export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
private uniqueConnectionsRecord: Record<string, TimedUniqueConnections> =
{};

async insert(uniqueConnections: UniqueConnections): Promise<void> {
this.uniqueConnectionsRecord[uniqueConnections.id] = {
...uniqueConnections,
updatedAt: new Date(),
};
}

async get(
id: 'current' | 'previous',
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}

async deleteAll(): Promise<void> {
this.uniqueConnectionsRecord = {};
}
}
169 changes: 169 additions & 0 deletions src/lib/features/unique-connection/unique-connection-service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { UniqueConnectionService } from './unique-connection-service';
import { FakeUniqueConnectionStore } from './fake-unique-connection-store';
import getLogger from '../../../test/fixtures/no-logger';
import type { IFlagResolver } from '../../types';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';
import { addHours } from 'date-fns';
import EventEmitter from 'events';

const alwaysOnFlagResolver = {
isEnabled() {
return true;
},
} as unknown as IFlagResolver;

test('sync first current bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 2 });
});

test('sync first previous bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');

await uniqueConnectionService.sync(addHours(new Date(), 1));

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
});

test('sync to existing current bucket from the same service', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');

await uniqueConnectionService.sync();

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection3');

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
});

test('sync to existing current bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync();

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync();

const stats1 = await uniqueConnectionService1.getStats();
kwasniew marked this conversation as resolved.
Show resolved Hide resolved
expect(stats1).toEqual({ previous: 0, current: 3 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 0, current: 3 });
});

test('sync to existing previous bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync(addHours(new Date(), 1));

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync(addHours(new Date(), 1));

const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 3, current: 0 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 3, current: 0 });
});

test('populate previous and current', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();

uniqueConnectionService.count('connection3');
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

uniqueConnectionService.count('connection3');
uniqueConnectionService.count('connection4');
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 2 });
});
99 changes: 99 additions & 0 deletions src/lib/features/unique-connection/unique-connection-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import type { IUnleashConfig } from '../../types/option';
import type { IFlagResolver, IUnleashStores } from '../../types';
import type { Logger } from '../../logger';
import type { IUniqueConnectionStore } from './unique-connection-store-type';
import HyperLogLog from 'hyperloglog-lite';
import type EventEmitter from 'events';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';

// HyperLogLog will create 2^n registers
const n = 12;
kwasniew marked this conversation as resolved.
Show resolved Hide resolved

export class UniqueConnectionService {
private logger: Logger;

private uniqueConnectionStore: IUniqueConnectionStore;

private flagResolver: IFlagResolver;

private eventBus: EventEmitter;

private activeHour: number;

private hll = HyperLogLog(n);

constructor(
{
uniqueConnectionStore,
}: Pick<IUnleashStores, 'uniqueConnectionStore'>,
config: Pick<IUnleashConfig, 'getLogger' | 'flagResolver' | 'eventBus'>,
) {
this.uniqueConnectionStore = uniqueConnectionStore;
this.logger = config.getLogger('services/unique-connection-service.ts');
this.flagResolver = config.flagResolver;
this.eventBus = config.eventBus;
this.activeHour = new Date().getHours();
}

listen() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this service is interested in new connection ids received

this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this));
}

count(connectionId: string) {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
this.hll.add(HyperLogLog.hash(connectionId));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this data structure tracks unique connections

}

async getStats() {
kwasniew marked this conversation as resolved.
Show resolved Hide resolved
const [previous, current] = await Promise.all([
this.uniqueConnectionStore.get('previous'),
this.uniqueConnectionStore.get('current'),
]);
const previousHll = HyperLogLog(n);
if (previous) {
previousHll.merge({ n, buckets: previous.hll });
}
const currentHll = HyperLogLog(n);
if (current) {
currentHll.merge({ n, buckets: current.hll });
}
return { previous: previousHll.count(), current: currentHll.count() };
}

async sync(currentTime = new Date()): Promise<void> {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;

const currentHour = currentTime.getHours();
const currentBucket = await this.uniqueConnectionStore.get('current');

if (this.activeHour !== currentHour && currentBucket) {
if (currentBucket.updatedAt.getHours() < currentHour) {
this.hll.merge({ n, buckets: currentBucket.hll });
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
} else {
const previousBucket =
await this.uniqueConnectionStore.get('previous');
if (previousBucket) {
this.hll.merge({ n, buckets: previousBucket.hll });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
}

this.activeHour = currentHour;
this.hll = HyperLogLog(n);
} else if (currentBucket) {
this.hll.merge({ n, buckets: currentBucket.hll });
}

await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'current',
});
}
}
14 changes: 14 additions & 0 deletions src/lib/features/unique-connection/unique-connection-store-type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export type UniqueConnections = {
hll: Buffer;
kwasniew marked this conversation as resolved.
Show resolved Hide resolved
id: 'current' | 'previous';
};

export type TimedUniqueConnections = UniqueConnections & {
updatedAt: Date;
};

export interface IUniqueConnectionStore {
insert(uniqueConnections: UniqueConnections): Promise<void>;
get(id: 'current' | 'previous'): Promise<TimedUniqueConnections | null>;
deleteAll(): Promise<void>;
}
Loading
Loading