refactor: workspace manager (#5060)

This commit is contained in:
EYHN
2023-12-15 07:20:50 +00:00
parent af15aa06d4
commit fe2851d3e9
217 changed files with 3605 additions and 4244 deletions

View File

@@ -2,12 +2,9 @@
"name": "@affine/workspace",
"private": true,
"exports": {
".": "./src/index.ts",
"./atom": "./src/atom.ts",
"./manager": "./src/manager/index.ts",
"./blob": "./src/blob/index.ts",
"./local/crud": "./src/local/crud.ts",
"./affine/*": "./src/affine/*.ts",
"./providers": "./src/providers/index.ts"
"./affine/*": "./src/affine/*.ts"
},
"peerDependencies": {
"@blocksuite/blocks": "*",
@@ -19,8 +16,7 @@
"@affine/debug": "workspace:*",
"@affine/env": "workspace:*",
"@affine/graphql": "workspace:*",
"@toeverything/hooks": "workspace:*",
"@toeverything/y-indexeddb": "workspace:*",
"@toeverything/infra": "workspace:*",
"async-call-rpc": "^6.3.1",
"idb": "^8.0.0",
"idb-keyval": "^6.2.1",
@@ -34,6 +30,7 @@
"next-auth": "^4.24.5",
"react": "18.2.0",
"react-dom": "18.2.0",
"rxjs": "^7.8.1",
"socket.io-client": "^4.7.2",
"swr": "2.2.4",
"valtio": "^1.11.2",

View File

@@ -1,162 +0,0 @@
import type {
AffineCloudWorkspace,
WorkspaceCRUD,
} from '@affine/env/workspace';
import { WorkspaceFlavour } from '@affine/env/workspace';
import {
createWorkspaceMutation,
deleteWorkspaceMutation,
getWorkspaceQuery,
getWorkspacesQuery,
} from '@affine/graphql';
import { createIndexeddbStorage, Workspace } from '@blocksuite/store';
import { migrateLocalBlobStorage } from '@toeverything/infra/blocksuite';
import { getSession } from 'next-auth/react';
import { proxy } from 'valtio/vanilla';
import { getOrCreateWorkspace } from '../manager';
import { fetcher } from './gql';
const Y = Workspace.Y;
async function deleteLocalBlobStorage(id: string) {
const storage = createIndexeddbStorage(id);
const keys = await storage.crud.list();
for (const key of keys) {
await storage.crud.delete(key);
}
}
// we don't need to persistence the state into local storage
// because if a user clicks create multiple time and nothing happened
// because of the server delay or something, he/she will wait.
// and also the user journey of creating workspace is long.
const createdWorkspaces = proxy<string[]>([]);
export const CRUD: WorkspaceCRUD<WorkspaceFlavour.AFFINE_CLOUD> = {
create: async upstreamWorkspace => {
if (createdWorkspaces.some(id => id === upstreamWorkspace.id)) {
throw new Error('workspace already created');
}
const { createWorkspace } = await fetcher({
query: createWorkspaceMutation,
});
createdWorkspaces.push(upstreamWorkspace.id);
const newBlockSuiteWorkspace = getOrCreateWorkspace(
createWorkspace.id,
WorkspaceFlavour.AFFINE_CLOUD
);
if (environment.isDesktop) {
// this will clone all data from existing db to new db file, including docs and blobs
await window.apis.workspace.clone(
upstreamWorkspace.id,
createWorkspace.id
);
// skip apply updates in memory and we will use providers to sync data from db
} else {
Y.applyUpdate(
newBlockSuiteWorkspace.doc,
Y.encodeStateAsUpdate(upstreamWorkspace.doc)
);
await Promise.all(
[...upstreamWorkspace.doc.subdocs].map(async subdoc => {
subdoc.load();
return subdoc.whenLoaded.then(() => {
newBlockSuiteWorkspace.doc.subdocs.forEach(newSubdoc => {
if (newSubdoc.guid === subdoc.guid) {
Y.applyUpdate(newSubdoc, Y.encodeStateAsUpdate(subdoc));
}
});
});
})
);
migrateLocalBlobStorage(upstreamWorkspace.id, createWorkspace.id)
.then(() => deleteLocalBlobStorage(upstreamWorkspace.id))
.catch(e => {
console.error('error when moving blob storage:', e);
});
// todo(himself65): delete old workspace in the future
}
return createWorkspace.id;
},
delete: async workspace => {
await fetcher({
query: deleteWorkspaceMutation,
variables: {
id: workspace.id,
},
});
},
get: async id => {
if (!environment.isServer && !navigator.onLine) {
// no network
return null;
}
if (
!(await getSession()
.then(() => true)
.catch(() => false))
) {
return null;
}
try {
await fetcher({
query: getWorkspaceQuery,
variables: {
id,
},
});
return {
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
blockSuiteWorkspace: getOrCreateWorkspace(
id,
WorkspaceFlavour.AFFINE_CLOUD
),
} satisfies AffineCloudWorkspace;
} catch (e) {
console.error('error when fetching cloud workspace:', e);
return null;
}
},
list: async () => {
if (!environment.isServer && !navigator.onLine) {
// no network
return [];
}
if (
!(await getSession()
.then(() => true)
.catch(() => false))
) {
return [];
}
try {
const { workspaces } = await fetcher({
query: getWorkspacesQuery,
});
const ids = workspaces.map(({ id }) => id);
return ids.map(
id =>
({
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
blockSuiteWorkspace: getOrCreateWorkspace(
id,
WorkspaceFlavour.AFFINE_CLOUD
),
}) satisfies AffineCloudWorkspace
);
} catch (e) {
console.error('error when fetching cloud workspaces:', e);
return [];
}
},
};

View File

@@ -1,37 +0,0 @@
import { fetchWithTraceReport } from '@affine/graphql';
const hashMap = new Map<string, CloudDoc>();
type DocPublishMode = 'edgeless' | 'page';
export type CloudDoc = {
arrayBuffer: ArrayBuffer;
publishMode: DocPublishMode;
};
export async function downloadBinaryFromCloud(
rootGuid: string,
pageGuid: string
): Promise<CloudDoc | null> {
const cached = hashMap.get(`${rootGuid}/${pageGuid}`);
if (cached) {
return cached;
}
const response = await fetchWithTraceReport(
runtimeConfig.serverUrlPrefix +
`/api/workspaces/${rootGuid}/docs/${pageGuid}`,
{
priority: 'high',
}
);
if (response.ok) {
const publishMode = (response.headers.get('publish-mode') ||
'page') as DocPublishMode;
const arrayBuffer = await response.arrayBuffer();
hashMap.set(`${rootGuid}/${pageGuid}`, { arrayBuffer, publishMode });
// return both arrayBuffer and publish mode
return { arrayBuffer, publishMode };
}
return null;
}

View File

@@ -8,9 +8,8 @@ import type {
RecursiveMaybeFields,
} from '@affine/graphql';
import { gqlFetcherFactory } from '@affine/graphql';
import { useAsyncCallback } from '@toeverything/hooks/affine-async-hooks';
import type { GraphQLError } from 'graphql';
import { useMemo } from 'react';
import { useCallback, useMemo } from 'react';
import type { Key, SWRConfiguration, SWRResponse } from 'swr';
import useSWR, { useSWRConfig } from 'swr';
import useSWRImutable from 'swr/immutable';
@@ -46,7 +45,7 @@ export const fetcher = gqlFetcherFactory(
* ```
*/
type useQueryFn = <Query extends GraphQLQuery>(
options: QueryOptions<Query>,
options?: QueryOptions<Query>,
config?: Omit<
SWRConfiguration<
QueryResponse<Query>,
@@ -128,11 +127,13 @@ export function useQueryInfinite<Query extends GraphQLQuery>(
const loadingMore = size > 0 && data && !data[size - 1];
// todo: find a generic way to know whether or not there are more items to load
const loadMore = useAsyncCallback(async () => {
const loadMore = useCallback(() => {
if (loadingMore) {
return;
}
await setSize(size => size + 1);
setSize(size => size + 1).catch(err => {
console.error(err);
});
}, [loadingMore, setSize]);
return {
data,

View File

@@ -1,261 +1,57 @@
import { DebugLogger } from '@affine/debug';
import type { WorkspaceAdapter } from '@affine/env/workspace';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { assertEquals, assertExists } from '@blocksuite/global/utils';
import {
currentPageIdAtom,
currentWorkspaceIdAtom,
} from '@toeverything/infra/atom';
import { WorkspaceVersion } from '@toeverything/infra/blocksuite';
import { type Atom, atom } from 'jotai/vanilla';
import { z } from 'zod';
import { atom } from 'jotai';
import { atomWithObservable } from 'jotai/utils';
import { Observable } from 'rxjs';
import { getOrCreateWorkspace } from './manager';
import { type Workspace, workspaceManager, type WorkspaceMetadata } from '.';
const performanceJotaiLogger = new DebugLogger('performance:jotai');
const logger = new DebugLogger('affine:workspace:atom');
const rootWorkspaceMetadataV1Schema = z.object({
id: z.string(),
flavour: z.nativeEnum(WorkspaceFlavour),
});
// readonly atom for workspace manager, currently only one workspace manager is supported
export const workspaceManagerAtom = atom(() => workspaceManager);
const rootWorkspaceMetadataV2Schema = rootWorkspaceMetadataV1Schema.extend({
version: z.nativeEnum(WorkspaceVersion),
});
const rootWorkspaceMetadataArraySchema = z.array(
z.union([rootWorkspaceMetadataV1Schema, rootWorkspaceMetadataV2Schema])
);
export type RootWorkspaceMetadataV2 = z.infer<
typeof rootWorkspaceMetadataV2Schema
>;
export type RootWorkspaceMetadataV1 = z.infer<
typeof rootWorkspaceMetadataV1Schema
>;
export type RootWorkspaceMetadata =
| RootWorkspaceMetadataV1
| RootWorkspaceMetadataV2;
export const workspaceAdaptersAtom = atom<
Record<
WorkspaceFlavour,
Pick<
WorkspaceAdapter<WorkspaceFlavour>,
'CRUD' | 'Events' | 'flavour' | 'loadPriority'
>
>
>(
null as unknown as Record<
WorkspaceFlavour,
Pick<
WorkspaceAdapter<WorkspaceFlavour>,
'CRUD' | 'Events' | 'flavour' | 'loadPriority'
>
>
);
// #region root atoms
// root primitive atom that stores the necessary data for the whole app
// be careful when you use this atom,
// it should be used only in the root component
/**
* root workspaces atom
* this atom stores the metadata of all workspaces,
* which is `id` and `flavor,` that is enough to load the real workspace data
*/
const METADATA_STORAGE_KEY = 'jotai-workspaces';
const rootWorkspacesMetadataPrimitiveAtom = atom<Promise<
RootWorkspaceMetadata[]
> | null>(null);
type Getter = <Value>(atom: Atom<Value>) => Value;
type FetchMetadata = (get: Getter) => Promise<RootWorkspaceMetadata[]>;
/**
* @internal
*/
const fetchMetadata: FetchMetadata = async get => {
performanceJotaiLogger.info('fetch metadata start');
const WorkspaceAdapters = get(workspaceAdaptersAtom);
assertExists(WorkspaceAdapters, 'workspace adapter should be defined');
const metadata: RootWorkspaceMetadata[] = [];
// step 1: try load metadata from localStorage.
//
// we need this step because workspaces have the order.
{
const loadFromLocalStorage = (): RootWorkspaceMetadata[] => {
// don't change this key,
// otherwise it will cause the data loss in the production
const primitiveMetadata = localStorage.getItem(METADATA_STORAGE_KEY);
if (primitiveMetadata) {
try {
const items = JSON.parse(primitiveMetadata) as z.infer<
typeof rootWorkspaceMetadataArraySchema
>;
rootWorkspaceMetadataArraySchema.parse(items);
return [...items];
} catch (e) {
console.error('cannot parse worksapce', e);
}
return [];
}
return [];
};
metadata.push(...loadFromLocalStorage());
}
// step 2: fetch from adapters
{
const Adapters = Object.values(WorkspaceAdapters).sort(
(a, b) => a.loadPriority - b.loadPriority
);
for (const Adapter of Adapters) {
performanceJotaiLogger.info('%s adapter', Adapter.flavour);
const { CRUD, flavour: currentFlavour } = Adapter;
const appAccessFn = Adapter.Events['app:access'];
const canAccess = appAccessFn && !(await appAccessFn());
performanceJotaiLogger.info('%s app:access', Adapter.flavour);
if (canAccess) {
// skip the adapter if the user doesn't have access to it
const removed = metadata.filter(
meta => meta.flavour === currentFlavour
);
removed.forEach(meta => {
metadata.splice(metadata.indexOf(meta), 1);
});
Adapter.Events['service:stop']?.();
continue;
}
try {
const item = await CRUD.list();
performanceJotaiLogger.info('%s CRUD list', Adapter.flavour);
// remove the metadata that is not in the list
// because we treat the workspace adapter as the source of truth
{
const removed = metadata.filter(
meta =>
meta.flavour === currentFlavour &&
!item.some(x => x.id === meta.id)
);
removed.forEach(meta => {
metadata.splice(metadata.indexOf(meta), 1);
});
}
// sort the metadata by the order of the list
if (metadata.length) {
item.sort((a, b) => {
return (
metadata.findIndex(x => x.id === a.id) -
metadata.findIndex(x => x.id === b.id)
);
});
}
metadata.push(
...item.map(x => ({
id: x.id,
flavour: x.flavour,
version: WorkspaceVersion.DatabaseV3,
}))
);
} catch (e) {
console.error('list data error:', e);
}
performanceJotaiLogger.info('%s service:start', Adapter.flavour);
Adapter.Events['service:start']?.();
}
}
const metadataMap = new Map(metadata.map(x => [x.id, x]));
// init workspace data
metadataMap.forEach((meta, id) => {
if (
meta.flavour === WorkspaceFlavour.AFFINE_CLOUD ||
meta.flavour === WorkspaceFlavour.LOCAL
) {
getOrCreateWorkspace(id, meta.flavour);
} else {
throw new Error(`unknown flavour ${meta.flavour}`);
}
});
const result = Array.from(metadataMap.values());
performanceJotaiLogger.info('fetch metadata done', result);
return result;
};
const rootWorkspacesMetadataPromiseAtom = atom<
Promise<RootWorkspaceMetadata[]>
>(async get => {
const primitiveMetadata = get(rootWorkspacesMetadataPrimitiveAtom);
assertEquals(
primitiveMetadata,
null,
'rootWorkspacesMetadataPrimitiveAtom should be null'
);
return fetchMetadata(get);
});
type SetStateAction<Value> = Value | ((prev: Value) => Value);
export const rootWorkspacesMetadataAtom = atom<
Promise<RootWorkspaceMetadata[]>,
[
setStateAction: SetStateAction<RootWorkspaceMetadata[]>,
newWorkspaceId?: string,
],
void
>(
async get => {
const maybeMetadata = get(rootWorkspacesMetadataPrimitiveAtom);
if (maybeMetadata !== null) {
return maybeMetadata;
}
return get(rootWorkspacesMetadataPromiseAtom);
},
async (get, set, action, newWorkspaceId) => {
const metadataPromise = get(rootWorkspacesMetadataPromiseAtom);
const oldWorkspaceId = get(currentWorkspaceIdAtom);
const oldPageId = get(currentPageIdAtom);
// get metadata
set(rootWorkspacesMetadataPrimitiveAtom, async maybeMetadataPromise => {
let metadata: RootWorkspaceMetadata[] =
(await maybeMetadataPromise) ?? (await metadataPromise);
// update metadata
if (typeof action === 'function') {
metadata = action(metadata);
} else {
metadata = action;
}
const metadataMap = new Map(metadata.map(x => [x.id, x]));
metadata = Array.from(metadataMap.values());
// write back to localStorage
rootWorkspaceMetadataArraySchema.parse(metadata);
localStorage.setItem(METADATA_STORAGE_KEY, JSON.stringify(metadata));
// if the current workspace is deleted, reset the current workspace
if (oldWorkspaceId && metadata.some(x => x.id === oldWorkspaceId)) {
set(currentWorkspaceIdAtom, oldWorkspaceId);
set(currentPageIdAtom, oldPageId);
}
if (newWorkspaceId) {
set(currentWorkspaceIdAtom, newWorkspaceId);
}
return metadata;
// workspace metadata list, use rxjs to push updates
export const workspaceListAtom = atomWithObservable<WorkspaceMetadata[]>(
get => {
const workspaceManager = get(workspaceManagerAtom);
return new Observable<WorkspaceMetadata[]>(subscriber => {
subscriber.next(workspaceManager.list.workspaceList);
return workspaceManager.list.onStatusChanged.on(status => {
subscriber.next(status.workspaceList);
}).dispose;
});
},
{
initialValue: [],
}
);
export const refreshRootMetadataAtom = atom(null, (get, set) => {
set(rootWorkspacesMetadataPrimitiveAtom, fetchMetadata(get));
});
// workspace list loading status, if is false, UI can display not found page when workspace id is not in the list.
export const workspaceListLoadingStatusAtom = atomWithObservable<boolean>(
get => {
const workspaceManager = get(workspaceManagerAtom);
return new Observable<boolean>(subscriber => {
subscriber.next(workspaceManager.list.status.loading);
return workspaceManager.list.onStatusChanged.on(status => {
subscriber.next(status.loading);
}).dispose;
});
},
{
initialValue: true,
}
);
//#endregion
// current workspace
export const currentWorkspaceAtom = atom<Workspace | null>(null);
// wait for current workspace, if current workspace is null, it will suspend
export const waitForCurrentWorkspaceAtom = atom(get => {
const currentWorkspace = get(currentWorkspaceAtom);
if (!currentWorkspace) {
// suspended
logger.info('suspended for current workspace');
return new Promise<Workspace>(_ => {});
}
return currentWorkspace;
});

View File

@@ -1,37 +0,0 @@
import { BlobEngine } from './engine';
import {
createAffineCloudBlobStorage,
createIndexeddbBlobStorage,
createSQLiteBlobStorage,
createStaticBlobStorage,
} from './storage';
export * from './engine';
export * from './storage';
export function createLocalBlobStorage(workspaceId: string) {
if (environment.isDesktop) {
return createSQLiteBlobStorage(workspaceId);
} else {
return createIndexeddbBlobStorage(workspaceId);
}
}
export function createLocalBlobEngine(workspaceId: string) {
return new BlobEngine(createLocalBlobStorage(workspaceId), [
createStaticBlobStorage(),
]);
}
export function createAffineCloudBlobEngine(workspaceId: string) {
return new BlobEngine(createLocalBlobStorage(workspaceId), [
createStaticBlobStorage(),
createAffineCloudBlobStorage(workspaceId),
]);
}
export function createAffinePublicBlobEngine(workspaceId: string) {
return new BlobEngine(createAffineCloudBlobStorage(workspaceId), [
createStaticBlobStorage(),
]);
}

View File

@@ -1,4 +0,0 @@
export * from './affine-cloud';
export * from './indexeddb';
export * from './sqlite';
export * from './static';

View File

@@ -2,6 +2,3 @@ export interface AwarenessProvider {
connect(): void;
disconnect(): void;
}
export * from './affine';
export * from './broadcast-channel';

View File

@@ -3,12 +3,51 @@ import { difference } from 'lodash-es';
const logger = new DebugLogger('affine:blob-engine');
/**
* # BlobEngine
*
* sync blobs between storages in background.
*
* all operations priority use local, then use remote.
*/
export class BlobEngine {
private abort: AbortController | null = null;
constructor(
private readonly local: BlobStorage,
private readonly remotes: BlobStorage[]
) {}
start() {
if (this.abort) {
return;
}
this.abort = new AbortController();
const abortSignal = this.abort.signal;
const sync = () => {
if (abortSignal.aborted) {
return;
}
this.sync()
.catch(error => {
logger.error('sync blob error', error);
})
.finally(() => {
// sync every 1 minute
setTimeout(sync, 60000);
});
};
sync();
}
stop() {
this.abort?.abort();
this.abort = null;
}
get storages() {
return [this.local, ...this.remotes];
}
@@ -19,17 +58,18 @@ export class BlobEngine {
}
logger.debug('start syncing blob...');
for (const remote of this.remotes) {
let localList;
let remoteList;
try {
localList = await this.local.list();
remoteList = await remote.list();
} catch (err) {
logger.error(`error when sync`, err);
continue;
}
let localList: string[] = [];
let remoteList: string[] = [];
if (!remote.readonly) {
try {
localList = await this.local.list();
remoteList = await remote.list();
} catch (err) {
logger.error(`error when sync`, err);
continue;
}
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
@@ -74,7 +114,7 @@ export class BlobEngine {
return data;
}
}
return undefined;
return null;
}
async set(key: string, value: Blob) {
@@ -107,6 +147,8 @@ export class BlobEngine {
.catch(() => {
// Promise.allSettled never reject
});
return key;
}
async delete(_key: string) {
@@ -132,8 +174,29 @@ export class BlobEngine {
export interface BlobStorage {
name: string;
readonly: boolean;
get: (key: string) => Promise<Blob | undefined>;
set: (key: string, value: Blob) => Promise<void>;
get: (key: string) => Promise<Blob | null>;
set: (key: string, value: Blob) => Promise<string>;
delete: (key: string) => Promise<void>;
list: () => Promise<string[]>;
}
export function createMemoryBlobStorage() {
const map = new Map<string, Blob>();
return {
name: 'memory',
readonly: false,
async get(key: string) {
return map.get(key) ?? null;
},
async set(key: string, value: Blob) {
map.set(key, value);
return key;
},
async delete(key: string) {
map.delete(key);
},
async list() {
return Array.from(map.keys());
},
} satisfies BlobStorage;
}

View File

@@ -0,0 +1,74 @@
import { Slot } from '@blocksuite/global/utils';
import { throwIfAborted } from '../utils/throw-if-aborted';
import type { AwarenessProvider } from './awareness';
import type { BlobEngine } from './blob';
import type { SyncEngine, SyncEngineStatus } from './sync';
export interface WorkspaceEngineStatus {
sync: SyncEngineStatus;
}
/**
* # WorkspaceEngine
*
* sync ydoc, blob, awareness together
*/
export class WorkspaceEngine {
_status: WorkspaceEngineStatus;
onStatusChange = new Slot<WorkspaceEngineStatus>();
get status() {
return this._status;
}
set status(status: WorkspaceEngineStatus) {
this._status = status;
this.onStatusChange.emit(status);
}
constructor(
public blob: BlobEngine,
public sync: SyncEngine,
public awareness: AwarenessProvider[]
) {
this._status = {
sync: sync.status,
};
sync.onStatusChange.on(status => {
this.status = {
sync: status,
};
});
}
start() {
this.sync.start();
for (const awareness of this.awareness) {
awareness.connect();
}
this.blob.start();
}
canGracefulStop() {
return this.sync.canGracefulStop();
}
async waitForGracefulStop(abort?: AbortSignal) {
await this.sync.waitForGracefulStop(abort);
throwIfAborted(abort);
this.forceStop();
}
forceStop() {
this.sync.forceStop();
for (const awareness of this.awareness) {
awareness.disconnect();
}
this.blob.stop();
}
}
export * from './awareness';
export * from './blob';
export * from './sync';

View File

@@ -7,7 +7,7 @@ import { Schema, Workspace } from '@blocksuite/store';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { Doc } from 'yjs';
import { createIndexedDBStorage } from '../../storage';
import { createIndexedDBStorage } from '../../../impl/local/sync-indexeddb';
import { SyncEngine, SyncEngineStep, SyncPeerStep } from '../';
import { createTestStorage } from './test-storage';
@@ -50,7 +50,7 @@ describe('SyncEngine', () => {
const frameId = page.addBlock('affine:note', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
await syncEngine.waitForSynced();
syncEngine.stop();
syncEngine.forceStop();
prev = workspace.doc.toJSON();
}
@@ -70,7 +70,7 @@ describe('SyncEngine', () => {
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncEngine.stop();
syncEngine.forceStop();
}
{
@@ -89,7 +89,7 @@ describe('SyncEngine', () => {
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncEngine.stop();
syncEngine.forceStop();
}
{
@@ -108,7 +108,7 @@ describe('SyncEngine', () => {
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncEngine.stop();
syncEngine.forceStop();
}
});

View File

@@ -4,7 +4,7 @@ import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema, Workspace } from '@blocksuite/store';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { createIndexedDBStorage } from '../../storage';
import { createIndexedDBStorage } from '../../../impl/local/sync-indexeddb';
import { SyncPeer, SyncPeerStep } from '../';
const schema = new Schema();

View File

@@ -1,6 +1,6 @@
import type { Storage } from '../../storage';
import type { SyncStorage } from '..';
export function createTestStorage(origin: Storage) {
export function createTestStorage(origin: SyncStorage) {
const controler = {
pausedPull: Promise.resolve(),
resumePull: () => {},

View File

@@ -0,0 +1,15 @@
export enum SyncEngineStep {
Stopped = 0,
Syncing = 1,
Synced = 2,
}
export enum SyncPeerStep {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Loaded = 4.5,
Syncing = 5,
Synced = 6,
}

View File

@@ -2,10 +2,11 @@ import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import type { Storage } from '../storage';
import { SharedPriorityTarget } from '../utils/async-queue';
import { MANUALLY_STOP, SyncEngineStep } from './consts';
import { SyncPeer, type SyncPeerStatus, SyncPeerStep } from './peer';
import { SharedPriorityTarget } from '../../utils/async-queue';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import { SyncEngineStep, SyncPeerStep } from './consts';
import { SyncPeer, type SyncPeerStatus } from './peer';
import type { SyncStorage } from './storage';
export interface SyncEngineStatus {
step: SyncEngineStep;
@@ -52,7 +53,7 @@ export class SyncEngine {
private _status: SyncEngineStatus;
onStatusChange = new Slot<SyncEngineStatus>();
private set status(s: SyncEngineStatus) {
this.logger.info('status change', SyncEngineStep[s.step]);
this.logger.debug('status change', s);
this._status = s;
this.onStatusChange.emit(s);
}
@@ -67,8 +68,8 @@ export class SyncEngine {
constructor(
private readonly rootDoc: Doc,
private readonly local: Storage,
private readonly remotes: Storage[]
private readonly local: SyncStorage,
private readonly remotes: SyncStorage[]
) {
this._status = {
step: SyncEngineStep.Stopped,
@@ -80,7 +81,7 @@ export class SyncEngine {
start() {
if (this.status.step !== SyncEngineStep.Stopped) {
this.stop();
this.forceStop();
}
this.abort = new AbortController();
@@ -90,7 +91,33 @@ export class SyncEngine {
});
}
stop() {
canGracefulStop() {
return !!this.status.local && this.status.local.pendingPushUpdates > 0;
}
async waitForGracefulStop(abort?: AbortSignal) {
await Promise.race([
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
new Promise<void>(resolve => {
this.onStatusChange.on(() => {
if (this.canGracefulStop()) {
resolve();
}
});
}),
]);
throwIfAborted(abort);
this.forceStop();
}
forceStop() {
this.abort.abort(MANUALLY_STOP);
this._status = {
step: SyncEngineStep.Stopped,
@@ -157,7 +184,7 @@ export class SyncEngine {
});
});
} catch (error) {
if (error === MANUALLY_STOP) {
if (error === MANUALLY_STOP || signal.aborted) {
return;
}
throw error;

View File

@@ -17,3 +17,4 @@
export * from './consts';
export * from './engine';
export * from './peer';
export * from './storage';

View File

@@ -4,20 +4,14 @@ import { isEqual } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { mergeUpdates, type Storage } from '../storage';
import { PriorityAsyncQueue, SharedPriorityTarget } from '../utils/async-queue';
import { throwIfAborted } from '../utils/throw-if-aborted';
import { MANUALLY_STOP } from './consts';
export enum SyncPeerStep {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Loaded = 4.5,
Syncing = 5,
Synced = 6,
}
import {
PriorityAsyncQueue,
SharedPriorityTarget,
} from '../../utils/async-queue';
import { mergeUpdates } from '../../utils/merge-updates';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import { SyncPeerStep } from './consts';
import type { SyncStorage } from './storage';
export interface SyncPeerStatus {
step: SyncPeerStep;
@@ -62,7 +56,7 @@ export class SyncPeer {
pendingPushUpdates: 0,
};
onStatusChange = new Slot<SyncPeerStatus>();
abort = new AbortController();
readonly abort = new AbortController();
get name() {
return this.storage.name;
}
@@ -70,7 +64,7 @@ export class SyncPeer {
constructor(
private readonly rootDoc: Doc,
private readonly storage: Storage,
private readonly storage: SyncStorage,
private readonly priorityTarget = new SharedPriorityTarget()
) {
this.logger.debug('peer start');
@@ -111,7 +105,7 @@ export class SyncPeer {
try {
await this.sync(abort);
} catch (err) {
if (err === MANUALLY_STOP) {
if (err === MANUALLY_STOP || abort.aborted) {
return;
}
@@ -141,7 +135,7 @@ export class SyncPeer {
}),
]);
} catch (err) {
if (err === MANUALLY_STOP) {
if (err === MANUALLY_STOP || abort.aborted) {
return;
}

View File

@@ -1,4 +1,4 @@
export interface Storage {
export interface SyncStorage {
/**
* for debug
*/
@@ -23,7 +23,3 @@ export interface Storage {
disconnect: (reason: string) => void
): Promise<() => void>;
}
export * from './affine';
export * from './indexeddb';
export * from './sqlite';

View File

@@ -0,0 +1,13 @@
import type { WorkspaceMetadata } from './metadata';
import type { Workspace } from './workspace';
export interface WorkspaceFactory {
name: string;
openWorkspace(metadata: WorkspaceMetadata): Workspace;
/**
* get blob without open workspace
*/
getWorkspaceBlob(id: string, blobKey: string): Promise<Blob | null>;
}

View File

@@ -0,0 +1,6 @@
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema } from '@blocksuite/store';
export const globalBlockSuiteSchema = new Schema();
globalBlockSuiteSchema.register(AffineSchemas).register(__unstableSchemas);

View File

@@ -6,18 +6,15 @@ import {
removeAwarenessStates,
} from 'y-protocols/awareness';
import type { AwarenessProvider } from '../../engine/awareness';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
import type { AwarenessProvider } from '..';
const logger = new DebugLogger('affine:awareness:socketio');
export type AwarenessChanges = Record<
'added' | 'updated' | 'removed',
number[]
>;
type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>;
export function createAffineAwarenessProvider(
export function createCloudAwarenessProvider(
workspaceId: string,
awareness: Awareness
): AwarenessProvider {
@@ -87,11 +84,14 @@ export function createAffineAwarenessProvider(
window.addEventListener('beforeunload', windowBeforeUnloadHandler);
socket.emit('awareness-init', workspaceId);
socket.connect();
socket.emit('client-handshake-awareness', workspaceId);
socket.emit('awareness-init', workspaceId);
},
disconnect: () => {
awareness.off('update', awarenessUpdate);
socket.emit('client-leave-awareness', workspaceId);
socket.off('server-awareness-broadcast', awarenessBroadcast);
socket.off('new-client-awareness-init', newClientAwarenessInitHandler);
window.removeEventListener('unload', windowBeforeUnloadHandler);

View File

@@ -5,10 +5,10 @@ import {
listBlobsQuery,
setBlobMutation,
} from '@affine/graphql';
import { fetcher } from '@affine/workspace/affine/gql';
import type { BlobStorage } from '../engine';
import { bufferToBlob } from '../util';
import { fetcher } from '../../affine/gql';
import type { BlobStorage } from '../../engine/blob';
import { bufferToBlob } from '../../utils/buffer-to-blob';
export const createAffineCloudBlobStorage = (
workspaceId: string
@@ -25,7 +25,7 @@ export const createAffineCloudBlobStorage = (
async res => {
if (!res.ok) {
// status not in the range 200-299
return undefined;
return null;
}
return bufferToBlob(await res.arrayBuffer());
}
@@ -54,6 +54,7 @@ export const createAffineCloudBlobStorage = (
},
});
console.assert(result.setBlob === key, 'Blob hash mismatch');
return result.setBlob;
},
list: async () => {
const result = await fetcher({

View File

@@ -0,0 +1,2 @@
export const CLOUD_WORKSPACE_CHANGED_BROADCAST_CHANNEL_KEY =
'affine-cloud-workspace-changed';

View File

@@ -0,0 +1,6 @@
export * from './awareness';
export * from './blob';
export * from './consts';
export * from './list';
export * from './sync';
export * from './workspace-factory';

View File

@@ -0,0 +1,155 @@
import { WorkspaceFlavour } from '@affine/env/workspace';
import {
createWorkspaceMutation,
deleteWorkspaceMutation,
getWorkspacesQuery,
} from '@affine/graphql';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { difference } from 'lodash-es';
import { nanoid } from 'nanoid';
import { applyUpdate, encodeStateAsUpdate } from 'yjs';
import { fetcher } from '../../affine/gql';
import { globalBlockSuiteSchema } from '../../global-schema';
import type { WorkspaceListProvider } from '../../list';
import { createLocalBlobStorage } from '../local/blob';
import { createLocalStorage } from '../local/sync';
import { CLOUD_WORKSPACE_CHANGED_BROADCAST_CHANNEL_KEY } from './consts';
import { createAffineStaticStorage } from './sync';
async function getCloudWorkspaceList() {
try {
const { workspaces } = await fetcher({
query: getWorkspacesQuery,
});
const ids = workspaces.map(({ id }) => id);
return ids.map(id => ({
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
}));
} catch (err) {
if (err instanceof Array && err[0]?.message === 'Forbidden resource') {
// user not logged in
return [];
}
throw err;
}
}
export function createCloudWorkspaceListProvider(): WorkspaceListProvider {
const notifyChannel = new BroadcastChannel(
CLOUD_WORKSPACE_CHANGED_BROADCAST_CHANNEL_KEY
);
return {
name: WorkspaceFlavour.AFFINE_CLOUD,
async getList() {
return getCloudWorkspaceList();
},
async create(initial) {
const tempId = nanoid();
const workspace = new BlockSuiteWorkspace({
id: tempId,
idGenerator: () => nanoid(),
schema: globalBlockSuiteSchema,
});
// create workspace on cloud, get workspace id
const {
createWorkspace: { id: workspaceId },
} = await fetcher({
query: createWorkspaceMutation,
});
// save the initial state to local storage, then sync to cloud
const blobStorage = createLocalBlobStorage(workspaceId);
const syncStorage = createLocalStorage(workspaceId);
// apply initial state
await initial(workspace, blobStorage);
// save workspace to local storage, should be vary fast
await syncStorage.push(workspaceId, encodeStateAsUpdate(workspace.doc));
for (const subdocs of workspace.doc.getSubdocs()) {
await syncStorage.push(subdocs.guid, encodeStateAsUpdate(subdocs));
}
// notify all browser tabs, so they can update their workspace list
notifyChannel.postMessage(null);
return workspaceId;
},
async delete(id) {
await fetcher({
query: deleteWorkspaceMutation,
variables: {
id,
},
});
// notify all browser tabs, so they can update their workspace list
notifyChannel.postMessage(null);
},
subscribe(callback) {
let lastWorkspaceIDs: string[] = [];
function scan() {
(async () => {
const allWorkspaceIDs = (await getCloudWorkspaceList()).map(
workspace => workspace.id
);
const added = difference(allWorkspaceIDs, lastWorkspaceIDs);
const deleted = difference(lastWorkspaceIDs, allWorkspaceIDs);
lastWorkspaceIDs = allWorkspaceIDs;
callback({
added: added.map(id => ({
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
})),
deleted: deleted.map(id => ({
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
})),
});
})().catch(err => {
console.error(err);
});
}
scan();
// rescan if other tabs notify us
notifyChannel.addEventListener('message', scan);
return () => {
notifyChannel.removeEventListener('message', scan);
};
},
async getInformation(id) {
// get information from both cloud and local storage
// we use affine 'static' storage here, which use http protocol, no need to websocket.
const cloudStorage = createAffineStaticStorage(id);
const localStorage = createLocalStorage(id);
// download root doc
const localData = await localStorage.pull(id, new Uint8Array([]));
const cloudData = await cloudStorage.pull(id, new Uint8Array([]));
if (!cloudData && !localData) {
return;
}
const bs = new BlockSuiteWorkspace({
id,
schema: globalBlockSuiteSchema,
});
if (localData) applyUpdate(bs.doc, localData.data);
if (cloudData) applyUpdate(bs.doc, cloudData.data);
return {
name: bs.meta.name,
avatar: bs.meta.avatar,
};
},
};
}

View File

@@ -1,15 +1,16 @@
import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
import type { Storage } from '..';
import type { SyncStorage } from '../../../engine/sync';
import { getIoManager } from '../../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../../utils/base64';
import { MultipleBatchSyncSender } from './batch-sync-sender';
const logger = new DebugLogger('affine:storage:socketio');
export function createAffineStorage(
workspaceId: string
): Storage & { disconnect: () => void } {
): SyncStorage & { disconnect: () => void } {
logger.debug('createAffineStorage', workspaceId);
const socket = getIoManager().socket('/');
@@ -53,7 +54,7 @@ export function createAffineStorage(
// TODO: handle error
socket.on('connect', () => {
socket.emit(
'client-handshake',
'client-handshake-sync',
workspaceId,
(response: { error?: any }) => {
if (!response.error) {
@@ -66,7 +67,7 @@ export function createAffineStorage(
socket.connect();
return {
name: 'socketio',
name: 'affine-cloud',
async pull(docId, state) {
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
@@ -125,7 +126,7 @@ export function createAffineStorage(
async subscribe(cb, disconnect) {
const response: { error?: any } = await socket
.timeout(10000)
.emitWithAck('client-handshake', workspaceId);
.emitWithAck('client-handshake-sync', workspaceId);
if (response.error) {
throw new Error('client-handshake error, ' + response.error);
@@ -155,8 +156,38 @@ export function createAffineStorage(
},
disconnect() {
syncSender.stop();
socket.emit('client-leave', workspaceId);
socket.emit('client-leave-sync', workspaceId);
socket.disconnect();
},
};
}
export function createAffineStaticStorage(workspaceId: string): SyncStorage {
logger.debug('createAffineStaticStorage', workspaceId);
return {
name: 'affine-cloud-static',
async pull(docId) {
const response = await fetchWithTraceReport(
runtimeConfig.serverUrlPrefix +
`/api/workspaces/${workspaceId}/docs/${docId}`,
{
priority: 'high',
}
);
if (response.ok) {
const arrayBuffer = await response.arrayBuffer();
return { data: new Uint8Array(arrayBuffer) };
}
return null;
},
async push() {
throw new Error('Not implemented');
},
async subscribe() {
throw new Error('Not implemented');
},
};
}

View File

@@ -0,0 +1,77 @@
import { setupEditorFlags } from '@affine/env/global';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { nanoid } from 'nanoid';
import { BlobEngine, SyncEngine, WorkspaceEngine } from '../../engine';
import type { WorkspaceFactory } from '../../factory';
import { globalBlockSuiteSchema } from '../../global-schema';
import { Workspace } from '../../workspace';
import { createBroadcastChannelAwarenessProvider } from '../local/awareness';
import { createLocalBlobStorage } from '../local/blob';
import { createStaticBlobStorage } from '../local/blob-static';
import { createLocalStorage } from '../local/sync';
import { createCloudAwarenessProvider } from './awareness';
import { createAffineCloudBlobStorage } from './blob';
import { createAffineStorage } from './sync';
export const cloudWorkspaceFactory: WorkspaceFactory = {
name: 'affine-cloud',
openWorkspace(metadata) {
const blobEngine = new BlobEngine(createLocalBlobStorage(metadata.id), [
createAffineCloudBlobStorage(metadata.id),
createStaticBlobStorage(),
]);
// create blocksuite workspace
const bs = new BlockSuiteWorkspace({
id: metadata.id,
blobStorages: [
() => ({
crud: blobEngine,
}),
],
idGenerator: () => nanoid(),
schema: globalBlockSuiteSchema,
});
const affineStorage = createAffineStorage(metadata.id);
const syncEngine = new SyncEngine(bs.doc, createLocalStorage(metadata.id), [
affineStorage,
]);
const awarenessProviders = [
createBroadcastChannelAwarenessProvider(
metadata.id,
bs.awarenessStore.awareness
),
createCloudAwarenessProvider(metadata.id, bs.awarenessStore.awareness),
];
const engine = new WorkspaceEngine(
blobEngine,
syncEngine,
awarenessProviders
);
setupEditorFlags(bs);
const workspace = new Workspace(metadata, engine, bs);
workspace.onStop.once(() => {
// affine sync storage need manually disconnect
affineStorage.disconnect();
});
return workspace;
},
async getWorkspaceBlob(id: string, blobKey: string): Promise<Blob | null> {
// try to get blob from local storage first
const localBlobStorage = createLocalBlobStorage(id);
const localBlob = await localBlobStorage.get(blobKey);
if (localBlob) {
return localBlob;
}
const blobStorage = createAffineCloudBlobStorage(id);
return await blobStorage.get(blobKey);
},
};

View File

@@ -0,0 +1,2 @@
export * from './cloud';
export * from './local';

View File

@@ -4,8 +4,9 @@ import {
encodeAwarenessUpdate,
} from 'y-protocols/awareness.js';
import type { AwarenessProvider } from '..';
import type { AwarenessChanges } from '../affine';
import type { AwarenessProvider } from '../../engine/awareness';
type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>;
type ChannelMessage =
| { type: 'connect' }

View File

@@ -1,7 +1,7 @@
import { createStore, del, get, keys, set } from 'idb-keyval';
import type { BlobStorage } from '../engine';
import { bufferToBlob } from '../util';
import type { BlobStorage } from '../../engine/blob';
import { bufferToBlob } from '../../utils/buffer-to-blob';
export const createIndexeddbBlobStorage = (
workspaceId: string
@@ -16,11 +16,12 @@ export const createIndexeddbBlobStorage = (
if (res) {
return bufferToBlob(res);
}
return undefined;
return null;
},
set: async (key: string, value: Blob) => {
await set(key, await value.arrayBuffer(), db);
await set(key, value.type, mimeTypeDb);
return key;
},
delete: async (key: string) => {
await del(key, db);

View File

@@ -1,7 +1,7 @@
import { assertExists } from '@blocksuite/global/utils';
import type { BlobStorage } from '../engine';
import { bufferToBlob } from '../util';
import type { BlobStorage } from '../../engine/blob';
import { bufferToBlob } from '../../utils/buffer-to-blob';
export const createSQLiteBlobStorage = (workspaceId: string): BlobStorage => {
const apis = window.apis;
@@ -14,7 +14,7 @@ export const createSQLiteBlobStorage = (workspaceId: string): BlobStorage => {
if (buffer) {
return bufferToBlob(buffer);
}
return undefined;
return null;
},
set: async (key: string, value: Blob) => {
await apis.db.addBlob(
@@ -22,6 +22,7 @@ export const createSQLiteBlobStorage = (workspaceId: string): BlobStorage => {
key,
new Uint8Array(await value.arrayBuffer())
);
return key;
},
delete: async (key: string) => {
return apis.db.deleteBlob(workspaceId, key);

View File

@@ -1,4 +1,4 @@
import type { BlobStorage } from '../engine';
import type { BlobStorage } from '../../engine/blob';
export const predefinedStaticFiles = [
'029uztLz2CzJezK7UUhrbGiWUdZ0J7NVs_qR6RDsvb8=',
@@ -45,7 +45,7 @@ export const createStaticBlobStorage = (): BlobStorage => {
predefinedStaticFiles.includes(key) || key.startsWith('/static/');
if (!isStaticResource) {
return undefined;
return null;
}
const path = key.startsWith('/static/') ? key : `/static/${key}`;
@@ -55,10 +55,11 @@ export const createStaticBlobStorage = (): BlobStorage => {
return await response.blob();
}
return undefined;
return null;
},
set: async () => {
set: async key => {
// ignore
return key;
},
delete: async () => {
// ignore

View File

@@ -0,0 +1,10 @@
import { createIndexeddbBlobStorage } from './blob-indexeddb';
import { createSQLiteBlobStorage } from './blob-sqlite';
export function createLocalBlobStorage(workspaceId: string) {
if (environment.isDesktop) {
return createSQLiteBlobStorage(workspaceId);
} else {
return createIndexeddbBlobStorage(workspaceId);
}
}

View File

@@ -0,0 +1,3 @@
export const LOCAL_WORKSPACE_LOCAL_STORAGE_KEY = 'affine-local-workspace';
export const LOCAL_WORKSPACE_CREATED_BROADCAST_CHANNEL_KEY =
'affine-local-workspace-created';

View File

@@ -0,0 +1,11 @@
export * from './awareness';
export * from './blob';
export * from './blob-indexeddb';
export * from './blob-sqlite';
export * from './blob-static';
export * from './consts';
export * from './list';
export * from './sync';
export * from './sync-indexeddb';
export * from './sync-sqlite';
export * from './workspace-factory';

View File

@@ -0,0 +1,129 @@
import { WorkspaceFlavour } from '@affine/env/workspace';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { difference } from 'lodash-es';
import { nanoid } from 'nanoid';
import { applyUpdate, encodeStateAsUpdate } from 'yjs';
import { globalBlockSuiteSchema } from '../../global-schema';
import type { WorkspaceListProvider } from '../../list';
import { createLocalBlobStorage } from './blob';
import {
LOCAL_WORKSPACE_CREATED_BROADCAST_CHANNEL_KEY,
LOCAL_WORKSPACE_LOCAL_STORAGE_KEY,
} from './consts';
import { createLocalStorage } from './sync';
export function createLocalWorkspaceListProvider(): WorkspaceListProvider {
const notifyChannel = new BroadcastChannel(
LOCAL_WORKSPACE_CREATED_BROADCAST_CHANNEL_KEY
);
return {
name: WorkspaceFlavour.LOCAL,
getList() {
return Promise.resolve(
JSON.parse(
localStorage.getItem(LOCAL_WORKSPACE_LOCAL_STORAGE_KEY) ?? '[]'
).map((id: string) => ({ id, flavour: WorkspaceFlavour.LOCAL }))
);
},
subscribe(callback) {
let lastWorkspaceIDs: string[] = [];
function scan() {
const allWorkspaceIDs: string[] = JSON.parse(
localStorage.getItem(LOCAL_WORKSPACE_LOCAL_STORAGE_KEY) ?? '[]'
);
const added = difference(allWorkspaceIDs, lastWorkspaceIDs);
const deleted = difference(lastWorkspaceIDs, allWorkspaceIDs);
lastWorkspaceIDs = allWorkspaceIDs;
callback({
added: added.map(id => ({ id, flavour: WorkspaceFlavour.LOCAL })),
deleted: deleted.map(id => ({ id, flavour: WorkspaceFlavour.LOCAL })),
});
}
scan();
// rescan if other tabs notify us
notifyChannel.addEventListener('message', scan);
return () => {
notifyChannel.removeEventListener('message', scan);
};
},
async create(initial) {
const id = nanoid();
const blobStorage = createLocalBlobStorage(id);
const syncStorage = createLocalStorage(id);
const workspace = new BlockSuiteWorkspace({
id: id,
idGenerator: () => nanoid(),
schema: globalBlockSuiteSchema,
});
// apply initial state
await initial(workspace, blobStorage);
// save workspace to local storage
await syncStorage.push(id, encodeStateAsUpdate(workspace.doc));
for (const subdocs of workspace.doc.getSubdocs()) {
await syncStorage.push(subdocs.guid, encodeStateAsUpdate(subdocs));
}
// save workspace id to local storage
const allWorkspaceIDs: string[] = JSON.parse(
localStorage.getItem(LOCAL_WORKSPACE_LOCAL_STORAGE_KEY) ?? '[]'
);
allWorkspaceIDs.push(id);
localStorage.setItem(
LOCAL_WORKSPACE_LOCAL_STORAGE_KEY,
JSON.stringify(allWorkspaceIDs)
);
// notify all browser tabs, so they can update their workspace list
notifyChannel.postMessage(id);
return id;
},
async delete(workspaceId) {
const allWorkspaceIDs: string[] = JSON.parse(
localStorage.getItem(LOCAL_WORKSPACE_LOCAL_STORAGE_KEY) ?? '[]'
);
localStorage.setItem(
LOCAL_WORKSPACE_LOCAL_STORAGE_KEY,
JSON.stringify(allWorkspaceIDs.filter(x => x !== workspaceId))
);
if (window.apis && environment.isDesktop) {
await window.apis.workspace.delete(workspaceId);
}
// notify all browser tabs, so they can update their workspace list
notifyChannel.postMessage(workspaceId);
},
async getInformation(id) {
// get information from root doc
const storage = createLocalStorage(id);
const data = await storage.pull(id, new Uint8Array([]));
if (!data) {
return;
}
const bs = new BlockSuiteWorkspace({
id,
schema: globalBlockSuiteSchema,
});
applyUpdate(bs.doc, data.data);
return {
name: bs.meta.name,
avatar: bs.meta.avatar,
};
},
};
}

View File

@@ -1,33 +1,12 @@
import { type DBSchema, type IDBPDatabase, openDB } from 'idb';
import {
applyUpdate,
diffUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVectorFromUpdate,
} from 'yjs';
import { diffUpdate, encodeStateVectorFromUpdate } from 'yjs';
import type { Storage } from '..';
import type { SyncStorage } from '../../engine/sync';
import { mergeUpdates } from '../../utils/merge-updates';
export const dbVersion = 1;
export const DEFAULT_DB_NAME = 'affine-local';
export function mergeUpdates(updates: Uint8Array[]) {
if (updates.length === 0) {
return new Uint8Array();
}
if (updates.length === 1) {
return updates[0];
}
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
});
return encodeStateAsUpdate(doc);
}
type UpdateMessage = {
timestamp: number;
update: Uint8Array;
@@ -63,7 +42,7 @@ export function createIndexedDBStorage(
workspaceId: string,
dbName = DEFAULT_DB_NAME,
mergeCount = 1
): Storage {
): SyncStorage {
let dbPromise: Promise<IDBPDatabase<BlockSuiteBinaryDB>> | null = null;
const getDb = async () => {
if (dbPromise === null) {
@@ -93,7 +72,7 @@ export function createIndexedDBStorage(
const { updates } = data;
const update = mergeUpdates(updates.map(({ update }) => update));
const diff = state ? diffUpdate(update, state) : update;
const diff = state.length ? diffUpdate(update, state) : update;
return { data: diff, state: encodeStateVectorFromUpdate(update) };
},

View File

@@ -1,8 +1,8 @@
import { encodeStateVectorFromUpdate } from 'yjs';
import type { Storage } from '..';
import type { SyncStorage } from '../../engine/sync';
export function createSQLiteStorage(workspaceId: string): Storage {
export function createSQLiteStorage(workspaceId: string): SyncStorage {
if (!window.apis?.db) {
throw new Error('sqlite datasource is not available');
}

View File

@@ -0,0 +1,7 @@
import { createIndexedDBStorage } from './sync-indexeddb';
import { createSQLiteStorage } from './sync-sqlite';
export const createLocalStorage = (workspaceId: string) =>
environment.isDesktop
? createSQLiteStorage(workspaceId)
: createIndexedDBStorage(workspaceId);

View File

@@ -0,0 +1,54 @@
import { setupEditorFlags } from '@affine/env/global';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { nanoid } from 'nanoid';
import { WorkspaceEngine } from '../../engine';
import { BlobEngine } from '../../engine/blob';
import { SyncEngine } from '../../engine/sync';
import type { WorkspaceFactory } from '../../factory';
import { globalBlockSuiteSchema } from '../../global-schema';
import { Workspace } from '../../workspace';
import { createBroadcastChannelAwarenessProvider } from './awareness';
import { createLocalBlobStorage } from './blob';
import { createStaticBlobStorage } from './blob-static';
import { createLocalStorage } from './sync';
export const localWorkspaceFactory: WorkspaceFactory = {
name: 'local',
openWorkspace(metadata) {
const blobEngine = new BlobEngine(createLocalBlobStorage(metadata.id), [
createStaticBlobStorage(),
]);
const bs = new BlockSuiteWorkspace({
id: metadata.id,
blobStorages: [
() => ({
crud: blobEngine,
}),
],
idGenerator: () => nanoid(),
schema: globalBlockSuiteSchema,
});
const syncEngine = new SyncEngine(
bs.doc,
createLocalStorage(metadata.id),
[]
);
const awarenessProvider = createBroadcastChannelAwarenessProvider(
metadata.id,
bs.awarenessStore.awareness
);
const engine = new WorkspaceEngine(blobEngine, syncEngine, [
awarenessProvider,
]);
setupEditorFlags(bs);
return new Workspace(metadata, engine, bs);
},
async getWorkspaceBlob(id, blobKey) {
const blobStorage = createLocalBlobStorage(id);
return await blobStorage.get(blobKey);
},
};

View File

@@ -0,0 +1,29 @@
import {
cloudWorkspaceFactory,
createCloudWorkspaceListProvider,
createLocalWorkspaceListProvider,
localWorkspaceFactory,
} from './impl';
import { WorkspaceList } from './list';
import { WorkspaceManager } from './manager';
const list = new WorkspaceList([
createLocalWorkspaceListProvider(),
createCloudWorkspaceListProvider(),
]);
export const workspaceManager = new WorkspaceManager(list, [
localWorkspaceFactory,
cloudWorkspaceFactory,
]);
(window as any).workspaceManager = workspaceManager;
export * from './engine';
export * from './factory';
export * from './global-schema';
export * from './impl';
export * from './list';
export * from './manager';
export * from './metadata';
export * from './workspace';

View File

@@ -0,0 +1,21 @@
import { type WorkspaceMetadata } from '../metadata';
const CACHE_STORAGE_KEY = 'jotai-workspaces';
export function readWorkspaceListCache() {
const metadata = localStorage.getItem(CACHE_STORAGE_KEY);
if (metadata) {
try {
const items = JSON.parse(metadata) as WorkspaceMetadata[];
return [...items];
} catch (e) {
console.error('cannot parse worksapce', e);
}
return [];
}
return [];
}
export function writeWorkspaceListCache(metadata: WorkspaceMetadata[]) {
localStorage.setItem(CACHE_STORAGE_KEY, JSON.stringify(metadata));
}

View File

@@ -0,0 +1,300 @@
import { DebugLogger } from '@affine/debug';
import type { WorkspaceFlavour } from '@affine/env/workspace';
import { Slot } from '@blocksuite/global/utils';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { differenceWith } from 'lodash-es';
import type { BlobStorage } from '../engine';
import type { WorkspaceMetadata } from '../metadata';
import { readWorkspaceListCache, writeWorkspaceListCache } from './cache';
import { type WorkspaceInfo, WorkspaceInformation } from './information';
export * from './information';
const logger = new DebugLogger('affine:workspace:list');
export interface WorkspaceListProvider {
name: WorkspaceFlavour;
/**
* get workspaces list
*/
getList(): Promise<WorkspaceMetadata[]>;
/**
* delete workspace by id
*/
delete(workspaceId: string): Promise<void>;
/**
* create workspace
* @param initial callback to put initial data to workspace
*/
create(
initial: (
workspace: BlockSuiteWorkspace,
blobStorage: BlobStorage
) => Promise<void>
): Promise<string>;
/**
* Start subscribe workspaces list
*
* @returns unsubscribe function
*/
subscribe(
callback: (changed: {
added?: WorkspaceMetadata[];
deleted?: WorkspaceMetadata[];
}) => void
): () => void;
/**
* get workspace avatar and name by id
*
* @param id workspace id
*/
getInformation(id: string): Promise<WorkspaceInfo | undefined>;
}
export interface WorkspaceListStatus {
/**
* is workspace list doing first loading.
* if false, UI can display workspace not found page.
*/
loading: boolean;
workspaceList: WorkspaceMetadata[];
}
/**
* # WorkspaceList
*
* manage multiple workspace metadata list providers.
* provide a __cache-first__ and __offline useable__ workspace list.
*/
export class WorkspaceList {
private readonly abortController = new AbortController();
private readonly workspaceInformationList = new Map<
string,
WorkspaceInformation
>();
onStatusChanged = new Slot<WorkspaceListStatus>();
private _status: Readonly<WorkspaceListStatus> = {
loading: true,
workspaceList: [],
};
get status() {
return this._status;
}
set status(status) {
this._status = status;
// update cache
writeWorkspaceListCache(status.workspaceList);
this.onStatusChanged.emit(this._status);
}
get workspaceList() {
return this.status.workspaceList;
}
constructor(private readonly providers: WorkspaceListProvider[]) {
// initialize workspace list from cache
const cache = readWorkspaceListCache();
const workspaceList = cache;
this.status = {
...this.status,
workspaceList,
};
// start first load
this.startLoad();
}
/**
* create workspace
* @param flavour workspace flavour
* @param initial callback to put initial data to workspace
* @returns workspace id
*/
async create(
flavour: WorkspaceFlavour,
initial: (
workspace: BlockSuiteWorkspace,
blobStorage: BlobStorage
) => Promise<void>
) {
const provider = this.providers.find(x => x.name === flavour);
if (!provider) {
throw new Error(`Unknown workspace flavour: ${flavour}`);
}
const id = await provider.create(initial);
const metadata = {
id,
flavour,
};
// update workspace list
this.status = this.addWorkspace(this.status, metadata);
return id;
}
/**
* delete workspace
* @param workspaceMetadata
*/
async delete(workspaceMetadata: WorkspaceMetadata) {
logger.info(
`delete workspace [${workspaceMetadata.flavour}] ${workspaceMetadata.id}`
);
const provider = this.providers.find(
x => x.name === workspaceMetadata.flavour
);
if (!provider) {
throw new Error(
`Unknown workspace flavour: ${workspaceMetadata.flavour}`
);
}
await provider.delete(workspaceMetadata.id);
// delete workspace from list
this.status = this.deleteWorkspace(this.status, workspaceMetadata);
}
/**
* add workspace to list
*/
private addWorkspace(
status: WorkspaceListStatus,
workspaceMetadata: WorkspaceMetadata
) {
if (status.workspaceList.some(x => x.id === workspaceMetadata.id)) {
return status;
}
return {
...status,
workspaceList: status.workspaceList.concat(workspaceMetadata),
};
}
/**
* delete workspace from list
*/
private deleteWorkspace(
status: WorkspaceListStatus,
workspaceMetadata: WorkspaceMetadata
) {
if (!status.workspaceList.some(x => x.id === workspaceMetadata.id)) {
return status;
}
return {
...status,
workspaceList: status.workspaceList.filter(
x => x.id !== workspaceMetadata.id
),
};
}
/**
* callback for subscribe workspaces list
*/
private handleWorkspaceChange(changed: {
added?: WorkspaceMetadata[];
deleted?: WorkspaceMetadata[];
}) {
let status = this.status;
for (const added of changed.added ?? []) {
status = this.addWorkspace(status, added);
}
for (const deleted of changed.deleted ?? []) {
status = this.deleteWorkspace(status, deleted);
}
this.status = status;
}
/**
* start first load workspace list
*/
private startLoad() {
for (const provider of this.providers) {
// subscribe workspace list change
const unsubscribe = provider.subscribe(changed => {
this.handleWorkspaceChange(changed);
});
// unsubscribe when abort
if (this.abortController.signal.aborted) {
unsubscribe();
return;
}
this.abortController.signal.addEventListener('abort', () => {
unsubscribe();
});
}
this.revalidate()
.catch(error => {
logger.error('load workspace list error: ' + error);
})
.finally(() => {
this.status = {
...this.status,
loading: false,
};
});
}
async revalidate() {
await Promise.allSettled(
this.providers.map(async provider => {
try {
const list = await provider.getList();
const oldList = this.workspaceList.filter(
w => w.flavour === provider.name
);
this.handleWorkspaceChange({
added: differenceWith(list, oldList, (a, b) => a.id === b.id),
deleted: differenceWith(oldList, list, (a, b) => a.id === b.id),
});
} catch (error) {
logger.error('load workspace list error: ' + error);
}
})
);
}
/**
* get workspace information, if not exists, create it.
*/
getInformation(meta: WorkspaceMetadata) {
const exists = this.workspaceInformationList.get(meta.id);
if (exists) {
return exists;
}
return this.createInformation(meta);
}
private createInformation(workspaceMetadata: WorkspaceMetadata) {
const provider = this.providers.find(
x => x.name === workspaceMetadata.flavour
);
if (!provider) {
throw new Error(
`Unknown workspace flavour: ${workspaceMetadata.flavour}`
);
}
const information = new WorkspaceInformation(workspaceMetadata, provider);
information.fetch();
this.workspaceInformationList.set(workspaceMetadata.id, information);
return information;
}
dispose() {
this.abortController.abort();
}
}

View File

@@ -0,0 +1,97 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { WorkspaceMetadata } from '../metadata';
import type { Workspace } from '../workspace';
import type { WorkspaceListProvider } from '.';
const logger = new DebugLogger('affine:workspace:list:information');
const WORKSPACE_INFORMATION_CACHE_KEY = 'workspace-information:';
export interface WorkspaceInfo {
avatar?: string;
name?: string;
}
/**
* # WorkspaceInformation
*
* This class take care of workspace avatar and name
*
* The class will try to get from 3 places:
* - local cache
* - fetch from `WorkspaceListProvider`, which will fetch from database or server
* - sync with active workspace
*/
export class WorkspaceInformation {
private _info: WorkspaceInfo = {};
public set info(info: WorkspaceInfo) {
if (info.avatar !== this._info.avatar || info.name !== this._info.name) {
localStorage.setItem(
WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id,
JSON.stringify(info)
);
this._info = info;
this.onUpdated.emit(info);
}
}
public get info() {
return this._info;
}
public onUpdated = new Slot<WorkspaceInfo>();
constructor(
public meta: WorkspaceMetadata,
public provider: WorkspaceListProvider
) {
const cached = this.getCachedInformation();
// init with cached information
this.info = { ...cached };
}
/**
* sync information with workspace
*/
syncWithWorkspace(workspace: Workspace) {
this.info = {
avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar,
name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name,
};
workspace.blockSuiteWorkspace.meta.commonFieldsUpdated.on(() => {
this.info = {
avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar,
name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name,
};
});
}
getCachedInformation() {
const cache = localStorage.getItem(
WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id
);
if (cache) {
return JSON.parse(cache) as WorkspaceInfo;
}
return null;
}
/**
* fetch information from provider
*/
fetch() {
this.provider
.getInformation(this.meta.id)
.then(info => {
if (info) {
this.info = info;
}
})
.catch(err => {
logger.warn('get workspace information error: ' + err);
});
}
}

View File

@@ -1,71 +0,0 @@
/**
* @vitest-environment happy-dom
*/
import 'fake-indexeddb/auto';
import type { WorkspaceCRUD } from '@affine/env/workspace';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { assertExists } from '@blocksuite/global/utils';
import { Schema, Workspace } from '@blocksuite/store';
import { afterEach, assertType, describe, expect, test } from 'vitest';
import { CRUD } from '../crud';
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
afterEach(() => {
localStorage.clear();
});
describe('crud', () => {
test('type', () => {
assertType<WorkspaceCRUD<WorkspaceFlavour.LOCAL>>(CRUD);
});
test('basic', async () => {
const workspace = await CRUD.get('not_exist');
expect(workspace).toBeNull();
expect(await CRUD.list()).toEqual([]);
});
test('delete not exist', async () => {
await expect(async () =>
CRUD.delete(new Workspace({ id: 'test', schema }))
).rejects.toThrowError();
});
test('create & delete', async () => {
const workspace = new Workspace({ id: 'test', schema });
const page = workspace.createPage({ id: 'page0' });
await page.waitForLoaded();
const pageBlockId = page.addBlock('affine:page', {
title: new page.Text(''),
});
page.addBlock('affine:surface', {}, pageBlockId);
const frameId = page.addBlock('affine:note', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
const id = await CRUD.create(workspace);
const list = await CRUD.list();
expect(list.length).toBe(1);
expect(list[0].id).toBe(id);
const localWorkspace = list.at(0);
assertExists(localWorkspace);
expect(localWorkspace.id).toBe(id);
expect(localWorkspace.flavour).toBe(WorkspaceFlavour.LOCAL);
expect(localWorkspace.blockSuiteWorkspace.doc.toJSON()).toEqual({
meta: expect.anything(),
spaces: expect.objectContaining({
page0: expect.anything(),
}),
});
await CRUD.delete(localWorkspace.blockSuiteWorkspace);
expect(await CRUD.get(id)).toBeNull();
expect(await CRUD.list()).toEqual([]);
});
});

View File

@@ -1,112 +0,0 @@
import { DebugLogger } from '@affine/debug';
import type { LocalWorkspace, WorkspaceCRUD } from '@affine/env/workspace';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { createJSONStorage } from 'jotai/utils';
import { nanoid } from 'nanoid';
import { z } from 'zod';
import { getOrCreateWorkspace } from '../manager';
const getStorage = () => createJSONStorage(() => localStorage);
const kStoreKey = 'affine-local-workspace';
const schema = z.array(z.string());
const logger = new DebugLogger('affine:workspace:local:crud');
/**
* @internal
*/
export function saveWorkspaceToLocalStorage(workspaceId: string) {
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey, [])) &&
storage.setItem(kStoreKey, []);
const data = storage.getItem(kStoreKey, []) as z.infer<typeof schema>;
const id = data.find(id => id === workspaceId);
if (!id) {
logger.debug('saveWorkspaceToLocalStorage', workspaceId);
storage.setItem(kStoreKey, [...data, workspaceId]);
}
}
export const CRUD: WorkspaceCRUD<WorkspaceFlavour.LOCAL> = {
get: async workspaceId => {
logger.debug('get', workspaceId);
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey, [])) &&
storage.setItem(kStoreKey, []);
const data = storage.getItem(kStoreKey, []) as z.infer<typeof schema>;
const id = data.find(id => id === workspaceId);
if (!id) {
return null;
}
const blockSuiteWorkspace = getOrCreateWorkspace(
id,
WorkspaceFlavour.LOCAL
);
const workspace: LocalWorkspace = {
id,
flavour: WorkspaceFlavour.LOCAL,
blockSuiteWorkspace: blockSuiteWorkspace,
};
return workspace;
},
create: async ({ doc }) => {
logger.debug('create', doc);
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey, [])) &&
storage.setItem(kStoreKey, []);
const binary = BlockSuiteWorkspace.Y.encodeStateAsUpdate(doc);
const id = nanoid();
const blockSuiteWorkspace = getOrCreateWorkspace(
id,
WorkspaceFlavour.LOCAL
);
BlockSuiteWorkspace.Y.applyUpdate(blockSuiteWorkspace.doc, binary);
doc.getSubdocs().forEach(subdoc => {
blockSuiteWorkspace.doc.getSubdocs().forEach(newDoc => {
if (subdoc.guid === newDoc.guid) {
BlockSuiteWorkspace.Y.applyUpdate(
newDoc,
BlockSuiteWorkspace.Y.encodeStateAsUpdate(subdoc)
);
}
});
});
// todo: do we need to persist doc to persistence datasource?
saveWorkspaceToLocalStorage(id);
return id;
},
delete: async workspace => {
logger.debug('delete', workspace);
const storage = getStorage();
!Array.isArray(storage.getItem(kStoreKey, [])) &&
storage.setItem(kStoreKey, []);
const data = storage.getItem(kStoreKey, []) as z.infer<typeof schema>;
const idx = data.findIndex(id => id === workspace.id);
if (idx === -1) {
throw new Error('workspace not found');
}
data.splice(idx, 1);
storage.setItem(kStoreKey, [...data]);
// flywire
if (window.apis && environment.isDesktop) {
await window.apis.workspace.delete(workspace.id);
}
},
list: async () => {
logger.debug('list');
const storage = getStorage();
const allWorkspaceIDs: string[] = storage.getItem(kStoreKey, []) as z.infer<
typeof schema
>;
const workspaces = (
await Promise.all(allWorkspaceIDs.map(id => CRUD.get(id)))
).filter(item => item !== null) as LocalWorkspace[];
return workspaces;
},
};

View File

@@ -0,0 +1,185 @@
import { DebugLogger } from '@affine/debug';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { assertEquals } from '@blocksuite/global/utils';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { fixWorkspaceVersion } from '@toeverything/infra/blocksuite';
import { applyUpdate, encodeStateAsUpdate } from 'yjs';
import type { BlobStorage } from '.';
import type { WorkspaceFactory } from './factory';
import { LOCAL_WORKSPACE_LOCAL_STORAGE_KEY } from './impl/local/consts';
import type { WorkspaceList } from './list';
import type { WorkspaceMetadata } from './metadata';
import { WorkspacePool } from './pool';
import type { Workspace } from './workspace';
const logger = new DebugLogger('affine:workspace-manager');
/**
* # `WorkspaceManager`
*
* This class acts as the central hub for managing various aspects of workspaces.
* It is structured as follows:
*
* ```
* ┌───────────┐
* │ Workspace │
* │ Manager │
* └─────┬─────┘
* ┌─────────────┼─────────────┐
* ┌───┴───┐ ┌───┴───┐ ┌─────┴─────┐
* │ List │ │ Pool │ │ Factories │
* └───────┘ └───────┘ └───────────┘
* ```
*
* Manage every about workspace
*
* # List
*
* The `WorkspaceList` component stores metadata for all workspaces, also include workspace avatar and custom name.
*
* # Factories
*
* This class contains a collection of `WorkspaceFactory`,
* We utilize `metadata.flavour` to identify the appropriate factory for opening a workspace.
* Once opened, workspaces are stored in the `WorkspacePool`.
*
* # Pool
*
* The `WorkspacePool` use reference counting to manage active workspaces.
* Calling `use()` to create a reference to the workspace. Calling `release()` to release the reference.
* When the reference count is 0, it will close the workspace.
*
*/
export class WorkspaceManager {
pool: WorkspacePool = new WorkspacePool();
constructor(
public list: WorkspaceList,
public factories: WorkspaceFactory[]
) {}
/**
* get workspace reference by metadata.
*
* You basically don't need to call this function directly, use the react hook `useWorkspace(metadata)` instead.
*
* @returns the workspace reference and a release function, don't forget to call release function when you don't
* need the workspace anymore.
*/
use(metadata: WorkspaceMetadata): {
workspace: Workspace;
release: () => void;
} {
const exist = this.pool.get(metadata.id);
if (exist) {
return exist;
}
const workspace = this.open(metadata);
const ref = this.pool.put(workspace);
return ref;
}
createWorkspace(
flavour: WorkspaceFlavour,
initial: (
workspace: BlockSuiteWorkspace,
blobStorage: BlobStorage
) => Promise<void>
): Promise<string> {
logger.info(`create workspace [${flavour}]`);
return this.list.create(flavour, initial);
}
/**
* delete workspace by metadata, same as `WorkspaceList.deleteWorkspace`
*/
async deleteWorkspace(metadata: WorkspaceMetadata) {
await this.list.delete(metadata);
}
/**
* helper function to transform local workspace to cloud workspace
*/
async transformLocalToCloud(local: Workspace): Promise<WorkspaceMetadata> {
assertEquals(local.flavour, WorkspaceFlavour.LOCAL);
await local.engine.sync.waitForSynced();
const newId = await this.list.create(
WorkspaceFlavour.AFFINE_CLOUD,
async (ws, bs) => {
applyUpdate(ws.doc, encodeStateAsUpdate(local.blockSuiteWorkspace.doc));
for (const subdoc of local.blockSuiteWorkspace.doc.getSubdocs()) {
for (const newSubdoc of ws.doc.getSubdocs()) {
if (newSubdoc.guid === subdoc.guid) {
applyUpdate(newSubdoc, encodeStateAsUpdate(subdoc));
}
}
}
const blobList = await local.engine.blob.list();
for (const blobKey of blobList) {
const blob = await local.engine.blob.get(blobKey);
if (blob) {
await bs.set(blobKey, blob);
}
}
}
);
await this.list.delete(local.meta);
return {
id: newId,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
};
}
/**
* helper function to get blob without open workspace, its be used for download workspace avatars.
*/
getWorkspaceBlob(metadata: WorkspaceMetadata, blobKey: string) {
const factory = this.factories.find(x => x.name === metadata.flavour);
if (!factory) {
throw new Error(`Unknown workspace flavour: ${metadata.flavour}`);
}
return factory.getWorkspaceBlob(metadata.id, blobKey);
}
/**
* a hack for directly add local workspace to workspace list
* Used after copying sqlite database file to appdata folder
*/
_addLocalWorkspace(id: string) {
const allWorkspaceIDs: string[] = JSON.parse(
localStorage.getItem(LOCAL_WORKSPACE_LOCAL_STORAGE_KEY) ?? '[]'
);
allWorkspaceIDs.push(id);
localStorage.setItem(
LOCAL_WORKSPACE_LOCAL_STORAGE_KEY,
JSON.stringify(allWorkspaceIDs)
);
}
private open(metadata: WorkspaceMetadata) {
logger.info(`open workspace [${metadata.flavour}] ${metadata.id} `);
const factory = this.factories.find(x => x.name === metadata.flavour);
if (!factory) {
throw new Error(`Unknown workspace flavour: ${metadata.flavour}`);
}
const workspace = factory.openWorkspace(metadata);
// sync information with workspace list, when workspace's avatar and name changed, information will be updated
this.list.getInformation(metadata).syncWithWorkspace(workspace);
// apply compatibility fix
fixWorkspaceVersion(workspace.blockSuiteWorkspace.doc);
return workspace;
}
}

View File

@@ -1,142 +0,0 @@
import type { BlockSuiteFeatureFlags } from '@affine/env/global';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { createAffinePublicProviders } from '@affine/workspace/providers';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import type { DocProviderCreator } from '@blocksuite/store';
import { Schema, Workspace } from '@blocksuite/store';
import { INTERNAL_BLOCKSUITE_HASH_MAP } from '@toeverything/infra/__internal__/workspace';
import { nanoid } from 'nanoid';
import type { Doc } from 'yjs';
import type { Transaction } from 'yjs';
import type { BlobEngine } from '../blob';
import {
createAffineCloudBlobEngine,
createAffinePublicBlobEngine,
createLocalBlobEngine,
} from '../blob';
import { createAffineProviders, createLocalProviders } from '../providers';
function setEditorFlags(workspace: Workspace) {
Object.entries(runtimeConfig.editorFlags).forEach(([key, value]) => {
workspace.awarenessStore.setFlag(
key as keyof BlockSuiteFeatureFlags,
value
);
});
}
type UpdateCallback = (
update: Uint8Array,
origin: string | number | null,
doc: Doc,
transaction: Transaction
) => void;
type SubdocEvent = {
loaded: Set<Doc>;
removed: Set<Doc>;
added: Set<Doc>;
};
const docUpdateCallbackWeakMap = new WeakMap<Doc, UpdateCallback>();
export const globalBlockSuiteSchema = new Schema();
globalBlockSuiteSchema.register(AffineSchemas).register(__unstableSchemas);
const createMonitor = (doc: Doc) => {
const onUpdate: UpdateCallback = (_, origin) => {
if (process.env.NODE_ENV === 'development') {
if (typeof origin !== 'string' && typeof origin !== 'number') {
console.warn(
'origin is not a string or number, this will cause problems in the future',
origin
);
}
} else {
// todo: add monitor in the future
}
};
docUpdateCallbackWeakMap.set(doc, onUpdate);
doc.on('update', onUpdate);
const onSubdocs = (event: SubdocEvent) => {
event.added.forEach(subdoc => {
if (!docUpdateCallbackWeakMap.has(subdoc)) {
createMonitor(subdoc);
}
});
event.removed.forEach(subdoc => {
if (docUpdateCallbackWeakMap.has(subdoc)) {
docUpdateCallbackWeakMap.delete(subdoc);
}
});
};
doc.on('subdocs', onSubdocs);
doc.on('destroy', () => {
docUpdateCallbackWeakMap.delete(doc);
doc.off('update', onSubdocs);
});
};
const workspaceBlobEngineWeakMap = new WeakMap<Workspace, BlobEngine>();
export function getBlobEngine(workspace: Workspace) {
// temporary solution to get blob engine from workspace
return workspaceBlobEngineWeakMap.get(workspace);
}
// if not exist, create a new workspace
export function getOrCreateWorkspace(
id: string,
flavour: WorkspaceFlavour
): Workspace {
const providerCreators: DocProviderCreator[] = [];
if (INTERNAL_BLOCKSUITE_HASH_MAP.has(id)) {
return INTERNAL_BLOCKSUITE_HASH_MAP.get(id) as Workspace;
}
let blobEngine: BlobEngine;
if (flavour === WorkspaceFlavour.AFFINE_CLOUD) {
blobEngine = createAffineCloudBlobEngine(id);
providerCreators.push(...createAffineProviders());
} else if (flavour === WorkspaceFlavour.LOCAL) {
blobEngine = createLocalBlobEngine(id);
providerCreators.push(...createLocalProviders());
} else if (flavour === WorkspaceFlavour.AFFINE_PUBLIC) {
blobEngine = createAffinePublicBlobEngine(id);
providerCreators.push(...createAffinePublicProviders());
} else {
throw new Error('unsupported flavour');
}
const workspace = new Workspace({
id,
providerCreators: typeof window === 'undefined' ? [] : providerCreators,
blobStorages: [
() => ({
crud: {
async get(key) {
return (await blobEngine.get(key)) ?? null;
},
async set(key, value) {
await blobEngine.set(key, value);
return key;
},
async delete(key) {
return blobEngine.delete(key);
},
async list() {
return blobEngine.list();
},
},
}),
],
idGenerator: () => nanoid(),
schema: globalBlockSuiteSchema,
});
workspaceBlobEngineWeakMap.set(workspace, blobEngine);
createMonitor(workspace.doc);
setEditorFlags(workspace);
INTERNAL_BLOCKSUITE_HASH_MAP.set(id, workspace);
return workspace;
}

View File

@@ -0,0 +1,3 @@
import type { WorkspaceFlavour } from '@affine/env/workspace';
export type WorkspaceMetadata = { id: string; flavour: WorkspaceFlavour };

View File

@@ -0,0 +1,86 @@
import { DebugLogger } from '@affine/debug';
import { Unreachable } from '@affine/env/constant';
import type { Workspace } from './workspace';
const logger = new DebugLogger('affine:workspace-manager:pool');
/**
* Collection of opened workspaces. use reference counting to manage active workspaces.
*/
export class WorkspacePool {
openedWorkspaces = new Map<string, { workspace: Workspace; rc: number }>();
timeoutToGc: NodeJS.Timeout | null = null;
get(workspaceId: string): {
workspace: Workspace;
release: () => void;
} | null {
const exist = this.openedWorkspaces.get(workspaceId);
if (exist) {
exist.rc++;
let released = false;
return {
workspace: exist.workspace,
release: () => {
// avoid double release
if (released) {
return;
}
released = true;
exist.rc--;
this.requestGc();
},
};
}
return null;
}
put(workspace: Workspace) {
const ref = { workspace, rc: 0 };
this.openedWorkspaces.set(workspace.meta.id, ref);
const r = this.get(workspace.meta.id);
if (!r) {
throw new Unreachable();
}
return r;
}
private requestGc() {
if (this.timeoutToGc) {
clearInterval(this.timeoutToGc);
}
// do gc every 1s
this.timeoutToGc = setInterval(() => {
this.gc();
}, 1000);
}
private gc() {
for (const [id, { workspace, rc }] of new Map(
this.openedWorkspaces /* clone the map, because the origin will be modified during iteration */
)) {
if (rc === 0 && workspace.canGracefulStop()) {
// we can safely close the workspace
logger.info(`close workspace [${workspace.flavour}] ${workspace.id}`);
workspace.forceStop();
this.openedWorkspaces.delete(id);
}
}
for (const [_, { rc }] of this.openedWorkspaces) {
if (rc === 0) {
return;
}
}
// if all workspaces has referrer, stop gc
if (this.timeoutToGc) {
clearInterval(this.timeoutToGc);
}
}
}

View File

@@ -1,142 +0,0 @@
/**
* The `Provider` is responsible for sync `Y.Doc` with the local database and the Affine Cloud, serving as the source of
* Affine's local-first collaborative magic.
*
* When Affine boot, the `Provider` is tasked with reading content from the local database and loading it into the
* workspace, continuously storing any changes made by the user into the local database.
*
* When using Affine Cloud, the `Provider` also handles sync content with the Cloud.
*
* Additionally, the `Provider` is responsible for implementing a local-first capability, allowing users to edit offline
* with changes stored in the local database and sync with the Cloud when the network is restored.
*/
import type { DocProviderCreator } from '@blocksuite/store';
import {
createAffineAwarenessProvider,
createBroadcastChannelAwarenessProvider,
} from './awareness';
import { createAffineStorage } from './storage/affine';
import { createIndexedDBStorage } from './storage/indexeddb';
import { createSQLiteStorage } from './storage/sqlite';
import { SyncEngine } from './sync';
export * from './sync';
export const createLocalProviders = (): DocProviderCreator[] => {
return [
(_, doc, { awareness }) => {
const engine = new SyncEngine(
doc,
environment.isDesktop
? createSQLiteStorage(doc.guid)
: createIndexedDBStorage(doc.guid),
[]
);
const awarenessProviders = [
createBroadcastChannelAwarenessProvider(doc.guid, awareness),
];
let connected = false;
return {
flavour: '_',
passive: true,
active: true,
sync() {
if (!connected) {
engine.start();
for (const provider of awarenessProviders) {
provider.connect();
}
connected = true;
}
},
get whenReady() {
return engine.waitForLoadedRootDoc();
},
connect() {
if (!connected) {
engine.start();
for (const provider of awarenessProviders) {
provider.connect();
}
connected = true;
}
},
disconnect() {
// TODO: actually disconnect
},
get connected() {
return connected;
},
engine,
};
},
];
};
export const createAffineProviders = (): DocProviderCreator[] => {
return [
(_, doc, { awareness }) => {
const engine = new SyncEngine(
doc,
environment.isDesktop
? createSQLiteStorage(doc.guid)
: createIndexedDBStorage(doc.guid),
[createAffineStorage(doc.guid)]
);
const awarenessProviders = [
createBroadcastChannelAwarenessProvider(doc.guid, awareness),
createAffineAwarenessProvider(doc.guid, awareness),
];
let connected = false;
return {
flavour: '_',
passive: true,
active: true,
sync() {
if (!connected) {
engine.start();
for (const provider of awarenessProviders) {
provider.connect();
}
connected = true;
}
},
get whenReady() {
return engine.waitForLoadedRootDoc();
},
connect() {
if (!connected) {
engine.start();
for (const provider of awarenessProviders) {
provider.connect();
}
connected = true;
}
},
disconnect() {
// TODO: actually disconnect
},
get connected() {
return connected;
},
engine,
};
},
];
};
export const createAffinePublicProviders = (): DocProviderCreator[] => {
return [];
};

View File

@@ -1,7 +0,0 @@
export const MANUALLY_STOP = 'manually-stop';
export enum SyncEngineStep {
Stopped = 0,
Syncing = 1,
Synced = 2,
}

View File

@@ -0,0 +1,146 @@
import { Unreachable } from '@affine/env/constant';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { Slot } from '@blocksuite/global/utils';
import {
checkWorkspaceCompatibility,
MigrationPoint,
} from '@toeverything/infra/blocksuite';
import {
forceUpgradePages,
upgradeV1ToV2,
} from '@toeverything/infra/blocksuite';
import { migrateGuidCompatibility } from '@toeverything/infra/blocksuite';
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import type { WorkspaceManager } from '..';
import type { Workspace } from '../workspace';
export interface WorkspaceUpgradeStatus {
needUpgrade: boolean;
upgrading: boolean;
}
export class WorkspaceUpgradeController {
_status: Readonly<WorkspaceUpgradeStatus> = {
needUpgrade: false,
upgrading: false,
};
readonly onStatusChange = new Slot<WorkspaceUpgradeStatus>();
get status() {
return this._status;
}
set status(value) {
if (
value.needUpgrade !== this._status.needUpgrade ||
value.upgrading !== this._status.upgrading
) {
this._status = value;
this.onStatusChange.emit(value);
}
}
constructor(private readonly workspace: Workspace) {
workspace.blockSuiteWorkspace.doc.on('update', () => {
this.checkIfNeedUpgrade();
});
}
checkIfNeedUpgrade() {
const needUpgrade = !!checkWorkspaceCompatibility(
this.workspace.blockSuiteWorkspace
);
this.status = {
...this.status,
needUpgrade,
};
return needUpgrade;
}
async upgrade(workspaceManager: WorkspaceManager): Promise<string | null> {
if (this.status.upgrading) {
return null;
}
this.status = { ...this.status, upgrading: true };
try {
await this.workspace.engine.sync.waitForSynced();
const step = checkWorkspaceCompatibility(
this.workspace.blockSuiteWorkspace
);
if (!step) {
return null;
}
// Clone a new doc to prevent change events.
const clonedDoc = new YDoc({
guid: this.workspace.blockSuiteWorkspace.doc.guid,
});
applyDoc(clonedDoc, this.workspace.blockSuiteWorkspace.doc);
if (step === MigrationPoint.SubDoc) {
const newWorkspace = await workspaceManager.createWorkspace(
WorkspaceFlavour.LOCAL,
async (workspace, blobStorage) => {
await upgradeV1ToV2(clonedDoc, workspace.doc);
migrateGuidCompatibility(clonedDoc);
await forceUpgradePages(
workspace.doc,
this.workspace.blockSuiteWorkspace.schema
);
const blobList =
await this.workspace.blockSuiteWorkspace.blob.list();
for (const blobKey of blobList) {
const blob =
await this.workspace.blockSuiteWorkspace.blob.get(blobKey);
if (blob) {
await blobStorage.set(blobKey, blob);
}
}
}
);
await workspaceManager.deleteWorkspace(this.workspace.meta);
return newWorkspace;
} else if (step === MigrationPoint.GuidFix) {
migrateGuidCompatibility(clonedDoc);
await forceUpgradePages(
clonedDoc,
this.workspace.blockSuiteWorkspace.schema
);
applyDoc(this.workspace.blockSuiteWorkspace.doc, clonedDoc);
await this.workspace.engine.sync.waitForSynced();
return null;
} else if (step === MigrationPoint.BlockVersion) {
await forceUpgradePages(
clonedDoc,
this.workspace.blockSuiteWorkspace.schema
);
applyDoc(this.workspace.blockSuiteWorkspace.doc, clonedDoc);
await this.workspace.engine.sync.waitForSynced();
return null;
} else {
throw new Unreachable();
}
} finally {
this.status = { ...this.status, upgrading: false };
}
}
}
function applyDoc(target: YDoc, result: YDoc) {
applyUpdate(target, encodeStateAsUpdate(result));
for (const targetSubDoc of target.subdocs.values()) {
const resultSubDocs = Array.from(result.subdocs.values());
const resultSubDoc = resultSubDocs.find(
item => item.guid === targetSubDoc.guid
);
if (resultSubDoc) {
applyDoc(targetSubDoc, resultSubDoc);
}
}
}

View File

@@ -2,7 +2,7 @@ import { Buffer } from 'node:buffer';
import { describe, expect, test } from 'vitest';
import { isSvgBuffer } from '../util';
import { isSvgBuffer } from '../buffer-to-blob';
describe('isSvgBuffer', () => {
test('basic', async () => {

View File

@@ -0,0 +1,17 @@
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';
export function mergeUpdates(updates: Uint8Array[]) {
if (updates.length === 0) {
return new Uint8Array();
}
if (updates.length === 1) {
return updates[0];
}
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
});
return encodeStateAsUpdate(doc);
}

View File

@@ -5,3 +5,5 @@ export function throwIfAborted(abort?: AbortSignal) {
}
return true;
}
export const MANUALLY_STOP = 'manually-stop';

View File

@@ -0,0 +1,137 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import type { WorkspaceEngine, WorkspaceEngineStatus } from './engine';
import type { WorkspaceMetadata } from './metadata';
import {
WorkspaceUpgradeController,
type WorkspaceUpgradeStatus,
} from './upgrade';
const logger = new DebugLogger('affine:workspace');
export type WorkspaceStatus = {
mode: 'ready' | 'closed';
engine: WorkspaceEngineStatus;
upgrade: WorkspaceUpgradeStatus;
};
/**
* # Workspace
*
* ```
* ┌───────────┐
* │ Workspace │
* └─────┬─────┘
* │
* │
* ┌──────────────┼─────────────┐
* │ │ │
* ┌───┴─────┐ ┌──────┴─────┐ ┌───┴────┐
* │ Upgrade │ │ blocksuite │ │ Engine │
* └─────────┘ └────────────┘ └───┬────┘
* │
* ┌──────┼─────────┐
* │ │ │
* ┌──┴─┐ ┌──┴─┐ ┌─────┴───┐
* │sync│ │blob│ │awareness│
* └────┘ └────┘ └─────────┘
* ```
*
* This class contains all the components needed to run a workspace.
*/
export class Workspace {
get id() {
return this.meta.id;
}
get flavour() {
return this.meta.flavour;
}
private _status: WorkspaceStatus;
upgrade: WorkspaceUpgradeController;
/**
* event on workspace stop, workspace is one-time use, so it will be triggered only once
*/
onStop = new Slot();
onStatusChange = new Slot<WorkspaceStatus>();
get status() {
return this._status;
}
set status(status: WorkspaceStatus) {
this._status = status;
this.onStatusChange.emit(status);
}
constructor(
public meta: WorkspaceMetadata,
public engine: WorkspaceEngine,
public blockSuiteWorkspace: BlockSuiteWorkspace
) {
this.upgrade = new WorkspaceUpgradeController(this);
this._status = {
mode: 'closed',
engine: engine.status,
upgrade: this.upgrade.status,
};
this.engine.onStatusChange.on(status => {
this.status = {
...this.status,
engine: status,
};
});
this.upgrade.onStatusChange.on(status => {
this.status = {
...this.status,
upgrade: status,
};
});
this.start();
}
/**
* workspace start when create and workspace is one-time use
*/
private start() {
if (this.status.mode === 'ready') {
return;
}
logger.info('start workspace', this.id);
this.engine.start();
this.status = {
...this.status,
mode: 'ready',
engine: this.engine.status,
};
}
canGracefulStop() {
return this.engine.canGracefulStop() && !this.status.upgrade.upgrading;
}
forceStop() {
if (this.status.mode === 'closed') {
return;
}
logger.info('stop workspace', this.id);
this.engine.forceStop();
this.status = {
...this.status,
mode: 'closed',
engine: this.engine.status,
};
this.onStop.emit();
}
// same as `WorkspaceEngine.sync.setPriorityRule`
setPriorityRule(target: ((id: string) => boolean) | null) {
this.engine.sync.setPriorityRule(target);
}
}

View File

@@ -7,12 +7,9 @@
},
"references": [
{ "path": "../../../tests/fixtures" },
{ "path": "../../common/y-indexeddb" },
{ "path": "../../common/y-provider" },
{ "path": "../../common/env" },
{ "path": "../../common/debug" },
{ "path": "../../common/infra" },
{ "path": "../../frontend/hooks" },
{ "path": "../../frontend/graphql" }
]
}