From aa4c7407de7abdbfe535268213a06b4401923f6c Mon Sep 17 00:00:00 2001 From: EYHN Date: Fri, 17 Nov 2023 15:50:01 +0800 Subject: [PATCH] refactor: new provider (#4900) --- .../core/src/adapters/local/index.tsx | 11 - .../workspace-card/index.tsx | 75 ++-- .../workspace-card/styles.ts | 5 - .../workspace-upgrade/upgrade-hooks.ts | 57 +-- .../hooks/current/use-current-sync-engine.ts | 33 ++ .../core/src/hooks/use-datasource-sync.ts | 91 ---- .../core/src/layouts/workspace-layout.tsx | 28 -- .../core/src/pages/share/detail-page.tsx | 6 +- .../core/src/pages/workspace/detail-page.tsx | 132 ++++-- .../core/src/pages/workspace/index.tsx | 30 +- packages/frontend/workspace/package.json | 5 +- .../frontend/workspace/src/affine/crud.ts | 21 - .../frontend/workspace/src/affine/download.ts | 37 ++ .../frontend/workspace/src/affine/index.ts | 263 ------------ .../frontend/workspace/src/affine/sync.ts | 120 ------ packages/frontend/workspace/src/atom.ts | 1 - .../__tests__/indexeddb-provider.spec.ts | 90 ---- .../__tests__/socketio-provider.spec.ts | 103 ----- .../__tests__/sqlite-provider.spec.ts | 165 -------- .../src/providers/awareness/affine/index.ts | 101 +++++ .../awareness/broadcast-channel/index.ts | 63 +++ .../src/providers/awareness/index.ts | 7 + .../workspace/src/providers/cloud/index.ts | 110 ----- .../frontend/workspace/src/providers/index.ts | 258 +++++------- .../workspace/src/providers/logger.ts | 3 - .../src/providers/sqlite-providers.ts | 133 ------ .../storage}/affine/batch-sync-sender.ts | 0 .../src/providers/storage/affine/index.ts | 162 +++++++ .../workspace/src/providers/storage/index.ts | 29 ++ .../src/providers/storage/indexeddb/index.ts | 133 ++++++ .../src/providers/storage/sqlite/index.ts | 38 ++ .../src/providers/sync/__tests__/sync.spec.ts | 62 +++ .../workspace/src/providers/sync/engine.ts | 225 ++++++++++ .../workspace/src/providers/sync/index.ts | 18 + .../workspace/src/providers/sync/peer.ts | 397 ++++++++++++++++++ .../utils/__tests__/async-queue.spec.ts | 45 ++ .../utils/__tests__/throw-if-aborted.spec.ts | 13 + .../src/providers/utils/affine-io.ts | 15 + .../src/providers/utils/async-queue.ts | 58 +++ .../utils.ts => providers/utils/base64.ts} | 17 - .../src/providers/utils/throw-if-aborted.ts | 7 + tests/affine-desktop/e2e/basic.spec.ts | 35 +- tests/affine-local/e2e/duplicate-page.spec.ts | 2 +- .../e2e/local-first-collections-items.spec.ts | 6 +- tests/affine-local/e2e/quick-search.spec.ts | 39 +- tests/affine-local/e2e/settings.spec.ts | 4 +- tests/kit/utils/workspace.ts | 8 +- yarn.lock | 2 + 48 files changed, 1783 insertions(+), 1480 deletions(-) create mode 100644 packages/frontend/core/src/hooks/current/use-current-sync-engine.ts delete mode 100644 packages/frontend/core/src/hooks/use-datasource-sync.ts create mode 100644 packages/frontend/workspace/src/affine/download.ts delete mode 100644 packages/frontend/workspace/src/affine/index.ts delete mode 100644 packages/frontend/workspace/src/affine/sync.ts delete mode 100644 packages/frontend/workspace/src/providers/__tests__/indexeddb-provider.spec.ts delete mode 100644 packages/frontend/workspace/src/providers/__tests__/socketio-provider.spec.ts delete mode 100644 packages/frontend/workspace/src/providers/__tests__/sqlite-provider.spec.ts create mode 100644 packages/frontend/workspace/src/providers/awareness/affine/index.ts create mode 100644 packages/frontend/workspace/src/providers/awareness/broadcast-channel/index.ts create mode 100644 packages/frontend/workspace/src/providers/awareness/index.ts delete mode 100644 packages/frontend/workspace/src/providers/cloud/index.ts delete mode 100644 packages/frontend/workspace/src/providers/logger.ts delete mode 100644 packages/frontend/workspace/src/providers/sqlite-providers.ts rename packages/frontend/workspace/src/{ => providers/storage}/affine/batch-sync-sender.ts (100%) create mode 100644 packages/frontend/workspace/src/providers/storage/affine/index.ts create mode 100644 packages/frontend/workspace/src/providers/storage/index.ts create mode 100644 packages/frontend/workspace/src/providers/storage/indexeddb/index.ts create mode 100644 packages/frontend/workspace/src/providers/storage/sqlite/index.ts create mode 100644 packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts create mode 100644 packages/frontend/workspace/src/providers/sync/engine.ts create mode 100644 packages/frontend/workspace/src/providers/sync/index.ts create mode 100644 packages/frontend/workspace/src/providers/sync/peer.ts create mode 100644 packages/frontend/workspace/src/providers/utils/__tests__/async-queue.spec.ts create mode 100644 packages/frontend/workspace/src/providers/utils/__tests__/throw-if-aborted.spec.ts create mode 100644 packages/frontend/workspace/src/providers/utils/affine-io.ts create mode 100644 packages/frontend/workspace/src/providers/utils/async-queue.ts rename packages/frontend/workspace/src/{affine/utils.ts => providers/utils/base64.ts} (67%) create mode 100644 packages/frontend/workspace/src/providers/utils/throw-if-aborted.ts diff --git a/packages/frontend/core/src/adapters/local/index.tsx b/packages/frontend/core/src/adapters/local/index.tsx index a6a84d5b96..566b701f7a 100644 --- a/packages/frontend/core/src/adapters/local/index.tsx +++ b/packages/frontend/core/src/adapters/local/index.tsx @@ -3,7 +3,6 @@ import { DEFAULT_WORKSPACE_NAME, PageNotFoundError, } from '@affine/env/constant'; -import type { LocalIndexedDBDownloadProvider } from '@affine/env/workspace'; import type { WorkspaceAdapter } from '@affine/env/workspace'; import { LoadPriority, @@ -18,7 +17,6 @@ import { getOrCreateWorkspace, globalBlockSuiteSchema, } from '@affine/workspace/manager'; -import { createIndexedDBDownloadProvider } from '@affine/workspace/providers'; import { getBlockSuiteWorkspaceAtom } from '@toeverything/infra/__internal__/workspace'; import { getCurrentStore } from '@toeverything/infra/atom'; import { initEmptyPage } from '@toeverything/infra/blocksuite'; @@ -66,15 +64,6 @@ export const LocalAdapter: WorkspaceAdapter = { logger.error('init page with empty failed', error); }); } - const provider = createIndexedDBDownloadProvider( - blockSuiteWorkspace.id, - blockSuiteWorkspace.doc, - { - awareness: blockSuiteWorkspace.awarenessStore.awareness, - } - ) as LocalIndexedDBDownloadProvider; - provider.sync(); - provider.whenReady.catch(console.error); saveWorkspaceToLocalStorage(blockSuiteWorkspace.id); logger.debug('create first workspace'); return [blockSuiteWorkspace.id]; diff --git a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx index d7829bea30..dd73f412d7 100644 --- a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx +++ b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/index.tsx @@ -1,4 +1,5 @@ import { WorkspaceFlavour } from '@affine/env/workspace'; +import { SyncEngineStatus } from '@affine/workspace/providers'; import { CloudWorkspaceIcon, LocalWorkspaceIcon, @@ -9,16 +10,17 @@ import { Avatar } from '@toeverything/components/avatar'; import { Tooltip } from '@toeverything/components/tooltip'; import { useBlockSuiteWorkspaceAvatarUrl } from '@toeverything/hooks/use-block-suite-workspace-avatar-url'; import { useBlockSuiteWorkspaceName } from '@toeverything/hooks/use-block-suite-workspace-name'; -import { atom, useSetAtom } from 'jotai'; +import { debounce } from 'lodash-es'; import { forwardRef, type HTMLAttributes, - type MouseEvent, useCallback, + useEffect, useMemo, + useState, } from 'react'; -import { useDatasourceSync } from '../../../../hooks/use-datasource-sync'; +import { useCurrentSyncEngine } from '../../../../hooks/current/use-current-sync-engine'; import { useSystemOnline } from '../../../../hooks/use-system-online'; import type { AllWorkspace } from '../../../../shared'; import { Loading } from './loading-icon'; @@ -29,8 +31,6 @@ import { StyledWorkspaceStatus, } from './styles'; -const hoverAtom = atom(false); - // FIXME: // 1. Remove mui style // 2. Refactor the code to improve readability @@ -86,63 +86,62 @@ const WorkspaceStatus = ({ }) => { const isOnline = useSystemOnline(); - // todo: finish display sync status - const [forceSyncStatus, startForceSync] = useDatasourceSync( - currentWorkspace.blockSuiteWorkspace + const [syncEngineStatus, setSyncEngineStatus] = useState( + SyncEngineStatus.Synced ); - const setIsHovered = useSetAtom(hoverAtom); + const syncEngine = useCurrentSyncEngine(); + + useEffect(() => { + setSyncEngineStatus(syncEngine?.status ?? SyncEngineStatus.Synced); + const disposable = syncEngine?.onStatusChange.on( + debounce(status => { + setSyncEngineStatus(status); + }, 500) + ); + return () => { + disposable?.dispose(); + }; + }, [syncEngine]); const content = useMemo(() => { + // TODO: add i18n if (currentWorkspace.flavour === WorkspaceFlavour.LOCAL) { return 'Saved locally'; } if (!isOnline) { return 'Disconnected, please check your network connection'; } - switch (forceSyncStatus.type) { - case 'syncing': + switch (syncEngineStatus) { + case SyncEngineStatus.Syncing: + case SyncEngineStatus.LoadingSubDoc: + case SyncEngineStatus.LoadingRootDoc: return 'Syncing with AFFiNE Cloud'; - case 'error': - return 'Sync failed due to server issues, please try again later.'; + case SyncEngineStatus.Retrying: + return 'Sync disconnected due to unexpected issues, reconnecting.'; default: - return 'Sync with AFFiNE Cloud'; + return 'Synced with AFFiNE Cloud'; } - }, [currentWorkspace.flavour, forceSyncStatus.type, isOnline]); + }, [currentWorkspace.flavour, syncEngineStatus, isOnline]); const CloudWorkspaceSyncStatus = useCallback(() => { - if (forceSyncStatus.type === 'syncing') { + if ( + syncEngineStatus === SyncEngineStatus.Syncing || + syncEngineStatus === SyncEngineStatus.LoadingSubDoc || + syncEngineStatus === SyncEngineStatus.LoadingRootDoc + ) { return SyncingWorkspaceStatus(); - } else if (forceSyncStatus.type === 'error') { + } else if (syncEngineStatus === SyncEngineStatus.Retrying) { return UnSyncWorkspaceStatus(); } else { return CloudWorkspaceStatus(); } - }, [forceSyncStatus.type]); + }, [syncEngineStatus]); - const handleClick = useCallback( - (e: MouseEvent) => { - e.stopPropagation(); - if ( - currentWorkspace.flavour === WorkspaceFlavour.LOCAL || - forceSyncStatus.type === 'syncing' - ) { - return; - } - startForceSync(); - }, - [currentWorkspace.flavour, forceSyncStatus.type, startForceSync] - ); return (
- { - setIsHovered(true); - }} - onMouseLeave={() => setIsHovered(false)} - onClick={handleClick} - > + {currentWorkspace.flavour === WorkspaceFlavour.AFFINE_CLOUD ? ( !isOnline ? ( diff --git a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/styles.ts b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/styles.ts index a510341dfb..00423908a0 100644 --- a/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/styles.ts +++ b/packages/frontend/core/src/components/pure/workspace-slider-bar/workspace-card/styles.ts @@ -45,10 +45,5 @@ export const StyledWorkspaceStatus = styled('div')(() => { color: 'var(--affine-icon-color)', fontSize: 'var(--affine-font-base)', }, - ':hover': { - cursor: 'pointer', - borderRadius: '4px', - background: 'var(--affine-hover-color)', - }, }; }); diff --git a/packages/frontend/core/src/components/workspace-upgrade/upgrade-hooks.ts b/packages/frontend/core/src/components/workspace-upgrade/upgrade-hooks.ts index a0e6e1201c..78422f8ec8 100644 --- a/packages/frontend/core/src/components/workspace-upgrade/upgrade-hooks.ts +++ b/packages/frontend/core/src/components/workspace-upgrade/upgrade-hooks.ts @@ -1,13 +1,7 @@ -import type { - AffineSocketIOProvider, - LocalIndexedDBBackgroundProvider, - SQLiteProvider, -} from '@affine/env/workspace'; -import { assertExists } from '@blocksuite/global/utils'; import { forceUpgradePages } from '@toeverything/infra/blocksuite'; -import { useCallback, useMemo, useState } from 'react'; -import { syncDataSourceFromDoc, syncDocFromDataSource } from 'y-provider'; +import { useCallback, useState } from 'react'; +import { useCurrentSyncEngine } from '../../hooks/current/use-current-sync-engine'; import { useCurrentWorkspace } from '../../hooks/current/use-current-workspace'; export type UpgradeState = 'pending' | 'upgrading' | 'done' | 'error'; @@ -17,56 +11,20 @@ export function useUpgradeWorkspace() { const [error, setError] = useState(null); const [workspace] = useCurrentWorkspace(); - const providers = workspace.blockSuiteWorkspace.providers; - const remoteProvider: AffineSocketIOProvider | undefined = useMemo(() => { - return providers.find( - (provider): provider is AffineSocketIOProvider => - provider.flavour === 'affine-socket-io' - ); - }, [providers]); - const localProvider = useMemo(() => { - const sqliteProvider = providers.find( - (provider): provider is SQLiteProvider => provider.flavour === 'sqlite' - ); - const indexedDbProvider = providers.find( - (provider): provider is LocalIndexedDBBackgroundProvider => - provider.flavour === 'local-indexeddb-background' - ); - const provider = sqliteProvider || indexedDbProvider; - assertExists(provider, 'no local provider'); - return provider; - }, [providers]); + const syncEngine = useCurrentSyncEngine(); const upgradeWorkspace = useCallback(() => { setState('upgrading'); setError(null); (async () => { - await syncDocFromDataSource( - workspace.blockSuiteWorkspace.doc, - localProvider.datasource - ); - if (remoteProvider) { - await syncDocFromDataSource( - workspace.blockSuiteWorkspace.doc, - remoteProvider.datasource - ); - } - + await syncEngine?.waitForSynced(); await forceUpgradePages({ getCurrentRootDoc: async () => workspace.blockSuiteWorkspace.doc, getSchema: () => workspace.blockSuiteWorkspace.schema, }); - await syncDataSourceFromDoc( - workspace.blockSuiteWorkspace.doc, - localProvider.datasource - ); - if (remoteProvider) { - await syncDataSourceFromDoc( - workspace.blockSuiteWorkspace.doc, - remoteProvider.datasource - ); - } + + await syncEngine?.waitForSynced(); setState('done'); })().catch((e: any) => { @@ -75,10 +33,9 @@ export function useUpgradeWorkspace() { setState('error'); }); }, [ - localProvider.datasource, - remoteProvider, workspace.blockSuiteWorkspace.doc, workspace.blockSuiteWorkspace.schema, + syncEngine, ]); return [state, error, upgradeWorkspace] as const; diff --git a/packages/frontend/core/src/hooks/current/use-current-sync-engine.ts b/packages/frontend/core/src/hooks/current/use-current-sync-engine.ts new file mode 100644 index 0000000000..2fd2fcdac0 --- /dev/null +++ b/packages/frontend/core/src/hooks/current/use-current-sync-engine.ts @@ -0,0 +1,33 @@ +import type { SyncEngine, SyncEngineStatus } from '@affine/workspace/providers'; +import { useEffect, useState } from 'react'; + +import { useCurrentWorkspace } from './use-current-workspace'; + +export function useCurrentSyncEngine(): SyncEngine | undefined { + const [workspace] = useCurrentWorkspace(); + // FIXME: This is a hack to get the sync engine, we need refactor this in the future. + const syncEngine = ( + workspace.blockSuiteWorkspace.providers[0] as { engine?: SyncEngine } + )?.engine; + + return syncEngine; +} + +export function useCurrentSyncEngineStatus(): SyncEngineStatus | undefined { + const syncEngine = useCurrentSyncEngine(); + const [status, setStatus] = useState(); + + useEffect(() => { + if (syncEngine) { + setStatus(syncEngine.status); + return syncEngine.onStatusChange.on(status => { + setStatus(status); + }).dispose; + } else { + setStatus(undefined); + } + return; + }, [syncEngine]); + + return status; +} diff --git a/packages/frontend/core/src/hooks/use-datasource-sync.ts b/packages/frontend/core/src/hooks/use-datasource-sync.ts deleted file mode 100644 index 7bb2e9b545..0000000000 --- a/packages/frontend/core/src/hooks/use-datasource-sync.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { pushNotificationAtom } from '@affine/component/notification-center'; -import type { - AffineSocketIOProvider, - LocalIndexedDBBackgroundProvider, - SQLiteProvider, -} from '@affine/env/workspace'; -import { assertExists } from '@blocksuite/global/utils'; -import type { Workspace } from '@blocksuite/store'; -import { useSetAtom } from 'jotai'; -import { startTransition, useCallback, useMemo, useState } from 'react'; -import { type Status, syncDataSource } from 'y-provider'; - -export function useDatasourceSync(workspace: Workspace) { - const [status, setStatus] = useState({ - type: 'idle', - }); - const pushNotification = useSetAtom(pushNotificationAtom); - const providers = workspace.providers; - const remoteProvider: AffineSocketIOProvider | undefined = useMemo(() => { - return providers.find( - (provider): provider is AffineSocketIOProvider => - provider.flavour === 'affine-socket-io' - ); - }, [providers]); - const localProvider = useMemo(() => { - const sqliteProvider = providers.find( - (provider): provider is SQLiteProvider => provider.flavour === 'sqlite' - ); - const indexedDbProvider = providers.find( - (provider): provider is LocalIndexedDBBackgroundProvider => - provider.flavour === 'local-indexeddb-background' - ); - const provider = sqliteProvider || indexedDbProvider; - assertExists(provider, 'no local provider'); - return provider; - }, [providers]); - return [ - status, - useCallback(() => { - if (!remoteProvider) { - return; - } - startTransition(() => { - setStatus({ - type: 'syncing', - }); - }); - syncDataSource( - () => [ - workspace.doc.guid, - ...[...workspace.doc.subdocs].map(doc => doc.guid), - ], - remoteProvider.datasource, - localProvider.datasource - ) - .then(async () => { - // by default, the syncing status will show for 2.4s - setTimeout(() => { - startTransition(() => { - setStatus({ - type: 'synced', - }); - pushNotification({ - title: 'Synced successfully', - type: 'success', - }); - }); - }, 2400); - }) - .catch(error => { - startTransition(() => { - setStatus({ - type: 'error', - error, - }); - pushNotification({ - title: 'Unable to Sync', - message: 'Server error, please try again later.', - type: 'error', - }); - }); - }); - }, [ - remoteProvider, - localProvider.datasource, - workspace.doc.guid, - workspace.doc.subdocs, - pushNotification, - ]), - ] as const; -} diff --git a/packages/frontend/core/src/layouts/workspace-layout.tsx b/packages/frontend/core/src/layouts/workspace-layout.tsx index e8779f6b0c..4140d33d63 100644 --- a/packages/frontend/core/src/layouts/workspace-layout.tsx +++ b/packages/frontend/core/src/layouts/workspace-layout.tsx @@ -15,7 +15,6 @@ import { import { useAFFiNEI18N } from '@affine/i18n/hooks'; import { rootWorkspacesMetadataAtom } from '@affine/workspace/atom'; import { assertExists } from '@blocksuite/global/utils'; -import type { Page } from '@blocksuite/store'; import type { DragEndEvent } from '@dnd-kit/core'; import { DndContext, @@ -27,7 +26,6 @@ import { useSensors, } from '@dnd-kit/core'; import { useBlockSuitePageMeta } from '@toeverything/hooks/use-block-suite-page-meta'; -import { loadPage } from '@toeverything/hooks/use-block-suite-workspace-page'; import { currentWorkspaceIdAtom } from '@toeverything/infra/atom'; import { useAtom, useAtomValue, useSetAtom } from 'jotai'; import type { PropsWithChildren, ReactNode } from 'react'; @@ -117,30 +115,6 @@ type WorkspaceLayoutProps = { incompatible?: boolean; }; -// fix https://github.com/toeverything/AFFiNE/issues/4825 -function useLoadWorkspacePages() { - const [currentWorkspace] = useCurrentWorkspace(); - const pageMetas = useBlockSuitePageMeta(currentWorkspace.blockSuiteWorkspace); - - useEffect(() => { - if (currentWorkspace) { - const timer = setTimeout(() => { - const pageIds = pageMetas.map(meta => meta.id); - const pages = pageIds - .map(id => currentWorkspace.blockSuiteWorkspace.getPage(id)) - .filter((p): p is Page => !!p); - pages.forEach(page => { - loadPage(page, -10).catch(e => console.error(e)); - }); - }, 10 * 1000); // load pages after 10s - return () => { - clearTimeout(timer); - }; - } - return; - }, [currentWorkspace, pageMetas]); -} - export const WorkspaceLayout = function WorkspacesSuspense({ children, incompatible = false, @@ -255,8 +229,6 @@ export const WorkspaceLayoutInner = ({ const inTrashPage = pageMeta?.trash ?? false; const setMainContainer = useSetAtom(mainContainerAtom); - useLoadWorkspacePages(); - return ( <> {/* This DndContext is used for drag page from all-pages list into a folder in sidebar */} diff --git a/packages/frontend/core/src/pages/share/detail-page.tsx b/packages/frontend/core/src/pages/share/detail-page.tsx index 07ce82d3a8..0776f83237 100644 --- a/packages/frontend/core/src/pages/share/detail-page.tsx +++ b/packages/frontend/core/src/pages/share/detail-page.tsx @@ -1,9 +1,9 @@ import { MainContainer } from '@affine/component/workspace'; import { DebugLogger } from '@affine/debug'; import { WorkspaceFlavour } from '@affine/env/workspace'; +import type { CloudDoc } from '@affine/workspace/affine/download'; +import { downloadBinaryFromCloud } from '@affine/workspace/affine/download'; import { getOrCreateWorkspace } from '@affine/workspace/manager'; -import { downloadBinaryFromCloud } from '@affine/workspace/providers'; -import type { CloudDoc } from '@affine/workspace/providers/cloud'; import { assertExists } from '@blocksuite/global/utils'; import type { Page } from '@blocksuite/store'; import { noop } from 'foxact/noop'; @@ -30,7 +30,7 @@ type LoaderData = { }; function assertDownloadResponse( - value: CloudDoc | boolean + value: CloudDoc | null ): asserts value is CloudDoc { if ( !value || diff --git a/packages/frontend/core/src/pages/workspace/detail-page.tsx b/packages/frontend/core/src/pages/workspace/detail-page.tsx index 5a546028f1..5bcf79eb8d 100644 --- a/packages/frontend/core/src/pages/workspace/detail-page.tsx +++ b/packages/frontend/core/src/pages/workspace/detail-page.tsx @@ -5,20 +5,18 @@ import { } from '@affine/component/page-list'; import { WorkspaceSubPath } from '@affine/env/workspace'; import { globalBlockSuiteSchema } from '@affine/workspace/manager'; +import { SyncEngineStatus } from '@affine/workspace/providers'; import type { EditorContainer } from '@blocksuite/editor'; import { assertExists } from '@blocksuite/global/utils'; import type { Page } from '@blocksuite/store'; import { contentLayoutAtom, currentPageIdAtom, - currentWorkspaceAtom, currentWorkspaceIdAtom, - getCurrentStore, } from '@toeverything/infra/atom'; import { useAtomValue, useSetAtom } from 'jotai'; -import { type ReactElement, useCallback } from 'react'; -import type { LoaderFunction } from 'react-router-dom'; -import { redirect } from 'react-router-dom'; +import { type ReactElement, useCallback, useEffect, useState } from 'react'; +import { type LoaderFunction, useParams } from 'react-router-dom'; import type { Map as YMap } from 'yjs'; import { getUIAdapter } from '../../adapters/workspace'; @@ -27,6 +25,7 @@ import { collectionsCRUDAtom } from '../../atoms/collections'; import { currentModeAtom } from '../../atoms/mode'; import { WorkspaceHeader } from '../../components/workspace-header'; import { useRegisterBlocksuiteEditorCommands } from '../../hooks/affine/use-register-blocksuite-editor-commands'; +import { useCurrentSyncEngineStatus } from '../../hooks/current/use-current-sync-engine'; import { useCurrentWorkspace } from '../../hooks/current/use-current-workspace'; import { useNavigateHelper } from '../../hooks/use-navigate-helper'; import { performanceRenderLogger } from '../../shared'; @@ -107,46 +106,113 @@ const DetailPageImpl = (): ReactElement => { export const DetailPage = (): ReactElement => { const [currentWorkspace] = useCurrentWorkspace(); + const currentSyncEngineStatus = useCurrentSyncEngineStatus(); const currentPageId = useAtomValue(currentPageIdAtom); - const page = currentPageId - ? currentWorkspace.blockSuiteWorkspace.getPage(currentPageId) - : null; + const [page, setPage] = useState(null); + const [pageLoaded, setPageLoaded] = useState(false); - if (!currentPageId || !page) { + // load page by current page id + useEffect(() => { + if (!currentPageId) { + setPage(null); + return; + } + + const exists = currentWorkspace.blockSuiteWorkspace.getPage(currentPageId); + + if (exists) { + setPage(exists); + return; + } + + const dispose = currentWorkspace.blockSuiteWorkspace.slots.pagesUpdated.on( + () => { + const exists = + currentWorkspace.blockSuiteWorkspace.getPage(currentPageId); + + if (exists) { + setPage(exists); + } + } + ); + + return dispose.dispose; + }, [currentPageId, currentWorkspace]); + + const navigate = useNavigateHelper(); + + // if sync engine has been synced and the page is null, wait 1s and jump to 404 page. + useEffect(() => { + if (currentSyncEngineStatus === SyncEngineStatus.Synced && !page) { + const timeout = setTimeout(() => { + navigate.jumpTo404(); + }, 1000); + return () => { + clearTimeout(timeout); + }; + } + return; + }, [currentSyncEngineStatus, navigate, page]); + + // wait for page to be loaded + useEffect(() => { + if (page) { + if (!page.loaded) { + setPageLoaded(true); + } else { + setPageLoaded(false); + // call waitForLoaded to trigger load + page + .load(() => {}) + .catch(() => { + // do nothing + }); + return page.slots.ready.on(() => { + setPageLoaded(true); + }).dispose; + } + } else { + setPageLoaded(false); + } + return; + }, [page]); + + if (!currentPageId || !page || !pageLoaded) { return ; } + + if (page.meta.jumpOnce) { + currentWorkspace.blockSuiteWorkspace.setPageMeta(page.id, { + jumpOnce: false, + }); + } + return ; }; -export const loader: LoaderFunction = async args => { - const rootStore = getCurrentStore(); - rootStore.set(contentLayoutAtom, 'editor'); - if (args.params.workspaceId) { - localStorage.setItem('last_workspace_id', args.params.workspaceId); - rootStore.set(currentWorkspaceIdAtom, args.params.workspaceId); - } - const currentWorkspace = await rootStore.get(currentWorkspaceAtom); - if (args.params.pageId) { - const pageId = args.params.pageId; - localStorage.setItem('last_page_id', pageId); - const page = currentWorkspace.getPage(pageId); - if (!page) { - return redirect('/404'); - } - if (page.meta.jumpOnce) { - currentWorkspace.setPageMeta(page.id, { - jumpOnce: false, - }); - } - rootStore.set(currentPageIdAtom, pageId); - } else { - return redirect('/404'); - } +export const loader: LoaderFunction = async () => { return null; }; export const Component = () => { performanceRenderLogger.info('DetailPage'); + const setContentLayout = useSetAtom(contentLayoutAtom); + const setCurrentWorkspaceId = useSetAtom(currentWorkspaceIdAtom); + const setCurrentPageId = useSetAtom(currentPageIdAtom); + const params = useParams(); + + useEffect(() => { + setContentLayout('editor'); + if (params.workspaceId) { + localStorage.setItem('last_workspace_id', params.workspaceId); + setCurrentWorkspaceId(params.workspaceId); + } + if (params.pageId) { + localStorage.setItem('last_page_id', params.pageId); + setCurrentPageId(params.pageId); + } + }, [params, setContentLayout, setCurrentPageId, setCurrentWorkspaceId]); + return ; }; diff --git a/packages/frontend/core/src/pages/workspace/index.tsx b/packages/frontend/core/src/pages/workspace/index.tsx index 29a2097252..5102ae3b0a 100644 --- a/packages/frontend/core/src/pages/workspace/index.tsx +++ b/packages/frontend/core/src/pages/workspace/index.tsx @@ -7,12 +7,14 @@ import { getCurrentStore, } from '@toeverything/infra/atom'; import { guidCompatibilityFix } from '@toeverything/infra/blocksuite'; -import type { ReactElement } from 'react'; +import { useSetAtom } from 'jotai'; +import { type ReactElement, useEffect } from 'react'; import { type LoaderFunction, Outlet, redirect, useLoaderData, + useParams, } from 'react-router-dom'; import { WorkspaceLayout } from '../../layouts/workspace-layout'; @@ -24,6 +26,12 @@ export const loader: LoaderFunction = async args => { workspaceLoaderLogger.info('start'); const rootStore = getCurrentStore(); + + if (args.params.workspaceId) { + localStorage.setItem('last_workspace_id', args.params.workspaceId); + rootStore.set(currentWorkspaceIdAtom, args.params.workspaceId); + } + const meta = await rootStore.get(rootWorkspacesMetadataAtom); workspaceLoaderLogger.info('meta loaded'); @@ -31,10 +39,7 @@ export const loader: LoaderFunction = async args => { if (!currentMetadata) { return redirect('/404'); } - if (args.params.workspaceId) { - localStorage.setItem('last_workspace_id', args.params.workspaceId); - rootStore.set(currentWorkspaceIdAtom, args.params.workspaceId); - } + if (!args.params.pageId) { rootStore.set(currentPageIdAtom, null); } @@ -43,6 +48,10 @@ export const loader: LoaderFunction = async args => { workspaceLoaderLogger.info('get cloud workspace atom'); const workspace = await rootStore.get(workspaceAtom); + if (!workspace.doc.isLoaded) { + await workspace.doc.whenLoaded; + } + workspaceLoaderLogger.info('workspace loaded'); return (() => { guidCompatibilityFix(workspace.doc); const blockVersions = workspace.meta.blockVersions; @@ -65,6 +74,17 @@ export const loader: LoaderFunction = async args => { export const Component = (): ReactElement => { performanceRenderLogger.info('WorkspaceLayout'); + const setCurrentWorkspaceId = useSetAtom(currentWorkspaceIdAtom); + + const params = useParams(); + + useEffect(() => { + if (params.workspaceId) { + localStorage.setItem('last_workspace_id', params.workspaceId); + setCurrentWorkspaceId(params.workspaceId); + } + }, [params, setCurrentWorkspaceId]); + const incompatible = useLoaderData(); return ( diff --git a/packages/frontend/workspace/package.json b/packages/frontend/workspace/package.json index 301a17552b..64e82c8ae2 100644 --- a/packages/frontend/workspace/package.json +++ b/packages/frontend/workspace/package.json @@ -4,15 +4,13 @@ "exports": { "./atom": "./src/atom.ts", "./manager": "./src/manager/index.ts", - "./type": "./src/type.ts", - "./migration": "./src/migration/index.ts", "./local/crud": "./src/local/crud.ts", - "./affine": "./src/affine/index.ts", "./affine/*": "./src/affine/*.ts", "./providers": "./src/providers/index.ts" }, "peerDependencies": { "@blocksuite/blocks": "*", + "@blocksuite/global": "*", "@blocksuite/store": "*" }, "dependencies": { @@ -23,6 +21,7 @@ "@toeverything/hooks": "workspace:*", "@toeverything/y-indexeddb": "workspace:*", "async-call-rpc": "^6.3.1", + "idb": "^7.1.1", "is-svg": "^5.0.0", "jotai": "^2.4.3", "js-base64": "^3.7.5", diff --git a/packages/frontend/workspace/src/affine/crud.ts b/packages/frontend/workspace/src/affine/crud.ts index 0edbcf663e..16e08d8306 100644 --- a/packages/frontend/workspace/src/affine/crud.ts +++ b/packages/frontend/workspace/src/affine/crud.ts @@ -9,16 +9,10 @@ import { getWorkspaceQuery, getWorkspacesQuery, } from '@affine/graphql'; -import { createAffineDataSource } from '@affine/workspace/affine/index'; import { createIndexeddbStorage, Workspace } from '@blocksuite/store'; import { migrateLocalBlobStorage } from '@toeverything/infra/blocksuite'; -import { - createIndexedDBProvider, - DEFAULT_DB_NAME, -} from '@toeverything/y-indexeddb'; import { getSession } from 'next-auth/react'; import { proxy } from 'valtio/vanilla'; -import { syncDataSourceFromDoc } from 'y-provider'; import { getOrCreateWorkspace } from '../manager'; import { fetcher } from './gql'; @@ -77,21 +71,6 @@ export const CRUD: WorkspaceCRUD = { }) ); - const datasource = createAffineDataSource( - createWorkspace.id, - newBlockSuiteWorkspace.doc, - newBlockSuiteWorkspace.awarenessStore.awareness - ); - - const disconnect = datasource.onDocUpdate(() => {}); - await syncDataSourceFromDoc(upstreamWorkspace.doc, datasource); - disconnect(); - - const provider = createIndexedDBProvider( - newBlockSuiteWorkspace.doc, - DEFAULT_DB_NAME - ); - provider.connect(); migrateLocalBlobStorage(upstreamWorkspace.id, createWorkspace.id) .then(() => deleteLocalBlobStorage(upstreamWorkspace.id)) .catch(e => { diff --git a/packages/frontend/workspace/src/affine/download.ts b/packages/frontend/workspace/src/affine/download.ts new file mode 100644 index 0000000000..b397c742f9 --- /dev/null +++ b/packages/frontend/workspace/src/affine/download.ts @@ -0,0 +1,37 @@ +import { fetchWithTraceReport } from '@affine/graphql'; + +const hashMap = new Map(); +type DocPublishMode = 'edgeless' | 'page'; + +export type CloudDoc = { + arrayBuffer: ArrayBuffer; + publishMode: DocPublishMode; +}; + +export async function downloadBinaryFromCloud( + rootGuid: string, + pageGuid: string +): Promise { + const cached = hashMap.get(`${rootGuid}/${pageGuid}`); + if (cached) { + return cached; + } + const response = await fetchWithTraceReport( + runtimeConfig.serverUrlPrefix + + `/api/workspaces/${rootGuid}/docs/${pageGuid}`, + { + priority: 'high', + } + ); + if (response.ok) { + const publishMode = (response.headers.get('publish-mode') || + 'page') as DocPublishMode; + const arrayBuffer = await response.arrayBuffer(); + hashMap.set(`${rootGuid}/${pageGuid}`, { arrayBuffer, publishMode }); + + // return both arrayBuffer and publish mode + return { arrayBuffer, publishMode }; + } + + return null; +} diff --git a/packages/frontend/workspace/src/affine/index.ts b/packages/frontend/workspace/src/affine/index.ts deleted file mode 100644 index 7cb2099e47..0000000000 --- a/packages/frontend/workspace/src/affine/index.ts +++ /dev/null @@ -1,263 +0,0 @@ -import { DebugLogger } from '@affine/debug'; -import type { Socket } from 'socket.io-client'; -import { Manager } from 'socket.io-client'; -import { - applyAwarenessUpdate, - type Awareness, - encodeAwarenessUpdate, - removeAwarenessStates, -} from 'y-protocols/awareness'; -import type { DocDataSource } from 'y-provider'; -import type { Doc } from 'yjs'; - -import { MultipleBatchSyncSender } from './batch-sync-sender'; -import { - type AwarenessChanges, - base64ToUint8Array, - uint8ArrayToBase64, -} from './utils'; - -let ioManager: Manager | null = null; - -// use lazy initialization to avoid global side effect -function getIoManager(): Manager { - if (ioManager) { - return ioManager; - } - ioManager = new Manager(runtimeConfig.serverUrlPrefix + '/', { - autoConnect: false, - transports: ['websocket'], - }); - return ioManager; -} - -const logger = new DebugLogger('affine:sync'); - -export const createAffineDataSource = ( - id: string, - rootDoc: Doc, - awareness: Awareness -) => { - if (id !== rootDoc.guid) { - console.warn('important!! please use doc.guid as roomName'); - } - - logger.debug('createAffineDataSource', id, rootDoc.guid); - const socket = getIoManager().socket('/'); - const syncSender = new MultipleBatchSyncSender(async (guid, updates) => { - const payload = await Promise.all( - updates.map(update => uint8ArrayToBase64(update)) - ); - - return new Promise(resolve => { - socket.emit( - 'client-update-v2', - { - workspaceId: rootDoc.guid, - guid, - updates: payload, - }, - (response: { - // TODO: reuse `EventError` with server - error?: any; - data: any; - }) => { - // TODO: raise error with different code to users - if (response.error) { - logger.error('client-update-v2 error', { - workspaceId: rootDoc.guid, - guid, - response, - }); - } - - resolve({ - accepted: !response.error, - // TODO: reuse `EventError` with server - retry: response.error?.code === 'INTERNAL', - }); - } - ); - }); - }); - - return { - get socket() { - return socket; - }, - queryDocState: async (guid, options) => { - const stateVector = options?.stateVector - ? await uint8ArrayToBase64(options.stateVector) - : undefined; - - return new Promise((resolve, reject) => { - logger.debug('doc-load-v2', { - workspaceId: rootDoc.guid, - guid, - stateVector, - }); - socket.emit( - 'doc-load-v2', - { - workspaceId: rootDoc.guid, - guid, - stateVector, - }, - ( - response: // TODO: reuse `EventError` with server - { error: any } | { data: { missing: string; state: string } } - ) => { - logger.debug('doc-load callback', { - workspaceId: rootDoc.guid, - guid, - stateVector, - response, - }); - - if ('error' in response) { - // TODO: result `EventError` with server - if (response.error.code === 'DOC_NOT_FOUND') { - resolve(false); - } else { - reject(new Error(response.error.message)); - } - } else { - resolve({ - missing: base64ToUint8Array(response.data.missing), - state: response.data.state - ? base64ToUint8Array(response.data.state) - : undefined, - }); - } - } - ); - }); - }, - sendDocUpdate: async (guid: string, update: Uint8Array) => { - logger.debug('client-update-v2', { - workspaceId: rootDoc.guid, - guid, - update, - }); - - await syncSender.send(guid, update); - }, - onDocUpdate: callback => { - const onUpdate = async (message: { - workspaceId: string; - guid: string; - updates: string[]; - }) => { - if (message.workspaceId === rootDoc.guid) { - message.updates.forEach(update => { - callback(message.guid, base64ToUint8Array(update)); - }); - } - }; - let destroyAwareness = () => {}; - socket.on('server-updates', onUpdate); - socket.on('connect', () => { - socket.emit( - 'client-handshake', - rootDoc.guid, - (response: { error?: any }) => { - if (!response.error) { - syncSender.start(); - destroyAwareness = setupAffineAwareness( - socket, - rootDoc, - awareness - ); - } - } - ); - }); - - socket.connect(); - return () => { - syncSender.stop(); - socket.emit('client-leave', rootDoc.guid); - socket.off('server-updates', onUpdate); - destroyAwareness(); - socket.disconnect(); - }; - }, - } satisfies DocDataSource & { readonly socket: Socket }; -}; - -function setupAffineAwareness( - conn: Socket, - rootDoc: Doc, - awareness: Awareness -) { - const awarenessBroadcast = ({ - workspaceId, - awarenessUpdate, - }: { - workspaceId: string; - awarenessUpdate: string; - }) => { - if (workspaceId !== rootDoc.guid) { - return; - } - applyAwarenessUpdate( - awareness, - base64ToUint8Array(awarenessUpdate), - 'server' - ); - }; - - const awarenessUpdate = (changes: AwarenessChanges, origin: unknown) => { - if (origin === 'server') { - return; - } - - const changedClients = Object.values(changes).reduce((res, cur) => [ - ...res, - ...cur, - ]); - - const update = encodeAwarenessUpdate(awareness, changedClients); - uint8ArrayToBase64(update) - .then(encodedUpdate => { - conn.emit('awareness-update', { - workspaceId: rootDoc.guid, - awarenessUpdate: encodedUpdate, - }); - }) - .catch(err => logger.error(err)); - }; - - const newClientAwarenessInitHandler = () => { - const awarenessUpdate = encodeAwarenessUpdate(awareness, [ - awareness.clientID, - ]); - uint8ArrayToBase64(awarenessUpdate) - .then(encodedAwarenessUpdate => { - conn.emit('awareness-update', { - guid: rootDoc.guid, - awarenessUpdate: encodedAwarenessUpdate, - }); - }) - .catch(err => logger.error(err)); - }; - - const windowBeforeUnloadHandler = () => { - removeAwarenessStates(awareness, [awareness.clientID], 'window unload'); - }; - - conn.on('server-awareness-broadcast', awarenessBroadcast); - conn.on('new-client-awareness-init', newClientAwarenessInitHandler); - awareness.on('update', awarenessUpdate); - - window.addEventListener('beforeunload', windowBeforeUnloadHandler); - - conn.emit('awareness-init', rootDoc.guid); - - return () => { - awareness.off('update', awarenessUpdate); - conn.off('server-awareness-broadcast', awarenessBroadcast); - conn.off('new-client-awareness-init', newClientAwarenessInitHandler); - window.removeEventListener('unload', windowBeforeUnloadHandler); - }; -} diff --git a/packages/frontend/workspace/src/affine/sync.ts b/packages/frontend/workspace/src/affine/sync.ts deleted file mode 100644 index c126439cc9..0000000000 --- a/packages/frontend/workspace/src/affine/sync.ts +++ /dev/null @@ -1,120 +0,0 @@ -import { DebugLogger } from '@affine/debug'; -import { createIndexeddbStorage } from '@blocksuite/store'; -import { - createIndexedDBDatasource, - DEFAULT_DB_NAME, - downloadBinary, -} from '@toeverything/y-indexeddb'; -import { syncDataSource } from 'y-provider'; -import type { Doc } from 'yjs'; -import { applyUpdate } from 'yjs'; - -import { createCloudBlobStorage } from '../blob/cloud-blob-storage'; -import { createAffineDataSource } from '.'; -import { CRUD } from './crud'; - -const performanceLogger = new DebugLogger('performance:sync'); -let abortController: AbortController | undefined; - -const downloadRootFromIndexedDB = async ( - rootGuid: string, - doc: Doc, - signal: AbortSignal -): Promise => { - if (signal.aborted) { - return; - } - const update = await downloadBinary(rootGuid); - if (update !== false) { - applyUpdate(doc, update); - } -}; - -export async function startSync() { - performanceLogger.info('start'); - - abortController = new AbortController(); - const signal = abortController.signal; - const workspaces = await CRUD.list(); - performanceLogger.info('CRUD list'); - - const syncDocPromises = workspaces.map(workspace => - downloadRootFromIndexedDB( - workspace.id, - workspace.blockSuiteWorkspace.doc, - signal - ) - ); - await Promise.all(syncDocPromises); - performanceLogger.info('all sync promise'); - - const syncPromises = workspaces.map(workspace => { - const remoteDataSource = createAffineDataSource( - workspace.id, - workspace.blockSuiteWorkspace.doc, - workspace.blockSuiteWorkspace.awarenessStore.awareness - ); - const indexeddbDataSource = createIndexedDBDatasource({ - dbName: DEFAULT_DB_NAME, - }); - return syncDataSource( - (): string[] => [ - workspace.blockSuiteWorkspace.doc.guid, - ...[...workspace.blockSuiteWorkspace.doc.subdocs].map(doc => doc.guid), - ], - remoteDataSource, - indexeddbDataSource - ); - }); - - const syncBlobPromises = workspaces.map(async workspace => { - const cloudBlobStorage = createCloudBlobStorage(workspace.id); - const indexeddbBlobStorage = createIndexeddbStorage(workspace.id); - return Promise.all([ - cloudBlobStorage.crud.list(), - indexeddbBlobStorage.crud.list(), - ]).then(([cloudKeys, indexeddbKeys]) => { - if (signal.aborted) { - return; - } - const cloudKeysSet = new Set(cloudKeys); - const indexeddbKeysSet = new Set(indexeddbKeys); - // missing in indexeddb - const missingLocalKeys = cloudKeys.filter( - key => !indexeddbKeysSet.has(key) - ); - // missing in cloud - const missingCloudKeys = indexeddbKeys.filter( - key => !cloudKeysSet.has(key) - ); - return Promise.all([ - ...missingLocalKeys.map(key => - cloudBlobStorage.crud.get(key).then(async value => { - if (signal.aborted) { - return; - } - if (value) { - await indexeddbBlobStorage.crud.set(key, value); - } - }) - ), - ...missingCloudKeys.map(key => - indexeddbBlobStorage.crud.get(key).then(async value => { - if (signal.aborted) { - return; - } - if (value) { - await cloudBlobStorage.crud.set(key, value); - } - }) - ), - ]); - }); - }); - await Promise.all([...syncPromises, ...syncBlobPromises]); - performanceLogger.info('sync done'); -} - -export async function stopSync() { - abortController?.abort(); -} diff --git a/packages/frontend/workspace/src/atom.ts b/packages/frontend/workspace/src/atom.ts index 2b9fed2c73..72547946a9 100644 --- a/packages/frontend/workspace/src/atom.ts +++ b/packages/frontend/workspace/src/atom.ts @@ -268,7 +268,6 @@ export const rootWorkspacesMetadataAtom = atom< } if (newWorkspaceId) { - set(currentPageIdAtom, null); set(currentWorkspaceIdAtom, newWorkspaceId); } return metadata; diff --git a/packages/frontend/workspace/src/providers/__tests__/indexeddb-provider.spec.ts b/packages/frontend/workspace/src/providers/__tests__/indexeddb-provider.spec.ts deleted file mode 100644 index 583cf7510c..0000000000 --- a/packages/frontend/workspace/src/providers/__tests__/indexeddb-provider.spec.ts +++ /dev/null @@ -1,90 +0,0 @@ -/** - * @vitest-environment happy-dom - */ -import 'fake-indexeddb/auto'; - -import type { - LocalIndexedDBBackgroundProvider, - LocalIndexedDBDownloadProvider, -} from '@affine/env/workspace'; -import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; -import { Schema, Workspace } from '@blocksuite/store'; -import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'; - -import { - createIndexedDBBackgroundProvider, - createIndexedDBDownloadProvider, -} from '..'; - -const schema = new Schema(); - -schema.register(AffineSchemas).register(__unstableSchemas); - -beforeEach(() => { - vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); -}); - -afterEach(() => { - globalThis.localStorage.clear(); - globalThis.indexedDB.deleteDatabase('affine-local'); -}); - -describe('download provider', () => { - test('basic', async () => { - let prev: any; - { - const workspace = new Workspace({ - id: 'test', - isSSR: true, - schema, - }); - const provider = createIndexedDBBackgroundProvider( - workspace.id, - workspace.doc, - { - awareness: workspace.awarenessStore.awareness, - } - ) as LocalIndexedDBBackgroundProvider; - provider.connect(); - const page = workspace.createPage({ - id: 'page0', - }); - await page.waitForLoaded(); - const pageBlockId = page.addBlock('affine:page', { - title: new page.Text(''), - }); - page.addBlock('affine:surface', {}, pageBlockId); - const frameId = page.addBlock('affine:note', {}, pageBlockId); - page.addBlock('affine:paragraph', {}, frameId); - await new Promise(resolve => setTimeout(resolve, 1000)); - provider.disconnect(); - prev = workspace.doc.toJSON(); - } - - { - const workspace = new Workspace({ - id: 'test', - isSSR: true, - schema, - }); - const provider = createIndexedDBDownloadProvider( - workspace.id, - workspace.doc, - { - awareness: workspace.awarenessStore.awareness, - } - ) as LocalIndexedDBDownloadProvider; - provider.sync(); - await provider.whenReady; - expect(workspace.doc.toJSON()).toEqual({ - ...prev, - // download provider only download the root doc - spaces: { - page0: { - blocks: {}, - }, - }, - }); - } - }); -}); diff --git a/packages/frontend/workspace/src/providers/__tests__/socketio-provider.spec.ts b/packages/frontend/workspace/src/providers/__tests__/socketio-provider.spec.ts deleted file mode 100644 index 7a155bd24a..0000000000 --- a/packages/frontend/workspace/src/providers/__tests__/socketio-provider.spec.ts +++ /dev/null @@ -1,103 +0,0 @@ -/** - * @vitest-environment happy-dom - */ -import 'fake-indexeddb/auto'; - -import type { AffineSocketIOProvider } from '@affine/env/workspace'; -import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; -import { Schema, Workspace } from '@blocksuite/store'; -import { describe, expect, test } from 'vitest'; -import * as awarenessProtocol from 'y-protocols/awareness'; -import { Doc } from 'yjs'; - -import { createAffineSocketIOProvider } from '..'; - -const schema = new Schema(); - -schema.register(AffineSchemas).register(__unstableSchemas); - -describe('sockio provider', () => { - test.skip('test storage', async () => { - const workspaceId = 'test-storage-ws'; - { - const workspace = new Workspace({ - id: workspaceId, - isSSR: true, - schema, - }); - const provider = createAffineSocketIOProvider( - workspace.id, - workspace.doc, - { - awareness: workspace.awarenessStore.awareness, - } - ) as AffineSocketIOProvider; - provider.connect(); - const page = workspace.createPage({ - id: 'page', - }); - - await page.waitForLoaded(); - page.addBlock('affine:page', { - title: new page.Text('123123'), - }); - - await new Promise(resolve => setTimeout(resolve, 1000)); - } - - { - const workspace = new Workspace({ - id: workspaceId, - isSSR: true, - schema, - }); - const provider = createAffineSocketIOProvider( - workspace.id, - workspace.doc, - { - awareness: workspace.awarenessStore.awareness, - } - ) as AffineSocketIOProvider; - - provider.connect(); - - await new Promise(resolve => setTimeout(resolve, 1000)); - const page = workspace.getPage('page')!; - await page.waitForLoaded(); - const block = page.getBlockByFlavour('affine:page'); - expect(block[0].flavour).toEqual('affine:page'); - } - }); - - test.skip('test collaboration', async () => { - const workspaceId = 'test-collboration-ws'; - { - const doc = new Doc({ guid: workspaceId }); - const provider = createAffineSocketIOProvider(doc.guid, doc, { - awareness: new awarenessProtocol.Awareness(doc), - }) as AffineSocketIOProvider; - - const doc2 = new Doc({ guid: workspaceId }); - const provider2 = createAffineSocketIOProvider(doc2.guid, doc2, { - awareness: new awarenessProtocol.Awareness(doc2), - }) as AffineSocketIOProvider; - - provider.connect(); - provider2.connect(); - - await new Promise(resolve => setTimeout(resolve, 500)); - - const subdoc = new Doc(); - const folder = doc.getMap(); - folder.set('subDoc', subdoc); - subdoc.getText().insert(0, 'subDoc content'); - - await new Promise(resolve => setTimeout(resolve, 1000)); - - expect( - (doc2.getMap().get('subDoc') as Doc).getText().toJSON(), - 'subDoc content' - ); - } - }); -}); diff --git a/packages/frontend/workspace/src/providers/__tests__/sqlite-provider.spec.ts b/packages/frontend/workspace/src/providers/__tests__/sqlite-provider.spec.ts deleted file mode 100644 index f96b4c7e5c..0000000000 --- a/packages/frontend/workspace/src/providers/__tests__/sqlite-provider.spec.ts +++ /dev/null @@ -1,165 +0,0 @@ -import type { - SQLiteDBDownloadProvider, - SQLiteProvider, -} from '@affine/env/workspace'; -import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; -import type { Y as YType } from '@blocksuite/store'; -import { Schema, Workspace } from '@blocksuite/store'; -import type { DBHandlerManager } from '@toeverything/infra/handler'; -import type { - EventMap, - UnwrapManagerHandlerToClientSide, -} from '@toeverything/infra/type'; -import { nanoid } from 'nanoid'; -import { setTimeout } from 'timers/promises'; -import { beforeEach, describe, expect, test, vi } from 'vitest'; -import { getDoc } from 'y-provider'; - -import { - createSQLiteDBDownloadProvider, - createSQLiteProvider, -} from '../sqlite-providers'; - -const Y = Workspace.Y; - -let id: string; -let workspace: Workspace; -let provider: SQLiteProvider; -let downloadProvider: SQLiteDBDownloadProvider; - -let offlineYdoc: YType.Doc; - -let triggerDBUpdate: - | Parameters[0] - | null = null; - -const mockedAddBlob = vi.fn(); - -vi.stubGlobal('window', { - apis: { - db: { - getDocAsUpdates: async (_, guid) => { - const subdoc = guid ? getDoc(offlineYdoc, guid) : offlineYdoc; - if (!subdoc) { - return false; - } - return Y.encodeStateAsUpdate(subdoc); - }, - applyDocUpdate: async (_, update, subdocId) => { - const subdoc = subdocId ? getDoc(offlineYdoc, subdocId) : offlineYdoc; - if (!subdoc) { - return; - } - Y.applyUpdate(subdoc, update, 'sqlite'); - }, - getBlobKeys: async () => { - // todo: may need to hack the way to get hash keys of blobs - return []; - }, - addBlob: mockedAddBlob, - } satisfies Partial>, - }, - events: { - db: { - onExternalUpdate: fn => { - triggerDBUpdate = fn; - return () => { - triggerDBUpdate = null; - }; - }, - }, - } as Partial, -}); - -vi.stubGlobal('environment', { - isDesktop: true, -}); - -const schema = new Schema(); - -schema.register(AffineSchemas).register(__unstableSchemas); - -beforeEach(() => { - id = nanoid(); - workspace = new Workspace({ - id, - isSSR: true, - schema, - }); - provider = createSQLiteProvider(workspace.id, workspace.doc, { - awareness: workspace.awarenessStore.awareness, - }) as SQLiteProvider; - downloadProvider = createSQLiteDBDownloadProvider( - workspace.id, - workspace.doc, - { - awareness: workspace.awarenessStore.awareness, - } - ) as SQLiteDBDownloadProvider; - offlineYdoc = new Y.Doc(); - offlineYdoc.getText('text').insert(0, 'sqlite-hello'); -}); - -describe('SQLite download provider', () => { - test('sync updates', async () => { - // on connect, the updates from sqlite should be sync'ed to the existing ydoc - workspace.doc.getText('text').insert(0, 'mem-hello'); - - downloadProvider.sync(); - await downloadProvider.whenReady; - - // depending on the nature of the sync, the data can be sync'ed in either direction - const options = ['sqlite-hellomem-hello', 'mem-hellosqlite-hello']; - const synced = options.filter( - o => o === workspace.doc.getText('text').toString() - ); - expect(synced.length).toBe(1); - }); - - // there is no updates from sqlite for now - test.skip('on db update', async () => { - provider.connect(); - - await setTimeout(200); - - offlineYdoc.getText('text').insert(0, 'sqlite-world'); - - triggerDBUpdate?.({ - workspaceId: id + '-another-id', - update: Y.encodeStateAsUpdate(offlineYdoc), - }); - - // not yet updated (because the workspace id is different) - expect(workspace.doc.getText('text').toString()).toBe(''); - - triggerDBUpdate?.({ - workspaceId: id, - update: Y.encodeStateAsUpdate(offlineYdoc), - }); - - expect(workspace.doc.getText('text').toString()).toBe( - 'sqlite-worldsqlite-hello' - ); - }); - - test('disconnect handlers', async () => { - const offHandler = vi.fn(); - let handleUpdate = () => {}; - let handleSubdocs = () => {}; - workspace.doc.on = (event: string, fn: () => void) => { - if (event === 'update') { - handleUpdate = fn; - } else if (event === 'subdocs') { - handleSubdocs = fn; - } - }; - workspace.doc.off = offHandler; - provider.connect(); - - provider.disconnect(); - - expect(triggerDBUpdate).toBe(null); - expect(offHandler).toBeCalledWith('update', handleUpdate); - expect(offHandler).toBeCalledWith('subdocs', handleSubdocs); - }); -}); diff --git a/packages/frontend/workspace/src/providers/awareness/affine/index.ts b/packages/frontend/workspace/src/providers/awareness/affine/index.ts new file mode 100644 index 0000000000..41d652e983 --- /dev/null +++ b/packages/frontend/workspace/src/providers/awareness/affine/index.ts @@ -0,0 +1,101 @@ +import { DebugLogger } from '@affine/debug'; +import { + applyAwarenessUpdate, + type Awareness, + encodeAwarenessUpdate, + removeAwarenessStates, +} from 'y-protocols/awareness'; + +import { getIoManager } from '../../utils/affine-io'; +import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64'; +import type { AwarenessProvider } from '..'; + +const logger = new DebugLogger('affine:awareness:socketio'); + +export type AwarenessChanges = Record< + 'added' | 'updated' | 'removed', + number[] +>; + +export function createAffineAwarenessProvider( + workspaceId: string, + awareness: Awareness +): AwarenessProvider { + const socket = getIoManager().socket('/'); + + const awarenessBroadcast = ({ + workspaceId, + awarenessUpdate, + }: { + workspaceId: string; + awarenessUpdate: string; + }) => { + if (workspaceId !== workspaceId) { + return; + } + applyAwarenessUpdate( + awareness, + base64ToUint8Array(awarenessUpdate), + 'remote' + ); + }; + + const awarenessUpdate = (changes: AwarenessChanges, origin: unknown) => { + if (origin === 'remote') { + return; + } + + const changedClients = Object.values(changes).reduce((res, cur) => [ + ...res, + ...cur, + ]); + + const update = encodeAwarenessUpdate(awareness, changedClients); + uint8ArrayToBase64(update) + .then(encodedUpdate => { + socket.emit('awareness-update', { + workspaceId: workspaceId, + awarenessUpdate: encodedUpdate, + }); + }) + .catch(err => logger.error(err)); + }; + + const newClientAwarenessInitHandler = () => { + const awarenessUpdate = encodeAwarenessUpdate(awareness, [ + awareness.clientID, + ]); + uint8ArrayToBase64(awarenessUpdate) + .then(encodedAwarenessUpdate => { + socket.emit('awareness-update', { + guid: workspaceId, + awarenessUpdate: encodedAwarenessUpdate, + }); + }) + .catch(err => logger.error(err)); + }; + + const windowBeforeUnloadHandler = () => { + removeAwarenessStates(awareness, [awareness.clientID], 'window unload'); + }; + + return { + connect: () => { + socket.on('server-awareness-broadcast', awarenessBroadcast); + socket.on('new-client-awareness-init', newClientAwarenessInitHandler); + awareness.on('update', awarenessUpdate); + + window.addEventListener('beforeunload', windowBeforeUnloadHandler); + + socket.emit('awareness-init', workspaceId); + socket.connect(); + }, + disconnect: () => { + awareness.off('update', awarenessUpdate); + socket.off('server-awareness-broadcast', awarenessBroadcast); + socket.off('new-client-awareness-init', newClientAwarenessInitHandler); + window.removeEventListener('unload', windowBeforeUnloadHandler); + socket.disconnect(); + }, + }; +} diff --git a/packages/frontend/workspace/src/providers/awareness/broadcast-channel/index.ts b/packages/frontend/workspace/src/providers/awareness/broadcast-channel/index.ts new file mode 100644 index 0000000000..3fa9f7cb26 --- /dev/null +++ b/packages/frontend/workspace/src/providers/awareness/broadcast-channel/index.ts @@ -0,0 +1,63 @@ +import type { Awareness } from 'y-protocols/awareness.js'; +import { + applyAwarenessUpdate, + encodeAwarenessUpdate, +} from 'y-protocols/awareness.js'; + +import type { AwarenessProvider } from '..'; +import type { AwarenessChanges } from '../affine'; + +type ChannelMessage = + | { type: 'connect' } + | { type: 'update'; update: Uint8Array }; + +export function createBroadcastChannelAwarenessProvider( + workspaceId: string, + awareness: Awareness +): AwarenessProvider { + const channel = new BroadcastChannel('awareness:' + workspaceId); + + function handleAwarenessUpdate(changes: AwarenessChanges, origin: unknown) { + if (origin === 'remote') { + return; + } + + const changedClients = Object.values(changes).reduce((res, cur) => [ + ...res, + ...cur, + ]); + + const update = encodeAwarenessUpdate(awareness, changedClients); + channel.postMessage({ + type: 'update', + update: update, + } satisfies ChannelMessage); + } + + function handleChannelMessage(event: MessageEvent) { + if (event.data.type === 'update') { + const update = event.data.update; + applyAwarenessUpdate(awareness, update, 'remote'); + } + if (event.data.type === 'connect') { + channel.postMessage({ + type: 'update', + update: encodeAwarenessUpdate(awareness, [awareness.clientID]), + } satisfies ChannelMessage); + } + } + + return { + connect() { + channel.postMessage({ + type: 'connect', + } satisfies ChannelMessage); + awareness.on('update', handleAwarenessUpdate); + channel.addEventListener('message', handleChannelMessage); + }, + disconnect() { + awareness.off('update', handleAwarenessUpdate); + channel.removeEventListener('message', handleChannelMessage); + }, + }; +} diff --git a/packages/frontend/workspace/src/providers/awareness/index.ts b/packages/frontend/workspace/src/providers/awareness/index.ts new file mode 100644 index 0000000000..2fc09f42e9 --- /dev/null +++ b/packages/frontend/workspace/src/providers/awareness/index.ts @@ -0,0 +1,7 @@ +export interface AwarenessProvider { + connect(): void; + disconnect(): void; +} + +export * from './affine'; +export * from './broadcast-channel'; diff --git a/packages/frontend/workspace/src/providers/cloud/index.ts b/packages/frontend/workspace/src/providers/cloud/index.ts deleted file mode 100644 index a5dd71c4b6..0000000000 --- a/packages/frontend/workspace/src/providers/cloud/index.ts +++ /dev/null @@ -1,110 +0,0 @@ -import { DebugLogger } from '@affine/debug'; -import { fetchWithTraceReport } from '@affine/graphql'; -import type { ActiveDocProvider, DocProviderCreator } from '@blocksuite/store'; -import { Workspace } from '@blocksuite/store'; -import type { Doc } from 'yjs'; - -const Y = Workspace.Y; - -const logger = new DebugLogger('affine:cloud'); - -const hashMap = new Map(); - -type DocPublishMode = 'edgeless' | 'page'; - -export type CloudDoc = { - arrayBuffer: ArrayBuffer; - publishMode: DocPublishMode; -}; - -export async function downloadBinaryFromCloud( - rootGuid: string, - pageGuid: string -): Promise { - if (hashMap.has(`${rootGuid}/${pageGuid}`)) { - return true; - } - const response = await fetchWithTraceReport( - runtimeConfig.serverUrlPrefix + - `/api/workspaces/${rootGuid}/docs/${pageGuid}`, - { - priority: 'high', - } - ); - if (response.ok) { - const publishMode = (response.headers.get('publish-mode') || - 'page') as DocPublishMode; - const arrayBuffer = await response.arrayBuffer(); - hashMap.set(`${rootGuid}/${pageGuid}`, arrayBuffer); - - // return both arrayBuffer and publish mode - return { arrayBuffer, publishMode }; - } - return false; -} - -async function downloadBinary(rootGuid: string, doc: Doc) { - const response = await downloadBinaryFromCloud(rootGuid, doc.guid); - if (typeof response !== 'boolean') { - const { arrayBuffer } = response; - Y.applyUpdate(doc, new Uint8Array(arrayBuffer), 'affine-cloud'); - } -} - -export const createCloudDownloadProvider: DocProviderCreator = ( - id, - doc -): ActiveDocProvider => { - let _resolve: () => void; - let _reject: (error: unknown) => void; - const promise = new Promise((resolve, reject) => { - _resolve = resolve; - _reject = reject; - }); - - return { - flavour: 'affine-cloud-download', - active: true, - sync() { - downloadBinary(id, doc) - .then(() => { - logger.info(`Downloaded ${id}`); - _resolve(); - }) - .catch(_reject); - }, - get whenReady() { - return promise; - }, - }; -}; - -export const createMergeCloudSnapshotProvider: DocProviderCreator = ( - id, - doc -): ActiveDocProvider => { - let _resolve: () => void; - const promise = new Promise(resolve => { - _resolve = resolve; - }); - - return { - flavour: 'affine-cloud-merge-snapshot', - active: true, - sync() { - downloadBinary(id, doc) - .then(() => { - logger.info(`Downloaded ${id}`); - _resolve(); - }) - // ignore error - .catch(e => { - console.error(e); - _resolve(); - }); - }, - get whenReady() { - return promise; - }, - }; -}; diff --git a/packages/frontend/workspace/src/providers/index.ts b/packages/frontend/workspace/src/providers/index.ts index 29864d1009..a3f2b467e8 100644 --- a/packages/frontend/workspace/src/providers/index.ts +++ b/packages/frontend/workspace/src/providers/index.ts @@ -1,160 +1,128 @@ -import { DebugLogger } from '@affine/debug'; -import type { - AffineSocketIOProvider, - LocalIndexedDBBackgroundProvider, - LocalIndexedDBDownloadProvider, -} from '@affine/env/workspace'; -import { assertExists } from '@blocksuite/global/utils'; +/** + * The `Provider` is responsible for sync `Y.Doc` with the local database and the Affine Cloud, serving as the source of + * Affine's local-first collaborative magic. + * + * When Affine boot, the `Provider` is tasked with reading content from the local database and loading it into the + * workspace, continuously storing any changes made by the user into the local database. + * + * When using Affine Cloud, the `Provider` also handles sync content with the Cloud. + * + * Additionally, the `Provider` is responsible for implementing a local-first capability, allowing users to edit offline + * with changes stored in the local database and sync with the Cloud when the network is restored. + */ + import type { DocProviderCreator } from '@blocksuite/store'; -import { Workspace } from '@blocksuite/store'; -import { createBroadcastChannelProvider } from '@blocksuite/store/providers/broadcast-channel'; + import { - createIndexedDBDatasource, - createIndexedDBProvider as create, -} from '@toeverything/y-indexeddb'; -import { createLazyProvider } from 'y-provider'; -import { encodeStateVector } from 'yjs'; + createAffineAwarenessProvider, + createBroadcastChannelAwarenessProvider, +} from './awareness'; +import { createAffineStorage } from './storage/affine'; +import { createIndexedDBStorage } from './storage/indexeddb'; +import { createSQLiteStorage } from './storage/sqlite'; +import { SyncEngine } from './sync'; -import { createAffineDataSource } from '../affine'; -import { - createCloudDownloadProvider, - createMergeCloudSnapshotProvider, - downloadBinaryFromCloud, -} from './cloud'; -import { - createSQLiteDBDownloadProvider, - createSQLiteProvider, -} from './sqlite-providers'; - -const Y = Workspace.Y; -const logger = new DebugLogger('indexeddb-provider'); - -const createAffineSocketIOProvider: DocProviderCreator = ( - id, - doc, - { awareness } -): AffineSocketIOProvider => { - const dataSource = createAffineDataSource(id, doc, awareness); - const lazyProvider = createLazyProvider(doc, dataSource, { - origin: 'affine-socket-io', - }); - - Object.assign(lazyProvider, { flavour: 'affine-socket-io' }); - - return lazyProvider as unknown as AffineSocketIOProvider; -}; - -const createIndexedDBBackgroundProvider: DocProviderCreator = ( - id, - blockSuiteWorkspace -): LocalIndexedDBBackgroundProvider => { - const indexeddbProvider = create(blockSuiteWorkspace); - - let connected = false; - return { - flavour: 'local-indexeddb-background', - datasource: indexeddbProvider.datasource, - passive: true, - get status() { - return indexeddbProvider.status; - }, - subscribeStatusChange: indexeddbProvider.subscribeStatusChange, - get connected() { - return connected; - }, - cleanup: () => { - indexeddbProvider.cleanup().catch(console.error); - }, - connect: () => { - logger.info('connect indexeddb provider', id); - indexeddbProvider.connect(); - }, - disconnect: () => { - assertExists(indexeddbProvider); - logger.info('disconnect indexeddb provider', id); - indexeddbProvider.disconnect(); - connected = false; - }, - }; -}; - -const indexedDBDownloadOrigin = 'indexeddb-download-provider'; - -const createIndexedDBDownloadProvider: DocProviderCreator = ( - id, - doc -): LocalIndexedDBDownloadProvider => { - const datasource = createIndexedDBDatasource({}); - let _resolve: () => void; - let _reject: (error: unknown) => void; - const promise = new Promise((resolve, reject) => { - _resolve = resolve; - _reject = reject; - }); - - return { - flavour: 'local-indexeddb', - active: true, - get whenReady() { - return promise; - }, - cleanup: () => { - // todo: cleanup data - }, - sync: () => { - logger.info('sync indexeddb provider', id); - datasource - .queryDocState(doc.guid, { - stateVector: encodeStateVector(doc), - }) - .then(docState => { - if (docState) { - Y.applyUpdate(doc, docState.missing, indexedDBDownloadOrigin); - } - _resolve(); - }) - .catch(_reject); - }, - }; -}; - -export { - createAffineSocketIOProvider, - createBroadcastChannelProvider, - createIndexedDBBackgroundProvider, - createIndexedDBDownloadProvider, - createSQLiteDBDownloadProvider, - createSQLiteProvider, - downloadBinaryFromCloud, -}; +export * from './sync'; export const createLocalProviders = (): DocProviderCreator[] => { - const providers = [ - createIndexedDBBackgroundProvider, - createIndexedDBDownloadProvider, - ] as DocProviderCreator[]; + return [ + (_, doc, { awareness }) => { + const engine = new SyncEngine( + doc, + environment.isDesktop + ? createSQLiteStorage(doc.guid) + : createIndexedDBStorage(doc.guid), + [] + ); - if (runtimeConfig.enableBroadcastChannelProvider) { - providers.push(createBroadcastChannelProvider); - } + const awarenessProviders = [ + createBroadcastChannelAwarenessProvider(doc.guid, awareness), + ]; - if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) { - providers.push(createSQLiteProvider, createSQLiteDBDownloadProvider); - } + let connected = false; - return providers; + return { + flavour: '_', + passive: true, + active: true, + sync() { + if (!connected) { + engine.start(); + + for (const provider of awarenessProviders) { + provider.connect(); + } + connected = true; + } + }, + get whenReady() { + return engine.waitForLoadedRootDoc(); + }, + connect() { + // TODO: actually connect + }, + disconnect() { + // TODO: actually disconnect + }, + get connected() { + return connected; + }, + engine, + }; + }, + ]; }; export const createAffineProviders = (): DocProviderCreator[] => { - return ( - [ - ...createLocalProviders(), - runtimeConfig.enableCloud && createAffineSocketIOProvider, - runtimeConfig.enableCloud && createMergeCloudSnapshotProvider, - ] as DocProviderCreator[] - ).filter(v => Boolean(v)); + return [ + (_, doc, { awareness }) => { + const engine = new SyncEngine( + doc, + environment.isDesktop + ? createSQLiteStorage(doc.guid) + : createIndexedDBStorage(doc.guid), + [createAffineStorage(doc.guid)] + ); + + const awarenessProviders = [ + createBroadcastChannelAwarenessProvider(doc.guid, awareness), + createAffineAwarenessProvider(doc.guid, awareness), + ]; + + let connected = false; + + return { + flavour: '_', + passive: true, + active: true, + sync() { + if (!connected) { + engine.start(); + + for (const provider of awarenessProviders) { + provider.connect(); + } + connected = true; + } + }, + get whenReady() { + return engine.waitForLoadedRootDoc(); + }, + connect() { + // TODO: actually connect + }, + disconnect() { + // TODO: actually disconnect + }, + get connected() { + return connected; + }, + engine, + }; + }, + ]; }; export const createAffinePublicProviders = (): DocProviderCreator[] => { - return [createCloudDownloadProvider]; + return []; }; diff --git a/packages/frontend/workspace/src/providers/logger.ts b/packages/frontend/workspace/src/providers/logger.ts deleted file mode 100644 index b9d2b1dd68..0000000000 --- a/packages/frontend/workspace/src/providers/logger.ts +++ /dev/null @@ -1,3 +0,0 @@ -import { DebugLogger } from '@affine/debug'; - -export const localProviderLogger = new DebugLogger('local-provider'); diff --git a/packages/frontend/workspace/src/providers/sqlite-providers.ts b/packages/frontend/workspace/src/providers/sqlite-providers.ts deleted file mode 100644 index 0369c65a27..0000000000 --- a/packages/frontend/workspace/src/providers/sqlite-providers.ts +++ /dev/null @@ -1,133 +0,0 @@ -import type { - SQLiteDBDownloadProvider, - SQLiteProvider, -} from '@affine/env/workspace'; -import { assertExists } from '@blocksuite/global/utils'; -import type { DocProviderCreator } from '@blocksuite/store'; -import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; -import { createLazyProvider, type DocDataSource } from 'y-provider'; -import type { Doc } from 'yjs'; - -import { localProviderLogger as logger } from './logger'; - -const Y = BlockSuiteWorkspace.Y; - -const sqliteOrigin = 'sqlite-provider-origin'; - -const createDatasource = (workspaceId: string): DocDataSource => { - if (!window.apis?.db) { - throw new Error('sqlite datasource is not available'); - } - - return { - queryDocState: async guid => { - const update = await window.apis.db.getDocAsUpdates( - workspaceId, - workspaceId === guid ? undefined : guid - ); - - if (update) { - return { - missing: update, - }; - } - - return false; - }, - sendDocUpdate: async (guid, update) => { - return window.apis.db.applyDocUpdate( - workspaceId, - update, - workspaceId === guid ? undefined : guid - ); - }, - }; -}; - -/** - * A provider that is responsible for syncing updates the workspace with the local SQLite database. - */ -export const createSQLiteProvider: DocProviderCreator = ( - id, - rootDoc -): SQLiteProvider => { - const datasource = createDatasource(id); - let provider: ReturnType | null = null; - let connected = false; - return { - flavour: 'sqlite', - datasource, - passive: true, - get status() { - assertExists(provider); - return provider.status; - }, - subscribeStatusChange(onStatusChange) { - assertExists(provider); - return provider.subscribeStatusChange(onStatusChange); - }, - connect: () => { - provider = createLazyProvider(rootDoc, datasource, { origin: 'sqlite' }); - provider.connect(); - connected = true; - }, - disconnect: () => { - provider?.disconnect(); - provider = null; - connected = false; - }, - get connected() { - return connected; - }, - }; -}; - -/** - * A provider that is responsible for DOWNLOADING updates from the local SQLite database. - */ -export const createSQLiteDBDownloadProvider: DocProviderCreator = ( - id, - rootDoc -): SQLiteDBDownloadProvider => { - const { apis } = window; - - let _resolve: () => void; - let _reject: (error: unknown) => void; - const promise = new Promise((resolve, reject) => { - _resolve = resolve; - _reject = reject; - }); - - async function syncUpdates(doc: Doc) { - logger.info('syncing updates from sqlite', doc.guid); - const subdocId = doc.guid === id ? undefined : doc.guid; - const updates = await apis.db.getDocAsUpdates(id, subdocId); - - if (updates) { - Y.applyUpdate(doc, updates, sqliteOrigin); - } - - return true; - } - - return { - flavour: 'sqlite-download', - active: true, - get whenReady() { - return promise; - }, - cleanup: () => { - // todo - }, - sync: () => { - logger.info('connect sqlite download provider', id); - syncUpdates(rootDoc) - .then(() => { - _resolve(); - }) - .catch(error => { - _reject(error); - }); - }, - }; -}; diff --git a/packages/frontend/workspace/src/affine/batch-sync-sender.ts b/packages/frontend/workspace/src/providers/storage/affine/batch-sync-sender.ts similarity index 100% rename from packages/frontend/workspace/src/affine/batch-sync-sender.ts rename to packages/frontend/workspace/src/providers/storage/affine/batch-sync-sender.ts diff --git a/packages/frontend/workspace/src/providers/storage/affine/index.ts b/packages/frontend/workspace/src/providers/storage/affine/index.ts new file mode 100644 index 0000000000..622ef8b6b4 --- /dev/null +++ b/packages/frontend/workspace/src/providers/storage/affine/index.ts @@ -0,0 +1,162 @@ +import { DebugLogger } from '@affine/debug'; + +import { getIoManager } from '../../utils/affine-io'; +import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64'; +import type { Storage } from '..'; +import { MultipleBatchSyncSender } from './batch-sync-sender'; + +const logger = new DebugLogger('affine:storage:socketio'); + +export function createAffineStorage( + workspaceId: string +): Storage & { disconnect: () => void } { + logger.debug('createAffineStorage', workspaceId); + const socket = getIoManager().socket('/'); + + const syncSender = new MultipleBatchSyncSender(async (guid, updates) => { + const payload = await Promise.all( + updates.map(update => uint8ArrayToBase64(update)) + ); + + return new Promise(resolve => { + socket.emit( + 'client-update-v2', + { + workspaceId, + guid, + updates: payload, + }, + (response: { + // TODO: reuse `EventError` with server + error?: any; + data: any; + }) => { + // TODO: raise error with different code to users + if (response.error) { + logger.error('client-update-v2 error', { + workspaceId, + guid, + response, + }); + } + + resolve({ + accepted: !response.error, + // TODO: reuse `EventError` with server + retry: response.error?.code === 'INTERNAL', + }); + } + ); + }); + }); + + // TODO: handle error + socket.on('connect', () => { + socket.emit( + 'client-handshake', + workspaceId, + (response: { error?: any }) => { + if (!response.error) { + syncSender.start(); + } + } + ); + }); + + socket.connect(); + + return { + name: 'socketio', + async pull(docId, state) { + const stateVector = state ? await uint8ArrayToBase64(state) : undefined; + + return new Promise((resolve, reject) => { + logger.debug('doc-load-v2', { + workspaceId: workspaceId, + guid: docId, + stateVector, + }); + socket.emit( + 'doc-load-v2', + { + workspaceId: workspaceId, + guid: docId, + stateVector, + }, + ( + response: // TODO: reuse `EventError` with server + { error: any } | { data: { missing: string; state: string } } + ) => { + logger.debug('doc-load callback', { + workspaceId: workspaceId, + guid: docId, + stateVector, + response, + }); + + if ('error' in response) { + // TODO: result `EventError` with server + if (response.error.code === 'DOC_NOT_FOUND') { + resolve(null); + } else { + reject(new Error(response.error.message)); + } + } else { + resolve({ + data: base64ToUint8Array(response.data.missing), + state: response.data.state + ? base64ToUint8Array(response.data.state) + : undefined, + }); + } + } + ); + }); + }, + async push(docId, update) { + logger.debug('client-update-v2', { + workspaceId, + guid: docId, + update, + }); + + await syncSender.send(docId, update); + }, + async subscribe(cb, disconnect) { + const response: { error?: any } = await socket + .timeout(10000) + .emitWithAck('client-handshake', workspaceId); + + if (response.error) { + throw new Error('client-handshake error, ' + response.error); + } + + const handleUpdate = async (message: { + workspaceId: string; + guid: string; + updates: string[]; + }) => { + if (message.workspaceId === workspaceId) { + message.updates.forEach(update => { + cb(message.guid, base64ToUint8Array(update)); + }); + } + }; + socket.on('server-updates', handleUpdate); + + socket.on('disconnect', reason => { + socket.off('server-updates', handleUpdate); + disconnect(reason); + }); + + return () => { + socket.off('server-updates', handleUpdate); + }; + }, + disconnect() { + syncSender.stop(); + socket.emit('client-leave', workspaceId); + socket.disconnect(); + }, + }; +} diff --git a/packages/frontend/workspace/src/providers/storage/index.ts b/packages/frontend/workspace/src/providers/storage/index.ts new file mode 100644 index 0000000000..f9847f195a --- /dev/null +++ b/packages/frontend/workspace/src/providers/storage/index.ts @@ -0,0 +1,29 @@ +export interface Storage { + /** + * for debug + */ + name: string; + + pull( + docId: string, + state: Uint8Array + ): Promise<{ data: Uint8Array; state?: Uint8Array } | null>; + push(docId: string, data: Uint8Array): Promise; + + /** + * Subscribe to updates from peer + * + * @param cb callback to handle updates + * @param disconnect callback to handle disconnect, reason can be something like 'network-error' + * + * @returns unsubscribe function + */ + subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ): Promise<() => void>; +} + +export * from './affine'; +export * from './indexeddb'; +export * from './sqlite'; diff --git a/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts b/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts new file mode 100644 index 0000000000..79a165635c --- /dev/null +++ b/packages/frontend/workspace/src/providers/storage/indexeddb/index.ts @@ -0,0 +1,133 @@ +import { type DBSchema, type IDBPDatabase, openDB } from 'idb'; +import { + applyUpdate, + diffUpdate, + Doc, + encodeStateAsUpdate, + encodeStateVectorFromUpdate, +} from 'yjs'; + +import type { Storage } from '..'; + +export const dbVersion = 1; +export const DEFAULT_DB_NAME = 'affine-local'; + +export function mergeUpdates(updates: Uint8Array[]) { + const doc = new Doc(); + doc.transact(() => { + updates.forEach(update => { + applyUpdate(doc, update); + }); + }); + return encodeStateAsUpdate(doc); +} + +type UpdateMessage = { + timestamp: number; + update: Uint8Array; +}; + +type WorkspacePersist = { + id: string; + updates: UpdateMessage[]; +}; + +interface BlockSuiteBinaryDB extends DBSchema { + workspace: { + key: string; + value: WorkspacePersist; + }; + milestone: { + key: string; + value: unknown; + }; +} + +export function upgradeDB(db: IDBPDatabase) { + db.createObjectStore('workspace', { keyPath: 'id' }); + db.createObjectStore('milestone', { keyPath: 'id' }); +} + +type ChannelMessage = { + type: 'db-updated'; + payload: { docId: string; update: Uint8Array }; +}; + +export function createIndexedDBStorage( + workspaceId: string, + dbName = DEFAULT_DB_NAME, + mergeCount = 1 +): Storage { + let dbPromise: Promise> | null = null; + const getDb = async () => { + if (dbPromise === null) { + dbPromise = openDB(dbName, dbVersion, { + upgrade: upgradeDB, + }); + } + return dbPromise; + }; + + // indexeddb could be shared between tabs, so we use broadcast channel to notify other tabs + const channel = new BroadcastChannel('indexeddb:' + workspaceId); + + return { + name: 'indexeddb', + async pull(docId, state) { + const db = await getDb(); + const store = db + .transaction('workspace', 'readonly') + .objectStore('workspace'); + const data = await store.get(docId); + + if (!data) { + return null; + } + + const { updates } = data; + const update = mergeUpdates(updates.map(({ update }) => update)); + + const diff = state ? diffUpdate(update, state) : update; + + return { data: diff, state: encodeStateVectorFromUpdate(update) }; + }, + async push(docId, update) { + const db = await getDb(); + const store = db + .transaction('workspace', 'readwrite') + .objectStore('workspace'); + + // TODO: maybe we do not need to get data every time + const { updates } = (await store.get(docId)) ?? { updates: [] }; + let rows: UpdateMessage[] = [ + ...updates, + { timestamp: Date.now(), update }, + ]; + if (mergeCount && rows.length >= mergeCount) { + const merged = mergeUpdates(rows.map(({ update }) => update)); + rows = [{ timestamp: Date.now(), update: merged }]; + } + await store.put({ + id: docId, + updates: rows, + }); + channel.postMessage({ + type: 'db-updated', + payload: { docId, update }, + } satisfies ChannelMessage); + }, + async subscribe(cb, _disconnect) { + function onMessage(event: MessageEvent) { + const { type, payload } = event.data; + if (type === 'db-updated') { + const { docId, update } = payload; + cb(docId, update); + } + } + channel.addEventListener('message', onMessage); + return () => { + channel.removeEventListener('message', onMessage); + }; + }, + }; +} diff --git a/packages/frontend/workspace/src/providers/storage/sqlite/index.ts b/packages/frontend/workspace/src/providers/storage/sqlite/index.ts new file mode 100644 index 0000000000..be7dafd45d --- /dev/null +++ b/packages/frontend/workspace/src/providers/storage/sqlite/index.ts @@ -0,0 +1,38 @@ +import { encodeStateVectorFromUpdate } from 'yjs'; + +import type { Storage } from '..'; + +export function createSQLiteStorage(workspaceId: string): Storage { + if (!window.apis?.db) { + throw new Error('sqlite datasource is not available'); + } + + return { + name: 'sqlite', + async pull(docId, _state) { + const update = await window.apis.db.getDocAsUpdates( + workspaceId, + workspaceId === docId ? undefined : docId + ); + + if (update) { + return { + data: update, + state: encodeStateVectorFromUpdate(update), + }; + } + + return null; + }, + async push(docId, data) { + return window.apis.db.applyDocUpdate( + workspaceId, + data, + workspaceId === docId ? undefined : docId + ); + }, + async subscribe(_cb, _disconnect) { + return () => {}; + }, + }; +} diff --git a/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts b/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts new file mode 100644 index 0000000000..5f06e61949 --- /dev/null +++ b/packages/frontend/workspace/src/providers/sync/__tests__/sync.spec.ts @@ -0,0 +1,62 @@ +import 'fake-indexeddb/auto'; + +import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; +import { Schema, Workspace } from '@blocksuite/store'; +import { describe, expect, test } from 'vitest'; + +import { createIndexedDBStorage } from '../../storage'; +import { SyncPeer } from '../'; + +const schema = new Schema(); + +schema.register(AffineSchemas).register(__unstableSchemas); + +describe('sync', () => { + test('basic - indexeddb', async () => { + let prev: any; + { + const workspace = new Workspace({ + id: 'test', + isSSR: true, + schema, + }); + + const syncPeer = new SyncPeer( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid) + ); + await syncPeer.waitForLoaded(); + + const page = workspace.createPage({ + id: 'page0', + }); + await page.waitForLoaded(); + const pageBlockId = page.addBlock('affine:page', { + title: new page.Text(''), + }); + page.addBlock('affine:surface', {}, pageBlockId); + const frameId = page.addBlock('affine:note', {}, pageBlockId); + page.addBlock('affine:paragraph', {}, frameId); + await syncPeer.waitForSynced(); + syncPeer.stop(); + prev = workspace.doc.toJSON(); + } + + { + const workspace = new Workspace({ + id: 'test', + isSSR: true, + schema, + }); + const syncPeer = new SyncPeer( + workspace.doc, + createIndexedDBStorage(workspace.doc.guid) + ); + await syncPeer.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncPeer.stop(); + } + }); +}); diff --git a/packages/frontend/workspace/src/providers/sync/engine.ts b/packages/frontend/workspace/src/providers/sync/engine.ts new file mode 100644 index 0000000000..432e4199a3 --- /dev/null +++ b/packages/frontend/workspace/src/providers/sync/engine.ts @@ -0,0 +1,225 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import type { Doc } from 'yjs'; + +import type { Storage } from '../storage'; +import { SyncPeer, SyncPeerStatus } from './peer'; + +export const MANUALLY_STOP = 'manually-stop'; + +/** + * # SyncEngine + * + * ┌────────────┐ + * │ SyncEngine │ + * └─────┬──────┘ + * │ + * ▼ + * ┌────────────┐ + * │ SyncPeer │ + * ┌─────────┤ local ├─────────┐ + * │ └─────┬──────┘ │ + * │ │ │ + * ▼ ▼ ▼ + * ┌────────────┐ ┌────────────┐ ┌────────────┐ + * │ SyncPeer │ │ SyncPeer │ │ SyncPeer │ + * │ Remote │ │ Remote │ │ Remote │ + * └────────────┘ └────────────┘ └────────────┘ + * + * Sync engine manage sync peers + * + * Sync steps: + * 1. start local sync + * 2. wait for local sync complete + * 3. start remote sync + * 4. continuously sync local and remote + */ +export enum SyncEngineStatus { + Stopped = 0, + Retrying = 1, + LoadingRootDoc = 2, + LoadingSubDoc = 3, + Syncing = 4, + Synced = 5, +} + +export class SyncEngine { + get rootDocId() { + return this.rootDoc.guid; + } + + logger = new DebugLogger('affine:sync-engine:' + this.rootDocId); + private _status = SyncEngineStatus.Stopped; + onStatusChange = new Slot(); + private set status(s: SyncEngineStatus) { + if (s !== this._status) { + this.logger.info('status change', SyncEngineStatus[s]); + this._status = s; + this.onStatusChange.emit(s); + } + } + + get status() { + return this._status; + } + + private abort = new AbortController(); + + constructor( + private rootDoc: Doc, + private local: Storage, + private remotes: Storage[] + ) {} + + start() { + if (this.status !== SyncEngineStatus.Stopped) { + this.stop(); + } + this.abort = new AbortController(); + + this.status = SyncEngineStatus.LoadingRootDoc; + this.sync(this.abort.signal).catch(err => { + // should never reach here + this.logger.error(err); + }); + } + + stop() { + this.abort.abort(MANUALLY_STOP); + this.status = SyncEngineStatus.Stopped; + } + + // main sync process, should never return until abort + async sync(signal: AbortSignal) { + let localPeer: SyncPeer | null = null; + const remotePeers: SyncPeer[] = []; + const cleanUp: (() => void)[] = []; + try { + // Step 1: start local sync peer + localPeer = new SyncPeer(this.rootDoc, this.local); + + // Step 2: wait for local sync complete + await localPeer.waitForLoaded(signal); + + // Step 3: start remote sync peer + remotePeers.push( + ...this.remotes.map(remote => new SyncPeer(this.rootDoc, remote)) + ); + + const peers = [localPeer, ...remotePeers]; + + this.updateSyncingState(peers); + + for (const peer of peers) { + cleanUp.push( + peer.onStatusChange.on(() => { + if (!signal.aborted) this.updateSyncingState(peers); + }).dispose + ); + } + + // Step 4: continuously sync local and remote + + // wait for abort + await new Promise((_, reject) => { + if (signal.aborted) { + reject(signal.reason); + } + signal.addEventListener('abort', () => { + reject(signal.reason); + }); + }); + } catch (error) { + if (error === MANUALLY_STOP) { + return; + } + throw error; + } finally { + // stop peers + localPeer?.stop(); + for (const remotePeer of remotePeers) { + remotePeer.stop(); + } + for (const clean of cleanUp) { + clean(); + } + } + } + + updateSyncingState(peers: SyncPeer[]) { + let status = SyncEngineStatus.Synced; + for (const peer of peers) { + if (peer.status !== SyncPeerStatus.Synced) { + status = SyncEngineStatus.Syncing; + break; + } + } + for (const peer of peers) { + if (peer.status === SyncPeerStatus.LoadingSubDoc) { + status = SyncEngineStatus.LoadingSubDoc; + break; + } + } + for (const peer of peers) { + if (peer.status === SyncPeerStatus.LoadingRootDoc) { + status = SyncEngineStatus.LoadingRootDoc; + break; + } + } + for (const peer of peers) { + if (peer.status === SyncPeerStatus.Retrying) { + status = SyncEngineStatus.Retrying; + break; + } + } + this.status = status; + } + + async waitForSynced(abort?: AbortSignal) { + if (this.status == SyncEngineStatus.Synced) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status == SyncEngineStatus.Synced) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + async waitForLoadedRootDoc(abort?: AbortSignal) { + if (this.status > SyncEngineStatus.LoadingRootDoc) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status > SyncEngineStatus.LoadingRootDoc) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } +} diff --git a/packages/frontend/workspace/src/providers/sync/index.ts b/packages/frontend/workspace/src/providers/sync/index.ts new file mode 100644 index 0000000000..882fbe154a --- /dev/null +++ b/packages/frontend/workspace/src/providers/sync/index.ts @@ -0,0 +1,18 @@ +/** + * + * **SyncEngine** + * + * Manages one local storage and multiple remote storages. + * + * Responsible for creating SyncPeers for synchronization, following the local-first strategy. + * + * **SyncPeer** + * + * Responsible for synchronizing a single storage with Y.Doc. + * + * Carries the main synchronization logic. + * + */ + +export * from './engine'; +export * from './peer'; diff --git a/packages/frontend/workspace/src/providers/sync/peer.ts b/packages/frontend/workspace/src/providers/sync/peer.ts new file mode 100644 index 0000000000..1a208b67d3 --- /dev/null +++ b/packages/frontend/workspace/src/providers/sync/peer.ts @@ -0,0 +1,397 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import type { Doc } from 'yjs'; +import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; + +import type { Storage } from '../storage'; +import { AsyncQueue } from '../utils/async-queue'; +import { throwIfAborted } from '../utils/throw-if-aborted'; +import { MANUALLY_STOP } from './engine'; + +/** + * # SyncPeer + * A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs. + * + * ┌─────┐ + * │Start│ + * └──┬──┘ + * │ + * ┌──────┐ ┌─────▼──────┐ ┌────┐ + * │listen◄─────┤pull rootdoc│ │peer│ + * └──┬───┘ └─────┬──────┘ └──┬─┘ + * │ │ onLoad() │ + * ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐ + * │listen◄─────┤pull subdocs│ │subscribe│ + * └──┬───┘ └─────┬──────┘ └────┬────┘ + * │ │ onReady() │ + * ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐ + * │queue├──────►apply updates◄───────┤queue│ + * └─────┘ └─────────────┘ └─────┘ + * + * listen: listen for updates from ydoc, typically from user modifications. + * subscribe: listen for updates from storage, typically from other users. + * + */ +export enum SyncPeerStatus { + Stopped = 0, + Retrying = 1, + LoadingRootDoc = 2, + LoadingSubDoc = 3, + Loaded = 4.5, + Syncing = 5, + Synced = 6, +} + +export class SyncPeer { + private _status = SyncPeerStatus.Stopped; + onStatusChange = new Slot(); + abort = new AbortController(); + get name() { + return this.storage.name; + } + logger = new DebugLogger('affine:sync-peer:' + this.name); + + constructor( + private rootDoc: Doc, + private storage: Storage + ) { + this.logger.debug('peer start'); + this.status = SyncPeerStatus.LoadingRootDoc; + + this.syncRetryLoop(this.abort.signal).catch(err => { + // should not reach here + console.error(err); + }); + } + + private set status(s: SyncPeerStatus) { + if (s !== this._status) { + this.logger.debug('status change', SyncPeerStatus[s]); + this._status = s; + this.onStatusChange.emit(s); + } + } + + get status() { + return this._status; + } + + /** + * stop sync + * + * SyncPeer is one-time use, this peer should be discarded after call stop(). + */ + stop() { + this.logger.debug('peer stop'); + this.abort.abort(MANUALLY_STOP); + } + + /** + * auto retry after 5 seconds if sync failed + */ + async syncRetryLoop(abort: AbortSignal) { + while (abort.aborted === false) { + try { + await this.sync(abort); + } catch (err) { + if (err === MANUALLY_STOP) { + return; + } + + this.logger.error('sync error', err); + } + try { + this.logger.error('retry after 5 seconds'); + this.status = SyncPeerStatus.Retrying; + await Promise.race([ + new Promise(resolve => { + setTimeout(resolve, 5 * 1000); + }), + new Promise((_, reject) => { + // exit if manually stopped + if (abort.aborted) { + reject(abort.reason); + } + abort.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } catch (err) { + if (err === MANUALLY_STOP) { + return; + } + + // should never reach here + throw err; + } + } + } + + private state: { + connectedDocs: Map; + pushUpdatesQueue: AsyncQueue<{ + docId: string; + data: Uint8Array; + }>; + pullUpdatesQueue: AsyncQueue<{ + docId: string; + data: Uint8Array; + }>; + subdocsLoadQueue: AsyncQueue; + } = { + connectedDocs: new Map(), + pushUpdatesQueue: new AsyncQueue(), + pullUpdatesQueue: new AsyncQueue(), + subdocsLoadQueue: new AsyncQueue(), + }; + + /** + * main synchronization logic + */ + async sync(abortOuter: AbortSignal) { + const abortInner = new AbortController(); + + abortOuter.addEventListener('abort', reason => { + abortInner.abort(reason); + }); + + let dispose: (() => void) | null = null; + try { + // start listen storage updates + dispose = await this.storage.subscribe( + this.handleStorageUpdates, + reason => { + // abort if storage disconnect, should trigger retry loop + abortInner.abort('subscribe disconnect:' + reason); + } + ); + throwIfAborted(abortInner.signal); + + // Step 1: load root doc + this.status = SyncPeerStatus.LoadingRootDoc; + + await this.connectDoc(this.rootDoc, abortInner.signal); + + this.status = SyncPeerStatus.LoadingSubDoc; + + // Step 2: load subdocs + this.state.subdocsLoadQueue.push( + ...Array.from(this.rootDoc.getSubdocs()) + ); + + this.rootDoc.on('subdocs', this.handleSubdocsUpdate); + + while (this.state.subdocsLoadQueue.length > 0) { + const subdoc = await this.state.subdocsLoadQueue.next( + abortInner.signal + ); + await this.connectDoc(subdoc, abortInner.signal); + } + + this.status = SyncPeerStatus.Syncing; + this.updateSyncStatus(); + + // Finally: start sync + await Promise.all([ + // listen subdocs + (async () => { + while (throwIfAborted(abortInner.signal)) { + const subdoc = await this.state.subdocsLoadQueue.next( + abortInner.signal + ); + this.status = SyncPeerStatus.LoadingSubDoc; + await this.connectDoc(subdoc, abortInner.signal); + this.status = SyncPeerStatus.Syncing; + this.updateSyncStatus(); + } + })(), + // pull updates + (async () => { + while (throwIfAborted(abortInner.signal)) { + const { docId, data } = await this.state.pullUpdatesQueue.next( + abortInner.signal + ); + this.updateSyncStatus(); + // don't apply empty data or Uint8Array([0, 0]) + if ( + !( + data.byteLength === 0 || + (data.byteLength === 2 && data[0] === 0 && data[1] === 0) + ) + ) { + const subdoc = this.state.connectedDocs.get(docId); + if (subdoc) { + applyUpdate(subdoc, data, this.name); + } + } + } + })(), + // push updates + (async () => { + while (throwIfAborted(abortInner.signal)) { + const { docId, data } = await this.state.pushUpdatesQueue.next( + abortInner.signal + ); + + // don't push empty data or Uint8Array([0, 0]) + if ( + !( + data.byteLength === 0 || + (data.byteLength === 2 && data[0] === 0 && data[1] === 0) + ) + ) { + await this.storage.push(docId, data); + } + + this.updateSyncStatus(); + } + })(), + ]); + } finally { + dispose?.(); + for (const docs of this.state.connectedDocs.values()) { + this.disconnectDoc(docs); + } + this.rootDoc.off('subdocs', this.handleSubdocsUpdate); + } + } + + async connectDoc(doc: Doc, abort: AbortSignal) { + const { data: docData, state: inStorageState } = + (await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {}; + throwIfAborted(abort); + + if (docData) { + applyUpdate(doc, docData, 'load'); + } + + // diff root doc and in-storage, save updates to pendingUpdates + this.state.pushUpdatesQueue.push({ + docId: doc.guid, + data: encodeStateAsUpdate(doc, inStorageState), + }); + + this.state.connectedDocs.set(doc.guid, doc); + + // start listen root doc changes + doc.on('update', this.handleYDocUpdates); + + // mark rootDoc as loaded + doc.emit('sync', [true]); + } + + disconnectDoc(doc: Doc) { + doc.off('update', this.handleYDocUpdates); + this.state.connectedDocs.delete(doc.guid); + } + + // handle updates from ydoc + handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => { + // don't push updates from storage + if (origin === this.name) { + return; + } + this.state.pushUpdatesQueue.push({ + docId: doc.guid, + data: update, + }); + this.updateSyncStatus(); + }; + + // handle subdocs changes, append new subdocs to queue, remove subdocs from queue + handleSubdocsUpdate = ({ + added, + removed, + }: { + added: Set; + removed: Set; + }) => { + for (const subdoc of added) { + this.state.subdocsLoadQueue.push(subdoc); + } + + for (const subdoc of removed) { + this.disconnectDoc(subdoc); + this.state.subdocsLoadQueue.remove(doc => doc === subdoc); + } + this.updateSyncStatus(); + }; + + // handle updates from storage + handleStorageUpdates = (docId: string, data: Uint8Array) => { + this.state.pullUpdatesQueue.push({ + docId, + data, + }); + this.updateSyncStatus(); + }; + + updateSyncStatus() { + // if status is not syncing, do nothing + if (this.status < SyncPeerStatus.Syncing) { + return; + } + if ( + this.state.pushUpdatesQueue.length === 0 && + this.state.pullUpdatesQueue.length === 0 && + this.state.subdocsLoadQueue.length === 0 + ) { + if (this.status === SyncPeerStatus.Syncing) { + this.status = SyncPeerStatus.Synced; + } + } else { + if (this.status === SyncPeerStatus.Synced) { + this.status = SyncPeerStatus.Syncing; + } + } + } + + async waitForSynced(abort?: AbortSignal) { + if (this.status >= SyncPeerStatus.Synced) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status >= SyncPeerStatus.Synced) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + async waitForLoaded(abort?: AbortSignal) { + if (this.status > SyncPeerStatus.Loaded) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status > SyncPeerStatus.Loaded) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } +} diff --git a/packages/frontend/workspace/src/providers/utils/__tests__/async-queue.spec.ts b/packages/frontend/workspace/src/providers/utils/__tests__/async-queue.spec.ts new file mode 100644 index 0000000000..017401ec84 --- /dev/null +++ b/packages/frontend/workspace/src/providers/utils/__tests__/async-queue.spec.ts @@ -0,0 +1,45 @@ +import { describe, expect, test, vi } from 'vitest'; + +import { AsyncQueue } from '../async-queue'; + +describe('async-queue', () => { + test('push & pop', async () => { + const queue = new AsyncQueue(); + queue.push(1, 2, 3); + expect(queue.length).toBe(3); + expect(await queue.next()).toBe(1); + expect(await queue.next()).toBe(2); + expect(await queue.next()).toBe(3); + expect(queue.length).toBe(0); + }); + + test('await', async () => { + const queue = new AsyncQueue(); + queue.push(1, 2); + expect(await queue.next()).toBe(1); + expect(await queue.next()).toBe(2); + + let v = -1; + + // setup 2 pop tasks + queue.next().then(next => { + v = next; + }); + queue.next().then(next => { + v = next; + }); + + // Wait for 100ms + await new Promise(resolve => setTimeout(resolve, 100)); + // v should not be changed + expect(v).toBe(-1); + + // push 3, should trigger the first pop task + queue.push(3); + await vi.waitFor(() => v === 3); + + // push 4, should trigger the second pop task + queue.push(4); + await vi.waitFor(() => v === 4); + }); +}); diff --git a/packages/frontend/workspace/src/providers/utils/__tests__/throw-if-aborted.spec.ts b/packages/frontend/workspace/src/providers/utils/__tests__/throw-if-aborted.spec.ts new file mode 100644 index 0000000000..137f748a6b --- /dev/null +++ b/packages/frontend/workspace/src/providers/utils/__tests__/throw-if-aborted.spec.ts @@ -0,0 +1,13 @@ +import { describe, expect, test } from 'vitest'; + +import { throwIfAborted } from '../throw-if-aborted'; + +describe('throw-if-aborted', () => { + test('basic', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + expect(throwIfAborted(abortSignal)).toBe(true); + abortController.abort('TEST_ABORT'); + expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT'); + }); +}); diff --git a/packages/frontend/workspace/src/providers/utils/affine-io.ts b/packages/frontend/workspace/src/providers/utils/affine-io.ts new file mode 100644 index 0000000000..d2f7e26029 --- /dev/null +++ b/packages/frontend/workspace/src/providers/utils/affine-io.ts @@ -0,0 +1,15 @@ +import { Manager } from 'socket.io-client'; + +let ioManager: Manager | null = null; + +// use lazy initialization socket.io io manager +export function getIoManager(): Manager { + if (ioManager) { + return ioManager; + } + ioManager = new Manager(runtimeConfig.serverUrlPrefix + '/', { + autoConnect: false, + transports: ['websocket'], + }); + return ioManager; +} diff --git a/packages/frontend/workspace/src/providers/utils/async-queue.ts b/packages/frontend/workspace/src/providers/utils/async-queue.ts new file mode 100644 index 0000000000..d0002332b9 --- /dev/null +++ b/packages/frontend/workspace/src/providers/utils/async-queue.ts @@ -0,0 +1,58 @@ +export class AsyncQueue { + private _queue: T[]; + + private _resolveUpdate: (() => void) | null = null; + private _waitForUpdate: Promise | null = null; + + constructor(init: T[] = []) { + this._queue = init; + } + + get length() { + return this._queue.length; + } + + async next(abort?: AbortSignal): Promise { + const update = this._queue.shift(); + if (update) { + return update; + } else { + if (!this._waitForUpdate) { + this._waitForUpdate = new Promise(resolve => { + this._resolveUpdate = resolve; + }); + } + + await Promise.race([ + this._waitForUpdate, + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + + return this.next(abort); + } + } + + push(...updates: T[]) { + this._queue.push(...updates); + if (this._resolveUpdate) { + const resolve = this._resolveUpdate; + this._resolveUpdate = null; + this._waitForUpdate = null; + resolve(); + } + } + + remove(predicate: (update: T) => boolean) { + const index = this._queue.findIndex(predicate); + if (index !== -1) { + this._queue.splice(index, 1); + } + } +} diff --git a/packages/frontend/workspace/src/affine/utils.ts b/packages/frontend/workspace/src/providers/utils/base64.ts similarity index 67% rename from packages/frontend/workspace/src/affine/utils.ts rename to packages/frontend/workspace/src/providers/utils/base64.ts index 7c37f9c043..28096e678e 100644 --- a/packages/frontend/workspace/src/affine/utils.ts +++ b/packages/frontend/workspace/src/providers/utils/base64.ts @@ -1,20 +1,3 @@ -import type { Doc as YDoc } from 'yjs'; - -export type SubdocEvent = { - loaded: Set; - removed: Set; - added: Set; -}; - -export type UpdateHandler = (update: Uint8Array, origin: unknown) => void; -export type SubdocsHandler = (event: SubdocEvent) => void; -export type DestroyHandler = () => void; - -export type AwarenessChanges = Record< - 'added' | 'updated' | 'removed', - number[] ->; - export function uint8ArrayToBase64(array: Uint8Array): Promise { return new Promise(resolve => { // Create a blob from the Uint8Array diff --git a/packages/frontend/workspace/src/providers/utils/throw-if-aborted.ts b/packages/frontend/workspace/src/providers/utils/throw-if-aborted.ts new file mode 100644 index 0000000000..4646b0fbd9 --- /dev/null +++ b/packages/frontend/workspace/src/providers/utils/throw-if-aborted.ts @@ -0,0 +1,7 @@ +// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill +export function throwIfAborted(abort?: AbortSignal) { + if (abort?.aborted) { + throw new Error(abort.reason); + } + return true; +} diff --git a/tests/affine-desktop/e2e/basic.spec.ts b/tests/affine-desktop/e2e/basic.spec.ts index 048907f36a..c77223a573 100644 --- a/tests/affine-desktop/e2e/basic.spec.ts +++ b/tests/affine-desktop/e2e/basic.spec.ts @@ -62,43 +62,38 @@ test('app sidebar router forward/back', async ({ page }) => { }); } { - const title = (await page - .locator('.affine-doc-page-block-title') - .textContent()) as string; - expect(title.trim()).toBe('test3'); + await expect(page.locator('.affine-doc-page-block-title')).toHaveText( + 'test3' + ); } await page.click('[data-testid="app-sidebar-arrow-button-back"]'); await page.click('[data-testid="app-sidebar-arrow-button-back"]'); { - const title = (await page - .locator('.affine-doc-page-block-title') - .textContent()) as string; - expect(title.trim()).toBe('test1'); + await expect(page.locator('.affine-doc-page-block-title')).toHaveText( + 'test1' + ); } await page.click('[data-testid="app-sidebar-arrow-button-forward"]'); await page.click('[data-testid="app-sidebar-arrow-button-forward"]'); { - const title = (await page - .locator('.affine-doc-page-block-title') - .textContent()) as string; - expect(title.trim()).toBe('test3'); + await expect(page.locator('.affine-doc-page-block-title')).toHaveText( + 'test3' + ); } await historyShortcut(page, 'goBack'); await historyShortcut(page, 'goBack'); { - const title = (await page - .locator('.affine-doc-page-block-title') - .textContent()) as string; - expect(title.trim()).toBe('test1'); + await expect(page.locator('.affine-doc-page-block-title')).toHaveText( + 'test1' + ); } await historyShortcut(page, 'goForward'); await historyShortcut(page, 'goForward'); { - const title = (await page - .locator('.affine-doc-page-block-title') - .textContent()) as string; - expect(title.trim()).toBe('test3'); + await expect(page.locator('.affine-doc-page-block-title')).toHaveText( + 'test3' + ); } }); // } diff --git a/tests/affine-local/e2e/duplicate-page.spec.ts b/tests/affine-local/e2e/duplicate-page.spec.ts index 894e5fafe1..71e6b70890 100644 --- a/tests/affine-local/e2e/duplicate-page.spec.ts +++ b/tests/affine-local/e2e/duplicate-page.spec.ts @@ -18,5 +18,5 @@ test('Duplicate page should work', async ({ page }) => { const duplicateButton = page.getByTestId('editor-option-menu-duplicate'); await duplicateButton.click({ delay: 100 }); const title2 = getBlockSuiteEditorTitle(page); - expect(await title2.innerText()).toBe('test(1)'); + await expect(title2).toHaveText('test(1)', { timeout: 1000 }); }); diff --git a/tests/affine-local/e2e/local-first-collections-items.spec.ts b/tests/affine-local/e2e/local-first-collections-items.spec.ts index 089c27aec6..b59187f108 100644 --- a/tests/affine-local/e2e/local-first-collections-items.spec.ts +++ b/tests/affine-local/e2e/local-first-collections-items.spec.ts @@ -5,7 +5,10 @@ import { getBlockSuiteEditorTitle, waitForEditorLoad, } from '@affine-test/kit/utils/page-logic'; -import { clickSideBarCurrentWorkspaceBanner } from '@affine-test/kit/utils/sidebar'; +import { + clickSideBarAllPageButton, + clickSideBarCurrentWorkspaceBanner, +} from '@affine-test/kit/utils/sidebar'; import { createLocalWorkspace } from '@affine-test/kit/utils/workspace'; import type { Page } from '@playwright/test'; import { expect } from '@playwright/test'; @@ -76,6 +79,7 @@ test('Show collections items in sidebar', async ({ page }) => { skipInitialPage: true, }); expect(await items.count()).toBe(1); + await clickSideBarAllPageButton(page); await createLocalWorkspace( { name: 'Test 1', diff --git a/tests/affine-local/e2e/quick-search.spec.ts b/tests/affine-local/e2e/quick-search.spec.ts index a35bb61740..d86dcda8ef 100644 --- a/tests/affine-local/e2e/quick-search.spec.ts +++ b/tests/affine-local/e2e/quick-search.spec.ts @@ -9,9 +9,11 @@ import { import { clickSideBarAllPageButton } from '@affine-test/kit/utils/sidebar'; import { expect, type Page } from '@playwright/test'; -const openQuickSearchByShortcut = async (page: Page) => { +const openQuickSearchByShortcut = async (page: Page, checkVisible = true) => { await withCtrlOrMeta(page, () => page.keyboard.press('k', { delay: 50 })); - await page.waitForTimeout(1000); + if (checkVisible) { + expect(page.getByTestId('cmdk-quick-search')).toBeVisible(); + } }; const keyboardDownAndSelect = async (page: Page, label: string) => { @@ -188,7 +190,7 @@ test('Navigate to the 404 page and try to open quick search', async ({ await page.goto('http://localhost:8080/404'); const notFoundTip = page.locator('button >> text=Back to My Content'); await expect(notFoundTip).toBeVisible(); - await openQuickSearchByShortcut(page); + await openQuickSearchByShortcut(page, false); const quickSearch = page.locator('[data-testid=cmdk-quick-search]'); await expect(quickSearch).toBeVisible({ visible: false }); }); @@ -215,6 +217,7 @@ test('Autofocus input after select', async ({ page }) => { await openHomePage(page); await waitForEditorLoad(page); await clickNewPageButton(page); + await page.waitForTimeout(500); // wait for new page loaded await openQuickSearchByShortcut(page); await page.keyboard.press('ArrowUp'); const locator = page.locator('[cmdk-input]'); @@ -251,8 +254,9 @@ test('assert the recent browse pages are on the recent list', async ({ await waitForEditorLoad(page); { const title = getBlockSuiteEditorTitle(page); - await title.pressSequentially('sgtokidoki'); - expect(await title.innerText()).toBe('sgtokidoki'); + await title.click(); + await title.pressSequentially('sgtokidoki', { delay: 100 }); + await expect(title).toHaveText('sgtokidoki'); } // create second page @@ -262,8 +266,9 @@ test('assert the recent browse pages are on the recent list', async ({ await waitForEditorLoad(page); { const title = getBlockSuiteEditorTitle(page); - await title.pressSequentially('theliquidhorse'); - expect(await title.innerText()).toBe('theliquidhorse'); + await title.click(); + await title.pressSequentially('theliquidhorse', { delay: 100 }); + await expect(title).toHaveText('theliquidhorse'); } await page.waitForTimeout(200); @@ -273,8 +278,9 @@ test('assert the recent browse pages are on the recent list', async ({ await waitForEditorLoad(page); { const title = getBlockSuiteEditorTitle(page); - await title.pressSequentially('battlekot'); - expect(await title.innerText()).toBe('battlekot'); + await title.click(); + await title.pressSequentially('battlekot', { delay: 100 }); + await expect(title).toHaveText('battlekot'); } await openQuickSearchByShortcut(page); @@ -283,9 +289,9 @@ test('assert the recent browse pages are on the recent list', async ({ const quickSearchItems = page.locator( '[cmdk-item] [data-testid="cmdk-label"]' ); - expect(await quickSearchItems.nth(0).textContent()).toBe('battlekot'); - expect(await quickSearchItems.nth(1).textContent()).toBe('theliquidhorse'); - expect(await quickSearchItems.nth(2).textContent()).toBe('sgtokidoki'); + await expect(quickSearchItems.nth(0)).toHaveText('battlekot'); + await expect(quickSearchItems.nth(1)).toHaveText('theliquidhorse'); + await expect(quickSearchItems.nth(2)).toHaveText('sgtokidoki'); } // create forth page, and check does the recent page list only contains three pages @@ -299,8 +305,9 @@ test('assert the recent browse pages are on the recent list', async ({ await waitForEditorLoad(page); { const title = getBlockSuiteEditorTitle(page); - await title.pressSequentially('affine is the best'); - expect(await title.innerText()).toBe('affine is the best'); + await title.click(); + await title.pressSequentially('affine is the best', { delay: 100 }); + await expect(title).toHaveText('affine is the best', { timeout: 500 }); } await page.waitForTimeout(1000); await openQuickSearchByShortcut(page); @@ -308,9 +315,7 @@ test('assert the recent browse pages are on the recent list', async ({ const quickSearchItems = page.locator( '[cmdk-item] [data-testid="cmdk-label"]' ); - expect(await quickSearchItems.nth(0).textContent()).toBe( - 'affine is the best' - ); + await expect(quickSearchItems.nth(0)).toHaveText('affine is the best'); } }); diff --git a/tests/affine-local/e2e/settings.spec.ts b/tests/affine-local/e2e/settings.spec.ts index 195f414041..2b3c65d059 100644 --- a/tests/affine-local/e2e/settings.spec.ts +++ b/tests/affine-local/e2e/settings.spec.ts @@ -110,11 +110,11 @@ test('Different workspace should have different name in the setting panel', asyn await createLocalWorkspace({ name: 'New Workspace 3' }, page); await openSettingModal(page); await page.getByTestId('current-workspace-label').click(); - expect(await page.getByTestId('workspace-name-input').inputValue()).toBe( + await expect(page.getByTestId('workspace-name-input')).toHaveValue( 'New Workspace 3' ); await page.getByText('New Workspace 2').click(); - expect(await page.getByTestId('workspace-name-input').inputValue()).toBe( + await expect(page.getByTestId('workspace-name-input')).toHaveValue( 'New Workspace 2' ); }); diff --git a/tests/kit/utils/workspace.ts b/tests/kit/utils/workspace.ts index aafea8b05e..82830a70b9 100644 --- a/tests/kit/utils/workspace.ts +++ b/tests/kit/utils/workspace.ts @@ -1,4 +1,6 @@ -import type { Page } from '@playwright/test'; +import { expect, type Page } from '@playwright/test'; + +import { waitForEditorLoad } from './page-logic'; interface CreateWorkspaceParams { name: string; @@ -32,6 +34,10 @@ export async function createLocalWorkspace( delay: 500, }); + await waitForEditorLoad(page); + + await expect(page.getByTestId('workspace-name')).toHaveText(params.name); + // if (isDesktop) { // await page.getByTestId('create-workspace-continue-button').click(); // } diff --git a/yarn.lock b/yarn.lock index b81887b871..85d2cc2656 100644 --- a/yarn.lock +++ b/yarn.lock @@ -900,6 +900,7 @@ __metadata: "@types/ws": "npm:^8.5.7" async-call-rpc: "npm:^6.3.1" fake-indexeddb: "npm:^5.0.0" + idb: "npm:^7.1.1" is-svg: "npm:^5.0.0" jotai: "npm:^2.4.3" js-base64: "npm:^3.7.5" @@ -921,6 +922,7 @@ __metadata: zod: "npm:^3.22.4" peerDependencies: "@blocksuite/blocks": "*" + "@blocksuite/global": "*" "@blocksuite/store": "*" languageName: unknown linkType: soft