diff --git a/db/migrations/006.do.daily-desktop-users.sql b/db/migrations/006.do.daily-desktop-users.sql new file mode 100644 index 0000000..ac7fd33 --- /dev/null +++ b/db/migrations/006.do.daily-desktop-users.sql @@ -0,0 +1,8 @@ +CREATE TABLE daily_desktop_users ( + day DATE NOT NULL, + platform TEXT NOT NULL, + user_count INT NOT NULL, + PRIMARY KEY (day, platform) +); + +CREATE INDEX daily_desktop_users_to_day ON daily_desktop_users (day); diff --git a/observer/bin/dry-run.js b/observer/bin/dry-run.js index d718bdd..1a84f64 100644 --- a/observer/bin/dry-run.js +++ b/observer/bin/dry-run.js @@ -3,7 +3,7 @@ import { ethers } from 'ethers' import assert from 'node:assert' import { RPC_URL, rpcHeaders } from '../lib/config.js' -import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes } from '../lib/observer.js' +import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes, observeYesterdayDesktopUsers } from '../lib/observer.js' import { createInflux } from '../lib/telemetry.js' import { getPgPools } from '@filecoin-station/spark-stats-db' @@ -26,7 +26,8 @@ await pgPools.stats.query('DELETE FROM daily_reward_transfers') await Promise.all([ observeTransferEvents(pgPools.stats, ieContract, provider), observeScheduledRewards(pgPools, ieContract), - observeRetrievalResultCodes(pgPools.stats, influxQueryApi) + observeRetrievalResultCodes(pgPools.stats, influxQueryApi), + observeYesterdayDesktopUsers(pgPools.stats, influxQueryApi) ]) await pgPools.stats.end() diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index b454f6b..5aad329 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -12,7 +12,8 @@ import { getPgPools } from '@filecoin-station/spark-stats-db' import { observeTransferEvents, observeScheduledRewards, - observeRetrievalResultCodes + observeRetrievalResultCodes, + observeYesterdayDesktopUsers } from '../lib/observer.js' const { INFLUXDB_TOKEN } = 
process.env @@ -71,5 +72,10 @@ await Promise.all([ 'Retrieval result codes', () => observeRetrievalResultCodes(pgPools.stats, influxQueryApi), ONE_HOUR + ), + loop( + 'Desktop users', + () => observeYesterdayDesktopUsers(pgPools.stats, influxQueryApi), + 24 * ONE_HOUR ) ]) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index d28aa9c..b8a4447 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -130,3 +130,53 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) = rows.map(r => r._value) ]) } + +export const observeYesterdayDesktopUsers = async (pgPoolStats, influxQueryApi) => { + // TODO: Replace with Flux boundaries.yesterday() once it becomes part of stable API + const yesterday = getYesterdayBoundaries() + const rows = await influxQueryApi.collectRows(` + from(bucket: "station-machines") + |> range(start: ${yesterday.start}, stop: ${yesterday.stop}) + |> filter(fn: (r) => r._measurement == "machine" and exists r.platform) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> map(fn: (r) => ({_time: r._time, station_id: r.station_id, platform: r.platform})) + |> group() + |> unique(column: "station_id") + |> group(columns: ["platform"]) + |> count(column: "station_id") + |> rename(columns: {station_id: "platform_count"}) + |> group() + `) + await pgPoolStats.query(` + INSERT INTO daily_desktop_users + (day, platform, user_count) + VALUES (NOW() - INTERVAL '1 day', UNNEST($1::TEXT[]), UNNEST($2::INT[])) + ON CONFLICT (day, platform) DO UPDATE SET user_count = EXCLUDED.user_count + `, [ + rows.map(row => row.platform), + rows.map(row => row.platform_count) + ]) +} + +/** + * Returns the start and end timestamps for yesterday's date in UTC + * @returns {Object} Object containing start and stop timestamps + */ +function getYesterdayBoundaries () { + // Get current date + const now = new Date() + + // Create start of yesterday + const start = new Date(now) + 
start.setUTCDate(start.getUTCDate() - 1) // Move to yesterday (UTC — avoids DST skew from local setDate) + start.setUTCHours(0, 0, 0, 0) // Set to start of day + + // Create end of yesterday + const stop = new Date(now) + stop.setUTCHours(0, 0, 0, 0) // Start of today (UTC) — Flux range() stop is exclusive + + return { + start: start.toISOString(), + stop: stop.toISOString() + } +} diff --git a/observer/lib/telemetry.js b/observer/lib/telemetry.js index 7567c5a..bde1a99 100644 --- a/observer/lib/telemetry.js +++ b/observer/lib/telemetry.js @@ -6,7 +6,7 @@ const debug = createDebug('spark:observer:telemetry') export const createInflux = token => { const influx = new InfluxDB({ url: 'https://eu-central-1-1.aws.cloud2.influxdata.com', - // bucket permissions: spark-evaluate:read spark-observer:write + // bucket permissions: spark-evaluate:read spark-observer:write station-machines:read token }) const writeClient = influx.getWriteApi( diff --git a/observer/test/observer.test.js b/observer/test/observer.test.js index 99f55a1..b2fdafe 100644 --- a/observer/test/observer.test.js +++ b/observer/test/observer.test.js @@ -3,7 +3,7 @@ import { beforeEach, describe, it } from 'mocha' import { getPgPools } from '@filecoin-station/spark-stats-db' import { givenDailyParticipants } from '@filecoin-station/spark-stats-db/test-helpers.js' -import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes } from '../lib/observer.js' +import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes, observeYesterdayDesktopUsers } from '../lib/observer.js' describe('observer', () => { let pgPools @@ -15,6 +15,7 @@ describe('observer', () => { ].join('-') } const today = () => getLocalDayAsISOString(new Date()) + const yesterday = () => getLocalDayAsISOString(new Date(Date.now() - 24 * 60 * 60 * 1000)) before(async () => { pgPools = await getPgPools() @@ -207,4 +208,31 @@ describe('observer', () => { ]) }) }) + + describe('observeYesterdayDesktopUsers', () => { + beforeEach(async () => { + await pgPools.stats.query('DELETE FROM 
daily_desktop_users') + }) + + it('observes desktop users count', async () => { + await observeYesterdayDesktopUsers(pgPools.stats, { + collectRows: async () => [ + { platform: 'win32', platform_count: 10 }, + { platform: 'darwin', platform_count: 5 }, + { platform: 'linux', platform_count: 3 } + ] + }) + + const { rows } = await pgPools.stats.query(` + SELECT day::TEXT, platform, user_count + FROM daily_desktop_users + ORDER BY user_count DESC + `) + assert.deepStrictEqual(rows, [ + { day: yesterday(), platform: 'win32', user_count: 10 }, + { day: yesterday(), platform: 'darwin', user_count: 5 }, + { day: yesterday(), platform: 'linux', user_count: 3 } + ]) + }) + }) })