diff --git a/packages/frontend/core/src/layouts/workspace-layout.tsx b/packages/frontend/core/src/layouts/workspace-layout.tsx index 7b4406314a..12bbf066a6 100644 --- a/packages/frontend/core/src/layouts/workspace-layout.tsx +++ b/packages/frontend/core/src/layouts/workspace-layout.tsx @@ -14,6 +14,7 @@ import { } from '@affine/component/workspace'; import { useAFFiNEI18N } from '@affine/i18n/hooks'; import { rootWorkspacesMetadataAtom } from '@affine/workspace/atom'; +import { getBlobEngine } from '@affine/workspace/manager'; import { assertExists } from '@blocksuite/global/utils'; import type { DragEndEvent } from '@dnd-kit/core'; import { @@ -116,10 +117,44 @@ type WorkspaceLayoutProps = { migration?: MigrationPoint; }; +const useSyncWorkspaceBlob = () => { + // temporary solution for sync blob + + const [currentWorkspace] = useCurrentWorkspace(); + + useEffect(() => { + const blobEngine = getBlobEngine(currentWorkspace.blockSuiteWorkspace); + let stopped = false; + function sync() { + if (stopped) { + return; + } + + blobEngine + ?.sync() + .catch(error => { + console.error('sync blob error', error); + }) + .finally(() => { + // sync every 1 minute + setTimeout(sync, 60000); + }); + } + + // after currentWorkspace changed, wait 1 second to start sync + setTimeout(sync, 1000); + + return () => { + stopped = true; + }; + }, [currentWorkspace]); +}; + export const WorkspaceLayout = function WorkspacesSuspense({ children, migration, }: PropsWithChildren) { + useSyncWorkspaceBlob(); return ( diff --git a/packages/frontend/workspace/package.json b/packages/frontend/workspace/package.json index ec63f14b88..5b745d3b4b 100644 --- a/packages/frontend/workspace/package.json +++ b/packages/frontend/workspace/package.json @@ -22,6 +22,7 @@ "@toeverything/y-indexeddb": "workspace:*", "async-call-rpc": "^6.3.1", "idb": "^7.1.1", + "idb-keyval": "^6.2.1", "is-svg": "^5.0.0", "jotai": "^2.5.1", "js-base64": "^3.7.5", diff --git a/packages/frontend/workspace/src/blob/cloud-blob-storage.ts b/packages/frontend/workspace/src/blob/cloud-blob-storage.ts deleted file mode 100644 index 99eeb49067..0000000000 --- a/packages/frontend/workspace/src/blob/cloud-blob-storage.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { - checkBlobSizesQuery, - deleteBlobMutation, - fetchWithTraceReport, - listBlobsQuery, - setBlobMutation, -} from '@affine/graphql'; -import { fetcher } from '@affine/workspace/affine/gql'; -import type { BlobStorage } from '@blocksuite/store'; - -import { predefinedStaticFiles } from './local-static-storage'; -import { bufferToBlob } from './util'; - -export const createCloudBlobStorage = (workspaceId: string): BlobStorage => { - return { - crud: { - get: async key => { - const suffix = key.startsWith('/') - ? key - : predefinedStaticFiles.includes(key) - ? `/static/${key}` - : `/api/workspaces/${workspaceId}/blobs/${key}`; - - return fetchWithTraceReport( - runtimeConfig.serverUrlPrefix + suffix - ).then(async res => { - if (!res.ok) { - // status not in the range 200-299 - return null; - } - return bufferToBlob(await res.arrayBuffer()); - }); - }, - set: async (key, value) => { - const { - checkBlobSize: { size }, - } = await fetcher({ - query: checkBlobSizesQuery, - variables: { - workspaceId, - size: value.size, - }, - }); - - if (size <= 0) { - throw new Error('Blob size limit exceeded'); - } - - const result = await fetcher({ - query: setBlobMutation, - variables: { - workspaceId, - blob: new File([value], key), - }, - }); - console.assert(result.setBlob === key, 'Blob hash mismatch'); - return key; - }, - list: async () => { - const result = await fetcher({ - query: listBlobsQuery, - variables: { - workspaceId, - }, - }); - return result.listBlobs; - }, - delete: async (key: string) => { - await fetcher({ - query: deleteBlobMutation, - variables: { - workspaceId, - hash: key, - }, - }); - }, - }, - }; -}; diff --git a/packages/frontend/workspace/src/blob/engine.ts b/packages/frontend/workspace/src/blob/engine.ts new file mode 100644 index 0000000000..16f993bdaa --- /dev/null +++ b/packages/frontend/workspace/src/blob/engine.ts @@ -0,0 +1,139 @@ +import { DebugLogger } from '@affine/debug'; +import { difference } from 'lodash-es'; + +const logger = new DebugLogger('affine:blob-engine'); + +export class BlobEngine { + constructor( + private local: BlobStorage, + private remotes: BlobStorage[] + ) {} + + get storages() { + return [this.local, ...this.remotes]; + } + + async sync() { + if (this.local.readonly) { + return; + } + logger.debug('start syncing blob...'); + for (const remote of this.remotes) { + let localList; + let remoteList; + try { + localList = await this.local.list(); + remoteList = await remote.list(); + } catch (err) { + logger.error(`error when sync`, err); + continue; + } + + if (!remote.readonly) { + const needUpload = difference(localList, remoteList); + for (const key of needUpload) { + try { + const data = await this.local.get(key); + if (data) { + await remote.set(key, data); + } + } catch (err) { + logger.error( + `error when sync ${key} from [${this.local.name}] to [${remote.name}]`, + err + ); + } + } + } + + const needDownload = difference(remoteList, localList); + + for (const key of needDownload) { + try { + const data = await remote.get(key); + if (data) { + await this.local.set(key, data); + } + } catch (err) { + logger.error( + `error when sync ${key} from [${remote.name}] to [${this.local.name}]`, + err + ); + } + } + } + + logger.debug('finish syncing blob'); + } + + async get(key: string) { + logger.debug('get blob', key); + for (const storage of this.storages) { + const data = await storage.get(key); + if (data) { + return data; + } + } + return undefined; + } + + async set(key: string, value: Blob) { + if (this.local.readonly) { + throw new Error('local peer is readonly'); + } + + // await upload to the local peer + await this.local.set(key, value); + + // uploads to other peers in the background + Promise.allSettled( + this.remotes + .filter(r => !r.readonly) + .map(peer => + peer.set(key, value).catch(err => { + logger.error('error when upload to peer', err); + }) + ) + ) + .then(result => { + if (result.some(({ status }) => status === 'rejected')) { + logger.error( + `blob ${key} update finish, but some peers failed to update` + ); + } else { + logger.debug(`blob ${key} update finish`); + } + }) + .catch(() => { + // Promise.allSettled never reject + }); + } + + async delete(_key: string) { + // not supported + } + + async list() { + const blobList = new Set(); + + for (const peer of this.storages) { + const list = await peer.list(); + if (list) { + for (const blob of list) { + blobList.add(blob); + } + } + } + + return Array.from(blobList); + } +} + +export interface BlobStorage { + name: string; + readonly: boolean; + get: (key: string) => Promise; + set: (key: string, value: Blob) => Promise; + delete: (key: string) => Promise; + list: () => Promise; +} diff --git a/packages/frontend/workspace/src/blob/index.ts b/packages/frontend/workspace/src/blob/index.ts new file mode 100644 index 0000000000..e6fc140b24 --- /dev/null +++ b/packages/frontend/workspace/src/blob/index.ts @@ -0,0 +1,37 @@ +import { BlobEngine } from './engine'; +import { + createAffineCloudBlobStorage, + createIndexeddbBlobStorage, + createSQLiteBlobStorage, + createStaticBlobStorage, +} from './storage'; + +export * from './engine'; +export * from './storage'; + +export function createLocalBlobStorage(workspaceId: string) { + if (environment.isDesktop) { + return createSQLiteBlobStorage(workspaceId); + } else { + return createIndexeddbBlobStorage(workspaceId); + } +} + +export function createLocalBlobEngine(workspaceId: string) { + return new BlobEngine(createLocalBlobStorage(workspaceId), [ + createStaticBlobStorage(), + ]); +} + +export function createAffineCloudBlobEngine(workspaceId: string) { + return new BlobEngine(createLocalBlobStorage(workspaceId), [ + createStaticBlobStorage(), + createAffineCloudBlobStorage(workspaceId), + ]); +} + +export function createAffinePublicBlobEngine(workspaceId: string) { + return new BlobEngine(createAffineCloudBlobStorage(workspaceId), [ + createStaticBlobStorage(), + ]); +} diff --git a/packages/frontend/workspace/src/blob/sqlite-blob-storage.ts b/packages/frontend/workspace/src/blob/sqlite-blob-storage.ts deleted file mode 100644 index c095eeee33..0000000000 --- a/packages/frontend/workspace/src/blob/sqlite-blob-storage.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { assertExists } from '@blocksuite/global/utils'; -import type { BlobStorage } from '@blocksuite/store'; - -import { bufferToBlob } from './util'; - -export const createSQLiteStorage = (workspaceId: string): BlobStorage => { - const apis = window.apis; - assertExists(apis); - return { - crud: { - get: async (key: string) => { - const buffer = await apis.db.getBlob(workspaceId, key); - if (buffer) { - return bufferToBlob(buffer); - } - return null; - }, - set: async (key: string, value: Blob) => { - await apis.db.addBlob( - workspaceId, - key, - new Uint8Array(await value.arrayBuffer()) - ); - return key; - }, - delete: async (key: string) => { - return apis.db.deleteBlob(workspaceId, key); - }, - list: async () => { - return apis.db.getBlobKeys(workspaceId); - }, - }, - }; -}; diff --git a/packages/frontend/workspace/src/blob/storage/affine-cloud.ts b/packages/frontend/workspace/src/blob/storage/affine-cloud.ts new file mode 100644 index 0000000000..91ffa92a4f --- /dev/null +++ b/packages/frontend/workspace/src/blob/storage/affine-cloud.ts @@ -0,0 +1,76 @@ +import { + checkBlobSizesQuery, + deleteBlobMutation, + fetchWithTraceReport, + listBlobsQuery, + setBlobMutation, +} from '@affine/graphql'; +import { fetcher } from '@affine/workspace/affine/gql'; + +import type { BlobStorage } from '../engine'; + +export const createAffineCloudBlobStorage = ( + workspaceId: string +): BlobStorage => { + return { + name: 'affine-cloud', + readonly: false, + get: async key => { + const suffix = key.startsWith('/') + ? key + : `/api/workspaces/${workspaceId}/blobs/${key}`; + + return fetchWithTraceReport(runtimeConfig.serverUrlPrefix + suffix).then( + async res => { + if (!res.ok) { + // status not in the range 200-299 + return undefined; + } + return await res.blob(); + } + ); + }, + set: async (key, value) => { + const { + checkBlobSize: { size }, + } = await fetcher({ + query: checkBlobSizesQuery, + variables: { + workspaceId, + size: value.size, + }, + }); + + if (size <= 0) { + throw new Error('Blob size limit exceeded'); + } + + const result = await fetcher({ + query: setBlobMutation, + variables: { + workspaceId, + blob: new File([value], key), + }, + }); + console.assert(result.setBlob === key, 'Blob hash mismatch'); + }, + list: async () => { + const result = await fetcher({ + query: listBlobsQuery, + variables: { + workspaceId, + }, + }); + return result.listBlobs; + }, + delete: async (key: string) => { + await fetcher({ + query: deleteBlobMutation, + variables: { + workspaceId, + hash: key, + }, + }); + }, + }; +}; diff --git a/packages/frontend/workspace/src/blob/storage/index.ts b/packages/frontend/workspace/src/blob/storage/index.ts new file mode 100644 index 0000000000..a8d9bd6e86 --- /dev/null +++ b/packages/frontend/workspace/src/blob/storage/index.ts @@ -0,0 +1,4 @@ +export * from './affine-cloud'; +export * from './indexeddb'; +export * from './sqlite'; +export * from './static'; diff --git a/packages/frontend/workspace/src/blob/storage/indexeddb.ts b/packages/frontend/workspace/src/blob/storage/indexeddb.ts new file mode 100644 index 0000000000..66973bd28a --- /dev/null +++ b/packages/frontend/workspace/src/blob/storage/indexeddb.ts @@ -0,0 +1,32 @@ +import { createStore, del, get, keys, set } from 'idb-keyval'; + +import type { BlobStorage } from '../engine'; + +export const createIndexeddbBlobStorage = ( + workspaceId: string +): BlobStorage => { + const db = createStore(`${workspaceId}_blob`, 'blob'); + const mimeTypeDb = createStore(`${workspaceId}_blob_mime`, 'blob_mime'); + return { + name: 'indexeddb', + readonly: false, + get: async (key: string) => { + const res = await get(key, db); + if (res) { + return new Blob([res], { type: await get(key, mimeTypeDb) }); + } + return undefined; + }, + set: async (key: string, value: Blob) => { + await set(key, await value.arrayBuffer(), db); + await set(key, value.type, mimeTypeDb); + }, + delete: async (key: string) => { + await del(key, db); + await del(key, mimeTypeDb); + }, + list: async () => { + return keys(db); + }, + }; +}; diff --git a/packages/frontend/workspace/src/blob/storage/sqlite.ts b/packages/frontend/workspace/src/blob/storage/sqlite.ts new file mode 100644 index 0000000000..3f3a0c080c --- /dev/null +++ b/packages/frontend/workspace/src/blob/storage/sqlite.ts @@ -0,0 +1,33 @@ +import { assertExists } from '@blocksuite/global/utils'; + +import type { BlobStorage } from '../engine'; +import { bufferToBlob } from '../util'; + +export const createSQLiteBlobStorage = (workspaceId: string): BlobStorage => { + const apis = window.apis; + assertExists(apis); + return { + name: 'sqlite', + readonly: false, + get: async (key: string) => { + const buffer = await apis.db.getBlob(workspaceId, key); + if (buffer) { + return bufferToBlob(buffer); + } + return undefined; + }, + set: async (key: string, value: Blob) => { + await apis.db.addBlob( + workspaceId, + key, + new Uint8Array(await value.arrayBuffer()) + ); + }, + delete: async (key: string) => { + return apis.db.deleteBlob(workspaceId, key); + }, + list: async () => { + return apis.db.getBlobKeys(workspaceId); + }, + }; +}; diff --git a/packages/frontend/workspace/src/blob/local-static-storage.ts b/packages/frontend/workspace/src/blob/storage/static.ts similarity index 67% rename from packages/frontend/workspace/src/blob/local-static-storage.ts rename to packages/frontend/workspace/src/blob/storage/static.ts index 93c3bf79c8..56578543c3 100644 --- a/packages/frontend/workspace/src/blob/local-static-storage.ts +++ b/packages/frontend/workspace/src/blob/storage/static.ts @@ -1,6 +1,4 @@ -import type { BlobStorage } from '@blocksuite/store'; - -import { bufferToBlob } from './util'; +import type { BlobStorage } from '../engine'; export const predefinedStaticFiles = [ '029uztLz2CzJezK7UUhrbGiWUdZ0J7NVs_qR6RDsvb8=', @@ -38,38 +36,36 @@ export const predefinedStaticFiles = [ 'v2yF7lY2L5rtorTtTmYFsoMb9dBPKs5M1y9cUKxcI1M=', ]; -export const createStaticStorage = (): BlobStorage => { +export const createStaticBlobStorage = (): BlobStorage => { return { - crud: { - get: async (key: string) => { - const isStaticResource = - predefinedStaticFiles.includes(key) || key.startsWith('/static/'); + name: 'static', + readonly: true, + get: async (key: string) => { + const isStaticResource = + predefinedStaticFiles.includes(key) || key.startsWith('/static/'); - if (!isStaticResource) { - return null; - } + if (!isStaticResource) { + return undefined; + } - const path = key.startsWith('/static/') ? key : `/static/${key}`; - const response = await fetch(path); + const path = key.startsWith('/static/') ? key : `/static/${key}`; + const response = await fetch(path); - if (response.ok) { - const buffer = await response.arrayBuffer(); - return bufferToBlob(buffer); - } + if (response.ok) { + return await response.blob(); + } - return null; - }, - set: async (key: string) => { - // ignore - return key; - }, - delete: async () => { - // ignore - }, - list: async () => { - // ignore - return []; - }, + return undefined; + }, + set: async () => { + // ignore + }, + delete: async () => { + // ignore + }, + list: async () => { + // ignore + return []; }, }; }; diff --git a/packages/frontend/workspace/src/manager/index.ts b/packages/frontend/workspace/src/manager/index.ts index 2bc94c2cff..0c0daec087 100644 --- a/packages/frontend/workspace/src/manager/index.ts +++ b/packages/frontend/workspace/src/manager/index.ts @@ -3,16 +3,19 @@ import type { BlockSuiteFeatureFlags } from '@affine/env/global'; import { WorkspaceFlavour } from '@affine/env/workspace'; import { createAffinePublicProviders } from '@affine/workspace/providers'; import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; -import type { DocProviderCreator, StoreOptions } from '@blocksuite/store'; -import { createIndexeddbStorage, Schema, Workspace } from '@blocksuite/store'; +import type { DocProviderCreator } from '@blocksuite/store'; +import { Schema, Workspace } from '@blocksuite/store'; import { INTERNAL_BLOCKSUITE_HASH_MAP } from '@toeverything/infra/__internal__/workspace'; import { nanoid } from 'nanoid'; import type { Doc } from 'yjs'; import type { Transaction } from 'yjs'; -import { createCloudBlobStorage } from '../blob/cloud-blob-storage'; -import { createStaticStorage } from '../blob/local-static-storage'; -import { createSQLiteStorage } from '../blob/sqlite-blob-storage'; +import type { BlobEngine } from '../blob'; +import { + createAffineCloudBlobEngine, + createAffinePublicBlobEngine, + createLocalBlobEngine, +} from '../blob'; import { createAffineProviders, createLocalProviders } from '../providers'; function setEditorFlags(workspace: Workspace) { @@ -81,6 +84,12 @@ const createMonitor = (doc: Doc) => { }); }; +const workspaceBlobEngineWeakMap = new WeakMap(); +export function getBlobEngine(workspace: Workspace) { + // temporary solution to get blob engine from workspace + return workspaceBlobEngineWeakMap.get(workspace); +} + // if not exist, create a new workspace export function getOrCreateWorkspace( id: string, @@ -91,48 +100,47 @@ export function getOrCreateWorkspace( return INTERNAL_BLOCKSUITE_HASH_MAP.get(id) as Workspace; } - const blobStorages: StoreOptions['blobStorages'] = []; + let blobEngine: BlobEngine; if (flavour === WorkspaceFlavour.AFFINE_CLOUD) { - if (isBrowser) { - blobStorages.push(createIndexeddbStorage); - blobStorages.push(createCloudBlobStorage); - if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) { - blobStorages.push(createSQLiteStorage); - } - providerCreators.push(...createAffineProviders()); - - // todo(JimmFly): add support for cloud storage - } + blobEngine = createAffineCloudBlobEngine(id); + providerCreators.push(...createAffineProviders()); } else if (flavour === WorkspaceFlavour.LOCAL) { - if (isBrowser) { - blobStorages.push(createIndexeddbStorage); - if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) { - blobStorages.push(createSQLiteStorage); - } - } + blobEngine = createLocalBlobEngine(id); providerCreators.push(...createLocalProviders()); } else if (flavour === WorkspaceFlavour.AFFINE_PUBLIC) { - if (isBrowser) { - blobStorages.push(createIndexeddbStorage); - if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) { - blobStorages.push(createSQLiteStorage); - } - } - blobStorages.push(createCloudBlobStorage); + blobEngine = createAffinePublicBlobEngine(id); providerCreators.push(...createAffinePublicProviders()); } else { throw new Error('unsupported flavour'); } - blobStorages.push(createStaticStorage); const workspace = new Workspace({ id, isSSR: !isBrowser, providerCreators: typeof window === 'undefined' ? [] : providerCreators, - blobStorages: blobStorages, + blobStorages: [ + () => ({ + crud: { + async get(key) { + return (await blobEngine.get(key)) ?? null; + }, + async set(key, value) { + await blobEngine.set(key, value); + return key; + }, + async delete(key) { + return blobEngine.delete(key); + }, + async list() { + return blobEngine.list(); + }, + }, + }), + ], idGenerator: () => nanoid(), schema: globalBlockSuiteSchema, }); + workspaceBlobEngineWeakMap.set(workspace, blobEngine); createMonitor(workspace.doc); setEditorFlags(workspace); INTERNAL_BLOCKSUITE_HASH_MAP.set(id, workspace); diff --git a/yarn.lock b/yarn.lock index 9b79d3a023..5b39ea07cc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -892,6 +892,7 @@ __metadata: async-call-rpc: "npm:^6.3.1" fake-indexeddb: "npm:^5.0.0" idb: "npm:^7.1.1" + idb-keyval: "npm:^6.2.1" is-svg: "npm:^5.0.0" jotai: "npm:^2.5.1" js-base64: "npm:^3.7.5"