mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-12 04:18:54 +00:00
feat: sqlite subdocument (#2816)
Co-authored-by: Alex Yang <himself65@outlook.com>
This commit is contained in:
@@ -12,6 +12,7 @@ import type { PlaywrightTestConfig } from '@playwright/test';
|
||||
*/
|
||||
const config: PlaywrightTestConfig = {
|
||||
testDir: './tests',
|
||||
testIgnore: '**/lib/**',
|
||||
fullyParallel: true,
|
||||
timeout: process.env.CI ? 50_000 : 30_000,
|
||||
use: {
|
||||
|
||||
@@ -20,14 +20,31 @@ afterEach(async () => {
|
||||
await fs.remove(tmpDir);
|
||||
});
|
||||
|
||||
let testYDoc: Y.Doc;
|
||||
let testYSubDoc: Y.Doc;
|
||||
|
||||
function getTestUpdates() {
|
||||
const testYDoc = new Y.Doc();
|
||||
testYDoc = new Y.Doc();
|
||||
const yText = testYDoc.getText('test');
|
||||
yText.insert(0, 'hello');
|
||||
|
||||
testYSubDoc = new Y.Doc();
|
||||
testYDoc.getMap('subdocs').set('test-subdoc', testYSubDoc);
|
||||
|
||||
const updates = Y.encodeStateAsUpdate(testYDoc);
|
||||
|
||||
return updates;
|
||||
}
|
||||
|
||||
function getTestSubDocUpdates() {
|
||||
const yText = testYSubDoc.getText('test');
|
||||
yText.insert(0, 'hello');
|
||||
|
||||
const updates = Y.encodeStateAsUpdate(testYSubDoc);
|
||||
|
||||
return updates;
|
||||
}
|
||||
|
||||
test('can create new db file if not exists', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
@@ -68,6 +85,31 @@ test('on applyUpdate (from renderer), will trigger update', async () => {
|
||||
await db.destroy();
|
||||
});
|
||||
|
||||
test('on applyUpdate (from renderer, subdoc), will trigger update', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
const onUpdate = vi.fn();
|
||||
const insertUpdates = vi.fn();
|
||||
|
||||
const db = await openWorkspaceDatabase(workspaceId);
|
||||
db.applyUpdate(getTestUpdates(), 'renderer');
|
||||
|
||||
db.db!.insertUpdates = insertUpdates;
|
||||
db.update$.subscribe(onUpdate);
|
||||
|
||||
const subdocUpdates = getTestSubDocUpdates();
|
||||
db.applyUpdate(subdocUpdates, 'renderer', testYSubDoc.guid);
|
||||
|
||||
expect(onUpdate).toHaveBeenCalled();
|
||||
expect(insertUpdates).toHaveBeenCalledWith([
|
||||
{
|
||||
docId: testYSubDoc.guid,
|
||||
data: subdocUpdates,
|
||||
},
|
||||
]);
|
||||
await db.destroy();
|
||||
});
|
||||
|
||||
test('on applyUpdate (from external), will trigger update & send external update event', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { SqliteConnection } from '@affine/native';
|
||||
import { type InsertRow, SqliteConnection } from '@affine/native';
|
||||
|
||||
import { logger } from '../logger';
|
||||
|
||||
@@ -79,21 +79,34 @@ export abstract class BaseSQLiteAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
async getUpdates() {
|
||||
async getUpdates(docId?: string) {
|
||||
try {
|
||||
if (!this.db) {
|
||||
logger.warn(`${this.path} is not connected`);
|
||||
return [];
|
||||
}
|
||||
return await this.db.getUpdates();
|
||||
return await this.db.getUpdates(docId);
|
||||
} catch (error) {
|
||||
logger.error('getUpdates', error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async getAllUpdates() {
|
||||
try {
|
||||
if (!this.db) {
|
||||
logger.warn(`${this.path} is not connected`);
|
||||
return [];
|
||||
}
|
||||
return await this.db.getAllUpdates();
|
||||
} catch (error) {
|
||||
logger.error('getAllUpdates', error);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
// add a single update to SQLite
|
||||
async addUpdateToSQLite(updates: Uint8Array[]) {
|
||||
async addUpdateToSQLite(updates: InsertRow[]) {
|
||||
// batch write instead write per key stroke?
|
||||
try {
|
||||
if (!this.db) {
|
||||
|
||||
@@ -7,13 +7,17 @@ export * from './ensure-db';
|
||||
export * from './subjects';
|
||||
|
||||
export const dbHandlers = {
|
||||
getDocAsUpdates: async (id: string) => {
|
||||
const workspaceDB = await ensureSQLiteDB(id);
|
||||
return workspaceDB.getDocAsUpdates();
|
||||
getDocAsUpdates: async (workspaceId: string, subdocId?: string) => {
|
||||
const workspaceDB = await ensureSQLiteDB(workspaceId);
|
||||
return workspaceDB.getDocAsUpdates(subdocId);
|
||||
},
|
||||
applyDocUpdate: async (id: string, update: Uint8Array) => {
|
||||
const workspaceDB = await ensureSQLiteDB(id);
|
||||
return workspaceDB.applyUpdate(update);
|
||||
applyDocUpdate: async (
|
||||
workspaceId: string,
|
||||
update: Uint8Array,
|
||||
subdocId?: string
|
||||
) => {
|
||||
const workspaceDB = await ensureSQLiteDB(workspaceId);
|
||||
return workspaceDB.applyUpdate(update, 'renderer', subdocId);
|
||||
},
|
||||
addBlob: async (workspaceId: string, key: string, data: Uint8Array) => {
|
||||
const workspaceDB = await ensureSQLiteDB(workspaceId);
|
||||
@@ -38,7 +42,11 @@ export const dbHandlers = {
|
||||
|
||||
export const dbEvents = {
|
||||
onExternalUpdate: (
|
||||
fn: (update: { workspaceId: string; update: Uint8Array }) => void
|
||||
fn: (update: {
|
||||
workspaceId: string;
|
||||
update: Uint8Array;
|
||||
docId?: string;
|
||||
}) => void
|
||||
) => {
|
||||
const sub = dbSubjects.externalUpdate.subscribe(fn);
|
||||
return () => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import assert from 'node:assert';
|
||||
|
||||
import type { SqliteConnection } from '@affine/native';
|
||||
import type { InsertRow } from '@affine/native';
|
||||
import { debounce } from 'lodash-es';
|
||||
import * as Y from 'yjs';
|
||||
|
||||
@@ -8,19 +8,19 @@ import { logger } from '../logger';
|
||||
import type { YOrigin } from '../type';
|
||||
import { getWorkspaceMeta } from '../workspace';
|
||||
import { BaseSQLiteAdapter } from './base-db-adapter';
|
||||
import { mergeUpdate } from './merge-update';
|
||||
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
|
||||
|
||||
const FLUSH_WAIT_TIME = 5000;
|
||||
const FLUSH_MAX_WAIT_TIME = 10000;
|
||||
|
||||
// todo: trim db when it is too big
|
||||
export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
role = 'secondary';
|
||||
yDoc = new Y.Doc();
|
||||
firstConnected = false;
|
||||
destroyed = false;
|
||||
|
||||
updateQueue: Uint8Array[] = [];
|
||||
updateQueue: { data: Uint8Array; docId?: string }[] = [];
|
||||
|
||||
unsubscribers = new Set<() => void>();
|
||||
|
||||
@@ -29,10 +29,23 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
public upstream: WorkspaceSQLiteDB
|
||||
) {
|
||||
super(path);
|
||||
this.setupAndListen();
|
||||
this.init();
|
||||
logger.debug('[SecondaryWorkspaceSQLiteDB] created', this.workspaceId);
|
||||
}
|
||||
|
||||
getDoc(docId?: string) {
|
||||
if (!docId) {
|
||||
return this.yDoc;
|
||||
}
|
||||
// this should be pretty fast and we don't need to cache it
|
||||
for (const subdoc of this.yDoc.subdocs) {
|
||||
if (subdoc.guid === docId) {
|
||||
return subdoc;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
override async destroy() {
|
||||
await this.flushUpdateQueue();
|
||||
this.unsubscribers.forEach(unsub => unsub());
|
||||
@@ -47,7 +60,7 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
|
||||
// do not update db immediately, instead, push to a queue
|
||||
// and flush the queue in a future time
|
||||
async addUpdateToUpdateQueue(db: SqliteConnection, update: Uint8Array) {
|
||||
async addUpdateToUpdateQueue(update: InsertRow) {
|
||||
this.updateQueue.push(update);
|
||||
await this.debouncedFlush();
|
||||
}
|
||||
@@ -101,55 +114,82 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
setupAndListen() {
|
||||
if (this.firstConnected) {
|
||||
setupListener(docId?: string) {
|
||||
const doc = this.getDoc(docId);
|
||||
if (!doc) {
|
||||
return;
|
||||
}
|
||||
this.firstConnected = true;
|
||||
|
||||
const onUpstreamUpdate = (update: Uint8Array, origin: YOrigin) => {
|
||||
if (origin === 'renderer') {
|
||||
// update to upstream yDoc should be replicated to self yDoc
|
||||
this.applyUpdate(update, 'upstream');
|
||||
this.applyUpdate(update, 'upstream', docId);
|
||||
}
|
||||
};
|
||||
|
||||
const onSelfUpdate = async (update: Uint8Array, origin: YOrigin) => {
|
||||
// for self update from upstream, we need to push it to external DB
|
||||
if (origin === 'upstream' && this.db) {
|
||||
await this.addUpdateToUpdateQueue(this.db, update);
|
||||
if (origin === 'upstream') {
|
||||
await this.addUpdateToUpdateQueue({
|
||||
data: update,
|
||||
docId,
|
||||
});
|
||||
}
|
||||
|
||||
if (origin === 'self') {
|
||||
this.upstream.applyUpdate(update, 'external');
|
||||
this.upstream.applyUpdate(update, 'external', docId);
|
||||
}
|
||||
};
|
||||
|
||||
const onSubdocs = ({ added }: { added: Set<Y.Doc> }) => {
|
||||
added.forEach(subdoc => {
|
||||
this.setupListener(subdoc.guid);
|
||||
});
|
||||
};
|
||||
|
||||
// listen to upstream update
|
||||
this.upstream.yDoc.on('update', onUpstreamUpdate);
|
||||
this.yDoc.on('update', onSelfUpdate);
|
||||
this.yDoc.on('subdocs', onSubdocs);
|
||||
|
||||
this.unsubscribers.add(() => {
|
||||
this.upstream.yDoc.off('update', onUpstreamUpdate);
|
||||
this.yDoc.off('update', onSelfUpdate);
|
||||
this.yDoc.off('subdocs', onSubdocs);
|
||||
});
|
||||
|
||||
this.run(() => {
|
||||
// apply all updates from upstream
|
||||
const upstreamUpdate = this.upstream.getDocAsUpdates();
|
||||
// to initialize the yDoc, we need to apply all updates from the db
|
||||
this.applyUpdate(upstreamUpdate, 'upstream');
|
||||
})
|
||||
.then(() => {
|
||||
logger.debug('run success');
|
||||
})
|
||||
.catch(err => {
|
||||
logger.error('run error', err);
|
||||
});
|
||||
}
|
||||
|
||||
applyUpdate = (data: Uint8Array, origin: YOrigin = 'upstream') => {
|
||||
Y.applyUpdate(this.yDoc, data, origin);
|
||||
init() {
|
||||
if (this.firstConnected) {
|
||||
return;
|
||||
}
|
||||
this.firstConnected = true;
|
||||
this.setupListener();
|
||||
// apply all updates from upstream
|
||||
// we assume here that the upstream ydoc is already sync'ed
|
||||
const syncUpstreamDoc = (docId?: string) => {
|
||||
const update = this.upstream.getDocAsUpdates(docId);
|
||||
if (update) {
|
||||
this.applyUpdate(update, 'upstream');
|
||||
}
|
||||
};
|
||||
syncUpstreamDoc();
|
||||
this.upstream.yDoc.subdocs.forEach(subdoc => {
|
||||
syncUpstreamDoc(subdoc.guid);
|
||||
});
|
||||
}
|
||||
|
||||
applyUpdate = (
|
||||
data: Uint8Array,
|
||||
origin: YOrigin = 'upstream',
|
||||
docId?: string
|
||||
) => {
|
||||
const doc = this.getDoc(docId);
|
||||
if (doc) {
|
||||
Y.applyUpdate(this.yDoc, data, origin);
|
||||
} else {
|
||||
logger.warn('applyUpdate: doc not found', docId);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: have a better solution to handle blobs
|
||||
@@ -186,23 +226,33 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
async pull() {
|
||||
const start = performance.now();
|
||||
assert(this.upstream.db, 'upstream db should be connected');
|
||||
const updates = await this.run(async () => {
|
||||
const rows = await this.run(async () => {
|
||||
// TODO: no need to get all updates, just get the latest ones (using a cursor, etc)?
|
||||
await this.syncBlobs();
|
||||
return (await this.getUpdates()).map(update => update.data);
|
||||
return await this.getAllUpdates();
|
||||
});
|
||||
|
||||
if (!updates || this.destroyed) {
|
||||
if (!rows || this.destroyed) {
|
||||
return;
|
||||
}
|
||||
|
||||
const merged = mergeUpdate(updates);
|
||||
this.applyUpdate(merged, 'self');
|
||||
// apply root doc first
|
||||
rows.forEach(row => {
|
||||
if (!row.docId) {
|
||||
this.applyUpdate(row.data, 'self');
|
||||
}
|
||||
});
|
||||
|
||||
rows.forEach(row => {
|
||||
if (row.docId) {
|
||||
this.applyUpdate(row.data, 'self', row.docId);
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug(
|
||||
'pull external updates',
|
||||
this.path,
|
||||
updates.length,
|
||||
rows.length,
|
||||
(performance.now() - start).toFixed(2),
|
||||
'ms'
|
||||
);
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import { Subject } from 'rxjs';
|
||||
|
||||
export const dbSubjects = {
|
||||
externalUpdate: new Subject<{ workspaceId: string; update: Uint8Array }>(),
|
||||
externalUpdate: new Subject<{
|
||||
workspaceId: string;
|
||||
update: Uint8Array;
|
||||
docId?: string;
|
||||
}>(),
|
||||
};
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import type { InsertRow } from '@affine/native';
|
||||
import { debounce } from 'lodash-es';
|
||||
import { Subject } from 'rxjs';
|
||||
import * as Y from 'yjs';
|
||||
|
||||
@@ -5,9 +7,10 @@ import { logger } from '../logger';
|
||||
import type { YOrigin } from '../type';
|
||||
import { getWorkspaceMeta } from '../workspace';
|
||||
import { BaseSQLiteAdapter } from './base-db-adapter';
|
||||
import { mergeUpdate } from './merge-update';
|
||||
import { dbSubjects } from './subjects';
|
||||
|
||||
const TRIM_SIZE = 500;
|
||||
|
||||
export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
role = 'primary';
|
||||
yDoc = new Y.Doc();
|
||||
@@ -28,33 +31,76 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
this.firstConnected = false;
|
||||
}
|
||||
|
||||
getDoc(docId?: string) {
|
||||
if (!docId) {
|
||||
return this.yDoc;
|
||||
}
|
||||
// this should be pretty fast and we don't need to cache it
|
||||
for (const subdoc of this.yDoc.subdocs) {
|
||||
if (subdoc.guid === docId) {
|
||||
return subdoc;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
getWorkspaceName = () => {
|
||||
return this.yDoc.getMap('space:meta').get('name') as string;
|
||||
return this.yDoc.getMap('meta').get('name') as string;
|
||||
};
|
||||
|
||||
setupListener(docId?: string) {
|
||||
const doc = this.getDoc(docId);
|
||||
if (doc) {
|
||||
const onUpdate = async (update: Uint8Array, origin: YOrigin) => {
|
||||
const insertRows = [{ data: update, docId }];
|
||||
if (origin === 'renderer') {
|
||||
await this.addUpdateToSQLite(insertRows);
|
||||
} else if (origin === 'external') {
|
||||
dbSubjects.externalUpdate.next({
|
||||
workspaceId: this.workspaceId,
|
||||
update,
|
||||
docId,
|
||||
});
|
||||
await this.addUpdateToSQLite(insertRows);
|
||||
logger.debug('external update', this.workspaceId);
|
||||
}
|
||||
};
|
||||
const onSubdocs = ({ added }: { added: Set<Y.Doc> }) => {
|
||||
added.forEach(subdoc => {
|
||||
this.setupListener(subdoc.guid);
|
||||
});
|
||||
};
|
||||
|
||||
doc.on('update', onUpdate);
|
||||
doc.on('subdocs', onSubdocs);
|
||||
} else {
|
||||
logger.error('setupListener: doc not found', docId);
|
||||
}
|
||||
}
|
||||
|
||||
async init() {
|
||||
const db = await super.connectIfNeeded();
|
||||
|
||||
if (!this.firstConnected) {
|
||||
this.yDoc.on('update', async (update: Uint8Array, origin: YOrigin) => {
|
||||
if (origin === 'renderer') {
|
||||
await this.addUpdateToSQLite([update]);
|
||||
} else if (origin === 'external') {
|
||||
dbSubjects.externalUpdate.next({
|
||||
workspaceId: this.workspaceId,
|
||||
update,
|
||||
});
|
||||
await this.addUpdateToSQLite([update]);
|
||||
logger.debug('external update', this.workspaceId);
|
||||
}
|
||||
});
|
||||
this.setupListener();
|
||||
}
|
||||
|
||||
const updates = await this.getUpdates();
|
||||
const merged = mergeUpdate(updates.map(update => update.data));
|
||||
const updates = await this.getAllUpdates();
|
||||
|
||||
// to initialize the yDoc, we need to apply all updates from the db
|
||||
this.applyUpdate(merged, 'self');
|
||||
// apply root first (without ID).
|
||||
// subdoc will be available after root is applied
|
||||
updates.forEach(update => {
|
||||
if (!update.docId) {
|
||||
this.applyUpdate(update.data, 'self');
|
||||
}
|
||||
});
|
||||
|
||||
// then, for all subdocs, apply the updates
|
||||
updates.forEach(update => {
|
||||
if (update.docId) {
|
||||
this.applyUpdate(update.data, 'self', update.docId);
|
||||
}
|
||||
});
|
||||
|
||||
this.firstConnected = true;
|
||||
this.update$.next();
|
||||
@@ -62,18 +108,32 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
return db;
|
||||
}
|
||||
|
||||
getDocAsUpdates = () => {
|
||||
return Y.encodeStateAsUpdate(this.yDoc);
|
||||
// unlike getUpdates, this will return updates in yDoc
|
||||
getDocAsUpdates = (docId?: string) => {
|
||||
const doc = docId ? this.getDoc(docId) : this.yDoc;
|
||||
if (doc) {
|
||||
return Y.encodeStateAsUpdate(doc);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
|
||||
// non-blocking and use yDoc to validate the update
|
||||
// after that, the update is added to the db
|
||||
applyUpdate = (data: Uint8Array, origin: YOrigin = 'renderer') => {
|
||||
applyUpdate = (
|
||||
data: Uint8Array,
|
||||
origin: YOrigin = 'renderer',
|
||||
docId?: string
|
||||
) => {
|
||||
// todo: trim the updates when the number of records is too large
|
||||
// 1. store the current ydoc state in the db
|
||||
// 2. then delete the old updates
|
||||
// yjs-idb will always trim the db for the first time after DB is loaded
|
||||
Y.applyUpdate(this.yDoc, data, origin);
|
||||
const doc = this.getDoc(docId);
|
||||
if (doc) {
|
||||
Y.applyUpdate(doc, data, origin);
|
||||
} else {
|
||||
logger.warn('applyUpdate: doc not found', docId);
|
||||
}
|
||||
};
|
||||
|
||||
override async addBlob(key: string, value: Uint8Array) {
|
||||
@@ -87,10 +147,30 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
await super.deleteBlob(key);
|
||||
}
|
||||
|
||||
override async addUpdateToSQLite(data: Uint8Array[]) {
|
||||
override async addUpdateToSQLite(data: InsertRow[]) {
|
||||
this.update$.next();
|
||||
data.forEach(row => {
|
||||
this.trimWhenNecessary(row.docId)?.catch(err => {
|
||||
logger.error('trimWhenNecessary failed', err);
|
||||
});
|
||||
});
|
||||
await super.addUpdateToSQLite(data);
|
||||
}
|
||||
|
||||
trimWhenNecessary = debounce(async (docId?: string) => {
|
||||
if (this.firstConnected) {
|
||||
const count = (await this.db?.getUpdatesCount(docId)) ?? 0;
|
||||
if (count > TRIM_SIZE) {
|
||||
logger.debug(`trim ${this.workspaceId}:${docId} ${count}`);
|
||||
const update = this.getDocAsUpdates(docId);
|
||||
if (update) {
|
||||
const insertRows = [{ data: update, docId }];
|
||||
await this.db?.replaceUpdates(docId, insertRows);
|
||||
logger.debug(`trim ${this.workspaceId}:${docId} successfully`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
export async function openWorkspaceDatabase(workspaceId: string) {
|
||||
|
||||
@@ -34,7 +34,6 @@ export function registerProtocol() {
|
||||
const url = request.url.replace(/^file:\/\//, '');
|
||||
const realpath = toAbsolutePath(url);
|
||||
callback(realpath);
|
||||
console.log('interceptFileProtocol realpath', request.url, realpath);
|
||||
return true;
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user