From 25e8a2a22f5e2df5c3ee46680974cd309dd853aa Mon Sep 17 00:00:00 2001 From: DarkSky Date: Mon, 5 Feb 2024 08:43:50 +0000 Subject: [PATCH] feat: sync client versioning (#5645) after this pr, server will only accept client that have some major version the client version <0.12 will be rejected by the server, >= 0.12 can receive outdated messages and notify users --- .github/actions/deploy/deploy.mjs | 1 + .../charts/graphql/templates/deployment.yaml | 2 + .../helm/affine/charts/graphql/values.yaml | 1 + .../backend/server/src/config/affine.env.ts | 4 + .../server/src/core/sync/events/error.ts | 2 +- .../src/core/sync/events/events.gateway.ts | 180 +++++------------- .../server/src/fundamentals/config/def.ts | 1 + .../server/src/fundamentals/config/default.ts | 1 + .../infra/src/workspace/engine/sync/consts.ts | 8 + .../infra/src/workspace/engine/sync/engine.ts | 14 +- .../infra/src/workspace/engine/sync/peer.ts | 11 +- .../src/workspace/engine/sync/storage.ts | 13 ++ .../workspace-card/index.tsx | 10 +- .../workspace-impl/src/cloud/awareness.ts | 16 +- .../frontend/workspace-impl/src/cloud/sync.ts | 23 ++- 15 files changed, 144 insertions(+), 143 deletions(-) diff --git a/.github/actions/deploy/deploy.mjs b/.github/actions/deploy/deploy.mjs index 8c860d2a8f..d5297f3781 100644 --- a/.github/actions/deploy/deploy.mjs +++ b/.github/actions/deploy/deploy.mjs @@ -113,6 +113,7 @@ const createHelmCommand = ({ isDryRun }) => { `--set-string graphql.app.payment.stripe.webhookKey="${STRIPE_WEBHOOK_KEY}"`, `--set graphql.app.experimental.enableJwstCodec=true`, `--set graphql.app.features.earlyAccessPreview=false`, + `--set graphql.app.features.syncClientVersionCheck=true`, `--set sync.replicaCount=${syncReplicaCount}`, `--set-string sync.image.tag="${imageTag}"`, ...serviceAnnotations, diff --git a/.github/helm/affine/charts/graphql/templates/deployment.yaml b/.github/helm/affine/charts/graphql/templates/deployment.yaml index 8a4de5f5e3..06d7bf77ea 100644 --- a/.github/helm/affine/charts/graphql/templates/deployment.yaml +++ b/.github/helm/affine/charts/graphql/templates/deployment.yaml @@ -83,6 +83,8 @@ spec: value: "{{ .Values.app.captcha.enabled }}" - name: FEATURES_EARLY_ACCESS_PREVIEW value: "{{ .Values.app.features.earlyAccessPreview }}" + - name: FEATURES_SYNC_CLIENT_VERSION_CHECK + value: "{{ .Values.app.features.syncClientVersionCheck }}" - name: OAUTH_EMAIL_SENDER valueFrom: secretKeyRef: diff --git a/.github/helm/affine/charts/graphql/values.yaml b/.github/helm/affine/charts/graphql/values.yaml index b0e787d2bf..78cd80e9c8 100644 --- a/.github/helm/affine/charts/graphql/values.yaml +++ b/.github/helm/affine/charts/graphql/values.yaml @@ -60,6 +60,7 @@ app: webhookKey: '' features: earlyAccessPreview: false + syncClientVersionCheck: false serviceAccount: create: true diff --git a/packages/backend/server/src/config/affine.env.ts b/packages/backend/server/src/config/affine.env.ts index 452bc119d7..0aae7ff906 100644 --- a/packages/backend/server/src/config/affine.env.ts +++ b/packages/backend/server/src/config/affine.env.ts @@ -34,4 +34,8 @@ AFFiNE.ENV_MAP = { STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey', STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey', FEATURES_EARLY_ACCESS_PREVIEW: ['featureFlags.earlyAccessPreview', 'boolean'], + FEATURES_SYNC_CLIENT_VERSION_CHECK: [ + 'featureFlags.syncClientVersionCheck', + 'boolean', + ], }; diff --git a/packages/backend/server/src/core/sync/events/error.ts b/packages/backend/server/src/core/sync/events/error.ts index 4d832e0e12..ea6a3ac5e5 100644 --- a/packages/backend/server/src/core/sync/events/error.ts +++ b/packages/backend/server/src/core/sync/events/error.ts @@ -1,4 +1,4 @@ -enum EventErrorCode { +export enum EventErrorCode { WORKSPACE_NOT_FOUND = 'WORKSPACE_NOT_FOUND', DOC_NOT_FOUND = 'DOC_NOT_FOUND', NOT_IN_WORKSPACE = 'NOT_IN_WORKSPACE', diff --git a/packages/backend/server/src/core/sync/events/events.gateway.ts b/packages/backend/server/src/core/sync/events/events.gateway.ts index 5813bf03f8..3bac482421 100644 --- a/packages/backend/server/src/core/sync/events/events.gateway.ts +++ b/packages/backend/server/src/core/sync/events/events.gateway.ts @@ -22,6 +22,7 @@ import { AccessDeniedError, DocNotFoundError, EventError, + EventErrorCode, InternalError, NotInWorkspaceError, } from './error'; @@ -112,13 +113,42 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { metrics.socketio.gauge('realtime_connections').record(this.connectionCount); } + checkVersion(client: Socket, version?: string) { + if ( + // @todo(@darkskygit): remove this flag after 0.12 goes stable + AFFiNE.featureFlags.syncClientVersionCheck && + version !== AFFiNE.version + ) { + client.emit('server-version-rejected', { + currentVersion: version, + requiredVersion: AFFiNE.version, + reason: `Client version${ + version ? ` ${version}` : '' + } is outdated, please update to ${AFFiNE.version}`, + }); + return { + error: new EventError( + EventErrorCode.VERSION_REJECTED, + `Client version ${version} is outdated, please update to ${AFFiNE.version}` + ), + }; + } + return null; + } + @Auth() @SubscribeMessage('client-handshake-sync') async handleClientHandshakeSync( @CurrentUser() user: UserType, - @MessageBody() workspaceId: string, + @MessageBody('workspaceId') workspaceId: string, + @MessageBody('version') version: string | undefined, @ConnectedSocket() client: Socket ): Promise> { + const versionError = this.checkVersion(client, version); + if (versionError) { + return versionError; + } + const canWrite = await this.permissions.tryCheckWorkspace( workspaceId, user.id, @@ -143,9 +173,15 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @SubscribeMessage('client-handshake-awareness') async handleClientHandshakeAwareness( @CurrentUser() user: UserType, - @MessageBody() workspaceId: string, + @MessageBody('workspaceId') workspaceId: string, + @MessageBody('version') version: string | undefined, @ConnectedSocket() client: Socket ): Promise> { + const versionError = this.checkVersion(client, version); + if (versionError) { + return versionError; + } + const canWrite = await this.permissions.tryCheckWorkspace( workspaceId, user.id, @@ -172,29 +208,17 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { @Auth() @SubscribeMessage('client-handshake') async handleClientHandShake( - @CurrentUser() user: UserType, - @MessageBody() - workspaceId: string, + @MessageBody() workspaceId: string, @ConnectedSocket() client: Socket ): Promise> { - const canWrite = await this.permissions.tryCheckWorkspace( - workspaceId, - user.id, - Permission.Write - ); - - if (canWrite) { - await client.join([`${workspaceId}:sync`, `${workspaceId}:awareness`]); - return { - data: { - clientId: client.id, - }, - }; - } else { - return { - error: new AccessDeniedError(workspaceId), - }; + const versionError = this.checkVersion(client); + if (versionError) { + return versionError; } + // should unreachable + return { + error: new AccessDeniedError(workspaceId), + }; } @SubscribeMessage('client-leave-sync') @@ -227,118 +251,6 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } } - /** - * @deprecated use `client-leave-sync` and `client-leave-awareness` instead - */ - @SubscribeMessage('client-leave') - async handleClientLeave( - @MessageBody() workspaceId: string, - @ConnectedSocket() client: Socket - ): Promise { - if (client.rooms.has(`${workspaceId}:sync`)) { - await client.leave(`${workspaceId}:sync`); - } - if (client.rooms.has(`${workspaceId}:awareness`)) { - await client.leave(`${workspaceId}:awareness`); - } - return {}; - } - - /** - * This is the old version of the `client-update` event without any data protocol. - * It only exists for backwards compatibility to adapt older clients. - * - * @deprecated - */ - @SubscribeMessage('client-update') - async handleClientUpdateV1( - @MessageBody() - { - workspaceId, - guid, - update, - }: { - workspaceId: string; - guid: string; - update: string; - }, - @ConnectedSocket() client: Socket - ) { - if (!client.rooms.has(`${workspaceId}:sync`)) { - this.logger.verbose( - `Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first` - ); - return; - } - - const docId = new DocID(guid, workspaceId); - - client - .to(`${docId.workspace}:sync`) - .emit('server-update', { workspaceId, guid, update }); - - // broadcast to all clients with newer version that only listen to `server-updates` - client - .to(`${docId.workspace}:sync`) - .emit('server-updates', { workspaceId, guid, updates: [update] }); - - const buf = Buffer.from(update, 'base64'); - await this.docManager.push(docId.workspace, docId.guid, buf); - } - - /** - * This is the old version of the `doc-load` event without any data protocol. - * It only exists for backwards compatibility to adapt older clients. - * - * @deprecated - */ - @Auth() - @SubscribeMessage('doc-load') - async loadDocV1( - @ConnectedSocket() client: Socket, - @CurrentUser() user: UserType, - @MessageBody() - { - workspaceId, - guid, - stateVector, - }: { - workspaceId: string; - guid: string; - stateVector?: string; - } - ): Promise<{ missing: string; state?: string } | false> { - if (!client.rooms.has(`${workspaceId}:sync`)) { - const canRead = await this.permissions.tryCheckWorkspace( - workspaceId, - user.id - ); - if (!canRead) { - return false; - } - } - - const docId = new DocID(guid, workspaceId); - const doc = await this.docManager.get(docId.workspace, docId.guid); - - if (!doc) { - return false; - } - - const missing = Buffer.from( - encodeStateAsUpdate( - doc, - stateVector ? Buffer.from(stateVector, 'base64') : undefined - ) - ).toString('base64'); - const state = Buffer.from(encodeStateVector(doc)).toString('base64'); - - return { - missing, - state, - }; - } - @SubscribeMessage('client-update-v2') async handleClientUpdateV2( @MessageBody() diff --git a/packages/backend/server/src/fundamentals/config/def.ts b/packages/backend/server/src/fundamentals/config/def.ts index c946c19b7b..833de588a8 100644 --- a/packages/backend/server/src/fundamentals/config/def.ts +++ b/packages/backend/server/src/fundamentals/config/def.ts @@ -173,6 +173,7 @@ export interface AFFiNEConfig { */ featureFlags: { earlyAccessPreview: boolean; + syncClientVersionCheck: boolean; }; /** diff --git a/packages/backend/server/src/fundamentals/config/default.ts b/packages/backend/server/src/fundamentals/config/default.ts index d88312d1b1..49bc5a6085 100644 --- a/packages/backend/server/src/fundamentals/config/default.ts +++ b/packages/backend/server/src/fundamentals/config/default.ts @@ -116,6 +116,7 @@ export const getDefaultAFFiNEConfig: () => AFFiNEConfig = () => { }, featureFlags: { earlyAccessPreview: false, + syncClientVersionCheck: false, }, https: false, host: 'localhost', diff --git a/packages/common/infra/src/workspace/engine/sync/consts.ts b/packages/common/infra/src/workspace/engine/sync/consts.ts index e5fd2e8718..b71f04eed1 100644 --- a/packages/common/infra/src/workspace/engine/sync/consts.ts +++ b/packages/common/infra/src/workspace/engine/sync/consts.ts @@ -1,15 +1,23 @@ export enum SyncEngineStep { + // error + Rejected = -1, + // in progress Stopped = 0, Syncing = 1, + // finished Synced = 2, } export enum SyncPeerStep { + // error + VersionRejected = -1, + // in progress Stopped = 0, Retrying = 1, LoadingRootDoc = 2, LoadingSubDoc = 3, Loaded = 4.5, Syncing = 5, + // finished Synced = 6, } diff --git a/packages/common/infra/src/workspace/engine/sync/engine.ts b/packages/common/infra/src/workspace/engine/sync/engine.ts index 996d422a91..108b6a6460 100644 --- a/packages/common/infra/src/workspace/engine/sync/engine.ts +++ b/packages/common/infra/src/workspace/engine/sync/engine.ts @@ -13,6 +13,7 @@ export interface SyncEngineStatus { step: SyncEngineStep; local: SyncPeerStatus | null; remotes: (SyncPeerStatus | null)[]; + error: string | null; retrying: boolean; } @@ -82,6 +83,7 @@ export class SyncEngine { step: SyncEngineStep.Stopped, local: null, remotes: remotes.map(() => null), + error: null, retrying: false, }; } @@ -130,6 +132,7 @@ export class SyncEngine { step: SyncEngineStep.Stopped, local: null, remotes: this.remotes.map(() => null), + error: 'Sync progress manually stopped', retrying: false, }; } @@ -209,10 +212,18 @@ export class SyncEngine { updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) { let step = SyncEngineStep.Synced; + let error = null; const allPeer = [local, ...remotes]; for (const peer of allPeer) { if (!peer || peer.status.step !== SyncPeerStep.Synced) { - step = SyncEngineStep.Syncing; + if (peer && peer.status.step <= 0) { + // step < 0 means reject connection by server with some reason + // so the data may be out of date + step = SyncEngineStep.Rejected; + error = peer.status.lastError; + } else { + step = SyncEngineStep.Syncing; + } break; } } @@ -220,6 +231,7 @@ export class SyncEngine { step, local: local?.status ?? null, remotes: remotes.map(peer => peer?.status ?? null), + error, retrying: allPeer.some( peer => peer?.status.step === SyncPeerStep.Retrying ), diff --git a/packages/common/infra/src/workspace/engine/sync/peer.ts b/packages/common/infra/src/workspace/engine/sync/peer.ts index fd465d7728..5a0900ac51 100644 --- a/packages/common/infra/src/workspace/engine/sync/peer.ts +++ b/packages/common/infra/src/workspace/engine/sync/peer.ts @@ -19,6 +19,7 @@ export interface SyncPeerStatus { loadedDocs: number; pendingPullUpdates: number; pendingPushUpdates: number; + lastError: string | null; } /** @@ -54,6 +55,7 @@ export class SyncPeer { loadedDocs: 0, pendingPullUpdates: 0, pendingPushUpdates: 0, + lastError: null, }; onStatusChange = new Slot(); readonly abort = new AbortController(); @@ -119,6 +121,7 @@ export class SyncPeer { loadedDocs: 0, pendingPullUpdates: 0, pendingPushUpdates: 0, + lastError: 'Retrying sync after 5 seconds', }; await Promise.race([ new Promise(resolve => { @@ -199,6 +202,7 @@ export class SyncPeer { abortInner.abort('subscribe disconnect:' + reason); } ); + throwIfAborted(abortInner.signal); // Step 1: load root doc @@ -368,7 +372,11 @@ export class SyncPeer { reportSyncStatus() { let step; - if (this.state.connectedDocs.size === 0) { + let lastError = null; + if (this.storage.errorMessage?.type === 'outdated') { + step = SyncPeerStep.VersionRejected; + lastError = this.storage.errorMessage.message.reason; + } else if (this.state.connectedDocs.size === 0) { step = SyncPeerStep.LoadingRootDoc; } else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) { step = SyncPeerStep.LoadingSubDoc; @@ -391,6 +399,7 @@ export class SyncPeer { this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0), pendingPushUpdates: this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0), + lastError, }; } diff --git a/packages/common/infra/src/workspace/engine/sync/storage.ts b/packages/common/infra/src/workspace/engine/sync/storage.ts index 0e1011c5d4..02c56dd753 100644 --- a/packages/common/infra/src/workspace/engine/sync/storage.ts +++ b/packages/common/infra/src/workspace/engine/sync/storage.ts @@ -1,9 +1,22 @@ +export type RejectByVersion = { + currVersion: string; + requiredVersion: string; + reason: string; +}; + +export type SyncErrorMessage = { + type: 'outdated'; + message: RejectByVersion; +}; + export interface SyncStorage { /** * for debug */ name: string; + errorMessage?: SyncErrorMessage; + pull( docId: string, state: Uint8Array diff --git a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx index 33853d80ac..8a4a121bf5 100644 --- a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx +++ b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx @@ -195,6 +195,14 @@ const useSyncEngineSyncProgress = () => { `Syncing with AFFiNE Cloud` + (progress ? ` (${Math.floor(progress * 100)}%)` : '') ); + } else if ( + syncEngineStatus && + syncEngineStatus.step < SyncEngineStep.Syncing + ) { + return ( + syncEngineStatus.error || + 'Disconnected, please check your network connection' + ); } if (syncEngineStatus.retrying) { return 'Sync disconnected due to unexpected issues, reconnecting.'; @@ -227,7 +235,7 @@ const useSyncEngineSyncProgress = () => { message: content, icon: currentWorkspace.flavour === WorkspaceFlavour.AFFINE_CLOUD ? ( - !isOnline ? ( + !isOnline || syncEngineStatus?.error ? ( ) : ( diff --git a/packages/frontend/workspace-impl/src/cloud/awareness.ts b/packages/frontend/workspace-impl/src/cloud/awareness.ts index e1ec52d47c..450ae25972 100644 --- a/packages/frontend/workspace-impl/src/cloud/awareness.ts +++ b/packages/frontend/workspace-impl/src/cloud/awareness.ts @@ -1,5 +1,5 @@ import { DebugLogger } from '@affine/debug'; -import type { AwarenessProvider } from '@toeverything/infra'; +import type { AwarenessProvider, RejectByVersion } from '@toeverything/infra'; import { applyAwarenessUpdate, type Awareness, @@ -33,6 +33,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { window.addEventListener('beforeunload', this.windowBeforeUnloadHandler); this.socket.on('connect', () => this.handleConnect()); + this.socket.on('server-version-rejected', this.handleReject); if (this.socket.connected) { this.handleConnect(); @@ -40,6 +41,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { this.socket.connect(); } } + disconnect(): void { removeAwarenessStates( this.awareness, @@ -54,6 +56,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { this.newClientAwarenessInitHandler ); this.socket.off('connect', this.handleConnect); + this.socket.off('server-version-rejected', this.handleReject); window.removeEventListener('unload', this.windowBeforeUnloadHandler); } @@ -117,7 +120,16 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider { }; handleConnect = () => { - this.socket.emit('client-handshake-awareness', this.workspaceId); + this.socket.emit('client-handshake-awareness', { + workspaceId: this.workspaceId, + version: runtimeConfig.appVersion, + }); this.socket.emit('awareness-init', this.workspaceId); }; + + handleReject = (_msg: RejectByVersion) => { + this.socket.off('server-version-rejected', this.handleReject); + this.disconnect(); + this.socket.disconnect(); + }; } diff --git a/packages/frontend/workspace-impl/src/cloud/sync.ts b/packages/frontend/workspace-impl/src/cloud/sync.ts index 3f57b71fca..553bd8174b 100644 --- a/packages/frontend/workspace-impl/src/cloud/sync.ts +++ b/packages/frontend/workspace-impl/src/cloud/sync.ts @@ -1,6 +1,10 @@ import { DebugLogger } from '@affine/debug'; import { fetchWithTraceReport } from '@affine/graphql'; -import { type SyncStorage } from '@toeverything/infra'; +import { + type RejectByVersion, + type SyncErrorMessage, + type SyncStorage, +} from '@toeverything/infra'; import type { CleanupService } from '@toeverything/infra/lifecycle'; import { getIoManager } from '../utils/affine-io'; @@ -15,14 +19,17 @@ export class AffineSyncStorage implements SyncStorage { socket = getIoManager().socket('/'); + errorMessage?: SyncErrorMessage; + constructor( private readonly workspaceId: string, cleanupService: CleanupService ) { this.socket.on('connect', this.handleConnect); + this.socket.on('server-version-rejected', this.handleReject); if (this.socket.connected) { - this.socket.emit('client-handshake-sync', this.workspaceId); + this.handleConnect(); } else { this.socket.connect(); } @@ -33,7 +40,17 @@ export class AffineSyncStorage implements SyncStorage { } handleConnect = () => { - this.socket.emit('client-handshake-sync', this.workspaceId); + this.socket.emit('client-handshake-sync', { + workspaceId: this.workspaceId, + version: runtimeConfig.appVersion, + }); + }; + + handleReject = (message: RejectByVersion) => { + this.socket.off('server-version-rejected', this.handleReject); + this.cleanup(); + this.socket.disconnect(); + this.errorMessage = { type: 'outdated', message }; }; async pull(