Skip to content

Commit

Permalink
fix: startup flush policy should send restored events (#943)
Browse files Browse the repository at this point in the history
Problem reported that StartupFlushPolicy wouldn't send the pending upload events from the previous app launch.

Caused by concurrency: FlushPolicies get initialized before and without awaiting for storage restoration. Futhermore the Queue for upload is managed by the SegmentDestination plugin so there's no explicit way to await for that queue to be restored from the base client itself.

Solution:

Move the flush policy initialization later in the client: after plugins are initialized but before replaying buffered events
Refactor StartupFlushPolicy to mark shouldFlush outside of start (this is to be able to handle it with the manualFlush)
Refactor QueueFlushingPlugin and SegmentDestination to await for queue restoration and settings load when a flush is triggered. This guarantees that if the flush is triggered before the plugins are fully initialized (previous session events loaded) it will await for those operations to complete before uploading any events.
  • Loading branch information
oscb authored Apr 24, 2024
1 parent 7160d3d commit acc8a93
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 27 deletions.
15 changes: 10 additions & 5 deletions packages/core/src/__tests__/analytics.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ describe('SegmentClient', () => {
});

describe('Flush Policies', () => {
it('creates the default flush policies when config is empty', () => {
it('creates the default flush policies when config is empty', async () => {
client = new SegmentClient({
...clientArgs,
config: {
Expand All @@ -181,11 +181,12 @@ describe('SegmentClient', () => {
flushInterval: undefined,
},
});
await client.init();
const flushPolicies = client.getFlushPolicies();
expect(flushPolicies.length).toBe(2);
});

it('setting flush policies is mutually exclusive with flushAt/Interval', () => {
it('setting flush policies is mutually exclusive with flushAt/Interval', async () => {
client = new SegmentClient({
...clientArgs,
config: {
Expand All @@ -195,11 +196,12 @@ describe('SegmentClient', () => {
flushPolicies: [new CountFlushPolicy(1)],
},
});
await client.init();
const flushPolicies = client.getFlushPolicies();
expect(flushPolicies.length).toBe(1);
});

it('setting flushAt/Interval to 0 should make the client have no uploads', () => {
it('setting flushAt/Interval to 0 should make the client have no uploads', async () => {
client = new SegmentClient({
...clientArgs,
config: {
Expand All @@ -208,11 +210,12 @@ describe('SegmentClient', () => {
flushInterval: 0,
},
});
await client.init();
const flushPolicies = client.getFlushPolicies();
expect(flushPolicies.length).toBe(0);
});

it('setting an empty array of policies should make the client have no uploads', () => {
it('setting an empty array of policies should make the client have no uploads', async () => {
client = new SegmentClient({
...clientArgs,
config: {
Expand All @@ -222,11 +225,12 @@ describe('SegmentClient', () => {
flushPolicies: [],
},
});
await client.init();
const flushPolicies = client.getFlushPolicies();
expect(flushPolicies.length).toBe(0);
});

it('can add and remove policies, does not mutate original array', () => {
it('can add and remove policies, does not mutate original array', async () => {
const policies = [new CountFlushPolicy(1), new TimerFlushPolicy(200)];
client = new SegmentClient({
...clientArgs,
Expand All @@ -237,6 +241,7 @@ describe('SegmentClient', () => {
flushPolicies: policies,
},
});
await client.init();
expect(client.getFlushPolicies().length).toBe(policies.length);

client.removeFlushPolicy(...policies);
Expand Down
26 changes: 15 additions & 11 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ export class SegmentClient {

private pluginsToAdd: Plugin[] = [];

private flushPolicyExecuter!: FlushPolicyExecuter;
private flushPolicyExecuter: FlushPolicyExecuter = new FlushPolicyExecuter(
[],
() => {
void this.flush();
}
);

private onPluginAddedObservers: OnPluginAddedCallback[] = [];

Expand Down Expand Up @@ -233,9 +238,6 @@ export class SegmentClient {
// Setup platform specific plugins
this.platformPlugins.forEach((plugin) => this.add({ plugin: plugin }));

// Start flush policies
this.setupFlushPolicies();

// set up tracking for lifecycle events
this.setupLifecycleEvents();
}
Expand Down Expand Up @@ -278,9 +280,6 @@ export class SegmentClient {

await this.onReady();
this.isReady.value = true;

// flush any stored events
this.flushPolicyExecuter.manualFlush();
} catch (error) {
this.reportInternalError(
new SegmentError(
Expand Down Expand Up @@ -494,13 +493,18 @@ export class SegmentClient {
}
}

// Start flush policies
// This should be done before any pending events are added to the queue so that any policies that rely on events queued can trigger accordingly
this.setupFlushPolicies();

// Send all events in the queue
const pending = await this.store.pendingEvents.get(true);
for (const e of pending) {
await this.startTimelineProcessing(e);
await this.store.pendingEvents.remove(e);
}
// this.store.pendingEvents.set([]);

this.flushPolicyExecuter.manualFlush();
}

async flush(): Promise<void> {
Expand Down Expand Up @@ -756,9 +760,9 @@ export class SegmentClient {
}
}

this.flushPolicyExecuter = new FlushPolicyExecuter(flushPolicies, () => {
void this.flush();
});
for (const fp of flushPolicies) {
this.flushPolicyExecuter.add(fp);
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/flushPolicies/flush-policy-executer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ export class FlushPolicyExecuter {
}

private startPolicy(policy: FlushPolicy) {
policy.start();
const unsubscribe = policy.shouldFlush.onChange((shouldFlush) => {
if (shouldFlush) {
this.onFlush();
}
});
this.observers.push(unsubscribe);
policy.start();
}
}
7 changes: 6 additions & 1 deletion packages/core/src/flushPolicies/startup-flush-policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import { FlushPolicyBase } from './types';
* StatupFlushPolicy triggers a flush right away on client startup
*/
export class StartupFlushPolicy extends FlushPolicyBase {
start() {
constructor() {
super();
this.shouldFlush.value = true;
}

start(): void {
// Nothing to do
}

onEvent(_event: SegmentEvent): void {
// Nothing to do
}
Expand Down
28 changes: 27 additions & 1 deletion packages/core/src/plugins/QueueFlushingPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { SegmentClient } from '../analytics';
import { defaultConfig } from '../constants';
import { UtilityPlugin } from '../plugin';
import { PluginType, SegmentEvent } from '../types';
import { createPromise } from '../util';
import { ErrorType, SegmentError } from '../errors';

/**
* This plugin manages a queue where all events get added to after timeline processing.
Expand All @@ -17,17 +19,25 @@ export class QueueFlushingPlugin extends UtilityPlugin {
private isPendingUpload = false;
private queueStore: Store<{ events: SegmentEvent[] }> | undefined;
private onFlush: (events: SegmentEvent[]) => Promise<void>;
private isRestoredResolve: () => void;
private isRestored: Promise<void>;

/**
* @param onFlush callback to execute when the queue is flushed (either by reaching the limit or manually) e.g. code to upload events to your destination
* @param storeKey key to store the queue in the store. Must be unique per destination instance
* @param restoreTimeout time in ms to wait for the queue to be restored from the store before uploading events (default: 500ms)
*/
constructor(
onFlush: (events: SegmentEvent[]) => Promise<void>,
storeKey = 'events'
storeKey = 'events',
restoreTimeout = 500
) {
super();
this.onFlush = onFlush;
this.storeKey = storeKey;
const { promise, resolve } = createPromise<void>(restoreTimeout);
this.isRestored = promise;
this.isRestoredResolve = resolve;
}

configure(analytics: SegmentClient): void {
Expand All @@ -43,6 +53,9 @@ export class QueueFlushingPlugin extends UtilityPlugin {
storeId: `${config.writeKey}-${this.storeKey}`,
persistor: config.storePersistor,
saveDelay: config.storePersistorSaveDelay ?? 0,
onInitialized: () => {
this.isRestoredResolve();
},
},
}
);
Expand All @@ -60,6 +73,19 @@ export class QueueFlushingPlugin extends UtilityPlugin {
* Calls the onFlush callback with the events in the queue
*/
async flush() {
// Wait for the queue to be restored
try {
await this.isRestored;
} catch (e) {
// If the queue is not restored before the timeout, we will notify but not block flushing events
this.analytics?.reportInternalError(
new SegmentError(
ErrorType.InitializationError,
'Queue timeout before completed restoration',
e
)
);
}
const events = (await this.queueStore?.getState(true))?.events ?? [];
if (!this.isPendingUpload) {
try {
Expand Down
30 changes: 22 additions & 8 deletions packages/core/src/plugins/SegmentDestination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
SegmentEvent,
UpdateType,
} from '../types';
import { chunk } from '../util';
import { chunk, createPromise } from '../util';
import { uploadEvents } from '../api';
import type { SegmentClient } from '../analytics';
import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment';
Expand All @@ -23,18 +23,25 @@ export class SegmentDestination extends DestinationPlugin {
type = PluginType.destination;
key = SEGMENT_DESTINATION_KEY;
private apiHost?: string;
private isReady = false;
private settingsResolve: () => void;
private settingsPromise: Promise<void>;

constructor() {
super();
// We don't timeout this promise. We strictly need the response from Segment before sending things
const { promise, resolve } = createPromise<void>();
this.settingsPromise = promise;
this.settingsResolve = resolve;
}

private sendEvents = async (events: SegmentEvent[]): Promise<void> => {
if (!this.isReady) {
// We're not sending events until Segment has loaded all settings
return Promise.resolve();
}

if (events.length === 0) {
return Promise.resolve();
}

// We're not sending events until Segment has loaded all settings
await this.settingsPromise;

const config = this.analytics?.getConfig() ?? defaultConfig;

const chunkedEvents: SegmentEvent[][] = chunk(
Expand Down Expand Up @@ -89,6 +96,12 @@ export class SegmentDestination extends DestinationPlugin {
configure(analytics: SegmentClient): void {
super.configure(analytics);

// If the client has a proxy we don't need to await for settings apiHost, we can send events directly
// Important! If new settings are required in the future you probably want to change this!
if (analytics.getConfig().proxy !== undefined) {
this.settingsResolve();
}

// Enrich events with the Destination metadata
this.add(new DestinationMetadataEnrichment(SEGMENT_DESTINATION_KEY));
this.add(this.queuePlugin);
Expand All @@ -105,7 +118,7 @@ export class SegmentDestination extends DestinationPlugin {
) {
this.apiHost = `https://${segmentSettings.apiHost}/b`;
}
this.isReady = true;
this.settingsResolve();
}

execute(event: SegmentEvent): Promise<SegmentEvent | undefined> {
Expand All @@ -115,6 +128,7 @@ export class SegmentDestination extends DestinationPlugin {
}

async flush() {
// Wait until the queue is done restoring before flushing
return this.queuePlugin.flush();
}
}
20 changes: 20 additions & 0 deletions packages/core/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,23 @@ export function deepCompare<T>(a: T, b: T): boolean {

return true;
}

export const createPromise = <T>(
timeout: number | undefined = undefined
): { promise: Promise<T>; resolve: (value: T) => void } => {
let resolver: (value: T) => void;
const promise = new Promise<T>((resolve, reject) => {
resolver = resolve;
if (timeout !== undefined) {
setTimeout(() => {
reject(new Error('Promise timed out'));
}, timeout);
}
});

return {
promise: promise,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
resolve: resolver!,
};
};

0 comments on commit acc8a93

Please sign in to comment.