refactor(workspace): split workspace interface and implementation (#5463)

@affine/workspace -> (@affine/workspace, @affine/workspace-impl)
This commit is contained in:
EYHN
2024-01-02 10:58:01 +00:00
parent 9d0b3b4947
commit 104c21d84c
77 changed files with 325 additions and 163 deletions

1
packages/common/workspace/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
lib

View File

@@ -0,0 +1,24 @@
{
"name": "@affine/workspace",
"private": true,
"main": "./src/index.ts",
"exports": {
".": "./src/index.ts"
},
"peerDependencies": {
"@blocksuite/blocks": "*",
"@blocksuite/global": "*",
"@blocksuite/store": "*"
},
"dependencies": {
"@affine/debug": "workspace:*",
"@affine/env": "workspace:*",
"@toeverything/infra": "workspace:*",
"lodash-es": "^4.17.21",
"yjs": "^13.6.10"
},
"devDependencies": {
"vitest": "1.1.0"
},
"version": "0.11.0"
}

View File

@@ -0,0 +1,4 @@
/**
 * Provider of presence ("awareness") connectivity for a workspace.
 *
 * Connected on engine start and disconnected on stop (see WorkspaceEngine);
 * the transport behind connect/disconnect is implementation-defined.
 */
export interface AwarenessProvider {
  /** Open the underlying awareness connection. */
  connect(): void;
  /** Close the underlying awareness connection. */
  disconnect(): void;
}

View File

@@ -0,0 +1,202 @@
import { DebugLogger } from '@affine/debug';
import { difference } from 'lodash-es';
const logger = new DebugLogger('affine:blob-engine');
/**
* # BlobEngine
*
* sync blobs between storages in background.
*
 * All operations prefer the local storage first, then fall back to remotes.
*/
export class BlobEngine {
  // Controller for the background sync loop; null means the loop is stopped.
  private abort: AbortController | null = null;
  constructor(
    private readonly local: BlobStorage,
    private readonly remotes: BlobStorage[]
  ) {}
  /**
   * Start the background sync loop. No-op if already running.
   * Each pass re-schedules itself one minute after it finishes (success or
   * failure), so passes never overlap.
   */
  start() {
    if (this.abort) {
      return;
    }
    this.abort = new AbortController();
    const abortSignal = this.abort.signal;
    const sync = () => {
      if (abortSignal.aborted) {
        return;
      }
      this.sync()
        .catch(error => {
          logger.error('sync blob error', error);
        })
        .finally(() => {
          // sync every 1 minute
          setTimeout(sync, 60000);
        });
    };
    sync();
  }
  /** Stop the background sync loop. A queued timeout may still fire once but
   * exits immediately via the aborted-signal check above. */
  stop() {
    this.abort?.abort();
    this.abort = null;
  }
  /** All storages, local first — the read-priority order used by get(). */
  get storages() {
    return [this.local, ...this.remotes];
  }
  /**
   * One full sync pass: per remote, upload blobs the remote is missing and
   * download blobs the local side is missing. Errors are logged per remote /
   * per key and never abort the whole pass.
   */
  async sync() {
    if (this.local.readonly) {
      return;
    }
    logger.debug('start syncing blob...');
    for (const remote of this.remotes) {
      let localList: string[] = [];
      let remoteList: string[] = [];
      // NOTE(review): the lists are only fetched when the remote is writable,
      // so for a readonly remote both lists stay empty and nothing is
      // downloaded from it either — confirm that skipping downloads from
      // readonly remotes is intended.
      if (!remote.readonly) {
        try {
          localList = await this.local.list();
          remoteList = await remote.list();
        } catch (err) {
          logger.error(`error when sync`, err);
          continue;
        }
        const needUpload = difference(localList, remoteList);
        for (const key of needUpload) {
          try {
            const data = await this.local.get(key);
            if (data) {
              await remote.set(key, data);
            }
          } catch (err) {
            logger.error(
              `error when sync ${key} from [${this.local.name}] to [${remote.name}]`,
              err
            );
          }
        }
      }
      const needDownload = difference(remoteList, localList);
      for (const key of needDownload) {
        try {
          const data = await remote.get(key);
          if (data) {
            await this.local.set(key, data);
          }
        } catch (err) {
          logger.error(
            `error when sync ${key} from [${remote.name}] to [${this.local.name}]`,
            err
          );
        }
      }
    }
    logger.debug('finish syncing blob');
  }
  /**
   * Read a blob, trying the local storage first, then each remote in order.
   * Returns null when no storage has the key.
   */
  async get(key: string) {
    logger.debug('get blob', key);
    for (const storage of this.storages) {
      const data = await storage.get(key);
      if (data) {
        return data;
      }
    }
    return null;
  }
  /**
   * Write a blob. The local write is awaited; uploads to writable remotes
   * happen fire-and-forget in the background (failures are only logged).
   *
   * @returns the key that was written
   * @throws when the local storage is readonly
   */
  async set(key: string, value: Blob) {
    if (this.local.readonly) {
      throw new Error('local peer is readonly');
    }
    // await upload to the local peer
    await this.local.set(key, value);
    // uploads to other peers in the background
    Promise.allSettled(
      this.remotes
        .filter(r => !r.readonly)
        .map(peer =>
          peer.set(key, value).catch(err => {
            logger.error('error when upload to peer', err);
          })
        )
    )
      .then(result => {
        if (result.some(({ status }) => status === 'rejected')) {
          logger.error(
            `blob ${key} update finish, but some peers failed to update`
          );
        } else {
          logger.debug(`blob ${key} update finish`);
        }
      })
      .catch(() => {
        // Promise.allSettled never reject
      });
    return key;
  }
  // Deletion is intentionally unsupported: blobs are immutable once written.
  async delete(_key: string) {
    // not supported
  }
  /** Union of the keys reported by every storage (local and remote). */
  async list() {
    const blobList = new Set<string>();
    for (const peer of this.storages) {
      const list = await peer.list();
      if (list) {
        for (const blob of list) {
          blobList.add(blob);
        }
      }
    }
    return Array.from(blobList);
  }
}
/**
 * A single blob store (e.g. in-memory, IndexedDB, server) that BlobEngine
 * composes and syncs between.
 */
export interface BlobStorage {
  /** Human-readable identifier, used in sync error logs. */
  name: string;
  /** When true this storage is never written to by BlobEngine. */
  readonly: boolean;
  /** Resolve the blob for `key`, or null when absent. */
  get: (key: string) => Promise<Blob | null>;
  /** Store `value` under `key`; resolves with the key written. */
  set: (key: string, value: Blob) => Promise<string>;
  /** Remove `key` (may be a no-op for append-only stores). */
  delete: (key: string) => Promise<void>;
  /** List all stored keys. */
  list: () => Promise<string[]>;
}
/**
 * Build a volatile, in-memory BlobStorage — useful for tests and as the
 * simplest possible storage implementation.
 */
export function createMemoryBlobStorage() {
  // Backing store; lives only as long as this closure.
  const blobs = new Map<string, Blob>();
  return {
    name: 'memory',
    readonly: false,
    get: async (key: string) => blobs.get(key) ?? null,
    set: async (key: string, value: Blob) => {
      blobs.set(key, value);
      return key;
    },
    delete: async (key: string) => {
      blobs.delete(key);
    },
    list: async () => [...blobs.keys()],
  } satisfies BlobStorage;
}

View File

@@ -0,0 +1,74 @@
import { Slot } from '@blocksuite/global/utils';
import { throwIfAborted } from '../utils/throw-if-aborted';
import type { AwarenessProvider } from './awareness';
import type { BlobEngine } from './blob';
import type { SyncEngine, SyncEngineStatus } from './sync';
/** Aggregate engine status; currently mirrors only the doc-sync status. */
export interface WorkspaceEngineStatus {
  sync: SyncEngineStatus;
}
/**
* # WorkspaceEngine
*
* sync ydoc, blob, awareness together
*/
/**
 * # WorkspaceEngine
 *
 * Bundles the three sync facilities of a workspace — ydoc sync, blob sync
 * and awareness — behind a single start/stop lifecycle, and republishes the
 * sync engine's status as the aggregate engine status.
 */
export class WorkspaceEngine {
  _status: WorkspaceEngineStatus;
  onStatusChange = new Slot<WorkspaceEngineStatus>();
  get status() {
    return this._status;
  }
  set status(next: WorkspaceEngineStatus) {
    this._status = next;
    this.onStatusChange.emit(next);
  }
  constructor(
    public blob: BlobEngine,
    public sync: SyncEngine,
    public awareness: AwarenessProvider[]
  ) {
    this._status = { sync: sync.status };
    // Keep the aggregate status in lock-step with the sync engine.
    sync.onStatusChange.on(syncStatus => {
      this.status = { sync: syncStatus };
    });
  }
  /** Start doc sync, connect every awareness provider, then start blob sync. */
  start() {
    this.sync.start();
    this.awareness.forEach(provider => provider.connect());
    this.blob.start();
  }
  /** True when doc sync can stop without losing pending pushes. */
  canGracefulStop() {
    return this.sync.canGracefulStop();
  }
  /**
   * Wait until doc sync has drained, then stop everything.
   * Rejects (before stopping) if `abort` fires first.
   */
  async waitForGracefulStop(abort?: AbortSignal) {
    await this.sync.waitForGracefulStop(abort);
    throwIfAborted(abort);
    this.forceStop();
  }
  /** Stop immediately: doc sync, all awareness providers, blob sync. */
  forceStop() {
    this.sync.forceStop();
    this.awareness.forEach(provider => provider.disconnect());
    this.blob.stop();
  }
}
export * from './awareness';
export * from './blob';
export * from './sync';

View File

@@ -0,0 +1,15 @@
/** Coarse state of the whole SyncEngine (aggregated over all peers). */
export enum SyncEngineStep {
  Stopped = 0,
  Syncing = 1,
  Synced = 2,
}
/**
 * Fine-grained state of a single SyncPeer. Values are ordered so callers can
 * compare with </>= (see SyncPeer.waitForLoaded / waitForSynced).
 */
export enum SyncPeerStep {
  Stopped = 0,
  Retrying = 1,
  LoadingRootDoc = 2,
  LoadingSubDoc = 3,
  // NOTE(review): fractional on purpose, it seems — sits strictly between
  // LoadingSubDoc (3) and Syncing (5) so `step > Loaded` excludes all loading
  // states; confirm no serialization relies on integer values.
  Loaded = 4.5,
  Syncing = 5,
  Synced = 6,
}

View File

@@ -0,0 +1,278 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { SharedPriorityTarget } from '../../utils/async-queue';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import { SyncEngineStep, SyncPeerStep } from './consts';
import { SyncPeer, type SyncPeerStatus } from './peer';
import type { SyncStorage } from './storage';
/** Snapshot of the engine plus the statuses of the local and remote peers. */
export interface SyncEngineStatus {
  step: SyncEngineStep;
  /** Local peer status; null while the peer is not (yet) running. */
  local: SyncPeerStatus | null;
  /** One entry per remote storage, in constructor order; null when not running. */
  remotes: (SyncPeerStatus | null)[];
  /** True when any peer is in its retry-after-failure state. */
  retrying: boolean;
}
/**
* # SyncEngine
*
* ```
* ┌────────────┐
* │ SyncEngine │
* └─────┬──────┘
* │
* ▼
* ┌────────────┐
* │ SyncPeer │
* ┌─────────┤ local ├─────────┐
* │ └─────┬──────┘ │
* │ │ │
* ▼ ▼ ▼
* ┌────────────┐ ┌────────────┐ ┌────────────┐
* │ SyncPeer │ │ SyncPeer │ │ SyncPeer │
* │ Remote │ │ Remote │ │ Remote │
* └────────────┘ └────────────┘ └────────────┘
* ```
*
* Sync engine manage sync peers
*
* Sync steps:
* 1. start local sync
* 2. wait for local sync complete
* 3. start remote sync
* 4. continuously sync local and remote
*/
export class SyncEngine {
  get rootDocId() {
    return this.rootDoc.guid;
  }
  // NOTE(review): this field initializer reads `this.rootDoc` (a constructor
  // parameter property) through the getter above; with native class fields
  // (useDefineForClassFields) initializers run before the constructor body
  // assigns parameter properties — confirm the build config keeps this safe.
  logger = new DebugLogger('affine:sync-engine:' + this.rootDocId);
  private _status: SyncEngineStatus;
  onStatusChange = new Slot<SyncEngineStatus>();
  private set status(s: SyncEngineStatus) {
    this.logger.debug('status change', s);
    this._status = s;
    this.onStatusChange.emit(s);
  }
  // Shared with every peer so priority rules apply across all of them.
  priorityTarget = new SharedPriorityTarget();
  get status() {
    return this._status;
  }
  private abort = new AbortController();
  constructor(
    private readonly rootDoc: Doc,
    private readonly local: SyncStorage,
    private readonly remotes: SyncStorage[]
  ) {
    this._status = {
      step: SyncEngineStep.Stopped,
      local: null,
      remotes: remotes.map(() => null),
      retrying: false,
    };
  }
  /** Start (restarting first if already running); sync runs in background. */
  start() {
    if (this.status.step !== SyncEngineStep.Stopped) {
      this.forceStop();
    }
    this.abort = new AbortController();
    this.sync(this.abort.signal).catch(err => {
      // should never reach here
      this.logger.error(err);
    });
  }
  /** True when the local peer is up and has nothing left to push. */
  canGracefulStop() {
    return !!this.status.local && this.status.local.pendingPushUpdates === 0;
  }
  /**
   * Resolve once all local pushes are flushed, then force-stop.
   * Rejects with `abort.reason` if the signal fires first.
   */
  async waitForGracefulStop(abort?: AbortSignal) {
    // NOTE(review): the onStatusChange listener registered below is never
    // disposed; repeated calls accumulate listeners on the Slot — confirm
    // this is acceptable for the expected call pattern.
    await Promise.race([
      new Promise((_, reject) => {
        if (abort?.aborted) {
          reject(abort?.reason);
        }
        abort?.addEventListener('abort', () => {
          reject(abort.reason);
        });
      }),
      new Promise<void>(resolve => {
        this.onStatusChange.on(() => {
          if (this.canGracefulStop()) {
            resolve();
          }
        });
      }),
    ]);
    throwIfAborted(abort);
    this.forceStop();
  }
  forceStop() {
    this.abort.abort(MANUALLY_STOP);
    // NOTE(review): assigns the backing field directly, bypassing the setter,
    // so no onStatusChange event is emitted for the Stopped state — confirm
    // intended.
    this._status = {
      step: SyncEngineStep.Stopped,
      local: null,
      remotes: this.remotes.map(() => null),
      retrying: false,
    };
  }
  // main sync process, should never return until abort
  async sync(signal: AbortSignal) {
    const state: {
      localPeer: SyncPeer | null;
      remotePeers: (SyncPeer | null)[];
    } = {
      localPeer: null,
      remotePeers: this.remotes.map(() => null),
    };
    const cleanUp: (() => void)[] = [];
    try {
      // Step 1: start local sync peer
      state.localPeer = new SyncPeer(
        this.rootDoc,
        this.local,
        this.priorityTarget
      );
      cleanUp.push(
        state.localPeer.onStatusChange.on(() => {
          if (!signal.aborted)
            this.updateSyncingState(state.localPeer, state.remotePeers);
        }).dispose
      );
      this.updateSyncingState(state.localPeer, state.remotePeers);
      // Step 2: wait for local sync complete
      await state.localPeer.waitForLoaded(signal);
      // Step 3: start remote sync peer
      state.remotePeers = this.remotes.map(remote => {
        const peer = new SyncPeer(this.rootDoc, remote, this.priorityTarget);
        cleanUp.push(
          peer.onStatusChange.on(() => {
            if (!signal.aborted)
              this.updateSyncingState(state.localPeer, state.remotePeers);
          }).dispose
        );
        return peer;
      });
      this.updateSyncingState(state.localPeer, state.remotePeers);
      // Step 4: continuously sync local and remote
      // wait for abort
      await new Promise((_, reject) => {
        if (signal.aborted) {
          reject(signal.reason);
        }
        signal.addEventListener('abort', () => {
          reject(signal.reason);
        });
      });
    } catch (error) {
      // a manual stop is the expected way out; anything else is re-thrown
      if (error === MANUALLY_STOP || signal.aborted) {
        return;
      }
      throw error;
    } finally {
      // stop peers
      state.localPeer?.stop();
      for (const remotePeer of state.remotePeers) {
        remotePeer?.stop();
      }
      for (const clean of cleanUp) {
        clean();
      }
    }
  }
  /**
   * Recompute the aggregate status from peer statuses: Synced only when every
   * peer exists and reports Synced; retrying when any peer is Retrying.
   */
  updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) {
    let step = SyncEngineStep.Synced;
    const allPeer = [local, ...remotes];
    for (const peer of allPeer) {
      if (!peer || peer.status.step !== SyncPeerStep.Synced) {
        step = SyncEngineStep.Syncing;
        break;
      }
    }
    this.status = {
      step,
      local: local?.status ?? null,
      remotes: remotes.map(peer => peer?.status ?? null),
      retrying: allPeer.some(
        peer => peer?.status.step === SyncPeerStep.Retrying
      ),
    };
  }
  /**
   * Resolve when the engine reaches Synced; rejects with `abort.reason` if
   * the signal fires first. (Listener is not disposed — see note above.)
   */
  async waitForSynced(abort?: AbortSignal) {
    if (this.status.step === SyncEngineStep.Synced) {
      return;
    } else {
      return Promise.race([
        new Promise<void>(resolve => {
          this.onStatusChange.on(status => {
            if (status.step === SyncEngineStep.Synced) {
              resolve();
            }
          });
        }),
        new Promise((_, reject) => {
          if (abort?.aborted) {
            reject(abort?.reason);
          }
          abort?.addEventListener('abort', () => {
            reject(abort.reason);
          });
        }),
      ]);
    }
  }
  /**
   * Resolve once every peer has at least loaded the root doc (i.e. no peer is
   * missing or still in LoadingRootDoc or earlier).
   */
  async waitForLoadedRootDoc(abort?: AbortSignal) {
    function isLoadedRootDoc(status: SyncEngineStatus) {
      return ![status.local, ...status.remotes].some(
        peer => !peer || peer.step <= SyncPeerStep.LoadingRootDoc
      );
    }
    if (isLoadedRootDoc(this.status)) {
      return;
    } else {
      return Promise.race([
        new Promise<void>(resolve => {
          this.onStatusChange.on(status => {
            if (isLoadedRootDoc(status)) {
              resolve();
            }
          });
        }),
        new Promise((_, reject) => {
          if (abort?.aborted) {
            reject(abort?.reason);
          }
          abort?.addEventListener('abort', () => {
            reject(abort.reason);
          });
        }),
      ]);
    }
  }
  /** Install (or clear, with null) the doc-id priority rule for all peers. */
  setPriorityRule(target: ((id: string) => boolean) | null) {
    this.priorityTarget.priorityRule = target;
  }
}

View File

@@ -0,0 +1,20 @@
/**
*
* **SyncEngine**
*
* Manages one local storage and multiple remote storages.
*
* Responsible for creating SyncPeers for synchronization, following the local-first strategy.
*
* **SyncPeer**
*
* Responsible for synchronizing a single storage with Y.Doc.
*
* Carries the main synchronization logic.
*
*/
export * from './consts';
export * from './engine';
export * from './peer';
export * from './storage';

View File

@@ -0,0 +1,444 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import { isEqual } from '@blocksuite/global/utils';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import {
PriorityAsyncQueue,
SharedPriorityTarget,
} from '../../utils/async-queue';
import { mergeUpdates } from '../../utils/merge-updates';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
import { SyncPeerStep } from './consts';
import type { SyncStorage } from './storage';
/** Progress snapshot of one SyncPeer (see reportSyncStatus for derivation). */
export interface SyncPeerStatus {
  step: SyncPeerStep;
  /** Connected docs plus docs still queued for loading. */
  totalDocs: number;
  /** Docs connected so far (root doc plus subdocs). */
  loadedDocs: number;
  /** Queued pull updates (+1 while a subdoc is mid-load). */
  pendingPullUpdates: number;
  /** Queued push updates (+1 while a push is in flight). */
  pendingPushUpdates: number;
}
/**
* # SyncPeer
* A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs.
*
* ```
* ┌─────┐
* │Start│
* └──┬──┘
* │
* ┌──────┐ ┌─────▼──────┐ ┌────┐
* │listen◄─────┤pull rootdoc│ │peer│
* └──┬───┘ └─────┬──────┘ └──┬─┘
* │ │ onLoad() │
* ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐
* │listen◄─────┤pull subdocs│ │subscribe│
* └──┬───┘ └─────┬──────┘ └────┬────┘
* │ │ onReady() │
* ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐
* │queue├──────►apply updates◄───────┤queue│
* └─────┘ └─────────────┘ └─────┘
* ```
*
* listen: listen for updates from ydoc, typically from user modifications.
* subscribe: listen for updates from storage, typically from other users.
*
*/
export class SyncPeer {
  private _status: SyncPeerStatus = {
    step: SyncPeerStep.LoadingRootDoc,
    totalDocs: 1,
    loadedDocs: 0,
    pendingPullUpdates: 0,
    pendingPushUpdates: 0,
  };
  onStatusChange = new Slot<SyncPeerStatus>();
  readonly abort = new AbortController();
  get name() {
    return this.storage.name;
  }
  // NOTE(review): field initializer reads `this.storage` (a constructor
  // parameter property) via the getter above; with native class fields this
  // runs before the parameter is assigned — confirm build config.
  logger = new DebugLogger('affine:sync-peer:' + this.name);
  constructor(
    private readonly rootDoc: Doc,
    private readonly storage: SyncStorage,
    private readonly priorityTarget = new SharedPriorityTarget()
  ) {
    this.logger.debug('peer start');
    // The retry loop runs for the lifetime of the peer, in the background.
    this.syncRetryLoop(this.abort.signal).catch(err => {
      // should not reach here
      console.error(err);
    });
  }
  // Only emit when the status actually changed, to avoid listener churn.
  private set status(s: SyncPeerStatus) {
    if (!isEqual(s, this._status)) {
      this.logger.debug('status change', s);
      this._status = s;
      this.onStatusChange.emit(s);
    }
  }
  get status() {
    return this._status;
  }
  /**
   * stop sync
   *
   * SyncPeer is one-time use; discard this peer after calling stop().
   */
  stop() {
    this.logger.debug('peer stop');
    this.abort.abort(MANUALLY_STOP);
  }
  /**
   * Run sync() forever, retrying 5 seconds after any failure, until the
   * signal aborts (manual stop exits silently).
   */
  async syncRetryLoop(abort: AbortSignal) {
    while (abort.aborted === false) {
      try {
        await this.sync(abort);
      } catch (err) {
        if (err === MANUALLY_STOP || abort.aborted) {
          return;
        }
        this.logger.error('sync error', err);
      }
      try {
        this.logger.error('retry after 5 seconds');
        this.status = {
          step: SyncPeerStep.Retrying,
          totalDocs: 1,
          loadedDocs: 0,
          pendingPullUpdates: 0,
          pendingPushUpdates: 0,
        };
        await Promise.race([
          new Promise<void>(resolve => {
            setTimeout(resolve, 5 * 1000);
          }),
          new Promise((_, reject) => {
            // exit if manually stopped
            if (abort.aborted) {
              reject(abort.reason);
            }
            abort.addEventListener('abort', () => {
              reject(abort.reason);
            });
          }),
        ]);
      } catch (err) {
        if (err === MANUALLY_STOP || abort.aborted) {
          return;
        }
        // should never reach here
        throw err;
      }
    }
  }
  // Mutable per-sync-attempt state; reset by initState() on every attempt.
  private readonly state: {
    connectedDocs: Map<string, Doc>;
    pushUpdatesQueue: PriorityAsyncQueue<{
      id: string;
      data: Uint8Array[];
    }>;
    pushingUpdate: boolean;
    pullUpdatesQueue: PriorityAsyncQueue<{
      id: string;
      data: Uint8Array;
    }>;
    subdocLoading: boolean;
    subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>;
  } = {
    connectedDocs: new Map(),
    pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
    pushingUpdate: false,
    pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget),
    subdocLoading: false,
    subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget),
  };
  /** Reset all per-attempt state before a (re)sync attempt. */
  initState() {
    this.state.connectedDocs.clear();
    this.state.pushUpdatesQueue.clear();
    this.state.pullUpdatesQueue.clear();
    this.state.subdocsLoadQueue.clear();
    this.state.pushingUpdate = false;
    this.state.subdocLoading = false;
  }
  /**
   * main synchronization logic
   *
   * Subscribes to the storage, connects the root doc then subdocs, then runs
   * three concurrent loops (subdoc loading, pulling, pushing) until aborted.
   */
  async sync(abortOuter: AbortSignal) {
    this.initState();
    const abortInner = new AbortController();
    abortOuter.addEventListener('abort', () => {
      // Fix: the 'abort' listener argument is the Event object, not the abort
      // reason — forward the outer signal's actual reason so MANUALLY_STOP
      // comparisons against the inner signal's reason keep working.
      abortInner.abort(abortOuter.reason);
    });
    let dispose: (() => void) | null = null;
    try {
      this.reportSyncStatus();
      // start listen storage updates
      dispose = await this.storage.subscribe(
        this.handleStorageUpdates,
        reason => {
          // abort if storage disconnect, should trigger retry loop
          abortInner.abort('subscribe disconnect:' + reason);
        }
      );
      throwIfAborted(abortInner.signal);
      // Step 1: load root doc
      await this.connectDoc(this.rootDoc, abortInner.signal);
      // Step 2: load subdocs
      this.state.subdocsLoadQueue.push(
        ...Array.from(this.rootDoc.getSubdocs()).map(doc => ({
          id: doc.guid,
          doc,
        }))
      );
      this.reportSyncStatus();
      this.rootDoc.on('subdocs', this.handleSubdocsUpdate);
      // Finally: start sync
      await Promise.all([
        // load subdocs
        (async () => {
          while (throwIfAborted(abortInner.signal)) {
            const subdoc = await this.state.subdocsLoadQueue.next(
              abortInner.signal
            );
            this.state.subdocLoading = true;
            this.reportSyncStatus();
            await this.connectDoc(subdoc.doc, abortInner.signal);
            this.state.subdocLoading = false;
            this.reportSyncStatus();
          }
        })(),
        // pull updates
        (async () => {
          while (throwIfAborted(abortInner.signal)) {
            const { id, data } = await this.state.pullUpdatesQueue.next(
              abortInner.signal
            );
            // don't apply empty data or Uint8Array([0, 0])
            if (
              !(
                data.byteLength === 0 ||
                (data.byteLength === 2 && data[0] === 0 && data[1] === 0)
              )
            ) {
              const subdoc = this.state.connectedDocs.get(id);
              if (subdoc) {
                // origin = this.name lets handleYDocUpdates skip echoes
                applyUpdate(subdoc, data, this.name);
              }
            }
            this.reportSyncStatus();
          }
        })(),
        // push updates
        (async () => {
          while (throwIfAborted(abortInner.signal)) {
            const { id, data } = await this.state.pushUpdatesQueue.next(
              abortInner.signal
            );
            this.state.pushingUpdate = true;
            this.reportSyncStatus();
            const merged = mergeUpdates(data);
            // don't push empty data or Uint8Array([0, 0])
            if (
              !(
                merged.byteLength === 0 ||
                (merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0)
              )
            ) {
              await this.storage.push(id, merged);
            }
            this.state.pushingUpdate = false;
            this.reportSyncStatus();
          }
        })(),
      ]);
    } finally {
      dispose?.();
      for (const docs of this.state.connectedDocs.values()) {
        this.disconnectDoc(docs);
      }
      this.rootDoc.off('subdocs', this.handleSubdocsUpdate);
    }
  }
  /**
   * Pull the doc's stored state, apply it, queue a push of anything the
   * storage is missing, and start listening for local updates.
   */
  async connectDoc(doc: Doc, abort: AbortSignal) {
    const { data: docData, state: inStorageState } =
      (await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {};
    throwIfAborted(abort);
    if (docData) {
      applyUpdate(doc, docData, 'load');
    }
    // diff root doc and in-storage, save updates to pendingUpdates
    this.state.pushUpdatesQueue.push({
      id: doc.guid,
      data: [encodeStateAsUpdate(doc, inStorageState)],
    });
    this.state.connectedDocs.set(doc.guid, doc);
    // start listen root doc changes
    doc.on('update', this.handleYDocUpdates);
    // mark rootDoc as loaded
    doc.emit('sync', [true]);
    this.reportSyncStatus();
  }
  disconnectDoc(doc: Doc) {
    doc.off('update', this.handleYDocUpdates);
    this.state.connectedDocs.delete(doc.guid);
    this.reportSyncStatus();
  }
  // handle updates from ydoc
  handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => {
    // don't push updates from storage
    if (origin === this.name) {
      return;
    }
    // coalesce with an already-queued entry for the same doc when possible
    const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid);
    if (exist) {
      exist.data.push(update);
    } else {
      this.state.pushUpdatesQueue.push({
        id: doc.guid,
        data: [update],
      });
    }
    this.reportSyncStatus();
  };
  // handle subdocs changes, append new subdocs to queue, remove subdocs from queue
  handleSubdocsUpdate = ({
    added,
    removed,
  }: {
    added: Set<Doc>;
    removed: Set<Doc>;
  }) => {
    for (const subdoc of added) {
      this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc });
    }
    for (const subdoc of removed) {
      this.disconnectDoc(subdoc);
      this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc);
    }
    this.reportSyncStatus();
  };
  // handle updates from storage
  handleStorageUpdates = (id: string, data: Uint8Array) => {
    this.state.pullUpdatesQueue.push({
      id,
      data,
    });
    this.reportSyncStatus();
  };
  /** Derive the SyncPeerStatus snapshot from the current per-attempt state. */
  reportSyncStatus() {
    let step;
    if (this.state.connectedDocs.size === 0) {
      step = SyncPeerStep.LoadingRootDoc;
    } else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) {
      step = SyncPeerStep.LoadingSubDoc;
    } else if (
      this.state.pullUpdatesQueue.length ||
      this.state.pushUpdatesQueue.length ||
      this.state.pushingUpdate
    ) {
      step = SyncPeerStep.Syncing;
    } else {
      step = SyncPeerStep.Synced;
    }
    this.status = {
      step: step,
      totalDocs:
        this.state.connectedDocs.size + this.state.subdocsLoadQueue.length,
      loadedDocs: this.state.connectedDocs.size,
      pendingPullUpdates:
        this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0),
      pendingPushUpdates:
        this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0),
    };
  }
  /** Resolve when this peer reaches Synced; reject on abort. */
  async waitForSynced(abort?: AbortSignal) {
    if (this.status.step >= SyncPeerStep.Synced) {
      return;
    } else {
      return Promise.race([
        new Promise<void>(resolve => {
          this.onStatusChange.on(status => {
            if (status.step >= SyncPeerStep.Synced) {
              resolve();
            }
          });
        }),
        new Promise((_, reject) => {
          if (abort?.aborted) {
            reject(abort?.reason);
          }
          abort?.addEventListener('abort', () => {
            reject(abort.reason);
          });
        }),
      ]);
    }
  }
  /** Resolve once the peer has passed the Loaded step; reject on abort. */
  async waitForLoaded(abort?: AbortSignal) {
    if (this.status.step > SyncPeerStep.Loaded) {
      return;
    } else {
      return Promise.race([
        new Promise<void>(resolve => {
          this.onStatusChange.on(status => {
            if (status.step > SyncPeerStep.Loaded) {
              resolve();
            }
          });
        }),
        new Promise((_, reject) => {
          if (abort?.aborted) {
            reject(abort?.reason);
          }
          abort?.addEventListener('abort', () => {
            reject(abort.reason);
          });
        }),
      ]);
    }
  }
}

View File

@@ -0,0 +1,25 @@
/** A persistence backend (local or remote) that one SyncPeer syncs against. */
export interface SyncStorage {
  /**
   * for debug
   */
  name: string;
  /**
   * Fetch stored updates for `docId`.
   *
   * @param state yjs state vector of the in-memory doc (what we already have)
   * @returns `data` to apply via applyUpdate, plus optionally the storage's
   *   own `state` vector (used to diff what must be pushed back); null when
   *   the storage has nothing for this doc.
   */
  pull(
    docId: string,
    state: Uint8Array
  ): Promise<{ data: Uint8Array; state?: Uint8Array } | null>;
  /** Persist a (merged) yjs update for `docId`. */
  push(docId: string, data: Uint8Array): Promise<void>;
  /**
   * Subscribe to updates from peer
   *
   * @param cb callback to handle updates
   * @param disconnect callback to handle disconnect, reason can be something like 'network-error'
   *
   * @returns unsubscribe function
   */
  subscribe(
    cb: (docId: string, data: Uint8Array) => void,
    disconnect: (reason: string) => void
  ): Promise<() => void>;
}

View File

@@ -0,0 +1,13 @@
import type { WorkspaceMetadata } from './metadata';
import type { Workspace } from './workspace';
/** Opens workspaces of one flavour; registered with the workspace manager. */
export interface WorkspaceFactory {
  /** Flavour identifier this factory handles. */
  name: string;
  /** Instantiate (open) the workspace described by `metadata`. */
  openWorkspace(metadata: WorkspaceMetadata): Workspace;
  /**
   * get blob without open workspace
   */
  getWorkspaceBlob(id: string, blobKey: string): Promise<Blob | null>;
}

View File

@@ -0,0 +1,6 @@
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import { Schema } from '@blocksuite/store';
// Single shared BlockSuite schema instance used across workspaces.
export const globalBlockSuiteSchema = new Schema();
// Register the stable Affine block schemas plus the not-yet-stable ones.
globalBlockSuiteSchema.register(AffineSchemas).register(__unstableSchemas);

View File

@@ -0,0 +1,7 @@
export * from './engine';
export * from './factory';
export * from './global-schema';
export * from './list';
export * from './manager';
export * from './metadata';
export * from './workspace';

View File

@@ -0,0 +1,21 @@
import { type WorkspaceMetadata } from '../metadata';
// localStorage key under which the workspace metadata list is cached.
const CACHE_STORAGE_KEY = 'jotai-workspaces';
/**
 * Read the cached workspace metadata list from localStorage.
 *
 * @returns a fresh array of cached metadata; [] when the cache is absent,
 *   not valid JSON, or (guarded here) parses to something other than an
 *   array. Fixes the 'worksapce' typo in the error log.
 */
export function readWorkspaceListCache() {
  const metadata = localStorage.getItem(CACHE_STORAGE_KEY);
  if (metadata) {
    try {
      const items = JSON.parse(metadata);
      // a corrupt cache may parse but not be a list — treat it as empty
      return Array.isArray(items) ? ([...items] as WorkspaceMetadata[]) : [];
    } catch (e) {
      console.error('cannot parse workspace list cache', e);
    }
    return [];
  }
  return [];
}
/** Persist the workspace metadata list so the next launch can show it offline. */
export function writeWorkspaceListCache(metadata: WorkspaceMetadata[]) {
  const serialized = JSON.stringify(metadata);
  localStorage.setItem(CACHE_STORAGE_KEY, serialized);
}

View File

@@ -0,0 +1,300 @@
import { DebugLogger } from '@affine/debug';
import type { WorkspaceFlavour } from '@affine/env/workspace';
import { Slot } from '@blocksuite/global/utils';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { differenceWith } from 'lodash-es';
import type { BlobStorage } from '../engine';
import type { WorkspaceMetadata } from '../metadata';
import { readWorkspaceListCache, writeWorkspaceListCache } from './cache';
import { type WorkspaceInfo, WorkspaceInformation } from './information';
export * from './information';
const logger = new DebugLogger('affine:workspace:list');
/** Per-flavour source of workspace metadata, consumed by WorkspaceList. */
export interface WorkspaceListProvider {
  /** The workspace flavour this provider serves. */
  name: WorkspaceFlavour;
  /**
   * get workspaces list
   */
  getList(): Promise<WorkspaceMetadata[]>;
  /**
   * delete workspace by id
   */
  delete(workspaceId: string): Promise<void>;
  /**
   * create workspace
   * @param initial callback to put initial data to workspace
   * @returns the id of the newly created workspace
   */
  create(
    initial: (
      workspace: BlockSuiteWorkspace,
      blobStorage: BlobStorage
    ) => Promise<void>
  ): Promise<string>;
  /**
   * Start subscribe workspaces list
   *
   * @returns unsubscribe function
   */
  subscribe(
    callback: (changed: {
      added?: WorkspaceMetadata[];
      deleted?: WorkspaceMetadata[];
    }) => void
  ): () => void;
  /**
   * get workspace avatar and name by id
   *
   * @param id workspace id
   */
  getInformation(id: string): Promise<WorkspaceInfo | undefined>;
}
/** Observable state of the aggregated workspace list. */
export interface WorkspaceListStatus {
  /**
   * is workspace list doing first loading.
   * if false, UI can display workspace not found page.
   */
  loading: boolean;
  /** Current aggregated metadata across all providers. */
  workspaceList: WorkspaceMetadata[];
}
/**
* # WorkspaceList
*
* manage multiple workspace metadata list providers.
 * provide a __cache-first__ and __offline-usable__ workspace list.
*/
export class WorkspaceList {
  // Aborted on dispose(); tears down provider subscriptions.
  private readonly abortController = new AbortController();
  // Lazily-created WorkspaceInformation per workspace id.
  private readonly workspaceInformationList = new Map<
    string,
    WorkspaceInformation
  >();
  onStatusChanged = new Slot<WorkspaceListStatus>();
  private _status: Readonly<WorkspaceListStatus> = {
    loading: true,
    workspaceList: [],
  };
  get status() {
    return this._status;
  }
  // Every status write also persists the list to the localStorage cache,
  // which is what makes the next launch cache-first.
  set status(status) {
    this._status = status;
    // update cache
    writeWorkspaceListCache(status.workspaceList);
    this.onStatusChanged.emit(this._status);
  }
  get workspaceList() {
    return this.status.workspaceList;
  }
  constructor(private readonly providers: WorkspaceListProvider[]) {
    // initialize workspace list from cache
    const cache = readWorkspaceListCache();
    const workspaceList = cache;
    this.status = {
      ...this.status,
      workspaceList,
    };
    // start first load
    this.startLoad();
  }
  /**
   * create workspace
   * @param flavour workspace flavour
   * @param initial callback to put initial data to workspace
   * @returns workspace id
   */
  async create(
    flavour: WorkspaceFlavour,
    initial: (
      workspace: BlockSuiteWorkspace,
      blobStorage: BlobStorage
    ) => Promise<void>
  ) {
    const provider = this.providers.find(x => x.name === flavour);
    if (!provider) {
      throw new Error(`Unknown workspace flavour: ${flavour}`);
    }
    const id = await provider.create(initial);
    const metadata = {
      id,
      flavour,
    };
    // update workspace list
    this.status = this.addWorkspace(this.status, metadata);
    return id;
  }
  /**
   * delete workspace
   * @param workspaceMetadata
   */
  async delete(workspaceMetadata: WorkspaceMetadata) {
    logger.info(
      `delete workspace [${workspaceMetadata.flavour}] ${workspaceMetadata.id}`
    );
    const provider = this.providers.find(
      x => x.name === workspaceMetadata.flavour
    );
    if (!provider) {
      throw new Error(
        `Unknown workspace flavour: ${workspaceMetadata.flavour}`
      );
    }
    await provider.delete(workspaceMetadata.id);
    // delete workspace from list
    this.status = this.deleteWorkspace(this.status, workspaceMetadata);
  }
  /**
   * add workspace to list (pure: returns a new status, no mutation;
   * returns the input unchanged when the id is already present)
   */
  private addWorkspace(
    status: WorkspaceListStatus,
    workspaceMetadata: WorkspaceMetadata
  ) {
    if (status.workspaceList.some(x => x.id === workspaceMetadata.id)) {
      return status;
    }
    return {
      ...status,
      workspaceList: status.workspaceList.concat(workspaceMetadata),
    };
  }
  /**
   * delete workspace from list (pure: returns a new status, no mutation;
   * returns the input unchanged when the id is absent)
   */
  private deleteWorkspace(
    status: WorkspaceListStatus,
    workspaceMetadata: WorkspaceMetadata
  ) {
    if (!status.workspaceList.some(x => x.id === workspaceMetadata.id)) {
      return status;
    }
    return {
      ...status,
      workspaceList: status.workspaceList.filter(
        x => x.id !== workspaceMetadata.id
      ),
    };
  }
  /**
   * callback for subscribe workspaces list
   */
  private handleWorkspaceChange(changed: {
    added?: WorkspaceMetadata[];
    deleted?: WorkspaceMetadata[];
  }) {
    let status = this.status;
    for (const added of changed.added ?? []) {
      status = this.addWorkspace(status, added);
    }
    for (const deleted of changed.deleted ?? []) {
      status = this.deleteWorkspace(status, deleted);
    }
    this.status = status;
  }
  /**
   * start first load workspace list
   */
  private startLoad() {
    for (const provider of this.providers) {
      // subscribe workspace list change
      const unsubscribe = provider.subscribe(changed => {
        this.handleWorkspaceChange(changed);
      });
      // unsubscribe when abort
      if (this.abortController.signal.aborted) {
        unsubscribe();
        return;
      }
      this.abortController.signal.addEventListener('abort', () => {
        unsubscribe();
      });
    }
    this.revalidate()
      .catch(error => {
        logger.error('load workspace list error: ' + error);
      })
      .finally(() => {
        // first load finished (even on failure) — UI may stop showing loading
        this.status = {
          ...this.status,
          loading: false,
        };
      });
  }
  /**
   * Re-fetch every provider's list and diff it against the current status,
   * emitting adds/deletes. Per-provider failures are logged, not thrown.
   */
  async revalidate() {
    await Promise.allSettled(
      this.providers.map(async provider => {
        try {
          const list = await provider.getList();
          const oldList = this.workspaceList.filter(
            w => w.flavour === provider.name
          );
          this.handleWorkspaceChange({
            added: differenceWith(list, oldList, (a, b) => a.id === b.id),
            deleted: differenceWith(oldList, list, (a, b) => a.id === b.id),
          });
        } catch (error) {
          logger.error('load workspace list error: ' + error);
        }
      })
    );
  }
  /**
   * get workspace information, if not exists, create it.
   */
  getInformation(meta: WorkspaceMetadata) {
    const exists = this.workspaceInformationList.get(meta.id);
    if (exists) {
      return exists;
    }
    return this.createInformation(meta);
  }
  private createInformation(workspaceMetadata: WorkspaceMetadata) {
    const provider = this.providers.find(
      x => x.name === workspaceMetadata.flavour
    );
    if (!provider) {
      throw new Error(
        `Unknown workspace flavour: ${workspaceMetadata.flavour}`
      );
    }
    const information = new WorkspaceInformation(workspaceMetadata, provider);
    information.fetch();
    this.workspaceInformationList.set(workspaceMetadata.id, information);
    return information;
  }
  dispose() {
    this.abortController.abort();
  }
}

View File

@@ -0,0 +1,97 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { WorkspaceMetadata } from '../metadata';
import type { Workspace } from '../workspace';
import type { WorkspaceListProvider } from '.';
const logger = new DebugLogger('affine:workspace:list:information');
const WORKSPACE_INFORMATION_CACHE_KEY = 'workspace-information:';
/** Display metadata for a workspace; both fields may be unknown. */
export interface WorkspaceInfo {
  avatar?: string;
  name?: string;
}
/**
* # WorkspaceInformation
*
* This class take care of workspace avatar and name
*
* The class will try to get from 3 places:
* - local cache
* - fetch from `WorkspaceListProvider`, which will fetch from database or server
* - sync with active workspace
*/
export class WorkspaceInformation {
  private _info: WorkspaceInfo = {};
  /**
   * Update the info; persists to localStorage and emits onUpdated, but only
   * when avatar or name actually changed (dedupes redundant writes/events).
   */
  public set info(info: WorkspaceInfo) {
    if (info.avatar !== this._info.avatar || info.name !== this._info.name) {
      localStorage.setItem(
        WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id,
        JSON.stringify(info)
      );
      this._info = info;
      this.onUpdated.emit(info);
    }
  }
  public get info() {
    return this._info;
  }
  public onUpdated = new Slot<WorkspaceInfo>();
  constructor(
    public meta: WorkspaceMetadata,
    public provider: WorkspaceListProvider
  ) {
    const cached = this.getCachedInformation();
    // init with cached information ({...null} yields {})
    this.info = { ...cached };
  }
  /**
   * sync information with workspace
   *
   * Seeds info from the live workspace meta and keeps following its
   * commonFieldsUpdated events; existing values win over missing ones.
   */
  syncWithWorkspace(workspace: Workspace) {
    this.info = {
      avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar,
      name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name,
    };
    workspace.blockSuiteWorkspace.meta.commonFieldsUpdated.on(() => {
      this.info = {
        avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar,
        name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name,
      };
    });
  }
  /**
   * Read the cached info for this workspace from localStorage.
   *
   * Returns null when there is no cache entry or it is corrupt JSON —
   * previously a corrupt entry made JSON.parse throw out of the constructor.
   */
  getCachedInformation() {
    const cache = localStorage.getItem(
      WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id
    );
    if (cache) {
      try {
        return JSON.parse(cache) as WorkspaceInfo;
      } catch (err) {
        logger.warn('cannot parse cached workspace information: ' + err);
      }
    }
    return null;
  }
  /**
   * fetch information from provider
   *
   * Fire-and-forget; failures are logged, never thrown.
   */
  fetch() {
    this.provider
      .getInformation(this.meta.id)
      .then(info => {
        if (info) {
          this.info = info;
        }
      })
      .catch(err => {
        logger.warn('get workspace information error: ' + err);
      });
  }
}

View File

@@ -0,0 +1,169 @@
import { DebugLogger } from '@affine/debug';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { assertEquals } from '@blocksuite/global/utils';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import { fixWorkspaceVersion } from '@toeverything/infra/blocksuite';
import { applyUpdate, encodeStateAsUpdate } from 'yjs';
import type { BlobStorage } from '.';
import type { WorkspaceFactory } from './factory';
import type { WorkspaceList } from './list';
import type { WorkspaceMetadata } from './metadata';
import { WorkspacePool } from './pool';
import type { Workspace } from './workspace';
const logger = new DebugLogger('affine:workspace-manager');
/**
* # `WorkspaceManager`
*
* This class acts as the central hub for managing various aspects of workspaces.
* It is structured as follows:
*
* ```
* ┌───────────┐
* │ Workspace │
* │ Manager │
* └─────┬─────┘
* ┌─────────────┼─────────────┐
* ┌───┴───┐ ┌───┴───┐ ┌─────┴─────┐
* │ List │ │ Pool │ │ Factories │
* └───────┘ └───────┘ └───────────┘
* ```
*
* Manage every about workspace
*
* # List
*
* The `WorkspaceList` component stores metadata for all workspaces, also include workspace avatar and custom name.
*
* # Factories
*
* This class contains a collection of `WorkspaceFactory`,
* We utilize `metadata.flavour` to identify the appropriate factory for opening a workspace.
* Once opened, workspaces are stored in the `WorkspacePool`.
*
* # Pool
*
* The `WorkspacePool` use reference counting to manage active workspaces.
* Calling `use()` to create a reference to the workspace. Calling `release()` to release the reference.
* When the reference count is 0, it will close the workspace.
*
*/
export class WorkspaceManager {
  // all currently-open workspaces, reference counted (see WorkspacePool)
  pool: WorkspacePool = new WorkspacePool();
  constructor(
    public list: WorkspaceList,
    public factories: WorkspaceFactory[]
  ) {}
  /**
   * get workspace reference by metadata.
   *
   * You basically don't need to call this function directly, use the react hook `useWorkspace(metadata)` instead.
   *
   * @returns the workspace reference and a release function, don't forget to call release function when you don't
   * need the workspace anymore.
   */
  use(metadata: WorkspaceMetadata): {
    workspace: Workspace;
    release: () => void;
  } {
    const exist = this.pool.get(metadata.id);
    if (exist) {
      return exist;
    }
    // not opened yet: open via the matching factory and register in the pool
    const workspace = this.open(metadata);
    const ref = this.pool.put(workspace);
    return ref;
  }
  /**
   * create a new workspace of the given flavour, delegating to `WorkspaceList.create`.
   *
   * @param initial callback used to seed the newly created workspace (docs and blobs)
   * @returns the id of the created workspace
   */
  createWorkspace(
    flavour: WorkspaceFlavour,
    initial: (
      workspace: BlockSuiteWorkspace,
      blobStorage: BlobStorage
    ) => Promise<void>
  ): Promise<string> {
    logger.info(`create workspace [${flavour}]`);
    return this.list.create(flavour, initial);
  }
  /**
   * delete workspace by metadata, same as `WorkspaceList.deleteWorkspace`
   */
  async deleteWorkspace(metadata: WorkspaceMetadata) {
    await this.list.delete(metadata);
  }
  /**
   * helper function to transform local workspace to cloud workspace
   *
   * @returns metadata of the newly created cloud workspace; the local
   * workspace is deleted afterwards
   */
  async transformLocalToCloud(local: Workspace): Promise<WorkspaceMetadata> {
    assertEquals(local.flavour, WorkspaceFlavour.LOCAL);
    // make sure all local changes are persisted before copying
    await local.engine.sync.waitForSynced();
    const newId = await this.list.create(
      WorkspaceFlavour.AFFINE_CLOUD,
      async (ws, bs) => {
        // copy the root doc, then every subdoc matched by guid
        applyUpdate(ws.doc, encodeStateAsUpdate(local.blockSuiteWorkspace.doc));
        for (const subdoc of local.blockSuiteWorkspace.doc.getSubdocs()) {
          for (const newSubdoc of ws.doc.getSubdocs()) {
            if (newSubdoc.guid === subdoc.guid) {
              applyUpdate(newSubdoc, encodeStateAsUpdate(subdoc));
            }
          }
        }
        // copy every blob into the new workspace's blob storage
        const blobList = await local.engine.blob.list();
        for (const blobKey of blobList) {
          const blob = await local.engine.blob.get(blobKey);
          if (blob) {
            await bs.set(blobKey, blob);
          }
        }
      }
    );
    // the local copy is no longer needed once the cloud workspace exists
    await this.list.delete(local.meta);
    return {
      id: newId,
      flavour: WorkspaceFlavour.AFFINE_CLOUD,
    };
  }
  /**
   * helper function to get blob without open workspace, its be used for download workspace avatars.
   */
  getWorkspaceBlob(metadata: WorkspaceMetadata, blobKey: string) {
    const factory = this.factories.find(x => x.name === metadata.flavour);
    if (!factory) {
      throw new Error(`Unknown workspace flavour: ${metadata.flavour}`);
    }
    return factory.getWorkspaceBlob(metadata.id, blobKey);
  }
  /**
   * open a workspace through the factory registered for its flavour.
   *
   * @throws Error when no factory is registered for the metadata's flavour
   */
  private open(metadata: WorkspaceMetadata) {
    logger.info(`open workspace [${metadata.flavour}] ${metadata.id} `);
    const factory = this.factories.find(x => x.name === metadata.flavour);
    if (!factory) {
      throw new Error(`Unknown workspace flavour: ${metadata.flavour}`);
    }
    const workspace = factory.openWorkspace(metadata);
    // sync information with workspace list, when workspace's avatar and name changed, information will be updated
    this.list.getInformation(metadata).syncWithWorkspace(workspace);
    // apply compatibility fix
    fixWorkspaceVersion(workspace.blockSuiteWorkspace.doc);
    return workspace;
  }
}

View File

@@ -0,0 +1,3 @@
import type { WorkspaceFlavour } from '@affine/env/workspace';
/** Minimal identifying record for a workspace: its id plus the flavour used to open it. */
export type WorkspaceMetadata = { id: string; flavour: WorkspaceFlavour };

View File

@@ -0,0 +1,86 @@
import { DebugLogger } from '@affine/debug';
import { Unreachable } from '@affine/env/constant';
import type { Workspace } from './workspace';
const logger = new DebugLogger('affine:workspace-manager:pool');
/**
* Collection of opened workspaces. use reference counting to manage active workspaces.
*/
export class WorkspacePool {
  openedWorkspaces = new Map<string, { workspace: Workspace; rc: number }>();
  timeoutToGc: NodeJS.Timeout | null = null;

  /**
   * Look up an opened workspace by id. On a hit, the reference count is
   * incremented and a one-shot `release` callback is returned alongside it.
   */
  get(workspaceId: string): {
    workspace: Workspace;
    release: () => void;
  } | null {
    const entry = this.openedWorkspaces.get(workspaceId);
    if (!entry) {
      return null;
    }
    entry.rc += 1;
    let alreadyReleased = false;
    const release = () => {
      // a second call to release must be a no-op
      if (alreadyReleased) {
        return;
      }
      alreadyReleased = true;
      entry.rc -= 1;
      this.requestGc();
    };
    return { workspace: entry.workspace, release };
  }

  /**
   * Register a freshly opened workspace and immediately take a reference.
   */
  put(workspace: Workspace) {
    this.openedWorkspaces.set(workspace.meta.id, { workspace, rc: 0 });
    const ref = this.get(workspace.meta.id);
    if (!ref) {
      throw new Unreachable();
    }
    return ref;
  }

  private requestGc() {
    if (this.timeoutToGc) {
      clearInterval(this.timeoutToGc);
    }
    // do gc every 1s
    this.timeoutToGc = setInterval(() => {
      this.gc();
    }, 1000);
  }

  private gc() {
    // iterate over a snapshot: entries may be deleted during iteration
    const snapshot = new Map(this.openedWorkspaces);
    for (const [id, entry] of snapshot) {
      if (entry.rc === 0 && entry.workspace.canGracefulStop()) {
        // we can safely close the workspace
        logger.info(
          `close workspace [${entry.workspace.flavour}] ${entry.workspace.id}`
        );
        entry.workspace.forceStop();
        this.openedWorkspaces.delete(id);
      }
    }
    // keep polling while any unreferenced workspace is still waiting to stop
    for (const [, entry] of this.openedWorkspaces) {
      if (entry.rc === 0) {
        return;
      }
    }
    // if all workspaces have a referrer, stop gc
    if (this.timeoutToGc) {
      clearInterval(this.timeoutToGc);
    }
  }
}

View File

@@ -0,0 +1,148 @@
import { Unreachable } from '@affine/env/constant';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { Slot } from '@blocksuite/global/utils';
import {
checkWorkspaceCompatibility,
MigrationPoint,
} from '@toeverything/infra/blocksuite';
import {
forceUpgradePages,
upgradeV1ToV2,
} from '@toeverything/infra/blocksuite';
import { migrateGuidCompatibility } from '@toeverything/infra/blocksuite';
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import type { WorkspaceManager } from '..';
import type { Workspace } from '../workspace';
export interface WorkspaceUpgradeStatus {
needUpgrade: boolean;
upgrading: boolean;
}
export class WorkspaceUpgradeController {
  _status: Readonly<WorkspaceUpgradeStatus> = {
    needUpgrade: false,
    upgrading: false,
  };
  readonly onStatusChange = new Slot<WorkspaceUpgradeStatus>();
  get status() {
    return this._status;
  }
  // only store and emit when needUpgrade or upgrading actually changed
  set status(value) {
    if (
      value.needUpgrade !== this._status.needUpgrade ||
      value.upgrading !== this._status.upgrading
    ) {
      this._status = value;
      this.onStatusChange.emit(value);
    }
  }
  constructor(private readonly workspace: Workspace) {
    // re-check compatibility on every document update
    workspace.blockSuiteWorkspace.doc.on('update', () => {
      this.checkIfNeedUpgrade();
    });
  }
  /**
   * Re-evaluate whether the workspace needs a migration and update `status`.
   *
   * @returns whether an upgrade is needed
   */
  checkIfNeedUpgrade() {
    const needUpgrade = !!checkWorkspaceCompatibility(
      this.workspace.blockSuiteWorkspace,
      this.workspace.flavour === WorkspaceFlavour.AFFINE_CLOUD
    );
    this.status = {
      ...this.status,
      needUpgrade,
    };
    return needUpgrade;
  }
  /**
   * Run the pending migration, if any. Concurrent calls are rejected while
   * `upgrading` is set; the flag is always cleared on exit.
   *
   * @returns the id of the newly created workspace for a SubDoc migration
   * (the old workspace is deleted), otherwise `null` (in-place upgrade,
   * already upgrading, or nothing to do).
   */
  async upgrade(workspaceManager: WorkspaceManager): Promise<string | null> {
    if (this.status.upgrading) {
      return null;
    }
    this.status = { ...this.status, upgrading: true };
    try {
      // make sure local state is fully persisted before deciding on a step
      await this.workspace.engine.sync.waitForSynced();
      const step = checkWorkspaceCompatibility(
        this.workspace.blockSuiteWorkspace,
        this.workspace.flavour === WorkspaceFlavour.AFFINE_CLOUD
      );
      if (!step) {
        return null;
      }
      // Clone a new doc to prevent change events.
      const clonedDoc = new YDoc({
        guid: this.workspace.blockSuiteWorkspace.doc.guid,
      });
      applyDoc(clonedDoc, this.workspace.blockSuiteWorkspace.doc);
      if (step === MigrationPoint.SubDoc) {
        // SubDoc migration: create a brand-new local workspace, migrate the
        // docs and copy every blob into it, then delete the old workspace
        const newWorkspace = await workspaceManager.createWorkspace(
          WorkspaceFlavour.LOCAL,
          async (workspace, blobStorage) => {
            await upgradeV1ToV2(clonedDoc, workspace.doc);
            migrateGuidCompatibility(clonedDoc);
            await forceUpgradePages(
              workspace.doc,
              this.workspace.blockSuiteWorkspace.schema
            );
            const blobList =
              await this.workspace.blockSuiteWorkspace.blob.list();
            for (const blobKey of blobList) {
              const blob =
                await this.workspace.blockSuiteWorkspace.blob.get(blobKey);
              if (blob) {
                await blobStorage.set(blobKey, blob);
              }
            }
          }
        );
        await workspaceManager.deleteWorkspace(this.workspace.meta);
        return newWorkspace;
      } else if (step === MigrationPoint.GuidFix) {
        // in-place migration: upgrade the clone, then write it back
        migrateGuidCompatibility(clonedDoc);
        await forceUpgradePages(
          clonedDoc,
          this.workspace.blockSuiteWorkspace.schema
        );
        applyDoc(this.workspace.blockSuiteWorkspace.doc, clonedDoc);
        await this.workspace.engine.sync.waitForSynced();
        return null;
      } else if (step === MigrationPoint.BlockVersion) {
        await forceUpgradePages(
          clonedDoc,
          this.workspace.blockSuiteWorkspace.schema
        );
        applyDoc(this.workspace.blockSuiteWorkspace.doc, clonedDoc);
        await this.workspace.engine.sync.waitForSynced();
        return null;
      } else {
        throw new Unreachable();
      }
    } finally {
      this.status = { ...this.status, upgrading: false };
    }
  }
}
/**
 * Recursively apply the state of `result` onto `target`, pairing subdocs by
 * guid at each level. Subdocs present only in one side are left untouched.
 */
function applyDoc(target: YDoc, result: YDoc) {
  applyUpdate(target, encodeStateAsUpdate(result));
  const sourceSubDocs = Array.from(result.subdocs.values());
  for (const targetSubDoc of target.subdocs.values()) {
    const match = sourceSubDocs.find(doc => doc.guid === targetSubDoc.guid);
    if (match) {
      applyDoc(targetSubDoc, match);
    }
  }
}

View File

@@ -0,0 +1,45 @@
import { describe, expect, test, vi } from 'vitest';
import { AsyncQueue } from '../async-queue';
describe('async-queue', () => {
  test('push & pop', async () => {
    const queue = new AsyncQueue();
    queue.push(1, 2, 3);
    expect(queue.length).toBe(3);
    expect(await queue.next()).toBe(1);
    expect(await queue.next()).toBe(2);
    expect(await queue.next()).toBe(3);
    expect(queue.length).toBe(0);
  });
  test('await', async () => {
    const queue = new AsyncQueue<number>();
    queue.push(1, 2);
    expect(await queue.next()).toBe(1);
    expect(await queue.next()).toBe(2);
    let v = -1;
    // setup 2 pop tasks
    queue.next().then(next => {
      v = next;
    });
    queue.next().then(next => {
      v = next;
    });
    // Wait for 100ms
    await new Promise(resolve => setTimeout(resolve, 100));
    // v should not be changed
    expect(v).toBe(-1);
    // push 3, should trigger the first pop task.
    // NOTE: vi.waitFor retries the callback until it stops throwing; a bare
    // boolean expression never throws, so the old `() => v === 3` resolved
    // immediately without checking anything — assert inside the callback.
    queue.push(3);
    await vi.waitFor(() => expect(v).toBe(3));
    // push 4, should trigger the second pop task
    queue.push(4);
    await vi.waitFor(() => expect(v).toBe(4));
  });
});

View File

@@ -0,0 +1,13 @@
import { describe, expect, test } from 'vitest';
import { throwIfAborted } from '../throw-if-aborted';
describe('throw-if-aborted', () => {
  test('basic', async () => {
    // a fresh, un-aborted signal passes the check
    const controller = new AbortController();
    expect(throwIfAborted(controller.signal)).toBe(true);
    // after abort, the reason is surfaced as the thrown error message
    controller.abort('TEST_ABORT');
    expect(() => throwIfAborted(controller.signal)).toThrowError('TEST_ABORT');
  });
});

View File

@@ -0,0 +1,101 @@
/**
 * Unbounded FIFO queue whose `next()` asynchronously waits for an item when
 * the queue is empty.
 */
export class AsyncQueue<T> {
  private _queue: T[];
  // resolver of the shared "something was pushed" promise, set while a consumer waits
  private _resolveUpdate: (() => void) | null = null;
  private _waitForUpdate: Promise<void> | null = null;

  constructor(init: T[] = []) {
    this._queue = init;
  }

  get length() {
    return this._queue.length;
  }

  /**
   * Dequeue the next item, waiting for a push if the queue is empty.
   *
   * @param abort rejects the returned promise with `abort.reason` once aborted
   * @param dequeue strategy for picking the item (defaults to FIFO `shift`)
   */
  async next(
    abort?: AbortSignal,
    dequeue: (arr: T[]) => T | undefined = a => a.shift()
  ): Promise<T> {
    // Check emptiness before trusting the dequeued value: the previous
    // `if (update)` truthiness test silently dropped falsy items
    // (0, '', false, null) that shift() had already removed, then waited forever.
    if (this._queue.length > 0) {
      const update = dequeue(this._queue);
      if (update !== undefined) {
        return update;
      }
    }
    if (!this._waitForUpdate) {
      this._waitForUpdate = new Promise(resolve => {
        this._resolveUpdate = resolve;
      });
    }
    await Promise.race([
      this._waitForUpdate,
      new Promise((_, reject) => {
        if (abort?.aborted) {
          reject(abort?.reason);
        }
        // `once` avoids accumulating listeners on a long-lived signal
        abort?.addEventListener('abort', () => reject(abort.reason), {
          once: true,
        });
      }),
    ]);
    return this.next(abort, dequeue);
  }

  push(...updates: T[]) {
    this._queue.push(...updates);
    // wake the waiting consumers (if any) exactly once per wait cycle
    if (this._resolveUpdate) {
      const resolve = this._resolveUpdate;
      this._resolveUpdate = null;
      this._waitForUpdate = null;
      resolve();
    }
  }

  /** Remove the first queued item matching `predicate`, if any. */
  remove(predicate: (update: T) => boolean) {
    const index = this._queue.findIndex(predicate);
    if (index !== -1) {
      this._queue.splice(index, 1);
    }
  }

  /** Return the first queued item matching `predicate` without removing it. */
  find(predicate: (update: T) => boolean) {
    return this._queue.find(predicate);
  }

  /** Drop all queued items. Pending `next()` calls keep waiting. */
  clear() {
    this._queue = [];
  }
}
/**
 * AsyncQueue variant that serves items whose id matches the shared priority
 * rule first, falling back to FIFO order when no rule matches.
 */
export class PriorityAsyncQueue<
  T extends { id: string },
> extends AsyncQueue<T> {
  constructor(
    init: T[] = [],
    public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget()
  ) {
    super(init);
  }

  override next(abort?: AbortSignal | undefined): Promise<T> {
    const pick = (arr: T[]): T | undefined => {
      const rule = this.priorityTarget.priorityRule;
      if (rule !== null) {
        const hit = arr.findIndex(item => rule(item.id));
        if (hit !== -1) {
          // a prioritized item exists: remove and serve it first
          return arr.splice(hit, 1)[0];
        }
      }
      return arr.shift();
    };
    return super.next(abort, pick);
  }
}
/**
 * Shared priority target can be shared by multiple queues.
 */
export class SharedPriorityTarget {
  // when set, ids for which this returns true are dequeued first; null disables prioritization
  public priorityRule: ((id: string) => boolean) | null = null;
}

View File

@@ -0,0 +1,17 @@
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';
export function mergeUpdates(updates: Uint8Array[]) {
if (updates.length === 0) {
return new Uint8Array();
}
if (updates.length === 1) {
return updates[0];
}
const doc = new Doc();
doc.transact(() => {
updates.forEach(update => {
applyUpdate(doc, update);
});
});
return encodeStateAsUpdate(doc);
}

View File

@@ -0,0 +1,9 @@
// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill
/**
 * Throw an Error carrying `abort.reason` when the signal is aborted;
 * otherwise return true. Accepts a missing signal as "not aborted".
 */
export function throwIfAborted(abort?: AbortSignal) {
  if (abort && abort.aborted) {
    throw new Error(abort.reason);
  }
  return true;
}
export const MANUALLY_STOP = 'manually-stop';

View File

@@ -0,0 +1,137 @@
import { DebugLogger } from '@affine/debug';
import { Slot } from '@blocksuite/global/utils';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import type { WorkspaceEngine, WorkspaceEngineStatus } from './engine';
import type { WorkspaceMetadata } from './metadata';
import {
WorkspaceUpgradeController,
type WorkspaceUpgradeStatus,
} from './upgrade';
const logger = new DebugLogger('affine:workspace');
/**
 * Aggregated status of a workspace: its lifecycle mode plus the latest
 * engine and upgrade statuses it mirrors.
 */
export type WorkspaceStatus = {
  mode: 'ready' | 'closed';
  engine: WorkspaceEngineStatus;
  upgrade: WorkspaceUpgradeStatus;
};
/**
* # Workspace
*
* ```
* ┌───────────┐
* │ Workspace │
* └─────┬─────┘
* │
* │
* ┌──────────────┼─────────────┐
* │ │ │
* ┌───┴─────┐ ┌──────┴─────┐ ┌───┴────┐
* │ Upgrade │ │ blocksuite │ │ Engine │
* └─────────┘ └────────────┘ └───┬────┘
* │
* ┌──────┼─────────┐
* │ │ │
* ┌──┴─┐ ┌──┴─┐ ┌─────┴───┐
* │sync│ │blob│ │awareness│
* └────┘ └────┘ └─────────┘
* ```
*
* This class contains all the components needed to run a workspace.
*/
export class Workspace {
  /** shorthand for `meta.id` */
  get id() {
    return this.meta.id;
  }
  /** shorthand for `meta.flavour` */
  get flavour() {
    return this.meta.flavour;
  }
  private _status: WorkspaceStatus;
  upgrade: WorkspaceUpgradeController;
  /**
   * event on workspace stop, workspace is one-time use, so it will be triggered only once
   */
  onStop = new Slot();
  onStatusChange = new Slot<WorkspaceStatus>();
  get status() {
    return this._status;
  }
  // every status write emits onStatusChange (no deduplication here)
  set status(status: WorkspaceStatus) {
    this._status = status;
    this.onStatusChange.emit(status);
  }
  constructor(
    public meta: WorkspaceMetadata,
    public engine: WorkspaceEngine,
    public blockSuiteWorkspace: BlockSuiteWorkspace
  ) {
    this.upgrade = new WorkspaceUpgradeController(this);
    this._status = {
      mode: 'closed',
      engine: engine.status,
      upgrade: this.upgrade.status,
    };
    // mirror engine status changes into the aggregated workspace status
    this.engine.onStatusChange.on(status => {
      this.status = {
        ...this.status,
        engine: status,
      };
    });
    // mirror upgrade status changes as well
    this.upgrade.onStatusChange.on(status => {
      this.status = {
        ...this.status,
        upgrade: status,
      };
    });
    // the workspace starts itself on construction (one-time use)
    this.start();
  }
  /**
   * workspace start when create and workspace is one-time use
   */
  private start() {
    if (this.status.mode === 'ready') {
      return;
    }
    logger.info('start workspace', this.id);
    this.engine.start();
    this.status = {
      ...this.status,
      mode: 'ready',
      engine: this.engine.status,
    };
  }
  /** true when the engine can stop without losing data and no upgrade is running */
  canGracefulStop() {
    return this.engine.canGracefulStop() && !this.status.upgrade.upgrading;
  }
  /** stop the engine immediately and emit `onStop`; no-op when already closed */
  forceStop() {
    if (this.status.mode === 'closed') {
      return;
    }
    logger.info('stop workspace', this.id);
    this.engine.forceStop();
    this.status = {
      ...this.status,
      mode: 'closed',
      engine: this.engine.status,
    };
    this.onStop.emit();
  }
  // same as `WorkspaceEngine.sync.setPriorityRule`
  setPriorityRule(target: ((id: string) => boolean) | null) {
    this.engine.sync.setPriorityRule(target);
  }
}

View File

@@ -0,0 +1,16 @@
{
"extends": "../../../tsconfig.json",
"include": ["./src"],
"compilerOptions": {
"noEmit": false,
"outDir": "lib"
},
"references": [
{ "path": "../../../tests/fixtures" },
{ "path": "../../common/env" },
{ "path": "../../common/debug" },
{ "path": "../../common/infra" },
{ "path": "../../frontend/graphql" },
{ "path": "../../frontend/electron-api" }
]
}