refactor: move local workspace CRUD (#1778)

This commit is contained in:
Himself65
2023-04-02 01:34:57 -05:00
committed by GitHub
parent 5e56728dbc
commit 60324b8967
11 changed files with 224 additions and 153 deletions

View File

@@ -0,0 +1,82 @@
import { nanoid, Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { createJSONStorage } from 'jotai/utils';
import { IndexeddbPersistence } from 'y-indexeddb';
import { z } from 'zod';
import { createLocalProviders } from '../providers';
import type { LocalWorkspace, WorkspaceCRUD } from '../type';
import { WorkspaceFlavour } from '../type';
import { createEmptyBlockSuiteWorkspace } from '../utils';
const getStorage = () => createJSONStorage(() => localStorage);
const kStoreKey = 'affine-local-workspace';
const schema = z.array(z.string());
export const CRUD: WorkspaceCRUD<WorkspaceFlavour.LOCAL> = {
get: async workspaceId => {
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey)) &&
storage.setItem(kStoreKey, []);
const data = storage.getItem(kStoreKey) as z.infer<typeof schema>;
const id = data.find(id => id === workspaceId);
if (!id) {
return null;
}
const blockSuiteWorkspace = createEmptyBlockSuiteWorkspace(
id,
(_: string) => undefined
);
const workspace: LocalWorkspace = {
id,
flavour: WorkspaceFlavour.LOCAL,
blockSuiteWorkspace: blockSuiteWorkspace,
providers: [...createLocalProviders(blockSuiteWorkspace)],
};
return workspace;
},
create: async ({ doc }) => {
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey)) &&
storage.setItem(kStoreKey, []);
const data = storage.getItem(kStoreKey) as z.infer<typeof schema>;
const binary = BlockSuiteWorkspace.Y.encodeStateAsUpdateV2(doc);
const id = nanoid();
const blockSuiteWorkspace = createEmptyBlockSuiteWorkspace(
id,
(_: string) => undefined
);
BlockSuiteWorkspace.Y.applyUpdateV2(blockSuiteWorkspace.doc, binary);
const persistence = new IndexeddbPersistence(id, blockSuiteWorkspace.doc);
await persistence.whenSynced.then(() => {
persistence.destroy();
});
storage.setItem(kStoreKey, [...data, id]);
console.log('create', id, storage.getItem(kStoreKey));
return id;
},
delete: async workspace => {
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey)) &&
storage.setItem(kStoreKey, []);
const data = storage.getItem(kStoreKey) as z.infer<typeof schema>;
const idx = data.findIndex(id => id === workspace.id);
if (idx === -1) {
throw new Error('workspace not found');
}
data.splice(idx, 1);
storage.setItem(kStoreKey, [...data]);
},
list: async () => {
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey)) &&
storage.setItem(kStoreKey, []);
return (
await Promise.all(
(storage.getItem(kStoreKey) as z.infer<typeof schema>).map(id =>
CRUD.get(id)
)
)
).filter(item => item !== null) as LocalWorkspace[];
},
};

View File

@@ -0,0 +1,122 @@
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { assertExists } from '@blocksuite/store';
import type { Awareness } from 'y-protocols/awareness';
import {
applyAwarenessUpdate,
encodeAwarenessUpdate,
} from 'y-protocols/awareness';
import type { BroadCastChannelProvider } from '../../type';
import { localProviderLogger } from '../logger';
import type {
AwarenessChanges,
BroadcastChannelMessageEvent,
TypedBroadcastChannel,
} from './type';
import { getClients } from './type';
export const createBroadCastChannelProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): BroadCastChannelProvider => {
const Y = BlockSuiteWorkspace.Y;
const doc = blockSuiteWorkspace.doc;
const awareness = blockSuiteWorkspace.awarenessStore
.awareness as unknown as Awareness;
let broadcastChannel: TypedBroadcastChannel | null = null;
const handleBroadcastChannelMessage = (
event: BroadcastChannelMessageEvent
) => {
const [eventName] = event.data;
switch (eventName) {
case 'doc:diff': {
const [, diff, clientId] = event.data;
const update = Y.encodeStateAsUpdate(doc, diff);
broadcastChannel!.postMessage(['doc:update', update, clientId]);
break;
}
case 'doc:update': {
const [, update, clientId] = event.data;
if (!clientId || clientId === awareness.clientID) {
Y.applyUpdate(doc, update, broadcastChannel);
}
break;
}
case 'awareness:query': {
const [, clientId] = event.data;
const clients = getClients(awareness);
const update = encodeAwarenessUpdate(awareness, clients);
broadcastChannel!.postMessage(['awareness:update', update, clientId]);
break;
}
case 'awareness:update': {
const [, update, clientId] = event.data;
if (!clientId || clientId === awareness.clientID) {
applyAwarenessUpdate(awareness, update, broadcastChannel);
}
break;
}
}
};
const handleDocUpdate = (updateV1: Uint8Array, origin: any) => {
if (origin === broadcastChannel) {
// not self update, ignore
return;
}
broadcastChannel?.postMessage(['doc:update', updateV1]);
};
const handleAwarenessUpdate = (changes: AwarenessChanges, origin: any) => {
if (origin === broadcastChannel) {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) => [
...res,
...cur,
]);
const update = encodeAwarenessUpdate(awareness, changedClients);
broadcastChannel?.postMessage(['awareness:update', update]);
};
return {
flavour: 'broadcast-channel',
background: false,
connect: () => {
assertExists(blockSuiteWorkspace.id);
broadcastChannel = Object.assign(
new BroadcastChannel(blockSuiteWorkspace.id),
{
onmessage: handleBroadcastChannelMessage,
}
);
localProviderLogger.info(
'connect broadcast channel',
blockSuiteWorkspace.id
);
const docDiff = Y.encodeStateVector(doc);
broadcastChannel.postMessage(['doc:diff', docDiff, awareness.clientID]);
const docUpdateV2 = Y.encodeStateAsUpdate(doc);
broadcastChannel.postMessage(['doc:update', docUpdateV2]);
broadcastChannel.postMessage(['awareness:query', awareness.clientID]);
const awarenessUpdate = encodeAwarenessUpdate(awareness, [
awareness.clientID,
]);
broadcastChannel.postMessage(['awareness:update', awarenessUpdate]);
doc.on('update', handleDocUpdate);
awareness.on('update', handleAwarenessUpdate);
},
disconnect: () => {
assertExists(broadcastChannel);
localProviderLogger.info(
'disconnect broadcast channel',
blockSuiteWorkspace.id
);
doc.off('update', handleDocUpdate);
awareness.off('update', handleAwarenessUpdate);
broadcastChannel.close();
},
cleanup: () => {
assertExists(broadcastChannel);
doc.off('update', handleDocUpdate);
awareness.off('update', handleAwarenessUpdate);
broadcastChannel.close();
},
};
};

View File

@@ -0,0 +1,86 @@
import type { Awareness as YAwareness } from 'y-protocols/awareness';
export type ClientId = YAwareness['clientID'];
// eslint-disable-next-line @typescript-eslint/ban-types
export type DefaultClientData = {};
type EventHandler = (...args: any[]) => void;
export type DefaultEvents = {
[eventName: string]: EventHandler;
};
type EventNameWithScope<
Scope extends string,
Type extends string = string
> = `${Scope}:${Type}`;
type DataScope = 'data';
type RoomScope = 'room';
type YDocScope = 'doc';
type AwarenessScope = 'awareness';
type ObservableScope = YDocScope | AwarenessScope;
type ObservableEventName = EventNameWithScope<ObservableScope>;
type ValidEventScope = DataScope | RoomScope | ObservableScope;
type ValidateEvents<
Events extends DefaultEvents & {
[EventName in keyof Events]: EventName extends EventNameWithScope<
infer EventScope
>
? EventScope extends ValidEventScope
? Events[EventName]
: never
: Events[EventName];
}
> = Events;
export type DefaultServerToClientEvents<
ClientData extends DefaultClientData = DefaultClientData
> = ValidateEvents<{
['data:update']: (data: ClientData) => void;
['doc:diff']: (diff: ArrayBuffer) => void;
['doc:update']: (update: ArrayBuffer) => void;
['awareness:update']: (update: ArrayBuffer) => void;
}>;
export type ServerToClientEvents<
ClientData extends DefaultClientData = DefaultClientData
> = DefaultServerToClientEvents<ClientData>;
export type DefaultClientToServerEvents = ValidateEvents<{
['room:close']: () => void;
['doc:diff']: (diff: Uint8Array) => void;
['doc:update']: (update: Uint8Array, callback?: () => void) => void;
['awareness:update']: (update: Uint8Array) => void;
}>;
export type ClientToServerEvents = DefaultClientToServerEvents;
type ClientToServerEventNames = keyof ClientToServerEvents;
export type BroadcastChannelMessageData<
EventName extends ClientToServerEventNames = ClientToServerEventNames
> =
| (EventName extends ObservableEventName
? [eventName: EventName, payload: Uint8Array, clientId?: ClientId]
: never)
| [eventName: `${AwarenessScope}:query`, clientId: ClientId];
export type BroadcastChannelMessageEvent =
MessageEvent<BroadcastChannelMessageData>;
export type AwarenessChanges = Record<
'added' | 'updated' | 'removed',
ClientId[]
>;
export interface TypedBroadcastChannel extends BroadcastChannel {
onmessage: ((event: BroadcastChannelMessageEvent) => void) | null;
postMessage: (message: BroadcastChannelMessageData) => void;
}
export const getClients = (awareness: YAwareness): ClientId[] => [
...awareness.getStates().keys(),
];

View File

@@ -0,0 +1,115 @@
import { config } from '@affine/env';
import { KeckProvider } from '@affine/workspace/affine/keck';
import { getLoginStorage } from '@affine/workspace/affine/login';
import type { Provider } from '@affine/workspace/type';
import type {
AffineWebSocketProvider,
LocalIndexedDBProvider,
} from '@affine/workspace/type';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { assertExists } from '@blocksuite/store';
import { IndexeddbPersistence } from 'y-indexeddb';
import { createBroadCastChannelProvider } from './broad-cast-channel';
import { localProviderLogger } from './logger';
const createAffineWebSocketProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): AffineWebSocketProvider => {
let webSocketProvider: KeckProvider | null = null;
return {
flavour: 'affine-websocket',
background: false,
cleanup: () => {
assertExists(webSocketProvider);
webSocketProvider.destroy();
webSocketProvider = null;
},
connect: () => {
const wsUrl = `${
window.location.protocol === 'https:' ? 'wss' : 'ws'
}://${window.location.host}/api/sync/`;
webSocketProvider = new KeckProvider(
wsUrl,
blockSuiteWorkspace.id,
blockSuiteWorkspace.doc,
{
params: { token: getLoginStorage()?.token ?? '' },
// @ts-expect-error ignore the type
awareness: blockSuiteWorkspace.awarenessStore.awareness,
// we maintain broadcast channel by ourselves
disableBc: true,
connect: false,
}
);
localProviderLogger.info('connect', webSocketProvider.url);
webSocketProvider.connect();
},
disconnect: () => {
assertExists(webSocketProvider);
localProviderLogger.info('disconnect', webSocketProvider.url);
webSocketProvider.destroy();
webSocketProvider = null;
},
};
};
const createIndexedDBProvider = (
blockSuiteWorkspace: BlockSuiteWorkspace
): LocalIndexedDBProvider => {
let indexeddbProvider: IndexeddbPersistence | null = null;
const callbacks = new Set<() => void>();
return {
flavour: 'local-indexeddb',
callbacks,
// fixme: remove background long polling
background: true,
cleanup: () => {
assertExists(indexeddbProvider);
indexeddbProvider.clearData();
callbacks.clear();
indexeddbProvider = null;
},
connect: () => {
localProviderLogger.info(
'connect indexeddb provider',
blockSuiteWorkspace.id
);
indexeddbProvider = new IndexeddbPersistence(
blockSuiteWorkspace.id,
blockSuiteWorkspace.doc
);
indexeddbProvider.whenSynced.then(() => {
callbacks.forEach(cb => cb());
});
},
disconnect: () => {
assertExists(indexeddbProvider);
localProviderLogger.info(
'disconnect indexeddb provider',
blockSuiteWorkspace.id
);
indexeddbProvider.destroy();
indexeddbProvider = null;
},
};
};
export {
createAffineWebSocketProvider,
createBroadCastChannelProvider,
createIndexedDBProvider,
};
export const createLocalProviders = (
blockSuiteWorkspace: BlockSuiteWorkspace
): Provider[] => {
return (
[
config.enableBroadCastChannelProvider &&
createBroadCastChannelProvider(blockSuiteWorkspace),
config.enableIndexedDBProvider &&
createIndexedDBProvider(blockSuiteWorkspace),
] as any[]
).filter(v => Boolean(v));
};

View File

@@ -0,0 +1,3 @@
import { DebugLogger } from '@affine/debug';
export const localProviderLogger = new DebugLogger('local-provider');