feat: native update merge (#14250)

#### PR Dependency Tree


* **PR #14250** 👈

This tree was auto-generated by
[Charcoal](https://github.com/danerwilliams/charcoal)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Backend Optimization**
  * Faster document retrieval via a native binary fetch path.
* Native-accelerated merging of document updates for improved
performance and consistency.
* **Indexing & Reliability**
* Indexing now only proceeds on valid parse results, with clearer
warnings and richer metadata on failures.
* More consistent sync behavior and enhanced diagnostic logging for
indexing operations.
* **Tests**
  * Expanded tests to cover native binary retrieval error handling.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2026-01-13 22:03:55 +08:00
committed by GitHub
parent 279b7bb64f
commit b331a08744
4 changed files with 112 additions and 72 deletions

View File

@@ -100,6 +100,9 @@ test('should throw error when doc service internal error', async t => {
mock.method(adapter, 'getDoc', async () => {
throw new Error('mock doc service internal error');
});
mock.method(adapter, 'getDocBinNative', async () => {
throw new Error('mock doc service internal error');
});
let err = await t.throwsAsync(docReader.getDoc(workspace.id, docId), {
instanceOf: UserFriendlyError,
message: 'An internal error occurred.',

View File

@@ -213,11 +213,9 @@ export class DatabaseDocReader extends DocReader {
guid: string,
fullContent?: boolean
): Promise<PageDocContent | null> {
const docRecord = await this.workspace.getDoc(workspaceId, guid);
if (!docRecord) {
return null;
}
return this.parseDocContent(docRecord.bin, fullContent ? -1 : 150);
const docBinary = await this.workspace.getDocBinNative(workspaceId, guid);
if (!docBinary) return null;
return this.parseDocContent(docBinary, fullContent ? -1 : 150);
}
protected override async getWorkspaceContentWithoutCache(

View File

@@ -13,9 +13,15 @@ import {
} from 'yjs';
import { CallMetric } from '../../../base';
import { mergeUpdatesInApplyWay } from '../../../native';
import { Connection } from './connection';
import { SingletonLocker } from './lock';
async function nativeMergeUpdates(updates: Uint8Array[]): Promise<Uint8Array> {
// use native module to merge updates
return mergeUpdatesInApplyWay(updates.map(u => Buffer.from(u)));
}
export interface DocRecord {
spaceId: string;
docId: string;
@@ -95,6 +101,27 @@ export abstract class DocStorageAdapter extends Connection {
return snapshot;
}
/// get final binary only but not updating the snapshot in database
async getDocBinNative(
spaceId: string,
docId: string
): Promise<Uint8Array | undefined> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
const snapshot = await this.getDocSnapshot(spaceId, docId);
const updates = await this.getDocUpdates(spaceId, docId);
if (updates.length) {
const docUpdate = await this.squash(
snapshot ? [snapshot, ...updates] : updates,
nativeMergeUpdates
);
return docUpdate.bin;
}
return snapshot?.bin;
}
@Transactional<TransactionalAdapterPrisma>({ timeout: 60000 })
private async squashUpdatesToSnapshot(
spaceId: string,
@@ -223,8 +250,11 @@ export abstract class DocStorageAdapter extends Connection {
): Promise<boolean>;
@CallMetric('doc', 'squash')
protected async squash(updates: DocUpdate[]): Promise<DocUpdate> {
const merge = this.options?.mergeUpdates ?? mergeUpdates;
protected async squash(
updates: DocUpdate[],
merge?: (updates: Uint8Array[]) => Promise<Uint8Array>
): Promise<DocUpdate> {
const mergeFn = merge ?? this.options?.mergeUpdates ?? mergeUpdates;
const lastUpdate = updates.at(-1);
if (!lastUpdate) {
throw new Error('No updates to be squashed.');
@@ -235,7 +265,7 @@ export abstract class DocStorageAdapter extends Connection {
return lastUpdate;
}
const finalUpdate = await merge(updates.map(u => u.bin));
const finalUpdate = await mergeFn(updates.map(u => u.bin));
return {
bin: finalUpdate,

View File

@@ -210,14 +210,6 @@ export class IndexerService {
docId: string,
options?: OperationOptions
) {
const workspaceSnapshot = await this.models.doc.getSnapshot(
workspaceId,
workspaceId
);
if (!workspaceSnapshot) {
this.logger.debug(`workspace ${workspaceId} not found`);
return;
}
const docSnapshot = await this.models.doc.getSnapshot(workspaceId, docId);
if (!docSnapshot) {
this.logger.debug(`doc ${workspaceId}/${docId} not found`);
@@ -227,64 +219,81 @@ export class IndexerService {
this.logger.debug(`doc ${workspaceId}/${docId} is empty, skip indexing`);
return;
}
const result = await readAllBlocksFromDocSnapshot(docId, docSnapshot.blob);
if (!result) {
this.logger.warn(
`parse doc ${workspaceId}/${docId} failed, workspaceSnapshot size: ${workspaceSnapshot.blob.length}, docSnapshot size: ${docSnapshot.blob.length}`
);
return;
}
await this.write(
SearchTable.doc,
[
{
workspaceId,
docId,
title: result.title,
summary: result.summary,
// NOTE(@fengmk): journal is not supported yet
// journal: result.journal,
createdByUserId: docSnapshot.createdBy ?? '',
updatedByUserId: docSnapshot.updatedBy ?? '',
createdAt: docSnapshot.createdAt,
updatedAt: docSnapshot.updatedAt,
},
],
options
);
await this.deleteBlocksByDocId(workspaceId, docId, options);
await this.write(
SearchTable.block,
result.blocks.map(block => ({
workspaceId,
docId,
blockId: block.blockId,
content: block.content ?? '',
flavour: block.flavour,
blob: block.blob,
refDocId: block.refDocId,
ref: block.ref,
parentFlavour: block.parentFlavour,
parentBlockId: block.parentBlockId,
additional: block.additional
? JSON.stringify(block.additional)
: undefined,
markdownPreview: undefined,
createdByUserId: docSnapshot.createdBy ?? '',
updatedByUserId: docSnapshot.updatedBy ?? '',
createdAt: docSnapshot.createdAt,
updatedAt: docSnapshot.updatedAt,
})),
options
);
await this.queue.add('copilot.embedding.updateDoc', {
const metadata = {
workspaceId,
docId,
});
this.logger.log(
`synced doc ${workspaceId}/${docId} with ${result.blocks.length} blocks`
);
docSnapshotSize: docSnapshot.blob.length,
};
try {
const result = await readAllBlocksFromDocSnapshot(
docId,
docSnapshot.blob
);
if (result) {
await this.write(
SearchTable.doc,
[
{
workspaceId,
docId,
title: result.title,
summary: result.summary,
// NOTE(@fengmk): journal is not supported yet
// journal: result.journal,
createdByUserId: docSnapshot.createdBy ?? '',
updatedByUserId: docSnapshot.updatedBy ?? '',
createdAt: docSnapshot.createdAt,
updatedAt: docSnapshot.updatedAt,
},
],
options
);
await this.deleteBlocksByDocId(workspaceId, docId, options);
await this.write(
SearchTable.block,
result.blocks.map(block => ({
workspaceId,
docId,
blockId: block.blockId,
content: block.content ?? '',
flavour: block.flavour,
blob: block.blob,
refDocId: block.refDocId,
ref: block.ref,
parentFlavour: block.parentFlavour,
parentBlockId: block.parentBlockId,
additional: block.additional
? JSON.stringify(block.additional)
: undefined,
markdownPreview: undefined,
createdByUserId: docSnapshot.createdBy ?? '',
updatedByUserId: docSnapshot.updatedBy ?? '',
createdAt: docSnapshot.createdAt,
updatedAt: docSnapshot.updatedAt,
})),
options
);
await this.queue.add('copilot.embedding.updateDoc', {
workspaceId,
docId,
});
this.logger.verbose(
`synced doc ${workspaceId}/${docId} with ${result.blocks.length} blocks`
);
} else {
this.logger.warn(
`failed to parse ${workspaceId}/${docId}, no result returned`,
metadata
);
}
} catch (err) {
this.logger.warn(
`failed to parse ${workspaceId}/${docId}: ${err}`,
metadata
);
}
}
async deleteDoc(