diff --git a/.docker/selfhost/schema.json b/.docker/selfhost/schema.json index 60d65f7ebf..a1b50b112b 100644 --- a/.docker/selfhost/schema.json +++ b/.docker/selfhost/schema.json @@ -70,6 +70,18 @@ "concurrency": 1 } }, + "queues.indexer": { + "type": "object", + "description": "The config for indexer job queue\n@default {\"concurrency\":1}", + "properties": { + "concurrency": { + "type": "number" + } + }, + "default": { + "concurrency": 1 + } + }, "queues.notification": { "type": "object", "description": "The config for notification job queue\n@default {\"concurrency\":10}", diff --git a/.github/actions/server-test-env/action.yml b/.github/actions/server-test-env/action.yml index 3f08733e43..b54ed8ba1f 100644 --- a/.github/actions/server-test-env/action.yml +++ b/.github/actions/server-test-env/action.yml @@ -4,6 +4,11 @@ description: 'Prepare Server Test Environment' runs: using: 'composite' steps: + - name: Bundle @affine/reader + shell: bash + run: | + yarn affine @affine/reader build + - name: Initialize database shell: bash run: | @@ -21,6 +26,7 @@ runs: yarn affine @affine/server prisma generate yarn affine @affine/server prisma migrate deploy yarn affine @affine/server data-migration run + - name: Import config shell: bash run: | diff --git a/.github/workflows/build-images.yml b/.github/workflows/build-images.yml index 7492f97a6f..c8fbad4ce6 100644 --- a/.github/workflows/build-images.yml +++ b/.github/workflows/build-images.yml @@ -174,6 +174,8 @@ jobs: path: ./packages/backend/native - name: List server-native files run: ls -alh ./packages/backend/native + - name: Build @affine/reader + run: yarn workspace @affine/reader build - name: Build Server run: yarn workspace @affine/server build - name: Upload server dist diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 7447d07905..c77de2f63c 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -172,6 +172,11 @@ jobs: name: 
server-native.node path: ./packages/backend/native + - name: Bundle @affine/reader + shell: bash + run: | + yarn workspace @affine/reader build + - name: Run Check run: | yarn affine init diff --git a/docs/developing-server.md b/docs/developing-server.md index 478179e88d..4fe6c98515 100644 --- a/docs/developing-server.md +++ b/docs/developing-server.md @@ -35,6 +35,12 @@ Server also requires native packages to be built, you can build them by running yarn affine @affine/server-native build ``` +## Build @affine/reader package + +```sh +yarn affine @affine/reader build +``` + ## Prepare dev environment ```sh diff --git a/packages/backend/server/package.json b/packages/backend/server/package.json index c534723c00..4dc509a61d 100644 --- a/packages/backend/server/package.json +++ b/packages/backend/server/package.json @@ -26,6 +26,7 @@ "postinstall": "prisma generate" }, "dependencies": { + "@affine/reader": "workspace:*", "@affine/server-native": "workspace:*", "@ai-sdk/anthropic": "^1.2.10", "@ai-sdk/google": "^1.2.10", diff --git a/packages/backend/server/src/__tests__/__fixtures__/test-doc.snapshot.bin b/packages/backend/server/src/__tests__/__fixtures__/test-doc.snapshot.bin new file mode 100644 index 0000000000..7c59832629 Binary files /dev/null and b/packages/backend/server/src/__tests__/__fixtures__/test-doc.snapshot.bin differ diff --git a/packages/backend/server/src/__tests__/__fixtures__/test-root-doc.snapshot.bin b/packages/backend/server/src/__tests__/__fixtures__/test-root-doc.snapshot.bin new file mode 100644 index 0000000000..35502927ed Binary files /dev/null and b/packages/backend/server/src/__tests__/__fixtures__/test-root-doc.snapshot.bin differ diff --git a/packages/backend/server/src/__tests__/mocks/doc-snapshot.mock.ts b/packages/backend/server/src/__tests__/mocks/doc-snapshot.mock.ts new file mode 100644 index 0000000000..16a1bbb1ad --- /dev/null +++ b/packages/backend/server/src/__tests__/mocks/doc-snapshot.mock.ts @@ -0,0 +1,42 @@ +import { 
readFile } from 'node:fs/promises'; +import path from 'node:path'; + +import { faker } from '@faker-js/faker'; +import type { Snapshot } from '@prisma/client'; + +import { Mocker } from './factory'; + +export type MockDocSnapshotInput = { + user: { id: string }; + workspaceId: string; + docId?: string; + blob?: Uint8Array; +}; + +export type MockedDocSnapshot = Snapshot; + +export class MockDocSnapshot extends Mocker< + MockDocSnapshotInput, + MockedDocSnapshot +> { + override async create(input: MockDocSnapshotInput) { + if (!input.blob) { + const snapshot = await readFile( + path.join(import.meta.dirname, '../__fixtures__/test-doc.snapshot.bin') + ); + input.blob = snapshot; + } + const snapshot = await this.db.snapshot.create({ + data: { + id: input.docId ?? faker.string.nanoid(), + workspaceId: input.workspaceId, + blob: input.blob, + createdAt: new Date(), + updatedAt: new Date(), + createdBy: input.user.id, + updatedBy: input.user.id, + }, + }); + return snapshot; + } +} diff --git a/packages/backend/server/src/__tests__/mocks/index.ts b/packages/backend/server/src/__tests__/mocks/index.ts index 50e82b334a..b6abf141c8 100644 --- a/packages/backend/server/src/__tests__/mocks/index.ts +++ b/packages/backend/server/src/__tests__/mocks/index.ts @@ -6,6 +6,7 @@ export * from './workspace-user.mock'; import { MockCopilotProvider } from './copilot.mock'; import { MockDocMeta } from './doc-meta.mock'; +import { MockDocSnapshot } from './doc-snapshot.mock'; import { MockEventBus } from './eventbus.mock'; import { MockMailer } from './mailer.mock'; import { MockJobQueue } from './queue.mock'; @@ -22,6 +23,7 @@ export const Mockers = { WorkspaceUser: MockWorkspaceUser, UserSettings: MockUserSettings, DocMeta: MockDocMeta, + DocSnapshot: MockDocSnapshot, }; export { MockCopilotProvider, MockEventBus, MockJobQueue, MockMailer }; diff --git a/packages/backend/server/src/__tests__/mocks/queue.mock.ts b/packages/backend/server/src/__tests__/mocks/queue.mock.ts index 
25a3ceff4f..d44a9b6321 100644 --- a/packages/backend/server/src/__tests__/mocks/queue.mock.ts +++ b/packages/backend/server/src/__tests__/mocks/queue.mock.ts @@ -5,6 +5,7 @@ import { JobQueue } from '../../base'; export class MockJobQueue { add = Sinon.createStubInstance(JobQueue).add.resolves(); + remove = Sinon.createStubInstance(JobQueue).remove.resolves(); last(name: Job): { name: Job; payload: Jobs[Job] } { const addJobName = this.add.lastCall?.args[0]; diff --git a/packages/backend/server/src/__tests__/mocks/workspace.mock.ts b/packages/backend/server/src/__tests__/mocks/workspace.mock.ts index bfc1fd9446..5a95b78604 100644 --- a/packages/backend/server/src/__tests__/mocks/workspace.mock.ts +++ b/packages/backend/server/src/__tests__/mocks/workspace.mock.ts @@ -1,3 +1,6 @@ +import { readFile } from 'node:fs/promises'; +import path from 'node:path'; + import { faker } from '@faker-js/faker'; import type { Prisma, Workspace } from '@prisma/client'; import { omit } from 'lodash-es'; @@ -7,6 +10,7 @@ import { Mocker } from './factory'; export type MockWorkspaceInput = Prisma.WorkspaceCreateInput & { owner?: { id: string }; + snapshot?: Uint8Array | true; }; export type MockedWorkspace = Workspace; @@ -14,8 +18,18 @@ export type MockedWorkspace = Workspace; export class MockWorkspace extends Mocker { override async create(input?: Partial) { const owner = input?.owner; - input = omit(input, 'owner'); - return await this.db.workspace.create({ + if (input?.snapshot === true) { + const snapshot = await readFile( + path.join( + import.meta.dirname, + '../__fixtures__/test-root-doc.snapshot.bin' + ) + ); + input.snapshot = snapshot; + } + const snapshot = input?.snapshot; + input = omit(input, 'owner', 'snapshot'); + const workspace = await this.db.workspace.create({ data: { name: faker.animal.cat(), public: false, @@ -31,5 +45,21 @@ export class MockWorkspace extends Mocker { : undefined, }, }); + + // create a rootDoc snapshot + if (snapshot) { + await 
this.db.snapshot.create({ + data: { + id: workspace.id, + workspaceId: workspace.id, + blob: snapshot, + createdAt: new Date(), + updatedAt: new Date(), + createdBy: owner?.id, + updatedBy: owner?.id, + }, + }); + } + return workspace; } } diff --git a/packages/backend/server/src/base/job/queue/config.ts b/packages/backend/server/src/base/job/queue/config.ts index 0f32961fce..5aeb0f5de7 100644 --- a/packages/backend/server/src/base/job/queue/config.ts +++ b/packages/backend/server/src/base/job/queue/config.ts @@ -61,6 +61,14 @@ defineModuleConfig('job', { schema, }, + 'queues.indexer': { + desc: 'The config for indexer job queue', + default: { + concurrency: 1, + }, + schema, + }, + 'queues.notification': { desc: 'The config for notification job queue', default: { diff --git a/packages/backend/server/src/base/job/queue/def.ts b/packages/backend/server/src/base/job/queue/def.ts index d1f2aa60eb..50e18eb8ea 100644 --- a/packages/backend/server/src/base/job/queue/def.ts +++ b/packages/backend/server/src/base/job/queue/def.ts @@ -27,6 +27,7 @@ export enum Queue { NOTIFICATION = 'notification', DOC = 'doc', COPILOT = 'copilot', + INDEXER = 'indexer', } export const QUEUES = Object.values(Queue); diff --git a/packages/backend/server/src/base/job/queue/executor.ts b/packages/backend/server/src/base/job/queue/executor.ts index 93689bab20..71e8e2e69a 100644 --- a/packages/backend/server/src/base/job/queue/executor.ts +++ b/packages/backend/server/src/base/job/queue/executor.ts @@ -27,11 +27,15 @@ export class JobExecutor implements OnModuleDestroy { @OnEvent('config.init') async onConfigInit() { - const queues = env.flavors.graphql ? difference(QUEUES, [Queue.DOC]) : []; + const queues = env.flavors.graphql + ? 
difference(QUEUES, [Queue.DOC, Queue.INDEXER]) + : []; // NOTE(@forehalo): only enable doc queue in doc service if (env.flavors.doc) { queues.push(Queue.DOC); + // NOTE(@fengmk2): Once the index task cannot be processed in time, it needs to be separated from the doc service and deployed independently. + queues.push(Queue.INDEXER); } await this.startWorkers(queues); diff --git a/packages/backend/server/src/core/doc/event.ts b/packages/backend/server/src/core/doc/event.ts index b61c030d32..d148cf5ff1 100644 --- a/packages/backend/server/src/core/doc/event.ts +++ b/packages/backend/server/src/core/doc/event.ts @@ -1,14 +1,17 @@ import { Injectable } from '@nestjs/common'; -import { OnEvent } from '../../base'; +import { JobQueue, OnEvent } from '../../base'; import { Models } from '../../models'; +import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; import { DocReader } from './reader'; @Injectable() export class DocEventsListener { constructor( private readonly docReader: DocReader, - private readonly models: Models + private readonly models: Models, + private readonly workspace: PgWorkspaceDocStorageAdapter, + private readonly queue: JobQueue ) {} @OnEvent('doc.snapshot.updated') @@ -26,6 +29,17 @@ export class DocEventsListener { return; } await this.models.doc.upsertMeta(workspaceId, docId, content); + await this.queue.add( + 'indexer.indexDoc', + { + workspaceId, + docId, + }, + { + jobId: `${workspaceId}/${docId}`, + priority: 100, + } + ); } else { // update workspace content to database const content = this.docReader.parseWorkspaceContent(blob); @@ -33,6 +47,33 @@ export class DocEventsListener { return; } await this.models.workspace.update(workspaceId, content); + await this.queue.add( + 'indexer.indexWorkspace', + { + workspaceId, + }, + { + jobId: workspaceId, + priority: 100, + } + ); + } + } + + @OnEvent('user.deleted') + async clearUserWorkspaces(payload: Events['user.deleted']) { + for (const workspace of payload.ownedWorkspaces) { + 
await this.workspace.deleteSpace(workspace); + await this.queue.add( + 'indexer.deleteWorkspace', + { + workspaceId: workspace, + }, + { + jobId: workspace, + priority: 0, + } + ); } } } diff --git a/packages/backend/server/src/core/doc/job.ts b/packages/backend/server/src/core/doc/job.ts index dc1cde21f7..1d0a5c184c 100644 --- a/packages/backend/server/src/core/doc/job.ts +++ b/packages/backend/server/src/core/doc/job.ts @@ -1,9 +1,8 @@ import { Injectable } from '@nestjs/common'; import { Cron, CronExpression } from '@nestjs/schedule'; -import { JobQueue, OnEvent, OnJob } from '../../base'; +import { JobQueue, OnJob } from '../../base'; import { Models } from '../../models'; -import { PgWorkspaceDocStorageAdapter } from './adapters/workspace'; declare global { interface Jobs { @@ -15,7 +14,6 @@ declare global { export class DocStorageCronJob { constructor( private readonly models: Models, - private readonly workspace: PgWorkspaceDocStorageAdapter, private readonly queue: JobQueue ) {} @@ -34,11 +32,4 @@ export class DocStorageCronJob { async cleanExpiredHistories() { await this.models.history.cleanExpired(); } - - @OnEvent('user.deleted') - async clearUserWorkspaces(payload: Events['user.deleted']) { - for (const workspace of payload.ownedWorkspaces) { - await this.workspace.deleteSpace(workspace); - } - } } diff --git a/packages/backend/server/src/core/utils/__tests__/__snapshots__/blocksute.spec.ts.md b/packages/backend/server/src/core/utils/__tests__/__snapshots__/blocksute.spec.ts.md new file mode 100644 index 0000000000..a263260663 --- /dev/null +++ b/packages/backend/server/src/core/utils/__tests__/__snapshots__/blocksute.spec.ts.md @@ -0,0 +1,644 @@ +# Snapshot report for `src/core/utils/__tests__/blocksute.spec.ts` + +The actual snapshot is saved in `blocksute.spec.ts.snap`. + +Generated by [AVA](https://avajs.dev). 
+ +## can read all doc ids from workspace snapshot + +> Snapshot 1 + + [ + '5nS9BSp3Px', + ] + +## can read all blocks from doc snapshot + +> Snapshot 1 + + { + blocks: [ + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blockId: 'TnUgtVg7Eu', + content: 'Write, Draw, Plan all at Once.', + docId: 'doc-0', + flavour: 'affine:page', + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'FoPQcAyV_m', + content: 'AFFiNE is an open source all in one workspace, an operating system for all the building blocks of your team wiki, knowledge management and digital assets and a better alternative to Notion and Miro. ', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'oz48nn_zp8', + content: '', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'g8a-D9-jXS', + content: 'You own your data, with no compromises', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'J8lHN1GR_5', + content: 'Local-first & Real-time collaborative', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'xCuWdM0VLz', + content: 'We love the idea proposed by Ink & Switch in the famous article 
about you owning your data, despite the cloud. Furthermore, AFFiNE is the first all-in-one workspace that keeps your data ownership with no compromises on real-time collaboration and editing experience.', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'zElMi0tViK', + content: 'AFFiNE is a local-first application upon CRDTs with real-time collaboration support. Your data is always stored locally while multiple nodes remain synced in real-time.', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'RX4CG2zsBk', + }, + blockId: 'Z4rK0OF9Wk', + content: '', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'RX4CG2zsBk', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'S1mkc8zUoU', + }, + blockId: 'DQ0Ryb-SpW', + content: 'Blocks that assemble your next docs, tasks kanban or whiteboard', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'S1mkc8zUoU', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'yGlBdshAqN', + }, + blockId: 'HAZC3URZp_', + content: 'There is a large overlap of their atomic "building blocks" between these apps. They are neither open source nor have a plugin system like VS Code for contributors to customize. We want to have something that contains all the features we love and goes one step further. 
', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'yGlBdshAqN', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'yGlBdshAqN', + }, + blockId: '0H87ypiuv8', + content: 'We are building AFFiNE to be a fundamental open source platform that contains all the building blocks for docs, task management and visual collaboration, hoping you can shape your next workflow with us that can make your life better and also connect others, too.', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'yGlBdshAqN', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'yGlBdshAqN', + }, + blockId: 'Sp4G1KD0Wn', + content: 'If you want to learn more about the product design of AFFiNE, here goes the concepts:', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'yGlBdshAqN', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'yGlBdshAqN', + }, + blockId: 'RsUhDuEqXa', + content: 'To Shape, not to adapt. 
AFFiNE is built for individuals & teams who care about their data, who refuse vendor lock-in, and who want to have control over their essential tools.', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'yGlBdshAqN', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'Z2HibKzAr-', + content: 'A true canvas for blocks in any form', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'UwvWddamzM', + content: 'Many editor apps claimed to be a canvas for productivity. Since the Mother of All Demos, Douglas Engelbart, a creative and programable digital workspace has been a pursuit and an ultimate mission for generations of tool makers. ', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'g9xKUjhJj1', + content: '', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'wDTn4YJ4pm', + content: '"We shape our tools and thereafter our tools shape us”. 
A lot of pioneers have inspired us a long the way, e.g.:', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'xFrrdiP3-V', + content: 'Quip & Notion with their great concept of "everything is a block"', + docId: 'doc-0', + flavour: 'affine:list', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'Tp9xyN4Okl', + content: 'Trello with their Kanban', + docId: 'doc-0', + flavour: 'affine:list', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'K_4hUzKZFQ', + content: 'Airtable & Miro with their no-code programable datasheets', + docId: 'doc-0', + flavour: 'affine:list', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'QwMzON2s7x', + content: 'Miro & Whimiscal with their edgeless visual whiteboard', + docId: 'doc-0', + flavour: 'affine:list', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: '6lDiuDqZGL', + }, + blockId: 'FFVmit6u1T', + content: 'Remnote & Capacities with their object-based tag system', + docId: 'doc-0', + flavour: 'affine:list', + parentBlockId: '6lDiuDqZGL', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'cauvaHOQmh', + }, + blockId: 
'YqnG5O6AE6', + content: 'For more details, please refer to our RoadMap', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'cauvaHOQmh', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'cauvaHOQmh', + }, + blockId: 'sbDTmZMZcq', + content: 'Self Host', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'cauvaHOQmh', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'cauvaHOQmh', + }, + blockId: 'QVvitesfbj', + content: 'Self host AFFiNE', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'cauvaHOQmh', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'U_GoHFD9At', + content: [ + 'Learning From', + 'Title', + 'Tag', + 'Reference', + 'Developers', + 'AFFiNE', + ], + docId: 'doc-0', + flavour: 'affine:database', + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'tpyOZbPc1P', + content: 'Affine Development', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'VMx9lHw3TR', + content: 'For developers or installations guides, please go to AFFiNE Doc', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'Q6LnVyKoGS', + content: 'Quip & Notion with their great concept of "everything is a block"', 
+ docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'EkFHpB-mJi', + content: 'Trello with their Kanban', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: '3aMlphe2lp', + content: 'Airtable & Miro with their no-code programable datasheets', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'MiZtUig-fL', + content: 'Miro & Whimiscal with their edgeless visual whiteboard', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: 'Learning From', + displayMode: 'page', + noteBlockId: '2jwCeO8Yot', + }, + blockId: 'erYE2C7cc5', + content: 'Remnote & Capacities with their object-based tag system', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'U_GoHFD9At', + parentFlavour: 'affine:database', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'c9MF_JiRgx', + }, + blockId: 'NyHXrMX3R1', + content: 'Affine Development', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'c9MF_JiRgx', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'c9MF_JiRgx', + }, + blockId: '9-K49otbCv', + content: 'For developer or 
installation guides, please go to AFFiNE Development', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'c9MF_JiRgx', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + databaseName: undefined, + displayMode: 'page', + noteBlockId: 'c9MF_JiRgx', + }, + blockId: 'faFteK9eG-', + content: '', + docId: 'doc-0', + flavour: 'affine:paragraph', + parentBlockId: 'c9MF_JiRgx', + parentFlavour: 'affine:note', + ref: [], + refDocId: [], + }, + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blockId: '6x7ALjUDjj', + content: [ + 'What is AFFiNE', + 'Related Articles', + ' ', + 'Self-host', + '', + 'AFFiNE ', + 'Development', + 'You can check these URLs to learn about AFFiNE', + 'Database Reference', + ], + docId: 'doc-0', + flavour: 'affine:surface', + parentBlockId: 'TnUgtVg7Eu', + parentFlavour: 'affine:page', + }, + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blockId: 'ECrtbvW6xx', + docId: 'doc-0', + flavour: 'affine:bookmark', + }, + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blockId: '5W--UQLN11', + docId: 'doc-0', + flavour: 'affine:bookmark', + }, + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blob: [ + 'BFZk3c2ERp-sliRvA7MQ_p3NdkdCLt2Ze0DQ9i21dpA=', + ], + blockId: 'lcZphIJe63', + docId: 'doc-0', + flavour: 'affine:image', + parentBlockId: '6x7ALjUDjj', + parentFlavour: 'affine:surface', + }, + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blob: [ + 'HWvCItS78DzPGbwcuaGcfkpVDUvL98IvH5SIK8-AcL8=', + ], + blockId: 'JlgVJdWU12', + docId: 'doc-0', + flavour: 'affine:image', + parentBlockId: '6x7ALjUDjj', + parentFlavour: 'affine:surface', + }, + { + additional: { + displayMode: 'edgeless', + noteBlockId: undefined, + }, + blob: [ + 'ZRKpsBoC88qEMmeiXKXqywfA1rLvWoLa5rpEh9x9Oj0=', + ], + blockId: 'lht7AqBqnF', + docId: 'doc-0', + flavour: 'affine:image', + 
parentBlockId: '6x7ALjUDjj', + parentFlavour: 'affine:surface', + }, + ], + summary: 'AFFiNE is an open source all in one workspace, an operating system for all the building blocks of your team wiki, knowledge management and digital assets and a better alternative to Notion and Miro. You own your data, with no compromisesLocal-first & Real-time collaborativeWe love the idea proposed by Ink & Switch in the famous article about you owning your data, despite the cloud. Furthermore, AFFiNE is the first all-in-one workspace that keeps your data ownership with no compromises on real-time collaboration and editing experience.AFFiNE is a local-first application upon CRDTs with real-time collaboration support. Your data is always stored locally while multiple nodes remain synced in real-time.Blocks that assemble your next docs, tasks kanban or whiteboardThere is a large overlap of their atomic "building blocks" between these apps. They are neither open source nor have a plugin system like VS Code for contributors to customize. We want to have something that contains all the features we love and goes one step further. 
', + title: 'Write, Draw, Plan all at Once.', + } diff --git a/packages/backend/server/src/core/utils/__tests__/__snapshots__/blocksute.spec.ts.snap b/packages/backend/server/src/core/utils/__tests__/__snapshots__/blocksute.spec.ts.snap new file mode 100644 index 0000000000..06ec55a74f Binary files /dev/null and b/packages/backend/server/src/core/utils/__tests__/__snapshots__/blocksute.spec.ts.snap differ diff --git a/packages/backend/server/src/core/utils/__tests__/blocksute.spec.ts b/packages/backend/server/src/core/utils/__tests__/blocksute.spec.ts new file mode 100644 index 0000000000..f760d51acc --- /dev/null +++ b/packages/backend/server/src/core/utils/__tests__/blocksute.spec.ts @@ -0,0 +1,57 @@ +import test from 'ava'; +import { omit } from 'lodash-es'; + +import { createModule } from '../../../__tests__/create-module'; +import { Mockers } from '../../../__tests__/mocks'; +import { Models } from '../../../models'; +import { + readAllBlocksFromDocSnapshot, + readAllDocIdsFromWorkspaceSnapshot, +} from '../blocksuite'; + +const module = await createModule({}); +const models = module.get(Models); + +const owner = await module.create(Mockers.User); +const workspace = await module.create(Mockers.Workspace, { + snapshot: true, + owner, +}); + +const docSnapshot = await module.create(Mockers.DocSnapshot, { + workspaceId: workspace.id, + user: owner, +}); + +test.after.always(async () => { + await module.close(); +}); + +test('can read all doc ids from workspace snapshot', async t => { + const rootDoc = await models.doc.get(workspace.id, workspace.id); + t.truthy(rootDoc); + + const docIds = readAllDocIdsFromWorkspaceSnapshot(rootDoc!.blob); + + t.deepEqual(docIds, ['5nS9BSp3Px']); + t.snapshot(docIds); +}); + +test('can read all blocks from doc snapshot', async t => { + const rootDoc = await models.doc.get(workspace.id, workspace.id); + t.truthy(rootDoc); + const doc = await models.doc.get(workspace.id, docSnapshot.id); + t.truthy(doc); + + const result = await 
readAllBlocksFromDocSnapshot( + workspace.id, + rootDoc!.blob, + 'doc-0', + docSnapshot.blob + ); + + t.snapshot({ + ...result, + blocks: result!.blocks.map(block => omit(block, ['yblock'])), + }); +}); diff --git a/packages/backend/server/src/core/utils/blocksuite.ts b/packages/backend/server/src/core/utils/blocksuite.ts index 0317bf9232..22ee899023 100644 --- a/packages/backend/server/src/core/utils/blocksuite.ts +++ b/packages/backend/server/src/core/utils/blocksuite.ts @@ -1,12 +1,17 @@ // TODO(@forehalo): // Because of the `@affine/server` package can't import directly from workspace packages, -// this is a temprory solution to get the block suite data(title, description) from given yjs binary or yjs doc. +// this is a temporary solution to get the block suite data(title, description) from given yjs binary or yjs doc. // The logic is mainly copied from // - packages/frontend/core/src/modules/docs-search/worker/in-worker.ts // - packages/frontend/core/src/components/page-list/use-block-suite-page-preview.ts // and it's better to be provided by blocksuite -import { Array, Doc, Map } from 'yjs'; +// eslint-disable-next-line @typescript-eslint/no-restricted-imports -- import from bundle +import { + readAllBlocksFromDoc, + readAllDocIdsFromRootDoc, +} from '@affine/reader/dist'; +import { applyUpdate, Array as YArray, Doc as YDoc, Map as YMap } from 'yjs'; export interface PageDocContent { title: string; @@ -31,7 +36,7 @@ type KnownFlavour = | 'affine:callout' | 'affine:table'; -export function parseWorkspaceDoc(doc: Doc): WorkspaceDocContent | null { +export function parseWorkspaceDoc(doc: YDoc): WorkspaceDocContent | null { // not a workspace doc if (!doc.share.has('meta')) { return null; @@ -50,7 +55,7 @@ export interface ParsePageOptions { } export function parsePageDoc( - doc: Doc, + doc: YDoc, opts: ParsePageOptions = { maxSummaryLength: 150 } ): PageDocContent | null { // not a page doc @@ -58,7 +63,7 @@ export function parsePageDoc( return null; } - const 
blocks = doc.getMap<Map<any>>('blocks'); + const blocks = doc.getMap<YMap<any>>('blocks'); if (!blocks.size) { return null; @@ -71,7 +76,7 @@ export function parsePageDoc( let summaryLenNeeded = opts.maxSummaryLength; - let root: Map<any> | null = null; + let root: YMap<any> | null = null; for (const block of blocks.values()) { const flavour = block.get('sys:flavour') as KnownFlavour; if (flavour === 'affine:page') { @@ -86,8 +91,8 @@ export function parsePageDoc( const queue: string[] = [root.get('sys:id')]; - function pushChildren(block: Map<any>) { - const children = block.get('sys:children') as Array | undefined; + function pushChildren(block: YMap<any>) { + const children = block.get('sys:children') as YArray | undefined; if (children?.length) { for (let i = children.length - 1; i >= 0; i--) { queue.push(children.get(i)); @@ -157,3 +162,34 @@ export function parsePageDoc( return content; } + +export function readAllDocIdsFromWorkspaceSnapshot(snapshot: Uint8Array) { + const rootDoc = new YDoc(); + applyUpdate(rootDoc, snapshot); + return readAllDocIdsFromRootDoc(rootDoc, { + includeTrash: false, + }); +} + +export async function readAllBlocksFromDocSnapshot( + workspaceId: string, + workspaceSnapshot: Uint8Array, + docId: string, + docSnapshot: Uint8Array, + maxSummaryLength?: number +) { + const rootYDoc = new YDoc({ + guid: workspaceId, + }); + applyUpdate(rootYDoc, workspaceSnapshot); + const ydoc = new YDoc({ + guid: docId, + }); + applyUpdate(ydoc, docSnapshot); + return await readAllBlocksFromDoc({ + ydoc, + rootYDoc, + spaceId: workspaceId, + maxSummaryLength, + }); +} diff --git a/packages/backend/server/src/models/doc.ts b/packages/backend/server/src/models/doc.ts index 67ea6fddcb..d013261bcd 100644 --- a/packages/backend/server/src/models/doc.ts +++ b/packages/backend/server/src/models/doc.ts @@ -158,14 +158,7 @@ export class DocModel extends BaseModel { * Get a doc by workspaceId and docId. 
*/ async get(workspaceId: string, docId: string): Promise { - const row = await this.db.snapshot.findUnique({ - where: { - workspaceId_id: { - workspaceId, - id: docId, - }, - }, - }); + const row = await this.getSnapshot(workspaceId, docId); if (!row) { return null; } @@ -178,6 +171,17 @@ export class DocModel extends BaseModel { }; } + async getSnapshot(workspaceId: string, docId: string) { + return await this.db.snapshot.findUnique({ + where: { + workspaceId_id: { + workspaceId, + id: docId, + }, + }, + }); + } + async getAuthors(workspaceId: string, docId: string) { return await this.db.snapshot.findUnique({ where: { diff --git a/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.md b/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.md index 9a0864619b..e472c3492e 100644 --- a/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.md +++ b/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.md @@ -454,3 +454,43 @@ Generated by [AVA](https://avajs.dev). ], }, ] + +## should index doc work + +> Snapshot 1 + + { + summary: [ + 'AFFiNE is an open source all in one workspace, an operating system for all the building blocks of your team wiki, knowledge management and digital assets and a better alternative to Notion and Miro. You own your data, with no compromisesLocal-first & Real-time collaborativeWe love the idea proposed by Ink & Switch in the famous article about you owning your data, despite the cloud. Furthermore, AFFiNE is the first all-in-one workspace that keeps your data ownership with no compromises on real-time collaboration and editing experience.AFFiNE is a local-first application upon CRDTs with real-time collaboration support. 
Your data is always stored locally while multiple nodes remain synced in real-time.Blocks that assemble your next docs, tasks kanban or whiteboardThere is a large overlap of their atomic "building blocks" between these apps. They are neither open source nor have a plugin system like VS Code for contributors to customize. We want to have something that contains all the features we love and goes one step further. ', + ], + title: [ + 'Write, Draw, Plan all at Once.', + ], + } + +> Snapshot 2 + + [ + { + blockId: [ + 'VMx9lHw3TR', + ], + content: [ + 'For developers or installations guides, please go to AFFiNE Doc', + ], + flavour: [ + 'affine:paragraph', + ], + }, + { + blockId: [ + '9-K49otbCv', + ], + content: [ + 'For developer or installation guides, please go to AFFiNE Development', + ], + flavour: [ + 'affine:paragraph', + ], + }, + ] diff --git a/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.snap b/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.snap index c0fe57b987..80e888c3ac 100644 Binary files a/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.snap and b/packages/backend/server/src/plugins/indexer/__tests__/__snapshots__/service.spec.ts.snap differ diff --git a/packages/backend/server/src/plugins/indexer/__tests__/job.spec.ts b/packages/backend/server/src/plugins/indexer/__tests__/job.spec.ts new file mode 100644 index 0000000000..ab338a3ccb --- /dev/null +++ b/packages/backend/server/src/plugins/indexer/__tests__/job.spec.ts @@ -0,0 +1,108 @@ +import { randomUUID } from 'node:crypto'; +import { mock } from 'node:test'; + +import test from 'ava'; +import Sinon from 'sinon'; + +import { createModule } from '../../../__tests__/create-module'; +import { Mockers } from '../../../__tests__/mocks'; +import { ServerConfigModule } from '../../../core/config'; +import { IndexerModule, IndexerService } from '..'; +import { SearchProviderFactory } from 
'../factory'; +import { IndexerJob } from '../job'; +import { ManticoresearchProvider } from '../providers'; + +const module = await createModule({ + imports: [IndexerModule, ServerConfigModule], + providers: [IndexerService], +}); +const indexerService = module.get(IndexerService); +const indexerJob = module.get(IndexerJob); +const searchProviderFactory = module.get(SearchProviderFactory); +const manticoresearch = module.get(ManticoresearchProvider); + +const user = await module.create(Mockers.User); +const workspace = await module.create(Mockers.Workspace, { + snapshot: true, + owner: user, +}); + +test.after.always(async () => { + await module.close(); +}); + +test.afterEach.always(() => { + Sinon.restore(); + mock.reset(); +}); + +test.beforeEach(() => { + mock.method(searchProviderFactory, 'get', () => { + return manticoresearch; + }); +}); + +test('should handle indexer.indexDoc job', async t => { + const spy = Sinon.spy(indexerService, 'indexDoc'); + await indexerJob.indexDoc({ + workspaceId: workspace.id, + docId: randomUUID(), + }); + t.is(spy.callCount, 1); +}); + +test('should handle indexer.deleteDoc job', async t => { + const spy = Sinon.spy(indexerService, 'deleteDoc'); + await indexerJob.deleteDoc({ + workspaceId: workspace.id, + docId: randomUUID(), + }); + t.is(spy.callCount, 1); +}); + +test('should handle indexer.indexWorkspace job', async t => { + const count = module.queue.count('indexer.deleteDoc'); + const spy = Sinon.spy(indexerService, 'listDocIds'); + await indexerJob.indexWorkspace({ + workspaceId: workspace.id, + }); + t.is(spy.callCount, 1); + const { payload } = await module.queue.waitFor('indexer.indexDoc'); + t.is(payload.workspaceId, workspace.id); + t.is(payload.docId, '5nS9BSp3Px'); + // no delete job + t.is(module.queue.count('indexer.deleteDoc'), count); +}); + +test('should not sync existing doc', async t => { + const count = module.queue.count('indexer.indexDoc'); + mock.method(indexerService, 'listDocIds', async () => { + 
return ['5nS9BSp3Px']; + }); + await indexerJob.indexWorkspace({ + workspaceId: workspace.id, + }); + t.is(module.queue.count('indexer.indexDoc'), count); +}); + +test('should delete doc from indexer when docId is not in workspace', async t => { + const count = module.queue.count('indexer.deleteDoc'); + mock.method(indexerService, 'listDocIds', async () => { + return ['mock-doc-id1', 'mock-doc-id2']; + }); + await indexerJob.indexWorkspace({ + workspaceId: workspace.id, + }); + const { payload } = await module.queue.waitFor('indexer.indexDoc'); + t.is(payload.workspaceId, workspace.id); + t.is(payload.docId, '5nS9BSp3Px'); + t.is(module.queue.count('indexer.deleteDoc'), count + 2); +}); + +test('should handle indexer.deleteWorkspace job', async t => { + const spy = Sinon.spy(indexerService, 'deleteWorkspace'); + await indexerJob.deleteWorkspace({ + workspaceId: workspace.id, + }); + t.is(spy.callCount, 1); +}); diff --git a/packages/backend/server/src/plugins/indexer/__tests__/service.spec.ts b/packages/backend/server/src/plugins/indexer/__tests__/service.spec.ts index 7c7ff44b0d..0ec43fb7eb 100644 --- a/packages/backend/server/src/plugins/indexer/__tests__/service.spec.ts +++ b/packages/backend/server/src/plugins/indexer/__tests__/service.spec.ts @@ -27,7 +27,10 @@ const indexerService = module.get(IndexerService); const searchProviderFactory = module.get(SearchProviderFactory); const manticoresearch = module.get(ManticoresearchProvider); const user = await module.create(Mockers.User); -const workspace = await module.create(Mockers.Workspace); +const workspace = await module.create(Mockers.Workspace, { + snapshot: true, + owner: user, +}); mock.method(searchProviderFactory, 'get', () => { return manticoresearch; @@ -1580,3 +1583,524 @@ test('should throw error when field is not allowed in aggregate input', async t }); // #endregion + +// #region deleteWorkspace() + +test('should delete workspace work', async t => { + const workspaceId = randomUUID(); + const 
docId1 = randomUUID(); + const docId2 = randomUUID(); + await indexerService.write( + SearchTable.doc, + [ + { + workspaceId, + docId: docId1, + title: 'hello world', + summary: 'this is a test', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + { + workspaceId, + docId: docId2, + title: 'hello world', + summary: 'this is a test', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + ], + { + refresh: true, + } + ); + await indexerService.write( + SearchTable.block, + [ + { + workspaceId, + docId: docId1, + blockId: randomUUID(), + content: 'hello world', + flavour: 'affine:text', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + ], + { + refresh: true, + } + ); + + let result = await indexerService.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + + t.is(result.total, 2); + t.is(result.nodes.length, 2); + + let result2 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + }, + }); + + t.is(result2.total, 1); + t.is(result2.nodes.length, 1); + + await indexerService.deleteWorkspace(workspaceId, { + refresh: true, + }); + + result = await indexerService.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + t.is(result.total, 0); + t.is(result.nodes.length, 0); + + result2 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.match, + field: 'workspaceId', 
+ match: workspaceId, + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + }, + }); + + t.is(result2.total, 0); + t.is(result2.nodes.length, 0); +}); + +// #endregion + +// #region deleteDoc() + +test('should delete doc work', async t => { + const workspaceId = randomUUID(); + const docId1 = randomUUID(); + const docId2 = randomUUID(); + await indexerService.write( + SearchTable.doc, + [ + { + workspaceId, + docId: docId1, + title: 'hello world', + summary: 'this is a test', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + { + workspaceId, + docId: docId2, + title: 'hello world', + summary: 'this is a test', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + ], + { + refresh: true, + } + ); + await indexerService.write( + SearchTable.block, + [ + { + workspaceId, + docId: docId1, + blockId: randomUUID(), + content: 'hello world', + flavour: 'affine:text', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + { + workspaceId, + docId: docId2, + blockId: randomUUID(), + content: 'hello world', + flavour: 'affine:text', + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }, + ], + { + refresh: true, + } + ); + + let result1 = await indexerService.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId1, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + + t.is(result1.total, 1); + t.is(result1.nodes.length, 1); + t.deepEqual(result1.nodes[0].fields.docId, [docId1]); + + let result2 = await indexerService.search({ + table: 
SearchTable.doc, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId2, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + + t.is(result2.total, 1); + t.is(result2.nodes.length, 1); + t.deepEqual(result2.nodes[0].fields.docId, [docId2]); + + let result3 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId1, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + }, + }); + t.is(result3.total, 1); + t.is(result3.nodes.length, 1); + t.deepEqual(result3.nodes[0].fields.docId, [docId1]); + + let result4 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId2, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + }, + }); + t.is(result4.total, 1); + t.is(result4.nodes.length, 1); + t.deepEqual(result4.nodes[0].fields.docId, [docId2]); + + const count = module.event.count('doc.indexer.deleted'); + + await indexerService.deleteDoc(workspaceId, docId1, { + refresh: true, + }); + t.is(module.event.count('doc.indexer.deleted'), count + 1); + + // make sure the docId1 is deleted + result1 = await indexerService.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + 
field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId1, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + + t.is(result1.total, 0); + t.is(result1.nodes.length, 0); + + // make sure the docId2 is not deleted + result2 = await indexerService.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId2, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + + t.is(result2.total, 1); + t.is(result2.nodes.length, 1); + t.deepEqual(result2.nodes[0].fields.docId, [docId2]); + + // make sure the docId1 block is deleted + result3 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId1, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + }, + }); + + t.is(result3.total, 0); + t.is(result3.nodes.length, 0); + + // docId2 block should not be deleted + result4 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId2, + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + }, + }); + + t.is(result4.total, 1); + t.is(result4.nodes.length, 1); + t.deepEqual(result4.nodes[0].fields.docId, [docId2]); +}); + +// #endregion + +// #region 
listDocIds() + +test('should list doc ids work', async t => { + const workspaceId = randomUUID(); + const docs = []; + const docCount = 20011; + for (let i = 0; i < docCount; i++) { + docs.push({ + workspaceId, + docId: randomUUID(), + title: `hello world ${i} ${randomUUID()}`, + summary: `this is a test ${i} ${randomUUID()}`, + createdByUserId: user.id, + updatedByUserId: user.id, + createdAt: new Date(), + updatedAt: new Date(), + }); + } + await indexerService.write(SearchTable.doc, docs, { + refresh: true, + }); + + const docIds = await indexerService.listDocIds(workspaceId); + + t.is(docIds.length, docCount); + t.deepEqual(docIds.sort(), docs.map(doc => doc.docId).sort()); + + await indexerService.deleteWorkspace(workspaceId, { + refresh: true, + }); + const docIds2 = await indexerService.listDocIds(workspaceId); + + t.is(docIds2.length, 0); +}); + +// #endregion + +// #region indexDoc() + +test('should index doc work', async t => { + const count = module.event.count('doc.indexer.updated'); + const docSnapshot = await module.create(Mockers.DocSnapshot, { + workspaceId: workspace.id, + user, + }); + + await indexerService.indexDoc(workspace.id, docSnapshot.id, { + refresh: true, + }); + + const result = await indexerService.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.match, + field: 'docId', + match: docSnapshot.id, + }, + options: { + fields: ['workspaceId', 'docId', 'title', 'summary'], + }, + }); + + t.is(result.total, 1); + t.deepEqual(result.nodes[0].fields.workspaceId, [workspace.id]); + t.deepEqual(result.nodes[0].fields.docId, [docSnapshot.id]); + t.snapshot(omit(result.nodes[0].fields, ['workspaceId', 'docId'])); + + // search blocks + const result2 = await indexerService.search({ + table: SearchTable.block, + query: { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspace.id, + }, + { + type: SearchQueryType.match, + field: 
'content', + match: + 'For developers or installations guides, please go to AFFiNE Doc', + }, + ], + }, + options: { + fields: ['workspaceId', 'docId', 'blockId', 'content', 'flavour'], + highlights: [ + { + field: 'content', + before: '', + end: '', + }, + ], + pagination: { + limit: 2, + }, + }, + }); + + t.is(result2.nodes.length, 2); + t.snapshot( + result2.nodes.map(node => omit(node.fields, ['workspaceId', 'docId'])) + ); + t.is(module.event.count('doc.indexer.updated'), count + 1); +}); +// #endregion diff --git a/packages/backend/server/src/plugins/indexer/index.ts b/packages/backend/server/src/plugins/indexer/index.ts index d98c806973..6235e9ace0 100644 --- a/packages/backend/server/src/plugins/indexer/index.ts +++ b/packages/backend/server/src/plugins/indexer/index.ts @@ -5,6 +5,7 @@ import { Module } from '@nestjs/common'; import { ServerConfigModule } from '../../core/config'; import { PermissionModule } from '../../core/permission'; import { SearchProviderFactory } from './factory'; +import { IndexerJob } from './job'; import { SearchProviders } from './providers'; import { IndexerResolver } from './resolver'; import { IndexerService } from './service'; @@ -14,6 +15,7 @@ import { IndexerService } from './service'; providers: [ IndexerResolver, IndexerService, + IndexerJob, SearchProviderFactory, ...SearchProviders, ], @@ -22,3 +24,16 @@ import { IndexerService } from './service'; export class IndexerModule {} export { IndexerService }; + +declare global { + interface Events { + 'doc.indexer.updated': { + workspaceId: string; + docId: string; + }; + 'doc.indexer.deleted': { + workspaceId: string; + docId: string; + }; + } +} diff --git a/packages/backend/server/src/plugins/indexer/job.ts b/packages/backend/server/src/plugins/indexer/job.ts new file mode 100644 index 0000000000..d94fc9e5a5 --- /dev/null +++ b/packages/backend/server/src/plugins/indexer/job.ts @@ -0,0 +1,110 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { JobQueue, 
OnJob } from '../../base'; +import { readAllDocIdsFromWorkspaceSnapshot } from '../../core/utils/blocksuite'; +import { Models } from '../../models'; +import { IndexerService } from './service'; + +declare global { + interface Jobs { + 'indexer.indexDoc': { + workspaceId: string; + docId: string; + }; + 'indexer.deleteDoc': { + workspaceId: string; + docId: string; + }; + 'indexer.indexWorkspace': { + workspaceId: string; + }; + 'indexer.deleteWorkspace': { + workspaceId: string; + }; + } +} + +@Injectable() +export class IndexerJob { + private readonly logger = new Logger(IndexerJob.name); + + constructor( + private readonly models: Models, + private readonly service: IndexerService, + private readonly queue: JobQueue + ) {} + + @OnJob('indexer.indexDoc') + async indexDoc({ workspaceId, docId }: Jobs['indexer.indexDoc']) { + // delete the 'indexer.deleteDoc' job from the queue + await this.queue.remove(`${workspaceId}/${docId}`, 'indexer.deleteDoc'); + await this.service.indexDoc(workspaceId, docId); + } + + @OnJob('indexer.deleteDoc') + async deleteDoc({ workspaceId, docId }: Jobs['indexer.deleteDoc']) { + // delete the 'indexer.indexDoc' job from the queue + await this.queue.remove(`${workspaceId}/${docId}`, 'indexer.indexDoc'); + await this.service.deleteDoc(workspaceId, docId); + } + + @OnJob('indexer.indexWorkspace') + async indexWorkspace({ workspaceId }: Jobs['indexer.indexWorkspace']) { + await this.queue.remove(workspaceId, 'indexer.deleteWorkspace'); + const snapshot = await this.models.doc.getSnapshot( + workspaceId, + workspaceId + ); + if (!snapshot) { + this.logger.warn(`workspace ${workspaceId} not found`); + return; + } + const docIdsInWorkspace = readAllDocIdsFromWorkspaceSnapshot(snapshot.blob); + const docIdsInIndexer = await this.service.listDocIds(workspaceId); + const docIdsInWorkspaceSet = new Set(docIdsInWorkspace); + const docIdsInIndexerSet = new Set(docIdsInIndexer); + // diff the docIdsInWorkspace and docIdsInIndexer + const 
missingDocIds = docIdsInWorkspace.filter( + docId => !docIdsInIndexerSet.has(docId) + ); + const deletedDocIds = docIdsInIndexer.filter( + docId => !docIdsInWorkspaceSet.has(docId) + ); + for (const docId of deletedDocIds) { + await this.queue.add( + 'indexer.deleteDoc', + { + workspaceId, + docId, + }, + { + jobId: `${workspaceId}/${docId}`, + // the delete job should be higher priority than the update job + priority: 0, + } + ); + } + for (const docId of missingDocIds) { + await this.queue.add( + 'indexer.indexDoc', + { + workspaceId, + docId, + }, + { + jobId: `${workspaceId}/${docId}`, + priority: 100, + } + ); + } + this.logger.debug( + `indexed workspace ${workspaceId} with ${missingDocIds.length} missing docs and ${deletedDocIds.length} deleted docs` + ); + } + + @OnJob('indexer.deleteWorkspace') + async deleteWorkspace({ workspaceId }: Jobs['indexer.deleteWorkspace']) { + await this.queue.remove(workspaceId, 'indexer.indexWorkspace'); + await this.service.deleteWorkspace(workspaceId); + } +} diff --git a/packages/backend/server/src/plugins/indexer/service.ts b/packages/backend/server/src/plugins/indexer/service.ts index 2a9949aa8b..c1bbb48876 100644 --- a/packages/backend/server/src/plugins/indexer/service.ts +++ b/packages/backend/server/src/plugins/indexer/service.ts @@ -1,7 +1,13 @@ import { Injectable, Logger } from '@nestjs/common'; import { camelCase, chunk, mapKeys, snakeCase } from 'lodash-es'; -import { InvalidIndexerInput, SearchProviderNotFound } from '../../base'; +import { + EventBus, + InvalidIndexerInput, + SearchProviderNotFound, +} from '../../base'; +import { readAllBlocksFromDocSnapshot } from '../../core/utils/blocksuite'; +import { Models } from '../../models'; import { SearchProviderType } from './config'; import { SearchProviderFactory } from './factory'; import { @@ -30,6 +36,7 @@ import { SearchHighlight, SearchInput, SearchQuery, + SearchQueryOccur, SearchQueryType, } from './types'; @@ -99,7 +106,11 @@ export interface 
SearchNodeWithMeta extends SearchNode { export class IndexerService { private readonly logger = new Logger(IndexerService.name); - constructor(private readonly factory: SearchProviderFactory) {} + constructor( + private readonly models: Models, + private readonly factory: SearchProviderFactory, + private readonly event: EventBus + ) {} async createTables() { let searchProvider: SearchProvider | undefined; @@ -161,6 +172,204 @@ export class IndexerService { return result; } + async listDocIds(workspaceId: string) { + const docIds: string[] = []; + let cursor: string | undefined; + do { + const result = await this.search({ + table: SearchTable.doc, + query: { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + options: { + fields: ['docId'], + pagination: { + limit: 10000, + cursor, + }, + }, + }); + docIds.push(...result.nodes.map(node => node.fields.docId[0] as string)); + cursor = result.nextCursor; + this.logger.debug( + `get ${result.nodes.length} new / ${docIds.length} total doc ids for workspace ${workspaceId}, nextCursor: ${cursor}` + ); + } while (cursor); + return docIds; + } + + async indexDoc( + workspaceId: string, + docId: string, + options?: OperationOptions + ) { + const workspaceSnapshot = await this.models.doc.getSnapshot( + workspaceId, + workspaceId + ); + if (!workspaceSnapshot) { + this.logger.debug(`workspace ${workspaceId} not found`); + return; + } + const docSnapshot = await this.models.doc.getSnapshot(workspaceId, docId); + if (!docSnapshot) { + this.logger.debug(`doc ${workspaceId}/${docId} not found`); + return; + } + if (docSnapshot.blob.length <= 2) { + this.logger.debug(`doc ${workspaceId}/${docId} is empty, skip indexing`); + return; + } + const result = await readAllBlocksFromDocSnapshot( + workspaceId, + workspaceSnapshot.blob, + docId, + docSnapshot.blob + ); + if (!result) { + this.logger.warn( + `parse doc ${workspaceId}/${docId} failed, workspaceSnapshot size: ${workspaceSnapshot.blob.length}, 
docSnapshot size: ${docSnapshot.blob.length}` + ); + return; + } + await this.write( + SearchTable.doc, + [ + { + workspaceId, + docId, + title: result.title, + summary: result.summary, + // NOTE(@fengmk): journal is not supported yet + // journal: result.journal, + createdByUserId: docSnapshot.createdBy ?? '', + updatedByUserId: docSnapshot.updatedBy ?? '', + createdAt: docSnapshot.createdAt, + updatedAt: docSnapshot.updatedAt, + }, + ], + options + ); + await this.deleteBlocksByDocId(workspaceId, docId, options); + await this.write( + SearchTable.block, + result.blocks.map(block => ({ + workspaceId, + docId, + blockId: block.blockId, + content: block.content ?? '', + flavour: block.flavour, + blob: block.blob, + refDocId: block.refDocId, + ref: block.ref, + parentFlavour: block.parentFlavour, + parentBlockId: block.parentBlockId, + additional: block.additional + ? JSON.stringify(block.additional) + : undefined, + markdownPreview: block.markdownPreview, + createdByUserId: docSnapshot.createdBy ?? '', + updatedByUserId: docSnapshot.updatedBy ?? 
'', + createdAt: docSnapshot.createdAt, + updatedAt: docSnapshot.updatedAt, + })), + options + ); + this.event.emit('doc.indexer.updated', { + workspaceId, + docId, + }); + this.logger.debug( + `synced doc ${workspaceId}/${docId} with ${result.blocks.length} blocks` + ); + } + + async deleteDoc( + workspaceId: string, + docId: string, + options?: OperationOptions + ) { + await this.deleteByQuery( + SearchTable.doc, + { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId, + }, + ], + }, + options + ); + this.logger.debug(`deleted doc ${workspaceId}/${docId}`); + await this.deleteBlocksByDocId(workspaceId, docId, options); + this.event.emit('doc.indexer.deleted', { + workspaceId, + docId, + }); + } + + async deleteBlocksByDocId( + workspaceId: string, + docId: string, + options?: OperationOptions + ) { + await this.deleteByQuery( + SearchTable.block, + { + type: SearchQueryType.boolean, + occur: SearchQueryOccur.must, + queries: [ + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + { + type: SearchQueryType.match, + field: 'docId', + match: docId, + }, + ], + }, + options + ); + this.logger.debug(`deleted all blocks in doc ${workspaceId}/${docId}`); + } + + async deleteWorkspace(workspaceId: string, options?: OperationOptions) { + await this.deleteByQuery( + SearchTable.doc, + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + options + ); + this.logger.debug(`deleted all docs in workspace ${workspaceId}`); + await this.deleteByQuery( + SearchTable.block, + { + type: SearchQueryType.match, + field: 'workspaceId', + match: workspaceId, + }, + options + ); + this.logger.debug(`deleted all blocks in workspace ${workspaceId}`); + } + async deleteByQuery( table: T, query: SearchQuery, diff --git 
a/packages/backend/server/tsconfig.json b/packages/backend/server/tsconfig.json index 6997a939f7..fec2cae0e5 100644 --- a/packages/backend/server/tsconfig.json +++ b/packages/backend/server/tsconfig.json @@ -12,6 +12,7 @@ }, "include": ["./src"], "references": [ + { "path": "../../common/reader" }, { "path": "../native" }, { "path": "../../../tools/cli" }, { "path": "../../../tools/utils" }, diff --git a/packages/frontend/admin/src/config.json b/packages/frontend/admin/src/config.json index e4f8ac5bdb..9760727906 100644 --- a/packages/frontend/admin/src/config.json +++ b/packages/frontend/admin/src/config.json @@ -31,6 +31,10 @@ "type": "Object", "desc": "The config for doc job queue" }, + "queues.indexer": { + "type": "Object", + "desc": "The config for indexer job queue" + }, "queues.notification": { "type": "Object", "desc": "The config for notification job queue" diff --git a/tools/utils/src/workspace.gen.ts b/tools/utils/src/workspace.gen.ts index b7387e5970..3a29cc84c2 100644 --- a/tools/utils/src/workspace.gen.ts +++ b/tools/utils/src/workspace.gen.ts @@ -1084,6 +1084,7 @@ export const PackageList = [ location: 'packages/backend/server', name: '@affine/server', workspaceDependencies: [ + 'packages/common/reader', 'packages/backend/native', 'tools/cli', 'tools/utils', diff --git a/yarn.lock b/yarn.lock index 6ad19327af..ae1daf0f6f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -906,6 +906,7 @@ __metadata: "@affine-tools/cli": "workspace:*" "@affine-tools/utils": "workspace:*" "@affine/graphql": "workspace:*" + "@affine/reader": "workspace:*" "@affine/server-native": "workspace:*" "@ai-sdk/anthropic": "npm:^1.2.10" "@ai-sdk/google": "npm:^1.2.10"