feat: support sub-doc feature (#2774)

Himself65
2023-06-14 23:22:35 +08:00
committed by GitHub
parent 8d5330df74
commit 5d75ceeeb5
5 changed files with 590 additions and 117 deletions

View File

@@ -28,8 +28,8 @@
"idb": "^7.1.1"
},
"devDependencies": {
"@blocksuite/blocks": "0.0.0-20230607055421-9b20fcaf-nightly",
"@blocksuite/store": "0.0.0-20230607055421-9b20fcaf-nightly",
"@blocksuite/blocks": "0.0.0-20230613142146-d72d4600-nightly",
"@blocksuite/store": "0.0.0-20230613142146-d72d4600-nightly",
"vite": "^4.3.9",
"vite-plugin-dts": "^2.3.0",
"y-indexeddb": "^9.0.11"

View File

@@ -3,8 +3,8 @@
*/
import 'fake-indexeddb/auto';
import { initEmptyPage } from '@affine/env/blocksuite';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import type { Page } from '@blocksuite/store';
import { assertExists, uuidv4, Workspace } from '@blocksuite/store';
import { openDB } from 'idb';
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
@@ -24,6 +24,21 @@ import {
setMergeCount,
} from '../index';
// Builds a minimal page tree (page, surface, frame, paragraph) used by the tests below.
function initEmptyPage(page: Page) {
const pageBlockId = page.addBlock('affine:page', {
title: new page.Text(''),
});
const surfaceBlockId = page.addBlock('affine:surface', {}, pageBlockId);
const frameBlockId = page.addBlock('affine:frame', {}, pageBlockId);
const paragraphBlockId = page.addBlock('affine:paragraph', {}, frameBlockId);
return {
pageBlockId,
surfaceBlockId,
frameBlockId,
paragraphBlockId,
};
}
async function getUpdates(id: string): Promise<Uint8Array[]> {
const db = await openDB(rootDBName, dbVersion);
const store = await db
@@ -73,7 +88,8 @@ describe('indexeddb provider', () => {
},
],
});
const page = workspace.createPage('page0');
const page = workspace.createPage({ id: 'page0' });
await page.waitForLoaded();
const pageBlockId = page.addBlock('affine:page', { title: '' });
const frameId = page.addBlock('affine:frame', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
@@ -143,7 +159,8 @@ describe('indexeddb provider', () => {
provider.disconnect();
expect(provider.connected).toBe(false);
{
const page = workspace.createPage('page0');
const page = workspace.createPage({ id: 'page0' });
await page.waitForLoaded();
const pageBlockId = page.addBlock('affine:page', { title: '' });
const frameId = page.addBlock('affine:frame', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
@@ -214,10 +231,11 @@ describe('indexeddb provider', () => {
);
provider.connect();
{
const page = workspace.createPage('page0');
const page = workspace.createPage({ id: 'page0' });
await page.waitForLoaded();
const pageBlockId = page.addBlock('affine:page', { title: '' });
const frameId = page.addBlock('affine:frame', {}, pageBlockId);
for (let i = 0; i < 100; i++) {
for (let i = 0; i < 99; i++) {
page.addBlock('affine:paragraph', {}, frameId);
}
}
@@ -372,9 +390,89 @@ describe('milestone', () => {
});
});
describe('subDoc', () => {
test('basic', async () => {
let json1: any, json2: any;
{
const doc = new Doc();
const map = doc.getMap();
const subDoc = new Doc();
subDoc.load();
map.set('1', subDoc);
map.set('2', 'test');
const provider = createIndexedDBProvider('test', doc);
provider.connect();
await provider.whenSynced;
provider.disconnect();
json1 = doc.toJSON();
}
{
const doc = new Doc();
const provider = createIndexedDBProvider('test', doc);
provider.connect();
await provider.whenSynced;
const map = doc.getMap();
const subDoc = map.get('1') as Doc;
subDoc.load();
provider.disconnect();
json2 = doc.toJSON();
}
expect(json1['']['1'].toJSON()).toEqual(json2['']['1'].toJSON());
expect(json1['']['2']).toEqual(json2['']['2']);
});
test('blocksuite', async () => {
const page0 = workspace.createPage({
id: 'page0',
});
await page0.waitForLoaded();
const { paragraphBlockId: paragraphBlockIdPage1 } = initEmptyPage(page0);
const provider = createIndexedDBProvider(
workspace.id,
workspace.doc,
rootDBName
);
provider.connect();
const page1 = workspace.createPage({
id: 'page1',
});
await page1.waitForLoaded();
const { paragraphBlockId: paragraphBlockIdPage2 } = initEmptyPage(page1);
await new Promise(resolve => setTimeout(resolve, 1000));
provider.disconnect();
{
const newWorkspace = new Workspace({
id,
isSSR: true,
});
newWorkspace.register(AffineSchemas).register(__unstableSchemas);
const provider = createIndexedDBProvider(
newWorkspace.id,
newWorkspace.doc,
rootDBName
);
provider.connect();
await provider.whenSynced;
const page0 = newWorkspace.getPage('page0') as Page;
await page0.waitForLoaded();
{
const block = page0.getBlockById(paragraphBlockIdPage1);
assertExists(block);
}
const page1 = newWorkspace.getPage('page1') as Page;
await page1.waitForLoaded();
{
const block = page1.getBlockById(paragraphBlockIdPage2);
assertExists(block);
}
}
});
});
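
Reviewer note: the `basic` test above exercises plain Yjs subdocuments with no BlockSuite involved. Below is a minimal sketch of the same round trip, assuming only the public yjs API plus the `createIndexedDBProvider` signature shown in this diff; the id 'subdoc-demo' and the `roundTrip` wrapper are illustrative, not part of the change.

import { Doc } from 'yjs';
import { createIndexedDBProvider } from '../index'; // same package entry the tests import

async function roundTrip() {
  // Session 1: embed a subdoc in the root map and persist both.
  {
    const parent = new Doc();
    const sub = new Doc();
    sub.load(); // subdocs are lazy; load() makes their content part of the sync
    parent.getMap().set('child', sub);
    sub.getText('content').insert(0, 'hello');
    const provider = createIndexedDBProvider('subdoc-demo', parent);
    provider.connect();
    await provider.whenSynced;
    provider.disconnect();
  }
  // Session 2: a fresh doc hydrated from IndexedDB sees the same structure.
  {
    const restored = new Doc();
    const provider = createIndexedDBProvider('subdoc-demo', restored);
    provider.connect();
    await provider.whenSynced;
    const child = restored.getMap().get('child') as Doc;
    child.load(); // still lazy until explicitly loaded
    provider.disconnect();
  }
}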
describe('utils', () => {
test('download binary', async () => {
const page = workspace.createPage('page0');
const page = workspace.createPage({ id: 'page0' });
await page.waitForLoaded();
initEmptyPage(page);
const provider = createIndexedDBProvider(
workspace.id,
@@ -397,7 +495,12 @@ describe('utils', () => {
applyUpdate(newWorkspace.doc, update);
await new Promise<void>(resolve =>
setTimeout(() => {
expect(workspace.doc.toJSON()).toEqual(newWorkspace.doc.toJSON());
expect(workspace.doc.toJSON()['meta']).toEqual(
newWorkspace.doc.toJSON()['meta']
);
expect(Object.keys(workspace.doc.toJSON()['spaces'])).toEqual(
Object.keys(newWorkspace.doc.toJSON()['spaces'])
);
resolve();
}, 0)
);

View File

@@ -142,6 +142,12 @@ export const getMilestones = async (
return milestone.milestone;
};
type SubDocsEvent = {
added: Set<Doc>;
removed: Set<Doc>;
loaded: Set<Doc>;
};
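
Reviewer note: `SubDocsEvent` mirrors the payload Yjs emits on a doc's `subdocs` event. A small sketch of consuming that event directly (the handler body is illustrative; the provider's real handlers follow below):

import { Doc } from 'yjs';

const root = new Doc();
root.on('subdocs', (event: { added: Set<Doc>; removed: Set<Doc>; loaded: Set<Doc> }) => {
  // `added` subdocs are known to the parent but not yet loaded;
  // this provider only reacts to `loaded` and `removed` (see handleSubDocs below).
  event.loaded.forEach(sub => console.log('start tracking', sub.guid));
  event.removed.forEach(sub => console.log('stop tracking', sub.guid));
});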
export const createIndexedDBProvider = (
id: string,
doc: Doc,
@@ -151,62 +157,175 @@ export const createIndexedDBProvider = (
let reject: (reason?: unknown) => void;
let early = true;
let connected = false;
async function handleUpdate(update: Uint8Array, origin: unknown) {
const db = await dbPromise;
if (!connected) {
return;
}
if (origin === indexeddbOrigin) {
return;
}
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
let data = await store.get(id);
if (!data) {
data = {
id,
updates: [],
};
}
data.updates.push({
timestamp: Date.now(),
update,
});
if (data.updates.length > mergeCount) {
const updates = data.updates.map(({ update }) => update);
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update, indexeddbOrigin);
});
}, indexeddbOrigin);
const update = encodeStateAsUpdate(doc);
data = {
id,
updates: [
{
timestamp: Date.now(),
update,
},
],
};
await writeOperation(store.put(data));
} else {
await writeOperation(store.put(data));
}
}
const dbPromise = openDB<BlockSuiteBinaryDB>(dbName, dbVersion, {
upgrade: upgradeDB,
});
const handleDestroy = async () => {
connected = true;
const db = await dbPromise;
db.close();
const updateHandlerMap = new WeakMap<
Doc,
(update: Uint8Array, origin: unknown) => void
>();
const destroyHandlerMap = new WeakMap<Doc, () => void>();
const subDocsHandlerMap = new WeakMap<Doc, (event: SubDocsEvent) => void>();
const createOrGetHandleUpdate = (id: string, doc: Doc) => {
if (updateHandlerMap.has(doc)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return updateHandlerMap.get(doc)!;
}
const fn = async function handleUpdate(
update: Uint8Array,
origin: unknown
) {
const db = await dbPromise;
if (!connected) {
return;
}
if (origin === indexeddbOrigin) {
return;
}
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
let data = await store.get(id);
if (!data) {
data = {
id,
updates: [],
};
}
data.updates.push({
timestamp: Date.now(),
update,
});
if (data.updates.length > mergeCount) {
const updates = data.updates.map(({ update }) => update);
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update, indexeddbOrigin);
});
}, indexeddbOrigin);
const update = encodeStateAsUpdate(doc);
data = {
id,
updates: [
{
timestamp: Date.now(),
update,
},
],
};
await writeOperation(store.put(data));
} else {
await writeOperation(store.put(data));
}
};
updateHandlerMap.set(doc, fn);
return fn;
};
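
Reviewer note: when the stored update list grows past `mergeCount`, the handler above compacts it by replaying every update into a throwaway `Doc` and re-encoding, which also garbage-collects deleted content. A sketch of that compaction in isolation, plus the lighter `mergeUpdates` helper yjs ships as an alternative (function names here are illustrative):

import { Doc, applyUpdate, encodeStateAsUpdate, mergeUpdates } from 'yjs';

// What the handler does: replay all updates into a fresh Doc and re-encode.
function compactByReplay(updates: Uint8Array[]): Uint8Array {
  const tmp = new Doc();
  tmp.transact(() => {
    updates.forEach(u => applyUpdate(tmp, u));
  });
  return encodeStateAsUpdate(tmp);
}

// Alternative: merge the binary updates directly without materializing a Doc.
// Cheaper, but it does not garbage-collect deletions, so the result can be larger.
function compactByMerge(updates: Uint8Array[]): Uint8Array {
  return mergeUpdates(updates);
}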
/* deepscan-disable UNUSED_PARAM */
const createOrGetHandleDestroy = (_: string, doc: Doc) => {
if (destroyHandlerMap.has(doc)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return destroyHandlerMap.get(doc)!;
}
const fn = async function handleDestroy() {
const db = await dbPromise;
db.close();
};
destroyHandlerMap.set(doc, fn);
return fn;
};
/* deepscan-disable UNUSED_PARAM */
const createOrGetHandleSubDocs = (_: string, doc: Doc) => {
if (subDocsHandlerMap.has(doc)) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return subDocsHandlerMap.get(doc)!;
}
const fn = async function handleSubDocs(event: SubDocsEvent) {
event.removed.forEach(doc => {
unTrackDoc(doc.guid, doc);
});
event.loaded.forEach(doc => {
trackDoc(doc.guid, doc);
});
};
subDocsHandlerMap.set(doc, fn);
return fn;
};
function trackDoc(id: string, doc: Doc) {
doc.on('update', createOrGetHandleUpdate(id, doc));
doc.on('destroy', createOrGetHandleDestroy(id, doc));
doc.on('subdocs', createOrGetHandleSubDocs(id, doc));
}
function unTrackDoc(id: string, doc: Doc) {
doc.subdocs.forEach(doc => {
unTrackDoc(doc.guid, doc);
});
doc.off('update', createOrGetHandleUpdate(id, doc));
doc.off('destroy', createOrGetHandleDestroy(id, doc));
doc.off('subdocs', createOrGetHandleSubDocs(id, doc));
}
async function saveDocOperation(id: string, doc: Doc) {
const db = await dbPromise;
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
const data = await store.get(id);
if (!connected) {
return;
}
if (!data) {
await writeOperation(
db.put('workspace', {
id,
updates: [
{
timestamp: Date.now(),
update: encodeStateAsUpdate(doc),
},
],
})
);
} else {
const updates = data.updates.map(({ update }) => update);
const fakeDoc = new Doc();
fakeDoc.transact(() => {
updates.forEach(update => {
applyUpdate(fakeDoc, update);
});
}, indexeddbOrigin);
const newUpdate = diffUpdate(
encodeStateAsUpdate(doc),
encodeStateAsUpdate(fakeDoc)
);
await writeOperation(
store.put({
...data,
updates: [
...data.updates,
{
timestamp: Date.now(),
update: newUpdate,
},
],
})
);
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
}, indexeddbOrigin);
}
}
const apis = {
connect: async () => {
if (connected) return;
@@ -217,60 +336,23 @@ export const createIndexedDBProvider = (
reject = _reject;
});
connected = true;
doc.on('update', handleUpdate);
doc.on('destroy', handleDestroy);
// only run promise below, otherwise the logic is incorrect
trackDoc(id, doc);
// only `await` below this point, otherwise the logic is incorrect
const db = await dbPromise;
await tryMigrate(db, id, dbName);
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
const data = await store.get(id);
if (!connected) {
return;
}
if (!data) {
await writeOperation(
db.put('workspace', {
id,
updates: [
{
timestamp: Date.now(),
update: encodeStateAsUpdate(doc),
},
],
})
);
} else {
const updates = data.updates.map(({ update }) => update);
const fakeDoc = new Doc();
fakeDoc.transact(() => {
updates.forEach(update => {
applyUpdate(fakeDoc, update);
});
}, indexeddbOrigin);
const newUpdate = diffUpdate(
encodeStateAsUpdate(doc),
encodeStateAsUpdate(fakeDoc)
);
await writeOperation(
store.put({
...data,
updates: [
...data.updates,
{
timestamp: Date.now(),
update: newUpdate,
},
],
})
);
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
}, indexeddbOrigin);
const docs: [string, Doc][] = [];
docs.push([id, doc]);
while (docs.length > 0) {
const [id, doc] = docs.pop() as [string, Doc];
await saveDocOperation(id, doc);
doc.subdocs.forEach(doc => {
docs.push([doc.guid, doc]);
});
}
early = false;
resolve();
},
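
Reviewer note: `connect` walks the whole subdocument tree with an explicit stack, so subdocs nested inside subdocs are persisted as well. The traversal pattern in isolation (the `visit` callback is a hypothetical stand-in for `saveDocOperation`):

import type { Doc } from 'yjs';

// Depth-first walk over a root doc and every nested subdocument.
async function walkDocTree(
  rootId: string,
  root: Doc,
  visit: (id: string, doc: Doc) => Promise<void>
): Promise<void> {
  const stack: [string, Doc][] = [[rootId, root]];
  while (stack.length > 0) {
    const [id, doc] = stack.pop() as [string, Doc];
    await visit(id, doc); // e.g. saveDocOperation(id, doc) in the provider
    doc.subdocs.forEach(sub => stack.push([sub.guid, sub]));
  }
}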
@@ -279,8 +361,7 @@ export const createIndexedDBProvider = (
if (early) {
reject(new EarlyDisconnectError());
}
doc.off('update', handleUpdate);
doc.off('destroy', handleDestroy);
unTrackDoc(id, doc);
},
async cleanup() {
if (connected) {

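Reviewer note: taken together, the provider now persists a workspace root doc and any loaded subdocs under one database. A usage sketch against the API surface shown in this diff (`connect`, `whenSynced`, `disconnect`); the workspace setup mirrors the tests, and with the BlockSuite nightly used here each page is backed by a Yjs subdoc:

import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Workspace } from '@blocksuite/store';
import { createIndexedDBProvider } from '../index'; // package entry, as in the tests

async function persistWorkspace() {
  const workspace = new Workspace({ id: 'demo-workspace' });
  workspace.register(AffineSchemas).register(__unstableSchemas);

  const provider = createIndexedDBProvider(workspace.id, workspace.doc);
  provider.connect();
  await provider.whenSynced;

  // Each page lives in its own subdoc; loading it lets the provider track it.
  const page = workspace.createPage({ id: 'page0' });
  await page.waitForLoaded();
  // ... add blocks, edit content ...

  provider.disconnect();
}
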
View File

@@ -9,9 +9,6 @@
"references": [
{
"path": "./tsconfig.node.json"
},
{
"path": "../env"
}
]
}