feat: sync client versioning (#5645)

after this pr, server will only accept client that have some major version
the client version <0.12 will be rejected by the server, >= 0.12 can receive outdated messages and notify users
This commit is contained in:
DarkSky
2024-02-05 08:43:50 +00:00
parent 5ca0d65241
commit 25e8a2a22f
15 changed files with 144 additions and 143 deletions

View File

@@ -113,6 +113,7 @@ const createHelmCommand = ({ isDryRun }) => {
`--set-string graphql.app.payment.stripe.webhookKey="${STRIPE_WEBHOOK_KEY}"`, `--set-string graphql.app.payment.stripe.webhookKey="${STRIPE_WEBHOOK_KEY}"`,
`--set graphql.app.experimental.enableJwstCodec=true`, `--set graphql.app.experimental.enableJwstCodec=true`,
`--set graphql.app.features.earlyAccessPreview=false`, `--set graphql.app.features.earlyAccessPreview=false`,
`--set graphql.app.features.syncClientVersionCheck=true`,
`--set sync.replicaCount=${syncReplicaCount}`, `--set sync.replicaCount=${syncReplicaCount}`,
`--set-string sync.image.tag="${imageTag}"`, `--set-string sync.image.tag="${imageTag}"`,
...serviceAnnotations, ...serviceAnnotations,

View File

@@ -83,6 +83,8 @@ spec:
value: "{{ .Values.app.captcha.enabled }}" value: "{{ .Values.app.captcha.enabled }}"
- name: FEATURES_EARLY_ACCESS_PREVIEW - name: FEATURES_EARLY_ACCESS_PREVIEW
value: "{{ .Values.app.features.earlyAccessPreview }}" value: "{{ .Values.app.features.earlyAccessPreview }}"
- name: FEATURES_SYNC_CLIENT_VERSION_CHECK
value: "{{ .Values.app.features.syncClientVersionCheck }}"
- name: OAUTH_EMAIL_SENDER - name: OAUTH_EMAIL_SENDER
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:

View File

@@ -60,6 +60,7 @@ app:
webhookKey: '' webhookKey: ''
features: features:
earlyAccessPreview: false earlyAccessPreview: false
syncClientVersionCheck: false
serviceAccount: serviceAccount:
create: true create: true

View File

@@ -34,4 +34,8 @@ AFFiNE.ENV_MAP = {
STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey', STRIPE_API_KEY: 'plugins.payment.stripe.keys.APIKey',
STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey', STRIPE_WEBHOOK_KEY: 'plugins.payment.stripe.keys.webhookKey',
FEATURES_EARLY_ACCESS_PREVIEW: ['featureFlags.earlyAccessPreview', 'boolean'], FEATURES_EARLY_ACCESS_PREVIEW: ['featureFlags.earlyAccessPreview', 'boolean'],
FEATURES_SYNC_CLIENT_VERSION_CHECK: [
'featureFlags.syncClientVersionCheck',
'boolean',
],
}; };

View File

@@ -1,4 +1,4 @@
enum EventErrorCode { export enum EventErrorCode {
WORKSPACE_NOT_FOUND = 'WORKSPACE_NOT_FOUND', WORKSPACE_NOT_FOUND = 'WORKSPACE_NOT_FOUND',
DOC_NOT_FOUND = 'DOC_NOT_FOUND', DOC_NOT_FOUND = 'DOC_NOT_FOUND',
NOT_IN_WORKSPACE = 'NOT_IN_WORKSPACE', NOT_IN_WORKSPACE = 'NOT_IN_WORKSPACE',

View File

@@ -22,6 +22,7 @@ import {
AccessDeniedError, AccessDeniedError,
DocNotFoundError, DocNotFoundError,
EventError, EventError,
EventErrorCode,
InternalError, InternalError,
NotInWorkspaceError, NotInWorkspaceError,
} from './error'; } from './error';
@@ -112,13 +113,42 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
metrics.socketio.gauge('realtime_connections').record(this.connectionCount); metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
} }
checkVersion(client: Socket, version?: string) {
if (
// @todo(@darkskygit): remove this flag after 0.12 goes stable
AFFiNE.featureFlags.syncClientVersionCheck &&
version !== AFFiNE.version
) {
client.emit('server-version-rejected', {
currentVersion: version,
requiredVersion: AFFiNE.version,
reason: `Client version${
version ? ` ${version}` : ''
} is outdated, please update to ${AFFiNE.version}`,
});
return {
error: new EventError(
EventErrorCode.VERSION_REJECTED,
`Client version ${version} is outdated, please update to ${AFFiNE.version}`
),
};
}
return null;
}
@Auth() @Auth()
@SubscribeMessage('client-handshake-sync') @SubscribeMessage('client-handshake-sync')
async handleClientHandshakeSync( async handleClientHandshakeSync(
@CurrentUser() user: UserType, @CurrentUser() user: UserType,
@MessageBody() workspaceId: string, @MessageBody('workspaceId') workspaceId: string,
@MessageBody('version') version: string | undefined,
@ConnectedSocket() client: Socket @ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> { ): Promise<EventResponse<{ clientId: string }>> {
const versionError = this.checkVersion(client, version);
if (versionError) {
return versionError;
}
const canWrite = await this.permissions.tryCheckWorkspace( const canWrite = await this.permissions.tryCheckWorkspace(
workspaceId, workspaceId,
user.id, user.id,
@@ -143,9 +173,15 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@SubscribeMessage('client-handshake-awareness') @SubscribeMessage('client-handshake-awareness')
async handleClientHandshakeAwareness( async handleClientHandshakeAwareness(
@CurrentUser() user: UserType, @CurrentUser() user: UserType,
@MessageBody() workspaceId: string, @MessageBody('workspaceId') workspaceId: string,
@MessageBody('version') version: string | undefined,
@ConnectedSocket() client: Socket @ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> { ): Promise<EventResponse<{ clientId: string }>> {
const versionError = this.checkVersion(client, version);
if (versionError) {
return versionError;
}
const canWrite = await this.permissions.tryCheckWorkspace( const canWrite = await this.permissions.tryCheckWorkspace(
workspaceId, workspaceId,
user.id, user.id,
@@ -172,29 +208,17 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@Auth() @Auth()
@SubscribeMessage('client-handshake') @SubscribeMessage('client-handshake')
async handleClientHandShake( async handleClientHandShake(
@CurrentUser() user: UserType, @MessageBody() workspaceId: string,
@MessageBody()
workspaceId: string,
@ConnectedSocket() client: Socket @ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> { ): Promise<EventResponse<{ clientId: string }>> {
const canWrite = await this.permissions.tryCheckWorkspace( const versionError = this.checkVersion(client);
workspaceId, if (versionError) {
user.id, return versionError;
Permission.Write
);
if (canWrite) {
await client.join([`${workspaceId}:sync`, `${workspaceId}:awareness`]);
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new AccessDeniedError(workspaceId),
};
} }
// should unreachable
return {
error: new AccessDeniedError(workspaceId),
};
} }
@SubscribeMessage('client-leave-sync') @SubscribeMessage('client-leave-sync')
@@ -227,118 +251,6 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
} }
} }
/**
* @deprecated use `client-leave-sync` and `client-leave-awareness` instead
*/
@SubscribeMessage('client-leave')
async handleClientLeave(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(`${workspaceId}:sync`)) {
await client.leave(`${workspaceId}:sync`);
}
if (client.rooms.has(`${workspaceId}:awareness`)) {
await client.leave(`${workspaceId}:awareness`);
}
return {};
}
/**
* This is the old version of the `client-update` event without any data protocol.
* It only exists for backwards compatibility to adapt older clients.
*
* @deprecated
*/
@SubscribeMessage('client-update')
async handleClientUpdateV1(
@MessageBody()
{
workspaceId,
guid,
update,
}: {
workspaceId: string;
guid: string;
update: string;
},
@ConnectedSocket() client: Socket
) {
if (!client.rooms.has(`${workspaceId}:sync`)) {
this.logger.verbose(
`Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first`
);
return;
}
const docId = new DocID(guid, workspaceId);
client
.to(`${docId.workspace}:sync`)
.emit('server-update', { workspaceId, guid, update });
// broadcast to all clients with newer version that only listen to `server-updates`
client
.to(`${docId.workspace}:sync`)
.emit('server-updates', { workspaceId, guid, updates: [update] });
const buf = Buffer.from(update, 'base64');
await this.docManager.push(docId.workspace, docId.guid, buf);
}
/**
* This is the old version of the `doc-load` event without any data protocol.
* It only exists for backwards compatibility to adapt older clients.
*
* @deprecated
*/
@Auth()
@SubscribeMessage('doc-load')
async loadDocV1(
@ConnectedSocket() client: Socket,
@CurrentUser() user: UserType,
@MessageBody()
{
workspaceId,
guid,
stateVector,
}: {
workspaceId: string;
guid: string;
stateVector?: string;
}
): Promise<{ missing: string; state?: string } | false> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
const canRead = await this.permissions.tryCheckWorkspace(
workspaceId,
user.id
);
if (!canRead) {
return false;
}
}
const docId = new DocID(guid, workspaceId);
const doc = await this.docManager.get(docId.workspace, docId.guid);
if (!doc) {
return false;
}
const missing = Buffer.from(
encodeStateAsUpdate(
doc,
stateVector ? Buffer.from(stateVector, 'base64') : undefined
)
).toString('base64');
const state = Buffer.from(encodeStateVector(doc)).toString('base64');
return {
missing,
state,
};
}
@SubscribeMessage('client-update-v2') @SubscribeMessage('client-update-v2')
async handleClientUpdateV2( async handleClientUpdateV2(
@MessageBody() @MessageBody()

View File

@@ -173,6 +173,7 @@ export interface AFFiNEConfig {
*/ */
featureFlags: { featureFlags: {
earlyAccessPreview: boolean; earlyAccessPreview: boolean;
syncClientVersionCheck: boolean;
}; };
/** /**

View File

@@ -116,6 +116,7 @@ export const getDefaultAFFiNEConfig: () => AFFiNEConfig = () => {
}, },
featureFlags: { featureFlags: {
earlyAccessPreview: false, earlyAccessPreview: false,
syncClientVersionCheck: false,
}, },
https: false, https: false,
host: 'localhost', host: 'localhost',

View File

@@ -1,15 +1,23 @@
export enum SyncEngineStep { export enum SyncEngineStep {
// error
Rejected = -1,
// in progress
Stopped = 0, Stopped = 0,
Syncing = 1, Syncing = 1,
// finished
Synced = 2, Synced = 2,
} }
export enum SyncPeerStep { export enum SyncPeerStep {
// error
VersionRejected = -1,
// in progress
Stopped = 0, Stopped = 0,
Retrying = 1, Retrying = 1,
LoadingRootDoc = 2, LoadingRootDoc = 2,
LoadingSubDoc = 3, LoadingSubDoc = 3,
Loaded = 4.5, Loaded = 4.5,
Syncing = 5, Syncing = 5,
// finished
Synced = 6, Synced = 6,
} }

View File

@@ -13,6 +13,7 @@ export interface SyncEngineStatus {
step: SyncEngineStep; step: SyncEngineStep;
local: SyncPeerStatus | null; local: SyncPeerStatus | null;
remotes: (SyncPeerStatus | null)[]; remotes: (SyncPeerStatus | null)[];
error: string | null;
retrying: boolean; retrying: boolean;
} }
@@ -82,6 +83,7 @@ export class SyncEngine {
step: SyncEngineStep.Stopped, step: SyncEngineStep.Stopped,
local: null, local: null,
remotes: remotes.map(() => null), remotes: remotes.map(() => null),
error: null,
retrying: false, retrying: false,
}; };
} }
@@ -130,6 +132,7 @@ export class SyncEngine {
step: SyncEngineStep.Stopped, step: SyncEngineStep.Stopped,
local: null, local: null,
remotes: this.remotes.map(() => null), remotes: this.remotes.map(() => null),
error: 'Sync progress manually stopped',
retrying: false, retrying: false,
}; };
} }
@@ -209,10 +212,18 @@ export class SyncEngine {
updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) { updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) {
let step = SyncEngineStep.Synced; let step = SyncEngineStep.Synced;
let error = null;
const allPeer = [local, ...remotes]; const allPeer = [local, ...remotes];
for (const peer of allPeer) { for (const peer of allPeer) {
if (!peer || peer.status.step !== SyncPeerStep.Synced) { if (!peer || peer.status.step !== SyncPeerStep.Synced) {
step = SyncEngineStep.Syncing; if (peer && peer.status.step <= 0) {
// step < 0 means reject connection by server with some reason
// so the data may be out of date
step = SyncEngineStep.Rejected;
error = peer.status.lastError;
} else {
step = SyncEngineStep.Syncing;
}
break; break;
} }
} }
@@ -220,6 +231,7 @@ export class SyncEngine {
step, step,
local: local?.status ?? null, local: local?.status ?? null,
remotes: remotes.map(peer => peer?.status ?? null), remotes: remotes.map(peer => peer?.status ?? null),
error,
retrying: allPeer.some( retrying: allPeer.some(
peer => peer?.status.step === SyncPeerStep.Retrying peer => peer?.status.step === SyncPeerStep.Retrying
), ),

View File

@@ -19,6 +19,7 @@ export interface SyncPeerStatus {
loadedDocs: number; loadedDocs: number;
pendingPullUpdates: number; pendingPullUpdates: number;
pendingPushUpdates: number; pendingPushUpdates: number;
lastError: string | null;
} }
/** /**
@@ -54,6 +55,7 @@ export class SyncPeer {
loadedDocs: 0, loadedDocs: 0,
pendingPullUpdates: 0, pendingPullUpdates: 0,
pendingPushUpdates: 0, pendingPushUpdates: 0,
lastError: null,
}; };
onStatusChange = new Slot<SyncPeerStatus>(); onStatusChange = new Slot<SyncPeerStatus>();
readonly abort = new AbortController(); readonly abort = new AbortController();
@@ -119,6 +121,7 @@ export class SyncPeer {
loadedDocs: 0, loadedDocs: 0,
pendingPullUpdates: 0, pendingPullUpdates: 0,
pendingPushUpdates: 0, pendingPushUpdates: 0,
lastError: 'Retrying sync after 5 seconds',
}; };
await Promise.race([ await Promise.race([
new Promise<void>(resolve => { new Promise<void>(resolve => {
@@ -199,6 +202,7 @@ export class SyncPeer {
abortInner.abort('subscribe disconnect:' + reason); abortInner.abort('subscribe disconnect:' + reason);
} }
); );
throwIfAborted(abortInner.signal); throwIfAborted(abortInner.signal);
// Step 1: load root doc // Step 1: load root doc
@@ -368,7 +372,11 @@ export class SyncPeer {
reportSyncStatus() { reportSyncStatus() {
let step; let step;
if (this.state.connectedDocs.size === 0) { let lastError = null;
if (this.storage.errorMessage?.type === 'outdated') {
step = SyncPeerStep.VersionRejected;
lastError = this.storage.errorMessage.message.reason;
} else if (this.state.connectedDocs.size === 0) {
step = SyncPeerStep.LoadingRootDoc; step = SyncPeerStep.LoadingRootDoc;
} else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) { } else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) {
step = SyncPeerStep.LoadingSubDoc; step = SyncPeerStep.LoadingSubDoc;
@@ -391,6 +399,7 @@ export class SyncPeer {
this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0), this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0),
pendingPushUpdates: pendingPushUpdates:
this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0), this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0),
lastError,
}; };
} }

View File

@@ -1,9 +1,22 @@
export type RejectByVersion = {
currVersion: string;
requiredVersion: string;
reason: string;
};
export type SyncErrorMessage = {
type: 'outdated';
message: RejectByVersion;
};
export interface SyncStorage { export interface SyncStorage {
/** /**
* for debug * for debug
*/ */
name: string; name: string;
errorMessage?: SyncErrorMessage;
pull( pull(
docId: string, docId: string,
state: Uint8Array state: Uint8Array

View File

@@ -195,6 +195,14 @@ const useSyncEngineSyncProgress = () => {
`Syncing with AFFiNE Cloud` + `Syncing with AFFiNE Cloud` +
(progress ? ` (${Math.floor(progress * 100)}%)` : '') (progress ? ` (${Math.floor(progress * 100)}%)` : '')
); );
} else if (
syncEngineStatus &&
syncEngineStatus.step < SyncEngineStep.Syncing
) {
return (
syncEngineStatus.error ||
'Disconnected, please check your network connection'
);
} }
if (syncEngineStatus.retrying) { if (syncEngineStatus.retrying) {
return 'Sync disconnected due to unexpected issues, reconnecting.'; return 'Sync disconnected due to unexpected issues, reconnecting.';
@@ -227,7 +235,7 @@ const useSyncEngineSyncProgress = () => {
message: content, message: content,
icon: icon:
currentWorkspace.flavour === WorkspaceFlavour.AFFINE_CLOUD ? ( currentWorkspace.flavour === WorkspaceFlavour.AFFINE_CLOUD ? (
!isOnline ? ( !isOnline || syncEngineStatus?.error ? (
<OfflineStatus /> <OfflineStatus />
) : ( ) : (
<CloudWorkspaceSyncStatus /> <CloudWorkspaceSyncStatus />

View File

@@ -1,5 +1,5 @@
import { DebugLogger } from '@affine/debug'; import { DebugLogger } from '@affine/debug';
import type { AwarenessProvider } from '@toeverything/infra'; import type { AwarenessProvider, RejectByVersion } from '@toeverything/infra';
import { import {
applyAwarenessUpdate, applyAwarenessUpdate,
type Awareness, type Awareness,
@@ -33,6 +33,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider {
window.addEventListener('beforeunload', this.windowBeforeUnloadHandler); window.addEventListener('beforeunload', this.windowBeforeUnloadHandler);
this.socket.on('connect', () => this.handleConnect()); this.socket.on('connect', () => this.handleConnect());
this.socket.on('server-version-rejected', this.handleReject);
if (this.socket.connected) { if (this.socket.connected) {
this.handleConnect(); this.handleConnect();
@@ -40,6 +41,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider {
this.socket.connect(); this.socket.connect();
} }
} }
disconnect(): void { disconnect(): void {
removeAwarenessStates( removeAwarenessStates(
this.awareness, this.awareness,
@@ -54,6 +56,7 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider {
this.newClientAwarenessInitHandler this.newClientAwarenessInitHandler
); );
this.socket.off('connect', this.handleConnect); this.socket.off('connect', this.handleConnect);
this.socket.off('server-version-rejected', this.handleReject);
window.removeEventListener('unload', this.windowBeforeUnloadHandler); window.removeEventListener('unload', this.windowBeforeUnloadHandler);
} }
@@ -117,7 +120,16 @@ export class AffineCloudAwarenessProvider implements AwarenessProvider {
}; };
handleConnect = () => { handleConnect = () => {
this.socket.emit('client-handshake-awareness', this.workspaceId); this.socket.emit('client-handshake-awareness', {
workspaceId: this.workspaceId,
version: runtimeConfig.appVersion,
});
this.socket.emit('awareness-init', this.workspaceId); this.socket.emit('awareness-init', this.workspaceId);
}; };
handleReject = (_msg: RejectByVersion) => {
this.socket.off('server-version-rejected', this.handleReject);
this.disconnect();
this.socket.disconnect();
};
} }

View File

@@ -1,6 +1,10 @@
import { DebugLogger } from '@affine/debug'; import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql'; import { fetchWithTraceReport } from '@affine/graphql';
import { type SyncStorage } from '@toeverything/infra'; import {
type RejectByVersion,
type SyncErrorMessage,
type SyncStorage,
} from '@toeverything/infra';
import type { CleanupService } from '@toeverything/infra/lifecycle'; import type { CleanupService } from '@toeverything/infra/lifecycle';
import { getIoManager } from '../utils/affine-io'; import { getIoManager } from '../utils/affine-io';
@@ -15,14 +19,17 @@ export class AffineSyncStorage implements SyncStorage {
socket = getIoManager().socket('/'); socket = getIoManager().socket('/');
errorMessage?: SyncErrorMessage;
constructor( constructor(
private readonly workspaceId: string, private readonly workspaceId: string,
cleanupService: CleanupService cleanupService: CleanupService
) { ) {
this.socket.on('connect', this.handleConnect); this.socket.on('connect', this.handleConnect);
this.socket.on('server-version-rejected', this.handleReject);
if (this.socket.connected) { if (this.socket.connected) {
this.socket.emit('client-handshake-sync', this.workspaceId); this.handleConnect();
} else { } else {
this.socket.connect(); this.socket.connect();
} }
@@ -33,7 +40,17 @@ export class AffineSyncStorage implements SyncStorage {
} }
handleConnect = () => { handleConnect = () => {
this.socket.emit('client-handshake-sync', this.workspaceId); this.socket.emit('client-handshake-sync', {
workspaceId: this.workspaceId,
version: runtimeConfig.appVersion,
});
};
handleReject = (message: RejectByVersion) => {
this.socket.off('server-version-rejected', this.handleReject);
this.cleanup();
this.socket.disconnect();
this.errorMessage = { type: 'outdated', message };
}; };
async pull( async pull(