refactor(core): adapt to new sync api (#7929)

This commit is contained in:
EYHN
2024-08-21 05:30:33 +00:00
parent b6f46e01c5
commit cfe0677b84
2 changed files with 71 additions and 36 deletions

View File

@@ -23,9 +23,9 @@ export class CloudAwarenessConnection implements AwarenessConnection {
) {}
connect(awareness: Awareness): void {
this.socket.on('server-awareness-broadcast', this.awarenessBroadcast);
this.socket.on('space:broadcast-awareness-update', this.awarenessBroadcast);
this.socket.on(
'new-client-awareness-init',
'space:collect-awareness',
this.newClientAwarenessInitHandler
);
this.awareness = awareness;
@@ -54,10 +54,17 @@ export class CloudAwarenessConnection implements AwarenessConnection {
}
this.awareness = null;
this.socket.emit('client-leave-awareness', this.workspaceId);
this.socket.off('server-awareness-broadcast', this.awarenessBroadcast);
this.socket.emit('space:leave-awareness', {
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: this.workspaceId,
});
this.socket.off(
'new-client-awareness-init',
'space:broadcast-awareness-update',
this.awarenessBroadcast
);
this.socket.off(
'space:collect-awareness',
this.newClientAwarenessInitHandler
);
this.socket.off('connect', this.handleConnect);
@@ -66,16 +73,19 @@ export class CloudAwarenessConnection implements AwarenessConnection {
}
awarenessBroadcast = ({
workspaceId: wsId,
spaceId: wsId,
spaceType,
awarenessUpdate,
}: {
workspaceId: string;
spaceType: string;
spaceId: string;
docId: string;
awarenessUpdate: string;
}) => {
if (!this.awareness) {
return;
}
if (wsId !== this.workspaceId) {
if (wsId !== this.workspaceId || spaceType !== 'workspace') {
return;
}
applyAwarenessUpdate(
@@ -101,8 +111,10 @@ export class CloudAwarenessConnection implements AwarenessConnection {
const update = encodeAwarenessUpdate(this.awareness, changedClients);
uint8ArrayToBase64(update)
.then(encodedUpdate => {
this.socket.emit('awareness-update', {
workspaceId: this.workspaceId,
this.socket.emit('space:update-awareness', {
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: this.workspaceId,
awarenessUpdate: encodedUpdate,
});
})
@@ -119,8 +131,10 @@ export class CloudAwarenessConnection implements AwarenessConnection {
]);
uint8ArrayToBase64(awarenessUpdate)
.then(encodedAwarenessUpdate => {
this.socket.emit('awareness-update', {
workspaceId: this.workspaceId,
this.socket.emit('space:update-awareness', {
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: this.workspaceId,
awarenessUpdate: encodedAwarenessUpdate,
});
})
@@ -141,16 +155,26 @@ export class CloudAwarenessConnection implements AwarenessConnection {
handleConnect = () => {
this.socket.emit(
'client-handshake-awareness',
'space:join-awareness',
{
workspaceId: this.workspaceId,
version: runtimeConfig.appVersion,
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: this.workspaceId,
clientVersion: runtimeConfig.appVersion,
},
(res: any) => {
logger.debug('awareness handshake finished', res);
this.socket.emit('awareness-init', this.workspaceId, (res: any) => {
logger.debug('awareness-init finished', res);
});
this.socket.emit(
'space:load-awarenesses',
{
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: this.workspaceId,
},
(res: any) => {
logger.debug('awareness-init finished', res);
}
);
}
);
};

View File

@@ -26,9 +26,10 @@ export class CloudDocEngineServer implements DocServer {
) {}
private async clientHandShake() {
await this.socket.emitWithAck('client-handshake-sync', {
workspaceId: this.workspaceId,
version: runtimeConfig.appVersion,
await this.socket.emitWithAck('space:join', {
spaceType: 'workspace',
spaceId: this.workspaceId,
clientVersion: runtimeConfig.appVersion,
});
}
@@ -44,9 +45,10 @@ export class CloudDocEngineServer implements DocServer {
timestamp: number;
}> = await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('doc-load-v2', {
workspaceId: this.workspaceId,
guid: docId,
.emitWithAck('space:load-doc', {
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: docId,
stateVector,
});
@@ -72,9 +74,10 @@ export class CloudDocEngineServer implements DocServer {
const response: WebsocketResponse<{ timestamp: number }> = await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('client-update-v2', {
workspaceId: this.workspaceId,
guid: docId,
.emitWithAck('space:push-doc-updates', {
spaceType: 'workspace',
spaceId: this.workspaceId,
docId: docId,
updates: [payload],
});
@@ -94,8 +97,9 @@ export class CloudDocEngineServer implements DocServer {
const response: WebsocketResponse<Record<string, number>> =
await this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('client-pre-sync', {
workspaceId: this.workspaceId,
.emitWithAck('space:load-doc-timestamps', {
spaceType: 'workspace',
spaceId: this.workspaceId,
timestamp: after,
});
@@ -118,25 +122,29 @@ export class CloudDocEngineServer implements DocServer {
}) => void
): Promise<() => void> {
const handleUpdate = async (message: {
workspaceId: string;
guid: string;
spaceType: string;
spaceId: string;
docId: string;
updates: string[];
timestamp: number;
}) => {
if (message.workspaceId === this.workspaceId) {
if (
message.spaceType === 'workspace' &&
message.spaceId === this.workspaceId
) {
message.updates.forEach(update => {
cb({
docId: message.guid,
docId: message.docId,
data: base64ToUint8Array(update),
serverClock: message.timestamp,
});
});
}
};
this.socket.on('server-updates', handleUpdate);
this.socket.on('space:broadcast-doc-updates', handleUpdate);
return () => {
this.socket.off('server-updates', handleUpdate);
this.socket.off('space:broadcast-doc-updates', handleUpdate);
};
}
async waitForConnectingServer(signal: AbortSignal): Promise<void> {
@@ -165,7 +173,10 @@ export class CloudDocEngineServer implements DocServer {
return;
}
this.socket.emit('client-leave-sync', this.workspaceId);
this.socket.emit('space:leave', {
spaceType: 'workspace',
spaceId: this.workspaceId,
});
this.socket.off('server-version-rejected', this.handleVersionRejected);
this.socket.off('disconnect', this.handleDisconnect);
this.socket.disconnect();