feat(nbstore): add nbstore worker (#9185)

This commit is contained in:
EYHN
2024-12-20 08:01:23 +00:00
parent 30200ff86d
commit cbaf35df0b
51 changed files with 1144 additions and 501 deletions

View File

@@ -1,9 +1,6 @@
import { nanoid } from 'nanoid';
import {
type AwarenessRecord,
AwarenessStorage,
} from '../../storage/awareness';
import { type AwarenessRecord, AwarenessStorageBase } from '../../storage';
import { BroadcastChannelConnection } from './channel';
type ChannelMessage =
@@ -19,13 +16,13 @@ type ChannelMessage =
collectId: string;
}
| {
type: 'awareness-collect-fallback';
type: 'awareness-collect-feedback';
docId: string;
bin: Uint8Array;
collectId: string;
};
export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
export class BroadcastChannelAwarenessStorage extends AwarenessStorageBase {
override readonly storageType = 'awareness';
override readonly connection = new BroadcastChannelConnection(this.options);
get channel() {
@@ -36,7 +33,7 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
string,
Set<{
onUpdate: (update: AwarenessRecord, origin?: string) => void;
onCollect: () => AwarenessRecord;
onCollect: () => Promise<AwarenessRecord | null>;
}>
>();
@@ -57,12 +54,20 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
onCollect: () => Promise<AwarenessRecord | null>
): () => void {
const subscribers = this.subscriptions.get(id) ?? new Set();
subscribers.forEach(subscriber => {
const fallback = subscriber.onCollect();
onUpdate(fallback);
subscriber
.onCollect()
.then(awareness => {
if (awareness) {
onUpdate(awareness);
}
})
.catch(error => {
console.error('error in on collect awareness', error);
});
});
const collectUniqueId = nanoid();
@@ -84,18 +89,23 @@ export class BroadcastChannelAwarenessStorage extends AwarenessStorage {
message.data.type === 'awareness-collect' &&
message.data.docId === id
) {
const fallback = onCollect();
if (fallback) {
this.channel.postMessage({
type: 'awareness-collect-fallback',
docId: message.data.docId,
bin: fallback.bin,
collectId: collectUniqueId,
} satisfies ChannelMessage);
}
onCollect()
.then(awareness => {
if (awareness) {
this.channel.postMessage({
type: 'awareness-collect-feedback',
docId: message.data.docId,
bin: awareness.bin,
collectId: collectUniqueId,
} satisfies ChannelMessage);
}
})
.catch(error => {
console.error('error in on collect awareness', error);
});
}
if (
message.data.type === 'awareness-collect-fallback' &&
message.data.type === 'awareness-collect-feedback' &&
message.data.docId === id &&
message.data.collectId === collectUniqueId
) {

View File

@@ -1,7 +1,7 @@
import { Connection } from '../../connection';
import { AutoReconnectConnection } from '../../connection';
import type { StorageOptions } from '../../storage';
export class BroadcastChannelConnection extends Connection<BroadcastChannel> {
export class BroadcastChannelConnection extends AutoReconnectConnection<BroadcastChannel> {
readonly channelName = `channel:${this.opts.peer}:${this.opts.type}:${this.opts.id}`;
constructor(private readonly opts: StorageOptions) {

View File

@@ -3,7 +3,7 @@ import type { SocketOptions } from 'socket.io-client';
import { share } from '../../connection';
import {
type AwarenessRecord,
AwarenessStorage,
AwarenessStorageBase,
type AwarenessStorageOptions,
} from '../../storage/awareness';
import {
@@ -16,7 +16,7 @@ interface CloudAwarenessStorageOptions extends AwarenessStorageOptions {
socketOptions: SocketOptions;
}
export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorageOptions> {
export class CloudAwarenessStorage extends AwarenessStorageBase<CloudAwarenessStorageOptions> {
connection = share(
new SocketConnection(this.peer, this.options.socketOptions)
);
@@ -38,7 +38,7 @@ export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorag
override subscribeUpdate(
id: string,
onUpdate: (update: AwarenessRecord, origin?: string) => void,
onCollect: () => AwarenessRecord
onCollect: () => Promise<AwarenessRecord | null>
): () => void {
// TODO: handle disconnect
// leave awareness
@@ -92,14 +92,16 @@ export class CloudAwarenessStorage extends AwarenessStorage<CloudAwarenessStorag
docId === id
) {
(async () => {
const record = onCollect();
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: record.docId,
awarenessUpdate: encodedUpdate,
});
const record = await onCollect();
if (record) {
const encodedUpdate = await uint8ArrayToBase64(record.bin);
this.socket.emit('space:update-awareness', {
spaceType: this.spaceType,
spaceId: this.spaceId,
docId: record.docId,
awarenessUpdate: encodedUpdate,
});
}
})().catch(err => console.error('awareness upload failed', err));
}
};

View File

@@ -7,16 +7,31 @@ import {
} from '@affine/graphql';
import { DummyConnection } from '../../connection';
import { type BlobRecord, BlobStorage } from '../../storage';
import {
type BlobRecord,
BlobStorageBase,
type BlobStorageOptions,
} from '../../storage';
export class CloudBlobStorage extends BlobStorage {
private readonly gql = gqlFetcherFactory(this.options.peer + '/graphql');
interface CloudBlobStorageOptions extends BlobStorageOptions {
apiBaseUrl: string;
}
export class CloudBlobStorage extends BlobStorageBase<CloudBlobStorageOptions> {
private readonly gql = gqlFetcherFactory(
this.options.apiBaseUrl + '/graphql'
);
override connection = new DummyConnection();
override async get(key: string) {
const res = await fetch(
this.options.peer + '/api/workspaces/' + this.spaceId + '/blobs/' + key,
this.options.apiBaseUrl +
'/api/workspaces/' +
this.spaceId +
'/blobs/' +
key,
{
cache: 'default',
headers: {
'x-affine-version': BUILD_CONFIG.appVersion,
},

View File

@@ -1,10 +1,14 @@
import type { SocketOptions } from 'socket.io-client';
import type { Socket, SocketOptions } from 'socket.io-client';
import { share } from '../../connection';
import {
type Connection,
type ConnectionStatus,
share,
} from '../../connection';
import {
type DocClock,
type DocClocks,
DocStorage,
DocStorageBase,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
@@ -17,63 +21,14 @@ import {
interface CloudDocStorageOptions extends DocStorageOptions {
socketOptions: SocketOptions;
serverBaseUrl: string;
}
export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
connection = share(
new SocketConnection(this.peer, this.options.socketOptions)
);
private disposeConnectionStatusListener?: () => void;
private get socket() {
export class CloudDocStorage extends DocStorageBase<CloudDocStorageOptions> {
get socket() {
return this.connection.inner;
}
override connect() {
if (!this.disposeConnectionStatusListener) {
this.disposeConnectionStatusListener = this.connection.onStatusChanged(
status => {
if (status === 'connected') {
this.join().catch(err => {
console.error('doc storage join failed', err);
});
this.socket.on('space:broadcast-doc-update', this.onServerUpdate);
}
}
);
}
super.connect();
}
override disconnect() {
if (this.disposeConnectionStatusListener) {
this.disposeConnectionStatusListener();
}
this.socket.emit('space:leave', {
spaceType: this.spaceType,
spaceId: this.spaceId,
});
this.socket.off('space:broadcast-doc-update', this.onServerUpdate);
super.disconnect();
}
async join() {
try {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.spaceType,
spaceId: this.spaceId,
clientVersion: BUILD_CONFIG.appVersion,
});
if ('error' in res) {
this.connection.setStatus('closed', new Error(res.error.message));
}
} catch (e) {
this.connection.setStatus('error', e as Error);
}
}
onServerUpdate: ServerEventsMap['space:broadcast-doc-update'] = message => {
if (
this.spaceType === message.spaceType &&
@@ -88,6 +43,11 @@ export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
}
};
readonly connection = new CloudDocStorageConnection(
this.options,
this.onServerUpdate
);
override async getDocSnapshot(docId: string) {
const response = await this.socket.emitWithAck('space:load-doc', {
spaceType: this.spaceType,
@@ -207,3 +167,84 @@ export class CloudDocStorage extends DocStorage<CloudDocStorageOptions> {
return 0;
}
}
class CloudDocStorageConnection implements Connection<Socket> {
connection = share(
new SocketConnection(
`${this.options.serverBaseUrl}/`,
this.options.socketOptions
)
);
private disposeConnectionStatusListener?: () => void;
private get socket() {
return this.connection.inner;
}
constructor(
private readonly options: CloudDocStorageOptions,
private readonly onServerUpdate: ServerEventsMap['space:broadcast-doc-update']
) {}
get status() {
return this.connection.status;
}
get inner() {
return this.connection.inner;
}
connect(): void {
if (!this.disposeConnectionStatusListener) {
this.disposeConnectionStatusListener = this.connection.onStatusChanged(
status => {
if (status === 'connected') {
this.join().catch(err => {
console.error('doc storage join failed', err);
});
this.socket.on('space:broadcast-doc-update', this.onServerUpdate);
}
}
);
}
return this.connection.connect();
}
async join() {
try {
const res = await this.socket.emitWithAck('space:join', {
spaceType: this.options.type,
spaceId: this.options.id,
clientVersion: BUILD_CONFIG.appVersion,
});
if ('error' in res) {
this.connection.setStatus('closed', new Error(res.error.message));
}
} catch (e) {
this.connection.setStatus('error', e as Error);
}
}
disconnect() {
if (this.disposeConnectionStatusListener) {
this.disposeConnectionStatusListener();
}
this.socket.emit('space:leave', {
spaceType: this.options.type,
spaceId: this.options.id,
});
this.socket.off('space:broadcast-doc-update', this.onServerUpdate);
this.connection.disconnect();
}
waitForConnected(signal?: AbortSignal): Promise<void> {
return this.connection.waitForConnected(signal);
}
onStatusChanged(
cb: (status: ConnectionStatus, error?: Error) => void
): () => void {
return this.connection.onStatusChanged(cb);
}
}

View File

@@ -4,7 +4,10 @@ import {
type SocketOptions,
} from 'socket.io-client';
import { Connection, type ConnectionStatus } from '../../connection';
import {
AutoReconnectConnection,
type ConnectionStatus,
} from '../../connection';
// TODO(@forehalo): use [UserFriendlyError]
interface EventError {
@@ -150,7 +153,7 @@ export function base64ToUint8Array(base64: string) {
return new Uint8Array(binaryArray);
}
export class SocketConnection extends Connection<Socket> {
export class SocketConnection extends AutoReconnectConnection<Socket> {
manager = new SocketIOManager(this.endpoint, {
autoConnect: false,
transports: ['websocket'],

View File

@@ -1,12 +1,12 @@
import { share } from '../../connection';
import {
type BlobRecord,
BlobStorage,
BlobStorageBase,
type ListedBlobRecord,
} from '../../storage';
import { IDBConnection } from './db';
export class IndexedDBBlobStorage extends BlobStorage {
export class IndexedDBBlobStorage extends BlobStorageBase {
readonly connection = share(new IDBConnection(this.options));
get db() {

View File

@@ -1,10 +1,10 @@
import { type IDBPDatabase, openDB } from 'idb';
import { Connection } from '../../connection';
import { AutoReconnectConnection } from '../../connection';
import type { StorageOptions } from '../../storage';
import { type DocStorageSchema, migrator } from './schema';
export class IDBConnection extends Connection<{
export class IDBConnection extends AutoReconnectConnection<{
db: IDBPDatabase<DocStorageSchema>;
channel: BroadcastChannel;
}> {

View File

@@ -2,7 +2,7 @@ import {
type DocClock,
type DocClocks,
type DocRecord,
DocStorage,
DocStorageBase,
type DocStorageOptions,
type DocUpdate,
} from '../../storage';
@@ -15,7 +15,7 @@ interface ChannelMessage {
origin?: string;
}
export class IndexedDBDocStorage extends DocStorage {
export class IndexedDBDocStorage extends DocStorageBase {
readonly connection = new IDBConnection(this.options);
get db() {

View File

@@ -1,7 +1,7 @@
import { share } from '../../connection';
import { type DocClock, type DocClocks, SyncStorage } from '../../storage';
import { BasicSyncStorage, type DocClock, type DocClocks } from '../../storage';
import { IDBConnection } from './db';
export class IndexedDBSyncStorage extends SyncStorage {
export class IndexedDBSyncStorage extends BasicSyncStorage {
readonly connection = share(new IDBConnection(this.options));
get db() {

View File

@@ -1,11 +1,11 @@
import { share } from '../../../connection';
import { BlobStorage, type ListedBlobRecord } from '../../../storage';
import { BlobStorageBase, type ListedBlobRecord } from '../../../storage';
import { BlobIDBConnection } from './db';
/**
* @deprecated readonly
*/
export class IndexedDBV1BlobStorage extends BlobStorage {
export class IndexedDBV1BlobStorage extends BlobStorageBase {
readonly connection = share(new BlobIDBConnection(this.spaceId));
get db() {

View File

@@ -1,6 +1,6 @@
import { type DBSchema, type IDBPDatabase, openDB } from 'idb';
import { Connection } from '../../../connection';
import { AutoReconnectConnection } from '../../../connection';
export interface DocDBSchema extends DBSchema {
workspace: {
@@ -15,7 +15,9 @@ export interface DocDBSchema extends DBSchema {
};
}
export class DocIDBConnection extends Connection<IDBPDatabase<DocDBSchema>> {
export class DocIDBConnection extends AutoReconnectConnection<
IDBPDatabase<DocDBSchema>
> {
override get shareId() {
return 'idb(old):affine-local';
}
@@ -40,7 +42,9 @@ export interface BlobDBSchema extends DBSchema {
};
}
export class BlobIDBConnection extends Connection<IDBPDatabase<BlobDBSchema>> {
export class BlobIDBConnection extends AutoReconnectConnection<
IDBPDatabase<BlobDBSchema>
> {
constructor(private readonly workspaceId: string) {
super();
}

View File

@@ -1,11 +1,15 @@
import { share } from '../../../connection';
import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage';
import {
type DocRecord,
DocStorageBase,
type DocUpdate,
} from '../../../storage';
import { DocIDBConnection } from './db';
/**
* @deprecated readonly
*/
export class IndexedDBV1DocStorage extends DocStorage {
export class IndexedDBV1DocStorage extends DocStorageBase {
readonly connection = share(new DocIDBConnection());
get db() {

View File

@@ -1,5 +1,10 @@
import type { Storage } from '../storage';
import { CloudBlobStorage, CloudDocStorage } from './cloud';
import { BroadcastChannelAwarenessStorage } from './broadcast-channel/awareness';
import {
CloudAwarenessStorage,
CloudBlobStorage,
CloudDocStorage,
} from './cloud';
import {
IndexedDBBlobStorage,
IndexedDBDocStorage,
@@ -13,6 +18,7 @@ const idb: StorageConstructor[] = [
IndexedDBDocStorage,
IndexedDBBlobStorage,
IndexedDBSyncStorage,
BroadcastChannelAwarenessStorage,
];
const idbv1: StorageConstructor[] = [
@@ -20,7 +26,11 @@ const idbv1: StorageConstructor[] = [
IndexedDBV1BlobStorage,
];
const cloud: StorageConstructor[] = [CloudDocStorage, CloudBlobStorage];
const cloud: StorageConstructor[] = [
CloudDocStorage,
CloudBlobStorage,
CloudAwarenessStorage,
];
export const storages: StorageConstructor[] = cloud.concat(idbv1, idb);

View File

@@ -1,8 +1,8 @@
import { share } from '../../connection';
import { type BlobRecord, BlobStorage } from '../../storage';
import { type BlobRecord, BlobStorageBase } from '../../storage';
import { NativeDBConnection } from './db';
export class SqliteBlobStorage extends BlobStorage {
export class SqliteBlobStorage extends BlobStorageBase {
override connection = share(
new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
);

View File

@@ -1,6 +1,6 @@
import { apis } from '@affine/electron-api';
import { Connection } from '../../connection';
import { AutoReconnectConnection } from '../../connection';
import { type SpaceType, universalId } from '../../storage';
type NativeDBApis = NonNullable<typeof apis>['nbstore'] extends infer APIs
@@ -13,7 +13,7 @@ type NativeDBApis = NonNullable<typeof apis>['nbstore'] extends infer APIs
}
: never;
export class NativeDBConnection extends Connection<void> {
export class NativeDBConnection extends AutoReconnectConnection<void> {
readonly apis: NativeDBApis;
constructor(

View File

@@ -1,8 +1,8 @@
import { share } from '../../connection';
import { type DocClock, DocStorage, type DocUpdate } from '../../storage';
import { type DocClock, DocStorageBase, type DocUpdate } from '../../storage';
import { NativeDBConnection } from './db';
export class SqliteDocStorage extends DocStorage {
export class SqliteDocStorage extends DocStorageBase {
override connection = share(
new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
);

View File

@@ -1,8 +1,8 @@
import { share } from '../../connection';
import { type DocClock, SyncStorage } from '../../storage';
import { BasicSyncStorage, type DocClock } from '../../storage';
import { NativeDBConnection } from './db';
export class SqliteSyncStorage extends SyncStorage {
export class SqliteSyncStorage extends BasicSyncStorage {
override connection = share(
new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
);

View File

@@ -1,13 +1,13 @@
import { apis } from '@affine/electron-api';
import { DummyConnection, share } from '../../../connection';
import { BlobStorage } from '../../../storage';
import { DummyConnection } from '../../../connection';
import { BlobStorageBase } from '../../../storage';
/**
* @deprecated readonly
*/
export class SqliteV1BlobStorage extends BlobStorage {
override connection = share(new DummyConnection());
export class SqliteV1BlobStorage extends BlobStorageBase {
override connection = new DummyConnection();
get db() {
if (!apis) {

View File

@@ -1,13 +1,17 @@
import { apis } from '@affine/electron-api';
import { DummyConnection, share } from '../../../connection';
import { type DocRecord, DocStorage, type DocUpdate } from '../../../storage';
import { DummyConnection } from '../../../connection';
import {
type DocRecord,
DocStorageBase,
type DocUpdate,
} from '../../../storage';
/**
* @deprecated readonly
*/
export class SqliteV1DocStorage extends DocStorage {
override connection = share(new DummyConnection());
export class SqliteV1DocStorage extends DocStorageBase {
override connection = new DummyConnection();
get db() {
if (!apis) {