diff --git a/packages/backend/server/src/modules/sync/events/events.gateway.ts b/packages/backend/server/src/modules/sync/events/events.gateway.ts index 4953241185..abc1a946d0 100644 --- a/packages/backend/server/src/modules/sync/events/events.gateway.ts +++ b/packages/backend/server/src/modules/sync/events/events.gateway.ts @@ -114,8 +114,8 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } @Auth() - @SubscribeMessage('client-handshake') - async handleClientHandShake( + @SubscribeMessage('client-handshake-sync') + async handleClientHandshakeSync( @CurrentUser() user: UserType, @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket @@ -127,7 +127,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { ); if (canWrite) { - await client.join(workspaceId); + await client.join(`${workspaceId}:sync`); return { data: { clientId: client.id, @@ -140,13 +140,71 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } } - @SubscribeMessage('client-leave') - async handleClientLeave( + @Auth() + @SubscribeMessage('client-handshake-awareness') + async handleClientHandshakeAwareness( + @CurrentUser() user: UserType, + @MessageBody() workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise> { + const canWrite = await this.permissions.tryCheckWorkspace( + workspaceId, + user.id, + Permission.Write + ); + + if (canWrite) { + await client.join(`${workspaceId}:awareness`); + return { + data: { + clientId: client.id, + }, + }; + } else { + return { + error: new AccessDeniedError(workspaceId), + }; + } + } + + /** + * @deprecated use `client-handshake-sync` and `client-handshake-awareness` instead + */ + @Auth() + @SubscribeMessage('client-handshake') + async handleClientHandShake( + @CurrentUser() user: UserType, + @MessageBody() + workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise> { + const canWrite = await this.permissions.tryCheckWorkspace( + workspaceId, + user.id, + Permission.Write + ); + + if (canWrite) { + await client.join([`${workspaceId}:sync`, `${workspaceId}:awareness`]); + return { + data: { + clientId: client.id, + }, + }; + } else { + return { + error: new AccessDeniedError(workspaceId), + }; + } + } + + @SubscribeMessage('client-leave-sync') + async handleLeaveSync( @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket ): Promise { - if (client.rooms.has(workspaceId)) { - await client.leave(workspaceId); + if (client.rooms.has(`${workspaceId}:sync`)) { + await client.leave(`${workspaceId}:sync`); return {}; } else { return { @@ -155,6 +213,38 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } } + @SubscribeMessage('client-leave-awareness') + async handleLeaveAwareness( + @MessageBody() workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise { + if (client.rooms.has(`${workspaceId}:awareness`)) { + await client.leave(`${workspaceId}:awareness`); + return {}; + } else { + return { + error: new NotInWorkspaceError(workspaceId), + }; + } + } + + /** + * @deprecated use `client-leave-sync` and `client-leave-awareness` instead + */ + @SubscribeMessage('client-leave') + async handleClientLeave( + @MessageBody() workspaceId: string, + @ConnectedSocket() client: Socket + ): Promise { + if (client.rooms.has(`${workspaceId}:sync`)) { + await client.leave(`${workspaceId}:sync`); + } + if (client.rooms.has(`${workspaceId}:awareness`)) { + await client.leave(`${workspaceId}:awareness`); + } + return {}; + } + /** * This is the old version of the `client-update` event without any data protocol. * It only exists for backwards compatibility to adapt older clients. @@ -175,7 +265,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { }, @ConnectedSocket() client: Socket ) { - if (!client.rooms.has(workspaceId)) { + if (!client.rooms.has(`${workspaceId}:sync`)) { this.logger.verbose( `Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first` ); @@ -185,12 +275,12 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { const docId = new DocID(guid, workspaceId); client - .to(docId.workspace) + .to(`${docId.workspace}:sync`) .emit('server-update', { workspaceId, guid, update }); // broadcast to all clients with newer version that only listen to `server-updates` client - .to(docId.workspace) + .to(`${docId.workspace}:sync`) .emit('server-updates', { workspaceId, guid, updates: [update] }); const buf = Buffer.from(update, 'base64'); @@ -219,7 +309,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { stateVector?: string; } ): Promise<{ missing: string; state?: string } | false> { - if (!client.rooms.has(workspaceId)) { + if (!client.rooms.has(`${workspaceId}:sync`)) { const canRead = await this.permissions.tryCheckWorkspace( workspaceId, user.id @@ -264,7 +354,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { }, @ConnectedSocket() client: Socket ): Promise> { - if (!client.rooms.has(workspaceId)) { + if (!client.rooms.has(`${workspaceId}:sync`)) { return { error: new NotInWorkspaceError(workspaceId), }; @@ -272,7 +362,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { const docId = new DocID(guid, workspaceId); client - .to(docId.workspace) + .to(`${docId.workspace}:sync`) .emit('server-updates', { workspaceId, guid, updates }); const buffers = updates.map(update => Buffer.from(update, 'base64')); @@ -301,7 +391,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { stateVector?: string; } ): Promise> { - if (!client.rooms.has(workspaceId)) { + if (!client.rooms.has(`${workspaceId}:sync`)) { const canRead = await this.permissions.tryCheckWorkspace( workspaceId, user.id @@ -343,8 +433,8 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket ): Promise> { - if (client.rooms.has(workspaceId)) { - client.to(workspaceId).emit('new-client-awareness-init'); + if (client.rooms.has(`${workspaceId}:awareness`)) { + client.to(`${workspaceId}:awareness`).emit('new-client-awareness-init'); return { data: { clientId: client.id, @@ -362,9 +452,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @MessageBody() message: { workspaceId: string; awarenessUpdate: string }, @ConnectedSocket() client: Socket ): Promise { - if (client.rooms.has(message.workspaceId)) { + if (client.rooms.has(`${message.workspaceId}:awareness`)) { client - .to(message.workspaceId) + .to(`${message.workspaceId}:awareness`) .emit('server-awareness-broadcast', message); return {}; } else {