refactor: remove y-indexeddb (#1771)

This commit is contained in:
Himself65
2023-04-02 02:57:50 -05:00
committed by GitHub
parent f5574c68fe
commit ed8f07f102
12 changed files with 918 additions and 35 deletions

View File

@@ -19,6 +19,7 @@
"@affine/env": "workspace:*",
"@blocksuite/blocks": "0.5.0-20230326033652-70ca43c",
"@blocksuite/store": "0.5.0-20230326033652-70ca43c",
"@toeverything/y-indexeddb": "workspace:*",
"firebase": "^9.19.1",
"jotai": "^2.0.3",
"js-base64": "^3.7.5",

View File

@@ -1,6 +1,6 @@
import { nanoid, Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { createIndexedDBProvider } from '@toeverything/y-indexeddb';
import { createJSONStorage } from 'jotai/utils';
import { IndexeddbPersistence } from 'y-indexeddb';
import { z } from 'zod';
import { createLocalProviders } from '../providers';
@@ -47,9 +47,10 @@ export const CRUD: WorkspaceCRUD<WorkspaceFlavour.LOCAL> = {
(_: string) => undefined
);
BlockSuiteWorkspace.Y.applyUpdateV2(blockSuiteWorkspace.doc, binary);
const persistence = new IndexeddbPersistence(id, blockSuiteWorkspace.doc);
const persistence = createIndexedDBProvider(id, blockSuiteWorkspace.doc);
persistence.connect();
await persistence.whenSynced.then(() => {
persistence.destroy();
persistence.disconnect();
});
storage.setItem(kStoreKey, [...data, id]);
console.log('create', id, storage.getItem(kStoreKey));

View File

@@ -0,0 +1,36 @@
{
"name": "@toeverything/y-indexeddb",
"type": "module",
"scripts": {
"build": "vite build"
},
"files": [
"dist"
],
"exports": {
".": "./src/index.ts"
},
"publishConfig": {
"main": "dist/index.umd.cjs",
"module": "dist/index.js",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js",
"require": "./dist/index.umd.cjs"
}
}
},
"dependencies": {
"idb": "^7.1.1"
},
"devDependencies": {
"@blocksuite/blocks": "0.5.0-20230324040005-14417c2",
"@blocksuite/store": "0.5.0-20230324040005-14417c2",
"vite": "^4.2.1",
"vite-plugin-dts": "^2.1.0"
},
"peerDependencies": {
"yjs": "^13.5.51"
}
}

View File

@@ -0,0 +1,148 @@
/**
* @vitest-environment happy-dom
*/
import 'fake-indexeddb/auto';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { assertExists, uuidv4, Workspace } from '@blocksuite/store';
import { openDB } from 'idb';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import type { WorkspacePersist } from '../index';
import { createIndexedDBProvider, dbVersion, setMergeCount } from '../index';
async function getUpdates(id: string): Promise<ArrayBuffer[]> {
const db = await openDB('affine-local', dbVersion);
const store = await db
.transaction('workspace', 'readonly')
.objectStore('workspace');
const data = (await store.get(id)) as WorkspacePersist | undefined;
assertExists(data, 'data should not be undefined');
expect(data.id).toBe(id);
return data.updates.map(({ update }) => update);
}
let id: string;
let workspace: Workspace;
beforeEach(() => {
id = uuidv4();
workspace = new Workspace({
id,
isSSR: true,
});
workspace.register(AffineSchemas).register(__unstableSchemas);
});
describe('indexeddb provider', () => {
test('connect', async () => {
const provider = createIndexedDBProvider(workspace.id, workspace.doc);
provider.connect();
await provider.whenSynced;
const db = await openDB('affine-local', dbVersion);
{
const store = await db
.transaction('workspace', 'readonly')
.objectStore('workspace');
const data = await store.get(id);
expect(data).toEqual({
id,
updates: [],
});
const page = workspace.createPage('page0');
const pageBlockId = page.addBlock('affine:page', { title: '' });
const frameId = page.addBlock('affine:frame', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
}
await new Promise(resolve => setTimeout(resolve, 1000));
{
const store = await db
.transaction('workspace', 'readonly')
.objectStore('workspace');
const data = (await store.get(id)) as WorkspacePersist | undefined;
assertExists(data);
expect(data.id).toBe(id);
const testWorkspace = new Workspace({
id: 'test',
})
.register(AffineSchemas)
.register(__unstableSchemas);
data.updates.forEach(({ update }) => {
Workspace.Y.applyUpdate(testWorkspace.doc, update);
});
const binary = Workspace.Y.encodeStateAsUpdate(testWorkspace.doc);
expect(binary).toEqual(Workspace.Y.encodeStateAsUpdate(workspace.doc));
}
const secondWorkspace = new Workspace({
id,
})
.register(AffineSchemas)
.register(__unstableSchemas);
const provider2 = createIndexedDBProvider(
secondWorkspace.id,
secondWorkspace.doc
);
provider2.connect();
await provider2.whenSynced;
expect(Workspace.Y.encodeStateAsUpdate(secondWorkspace.doc)).toEqual(
Workspace.Y.encodeStateAsUpdate(workspace.doc)
);
});
test('disconnect suddenly', async () => {
const provider = createIndexedDBProvider(workspace.id, workspace.doc);
const fn = vi.fn();
provider.connect();
provider.disconnect();
expect(fn).toBeCalledTimes(0);
await provider.whenSynced.catch(fn);
expect(fn).toBeCalledTimes(1);
});
test('connect and disconnect', async () => {
const provider = createIndexedDBProvider(workspace.id, workspace.doc);
provider.connect();
const p1 = provider.whenSynced;
await provider.whenSynced;
provider.disconnect();
{
const page = workspace.createPage('page0');
const pageBlockId = page.addBlock('affine:page', { title: '' });
const frameId = page.addBlock('affine:frame', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
}
{
const updates = await getUpdates(workspace.id);
expect(updates).toEqual([]);
}
provider.connect();
const p2 = provider.whenSynced;
await provider.whenSynced;
{
const updates = await getUpdates(workspace.id);
expect(updates).not.toEqual([]);
}
provider.disconnect();
expect(p1).not.toBe(p2);
});
test('merge', async () => {
setMergeCount(5);
const provider = createIndexedDBProvider(workspace.id, workspace.doc);
provider.connect();
{
const page = workspace.createPage('page0');
const pageBlockId = page.addBlock('affine:page', { title: '' });
const frameId = page.addBlock('affine:frame', {}, pageBlockId);
for (let i = 0; i < 100; i++) {
page.addBlock('affine:paragraph', {}, frameId);
}
}
await provider.whenSynced;
{
const updates = await getUpdates(id);
expect(updates.length).lessThanOrEqual(5);
}
});
});

View File

@@ -0,0 +1,249 @@
import { openDB } from 'idb';
import type { DBSchema, IDBPDatabase } from 'idb/build/entry';
import {
applyUpdate,
diffUpdate,
Doc,
encodeStateAsUpdate,
mergeUpdates,
} from 'yjs';
const indexeddbOrigin = Symbol('indexeddb-provider-origin');
let mergeCount = 500;
export class EarlyDisconnectError extends Error {
constructor() {
super('Early disconnect');
}
}
export function setMergeCount(count: number) {
mergeCount = count;
}
export const dbVersion = 1;
export function upgradeDB(db: IDBPDatabase<BlockSuiteBinaryDB>) {
db.createObjectStore('workspace', { keyPath: 'id' });
db.createObjectStore('milestone', { keyPath: 'id' });
}
export interface IndexedDBProvider {
connect: () => void;
disconnect: () => void;
cleanup: () => void;
whenSynced: Promise<void>;
}
export type UpdateMessage = {
timestamp: number;
update: Uint8Array;
};
export type WorkspacePersist = {
id: string;
updates: UpdateMessage[];
};
export type WorkspaceMilestone = {
id: string;
milestone: Record<string, Uint8Array>;
};
export interface BlockSuiteBinaryDB extends DBSchema {
workspace: {
key: string;
value: WorkspacePersist;
};
milestone: {
key: string;
value: WorkspaceMilestone;
};
}
export interface OldYjsDB extends DBSchema {
updates: {
key: number;
value: Uint8Array;
};
}
export const createIndexedDBProvider = (
id: string,
doc: Doc,
dbName = 'affine-local'
): IndexedDBProvider => {
let allDb: IDBDatabaseInfo[];
let resolve: () => void;
let reject: (reason?: unknown) => void;
let early = true;
let connect = false;
let destroy = false;
async function handleUpdate(update: Uint8Array, origin: unknown) {
const db = await dbPromise;
if (!connect) {
return;
}
if (origin === indexeddbOrigin) {
return;
}
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
let data = await store.get(id);
if (!data) {
data = {
id,
updates: [],
};
}
data.updates.push({
timestamp: Date.now(),
update,
});
if (data.updates.length > mergeCount) {
const updates = data.updates.map(({ update }) => update);
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update, indexeddbOrigin);
});
}, indexeddbOrigin);
const update = encodeStateAsUpdate(doc);
data = {
id,
updates: [
{
timestamp: Date.now(),
update,
},
],
};
await store.put(data);
} else {
await store.put(data);
}
}
const dbPromise = openDB<BlockSuiteBinaryDB>(dbName, dbVersion, {
upgrade: upgradeDB,
});
const handleDestroy = async () => {
connect = true;
destroy = true;
const db = await dbPromise;
db.close();
};
const apis = {
connect: async () => {
apis.whenSynced = new Promise<void>((_resolve, _reject) => {
early = true;
resolve = _resolve;
reject = _reject;
});
connect = true;
doc.on('update', handleUpdate);
doc.on('destroy', handleDestroy);
// only run promise below, otherwise the logic is incorrect
const db = await dbPromise;
if (!allDb) {
allDb = await indexedDB.databases();
// run the migration
await Promise.all(
allDb.map(meta => {
if (meta.name && meta.version === 1) {
const name = meta.name;
const version = meta.version;
return openDB<IDBPDatabase<OldYjsDB>>(name, version).then(
async oldDB => {
if (!oldDB.objectStoreNames.contains('updates')) {
return;
}
const t = oldDB
.transaction('updates', 'readonly')
.objectStore('updates');
const updates = await t.getAll();
if (
!Array.isArray(updates) ||
!updates.every(update => update instanceof Uint8Array)
) {
return;
}
const update = mergeUpdates(updates);
const workspaceTransaction = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
const data = await workspaceTransaction.get(name);
if (!data) {
console.log('upgrading the database');
await workspaceTransaction.put({
id: name,
updates: [
{
timestamp: Date.now(),
update,
},
],
});
}
}
);
}
})
);
}
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
const data = await store.get(id);
if (!connect) {
return;
}
if (!data) {
await db.put('workspace', {
id,
updates: [],
});
} else {
const updates = data.updates.map(({ update }) => update);
const update = mergeUpdates(updates);
const newUpdate = diffUpdate(encodeStateAsUpdate(doc), update);
await store.put({
...data,
updates: [
...data.updates,
{
timestamp: Date.now(),
update: newUpdate,
},
],
});
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
}, indexeddbOrigin);
}
early = false;
resolve();
},
disconnect() {
connect = false;
if (early) {
reject(new EarlyDisconnectError());
}
doc.off('update', handleUpdate);
doc.off('destroy', handleDestroy);
},
cleanup() {
destroy = true;
// todo
},
whenSynced: Promise.resolve(),
};
return apis;
};

View File

@@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.json",
"include": ["./src"],
"references": [
{
"path": "./tsconfig.node.json"
}
]
}

View File

@@ -0,0 +1,9 @@
{
"compilerOptions": {
"composite": true,
"module": "ESNext",
"moduleResolution": "Node",
"allowSyntheticDefaultImports": true
},
"include": ["vite.config.ts"]
}

View File

@@ -0,0 +1,25 @@
import { resolve } from 'node:path';
import { fileURLToPath } from 'url';
import { defineConfig } from 'vite';
import dts from 'vite-plugin-dts';
const __dirname = fileURLToPath(new URL('.', import.meta.url));
export default defineConfig({
build: {
terserOptions: {
ecma: 2020,
},
sourcemap: true,
lib: {
entry: resolve(__dirname, 'src/index.ts'),
fileName: 'index',
name: 'BlockSuiteIndexedDBProvider',
},
rollupOptions: {
external: ['idb', '@blocksuite/store'],
},
},
plugins: [dts()],
});