feat!: upgrade blocksuite version (#2833)

This commit is contained in:
Alex Yang
2023-06-25 01:16:46 +08:00
committed by GitHub
parent aa86d3a2ee
commit 7fcc5e599e
59 changed files with 564 additions and 1064 deletions

View File

@@ -3,44 +3,42 @@ import type {
AffineWebSocketProvider,
LocalIndexedDBBackgroundProvider,
LocalIndexedDBDownloadProvider,
Provider,
SQLiteDBDownloadProvider,
SQLiteProvider,
} from '@affine/env/workspace';
import type { BlobManager, Disposable } from '@blocksuite/store';
import {
assertExists,
Workspace as BlockSuiteWorkspace,
} from '@blocksuite/store';
import type { Disposable, DocProviderCreator } from '@blocksuite/store';
import { assertExists, Workspace } from '@blocksuite/store';
import { createBroadcastChannelProvider } from '@blocksuite/store/providers/broadcast-channel';
import {
createIndexedDBProvider as create,
downloadBinary,
EarlyDisconnectError,
} from '@toeverything/y-indexeddb';
import type { Doc } from 'yjs';
import { KeckProvider } from '../affine/keck';
import { getLoginStorage, storageChangeSlot } from '../affine/login';
import { CallbackSet } from '../utils';
import { createAffineDownloadProvider } from './affine-download';
import { createBroadCastChannelProvider } from './broad-cast-channel';
import { localProviderLogger as logger } from './logger';
const Y = BlockSuiteWorkspace.Y;
const Y = Workspace.Y;
const createAffineWebSocketProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
const createAffineWebSocketProvider: DocProviderCreator = (
id,
doc,
{ awareness }
): AffineWebSocketProvider => {
let webSocketProvider: KeckProvider | null = null;
let dispose: Disposable | undefined = undefined;
const callbacks = new CallbackSet();
const cb = () => callbacks.forEach(cb => cb());
const apis: AffineWebSocketProvider = {
const apis = {
flavour: 'affine-websocket',
background: true,
passive: true,
get connected() {
return callbacks.ready;
},
callbacks,
cleanup: () => {
assertExists(webSocketProvider);
webSocketProvider.destroy();
@@ -54,11 +52,11 @@ const createAffineWebSocketProvider = (
});
webSocketProvider = new KeckProvider(
websocketPrefixUrl + '/api/sync/',
blockSuiteWorkspace.id,
blockSuiteWorkspace.doc,
id,
doc,
{
params: { token: getLoginStorage()?.token ?? '' },
awareness: blockSuiteWorkspace.awarenessStore.awareness,
awareness,
// we maintain a broadcast channel by ourselves
connect: false,
}
@@ -74,28 +72,28 @@ const createAffineWebSocketProvider = (
webSocketProvider.off('synced', cb);
dispose?.dispose();
},
};
} satisfies AffineWebSocketProvider;
return apis;
};
const createIndexedDBBackgroundProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
const createIndexedDBBackgroundProvider: DocProviderCreator = (
id,
blockSuiteWorkspace
): LocalIndexedDBBackgroundProvider => {
const indexeddbProvider = create(blockSuiteWorkspace.doc);
const indexeddbProvider = create(blockSuiteWorkspace);
const callbacks = new CallbackSet();
return {
flavour: 'local-indexeddb-background',
background: true,
passive: true,
get connected() {
return callbacks.ready;
},
callbacks,
cleanup: () => {
// todo: cleanup data
indexeddbProvider.cleanup().catch(console.error);
},
connect: () => {
logger.info('connect indexeddb provider', blockSuiteWorkspace.id);
logger.info('connect indexeddb provider', id);
indexeddbProvider.connect();
indexeddbProvider.whenSynced
.then(() => {
@@ -113,15 +111,16 @@ const createIndexedDBBackgroundProvider = (
},
disconnect: () => {
assertExists(indexeddbProvider);
logger.info('disconnect indexeddb provider', blockSuiteWorkspace.id);
logger.info('disconnect indexeddb provider', id);
indexeddbProvider.disconnect();
callbacks.ready = false;
},
};
};
const createIndexedDBDownloadProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
const createIndexedDBDownloadProvider: DocProviderCreator = (
id,
doc
): LocalIndexedDBDownloadProvider => {
let _resolve: () => void;
let _reject: (error: unknown) => void;
@@ -129,9 +128,16 @@ const createIndexedDBDownloadProvider = (
_resolve = resolve;
_reject = reject;
});
async function downloadBinaryRecursively(doc: Doc) {
const binary = await downloadBinary(doc.guid);
if (binary) {
Y.applyUpdate(doc, binary);
await Promise.all([...doc.subdocs].map(downloadBinaryRecursively));
}
}
return {
flavour: 'local-indexeddb',
necessary: true,
active: true,
get whenReady() {
return promise;
},
@@ -139,26 +145,15 @@ const createIndexedDBDownloadProvider = (
// todo: cleanup data
},
sync: () => {
logger.info('connect indexeddb provider', blockSuiteWorkspace.id);
downloadBinary(blockSuiteWorkspace.id)
.then(binary => {
if (binary !== false) {
Y.applyUpdate(blockSuiteWorkspace.doc, binary);
}
_resolve();
})
.catch(error => {
_reject(error);
});
logger.info('connect indexeddb provider', id);
downloadBinaryRecursively(doc).then(_resolve).catch(_reject);
},
};
};
const sqliteOrigin = Symbol('sqlite-provider-origin');
const createSQLiteProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): SQLiteProvider => {
const createSQLiteProvider: DocProviderCreator = (id, doc): SQLiteProvider => {
const { apis, events } = window;
// make sure it is being used in Electron with APIs
assertExists(apis);
@@ -168,7 +163,7 @@ const createSQLiteProvider = (
if (origin === sqliteOrigin) {
return;
}
apis.db.applyDocUpdate(blockSuiteWorkspace.id, update).catch(err => {
apis.db.applyDocUpdate(id, update).catch(err => {
console.error(err);
});
}
@@ -176,11 +171,9 @@ const createSQLiteProvider = (
let unsubscribe = () => {};
let connected = false;
const callbacks = new CallbackSet();
const connect = () => {
logger.info('connecting sqlite provider', blockSuiteWorkspace.id);
blockSuiteWorkspace.doc.on('update', handleUpdate);
logger.info('connecting sqlite provider', id);
doc.on('update', handleUpdate);
unsubscribe = events.db.onExternalUpdate(
({
update,
@@ -189,26 +182,25 @@ const createSQLiteProvider = (
workspaceId: string;
update: Uint8Array;
}) => {
if (workspaceId === blockSuiteWorkspace.id) {
Y.applyUpdate(blockSuiteWorkspace.doc, update, sqliteOrigin);
if (workspaceId === id) {
Y.applyUpdate(doc, update, sqliteOrigin);
}
}
);
connected = true;
logger.info('connecting sqlite done', blockSuiteWorkspace.id);
logger.info('connecting sqlite done', id);
};
const cleanup = () => {
logger.info('disconnecting sqlite provider', blockSuiteWorkspace.id);
logger.info('disconnecting sqlite provider', id);
unsubscribe();
blockSuiteWorkspace.doc.off('update', handleUpdate);
doc.off('update', handleUpdate);
connected = false;
};
return {
flavour: 'sqlite',
background: true,
callbacks,
passive: true,
get connected(): boolean {
return connected;
},
@@ -218,8 +210,9 @@ const createSQLiteProvider = (
};
};
const createSQLiteDBDownloadProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
const createSQLiteDBDownloadProvider: DocProviderCreator = (
id,
doc
): SQLiteDBDownloadProvider => {
const { apis } = window;
let disconnected = false;
@@ -232,64 +225,59 @@ const createSQLiteDBDownloadProvider = (
});
async function syncUpdates() {
logger.info('syncing updates from sqlite', blockSuiteWorkspace.id);
const updates = await apis.db.getDocAsUpdates(blockSuiteWorkspace.id);
logger.info('syncing updates from sqlite', id);
const updates = await apis.db.getDocAsUpdates(id);
if (disconnected) {
return;
}
if (updates) {
Y.applyUpdate(blockSuiteWorkspace.doc, updates, sqliteOrigin);
Y.applyUpdate(doc, updates, sqliteOrigin);
}
const diff = Y.encodeStateAsUpdate(blockSuiteWorkspace.doc, updates);
const diff = Y.encodeStateAsUpdate(doc, updates);
// also apply updates to sqlite
await apis.db.applyDocUpdate(blockSuiteWorkspace.id, diff);
const bs = blockSuiteWorkspace.blobs;
if (bs && !disconnected) {
await syncBlobIntoSQLite(bs);
}
await apis.db.applyDocUpdate(id, diff);
}
async function syncBlobIntoSQLite(bs: BlobManager) {
const persistedKeys = await apis.db.getBlobKeys(blockSuiteWorkspace.id);
if (disconnected) {
return;
}
const allKeys = await bs.list().catch(() => []);
const keysToPersist = allKeys.filter(k => !persistedKeys.includes(k));
logger.info('persisting blobs', keysToPersist, 'to sqlite');
return Promise.all(
keysToPersist.map(async k => {
const blob = await bs.get(k);
if (!blob) {
logger.warn('blob not found for', k);
return;
}
if (disconnected) {
return;
}
return apis?.db.addBlob(
blockSuiteWorkspace.id,
k,
new Uint8Array(await blob.arrayBuffer())
);
})
);
}
// fixme(pengx17): should n't sync blob in doc provider
// async function _syncBlobIntoSQLite(bs: BlobManager) {
// const persistedKeys = await apis.db.getBlobKeys(id);
//
// if (disconnected) {
// return;
// }
//
// const allKeys = await bs.list().catch(() => []);
// const keysToPersist = allKeys.filter(k => !persistedKeys.includes(k));
//
// logger.info('persisting blobs', keysToPersist, 'to sqlite');
// return Promise.all(
// keysToPersist.map(async k => {
// const blob = await bs.get(k);
// if (!blob) {
// logger.warn('blob not found for', k);
// return;
// }
//
// if (disconnected) {
// return;
// }
//
// return apis?.db.addBlob(
// id,
// k,
// new Uint8Array(await blob.arrayBuffer())
// );
// })
// );
// }
return {
flavour: 'sqlite-download',
necessary: true,
active: true,
get whenReady() {
return promise;
},
@@ -297,7 +285,7 @@ const createSQLiteDBDownloadProvider = (
disconnected = true;
},
sync: async () => {
logger.info('connect indexeddb provider', blockSuiteWorkspace.id);
logger.info('connect indexeddb provider', id);
try {
await syncUpdates();
_resolve();
@@ -311,45 +299,37 @@ const createSQLiteDBDownloadProvider = (
export {
createAffineDownloadProvider,
createAffineWebSocketProvider,
createBroadCastChannelProvider,
createBroadcastChannelProvider,
createIndexedDBBackgroundProvider,
createIndexedDBDownloadProvider,
createSQLiteDBDownloadProvider,
createSQLiteProvider,
};
export const createLocalProviders = (
blockSuiteWorkspace: BlockSuiteWorkspace
): Provider[] => {
export const createLocalProviders = (): DocProviderCreator[] => {
const providers = [
createIndexedDBBackgroundProvider(blockSuiteWorkspace),
createIndexedDBDownloadProvider(blockSuiteWorkspace),
] as Provider[];
createIndexedDBBackgroundProvider,
createIndexedDBDownloadProvider,
] as DocProviderCreator[];
if (config.enableBroadCastChannelProvider) {
providers.push(createBroadCastChannelProvider(blockSuiteWorkspace));
if (config.enableBroadcastChannelProvider) {
providers.push(createBroadcastChannelProvider);
}
if (environment.isDesktop) {
providers.push(
createSQLiteProvider(blockSuiteWorkspace),
createSQLiteDBDownloadProvider(blockSuiteWorkspace)
);
providers.push(createSQLiteProvider, createSQLiteDBDownloadProvider);
}
return providers;
};
export const createAffineProviders = (
blockSuiteWorkspace: BlockSuiteWorkspace
): Provider[] => {
export const createAffineProviders = (): DocProviderCreator[] => {
return (
[
createAffineDownloadProvider(blockSuiteWorkspace),
createAffineWebSocketProvider(blockSuiteWorkspace),
config.enableBroadCastChannelProvider &&
createBroadCastChannelProvider(blockSuiteWorkspace),
createIndexedDBDownloadProvider(blockSuiteWorkspace),
] as any[]
createAffineDownloadProvider,
createAffineWebSocketProvider,
config.enableBroadcastChannelProvider && createBroadcastChannelProvider,
createIndexedDBDownloadProvider,
] as DocProviderCreator[]
).filter(v => Boolean(v));
};