fix(server): get doc diff from doc service (#10067)

close CLOUD-121

Avoid having the sync server merge doc updates itself; it now gets the doc diff from the doc service.

before

![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/hTwOityLamd4hitrae7M/054bf532-845d-427b-8cc4-f29e56f65720.png)

after

![image.png](https://graphite-user-uploaded-assets-prod.s3.amazonaws.com/hTwOityLamd4hitrae7M/fafe9244-c521-4af0-b131-5a6092eb5a16.png)
This commit is contained in:
fengmk2
2025-02-12 10:20:23 +00:00
parent 30612de1ad
commit db8557eafb
10 changed files with 384 additions and 46 deletions

View File

@@ -1,20 +1,18 @@
import {
ConsoleLogger,
INestApplication,
ModuleMetadata,
} from '@nestjs/common';
import { INestApplication, ModuleMetadata } from '@nestjs/common';
import type { NestExpressApplication } from '@nestjs/platform-express';
import { TestingModuleBuilder } from '@nestjs/testing';
import { User } from '@prisma/client';
import cookieParser from 'cookie-parser';
import graphqlUploadExpress from 'graphql-upload/graphqlUploadExpress.mjs';
import supertest from 'supertest';
import { ApplyType, GlobalExceptionFilter } from '../../base';
import { AFFiNELogger, ApplyType, GlobalExceptionFilter } from '../../base';
import { AuthService } from '../../core/auth';
import { UserModel } from '../../models';
import { createTestingModule } from './testing-module';
import { initTestingDB, TEST_LOG_LEVEL } from './utils';
interface TestingAppMeatdata extends ModuleMetadata {
interface TestingAppMetadata extends ModuleMetadata {
tapModule?(m: TestingModuleBuilder): void;
tapApp?(app: INestApplication): void;
}
@@ -22,16 +20,19 @@ interface TestingAppMeatdata extends ModuleMetadata {
export type TestUser = Omit<User, 'password'> & { password: string };
export async function createTestingApp(
moduleDef: TestingAppMeatdata = {}
moduleDef: TestingAppMetadata = {}
): Promise<TestingApp> {
const module = await createTestingModule(moduleDef, false);
const app = module.createNestApplication({
const app = module.createNestApplication<NestExpressApplication>({
cors: true,
bodyParser: true,
rawBody: true,
});
const logger = new ConsoleLogger();
if (AFFiNE.flavor.doc) {
app.useBodyParser('raw');
}
const logger = new AFFiNELogger();
logger.setLogLevels([TEST_LOG_LEVEL]);
app.useLogger(logger);

View File

@@ -24,6 +24,10 @@ export async function createApp() {
bufferLogs: true,
});
if (AFFiNE.flavor.doc) {
app.useBodyParser('raw');
}
app.useLogger(app.get(AFFiNELogger));
if (AFFiNE.server.path) {

View File

@@ -5,6 +5,7 @@ import type { ArgumentsHost, ExecutionContext } from '@nestjs/common';
import type { GqlContextType } from '@nestjs/graphql';
import { GqlArgumentsHost } from '@nestjs/graphql';
import type { Request, Response } from 'express';
import { ClsServiceManager } from 'nestjs-cls';
import type { Socket } from 'socket.io';
export function getRequestResponseFromHost(host: ArgumentsHost) {
@@ -87,9 +88,16 @@ export function parseCookies(
* - `ws`: websocket request
* - `se`: server event
* - `job`: cron job
* - `rpc`: rpc request
*/
export type RequestType = 'req' | 'ws' | 'se' | 'job';
export type RequestType = 'req' | 'ws' | 'se' | 'job' | 'rpc';
/**
 * Creates a fresh request id of the form `<flavor>:<type>-<uuid>`.
 *
 * @param type - the kind of request the id is generated for
 * @returns the newly generated id
 */
export function genRequestId(type: RequestType) {
  const flavor = AFFiNE.flavor.type;
  const uuid = randomUUID();
  return `${flavor}:${type}-${uuid}`;
}
/**
 * Returns the request id of the current cls context, generating a new one
 * when the context does not provide an id.
 *
 * @param type - request kind used only when a new id has to be generated
 */
export function getOrGenRequestId(type: RequestType) {
  // A request id is expected to live in the cls context, but it can be
  // missing in unexpected scenarios (unit tests, for example); in that case
  // a new id is generated on the spot.
  const currentId = ClsServiceManager.getClsService()?.getId();
  return currentId ?? genRequestId(type);
}

View File

@@ -1,15 +1,23 @@
import { Controller, Get, Logger, Param, Res } from '@nestjs/common';
import {
Controller,
Get,
Logger,
Param,
Post,
RawBody,
Res,
} from '@nestjs/common';
import type { Response } from 'express';
import { NotFound, SkipThrottle } from '../../base';
import { Internal } from '../auth';
import { PgWorkspaceDocStorageAdapter } from '../doc';
import { DatabaseDocReader } from '../doc';
@Controller('/rpc')
export class DocRpcController {
private readonly logger = new Logger(DocRpcController.name);
constructor(private readonly workspace: PgWorkspaceDocStorageAdapter) {}
constructor(private readonly docReader: DatabaseDocReader) {}
@SkipThrottle()
@Internal()
@@ -19,15 +27,47 @@ export class DocRpcController {
@Param('docId') docId: string,
@Res() res: Response
) {
const doc = await this.workspace.getDoc(workspaceId, docId);
const doc = await this.docReader.getDoc(workspaceId, docId);
if (!doc) {
throw new NotFound('Doc not found');
}
this.logger.log(`get doc ${docId} from workspace ${workspaceId}`);
this.logger.log(
`get doc ${docId} from workspace ${workspaceId}, size: ${doc.bin.length}`
);
res.setHeader('x-doc-timestamp', doc.timestamp.toString());
if (doc.editor) {
res.setHeader('x-doc-editor-id', doc.editor);
}
res.send(doc.bin);
}
/**
 * Internal RPC endpoint returning the diff of a doc against the state vector
 * sent as the raw request body.
 *
 * The response body is the `missing` update immediately followed by the new
 * `state` vector. The `x-doc-missing-offset` and `x-doc-state-offset` headers
 * carry the `start,end` byte positions of each part, and `x-doc-timestamp`
 * carries the doc timestamp.
 *
 * @throws NotFound when the doc reader has no diff for the doc
 */
@SkipThrottle()
@Internal()
@Post('/workspaces/:workspaceId/docs/:docId/diff')
async getDocDiff(
  @Param('workspaceId') workspaceId: string,
  @Param('docId') docId: string,
  @RawBody() stateVector: Buffer | undefined,
  @Res() res: Response
) {
  const result = await this.docReader.getDocDiff(
    workspaceId,
    docId,
    stateVector
  );
  if (!result) {
    throw new NotFound('Doc not found');
  }

  const { missing, state, timestamp } = result;
  this.logger.log(
    `get doc diff ${docId} from workspace ${workspaceId}, missing size: ${missing.length}, old state size: ${stateVector?.length}, new state size: ${state.length}`
  );

  // `missing` occupies [0, missingEnd), `state` follows in [missingEnd, stateEnd)
  const missingEnd = missing.length;
  const stateEnd = missingEnd + state.length;
  res.setHeader('x-doc-timestamp', timestamp.toString());
  res.setHeader('x-doc-missing-offset', `0,${missingEnd}`);
  res.setHeader('x-doc-state-offset', `${missingEnd},${stateEnd}`);
  res.send(Buffer.concat([missing, state]));
}
}

View File

@@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto';
import { User, Workspace } from '@prisma/client';
import ava, { TestFn } from 'ava';
import { applyUpdate, Doc as YDoc } from 'yjs';
import { createTestingApp, type TestingApp } from '../../../__tests__/utils';
import { AppModule } from '../../../app.module';
@@ -70,3 +71,68 @@ test('should return doc when found', async t => {
t.is(doc!.timestamp, timestamp);
t.is(doc!.editor, user.id);
});
test('should return doc diff', async t => {
  const { docReader, models } = t.context;
  const docId = randomUUID();
  const baseTimestamp = Date.now();

  // every update emitted by the source doc is queued here until persisted
  const pending: Buffer[] = [];
  const source = new YDoc();
  source.on('update', chunk => {
    pending.push(Buffer.from(chunk));
  });

  // store all queued updates, stamping each one with `stampFor(index)`
  const persistPending = (stampFor: (index: number) => number) =>
    models.doc.createUpdates(
      pending.map((blob, index) => ({
        spaceId: workspace.id,
        docId,
        blob,
        timestamp: stampFor(index),
        editorId: user.id,
      }))
    );

  const content = source.getText('content');
  content.insert(0, 'hello');
  content.insert(5, 'world');
  content.insert(5, ' ');
  content.insert(11, '!');

  await persistPending(index => baseTimestamp + index);
  // clear updates
  pending.length = 0;

  // first diff without a state vector: the replica receives everything
  const replica = new YDoc();
  const first = await docReader.getDocDiff(workspace.id, docId);
  t.truthy(first);
  t.truthy(first!.missing);
  t.truthy(first!.state);
  applyUpdate(replica, first!.missing);
  t.is(replica.getText('content').toString(), 'hello world!');

  // nothing changed
  const second = await docReader.getDocDiff(workspace.id, docId, first!.state);
  t.truthy(second);
  t.truthy(second!.missing);
  t.deepEqual(second!.missing, new Uint8Array([0, 0]));
  t.truthy(second!.state);
  applyUpdate(replica, second!.missing);
  t.is(replica.getText('content').toString(), 'hello world!');

  // add new content on doc1
  content.insert(12, '@');
  await persistPending(index => Date.now() + index + 1000);

  const third = await docReader.getDocDiff(workspace.id, docId, second!.state);
  t.truthy(third);
  t.truthy(third!.missing);
  t.truthy(third!.state);
  applyUpdate(replica, third!.missing);
  t.is(replica.getText('content').toString(), 'hello world!@');
});

View File

@@ -3,6 +3,7 @@ import { mock } from 'node:test';
import { User, Workspace } from '@prisma/client';
import ava, { TestFn } from 'ava';
import { applyUpdate, Doc as YDoc } from 'yjs';
import { createTestingApp, type TestingApp } from '../../../__tests__/utils';
import { AppModule } from '../../../app.module';
@@ -78,7 +79,7 @@ test('should throw error when doc service internal error', async t => {
});
});
test('should fallback to database doc service when endpoint network error', async t => {
test('should fallback to database doc reader when endpoint network error', async t => {
const { docReader } = t.context;
t.context.config.docService.endpoint = 'http://localhost:13010';
const docId = randomUUID();
@@ -122,3 +123,106 @@ test('should return doc when found', async t => {
t.is(doc!.timestamp, timestamp);
t.is(doc!.editor, user.id);
});
test('should return doc diff', async t => {
  const { docReader, models } = t.context;
  const docId = randomUUID();
  const baseTimestamp = Date.now();

  // every update emitted by the source doc is queued here until persisted
  const pending: Buffer[] = [];
  const source = new YDoc();
  source.on('update', chunk => {
    pending.push(Buffer.from(chunk));
  });

  // store all queued updates, stamping each one with `stampFor(index)`
  const persistPending = (stampFor: (index: number) => number) =>
    models.doc.createUpdates(
      pending.map((blob, index) => ({
        spaceId: workspace.id,
        docId,
        blob,
        timestamp: stampFor(index),
        editorId: user.id,
      }))
    );

  const content = source.getText('content');
  content.insert(0, 'hello');
  content.insert(5, 'world');
  content.insert(5, ' ');
  content.insert(11, '!');

  await persistPending(index => baseTimestamp + index);
  // clear updates
  pending.length = 0;

  // first diff without a state vector: the replica receives everything
  const replica = new YDoc();
  const first = await docReader.getDocDiff(workspace.id, docId);
  t.truthy(first);
  t.truthy(first!.missing);
  t.truthy(first!.state);
  applyUpdate(replica, first!.missing);
  t.is(replica.getText('content').toString(), 'hello world!');

  // nothing changed
  const second = await docReader.getDocDiff(workspace.id, docId, first!.state);
  t.truthy(second);
  t.truthy(second!.missing);
  t.deepEqual(second!.missing, new Uint8Array([0, 0]));
  t.truthy(second!.state);
  applyUpdate(replica, second!.missing);
  t.is(replica.getText('content').toString(), 'hello world!');

  // add new content on doc1
  content.insert(12, '@');
  await persistPending(index => Date.now() + index + 1000);

  const third = await docReader.getDocDiff(workspace.id, docId, second!.state);
  t.truthy(third);
  t.truthy(third!.missing);
  t.truthy(third!.state);
  applyUpdate(replica, third!.missing);
  t.is(replica.getText('content').toString(), 'hello world!@');
});
test('should get doc diff fallback to database doc reader when endpoint network error', async t => {
  const { docReader } = t.context;
  // point the reader at a different doc service endpoint for this test
  t.context.config.docService.endpoint = 'http://localhost:13010';
  const docId = randomUUID();
  const baseTimestamp = Date.now();

  // every update emitted by the source doc is queued here until persisted
  const pending: Buffer[] = [];
  const source = new YDoc();
  source.on('update', chunk => {
    pending.push(Buffer.from(chunk));
  });

  const content = source.getText('content');
  content.insert(0, 'hello');
  content.insert(5, 'world');
  content.insert(5, ' ');
  content.insert(11, '!');

  const records = pending.map((blob, index) => ({
    spaceId: workspace.id,
    docId,
    blob,
    timestamp: baseTimestamp + index,
    editorId: user.id,
  }));
  await t.context.models.doc.createUpdates(records);
  // clear updates
  pending.length = 0;

  const replica = new YDoc();
  const diff = await docReader.getDocDiff(workspace.id, docId);
  t.truthy(diff);
  t.truthy(diff!.missing);
  t.truthy(diff!.state);
  applyUpdate(replica, diff!.missing);
  t.is(replica.getText('content').toString(), 'hello world!');
});

View File

@@ -8,7 +8,7 @@ import { PgUserspaceDocStorageAdapter } from './adapters/userspace';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
import { DocStorageCronJob } from './job';
import { DocStorageOptions } from './options';
import { DocReader, DocReaderProvider } from './reader';
import { DatabaseDocReader, DocReader, DocReaderProvider } from './reader';
@Module({
imports: [QuotaModule, PermissionModule],
@@ -18,8 +18,10 @@ import { DocReader, DocReaderProvider } from './reader';
PgUserspaceDocStorageAdapter,
DocStorageCronJob,
DocReaderProvider,
DatabaseDocReader,
],
exports: [
DatabaseDocReader,
DocReader,
PgWorkspaceDocStorageAdapter,
PgUserspaceDocStorageAdapter,
@@ -27,6 +29,8 @@ import { DocReader, DocReaderProvider } from './reader';
})
export class DocStorageModule {}
export {
// only for doc-service
DatabaseDocReader,
DocReader,
PgUserspaceDocStorageAdapter,
PgWorkspaceDocStorageAdapter,

View File

@@ -1,16 +1,36 @@
import { FactoryProvider, Injectable, Logger } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { ClsService } from 'nestjs-cls';
import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs';
import { Config, CryptoHelper, UserFriendlyError } from '../../base';
import {
Config,
CryptoHelper,
getOrGenRequestId,
UserFriendlyError,
} from '../../base';
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
import { type DocRecord } from './storage';
import { type DocDiff, type DocRecord } from './storage';
/**
 * Base contract for reading docs and computing diffs against a client's
 * state vector.
 */
export abstract class DocReader {
  /**
   * Loads a doc record.
   *
   * @returns the record, or `null` when there is none
   */
  abstract getDoc(
    workspaceId: string,
    docId: string
  ): Promise<DocRecord | null>;

  /**
   * Computes what a client holding `stateVector` is missing from the doc.
   *
   * @param stateVector - the client's encoded state vector; omit it to
   *   receive the whole doc as `missing`
   * @returns the diff, or `null` when there is none
   */
  abstract getDocDiff(
    spaceId: string,
    docId: string,
    stateVector?: Uint8Array
  ): Promise<DocDiff | null>;

  /**
   * Splits a doc update into the part missing for `stateVector` and the
   * state vector of the update itself.
   *
   * @param update - the doc update to diff
   * @param stateVector - optional encoded state vector of the requester
   * @returns `missing` (the update bytes the requester lacks) and `state`
   *   (the state vector encoded from `update`)
   */
  protected docDiff(update: Uint8Array, stateVector?: Uint8Array) {
    // A zero-length buffer is not a valid encoded state vector (an empty one
    // still encodes its entry count), so decoding it would throw. Treat it
    // like an absent state vector and hand back the whole update.
    const hasStateVector = stateVector !== undefined && stateVector.length > 0;
    const missing = hasStateVector ? diffUpdate(update, stateVector) : update;
    const state = encodeStateVectorFromUpdate(update);
    return {
      missing,
      state,
    };
  }
}
@Injectable()
@@ -22,6 +42,21 @@ export class DatabaseDocReader extends DocReader {
/**
 * Reads the doc from the workspace storage adapter.
 *
 * @returns whatever the adapter returns for the doc (a record or `null`)
 */
async getDoc(workspaceId: string, docId: string): Promise<DocRecord | null> {
  const record = await this.workspace.getDoc(workspaceId, docId);
  return record;
}
/**
 * Computes the doc diff from the doc stored by the workspace adapter.
 *
 * @param stateVector - optional encoded state vector of the requester
 * @returns `missing`, `state` and the doc `timestamp`, or `null` when the
 *   adapter has no doc
 */
async getDocDiff(
  spaceId: string,
  docId: string,
  stateVector?: Uint8Array
): Promise<DocDiff | null> {
  const record = await this.workspace.getDoc(spaceId, docId);
  if (record) {
    const { missing, state } = this.docDiff(record.bin, stateVector);
    return { missing, state, timestamp: record.timestamp };
  }
  return null;
}
}
@Injectable()
@@ -31,33 +66,52 @@ export class RpcDocReader extends DatabaseDocReader {
constructor(
private readonly config: Config,
private readonly crypto: CryptoHelper,
private readonly cls: ClsService,
protected override readonly workspace: PgWorkspaceDocStorageAdapter
) {
super(workspace);
}
/**
 * Sends a request to the doc service with the access token and trace id
 * headers attached.
 *
 * @param accessToken - value sent as `x-access-token`
 * @param url - full doc service url
 * @param method - http method
 * @param body - optional binary payload, sent as `application/octet-stream`
 * @returns the response when it is ok, or `null` on a 404
 * @throws UserFriendlyError built from the JSON body of any other non-ok
 *   response
 */
private async fetch(
  accessToken: string,
  url: string,
  method: 'GET' | 'POST',
  body?: Uint8Array
) {
  const headers: Record<string, string> = {
    'x-access-token': accessToken,
    'x-cloud-trace-context': getOrGenRequestId('rpc'),
  };
  if (body) {
    headers['content-type'] = 'application/octet-stream';
  }
  const res = await fetch(url, {
    method,
    headers,
    body,
  });
  if (res.ok) {
    return res;
  }
  if (res.status === 404) {
    // The body of a 404 is never read. Cancel it so the underlying
    // connection is released right away instead of waiting for GC.
    // This is best-effort cleanup: a failure here must not turn a plain
    // "not found" into an error.
    await res.body?.cancel().catch(() => undefined);
    return null;
  }
  const errorJson = (await res.json()) as UserFriendlyError;
  throw UserFriendlyError.fromUserFriendlyErrorJSON(errorJson);
}
override async getDoc(
workspaceId: string,
docId: string
): Promise<DocRecord | null> {
const url = `${this.config.docService.endpoint}/rpc/workspaces/${workspaceId}/docs/${docId}`;
const accessToken = this.crypto.sign(docId);
try {
const res = await fetch(url, {
headers: {
'x-access-token': this.crypto.sign(docId),
'x-cloud-trace-context': this.cls.getId(),
},
});
if (!res.ok) {
if (res.status === 404) {
return null;
}
const body = (await res.json()) as UserFriendlyError;
throw UserFriendlyError.fromUserFriendlyErrorJSON(body);
const res = await this.fetch(accessToken, url, 'GET');
if (!res) {
return null;
}
const timestamp = res.headers.get('x-doc-timestamp') as string;
const editor = res.headers.get('x-doc-editor-id') as string;
const editor = res.headers.get('x-doc-editor-id') ?? undefined;
const bin = await res.arrayBuffer();
return {
spaceId: workspaceId,
@@ -66,19 +120,67 @@ export class RpcDocReader extends DatabaseDocReader {
timestamp: parseInt(timestamp),
editor,
};
} catch (err) {
if (err instanceof UserFriendlyError) {
throw err;
} catch (e) {
if (e instanceof UserFriendlyError) {
throw e;
}
const err = e as Error;
// other error
this.logger.error(
`Failed to fetch doc ${url}, error: ${err}`,
(err as Error).stack
`Failed to fetch doc ${url}, fallback to database doc reader`,
err.stack
);
// fallback to database doc service if the error is not user friendly, like network error
if (err.cause instanceof Error) {
this.logger.error(err.cause.stack);
}
// fallback to database doc reader if the error is not user friendly, like network error
return await super.getDoc(workspaceId, docId);
}
}
/**
 * Fetches the doc diff from the doc service and falls back to the database
 * doc reader when the request fails with anything other than a
 * UserFriendlyError (for example a network error or a malformed response).
 *
 * @param stateVector - optional encoded state vector, sent as the request body
 * @returns the diff, or `null` when the doc service answers 404
 * @throws UserFriendlyError when the doc service reports one
 */
override async getDocDiff(
  workspaceId: string,
  docId: string,
  stateVector?: Uint8Array
): Promise<DocDiff | null> {
  const url = `${this.config.docService.endpoint}/rpc/workspaces/${workspaceId}/docs/${docId}/diff`;
  const accessToken = this.crypto.sign(docId);
  try {
    const res = await this.fetch(accessToken, url, 'POST', stateVector);
    if (!res) {
      return null;
    }
    // Read the body before validating headers so it is always consumed,
    // even when validation below rejects the response.
    const bin = await res.arrayBuffer();

    const timestamp = Number.parseInt(
      res.headers.get('x-doc-timestamp') ?? '',
      10
    );
    if (Number.isNaN(timestamp)) {
      throw new Error(`Invalid x-doc-timestamp header from ${url}`);
    }

    // The body is the missing update followed by the state vector. Each
    // offset header is a half-open `start,end` byte range into the body:
    //   x-doc-missing-offset: 0,123   -> missing is bytes [0, 123)
    //   x-doc-state-offset: 123,789   -> state is bytes [123, 789)
    const readRange = (name: string): [number, number] => {
      const match = /^(\d+),(\d+)$/.exec(res.headers.get(name) ?? '');
      const start = Number(match?.[1]);
      const end = Number(match?.[2]);
      // A missing or malformed header yields NaN, which fails isInteger.
      if (
        !Number.isInteger(start) ||
        !Number.isInteger(end) ||
        end < start ||
        end > bin.byteLength
      ) {
        throw new Error(`Invalid ${name} header from ${url}`);
      }
      return [start, end];
    };
    const [missingStart, missingEnd] = readRange('x-doc-missing-offset');
    const [stateStart, stateEnd] = readRange('x-doc-state-offset');

    return {
      missing: new Uint8Array(bin, missingStart, missingEnd - missingStart),
      state: new Uint8Array(bin, stateStart, stateEnd - stateStart),
      timestamp,
    };
  } catch (e) {
    if (e instanceof UserFriendlyError) {
      throw e;
    }
    const err = e as Error;
    this.logger.error(
      `Failed to fetch doc diff ${url}, fallback to database doc reader`,
      err.stack
    );
    if (err.cause instanceof Error) {
      this.logger.error(err.cause.stack);
    }
    // fallback to database doc reader if the error is not user friendly, like network error
    return await super.getDocDiff(workspaceId, docId, stateVector);
  }
}
}
export const DocReaderProvider: FactoryProvider = {

View File

@@ -26,6 +26,7 @@ export class SpaceStorage extends Connection {
export { BlobStorageAdapter, type BlobStorageOptions } from './blob';
export {
type DocDiff,
type DocRecord,
DocStorageAdapter,
type DocStorageOptions,

View File

@@ -23,6 +23,7 @@ import {
} from '../../base';
import { CurrentUser } from '../auth';
import {
DocReader,
DocStorageAdapter,
PgUserspaceDocStorageAdapter,
PgWorkspaceDocStorageAdapter,
@@ -144,7 +145,8 @@ export class SpaceSyncGateway
private readonly runtime: Runtime,
private readonly permissions: PermissionService,
private readonly workspace: PgWorkspaceDocStorageAdapter,
private readonly userspace: PgUserspaceDocStorageAdapter
private readonly userspace: PgUserspaceDocStorageAdapter,
private readonly docReader: DocReader
) {}
handleConnection() {
@@ -167,7 +169,8 @@ export class SpaceSyncGateway
const workspace = new WorkspaceSyncAdapter(
client,
this.workspace,
this.permissions
this.permissions,
this.docReader
);
const userspace = new UserspaceSyncAdapter(client, this.userspace);
@@ -671,7 +674,8 @@ class WorkspaceSyncAdapter extends SyncSocketAdapter {
constructor(
client: Socket,
storage: DocStorageAdapter,
private readonly permission: PermissionService
private readonly permission: PermissionService,
private readonly docReader: DocReader
) {
super(SpaceType.Workspace, client, storage);
}
@@ -686,9 +690,13 @@ class WorkspaceSyncAdapter extends SyncSocketAdapter {
return super.push(spaceId, id.guid, updates, editorId);
}
override diff(spaceId: string, docId: string, stateVector?: Uint8Array) {
override async diff(
spaceId: string,
docId: string,
stateVector?: Uint8Array
) {
const id = new DocID(docId, spaceId);
return this.storage.getDocDiff(spaceId, id.guid, stateVector);
return await this.docReader.getDocDiff(spaceId, id.guid, stateVector);
}
async assertAccessible(