Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observe daily desktop users count #311

Merged
merged 11 commits into from
Feb 18, 2025
8 changes: 8 additions & 0 deletions db/migrations/006.do.daily-desktop-users.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- One row per (day, platform): the number of distinct desktop Station
-- machines observed on that platform during that day.
CREATE TABLE daily_desktop_users (
  day DATE NOT NULL,
  platform TEXT NOT NULL,
  user_count INT NOT NULL,
  PRIMARY KEY (day, platform)
);

-- NOTE(review): no separate index on (day). The composite primary-key
-- index (day, platform) has `day` as its leading column, so it already
-- serves equality and range scans on `day`; a standalone (day) index
-- would be redundant and only add write/storage overhead.
8 changes: 7 additions & 1 deletion observer/bin/spark-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import { getPgPools } from '@filecoin-station/spark-stats-db'
import {
observeTransferEvents,
observeScheduledRewards,
observeRetrievalResultCodes
observeRetrievalResultCodes,
observeDesktopUsers
} from '../lib/observer.js'

const { INFLUXDB_TOKEN } = process.env
Expand Down Expand Up @@ -71,5 +72,10 @@ await Promise.all([
'Retrieval result codes',
() => observeRetrievalResultCodes(pgPools.stats, influxQueryApi),
ONE_HOUR
),
loop(
'Desktop users',
() => observeDesktopUsers(pgPools.stats, influxQueryApi),
24 * ONE_HOUR
)
])
29 changes: 29 additions & 0 deletions observer/lib/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,32 @@ export const observeRetrievalResultCodes = async (pgPoolStats, influxQueryApi) =
])
}
}

/**
 * Observe the daily count of desktop Station users per platform.
 *
 * Queries InfluxDB for the distinct machines seen over the last 24 hours,
 * grouped by platform, then upserts today's counts into the
 * `daily_desktop_users` table (a same-day rerun refreshes the counts
 * instead of failing on the primary-key conflict).
 *
 * @param {object} pgPoolStats - stats DB pool exposing `query(sql, params)`
 *   (presumably a `pg` Pool — confirm against `spark-stats-db`)
 * @param {object} influxQueryApi - InfluxDB query API exposing `collectRows`
 */
export const observeDesktopUsers = async (pgPoolStats, influxQueryApi) => {
  // One result row per platform: { platform, platform_count }
  const counts = await influxQueryApi.collectRows(`
    from(bucket: "station-machines")
      |> range(start: -24h)
      |> filter(fn: (r) => r._measurement == "machine" and exists r.platform)
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
      |> map(fn: (r) => ({
        _time: r._time,
        station_id: r.station_id,
        platform: r.platform,
      }))
      |> group()
      |> unique(column: "station_id")
      |> group(columns: ["platform"])
      |> count(column: "station_id")
      |> rename(columns: {station_id: "platform_count"})
      |> group()
  `)

  // Split the result rows into parallel arrays for the UNNEST-based insert.
  const platforms = []
  const userCounts = []
  for (const { platform, platform_count: platformCount } of counts) {
    platforms.push(platform)
    userCounts.push(platformCount)
  }

  // Empty arrays UNNEST to zero rows, so a day with no data inserts nothing.
  await pgPoolStats.query(`
    INSERT INTO daily_desktop_users
    (day, platform, user_count)
    VALUES (NOW(), UNNEST($1::TEXT[]), UNNEST($2::INT[]))
    ON CONFLICT (day, platform) DO UPDATE SET user_count = EXCLUDED.user_count
  `, [platforms, userCounts])
}
2 changes: 1 addition & 1 deletion observer/lib/telemetry.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const debug = createDebug('spark:observer:telemetry')
export const createInflux = token => {
const influx = new InfluxDB({
url: 'https://eu-central-1-1.aws.cloud2.influxdata.com',
// bucket permissions: spark-evaluate:read spark-observer:write
// bucket permissions: spark-evaluate:read spark-observer:write station-machines:read
token
})
const writeClient = influx.getWriteApi(
Expand Down
29 changes: 28 additions & 1 deletion observer/test/observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { beforeEach, describe, it } from 'mocha'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import { givenDailyParticipants } from '@filecoin-station/spark-stats-db/test-helpers.js'

import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes } from '../lib/observer.js'
import { observeTransferEvents, observeScheduledRewards, observeRetrievalResultCodes, observeDesktopUsers } from '../lib/observer.js'

describe('observer', () => {
let pgPools
Expand Down Expand Up @@ -207,4 +207,31 @@ describe('observer', () => {
])
})
})

describe('observeDesktopUsers', () => {
  // Start every test from an empty table so the inserted counts are
  // the only rows observed.
  beforeEach(async () => {
    await pgPools.stats.query('DELETE FROM daily_desktop_users')
  })

  it('observes desktop users count', async () => {
    // Stub the Influx query API so no real InfluxDB instance is needed.
    const influxStub = {
      collectRows: async () => [
        { platform: 'win32', platform_count: 10 },
        { platform: 'darwin', platform_count: 5 },
        { platform: 'linux', platform_count: 3 }
      ]
    }
    await observeDesktopUsers(pgPools.stats, influxStub)

    const { rows: observed } = await pgPools.stats.query(`
      SELECT day::TEXT, platform, user_count
      FROM daily_desktop_users
      ORDER BY user_count DESC
    `)
    const expected = [
      { day: today(), platform: 'win32', user_count: 10 },
      { day: today(), platform: 'darwin', user_count: 5 },
      { day: today(), platform: 'linux', user_count: 3 }
    ]
    assert.deepStrictEqual(observed, expected)
  })
})
})