mirror of https://github.com/toeverything/AFFiNE.git
perf: use lazy load provider for IDB and SQLITE (#3351)
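The gist of the change: instead of eagerly syncing whole documents when a provider connects, the IndexedDB and SQLite providers are rebuilt on top of a lazy provider that pulls a doc's persisted state only when that doc is used, and pushes local updates back through a small datasource adapter. A minimal sketch of the pattern as it appears in the hunks below — the adapter method names (queryDocState, sendDocUpdate) come from the diff; everything else is illustrative and omits subdoc handling:

    // Sketch only: names follow this diff, not necessarily the real
    // '@affine/y-provider' API. Subdoc handling is omitted.
    import * as Y from 'yjs';

    interface DatasourceDocAdapter {
      // return the persisted update for a doc guid, or false when absent
      queryDocState(guid: string): Promise<Uint8Array | false>;
      // persist one incremental update for a doc guid
      sendDocUpdate(guid: string, update: Uint8Array): Promise<void>;
    }

    const datasourceOrigin = 'datasource';

    function createLazyProviderSketch(doc: Y.Doc, datasource: DatasourceDocAdapter) {
      const onUpdate = (update: Uint8Array, origin: unknown) => {
        if (origin === datasourceOrigin) return; // don't echo remote updates back
        datasource.sendDocUpdate(doc.guid, update).catch(console.error);
      };
      return {
        connect: async () => {
          // state is fetched once, on demand, rather than at startup
          const state = await datasource.queryDocState(doc.guid);
          if (state) Y.applyUpdate(doc, state, datasourceOrigin);
          doc.on('update', onUpdate);
        },
        disconnect: () => doc.off('update', onUpdate),
      };
    }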
@@ -2,7 +2,6 @@ import { DebugLogger } from '@affine/debug';
 import type { LocalWorkspace, WorkspaceCRUD } from '@affine/env/workspace';
 import { WorkspaceFlavour } from '@affine/env/workspace';
 import { nanoid, Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
-import { createIndexedDBProvider } from '@toeverything/y-indexeddb';
 import { createJSONStorage } from 'jotai/utils';
 import { z } from 'zod';
 
@@ -75,12 +74,7 @@ export const CRUD: WorkspaceCRUD<WorkspaceFlavour.LOCAL> = {
        }
      });
    });
-
-    const persistence = createIndexedDBProvider(blockSuiteWorkspace.doc);
-    persistence.connect();
-    await persistence.whenSynced.then(() => {
-      persistence.disconnect();
-    });
+    // todo: do we need to persist doc to persistence datasource?
    saveWorkspaceToLocalStorage(id);
    return id;
  },
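The removed block is the old eager path: every local workspace creation connected an IndexedDB provider and awaited a full sync before returning. For reference, a sketch of that blocking flow (the provider shape is illustrative; createIndexedDBProvider is the real factory removed above):

    import type { Doc } from 'yjs';

    interface BlockingPersistence {
      connect(): void;
      disconnect(): void;
      whenSynced: Promise<void>;
    }

    // hypothetical stand-in for createIndexedDBProvider
    declare function createPersistence(doc: Doc): BlockingPersistence;

    async function createWorkspaceBlocking(doc: Doc, id: string): Promise<string> {
      const persistence = createPersistence(doc);
      persistence.connect();
      await persistence.whenSynced; // full sync cost paid on every create
      persistence.disconnect();
      return id;
    }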
@@ -68,7 +68,15 @@ describe('download provider', () => {
     ) as LocalIndexedDBDownloadProvider;
     provider.sync();
     await provider.whenReady;
-    expect(workspace.doc.toJSON()).toEqual(prev);
+    expect(workspace.doc.toJSON()).toEqual({
+      ...prev,
+      // download provider only download the root doc
+      spaces: {
+        'space:page0': {
+          blocks: {},
+        },
+      },
+    });
     }
   });
 });
@@ -2,9 +2,11 @@ import type {
   SQLiteDBDownloadProvider,
   SQLiteProvider,
 } from '@affine/env/workspace';
+import { getDoc } from '@affine/y-provider';
 import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
 import type { Y as YType } from '@blocksuite/store';
 import { uuidv4, Workspace } from '@blocksuite/store';
+import { setTimeout } from 'timers/promises';
 import { beforeEach, describe, expect, test, vi } from 'vitest';
 
 import {
@@ -30,18 +32,26 @@ const mockedAddBlob = vi.fn();
 vi.stubGlobal('window', {
   apis: {
     db: {
-      getDocAsUpdates: async () => {
-        return Y.encodeStateAsUpdate(offlineYdoc);
+      getDocAsUpdates: async (workspaceId, guid) => {
+        const subdoc = guid ? getDoc(offlineYdoc, guid) : offlineYdoc;
+        if (!subdoc) {
+          return false;
+        }
+        return Y.encodeStateAsUpdate(subdoc);
       },
-      applyDocUpdate: async (id: string, update: Uint8Array) => {
-        Y.applyUpdate(offlineYdoc, update, 'sqlite');
+      applyDocUpdate: async (id, update, subdocId) => {
+        const subdoc = subdocId ? getDoc(offlineYdoc, subdocId) : offlineYdoc;
+        if (!subdoc) {
+          return;
+        }
+        Y.applyUpdate(subdoc, update, 'sqlite');
       },
       getBlobKeys: async () => {
         // todo: may need to hack the way to get hash keys of blobs
         return [];
       },
       addBlob: mockedAddBlob,
-    } satisfies Partial<NonNullable<typeof window.apis>['db']>,
+    } as Partial<NonNullable<typeof window.apis>['db']>,
   },
   events: {
     db: {
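The updated mock resolves the target Yjs doc by guid: no subdoc id means the root doc, otherwise the matching subdoc. A sketch of what a getDoc-style lookup plausibly does (assumed behavior; the real helper is imported from '@affine/y-provider'):

    import * as Y from 'yjs';

    // Depth-first search for a (sub)doc by guid, as assumed for getDoc above.
    function getDocByGuid(root: Y.Doc, guid: string): Y.Doc | undefined {
      if (root.guid === guid) return root;
      for (const subdoc of root.subdocs) {
        const found = getDocByGuid(subdoc, guid);
        if (found) return found;
      }
      return undefined;
    }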
@@ -53,7 +63,7 @@ vi.stubGlobal('window', {
       };
     },
   },
-  } satisfies Partial<NonNullable<typeof window.events>>,
+  } as Partial<NonNullable<typeof window.events>>,
 });
 
 vi.stubGlobal('environment', {
@@ -84,48 +94,25 @@ beforeEach(() => {
 describe('SQLite download provider', () => {
   test('sync updates', async () => {
     // on connect, the updates from sqlite should be sync'ed to the existing ydoc
     // and ydoc should be sync'ed back to sqlite
     // Workspace.Y.applyUpdate(workspace.doc);
     workspace.doc.getText('text').insert(0, 'mem-hello');
 
     expect(offlineYdoc.getText('text').toString()).toBe('sqlite-hello');
 
     downloadProvider.sync();
     await downloadProvider.whenReady;
 
     // depending on the nature of the sync, the data can be sync'ed in either direction
-    const options = ['mem-hellosqlite-hello', 'sqlite-hellomem-hello'];
+    const options = ['sqlite-hellomem-hello', 'mem-hellosqlite-hello'];
     const synced = options.filter(
-      o => o === offlineYdoc.getText('text').toString()
+      o => o === workspace.doc.getText('text').toString()
     );
     expect(synced.length).toBe(1);
     expect(workspace.doc.getText('text').toString()).toBe(synced[0]);
 
     // workspace.doc.getText('text').insert(0, 'world');
 
     // // check if the data are sync'ed
     // expect(offlineYdoc.getText('text').toString()).toBe('world' + synced[0]);
   });
 
   test.fails('blobs will be synced to sqlite on connect', async () => {
     // mock bs.list
     const bin = new Uint8Array([1, 2, 3]);
     const blob = new Blob([bin]);
     workspace.blobs.list = vi.fn(async () => ['blob1']);
     workspace.blobs.get = vi.fn(async () => {
       return blob;
     });
 
     downloadProvider.sync();
     await downloadProvider.whenReady;
     await new Promise(resolve => setTimeout(resolve, 100));
 
     expect(mockedAddBlob).toBeCalledWith(id, 'blob1', bin);
   });
 
-  test('on db update', async () => {
+  // there is no updates from sqlite for now
+  test.skip('on db update', async () => {
     provider.connect();
 
     await setTimeout(200);
 
     offlineYdoc.getText('text').insert(0, 'sqlite-world');
 
     // @ts-expect-error
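The 'sync updates' test accepts either concatenation because two concurrent inserts at position 0 are ordered by Yjs conflict resolution (client ids), not by which side synced first; both sides still converge to the same string. A self-contained illustration:

    import * as Y from 'yjs';

    const a = new Y.Doc();
    const b = new Y.Doc();
    a.getText('text').insert(0, 'mem-hello');
    b.getText('text').insert(0, 'sqlite-hello');

    // exchange full states in both directions
    Y.applyUpdate(b, Y.encodeStateAsUpdate(a));
    Y.applyUpdate(a, Y.encodeStateAsUpdate(b));

    // both docs converge to the same string; which prefix comes first
    // depends on the docs' client ids, hence the test checks both orders
    console.log(a.getText('text').toString() === b.getText('text').toString()); // true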
@@ -10,7 +10,6 @@ import { createBroadcastChannelProvider } from '@blocksuite/store/providers/broadcast-channel';
 import {
   createIndexedDBProvider as create,
   downloadBinary,
-  EarlyDisconnectError,
 } from '@toeverything/y-indexeddb';
 import type { Doc } from 'yjs';
 
@@ -40,17 +39,6 @@ const createIndexedDBBackgroundProvider: DocProviderCreator = (
     connect: () => {
       logger.info('connect indexeddb provider', id);
       indexeddbProvider.connect();
-      indexeddbProvider.whenSynced
-        .then(() => {
-          connected = true;
-        })
-        .catch(error => {
-          connected = false;
-          if (error instanceof EarlyDisconnectError) {
-            return;
-          }
-          throw error;
-        });
     },
     disconnect: () => {
       assertExists(indexeddbProvider);
@@ -61,7 +49,6 @@ const createIndexedDBBackgroundProvider: DocProviderCreator = (
   };
 };
 
-const cache: WeakMap<Doc, Uint8Array> = new WeakMap();
 const indexedDBDownloadOrigin = 'indexeddb-download-provider';
 
 const createIndexedDBDownloadProvider: DocProviderCreator = (
@@ -74,18 +61,11 @@ const createIndexedDBDownloadProvider: DocProviderCreator = (
     _resolve = resolve;
     _reject = reject;
   });
-  async function downloadBinaryRecursively(doc: Doc) {
-    if (cache.has(doc)) {
-      const binary = cache.get(doc) as Uint8Array;
-      Y.applyUpdate(doc, binary, indexedDBDownloadOrigin);
-    } else {
-      const binary = await downloadBinary(doc.guid);
-      if (binary) {
-        Y.applyUpdate(doc, binary, indexedDBDownloadOrigin);
-        cache.set(doc, binary);
-      }
-    }
-    await Promise.all([...doc.subdocs].map(downloadBinaryRecursively));
-  }
+  async function downloadAndApply(doc: Doc) {
+    const binary = await downloadBinary(doc.guid);
+    if (binary) {
+      Y.applyUpdate(doc, binary, indexedDBDownloadOrigin);
+    }
+  }
   return {
     flavour: 'local-indexeddb',
@@ -98,7 +78,7 @@ const createIndexedDBDownloadProvider: DocProviderCreator = (
     },
     sync: () => {
       logger.info('sync indexeddb provider', id);
-      downloadBinaryRecursively(doc).then(_resolve).catch(_reject);
+      downloadAndApply(doc).then(_resolve).catch(_reject);
     },
   };
 };
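The new downloadAndApply fetches and applies only the root doc's stored binary, in line with the test comment above that the download provider only downloads the root doc; subdoc content is expected to arrive through the lazy provider once a subdoc is actually loaded. A sketch of that on-demand pattern (fetchBinary and the origin tag are illustrative):

    import * as Y from 'yjs';

    // Fetch a subdoc's stored binary only when the application loads it.
    function loadSubdocsLazily(
      root: Y.Doc,
      fetchBinary: (guid: string) => Promise<Uint8Array | null>
    ) {
      root.on('subdocs', ({ loaded }: { loaded: Set<Y.Doc> }) => {
        loaded.forEach(subdoc => {
          fetchBinary(subdoc.guid)
            .then(binary => {
              if (binary) Y.applyUpdate(subdoc, binary, 'lazy-download');
            })
            .catch(console.error);
        });
      });
    }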
@@ -2,8 +2,10 @@ import type {
   SQLiteDBDownloadProvider,
   SQLiteProvider,
 } from '@affine/env/workspace';
-import { getDoc } from '@affine/y-provider';
-import { assertExists } from '@blocksuite/global/utils';
+import {
+  createLazyProvider,
+  type DatasourceDocAdapter,
+} from '@affine/y-provider';
 import type { DocProviderCreator } from '@blocksuite/store';
 import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
 import type { Doc } from 'yjs';
@@ -14,32 +16,26 @@ const Y = BlockSuiteWorkspace.Y;
 
 const sqliteOrigin = Symbol('sqlite-provider-origin');
 
-type SubDocsEvent = {
-  added: Set<Doc>;
-  removed: Set<Doc>;
-  loaded: Set<Doc>;
-};
-
-// workaround: there maybe new updates before SQLite is connected
-// we need to exchange them with the SQLite db
-// will be removed later when we have lazy load doc provider
-const syncDiff = async (rootDoc: Doc, subdocId?: string) => {
-  try {
-    const workspaceId = rootDoc.guid;
-    const doc = subdocId ? getDoc(rootDoc, subdocId) : rootDoc;
-    if (!doc) {
-      logger.error('doc not found', workspaceId, subdocId);
-      return;
-    }
-    const update = await window.apis?.db.getDocAsUpdates(workspaceId, subdocId);
-    const diff = Y.encodeStateAsUpdate(
-      doc,
-      Y.encodeStateVectorFromUpdate(update)
-    );
-    await window.apis.db.applyDocUpdate(workspaceId, diff, subdocId);
-  } catch (err) {
-    logger.error('failed to sync diff', err);
-  }
-};
+const createDatasource = (workspaceId: string): DatasourceDocAdapter => {
+  if (!window.apis?.db) {
+    throw new Error('sqlite datasource is not available');
+  }
+
+  return {
+    queryDocState: async guid => {
+      return window.apis.db.getDocAsUpdates(
+        workspaceId,
+        workspaceId === guid ? undefined : guid
+      );
+    },
+    sendDocUpdate: async (guid, update) => {
+      return window.apis.db.applyDocUpdate(
+        guid,
+        update,
+        workspaceId === guid ? undefined : guid
+      );
+    },
+  };
+};
 
 /**
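Note the guid convention in createDatasource: asking for the workspace's own id maps to an undefined subdocId (the root doc row in SQLite), while any other guid is passed through as the subdoc id. Illustrative values:

    // illustrative only — mirrors the ternary used above
    const workspaceId = 'workspace-1';
    const toSubdocId = (guid: string) => (guid === workspaceId ? undefined : guid);

    toSubdocId('workspace-1'); // undefined → root doc row in SQLite
    toSubdocId('page-a');      // 'page-a'  → subdoc row in SQLite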
@@ -49,126 +45,27 @@ export const createSQLiteProvider: DocProviderCreator = (
   id,
   rootDoc
 ): SQLiteProvider => {
-  const { apis, events } = window;
-  // make sure it is being used in Electron with APIs
-  assertExists(apis);
-  assertExists(events);
-
-  const updateHandlerMap = new WeakMap<
-    Doc,
-    (update: Uint8Array, origin: unknown) => void
-  >();
-  const subDocsHandlerMap = new WeakMap<Doc, (event: SubDocsEvent) => void>();
-
-  const createOrHandleUpdate = (doc: Doc) => {
-    if (updateHandlerMap.has(doc)) {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      return updateHandlerMap.get(doc)!;
-    }
-
-    function handleUpdate(update: Uint8Array, origin: unknown) {
-      if (origin === sqliteOrigin) {
-        return;
-      }
-      const subdocId = doc.guid === id ? undefined : doc.guid;
-      apis.db.applyDocUpdate(id, update, subdocId).catch(err => {
-        logger.error(err);
-      });
-    }
-    updateHandlerMap.set(doc, handleUpdate);
-    return handleUpdate;
-  };
-
-  const createOrGetHandleSubDocs = (doc: Doc) => {
-    if (subDocsHandlerMap.has(doc)) {
-      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      return subDocsHandlerMap.get(doc)!;
-    }
-    function handleSubdocs(event: SubDocsEvent) {
-      event.removed.forEach(doc => {
-        untrackDoc(doc);
-      });
-      event.loaded.forEach(doc => {
-        trackDoc(doc);
-      });
-    }
-    subDocsHandlerMap.set(doc, handleSubdocs);
-    return handleSubdocs;
-  };
-
-  function trackDoc(doc: Doc) {
-    syncDiff(rootDoc, rootDoc !== doc ? doc.guid : undefined).catch(
-      logger.error
-    );
-    doc.on('update', createOrHandleUpdate(doc));
-    doc.on('subdocs', createOrGetHandleSubDocs(doc));
-    doc.subdocs.forEach(doc => {
-      trackDoc(doc);
-    });
-  }
-
-  function untrackDoc(doc: Doc) {
-    doc.subdocs.forEach(doc => {
-      untrackDoc(doc);
-    });
-    doc.off('update', createOrHandleUpdate(doc));
-    doc.off('subdocs', createOrGetHandleSubDocs(doc));
-  }
-
-  let unsubscribe = () => {};
+  let datasource: ReturnType<typeof createDatasource> | null = null;
+  let provider: ReturnType<typeof createLazyProvider> | null = null;
   let connected = false;
-
-  const connect = () => {
-    if (connected) {
-      return;
-    }
-    logger.info('connecting sqlite provider', id);
-    trackDoc(rootDoc);
-
-    unsubscribe = events.db.onExternalUpdate(
-      ({
-        update,
-        workspaceId,
-        docId,
-      }: {
-        workspaceId: string;
-        update: Uint8Array;
-        docId?: string;
-      }) => {
-        if (workspaceId === id) {
-          if (docId) {
-            for (const doc of rootDoc.subdocs) {
-              if (doc.guid === docId) {
-                Y.applyUpdate(doc, update, sqliteOrigin);
-                return;
-              }
-            }
-          } else {
-            Y.applyUpdate(rootDoc, update, sqliteOrigin);
-          }
-        }
-      }
-    );
-    connected = true;
-    logger.info('connecting sqlite done', id);
-  };
-
-  const cleanup = () => {
-    logger.info('disconnecting sqlite provider', id);
-    unsubscribe();
-    untrackDoc(rootDoc);
-    connected = false;
-  };
-
   return {
     flavour: 'sqlite',
     passive: true,
-    get connected(): boolean {
+    connect: () => {
+      datasource = createDatasource(id);
+      provider = createLazyProvider(rootDoc, datasource);
+      provider.connect();
+      connected = true;
+    },
+    disconnect: () => {
+      provider?.disconnect();
+      datasource = null;
+      provider = null;
+      connected = false;
+    },
+    get connected() {
       return connected;
     },
-    cleanup,
-    connect,
-    disconnect: cleanup,
   };
 };
@@ -180,7 +77,6 @@ export const createSQLiteDBDownloadProvider: DocProviderCreator = (
   rootDoc
 ): SQLiteDBDownloadProvider => {
   const { apis } = window;
   let disconnected = false;
 
   let _resolve: () => void;
   let _reject: (error: unknown) => void;
@@ -194,33 +90,13 @@ export const createSQLiteDBDownloadProvider: DocProviderCreator = (
     const subdocId = doc.guid === id ? undefined : doc.guid;
     const updates = await apis.db.getDocAsUpdates(id, subdocId);
 
     if (disconnected) {
       return false;
     }
 
     if (updates) {
       Y.applyUpdate(doc, updates, sqliteOrigin);
     }
 
     const mergedUpdates = Y.encodeStateAsUpdate(
       doc,
       Y.encodeStateVectorFromUpdate(updates)
     );
 
     // also apply updates to sqlite
     await apis.db.applyDocUpdate(id, mergedUpdates, subdocId);
 
     return true;
   }
 
-  async function syncAllUpdates(doc: Doc) {
-    if (await syncUpdates(doc)) {
-      // load all subdocs
-      const subdocs = Array.from(doc.subdocs);
-      await Promise.all(subdocs.map(syncAllUpdates));
-    }
-  }
-
   return {
     flavour: 'sqlite-download',
     active: true,
@@ -228,12 +104,12 @@ export const createSQLiteDBDownloadProvider: DocProviderCreator = (
       return promise;
     },
     cleanup: () => {
       disconnected = true;
       // todo
     },
     sync: async () => {
       logger.info('connect sqlite download provider', id);
       try {
-        await syncAllUpdates(rootDoc);
+        await syncUpdates(rootDoc);
         _resolve();
       } catch (error) {
         _reject(error);