fix(workspace): fix sync handshake (hot-fix) (#5797)

This commit is contained in:
EYHN
2024-02-05 10:56:46 +08:00
committed by GitHub
parent d2acd0385a
commit d15ec0ff77
3 changed files with 62 additions and 12 deletions

View File

@@ -23,6 +23,7 @@
"lodash-es": "^4.17.21",
"nanoid": "^5.0.3",
"next-auth": "^4.24.5",
"rxjs": "^7.8.1",
"socket.io-client": "^4.7.2",
"y-protocols": "^1.0.6",
"yjs": "^13.6.10"

View File

@@ -1,6 +1,14 @@
import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql';
import type { SyncStorage } from '@affine/workspace';
import { type SyncStorage } from '@affine/workspace';
import {
BehaviorSubject,
distinctUntilChanged,
filter,
lastValueFrom,
take,
timeout,
} from 'rxjs';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
@@ -14,14 +22,54 @@ export class AffineSyncStorage implements SyncStorage {
socket = getIoManager().socket('/');
constructor(private readonly workspaceId: string) {
this.socket.on('connect', this.handleConnect);
connected = new BehaviorSubject(false);
handshook = new BehaviorSubject(false);
if (this.socket.connected) {
this.socket.emit('client-handshake-sync', this.workspaceId);
} else {
this.socket.connect();
}
constructor(private readonly workspaceId: string) {
const handleConnect = () => {
this.connected.next(true);
};
this.socket.on('connect', handleConnect);
this.connected.next(this.socket.connected);
const handleDisconnect = () => {
this.connected.next(false);
this.handshook.next(false);
};
this.socket.on('disconnect', handleDisconnect);
this.connected.pipe(distinctUntilChanged()).subscribe(connected => {
if (connected) {
this.socket
.timeout(this.SEND_TIMEOUT)
.emitWithAck('client-handshake-sync', this.workspaceId)
.then(() => {
this.handshook.next(true);
})
.catch(err => {
logger.error('client-handshake-sync error', {
workspaceId: this.workspaceId,
error: err,
});
});
}
});
this.disconnect = () => {
this.socket.emit('client-leave-sync', this.workspaceId);
this.socket.off('connect', handleConnect);
this.socket.off('disconnect', handleDisconnect);
};
}
async waitForHandshake() {
await lastValueFrom(
this.handshook.pipe(
filter(v => v),
timeout({ first: this.SEND_TIMEOUT }),
take(1)
)
);
}
handleConnect = () => {
@@ -32,6 +80,7 @@ export class AffineSyncStorage implements SyncStorage {
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array } | null> {
await this.waitForHandshake();
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
logger.debug('doc-load-v2', {
@@ -75,6 +124,8 @@ export class AffineSyncStorage implements SyncStorage {
}
async push(docId: string, update: Uint8Array) {
await this.waitForHandshake();
logger.debug('client-update-v2', {
workspaceId: this.workspaceId,
guid: docId,
@@ -136,10 +187,7 @@ export class AffineSyncStorage implements SyncStorage {
};
}
disconnect() {
this.socket.emit('client-leave-sync', this.workspaceId);
this.socket.off('connect', this.handleConnect);
}
disconnect: () => void;
}
export function createAffineStorage(

View File

@@ -812,6 +812,7 @@ __metadata:
lodash-es: "npm:^4.17.21"
nanoid: "npm:^5.0.3"
next-auth: "npm:^4.24.5"
rxjs: "npm:^7.8.1"
socket.io-client: "npm:^4.7.2"
vitest: "npm:1.1.3"
ws: "npm:^8.14.2"