feat: merge keck

This commit is contained in:
DarkSky
2022-07-22 17:04:52 +08:00
parent 7d433b53d4
commit 0871fbda07
19 changed files with 1302 additions and 58 deletions

18
apps/keck/.eslintrc.json Normal file
View File

@@ -0,0 +1,18 @@
{
"extends": ["../../.eslintrc.json"],
"ignorePatterns": ["!**/*"],
"overrides": [
{
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
"rules": {}
},
{
"files": ["*.ts", "*.tsx"],
"rules": {}
},
{
"files": ["*.js", "*.jsx"],
"rules": {}
}
]
}

16
apps/keck/jest.config.ts Normal file
View File

@@ -0,0 +1,16 @@
/* eslint-disable */
export default {
displayName: 'keck',
preset: '../../jest.preset.js',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
testEnvironment: 'node',
transform: {
'^.+\\.[tj]s$': 'ts-jest',
},
moduleFileExtensions: ['ts', 'js', 'html'],
coverageDirectory: '../../coverage/apps/keck',
};

22
apps/keck/package.json Normal file
View File

@@ -0,0 +1,22 @@
{
"name": "@toeverything/keck",
"version": "0.0.1",
"license": "MIT",
"author": "DarkSky <darksky2048@gmail.com>",
"main": "jest.config.ts",
"dependencies": {
"authing-js-sdk": "^4.23.33",
"firebase-admin": "^11.0.0",
"lib0": "^0.2.51",
"lru-cache": "^7.13.0",
"nanoid": "^4.0.0",
"readable-stream": "^4.1.0",
"ws": "^8.8.0",
"y-protocols": "^1.0.5",
"yjs": "^13.5.39"
},
"devDependencies": {
"@types/readable-stream": "^2.3.13",
"@types/ws": "^8.5.3"
}
}

View File

@@ -0,0 +1,9 @@
{
"name": "@toeverything/keck",
"license": "MIT",
"version": "0.0.1",
"dependencies": {
"level": "^8.0.0",
"level-read-stream": "1.1.0"
}
}

52
apps/keck/project.json Normal file
View File

@@ -0,0 +1,52 @@
{
"$schema": "../../node_modules/nx/schemas/project-schema.json",
"sourceRoot": "apps/keck/src",
"projectType": "application",
"targets": {
"build": {
"executor": "@nrwl/node:webpack",
"outputs": ["{options.outputPath}"],
"options": {
"outputPath": "dist/apps/keck",
"main": "apps/keck/src/index.ts",
"tsConfig": "apps/keck/tsconfig.app.json",
"assets": []
},
"configurations": {
"production": {
"optimization": true,
"extractLicenses": true,
"inspect": false,
"fileReplacements": [
{
"replace": "apps/keck/src/environments/environment.ts",
"with": "apps/keck/src/environments/environment.prod.ts"
}
]
}
}
},
"serve": {
"executor": "@nrwl/node:node",
"options": {
"buildTarget": "keck:build"
}
},
"lint": {
"executor": "@nrwl/linter:eslint",
"outputs": ["{options.outputFile}"],
"options": {
"lintFilePatterns": ["apps/keck/**/*.ts"]
}
},
"test": {
"executor": "@nrwl/jest:jest",
"outputs": ["coverage/apps/keck"],
"options": {
"jestConfig": "apps/keck/jest.config.ts",
"passWithNoTests": true
}
}
},
"tags": []
}

View File

@@ -0,0 +1,3 @@
export const environment = {
production: true,
};

View File

@@ -0,0 +1,3 @@
export const environment = {
production: false,
};

118
apps/keck/src/index.ts Normal file
View File

@@ -0,0 +1,118 @@
import WebSocket = require('ws');
import http = require('http');
// import authing = require('authing-js-sdk');
import firebaseApp = require('firebase-admin/app');
import firebaseAuth = require('firebase-admin/auth');
import LRUCache = require('lru-cache');
import nanoid = require('nanoid');
import { handleConnection } from './utils';
import { URL } from 'url';
if (process.env.NODE_ENV !== 'development') {
firebaseApp.initializeApp({
credential: firebaseApp.cert({
clientEmail: process.env.FIREBASE_ACCOUNT,
privateKey: process.env.FIREBASE_CERT,
projectId: process.env.FIREBASE_PROJECT,
}),
projectId: process.env.FIREBASE_PROJECT,
});
}
const _getWorkspace = (path: string) => {
const [_, part1] = path.split('/collaboration/');
const [workspace] = part1?.split('/') || [];
return workspace;
};
const AFFINE_COMMON_WORKSPACE = 'affine2vin277tcmafwq';
const _checkAuth = async (
request: http.IncomingMessage,
response: http.ServerResponse,
callback: (response: http.OutgoingMessage, workspace: string) => boolean
) => {
if (process.env.NODE_ENV === 'development') {
const url = new URL(request.url, `http://${request.headers.host}`);
const workspace = _getWorkspace(url.pathname);
if (workspace) return callback(response, workspace);
return false;
} else {
try {
const decodedToken = await firebaseAuth
.getAuth()
.verifyIdToken(request.headers.token as string);
const allowWorkspace = [AFFINE_COMMON_WORKSPACE, decodedToken.uid];
const url = new URL(request.url, `http://${request.headers.host}`);
const workspace = _getWorkspace(url.pathname);
if (allowWorkspace.includes(workspace)) {
return callback(response, workspace);
}
} catch (error) {
console.log(error);
}
return false;
}
};
const HOST = process.env.HOST || 'localhost';
const PORT = process.env.PORT || 3000;
const _tokens = new LRUCache<string, string>({
max: 10240,
ttl: 1000 * 60 * 5,
});
const _server = http.createServer((request, response) => {
if (
request.method === 'POST' &&
typeof request.headers.token === 'string'
) {
_checkAuth(request, response, (response, workspace) => {
const protocol = nanoid.nanoid(16);
_tokens.set(protocol, workspace);
response.end(JSON.stringify({ protocol }));
return true;
})
.then(responded => {
if (!responded) {
response.writeHead(401).end();
}
})
.catch(error => {
console.log(error);
response.writeHead(401).end();
});
return;
}
response.writeHead(200, { 'Content-Type': 'text/plain' });
response.end('okay');
});
const _websocketServer = new WebSocket.Server({ noServer: true });
_websocketServer.on('connection', handleConnection);
_server.on('upgrade', (request, socket, head) => {
// You may check auth of request here..
// See https://github.com/websockets/ws#client-authentication
const protocol = request.headers['sec-websocket-protocol'];
if (typeof protocol === 'string' && _tokens.get(protocol)) {
_websocketServer.handleUpgrade(request, socket, head, ws => {
_websocketServer.emit(
'connection',
ws,
request,
_tokens.get(protocol)
);
});
} else {
socket.write('HTTP/1.1 401 Unauthorized');
socket.destroy();
}
});
_server.listen(PORT, () => {
console.log(`running at '${HOST}' on port ${PORT}`);
});

239
apps/keck/src/utils.ts Normal file
View File

@@ -0,0 +1,239 @@
import WebSocket = require('ws');
import http = require('http');
import Y = require('yjs');
import lib0 = require('lib0');
import awarenessProtocol = require('y-protocols/awareness');
import syncProtocol = require('y-protocols/sync');
// import { getPersistenceStorage } from './persistence';
const { encoding, decoding, mutex, map } = lib0;
const wsReadyStateConnecting = 0;
const wsReadyStateOpen = 1;
// disable gc when using snapshots!
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0';
type Persistence =
| ((arg0: string, arg1: WSSharedDoc) => Promise<any>)
| undefined;
const persistence: Persistence = null; // getPersistenceStorage('./affine');
const docs: Map<string, WSSharedDoc> = new Map();
const messageSync = 0;
const messageAwareness = 1;
// const messageAuth = 2
const updateHandler = (update: Uint8Array, origin: any, doc: WSSharedDoc) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeUpdate(encoder, update);
const message = encoding.toUint8Array(encoder);
doc.conns.forEach((_, conn) => send(doc, conn, message));
};
type AwarenessEvent = {
added: Array<number>;
updated: Array<number>;
removed: Array<number>;
};
export class WSSharedDoc extends Y.Doc {
name: string;
mux: lib0.mutex.mutex;
conns: Map<any, any>;
awareness: awarenessProtocol.Awareness;
/**
* @param {string} name
*/
constructor(name: string) {
super({ gc: gcEnabled });
this.name = name;
this.mux = mutex.createMutex();
/**
* Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed
* @type {Map<Object, Set<number>>}
*/
this.conns = new Map();
/**
* @type {awarenessProtocol.Awareness}
*/
this.awareness = new awarenessProtocol.Awareness(this);
this.awareness.setLocalState(null);
const awarenessChangeHandler = (
{ added, updated, removed }: AwarenessEvent,
conn: object | null
) => {
const changedClients = added.concat(updated, removed);
if (conn !== null) {
const connControlledIds: Set<number> = this.conns.get(conn);
if (connControlledIds !== undefined) {
added.forEach(clientId => {
connControlledIds.add(clientId);
});
removed.forEach(clientId => {
connControlledIds.delete(clientId);
});
}
}
// broadcast awareness update
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
this.awareness,
changedClients
)
);
const buff = encoding.toUint8Array(encoder);
this.conns.forEach((_, c) => {
send(this, c, buff);
});
};
this.awareness.on('update', awarenessChangeHandler);
this.on('update', updateHandler);
}
}
// Gets a Y.Doc by name, whether in memory or on disk
const getYDoc = (docname: string, gc = true): WSSharedDoc =>
map.setIfUndefined(docs, docname, () => {
const doc = new WSSharedDoc(docname);
doc.gc = gc;
if (persistence !== null) {
persistence(docname, doc);
}
docs.set(docname, doc);
return doc;
});
const messageListener = (conn: any, doc: WSSharedDoc, message: Uint8Array) => {
try {
const encoder = encoding.createEncoder();
const decoder = decoding.createDecoder(message);
const messageType = decoding.readVarUint(decoder);
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync);
syncProtocol.readSyncMessage(decoder, encoder, doc, null);
if (encoding.length(encoder) > 1) {
send(doc, conn, encoding.toUint8Array(encoder));
}
break;
case messageAwareness: {
awarenessProtocol.applyAwarenessUpdate(
doc.awareness,
decoding.readVarUint8Array(decoder),
conn
);
break;
}
}
} catch (err) {
console.error(err);
doc.emit('error', [err]);
}
};
const closeConn = (doc: WSSharedDoc, conn: any) => {
if (doc.conns.has(conn)) {
const controlledIds: Set<number> = doc.conns.get(conn);
doc.conns.delete(conn);
awarenessProtocol.removeAwarenessStates(
doc.awareness,
Array.from(controlledIds),
null
);
if (doc.conns.size === 0 && persistence !== null) {
// if persisted, we store state and destroy ydocument
persistence(doc.name, doc).then(() => {
doc.destroy();
});
docs.delete(doc.name);
}
}
conn.close();
};
const send = (doc: WSSharedDoc, conn: any, m: Uint8Array) => {
if (
conn.readyState !== wsReadyStateConnecting &&
conn.readyState !== wsReadyStateOpen
) {
closeConn(doc, conn);
}
try {
conn.send(m, (/** @param {any} err */ err: any) => {
err != null && closeConn(doc, conn);
});
} catch (e) {
closeConn(doc, conn);
}
};
export const handleConnection = (
socket: WebSocket.WebSocket,
request: http.IncomingMessage,
docName: string
) => {
const gc = true;
socket.binaryType = 'arraybuffer';
// get doc, initialize if it does not exist yet
const doc = getYDoc(docName, gc);
doc.conns.set(socket, new Set());
// listen and reply to events
socket.on('message', (message: ArrayBuffer) =>
messageListener(socket, doc, new Uint8Array(message))
);
// Check if connection is still alive
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
if (doc.conns.has(socket)) {
closeConn(doc, socket);
}
clearInterval(pingInterval);
} else if (doc.conns.has(socket)) {
pongReceived = false;
try {
socket.ping();
} catch (e) {
closeConn(doc, socket);
clearInterval(pingInterval);
}
}
}, 30 * 1000);
socket.on('close', () => {
closeConn(doc, socket);
clearInterval(pingInterval);
});
socket.on('pong', () => {
pongReceived = true;
});
// put the following in a variables in a block so the interval handlers don't keep in in
// scope
{
// send sync step 1
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeSyncStep1(encoder, doc);
send(doc, socket, encoding.toUint8Array(encoder));
const awarenessStates = doc.awareness.getStates();
if (awarenessStates.size > 0) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
doc.awareness,
Array.from(awarenessStates.keys())
)
);
send(doc, socket, encoding.toUint8Array(encoder));
}
}
};

View File

@@ -0,0 +1,10 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "../../dist/out-tsc",
"module": "commonjs",
"types": ["node"]
},
"exclude": ["jest.config.ts", "**/*.spec.ts", "**/*.test.ts"],
"include": ["**/*.ts"]
}

13
apps/keck/tsconfig.json Normal file
View File

@@ -0,0 +1,13 @@
{
"extends": "../../tsconfig.base.json",
"files": [],
"include": [],
"references": [
{
"path": "./tsconfig.app.json"
},
{
"path": "./tsconfig.spec.json"
}
]
}

View File

@@ -0,0 +1,9 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"outDir": "../../dist/out-tsc",
"module": "commonjs",
"types": ["jest", "node"]
},
"include": ["jest.config.ts", "**/*.test.ts", "**/*.spec.ts", "**/*.d.ts"]
}