From c39814047accb0ba30a033458948e81946060a61 Mon Sep 17 00:00:00 2001 From: DarkSky Date: Tue, 26 Jul 2022 17:11:01 +0800 Subject: [PATCH] feat: jwt performance --- .../db-service/src/services/base.ts | 1 + .../db-service/src/services/database/index.ts | 3 +- libs/datasource/jwt-rpc/src/broadcast.ts | 77 +++++++ libs/datasource/jwt-rpc/src/processor.ts | 80 +++++++ libs/datasource/jwt-rpc/src/provider.ts | 8 +- .../src/{connector.ts => websocket.ts} | 197 ++++-------------- libs/datasource/jwt/src/adapter/yjs/index.ts | 7 +- libs/datasource/jwt/src/utils/index.ts | 2 +- libs/utils/src/utils.ts | 2 +- 9 files changed, 204 insertions(+), 173 deletions(-) create mode 100644 libs/datasource/jwt-rpc/src/broadcast.ts create mode 100644 libs/datasource/jwt-rpc/src/processor.ts rename libs/datasource/jwt-rpc/src/{connector.ts => websocket.ts} (52%) diff --git a/libs/datasource/db-service/src/services/base.ts b/libs/datasource/db-service/src/services/base.ts index d0788170b3..81c8f3e9ac 100644 --- a/libs/datasource/db-service/src/services/base.ts +++ b/libs/datasource/db-service/src/services/base.ts @@ -82,6 +82,7 @@ export abstract class ServiceBaseClass { workspace: string, blockId: string ): Promise { + if (!blockId) return undefined; const db = await this.database.getDatabase(workspace); const db_block = await db.get(blockId as 'block'); if (db_block.id !== blockId) { diff --git a/libs/datasource/db-service/src/services/database/index.ts b/libs/datasource/db-service/src/services/database/index.ts index c8422502b2..83d832a68b 100644 --- a/libs/datasource/db-service/src/services/database/index.ts +++ b/libs/datasource/db-service/src/services/database/index.ts @@ -19,7 +19,7 @@ const loading = new Set(); const waitLoading = async (key: string) => { while (loading.has(key)) { - await sleep(); + await sleep(50); } }; @@ -53,7 +53,6 @@ async function _getBlockDatabase( if (!workspaces[workspace]) { loading.add(workspace); - workspaces[workspace] = await BlockClient.init(workspace, { ...options, token: await _getCurrentToken(), diff --git a/libs/datasource/jwt-rpc/src/broadcast.ts b/libs/datasource/jwt-rpc/src/broadcast.ts new file mode 100644 index 0000000000..ed5dfd2ebc --- /dev/null +++ b/libs/datasource/jwt-rpc/src/broadcast.ts @@ -0,0 +1,77 @@ +import * as Y from 'yjs'; +import * as bc from 'lib0/broadcastchannel'; +import * as encoding from 'lib0/encoding'; +import * as syncProtocol from 'y-protocols/sync'; +import * as awarenessProtocol from 'y-protocols/awareness'; + +import { Message } from './handler'; +import { readMessage } from './processor'; +import { WebsocketProvider } from './provider'; + +export const registerBroadcastSubscriber = ( + provider: WebsocketProvider, + awareness: awarenessProtocol.Awareness, + document: Y.Doc +) => { + const channel = provider.broadcastChannel; + + const subscriber = (data: ArrayBuffer, origin: any) => { + if (origin !== provider) { + const encoder = readMessage(provider, new Uint8Array(data), false); + if (encoding.length(encoder) > 1) { + bc.publish(channel, encoding.toUint8Array(encoder), provider); + } + } + }; + + bc.subscribe(channel, subscriber); + let connected = true; + + // send sync step1 to bc + // write sync step 1 + const encoderSync = encoding.createEncoder(); + encoding.writeVarUint(encoderSync, Message.sync); + syncProtocol.writeSyncStep1(encoderSync, document); + bc.publish(channel, encoding.toUint8Array(encoderSync), this); + // broadcast local state + const encoderState = encoding.createEncoder(); + encoding.writeVarUint(encoderState, Message.sync); + syncProtocol.writeSyncStep2(encoderState, document); + bc.publish(channel, encoding.toUint8Array(encoderState), this); + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder(); + encoding.writeVarUint(encoderAwarenessQuery, Message.queryAwareness); + bc.publish(channel, encoding.toUint8Array(encoderAwarenessQuery), this); + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder(); + encoding.writeVarUint(encoderAwarenessState, Message.awareness); + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(awareness, [document.clientID]) + ); + bc.publish(channel, encoding.toUint8Array(encoderAwarenessState), this); + + const broadcastMessage = (buf: ArrayBuffer) => { + if (connected) bc.publish(channel, buf, provider); + }; + + const disconnect = () => { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, Message.awareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + awareness, + [document.clientID], + new Map() + ) + ); + broadcastMessage(encoding.toUint8Array(encoder)); + if (connected) { + bc.unsubscribe(channel, subscriber); + connected = false; + } + }; + + return { broadcastMessage, disconnect }; +}; diff --git a/libs/datasource/jwt-rpc/src/processor.ts b/libs/datasource/jwt-rpc/src/processor.ts new file mode 100644 index 0000000000..4270fdf8dd --- /dev/null +++ b/libs/datasource/jwt-rpc/src/processor.ts @@ -0,0 +1,80 @@ +import * as Y from 'yjs'; +import * as encoding from 'lib0/encoding'; +import * as decoding from 'lib0/decoding'; +import * as syncProtocol from 'y-protocols/sync'; +import * as awarenessProtocol from 'y-protocols/awareness'; + +import { Message } from './handler'; +import { WebsocketProvider } from './provider'; + +export const readMessage = ( + provider: WebsocketProvider, + buf: Uint8Array, + emitSynced: boolean +): encoding.Encoder => { + const decoder = decoding.createDecoder(buf); + const encoder = encoding.createEncoder(); + const messageType = decoding.readVarUint(decoder) as Message; + const messageHandler = provider.messageHandlers[messageType]; + if (/** @type {any} */ messageHandler) { + messageHandler(encoder, decoder, provider, emitSynced, messageType); + } else { + console.error('Unable to compute message'); + } + return encoder; +}; + +export const registerUpdateHandler = ( + provider: WebsocketProvider, + awareness: awarenessProtocol.Awareness, + doc: Y.Doc, + broadcastMessage: (buf: ArrayBuffer) => void +) => { + const beforeUnloadHandler = () => { + awarenessProtocol.removeAwarenessStates( + awareness, + [doc.clientID], + 'window unload' + ); + }; + + const awarenessUpdateHandler = ({ added, updated, removed }: any) => { + const changedClients = added.concat(updated).concat(removed); + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, Message.awareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients) + ); + broadcastMessage(encoding.toUint8Array(encoder)); + }; + + // Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) + const documentUpdateHandler = (update: Uint8Array, origin: any) => { + if (origin !== provider) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, Message.sync); + syncProtocol.writeUpdate(encoder, update); + broadcastMessage(encoding.toUint8Array(encoder)); + } + }; + + if (typeof window !== 'undefined') { + window.addEventListener('beforeunload', beforeUnloadHandler); + } else if (typeof process !== 'undefined') { + process.on('exit', beforeUnloadHandler); + } + + awareness.on('update', awarenessUpdateHandler); + doc.on('update', documentUpdateHandler); + return () => { + if (typeof window !== 'undefined') { + window.removeEventListener('beforeunload', beforeUnloadHandler); + } else if (typeof process !== 'undefined') { + process.off('exit', beforeUnloadHandler); + } + + awareness.off('update', awarenessUpdateHandler); + doc.off('update', documentUpdateHandler); + }; +}; diff --git a/libs/datasource/jwt-rpc/src/provider.ts b/libs/datasource/jwt-rpc/src/provider.ts index 9839e3a2fb..86a59b14c3 100644 --- a/libs/datasource/jwt-rpc/src/provider.ts +++ b/libs/datasource/jwt-rpc/src/provider.ts @@ -6,11 +6,9 @@ import * as url from 'lib0/url'; import * as awarenessProtocol from 'y-protocols/awareness'; import { handler } from './handler'; -import { - registerBroadcastSubscriber, - registerUpdateHandler, - registerWebsocket, -} from './connector'; +import { registerBroadcastSubscriber } from './broadcast'; +import { registerWebsocket } from './websocket'; +import { registerUpdateHandler } from './processor'; /** * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. diff --git a/libs/datasource/jwt-rpc/src/connector.ts b/libs/datasource/jwt-rpc/src/websocket.ts similarity index 52% rename from libs/datasource/jwt-rpc/src/connector.ts rename to libs/datasource/jwt-rpc/src/websocket.ts index a27652d9fc..fa43db554b 100644 --- a/libs/datasource/jwt-rpc/src/connector.ts +++ b/libs/datasource/jwt-rpc/src/websocket.ts @@ -1,154 +1,12 @@ -import * as Y from 'yjs'; -import * as bc from 'lib0/broadcastchannel'; import * as time from 'lib0/time'; import * as encoding from 'lib0/encoding'; -import * as decoding from 'lib0/decoding'; import * as syncProtocol from 'y-protocols/sync'; import * as awarenessProtocol from 'y-protocols/awareness'; import * as math from 'lib0/math'; -import { WebsocketProvider } from './provider'; import { Message } from './handler'; - -export const readMessage = ( - provider: WebsocketProvider, - buf: Uint8Array, - emitSynced: boolean -): encoding.Encoder => { - const decoder = decoding.createDecoder(buf); - const encoder = encoding.createEncoder(); - const messageType = decoding.readVarUint(decoder) as Message; - const messageHandler = provider.messageHandlers[messageType]; - if (/** @type {any} */ messageHandler) { - messageHandler(encoder, decoder, provider, emitSynced, messageType); - } else { - console.error('Unable to compute message'); - } - return encoder; -}; - -export const registerBroadcastSubscriber = ( - provider: WebsocketProvider, - awareness: awarenessProtocol.Awareness, - document: Y.Doc -) => { - const channel = provider.broadcastChannel; - - const subscriber = (data: ArrayBuffer, origin: any) => { - if (origin !== provider) { - const encoder = readMessage(provider, new Uint8Array(data), false); - if (encoding.length(encoder) > 1) { - bc.publish(channel, encoding.toUint8Array(encoder), provider); - } - } - }; - - bc.subscribe(channel, subscriber); - let connected = true; - - // send sync step1 to bc - // write sync step 1 - const encoderSync = encoding.createEncoder(); - encoding.writeVarUint(encoderSync, Message.sync); - syncProtocol.writeSyncStep1(encoderSync, document); - bc.publish(channel, encoding.toUint8Array(encoderSync), this); - // broadcast local state - const encoderState = encoding.createEncoder(); - encoding.writeVarUint(encoderState, Message.sync); - syncProtocol.writeSyncStep2(encoderState, document); - bc.publish(channel, encoding.toUint8Array(encoderState), this); - // write queryAwareness - const encoderAwarenessQuery = encoding.createEncoder(); - encoding.writeVarUint(encoderAwarenessQuery, Message.queryAwareness); - bc.publish(channel, encoding.toUint8Array(encoderAwarenessQuery), this); - // broadcast local awareness state - const encoderAwarenessState = encoding.createEncoder(); - encoding.writeVarUint(encoderAwarenessState, Message.awareness); - encoding.writeVarUint8Array( - encoderAwarenessState, - awarenessProtocol.encodeAwarenessUpdate(awareness, [document.clientID]) - ); - bc.publish(channel, encoding.toUint8Array(encoderAwarenessState), this); - - const broadcastMessage = (buf: ArrayBuffer) => { - if (connected) bc.publish(channel, buf, provider); - }; - - const disconnect = () => { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, Message.awareness); - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate( - awareness, - [document.clientID], - new Map() - ) - ); - broadcastMessage(encoding.toUint8Array(encoder)); - if (connected) { - bc.unsubscribe(channel, subscriber); - connected = false; - } - }; - - return { broadcastMessage, disconnect }; -}; - -export const registerUpdateHandler = ( - provider: WebsocketProvider, - awareness: awarenessProtocol.Awareness, - doc: Y.Doc, - broadcastMessage: (buf: ArrayBuffer) => void -) => { - const beforeUnloadHandler = () => { - awarenessProtocol.removeAwarenessStates( - awareness, - [doc.clientID], - 'window unload' - ); - }; - - const awarenessUpdateHandler = ({ added, updated, removed }: any) => { - const changedClients = added.concat(updated).concat(removed); - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, Message.awareness); - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients) - ); - broadcastMessage(encoding.toUint8Array(encoder)); - }; - - // Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) - const documentUpdateHandler = (update: Uint8Array, origin: any) => { - if (origin !== provider) { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, Message.sync); - syncProtocol.writeUpdate(encoder, update); - broadcastMessage(encoding.toUint8Array(encoder)); - } - }; - - if (typeof window !== 'undefined') { - window.addEventListener('beforeunload', beforeUnloadHandler); - } else if (typeof process !== 'undefined') { - process.on('exit', beforeUnloadHandler); - } - - awareness.on('update', awarenessUpdateHandler); - doc.on('update', documentUpdateHandler); - return () => { - if (typeof window !== 'undefined') { - window.removeEventListener('beforeunload', beforeUnloadHandler); - } else if (typeof process !== 'undefined') { - process.off('exit', beforeUnloadHandler); - } - - awareness.off('update', awarenessUpdateHandler); - doc.off('update', documentUpdateHandler); - }; -}; +import { readMessage } from './processor'; +import { WebsocketProvider } from './provider'; enum WebSocketState { disconnected = 0, @@ -159,22 +17,41 @@ enum WebSocketState { // @todo - this should depend on awareness.outdatedTime const WEBSOCKET_RECONNECT = 30000; +const GET_TOKEN_BASELINE_TIMEOUT = 500; const _getToken = async ( remote: string, token: string, existsProtocol?: string, - reconnect = 3 + reconnect = 3, + timeout = 500 ) => { if (existsProtocol && reconnect > 0) { return { protocol: existsProtocol }; } const url = new URL(remote); url.protocol = window.location.protocol; - return fetch(url, { method: 'POST', headers: { token } }).then(r => - r.json() + const controller = new AbortController(); + const id = setTimeout( + () => controller.abort(), + GET_TOKEN_BASELINE_TIMEOUT + timeout ); + const resp = await fetch(url, { + method: 'POST', + headers: { token }, + signal: controller.signal, + }); + + clearTimeout(id); + + return resp.json(); }; +const _getTimeout = (provider: WebsocketProvider) => + math.min( + math.pow(2, provider.wsUnsuccessfulReconnects) * 100, + provider.maxBackOffTime + ); + export const registerWebsocket = ( provider: WebsocketProvider, token: string, @@ -187,7 +64,13 @@ export const registerWebsocket = ( let websocket: WebSocket | undefined = undefined; - _getToken(provider.url, token, existsProtocol, reconnect) + _getToken( + provider.url, + token, + existsProtocol, + reconnect, + _getTimeout(provider) + ) .then(({ protocol }) => { websocket = new WebSocket(provider.url, protocol); websocket.binaryType = 'arraybuffer'; @@ -233,17 +116,12 @@ export const registerWebsocket = ( } else { provider.wsUnsuccessfulReconnects++; } - if (reconnect <= 0) { - provider.emit('lost-connection', []); - } + if (reconnect <= 0) provider.emit('lost-connection', []); // Start with no reconnect timeout and increase timeout by // using exponential backoff starting with 100ms setTimeout( registerWebsocket, - math.min( - math.pow(2, provider.wsUnsuccessfulReconnects) * 100, - provider.maxBackOffTime - ), + _getTimeout(provider), provider, token, resyncInterval, @@ -284,16 +162,11 @@ export const registerWebsocket = ( provider.emit('status', [{ status: 'connecting' }]); }) .catch(err => { - if (reconnect <= 0) { - provider.emit('lost-connection', []); - } + provider.emit('lost-connection', []); provider.wsUnsuccessfulReconnects++; setTimeout( registerWebsocket, - math.min( - math.pow(2, provider.wsUnsuccessfulReconnects) * 100, - provider.maxBackOffTime - ), + _getTimeout(provider), provider, token, resyncInterval, diff --git a/libs/datasource/jwt/src/adapter/yjs/index.ts b/libs/datasource/jwt/src/adapter/yjs/index.ts index ca4d3904e2..b8691ce8fd 100644 --- a/libs/datasource/jwt/src/adapter/yjs/index.ts +++ b/libs/datasource/jwt/src/adapter/yjs/index.ts @@ -115,8 +115,8 @@ async function _initYjsDatabase( const doc = new Doc({ autoLoad: true, shouldLoad: true }); - const idb = await new IndexeddbPersistence(workspace, doc).whenSynced; - const [awareness, ws] = await _initWebsocketProvider( + const idbp = new IndexeddbPersistence(workspace, doc).whenSynced; + const wsp = _initWebsocketProvider( backend, workspace, doc, @@ -124,6 +124,8 @@ async function _initYjsDatabase( params ); + const [idb, [awareness, ws]] = await Promise.all([idbp, wsp]); + const binaries = new Doc({ autoLoad: true, shouldLoad: true }); const binariesIdb = await new IndexeddbPersistence( `${workspace}_binaries`, @@ -410,6 +412,7 @@ export class YjsAdapter implements AsyncDatabaseAdapter { binary?: ArrayBufferLike; } ): Promise { + console.trace('createBlock', options); const uuid = options.uuid || `affine${nanoid(16)}`; if (options.type === BlockTypes.binary) { if (options.binary && options.binary instanceof ArrayBuffer) { diff --git a/libs/datasource/jwt/src/utils/index.ts b/libs/datasource/jwt/src/utils/index.ts index 99218cae1d..f19e5633a1 100644 --- a/libs/datasource/jwt/src/utils/index.ts +++ b/libs/datasource/jwt/src/utils/index.ts @@ -51,7 +51,7 @@ export function isBlock(obj: any) { } export function sleep() { - return new Promise(resolve => setTimeout(resolve, 500)); + return new Promise(resolve => setTimeout(resolve, 100)); } export { BlockEventBus } from './event-bus'; diff --git a/libs/utils/src/utils.ts b/libs/utils/src/utils.ts index 96235fece7..4388128e92 100644 --- a/libs/utils/src/utils.ts +++ b/libs/utils/src/utils.ts @@ -23,7 +23,7 @@ export function getPageId() { return path ? path[2] : undefined; } -export async function sleep(delay?: number) { +export async function sleep(delay = 100) { return new Promise(res => { window.setTimeout(() => { res(true);