Merge remote-tracking branch 'refs/remotes/origin/feat/datacenter'

Conflicts:
	packages/app/src/components/workspace-modal/languageMenu.tsx
	packages/data-center/package.json
	pnpm-lock.yaml
This commit is contained in:
linonetwo
2023-01-30 11:00:31 +08:00
116 changed files with 2392 additions and 3945 deletions

View File

@@ -2,7 +2,6 @@ import { BaseProvider } from '../base.js';
import type {
ProviderConstructorParams,
CreateWorkspaceInfoParams,
WorkspaceMeta0,
} from '../base';
import type { User } from '../../types';
import { Workspace as BlocksuiteWorkspace } from '@blocksuite/store';
@@ -12,18 +11,13 @@ import { WebsocketProvider } from './sync.js';
// import { IndexedDBProvider } from '../local/indexeddb';
import { getApis, Workspace } from './apis/index.js';
import type { Apis, WorkspaceDetail, Callback } from './apis';
import { setDefaultAvatar } from '../utils.js';
import { MessageCode } from '../../message/index.js';
import { token } from './apis/token.js';
import { WebsocketClient } from './channel';
import {
loadWorkspaceUnit,
createWorkspaceUnit,
syncToCloud,
} from './utils.js';
import { loadWorkspaceUnit, createWorkspaceUnit } from './utils.js';
import { WorkspaceUnit } from '../../workspace-unit.js';
import { createBlocksuiteWorkspace, applyUpdate } from '../../utils/index.js';
import type { SyncMode } from '../../workspace-unit';
import { MessageCenter } from '../../message/index.js';
type ChannelMessage = {
ws_list: Workspace[];
@@ -42,9 +36,8 @@ const {
export class AffineProvider extends BaseProvider {
public id = 'affine';
private _workspacesCache: Map<string, BlocksuiteWorkspace> = new Map();
private _onTokenRefresh?: Callback = undefined;
private _wsMap: Map<string, WebsocketProvider> = new Map();
private _wsMap: Map<BlocksuiteWorkspace, WebsocketProvider> = new Map();
private _apis: Apis;
private _channel?: WebsocketClient;
// private _idbMap: Map<string, IndexedDBProvider> = new Map();
@@ -105,11 +98,23 @@ export class AffineProvider extends BaseProvider {
});
}
private _handlerAffineListMessage({ ws_details, metadata }: ChannelMessage) {
private async _handlerAffineListMessage({
ws_details,
metadata,
}: ChannelMessage) {
this._logger('receive server message');
Object.entries(ws_details).forEach(async ([id, detail]) => {
const addedWorkspaces: WorkspaceUnit[] = [];
const removeWorkspaceList = this._workspaces.list().map(w => w.id);
for (const [id, detail] of Object.entries(ws_details)) {
const { name, avatar } = metadata[id];
assert(name);
const index = removeWorkspaceList.indexOf(id);
if (index !== -1) {
removeWorkspaceList.splice(index, 1);
}
assert(
name,
'workspace name not found by id when receive server message'
);
const workspace = {
name: name,
avatar,
@@ -121,26 +126,31 @@ export class AffineProvider extends BaseProvider {
},
published: detail.public,
memberCount: detail.member_count,
provider: 'affine',
provider: this.id,
syncMode: 'core' as SyncMode,
};
if (this._workspaces.get(id)) {
// update workspaces
this._workspaces.update(id, workspace);
} else {
const workspaceUnit = await loadWorkspaceUnit(
{ id, ...workspace },
this._apis
);
this._workspaces.add(workspaceUnit);
addedWorkspaces.push(workspaceUnit);
}
});
}
// add workspaces
this._workspaces.add(addedWorkspaces);
// remove workspaces
this._workspaces.remove(removeWorkspaceList);
}
private _getWebsocketProvider(workspace: BlocksuiteWorkspace) {
const { doc, room } = workspace;
assert(room);
assert(doc);
let ws = this._wsMap.get(room);
let ws = this._wsMap.get(workspace);
if (!ws) {
const wsUrl = `${
window.location.protocol === 'https:' ? 'wss' : 'ws'
@@ -148,7 +158,7 @@ export class AffineProvider extends BaseProvider {
ws = new WebsocketProvider(wsUrl, room, doc, {
params: { token: this._apis.token.refresh },
});
this._wsMap.set(room, ws);
this._wsMap.set(workspace, ws);
}
return ws;
}
@@ -175,8 +185,8 @@ export class AffineProvider extends BaseProvider {
this.linkLocal(workspace);
const ws = this._getWebsocketProvider(workspace);
// close all websocket links
Array.from(this._wsMap.entries()).forEach(([id, ws]) => {
if (id !== room) {
Array.from(this._wsMap.entries()).forEach(([blocksuiteWorkspace, ws]) => {
if (blocksuiteWorkspace !== workspace) {
ws.disconnect();
}
});
@@ -207,7 +217,7 @@ export class AffineProvider extends BaseProvider {
owner: undefined,
published: w.public,
memberCount: 1,
provider: 'affine',
provider: this.id,
syncMode: 'core',
},
this._apis
@@ -232,14 +242,12 @@ export class AffineProvider extends BaseProvider {
this._connectChannel();
}
if (!user) {
this._messageCenter.send(MessageCode.loginError);
this._sendMessage(MessageCenter.messageCode.loginError);
}
}
public override async getUserInfo(): Promise<User | undefined> {
await this.init();
const user = this._apis.token.user;
await this.init;
return user
? {
id: user.id,
@@ -258,23 +266,29 @@ export class AffineProvider extends BaseProvider {
}
public override async clear(): Promise<void> {
for (const w of this._workspacesCache.values()) {
if (w.room) {
for (const w of this._workspaces.list()) {
if (w.id) {
try {
await this.deleteWorkspace(w.room);
this._workspaces.remove(w.room);
await this.deleteWorkspace(w.id);
this._workspaces.remove(w.id);
} catch (e) {
this._logger('has a problem of delete workspace ', e);
}
}
}
this._workspacesCache.clear();
this._workspaces.clear();
}
public override async closeWorkspace(id: string) {
// const idb = this._idbMap.get(id);
// idb?.destroy();
const ws = this._wsMap.get(id);
const workspaceUnit = this._workspaces.get(id);
const ws = workspaceUnit?.blocksuiteWorkspace
? this._wsMap.get(workspaceUnit?.blocksuiteWorkspace)
: null;
if (!ws) {
console.error('close workspace websocket which not exist.');
}
ws?.disconnect();
}
@@ -305,7 +319,22 @@ export class AffineProvider extends BaseProvider {
public override async createWorkspace(
meta: CreateWorkspaceInfoParams
): Promise<WorkspaceUnit | undefined> {
const { id } = await this._apis.createWorkspace(meta);
const workspaceUnitForUpload = await createWorkspaceUnit({
id: '',
name: meta.name,
avatar: undefined,
owner: await this.getUserInfo(),
published: false,
memberCount: 1,
provider: this.id,
syncMode: 'core',
});
const { id } = await this._apis.createWorkspace(
new Blob([
encodeStateAsUpdate(workspaceUnitForUpload.blocksuiteWorkspace!.doc)
.buffer,
])
);
const workspaceUnit = await createWorkspaceUnit({
id,
@@ -314,14 +343,10 @@ export class AffineProvider extends BaseProvider {
owner: await this.getUserInfo(),
published: false,
memberCount: 1,
provider: 'affine',
provider: this.id,
syncMode: 'core',
});
await syncToCloud(
workspaceUnit.blocksuiteWorkspace!,
this._apis.token.refresh
);
this._workspaces.add(workspaceUnit);
return workspaceUnit;
@@ -349,9 +374,11 @@ export class AffineProvider extends BaseProvider {
public override async extendWorkspace(
workspaceUnit: WorkspaceUnit
): Promise<WorkspaceUnit> {
const { id } = await this._apis.createWorkspace({
name: workspaceUnit.name,
});
const { id } = await this._apis.createWorkspace(
new Blob([
encodeStateAsUpdate(workspaceUnit.blocksuiteWorkspace!.doc).buffer,
])
);
const newWorkspaceUnit = new WorkspaceUnit({
id,
name: workspaceUnit.name,
@@ -359,7 +386,7 @@ export class AffineProvider extends BaseProvider {
owner: await this.getUserInfo(),
published: false,
memberCount: 1,
provider: 'affine',
provider: this.id,
syncMode: 'core',
});
@@ -370,8 +397,6 @@ export class AffineProvider extends BaseProvider {
encodeStateAsUpdate(workspaceUnit.blocksuiteWorkspace.doc)
);
await syncToCloud(blocksuiteWorkspace, this._apis.token.refresh);
newWorkspaceUnit.setBlocksuiteWorkspace(blocksuiteWorkspace);
this._workspaces.add(newWorkspaceUnit);
@@ -382,6 +407,7 @@ export class AffineProvider extends BaseProvider {
token.clear();
this._channel?.disconnect();
this._wsMap.forEach(ws => ws.disconnect());
this._workspaces.clear();
storage.removeItem('token');
}
@@ -389,7 +415,7 @@ export class AffineProvider extends BaseProvider {
return this._apis.getWorkspaceMembers({ id });
}
public override async acceptInvitation(invitingCode: string): Promise<void> {
await this._apis.acceptInviting({ invitingCode });
public override async acceptInvitation(invitingCode: string) {
return await this._apis.acceptInviting({ invitingCode });
}
}

View File

@@ -1,9 +1,15 @@
import kyOrigin from 'ky';
import ky from 'ky-universal';
import { MessageCenter } from '../../../message/index.js';
import { token } from './token.js';
export const bareClient = ky.extend({
prefixUrl: 'http://localhost:8080',
type KyInstance = typeof ky;
const messageCenter = MessageCenter.getInstance();
const _sendMessage = messageCenter.getMessageSender('affine');
export const bareClient: KyInstance = ky.extend({
prefixUrl: '/',
retry: 1,
hooks: {
// afterResponse: [
@@ -22,7 +28,7 @@ export const bareClient = ky.extend({
},
});
export const client = bareClient.extend({
export const client: KyInstance = bareClient.extend({
hooks: {
beforeRequest: [
async request => {
@@ -41,5 +47,15 @@ export const client = bareClient.extend({
request.headers.set('Authorization', token.token);
},
],
beforeError: [
error => {
const { response } = error;
if (response.status === 401) {
_sendMessage(MessageCenter.messageCode.noPermission);
}
return error;
},
],
},
});

View File

@@ -70,9 +70,6 @@ class Token {
}
async refreshToken(token?: string) {
if (!this._refreshToken && !token) {
throw new Error('No authorization token.');
}
if (!this._padding) {
this._padding = login({
type: 'Refresh',
@@ -194,6 +191,7 @@ export const getAuthorizer = () => {
return [signInWithGoogle, onAuthStateChanged] as const;
} catch (e) {
getLogger('getAuthorizer')(e);
console.error('getAuthorizer', e);
return [] as const;
}
};

View File

@@ -1,6 +1,20 @@
import { MessageCenter } from '../../../message/index.js';
import { bareClient, client } from './request.js';
import type { User } from './user';
const messageCenter = MessageCenter.getInstance();
const sendMessage = messageCenter.getMessageSender('affine');
const { messageCode } = MessageCenter;
class RequestError extends Error {
constructor(message: string, cause: unknown | null = null) {
super(message);
this.name = 'RequestError';
this.cause = cause;
}
}
export interface GetWorkspaceDetailParams {
id: string;
}
@@ -26,13 +40,18 @@ export interface Workspace {
}
export async function getWorkspaces(): Promise<Workspace[]> {
return client
.get('api/workspace', {
headers: {
'Cache-Control': 'no-cache',
},
})
.json();
try {
return client
.get('api/workspace', {
headers: {
'Cache-Control': 'no-cache',
},
})
.json();
} catch (error) {
sendMessage(messageCode.loadListFailed);
throw new RequestError('load list failed', error);
}
}
export interface WorkspaceDetail extends Workspace {
@@ -43,7 +62,13 @@ export interface WorkspaceDetail extends Workspace {
export async function getWorkspaceDetail(
params: GetWorkspaceDetailParams
): Promise<WorkspaceDetail | null> {
return client.get(`api/workspace/${params.id}`).json();
try {
const response = client.get(`api/workspace/${params.id}`);
return response.json();
} catch (error) {
sendMessage(messageCode.getDetailFailed);
throw new RequestError('get detail failed', error);
}
}
export interface Permission {
@@ -74,7 +99,12 @@ export interface GetWorkspaceMembersParams {
export async function getWorkspaceMembers(
params: GetWorkspaceDetailParams
): Promise<Member[]> {
return client.get(`api/workspace/${params.id}/permission`).json();
try {
return client.get(`api/workspace/${params.id}/permission`).json();
} catch (error) {
sendMessage(messageCode.getMembersFailed);
throw new RequestError('get members failed', error);
}
}
export interface CreateWorkspaceParams {
@@ -82,12 +112,14 @@ export interface CreateWorkspaceParams {
}
export async function createWorkspace(
params: CreateWorkspaceParams
encodedYDoc: Blob
): Promise<{ id: string }> {
// FIXME: avatar should be optional
return client
.post('api/workspace', { json: { ...params, avatar: '123' } })
.json();
try {
return client.post('api/workspace', { body: encodedYDoc }).json();
} catch (error) {
sendMessage(messageCode.createWorkspaceFailed);
throw new RequestError('create workspace failed', error);
}
}
export interface UpdateWorkspaceParams {
@@ -98,13 +130,18 @@ export interface UpdateWorkspaceParams {
export async function updateWorkspace(
params: UpdateWorkspaceParams
): Promise<{ public: boolean | null }> {
return client
.post(`api/workspace/${params.id}`, {
json: {
public: params.public,
},
})
.json();
try {
return client
.post(`api/workspace/${params.id}`, {
json: {
public: params.public,
},
})
.json();
} catch (error) {
sendMessage(messageCode.updateWorkspaceFailed);
throw new RequestError('update workspace failed', error);
}
}
export interface DeleteWorkspaceParams {
@@ -114,7 +151,10 @@ export interface DeleteWorkspaceParams {
export async function deleteWorkspace(
params: DeleteWorkspaceParams
): Promise<void> {
await client.delete(`api/workspace/${params.id}`);
await client.delete(`api/workspace/${params.id}`).catch(error => {
sendMessage(messageCode.deleteWorkspaceFailed);
throw new RequestError('delete workspace failed', error);
});
}
export interface InviteMemberParams {
@@ -126,13 +166,18 @@ export interface InviteMemberParams {
* Notice: Only support normal(contrast to private) workspace.
*/
export async function inviteMember(params: InviteMemberParams): Promise<void> {
return client
.post(`api/workspace/${params.id}/permission`, {
json: {
email: params.email,
},
})
.json();
try {
return client
.post(`api/workspace/${params.id}/permission`, {
json: {
email: params.email,
},
})
.json();
} catch (error) {
sendMessage(messageCode.inviteMemberFailed);
throw new RequestError('invite member failed', error);
}
}
export interface RemoveMemberParams {
@@ -140,7 +185,10 @@ export interface RemoveMemberParams {
}
export async function removeMember(params: RemoveMemberParams): Promise<void> {
await client.delete(`api/permission/${params.permissionId}`);
await client.delete(`api/permission/${params.permissionId}`).catch(error => {
sendMessage(messageCode.removeMemberFailed);
throw new RequestError('remove member failed', error);
});
}
export interface AcceptInvitingParams {
@@ -149,8 +197,13 @@ export interface AcceptInvitingParams {
export async function acceptInviting(
params: AcceptInvitingParams
): Promise<void> {
await bareClient.post(`api/invitation/${params.invitingCode}`);
): Promise<Permission> {
try {
return bareClient.post(`api/invitation/${params.invitingCode}`).json();
} catch (error) {
sendMessage(messageCode.acceptInvitingFailed);
throw new RequestError('accept inviting failed', error);
}
}
export async function uploadBlob(params: { blob: Blob }): Promise<string> {
@@ -160,7 +213,12 @@ export async function uploadBlob(params: { blob: Blob }): Promise<string> {
export async function getBlob(params: {
blobId: string;
}): Promise<ArrayBuffer> {
return client.get(`api/blob/${params.blobId}`).arrayBuffer();
try {
return client.get(`api/blob/${params.blobId}`).arrayBuffer();
} catch (error) {
sendMessage(messageCode.getBlobFailed);
throw new RequestError('get blob failed', error);
}
}
export interface LeaveWorkspaceParams {
@@ -168,15 +226,26 @@ export interface LeaveWorkspaceParams {
}
export async function leaveWorkspace({ id }: LeaveWorkspaceParams) {
await client.delete(`api/workspace/${id}/permission`).json();
await client
.delete(`api/workspace/${id}/permission`)
.json()
.catch(error => {
sendMessage(messageCode.leaveWorkspaceFailed);
throw new RequestError('leave workspace failed', error);
});
}
export async function downloadWorkspace(
workspaceId: string,
published = false
): Promise<ArrayBuffer> {
if (published) {
return bareClient.get(`api/public/doc/${workspaceId}`).arrayBuffer();
try {
if (published) {
return bareClient.get(`api/public/doc/${workspaceId}`).arrayBuffer();
}
return client.get(`api/workspace/${workspaceId}/doc`).arrayBuffer();
} catch (error) {
sendMessage(messageCode.downloadWorkspaceFailed);
throw new RequestError('download workspace failed', error);
}
return client.get(`api/workspace/${workspaceId}/doc`).arrayBuffer();
}

View File

@@ -3,13 +3,13 @@ import { Logger } from 'src/types';
import { token } from './apis/token';
import * as url from 'lib0/url';
const RECONNECT_INTERVAL_TIME = 5000;
const RECONNECT_INTERVAL_TIME = 500;
const MAX_RECONNECT_TIMES = 50;
export class WebsocketClient extends websocket.WebsocketClient {
public shouldReconnect = false;
private _reconnectInterval: number | null = null;
private _logger: Logger;
private _retryTimes = 0;
constructor(
serverUrl: string,
logger: Logger,
@@ -34,30 +34,28 @@ export class WebsocketClient extends websocket.WebsocketClient {
this.on('connect', () => {
this._logger('Affine channel connected');
this.shouldReconnect = true;
if (this._reconnectInterval) {
window.clearInterval(this._reconnectInterval);
}
this._retryTimes = 0;
});
this.on('disconnect', ({ error }: { error: Error }) => {
if (error) {
let times = 0;
// Try reconnect if connect error has occurred
this._reconnectInterval = window.setInterval(() => {
if (this.shouldReconnect && token.isLogin && !this.connected) {
try {
this.connect();
this._logger(`try reconnect channel ${++times} times`);
if (times > MAX_RECONNECT_TIMES) {
if (this.shouldReconnect && token.isLogin && !this.connected) {
try {
setTimeout(() => {
if (this._retryTimes <= MAX_RECONNECT_TIMES) {
this.connect();
this._logger(
`try reconnect channel ${++this._retryTimes} times`
);
} else {
this._logger('reconnect failed, max reconnect times reached');
this._reconnectInterval &&
window.clearInterval(this._reconnectInterval);
}
} catch (e) {
this._logger('reconnect failed', e);
}
}, RECONNECT_INTERVAL_TIME);
} catch (e) {
this._logger('reconnect failed', e);
}
}, RECONNECT_INTERVAL_TIME);
}
}
});
}

View File

@@ -1,10 +1,7 @@
import assert from 'assert';
import { Workspace as BlocksuiteWorkspace } from '@blocksuite/store';
import { WorkspaceUnit } from '../../workspace-unit.js';
import type { WorkspaceUnitCtorParams } from '../../workspace-unit';
import { createBlocksuiteWorkspace } from '../../utils/index.js';
import type { Apis } from './apis';
import { WebsocketProvider } from './sync.js';
import { setDefaultAvatar } from '../utils.js';
import { applyUpdate } from '../../utils/index.js';
@@ -42,37 +39,6 @@ export const loadWorkspaceUnit = async (
return workspaceUnit;
};
export const syncToCloud = async (
blocksuiteWorkspace: BlocksuiteWorkspace,
refreshToken: string
) => {
const workspaceId = blocksuiteWorkspace.room;
assert(workspaceId, 'Blocksuite workspace without room(workspaceId).');
const wsUrl = `${window.location.protocol === 'https:' ? 'wss' : 'ws'}://${
window.location.host
}/api/sync/`;
const ws = new WebsocketProvider(
wsUrl,
workspaceId,
blocksuiteWorkspace.doc,
{
params: { token: refreshToken },
}
);
await new Promise((resolve, reject) => {
ws.once('synced', () => {
// FIXME: we don't when send local data to cloud successfully, so hack to wait 1s.
// Server will support this by add a new api.
setTimeout(resolve, 1000);
});
ws.once('lost-connection', () => reject());
ws.once('connection-error', () => reject());
});
};
export const createWorkspaceUnit = async (params: WorkspaceUnitCtorParams) => {
const workspaceUnit = new WorkspaceUnit(params);

View File

@@ -4,6 +4,7 @@ import { Logger, User } from '../types';
import type { WorkspaceUnitCollectionScope } from '../workspace-unit-collection';
import type { WorkspaceUnitCtorParams, WorkspaceUnit } from '../workspace-unit';
import { Member } from './affine/apis';
import { Permission } from './affine/apis/workspace.js';
const defaultLogger = () => {
return;
@@ -22,10 +23,15 @@ export type UpdateWorkspaceMetaParams = Partial<
>;
export class BaseProvider {
/** provider id */
public readonly id: string = 'base';
/** workspace unit collection */
protected _workspaces!: WorkspaceUnitCollectionScope;
protected _logger!: Logger;
protected _messageCenter!: MessageCenter;
/** send message with message center */
protected _sendMessage!: ReturnType<
InstanceType<typeof MessageCenter>['getMessageSender']
>;
public constructor({
logger,
@@ -34,7 +40,7 @@ export class BaseProvider {
}: ProviderConstructorParams) {
this._logger = (logger || defaultLogger) as Logger;
this._workspaces = workspaces;
this._messageCenter = messageCenter;
this._sendMessage = messageCenter.getMessageSender(this.id);
}
/**
@@ -228,8 +234,10 @@ export class BaseProvider {
* @param {string} inviteCode
* @returns
*/
public async acceptInvitation(inviteCode: string): Promise<void> {
public async acceptInvitation(
inviteCode: string
): Promise<Permission | null> {
inviteCode;
return;
return null;
}
}

View File

@@ -20,6 +20,7 @@ export const writeUpdatesToLocal = async (
if (updatesStore) {
await idb.addAutoKey(updatesStore, currState);
}
db.close();
};
export const applyLocalUpdates = async (

View File

@@ -8,6 +8,7 @@ import type {
import { varStorage as storage } from 'lib0/storage';
import { Workspace as BlocksuiteWorkspace, uuidv4 } from '@blocksuite/store';
import { IndexedDBProvider } from './indexeddb/indexeddb.js';
import { applyLocalUpdates } from './indexeddb/utils.js';
import assert from 'assert';
import { loadWorkspaceUnit, createWorkspaceUnit } from './utils.js';
import type { WorkspaceUnit } from '../../workspace-unit';
@@ -48,6 +49,7 @@ export class LocalProvider extends BaseProvider {
workspace: BlocksuiteWorkspace
): Promise<BlocksuiteWorkspace> {
assert(workspace.room);
await applyLocalUpdates(workspace);
await this.linkLocal(workspace);
return workspace;
}
@@ -101,7 +103,7 @@ export class LocalProvider extends BaseProvider {
owner: undefined,
syncMode: 'core',
memberCount: 1,
provider: 'local',
provider: this.id,
});
this._workspaces.add(workspaceUnit);
this._storeWorkspaces(this._workspaces.list());