chore: improve event flow (#14266)

This commit is contained in:
DarkSky
2026-01-16 16:07:27 +08:00
committed by GitHub
parent d4581b839a
commit 924d58603f
43 changed files with 2306 additions and 567 deletions

View File

@@ -4,6 +4,7 @@ import {
} from 'socket.io-client';
import { AutoReconnectConnection } from '../../connection';
import type { TelemetryAck, TelemetryBatch } from '../../telemetry/types';
import { throwIfAborted } from '../../utils/throw-if-aborted';
// TODO(@forehalo): use [UserFriendlyError]
@@ -104,6 +105,8 @@ interface ClientEvents {
},
];
'space:delete-doc': { spaceType: string; spaceId: string; docId: string };
'telemetry:batch': [TelemetryBatch, TelemetryAck];
}
export type ServerEventsMap = {

View File

@@ -0,0 +1,75 @@
import 'fake-indexeddb/auto';
import { expect, test, vi } from 'vitest';
import { TelemetryManager } from '../manager';
import type { TelemetryContext, TelemetryEvent } from '../types';
const context: TelemetryContext = {
isAuthed: false,
isSelfHosted: false,
channel: 'stable',
officialEndpoint: 'https://example.com',
};
const baseEvent: TelemetryEvent = {
schemaVersion: 1,
eventName: 'openDoc',
clientId: 'client-1',
eventId: 'event-1',
};
test('telemetry manager retries with backoff and flushes on success', async () => {
const fetchMock = vi
.fn()
.mockResolvedValueOnce({
ok: false,
status: 500,
text: async () => 'fail',
})
.mockResolvedValueOnce({
ok: true,
json: async () => ({ ok: true, accepted: 1, dropped: 0 }),
});
globalThis.fetch = fetchMock as any;
(globalThis as any).BUILD_CONFIG = { appVersion: 'test' };
const manager = new TelemetryManager({
retryBaseMs: 10,
retryMaxMs: 10,
maxBatchEvents: 5,
});
await manager.setContext(context);
await manager.track(baseEvent);
const first = await manager.flush();
expect(first.ok).toBe(false);
expect(manager.getQueueState().size).toBe(1);
expect(manager.getQueueState().nextRetryAt).toBeDefined();
await vi.waitFor(() => {
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(manager.getQueueState().size).toBe(0);
});
});
test('telemetry queue caps entries and drops oldest', async () => {
(globalThis as any).BUILD_CONFIG = { appVersion: 'test' };
globalThis.fetch = vi.fn() as any;
const manager = new TelemetryManager({
maxQueueEntries: 2,
maxQueueBytes: 10_000,
});
await manager.setContext({
...context,
officialEndpoint: '',
});
await manager.track({ ...baseEvent, eventId: 'event-1' });
await manager.track({ ...baseEvent, eventId: 'event-2' });
await manager.track({ ...baseEvent, eventId: 'event-3' });
expect(manager.getQueueState().size).toBe(2);
});

View File

@@ -0,0 +1,316 @@
import { SocketConnection } from '../impls/cloud/socket';
import { TelemetryQueue } from './queue';
import type {
TelemetryAck,
TelemetryBatch,
TelemetryContext,
TelemetryEvent,
} from './types';
const DEFAULT_MAX_QUEUE_ENTRIES = 2000;
const DEFAULT_MAX_QUEUE_BYTES = 2 * 1024 * 1024;
const DEFAULT_MAX_BATCH_EVENTS = 25;
const DEFAULT_RETRY_BASE_MS = 1000;
const DEFAULT_RETRY_MAX_MS = 5 * 60 * 1000;
type TelemetryManagerOptions = {
maxQueueEntries?: number;
maxQueueBytes?: number;
maxBatchEvents?: number;
retryBaseMs?: number;
retryMaxMs?: number;
};
export class TelemetryManager {
private context: TelemetryContext = {
isAuthed: false,
isSelfHosted: false,
channel: 'stable',
officialEndpoint: '',
};
private readonly queue: TelemetryQueue;
private readonly maxBatchEvents: number;
private readonly retryBaseMs: number;
private readonly retryMaxMs: number;
private retryAttempt = 0;
private retryTimer: ReturnType<typeof setTimeout> | null = null;
private nextRetryAt?: number;
private lastError?: string;
private flushPromise?: Promise<TelemetryAck>;
private socketConnection?: SocketConnection;
private socketEndpoint?: string;
constructor(options: TelemetryManagerOptions = {}) {
const maxQueueEntries =
options.maxQueueEntries ?? DEFAULT_MAX_QUEUE_ENTRIES;
const maxQueueBytes = options.maxQueueBytes ?? DEFAULT_MAX_QUEUE_BYTES;
this.queue = new TelemetryQueue(maxQueueEntries, maxQueueBytes);
this.maxBatchEvents = options.maxBatchEvents ?? DEFAULT_MAX_BATCH_EVENTS;
this.retryBaseMs = options.retryBaseMs ?? DEFAULT_RETRY_BASE_MS;
this.retryMaxMs = options.retryMaxMs ?? DEFAULT_RETRY_MAX_MS;
}
async setContext(context: TelemetryContext) {
this.context = { ...context };
this.updateSocketConnection();
this.scheduleFlush(true);
}
async track(event: TelemetryEvent) {
await this.queue.enqueue(event);
this.scheduleFlush(false);
return { queued: true };
}
async pageview(event: TelemetryEvent) {
return this.track(event);
}
async flush(): Promise<TelemetryAck> {
if (this.flushPromise) {
return this.flushPromise;
}
this.flushPromise = this.flushInternal().finally(() => {
this.flushPromise = undefined;
});
return this.flushPromise;
}
getQueueState() {
return {
size: this.queue.size,
lastError: this.lastError,
nextRetryAt: this.nextRetryAt,
};
}
private async flushInternal(): Promise<TelemetryAck> {
if (!this.context.officialEndpoint) {
return {
ok: false,
error: {
name: 'TelemetryEndpointMissing',
message: 'Telemetry official endpoint is not configured',
},
};
}
let accepted = 0;
let dropped = 0;
while (true) {
const items = await this.queue.peek(this.maxBatchEvents);
if (!items.length) {
this.resetRetry();
return { ok: true, accepted, dropped };
}
const events = items.map(item => this.mergeContext(item.event));
const ack = await this.sendBatch(events);
if (!ack.ok) {
this.recordFailure(ack.error.message);
return ack;
}
accepted += ack.accepted;
dropped += ack.dropped;
await this.queue.remove(items.map(item => item.id));
}
}
private mergeContext(event: TelemetryEvent): TelemetryEvent {
const mergedUserProps = {
...(this.context.userProperties ?? {}),
...(event.userProperties ?? {}),
};
const mergedContext = {
...event.context,
channel: event.context?.channel ?? this.context.channel,
};
return {
...event,
schemaVersion: 1,
userId: event.userId ?? this.context.userId,
userProperties: mergedUserProps,
context: mergedContext,
};
}
private async sendBatch(events: TelemetryEvent[]): Promise<TelemetryAck> {
const useWebsocket = this.context.isAuthed && !this.context.isSelfHosted;
const transport = useWebsocket ? 'ws' : 'http';
const batch: TelemetryBatch = {
schemaVersion: 1,
transport,
sentAt: Date.now(),
events,
};
try {
if (useWebsocket) {
return await this.sendWs(batch);
}
return await this.sendHttp(batch);
} catch (error) {
const err = error as Error;
return {
ok: false,
error: {
name: err?.name ?? 'TelemetrySendError',
message: err?.message ?? 'Telemetry send failed',
},
};
}
}
private async sendHttp(batch: TelemetryBatch): Promise<TelemetryAck> {
const url = new URL(
'/api/telemetry/collect',
this.context.officialEndpoint
);
const abortController = new AbortController();
const timeoutId = setTimeout(() => {
abortController.abort();
}, 10000);
const response = await fetch(url, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-affine-version': BUILD_CONFIG.appVersion,
},
body: JSON.stringify(batch),
signal: abortController.signal,
});
if (!response.ok) {
const text = await response.text().catch(() => '');
throw new Error(
`Telemetry HTTP failed with ${response.status}: ${text || 'unknown error'}`
);
} else {
clearTimeout(timeoutId);
}
const payload = (await response.json().catch(() => null)) as TelemetryAck;
if (!payload || typeof payload.ok !== 'boolean') {
throw new Error('Invalid telemetry response');
}
return payload;
}
private async sendWs(batch: TelemetryBatch): Promise<TelemetryAck> {
const socketConnection = this.ensureSocketConnection();
socketConnection.connect();
await socketConnection.waitForConnected();
const res = await socketConnection.inner.socket.emitWithAck(
'telemetry:batch',
batch
);
if ('error' in res) {
return {
ok: false,
error: {
name: res.error.name ?? 'TelemetryWebsocketError',
message: res.error.message ?? 'Telemetry websocket error',
},
};
}
return res.data as TelemetryAck;
}
private scheduleFlush(force: boolean) {
if (force) {
this.clearRetry();
}
if (this.retryTimer && !force) {
return;
}
queueMicrotask(() => {
this.flush().catch(() => {
return;
});
});
}
private recordFailure(message: string) {
this.lastError = message;
const delay = this.nextBackoffDelay();
this.retryAttempt += 1;
this.nextRetryAt = Date.now() + delay;
this.clearRetry();
this.retryTimer = setTimeout(() => {
this.retryTimer = null;
this.flush().catch(() => {
return;
});
}, delay);
}
private resetRetry() {
this.retryAttempt = 0;
this.nextRetryAt = undefined;
this.lastError = undefined;
this.clearRetry();
}
private clearRetry() {
if (this.retryTimer) {
clearTimeout(this.retryTimer);
this.retryTimer = null;
}
}
private nextBackoffDelay() {
const exp = Math.min(this.retryAttempt, 10);
const base = this.retryBaseMs * Math.pow(2, exp);
const delay = Math.min(this.retryMaxMs, base);
const jitter = Math.random() * delay * 0.2;
return delay + jitter;
}
private ensureSocketConnection() {
if (
this.socketConnection &&
this.socketEndpoint === this.context.officialEndpoint
) {
return this.socketConnection;
}
if (this.socketConnection) {
this.socketConnection.disconnect(true);
}
this.socketEndpoint = this.context.officialEndpoint;
this.socketConnection = new SocketConnection(
this.context.officialEndpoint,
this.context.isSelfHosted
);
return this.socketConnection;
}
private updateSocketConnection() {
const useWebsocket = this.context.isAuthed && !this.context.isSelfHosted;
if (!useWebsocket) {
if (this.socketConnection) {
this.socketConnection.disconnect(true);
}
this.socketConnection = undefined;
this.socketEndpoint = undefined;
return;
}
this.ensureSocketConnection();
}
}

View File

@@ -0,0 +1,132 @@
import { type DBSchema, openDB } from 'idb';
import type { TelemetryEvent } from './types';
interface TelemetryQueueDB extends DBSchema {
events: {
key: number;
value: {
id?: number;
event: TelemetryEvent;
size: number;
addedAt: number;
};
};
}
export type TelemetryQueueItem = {
id: number;
event: TelemetryEvent;
size: number;
addedAt: number;
};
export class TelemetryQueue {
private readonly dbPromise = openDB<TelemetryQueueDB>('affine-telemetry', 1, {
upgrade(db) {
if (!db.objectStoreNames.contains('events')) {
db.createObjectStore('events', { keyPath: 'id', autoIncrement: true });
}
},
});
private readonly ready = this.load();
private items: TelemetryQueueItem[] = [];
private totalSize = 0;
constructor(
private readonly maxEntries: number,
private readonly maxBytes: number
) {}
get size() {
return this.items.length;
}
async enqueue(event: TelemetryEvent) {
await this.ready;
const size = estimateSize(event);
const addedAt = Date.now();
const db = await this.dbPromise;
const id = await db.add('events', { event, size, addedAt });
const item = { id: Number(id), event, size, addedAt };
this.items.push(item);
this.totalSize += size;
await this.enforceLimits();
}
async peek(limit: number) {
await this.ready;
return this.items.slice(0, limit);
}
async remove(ids: number[]) {
await this.ready;
if (!ids.length) {
return;
}
const db = await this.dbPromise;
const tx = db.transaction('events', 'readwrite');
await Promise.all(ids.map(id => tx.store.delete(id)));
await tx.done;
const removeSet = new Set(ids);
this.items = this.items.filter(item => {
if (removeSet.has(item.id)) {
this.totalSize -= item.size;
return false;
}
return true;
});
}
private async load() {
const db = await this.dbPromise;
const all = await db.getAll('events');
this.items = all
.filter(item => typeof item.id === 'number')
.map(item => ({
id: item.id as number,
event: item.event,
size: item.size,
addedAt: item.addedAt,
}))
.sort((a, b) => a.id - b.id);
this.totalSize = this.items.reduce((sum, item) => sum + item.size, 0);
}
private async enforceLimits() {
if (
this.items.length <= this.maxEntries &&
this.totalSize <= this.maxBytes
) {
return;
}
const db = await this.dbPromise;
const tx = db.transaction('events', 'readwrite');
const deletions: Promise<unknown>[] = [];
while (
this.items.length > this.maxEntries ||
this.totalSize > this.maxBytes
) {
const removed = this.items.shift();
if (!removed) {
break;
}
this.totalSize -= removed.size;
deletions.push(tx.store.delete(removed.id));
}
await Promise.all(deletions);
await tx.done;
}
}
function estimateSize(event: TelemetryEvent) {
try {
return JSON.stringify(event).length;
} catch {
return 0;
}
}

View File

@@ -0,0 +1,50 @@
export type TelemetryEvent = {
schemaVersion: 1;
eventName: string;
params?: Record<string, unknown>;
userProperties?: Record<string, unknown>;
userId?: string;
clientId: string;
sessionId?: string;
eventId: string;
timestampMicros?: number;
context?: {
appVersion?: string;
editorVersion?: string;
environment?: string;
distribution?: string;
channel?: 'stable' | 'beta' | 'internal' | 'canary';
isDesktop?: boolean;
isMobile?: boolean;
locale?: string;
timezone?: string;
url?: string;
referrer?: string;
};
};
export type TelemetryBatch = {
schemaVersion: 1;
transport: 'http' | 'ws';
sentAt: number;
events: TelemetryEvent[];
};
export type TelemetryAck =
| { ok: true; accepted: number; dropped: number }
| { ok: false; error: { name: string; message: string } };
export interface TelemetryContext {
isAuthed: boolean;
isSelfHosted: boolean;
channel: 'stable' | 'beta' | 'internal' | 'canary';
userId?: string;
userProperties?: Record<string, unknown>;
officialEndpoint: string;
}
export interface TelemetryQueueState {
size: number;
lastError?: string;
nextRetryAt?: number;
}

View File

@@ -28,6 +28,12 @@ import type { AwarenessSync } from '../sync/awareness';
import type { BlobSync } from '../sync/blob';
import type { DocSync } from '../sync/doc';
import type { IndexerPreferOptions, IndexerSync } from '../sync/indexer';
import type {
TelemetryAck,
TelemetryContext,
TelemetryEvent,
TelemetryQueueState,
} from '../telemetry/types';
import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
export type { StoreInitOptions as WorkerInitOptions } from './ops';
@@ -41,7 +47,11 @@ export class StoreManagerClient {
}
>();
constructor(private readonly client: OpClient<WorkerManagerOps>) {}
constructor(private readonly client: OpClient<WorkerManagerOps>) {
this.telemetry = new TelemetryClient(this.client);
}
readonly telemetry: TelemetryClient;
open(key: string, options: StoreInitOptions) {
const { port1, port2 } = new MessageChannel();
@@ -104,6 +114,30 @@ export class StoreManagerClient {
}
}
class TelemetryClient {
constructor(private readonly client: OpClient<WorkerManagerOps>) {}
setContext(context: TelemetryContext): Promise<void> {
return this.client.call('telemetry.setContext', context);
}
track(event: TelemetryEvent): Promise<{ queued: boolean }> {
return this.client.call('telemetry.track', event);
}
pageview(event: TelemetryEvent): Promise<{ queued: boolean }> {
return this.client.call('telemetry.pageview', event);
}
flush(): Promise<TelemetryAck> {
return this.client.call('telemetry.flush');
}
getQueueState(): Promise<TelemetryQueueState> {
return this.client.call('telemetry.getQueueState');
}
}
export class StoreClient {
constructor(private readonly client: OpClient<WorkerOps>) {
this.docStorage = new WorkerDocStorage(this.client);

View File

@@ -6,6 +6,7 @@ import { SpaceStorage } from '../storage';
import type { AwarenessRecord } from '../storage/awareness';
import { Sync } from '../sync';
import type { PeerStorageOptions } from '../sync/types';
import { TelemetryManager } from '../telemetry/manager';
import { MANUALLY_STOP } from '../utils/throw-if-aborted';
import type { StoreInitOptions, WorkerManagerOps, WorkerOps } from './ops';
@@ -338,6 +339,7 @@ export class StoreManagerConsumer {
string,
{ store: StoreConsumer; refCount: number }
>();
private readonly telemetry = new TelemetryManager();
constructor(
private readonly availableStorageImplementations: StorageConstructor[]
@@ -386,6 +388,11 @@ export class StoreManagerConsumer {
workerDisposer();
this.storeDisposers.delete(key);
},
'telemetry.setContext': context => this.telemetry.setContext(context),
'telemetry.track': event => this.telemetry.track(event),
'telemetry.pageview': event => this.telemetry.pageview(event),
'telemetry.flush': () => this.telemetry.flush(),
'telemetry.getQueueState': () => this.telemetry.getQueueState(),
});
}
}

View File

@@ -16,6 +16,12 @@ import type { AwarenessRecord } from '../storage/awareness';
import type { BlobSyncBlobState, BlobSyncState } from '../sync/blob';
import type { DocSyncDocState, DocSyncState } from '../sync/doc';
import type { IndexerDocSyncState, IndexerSyncState } from '../sync/indexer';
import type {
TelemetryAck,
TelemetryContext,
TelemetryEvent,
TelemetryQueueState,
} from '../telemetry/types';
type StorageInitOptions = Values<{
[key in keyof AvailableStorageImplementations]: {
@@ -178,4 +184,9 @@ export type WorkerManagerOps = {
string,
];
close: [string, void];
'telemetry.setContext': [TelemetryContext, void];
'telemetry.track': [TelemetryEvent, { queued: boolean }];
'telemetry.pageview': [TelemetryEvent, { queued: boolean }];
'telemetry.flush': [void, TelemetryAck];
'telemetry.getQueueState': [void, TelemetryQueueState];
};