diff --git a/packages/backend/server/src/core/sync/events/events.gateway.ts b/packages/backend/server/src/core/sync/events/events.gateway.ts index a637675f10..b71e70278d 100644 --- a/packages/backend/server/src/core/sync/events/events.gateway.ts +++ b/packages/backend/server/src/core/sync/events/events.gateway.ts @@ -38,27 +38,21 @@ export const GatewayErrorWrapper = (): MethodDecorator => { return desc; } - desc.value = function (...args: any[]) { - let result: any; + desc.value = async function (...args: any[]) { try { - result = originalMethod.apply(this, args); + return await originalMethod.apply(this, args); } catch (e) { - metrics.socketio.counter('unhandled_errors').add(1); - return { - error: new InternalError(e as Error), - }; - } - - if (result instanceof Promise) { - return result.catch(e => { - metrics.socketio.counter('unhandled_errors').add(1); - new Logger('EventsGateway').error(e, e.stack); + if (e instanceof EventError) { return { - error: new InternalError(e), + error: e, }; - }); - } else { - return result; + } else { + metrics.socketio.counter('unhandled_errors').add(1); + new Logger('EventsGateway').error(e, (e as Error).stack); + return { + error: new InternalError(e as Error), + }; + } } }; @@ -85,6 +79,14 @@ type EventResponse = data: Data; }); +function Sync(workspaceId: string): `${string}:sync` { + return `${workspaceId}:sync`; +} + +function Awareness(workspaceId: string): `${string}:awareness` { + return `${workspaceId}:awareness`; +} + @WebSocketGateway({ cors: !AFFiNE.node.prod, transports: ['websocket'], @@ -113,7 +115,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { metrics.socketio.gauge('realtime_connections').record(this.connectionCount); } - checkVersion(client: Socket, version?: string) { + assertVersion(client: Socket, version?: string) { if ( // @todo(@darkskygit): remove this flag after 0.12 goes stable AFFiNE.featureFlags.syncClientVersionCheck && @@ -126,14 +128,48 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { version ? ` ${version}` : '' } is outdated, please update to ${AFFiNE.version}`, }); - return { - error: new EventError( - EventErrorCode.VERSION_REJECTED, - `Client version ${version} is outdated, please update to ${AFFiNE.version}` - ), - }; + + throw new EventError( + EventErrorCode.VERSION_REJECTED, + `Client version ${version} is outdated, please update to ${AFFiNE.version}` + ); + } + } + + async joinWorkspace( + client: Socket, + room: `${string}:${'sync' | 'awareness'}` + ) { + await client.join(room); + } + + async leaveWorkspace( + client: Socket, + room: `${string}:${'sync' | 'awareness'}` + ) { + await client.leave(room); + } + + assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) { + if (!client.rooms.has(room)) { + throw new NotInWorkspaceError(room); + } + } + + async assertWorkspaceAccessible( + workspaceId: string, + userId: string, + permission: Permission = Permission.Read + ) { + if ( + !(await this.permissions.isWorkspaceMember( + workspaceId, + userId, + permission + )) + ) { + throw new AccessDeniedError(workspaceId); } - return null; } @Auth() @@ -144,29 +180,19 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody('version') version: string | undefined, @ConnectedSocket() client: Socket ): Promise> { - const versionError = this.checkVersion(client, version); - if (versionError) { - return versionError; - } - - const canWrite = await this.permissions.tryCheckWorkspace( + this.assertVersion(client, version); + await this.assertWorkspaceAccessible( workspaceId, user.id, Permission.Write ); - if (canWrite) { - await client.join(`${workspaceId}:sync`); - return { - data: { - clientId: client.id, - }, - }; - } else { - return { - error: new AccessDeniedError(workspaceId), - }; - } + await this.joinWorkspace(client, Sync(workspaceId)); + return { + data: { + clientId: client.id, + }, + }; } @Auth() @@ -177,47 +203,18 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody('version') version: string | undefined, @ConnectedSocket() client: Socket ): Promise> { - const versionError = this.checkVersion(client, version); - if (versionError) { - return versionError; - } - - const canWrite = await this.permissions.tryCheckWorkspace( + this.assertVersion(client, version); + await this.assertWorkspaceAccessible( workspaceId, user.id, Permission.Write ); - if (canWrite) { - await client.join(`${workspaceId}:awareness`); - return { - data: { - clientId: client.id, - }, - }; - } else { - return { - error: new AccessDeniedError(workspaceId), - }; - } - } - - /** - * @deprecated use `client-handshake-sync` and `client-handshake-awareness` instead - */ - @Auth() - @SubscribeMessage('client-handshake') - async handleClientHandShake( - @MessageBody() workspaceId: string, - @ConnectedSocket() client: Socket - ): Promise> { - const versionError = this.checkVersion(client); - if (versionError) { - return versionError; - } - // should unreachable + await this.joinWorkspace(client, Awareness(workspaceId)); return { - error: new AccessDeniedError(workspaceId), + data: { + clientId: client.id, + }, }; } @@ -226,14 +223,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket ): Promise { - if (client.rooms.has(`${workspaceId}:sync`)) { - await client.leave(`${workspaceId}:sync`); - return {}; - } else { - return { - error: new NotInWorkspaceError(workspaceId), - }; - } + this.assertInWorkspace(client, Sync(workspaceId)); + await this.leaveWorkspace(client, Sync(workspaceId)); + return {}; } @SubscribeMessage('client-leave-awareness') @@ -241,14 +233,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket ): Promise { - if (client.rooms.has(`${workspaceId}:awareness`)) { - await client.leave(`${workspaceId}:awareness`); - return {}; - } else { - return { - error: new NotInWorkspaceError(workspaceId), - }; - } + this.assertInWorkspace(client, Awareness(workspaceId)); + await this.leaveWorkspace(client, Awareness(workspaceId)); + return {}; } @SubscribeMessage('client-pre-sync') @@ -257,11 +244,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody() { workspaceId, timestamp }: { workspaceId: string; timestamp?: number } ): Promise>> { - if (!client.rooms.has(`${workspaceId}:sync`)) { - return { - error: new NotInWorkspaceError(workspaceId), - }; - } + this.assertInWorkspace(client, Sync(workspaceId)); const stats = await this.docManager.getStats(workspaceId, timestamp); @@ -284,11 +267,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { }, @ConnectedSocket() client: Socket ): Promise> { - if (!client.rooms.has(`${workspaceId}:sync`)) { - return { - error: new NotInWorkspaceError(workspaceId), - }; - } + this.assertInWorkspace(client, Sync(workspaceId)); const docId = new DocID(guid, workspaceId); const buffers = updates.map(update => Buffer.from(update, 'base64')); @@ -299,7 +278,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { ); client - .to(`${docId.workspace}:sync`) + .to(Sync(workspaceId)) .emit('server-updates', { workspaceId, guid, updates, timestamp }); return { @@ -310,11 +289,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { }; } - @Auth() @SubscribeMessage('doc-load-v2') async loadDocV2( @ConnectedSocket() client: Socket, - @CurrentUser() user: CurrentUser, @MessageBody() { workspaceId, @@ -326,17 +303,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { stateVector?: string; } ): Promise> { - if (!client.rooms.has(`${workspaceId}:sync`)) { - const canRead = await this.permissions.tryCheckWorkspace( - workspaceId, - user.id - ); - if (!canRead) { - return { - error: new AccessDeniedError(workspaceId), - }; - } - } + this.assertInWorkspace(client, Sync(workspaceId)); const docId = new DocID(guid, workspaceId); const doc = await this.docManager.get(docId.workspace, docId.guid); @@ -363,40 +330,33 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { }; } - @Auth() @SubscribeMessage('awareness-init') async handleInitAwareness( @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket ): Promise> { - if (client.rooms.has(`${workspaceId}:awareness`)) { - client.to(`${workspaceId}:awareness`).emit('new-client-awareness-init'); - return { - data: { - clientId: client.id, - }, - }; - } else { - return { - error: new NotInWorkspaceError(workspaceId), - }; - } + this.assertInWorkspace(client, Awareness(workspaceId)); + client.to(Awareness(workspaceId)).emit('new-client-awareness-init'); + return { + data: { + clientId: client.id, + }, + }; } @SubscribeMessage('awareness-update') async handleHelpGatheringAwareness( - @MessageBody() message: { workspaceId: string; awarenessUpdate: string }, + @MessageBody() + { + workspaceId, + awarenessUpdate, + }: { workspaceId: string; awarenessUpdate: string }, @ConnectedSocket() client: Socket ): Promise { - if (client.rooms.has(`${message.workspaceId}:awareness`)) { - client - .to(`${message.workspaceId}:awareness`) - .emit('server-awareness-broadcast', message); - return {}; - } else { - return { - error: new NotInWorkspaceError(message.workspaceId), - }; - } + this.assertInWorkspace(client, Awareness(workspaceId)); + client + .to(Awareness(workspaceId)) + .emit('server-awareness-broadcast', { workspaceId, awarenessUpdate }); + return {}; } } diff --git a/packages/backend/server/src/core/workspaces/permission.ts b/packages/backend/server/src/core/workspaces/permission.ts index c188e1166b..1c750c73f9 100644 --- a/packages/backend/server/src/core/workspaces/permission.ts +++ b/packages/backend/server/src/core/workspaces/permission.ts @@ -73,6 +73,28 @@ export class PermissionService { return this.tryCheckPage(ws, id, user); } + /** + * Returns whether a given user is a member of a workspace and has the given or higher permission. + */ + async isWorkspaceMember( + ws: string, + user: string, + permission: Permission + ): Promise { + const count = await this.prisma.workspaceUserPermission.count({ + where: { + workspaceId: ws, + userId: user, + accepted: true, + type: { + gte: permission, + }, + }, + }); + + return count !== 0; + } + async checkWorkspace( ws: string, user?: string, diff --git a/packages/backend/server/src/fundamentals/metrics/utils.ts b/packages/backend/server/src/fundamentals/metrics/utils.ts index e4d16a3492..83bcd1856c 100644 --- a/packages/backend/server/src/fundamentals/metrics/utils.ts +++ b/packages/backend/server/src/fundamentals/metrics/utils.ts @@ -18,11 +18,16 @@ export const CallTimer = ( return desc; } - desc.value = function (...args: any[]) { + desc.value = async function (...args: any[]) { const timer = metrics[scope].histogram(name, { description: `function call time costs of ${name}`, unit: 'ms', }); + metrics[scope] + .counter(`${name}_calls`, { + description: `function call counts of ${name}`, + }) + .add(1, attrs); const start = Date.now(); @@ -30,19 +35,10 @@ export const CallTimer = ( timer.record(Date.now() - start, attrs); }; - let result: any; try { - result = originalMethod.apply(this, args); - } catch (e) { + return await originalMethod.apply(this, args); + } finally { end(); - throw e; - } - - if (result instanceof Promise) { - return result.finally(end); - } else { - end(); - return result; } }; diff --git a/packages/frontend/workspace-impl/src/cloud/awareness.ts b/packages/frontend/workspace-impl/src/cloud/awareness.ts index 450ae25972..431c5e1855 100644 --- a/packages/frontend/workspace-impl/src/cloud/awareness.ts +++ b/packages/frontend/workspace-impl/src/cloud/awareness.ts @@ -104,7 +104,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { uint8ArrayToBase64(awarenessUpdate) .then(encodedAwarenessUpdate => { this.socket.emit('awareness-update', { - guid: this.workspaceId, + workspaceId: this.workspaceId, awarenessUpdate: encodedAwarenessUpdate, }); }) @@ -120,11 +120,19 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { }; handleConnect = () => { - this.socket.emit('client-handshake-awareness', { - workspaceId: this.workspaceId, - version: runtimeConfig.appVersion, - }); - this.socket.emit('awareness-init', this.workspaceId); + this.socket.emit( + 'client-handshake-awareness', + { + workspaceId: this.workspaceId, + version: runtimeConfig.appVersion, + }, + (res: any) => { + logger.debug('awareness handshake finished', res); + this.socket.emit('awareness-init', this.workspaceId, (res: any) => { + logger.debug('awareness-init finished', res); + }); + } + ); }; handleReject = (_msg: RejectByVersion) => { diff --git a/packages/frontend/workspace-impl/src/cloud/sync.ts b/packages/frontend/workspace-impl/src/cloud/sync.ts index 1ce7349640..ca14d2f17b 100644 --- a/packages/frontend/workspace-impl/src/cloud/sync.ts +++ b/packages/frontend/workspace-impl/src/cloud/sync.ts @@ -42,10 +42,16 @@ export class AffineSyncStorage implements SyncStorage { } handleConnect = () => { - this.socket.emit('client-handshake-sync', { - workspaceId: this.workspaceId, - version: runtimeConfig.appVersion, - }); + this.socket.emit( + 'client-handshake-sync', + { + workspaceId: this.workspaceId, + version: runtimeConfig.appVersion, + }, + (res: any) => { + logger.debug('client handshake finished', res); + } + ); }; handleReject = (message: RejectByVersion) => {