mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-13 12:55:00 +00:00
refactor(server): move bin content parser to doc reader (#10302)
This commit is contained in:
@@ -187,7 +187,7 @@ test('should generate request id for event', async t => {
|
||||
|
||||
await eventbus.emitAsync('__test__.requestId', {});
|
||||
|
||||
t.true(listeners.onRequestId.lastCall.returnValue.includes(':event/'));
|
||||
t.true(listeners.onRequestId.lastCall.returnValue.includes(':event:'));
|
||||
});
|
||||
|
||||
test('should continuously use the same request id', async t => {
|
||||
|
||||
@@ -10,7 +10,7 @@ import Sinon from 'sinon';
|
||||
import { AppModule } from '../app.module';
|
||||
import { EventBus } from '../base';
|
||||
import { AuthService } from '../core/auth';
|
||||
import { DocContentService } from '../core/doc-renderer';
|
||||
import { DocReader } from '../core/doc';
|
||||
import { DocRole, PermissionService, WorkspaceRole } from '../core/permission';
|
||||
import { WorkspaceType } from '../core/workspaces';
|
||||
import { Models } from '../models';
|
||||
@@ -53,7 +53,7 @@ test.before(async t => {
|
||||
module
|
||||
.overrideProvider(EventBus)
|
||||
.useValue(Sinon.createStubInstance(EventBus));
|
||||
module.overrideProvider(DocContentService).useValue({
|
||||
module.overrideProvider(DocReader).useValue({
|
||||
getWorkspaceContent() {
|
||||
return {
|
||||
name: 'test',
|
||||
|
||||
@@ -226,7 +226,7 @@ test('should generate request id', async t => {
|
||||
|
||||
await executor.run('nightly.__test__requestId', {});
|
||||
|
||||
t.true(spy.returnValues.some(v => v.includes(':job/')));
|
||||
t.true(spy.returnValues.some(v => v.includes(':job:')));
|
||||
|
||||
spy.restore();
|
||||
});
|
||||
|
||||
@@ -94,7 +94,7 @@ export function parseCookies(
|
||||
export type RequestType = GqlContextType | 'event' | 'job';
|
||||
|
||||
export function genRequestId(type: RequestType) {
|
||||
return `${AFFiNE.flavor.type}:${type}/${randomUUID()}`;
|
||||
return `${AFFiNE.flavor.type}:${type}:${randomUUID()}`;
|
||||
}
|
||||
|
||||
export function getOrGenRequestId(type: RequestType) {
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
|
||||
import { User, Workspace } from '@prisma/client';
|
||||
import ava, { TestFn } from 'ava';
|
||||
import { Doc as YDoc } from 'yjs';
|
||||
|
||||
import { createTestingApp, type TestingApp } from '../../../__tests__/utils';
|
||||
import { AppModule } from '../../../app.module';
|
||||
import { Config } from '../../../base';
|
||||
import { ConfigModule } from '../../../base/config';
|
||||
import { Models } from '../../../models';
|
||||
import { PgWorkspaceDocStorageAdapter } from '../../doc';
|
||||
import { DocContentService } from '..';
|
||||
|
||||
const test = ava as TestFn<{
|
||||
models: Models;
|
||||
app: TestingApp;
|
||||
docContentService: DocContentService;
|
||||
config: Config;
|
||||
adapter: PgWorkspaceDocStorageAdapter;
|
||||
}>;
|
||||
|
||||
test.before(async t => {
|
||||
const app = await createTestingApp({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
flavor: {
|
||||
doc: false,
|
||||
},
|
||||
docService: {
|
||||
endpoint: '',
|
||||
},
|
||||
}),
|
||||
AppModule,
|
||||
],
|
||||
});
|
||||
|
||||
t.context.models = app.get(Models);
|
||||
t.context.docContentService = app.get(DocContentService);
|
||||
t.context.config = app.get(Config);
|
||||
t.context.adapter = app.get(PgWorkspaceDocStorageAdapter);
|
||||
t.context.app = app;
|
||||
});
|
||||
|
||||
let user: User;
|
||||
let workspace: Workspace;
|
||||
|
||||
test.beforeEach(async t => {
|
||||
t.context.config.docService.endpoint = t.context.app.url();
|
||||
await t.context.app.initTestingDB();
|
||||
user = await t.context.models.user.create({
|
||||
email: 'test@affine.pro',
|
||||
});
|
||||
workspace = await t.context.models.workspace.create(user.id);
|
||||
});
|
||||
|
||||
test.after.always(async t => {
|
||||
await t.context.app.close();
|
||||
});
|
||||
|
||||
test('should get doc content from doc service rpc', async t => {
|
||||
const docId = randomUUID();
|
||||
const { docContentService } = t.context;
|
||||
|
||||
const doc = new YDoc();
|
||||
const text = doc.getText('content');
|
||||
const updates: Buffer[] = [];
|
||||
|
||||
doc.on('update', update => {
|
||||
updates.push(Buffer.from(update));
|
||||
});
|
||||
|
||||
text.insert(0, 'hello');
|
||||
text.insert(5, 'world');
|
||||
text.insert(5, ' ');
|
||||
|
||||
await t.context.adapter.pushDocUpdates(workspace.id, docId, updates, user.id);
|
||||
|
||||
const docContent = await docContentService.getPageContent(
|
||||
workspace.id,
|
||||
docId
|
||||
);
|
||||
// TODO(@fengmk2): should create a test ydoc with blocks
|
||||
t.is(docContent, null);
|
||||
});
|
||||
@@ -8,8 +8,8 @@ import isMobile from 'is-mobile';
|
||||
import { Config, metrics, URLHelper } from '../../base';
|
||||
import { htmlSanitize } from '../../native';
|
||||
import { Public } from '../auth';
|
||||
import { DocReader } from '../doc';
|
||||
import { PermissionService } from '../permission';
|
||||
import { DocContentService } from './service';
|
||||
|
||||
interface RenderOptions {
|
||||
title: string;
|
||||
@@ -50,7 +50,7 @@ export class DocRendererController {
|
||||
private readonly mobileAssets: HtmlAssets = defaultAssets;
|
||||
|
||||
constructor(
|
||||
private readonly doc: DocContentService,
|
||||
private readonly doc: DocReader,
|
||||
private readonly permission: PermissionService,
|
||||
private readonly config: Config,
|
||||
private readonly url: URLHelper
|
||||
@@ -114,7 +114,7 @@ export class DocRendererController {
|
||||
}
|
||||
|
||||
if (allowUrlPreview) {
|
||||
return this.doc.getPageContent(workspaceId, docId);
|
||||
return this.doc.getDocContent(workspaceId, docId);
|
||||
}
|
||||
|
||||
return null;
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
import { FactoryProvider } from '@nestjs/common';
|
||||
|
||||
import { Config, OnEvent } from '../../base';
|
||||
import { DocContentService } from './service';
|
||||
|
||||
class DocEventsListener {
|
||||
constructor(private readonly doc: DocContentService) {}
|
||||
|
||||
@OnEvent('doc.snapshot.updated')
|
||||
async handleDocSnapshotUpdated({
|
||||
workspaceId,
|
||||
docId,
|
||||
}: Events['doc.snapshot.updated']) {
|
||||
await this.doc.markDocContentCacheStale(workspaceId, docId);
|
||||
}
|
||||
}
|
||||
|
||||
export const DocEventsListenerProvider: FactoryProvider = {
|
||||
provide: DocEventsListener,
|
||||
useFactory: (config: Config, doc: DocContentService) => {
|
||||
if (config.flavor.renderer) {
|
||||
return new DocEventsListener(doc);
|
||||
}
|
||||
return;
|
||||
},
|
||||
inject: [Config, DocContentService],
|
||||
};
|
||||
@@ -3,15 +3,9 @@ import { Module } from '@nestjs/common';
|
||||
import { DocStorageModule } from '../doc';
|
||||
import { PermissionModule } from '../permission';
|
||||
import { DocRendererController } from './controller';
|
||||
import { DocEventsListenerProvider } from './event';
|
||||
import { DocContentService } from './service';
|
||||
|
||||
@Module({
|
||||
imports: [DocStorageModule, PermissionModule],
|
||||
providers: [DocContentService, DocEventsListenerProvider],
|
||||
controllers: [DocRendererController],
|
||||
exports: [DocContentService],
|
||||
})
|
||||
export class DocRendererModule {}
|
||||
|
||||
export { DocContentService };
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { applyUpdate, Doc } from 'yjs';
|
||||
|
||||
import { Cache } from '../../base';
|
||||
import { DocReader } from '../doc';
|
||||
import {
|
||||
type PageDocContent,
|
||||
parsePageDoc,
|
||||
parseWorkspaceDoc,
|
||||
type WorkspaceDocContent,
|
||||
} from '../utils/blocksuite';
|
||||
|
||||
@Injectable()
|
||||
export class DocContentService {
|
||||
constructor(
|
||||
private readonly cache: Cache,
|
||||
private readonly docReader: DocReader
|
||||
) {}
|
||||
|
||||
async getPageContent(
|
||||
workspaceId: string,
|
||||
guid: string
|
||||
): Promise<PageDocContent | null> {
|
||||
const cacheKey = `workspace:${workspaceId}:doc:${guid}:content`;
|
||||
const cachedResult = await this.cache.get<PageDocContent>(cacheKey);
|
||||
|
||||
if (cachedResult) {
|
||||
return cachedResult;
|
||||
}
|
||||
|
||||
const docRecord = await this.docReader.getDoc(workspaceId, guid);
|
||||
if (!docRecord) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const doc = new Doc();
|
||||
applyUpdate(doc, docRecord.bin);
|
||||
|
||||
const content = parsePageDoc(doc);
|
||||
|
||||
if (content) {
|
||||
await this.cache.set(cacheKey, content, {
|
||||
ttl:
|
||||
7 *
|
||||
24 *
|
||||
60 *
|
||||
60 *
|
||||
1000 /* TODO(@forehalo): we need time constants helper */,
|
||||
});
|
||||
}
|
||||
return content;
|
||||
}
|
||||
|
||||
async getWorkspaceContent(
|
||||
workspaceId: string
|
||||
): Promise<WorkspaceDocContent | null> {
|
||||
const cacheKey = `workspace:${workspaceId}:content`;
|
||||
const cachedResult = await this.cache.get<WorkspaceDocContent>(cacheKey);
|
||||
|
||||
if (cachedResult) {
|
||||
return cachedResult;
|
||||
}
|
||||
|
||||
const docRecord = await this.docReader.getDoc(workspaceId, workspaceId);
|
||||
if (!docRecord) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const doc = new Doc();
|
||||
applyUpdate(doc, docRecord.bin);
|
||||
|
||||
const content = parseWorkspaceDoc(doc);
|
||||
|
||||
if (content) {
|
||||
await this.cache.set(cacheKey, content);
|
||||
}
|
||||
|
||||
return content;
|
||||
}
|
||||
|
||||
async markDocContentCacheStale(workspaceId: string, docId: string) {
|
||||
const key =
|
||||
workspaceId === docId
|
||||
? `workspace:${workspaceId}:content`
|
||||
: `workspace:${workspaceId}:doc:${docId}:content`;
|
||||
await this.cache.delete(key);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { mock } from 'node:test';
|
||||
|
||||
import { User, Workspace } from '@prisma/client';
|
||||
import ava, { TestFn } from 'ava';
|
||||
@@ -8,11 +9,13 @@ import { AppModule } from '../../../app.module';
|
||||
import { CryptoHelper } from '../../../base';
|
||||
import { ConfigModule } from '../../../base/config';
|
||||
import { Models } from '../../../models';
|
||||
import { DatabaseDocReader } from '../../doc';
|
||||
|
||||
const test = ava as TestFn<{
|
||||
models: Models;
|
||||
app: TestingApp;
|
||||
crypto: CryptoHelper;
|
||||
databaseDocReader: DatabaseDocReader;
|
||||
}>;
|
||||
|
||||
test.before(async t => {
|
||||
@@ -23,6 +26,7 @@ test.before(async t => {
|
||||
t.context.models = app.get(Models);
|
||||
t.context.crypto = app.get(CryptoHelper);
|
||||
t.context.app = app;
|
||||
t.context.databaseDocReader = app.get(DatabaseDocReader);
|
||||
});
|
||||
|
||||
let user: User;
|
||||
@@ -119,3 +123,101 @@ test('should return doc when found', async t => {
|
||||
t.is(res.headers['x-doc-timestamp'], timestamp.toString());
|
||||
t.is(res.headers['x-doc-editor-id'], user.id);
|
||||
});
|
||||
|
||||
test('should 404 when doc diff not found', async t => {
|
||||
const { app } = t.context;
|
||||
|
||||
const workspaceId = '123';
|
||||
const docId = '123';
|
||||
await app
|
||||
.POST(`/rpc/workspaces/${workspaceId}/docs/${docId}/diff`)
|
||||
.set('x-access-token', t.context.crypto.sign(docId))
|
||||
.expect({
|
||||
status: 404,
|
||||
code: 'Not Found',
|
||||
type: 'RESOURCE_NOT_FOUND',
|
||||
name: 'NOT_FOUND',
|
||||
message: 'Doc not found',
|
||||
})
|
||||
.expect(404);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
test('should 404 when doc content not found', async t => {
|
||||
const { app } = t.context;
|
||||
|
||||
const workspaceId = '123';
|
||||
const docId = '123';
|
||||
await app
|
||||
.GET(`/rpc/workspaces/${workspaceId}/docs/${docId}/content`)
|
||||
.set('x-access-token', t.context.crypto.sign(docId))
|
||||
.expect({
|
||||
status: 404,
|
||||
code: 'Not Found',
|
||||
type: 'RESOURCE_NOT_FOUND',
|
||||
name: 'NOT_FOUND',
|
||||
message: 'Doc not found',
|
||||
})
|
||||
.expect(404);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
test('should get doc content in json format', async t => {
|
||||
const { app } = t.context;
|
||||
mock.method(t.context.databaseDocReader, 'getDocContent', async () => {
|
||||
return {
|
||||
title: 'test title',
|
||||
summary: 'test summary',
|
||||
};
|
||||
});
|
||||
|
||||
const docId = randomUUID();
|
||||
await app
|
||||
.GET(`/rpc/workspaces/${workspace.id}/docs/${docId}/content`)
|
||||
.set('x-access-token', t.context.crypto.sign(docId))
|
||||
.expect({
|
||||
title: 'test title',
|
||||
summary: 'test summary',
|
||||
})
|
||||
.expect(200);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
test('should 404 when workspace content not found', async t => {
|
||||
const { app } = t.context;
|
||||
|
||||
const workspaceId = '123';
|
||||
await app
|
||||
.GET(`/rpc/workspaces/${workspaceId}/content`)
|
||||
.set('x-access-token', t.context.crypto.sign(workspaceId))
|
||||
.expect({
|
||||
status: 404,
|
||||
code: 'Not Found',
|
||||
type: 'RESOURCE_NOT_FOUND',
|
||||
name: 'NOT_FOUND',
|
||||
message: 'Workspace not found',
|
||||
})
|
||||
.expect(404);
|
||||
t.pass();
|
||||
});
|
||||
|
||||
test('should get workspace content in json format', async t => {
|
||||
const { app } = t.context;
|
||||
mock.method(t.context.databaseDocReader, 'getWorkspaceContent', async () => {
|
||||
return {
|
||||
name: 'test name',
|
||||
avatarKey: 'avatar key',
|
||||
};
|
||||
});
|
||||
|
||||
const workspaceId = randomUUID();
|
||||
await app
|
||||
.GET(`/rpc/workspaces/${workspaceId}/content`)
|
||||
.set('x-access-token', t.context.crypto.sign(workspaceId))
|
||||
.expect(200)
|
||||
.expect({
|
||||
name: 'test name',
|
||||
avatarKey: 'avatar key',
|
||||
});
|
||||
t.pass();
|
||||
});
|
||||
|
||||
@@ -70,4 +70,31 @@ export class DocRpcController {
|
||||
);
|
||||
res.send(Buffer.concat([diff.missing, diff.state]));
|
||||
}
|
||||
|
||||
@SkipThrottle()
|
||||
@Internal()
|
||||
@Get('/workspaces/:workspaceId/docs/:docId/content')
|
||||
async getDocContent(
|
||||
@Param('workspaceId') workspaceId: string,
|
||||
@Param('docId') docId: string
|
||||
) {
|
||||
const content = await this.docReader.getDocContent(workspaceId, docId);
|
||||
if (!content) {
|
||||
throw new NotFound('Doc not found');
|
||||
}
|
||||
this.logger.log(`get doc content ${docId} from workspace ${workspaceId}`);
|
||||
return content;
|
||||
}
|
||||
|
||||
@SkipThrottle()
|
||||
@Internal()
|
||||
@Get('/workspaces/:workspaceId/content')
|
||||
async getWorkspaceContent(@Param('workspaceId') workspaceId: string) {
|
||||
const content = await this.docReader.getWorkspaceContent(workspaceId);
|
||||
if (!content) {
|
||||
throw new NotFound('Workspace not found');
|
||||
}
|
||||
this.logger.log(`get workspace content ${workspaceId}`);
|
||||
return content;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,13 +8,14 @@ import { createTestingApp, type TestingApp } from '../../../__tests__/utils';
|
||||
import { AppModule } from '../../../app.module';
|
||||
import { ConfigModule } from '../../../base/config';
|
||||
import { Models } from '../../../models';
|
||||
import { DocReader } from '..';
|
||||
import { DocReader, PgWorkspaceDocStorageAdapter } from '..';
|
||||
import { DatabaseDocReader } from '../reader';
|
||||
|
||||
const test = ava as TestFn<{
|
||||
models: Models;
|
||||
app: TestingApp;
|
||||
docReader: DocReader;
|
||||
adapter: PgWorkspaceDocStorageAdapter;
|
||||
}>;
|
||||
|
||||
test.before(async t => {
|
||||
@@ -24,6 +25,7 @@ test.before(async t => {
|
||||
|
||||
t.context.models = app.get(Models);
|
||||
t.context.docReader = app.get(DocReader);
|
||||
t.context.adapter = app.get(PgWorkspaceDocStorageAdapter);
|
||||
t.context.app = app;
|
||||
});
|
||||
|
||||
@@ -136,3 +138,53 @@ test('should return doc diff', async t => {
|
||||
applyUpdate(doc2, diff3!.missing);
|
||||
t.is(doc2.getText('content').toString(), 'hello world!@');
|
||||
});
|
||||
|
||||
test('should get doc content', async t => {
|
||||
const docId = randomUUID();
|
||||
const { docReader } = t.context;
|
||||
|
||||
const doc = new YDoc();
|
||||
const text = doc.getText('content');
|
||||
const updates: Buffer[] = [];
|
||||
|
||||
doc.on('update', update => {
|
||||
updates.push(Buffer.from(update));
|
||||
});
|
||||
|
||||
text.insert(0, 'hello');
|
||||
text.insert(5, 'world');
|
||||
text.insert(5, ' ');
|
||||
|
||||
await t.context.adapter.pushDocUpdates(workspace.id, docId, updates, user.id);
|
||||
|
||||
const docContent = await docReader.getDocContent(workspace.id, docId);
|
||||
// TODO(@fengmk2): should create a test ydoc with blocks
|
||||
t.is(docContent, null);
|
||||
});
|
||||
|
||||
test('should get workspace content', async t => {
|
||||
const { docReader } = t.context;
|
||||
|
||||
const doc = new YDoc();
|
||||
const text = doc.getText('content');
|
||||
const updates: Buffer[] = [];
|
||||
|
||||
doc.on('update', update => {
|
||||
updates.push(Buffer.from(update));
|
||||
});
|
||||
|
||||
text.insert(0, 'hello');
|
||||
text.insert(5, 'world');
|
||||
text.insert(5, ' ');
|
||||
|
||||
await t.context.adapter.pushDocUpdates(
|
||||
workspace.id,
|
||||
workspace.id,
|
||||
updates,
|
||||
user.id
|
||||
);
|
||||
|
||||
const workspaceContent = await docReader.getWorkspaceContent(workspace.id);
|
||||
// TODO(@fengmk2): should create a test ydoc with blocks
|
||||
t.is(workspaceContent, null);
|
||||
});
|
||||
|
||||
@@ -10,7 +10,7 @@ import { AppModule } from '../../../app.module';
|
||||
import { Config, UserFriendlyError } from '../../../base';
|
||||
import { ConfigModule } from '../../../base/config';
|
||||
import { Models } from '../../../models';
|
||||
import { DatabaseDocReader, DocReader } from '..';
|
||||
import { DatabaseDocReader, DocReader, PgWorkspaceDocStorageAdapter } from '..';
|
||||
import { RpcDocReader } from '../reader';
|
||||
|
||||
const test = ava as TestFn<{
|
||||
@@ -18,6 +18,7 @@ const test = ava as TestFn<{
|
||||
app: TestingApp;
|
||||
docReader: DocReader;
|
||||
databaseDocReader: DatabaseDocReader;
|
||||
adapter: PgWorkspaceDocStorageAdapter;
|
||||
config: Config;
|
||||
}>;
|
||||
|
||||
@@ -39,6 +40,7 @@ test.before(async t => {
|
||||
t.context.models = app.get(Models);
|
||||
t.context.docReader = app.get(DocReader);
|
||||
t.context.databaseDocReader = app.get(DatabaseDocReader);
|
||||
t.context.adapter = app.get(PgWorkspaceDocStorageAdapter);
|
||||
t.context.config = app.get(Config);
|
||||
t.context.app = app;
|
||||
});
|
||||
@@ -71,12 +73,36 @@ test('should return null when doc not found', async t => {
|
||||
});
|
||||
|
||||
test('should throw error when doc service internal error', async t => {
|
||||
const { docReader, databaseDocReader } = t.context;
|
||||
const { docReader, adapter } = t.context;
|
||||
const docId = randomUUID();
|
||||
mock.method(databaseDocReader, 'getDoc', async () => {
|
||||
mock.method(adapter, 'getDoc', async () => {
|
||||
throw new Error('mock doc service internal error');
|
||||
});
|
||||
const err = await t.throwsAsync(docReader.getDoc(workspace.id, docId), {
|
||||
let err = await t.throwsAsync(docReader.getDoc(workspace.id, docId), {
|
||||
instanceOf: UserFriendlyError,
|
||||
message: 'An internal error occurred.',
|
||||
name: 'internal_server_error',
|
||||
});
|
||||
t.is(err.type, 'internal_server_error');
|
||||
t.is(err.status, 500);
|
||||
|
||||
err = await t.throwsAsync(docReader.getDocDiff(workspace.id, docId), {
|
||||
instanceOf: UserFriendlyError,
|
||||
message: 'An internal error occurred.',
|
||||
name: 'internal_server_error',
|
||||
});
|
||||
t.is(err.type, 'internal_server_error');
|
||||
t.is(err.status, 500);
|
||||
|
||||
err = await t.throwsAsync(docReader.getDocContent(workspace.id, docId), {
|
||||
instanceOf: UserFriendlyError,
|
||||
message: 'An internal error occurred.',
|
||||
name: 'internal_server_error',
|
||||
});
|
||||
t.is(err.type, 'internal_server_error');
|
||||
t.is(err.status, 500);
|
||||
|
||||
err = await t.throwsAsync(docReader.getWorkspaceContent(workspace.id), {
|
||||
instanceOf: UserFriendlyError,
|
||||
message: 'An internal error occurred.',
|
||||
name: 'internal_server_error',
|
||||
@@ -232,3 +258,84 @@ test('should get doc diff fallback to database doc reader when endpoint network
|
||||
applyUpdate(doc2, diff!.missing);
|
||||
t.is(doc2.getText('content').toString(), 'hello world!');
|
||||
});
|
||||
|
||||
test('should get doc content', async t => {
|
||||
const docId = randomUUID();
|
||||
const { docReader, databaseDocReader } = t.context;
|
||||
mock.method(databaseDocReader, 'getDocContent', async () => {
|
||||
return {
|
||||
title: 'test title',
|
||||
summary: 'test summary',
|
||||
};
|
||||
});
|
||||
const docContent = await docReader.getDocContent(workspace.id, docId);
|
||||
t.deepEqual(docContent, {
|
||||
title: 'test title',
|
||||
summary: 'test summary',
|
||||
});
|
||||
});
|
||||
|
||||
test('should return null when doc content not exists', async t => {
|
||||
const docId = randomUUID();
|
||||
const { docReader, adapter } = t.context;
|
||||
|
||||
const doc = new YDoc();
|
||||
const text = doc.getText('content');
|
||||
const updates: Buffer[] = [];
|
||||
|
||||
doc.on('update', update => {
|
||||
updates.push(Buffer.from(update));
|
||||
});
|
||||
|
||||
text.insert(0, 'hello');
|
||||
text.insert(5, 'world');
|
||||
text.insert(5, ' ');
|
||||
|
||||
await adapter.pushDocUpdates(workspace.id, docId, updates, user.id);
|
||||
|
||||
const docContent = await docReader.getDocContent(workspace.id, docId);
|
||||
t.is(docContent, null);
|
||||
|
||||
const notExists = await docReader.getDocContent(workspace.id, randomUUID());
|
||||
t.is(notExists, null);
|
||||
});
|
||||
|
||||
test('should get workspace content from doc service rpc', async t => {
|
||||
const { docReader, databaseDocReader } = t.context;
|
||||
mock.method(databaseDocReader, 'getWorkspaceContent', async () => {
|
||||
return {
|
||||
name: 'test name',
|
||||
avatarKey: 'avatar key',
|
||||
};
|
||||
});
|
||||
|
||||
const workspaceContent = await docReader.getWorkspaceContent(workspace.id);
|
||||
t.deepEqual(workspaceContent, {
|
||||
name: 'test name',
|
||||
avatarKey: 'avatar key',
|
||||
});
|
||||
});
|
||||
|
||||
test('should return null when workspace bin meta not exists', async t => {
|
||||
const { docReader, adapter } = t.context;
|
||||
const doc = new YDoc();
|
||||
const text = doc.getText('content');
|
||||
const updates: Buffer[] = [];
|
||||
|
||||
doc.on('update', update => {
|
||||
updates.push(Buffer.from(update));
|
||||
});
|
||||
|
||||
text.insert(0, 'hello');
|
||||
text.insert(5, 'world');
|
||||
text.insert(5, ' ');
|
||||
|
||||
await adapter.pushDocUpdates(workspace.id, workspace.id, updates, user.id);
|
||||
|
||||
const workspaceContent = await docReader.getWorkspaceContent(workspace.id);
|
||||
t.is(workspaceContent, null);
|
||||
|
||||
// workspace not exists
|
||||
const notExists = await docReader.getWorkspaceContent(randomUUID());
|
||||
t.is(notExists, null);
|
||||
});
|
||||
|
||||
@@ -344,7 +344,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
|
||||
});
|
||||
|
||||
if (updatedSnapshot) {
|
||||
this.event.broadcast('doc.snapshot.updated', {
|
||||
this.event.emit('doc.snapshot.updated', {
|
||||
workspaceId: snapshot.spaceId,
|
||||
docId: snapshot.docId,
|
||||
});
|
||||
|
||||
17
packages/backend/server/src/core/doc/event.ts
Normal file
17
packages/backend/server/src/core/doc/event.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { OnEvent } from '../../base';
|
||||
import { DocReader } from './reader';
|
||||
|
||||
@Injectable()
|
||||
export class DocEventsListener {
|
||||
constructor(private readonly doc: DocReader) {}
|
||||
|
||||
@OnEvent('doc.snapshot.updated')
|
||||
async markDocContentCacheStale({
|
||||
workspaceId,
|
||||
docId,
|
||||
}: Events['doc.snapshot.updated']) {
|
||||
await this.doc.markDocContentCacheStale(workspaceId, docId);
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import { PermissionModule } from '../permission';
|
||||
import { QuotaModule } from '../quota';
|
||||
import { PgUserspaceDocStorageAdapter } from './adapters/userspace';
|
||||
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
|
||||
import { DocEventsListener } from './event';
|
||||
import { DocStorageCronJob } from './job';
|
||||
import { DocStorageOptions } from './options';
|
||||
import { DatabaseDocReader, DocReader, DocReaderProvider } from './reader';
|
||||
@@ -19,6 +20,7 @@ import { DatabaseDocReader, DocReader, DocReaderProvider } from './reader';
|
||||
DocStorageCronJob,
|
||||
DocReaderProvider,
|
||||
DatabaseDocReader,
|
||||
DocEventsListener,
|
||||
],
|
||||
exports: [
|
||||
DatabaseDocReader,
|
||||
|
||||
@@ -1,17 +1,33 @@
|
||||
import { FactoryProvider, Injectable, Logger } from '@nestjs/common';
|
||||
import { ModuleRef } from '@nestjs/core';
|
||||
import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs';
|
||||
import {
|
||||
applyUpdate,
|
||||
diffUpdate,
|
||||
Doc as YDoc,
|
||||
encodeStateVectorFromUpdate,
|
||||
} from 'yjs';
|
||||
|
||||
import {
|
||||
Cache,
|
||||
Config,
|
||||
CryptoHelper,
|
||||
getOrGenRequestId,
|
||||
UserFriendlyError,
|
||||
} from '../../base';
|
||||
import {
|
||||
type PageDocContent,
|
||||
parsePageDoc,
|
||||
parseWorkspaceDoc,
|
||||
type WorkspaceDocContent,
|
||||
} from '../utils/blocksuite';
|
||||
import { PgWorkspaceDocStorageAdapter } from './adapters/workspace';
|
||||
import { type DocDiff, type DocRecord } from './storage';
|
||||
|
||||
const DOC_CONTENT_CACHE_7_DAYS = 7 * 24 * 60 * 60 * 1000;
|
||||
|
||||
export abstract class DocReader {
|
||||
constructor(protected readonly cache: Cache) {}
|
||||
|
||||
abstract getDoc(
|
||||
workspaceId: string,
|
||||
docId: string
|
||||
@@ -23,6 +39,60 @@ export abstract class DocReader {
|
||||
stateVector?: Uint8Array
|
||||
): Promise<DocDiff | null>;
|
||||
|
||||
async getDocContent(
|
||||
workspaceId: string,
|
||||
docId: string
|
||||
): Promise<PageDocContent | null> {
|
||||
const cacheKey = this.cacheKey(workspaceId, docId);
|
||||
const cachedResult = await this.cache.get<PageDocContent>(cacheKey);
|
||||
if (cachedResult) {
|
||||
return cachedResult;
|
||||
}
|
||||
|
||||
const content = await this.getDocContentWithoutCache(workspaceId, docId);
|
||||
if (content) {
|
||||
await this.cache.set(cacheKey, content, {
|
||||
ttl: DOC_CONTENT_CACHE_7_DAYS,
|
||||
});
|
||||
}
|
||||
return content;
|
||||
}
|
||||
|
||||
async getWorkspaceContent(
|
||||
workspaceId: string
|
||||
): Promise<WorkspaceDocContent | null> {
|
||||
const cacheKey = this.cacheKey(workspaceId, workspaceId);
|
||||
const cachedResult = await this.cache.get<WorkspaceDocContent>(cacheKey);
|
||||
if (cachedResult) {
|
||||
return cachedResult;
|
||||
}
|
||||
|
||||
const content = await this.getWorkspaceContentWithoutCache(workspaceId);
|
||||
if (content) {
|
||||
await this.cache.set(cacheKey, content);
|
||||
}
|
||||
return content;
|
||||
}
|
||||
|
||||
async markDocContentCacheStale(workspaceId: string, docId: string) {
|
||||
await this.cache.delete(this.cacheKey(workspaceId, docId));
|
||||
}
|
||||
|
||||
private cacheKey(workspaceId: string, docId: string) {
|
||||
return workspaceId === docId
|
||||
? `workspace:${workspaceId}:content`
|
||||
: `workspace:${workspaceId}:doc:${docId}:content`;
|
||||
}
|
||||
|
||||
protected abstract getDocContentWithoutCache(
|
||||
workspaceId: string,
|
||||
guid: string
|
||||
): Promise<PageDocContent | null>;
|
||||
|
||||
protected abstract getWorkspaceContentWithoutCache(
|
||||
workspaceId: string
|
||||
): Promise<WorkspaceDocContent | null>;
|
||||
|
||||
protected docDiff(update: Uint8Array, stateVector?: Uint8Array) {
|
||||
const missing = stateVector ? diffUpdate(update, stateVector) : update;
|
||||
const state = encodeStateVectorFromUpdate(update);
|
||||
@@ -35,8 +105,11 @@ export abstract class DocReader {
|
||||
|
||||
@Injectable()
|
||||
export class DatabaseDocReader extends DocReader {
|
||||
constructor(protected readonly workspace: PgWorkspaceDocStorageAdapter) {
|
||||
super();
|
||||
constructor(
|
||||
protected override readonly cache: Cache,
|
||||
protected readonly workspace: PgWorkspaceDocStorageAdapter
|
||||
) {
|
||||
super(cache);
|
||||
}
|
||||
|
||||
async getDoc(workspaceId: string, docId: string): Promise<DocRecord | null> {
|
||||
@@ -57,6 +130,31 @@ export class DatabaseDocReader extends DocReader {
|
||||
timestamp: doc.timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
protected override async getDocContentWithoutCache(
|
||||
workspaceId: string,
|
||||
guid: string
|
||||
): Promise<PageDocContent | null> {
|
||||
const docRecord = await this.workspace.getDoc(workspaceId, guid);
|
||||
if (!docRecord) {
|
||||
return null;
|
||||
}
|
||||
const doc = new YDoc();
|
||||
applyUpdate(doc, docRecord.bin);
|
||||
return parsePageDoc(doc);
|
||||
}
|
||||
|
||||
protected override async getWorkspaceContentWithoutCache(
|
||||
workspaceId: string
|
||||
): Promise<WorkspaceDocContent | null> {
|
||||
const docRecord = await this.workspace.getDoc(workspaceId, workspaceId);
|
||||
if (!docRecord) {
|
||||
return null;
|
||||
}
|
||||
const doc = new YDoc();
|
||||
applyUpdate(doc, docRecord.bin);
|
||||
return parseWorkspaceDoc(doc);
|
||||
}
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
@@ -66,9 +164,10 @@ export class RpcDocReader extends DatabaseDocReader {
|
||||
constructor(
|
||||
private readonly config: Config,
|
||||
private readonly crypto: CryptoHelper,
|
||||
protected override readonly cache: Cache,
|
||||
protected override readonly workspace: PgWorkspaceDocStorageAdapter
|
||||
) {
|
||||
super(workspace);
|
||||
super(cache, workspace);
|
||||
}
|
||||
|
||||
private async fetch(
|
||||
@@ -128,11 +227,8 @@ export class RpcDocReader extends DatabaseDocReader {
|
||||
// other error
|
||||
this.logger.error(
|
||||
`Failed to fetch doc ${url}, fallback to database doc reader`,
|
||||
err.stack
|
||||
err
|
||||
);
|
||||
if (err.cause instanceof Error) {
|
||||
this.logger.error(err.cause.stack);
|
||||
}
|
||||
// fallback to database doc reader if the error is not user friendly, like network error
|
||||
return await super.getDoc(workspaceId, docId);
|
||||
}
|
||||
@@ -172,15 +268,61 @@ export class RpcDocReader extends DatabaseDocReader {
|
||||
const err = e as Error;
|
||||
this.logger.error(
|
||||
`Failed to fetch doc diff ${url}, fallback to database doc reader`,
|
||||
err.stack
|
||||
err
|
||||
);
|
||||
if (err.cause instanceof Error) {
|
||||
this.logger.error(err.cause.stack);
|
||||
}
|
||||
// fallback to database doc reader if the error is not user friendly, like network error
|
||||
return await super.getDocDiff(workspaceId, docId, stateVector);
|
||||
}
|
||||
}
|
||||
|
||||
protected override async getDocContentWithoutCache(
|
||||
workspaceId: string,
|
||||
docId: string
|
||||
): Promise<PageDocContent | null> {
|
||||
const url = `${this.config.docService.endpoint}/rpc/workspaces/${workspaceId}/docs/${docId}/content`;
|
||||
const accessToken = this.crypto.sign(docId);
|
||||
try {
|
||||
const res = await this.fetch(accessToken, url, 'GET');
|
||||
if (!res) {
|
||||
return null;
|
||||
}
|
||||
return (await res.json()) as PageDocContent;
|
||||
} catch (e) {
|
||||
if (e instanceof UserFriendlyError) {
|
||||
throw e;
|
||||
}
|
||||
const err = e as Error;
|
||||
this.logger.error(
|
||||
`Failed to fetch doc content ${url}, fallback to database doc reader`,
|
||||
err
|
||||
);
|
||||
return await super.getDocContentWithoutCache(workspaceId, docId);
|
||||
}
|
||||
}
|
||||
|
||||
protected override async getWorkspaceContentWithoutCache(
|
||||
workspaceId: string
|
||||
): Promise<WorkspaceDocContent | null> {
|
||||
const url = `${this.config.docService.endpoint}/rpc/workspaces/${workspaceId}/content`;
|
||||
const accessToken = this.crypto.sign(workspaceId);
|
||||
try {
|
||||
const res = await this.fetch(accessToken, url, 'GET');
|
||||
if (!res) {
|
||||
return null;
|
||||
}
|
||||
return (await res.json()) as WorkspaceDocContent;
|
||||
} catch (e) {
|
||||
if (e instanceof UserFriendlyError) {
|
||||
throw e;
|
||||
}
|
||||
const err = e as Error;
|
||||
this.logger.error(
|
||||
`Failed to fetch workspace content ${url}, fallback to database doc reader`,
|
||||
err
|
||||
);
|
||||
return await super.getWorkspaceContentWithoutCache(workspaceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const DocReaderProvider: FactoryProvider = {
|
||||
|
||||
@@ -10,7 +10,7 @@ import {
|
||||
UserNotFound,
|
||||
} from '../../../base';
|
||||
import { Models } from '../../../models';
|
||||
import { DocContentService } from '../../doc-renderer';
|
||||
import { DocReader } from '../../doc';
|
||||
import { PermissionService, WorkspaceRole } from '../../permission';
|
||||
import { WorkspaceBlobStorage } from '../../storage';
|
||||
|
||||
@@ -30,7 +30,7 @@ export class WorkspaceService {
|
||||
constructor(
|
||||
private readonly blobStorage: WorkspaceBlobStorage,
|
||||
private readonly cache: Cache,
|
||||
private readonly doc: DocContentService,
|
||||
private readonly doc: DocReader,
|
||||
private readonly mailer: MailService,
|
||||
private readonly permission: PermissionService,
|
||||
private readonly prisma: PrismaClient,
|
||||
|
||||
Reference in New Issue
Block a user