From b331a08744d791636411e26379eee87bfc9d9fb3 Mon Sep 17 00:00:00 2001 From: DarkSky <25152247+darkskygit@users.noreply.github.com> Date: Tue, 13 Jan 2026 22:03:55 +0800 Subject: [PATCH] feat: native update merge (#14250) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### PR Dependency Tree * **PR #14250** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal) ## Summary by CodeRabbit * **Backend Optimization** * Faster document retrieval via a native binary fetch path. * Native-accelerated merging of document updates for improved performance and consistency. * **Indexing & Reliability** * Indexing now only proceeds on valid parse results, with clearer warnings and richer metadata on failures. * More consistent sync behavior and enhanced diagnostic logging for indexing operations. * **Tests** * Expanded tests to cover native binary retrieval error handling. ✏️ Tip: You can customize this high-level summary in your review settings. --- .../doc/__tests__/reader-from-rpc.spec.ts | 3 + .../backend/server/src/core/doc/reader.ts | 8 +- .../server/src/core/doc/storage/doc.ts | 36 ++++- .../server/src/plugins/indexer/service.ts | 137 ++++++++++-------- 4 files changed, 112 insertions(+), 72 deletions(-) diff --git a/packages/backend/server/src/core/doc/__tests__/reader-from-rpc.spec.ts b/packages/backend/server/src/core/doc/__tests__/reader-from-rpc.spec.ts index cd47a6b260..d43488d36f 100644 --- a/packages/backend/server/src/core/doc/__tests__/reader-from-rpc.spec.ts +++ b/packages/backend/server/src/core/doc/__tests__/reader-from-rpc.spec.ts @@ -100,6 +100,9 @@ test('should throw error when doc service internal error', async t => { mock.method(adapter, 'getDoc', async () => { throw new Error('mock doc service internal error'); }); + mock.method(adapter, 'getDocBinNative', async () => { + throw new Error('mock doc service internal error'); + }); let err = await t.throwsAsync(docReader.getDoc(workspace.id, docId), { instanceOf: UserFriendlyError, message: 'An internal error occurred.', diff --git a/packages/backend/server/src/core/doc/reader.ts b/packages/backend/server/src/core/doc/reader.ts index 1dcfa94b2b..74c557fd7b 100644 --- a/packages/backend/server/src/core/doc/reader.ts +++ b/packages/backend/server/src/core/doc/reader.ts @@ -213,11 +213,9 @@ export class DatabaseDocReader extends DocReader { guid: string, fullContent?: boolean ): Promise { - const docRecord = await this.workspace.getDoc(workspaceId, guid); - if (!docRecord) { - return null; - } - return this.parseDocContent(docRecord.bin, fullContent ? -1 : 150); + const docBinary = await this.workspace.getDocBinNative(workspaceId, guid); + if (!docBinary) return null; + return this.parseDocContent(docBinary, fullContent ? -1 : 150); } protected override async getWorkspaceContentWithoutCache( diff --git a/packages/backend/server/src/core/doc/storage/doc.ts b/packages/backend/server/src/core/doc/storage/doc.ts index 486c10d99e..2af966a7aa 100644 --- a/packages/backend/server/src/core/doc/storage/doc.ts +++ b/packages/backend/server/src/core/doc/storage/doc.ts @@ -13,9 +13,15 @@ import { } from 'yjs'; import { CallMetric } from '../../../base'; +import { mergeUpdatesInApplyWay } from '../../../native'; import { Connection } from './connection'; import { SingletonLocker } from './lock'; +async function nativeMergeUpdates(updates: Uint8Array[]): Promise { + // use native module to merge updates + return mergeUpdatesInApplyWay(updates.map(u => Buffer.from(u))); +} + export interface DocRecord { spaceId: string; docId: string; @@ -95,6 +101,27 @@ export abstract class DocStorageAdapter extends Connection { return snapshot; } + /// get final binary only but not updating the snapshot in database + async getDocBinNative( + spaceId: string, + docId: string + ): Promise { + await using _lock = await this.lockDocForUpdate(spaceId, docId); + + const snapshot = await this.getDocSnapshot(spaceId, docId); + const updates = await this.getDocUpdates(spaceId, docId); + + if (updates.length) { + const docUpdate = await this.squash( + snapshot ? [snapshot, ...updates] : updates, + nativeMergeUpdates + ); + return docUpdate.bin; + } + + return snapshot?.bin; + } + @Transactional({ timeout: 60000 }) private async squashUpdatesToSnapshot( spaceId: string, @@ -223,8 +250,11 @@ export abstract class DocStorageAdapter extends Connection { ): Promise; @CallMetric('doc', 'squash') - protected async squash(updates: DocUpdate[]): Promise { - const merge = this.options?.mergeUpdates ?? mergeUpdates; + protected async squash( + updates: DocUpdate[], + merge?: (updates: Uint8Array[]) => Promise + ): Promise { + const mergeFn = merge ?? this.options?.mergeUpdates ?? mergeUpdates; const lastUpdate = updates.at(-1); if (!lastUpdate) { throw new Error('No updates to be squashed.'); @@ -235,7 +265,7 @@ export abstract class DocStorageAdapter extends Connection { return lastUpdate; } - const finalUpdate = await merge(updates.map(u => u.bin)); + const finalUpdate = await mergeFn(updates.map(u => u.bin)); return { bin: finalUpdate, diff --git a/packages/backend/server/src/plugins/indexer/service.ts b/packages/backend/server/src/plugins/indexer/service.ts index 09974bb2c9..abc2b83885 100644 --- a/packages/backend/server/src/plugins/indexer/service.ts +++ b/packages/backend/server/src/plugins/indexer/service.ts @@ -210,14 +210,6 @@ export class IndexerService { docId: string, options?: OperationOptions ) { - const workspaceSnapshot = await this.models.doc.getSnapshot( - workspaceId, - workspaceId - ); - if (!workspaceSnapshot) { - this.logger.debug(`workspace ${workspaceId} not found`); - return; - } const docSnapshot = await this.models.doc.getSnapshot(workspaceId, docId); if (!docSnapshot) { this.logger.debug(`doc ${workspaceId}/${docId} not found`); @@ -227,64 +219,81 @@ export class IndexerService { this.logger.debug(`doc ${workspaceId}/${docId} is empty, skip indexing`); return; } - const result = await readAllBlocksFromDocSnapshot(docId, docSnapshot.blob); - if (!result) { - this.logger.warn( - `parse doc ${workspaceId}/${docId} failed, workspaceSnapshot size: ${workspaceSnapshot.blob.length}, docSnapshot size: ${docSnapshot.blob.length}` - ); - return; - } - await this.write( - SearchTable.doc, - [ - { - workspaceId, - docId, - title: result.title, - summary: result.summary, - // NOTE(@fengmk): journal is not supported yet - // journal: result.journal, - createdByUserId: docSnapshot.createdBy ?? '', - updatedByUserId: docSnapshot.updatedBy ?? '', - createdAt: docSnapshot.createdAt, - updatedAt: docSnapshot.updatedAt, - }, - ], - options - ); - await this.deleteBlocksByDocId(workspaceId, docId, options); - await this.write( - SearchTable.block, - result.blocks.map(block => ({ - workspaceId, - docId, - blockId: block.blockId, - content: block.content ?? '', - flavour: block.flavour, - blob: block.blob, - refDocId: block.refDocId, - ref: block.ref, - parentFlavour: block.parentFlavour, - parentBlockId: block.parentBlockId, - additional: block.additional - ? JSON.stringify(block.additional) - : undefined, - markdownPreview: undefined, - createdByUserId: docSnapshot.createdBy ?? '', - updatedByUserId: docSnapshot.updatedBy ?? '', - createdAt: docSnapshot.createdAt, - updatedAt: docSnapshot.updatedAt, - })), - options - ); - - await this.queue.add('copilot.embedding.updateDoc', { + const metadata = { workspaceId, docId, - }); - this.logger.log( - `synced doc ${workspaceId}/${docId} with ${result.blocks.length} blocks` - ); + docSnapshotSize: docSnapshot.blob.length, + }; + + try { + const result = await readAllBlocksFromDocSnapshot( + docId, + docSnapshot.blob + ); + if (result) { + await this.write( + SearchTable.doc, + [ + { + workspaceId, + docId, + title: result.title, + summary: result.summary, + // NOTE(@fengmk): journal is not supported yet + // journal: result.journal, + createdByUserId: docSnapshot.createdBy ?? '', + updatedByUserId: docSnapshot.updatedBy ?? '', + createdAt: docSnapshot.createdAt, + updatedAt: docSnapshot.updatedAt, + }, + ], + options + ); + await this.deleteBlocksByDocId(workspaceId, docId, options); + await this.write( + SearchTable.block, + result.blocks.map(block => ({ + workspaceId, + docId, + blockId: block.blockId, + content: block.content ?? '', + flavour: block.flavour, + blob: block.blob, + refDocId: block.refDocId, + ref: block.ref, + parentFlavour: block.parentFlavour, + parentBlockId: block.parentBlockId, + additional: block.additional + ? JSON.stringify(block.additional) + : undefined, + markdownPreview: undefined, + createdByUserId: docSnapshot.createdBy ?? '', + updatedByUserId: docSnapshot.updatedBy ?? '', + createdAt: docSnapshot.createdAt, + updatedAt: docSnapshot.updatedAt, + })), + options + ); + + await this.queue.add('copilot.embedding.updateDoc', { + workspaceId, + docId, + }); + this.logger.verbose( + `synced doc ${workspaceId}/${docId} with ${result.blocks.length} blocks` + ); + } else { + this.logger.warn( + `failed to parse ${workspaceId}/${docId}, no result returned`, + metadata + ); + } + } catch (err) { + this.logger.warn( + `failed to parse ${workspaceId}/${docId}: ${err}`, + metadata + ); + } } async deleteDoc(