Skip to content

Commit

Permalink
feat: unique connection counting (#9074)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwasniew authored Jan 13, 2025
1 parent af1b6c8 commit e559718
Show file tree
Hide file tree
Showing 18 changed files with 456 additions and 1 deletion.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
"hash-sum": "^2.0.0",
"helmet": "^6.0.0",
"http-errors": "^2.0.0",
"hyperloglog-lite": "^1.0.2",
"ip-address": "^10.0.1",
"joi": "^17.13.3",
"js-sha256": "^0.11.0",
Expand Down
2 changes: 2 additions & 0 deletions src/lib/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import { OnboardingStore } from '../features/onboarding/onboarding-store';
import { createOnboardingReadModel } from '../features/onboarding/createOnboardingReadModel';
import { UserUnsubscribeStore } from '../features/user-subscriptions/user-unsubscribe-store';
import { UserSubscriptionsReadModel } from '../features/user-subscriptions/user-subscriptions-read-model';
import { UniqueConnectionStore } from '../features/unique-connection/unique-connection-store';

export const createStores = (
config: IUnleashConfig,
Expand Down Expand Up @@ -185,6 +186,7 @@ export const createStores = (
),
userUnsubscribeStore: new UserUnsubscribeStore(db),
userSubscriptionsReadModel: new UserSubscriptionsReadModel(db),
uniqueConnectionStore: new UniqueConnectionStore(db),
};
};

Expand Down
7 changes: 7 additions & 0 deletions src/lib/features/scheduler/schedule-services.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export const scheduleServices = async (
frontendApiService,
clientMetricsServiceV2,
integrationEventsService,
uniqueConnectionService,
} = services;

schedulerService.schedule(
Expand Down Expand Up @@ -179,4 +180,10 @@ export const scheduleServices = async (
minutesToMilliseconds(15),
'cleanUpIntegrationEvents',
);

schedulerService.schedule(
uniqueConnectionService.sync.bind(uniqueConnectionService),
minutesToMilliseconds(10),
'uniqueConnectionService',
);
};
27 changes: 27 additions & 0 deletions src/lib/features/unique-connection/fake-unique-connection-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import type { IUniqueConnectionStore } from '../../types';
import type {
TimedUniqueConnections,
UniqueConnections,
} from './unique-connection-store-type';

export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
private uniqueConnectionsRecord: Record<string, TimedUniqueConnections> =
{};

async insert(uniqueConnections: UniqueConnections): Promise<void> {
this.uniqueConnectionsRecord[uniqueConnections.id] = {
...uniqueConnections,
updatedAt: new Date(),
};
}

async get(
id: 'current' | 'previous',
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}

async deleteAll(): Promise<void> {
this.uniqueConnectionsRecord = {};
}
}
169 changes: 169 additions & 0 deletions src/lib/features/unique-connection/unique-connection-service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { UniqueConnectionService } from './unique-connection-service';
import { FakeUniqueConnectionStore } from './fake-unique-connection-store';
import getLogger from '../../../test/fixtures/no-logger';
import type { IFlagResolver } from '../../types';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';
import { addHours } from 'date-fns';
import EventEmitter from 'events';

const alwaysOnFlagResolver = {
isEnabled() {
return true;
},
} as unknown as IFlagResolver;

test('sync first current bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 2 });
});

test('sync first previous bucket', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');

await uniqueConnectionService.sync();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');

await uniqueConnectionService.sync(addHours(new Date(), 1));

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
});

test('sync to existing current bucket from the same service', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
uniqueConnectionService.listen();

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');

await uniqueConnectionService.sync();

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection3');

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
});

test('sync to existing current bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync();

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync();

const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 0, current: 3 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 0, current: 3 });
});

test('sync to existing previous bucket from another service', async () => {
const eventBus = new EventEmitter();
const config = {
flagResolver: alwaysOnFlagResolver,
getLogger,
eventBus: eventBus,
};
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService1 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionService2 = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
await uniqueConnectionService1.sync(addHours(new Date(), 1));

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
await uniqueConnectionService2.sync(addHours(new Date(), 1));

const stats1 = await uniqueConnectionService1.getStats();
expect(stats1).toEqual({ previous: 3, current: 0 });
const stats2 = await uniqueConnectionService2.getStats();
expect(stats2).toEqual({ previous: 3, current: 0 });
});

test('populate previous and current', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();

uniqueConnectionService.count('connection3');
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

uniqueConnectionService.count('connection3');
uniqueConnectionService.count('connection4');
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

const stats = await uniqueConnectionService.getStats();
expect(stats).toEqual({ previous: 3, current: 2 });
});
99 changes: 99 additions & 0 deletions src/lib/features/unique-connection/unique-connection-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import type { IUnleashConfig } from '../../types/option';
import type { IFlagResolver, IUnleashStores } from '../../types';
import type { Logger } from '../../logger';
import type { IUniqueConnectionStore } from './unique-connection-store-type';
import HyperLogLog from 'hyperloglog-lite';
import type EventEmitter from 'events';
import { SDK_CONNECTION_ID_RECEIVED } from '../../metric-events';

// HyperLogLog will create 2^n registers
const n = 12;

export class UniqueConnectionService {
private logger: Logger;

private uniqueConnectionStore: IUniqueConnectionStore;

private flagResolver: IFlagResolver;

private eventBus: EventEmitter;

private activeHour: number;

private hll = HyperLogLog(n);

constructor(
{
uniqueConnectionStore,
}: Pick<IUnleashStores, 'uniqueConnectionStore'>,
config: Pick<IUnleashConfig, 'getLogger' | 'flagResolver' | 'eventBus'>,
) {
this.uniqueConnectionStore = uniqueConnectionStore;
this.logger = config.getLogger('services/unique-connection-service.ts');
this.flagResolver = config.flagResolver;
this.eventBus = config.eventBus;
this.activeHour = new Date().getHours();
}

listen() {
this.eventBus.on(SDK_CONNECTION_ID_RECEIVED, this.count.bind(this));
}

count(connectionId: string) {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;
this.hll.add(HyperLogLog.hash(connectionId));
}

async getStats() {
const [previous, current] = await Promise.all([
this.uniqueConnectionStore.get('previous'),
this.uniqueConnectionStore.get('current'),
]);
const previousHll = HyperLogLog(n);
if (previous) {
previousHll.merge({ n, buckets: previous.hll });
}
const currentHll = HyperLogLog(n);
if (current) {
currentHll.merge({ n, buckets: current.hll });
}
return { previous: previousHll.count(), current: currentHll.count() };
}

async sync(currentTime = new Date()): Promise<void> {
if (!this.flagResolver.isEnabled('uniqueSdkTracking')) return;

const currentHour = currentTime.getHours();
const currentBucket = await this.uniqueConnectionStore.get('current');

if (this.activeHour !== currentHour && currentBucket) {
if (currentBucket.updatedAt.getHours() < currentHour) {
this.hll.merge({ n, buckets: currentBucket.hll });
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
} else {
const previousBucket =
await this.uniqueConnectionStore.get('previous');
if (previousBucket) {
this.hll.merge({ n, buckets: previousBucket.hll });
}
await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'previous',
});
}

this.activeHour = currentHour;
this.hll = HyperLogLog(n);
} else if (currentBucket) {
this.hll.merge({ n, buckets: currentBucket.hll });
}

await this.uniqueConnectionStore.insert({
hll: this.hll.output().buckets,
id: 'current',
});
}
}
14 changes: 14 additions & 0 deletions src/lib/features/unique-connection/unique-connection-store-type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export type UniqueConnections = {
hll: Buffer;
id: 'current' | 'previous';
};

export type TimedUniqueConnections = UniqueConnections & {
updatedAt: Date;
};

export interface IUniqueConnectionStore {
insert(uniqueConnections: UniqueConnections): Promise<void>;
get(id: 'current' | 'previous'): Promise<TimedUniqueConnections | null>;
deleteAll(): Promise<void>;
}
Loading

0 comments on commit e559718

Please sign in to comment.