refactor: new provider (#4900)

This commit is contained in:
EYHN
2023-11-17 15:50:01 +08:00
committed by GitHub
parent 9baad36e41
commit aa4c7407de
48 changed files with 1783 additions and 1480 deletions

View File

@@ -9,16 +9,10 @@ import {
getWorkspaceQuery,
getWorkspacesQuery,
} from '@affine/graphql';
import { createAffineDataSource } from '@affine/workspace/affine/index';
import { createIndexeddbStorage, Workspace } from '@blocksuite/store';
import { migrateLocalBlobStorage } from '@toeverything/infra/blocksuite';
import {
createIndexedDBProvider,
DEFAULT_DB_NAME,
} from '@toeverything/y-indexeddb';
import { getSession } from 'next-auth/react';
import { proxy } from 'valtio/vanilla';
import { syncDataSourceFromDoc } from 'y-provider';
import { getOrCreateWorkspace } from '../manager';
import { fetcher } from './gql';
@@ -77,21 +71,6 @@ export const CRUD: WorkspaceCRUD<WorkspaceFlavour.AFFINE_CLOUD> = {
})
);
const datasource = createAffineDataSource(
createWorkspace.id,
newBlockSuiteWorkspace.doc,
newBlockSuiteWorkspace.awarenessStore.awareness
);
const disconnect = datasource.onDocUpdate(() => {});
await syncDataSourceFromDoc(upstreamWorkspace.doc, datasource);
disconnect();
const provider = createIndexedDBProvider(
newBlockSuiteWorkspace.doc,
DEFAULT_DB_NAME
);
provider.connect();
migrateLocalBlobStorage(upstreamWorkspace.id, createWorkspace.id)
.then(() => deleteLocalBlobStorage(upstreamWorkspace.id))
.catch(e => {

View File

@@ -0,0 +1,37 @@
import { fetchWithTraceReport } from '@affine/graphql';
const hashMap = new Map<string, CloudDoc>();
type DocPublishMode = 'edgeless' | 'page';
export type CloudDoc = {
arrayBuffer: ArrayBuffer;
publishMode: DocPublishMode;
};
export async function downloadBinaryFromCloud(
rootGuid: string,
pageGuid: string
): Promise<CloudDoc | null> {
const cached = hashMap.get(`${rootGuid}/${pageGuid}`);
if (cached) {
return cached;
}
const response = await fetchWithTraceReport(
runtimeConfig.serverUrlPrefix +
`/api/workspaces/${rootGuid}/docs/${pageGuid}`,
{
priority: 'high',
}
);
if (response.ok) {
const publishMode = (response.headers.get('publish-mode') ||
'page') as DocPublishMode;
const arrayBuffer = await response.arrayBuffer();
hashMap.set(`${rootGuid}/${pageGuid}`, { arrayBuffer, publishMode });
// return both arrayBuffer and publish mode
return { arrayBuffer, publishMode };
}
return null;
}

View File

@@ -1,263 +0,0 @@
import { DebugLogger } from '@affine/debug';
import type { Socket } from 'socket.io-client';
import { Manager } from 'socket.io-client';
import {
applyAwarenessUpdate,
type Awareness,
encodeAwarenessUpdate,
removeAwarenessStates,
} from 'y-protocols/awareness';
import type { DocDataSource } from 'y-provider';
import type { Doc } from 'yjs';
import { MultipleBatchSyncSender } from './batch-sync-sender';
import {
type AwarenessChanges,
base64ToUint8Array,
uint8ArrayToBase64,
} from './utils';
let ioManager: Manager | null = null;
// use lazy initialization to avoid global side effect
function getIoManager(): Manager {
if (ioManager) {
return ioManager;
}
ioManager = new Manager(runtimeConfig.serverUrlPrefix + '/', {
autoConnect: false,
transports: ['websocket'],
});
return ioManager;
}
const logger = new DebugLogger('affine:sync');
export const createAffineDataSource = (
id: string,
rootDoc: Doc,
awareness: Awareness
) => {
if (id !== rootDoc.guid) {
console.warn('important!! please use doc.guid as roomName');
}
logger.debug('createAffineDataSource', id, rootDoc.guid);
const socket = getIoManager().socket('/');
const syncSender = new MultipleBatchSyncSender(async (guid, updates) => {
const payload = await Promise.all(
updates.map(update => uint8ArrayToBase64(update))
);
return new Promise(resolve => {
socket.emit(
'client-update-v2',
{
workspaceId: rootDoc.guid,
guid,
updates: payload,
},
(response: {
// TODO: reuse `EventError` with server
error?: any;
data: any;
}) => {
// TODO: raise error with different code to users
if (response.error) {
logger.error('client-update-v2 error', {
workspaceId: rootDoc.guid,
guid,
response,
});
}
resolve({
accepted: !response.error,
// TODO: reuse `EventError` with server
retry: response.error?.code === 'INTERNAL',
});
}
);
});
});
return {
get socket() {
return socket;
},
queryDocState: async (guid, options) => {
const stateVector = options?.stateVector
? await uint8ArrayToBase64(options.stateVector)
: undefined;
return new Promise((resolve, reject) => {
logger.debug('doc-load-v2', {
workspaceId: rootDoc.guid,
guid,
stateVector,
});
socket.emit(
'doc-load-v2',
{
workspaceId: rootDoc.guid,
guid,
stateVector,
},
(
response: // TODO: reuse `EventError` with server
{ error: any } | { data: { missing: string; state: string } }
) => {
logger.debug('doc-load callback', {
workspaceId: rootDoc.guid,
guid,
stateVector,
response,
});
if ('error' in response) {
// TODO: result `EventError` with server
if (response.error.code === 'DOC_NOT_FOUND') {
resolve(false);
} else {
reject(new Error(response.error.message));
}
} else {
resolve({
missing: base64ToUint8Array(response.data.missing),
state: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
});
}
}
);
});
},
sendDocUpdate: async (guid: string, update: Uint8Array) => {
logger.debug('client-update-v2', {
workspaceId: rootDoc.guid,
guid,
update,
});
await syncSender.send(guid, update);
},
onDocUpdate: callback => {
const onUpdate = async (message: {
workspaceId: string;
guid: string;
updates: string[];
}) => {
if (message.workspaceId === rootDoc.guid) {
message.updates.forEach(update => {
callback(message.guid, base64ToUint8Array(update));
});
}
};
let destroyAwareness = () => {};
socket.on('server-updates', onUpdate);
socket.on('connect', () => {
socket.emit(
'client-handshake',
rootDoc.guid,
(response: { error?: any }) => {
if (!response.error) {
syncSender.start();
destroyAwareness = setupAffineAwareness(
socket,
rootDoc,
awareness
);
}
}
);
});
socket.connect();
return () => {
syncSender.stop();
socket.emit('client-leave', rootDoc.guid);
socket.off('server-updates', onUpdate);
destroyAwareness();
socket.disconnect();
};
},
} satisfies DocDataSource & { readonly socket: Socket };
};
function setupAffineAwareness(
conn: Socket,
rootDoc: Doc,
awareness: Awareness
) {
const awarenessBroadcast = ({
workspaceId,
awarenessUpdate,
}: {
workspaceId: string;
awarenessUpdate: string;
}) => {
if (workspaceId !== rootDoc.guid) {
return;
}
applyAwarenessUpdate(
awareness,
base64ToUint8Array(awarenessUpdate),
'server'
);
};
const awarenessUpdate = (changes: AwarenessChanges, origin: unknown) => {
if (origin === 'server') {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) => [
...res,
...cur,
]);
const update = encodeAwarenessUpdate(awareness, changedClients);
uint8ArrayToBase64(update)
.then(encodedUpdate => {
conn.emit('awareness-update', {
workspaceId: rootDoc.guid,
awarenessUpdate: encodedUpdate,
});
})
.catch(err => logger.error(err));
};
const newClientAwarenessInitHandler = () => {
const awarenessUpdate = encodeAwarenessUpdate(awareness, [
awareness.clientID,
]);
uint8ArrayToBase64(awarenessUpdate)
.then(encodedAwarenessUpdate => {
conn.emit('awareness-update', {
guid: rootDoc.guid,
awarenessUpdate: encodedAwarenessUpdate,
});
})
.catch(err => logger.error(err));
};
const windowBeforeUnloadHandler = () => {
removeAwarenessStates(awareness, [awareness.clientID], 'window unload');
};
conn.on('server-awareness-broadcast', awarenessBroadcast);
conn.on('new-client-awareness-init', newClientAwarenessInitHandler);
awareness.on('update', awarenessUpdate);
window.addEventListener('beforeunload', windowBeforeUnloadHandler);
conn.emit('awareness-init', rootDoc.guid);
return () => {
awareness.off('update', awarenessUpdate);
conn.off('server-awareness-broadcast', awarenessBroadcast);
conn.off('new-client-awareness-init', newClientAwarenessInitHandler);
window.removeEventListener('unload', windowBeforeUnloadHandler);
};
}

View File

@@ -1,120 +0,0 @@
import { DebugLogger } from '@affine/debug';
import { createIndexeddbStorage } from '@blocksuite/store';
import {
createIndexedDBDatasource,
DEFAULT_DB_NAME,
downloadBinary,
} from '@toeverything/y-indexeddb';
import { syncDataSource } from 'y-provider';
import type { Doc } from 'yjs';
import { applyUpdate } from 'yjs';
import { createCloudBlobStorage } from '../blob/cloud-blob-storage';
import { createAffineDataSource } from '.';
import { CRUD } from './crud';
const performanceLogger = new DebugLogger('performance:sync');
let abortController: AbortController | undefined;
const downloadRootFromIndexedDB = async (
rootGuid: string,
doc: Doc,
signal: AbortSignal
): Promise<void> => {
if (signal.aborted) {
return;
}
const update = await downloadBinary(rootGuid);
if (update !== false) {
applyUpdate(doc, update);
}
};
export async function startSync() {
performanceLogger.info('start');
abortController = new AbortController();
const signal = abortController.signal;
const workspaces = await CRUD.list();
performanceLogger.info('CRUD list');
const syncDocPromises = workspaces.map(workspace =>
downloadRootFromIndexedDB(
workspace.id,
workspace.blockSuiteWorkspace.doc,
signal
)
);
await Promise.all(syncDocPromises);
performanceLogger.info('all sync promise');
const syncPromises = workspaces.map(workspace => {
const remoteDataSource = createAffineDataSource(
workspace.id,
workspace.blockSuiteWorkspace.doc,
workspace.blockSuiteWorkspace.awarenessStore.awareness
);
const indexeddbDataSource = createIndexedDBDatasource({
dbName: DEFAULT_DB_NAME,
});
return syncDataSource(
(): string[] => [
workspace.blockSuiteWorkspace.doc.guid,
...[...workspace.blockSuiteWorkspace.doc.subdocs].map(doc => doc.guid),
],
remoteDataSource,
indexeddbDataSource
);
});
const syncBlobPromises = workspaces.map(async workspace => {
const cloudBlobStorage = createCloudBlobStorage(workspace.id);
const indexeddbBlobStorage = createIndexeddbStorage(workspace.id);
return Promise.all([
cloudBlobStorage.crud.list(),
indexeddbBlobStorage.crud.list(),
]).then(([cloudKeys, indexeddbKeys]) => {
if (signal.aborted) {
return;
}
const cloudKeysSet = new Set(cloudKeys);
const indexeddbKeysSet = new Set(indexeddbKeys);
// missing in indexeddb
const missingLocalKeys = cloudKeys.filter(
key => !indexeddbKeysSet.has(key)
);
// missing in cloud
const missingCloudKeys = indexeddbKeys.filter(
key => !cloudKeysSet.has(key)
);
return Promise.all([
...missingLocalKeys.map(key =>
cloudBlobStorage.crud.get(key).then(async value => {
if (signal.aborted) {
return;
}
if (value) {
await indexeddbBlobStorage.crud.set(key, value);
}
})
),
...missingCloudKeys.map(key =>
indexeddbBlobStorage.crud.get(key).then(async value => {
if (signal.aborted) {
return;
}
if (value) {
await cloudBlobStorage.crud.set(key, value);
}
})
),
]);
});
});
await Promise.all([...syncPromises, ...syncBlobPromises]);
performanceLogger.info('sync done');
}
export async function stopSync() {
abortController?.abort();
}

View File

@@ -268,7 +268,6 @@ export const rootWorkspacesMetadataAtom = atom<
}
if (newWorkspaceId) {
set(currentPageIdAtom, null);
set(currentWorkspaceIdAtom, newWorkspaceId);
}
return metadata;

View File

@@ -1,90 +0,0 @@
/**
* @vitest-environment happy-dom
*/
import 'fake-indexeddb/auto';
import type {
LocalIndexedDBBackgroundProvider,
LocalIndexedDBDownloadProvider,
} from '@affine/env/workspace';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema, Workspace } from '@blocksuite/store';
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
import {
createIndexedDBBackgroundProvider,
createIndexedDBDownloadProvider,
} from '..';
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
beforeEach(() => {
vi.useFakeTimers({ toFake: ['requestIdleCallback'] });
});
afterEach(() => {
globalThis.localStorage.clear();
globalThis.indexedDB.deleteDatabase('affine-local');
});
describe('download provider', () => {
test('basic', async () => {
let prev: any;
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const provider = createIndexedDBBackgroundProvider(
workspace.id,
workspace.doc,
{
awareness: workspace.awarenessStore.awareness,
}
) as LocalIndexedDBBackgroundProvider;
provider.connect();
const page = workspace.createPage({
id: 'page0',
});
await page.waitForLoaded();
const pageBlockId = page.addBlock('affine:page', {
title: new page.Text(''),
});
page.addBlock('affine:surface', {}, pageBlockId);
const frameId = page.addBlock('affine:note', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
await new Promise(resolve => setTimeout(resolve, 1000));
provider.disconnect();
prev = workspace.doc.toJSON();
}
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const provider = createIndexedDBDownloadProvider(
workspace.id,
workspace.doc,
{
awareness: workspace.awarenessStore.awareness,
}
) as LocalIndexedDBDownloadProvider;
provider.sync();
await provider.whenReady;
expect(workspace.doc.toJSON()).toEqual({
...prev,
// download provider only download the root doc
spaces: {
page0: {
blocks: {},
},
},
});
}
});
});

View File

@@ -1,103 +0,0 @@
/**
* @vitest-environment happy-dom
*/
import 'fake-indexeddb/auto';
import type { AffineSocketIOProvider } from '@affine/env/workspace';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema, Workspace } from '@blocksuite/store';
import { describe, expect, test } from 'vitest';
import * as awarenessProtocol from 'y-protocols/awareness';
import { Doc } from 'yjs';
import { createAffineSocketIOProvider } from '..';
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
describe('sockio provider', () => {
test.skip('test storage', async () => {
const workspaceId = 'test-storage-ws';
{
const workspace = new Workspace({
id: workspaceId,
isSSR: true,
schema,
});
const provider = createAffineSocketIOProvider(
workspace.id,
workspace.doc,
{
awareness: workspace.awarenessStore.awareness,
}
) as AffineSocketIOProvider;
provider.connect();
const page = workspace.createPage({
id: 'page',
});
await page.waitForLoaded();
page.addBlock('affine:page', {
title: new page.Text('123123'),
});
await new Promise(resolve => setTimeout(resolve, 1000));
}
{
const workspace = new Workspace({
id: workspaceId,
isSSR: true,
schema,
});
const provider = createAffineSocketIOProvider(
workspace.id,
workspace.doc,
{
awareness: workspace.awarenessStore.awareness,
}
) as AffineSocketIOProvider;
provider.connect();
await new Promise(resolve => setTimeout(resolve, 1000));
const page = workspace.getPage('page')!;
await page.waitForLoaded();
const block = page.getBlockByFlavour('affine:page');
expect(block[0].flavour).toEqual('affine:page');
}
});
test.skip('test collaboration', async () => {
const workspaceId = 'test-collboration-ws';
{
const doc = new Doc({ guid: workspaceId });
const provider = createAffineSocketIOProvider(doc.guid, doc, {
awareness: new awarenessProtocol.Awareness(doc),
}) as AffineSocketIOProvider;
const doc2 = new Doc({ guid: workspaceId });
const provider2 = createAffineSocketIOProvider(doc2.guid, doc2, {
awareness: new awarenessProtocol.Awareness(doc2),
}) as AffineSocketIOProvider;
provider.connect();
provider2.connect();
await new Promise(resolve => setTimeout(resolve, 500));
const subdoc = new Doc();
const folder = doc.getMap();
folder.set('subDoc', subdoc);
subdoc.getText().insert(0, 'subDoc content');
await new Promise(resolve => setTimeout(resolve, 1000));
expect(
(doc2.getMap().get('subDoc') as Doc).getText().toJSON(),
'subDoc content'
);
}
});
});

View File

@@ -1,165 +0,0 @@
import type {
SQLiteDBDownloadProvider,
SQLiteProvider,
} from '@affine/env/workspace';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import type { Y as YType } from '@blocksuite/store';
import { Schema, Workspace } from '@blocksuite/store';
import type { DBHandlerManager } from '@toeverything/infra/handler';
import type {
EventMap,
UnwrapManagerHandlerToClientSide,
} from '@toeverything/infra/type';
import { nanoid } from 'nanoid';
import { setTimeout } from 'timers/promises';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { getDoc } from 'y-provider';
import {
createSQLiteDBDownloadProvider,
createSQLiteProvider,
} from '../sqlite-providers';
const Y = Workspace.Y;
let id: string;
let workspace: Workspace;
let provider: SQLiteProvider;
let downloadProvider: SQLiteDBDownloadProvider;
let offlineYdoc: YType.Doc;
let triggerDBUpdate:
| Parameters<typeof window.events.db.onExternalUpdate>[0]
| null = null;
const mockedAddBlob = vi.fn();
vi.stubGlobal('window', {
apis: {
db: {
getDocAsUpdates: async (_, guid) => {
const subdoc = guid ? getDoc(offlineYdoc, guid) : offlineYdoc;
if (!subdoc) {
return false;
}
return Y.encodeStateAsUpdate(subdoc);
},
applyDocUpdate: async (_, update, subdocId) => {
const subdoc = subdocId ? getDoc(offlineYdoc, subdocId) : offlineYdoc;
if (!subdoc) {
return;
}
Y.applyUpdate(subdoc, update, 'sqlite');
},
getBlobKeys: async () => {
// todo: may need to hack the way to get hash keys of blobs
return [];
},
addBlob: mockedAddBlob,
} satisfies Partial<UnwrapManagerHandlerToClientSide<DBHandlerManager>>,
},
events: {
db: {
onExternalUpdate: fn => {
triggerDBUpdate = fn;
return () => {
triggerDBUpdate = null;
};
},
},
} as Partial<EventMap>,
});
vi.stubGlobal('environment', {
isDesktop: true,
});
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
beforeEach(() => {
id = nanoid();
workspace = new Workspace({
id,
isSSR: true,
schema,
});
provider = createSQLiteProvider(workspace.id, workspace.doc, {
awareness: workspace.awarenessStore.awareness,
}) as SQLiteProvider;
downloadProvider = createSQLiteDBDownloadProvider(
workspace.id,
workspace.doc,
{
awareness: workspace.awarenessStore.awareness,
}
) as SQLiteDBDownloadProvider;
offlineYdoc = new Y.Doc();
offlineYdoc.getText('text').insert(0, 'sqlite-hello');
});
describe('SQLite download provider', () => {
test('sync updates', async () => {
// on connect, the updates from sqlite should be sync'ed to the existing ydoc
workspace.doc.getText('text').insert(0, 'mem-hello');
downloadProvider.sync();
await downloadProvider.whenReady;
// depending on the nature of the sync, the data can be sync'ed in either direction
const options = ['sqlite-hellomem-hello', 'mem-hellosqlite-hello'];
const synced = options.filter(
o => o === workspace.doc.getText('text').toString()
);
expect(synced.length).toBe(1);
});
// there is no updates from sqlite for now
test.skip('on db update', async () => {
provider.connect();
await setTimeout(200);
offlineYdoc.getText('text').insert(0, 'sqlite-world');
triggerDBUpdate?.({
workspaceId: id + '-another-id',
update: Y.encodeStateAsUpdate(offlineYdoc),
});
// not yet updated (because the workspace id is different)
expect(workspace.doc.getText('text').toString()).toBe('');
triggerDBUpdate?.({
workspaceId: id,
update: Y.encodeStateAsUpdate(offlineYdoc),
});
expect(workspace.doc.getText('text').toString()).toBe(
'sqlite-worldsqlite-hello'
);
});
test('disconnect handlers', async () => {
const offHandler = vi.fn();
let handleUpdate = () => {};
let handleSubdocs = () => {};
workspace.doc.on = (event: string, fn: () => void) => {
if (event === 'update') {
handleUpdate = fn;
} else if (event === 'subdocs') {
handleSubdocs = fn;
}
};
workspace.doc.off = offHandler;
provider.connect();
provider.disconnect();
expect(triggerDBUpdate).toBe(null);
expect(offHandler).toBeCalledWith('update', handleUpdate);
expect(offHandler).toBeCalledWith('subdocs', handleSubdocs);
});
});

View File

@@ -0,0 +1,101 @@
import { DebugLogger } from '@affine/debug';
import {
applyAwarenessUpdate,
type Awareness,
encodeAwarenessUpdate,
removeAwarenessStates,
} from 'y-protocols/awareness';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
import type { AwarenessProvider } from '..';
const logger = new DebugLogger('affine:awareness:socketio');
export type AwarenessChanges = Record<
'added' | 'updated' | 'removed',
number[]
>;
export function createAffineAwarenessProvider(
workspaceId: string,
awareness: Awareness
): AwarenessProvider {
const socket = getIoManager().socket('/');
const awarenessBroadcast = ({
workspaceId,
awarenessUpdate,
}: {
workspaceId: string;
awarenessUpdate: string;
}) => {
if (workspaceId !== workspaceId) {
return;
}
applyAwarenessUpdate(
awareness,
base64ToUint8Array(awarenessUpdate),
'remote'
);
};
const awarenessUpdate = (changes: AwarenessChanges, origin: unknown) => {
if (origin === 'remote') {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) => [
...res,
...cur,
]);
const update = encodeAwarenessUpdate(awareness, changedClients);
uint8ArrayToBase64(update)
.then(encodedUpdate => {
socket.emit('awareness-update', {
workspaceId: workspaceId,
awarenessUpdate: encodedUpdate,
});
})
.catch(err => logger.error(err));
};
const newClientAwarenessInitHandler = () => {
const awarenessUpdate = encodeAwarenessUpdate(awareness, [
awareness.clientID,
]);
uint8ArrayToBase64(awarenessUpdate)
.then(encodedAwarenessUpdate => {
socket.emit('awareness-update', {
guid: workspaceId,
awarenessUpdate: encodedAwarenessUpdate,
});
})
.catch(err => logger.error(err));
};
const windowBeforeUnloadHandler = () => {
removeAwarenessStates(awareness, [awareness.clientID], 'window unload');
};
return {
connect: () => {
socket.on('server-awareness-broadcast', awarenessBroadcast);
socket.on('new-client-awareness-init', newClientAwarenessInitHandler);
awareness.on('update', awarenessUpdate);
window.addEventListener('beforeunload', windowBeforeUnloadHandler);
socket.emit('awareness-init', workspaceId);
socket.connect();
},
disconnect: () => {
awareness.off('update', awarenessUpdate);
socket.off('server-awareness-broadcast', awarenessBroadcast);
socket.off('new-client-awareness-init', newClientAwarenessInitHandler);
window.removeEventListener('unload', windowBeforeUnloadHandler);
socket.disconnect();
},
};
}

View File

@@ -0,0 +1,63 @@
import type { Awareness } from 'y-protocols/awareness.js';
import {
applyAwarenessUpdate,
encodeAwarenessUpdate,
} from 'y-protocols/awareness.js';
import type { AwarenessProvider } from '..';
import type { AwarenessChanges } from '../affine';
type ChannelMessage =
| { type: 'connect' }
| { type: 'update'; update: Uint8Array };
export function createBroadcastChannelAwarenessProvider(
workspaceId: string,
awareness: Awareness
): AwarenessProvider {
const channel = new BroadcastChannel('awareness:' + workspaceId);
function handleAwarenessUpdate(changes: AwarenessChanges, origin: unknown) {
if (origin === 'remote') {
return;
}
const changedClients = Object.values(changes).reduce((res, cur) => [
...res,
...cur,
]);
const update = encodeAwarenessUpdate(awareness, changedClients);
channel.postMessage({
type: 'update',
update: update,
} satisfies ChannelMessage);
}
function handleChannelMessage(event: MessageEvent<ChannelMessage>) {
if (event.data.type === 'update') {
const update = event.data.update;
applyAwarenessUpdate(awareness, update, 'remote');
}
if (event.data.type === 'connect') {
channel.postMessage({
type: 'update',
update: encodeAwarenessUpdate(awareness, [awareness.clientID]),
} satisfies ChannelMessage);
}
}
return {
connect() {
channel.postMessage({
type: 'connect',
} satisfies ChannelMessage);
awareness.on('update', handleAwarenessUpdate);
channel.addEventListener('message', handleChannelMessage);
},
disconnect() {
awareness.off('update', handleAwarenessUpdate);
channel.removeEventListener('message', handleChannelMessage);
},
};
}

View File

@@ -0,0 +1,7 @@
export interface AwarenessProvider {
connect(): void;
disconnect(): void;
}
export * from './affine';
export * from './broadcast-channel';

View File

@@ -1,110 +0,0 @@
import { DebugLogger } from '@affine/debug';
import { fetchWithTraceReport } from '@affine/graphql';
import type { ActiveDocProvider, DocProviderCreator } from '@blocksuite/store';
import { Workspace } from '@blocksuite/store';
import type { Doc } from 'yjs';
const Y = Workspace.Y;
const logger = new DebugLogger('affine:cloud');
const hashMap = new Map<string, ArrayBuffer>();
type DocPublishMode = 'edgeless' | 'page';
export type CloudDoc = {
arrayBuffer: ArrayBuffer;
publishMode: DocPublishMode;
};
export async function downloadBinaryFromCloud(
rootGuid: string,
pageGuid: string
): Promise<CloudDoc | boolean> {
if (hashMap.has(`${rootGuid}/${pageGuid}`)) {
return true;
}
const response = await fetchWithTraceReport(
runtimeConfig.serverUrlPrefix +
`/api/workspaces/${rootGuid}/docs/${pageGuid}`,
{
priority: 'high',
}
);
if (response.ok) {
const publishMode = (response.headers.get('publish-mode') ||
'page') as DocPublishMode;
const arrayBuffer = await response.arrayBuffer();
hashMap.set(`${rootGuid}/${pageGuid}`, arrayBuffer);
// return both arrayBuffer and publish mode
return { arrayBuffer, publishMode };
}
return false;
}
async function downloadBinary(rootGuid: string, doc: Doc) {
const response = await downloadBinaryFromCloud(rootGuid, doc.guid);
if (typeof response !== 'boolean') {
const { arrayBuffer } = response;
Y.applyUpdate(doc, new Uint8Array(arrayBuffer), 'affine-cloud');
}
}
export const createCloudDownloadProvider: DocProviderCreator = (
id,
doc
): ActiveDocProvider => {
let _resolve: () => void;
let _reject: (error: unknown) => void;
const promise = new Promise<void>((resolve, reject) => {
_resolve = resolve;
_reject = reject;
});
return {
flavour: 'affine-cloud-download',
active: true,
sync() {
downloadBinary(id, doc)
.then(() => {
logger.info(`Downloaded ${id}`);
_resolve();
})
.catch(_reject);
},
get whenReady() {
return promise;
},
};
};
export const createMergeCloudSnapshotProvider: DocProviderCreator = (
id,
doc
): ActiveDocProvider => {
let _resolve: () => void;
const promise = new Promise<void>(resolve => {
_resolve = resolve;
});
return {
flavour: 'affine-cloud-merge-snapshot',
active: true,
sync() {
downloadBinary(id, doc)
.then(() => {
logger.info(`Downloaded ${id}`);
_resolve();
})
// ignore error
.catch(e => {
console.error(e);
_resolve();
});
},
get whenReady() {
return promise;
},
};
};

View File

@@ -1,160 +1,128 @@
import { DebugLogger } from '@affine/debug';
import type {
AffineSocketIOProvider,
LocalIndexedDBBackgroundProvider,
LocalIndexedDBDownloadProvider,
} from '@affine/env/workspace';
import { assertExists } from '@blocksuite/global/utils';
/**
* The `Provider` is responsible for sync `Y.Doc` with the local database and the Affine Cloud, serving as the source of
* Affine's local-first collaborative magic.
*
* When Affine boot, the `Provider` is tasked with reading content from the local database and loading it into the
* workspace, continuously storing any changes made by the user into the local database.
*
* When using Affine Cloud, the `Provider` also handles sync content with the Cloud.
*
* Additionally, the `Provider` is responsible for implementing a local-first capability, allowing users to edit offline
* with changes stored in the local database and sync with the Cloud when the network is restored.
*/
import type { DocProviderCreator } from '@blocksuite/store';
import { Workspace } from '@blocksuite/store';
import { createBroadcastChannelProvider } from '@blocksuite/store/providers/broadcast-channel';
import {
createIndexedDBDatasource,
createIndexedDBProvider as create,
} from '@toeverything/y-indexeddb';
import { createLazyProvider } from 'y-provider';
import { encodeStateVector } from 'yjs';
createAffineAwarenessProvider,
createBroadcastChannelAwarenessProvider,
} from './awareness';
import { createAffineStorage } from './storage/affine';
import { createIndexedDBStorage } from './storage/indexeddb';
import { createSQLiteStorage } from './storage/sqlite';
import { SyncEngine } from './sync';
import { createAffineDataSource } from '../affine';
import {
createCloudDownloadProvider,
createMergeCloudSnapshotProvider,
downloadBinaryFromCloud,
} from './cloud';
import {
createSQLiteDBDownloadProvider,
createSQLiteProvider,
} from './sqlite-providers';
const Y = Workspace.Y;
const logger = new DebugLogger('indexeddb-provider');
const createAffineSocketIOProvider: DocProviderCreator = (
id,
doc,
{ awareness }
): AffineSocketIOProvider => {
const dataSource = createAffineDataSource(id, doc, awareness);
const lazyProvider = createLazyProvider(doc, dataSource, {
origin: 'affine-socket-io',
});
Object.assign(lazyProvider, { flavour: 'affine-socket-io' });
return lazyProvider as unknown as AffineSocketIOProvider;
};
const createIndexedDBBackgroundProvider: DocProviderCreator = (
id,
blockSuiteWorkspace
): LocalIndexedDBBackgroundProvider => {
const indexeddbProvider = create(blockSuiteWorkspace);
let connected = false;
return {
flavour: 'local-indexeddb-background',
datasource: indexeddbProvider.datasource,
passive: true,
get status() {
return indexeddbProvider.status;
},
subscribeStatusChange: indexeddbProvider.subscribeStatusChange,
get connected() {
return connected;
},
cleanup: () => {
indexeddbProvider.cleanup().catch(console.error);
},
connect: () => {
logger.info('connect indexeddb provider', id);
indexeddbProvider.connect();
},
disconnect: () => {
assertExists(indexeddbProvider);
logger.info('disconnect indexeddb provider', id);
indexeddbProvider.disconnect();
connected = false;
},
};
};
const indexedDBDownloadOrigin = 'indexeddb-download-provider';
const createIndexedDBDownloadProvider: DocProviderCreator = (
id,
doc
): LocalIndexedDBDownloadProvider => {
const datasource = createIndexedDBDatasource({});
let _resolve: () => void;
let _reject: (error: unknown) => void;
const promise = new Promise<void>((resolve, reject) => {
_resolve = resolve;
_reject = reject;
});
return {
flavour: 'local-indexeddb',
active: true,
get whenReady() {
return promise;
},
cleanup: () => {
// todo: cleanup data
},
sync: () => {
logger.info('sync indexeddb provider', id);
datasource
.queryDocState(doc.guid, {
stateVector: encodeStateVector(doc),
})
.then(docState => {
if (docState) {
Y.applyUpdate(doc, docState.missing, indexedDBDownloadOrigin);
}
_resolve();
})
.catch(_reject);
},
};
};
export {
createAffineSocketIOProvider,
createBroadcastChannelProvider,
createIndexedDBBackgroundProvider,
createIndexedDBDownloadProvider,
createSQLiteDBDownloadProvider,
createSQLiteProvider,
downloadBinaryFromCloud,
};
export * from './sync';
export const createLocalProviders = (): DocProviderCreator[] => {
const providers = [
createIndexedDBBackgroundProvider,
createIndexedDBDownloadProvider,
] as DocProviderCreator[];
return [
(_, doc, { awareness }) => {
const engine = new SyncEngine(
doc,
environment.isDesktop
? createSQLiteStorage(doc.guid)
: createIndexedDBStorage(doc.guid),
[]
);
if (runtimeConfig.enableBroadcastChannelProvider) {
providers.push(createBroadcastChannelProvider);
}
const awarenessProviders = [
createBroadcastChannelAwarenessProvider(doc.guid, awareness),
];
if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) {
providers.push(createSQLiteProvider, createSQLiteDBDownloadProvider);
}
let connected = false;
return providers;
return {
flavour: '_',
passive: true,
active: true,
sync() {
if (!connected) {
engine.start();
for (const provider of awarenessProviders) {
provider.connect();
}
connected = true;
}
},
get whenReady() {
return engine.waitForLoadedRootDoc();
},
connect() {
// TODO: actually connect
},
disconnect() {
// TODO: actually disconnect
},
get connected() {
return connected;
},
engine,
};
},
];
};
export const createAffineProviders = (): DocProviderCreator[] => {
return (
[
...createLocalProviders(),
runtimeConfig.enableCloud && createAffineSocketIOProvider,
runtimeConfig.enableCloud && createMergeCloudSnapshotProvider,
] as DocProviderCreator[]
).filter(v => Boolean(v));
return [
(_, doc, { awareness }) => {
const engine = new SyncEngine(
doc,
environment.isDesktop
? createSQLiteStorage(doc.guid)
: createIndexedDBStorage(doc.guid),
[createAffineStorage(doc.guid)]
);
const awarenessProviders = [
createBroadcastChannelAwarenessProvider(doc.guid, awareness),
createAffineAwarenessProvider(doc.guid, awareness),
];
let connected = false;
return {
flavour: '_',
passive: true,
active: true,
sync() {
if (!connected) {
engine.start();
for (const provider of awarenessProviders) {
provider.connect();
}
connected = true;
}
},
get whenReady() {
return engine.waitForLoadedRootDoc();
},
connect() {
// TODO: actually connect
},
disconnect() {
// TODO: actually disconnect
},
get connected() {
return connected;
},
engine,
};
},
];
};
export const createAffinePublicProviders = (): DocProviderCreator[] => {
return [createCloudDownloadProvider];
return [];
};

View File

@@ -1,3 +0,0 @@
import { DebugLogger } from '@affine/debug';
export const localProviderLogger = new DebugLogger('local-provider');

View File

@@ -1,133 +0,0 @@
import type {
SQLiteDBDownloadProvider,
SQLiteProvider,
} from '@affine/env/workspace';
import { assertExists } from '@blocksuite/global/utils';
import type { DocProviderCreator } from '@blocksuite/store';
import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { createLazyProvider, type DocDataSource } from 'y-provider';
import type { Doc } from 'yjs';
import { localProviderLogger as logger } from './logger';
const Y = BlockSuiteWorkspace.Y;
const sqliteOrigin = 'sqlite-provider-origin';
const createDatasource = (workspaceId: string): DocDataSource => {
if (!window.apis?.db) {
throw new Error('sqlite datasource is not available');
}
return {
queryDocState: async guid => {
const update = await window.apis.db.getDocAsUpdates(
workspaceId,
workspaceId === guid ? undefined : guid
);
if (update) {
return {
missing: update,
};
}
return false;
},
sendDocUpdate: async (guid, update) => {
return window.apis.db.applyDocUpdate(
workspaceId,
update,
workspaceId === guid ? undefined : guid
);
},
};
};
/**
* A provider that is responsible for syncing updates the workspace with the local SQLite database.
*/
export const createSQLiteProvider: DocProviderCreator = (
id,
rootDoc
): SQLiteProvider => {
const datasource = createDatasource(id);
let provider: ReturnType<typeof createLazyProvider> | null = null;
let connected = false;
return {
flavour: 'sqlite',
datasource,
passive: true,
get status() {
assertExists(provider);
return provider.status;
},
subscribeStatusChange(onStatusChange) {
assertExists(provider);
return provider.subscribeStatusChange(onStatusChange);
},
connect: () => {
provider = createLazyProvider(rootDoc, datasource, { origin: 'sqlite' });
provider.connect();
connected = true;
},
disconnect: () => {
provider?.disconnect();
provider = null;
connected = false;
},
get connected() {
return connected;
},
};
};
/**
* A provider that is responsible for DOWNLOADING updates from the local SQLite database.
*/
export const createSQLiteDBDownloadProvider: DocProviderCreator = (
id,
rootDoc
): SQLiteDBDownloadProvider => {
const { apis } = window;
let _resolve: () => void;
let _reject: (error: unknown) => void;
const promise = new Promise<void>((resolve, reject) => {
_resolve = resolve;
_reject = reject;
});
async function syncUpdates(doc: Doc) {
logger.info('syncing updates from sqlite', doc.guid);
const subdocId = doc.guid === id ? undefined : doc.guid;
const updates = await apis.db.getDocAsUpdates(id, subdocId);
if (updates) {
Y.applyUpdate(doc, updates, sqliteOrigin);
}
return true;
}
return {
flavour: 'sqlite-download',
active: true,
get whenReady() {
return promise;
},
cleanup: () => {
// todo
},
sync: () => {
logger.info('connect sqlite download provider', id);
syncUpdates(rootDoc)
.then(() => {
_resolve();
})
.catch(error => {
_reject(error);
});
},
};
};

View File

@@ -0,0 +1,162 @@
import { DebugLogger } from '@affine/debug';
import { getIoManager } from '../../utils/affine-io';
import { base64ToUint8Array, uint8ArrayToBase64 } from '../../utils/base64';
import type { Storage } from '..';
import { MultipleBatchSyncSender } from './batch-sync-sender';
const logger = new DebugLogger('affine:storage:socketio');
export function createAffineStorage(
workspaceId: string
): Storage & { disconnect: () => void } {
logger.debug('createAffineStorage', workspaceId);
const socket = getIoManager().socket('/');
const syncSender = new MultipleBatchSyncSender(async (guid, updates) => {
const payload = await Promise.all(
updates.map(update => uint8ArrayToBase64(update))
);
return new Promise(resolve => {
socket.emit(
'client-update-v2',
{
workspaceId,
guid,
updates: payload,
},
(response: {
// TODO: reuse `EventError` with server
error?: any;
data: any;
}) => {
// TODO: raise error with different code to users
if (response.error) {
logger.error('client-update-v2 error', {
workspaceId,
guid,
response,
});
}
resolve({
accepted: !response.error,
// TODO: reuse `EventError` with server
retry: response.error?.code === 'INTERNAL',
});
}
);
});
});
// TODO: handle error
socket.on('connect', () => {
socket.emit(
'client-handshake',
workspaceId,
(response: { error?: any }) => {
if (!response.error) {
syncSender.start();
}
}
);
});
socket.connect();
return {
name: 'socketio',
async pull(docId, state) {
const stateVector = state ? await uint8ArrayToBase64(state) : undefined;
return new Promise((resolve, reject) => {
logger.debug('doc-load-v2', {
workspaceId: workspaceId,
guid: docId,
stateVector,
});
socket.emit(
'doc-load-v2',
{
workspaceId: workspaceId,
guid: docId,
stateVector,
},
(
response: // TODO: reuse `EventError` with server
{ error: any } | { data: { missing: string; state: string } }
) => {
logger.debug('doc-load callback', {
workspaceId: workspaceId,
guid: docId,
stateVector,
response,
});
if ('error' in response) {
// TODO: result `EventError` with server
if (response.error.code === 'DOC_NOT_FOUND') {
resolve(null);
} else {
reject(new Error(response.error.message));
}
} else {
resolve({
data: base64ToUint8Array(response.data.missing),
state: response.data.state
? base64ToUint8Array(response.data.state)
: undefined,
});
}
}
);
});
},
async push(docId, update) {
logger.debug('client-update-v2', {
workspaceId,
guid: docId,
update,
});
await syncSender.send(docId, update);
},
async subscribe(cb, disconnect) {
const response: { error?: any } = await socket
.timeout(10000)
.emitWithAck('client-handshake', workspaceId);
if (response.error) {
throw new Error('client-handshake error, ' + response.error);
}
const handleUpdate = async (message: {
workspaceId: string;
guid: string;
updates: string[];
}) => {
if (message.workspaceId === workspaceId) {
message.updates.forEach(update => {
cb(message.guid, base64ToUint8Array(update));
});
}
};
socket.on('server-updates', handleUpdate);
socket.on('disconnect', reason => {
socket.off('server-updates', handleUpdate);
disconnect(reason);
});
return () => {
socket.off('server-updates', handleUpdate);
};
},
disconnect() {
syncSender.stop();
socket.emit('client-leave', workspaceId);
socket.disconnect();
},
};
}

View File

@@ -0,0 +1,29 @@
export interface Storage {
/**
* for debug
*/
name: string;
pull(
docId: string,
state: Uint8Array
): Promise<{ data: Uint8Array; state?: Uint8Array } | null>;
push(docId: string, data: Uint8Array): Promise<void>;
/**
* Subscribe to updates from peer
*
* @param cb callback to handle updates
* @param disconnect callback to handle disconnect, reason can be something like 'network-error'
*
* @returns unsubscribe function
*/
subscribe(
cb: (docId: string, data: Uint8Array) => void,
disconnect: (reason: string) => void
): Promise<() => void>;
}
export * from './affine';
export * from './indexeddb';
export * from './sqlite';

View File

@@ -0,0 +1,133 @@
import { type DBSchema, type IDBPDatabase, openDB } from 'idb';
import {
applyUpdate,
diffUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVectorFromUpdate,
} from 'yjs';
import type { Storage } from '..';
export const dbVersion = 1;
export const DEFAULT_DB_NAME = 'affine-local';
export function mergeUpdates(updates: Uint8Array[]) {
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
});
return encodeStateAsUpdate(doc);
}
type UpdateMessage = {
timestamp: number;
update: Uint8Array;
};
type WorkspacePersist = {
id: string;
updates: UpdateMessage[];
};
interface BlockSuiteBinaryDB extends DBSchema {
workspace: {
key: string;
value: WorkspacePersist;
};
milestone: {
key: string;
value: unknown;
};
}
export function upgradeDB(db: IDBPDatabase<BlockSuiteBinaryDB>) {
db.createObjectStore('workspace', { keyPath: 'id' });
db.createObjectStore('milestone', { keyPath: 'id' });
}
type ChannelMessage = {
type: 'db-updated';
payload: { docId: string; update: Uint8Array };
};
export function createIndexedDBStorage(
workspaceId: string,
dbName = DEFAULT_DB_NAME,
mergeCount = 1
): Storage {
let dbPromise: Promise<IDBPDatabase<BlockSuiteBinaryDB>> | null = null;
const getDb = async () => {
if (dbPromise === null) {
dbPromise = openDB<BlockSuiteBinaryDB>(dbName, dbVersion, {
upgrade: upgradeDB,
});
}
return dbPromise;
};
// indexeddb could be shared between tabs, so we use broadcast channel to notify other tabs
const channel = new BroadcastChannel('indexeddb:' + workspaceId);
return {
name: 'indexeddb',
async pull(docId, state) {
const db = await getDb();
const store = db
.transaction('workspace', 'readonly')
.objectStore('workspace');
const data = await store.get(docId);
if (!data) {
return null;
}
const { updates } = data;
const update = mergeUpdates(updates.map(({ update }) => update));
const diff = state ? diffUpdate(update, state) : update;
return { data: diff, state: encodeStateVectorFromUpdate(update) };
},
async push(docId, update) {
const db = await getDb();
const store = db
.transaction('workspace', 'readwrite')
.objectStore('workspace');
// TODO: maybe we do not need to get data every time
const { updates } = (await store.get(docId)) ?? { updates: [] };
let rows: UpdateMessage[] = [
...updates,
{ timestamp: Date.now(), update },
];
if (mergeCount && rows.length >= mergeCount) {
const merged = mergeUpdates(rows.map(({ update }) => update));
rows = [{ timestamp: Date.now(), update: merged }];
}
await store.put({
id: docId,
updates: rows,
});
channel.postMessage({
type: 'db-updated',
payload: { docId, update },
} satisfies ChannelMessage);
},
async subscribe(cb, _disconnect) {
function onMessage(event: MessageEvent<ChannelMessage>) {
const { type, payload } = event.data;
if (type === 'db-updated') {
const { docId, update } = payload;
cb(docId, update);
}
}
channel.addEventListener('message', onMessage);
return () => {
channel.removeEventListener('message', onMessage);
};
},
};
}

View File

@@ -0,0 +1,38 @@
import { encodeStateVectorFromUpdate } from 'yjs';
import type { Storage } from '..';
export function createSQLiteStorage(workspaceId: string): Storage {
if (!window.apis?.db) {
throw new Error('sqlite datasource is not available');
}
return {
name: 'sqlite',
async pull(docId, _state) {
const update = await window.apis.db.getDocAsUpdates(
workspaceId,
workspaceId === docId ? undefined : docId
);
if (update) {
return {
data: update,
state: encodeStateVectorFromUpdate(update),
};
}
return null;
},
async push(docId, data) {
return window.apis.db.applyDocUpdate(
workspaceId,
data,
workspaceId === docId ? undefined : docId
);
},
async subscribe(_cb, _disconnect) {
return () => {};
},
};
}

View File

@@ -0,0 +1,62 @@
import 'fake-indexeddb/auto';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema, Workspace } from '@blocksuite/store';
import { describe, expect, test } from 'vitest';
import { createIndexedDBStorage } from '../../storage';
import { SyncPeer } from '../';
const schema = new Schema();
schema.register(AffineSchemas).register(__unstableSchemas);
describe('sync', () => {
test('basic - indexeddb', async () => {
let prev: any;
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const syncPeer = new SyncPeer(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid)
);
await syncPeer.waitForLoaded();
const page = workspace.createPage({
id: 'page0',
});
await page.waitForLoaded();
const pageBlockId = page.addBlock('affine:page', {
title: new page.Text(''),
});
page.addBlock('affine:surface', {}, pageBlockId);
const frameId = page.addBlock('affine:note', {}, pageBlockId);
page.addBlock('affine:paragraph', {}, frameId);
await syncPeer.waitForSynced();
syncPeer.stop();
prev = workspace.doc.toJSON();
}
{
const workspace = new Workspace({
id: 'test',
isSSR: true,
schema,
});
const syncPeer = new SyncPeer(
workspace.doc,
createIndexedDBStorage(workspace.doc.guid)
);
await syncPeer.waitForSynced();
expect(workspace.doc.toJSON()).toEqual({
...prev,
});
syncPeer.stop();
}
});
});

View File

@@ -0,0 +1,225 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import type { Storage } from '../storage';
import { SyncPeer, SyncPeerStatus } from './peer';
export const MANUALLY_STOP = 'manually-stop';
/**
* # SyncEngine
*
* ┌────────────┐
* │ SyncEngine │
* └─────┬──────┘
* │
* ▼
* ┌────────────┐
* │ SyncPeer │
* ┌─────────┤ local ├─────────┐
* │ └─────┬──────┘ │
* │ │ │
* ▼ ▼ ▼
* ┌────────────┐ ┌────────────┐ ┌────────────┐
* │ SyncPeer │ │ SyncPeer │ │ SyncPeer │
* │ Remote │ │ Remote │ │ Remote │
* └────────────┘ └────────────┘ └────────────┘
*
* Sync engine manage sync peers
*
* Sync steps:
* 1. start local sync
* 2. wait for local sync complete
* 3. start remote sync
* 4. continuously sync local and remote
*/
export enum SyncEngineStatus {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Syncing = 4,
Synced = 5,
}
export class SyncEngine {
get rootDocId() {
return this.rootDoc.guid;
}
logger = new DebugLogger('affine:sync-engine:' + this.rootDocId);
private _status = SyncEngineStatus.Stopped;
onStatusChange = new Slot<SyncEngineStatus>();
private set status(s: SyncEngineStatus) {
if (s !== this._status) {
this.logger.info('status change', SyncEngineStatus[s]);
this._status = s;
this.onStatusChange.emit(s);
}
}
get status() {
return this._status;
}
private abort = new AbortController();
constructor(
private rootDoc: Doc,
private local: Storage,
private remotes: Storage[]
) {}
start() {
if (this.status !== SyncEngineStatus.Stopped) {
this.stop();
}
this.abort = new AbortController();
this.status = SyncEngineStatus.LoadingRootDoc;
this.sync(this.abort.signal).catch(err => {
// should never reach here
this.logger.error(err);
});
}
stop() {
this.abort.abort(MANUALLY_STOP);
this.status = SyncEngineStatus.Stopped;
}
// main sync process, should never return until abort
async sync(signal: AbortSignal) {
let localPeer: SyncPeer | null = null;
const remotePeers: SyncPeer[] = [];
const cleanUp: (() => void)[] = [];
try {
// Step 1: start local sync peer
localPeer = new SyncPeer(this.rootDoc, this.local);
// Step 2: wait for local sync complete
await localPeer.waitForLoaded(signal);
// Step 3: start remote sync peer
remotePeers.push(
...this.remotes.map(remote => new SyncPeer(this.rootDoc, remote))
);
const peers = [localPeer, ...remotePeers];
this.updateSyncingState(peers);
for (const peer of peers) {
cleanUp.push(
peer.onStatusChange.on(() => {
if (!signal.aborted) this.updateSyncingState(peers);
}).dispose
);
}
// Step 4: continuously sync local and remote
// wait for abort
await new Promise((_, reject) => {
if (signal.aborted) {
reject(signal.reason);
}
signal.addEventListener('abort', () => {
reject(signal.reason);
});
});
} catch (error) {
if (error === MANUALLY_STOP) {
return;
}
throw error;
} finally {
// stop peers
localPeer?.stop();
for (const remotePeer of remotePeers) {
remotePeer.stop();
}
for (const clean of cleanUp) {
clean();
}
}
}
updateSyncingState(peers: SyncPeer[]) {
let status = SyncEngineStatus.Synced;
for (const peer of peers) {
if (peer.status !== SyncPeerStatus.Synced) {
status = SyncEngineStatus.Syncing;
break;
}
}
for (const peer of peers) {
if (peer.status === SyncPeerStatus.LoadingSubDoc) {
status = SyncEngineStatus.LoadingSubDoc;
break;
}
}
for (const peer of peers) {
if (peer.status === SyncPeerStatus.LoadingRootDoc) {
status = SyncEngineStatus.LoadingRootDoc;
break;
}
}
for (const peer of peers) {
if (peer.status === SyncPeerStatus.Retrying) {
status = SyncEngineStatus.Retrying;
break;
}
}
this.status = status;
}
async waitForSynced(abort?: AbortSignal) {
if (this.status == SyncEngineStatus.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status == SyncEngineStatus.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
async waitForLoadedRootDoc(abort?: AbortSignal) {
if (this.status > SyncEngineStatus.LoadingRootDoc) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status > SyncEngineStatus.LoadingRootDoc) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
}

View File

@@ -0,0 +1,18 @@
/**
*
* **SyncEngine**
*
* Manages one local storage and multiple remote storages.
*
* Responsible for creating SyncPeers for synchronization, following the local-first strategy.
*
* **SyncPeer**
*
* Responsible for synchronizing a single storage with Y.Doc.
*
* Carries the main synchronization logic.
*
*/
export * from './engine';
export * from './peer';

View File

@@ -0,0 +1,397 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import type { Storage } from '../storage';
import { AsyncQueue } from '../utils/async-queue';
import { throwIfAborted } from '../utils/throw-if-aborted';
import { MANUALLY_STOP } from './engine';
/**
* # SyncPeer
* A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs.
*
* ┌─────┐
* │Start│
* └──┬──┘
* │
* ┌──────┐ ┌─────▼──────┐ ┌────┐
* │listen◄─────┤pull rootdoc│ │peer│
* └──┬───┘ └─────┬──────┘ └──┬─┘
* │ │ onLoad() │
* ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐
* │listen◄─────┤pull subdocs│ │subscribe│
* └──┬───┘ └─────┬──────┘ └────┬────┘
* │ │ onReady() │
* ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐
* │queue├──────►apply updates◄───────┤queue│
* └─────┘ └─────────────┘ └─────┘
*
* listen: listen for updates from ydoc, typically from user modifications.
* subscribe: listen for updates from storage, typically from other users.
*
*/
export enum SyncPeerStatus {
Stopped = 0,
Retrying = 1,
LoadingRootDoc = 2,
LoadingSubDoc = 3,
Loaded = 4.5,
Syncing = 5,
Synced = 6,
}
export class SyncPeer {
private _status = SyncPeerStatus.Stopped;
onStatusChange = new Slot<SyncPeerStatus>();
abort = new AbortController();
get name() {
return this.storage.name;
}
logger = new DebugLogger('affine:sync-peer:' + this.name);
constructor(
private rootDoc: Doc,
private storage: Storage
) {
this.logger.debug('peer start');
this.status = SyncPeerStatus.LoadingRootDoc;
this.syncRetryLoop(this.abort.signal).catch(err => {
// should not reach here
console.error(err);
});
}
private set status(s: SyncPeerStatus) {
if (s !== this._status) {
this.logger.debug('status change', SyncPeerStatus[s]);
this._status = s;
this.onStatusChange.emit(s);
}
}
get status() {
return this._status;
}
/**
* stop sync
*
* SyncPeer is one-time use, this peer should be discarded after call stop().
*/
stop() {
this.logger.debug('peer stop');
this.abort.abort(MANUALLY_STOP);
}
/**
* auto retry after 5 seconds if sync failed
*/
async syncRetryLoop(abort: AbortSignal) {
while (abort.aborted === false) {
try {
await this.sync(abort);
} catch (err) {
if (err === MANUALLY_STOP) {
return;
}
this.logger.error('sync error', err);
}
try {
this.logger.error('retry after 5 seconds');
this.status = SyncPeerStatus.Retrying;
await Promise.race([
new Promise<void>(resolve => {
setTimeout(resolve, 5 * 1000);
}),
new Promise((_, reject) => {
// exit if manually stopped
if (abort.aborted) {
reject(abort.reason);
}
abort.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
} catch (err) {
if (err === MANUALLY_STOP) {
return;
}
// should never reach here
throw err;
}
}
}
private state: {
connectedDocs: Map<string, Doc>;
pushUpdatesQueue: AsyncQueue<{
docId: string;
data: Uint8Array;
}>;
pullUpdatesQueue: AsyncQueue<{
docId: string;
data: Uint8Array;
}>;
subdocsLoadQueue: AsyncQueue<Doc>;
} = {
connectedDocs: new Map(),
pushUpdatesQueue: new AsyncQueue(),
pullUpdatesQueue: new AsyncQueue(),
subdocsLoadQueue: new AsyncQueue(),
};
/**
* main synchronization logic
*/
async sync(abortOuter: AbortSignal) {
const abortInner = new AbortController();
abortOuter.addEventListener('abort', reason => {
abortInner.abort(reason);
});
let dispose: (() => void) | null = null;
try {
// start listen storage updates
dispose = await this.storage.subscribe(
this.handleStorageUpdates,
reason => {
// abort if storage disconnect, should trigger retry loop
abortInner.abort('subscribe disconnect:' + reason);
}
);
throwIfAborted(abortInner.signal);
// Step 1: load root doc
this.status = SyncPeerStatus.LoadingRootDoc;
await this.connectDoc(this.rootDoc, abortInner.signal);
this.status = SyncPeerStatus.LoadingSubDoc;
// Step 2: load subdocs
this.state.subdocsLoadQueue.push(
...Array.from(this.rootDoc.getSubdocs())
);
this.rootDoc.on('subdocs', this.handleSubdocsUpdate);
while (this.state.subdocsLoadQueue.length > 0) {
const subdoc = await this.state.subdocsLoadQueue.next(
abortInner.signal
);
await this.connectDoc(subdoc, abortInner.signal);
}
this.status = SyncPeerStatus.Syncing;
this.updateSyncStatus();
// Finally: start sync
await Promise.all([
// listen subdocs
(async () => {
while (throwIfAborted(abortInner.signal)) {
const subdoc = await this.state.subdocsLoadQueue.next(
abortInner.signal
);
this.status = SyncPeerStatus.LoadingSubDoc;
await this.connectDoc(subdoc, abortInner.signal);
this.status = SyncPeerStatus.Syncing;
this.updateSyncStatus();
}
})(),
// pull updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { docId, data } = await this.state.pullUpdatesQueue.next(
abortInner.signal
);
this.updateSyncStatus();
// don't apply empty data or Uint8Array([0, 0])
if (
!(
data.byteLength === 0 ||
(data.byteLength === 2 && data[0] === 0 && data[1] === 0)
)
) {
const subdoc = this.state.connectedDocs.get(docId);
if (subdoc) {
applyUpdate(subdoc, data, this.name);
}
}
}
})(),
// push updates
(async () => {
while (throwIfAborted(abortInner.signal)) {
const { docId, data } = await this.state.pushUpdatesQueue.next(
abortInner.signal
);
// don't push empty data or Uint8Array([0, 0])
if (
!(
data.byteLength === 0 ||
(data.byteLength === 2 && data[0] === 0 && data[1] === 0)
)
) {
await this.storage.push(docId, data);
}
this.updateSyncStatus();
}
})(),
]);
} finally {
dispose?.();
for (const docs of this.state.connectedDocs.values()) {
this.disconnectDoc(docs);
}
this.rootDoc.off('subdocs', this.handleSubdocsUpdate);
}
}
async connectDoc(doc: Doc, abort: AbortSignal) {
const { data: docData, state: inStorageState } =
(await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {};
throwIfAborted(abort);
if (docData) {
applyUpdate(doc, docData, 'load');
}
// diff root doc and in-storage, save updates to pendingUpdates
this.state.pushUpdatesQueue.push({
docId: doc.guid,
data: encodeStateAsUpdate(doc, inStorageState),
});
this.state.connectedDocs.set(doc.guid, doc);
// start listen root doc changes
doc.on('update', this.handleYDocUpdates);
// mark rootDoc as loaded
doc.emit('sync', [true]);
}
disconnectDoc(doc: Doc) {
doc.off('update', this.handleYDocUpdates);
this.state.connectedDocs.delete(doc.guid);
}
// handle updates from ydoc
handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => {
// don't push updates from storage
if (origin === this.name) {
return;
}
this.state.pushUpdatesQueue.push({
docId: doc.guid,
data: update,
});
this.updateSyncStatus();
};
// handle subdocs changes, append new subdocs to queue, remove subdocs from queue
handleSubdocsUpdate = ({
added,
removed,
}: {
added: Set<Doc>;
removed: Set<Doc>;
}) => {
for (const subdoc of added) {
this.state.subdocsLoadQueue.push(subdoc);
}
for (const subdoc of removed) {
this.disconnectDoc(subdoc);
this.state.subdocsLoadQueue.remove(doc => doc === subdoc);
}
this.updateSyncStatus();
};
// handle updates from storage
handleStorageUpdates = (docId: string, data: Uint8Array) => {
this.state.pullUpdatesQueue.push({
docId,
data,
});
this.updateSyncStatus();
};
updateSyncStatus() {
// if status is not syncing, do nothing
if (this.status < SyncPeerStatus.Syncing) {
return;
}
if (
this.state.pushUpdatesQueue.length === 0 &&
this.state.pullUpdatesQueue.length === 0 &&
this.state.subdocsLoadQueue.length === 0
) {
if (this.status === SyncPeerStatus.Syncing) {
this.status = SyncPeerStatus.Synced;
}
} else {
if (this.status === SyncPeerStatus.Synced) {
this.status = SyncPeerStatus.Syncing;
}
}
}
async waitForSynced(abort?: AbortSignal) {
if (this.status >= SyncPeerStatus.Synced) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status >= SyncPeerStatus.Synced) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
async waitForLoaded(abort?: AbortSignal) {
if (this.status > SyncPeerStatus.Loaded) {
return;
} else {
return Promise.race([
new Promise<void>(resolve => {
this.onStatusChange.on(status => {
if (status > SyncPeerStatus.Loaded) {
resolve();
}
});
}),
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
}
}
}

View File

@@ -0,0 +1,45 @@
import { describe, expect, test, vi } from 'vitest';
import { AsyncQueue } from '../async-queue';
describe('async-queue', () => {
test('push & pop', async () => {
const queue = new AsyncQueue();
queue.push(1, 2, 3);
expect(queue.length).toBe(3);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
expect(await queue.next()).toBe(3);
expect(queue.length).toBe(0);
});
test('await', async () => {
const queue = new AsyncQueue<number>();
queue.push(1, 2);
expect(await queue.next()).toBe(1);
expect(await queue.next()).toBe(2);
let v = -1;
// setup 2 pop tasks
queue.next().then(next => {
v = next;
});
queue.next().then(next => {
v = next;
});
// Wait for 100ms
await new Promise(resolve => setTimeout(resolve, 100));
// v should not be changed
expect(v).toBe(-1);
// push 3, should trigger the first pop task
queue.push(3);
await vi.waitFor(() => v === 3);
// push 4, should trigger the second pop task
queue.push(4);
await vi.waitFor(() => v === 4);
});
});

View File

@@ -0,0 +1,13 @@
import { describe, expect, test } from 'vitest';
import { throwIfAborted } from '../throw-if-aborted';
describe('throw-if-aborted', () => {
test('basic', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;
expect(throwIfAborted(abortSignal)).toBe(true);
abortController.abort('TEST_ABORT');
expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT');
});
});

View File

@@ -0,0 +1,15 @@
import { Manager } from 'socket.io-client';
let ioManager: Manager | null = null;
// use lazy initialization socket.io io manager
export function getIoManager(): Manager {
if (ioManager) {
return ioManager;
}
ioManager = new Manager(runtimeConfig.serverUrlPrefix + '/', {
autoConnect: false,
transports: ['websocket'],
});
return ioManager;
}

View File

@@ -0,0 +1,58 @@
export class AsyncQueue<T> {
private _queue: T[];
private _resolveUpdate: (() => void) | null = null;
private _waitForUpdate: Promise<void> | null = null;
constructor(init: T[] = []) {
this._queue = init;
}
get length() {
return this._queue.length;
}
async next(abort?: AbortSignal): Promise<T> {
const update = this._queue.shift();
if (update) {
return update;
} else {
if (!this._waitForUpdate) {
this._waitForUpdate = new Promise(resolve => {
this._resolveUpdate = resolve;
});
}
await Promise.race([
this._waitForUpdate,
new Promise((_, reject) => {
if (abort?.aborted) {
reject(abort?.reason);
}
abort?.addEventListener('abort', () => {
reject(abort.reason);
});
}),
]);
return this.next(abort);
}
}
push(...updates: T[]) {
this._queue.push(...updates);
if (this._resolveUpdate) {
const resolve = this._resolveUpdate;
this._resolveUpdate = null;
this._waitForUpdate = null;
resolve();
}
}
remove(predicate: (update: T) => boolean) {
const index = this._queue.findIndex(predicate);
if (index !== -1) {
this._queue.splice(index, 1);
}
}
}

View File

@@ -1,20 +1,3 @@
import type { Doc as YDoc } from 'yjs';
export type SubdocEvent = {
loaded: Set<YDoc>;
removed: Set<YDoc>;
added: Set<YDoc>;
};
export type UpdateHandler = (update: Uint8Array, origin: unknown) => void;
export type SubdocsHandler = (event: SubdocEvent) => void;
export type DestroyHandler = () => void;
export type AwarenessChanges = Record<
'added' | 'updated' | 'removed',
number[]
>;
export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array

View File

@@ -0,0 +1,7 @@
// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill
export function throwIfAborted(abort?: AbortSignal) {
if (abort?.aborted) {
throw new Error(abort.reason);
}
return true;
}