From acc8a9383df88d218196cdb5a226c95185eb9d40 Mon Sep 17 00:00:00 2001 From: Oscar Bazaldua <511911+oscb@users.noreply.github.com> Date: Wed, 24 Apr 2024 14:59:37 -0700 Subject: [PATCH] fix: startup flush policy should send restored events (#943) Problem reported that StartupFlushPolicy wouldn't send the pending upload events from the previous app launch. Caused by concurrency: FlushPolicies get initialized before and without awaiting for storage restoration. Futhermore the Queue for upload is managed by the SegmentDestination plugin so there's no explicit way to await for that queue to be restored from the base client itself. Solution: Move the flush policy initialization later in the client: after plugins are initialized but before replaying buffered events Refactor StartupFlushPolicy to mark shouldFlush outside of start (this is to be able to handle it with the manualFlush) Refactor QueueFlushingPlugin and SegmentDestination to await for queue restoration and settings load when a flush is triggered. This guarantees that if the flush is triggered before the plugins are fully initialized (previous session events loaded) it will await for those operations to complete before uploading any events. --- packages/core/src/__tests__/analytics.test.ts | 15 ++++++---- packages/core/src/analytics.ts | 26 +++++++++------- .../flushPolicies/flush-policy-executer.ts | 2 +- .../src/flushPolicies/startup-flush-policy.ts | 7 ++++- .../core/src/plugins/QueueFlushingPlugin.ts | 28 ++++++++++++++++- .../core/src/plugins/SegmentDestination.ts | 30 ++++++++++++++----- packages/core/src/util.ts | 20 +++++++++++++ 7 files changed, 101 insertions(+), 27 deletions(-) diff --git a/packages/core/src/__tests__/analytics.test.ts b/packages/core/src/__tests__/analytics.test.ts index f2bd05959..0e575b88d 100644 --- a/packages/core/src/__tests__/analytics.test.ts +++ b/packages/core/src/__tests__/analytics.test.ts @@ -172,7 +172,7 @@ describe('SegmentClient', () => { }); describe('Flush Policies', () => { - it('creates the default flush policies when config is empty', () => { + it('creates the default flush policies when config is empty', async () => { client = new SegmentClient({ ...clientArgs, config: { @@ -181,11 +181,12 @@ describe('SegmentClient', () => { flushInterval: undefined, }, }); + await client.init(); const flushPolicies = client.getFlushPolicies(); expect(flushPolicies.length).toBe(2); }); - it('setting flush policies is mutually exclusive with flushAt/Interval', () => { + it('setting flush policies is mutually exclusive with flushAt/Interval', async () => { client = new SegmentClient({ ...clientArgs, config: { @@ -195,11 +196,12 @@ describe('SegmentClient', () => { flushPolicies: [new CountFlushPolicy(1)], }, }); + await client.init(); const flushPolicies = client.getFlushPolicies(); expect(flushPolicies.length).toBe(1); }); - it('setting flushAt/Interval to 0 should make the client have no uploads', () => { + it('setting flushAt/Interval to 0 should make the client have no uploads', async () => { client = new SegmentClient({ ...clientArgs, config: { @@ -208,11 +210,12 @@ describe('SegmentClient', () => { flushInterval: 0, }, }); + await client.init(); const flushPolicies = client.getFlushPolicies(); expect(flushPolicies.length).toBe(0); }); - it('setting an empty array of policies should make the client have no uploads', () => { + it('setting an empty array of policies should make the client have no uploads', async () => { client = new SegmentClient({ ...clientArgs, config: { @@ -222,11 +225,12 @@ describe('SegmentClient', () => { flushPolicies: [], }, }); + await client.init(); const flushPolicies = client.getFlushPolicies(); expect(flushPolicies.length).toBe(0); }); - it('can add and remove policies, does not mutate original array', () => { + it('can add and remove policies, does not mutate original array', async () => { const policies = [new CountFlushPolicy(1), new TimerFlushPolicy(200)]; client = new SegmentClient({ ...clientArgs, @@ -237,6 +241,7 @@ describe('SegmentClient', () => { flushPolicies: policies, }, }); + await client.init(); expect(client.getFlushPolicies().length).toBe(policies.length); client.removeFlushPolicy(...policies); diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 2970277a4..eca237883 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -88,7 +88,12 @@ export class SegmentClient { private pluginsToAdd: Plugin[] = []; - private flushPolicyExecuter!: FlushPolicyExecuter; + private flushPolicyExecuter: FlushPolicyExecuter = new FlushPolicyExecuter( + [], + () => { + void this.flush(); + } + ); private onPluginAddedObservers: OnPluginAddedCallback[] = []; @@ -233,9 +238,6 @@ export class SegmentClient { // Setup platform specific plugins this.platformPlugins.forEach((plugin) => this.add({ plugin: plugin })); - // Start flush policies - this.setupFlushPolicies(); - // set up tracking for lifecycle events this.setupLifecycleEvents(); } @@ -278,9 +280,6 @@ export class SegmentClient { await this.onReady(); this.isReady.value = true; - - // flush any stored events - this.flushPolicyExecuter.manualFlush(); } catch (error) { this.reportInternalError( new SegmentError( @@ -494,13 +493,18 @@ export class SegmentClient { } } + // Start flush policies + // This should be done before any pending events are added to the queue so that any policies that rely on events queued can trigger accordingly + this.setupFlushPolicies(); + // Send all events in the queue const pending = await this.store.pendingEvents.get(true); for (const e of pending) { await this.startTimelineProcessing(e); await this.store.pendingEvents.remove(e); } - // this.store.pendingEvents.set([]); + + this.flushPolicyExecuter.manualFlush(); } async flush(): Promise { @@ -756,9 +760,9 @@ export class SegmentClient { } } - this.flushPolicyExecuter = new FlushPolicyExecuter(flushPolicies, () => { - void this.flush(); - }); + for (const fp of flushPolicies) { + this.flushPolicyExecuter.add(fp); + } } /** diff --git a/packages/core/src/flushPolicies/flush-policy-executer.ts b/packages/core/src/flushPolicies/flush-policy-executer.ts index b84dd43d8..a705cacf2 100644 --- a/packages/core/src/flushPolicies/flush-policy-executer.ts +++ b/packages/core/src/flushPolicies/flush-policy-executer.ts @@ -89,12 +89,12 @@ export class FlushPolicyExecuter { } private startPolicy(policy: FlushPolicy) { - policy.start(); const unsubscribe = policy.shouldFlush.onChange((shouldFlush) => { if (shouldFlush) { this.onFlush(); } }); this.observers.push(unsubscribe); + policy.start(); } } diff --git a/packages/core/src/flushPolicies/startup-flush-policy.ts b/packages/core/src/flushPolicies/startup-flush-policy.ts index fdce72cae..7d29a4ec2 100644 --- a/packages/core/src/flushPolicies/startup-flush-policy.ts +++ b/packages/core/src/flushPolicies/startup-flush-policy.ts @@ -5,10 +5,15 @@ import { FlushPolicyBase } from './types'; * StatupFlushPolicy triggers a flush right away on client startup */ export class StartupFlushPolicy extends FlushPolicyBase { - start() { + constructor() { + super(); this.shouldFlush.value = true; } + start(): void { + // Nothing to do + } + onEvent(_event: SegmentEvent): void { // Nothing to do } diff --git a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index ac0d6ddb0..6c303525a 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -3,6 +3,8 @@ import type { SegmentClient } from '../analytics'; import { defaultConfig } from '../constants'; import { UtilityPlugin } from '../plugin'; import { PluginType, SegmentEvent } from '../types'; +import { createPromise } from '../util'; +import { ErrorType, SegmentError } from '../errors'; /** * This plugin manages a queue where all events get added to after timeline processing. @@ -17,17 +19,25 @@ export class QueueFlushingPlugin extends UtilityPlugin { private isPendingUpload = false; private queueStore: Store<{ events: SegmentEvent[] }> | undefined; private onFlush: (events: SegmentEvent[]) => Promise; + private isRestoredResolve: () => void; + private isRestored: Promise; /** * @param onFlush callback to execute when the queue is flushed (either by reaching the limit or manually) e.g. code to upload events to your destination + * @param storeKey key to store the queue in the store. Must be unique per destination instance + * @param restoreTimeout time in ms to wait for the queue to be restored from the store before uploading events (default: 500ms) */ constructor( onFlush: (events: SegmentEvent[]) => Promise, - storeKey = 'events' + storeKey = 'events', + restoreTimeout = 500 ) { super(); this.onFlush = onFlush; this.storeKey = storeKey; + const { promise, resolve } = createPromise(restoreTimeout); + this.isRestored = promise; + this.isRestoredResolve = resolve; } configure(analytics: SegmentClient): void { @@ -43,6 +53,9 @@ export class QueueFlushingPlugin extends UtilityPlugin { storeId: `${config.writeKey}-${this.storeKey}`, persistor: config.storePersistor, saveDelay: config.storePersistorSaveDelay ?? 0, + onInitialized: () => { + this.isRestoredResolve(); + }, }, } ); @@ -60,6 +73,19 @@ export class QueueFlushingPlugin extends UtilityPlugin { * Calls the onFlush callback with the events in the queue */ async flush() { + // Wait for the queue to be restored + try { + await this.isRestored; + } catch (e) { + // If the queue is not restored before the timeout, we will notify but not block flushing events + this.analytics?.reportInternalError( + new SegmentError( + ErrorType.InitializationError, + 'Queue timeout before completed restoration', + e + ) + ); + } const events = (await this.queueStore?.getState(true))?.events ?? []; if (!this.isPendingUpload) { try { diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index 93f0cbcb0..b3717b8ef 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -6,7 +6,7 @@ import { SegmentEvent, UpdateType, } from '../types'; -import { chunk } from '../util'; +import { chunk, createPromise } from '../util'; import { uploadEvents } from '../api'; import type { SegmentClient } from '../analytics'; import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment'; @@ -23,18 +23,25 @@ export class SegmentDestination extends DestinationPlugin { type = PluginType.destination; key = SEGMENT_DESTINATION_KEY; private apiHost?: string; - private isReady = false; + private settingsResolve: () => void; + private settingsPromise: Promise; + + constructor() { + super(); + // We don't timeout this promise. We strictly need the response from Segment before sending things + const { promise, resolve } = createPromise(); + this.settingsPromise = promise; + this.settingsResolve = resolve; + } private sendEvents = async (events: SegmentEvent[]): Promise => { - if (!this.isReady) { - // We're not sending events until Segment has loaded all settings - return Promise.resolve(); - } - if (events.length === 0) { return Promise.resolve(); } + // We're not sending events until Segment has loaded all settings + await this.settingsPromise; + const config = this.analytics?.getConfig() ?? defaultConfig; const chunkedEvents: SegmentEvent[][] = chunk( @@ -89,6 +96,12 @@ export class SegmentDestination extends DestinationPlugin { configure(analytics: SegmentClient): void { super.configure(analytics); + // If the client has a proxy we don't need to await for settings apiHost, we can send events directly + // Important! If new settings are required in the future you probably want to change this! + if (analytics.getConfig().proxy !== undefined) { + this.settingsResolve(); + } + // Enrich events with the Destination metadata this.add(new DestinationMetadataEnrichment(SEGMENT_DESTINATION_KEY)); this.add(this.queuePlugin); @@ -105,7 +118,7 @@ export class SegmentDestination extends DestinationPlugin { ) { this.apiHost = `https://${segmentSettings.apiHost}/b`; } - this.isReady = true; + this.settingsResolve(); } execute(event: SegmentEvent): Promise { @@ -115,6 +128,7 @@ export class SegmentDestination extends DestinationPlugin { } async flush() { + // Wait until the queue is done restoring before flushing return this.queuePlugin.flush(); } } diff --git a/packages/core/src/util.ts b/packages/core/src/util.ts index ae229abba..19b9bd50b 100644 --- a/packages/core/src/util.ts +++ b/packages/core/src/util.ts @@ -232,3 +232,23 @@ export function deepCompare(a: T, b: T): boolean { return true; } + +export const createPromise = ( + timeout: number | undefined = undefined +): { promise: Promise; resolve: (value: T) => void } => { + let resolver: (value: T) => void; + const promise = new Promise((resolve, reject) => { + resolver = resolve; + if (timeout !== undefined) { + setTimeout(() => { + reject(new Error('Promise timed out')); + }, timeout); + } + }); + + return { + promise: promise, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + resolve: resolver!, + }; +};