feat(core): support syncing workspaces and blobs in the background (#4057)

This commit is contained in:
Alex Yang
2023-08-31 00:40:34 -05:00
committed by GitHub
parent 4e45554585
commit 55b3182799
9 changed files with 146 additions and 6 deletions

View File

@@ -0,0 +1,165 @@
import type {
AffineCloudWorkspace,
WorkspaceCRUD,
} from '@affine/env/workspace';
import { WorkspaceFlavour } from '@affine/env/workspace';
import {
createWorkspaceMutation,
deleteWorkspaceMutation,
getWorkspaceQuery,
getWorkspacesQuery,
} from '@affine/graphql';
import { createIndexeddbStorage, Workspace } from '@blocksuite/store';
import { migrateLocalBlobStorage } from '@toeverything/infra/blocksuite';
import {
createIndexedDBProvider,
DEFAULT_DB_NAME,
} from '@toeverything/y-indexeddb';
import { getSession } from 'next-auth/react';
import { proxy } from 'valtio/vanilla';
import { getOrCreateWorkspace } from '../manager';
import { fetcher } from './gql';
const Y = Workspace.Y;
async function deleteLocalBlobStorage(id: string) {
const storage = createIndexeddbStorage(id);
const keys = await storage.crud.list();
for (const key of keys) {
await storage.crud.delete(key);
}
}
// we don't need to persistence the state into local storage
// because if a user clicks create multiple time and nothing happened
// because of the server delay or something, he/she will wait.
// and also the user journey of creating workspace is long.
const createdWorkspaces = proxy<string[]>([]);
export const CRUD: WorkspaceCRUD<WorkspaceFlavour.AFFINE_CLOUD> = {
create: async blockSuiteWorkspace => {
if (createdWorkspaces.some(id => id === blockSuiteWorkspace.id)) {
throw new Error('workspace already created');
}
const { createWorkspace } = await fetcher({
query: createWorkspaceMutation,
variables: {
init: new File(
[Y.encodeStateAsUpdate(blockSuiteWorkspace.doc)],
'initBinary.yDoc'
),
},
});
createdWorkspaces.push(blockSuiteWorkspace.id);
const newBLockSuiteWorkspace = getOrCreateWorkspace(
createWorkspace.id,
WorkspaceFlavour.AFFINE_CLOUD
);
Y.applyUpdate(
newBLockSuiteWorkspace.doc,
Y.encodeStateAsUpdate(blockSuiteWorkspace.doc)
);
await Promise.all(
[...blockSuiteWorkspace.doc.subdocs].map(async subdoc => {
subdoc.load();
return subdoc.whenLoaded.then(() => {
newBLockSuiteWorkspace.doc.subdocs.forEach(newSubdoc => {
if (newSubdoc.guid === subdoc.guid) {
Y.applyUpdate(newSubdoc, Y.encodeStateAsUpdate(subdoc));
}
});
});
})
);
const provider = createIndexedDBProvider(
newBLockSuiteWorkspace.doc,
DEFAULT_DB_NAME
);
provider.connect();
migrateLocalBlobStorage(blockSuiteWorkspace.id, createWorkspace.id)
.then(() => deleteLocalBlobStorage(blockSuiteWorkspace.id))
.catch(e => {
console.error('error when moving blob storage:', e);
});
// todo(himself65): delete old workspace in the future
return createWorkspace.id;
},
delete: async workspace => {
await fetcher({
query: deleteWorkspaceMutation,
variables: {
id: workspace.id,
},
});
},
get: async id => {
if (!environment.isServer && !navigator.onLine) {
// no network
return null;
}
if (
!(await getSession()
.then(() => true)
.catch(() => false))
) {
return null;
}
try {
await fetcher({
query: getWorkspaceQuery,
variables: {
id,
},
});
return {
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
blockSuiteWorkspace: getOrCreateWorkspace(
id,
WorkspaceFlavour.AFFINE_CLOUD
),
} satisfies AffineCloudWorkspace;
} catch (e) {
console.error('error when fetching cloud workspace:', e);
return null;
}
},
list: async () => {
if (!environment.isServer && !navigator.onLine) {
// no network
return [];
}
if (
!(await getSession()
.then(() => true)
.catch(() => false))
) {
return [];
}
try {
const { workspaces } = await fetcher({
query: getWorkspacesQuery,
});
const ids = workspaces.map(({ id }) => id);
return ids.map(
id =>
({
id,
flavour: WorkspaceFlavour.AFFINE_CLOUD,
blockSuiteWorkspace: getOrCreateWorkspace(
id,
WorkspaceFlavour.AFFINE_CLOUD
),
}) satisfies AffineCloudWorkspace
);
} catch (e) {
console.error('error when fetching cloud workspaces:', e);
return [];
}
},
};

View File

@@ -0,0 +1,93 @@
import { createIndexeddbStorage } from '@blocksuite/store';
import { pushBinary } from '@toeverything/y-indexeddb';
import type { Doc } from 'yjs';
import { applyUpdate } from 'yjs';
import { createCloudBlobStorage } from '../blob/cloud-blob-storage';
import { downloadBinaryFromCloud } from '../providers/cloud';
import { CRUD } from './crud';
let abortController: AbortController | undefined;
const downloadRecursive = async (
rootGuid: string,
doc: Doc,
signal: AbortSignal
): Promise<void> => {
if (signal.aborted) {
return;
}
const binary = await downloadBinaryFromCloud(rootGuid, doc.guid);
if (typeof binary !== 'boolean') {
const update = new Uint8Array(binary);
if (rootGuid === doc.guid) {
// only apply the root doc
applyUpdate(doc, update, 'affine-cloud-service');
} else {
await pushBinary(doc.guid, update);
}
}
return Promise.all(
[...doc.subdocs.values()].map(subdoc =>
downloadRecursive(rootGuid, subdoc, signal)
)
).then();
};
export async function startSync() {
abortController = new AbortController();
const signal = abortController.signal;
const workspaces = await CRUD.list();
const downloadCloudPromises = workspaces.map(workspace =>
downloadRecursive(workspace.id, workspace.blockSuiteWorkspace.doc, signal)
);
const syncBlobPromises = workspaces.map(async workspace => {
const cloudBlobStorage = createCloudBlobStorage(workspace.id);
const indexeddbBlobStorage = createIndexeddbStorage(workspace.id);
return Promise.all([
cloudBlobStorage.crud.list(),
indexeddbBlobStorage.crud.list(),
]).then(([cloudKeys, indexeddbKeys]) => {
if (signal.aborted) {
return;
}
const cloudKeysSet = new Set(cloudKeys);
const indexeddbKeysSet = new Set(indexeddbKeys);
// missing in indexeddb
const missingLocalKeys = cloudKeys.filter(
key => !indexeddbKeysSet.has(key)
);
// missing in cloud
const missingCloudKeys = indexeddbKeys.filter(
key => !cloudKeysSet.has(key)
);
return Promise.all([
...missingLocalKeys.map(key =>
cloudBlobStorage.crud.get(key).then(async value => {
if (signal.aborted) {
return;
}
if (value) {
await indexeddbBlobStorage.crud.set(key, value);
}
})
),
...missingCloudKeys.map(key =>
indexeddbBlobStorage.crud.get(key).then(async value => {
if (signal.aborted) {
return;
}
if (value) {
await cloudBlobStorage.crud.set(key, value);
}
})
),
]);
});
});
await Promise.all([...downloadCloudPromises, ...syncBlobPromises]);
}
export async function stopSync() {
abortController?.abort();
}

View File

View File

@@ -143,6 +143,7 @@ const fetchMetadata: FetchMetadata = async (get, { signal }) => {
removed.forEach(meta => {
metadata.splice(metadata.indexOf(meta), 1);
});
Adapter.Events['service:stop']?.();
continue;
}
try {
@@ -178,6 +179,7 @@ const fetchMetadata: FetchMetadata = async (get, { signal }) => {
} catch (e) {
console.error('list data error:', e);
}
Adapter.Events['service:start']?.();
}
}
const metadataMap = new Map(metadata.map(x => [x.id, x]));

View File

@@ -8,23 +8,30 @@ const Y = Workspace.Y;
const logger = new DebugLogger('affine:cloud');
const hashMap = new Map<string, ArrayBuffer>();
export async function downloadBinaryFromCloud(
rootGuid: string,
pageGuid: string
) {
): Promise<boolean | ArrayBuffer> {
if (hashMap.has(`${rootGuid}/${pageGuid}`)) {
return true;
}
const response = await fetchWithReport(
runtimeConfig.serverUrlPrefix +
`/api/workspaces/${rootGuid}/docs/${pageGuid}`
);
if (response.ok) {
return response.arrayBuffer();
const arrayBuffer = await response.arrayBuffer();
hashMap.set(`${rootGuid}/${pageGuid}`, arrayBuffer);
return arrayBuffer;
}
return false;
}
async function downloadBinary(rootGuid: string, doc: Doc) {
const buffer = await downloadBinaryFromCloud(rootGuid, doc.guid);
if (buffer) {
if (typeof buffer !== 'boolean') {
Y.applyUpdate(doc, new Uint8Array(buffer), 'affine-cloud');
}
}