fix(server): online and storage statistics (#14792)

#### PR Dependency Tree


* **PR #14792** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Admin dashboard returns more accurate sync and storage timelines with
carried‑forward minute buckets and corrected current totals.

* **Bug Fixes**
* Active-user flushes are debounced/scheduled to prevent overlapping
writes and reduce stale counts.
* Snapshot writes now retry and will skip gracefully when lock
contention prevents completion, avoiding partial snapshots.

* **Tests**
* New e2e tests cover carry‑forward behavior, no backfill outside
requested windows, and storage history accuracy.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-04-07 02:08:13 +08:00
committed by GitHub
parent 193ec14ad3
commit 5806ad8a3a
4 changed files with 332 additions and 38 deletions
@@ -1,4 +1,5 @@
import { PrismaClient } from '@prisma/client';
import Sinon from 'sinon';
import { app, e2e, Mockers } from '../test';
@@ -389,6 +390,201 @@ e2e(
}
);
// Verifies that the admin dashboard fills gaps in the sync timeline and the
// storage history with the most recent earlier sample, and that the last
// storage history point matches the reported current totals.
e2e(
'adminDashboard should carry forward missing sync and storage samples',
async t => {
// Freeze `Date` (only `Date` is faked) so every minute/day bucket computed
// below is derived from the same fixed instant.
const now = new Date('2026-04-05T08:55:00.000Z');
const clock = Sinon.useFakeTimers({ now, toFake: ['Date'] });
try {
// The dashboard query is issued as a logged-in user created with the
// 'administrator' feature.
const admin = await app.create(Mockers.User, {
feature: 'administrator',
});
await app.login(admin);
const owner = await app.create(Mockers.User);
const workspace = await app.create(Mockers.Workspace, {
owner: { id: owner.id },
});
const db = app.get(PrismaClient);
// NOTE(review): presumably creates the analytics tables used below if they
// do not exist yet — the helper is defined outside this view.
await ensureAnalyticsTables(db);
// Current time truncated to the start of its minute.
const minute = new Date();
minute.setSeconds(0, 0);
// Two sync samples, 30 and 28 minutes before the current minute; the minute
// in between deliberately has no row.
const sampleStartMinute = new Date(minute.getTime() - 30 * 60 * 1000);
const sampleEndMinute = new Date(
sampleStartMinute.getTime() + 2 * 60 * 1000
);
await db.$executeRaw`
INSERT INTO sync_active_users_minutely (minute_ts, active_users, updated_at)
VALUES
(${sampleStartMinute}, 5, NOW()),
(${sampleEndMinute}, 7, NOW())
ON CONFLICT (minute_ts)
DO UPDATE SET active_users = EXCLUDED.active_users, updated_at = EXCLUDED.updated_at
`;
// Current (non-historical) stats row for the workspace: snapshot_size 130,
// blob_size 70.
await db.$executeRaw`
INSERT INTO workspace_admin_stats (
workspace_id, snapshot_count, snapshot_size, blob_count, blob_size, member_count, public_page_count, features, updated_at
)
VALUES (${workspace.id}, 1, 130, 1, 70, 1, 0, ARRAY[]::text[], NOW())
ON CONFLICT (workspace_id)
DO UPDATE SET
snapshot_count = EXCLUDED.snapshot_count,
snapshot_size = EXCLUDED.snapshot_size,
blob_count = EXCLUDED.blob_count,
blob_size = EXCLUDED.blob_size,
member_count = EXCLUDED.member_count,
public_page_count = EXCLUDED.public_page_count,
features = EXCLUDED.features,
updated_at = EXCLUDED.updated_at
`;
// Midnight UTC of the (frozen) current day, and the same instant two days
// earlier.
const today = new Date();
const currentDay = new Date(
Date.UTC(
today.getUTCFullYear(),
today.getUTCMonth(),
today.getUTCDate()
)
);
const twoDaysAgo = new Date(
currentDay.getTime() - 2 * 24 * 60 * 60 * 1000
);
// A single daily snapshot dated two days ago (snapshot_size 100, blob_size
// 50); no daily rows are inserted for yesterday or today.
await db.$executeRaw`
INSERT INTO workspace_admin_stats_daily (
workspace_id, date, snapshot_size, blob_size, member_count, updated_at
)
VALUES (${workspace.id}, ${twoDaysAgo}, 100, 50, 1, NOW())
ON CONFLICT (workspace_id, date)
DO UPDATE SET
snapshot_size = EXCLUDED.snapshot_size,
blob_size = EXCLUDED.blob_size,
member_count = EXCLUDED.member_count,
updated_at = EXCLUDED.updated_at
`;
// Request a 3-day storage history and a 2-hour sync history.
const result = await gql(
`
query AdminDashboard($input: AdminDashboardInput) {
adminDashboard(input: $input) {
syncActiveUsers
syncActiveUsersTimeline {
minute
activeUsers
}
workspaceStorageBytes
blobStorageBytes
workspaceStorageHistory {
date
value
}
blobStorageHistory {
date
value
}
}
}
`,
{
input: {
storageHistoryDays: 3,
syncHistoryHours: 2,
},
}
);
t.falsy(result.errors);
const dashboard = result.data!.adminDashboard;
// The current value is expected to be the later of the two inserted samples.
t.is(dashboard.syncActiveUsers, 7);
// The minute without a row must report the preceding sample's value (5)
// rather than 0.
const missingMinute = new Date(sampleStartMinute.getTime() + 60 * 1000);
t.is(
dashboard.syncActiveUsersTimeline.find(
(point: { minute: string }) =>
point.minute === missingMinute.toISOString()
)?.activeUsers,
5
);
const workspaceHistory = dashboard.workspaceStorageHistory.map(
(point: { value: number }) => point.value
);
const blobHistory = dashboard.blobStorageHistory.map(
(point: { value: number }) => point.value
);
// One point per requested day.
t.is(workspaceHistory.length, 3);
t.is(blobHistory.length, 3);
// The first two points must be equal.
// NOTE(review): presumably index 0 is two days ago and index 1 is yesterday,
// i.e. the lone daily snapshot is carried forward — confirm the ordering.
t.is(workspaceHistory[0], workspaceHistory[1]);
t.is(blobHistory[0], blobHistory[1]);
// The final history point must agree with the reported current totals.
t.is(
workspaceHistory[workspaceHistory.length - 1],
dashboard.workspaceStorageBytes
);
t.is(blobHistory[blobHistory.length - 1], dashboard.blobStorageBytes);
} finally {
// Always restore the real `Date`, even when an assertion throws.
clock.restore();
}
}
);
// Verifies that a sync sample recorded before the requested history window is
// neither reported as the current value nor carried forward into the timeline.
e2e(
  'adminDashboard should not backfill sync samples older than the requested window',
  async t => {
    // Only `Date` is faked, pinned to a fixed instant.
    const frozenNow = new Date('2026-04-05T08:55:00.000Z');
    const fakeClock = Sinon.useFakeTimers({ now: frozenNow, toFake: ['Date'] });

    const dashboardQuery = `
query AdminDashboard($input: AdminDashboardInput) {
adminDashboard(input: $input) {
syncActiveUsers
syncActiveUsersTimeline {
activeUsers
}
}
}
`;
    // Ask for a one-hour window; the only sample below is three hours old.
    const variables = {
      input: {
        syncHistoryHours: 1,
      },
    };

    try {
      const adminUser = await app.create(Mockers.User, {
        feature: 'administrator',
      });
      await app.login(adminUser);

      const prisma = app.get(PrismaClient);
      await ensureAnalyticsTables(prisma);

      // A single sample that lies outside the requested window.
      const outOfWindowMinute = new Date('2026-04-05T05:55:00.000Z');
      await prisma.$executeRaw`
INSERT INTO sync_active_users_minutely (minute_ts, active_users, updated_at)
VALUES (${outOfWindowMinute}, 9, NOW())
ON CONFLICT (minute_ts)
DO UPDATE SET active_users = EXCLUDED.active_users, updated_at = EXCLUDED.updated_at
`;

      const response = await gql(dashboardQuery, variables);
      t.falsy(response.errors);

      const payload = response.data!.adminDashboard;
      t.is(payload.syncActiveUsers, 0);

      // No timeline bucket may carry a non-zero value.
      const hasNonZeroBucket = payload.syncActiveUsersTimeline.some(
        (bucket: { activeUsers: number }) => bucket.activeUsers !== 0
      );
      t.false(hasNonZeroBucket);
    } finally {
      // Restore the real `Date` regardless of the test outcome.
      fakeClock.restore();
    }
  }
);
e2e(
'Doc analytics and lastAccessedMembers should enforce permissions and privacy',
async t => {
@@ -218,6 +218,9 @@ export class SpaceSyncGateway
private readonly localUserConnectionCounts = new Map<string, number>();
private unresolvedPresenceSockets = 0;
private flushTimer?: NodeJS.Timeout;
private activeUsersFlushTimer?: NodeJS.Timeout;
private activeUsersFlushInFlight = false;
private activeUsersFlushQueued = false;
constructor(
private readonly ac: AccessController,
@@ -229,12 +232,9 @@ export class SpaceSyncGateway
) {}
onModuleInit() {
this.scheduleActiveUsersFlush(0);
this.flushTimer = setInterval(() => {
this.flushActiveUsersMinute().catch(error => {
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
this.scheduleActiveUsersFlush(0);
}, 60_000);
this.flushTimer.unref?.();
}
@@ -244,6 +244,11 @@ export class SpaceSyncGateway
clearInterval(this.flushTimer);
this.flushTimer = undefined;
}
if (this.activeUsersFlushTimer) {
clearTimeout(this.activeUsersFlushTimer);
this.activeUsersFlushTimer = undefined;
}
this.activeUsersFlushQueued = false;
}
private encodeUpdates(updates: Uint8Array[]) {
@@ -331,13 +336,7 @@ export class SpaceSyncGateway
metrics.socketio.gauge('connections').record(this.connectionCount);
const userId = this.attachPresenceUserId(client);
this.trackConnectedSocket(client.id, userId);
void this.flushActiveUsersMinute({
aggregateAcrossCluster: false,
}).catch(error => {
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
this.scheduleActiveUsersFlush();
}
handleDisconnect(client: Socket) {
@@ -347,13 +346,7 @@ export class SpaceSyncGateway
`Connection disconnected, total: ${this.connectionCount}`
);
metrics.socketio.gauge('connections').record(this.connectionCount);
void this.flushActiveUsersMinute({
aggregateAcrossCluster: false,
}).catch(error => {
this.logger.warn(
`Failed to flush active users minute: ${this.formatError(error)}`
);
});
this.scheduleActiveUsersFlush();
}
private attachPresenceUserId(client: Socket): string | null {
@@ -435,13 +428,55 @@ export class SpaceSyncGateway
}
}
/**
 * Arms a one-shot timer that triggers an active-users flush.
 *
 * - If a timer is already pending, the call is a no-op (the pending timer's
 *   delay is not changed).
 * - If a flush is currently in flight, the request is recorded in
 *   `activeUsersFlushQueued` instead of arming a timer.
 *
 * @param delayMs Delay before the flush runs, in milliseconds (default 250).
 */
private scheduleActiveUsersFlush(delayMs = 250) {
  if (!this.activeUsersFlushTimer) {
    if (this.activeUsersFlushInFlight) {
      // Remember that another flush was requested while one is running.
      this.activeUsersFlushQueued = true;
    } else {
      const onTimerFired = () => {
        // Clear the handle first so new schedule requests are accepted again.
        this.activeUsersFlushTimer = undefined;
        this.runScheduledActiveUsersFlush();
      };
      this.activeUsersFlushTimer = setTimeout(onTimerFired, delayMs);
      // Do not let a pending flush timer keep the process alive.
      this.activeUsersFlushTimer.unref?.();
    }
  }
}
/**
 * Starts a flush of the active-users minute unless one is already running.
 *
 * When a flush is in flight, the request is only marked as queued. Otherwise
 * the flush is started without being awaited; failures are logged as warnings,
 * and once it settles a queued request (if any) is rescheduled with zero delay.
 */
private runScheduledActiveUsersFlush() {
  if (!this.activeUsersFlushInFlight) {
    this.activeUsersFlushInFlight = true;

    const onSettled = () => {
      this.activeUsersFlushInFlight = false;
      if (!this.activeUsersFlushQueued) {
        return;
      }
      // A flush was requested while this one ran: run another one right away.
      this.activeUsersFlushQueued = false;
      this.scheduleActiveUsersFlush(0);
    };

    void this.flushActiveUsersMinute()
      .catch(error => {
        this.logger.warn(
          `Failed to flush active users minute: ${this.formatError(error)}`
        );
      })
      .finally(onSettled);
  } else {
    this.activeUsersFlushQueued = true;
  }
}
private async flushActiveUsersMinute(options?: {
aggregateAcrossCluster?: boolean;
skipWriteOnAggregateError?: boolean;
}) {
const minute = new Date();
minute.setSeconds(0, 0);
const aggregateAcrossCluster = options?.aggregateAcrossCluster ?? true;
const skipWriteOnAggregateError =
options?.skipWriteOnAggregateError ?? aggregateAcrossCluster;
let activeUsers = this.resolveLocalActiveUsers();
if (aggregateAcrossCluster) {
try {
@@ -467,8 +502,9 @@ export class SpaceSyncGateway
}
} catch (error) {
this.logger.warn(
`Failed to aggregate active users from sockets, using local value: ${this.formatError(error)}`
`Failed to aggregate active users from sockets: ${this.formatError(error)}`
);
if (skipWriteOnAggregateError) return;
}
}
@@ -5,9 +5,11 @@ import { Prisma, PrismaClient } from '@prisma/client';
import { metrics } from '../../base';
const LOCK_NAMESPACE = 97_301;
const LOCK_KEY = 1;
const REFRESH_LOCK_KEY = 1;
const DIRTY_BATCH_SIZE = 500;
const FULL_REFRESH_BATCH_SIZE = 2000;
const REFRESH_LOCK_RETRY_DELAY_MS = 5_000;
const REFRESH_LOCK_RETRY_TIMES = 12;
const TRANSACTION_TIMEOUT_MS = 120_000;
@Injectable()
@@ -21,7 +23,7 @@ export class WorkspaceStatsJob {
const started = Date.now();
try {
const result = await this.withAdvisoryLock(async tx => {
const result = await this.withAdvisoryLock(REFRESH_LOCK_KEY, async tx => {
const backlog = await this.countDirty(tx);
metrics.workspace
.gauge('admin_stats_dirty_backlog')
@@ -63,11 +65,12 @@ export class WorkspaceStatsJob {
async recalibrate() {
let lastSid = 0;
let processed = 0;
let completed = true;
while (true) {
const started = Date.now();
try {
const result = await this.withAdvisoryLock(async tx => {
const result = await this.withRefreshLockRetry(async tx => {
const workspaces = await this.fetchWorkspaceBatch(
tx,
lastSid,
@@ -87,8 +90,9 @@ export class WorkspaceStatsJob {
});
if (!result) {
this.logger.debug(
'skip admin stats recalibration, lock not acquired'
completed = false;
this.logger.warn(
'skip admin stats recalibration after retrying lock acquisition'
);
break;
}
@@ -108,6 +112,7 @@ export class WorkspaceStatsJob {
break;
}
} catch (error) {
completed = false;
metrics.workspace.counter('admin_stats_refresh_failed').add(1, {
mode: 'full',
});
@@ -125,13 +130,24 @@ export class WorkspaceStatsJob {
);
}
if (!completed) {
this.logger.warn(
'Skip daily workspace admin stats snapshot because full recalibration did not complete'
);
return;
}
try {
const snapshotted = await this.withAdvisoryLock(async tx => {
const snapshotted = await this.withRefreshLockRetry(async tx => {
await this.writeDailySnapshot(tx);
return true;
});
if (snapshotted) {
this.logger.debug('Wrote daily workspace admin stats snapshot');
} else {
this.logger.warn(
'Skipped daily workspace admin stats snapshot after retrying lock acquisition'
);
}
} catch (error) {
this.logger.error(
@@ -142,9 +158,10 @@ export class WorkspaceStatsJob {
}
private async withAdvisoryLock<T>(
lockKey: number,
callback: (tx: Prisma.TransactionClient) => Promise<T>
): Promise<T | null> {
const lockIdSql = Prisma.sql`(${LOCK_NAMESPACE}::bigint << 32) + ${LOCK_KEY}::bigint`;
const lockIdSql = Prisma.sql`(${LOCK_NAMESPACE}::bigint << 32) + ${lockKey}::bigint`;
return await this.prisma.$transaction(
async tx => {
@@ -169,6 +186,26 @@ export class WorkspaceStatsJob {
);
}
/**
 * Runs `callback` through `withAdvisoryLock` using `REFRESH_LOCK_KEY`,
 * retrying up to `REFRESH_LOCK_RETRY_TIMES` times and waiting
 * `REFRESH_LOCK_RETRY_DELAY_MS` between attempts.
 *
 * Only a `null` result triggers a retry. NOTE(review): `withAdvisoryLock` is
 * declared as `Promise<T | null>` and callers treat `null` as "lock not
 * acquired" — confirm against its body. Any non-null result, including falsy
 * values such as `0`, `false` or `''`, is returned as-is so that a callback
 * which already ran is never executed a second time.
 *
 * @param callback Work to run inside the locked transaction.
 * @returns The callback's result, or `null` if every attempt returned `null`.
 */
private async withRefreshLockRetry<T>(
  callback: (tx: Prisma.TransactionClient) => Promise<T>
): Promise<T | null> {
  for (let attempt = 0; attempt < REFRESH_LOCK_RETRY_TIMES; attempt++) {
    const result = await this.withAdvisoryLock(REFRESH_LOCK_KEY, callback);
    // Compare against the sentinel explicitly: a truthiness check would
    // mistake a legitimate falsy result for lock contention and re-run the
    // callback.
    if (result !== null) {
      return result;
    }

    // No point in sleeping after the final attempt.
    if (attempt < REFRESH_LOCK_RETRY_TIMES - 1) {
      await new Promise(resolve =>
        setTimeout(resolve, REFRESH_LOCK_RETRY_DELAY_MS)
      );
    }
  }

  return null;
}
private async loadDirty(
tx: Prisma.TransactionClient,
limit: number
@@ -349,7 +349,7 @@ export class WorkspaceAnalyticsModel extends BaseModel {
(
SELECT active_users
FROM sync_active_users_minutely
WHERE minute_ts <= ${syncTo}
WHERE minute_ts BETWEEN ${syncFrom} AND ${syncTo}
ORDER BY minute_ts DESC
LIMIT 1
),
@@ -362,9 +362,15 @@ export class WorkspaceAnalyticsModel extends BaseModel {
)
SELECT
minutes.minute_ts AS minute,
COALESCE(s.active_users, 0)::integer AS "activeUsers"
COALESCE(sample.active_users, 0)::integer AS "activeUsers"
FROM minutes
LEFT JOIN sync_active_users_minutely s ON s.minute_ts = minutes.minute_ts
LEFT JOIN LATERAL (
SELECT active_users
FROM sync_active_users_minutely
WHERE minute_ts BETWEEN ${syncFrom} AND minutes.minute_ts
ORDER BY minute_ts DESC
LIMIT 1
) sample ON TRUE
ORDER BY minute ASC
`,
this.db.$queryRaw<
@@ -394,15 +400,23 @@ export class WorkspaceAnalyticsModel extends BaseModel {
COALESCE(SUM(snapshot_size), 0) AS workspace_storage_bytes,
COALESCE(SUM(blob_size), 0) AS blob_storage_bytes
FROM workspace_admin_stats_daily
WHERE date BETWEEN ${storageFrom}::date AND ${currentDay}::date
WHERE date <= ${currentDay}::date
GROUP BY date
)
SELECT
days.day AS date,
COALESCE(grouped.workspace_storage_bytes, 0) AS "workspaceStorageBytes",
COALESCE(grouped.blob_storage_bytes, 0) AS "blobStorageBytes"
COALESCE(snapshot.workspace_storage_bytes, 0) AS "workspaceStorageBytes",
COALESCE(snapshot.blob_storage_bytes, 0) AS "blobStorageBytes"
FROM days
LEFT JOIN grouped ON grouped.date = days.day
LEFT JOIN LATERAL (
SELECT
workspace_storage_bytes,
blob_storage_bytes
FROM grouped
WHERE date <= days.day
ORDER BY date DESC
LIMIT 1
) snapshot ON TRUE
ORDER BY date ASC
`,
this.db.$queryRaw<{ conversations: bigint | number }[]>`
@@ -415,11 +429,24 @@ export class WorkspaceAnalyticsModel extends BaseModel {
topSharedLinksPromise,
]);
const currentWorkspaceStorageBytes = Number(
storageCurrent[0]?.workspaceStorageBytes ?? 0
);
const currentBlobStorageBytes = Number(
storageCurrent[0]?.blobStorageBytes ?? 0
);
const storageHistorySeries = storageHistory.map(row => ({
date: row.date,
workspaceStorageBytes: Number(row.workspaceStorageBytes ?? 0),
blobStorageBytes: Number(row.blobStorageBytes ?? 0),
}));
if (storageHistorySeries.length > 0) {
const lastPoint = storageHistorySeries[storageHistorySeries.length - 1];
if (asDateOnlyString(lastPoint.date) === asDateOnlyString(currentDay)) {
lastPoint.workspaceStorageBytes = currentWorkspaceStorageBytes;
lastPoint.blobStorageBytes = currentBlobStorageBytes;
}
}
return {
syncActiveUsers: Number(syncCurrent[0]?.activeUsers ?? 0),
@@ -436,10 +463,8 @@ export class WorkspaceAnalyticsModel extends BaseModel {
effectiveSize: syncHistoryHours,
},
copilotConversations: Number(copilotCount[0]?.conversations ?? 0),
workspaceStorageBytes: Number(
storageCurrent[0]?.workspaceStorageBytes ?? 0
),
blobStorageBytes: Number(storageCurrent[0]?.blobStorageBytes ?? 0),
workspaceStorageBytes: currentWorkspaceStorageBytes,
blobStorageBytes: currentBlobStorageBytes,
workspaceStorageHistory: storageHistorySeries.map(row => ({
date: row.date,
value: row.workspaceStorageBytes,