From 1d1fb6ca31768f304703e174ae021a74950ee246 Mon Sep 17 00:00:00 2001 From: Alex Yang Date: Wed, 6 Sep 2023 20:54:39 -0700 Subject: [PATCH] feat(core): await sync doc (#4247) --- .../src/components/migration-fallback.tsx | 98 +++++-------------- packages/workspace/src/affine/crud.ts | 34 ++++--- packages/y-provider/src/data-source.ts | 37 +++++++ 3 files changed, 82 insertions(+), 87 deletions(-) diff --git a/apps/core/src/components/migration-fallback.tsx b/apps/core/src/components/migration-fallback.tsx index ee2863d873..97fc4de5eb 100644 --- a/apps/core/src/components/migration-fallback.tsx +++ b/apps/core/src/components/migration-fallback.tsx @@ -3,12 +3,14 @@ import type { LocalIndexedDBBackgroundProvider, SQLiteProvider, } from '@affine/env/workspace'; +import { + syncDataSourceFromDoc, + syncDocFromDataSource, +} from '@affine/y-provider'; import { assertExists } from '@blocksuite/global/utils'; import { Button } from '@toeverything/components/button'; import { forceUpgradePages } from '@toeverything/infra/blocksuite'; import { useCallback, useMemo, useState } from 'react'; -import type { Doc as YDoc } from 'yjs'; -import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; import { useCurrentWorkspace } from '../hooks/current/use-current-workspace'; @@ -36,85 +38,31 @@ export const MigrationFallback = function MigrationFallback() { }, [providers]); const handleClick = useCallback(async () => { setDone(false); - const downloadRecursively = async (doc: YDoc) => { - { - const docState = await localProvider.datasource.queryDocState( - doc.guid, - { - stateVector: encodeStateVector(doc), - } - ); - console.log('download indexeddb', doc.guid); - if (docState) { - applyUpdate(doc, docState.missing, 'migration'); - } - } - if (remoteProvider) { - { - const docState = await remoteProvider.datasource.queryDocState( - doc.guid, - { - stateVector: encodeStateVector(doc), - } - ); - console.log('download remote', doc.guid); - if (docState) { - applyUpdate(doc, docState.missing, 'migration'); - } - } - } - await Promise.all( - [...doc.subdocs].map(async subdoc => { - await downloadRecursively(subdoc); - }) + await syncDocFromDataSource( + workspace.blockSuiteWorkspace.doc, + localProvider.datasource + ); + if (remoteProvider) { + await syncDocFromDataSource( + workspace.blockSuiteWorkspace.doc, + remoteProvider.datasource ); - { - await localProvider.datasource.sendDocUpdate( - doc.guid, - encodeStateAsUpdate(doc) - ); - console.log('upload indexeddb', doc.guid); - if (remoteProvider) { - await remoteProvider.datasource.sendDocUpdate( - doc.guid, - encodeStateAsUpdate(doc) - ); - console.log('upload remote', doc.guid); - } - } - }; - const uploadRecursively = async (doc: YDoc) => { - { - await localProvider.datasource.sendDocUpdate( - doc.guid, - encodeStateAsUpdate(doc) - ); - console.log('upload indexeddb', doc.guid); - if (remoteProvider) { - await remoteProvider.datasource.sendDocUpdate( - doc.guid, - encodeStateAsUpdate(doc) - ); - console.log('upload remote', doc.guid); - } - } - await Promise.all( - [...doc.subdocs].map(async subdoc => { - await uploadRecursively(subdoc); - }) - ); - }; + } - await downloadRecursively(workspace.blockSuiteWorkspace.doc); - console.log('download done'); - - console.log('start migration'); await forceUpgradePages({ getCurrentRootDoc: async () => workspace.blockSuiteWorkspace.doc, getSchema: () => workspace.blockSuiteWorkspace.schema, }); - await uploadRecursively(workspace.blockSuiteWorkspace.doc); - console.log('migration done'); + await syncDataSourceFromDoc( + workspace.blockSuiteWorkspace.doc, + localProvider.datasource + ); + if (remoteProvider) { + await syncDataSourceFromDoc( + workspace.blockSuiteWorkspace.doc, + remoteProvider.datasource + ); + } setDone(true); }, [ localProvider.datasource, diff --git a/packages/workspace/src/affine/crud.ts b/packages/workspace/src/affine/crud.ts index 576f38a07c..430ebc3024 100644 --- a/packages/workspace/src/affine/crud.ts +++ b/packages/workspace/src/affine/crud.ts @@ -9,6 +9,8 @@ import { getWorkspaceQuery, getWorkspacesQuery, } from '@affine/graphql'; +import { createAffineDataSource } from '@affine/workspace/affine/index'; +import { syncDataSourceFromDoc } from '@affine/y-provider'; import { createIndexeddbStorage, Workspace } from '@blocksuite/store'; import { migrateLocalBlobStorage } from '@toeverything/infra/blocksuite'; import { @@ -38,35 +40,43 @@ async function deleteLocalBlobStorage(id: string) { const createdWorkspaces = proxy([]); export const CRUD: WorkspaceCRUD = { - create: async blockSuiteWorkspace => { - if (createdWorkspaces.some(id => id === blockSuiteWorkspace.id)) { + create: async upstreamWorkspace => { + if (createdWorkspaces.some(id => id === upstreamWorkspace.id)) { throw new Error('workspace already created'); } const { createWorkspace } = await fetcher({ query: createWorkspaceMutation, variables: { init: new File( - [Y.encodeStateAsUpdate(blockSuiteWorkspace.doc)], + [Y.encodeStateAsUpdate(upstreamWorkspace.doc)], 'initBinary.yDoc' ), }, }); - createdWorkspaces.push(blockSuiteWorkspace.id); - const newBLockSuiteWorkspace = getOrCreateWorkspace( + createdWorkspaces.push(upstreamWorkspace.id); + const newBlockSuiteWorkspace = getOrCreateWorkspace( createWorkspace.id, WorkspaceFlavour.AFFINE_CLOUD ); + const datasource = createAffineDataSource( + createWorkspace.id, + newBlockSuiteWorkspace.doc, + newBlockSuiteWorkspace.awarenessStore.awareness + ); + + await syncDataSourceFromDoc(upstreamWorkspace.doc, datasource); + Y.applyUpdate( - newBLockSuiteWorkspace.doc, - Y.encodeStateAsUpdate(blockSuiteWorkspace.doc) + newBlockSuiteWorkspace.doc, + Y.encodeStateAsUpdate(upstreamWorkspace.doc) ); await Promise.all( - [...blockSuiteWorkspace.doc.subdocs].map(async subdoc => { + [...upstreamWorkspace.doc.subdocs].map(async subdoc => { subdoc.load(); return subdoc.whenLoaded.then(() => { - newBLockSuiteWorkspace.doc.subdocs.forEach(newSubdoc => { + newBlockSuiteWorkspace.doc.subdocs.forEach(newSubdoc => { if (newSubdoc.guid === subdoc.guid) { Y.applyUpdate(newSubdoc, Y.encodeStateAsUpdate(subdoc)); } @@ -76,12 +86,12 @@ export const CRUD: WorkspaceCRUD = { ); const provider = createIndexedDBProvider( - newBLockSuiteWorkspace.doc, + newBlockSuiteWorkspace.doc, DEFAULT_DB_NAME ); provider.connect(); - migrateLocalBlobStorage(blockSuiteWorkspace.id, createWorkspace.id) - .then(() => deleteLocalBlobStorage(blockSuiteWorkspace.id)) + migrateLocalBlobStorage(upstreamWorkspace.id, createWorkspace.id) + .then(() => deleteLocalBlobStorage(upstreamWorkspace.id)) .catch(e => { console.error('error when moving blob storage:', e); }); diff --git a/packages/y-provider/src/data-source.ts b/packages/y-provider/src/data-source.ts index f5cbecacda..ac95e0606b 100644 --- a/packages/y-provider/src/data-source.ts +++ b/packages/y-provider/src/data-source.ts @@ -1,3 +1,6 @@ +import type { Doc as YDoc } from 'yjs'; +import { applyUpdate, encodeStateAsUpdate } from 'yjs'; + import type { DocState } from './types'; export interface DatasourceDocAdapter { @@ -26,6 +29,40 @@ export interface DatasourceDocAdapter { ): () => void; } +export async function syncDocFromDataSource( + rootDoc: YDoc, + datasource: DatasourceDocAdapter +) { + const downloadDocStateRecursively = async (doc: YDoc) => { + const docState = await datasource.queryDocState(doc.guid); + if (docState) { + applyUpdate(doc, docState.missing, 'sync-doc-from-datasource'); + } + await Promise.all( + [...doc.subdocs].map(async subdoc => { + await downloadDocStateRecursively(subdoc); + }) + ); + }; + await downloadDocStateRecursively(rootDoc); +} + +export async function syncDataSourceFromDoc( + rootDoc: YDoc, + datasource: DatasourceDocAdapter +) { + const uploadDocStateRecursively = async (doc: YDoc) => { + await datasource.sendDocUpdate(doc.guid, encodeStateAsUpdate(doc)); + await Promise.all( + [...doc.subdocs].map(async subdoc => { + await uploadDocStateRecursively(subdoc); + }) + ); + }; + + await uploadDocStateRecursively(rootDoc); +} + /** * query the datasource from source, and save the latest update to target *