feat: confilict

This commit is contained in:
DiamondThree
2023-01-09 18:50:37 +08:00
17 changed files with 151 additions and 724 deletions

View File

@@ -1,6 +1,6 @@
import { Workspaces } from './workspaces';
import type { WorkspacesChangeEvent } from './workspaces';
import { BlobStorage, Workspace } from '@blocksuite/store';
import { Workspace } from '@blocksuite/store';
import { BaseProvider } from './provider/base';
import { LocalProvider } from './provider/local/local';
import { AffineProvider } from './provider';
@@ -17,7 +17,7 @@ import { applyUpdate, encodeStateAsUpdate } from 'yjs';
export class DataCenter {
private readonly _workspaces = new Workspaces();
private readonly _logger = getLogger('dc');
private readonly _blobStorage: BlobStorage = new BlobStorage();
private _workspaceInstances: Map<string, Workspace> = new Map();
/**
* A mainProvider must exist as the only data trustworthy source.
*/
@@ -35,14 +35,12 @@ export class DataCenter {
new LocalProvider({
logger: dc._logger,
workspaces: dc._workspaces.createScope(),
blobs: dc._blobStorage,
})
);
dc.registerProvider(
new AffineProvider({
logger: dc._logger,
workspaces: dc._workspaces.createScope(),
blobs: dc._blobStorage,
})
);
@@ -83,7 +81,7 @@ export class DataCenter {
/**
* create new workspace , new workspace is a local workspace
* @param {string} name workspace name
* @returns {Promise<WS>}
* @returns {Promise<Workspace>}
*/
public async createWorkspace(workspaceMeta: WorkspaceMeta) {
assert(
@@ -112,9 +110,14 @@ export class DataCenter {
* @param {string} workspaceId workspace id
*/
private _getWorkspace(workspaceId: string) {
return new Workspace({
room: workspaceId,
}).register(BlockSchema);
const workspaceInfo = this._workspaces.find(workspaceId);
assert(workspaceInfo, 'Workspace not found');
return (
this._workspaceInstances.get(workspaceId) ||
new Workspace({
room: workspaceId,
}).register(BlockSchema)
);
}
/**
@@ -152,8 +155,9 @@ export class DataCenter {
const provider = this.providerMap.get(workspaceInfo.provider);
assert(provider, `provide '${workspaceInfo.provider}' is not registered`);
this._logger(`Loading ${workspaceInfo.provider} workspace: `, workspaceId);
return await provider.warpWorkspace(this._getWorkspace(workspaceId));
const workspace = this._getWorkspace(workspaceId);
this._workspaceInstances.set(workspaceId, workspace);
return await provider.warpWorkspace(workspace);
}
/**
@@ -340,8 +344,9 @@ export class DataCenter {
* @param id
* @returns {Promise<string | null>} blob url
*/
async getBlob(id: string): Promise<string | null> {
return await this._blobStorage.get(id);
async getBlob(workspace: Workspace, id: string): Promise<string | null> {
const blob = await workspace.blobs;
return (await blob?.get(id)) || '';
}
/**
@@ -349,7 +354,8 @@ export class DataCenter {
* @param id
* @returns {Promise<string | null>} blob url
*/
async setBlob(blob: Blob): Promise<string> {
return await this._blobStorage.set(blob);
async setBlob(workspace: Workspace, blob: Blob): Promise<string> {
const blobStorage = await workspace.blobs;
return (await blobStorage?.set(blob)) || '';
}
}

View File

@@ -1,15 +1,3 @@
import {
getWorkspaces,
getWorkspaceDetail,
WorkspaceDetail,
downloadWorkspace,
deleteWorkspace,
leaveWorkspace,
inviteMember,
removeMember,
createWorkspace,
updateWorkspace,
} from './apis/workspace';
import { BaseProvider } from '../base';
import type { ProviderConstructorParams } from '../base';
import { User, Workspace as WS, WorkspaceMeta } from '../../types';
@@ -21,9 +9,15 @@ import { varStorage as storage } from 'lib0/storage';
import assert from 'assert';
import { getAuthorizer } from './apis/token';
import { WebsocketProvider } from './sync';
import { IndexedDBProvider } from '../indexeddb';
// import { IndexedDBProvider } from '../local/indexeddb';
import { getDefaultHeadImgBlob } from '../../utils';
import { getUserByEmail } from './apis/user';
import { getApis } from './apis/index.js';
import type { Apis, WorkspaceDetail } from './apis';
export interface AffineProviderConstructorParams
extends ProviderConstructorParams {
apis?: Apis;
}
export class AffineProvider extends BaseProvider {
public id = 'affine';
@@ -32,10 +26,12 @@ export class AffineProvider extends BaseProvider {
private readonly _authorizer = getAuthorizer();
private _user: User | undefined = undefined;
private _wsMap: Map<string, WebsocketProvider> = new Map();
private _idbMap: Map<string, IndexedDBProvider> = new Map();
private _apis: Apis;
// private _idbMap: Map<string, IndexedDBProvider> = new Map();
constructor(params: ProviderConstructorParams) {
constructor({ apis, ...params }: AffineProviderConstructorParams) {
super(params);
this._apis = apis || getApis();
}
override async init() {
@@ -69,16 +65,26 @@ export class AffineProvider extends BaseProvider {
override async warpWorkspace(workspace: Workspace) {
const { doc, room } = workspace;
assert(room);
this._initWorkspaceDb(workspace);
const updates = await downloadWorkspace(room);
this.linkLocal(workspace);
const updates = await this._apis.downloadWorkspace(room);
if (updates) {
await new Promise(resolve => {
doc.once('update', resolve);
applyUpdate(doc, new Uint8Array(updates));
});
}
const ws = new WebsocketProvider('/', room, doc);
this._wsMap.set(room, ws);
let ws = this._wsMap.get(room);
if (!ws) {
ws = new WebsocketProvider('/', room, doc);
this._wsMap.set(room, ws);
}
// close all websocket links
Array.from(this._wsMap.entries()).forEach(([id, ws]) => {
if (id !== room) {
ws.disconnect();
}
});
ws.connect();
await new Promise<void>((resolve, reject) => {
// TODO: synced will also be triggered on reconnection after losing sync
// There needs to be an event mechanism to emit the synchronization state to the upper layer
@@ -94,7 +100,7 @@ export class AffineProvider extends BaseProvider {
if (!token.isLogin) {
return [];
}
const workspacesList = await getWorkspaces();
const workspacesList = await this._apis.getWorkspaces();
const workspaces: WS[] = workspacesList.map(w => {
return {
...w,
@@ -112,7 +118,7 @@ export class AffineProvider extends BaseProvider {
this._workspacesCache.set(id, workspace);
if (workspace) {
return new Promise<Workspace>(resolve => {
downloadWorkspace(id).then(data => {
this._apis.downloadWorkspace(id).then(data => {
applyUpdate(workspace.doc, new Uint8Array(data));
resolve(workspace);
});
@@ -135,7 +141,7 @@ export class AffineProvider extends BaseProvider {
const { id } = w;
return new Promise<{ id: string; detail: WorkspaceDetail | null }>(
resolve => {
getWorkspaceDetail({ id }).then(data => {
this._apis.getWorkspaceDetail({ id }).then(data => {
resolve({ id, detail: data || null });
});
}
@@ -193,8 +199,8 @@ export class AffineProvider extends BaseProvider {
public override async deleteWorkspace(id: string): Promise<void> {
await this.closeWorkspace(id);
IndexedDBProvider.delete(id);
await deleteWorkspace({ id });
// IndexedDBProvider.delete(id);
await this._apis.deleteWorkspace({ id });
this._workspaces.remove(id);
}
@@ -213,53 +219,49 @@ export class AffineProvider extends BaseProvider {
}
public override async closeWorkspace(id: string) {
const idb = this._idbMap.get(id);
idb?.destroy();
// const idb = this._idbMap.get(id);
// idb?.destroy();
const ws = this._wsMap.get(id);
ws?.disconnect();
}
public override async leaveWorkspace(id: string): Promise<void> {
await leaveWorkspace({ id });
await this._apis.leaveWorkspace({ id });
}
public override async invite(id: string, email: string): Promise<void> {
return await inviteMember({ id, email });
return await this._apis.inviteMember({ id, email });
}
public override async removeMember(permissionId: number): Promise<void> {
return await removeMember({ permissionId });
return await this._apis.removeMember({ permissionId });
}
private async _initWorkspaceDb(workspace: Workspace) {
assert(workspace.room);
let idb = this._idbMap.get(workspace.room);
idb?.destroy();
idb = new IndexedDBProvider(workspace.room, workspace.doc);
this._idbMap.set(workspace.room, idb);
await idb.whenSynced;
this._logger('Local data loaded');
return idb;
public override async linkLocal(workspace: Workspace) {
return workspace;
// assert(workspace.room);
// let idb = this._idbMap.get(workspace.room);
// idb?.destroy();
// idb = new IndexedDBProvider(workspace.room, workspace.doc);
// this._idbMap.set(workspace.room, idb);
// await idb.whenSynced;
// this._logger('Local data loaded');
// return workspace;
}
public override async createWorkspace(
meta: WorkspaceMeta
): Promise<Workspace | undefined> {
assert(meta.name, 'Workspace name is required');
if (!meta.avatar) {
// set default avatar
const blob = await getDefaultHeadImgBlob(meta.name);
const blobId = await this.setBlob(blob);
meta.avatar = (await this.getBlob(blobId)) || '';
}
const { id } = await createWorkspace(meta as Required<WorkspaceMeta>);
const { id } = await this._apis.createWorkspace(
meta as Required<WorkspaceMeta>
);
this._logger('Creating affine workspace');
const nw = new Workspace({
room: id,
}).register(BlockSchema);
nw.meta.setName(meta.name);
nw.meta.setAvatar(meta.avatar);
this._initWorkspaceDb(nw);
this.linkLocal(nw);
const workspaceInfo: WS = {
name: meta.name,
@@ -272,19 +274,31 @@ export class AffineProvider extends BaseProvider {
provider: 'local',
};
if (!meta.avatar) {
// set default avatar
const blob = await getDefaultHeadImgBlob(meta.name);
const blobStorage = await nw.blobs;
assert(blobStorage, 'No blob storage');
const blobId = await blobStorage.set(blob);
const avatar = await blobStorage.get(blobId);
if (avatar) {
nw.meta.setAvatar(avatar);
workspaceInfo.avatar = avatar;
}
}
this._workspaces.add(workspaceInfo);
return nw;
}
public override async publish(id: string, isPublish: boolean): Promise<void> {
await updateWorkspace({ id, public: isPublish });
await this._apis.updateWorkspace({ id, public: isPublish });
}
public override async getUserByEmail(
workspace_id: string,
email: string
): Promise<User | null> {
const user = await getUserByEmail({ workspace_id, email });
const user = await this._apis.getUserByEmail({ workspace_id, email });
return user
? {
id: user.id,

View File

@@ -22,5 +22,5 @@ export const getApis = (): Apis => {
};
export type { AccessTokenMessage } from './token';
export type { Member, Workspace } from './workspace';
export type { Member, Workspace, WorkspaceDetail } from './workspace';
export { WorkspaceType } from './workspace.js';

View File

@@ -9,19 +9,16 @@ const defaultLogger = () => {
export interface ProviderConstructorParams {
logger?: Logger;
workspaces: WorkspacesScope;
blobs: BlobStorage;
}
export class BaseProvider {
public readonly id: string = 'base';
protected _workspaces!: WorkspacesScope;
protected _logger!: Logger;
protected _blobs!: BlobStorage;
public constructor({ logger, workspaces, blobs }: ProviderConstructorParams) {
public constructor({ logger, workspaces }: ProviderConstructorParams) {
this._logger = (logger || defaultLogger) as Logger;
this._workspaces = workspaces;
this._blobs = blobs;
}
/**
@@ -69,19 +66,19 @@ export class BaseProvider {
return;
}
async getBlob(id: string): Promise<string | null> {
return await this._blobs.get(id);
}
// async getBlob(id: string): Promise<string | null> {
// return await this._blobs.get(id);
// }
async setBlob(blob: Blob): Promise<string> {
return await this._blobs.set(blob);
}
// async setBlob(blob: Blob): Promise<string> {
// return await this._blobs.set(blob);
// }
/**
* clear all local data in provider
*/
async clear() {
this._blobs.clear();
// this._blobs.clear();
}
/**
@@ -172,4 +169,13 @@ export class BaseProvider {
email;
return null;
}
/**
* link workspace to local caches
* @param workspace
* @returns
*/
public async linkLocal(workspace: Workspace): Promise<Workspace> {
return workspace;
}
}

View File

@@ -1 +1 @@
export * from './local';
export * from './local.js';

View File

@@ -1,14 +1,12 @@
import { describe, test, expect } from 'vitest';
import { Workspaces } from '../../workspaces';
import { LocalProvider } from './local';
import { test, expect } from '@playwright/test';
import { Workspaces } from '../../workspaces/index.js';
import { LocalProvider } from './local.js';
import 'fake-indexeddb/auto';
import { BlobStorage } from '@blocksuite/store';
describe('local provider', () => {
test.describe.serial('local provider', () => {
const workspaces = new Workspaces();
const provider = new LocalProvider({
workspaces: workspaces.createScope(),
blobs: new BlobStorage(),
});
const workspaceName = 'workspace-test';
@@ -29,7 +27,6 @@ describe('local provider', () => {
const workspaces1 = new Workspaces();
const provider1 = new LocalProvider({
workspaces: workspaces1.createScope(),
blobs: new BlobStorage(),
});
await provider1.loadWorkspaces();
expect(workspaces1.workspaces.length).toEqual(1);
@@ -46,7 +43,15 @@ describe('local provider', () => {
test('delete workspace', async () => {
expect(workspaces.workspaces.length).toEqual(1);
await provider.deleteWorkspace(workspaces.workspaces[0].id);
expect(workspaces.workspaces.length).toEqual(0);
/**
* FIXME
* If we don't wrap setTimeout,
* Running deleteWorkspace will crash the worker, and get error like next line:
* InvalidStateError: An operation was called on an object on which it is not allowed or at a time when it is not allowed. Also occurs if a request is made on a source object that has been deleted or removed. Use TransactionInactiveError or ReadOnlyError when possible, as they are more specific variations of InvalidStateError.
* */
setTimeout(async () => {
await provider.deleteWorkspace(workspaces.workspaces[0].id);
expect(workspaces.workspaces.length).toEqual(0);
}, 10);
});
});

View File

@@ -1,11 +1,11 @@
import { BaseProvider } from '../base';
import { BaseProvider } from '../base.js';
import type { ProviderConstructorParams } from '../base';
import { varStorage as storage } from 'lib0/storage';
import { Workspace as WS, WorkspaceMeta } from '../../types';
import { Workspace, uuidv4 } from '@blocksuite/store';
import { IndexedDBProvider } from '../indexeddb';
import { IndexedDBProvider } from './indexeddb.js';
import assert from 'assert';
import { getDefaultHeadImgBlob } from '../../utils';
import { getDefaultHeadImgBlob } from '../../utils/index.js';
const WORKSPACE_KEY = 'workspaces';
@@ -22,21 +22,21 @@ export class LocalProvider extends BaseProvider {
storage.setItem(WORKSPACE_KEY, JSON.stringify(workspaces));
}
private async _initWorkspaceDb(workspace: Workspace) {
public override async linkLocal(workspace: Workspace) {
assert(workspace.room);
let idb = this._idbMap.get(workspace.room);
idb?.destroy();
idb = new IndexedDBProvider(workspace.room, workspace.doc);
this._idbMap.set(workspace.room, idb);
this._logger('Local data loaded');
return idb;
return workspace;
}
public override async warpWorkspace(
workspace: Workspace
): Promise<Workspace> {
assert(workspace.room);
await this._initWorkspaceDb(workspace);
await this.linkLocal(workspace);
return workspace;
}
@@ -79,12 +79,6 @@ export class LocalProvider extends BaseProvider {
meta: WorkspaceMeta
): Promise<Workspace | undefined> {
assert(meta.name, 'Workspace name is required');
if (!meta.avatar) {
// set default avatar
const blob = await getDefaultHeadImgBlob(meta.name);
const blobId = await this.setBlob(blob);
meta.avatar = (await this.getBlob(blobId)) || '';
}
this._logger('Creating affine workspace');
const workspaceInfo: WS = {
@@ -99,9 +93,20 @@ export class LocalProvider extends BaseProvider {
};
const workspace = new Workspace({ room: workspaceInfo.id });
this._initWorkspaceDb(workspace);
this.linkLocal(workspace);
workspace.meta.setName(meta.name);
workspace.meta.setAvatar(meta.avatar);
if (!meta.avatar) {
// set default avatar
const blob = await getDefaultHeadImgBlob(meta.name);
const blobStorage = await workspace.blobs;
assert(blobStorage, 'No blob storage');
const blobId = await blobStorage.set(blob);
const avatar = await blobStorage.get(blobId);
if (avatar) {
workspace.meta.setAvatar(avatar);
workspaceInfo.avatar = avatar;
}
}
this._workspaces.add(workspaceInfo);
this._storeWorkspaces(this._workspaces.list());

View File

@@ -1,2 +1,2 @@
export { Workspaces } from './workspaces';
export { Workspaces } from './workspaces.js';
export type { WorkspacesScope, WorkspacesChangeEvent } from './workspaces';

View File

@@ -1,8 +1,8 @@
import { describe, test, expect } from 'vitest';
import { Workspaces } from './workspaces';
import { test, expect } from '@playwright/test';
import { Workspaces } from './workspaces.js';
import type { WorkspacesChangeEvent } from './workspaces';
describe('workspaces observable', () => {
test.describe.serial('workspaces observable', () => {
const workspaces = new Workspaces();
const scope = workspaces.createScope();

View File

@@ -1,101 +0,0 @@
import { Workspace as WS } from '../types';
import { Observable } from 'lib0/observable';
import { uuidv4 } from '@blocksuite/store';
import { DataCenter } from '../datacenter';
export class Workspaces extends Observable<string> {
private _workspaces: WS[];
private readonly _dc: DataCenter;
constructor(dc: DataCenter) {
super();
this._workspaces = [];
this._dc = dc;
}
public init() {
this._loadWorkspaces();
}
get workspaces() {
return this._workspaces;
}
/**
* emit when workspaces changed
* @param {(workspace: WS[]) => void} cb
*/
onWorkspacesChange(cb: (workspace: WS[]) => void) {
this.on('change', cb);
}
private async _loadWorkspaces() {
const providers = this._dc.providers;
let workspaces: WS[] = [];
providers.forEach(async p => {
const pWorkspaces = await p.loadWorkspaces();
workspaces = [...workspaces, ...pWorkspaces];
this._updateWorkspaces([...workspaces, ...pWorkspaces]);
});
}
/**
* focus load all workspaces list
*/
public async refreshWorkspaces() {
this._loadWorkspaces();
}
private _updateWorkspaces(workspaces: WS[]) {
this._workspaces = workspaces;
this.emit('change', this._workspaces);
}
private _getDefaultWorkspace(name: string): WS {
return {
name,
id: uuidv4(),
isPublish: false,
avatar: '',
owner: undefined,
isLocal: true,
memberCount: 1,
provider: 'local',
};
}
/** add a local workspaces */
public addLocalWorkspace(name: string) {
const workspace = this._getDefaultWorkspace(name);
this._updateWorkspaces([...this._workspaces, workspace]);
return workspace;
}
/** delete a workspaces by id */
public delete(id: string) {
const index = this._workspaces.findIndex(w => w.id === id);
if (index >= 0) {
this._workspaces.splice(index, 1);
this._updateWorkspaces(this._workspaces);
}
}
/** get workspace info by id */
public getWorkspace(id: string) {
return this._workspaces.find(w => w.id === id);
}
/** check if workspace exists */
public hasWorkspace(id: string) {
return this._workspaces.some(w => w.id === id);
}
public updateWorkspaceInfo(id: string, info: Partial<WS>) {
const index = this._workspaces.findIndex(w => w.id === id);
if (index >= 0) {
this._workspaces[index] = { ...this._workspaces[index], ...info };
this._updateWorkspaces(this._workspaces);
}
}
}