refactor: workspace provider (#2218)

This commit is contained in:
Himself65
2023-05-03 18:16:22 -05:00
committed by GitHub
parent ec39c23fb7
commit 9096ac2960
18 changed files with 377 additions and 255 deletions

View File

@@ -17,7 +17,7 @@ export type RootWorkspaceMetadata = {
/**
* root workspaces atom
* this atom stores the metadata of all workspaces,
* which is `id` and `flavour`, that is enough to load the real workspace data
* which is `id` and `flavor`, that is enough to load the real workspace data
*/
export const rootWorkspacesMetadataAtom = atomWithSyncStorage<
RootWorkspaceMetadata[]

View File

@@ -1,3 +1,4 @@
import { CallbackSet } from '@affine/workspace/utils';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { assertExists } from '@blocksuite/store';
import type { Awareness } from 'y-protocols/awareness';
@@ -23,6 +24,7 @@ export const createBroadCastChannelProvider = (
const awareness = blockSuiteWorkspace.awarenessStore
.awareness as unknown as Awareness;
let broadcastChannel: TypedBroadcastChannel | null = null;
const callbacks = new CallbackSet();
const handleBroadcastChannelMessage = (
event: BroadcastChannelMessageEvent
) => {
@@ -56,6 +58,9 @@ export const createBroadCastChannelProvider = (
break;
}
}
if (callbacks.ready) {
callbacks.forEach(cb => cb());
}
};
const handleDocUpdate = (updateV1: Uint8Array, origin: any) => {
if (origin === broadcastChannel) {
@@ -77,7 +82,11 @@ export const createBroadCastChannelProvider = (
};
return {
flavour: 'broadcast-channel',
background: false,
background: true,
get connected() {
return callbacks.ready;
},
callbacks,
connect: () => {
assertExists(blockSuiteWorkspace.id);
broadcastChannel = Object.assign(
@@ -101,6 +110,7 @@ export const createBroadCastChannelProvider = (
broadcastChannel.postMessage(['awareness:update', awarenessUpdate]);
doc.on('update', handleDocUpdate);
awareness.on('update', handleAwarenessUpdate);
callbacks.ready = true;
},
disconnect: () => {
assertExists(broadcastChannel);
@@ -111,6 +121,7 @@ export const createBroadCastChannelProvider = (
doc.off('update', handleDocUpdate);
awareness.off('update', handleAwarenessUpdate);
broadcastChannel.close();
callbacks.ready = false;
},
cleanup: () => {
assertExists(broadcastChannel);

View File

@@ -4,16 +4,22 @@ import {
getLoginStorage,
storageChangeSlot,
} from '@affine/workspace/affine/login';
import type { Provider, SQLiteProvider } from '@affine/workspace/type';
import type {
AffineWebSocketProvider,
LocalIndexedDBProvider,
LocalIndexedDBBackgroundProvider,
LocalIndexedDBDownloadProvider,
Provider,
SQLiteProvider,
} from '@affine/workspace/type';
import { CallbackSet } from '@affine/workspace/utils';
import type { BlobManager, Disposable } from '@blocksuite/store';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { assertExists } from '@blocksuite/store';
import {
assertExists,
Workspace as BlockSuiteWorkspace,
} from '@blocksuite/store';
import {
createIndexedDBProvider as create,
downloadBinary,
EarlyDisconnectError,
} from '@toeverything/y-indexeddb';
@@ -27,9 +33,15 @@ const createAffineWebSocketProvider = (
): AffineWebSocketProvider => {
let webSocketProvider: KeckProvider | null = null;
let dispose: Disposable | undefined = undefined;
const callbacks = new CallbackSet();
const cb = () => callbacks.forEach(cb => cb());
const apis: AffineWebSocketProvider = {
flavour: 'affine-websocket',
background: false,
background: true,
get connected() {
return callbacks.ready;
},
callbacks,
cleanup: () => {
assertExists(webSocketProvider);
webSocketProvider.destroy();
@@ -48,20 +60,19 @@ const createAffineWebSocketProvider = (
{
params: { token: getLoginStorage()?.token ?? '' },
awareness: blockSuiteWorkspace.awarenessStore.awareness,
// we maintain broadcast channel by ourselves
// @ts-expect-error
disableBc: true,
// we maintain a broadcast channel by ourselves
connect: false,
}
);
logger.info('connect', webSocketProvider.url);
webSocketProvider.on('synced', cb);
webSocketProvider.connect();
},
disconnect: () => {
assertExists(webSocketProvider);
logger.info('disconnect', webSocketProvider.url);
webSocketProvider.destroy();
webSocketProvider = null;
webSocketProvider.disconnect();
webSocketProvider.off('synced', cb);
dispose?.dispose();
},
};
@@ -69,52 +80,21 @@ const createAffineWebSocketProvider = (
return apis;
};
class CallbackSet extends Set<() => void> {
#ready = false;
get ready(): boolean {
return this.#ready;
}
set ready(v: boolean) {
this.#ready = v;
}
add(cb: () => void) {
if (this.ready) {
cb();
return this;
}
if (this.has(cb)) {
return this;
}
return super.add(cb);
}
delete(cb: () => void) {
if (this.has(cb)) {
return super.delete(cb);
}
return false;
}
}
const createIndexedDBProvider = (
const createIndexedDBBackgroundProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): LocalIndexedDBProvider => {
): LocalIndexedDBBackgroundProvider => {
const indexeddbProvider = create(
blockSuiteWorkspace.id,
blockSuiteWorkspace.doc
);
const callbacks = new CallbackSet();
return {
flavour: 'local-indexeddb',
// fixme: remove callbacks
callbacks,
// fixme: remove whenSynced
whenSynced: indexeddbProvider.whenSynced,
// fixme: remove background long polling
flavour: 'local-indexeddb-background',
background: true,
get connected() {
return callbacks.ready;
},
callbacks,
cleanup: () => {
// todo: cleanup data
},
@@ -127,6 +107,7 @@ const createIndexedDBProvider = (
callbacks.forEach(cb => cb());
})
.catch(error => {
callbacks.ready = false;
if (error instanceof EarlyDisconnectError) {
return;
} else {
@@ -143,6 +124,40 @@ const createIndexedDBProvider = (
};
};
const createIndexedDBDownloadProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): LocalIndexedDBDownloadProvider => {
let _resolve: () => void;
let _reject: (error: unknown) => void;
const promise = new Promise<void>((resolve, reject) => {
_resolve = resolve;
_reject = reject;
});
return {
flavour: 'local-indexeddb',
necessary: true,
get whenReady() {
return promise;
},
cleanup: () => {
// todo: cleanup data
},
sync: () => {
logger.info('connect indexeddb provider', blockSuiteWorkspace.id);
downloadBinary(blockSuiteWorkspace.id)
.then(binary => {
if (binary !== false) {
Y.applyUpdate(blockSuiteWorkspace.doc, binary);
}
_resolve();
})
.catch(error => {
_reject(error);
});
},
};
};
const createSQLiteProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): SQLiteProvider => {
@@ -166,18 +181,20 @@ const createSQLiteProvider = (
const keysToPersist = allKeys.filter(k => !persistedKeys.includes(k));
logger.info('persisting blobs', keysToPersist, 'to sqlite');
keysToPersist.forEach(async k => {
const blob = await bs.get(k);
if (!blob) {
logger.warn('blob not found for', k);
return;
}
window.apis.db.addBlob(
blockSuiteWorkspace.id,
k,
new Uint8Array(await blob.arrayBuffer())
);
});
return Promise.all(
keysToPersist.map(async k => {
const blob = await bs.get(k);
if (!blob) {
logger.warn('blob not found for', k);
return;
}
return window.apis.db.addBlob(
blockSuiteWorkspace.id,
k,
new Uint8Array(await blob.arrayBuffer())
);
})
);
}
async function syncUpdates() {
@@ -202,16 +219,23 @@ const createSQLiteProvider = (
}
let unsubscribe = () => {};
let connected = false;
const callbacks = new CallbackSet();
const provider = {
return {
flavour: 'sqlite',
background: true,
callbacks,
get connected(): boolean {
return connected;
},
cleanup: () => {
throw new Error('Method not implemented.');
},
connect: async () => {
logger.info('connecting sqlite provider', blockSuiteWorkspace.id);
await syncUpdates();
connected = true;
blockSuiteWorkspace.doc.on('update', handleUpdate);
@@ -223,6 +247,7 @@ const createSQLiteProvider = (
if (timer) {
clearTimeout(timer);
}
// @ts-expect-error ignore the type
timer = setTimeout(() => {
syncUpdates();
@@ -237,16 +262,16 @@ const createSQLiteProvider = (
disconnect: () => {
unsubscribe();
blockSuiteWorkspace.doc.off('update', handleUpdate);
connected = false;
},
} satisfies SQLiteProvider;
return provider;
};
};
export {
createAffineWebSocketProvider,
createBroadCastChannelProvider,
createIndexedDBProvider,
createIndexedDBBackgroundProvider,
createIndexedDBDownloadProvider,
createSQLiteProvider,
};
@@ -257,7 +282,8 @@ export const createLocalProviders = (
[
config.enableBroadCastChannelProvider &&
createBroadCastChannelProvider(blockSuiteWorkspace),
createIndexedDBProvider(blockSuiteWorkspace),
createIndexedDBBackgroundProvider(blockSuiteWorkspace),
createIndexedDBDownloadProvider(blockSuiteWorkspace),
environment.isDesktop && createSQLiteProvider(blockSuiteWorkspace),
] as any[]
).filter(v => Boolean(v));

View File

@@ -9,44 +9,76 @@ export type JotaiStore = ReturnType<typeof createStore>;
export type BaseProvider = {
flavour: string;
// if this is true, we will connect the provider on the background
background: boolean;
connect: () => void;
disconnect: () => void;
// cleanup data when workspace is removed
cleanup: () => void;
};
/**
* @description
* If a provider is marked as a background provider,
* we will connect it in the `useEffect` in React.js.
*
* This means that the data might be stale when you use it.
*/
export interface BackgroundProvider extends BaseProvider {
// if this is true,
// we will connect the provider on the background
background: true;
get connected(): boolean;
connect(): void;
disconnect(): void;
callbacks: Set<() => void>;
}
export interface AffineDownloadProvider extends BaseProvider {
/**
* @description
* If a provider is marked as a necessary provider,
* we will connect it once you read the workspace.
*
* This means that the data will be fresh when you use it.
*
* Currently, there is only on necessary provider: `local-indexeddb`.
*/
export interface NecessaryProvider extends Omit<BaseProvider, 'disconnect'> {
// if this is true,
// we will ensure that the provider is connected before you can use it
necessary: true;
sync(): void;
get whenReady(): Promise<void>;
}
export interface AffineDownloadProvider extends BackgroundProvider {
flavour: 'affine-download';
}
export interface BroadCastChannelProvider extends BaseProvider {
/**
* Download the first binary from local indexeddb
*/
export interface BroadCastChannelProvider extends BackgroundProvider {
flavour: 'broadcast-channel';
}
export interface LocalIndexedDBProvider extends BackgroundProvider {
flavour: 'local-indexeddb';
whenSynced: Promise<void>;
/**
* Long polling provider with local indexeddb
*/
export interface LocalIndexedDBBackgroundProvider extends BackgroundProvider {
flavour: 'local-indexeddb-background';
}
export interface SQLiteProvider extends BaseProvider {
export interface SQLiteProvider extends BackgroundProvider {
flavour: 'sqlite';
}
export interface AffineWebSocketProvider extends BaseProvider {
export interface LocalIndexedDBDownloadProvider extends NecessaryProvider {
flavour: 'local-indexeddb';
}
export interface AffineWebSocketProvider extends BackgroundProvider {
flavour: 'affine-websocket';
}
export type Provider =
| LocalIndexedDBProvider
| AffineWebSocketProvider
| BroadCastChannelProvider;
export type Provider = BackgroundProvider | NecessaryProvider;
export interface AffineWorkspace extends RemoteWorkspace {
flavour: WorkspaceFlavour.AFFINE;

View File

@@ -16,6 +16,11 @@ export function cleanupWorkspace(flavour: WorkspaceFlavour) {
const hashMap = new Map<string, Workspace>();
/**
* @internal test only
*/
export const _cleanupBlockSuiteWorkspaceCache = () => hashMap.clear();
export function createEmptyBlockSuiteWorkspace(
id: string,
flavour: WorkspaceFlavour.AFFINE,
@@ -83,3 +88,33 @@ export function createEmptyBlockSuiteWorkspace(
hashMap.set(cacheKey, workspace);
return workspace;
}
export class CallbackSet extends Set<() => void> {
#ready = false;
get ready(): boolean {
return this.#ready;
}
set ready(v: boolean) {
this.#ready = v;
}
add(cb: () => void) {
if (this.ready) {
cb();
return this;
}
if (this.has(cb)) {
return this;
}
return super.add(cb);
}
delete(cb: () => void) {
if (this.has(cb)) {
return super.delete(cb);
}
return false;
}
}