diff --git a/packages/backend/server/src/__tests__/e2e/workspace/admin-analytics.spec.ts b/packages/backend/server/src/__tests__/e2e/workspace/admin-analytics.spec.ts index 11a628bd78..08e7bd8ffb 100644 --- a/packages/backend/server/src/__tests__/e2e/workspace/admin-analytics.spec.ts +++ b/packages/backend/server/src/__tests__/e2e/workspace/admin-analytics.spec.ts @@ -1,4 +1,5 @@ import { PrismaClient } from '@prisma/client'; +import Sinon from 'sinon'; import { app, e2e, Mockers } from '../test'; @@ -389,6 +390,201 @@ e2e( } ); +e2e( + 'adminDashboard should carry forward missing sync and storage samples', + async t => { + const now = new Date('2026-04-05T08:55:00.000Z'); + const clock = Sinon.useFakeTimers({ now, toFake: ['Date'] }); + + try { + const admin = await app.create(Mockers.User, { + feature: 'administrator', + }); + await app.login(admin); + + const owner = await app.create(Mockers.User); + const workspace = await app.create(Mockers.Workspace, { + owner: { id: owner.id }, + }); + + const db = app.get(PrismaClient); + await ensureAnalyticsTables(db); + + const minute = new Date(); + minute.setSeconds(0, 0); + const sampleStartMinute = new Date(minute.getTime() - 30 * 60 * 1000); + const sampleEndMinute = new Date( + sampleStartMinute.getTime() + 2 * 60 * 1000 + ); + + await db.$executeRaw` + INSERT INTO sync_active_users_minutely (minute_ts, active_users, updated_at) + VALUES + (${sampleStartMinute}, 5, NOW()), + (${sampleEndMinute}, 7, NOW()) + ON CONFLICT (minute_ts) + DO UPDATE SET active_users = EXCLUDED.active_users, updated_at = EXCLUDED.updated_at + `; + + await db.$executeRaw` + INSERT INTO workspace_admin_stats ( + workspace_id, snapshot_count, snapshot_size, blob_count, blob_size, member_count, public_page_count, features, updated_at + ) + VALUES (${workspace.id}, 1, 130, 1, 70, 1, 0, ARRAY[]::text[], NOW()) + ON CONFLICT (workspace_id) + DO UPDATE SET + snapshot_count = EXCLUDED.snapshot_count, + snapshot_size = EXCLUDED.snapshot_size, + blob_count = EXCLUDED.blob_count, + blob_size = EXCLUDED.blob_size, + member_count = EXCLUDED.member_count, + public_page_count = EXCLUDED.public_page_count, + features = EXCLUDED.features, + updated_at = EXCLUDED.updated_at + `; + + const today = new Date(); + const currentDay = new Date( + Date.UTC( + today.getUTCFullYear(), + today.getUTCMonth(), + today.getUTCDate() + ) + ); + const twoDaysAgo = new Date( + currentDay.getTime() - 2 * 24 * 60 * 60 * 1000 + ); + + await db.$executeRaw` + INSERT INTO workspace_admin_stats_daily ( + workspace_id, date, snapshot_size, blob_size, member_count, updated_at + ) + VALUES (${workspace.id}, ${twoDaysAgo}, 100, 50, 1, NOW()) + ON CONFLICT (workspace_id, date) + DO UPDATE SET + snapshot_size = EXCLUDED.snapshot_size, + blob_size = EXCLUDED.blob_size, + member_count = EXCLUDED.member_count, + updated_at = EXCLUDED.updated_at + `; + + const result = await gql( + ` + query AdminDashboard($input: AdminDashboardInput) { + adminDashboard(input: $input) { + syncActiveUsers + syncActiveUsersTimeline { + minute + activeUsers + } + workspaceStorageBytes + blobStorageBytes + workspaceStorageHistory { + date + value + } + blobStorageHistory { + date + value + } + } + } + `, + { + input: { + storageHistoryDays: 3, + syncHistoryHours: 2, + }, + } + ); + + t.falsy(result.errors); + const dashboard = result.data!.adminDashboard; + t.is(dashboard.syncActiveUsers, 7); + const missingMinute = new Date(sampleStartMinute.getTime() + 60 * 1000); + t.is( + dashboard.syncActiveUsersTimeline.find( + (point: { minute: string }) => + point.minute === missingMinute.toISOString() + )?.activeUsers, + 5 + ); + const workspaceHistory = dashboard.workspaceStorageHistory.map( + (point: { value: number }) => point.value + ); + const blobHistory = dashboard.blobStorageHistory.map( + (point: { value: number }) => point.value + ); + t.is(workspaceHistory.length, 3); + t.is(blobHistory.length, 3); + t.is(workspaceHistory[0], workspaceHistory[1]); + t.is(blobHistory[0], blobHistory[1]); + t.is( + workspaceHistory[workspaceHistory.length - 1], + dashboard.workspaceStorageBytes + ); + t.is(blobHistory[blobHistory.length - 1], dashboard.blobStorageBytes); + } finally { + clock.restore(); + } + } +); + +e2e( + 'adminDashboard should not backfill sync samples older than the requested window', + async t => { + const now = new Date('2026-04-05T08:55:00.000Z'); + const clock = Sinon.useFakeTimers({ now, toFake: ['Date'] }); + + try { + const admin = await app.create(Mockers.User, { + feature: 'administrator', + }); + await app.login(admin); + + const db = app.get(PrismaClient); + await ensureAnalyticsTables(db); + + const staleMinute = new Date('2026-04-05T05:55:00.000Z'); + await db.$executeRaw` + INSERT INTO sync_active_users_minutely (minute_ts, active_users, updated_at) + VALUES (${staleMinute}, 9, NOW()) + ON CONFLICT (minute_ts) + DO UPDATE SET active_users = EXCLUDED.active_users, updated_at = EXCLUDED.updated_at + `; + + const result = await gql( + ` + query AdminDashboard($input: AdminDashboardInput) { + adminDashboard(input: $input) { + syncActiveUsers + syncActiveUsersTimeline { + activeUsers + } + } + } + `, + { + input: { + syncHistoryHours: 1, + }, + } + ); + + t.falsy(result.errors); + const dashboard = result.data!.adminDashboard; + t.is(dashboard.syncActiveUsers, 0); + t.true( + dashboard.syncActiveUsersTimeline.every( + (point: { activeUsers: number }) => point.activeUsers === 0 + ) + ); + } finally { + clock.restore(); + } + } +); + e2e( 'Doc analytics and lastAccessedMembers should enforce permissions and privacy', async t => { diff --git a/packages/backend/server/src/core/sync/gateway.ts b/packages/backend/server/src/core/sync/gateway.ts index 52619d6604..6f0b7da15a 100644 --- a/packages/backend/server/src/core/sync/gateway.ts +++ b/packages/backend/server/src/core/sync/gateway.ts @@ -218,6 +218,9 @@ export class SpaceSyncGateway private readonly localUserConnectionCounts = new Map(); private unresolvedPresenceSockets = 0; private flushTimer?: NodeJS.Timeout; + private activeUsersFlushTimer?: NodeJS.Timeout; + private activeUsersFlushInFlight = false; + private activeUsersFlushQueued = false; constructor( private readonly ac: AccessController, @@ -229,12 +232,9 @@ export class SpaceSyncGateway ) {} onModuleInit() { + this.scheduleActiveUsersFlush(0); this.flushTimer = setInterval(() => { - this.flushActiveUsersMinute().catch(error => { - this.logger.warn( - `Failed to flush active users minute: ${this.formatError(error)}` - ); - }); + this.scheduleActiveUsersFlush(0); }, 60_000); this.flushTimer.unref?.(); } @@ -244,6 +244,11 @@ export class SpaceSyncGateway clearInterval(this.flushTimer); this.flushTimer = undefined; } + if (this.activeUsersFlushTimer) { + clearTimeout(this.activeUsersFlushTimer); + this.activeUsersFlushTimer = undefined; + } + this.activeUsersFlushQueued = false; } private encodeUpdates(updates: Uint8Array[]) { @@ -331,13 +336,7 @@ export class SpaceSyncGateway metrics.socketio.gauge('connections').record(this.connectionCount); const userId = this.attachPresenceUserId(client); this.trackConnectedSocket(client.id, userId); - void this.flushActiveUsersMinute({ - aggregateAcrossCluster: false, - }).catch(error => { - this.logger.warn( - `Failed to flush active users minute: ${this.formatError(error)}` - ); - }); + this.scheduleActiveUsersFlush(); } handleDisconnect(client: Socket) { @@ -347,13 +346,7 @@ export class SpaceSyncGateway `Connection disconnected, total: ${this.connectionCount}` ); metrics.socketio.gauge('connections').record(this.connectionCount); - void this.flushActiveUsersMinute({ - aggregateAcrossCluster: false, - }).catch(error => { - this.logger.warn( - `Failed to flush active users minute: ${this.formatError(error)}` - ); - }); + this.scheduleActiveUsersFlush(); } private attachPresenceUserId(client: Socket): string | null { @@ -435,13 +428,55 @@ export class SpaceSyncGateway } } + private scheduleActiveUsersFlush(delayMs = 250) { + if (this.activeUsersFlushTimer) { + return; + } + + if (this.activeUsersFlushInFlight) { + this.activeUsersFlushQueued = true; + return; + } + + this.activeUsersFlushTimer = setTimeout(() => { + this.activeUsersFlushTimer = undefined; + this.runScheduledActiveUsersFlush(); + }, delayMs); + this.activeUsersFlushTimer.unref?.(); + } + + private runScheduledActiveUsersFlush() { + if (this.activeUsersFlushInFlight) { + this.activeUsersFlushQueued = true; + return; + } + + this.activeUsersFlushInFlight = true; + void this.flushActiveUsersMinute() + .catch(error => { + this.logger.warn( + `Failed to flush active users minute: ${this.formatError(error)}` + ); + }) + .finally(() => { + this.activeUsersFlushInFlight = false; + if (this.activeUsersFlushQueued) { + this.activeUsersFlushQueued = false; + this.scheduleActiveUsersFlush(0); + } + }); + } + private async flushActiveUsersMinute(options?: { aggregateAcrossCluster?: boolean; + skipWriteOnAggregateError?: boolean; }) { const minute = new Date(); minute.setSeconds(0, 0); const aggregateAcrossCluster = options?.aggregateAcrossCluster ?? true; + const skipWriteOnAggregateError = + options?.skipWriteOnAggregateError ?? aggregateAcrossCluster; let activeUsers = this.resolveLocalActiveUsers(); if (aggregateAcrossCluster) { try { @@ -467,8 +502,9 @@ export class SpaceSyncGateway } } catch (error) { this.logger.warn( - `Failed to aggregate active users from sockets, using local value: ${this.formatError(error)}` + `Failed to aggregate active users from sockets: ${this.formatError(error)}` ); + if (skipWriteOnAggregateError) return; } } diff --git a/packages/backend/server/src/core/workspaces/stats.job.ts b/packages/backend/server/src/core/workspaces/stats.job.ts index d2fc7f2358..447cc05df6 100644 --- a/packages/backend/server/src/core/workspaces/stats.job.ts +++ b/packages/backend/server/src/core/workspaces/stats.job.ts @@ -5,9 +5,11 @@ import { Prisma, PrismaClient } from '@prisma/client'; import { metrics } from '../../base'; const LOCK_NAMESPACE = 97_301; -const LOCK_KEY = 1; +const REFRESH_LOCK_KEY = 1; const DIRTY_BATCH_SIZE = 500; const FULL_REFRESH_BATCH_SIZE = 2000; +const REFRESH_LOCK_RETRY_DELAY_MS = 5_000; +const REFRESH_LOCK_RETRY_TIMES = 12; const TRANSACTION_TIMEOUT_MS = 120_000; @Injectable() @@ -21,7 +23,7 @@ export class WorkspaceStatsJob { const started = Date.now(); try { - const result = await this.withAdvisoryLock(async tx => { + const result = await this.withAdvisoryLock(REFRESH_LOCK_KEY, async tx => { const backlog = await this.countDirty(tx); metrics.workspace .gauge('admin_stats_dirty_backlog') @@ -63,11 +65,12 @@ export class WorkspaceStatsJob { async recalibrate() { let lastSid = 0; let processed = 0; + let completed = true; while (true) { const started = Date.now(); try { - const result = await this.withAdvisoryLock(async tx => { + const result = await this.withRefreshLockRetry(async tx => { const workspaces = await this.fetchWorkspaceBatch( tx, lastSid, @@ -87,8 +90,9 @@ export class WorkspaceStatsJob { }); if (!result) { - this.logger.debug( - 'skip admin stats recalibration, lock not acquired' + completed = false; + this.logger.warn( + 'skip admin stats recalibration after retrying lock acquisition' ); break; } @@ -108,6 +112,7 @@ export class WorkspaceStatsJob { break; } } catch (error) { + completed = false; metrics.workspace.counter('admin_stats_refresh_failed').add(1, { mode: 'full', }); @@ -125,13 +130,24 @@ export class WorkspaceStatsJob { ); } + if (!completed) { + this.logger.warn( + 'Skip daily workspace admin stats snapshot because full recalibration did not complete' + ); + return; + } + try { - const snapshotted = await this.withAdvisoryLock(async tx => { + const snapshotted = await this.withRefreshLockRetry(async tx => { await this.writeDailySnapshot(tx); return true; }); if (snapshotted) { this.logger.debug('Wrote daily workspace admin stats snapshot'); + } else { + this.logger.warn( + 'Skipped daily workspace admin stats snapshot after retrying lock acquisition' + ); } } catch (error) { this.logger.error( @@ -142,9 +158,10 @@ export class WorkspaceStatsJob { } private async withAdvisoryLock( + lockKey: number, callback: (tx: Prisma.TransactionClient) => Promise ): Promise { - const lockIdSql = Prisma.sql`(${LOCK_NAMESPACE}::bigint << 32) + ${LOCK_KEY}::bigint`; + const lockIdSql = Prisma.sql`(${LOCK_NAMESPACE}::bigint << 32) + ${lockKey}::bigint`; return await this.prisma.$transaction( async tx => { @@ -169,6 +186,26 @@ export class WorkspaceStatsJob { ); } + private async withRefreshLockRetry( + callback: (tx: Prisma.TransactionClient) => Promise + ) { + for (let attempt = 0; attempt < REFRESH_LOCK_RETRY_TIMES; attempt++) { + const result = await this.withAdvisoryLock(REFRESH_LOCK_KEY, callback); + + if (result) { + return result; + } + + if (attempt < REFRESH_LOCK_RETRY_TIMES - 1) { + await new Promise(resolve => + setTimeout(resolve, REFRESH_LOCK_RETRY_DELAY_MS) + ); + } + } + + return null; + } + private async loadDirty( tx: Prisma.TransactionClient, limit: number diff --git a/packages/backend/server/src/models/workspace-analytics.ts b/packages/backend/server/src/models/workspace-analytics.ts index 8041346797..7a1d8a87c0 100644 --- a/packages/backend/server/src/models/workspace-analytics.ts +++ b/packages/backend/server/src/models/workspace-analytics.ts @@ -349,7 +349,7 @@ export class WorkspaceAnalyticsModel extends BaseModel { ( SELECT active_users FROM sync_active_users_minutely - WHERE minute_ts <= ${syncTo} + WHERE minute_ts BETWEEN ${syncFrom} AND ${syncTo} ORDER BY minute_ts DESC LIMIT 1 ), @@ -362,9 +362,15 @@ export class WorkspaceAnalyticsModel extends BaseModel { ) SELECT minutes.minute_ts AS minute, - COALESCE(s.active_users, 0)::integer AS "activeUsers" + COALESCE(sample.active_users, 0)::integer AS "activeUsers" FROM minutes - LEFT JOIN sync_active_users_minutely s ON s.minute_ts = minutes.minute_ts + LEFT JOIN LATERAL ( + SELECT active_users + FROM sync_active_users_minutely + WHERE minute_ts BETWEEN ${syncFrom} AND minutes.minute_ts + ORDER BY minute_ts DESC + LIMIT 1 + ) sample ON TRUE ORDER BY minute ASC `, this.db.$queryRaw< @@ -394,15 +400,23 @@ export class WorkspaceAnalyticsModel extends BaseModel { COALESCE(SUM(snapshot_size), 0) AS workspace_storage_bytes, COALESCE(SUM(blob_size), 0) AS blob_storage_bytes FROM workspace_admin_stats_daily - WHERE date BETWEEN ${storageFrom}::date AND ${currentDay}::date + WHERE date <= ${currentDay}::date GROUP BY date ) SELECT days.day AS date, - COALESCE(grouped.workspace_storage_bytes, 0) AS "workspaceStorageBytes", - COALESCE(grouped.blob_storage_bytes, 0) AS "blobStorageBytes" + COALESCE(snapshot.workspace_storage_bytes, 0) AS "workspaceStorageBytes", + COALESCE(snapshot.blob_storage_bytes, 0) AS "blobStorageBytes" FROM days - LEFT JOIN grouped ON grouped.date = days.day + LEFT JOIN LATERAL ( + SELECT + workspace_storage_bytes, + blob_storage_bytes + FROM grouped + WHERE date <= days.day + ORDER BY date DESC + LIMIT 1 + ) snapshot ON TRUE ORDER BY date ASC `, this.db.$queryRaw<{ conversations: bigint | number }[]>` @@ -415,11 +429,24 @@ export class WorkspaceAnalyticsModel extends BaseModel { topSharedLinksPromise, ]); + const currentWorkspaceStorageBytes = Number( + storageCurrent[0]?.workspaceStorageBytes ?? 0 + ); + const currentBlobStorageBytes = Number( + storageCurrent[0]?.blobStorageBytes ?? 0 + ); const storageHistorySeries = storageHistory.map(row => ({ date: row.date, workspaceStorageBytes: Number(row.workspaceStorageBytes ?? 0), blobStorageBytes: Number(row.blobStorageBytes ?? 0), })); + if (storageHistorySeries.length > 0) { + const lastPoint = storageHistorySeries[storageHistorySeries.length - 1]; + if (asDateOnlyString(lastPoint.date) === asDateOnlyString(currentDay)) { + lastPoint.workspaceStorageBytes = currentWorkspaceStorageBytes; + lastPoint.blobStorageBytes = currentBlobStorageBytes; + } + } return { syncActiveUsers: Number(syncCurrent[0]?.activeUsers ?? 0), @@ -436,10 +463,8 @@ export class WorkspaceAnalyticsModel extends BaseModel { effectiveSize: syncHistoryHours, }, copilotConversations: Number(copilotCount[0]?.conversations ?? 0), - workspaceStorageBytes: Number( - storageCurrent[0]?.workspaceStorageBytes ?? 0 - ), - blobStorageBytes: Number(storageCurrent[0]?.blobStorageBytes ?? 0), + workspaceStorageBytes: currentWorkspaceStorageBytes, + blobStorageBytes: currentBlobStorageBytes, workspaceStorageHistory: storageHistorySeries.map(row => ({ date: row.date, value: row.workspaceStorageBytes,