Merge remote-tracking branch 'refs/remotes/origin/feat/cloud-sync-saika'

Conflicts:
	packages/data-center/package.json
	packages/data-center/src/datacenter.ts
	packages/data-center/src/index.ts
	pnpm-lock.yaml
This commit is contained in:
linonetwo
2023-01-11 18:30:57 +08:00
27 changed files with 857 additions and 3855 deletions

View File

@@ -1,9 +1,12 @@
import { BaseProvider } from '../base.js';
import type { ProviderConstructorParams } from '../base';
import type { User, WorkspaceInfo, WorkspaceMeta } from '../../types';
import type {
ProviderConstructorParams,
CreateWorkspaceInfoParams,
WorkspaceMeta0,
} from '../base';
import type { User } from '../../types';
import { Workspace as BlocksuiteWorkspace } from '@blocksuite/store';
import { BlockSchema } from '@blocksuite/blocks/models';
import { applyUpdate } from 'yjs';
import { storage } from './storage.js';
import assert from 'assert';
import { WebsocketProvider } from './sync.js';
@@ -12,28 +15,39 @@ import { getApis } from './apis/index.js';
import type { Apis, WorkspaceDetail, Callback } from './apis';
import { setDefaultAvatar } from '../utils.js';
import { MessageCode } from '../../message';
import { token } from './apis/token.js';
import { WebsocketClient } from './channel';
export interface AffineProviderConstructorParams
extends ProviderConstructorParams {
apis?: Apis;
}
const {
Y: { applyUpdate, encodeStateAsUpdate },
} = BlocksuiteWorkspace;
export class AffineProvider extends BaseProvider {
public id = 'affine';
private _workspacesCache: Map<string, BlocksuiteWorkspace> = new Map();
private _onTokenRefresh?: Callback = undefined;
private _wsMap: Map<string, WebsocketProvider> = new Map();
private _apis: Apis;
private _channel: WebsocketClient;
// private _idbMap: Map<string, IndexedDBProvider> = new Map();
constructor({ apis, ...params }: AffineProviderConstructorParams) {
super(params);
this._apis = apis || getApis();
this.init().then(() => {
if (this._apis.token.isLogin) {
this.loadWorkspaces();
}
});
this._channel = new WebsocketClient(
`${window.location.protocol === 'https:' ? 'wss' : 'ws'}://${
window.location.host
}/global/sync/`,
this._logger
);
if (token.isLogin) {
this._connectChannel();
}
}
override async init() {
@@ -64,6 +78,32 @@ export class AffineProvider extends BaseProvider {
}
}
private _connectChannel() {
this._channel.connect();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
this._channel.on('message', (message: any) => {
console.log('message', message);
});
}
private _getWebsocketProvider(workspace: BlocksuiteWorkspace) {
const { doc, room } = workspace;
assert(room);
assert(doc);
let ws = this._wsMap.get(room);
if (!ws) {
const wsUrl = `${
window.location.protocol === 'https:' ? 'wss' : 'ws'
}://${window.location.host}/api/sync/`;
ws = new WebsocketProvider(wsUrl, room, doc, {
params: { token: this._apis.token.refresh },
});
this._wsMap.set(room, ws);
}
return ws;
}
private async _applyCloudUpdates(blocksuiteWorkspace: BlocksuiteWorkspace) {
const { doc, room: workspaceId } = blocksuiteWorkspace;
assert(workspaceId, 'Blocksuite Workspace without room(workspaceId).');
@@ -78,20 +118,10 @@ export class AffineProvider extends BaseProvider {
override async warpWorkspace(workspace: BlocksuiteWorkspace) {
await this._applyCloudUpdates(workspace);
const { doc, room } = workspace;
const { room } = workspace;
assert(room);
this.linkLocal(workspace);
let ws = this._wsMap.get(room);
if (!ws) {
const wsUrl = `${
window.location.protocol === 'https:' ? 'wss' : 'ws'
}://${window.location.host}/api/sync/`;
ws = new WebsocketProvider(wsUrl, room, doc, {
params: { token: this._apis.token.refresh },
});
this._wsMap.set(room, ws);
}
const ws = this._getWebsocketProvider(workspace);
// close all websocket links
Array.from(this._wsMap.entries()).forEach(([id, ws]) => {
if (id !== room) {
@@ -115,12 +145,13 @@ export class AffineProvider extends BaseProvider {
return [];
}
const workspacesList = await this._apis.getWorkspaces();
const workspaces: WorkspaceInfo[] = workspacesList.map(w => {
const workspaces: WorkspaceMeta0[] = workspacesList.map(w => {
return {
...w,
memberCount: 0,
name: '',
provider: 'affine',
syncMode: 'core',
};
});
const workspaceInstances = workspaces.map(({ id }) => {
@@ -198,6 +229,9 @@ export class AffineProvider extends BaseProvider {
}
}
const user = await this._apis.signInWithGoogle?.();
if (!this._channel.connected) {
this._connectChannel();
}
if (!user) {
this._messageCenter.send(MessageCode.loginError);
}
@@ -205,6 +239,7 @@ export class AffineProvider extends BaseProvider {
public override async getUserInfo(): Promise<User | undefined> {
const user = this._apis.token.user;
await this.init;
return user
? {
id: user.id,
@@ -268,19 +303,17 @@ export class AffineProvider extends BaseProvider {
}
public override async createWorkspaceInfo(
meta: WorkspaceMeta
): Promise<WorkspaceInfo> {
const { id } = await this._apis.createWorkspace(
meta as Required<WorkspaceMeta>
);
meta: CreateWorkspaceInfoParams
): Promise<WorkspaceMeta0> {
const { id } = await this._apis.createWorkspace(meta);
const workspaceInfo: WorkspaceInfo = {
const workspaceInfo: WorkspaceMeta0 = {
name: meta.name,
id: id,
isPublish: false,
published: false,
avatar: '',
owner: await this.getUserInfo(),
isLocal: true,
syncMode: 'core',
memberCount: 1,
provider: 'affine',
};
@@ -289,7 +322,7 @@ export class AffineProvider extends BaseProvider {
public override async createWorkspace(
blocksuiteWorkspace: BlocksuiteWorkspace,
meta: WorkspaceMeta
meta: WorkspaceMeta0
): Promise<BlocksuiteWorkspace | undefined> {
const workspaceId = blocksuiteWorkspace.room;
assert(workspaceId, 'Blocksuite Workspace without room(workspaceId).');
@@ -298,13 +331,13 @@ export class AffineProvider extends BaseProvider {
this._applyCloudUpdates(blocksuiteWorkspace);
this.linkLocal(blocksuiteWorkspace);
const workspaceInfo: WorkspaceInfo = {
const workspaceInfo: WorkspaceMeta0 = {
name: meta.name,
id: workspaceId,
isPublish: false,
published: false,
avatar: '',
owner: undefined,
isLocal: true,
syncMode: 'core',
memberCount: 1,
provider: 'affine',
};
@@ -335,4 +368,33 @@ export class AffineProvider extends BaseProvider {
}
: null;
}
public override async assign(
to: BlocksuiteWorkspace,
from: BlocksuiteWorkspace
): Promise<BlocksuiteWorkspace> {
assert(to.room, 'Blocksuite Workspace without room(workspaceId).');
const ws = this._getWebsocketProvider(to);
applyUpdate(to.doc, encodeStateAsUpdate(from.doc));
// TODO: upload blobs and make sure doc is synced
await new Promise<void>((resolve, reject) => {
ws.once('synced', () => {
setTimeout(() => resolve(), 1000);
});
ws.once('lost-connection', () => reject());
ws.once('connection-error', () => reject());
});
return to;
}
public override async logout(): Promise<void> {
token.clear();
this._channel.disconnect();
this._wsMap.forEach(ws => ws.disconnect());
storage.removeItem('token');
}
public override async getWorkspaceMembers(id: string) {
return this._apis.getWorkspaceMembers({ id });
}
}

View File

@@ -140,6 +140,10 @@ class Token {
this.callbacks.splice(index, 1);
}
}
clear() {
this._setToken();
}
}
export const token = new Token();

View File

@@ -79,7 +79,6 @@ export async function getWorkspaceMembers(
export interface CreateWorkspaceParams {
name: string;
avatar: string;
}
export async function createWorkspace(

View File

@@ -0,0 +1,53 @@
import websocket from 'lib0/websocket';
import { Logger } from 'src/types';
import { token } from './apis/token';
const RECONNECT_INTERVAL_TIME = 5000;
const MAX_RECONNECT_TIMES = 50;
export class WebsocketClient extends websocket.WebsocketClient {
public shouldReconnect = false;
private _reconnectInterval: number | null = null;
private _logger: Logger;
constructor(
url: string,
logger: Logger,
options?: { binaryType: 'arraybuffer' | 'blob' | null }
) {
super(url, options);
this._logger = logger;
this._setupChannel();
}
private _setupChannel() {
this.on('connect', () => {
this._logger('Affine channel connected');
this.shouldReconnect = true;
if (this._reconnectInterval) {
window.clearInterval(this._reconnectInterval);
}
});
this.on('disconnect', ({ error }: { error: Error }) => {
if (error) {
let times = 0;
// Try reconnect if connect error has occurred
this._reconnectInterval = window.setInterval(() => {
if (this.shouldReconnect && token.isLogin && !this.connected) {
try {
this.connect();
this._logger(`try reconnect channel ${++times} times`);
if (times > MAX_RECONNECT_TIMES) {
this._logger('reconnect failed, max reconnect times reached');
this._reconnectInterval &&
window.clearInterval(this._reconnectInterval);
}
} catch (e) {
this._logger('reconnect failed', e);
}
}
}, RECONNECT_INTERVAL_TIME);
}
});
}
}

View File

@@ -1,7 +1,9 @@
import { Workspace as BlocksuiteWorkspace, uuidv4 } from '@blocksuite/store';
import { MessageCenter } from '../message';
import { Logger, User, WorkspaceInfo, WorkspaceMeta } from '../types';
import type { WorkspaceMetaCollectionScope } from '../workspace-meta-collection';
import { Logger, User } from '../types';
import type { WorkspaceUnitCollectionScope } from '../workspace-unit-collection';
import type { WorkspaceUnitCtorParams } from '../workspace-unit';
import { Member } from './affine/apis';
const defaultLogger = () => {
return;
@@ -9,13 +11,19 @@ const defaultLogger = () => {
export interface ProviderConstructorParams {
logger?: Logger;
workspaces: WorkspaceMetaCollectionScope;
workspaces: WorkspaceUnitCollectionScope;
messageCenter: MessageCenter;
}
export type WorkspaceMeta0 = WorkspaceUnitCtorParams;
export type CreateWorkspaceInfoParams = Pick<WorkspaceUnitCtorParams, 'name'>;
export type UpdateWorkspaceMetaParams = Partial<
Pick<WorkspaceUnitCtorParams, 'name' | 'avatar'>
>;
export class BaseProvider {
public readonly id: string = 'base';
protected _workspaces!: WorkspaceMetaCollectionScope;
protected _workspaces!: WorkspaceUnitCollectionScope;
protected _logger!: Logger;
protected _messageCenter!: MessageCenter;
@@ -37,8 +45,8 @@ export class BaseProvider {
}
public async createWorkspaceInfo(
meta: WorkspaceMeta
): Promise<WorkspaceInfo> {
params: CreateWorkspaceInfoParams
): Promise<WorkspaceMeta0> {
throw new Error(`provider: ${this.id} createWorkspaceInfo Not implemented`);
}
@@ -70,7 +78,7 @@ export class BaseProvider {
/**
* load workspaces
**/
public async loadWorkspaces(): Promise<WorkspaceInfo[]> {
public async loadWorkspaces(): Promise<WorkspaceMeta0[]> {
throw new Error(`provider: ${this.id} loadWorkSpace Not implemented`);
}
@@ -157,10 +165,10 @@ export class BaseProvider {
*/
public async updateWorkspaceMeta(
id: string,
meta: Partial<WorkspaceMeta>
params: UpdateWorkspaceMetaParams
): Promise<void> {
id;
meta;
params;
return;
}
@@ -170,7 +178,7 @@ export class BaseProvider {
*/
public async createWorkspace(
blocksuiteWorkspace: BlocksuiteWorkspace,
meta: WorkspaceMeta
meta: WorkspaceMeta0
): Promise<BlocksuiteWorkspace | undefined> {
return blocksuiteWorkspace;
}
@@ -196,4 +204,24 @@ export class BaseProvider {
): Promise<BlocksuiteWorkspace> {
return workspace;
}
/**
* merge one workspaces to another
* @param workspace
* @returns
*/
public async assign(to: BlocksuiteWorkspace, from: BlocksuiteWorkspace) {
from;
return to;
}
/**
* get workspace members
* @param {string} workspaceId
* @returns
*/
public getWorkspaceMembers(workspaceId: string): Promise<Member[]> {
workspaceId;
return Promise.resolve([]);
}
}

View File

@@ -1,14 +1,19 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as idb from 'lib0/indexeddb.js';
import { Observable } from 'lib0/observable.js';
import type { Doc } from 'yjs';
import { applyUpdate, encodeStateAsUpdate, transact } from 'yjs';
import { Workspace as BlocksuiteWorkspace } from '@blocksuite/store';
const customStoreName = 'custom';
const updatesStoreName = 'updates';
const PREFERRED_TRIM_SIZE = 500;
const {
Y: { applyUpdate, transact, encodeStateAsUpdate },
} = BlocksuiteWorkspace;
type Doc = Parameters<typeof transact>[0];
const fetchUpdates = async (provider: IndexedDBProvider) => {
const [updatesStore] = idb.transact(provider.db as IDBDatabase, [
updatesStoreName,

View File

@@ -0,0 +1,20 @@
import assert from 'assert';
import * as idb from 'lib0/indexeddb.js';
import { Workspace as BlocksuiteWorkspace } from '@blocksuite/store';
const { encodeStateAsUpdate } = BlocksuiteWorkspace.Y;
export const initStore = async (blocksuiteWorkspace: BlocksuiteWorkspace) => {
const workspaceId = blocksuiteWorkspace.room;
assert(workspaceId);
await idb.deleteDB(workspaceId);
const db = await idb.openDB(workspaceId, db =>
idb.createStores(db, [['updates', { autoIncrement: true }], ['custom']])
);
const currState = encodeStateAsUpdate(blocksuiteWorkspace.doc);
const [updatesStore] = idb.transact(db, ['updates']); // , 'readonly')
if (updatesStore) {
await idb.addAutoKey(updatesStore, currState);
}
};

View File

@@ -1,13 +1,15 @@
import { test, expect } from '@playwright/test';
import { WorkspaceMetaCollection } from '../../workspace-meta-collection.js';
import { WorkspaceUnitCollection } from '../../workspace-unit-collection.js';
import { LocalProvider } from './local.js';
import { createBlocksuiteWorkspace } from '../../utils/index.js';
import { MessageCenter } from '../../message/index.js';
import 'fake-indexeddb/auto';
test.describe.serial('local provider', () => {
const workspaceMetaCollection = new WorkspaceMetaCollection();
const workspaceMetaCollection = new WorkspaceUnitCollection();
const provider = new LocalProvider({
workspaces: workspaceMetaCollection.createScope(),
messageCenter: new MessageCenter(),
});
const workspaceName = 'workspace-test';
@@ -16,23 +18,20 @@ test.describe.serial('local provider', () => {
test('create workspace', async () => {
const workspaceInfo = await provider.createWorkspaceInfo({
name: workspaceName,
avatar: 'avatar-url-test',
});
workspaceId = workspaceInfo.id;
const blocksuiteWorkspace = createBlocksuiteWorkspace(workspaceId);
await provider.createWorkspace(blocksuiteWorkspace, {
name: workspaceName,
avatar: 'avatar-url-test',
});
await provider.createWorkspace(blocksuiteWorkspace, workspaceInfo);
expect(workspaceMetaCollection.workspaces.length).toEqual(1);
expect(workspaceMetaCollection.workspaces[0].name).toEqual(workspaceName);
});
test('workspace list cache', async () => {
const workspacesMetaCollection1 = new WorkspaceMetaCollection();
const workspacesMetaCollection1 = new WorkspaceUnitCollection();
const provider1 = new LocalProvider({
workspaces: workspacesMetaCollection1.createScope(),
messageCenter: new MessageCenter(),
});
await provider1.loadWorkspaces();
expect(workspacesMetaCollection1.workspaces.length).toEqual(1);

View File

@@ -1,9 +1,14 @@
import { BaseProvider } from '../base.js';
import type { ProviderConstructorParams } from '../base';
import type {
ProviderConstructorParams,
WorkspaceMeta0,
UpdateWorkspaceMetaParams,
CreateWorkspaceInfoParams,
} from '../base';
import { varStorage as storage } from 'lib0/storage';
import { WorkspaceInfo, WorkspaceMeta } from '../../types';
import { Workspace as BlocksuiteWorkspace, uuidv4 } from '@blocksuite/store';
import { IndexedDBProvider } from './indexeddb.js';
import { IndexedDBProvider } from './indexeddb/indexeddb.js';
import { initStore } from './indexeddb/utils.js';
import assert from 'assert';
import { setDefaultAvatar } from '../utils.js';
@@ -15,11 +20,26 @@ export class LocalProvider extends BaseProvider {
constructor(params: ProviderConstructorParams) {
super(params);
this.loadWorkspaces();
}
private _storeWorkspaces(workspaces: WorkspaceInfo[]) {
storage.setItem(WORKSPACE_KEY, JSON.stringify(workspaces));
private _storeWorkspaces(workspaces: WorkspaceMeta0[]) {
storage.setItem(
WORKSPACE_KEY,
JSON.stringify(
workspaces.map(w => {
return {
id: w.id,
name: w.name,
avatar: w.avatar,
owner: w.owner,
published: w.published,
memberCount: w.memberCount,
provider: w.provider,
syncMode: w.syncMode,
};
})
)
);
}
public override async linkLocal(workspace: BlocksuiteWorkspace) {
@@ -41,12 +61,12 @@ export class LocalProvider extends BaseProvider {
return workspace;
}
override loadWorkspaces(): Promise<WorkspaceInfo[]> {
override loadWorkspaces(): Promise<WorkspaceMeta0[]> {
const workspaceStr = storage.getItem(WORKSPACE_KEY);
let workspaces: WorkspaceInfo[] = [];
let workspaces: WorkspaceMeta0[] = [];
if (workspaceStr) {
try {
workspaces = JSON.parse(workspaceStr) as WorkspaceInfo[];
workspaces = JSON.parse(workspaceStr) as WorkspaceMeta0[];
workspaces.forEach(workspace => {
this._workspaces.add(workspace);
});
@@ -70,22 +90,22 @@ export class LocalProvider extends BaseProvider {
public override async updateWorkspaceMeta(
id: string,
meta: Partial<WorkspaceMeta>
meta: UpdateWorkspaceMetaParams
) {
this._workspaces.update(id, meta);
this._storeWorkspaces(this._workspaces.list());
}
public override async createWorkspaceInfo(
meta: WorkspaceMeta
): Promise<WorkspaceInfo> {
const workspaceInfo: WorkspaceInfo = {
meta: CreateWorkspaceInfoParams
): Promise<WorkspaceMeta0> {
const workspaceInfo: WorkspaceMeta0 = {
name: meta.name,
id: uuidv4(),
isPublish: false,
published: false,
avatar: '',
owner: undefined,
isLocal: true,
syncMode: 'core',
memberCount: 1,
provider: 'local',
};
@@ -94,25 +114,16 @@ export class LocalProvider extends BaseProvider {
public override async createWorkspace(
blocksuiteWorkspace: BlocksuiteWorkspace,
meta: WorkspaceMeta
meta: WorkspaceMeta0
): Promise<BlocksuiteWorkspace | undefined> {
const workspaceId = blocksuiteWorkspace.room;
assert(workspaceId, 'Blocksuite Workspace without room(workspaceId).');
assert(meta.name, 'Workspace name is required');
this._logger('Creating affine workspace');
const workspaceInfo: WorkspaceInfo = {
name: meta.name,
id: workspaceId,
isPublish: false,
avatar: '',
owner: undefined,
isLocal: true,
memberCount: 1,
provider: 'local',
const workspaceInfo: WorkspaceMeta0 = {
...meta,
};
this.linkLocal(blocksuiteWorkspace);
blocksuiteWorkspace.meta.setName(meta.name);
if (!meta.avatar) {
@@ -120,6 +131,8 @@ export class LocalProvider extends BaseProvider {
workspaceInfo.avatar = blocksuiteWorkspace.meta.avatar;
}
await initStore(blocksuiteWorkspace);
this._workspaces.add(workspaceInfo);
this._storeWorkspaces(this._workspaces.list());

View File

@@ -5,6 +5,9 @@ import { getDefaultHeadImgBlob } from '../utils/index.js';
export const setDefaultAvatar = async (
blocksuiteWorkspace: BlocksuiteWorkspace
) => {
if (typeof document === 'undefined') {
return;
}
const blob = await getDefaultHeadImgBlob(blocksuiteWorkspace.meta.name);
const blobStorage = await blocksuiteWorkspace.blobs;
assert(blobStorage, 'No blob storage');