fix(server): too much redundant updates events (#10383)

This commit is contained in:
forehalo
2025-02-24 04:44:42 +00:00
parent 2e0f0c624a
commit f02b57d58b

View File

@@ -45,7 +45,9 @@ type EventResponse<Data = any> = Data extends never
data: Data;
};
type RoomType = 'sync' | `${string}:awareness`;
// 019 only receives space:broadcast-doc-updates and send space:push-doc-updates
// 020 only receives space:broadcast-doc-update and send space:push-doc-update
type RoomType = 'sync' | `${string}:awareness` | 'sync-019';
function Room(
spaceId: string,
@@ -214,7 +216,16 @@ export class SpaceSyncGateway
): Promise<EventResponse<{ clientId: string; success: true }>> {
await this.assertVersion(client, clientVersion);
await this.selectAdapter(client, spaceType).join(user.id, spaceId);
// TODO(@forehalo): remove this after 0.19 goes out of life
// simple match 0.19.x
if (/^0.19.[\d]$/.test(clientVersion)) {
const room = Room(spaceId, 'sync-019');
if (!client.rooms.has(room)) {
await client.join(room);
}
} else {
await this.selectAdapter(client, spaceType).join(user.id, spaceId);
}
return { data: { clientId: client.id, success: true } };
}
@@ -270,6 +281,8 @@ export class SpaceSyncGateway
/**
* @deprecated use [space:push-doc-update] instead, client should always merge updates on their own
*
* only 0.19.x client will send this event
*/
@SubscribeMessage('space:push-doc-updates')
async onReceiveDocUpdates(
@@ -289,23 +302,19 @@ export class SpaceSyncGateway
user.id
);
// could be put in [adapter.push]
// but the event should be kept away from adapter
// so
// broadcast to 0.19.x clients
client
.to(adapter.room(spaceId))
.to(Room(spaceId, 'sync-019'))
.emit('space:broadcast-doc-updates', { ...message, timestamp });
// TODO(@forehalo): remove backward compatibility
if (spaceType === SpaceType.Workspace) {
const id = new DocID(docId, spaceId);
client.to(adapter.room(spaceId)).emit('server-updates', {
workspaceId: spaceId,
guid: id.guid,
updates,
// broadcast to new clients
updates.forEach(update => {
client.to(adapter.room(spaceId)).emit('space:broadcast-doc-update', {
...message,
update,
timestamp,
});
}
});
return {
data: {
@@ -333,9 +342,8 @@ export class SpaceSyncGateway
user.id
);
// TODO(@forehalo): separate different version of clients into different rooms,
// so the clients won't receive useless updates events
client.to(adapter.room(spaceId)).emit('space:broadcast-doc-updates', {
// broadcast to 0.19.x clients
client.to(Room(spaceId, 'sync-019')).emit('space:broadcast-doc-updates', {
spaceType,
spaceId,
docId,
@@ -445,163 +453,8 @@ export class SpaceSyncGateway
.to(adapter.room(spaceId, roomType))
.emit('space:broadcast-awareness-update', message);
// TODO(@forehalo): remove backward compatibility
if (spaceType === SpaceType.Workspace) {
client
.to(adapter.room(spaceId, roomType))
.emit('server-awareness-broadcast', {
workspaceId: spaceId,
awarenessUpdate: message.awarenessUpdate,
});
}
return {};
}
// TODO(@forehalo): remove
// deprecated section
@SubscribeMessage('client-handshake-sync')
async handleClientHandshakeSync(
@CurrentUser() user: CurrentUser,
@MessageBody('workspaceId') workspaceId: string,
@MessageBody('version') version: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
await this.assertVersion(client, version);
return this.onJoinSpace(user, client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
clientVersion: version,
});
}
@SubscribeMessage('client-leave-sync')
async handleLeaveSync(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
return this.onLeaveSpace(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
});
}
@SubscribeMessage('client-pre-sync')
async loadDocStats(
@ConnectedSocket() client: Socket,
@MessageBody()
{ workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
): Promise<EventResponse<Record<string, number>>> {
return this.onLoadDocTimestamps(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
timestamp,
});
}
@SubscribeMessage('client-update-v2')
async handleClientUpdateV2(
@CurrentUser() user: CurrentUser,
@MessageBody()
{
workspaceId,
guid,
updates,
}: {
workspaceId: string;
guid: string;
updates: string[];
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
return this.onReceiveDocUpdates(client, user, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: guid,
updates,
});
}
@SubscribeMessage('doc-load-v2')
async loadDocV2(
@ConnectedSocket() client: Socket,
@MessageBody()
{
workspaceId,
guid,
stateVector,
}: {
workspaceId: string;
guid: string;
stateVector?: string;
}
): Promise<
EventResponse<{ missing: string; state?: string; timestamp: number }>
> {
return this.onLoadSpaceDoc(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: guid,
stateVector,
});
}
@SubscribeMessage('client-handshake-awareness')
async handleClientHandshakeAwareness(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody('workspaceId') workspaceId: string,
@MessageBody('version') version: string
): Promise<EventResponse<{ clientId: string }>> {
return this.onJoinAwareness(client, user, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: workspaceId,
clientVersion: version,
});
}
@SubscribeMessage('client-leave-awareness')
async handleLeaveAwareness(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
return this.onLeaveAwareness(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: workspaceId,
});
}
@SubscribeMessage('awareness-init')
async handleInitAwareness(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
return this.onLoadAwareness(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: workspaceId,
});
}
@SubscribeMessage('awareness-update')
async handleHelpGatheringAwareness(
@MessageBody()
{
workspaceId,
awarenessUpdate,
}: { workspaceId: string; awarenessUpdate: string },
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
return this.onUpdateAwareness(client, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: workspaceId,
awarenessUpdate,
});
}
}
abstract class SyncSocketAdapter {
@@ -647,7 +500,8 @@ abstract class SyncSocketAdapter {
): Promise<void>;
push(spaceId: string, docId: string, updates: Buffer[], editorId: string) {
this.assertIn(spaceId);
// TODO(@forehalo): enable this after 0.19 goes out of life
// this.assertIn(spaceId);
return this.storage.pushDocUpdates(spaceId, docId, updates, editorId);
}