Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observe daily desktop users count #311

Merged
merged 11 commits into from
Feb 18, 2025
8 changes: 8 additions & 0 deletions db/migrations/006.do.daily-desktop-users.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- One row per (UTC day, platform) holding the count of unique desktop users
-- observed on that day. Upserted daily by observeYesterdayDesktopUsers.
CREATE TABLE daily_desktop_users (
  day DATE NOT NULL,
  platform TEXT NOT NULL,
  user_count INT NOT NULL,
  PRIMARY KEY (day, platform)
);

-- NOTE: no separate index on (day) is created here. The composite primary
-- key (day, platform) already serves equality and range scans on its leading
-- column `day`; an extra single-column index would be redundant and would
-- only add write overhead.
5 changes: 3 additions & 2 deletions observer/bin/dry-run.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ethers } from 'ethers'
import assert from 'node:assert'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes } from '../lib/observer.js'
import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes, observeYesterdayDesktopUsers } from '../lib/observer.js'
import { createInflux } from '../lib/telemetry.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'

Expand All @@ -26,7 +26,8 @@ await pgPools.stats.query('DELETE FROM daily_reward_transfers')
await Promise.all([
observeTransferEvents(pgPools.stats, ieContract, provider),
observeScheduledRewards(pgPools, ieContract),
observeRetrievalResultCodes(pgPools.stats, influxQueryApi)
observeRetrievalResultCodes(pgPools.stats, influxQueryApi),
observeYesterdayDesktopUsers(pgPools.stats, influxQueryApi)
])

await pgPools.stats.end()
8 changes: 7 additions & 1 deletion observer/bin/spark-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { getPgPools } from '@filecoin-station/spark-stats-db'
import {
observeTransferEvents,
observeScheduledRewards,
observeRetrievalResultCodes
observeRetrievalResultCodes,
observeYesterdayDesktopUsers
} from '../lib/observer.js'

const { INFLUXDB_TOKEN } = process.env
Expand Down Expand Up @@ -71,5 +72,10 @@ await Promise.all([
'Retrieval result codes',
() => observeRetrievalResultCodes(pgPools.stats, influxQueryApi),
ONE_HOUR
),
loop(
'Desktop users',
() => observeYesterdayDesktopUsers(pgPools.stats, influxQueryApi),
24 * ONE_HOUR
)
])
50 changes: 50 additions & 0 deletions observer/lib/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,53 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) =
rows.map(r => r._value)
])
}

/**
 * Count yesterday's unique desktop (Station Desktop) users per platform in
 * InfluxDB and upsert the counts into the `daily_desktop_users` table.
 *
 * A user is counted once per day via `unique(column: "station_id")`; the
 * counts are then grouped per platform.
 *
 * @param {object} pgPoolStats - pg Pool for the stats database
 * @param {object} influxQueryApi - InfluxDB QueryApi (only `collectRows` is used)
 */
export const observeYesterdayDesktopUsers = async (pgPoolStats, influxQueryApi) => {
  // TODO: Replace with Flux boundaries.yesterday() once it becomes part of stable API
  const yesterday = getYesterdayBoundaries()
  const rows = await influxQueryApi.collectRows(`
    from(bucket: "station-machines")
    |> range(start: ${yesterday.start}, stop: ${yesterday.stop})
    |> filter(fn: (r) => r._measurement == "machine" and exists r.platform)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({_time: r._time, station_id: r.station_id, platform: r.platform}))
    |> group()
    |> unique(column: "station_id")
    |> group(columns: ["platform"])
    |> count(column: "station_id")
    |> rename(columns: {station_id: "platform_count"})
    |> group()
  `)

  // Nothing observed -> skip the pointless zero-row INSERT round-trip.
  if (rows.length === 0) return

  // Record the same UTC day that was just queried from Influx. Using
  // `NOW() - INTERVAL '1 day'` here would evaluate on the Postgres server's
  // clock/timezone and could disagree with the UTC range above around
  // midnight.
  await pgPoolStats.query(`
    INSERT INTO daily_desktop_users
    (day, platform, user_count)
    VALUES ($3::DATE, UNNEST($1::TEXT[]), UNNEST($2::INT[]))
    ON CONFLICT (day, platform) DO UPDATE SET user_count = EXCLUDED.user_count
  `, [
    rows.map(row => row.platform),
    rows.map(row => row.platform_count),
    yesterday.start
  ])
}

/**
 * Returns the start and end timestamps for yesterday's date in UTC.
 *
 * Implemented purely with UTC arithmetic. The previous version mixed a local
 * `setDate()` with `setUTCHours()`, which could be off by a day around DST
 * transitions in some timezones.
 *
 * @returns {{start: string, stop: string}} ISO-8601 timestamps: `start` is
 *   00:00:00.000 UTC yesterday, `stop` is 00:00:00.000 UTC today
 *   (exclusive upper bound).
 */
function getYesterdayBoundaries () {
  const now = new Date()
  const todayStartUtc = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate())
  const ONE_DAY_MS = 24 * 60 * 60 * 1000
  return {
    start: new Date(todayStartUtc - ONE_DAY_MS).toISOString(),
    stop: new Date(todayStartUtc).toISOString()
  }
}
2 changes: 1 addition & 1 deletion observer/lib/telemetry.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const debug = createDebug('spark:observer:telemetry')
export const createInflux = token => {
const influx = new InfluxDB({
url: 'https://eu-central-1-1.aws.cloud2.influxdata.com',
// bucket permissions: spark-evaluate:read spark-observer:write
// bucket permissions: spark-evaluate:read spark-observer:write station-machines:read
token
})
const writeClient = influx.getWriteApi(
Expand Down
30 changes: 29 additions & 1 deletion observer/test/observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { beforeEach, describe, it } from 'mocha'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import { givenDailyParticipants } from '@filecoin-station/spark-stats-db/test-helpers.js'

import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes } from '../lib/observer.js'
import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes, observeYesterdayDesktopUsers } from '../lib/observer.js'

describe('observer', () => {
let pgPools
Expand All @@ -15,6 +15,7 @@ describe('observer', () => {
].join('-')
}
const today = () => getLocalDayAsISOString(new Date())
const yesterday = () => getLocalDayAsISOString(new Date(Date.now() - 24 * 60 * 60 * 1000))

before(async () => {
pgPools = await getPgPools()
Expand Down Expand Up @@ -207,4 +208,31 @@ describe('observer', () => {
])
})
})

// Integration test for observeYesterdayDesktopUsers: the Influx query API is
// stubbed with canned per-platform counts while a real Postgres pool
// (pgPools.stats) receives the upsert.
describe('observeDailyDesktopUsers', () => {
// Start each test from an empty table so assertions see only its own rows.
beforeEach(async () => {
await pgPools.stats.query('DELETE FROM daily_desktop_users')
})

it('observes desktop users count', async () => {
// Stub influxQueryApi — the observer only calls `collectRows`.
await observeYesterdayDesktopUsers(pgPools.stats, {
collectRows: async () => [
{ platform: 'win32', platform_count: 10 },
{ platform: 'darwin', platform_count: 5 },
{ platform: 'linux', platform_count: 3 }
]
})

const { rows } = await pgPools.stats.query(`
SELECT day::TEXT, platform, user_count
FROM daily_desktop_users
ORDER BY user_count DESC
`)
// NOTE(review): `yesterday()` here is derived from the *local* day, while
// the observer's Influx range uses UTC boundaries — these can disagree
// around midnight in non-UTC timezones; confirm which day is intended.
assert.deepStrictEqual(rows, [
{ day: yesterday(), platform: 'win32', user_count: 10 },
{ day: yesterday(), platform: 'darwin', user_count: 5 },
{ day: yesterday(), platform: 'linux', user_count: 3 }
])
})
})
})