feat: add affine global channel (#1762)

This commit is contained in:
Himself65
2023-03-30 18:21:26 -05:00
committed by GitHub
parent 3fa7d17dca
commit bb1224f9ee
38 changed files with 358 additions and 162 deletions

View File

@@ -2,6 +2,7 @@
"name": "@affine/workspace",
"private": true,
"exports": {
"./atom": "./src/atom.ts",
"./utils": "./src/utils.ts",
"./type": "./src/type.ts",
"./affine/*": "./src/affine/*.ts",

View File

@@ -94,17 +94,30 @@ export enum PermissionType {
Owner = 99,
}
export interface Workspace {
id: string;
type: WorkspaceType;
public: boolean;
permission: PermissionType;
}
export const userSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string(),
avatar_url: z.string(),
create_at: z.string(),
});
export interface WorkspaceDetail extends Workspace {
owner: User;
member_count: number;
}
export const workspaceSchema = z.object({
id: z.string(),
type: z.nativeEnum(WorkspaceType),
public: z.boolean(),
permission: z.nativeEnum(PermissionType),
});
export type Workspace = z.infer<typeof workspaceSchema>;
export const workspaceDetailSchema = z.object({
...workspaceSchema.shape,
owner: userSchema,
member_count: z.number(),
});
export type WorkspaceDetail = z.infer<typeof workspaceDetailSchema>;
export interface Permission {
id: string;

View File

@@ -0,0 +1,93 @@
import { DebugLogger } from '@affine/debug';
import {
getLoginStorage,
isExpired,
parseIdToken,
} from '@affine/workspace/affine/login';
import { assertExists } from '@blocksuite/global/utils';
import * as url from 'lib0/url';
import * as websocket from 'lib0/websocket';
const RECONNECT_INTERVAL_TIME = 500;
const MAX_RECONNECT_TIMES = 50;
export class WebsocketClient {
public readonly baseServerUrl: string;
private _client: websocket.WebsocketClient | null = null;
public shouldReconnect = false;
private _retryTimes = 0;
private _logger = new DebugLogger('affine:channel');
private _callback: ((message: any) => void) | null = null;
constructor(serverUrl: string) {
while (serverUrl.endsWith('/')) {
serverUrl = serverUrl.slice(0, serverUrl.length - 1);
}
this.baseServerUrl = serverUrl;
}
public connect(callback: (message: any) => void) {
const loginResponse = getLoginStorage();
assertExists(loginResponse, 'loginResponse is null');
const encodedParams = url.encodeQueryParams({
token: loginResponse.token,
});
const serverUrl =
this.baseServerUrl +
(encodedParams.length === 0 ? '' : '?' + encodedParams);
this._client = new websocket.WebsocketClient(serverUrl);
this._callback = callback;
this._setupChannel();
this._client.on('message', this._callback);
}
public disconnect() {
assertExists(this._client, 'client is null');
if (this._callback) {
this._client.off('message', this._callback);
}
this._client.disconnect();
this._client.destroy();
this._client = null;
}
private _setupChannel() {
assertExists(this._client, 'client is null');
const client = this._client;
client.on('connect', () => {
this._logger.debug('Affine channel connected');
this.shouldReconnect = true;
this._retryTimes = 0;
});
client.on('disconnect', ({ error }: { error: Error }) => {
if (error) {
const loginResponse = getLoginStorage();
const isLogin = loginResponse
? isExpired(parseIdToken(loginResponse.token))
: false;
// Try to re-connect if connect error has occurred
if (this.shouldReconnect && isLogin && !client.connected) {
try {
setTimeout(() => {
if (this._retryTimes <= MAX_RECONNECT_TIMES) {
assertExists(this._callback, 'callback is null');
this.connect(this._callback);
this._logger.info(
`try reconnect channel ${++this._retryTimes} times`
);
} else {
this._logger.error(
'reconnect failed, max reconnect times reached'
);
}
}, RECONNECT_INTERVAL_TIME);
} catch (e) {
this._logger.error('reconnect failed', e);
}
}
}
});
}
}

View File

@@ -0,0 +1,71 @@
import {
workspaceDetailSchema,
workspaceSchema,
} from '@affine/workspace/affine/api';
import { WebsocketClient } from '@affine/workspace/affine/channel';
import { jotaiStore, jotaiWorkspacesAtom } from '@affine/workspace/atom';
import type { WorkspaceCRUD } from '@affine/workspace/type';
import type { WorkspaceFlavour } from '@affine/workspace/type';
import { assertExists } from '@blocksuite/global/utils';
import { z } from 'zod';
const channelMessageSchema = z.object({
ws_list: z.array(workspaceSchema),
ws_details: z.record(workspaceDetailSchema),
metadata: z.record(
z.object({
avatar: z.string(),
name: z.string(),
})
),
});
type ChannelMessage = z.infer<typeof channelMessageSchema>;
export function createAffineGlobalChannel(
crud: WorkspaceCRUD<WorkspaceFlavour.AFFINE>
) {
let client: WebsocketClient | null;
async function handleMessage(channelMessage: ChannelMessage) {
const parseResult = channelMessageSchema.safeParse(channelMessage);
if (!parseResult.success) {
console.error(
'channelMessageSchema.safeParse(channelMessage) failed',
parseResult
);
}
const { ws_details } = channelMessage;
const currentWorkspaces = await crud.list();
for (const [id] of Object.entries(ws_details)) {
const workspaceIndex = currentWorkspaces.findIndex(
workspace => workspace.id === id
);
// If the workspace is not in the current workspace list, remove it
if (workspaceIndex === -1) {
jotaiStore.set(jotaiWorkspacesAtom, workspaces => {
const idx = workspaces.findIndex(workspace => workspace.id === id);
workspaces.splice(idx, 1);
return [...workspaces];
});
}
}
}
return {
connect: () => {
client = new WebsocketClient(
`${window.location.protocol === 'https:' ? 'wss' : 'ws'}://${
window.location.host
}/api/global/sync`
);
client.connect(handleMessage);
},
disconnect: () => {
assertExists(client, 'client is null');
client.disconnect();
client = null;
},
};
}

View File

@@ -0,0 +1,18 @@
import type { WorkspaceFlavour } from '@affine/workspace/type';
import { createStore } from 'jotai/index';
import { atomWithStorage } from 'jotai/utils';
export type JotaiWorkspace = {
id: string;
flavour: WorkspaceFlavour;
};
// root primitive atom that stores the list of workspaces which could be used in the app
// if a workspace is not in this list, it should not be used in the app
export const jotaiWorkspacesAtom = atomWithStorage<JotaiWorkspace[]>(
'jotai-workspaces',
[]
);
// global jotai store, which is used to store all the atoms
export const jotaiStore = createStore();

View File

@@ -1,6 +1,57 @@
import type { Workspace as RemoteWorkspace } from '@affine/workspace/affine/api';
import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
import type { FC } from 'react';
export type BaseProvider = {
flavour: string;
// if this is true, we will connect the provider on the background
background: boolean;
connect: () => void;
disconnect: () => void;
// cleanup data when workspace is removed
cleanup: () => void;
};
export interface BackgroundProvider extends BaseProvider {
background: true;
callbacks: Set<() => void>;
}
export interface AffineDownloadProvider extends BaseProvider {
flavour: 'affine-download';
}
export interface BroadCastChannelProvider extends BaseProvider {
flavour: 'broadcast-channel';
}
export interface LocalIndexedDBProvider extends BackgroundProvider {
flavour: 'local-indexeddb';
}
export interface AffineWebSocketProvider extends BaseProvider {
flavour: 'affine-websocket';
}
export type Provider =
| LocalIndexedDBProvider
| AffineWebSocketProvider
| BroadCastChannelProvider;
export interface AffineWorkspace extends RemoteWorkspace {
flavour: WorkspaceFlavour.AFFINE;
// empty
blockSuiteWorkspace: BlockSuiteWorkspace;
providers: Provider[];
}
export interface LocalWorkspace {
flavour: WorkspaceFlavour.LOCAL;
id: string;
blockSuiteWorkspace: BlockSuiteWorkspace;
providers: Provider[];
}
export const enum LoadPriority {
HIGH = 1,
MEDIUM = 2,
@@ -11,6 +62,7 @@ export const enum WorkspaceFlavour {
AFFINE = 'affine',
LOCAL = 'local',
}
export const settingPanel = {
General: 'general',
Collaboration: 'collaboration',
@@ -21,8 +73,11 @@ export const settingPanel = {
export const settingPanelValues = [...Object.values(settingPanel)] as const;
export type SettingPanel = (typeof settingPanel)[keyof typeof settingPanel];
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface WorkspaceRegistry {}
// built-in workspaces
export interface WorkspaceRegistry {
[WorkspaceFlavour.AFFINE]: AffineWorkspace;
[WorkspaceFlavour.LOCAL]: LocalWorkspace;
}
export interface WorkspaceCRUD<Flavour extends keyof WorkspaceRegistry> {
create: (blockSuiteWorkspace: BlockSuiteWorkspace) => Promise<string>;