feat(nbstore): add cloud implementation (#8810)

This commit is contained in:
forehalo
2024-12-10 10:48:27 +00:00
parent 1721875ab6
commit 2f80b4f822
32 changed files with 1030 additions and 315 deletions

View File

@@ -8,7 +8,8 @@
".": "./src/index.ts",
"./op": "./src/op/index.ts",
"./idb": "./src/impls/idb/index.ts",
"./idb/v1": "./src/impls/idb/v1/index.ts"
"./idb/v1": "./src/impls/idb/v1/index.ts",
"./cloud": "./src/impls/cloud/index.ts"
},
"dependencies": {
"@datastructures-js/binary-search-tree": "^5.3.2",
@@ -20,11 +21,15 @@
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"@affine/graphql": "workspace:*",
"fake-indexeddb": "^6.0.0",
"idb": "^8.0.0",
"socket.io-client": "^4.7.5",
"vitest": "2.1.4"
},
"peerDependencies": {
"idb": "^8.0.0"
"@affine/graphql": "workspace:*",
"idb": "^8.0.0",
"socket.io-client": "^4.7.5"
}
}

View File

@@ -0,0 +1,72 @@
import {
deleteBlobMutation,
gqlFetcherFactory,
listBlobsQuery,
releaseDeletedBlobsMutation,
setBlobMutation,
} from '@affine/graphql';
import { DummyConnection } from '../../connection';
import { type BlobRecord, BlobStorage } from '../../storage';
export class CloudBlobStorage extends BlobStorage {
private readonly gql = gqlFetcherFactory(this.options.peer + '/graphql');
override connection = new DummyConnection();
override async get(key: string) {
const res = await fetch(
this.options.peer + '/api/workspaces/' + this.spaceId + '/blobs/' + key,
{ cache: 'default' }
);
if (!res.ok) {
return null;
}
const data = await res.arrayBuffer();
return {
key,
data: new Uint8Array(data),
mime: res.headers.get('content-type') || '',
size: data.byteLength,
createdAt: new Date(res.headers.get('last-modified') || Date.now()),
};
}
override async set(blob: BlobRecord) {
await this.gql({
query: setBlobMutation,
variables: {
workspaceId: this.spaceId,
blob: new File([blob.data], blob.key, { type: blob.mime }),
},
});
}
override async delete(key: string, permanently: boolean) {
await this.gql({
query: deleteBlobMutation,
variables: { workspaceId: this.spaceId, key, permanently },
});
}
override async release() {
await this.gql({
query: releaseDeletedBlobsMutation,
variables: { workspaceId: this.spaceId },
});
}
override async list() {
const res = await this.gql({
query: listBlobsQuery,
variables: { workspaceId: this.spaceId },
});
return res.workspace.blobs.map(blob => ({
...blob,
createdAt: new Date(blob.createdAt),
}));
}
}

View File

@@ -0,0 +1,199 @@
import { noop } from 'lodash-es';
import type { SocketOptions } from 'socket.io-client';
import { share } from '../../connection';
import {
type DocClock,
type DocClocks,
DocStorage,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
import {
base64ToUint8Array,
type ServerEventsMap,
SocketConnection,
uint8ArrayToBase64,
} from './socket';
interface CloudDocStorageOptions extends DocStorageOptions {
socketOptions: SocketOptions;
}
export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
connection = share(
new SocketConnection(this.peer, this.options.socketOptions)
);
private get socket() {
return this.connection.inner;
}
override async connect(): Promise<void> {
await super.connect();
this.connection.onStatusChanged(status => {
if (status === 'connected') {
this.join().catch(noop);
this.socket.on('space:broadcast-doc-update', this.onServerUpdate);
}
});
}
override async disconnect(): Promise<void> {
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
this.socket.off('space:broadcast-doc-update', this.onServerUpdate);
await super.connect();
}
async join() {
try {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});
if ('error' in res) {
this.connection.setStatus('closed', new Error(res.error.message));
}
} catch (e) {
this.connection.setStatus('error', e as Error);
}
}
onServerUpdate: ServerEventsMap['space:broadcast-doc-update'] = message => {
if (
this.spaceType === message.spaceType &&
this.spaceId === message.spaceId
) {
this.emit('update', {
docId: message.docId,
bin: base64ToUint8Array(message.update),
timestamp: new Date(message.timestamp),
editor: message.editor,
});
}
};
override async getDocSnapshot(docId: string) {
const response = await this.socket.emitWithAck('space:load-doc', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId,
});
if ('error' in response) {
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
}
return {
docId,
bin: base64ToUint8Array(response.data.missing),
timestamp: new Date(response.data.timestamp),
};
}
override async getDocDiff(docId: string, state?: Uint8Array) {
const response = await this.socket.emitWithAck('space:load-doc', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId,
stateVector: state ? await uint8ArrayToBase64(state) : void 0,
});
if ('error' in response) {
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
}
return {
docId,
missing: base64ToUint8Array(response.data.missing),
state: base64ToUint8Array(response.data.state),
timestamp: new Date(response.data.timestamp),
};
}
override async pushDocUpdate(update: DocUpdate) {
const response = await this.socket.emitWithAck('space:push-doc-update', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: update.docId,
updates: await uint8ArrayToBase64(update.bin),
});
if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(response.error.message);
}
return {
docId: update.docId,
timestamp: new Date(response.data.timestamp),
};
}
/**
* Just a rough implementation, cloud doc storage should not need this method.
*/
override async getDocTimestamp(docId: string): Promise<DocClock | null> {
const response = await this.socket.emitWithAck('space:load-doc', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId,
});
if ('error' in response) {
// TODO: use [UserFriendlyError]
throw new Error(response.error.message);
}
return {
docId,
timestamp: new Date(response.data.timestamp),
};
}
override async getDocTimestamps(after?: Date) {
const response = await this.socket.emitWithAck(
'space:load-doc-timestamps',
{
spaceType: this.spaceType,
spaceId: this.spaceId,
timestamp: after ? after.getTime() : undefined,
}
);
if ('error' in response) {
// TODO(@forehalo): use [UserFriendlyError]
throw new Error(response.error.message);
}
return Object.entries(response.data).reduce((ret, [docId, timestamp]) => {
ret[docId] = new Date(timestamp);
return ret;
}, {} as DocClocks);
}
override async deleteDoc(docId: string) {
this.socket.emit('space:delete-doc', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId,
});
}
protected async setDocSnapshot() {
return false;
}
protected async getDocUpdates() {
return [];
}
protected async markUpdatesMerged() {
return 0;
}
}

View File

@@ -0,0 +1,2 @@
export * from './blob';
export * from './doc';

View File

@@ -0,0 +1,173 @@
import {
Manager as SocketIOManager,
type Socket as SocketIO,
type SocketOptions,
} from 'socket.io-client';
import { Connection, type ConnectionStatus } from '../../connection';
// TODO(@forehalo): use [UserFriendlyError]
interface EventError {
name: string;
message: string;
}
type WebsocketResponse<T> =
| {
error: EventError;
}
| {
data: T;
};
interface ServerEvents {
'space:broadcast-doc-update': {
spaceType: string;
spaceId: string;
docId: string;
update: string;
timestamp: number;
editor: string;
};
}
interface ClientEvents {
'space:join': [
{ spaceType: string; spaceId: string; clientVersion: string },
{ clientId: string },
];
'space:leave': { spaceType: string; spaceId: string };
'space:join-awareness': [
{
spaceType: string;
spaceId: string;
docId: string;
clientVersion: string;
},
{ clientId: string },
];
'space:leave-awareness': {
spaceType: string;
spaceId: string;
docId: string;
};
'space:push-doc-update': [
{ spaceType: string; spaceId: string; docId: string; updates: string },
{ timestamp: number },
];
'space:load-doc-timestamps': [
{
spaceType: string;
spaceId: string;
timestamp?: number;
},
Record<string, number>,
];
'space:load-doc': [
{
spaceType: string;
spaceId: string;
docId: string;
stateVector?: string;
},
{
missing: string;
state: string;
timestamp: number;
},
];
'space:delete-doc': { spaceType: string; spaceId: string; docId: string };
}
export type ServerEventsMap = {
[Key in keyof ServerEvents]: (data: ServerEvents[Key]) => void;
};
export type ClientEventsMap = {
[Key in keyof ClientEvents]: ClientEvents[Key] extends Array<any>
? (
data: ClientEvents[Key][0],
ack: (res: WebsocketResponse<ClientEvents[Key][1]>) => void
) => void
: (data: ClientEvents[Key]) => void;
};
export type Socket = SocketIO<ServerEventsMap, ClientEventsMap>;
export function uint8ArrayToBase64(array: Uint8Array): Promise<string> {
return new Promise<string>(resolve => {
// Create a blob from the Uint8Array
const blob = new Blob([array]);
const reader = new FileReader();
reader.onload = function () {
const dataUrl = reader.result as string | null;
if (!dataUrl) {
resolve('');
return;
}
// The result includes the `data:` URL prefix and the MIME type. We only want the Base64 data
const base64 = dataUrl.split(',')[1];
resolve(base64);
};
reader.readAsDataURL(blob);
});
}
export function base64ToUint8Array(base64: string) {
const binaryString = atob(base64);
const binaryArray = binaryString.split('').map(function (char) {
return char.charCodeAt(0);
});
return new Uint8Array(binaryArray);
}
export class SocketConnection extends Connection<Socket> {
manager = new SocketIOManager(this.endpoint, {
autoConnect: false,
transports: ['websocket'],
secure: new URL(this.endpoint).protocol === 'https:',
});
constructor(
private readonly endpoint: string,
private readonly socketOptions: SocketOptions
) {
super();
}
override get shareId() {
return `socket:${this.endpoint}`;
}
override async doConnect() {
const conn = this.manager.socket('/', this.socketOptions);
await new Promise<void>((resolve, reject) => {
conn.once('connect', () => {
resolve();
});
conn.once('connect_error', err => {
reject(err);
});
conn.open();
});
return conn;
}
override async doDisconnect(conn: Socket) {
conn.close();
}
/**
* Socket connection allow explicitly set status by user
*
* used when join space failed
*/
override setStatus(status: ConnectionStatus, error?: Error) {
super.setStatus(status, error);
}
}

View File

@@ -1,4 +1,5 @@
import type { Storage } from '../storage';
import { CloudBlobStorage, CloudDocStorage } from './cloud';
import {
IndexedDBBlobStorage,
IndexedDBDocStorage,
@@ -19,7 +20,9 @@ const idbv1: StorageConstructor[] = [
IndexedDBV1BlobStorage,
];
export const storages: StorageConstructor[] = [...idbv1, ...idb];
const cloud: StorageConstructor[] = [CloudDocStorage, CloudBlobStorage];
export const storages: StorageConstructor[] = cloud.concat(idbv1, idb);
const AvailableStorageImplementations = storages.reduce(
(acc, curr) => {