feat: native sync state (#14190)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **New Features**
* Added indexed clock management capabilities for documents, enabling
get, set, and clear operations across Android, iOS, Electron, and web
platforms.

* **Refactor**
* Improved storage architecture to dynamically select platform-specific
implementations (SQLite for Electron, IndexedDB for others).

* **Bug Fixes**
* Enhanced document operations to properly maintain and clean up indexer
synchronization state during document lifecycle changes.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
DarkSky
2025-12-31 04:09:32 +08:00
committed by GitHub
parent 95ef04f3e0
commit 99332228da
18 changed files with 375 additions and 12 deletions
@@ -3,6 +3,7 @@ import type {
BlobRecord,
CrawlResult,
DocClock,
DocIndexedClock,
DocRecord,
ListedBlobRecord,
} from '../../storage';
@@ -29,6 +30,17 @@ export interface NativeDBApis {
deleteDoc: (id: string, docId: string) => Promise<void>;
getDocClocks: (id: string, after?: Date | null) => Promise<DocClock[]>;
getDocClock: (id: string, docId: string) => Promise<DocClock | null>;
getDocIndexedClock: (
id: string,
docId: string
) => Promise<DocIndexedClock | null>;
setDocIndexedClock: (
id: string,
docId: string,
indexedClock: Date,
indexerVersion: number
) => Promise<void>;
clearDocIndexedClock: (id: string, docId: string) => Promise<void>;
getBlob: (id: string, key: string) => Promise<BlobRecord | null>;
setBlob: (id: string, blob: BlobRecord) => Promise<void>;
deleteBlob: (id: string, key: string, permanently: boolean) => Promise<void>;
@@ -4,6 +4,7 @@ import { SqliteBlobSyncStorage } from './blob-sync';
import { SqliteDocStorage } from './doc';
import { SqliteDocSyncStorage } from './doc-sync';
import { SqliteIndexerStorage } from './indexer';
import { SqliteIndexerSyncStorage } from './indexer-sync';
export * from './blob';
export * from './blob-sync';
@@ -11,6 +12,7 @@ export { bindNativeDBApis, type NativeDBApis } from './db';
export * from './doc';
export * from './doc-sync';
export * from './indexer';
export * from './indexer-sync';
export const sqliteStorages = [
SqliteDocStorage,
@@ -18,4 +20,5 @@ export const sqliteStorages = [
SqliteDocSyncStorage,
SqliteBlobSyncStorage,
SqliteIndexerStorage,
SqliteIndexerSyncStorage,
] satisfies StorageConstructor[];
@@ -0,0 +1,38 @@
import { share } from '../../connection';
import {
type DocIndexedClock,
IndexerSyncStorageBase,
} from '../../storage/indexer-sync';
import { NativeDBConnection, type SqliteNativeDBOptions } from './db';
export class SqliteIndexerSyncStorage extends IndexerSyncStorageBase {
static readonly identifier = 'SqliteIndexerSyncStorage';
override connection = share(new NativeDBConnection(this.options));
constructor(private readonly options: SqliteNativeDBOptions) {
super();
}
private get db() {
return this.connection.apis;
}
override async getDocIndexedClock(
docId: string
): Promise<DocIndexedClock | null> {
return this.db.getDocIndexedClock(docId);
}
override async setDocIndexedClock(clock: DocIndexedClock): Promise<void> {
await this.db.setDocIndexedClock(
clock.docId,
clock.timestamp,
clock.indexerVersion
);
}
override async clearDocIndexedClock(docId: string): Promise<void> {
await this.db.clearDocIndexedClock(docId);
}
}
@@ -92,4 +92,5 @@ export * from './doc-sync';
export * from './errors';
export * from './history';
export * from './indexer';
export * from './indexer-sync';
export * from './storage';
@@ -1,4 +1,4 @@
import type { CrawlResult } from '@affine/nbstore';
import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
export interface Blob {
key: string;
@@ -184,7 +184,21 @@ export interface NbStorePlugin {
indexName: string;
docId: string;
query: string;
}) => Promise<{ matches: Array<{ start: number; end: number }> }>;
}) => Promise<{ matches: { start: number; end: number }[] }>;
ftsFlushIndex: (options: { id: string }) => Promise<void>;
ftsIndexVersion: () => Promise<{ indexVersion: number }>;
getDocIndexedClock: (options: {
id: string;
docId: string;
}) => Promise<DocIndexedClock | null>;
setDocIndexedClock: (options: {
id: string;
docId: string;
indexedClock: number;
indexerVersion: number;
}) => Promise<void>;
clearDocIndexedClock: (options: {
id: string;
docId: string;
}) => Promise<void>;
}
@@ -4,7 +4,9 @@ import {
} from '@affine/core/modules/workspace-engine';
import {
type BlobRecord,
type CrawlResult,
type DocClock,
type DocIndexedClock,
type DocRecord,
type ListedBlobRecord,
parseUniversalId,
@@ -321,7 +323,7 @@ export const NbStoreNativeDBApis: NativeDBApis = {
peer,
blobId,
});
return result?.uploadedAt ? new Date(result.uploadedAt) : null;
return result.uploadedAt ? new Date(result.uploadedAt) : null;
},
setBlobUploadedAt: async function (
id: string,
@@ -336,8 +338,11 @@ export const NbStoreNativeDBApis: NativeDBApis = {
uploadedAt: uploadedAt ? uploadedAt.getTime() : null,
});
},
crawlDocData: async function (id: string, docId: string) {
return NbStore.crawlDocData({ id, docId });
crawlDocData: async function (
id: string,
docId: string
): Promise<CrawlResult> {
return await NbStore.crawlDocData({ id, docId });
},
ftsAddDocument: async function (
id: string,
@@ -411,4 +416,29 @@ export const NbStoreNativeDBApis: NativeDBApis = {
ftsIndexVersion: function (): Promise<number> {
return NbStore.ftsIndexVersion().then(res => res.indexVersion);
},
getDocIndexedClock: function (
id: string,
docId: string
): Promise<DocIndexedClock | null> {
return NbStore.getDocIndexedClock({ id, docId });
},
setDocIndexedClock: function (
id: string,
docId: string,
indexedClock: Date,
indexerVersion: number
): Promise<void> {
return NbStore.setDocIndexedClock({
id,
docId,
indexedClock: indexedClock.getTime(),
indexerVersion,
});
},
clearDocIndexedClock: function (id: string, docId: string): Promise<void> {
return NbStore.clearDocIndexedClock({
id,
docId,
});
},
};
@@ -3,7 +3,6 @@ import '@affine/core/bootstrap/electron';
import { apis } from '@affine/electron-api';
import { broadcastChannelStorages } from '@affine/nbstore/broadcast-channel';
import { cloudStorages } from '@affine/nbstore/cloud';
import { idbStoragesIndexerOnly } from '@affine/nbstore/idb';
import { bindNativeDBApis, sqliteStorages } from '@affine/nbstore/sqlite';
import {
bindNativeDBV1Apis,
@@ -21,7 +20,6 @@ bindNativeDBApis(apis!.nbstore);
bindNativeDBV1Apis(apis!.db);
const storeManager = new StoreManagerConsumer([
...idbStoragesIndexerOnly,
...sqliteStorages,
...sqliteV1Storages,
...broadcastChannelStorages,
@@ -30,6 +30,9 @@ export const nbstoreHandlers: NativeDBApis = {
deleteDoc: POOL.deleteDoc.bind(POOL),
getDocClocks: POOL.getDocClocks.bind(POOL),
getDocClock: POOL.getDocClock.bind(POOL),
getDocIndexedClock: POOL.getDocIndexedClock.bind(POOL),
setDocIndexedClock: POOL.setDocIndexedClock.bind(POOL),
clearDocIndexedClock: POOL.clearDocIndexedClock.bind(POOL),
getBlob: POOL.getBlob.bind(POOL),
setBlob: POOL.setBlob.bind(POOL),
deleteBlob: POOL.deleteBlob.bind(POOL),
@@ -127,6 +127,12 @@ const needRefererDomains = [
/^(?:[a-zA-Z0-9-]+\.)*googlevideo\.com$/,
];
const defaultReferer = 'https://client.affine.local/';
const affineDomains = [
/^(?:[a-z0-9-]+\.)*usercontent\.affine\.pro$/i,
/^(?:[a-z0-9-]+\.)*affine\.pro$/i,
/^(?:[a-z0-9-]+\.)*affine\.fail$/i,
/^(?:[a-z0-9-]+\.)*affine\.run$/i,
];
function setHeader(
headers: Record<string, string[]>,
@@ -166,6 +172,17 @@ function ensureFrameAncestors(
});
}
function allowCors(headers: Record<string, string[]>) {
// Signed blob URLs redirect to *.usercontent.affine.pro without CORS headers.
setHeader(headers, 'Access-Control-Allow-Origin', '*');
setHeader(headers, 'Access-Control-Allow-Methods', 'GET, HEAD, OPTIONS');
setHeader(
headers,
'Access-Control-Allow-Headers',
'*, Authorization, Content-Type, Range'
);
}
export function registerProtocol() {
protocol.handle('assets', request => {
return handleFileRequest(request);
@@ -211,9 +228,9 @@ export function registerProtocol() {
}
}
const { protocol } = new URL(url);
const { protocol, hostname } = new URL(url);
// Only adjust CORS for assets responses; leave remote http(s) headers intact
// Adjust CORS for assets responses and allow blob redirects on affine domains
if (protocol === 'assets:') {
delete responseHeaders['access-control-allow-origin'];
delete responseHeaders['access-control-allow-headers'];
@@ -221,6 +238,11 @@ export function registerProtocol() {
delete responseHeaders['Access-Control-Allow-Headers'];
setHeader(responseHeaders, 'X-Frame-Options', 'SAMEORIGIN');
ensureFrameAncestors(responseHeaders, "'self'");
} else if (
(protocol === 'http:' || protocol === 'https:') &&
affineDomains.some(regex => regex.test(hostname))
) {
allowCors(responseHeaders);
}
}
})()
@@ -1,4 +1,4 @@
import type { CrawlResult } from '@affine/nbstore';
import type { CrawlResult, DocIndexedClock } from '@affine/nbstore';
export interface Blob {
key: string;
@@ -187,4 +187,18 @@ export interface NbStorePlugin {
}) => Promise<{ matches: { start: number; end: number }[] }>;
ftsFlushIndex: (options: { id: string }) => Promise<void>;
ftsIndexVersion: () => Promise<{ indexVersion: number }>;
getDocIndexedClock: (options: {
id: string;
docId: string;
}) => Promise<DocIndexedClock | null>;
setDocIndexedClock: (options: {
id: string;
docId: string;
indexedClock: number;
indexerVersion: number;
}) => Promise<void>;
clearDocIndexedClock: (options: {
id: string;
docId: string;
}) => Promise<void>;
}
@@ -6,6 +6,7 @@ import {
type BlobRecord,
type CrawlResult,
type DocClock,
type DocIndexedClock,
type DocRecord,
type ListedBlobRecord,
parseUniversalId,
@@ -415,4 +416,29 @@ export const NbStoreNativeDBApis: NativeDBApis = {
ftsIndexVersion: function (): Promise<number> {
return NbStore.ftsIndexVersion().then(res => res.indexVersion);
},
getDocIndexedClock: function (
id: string,
docId: string
): Promise<DocIndexedClock | null> {
return NbStore.getDocIndexedClock({ id, docId });
},
setDocIndexedClock: function (
id: string,
docId: string,
indexedClock: Date,
indexerVersion: number
): Promise<void> {
return NbStore.setDocIndexedClock({
id,
docId,
indexedClock: indexedClock.getTime(),
indexerVersion,
});
},
clearDocIndexedClock: function (id: string, docId: string): Promise<void> {
return NbStore.clearDocIndexedClock({
id,
docId,
});
},
};
@@ -20,6 +20,7 @@ import {
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
IndexedDBIndexerStorage,
IndexedDBIndexerSyncStorage,
} from '@affine/nbstore/idb';
import {
IndexedDBV1BlobStorage,
@@ -31,6 +32,7 @@ import {
SqliteDocStorage,
SqliteDocSyncStorage,
SqliteIndexerStorage,
SqliteIndexerSyncStorage,
} from '@affine/nbstore/sqlite';
import {
SqliteV1BlobStorage,
@@ -136,6 +138,9 @@ class CloudWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS || BUILD_CONFIG.isAndroid
? SqliteIndexerStorage
: IndexedDBIndexerStorage;
IndexerSyncStorageType = BUILD_CONFIG.isElectron
? SqliteIndexerSyncStorage
: IndexedDBIndexerSyncStorage;
async deleteWorkspace(id: string): Promise<void> {
await this.graphqlService.gql({
@@ -495,7 +500,7 @@ class CloudWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
},
},
indexerSync: {
name: 'IndexedDBIndexerSyncStorage',
name: this.IndexerSyncStorageType.identifier,
opts: {
flavour: this.flavour,
type: 'workspace',
@@ -11,6 +11,7 @@ import {
IndexedDBDocStorage,
IndexedDBDocSyncStorage,
IndexedDBIndexerStorage,
IndexedDBIndexerSyncStorage,
} from '@affine/nbstore/idb';
import {
IndexedDBV1BlobStorage,
@@ -22,6 +23,7 @@ import {
SqliteDocStorage,
SqliteDocSyncStorage,
SqliteIndexerStorage,
SqliteIndexerSyncStorage,
} from '@affine/nbstore/sqlite';
import {
SqliteV1BlobStorage,
@@ -113,6 +115,9 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
BUILD_CONFIG.isElectron || BUILD_CONFIG.isIOS || BUILD_CONFIG.isAndroid
? SqliteIndexerStorage
: IndexedDBIndexerStorage;
IndexerSyncStorageType = BUILD_CONFIG.isElectron
? SqliteIndexerSyncStorage
: IndexedDBIndexerSyncStorage;
async deleteWorkspace(id: string): Promise<void> {
setLocalWorkspaceIds(ids => ids.filter(x => x !== id));
@@ -365,7 +370,7 @@ class LocalWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
},
},
indexerSync: {
name: 'IndexedDBIndexerSyncStorage',
name: this.IndexerSyncStorageType.identifier,
opts: {
flavour: this.flavour,
type: 'workspace',
+9
View File
@@ -65,6 +65,9 @@ export declare class DocStoragePool {
deleteDoc(universalId: string, docId: string): Promise<void>
getDocClocks(universalId: string, after?: Date | undefined | null): Promise<Array<DocClock>>
getDocClock(universalId: string, docId: string): Promise<DocClock | null>
getDocIndexedClock(universalId: string, docId: string): Promise<DocIndexedClock | null>
setDocIndexedClock(universalId: string, docId: string, indexedClock: Date, indexerVersion: number): Promise<void>
clearDocIndexedClock(universalId: string, docId: string): Promise<void>
getBlob(universalId: string, key: string): Promise<Blob | null>
setBlob(universalId: string, blob: SetBlob): Promise<void>
deleteBlob(universalId: string, key: string, permanently: boolean): Promise<void>
@@ -104,6 +107,12 @@ export interface DocClock {
timestamp: Date
}
export interface DocIndexedClock {
docId: string
timestamp: Date
indexerVersion: number
}
export interface DocRecord {
docId: string
bin: Uint8Array
@@ -41,6 +41,11 @@ impl SqliteDocStorage {
.bind(&meta.space_id)
.execute(&self.pool)
.await?;
sqlx::query("UPDATE indexer_sync SET doc_id = $1 WHERE doc_id = $2;")
.bind(&space_id)
.bind(&meta.space_id)
.execute(&self.pool)
.await?;
sqlx::query("UPDATE peer_clocks SET doc_id = $1 WHERE doc_id = $2;")
.bind(&space_id)
@@ -207,6 +212,11 @@ impl SqliteDocStorage {
.execute(&mut *tx)
.await?;
sqlx::query("DELETE FROM indexer_sync WHERE doc_id = ?;")
.bind(&doc_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
@@ -0,0 +1,111 @@
use chrono::NaiveDateTime;
use super::{error::Result, storage::SqliteDocStorage, DocIndexedClock};
impl SqliteDocStorage {
pub async fn get_doc_indexed_clock(&self, doc_id: String) -> Result<Option<DocIndexedClock>> {
let record = sqlx::query!(
r#"SELECT doc_id, indexed_clock as "indexed_clock: NaiveDateTime", indexer_version
FROM indexer_sync WHERE doc_id = ?"#,
doc_id
)
.fetch_optional(&self.pool)
.await?;
Ok(record.map(|rec| DocIndexedClock {
doc_id: rec.doc_id,
timestamp: rec.indexed_clock,
indexer_version: rec.indexer_version,
}))
}
pub async fn set_doc_indexed_clock(
&self,
doc_id: String,
indexed_clock: NaiveDateTime,
indexer_version: i64,
) -> Result<()> {
sqlx::query(
r#"
INSERT INTO indexer_sync (doc_id, indexed_clock, indexer_version)
VALUES ($1, $2, $3)
ON CONFLICT(doc_id)
DO UPDATE SET indexed_clock=$2, indexer_version=$3;
"#,
)
.bind(doc_id)
.bind(indexed_clock)
.bind(indexer_version)
.execute(&self.pool)
.await?;
Ok(())
}
pub async fn clear_doc_indexed_clock(&self, doc_id: String) -> Result<()> {
sqlx::query("DELETE FROM indexer_sync WHERE doc_id = ?;")
.bind(doc_id)
.execute(&self.pool)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use chrono::Utc;
use super::*;
async fn get_storage() -> SqliteDocStorage {
let storage = SqliteDocStorage::new(":memory:".to_string());
storage.connect().await.unwrap();
storage
}
#[tokio::test]
async fn set_and_get_indexed_clock() {
let storage = get_storage().await;
let ts = Utc::now().naive_utc();
storage
.set_doc_indexed_clock("doc1".to_string(), ts, 1)
.await
.unwrap();
let clock = storage
.get_doc_indexed_clock("doc1".to_string())
.await
.unwrap()
.unwrap();
assert_eq!(clock.doc_id, "doc1");
assert_eq!(clock.timestamp, ts);
assert_eq!(clock.indexer_version, 1);
}
#[tokio::test]
async fn clear_indexed_clock() {
let storage = get_storage().await;
let ts = Utc::now().naive_utc();
storage
.set_doc_indexed_clock("doc1".to_string(), ts, 1)
.await
.unwrap();
storage
.clear_doc_indexed_clock("doc1".to_string())
.await
.unwrap();
let clock = storage
.get_doc_indexed_clock("doc1".to_string())
.await
.unwrap();
assert!(clock.is_none());
}
}
@@ -4,6 +4,7 @@ pub mod doc;
pub mod doc_sync;
pub mod error;
pub mod indexer;
pub mod indexer_sync;
pub mod pool;
pub mod storage;
@@ -55,6 +56,14 @@ pub struct DocClock {
pub timestamp: NaiveDateTime,
}
#[derive(Debug)]
#[napi(object)]
pub struct DocIndexedClock {
pub doc_id: String,
pub timestamp: NaiveDateTime,
pub indexer_version: i64,
}
#[napi(object)]
pub struct SetBlob {
pub key: String,
@@ -235,6 +244,47 @@ impl DocStoragePool {
Ok(self.get(universal_id).await?.get_doc_clock(doc_id).await?)
}
#[napi]
pub async fn get_doc_indexed_clock(
&self,
universal_id: String,
doc_id: String,
) -> Result<Option<DocIndexedClock>> {
Ok(
self
.get(universal_id)
.await?
.get_doc_indexed_clock(doc_id)
.await?,
)
}
#[napi]
pub async fn set_doc_indexed_clock(
&self,
universal_id: String,
doc_id: String,
indexed_clock: NaiveDateTime,
indexer_version: i64,
) -> Result<()> {
self
.get(universal_id)
.await?
.set_doc_indexed_clock(doc_id, indexed_clock, indexer_version)
.await?;
Ok(())
}
#[napi]
pub async fn clear_doc_indexed_clock(&self, universal_id: String, doc_id: String) -> Result<()> {
self
.get(universal_id)
.await?
.clear_doc_indexed_clock(doc_id)
.await?;
Ok(())
}
#[napi(async_runtime)]
pub async fn get_blob(&self, universal_id: String, key: String) -> Result<Option<Blob>> {
Ok(self.get(universal_id).await?.get_blob(key).await?)
@@ -82,6 +82,18 @@ CREATE TABLE idx_snapshots (
index_name TEXT PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
"#,
None,
),
// add indexer sync table
(
"add_indexer_sync",
r#"
CREATE TABLE "indexer_sync" (
doc_id VARCHAR PRIMARY KEY NOT NULL,
indexed_clock TIMESTAMP NOT NULL DEFAULT 0,
indexer_version INTEGER NOT NULL DEFAULT 0
);
"#,
None,