diff --git a/packages/backend/server/src/metrics/utils.ts b/packages/backend/server/src/metrics/utils.ts index f2fb599e0f..a4ee1ab86a 100644 --- a/packages/backend/server/src/metrics/utils.ts +++ b/packages/backend/server/src/metrics/utils.ts @@ -1,4 +1,8 @@ -import { Counter, Gauge, Summary } from 'prom-client'; +import { Counter, Gauge, register, Summary } from 'prom-client'; + +function getOr(name: string, or: () => T): T { + return (register.getSingleMetric(name) as T) || or(); +} type LabelValues = Partial>; type MetricsCreator = ( @@ -14,11 +18,15 @@ export const metricsCreatorGenerator = () => { name: string, labelNames?: T[] ): MetricsCreator => { - const counter = new Counter({ + const counter = getOr( name, - help: name, - ...(labelNames ? { labelNames } : {}), - }); + () => + new Counter({ + name, + help: name, + ...(labelNames ? { labelNames } : {}), + }) + ); return (value: number, labels: LabelValues) => { counter.inc(labels, value); @@ -29,11 +37,15 @@ export const metricsCreatorGenerator = () => { name: string, labelNames?: T[] ): MetricsCreator => { - const gauge = new Gauge({ + const gauge = getOr( name, - help: name, - ...(labelNames ? { labelNames } : {}), - }); + () => + new Gauge({ + name, + help: name, + ...(labelNames ? { labelNames } : {}), + }) + ); return (value: number, labels: LabelValues) => { gauge.set(labels, value); @@ -44,11 +56,15 @@ export const metricsCreatorGenerator = () => { name: string, labelNames?: T[] ): TimerMetricsCreator => { - const summary = new Summary({ + const summary = getOr( name, - help: name, - ...(labelNames ? { labelNames } : {}), - }); + () => + new Summary({ + name, + help: name, + ...(labelNames ? { labelNames } : {}), + }) + ); return (labels: LabelValues) => { const now = process.hrtime(); @@ -71,3 +87,68 @@ export const metricsCreatorGenerator = () => { }; export const metricsCreator = metricsCreatorGenerator(); + +export const CallTimer = ( + name: string, + labels: Record = {} +): MethodDecorator => { + const timer = metricsCreator.timer(name, Object.keys(labels)); + + // @ts-expect-error allow + return ( + _target, + _key, + desc: TypedPropertyDescriptor<(...args: any[]) => any> + ) => { + const originalMethod = desc.value; + if (!originalMethod) { + return desc; + } + + desc.value = function (...args: any[]) { + const endTimer = timer(labels); + let result: any; + try { + result = originalMethod.apply(this, args); + } catch (e) { + endTimer(); + throw e; + } + + if (result instanceof Promise) { + return result.finally(endTimer); + } else { + endTimer(); + return result; + } + }; + + return desc; + }; +}; + +export const CallCounter = ( + name: string, + labels: Record = {} +): MethodDecorator => { + const count = metricsCreator.counter(name, Object.keys(labels)); + + // @ts-expect-error allow + return ( + _target, + _key, + desc: TypedPropertyDescriptor<(...args: any[]) => any> + ) => { + const originalMethod = desc.value; + if (!originalMethod) { + return desc; + } + + desc.value = function (...args: any[]) { + count(1, labels); + return originalMethod.apply(this, args); + }; + + return desc; + }; +}; diff --git a/packages/backend/server/src/modules/doc/manager.ts b/packages/backend/server/src/modules/doc/manager.ts index 81f519cfb1..e27ec546b4 100644 --- a/packages/backend/server/src/modules/doc/manager.ts +++ b/packages/backend/server/src/modules/doc/manager.ts @@ -6,6 +6,7 @@ import { OnModuleInit, } from '@nestjs/common'; import { Snapshot, Update } from '@prisma/client'; +import { chunk } from 'lodash-es'; import { defer, retry } from 'rxjs'; import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs'; @@ -89,10 +90,10 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { protected applyUpdates(guid: string, ...updates: Buffer[]): Doc { const doc = this.recoverDoc(...updates); - this.metrics.jwstCodecMerge(1, {}); // test jwst codec if (this.config.doc.manager.experimentalMergeWithJwstCodec) { + this.metrics.jwstCodecMerge(1, {}); const yjsResult = Buffer.from(encodeStateAsUpdate(doc)); let log = false; try { @@ -163,7 +164,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { /** * add update to manager for later processing. */ - async push(workspaceId: string, guid: string, update: Buffer) { + async push( + workspaceId: string, + guid: string, + update: Buffer, + retryTimes = 10 + ) { await new Promise((resolve, reject) => { defer(async () => { const seq = await this.getUpdateSeq(workspaceId, guid); @@ -176,7 +182,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }, }); }) - .pipe(retry(MAX_SEQ_NUM)) // retry until seq num not conflict + .pipe(retry(retryTimes)) // retry until seq num not conflict .subscribe({ next: () => { this.logger.verbose( @@ -184,7 +190,54 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { ); resolve(); }, - error: reject, + error: e => { + this.logger.error('Failed to push updates', e); + reject(new Error('Failed to push update')); + }, + }); + }); + } + + async batchPush( + workspaceId: string, + guid: string, + updates: Buffer[], + retryTimes = 10 + ) { + await new Promise((resolve, reject) => { + defer(async () => { + const seq = await this.getUpdateSeq(workspaceId, guid, updates.length); + let turn = 0; + const batchCount = 10; + for (const batch of chunk(updates, batchCount)) { + await this.db.update.createMany({ + data: batch.map((update, i) => ({ + workspaceId, + id: guid, + // `seq` is the last seq num of the batch + // example for 11 batched updates, start from seq num 20 + // seq for first update in the batch should be: + // 31 - 11 + 0 * 10 + 0 + 1 = 21 + // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i + seq: seq - updates.length + turn * batchCount + i + 1, + blob: update, + })), + }); + turn++; + } + }) + .pipe(retry(retryTimes)) // retry until seq num not conflict + .subscribe({ + next: () => { + this.logger.verbose( + `pushed updates for workspace: ${workspaceId}, guid: ${guid}` + ); + resolve(); + }, + error: e => { + this.logger.error('Failed to push updates', e); + reject(new Error('Failed to push update')); + }, }); }); } @@ -370,7 +423,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { return doc; } - private async getUpdateSeq(workspaceId: string, guid: string) { + private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) { try { const { seq } = await this.db.snapshot.update({ select: { @@ -384,13 +437,13 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }, data: { seq: { - increment: 1, + increment: batch, }, }, }); // reset - if (seq === MAX_SEQ_NUM) { + if (seq >= MAX_SEQ_NUM) { await this.db.snapshot.update({ where: { id_workspaceId: { @@ -406,9 +459,10 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { return seq; } catch { + // not existing snapshot just count it from 1 const last = this.seqMap.get(workspaceId + guid) ?? 0; - this.seqMap.set(workspaceId + guid, last + 1); - return last + 1; + this.seqMap.set(workspaceId + guid, last + batch); + return last + batch; } } } diff --git a/packages/backend/server/src/modules/sync/events/error.ts b/packages/backend/server/src/modules/sync/events/error.ts new file mode 100644 index 0000000000..4d832e0e12 --- /dev/null +++ b/packages/backend/server/src/modules/sync/events/error.ts @@ -0,0 +1,81 @@ +enum EventErrorCode { + WORKSPACE_NOT_FOUND = 'WORKSPACE_NOT_FOUND', + DOC_NOT_FOUND = 'DOC_NOT_FOUND', + NOT_IN_WORKSPACE = 'NOT_IN_WORKSPACE', + ACCESS_DENIED = 'ACCESS_DENIED', + INTERNAL = 'INTERNAL', + VERSION_REJECTED = 'VERSION_REJECTED', +} + +// Such errore are generally raised from the gateway handling to user, +// the stack must be full of internal code, +// so there is no need to inherit from `Error` class. +export class EventError { + constructor( + public readonly code: EventErrorCode, + public readonly message: string + ) {} + + toJSON() { + return { + code: this.code, + message: this.message, + }; + } +} + +export class WorkspaceNotFoundError extends EventError { + constructor(public readonly workspaceId: string) { + super( + EventErrorCode.WORKSPACE_NOT_FOUND, + `You are trying to access an unknown workspace ${workspaceId}.` + ); + } +} + +export class DocNotFoundError extends EventError { + constructor( + public readonly workspaceId: string, + public readonly docId: string + ) { + super( + EventErrorCode.DOC_NOT_FOUND, + `You are trying to access an unknown doc ${docId} under workspace ${workspaceId}.` + ); + } +} + +export class NotInWorkspaceError extends EventError { + constructor(public readonly workspaceId: string) { + super( + EventErrorCode.NOT_IN_WORKSPACE, + `You should join in workspace ${workspaceId} before broadcasting messages.` + ); + } +} + +export class AccessDeniedError extends EventError { + constructor(public readonly workspaceId: string) { + super( + EventErrorCode.ACCESS_DENIED, + `You have no permission to access workspace ${workspaceId}.` + ); + } +} + +export class InternalError extends EventError { + constructor(public readonly error: Error) { + super(EventErrorCode.INTERNAL, `Internal error happened: ${error.message}`); + } +} + +export class VersionRejectedError extends EventError { + constructor(public readonly version: number) { + super( + EventErrorCode.VERSION_REJECTED, + // TODO: Too general error message, + // need to be more specific when versioning system is implemented. + `The version ${version} is rejected by server.` + ); + } +} diff --git a/packages/backend/server/src/modules/sync/events/events.gateway.ts b/packages/backend/server/src/modules/sync/events/events.gateway.ts index 6334fd3804..df2725a00a 100644 --- a/packages/backend/server/src/modules/sync/events/events.gateway.ts +++ b/packages/backend/server/src/modules/sync/events/events.gateway.ts @@ -1,10 +1,10 @@ -import { Logger } from '@nestjs/common'; +import { applyDecorators, Logger } from '@nestjs/common'; import { ConnectedSocket, MessageBody, OnGatewayConnection, OnGatewayDisconnect, - SubscribeMessage, + SubscribeMessage as RawSubscribeMessage, WebSocketGateway, WebSocketServer, } from '@nestjs/websockets'; @@ -12,12 +12,40 @@ import { Server, Socket } from 'socket.io'; import { encodeStateAsUpdate, encodeStateVector } from 'yjs'; import { Metrics } from '../../../metrics/metrics'; +import { CallCounter, CallTimer } from '../../../metrics/utils'; import { DocID } from '../../../utils/doc'; import { Auth, CurrentUser } from '../../auth'; import { DocManager } from '../../doc'; import { UserType } from '../../users'; import { PermissionService } from '../../workspaces/permission'; import { Permission } from '../../workspaces/types'; +import { + AccessDeniedError, + DocNotFoundError, + EventError, + InternalError, + NotInWorkspaceError, + WorkspaceNotFoundError, +} from './error'; + +const SubscribeMessage = (event: string) => + applyDecorators( + CallCounter('socket_io_counter', { event }), + CallTimer('socket_io_timer', { event }), + RawSubscribeMessage(event) + ); + +type EventResponse = + | { + error: EventError; + } + | (Data extends never + ? { + data?: never; + } + : { + data: Data; + }); @WebSocketGateway({ cors: process.env.NODE_ENV !== 'production', @@ -52,38 +80,50 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @CurrentUser() user: UserType, @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket - ) { - this.metric.socketIOEventCounter(1, { event: 'client-handshake' }); - const endTimer = this.metric.socketIOEventTimer({ - event: 'client-handshake', - }); - + ): Promise> { const canWrite = await this.permissions.tryCheck( workspaceId, user.id, Permission.Write ); - if (canWrite) await client.join(workspaceId); - endTimer(); - return canWrite; + if (canWrite) { + await client.join(workspaceId); + return { + data: { + clientId: client.id, + }, + }; + } else { + return { + error: new AccessDeniedError(workspaceId), + }; + } } @SubscribeMessage('client-leave') async handleClientLeave( @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket - ) { - this.metric.socketIOEventCounter(1, { event: 'client-leave' }); - const endTimer = this.metric.socketIOEventTimer({ - event: 'client-leave', - }); - await client.leave(workspaceId); - endTimer(); + ): Promise { + if (client.rooms.has(workspaceId)) { + await client.leave(workspaceId); + return {}; + } else { + return { + error: new NotInWorkspaceError(workspaceId), + }; + } } + /** + * This is the old version of the `client-update` event without any data protocol. + * It only exists for backwards compatibility to adapt older clients. + * + * @deprecated + */ @SubscribeMessage('client-update') - async handleClientUpdate( + async handleClientUpdateV1( @MessageBody() { workspaceId, @@ -96,31 +136,37 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { }, @ConnectedSocket() client: Socket ) { - this.metric.socketIOEventCounter(1, { event: 'client-update' }); - const endTimer = this.metric.socketIOEventTimer({ event: 'client-update' }); - if (!client.rooms.has(workspaceId)) { this.logger.verbose( `Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first` ); - endTimer(); return; } const docId = new DocID(guid, workspaceId); + client .to(docId.workspace) .emit('server-update', { workspaceId, guid, update }); - const buf = Buffer.from(update, 'base64'); + // broadcast to all clients with newer version that only listen to `server-updates` + client + .to(docId.workspace) + .emit('server-updates', { workspaceId, guid, updates: [update] }); + const buf = Buffer.from(update, 'base64'); await this.docManager.push(docId.workspace, docId.guid, buf); - endTimer(); } + /** + * This is the old version of the `doc-load` event without any data protocol. + * It only exists for backwards compatibility to adapt older clients. + * + * @deprecated + */ @Auth() @SubscribeMessage('doc-load') - async loadDoc( + async loadDocV1( @ConnectedSocket() client: Socket, @CurrentUser() user: UserType, @MessageBody() @@ -134,12 +180,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { stateVector?: string; } ): Promise<{ missing: string; state?: string } | false> { - this.metric.socketIOEventCounter(1, { event: 'doc-load' }); - const endTimer = this.metric.socketIOEventTimer({ event: 'doc-load' }); if (!client.rooms.has(workspaceId)) { const canRead = await this.permissions.tryCheck(workspaceId, user.id); if (!canRead) { - endTimer(); return false; } } @@ -148,7 +191,6 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { const doc = await this.docManager.get(docId.workspace, docId.guid); if (!doc) { - endTimer(); return false; } @@ -160,53 +202,138 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { ).toString('base64'); const state = Buffer.from(encodeStateVector(doc)).toString('base64'); - endTimer(); return { missing, state, }; } + @SubscribeMessage('client-update-v2') + async handleClientUpdateV2( + @MessageBody() + { + workspaceId, + guid, + updates, + }: { + workspaceId: string; + guid: string; + updates: string[]; + }, + @ConnectedSocket() client: Socket + ): Promise> { + if (!client.rooms.has(workspaceId)) { + return { + error: new NotInWorkspaceError(workspaceId), + }; + } + + try { + const docId = new DocID(guid, workspaceId); + client + .to(docId.workspace) + .emit('server-updates', { workspaceId, guid, updates }); + + const buffers = updates.map(update => Buffer.from(update, 'base64')); + + await this.docManager.batchPush(docId.workspace, docId.guid, buffers); + return { + data: { + accepted: true, + }, + }; + } catch (e) { + return { + error: new InternalError(e as Error), + }; + } + } + + @Auth() + @SubscribeMessage('doc-load-v2') + async loadDocV2( + @ConnectedSocket() client: Socket, + @CurrentUser() user: UserType, + @MessageBody() + { + workspaceId, + guid, + stateVector, + }: { + workspaceId: string; + guid: string; + stateVector?: string; + } + ): Promise> { + if (!client.rooms.has(workspaceId)) { + const canRead = await this.permissions.tryCheck(workspaceId, user.id); + if (!canRead) { + return { + error: new AccessDeniedError(workspaceId), + }; + } + } + + const docId = new DocID(guid, workspaceId); + const doc = await this.docManager.get(docId.workspace, docId.guid); + + if (!doc) { + return { + error: docId.isWorkspace + ? new WorkspaceNotFoundError(workspaceId) + : new DocNotFoundError(workspaceId, docId.guid), + }; + } + + const missing = Buffer.from( + encodeStateAsUpdate( + doc, + stateVector ? Buffer.from(stateVector, 'base64') : undefined + ) + ).toString('base64'); + const state = Buffer.from(encodeStateVector(doc)).toString('base64'); + + return { + data: { + missing, + state, + }, + }; + } + @SubscribeMessage('awareness-init') async handleInitAwareness( @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket - ) { - this.metric.socketIOEventCounter(1, { event: 'awareness-init' }); - const endTimer = this.metric.socketIOEventTimer({ - event: 'init-awareness', - }); + ): Promise> { if (client.rooms.has(workspaceId)) { client.to(workspaceId).emit('new-client-awareness-init'); + return { + data: { + clientId: client.id, + }, + }; } else { - this.logger.verbose( - `Client ${client.id} tried to init awareness for workspace ${workspaceId} without joining it first` - ); + return { + error: new NotInWorkspaceError(workspaceId), + }; } - endTimer(); } @SubscribeMessage('awareness-update') async handleHelpGatheringAwareness( @MessageBody() message: { workspaceId: string; awarenessUpdate: string }, @ConnectedSocket() client: Socket - ) { - this.metric.socketIOEventCounter(1, { event: 'awareness-update' }); - const endTimer = this.metric.socketIOEventTimer({ - event: 'awareness-update', - }); - + ): Promise { if (client.rooms.has(message.workspaceId)) { - client.to(message.workspaceId).emit('server-awareness-broadcast', { - ...message, - }); + client + .to(message.workspaceId) + .emit('server-awareness-broadcast', message); + return {}; } else { - this.logger.verbose( - `Client ${client.id} tried to update awareness for workspace ${message.workspaceId} without joining it first` - ); + return { + error: new NotInWorkspaceError(message.workspaceId), + }; } - - endTimer(); - return 'ack'; } } diff --git a/packages/backend/server/tests/doc.spec.ts b/packages/backend/server/tests/doc.spec.ts index 0f3dd67905..5f18bedbed 100644 --- a/packages/backend/server/tests/doc.spec.ts +++ b/packages/backend/server/tests/doc.spec.ts @@ -225,6 +225,31 @@ test('should have sequential update number', async t => { t.not(records.length, 0); }); +test('should have correct sequential update number with batching push', async t => { + const manager = m.get(DocManager); + const doc = new YDoc(); + const text = doc.getText('content'); + const updates: Buffer[] = []; + + doc.on('update', update => { + updates.push(Buffer.from(update)); + }); + + text.insert(0, 'hello'); + text.insert(5, 'world'); + text.insert(5, ' '); + + await manager.batchPush('2', '2', updates); + + // [1,2,3] + const records = await manager.getUpdates('2', '2'); + + t.deepEqual( + records.map(({ seq }) => seq), + [1, 2, 3] + ); +}); + test('should retry if seq num conflict', async t => { const manager = m.get(DocManager); @@ -240,3 +265,19 @@ test('should retry if seq num conflict', async t => { t.is(stub.callCount, 3); }); + +test('should throw if meet max retry times', async t => { + const manager = m.get(DocManager); + + // @ts-expect-error private method + const stub = Sinon.stub(manager, 'getUpdateSeq'); + + stub.resolves(1); + await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0]))); + + await t.throwsAsync( + () => manager.push('1', '1', Buffer.from([0, 0]), 3 /* retry 3 times */), + { message: 'Failed to push update' } + ); + t.is(stub.callCount, 5); +}); diff --git a/packages/frontend/workspace/package.json b/packages/frontend/workspace/package.json index 7a3cf9af3c..4a9293c246 100644 --- a/packages/frontend/workspace/package.json +++ b/packages/frontend/workspace/package.json @@ -27,6 +27,7 @@ "js-base64": "^3.7.5", "ky": "^1.0.1", "lib0": "^0.2.87", + "lodash-es": "^4.17.21", "nanoid": "^5.0.1", "next-auth": "^4.23.2", "react": "18.2.0", diff --git a/packages/frontend/workspace/src/affine/batch-sync-sender.ts b/packages/frontend/workspace/src/affine/batch-sync-sender.ts new file mode 100644 index 0000000000..daebb9bc53 --- /dev/null +++ b/packages/frontend/workspace/src/affine/batch-sync-sender.ts @@ -0,0 +1,107 @@ +interface SyncUpdateSender { + ( + guid: string, + updates: Uint8Array[] + ): Promise<{ + accepted: boolean; + retry: boolean; + }>; +} + +/** + * BatchSyncSender is simple wrapper with vanilla update sync with several advanced features: + * - ACK mechanism, send updates sequentially with previous sync request correctly responds with ACK + * - batching updates, when waiting for previous ACK, new updates will be buffered and sent in single sync request + * - retryable, allow retry when previous sync request failed but with retry flag been set to true + */ +export class BatchSyncSender { + private buffered: Uint8Array[] = []; + private job: Promise | null = null; + private started = true; + + constructor( + private guid: string, + private readonly rawSender: SyncUpdateSender + ) {} + + send(update: Uint8Array) { + this.buffered.push(update); + this.next(); + return Promise.resolve(); + } + + stop() { + this.started = false; + } + + start() { + this.started = true; + this.next(); + } + + private next() { + if (!this.started || this.job || !this.buffered.length) { + return; + } + + const lastIndex = Math.min( + this.buffered.length - 1, + 99 /* max batch updates size */ + ); + const updates = this.buffered.slice(0, lastIndex + 1); + + if (updates.length) { + this.job = this.rawSender(this.guid, updates) + .then(({ accepted, retry }) => { + // remove pending updates if updates are accepted + if (accepted) { + this.buffered.splice(0, lastIndex + 1); + } + + // stop when previous sending failed and non-recoverable + if (accepted || retry) { + // avoid call stack overflow + setTimeout(() => { + this.next(); + }, 0); + } else { + this.stop(); + } + }) + .catch(() => { + this.stop(); + }) + .finally(() => { + this.job = null; + }); + } + } +} + +export class MultipleBatchSyncSender { + private senders: Record = {}; + + constructor(private readonly rawSender: SyncUpdateSender) {} + + async send(guid: string, update: Uint8Array) { + return this.getSender(guid).send(update); + } + + private getSender(guid: string) { + let sender = this.senders[guid]; + if (!sender) { + sender = new BatchSyncSender(guid, this.rawSender); + this.senders[guid] = sender; + } + + return sender; + } + + start() { + Object.values(this.senders).forEach(sender => sender.start()); + } + + stop() { + Object.values(this.senders).forEach(sender => sender.stop()); + } +} diff --git a/packages/frontend/workspace/src/affine/crud.ts b/packages/frontend/workspace/src/affine/crud.ts index 2e631f3d0e..0edbcf663e 100644 --- a/packages/frontend/workspace/src/affine/crud.ts +++ b/packages/frontend/workspace/src/affine/crud.ts @@ -59,14 +59,6 @@ export const CRUD: WorkspaceCRUD = { WorkspaceFlavour.AFFINE_CLOUD ); - const datasource = createAffineDataSource( - createWorkspace.id, - newBlockSuiteWorkspace.doc, - newBlockSuiteWorkspace.awarenessStore.awareness - ); - - await syncDataSourceFromDoc(upstreamWorkspace.doc, datasource); - Y.applyUpdate( newBlockSuiteWorkspace.doc, Y.encodeStateAsUpdate(upstreamWorkspace.doc) @@ -85,6 +77,16 @@ export const CRUD: WorkspaceCRUD = { }) ); + const datasource = createAffineDataSource( + createWorkspace.id, + newBlockSuiteWorkspace.doc, + newBlockSuiteWorkspace.awarenessStore.awareness + ); + + const disconnect = datasource.onDocUpdate(() => {}); + await syncDataSourceFromDoc(upstreamWorkspace.doc, datasource); + disconnect(); + const provider = createIndexedDBProvider( newBlockSuiteWorkspace.doc, DEFAULT_DB_NAME diff --git a/packages/frontend/workspace/src/affine/index.ts b/packages/frontend/workspace/src/affine/index.ts index 327c5ab7c4..c5d5796d65 100644 --- a/packages/frontend/workspace/src/affine/index.ts +++ b/packages/frontend/workspace/src/affine/index.ts @@ -1,4 +1,5 @@ import { DebugLogger } from '@affine/debug'; +import { isEqual } from 'lodash-es'; import type { Socket } from 'socket.io-client'; import { Manager } from 'socket.io-client'; import { @@ -10,6 +11,7 @@ import { import type { DocDataSource } from 'y-provider'; import type { Doc } from 'yjs'; +import { MultipleBatchSyncSender } from './batch-sync-sender'; import { type AwarenessChanges, base64ToUint8Array, @@ -41,8 +43,44 @@ export const createAffineDataSource = ( console.warn('important!! please use doc.guid as roomName'); } - logger.debug('createAffineDataSource', id, rootDoc.guid, awareness); + logger.debug('createAffineDataSource', id, rootDoc.guid); const socket = getIoManager().socket('/'); + const syncSender = new MultipleBatchSyncSender(async (guid, updates) => { + const payload = await Promise.all( + updates.map(update => uint8ArrayToBase64(update)) + ); + + return new Promise(resolve => { + socket.emit( + 'client-update-v2', + { + workspaceId: rootDoc.guid, + guid, + updates: payload, + }, + (response: { + // TODO: reuse `EventError` with server + error?: any; + data: any; + }) => { + // TODO: raise error with different code to users + if (response.error) { + logger.error('client-update-v2 error', { + workspaceId: rootDoc.guid, + guid, + response, + }); + } + + resolve({ + accepted: !response.error, + // TODO: reuse `EventError` with server + retry: response.error?.code === 'INTERNAL', + }); + } + ); + }); + }); return { get socket() { @@ -54,78 +92,93 @@ export const createAffineDataSource = ( : undefined; return new Promise((resolve, reject) => { - logger.debug('doc-load', { + logger.debug('doc-load-v2', { workspaceId: rootDoc.guid, guid, stateVector, }); socket.emit( - 'doc-load', + 'doc-load-v2', { workspaceId: rootDoc.guid, guid, stateVector, }, - (docState: Error | { missing: string; state: string } | null) => { + ( + response: // TODO: reuse `EventError` with server + { error: any } | { data: { missing: string; state: string } } + ) => { logger.debug('doc-load callback', { workspaceId: rootDoc.guid, guid, stateVector, - docState, + response, }); - if (docState instanceof Error) { - reject(docState); - return; - } - resolve( - docState - ? { - missing: base64ToUint8Array(docState.missing), - state: docState.state - ? base64ToUint8Array(docState.state) - : undefined, - } - : false - ); + if ('error' in response) { + // TODO: result `EventError` with server + if (response.error.code === 'DOC_NOT_FOUND') { + resolve(false); + } else { + reject(new Error(response.error.message)); + } + } else { + resolve({ + missing: base64ToUint8Array(response.data.missing), + state: response.data.state + ? base64ToUint8Array(response.data.state) + : undefined, + }); + } } ); }); }, sendDocUpdate: async (guid: string, update: Uint8Array) => { - logger.debug('client-update', { + logger.debug('client-update-v2', { workspaceId: rootDoc.guid, guid, update, }); - socket.emit('client-update', { - workspaceId: rootDoc.guid, - guid, - update: await uint8ArrayToBase64(update), - }); - return Promise.resolve(); + await syncSender.send(guid, update); }, onDocUpdate: callback => { - socket.on('connect', () => { - socket.emit('client-handshake', rootDoc.guid); - }); const onUpdate = async (message: { workspaceId: string; guid: string; - update: string; + updates: string[]; }) => { if (message.workspaceId === rootDoc.guid) { - callback(message.guid, base64ToUint8Array(message.update)); + message.updates.forEach(update => { + callback(message.guid, base64ToUint8Array(update)); + }); } }; - socket.on('server-update', onUpdate); - const destroyAwareness = setupAffineAwareness(socket, rootDoc, awareness); + let destroyAwareness = () => {}; + socket.on('server-updates', onUpdate); + socket.on('connect', () => { + socket.emit( + 'client-handshake', + rootDoc.guid, + (response: { error?: any }) => { + if (!response.error) { + syncSender.start(); + destroyAwareness = setupAffineAwareness( + socket, + rootDoc, + awareness + ); + } + } + ); + }); socket.connect(); return () => { + syncSender.stop(); socket.emit('client-leave', rootDoc.guid); - socket.off('server-update', onUpdate); + socket.off('server-updates', onUpdate); destroyAwareness(); socket.disconnect(); }; @@ -138,6 +191,23 @@ function setupAffineAwareness( rootDoc: Doc, awareness: Awareness ) { + let lastAwarenessState: Map = new Map(); + // can't compare on update binary because the protocol will encode clock in it but the state is still the same + const compareAwarenessState = (clients: number[]) => { + const newAwarenessState = new Map(); + clients.forEach(client => { + newAwarenessState.set(client, awareness.states.get(client)); + }); + + const equal = isEqual(lastAwarenessState, newAwarenessState); + + if (!equal) { + lastAwarenessState = newAwarenessState; + } + + return equal; + }; + const awarenessBroadcast = ({ workspaceId, awarenessUpdate, @@ -148,7 +218,6 @@ function setupAffineAwareness( if (workspaceId !== rootDoc.guid) { return; } - applyAwarenessUpdate( awareness, base64ToUint8Array(awarenessUpdate), @@ -166,6 +235,11 @@ function setupAffineAwareness( ...cur, ]); + // hit the last awareness update cache, skip + if (compareAwarenessState(changedClients)) { + return; + } + const update = encodeAwarenessUpdate(awareness, changedClients); uint8ArrayToBase64(update) .then(encodedUpdate => { @@ -174,7 +248,7 @@ function setupAffineAwareness( awarenessUpdate: encodedUpdate, }); }) - .catch(err => console.error(err)); + .catch(err => logger.error(err)); }; const newClientAwarenessInitHandler = () => { @@ -188,7 +262,7 @@ function setupAffineAwareness( awarenessUpdate: encodedAwarenessUpdate, }); }) - .catch(err => console.error(err)); + .catch(err => logger.error(err)); }; const windowBeforeUnloadHandler = () => { @@ -199,12 +273,10 @@ function setupAffineAwareness( conn.on('new-client-awareness-init', newClientAwarenessInitHandler); awareness.on('update', awarenessUpdate); - conn.on('connect', () => { - conn.emit('awareness-init', rootDoc.guid); - }); - window.addEventListener('beforeunload', windowBeforeUnloadHandler); + conn.emit('awareness-init', rootDoc.guid); + return () => { awareness.off('update', awarenessUpdate); conn.off('server-awareness-broadcast', awarenessBroadcast); diff --git a/packages/frontend/workspace/src/providers/index.ts b/packages/frontend/workspace/src/providers/index.ts index 83589f9970..29864d1009 100644 --- a/packages/frontend/workspace/src/providers/index.ts +++ b/packages/frontend/workspace/src/providers/index.ts @@ -38,13 +38,10 @@ const createAffineSocketIOProvider: DocProviderCreator = ( const lazyProvider = createLazyProvider(doc, dataSource, { origin: 'affine-socket-io', }); - return { - flavour: 'affine-socket-io', - ...lazyProvider, - get status() { - return lazyProvider.status; - }, - }; + + Object.assign(lazyProvider, { flavour: 'affine-socket-io' }); + + return lazyProvider as unknown as AffineSocketIOProvider; }; const createIndexedDBBackgroundProvider: DocProviderCreator = ( diff --git a/yarn.lock b/yarn.lock index 7c1111063d..a2fd82e50c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -865,6 +865,7 @@ __metadata: js-base64: "npm:^3.7.5" ky: "npm:^1.0.1" lib0: "npm:^0.2.87" + lodash-es: "npm:^4.17.21" nanoid: "npm:^5.0.1" next-auth: "npm:^4.23.2" react: "npm:18.2.0"