feat(nbstore): add sqlite implementation (#8811)

This commit is contained in:
forehalo
2024-12-13 06:13:05 +00:00
parent 932e1da7f3
commit 8c24f2b906
66 changed files with 2932 additions and 397 deletions

View File

@@ -1 +0,0 @@
export type SpaceType = 'userspace' | 'workspace';

View File

@@ -2,9 +2,9 @@ import { ValidationResult } from '@affine/native';
import fs from 'fs-extra';
import { nanoid } from 'nanoid';
import { ensureSQLiteDB } from '../db/ensure-db';
import { logger } from '../logger';
import { mainRPC } from '../main-rpc';
import { ensureSQLiteDB } from '../nbstore/v1';
import { storeWorkspaceMeta } from '../workspace';
import { getWorkspaceDBPath, getWorkspacesBasePath } from '../workspace/meta';
@@ -69,14 +69,20 @@ function getDefaultDBFileName(name: string, id: string) {
*
* It will just copy the file to the given path
*/
export async function saveDBFileAs(
workspaceId: string
): Promise<SaveDBFileResult> {
export async function saveDBFileAs(id: string): Promise<SaveDBFileResult> {
try {
const db = await ensureSQLiteDB('workspace', workspaceId);
await db.checkpoint(); // make sure all changes (WAL) are written to db
const fakedResult = getFakedResult();
// TODO(@forehalo): use `nbstore` when it is ready
// const storage = await ensureStorage(id);
const storage = await ensureSQLiteDB('workspace', id);
await storage.checkpoint(); // make sure all changes (WAL) are written to db
const fakedResult = getFakedResult();
const dbPath = storage.path;
if (!dbPath) {
return {
error: 'DB_FILE_PATH_INVALID',
};
}
const ret =
fakedResult ??
(await mainRPC.showSaveDialog({
@@ -91,8 +97,8 @@ export async function saveDBFileAs(
},
],
defaultPath: getDefaultDBFileName(
await db.getWorkspaceName(),
workspaceId
(await storage.getWorkspaceName()) ?? 'db',
id
),
message: 'Save Workspace as a SQLite Database file',
}));
@@ -103,7 +109,7 @@ export async function saveDBFileAs(
};
}
await fs.copyFile(db.path, filePath);
await fs.copyFile(dbPath, filePath);
logger.log('saved', filePath);
if (!fakedResult) {
mainRPC.showItemInFolder(filePath).catch(err => {
@@ -188,28 +194,35 @@ export async function loadDBFile(): Promise<LoadDBFileResult> {
return { error: 'DB_FILE_PATH_INVALID' };
}
const { SqliteConnection } = await import('@affine/native');
const validationResult = await SqliteConnection.validate(originalPath);
if (validationResult !== ValidationResult.Valid) {
return { error: 'DB_FILE_INVALID' }; // invalid db file
}
// copy the db file to a new workspace id
const workspaceId = nanoid(10);
const internalFilePath = await getWorkspaceDBPath('workspace', workspaceId);
return loadV1DBFile(originalPath, workspaceId);
await fs.ensureDir(await getWorkspacesBasePath());
await fs.copy(originalPath, internalFilePath);
logger.info(`loadDBFile, copy: ${originalPath} -> ${internalFilePath}`);
// TODO(forehalo): use `nbstore` when it is ready
// let storage = new DocStorage(originalPath);
await storeWorkspaceMeta(workspaceId, {
id: workspaceId,
mainDBPath: internalFilePath,
});
// // if imported db is not a valid v2 db, we will treat it as a v1 db
// if (!(await storage.validate())) {
// return loadV1DBFile(originalPath, workspaceId);
// }
return { workspaceId };
// // v2 import logic
// const internalFilePath = await getSpaceDBPath(
// 'local',
// 'workspace',
// workspaceId
// );
// await fs.ensureDir(await getWorkspacesBasePath());
// await fs.copy(originalPath, internalFilePath);
// logger.info(`loadDBFile, copy: ${originalPath} -> ${internalFilePath}`);
// storage = new DocStorage(internalFilePath);
// await storage.connect();
// await storage.setSpaceId(workspaceId);
// await storage.close();
// return {
// workspaceId,
// };
} catch (err) {
logger.error('loadDBFile', err);
return {
@@ -217,3 +230,31 @@ export async function loadDBFile(): Promise<LoadDBFileResult> {
};
}
}
async function loadV1DBFile(
originalPath: string,
workspaceId: string
): Promise<LoadDBFileResult> {
const { SqliteConnection } = await import('@affine/native');
const validationResult = await SqliteConnection.validate(originalPath);
if (validationResult !== ValidationResult.Valid) {
return { error: 'DB_FILE_INVALID' }; // invalid db file
}
const internalFilePath = await getWorkspaceDBPath('workspace', workspaceId);
await fs.ensureDir(await getWorkspacesBasePath());
await fs.copy(originalPath, internalFilePath);
logger.info(`loadDBFile, copy: ${originalPath} -> ${internalFilePath}`);
await storeWorkspaceMeta(workspaceId, {
id: workspaceId,
mainDBPath: internalFilePath,
});
return {
workspaceId,
};
}

View File

@@ -9,8 +9,8 @@ export const dialogHandlers = {
loadDBFile: async () => {
return loadDBFile();
},
saveDBFileAs: async (workspaceId: string) => {
return saveDBFileAs(workspaceId);
saveDBFileAs: async (id: string) => {
return saveDBFileAs(id);
},
selectDBFileLocation: async () => {
return selectDBFileLocation();

View File

@@ -1,17 +1,24 @@
import { dbEvents, dbHandlers } from './db';
import { dialogHandlers } from './dialog';
import {
dbEventsV1,
dbHandlersV1,
nbstoreEvents,
nbstoreHandlers,
} from './nbstore';
import { provideExposed } from './provide';
import { workspaceEvents, workspaceHandlers } from './workspace';
export const handlers = {
db: dbHandlers,
db: dbHandlersV1,
nbstore: nbstoreHandlers,
workspace: workspaceHandlers,
dialog: dialogHandlers,
};
export const events = {
db: dbEvents,
db: dbEventsV1,
workspace: workspaceEvents,
nbstore: nbstoreEvents,
};
const getExposedMeta = () => {

View File

@@ -0,0 +1,33 @@
import { type BlobRecord, BlobStorage, share } from '@affine/nbstore';
import { NativeDBConnection } from './db';
export class SqliteBlobStorage extends BlobStorage {
override connection = share(
new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
);
get db() {
return this.connection.inner;
}
override async get(key: string) {
return this.db.getBlob(key);
}
override async set(blob: BlobRecord) {
await this.db.setBlob(blob);
}
override async delete(key: string, permanently: boolean) {
await this.db.deleteBlob(key, permanently);
}
override async release() {
await this.db.releaseBlobs();
}
override async list() {
return this.db.listBlobs();
}
}

View File

@@ -0,0 +1,40 @@
import path from 'node:path';
import { DocStorage as NativeDocStorage } from '@affine/native';
import { Connection, type SpaceType } from '@affine/nbstore';
import fs from 'fs-extra';
import { logger } from '../logger';
import { getSpaceDBPath } from '../workspace/meta';
export class NativeDBConnection extends Connection<NativeDocStorage> {
constructor(
private readonly peer: string,
private readonly type: SpaceType,
private readonly id: string
) {
super();
}
async getDBPath() {
return await getSpaceDBPath(this.peer, this.type, this.id);
}
override get shareId(): string {
return `sqlite:${this.peer}:${this.type}:${this.id}`;
}
override async doConnect() {
const dbPath = await this.getDBPath();
await fs.ensureDir(path.dirname(dbPath));
const conn = new NativeDocStorage(dbPath);
await conn.connect();
logger.info('[nbstore] connection established', this.shareId);
return conn;
}
override async doDisconnect(conn: NativeDocStorage) {
await conn.close();
logger.info('[nbstore] connection closed', this.shareId);
}
}

View File

@@ -0,0 +1,83 @@
import {
type DocClocks,
type DocRecord,
DocStorage,
type DocUpdate,
share,
} from '@affine/nbstore';
import { NativeDBConnection } from './db';
export class SqliteDocStorage extends DocStorage {
override connection = share(
new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
);
get db() {
return this.connection.inner;
}
override async pushDocUpdate(update: DocUpdate) {
const timestamp = await this.db.pushUpdate(update.docId, update.bin);
return { docId: update.docId, timestamp };
}
override async deleteDoc(docId: string) {
await this.db.deleteDoc(docId);
}
override async getDocTimestamps(after?: Date) {
const clocks = await this.db.getDocClocks(after);
return clocks.reduce((ret, cur) => {
ret[cur.docId] = cur.timestamp;
return ret;
}, {} as DocClocks);
}
override async getDocTimestamp(docId: string) {
return this.db.getDocClock(docId);
}
protected override async getDocSnapshot(docId: string) {
const snapshot = await this.db.getDocSnapshot(docId);
if (!snapshot) {
return null;
}
return {
docId,
bin: snapshot.data,
timestamp: snapshot.timestamp,
};
}
protected override async setDocSnapshot(
snapshot: DocRecord
): Promise<boolean> {
return this.db.setDocSnapshot({
docId: snapshot.docId,
data: Buffer.from(snapshot.bin),
timestamp: new Date(snapshot.timestamp),
});
}
protected override async getDocUpdates(docId: string) {
return this.db.getDocUpdates(docId).then(updates =>
updates.map(update => ({
docId,
bin: update.data,
timestamp: update.createdAt,
}))
);
}
protected override markUpdatesMerged(docId: string, updates: DocRecord[]) {
return this.db.markUpdatesMerged(
docId,
updates.map(update => update.timestamp)
);
}
}

View File

@@ -0,0 +1,143 @@
import {
type BlobRecord,
type DocClock,
type DocUpdate,
} from '@affine/nbstore';
import type { MainEventRegister } from '../type';
import {
type ConnectionStatus,
ensureStorage,
getStorage,
onConnectionChanged,
} from './storage';
export const nbstoreHandlers = {
connect: async (id: string) => {
await ensureStorage(id);
},
close: async (id: string) => {
const store = getStorage(id);
if (store) {
await store.disconnect();
// The store may be shared with other tabs, so we don't delete it from cache
// the underlying connection will handle the close correctly
// STORE_CACHE.delete(`${spaceType}:${spaceId}`);
}
},
pushDocUpdate: async (id: string, update: DocUpdate) => {
const store = await ensureStorage(id);
return store.get('doc').pushDocUpdate(update);
},
getDoc: async (id: string, docId: string) => {
const store = await ensureStorage(id);
return store.get('doc').getDoc(docId);
},
deleteDoc: async (id: string, docId: string) => {
const store = await ensureStorage(id);
return store.get('doc').deleteDoc(docId);
},
getDocTimestamps: async (id: string, after?: Date) => {
const store = await ensureStorage(id);
return store.get('doc').getDocTimestamps(after);
},
getDocTimestamp: async (id: string, docId: string) => {
const store = await ensureStorage(id);
return store.get('doc').getDocTimestamp(docId);
},
setBlob: async (id: string, blob: BlobRecord) => {
const store = await ensureStorage(id);
return store.get('blob').set(blob);
},
getBlob: async (id: string, key: string) => {
const store = await ensureStorage(id);
return store.get('blob').get(key);
},
deleteBlob: async (id: string, key: string, permanently: boolean) => {
const store = await ensureStorage(id);
return store.get('blob').delete(key, permanently);
},
listBlobs: async (id: string) => {
const store = await ensureStorage(id);
return store.get('blob').list();
},
releaseBlobs: async (id: string) => {
const store = await ensureStorage(id);
return store.get('blob').release();
},
getPeerRemoteClocks: async (id: string, peer: string) => {
const store = await ensureStorage(id);
return store.get('sync').getPeerRemoteClocks(peer);
},
getPeerRemoteClock: async (id: string, peer: string, docId: string) => {
const store = await ensureStorage(id);
return store.get('sync').getPeerRemoteClock(peer, docId);
},
setPeerRemoteClock: async (id: string, peer: string, clock: DocClock) => {
const store = await ensureStorage(id);
return store.get('sync').setPeerRemoteClock(peer, clock);
},
getPeerPulledRemoteClocks: async (id: string, peer: string) => {
const store = await ensureStorage(id);
return store.get('sync').getPeerPulledRemoteClocks(peer);
},
getPeerPulledRemoteClock: async (id: string, peer: string, docId: string) => {
const store = await ensureStorage(id);
return store.get('sync').getPeerPulledRemoteClock(peer, docId);
},
setPeerPulledRemoteClock: async (
id: string,
peer: string,
clock: DocClock
) => {
const store = await ensureStorage(id);
return store.get('sync').setPeerPulledRemoteClock(peer, clock);
},
getPeerPushedClocks: async (id: string, peer: string) => {
const store = await ensureStorage(id);
return store.get('sync').getPeerPushedClocks(peer);
},
getPeerPushedClock: async (id: string, peer: string, docId: string) => {
const store = await ensureStorage(id);
return store.get('sync').getPeerPushedClock(peer, docId);
},
setPeerPushedClock: async (id: string, peer: string, clock: DocClock) => {
const store = await ensureStorage(id);
return store.get('sync').setPeerPushedClock(peer, clock);
},
clearClocks: async (id: string) => {
const store = await ensureStorage(id);
return store.get('sync').clearClocks();
},
};
export const nbstoreEvents = {
onConnectionStatusChanged: (fn: (payload: ConnectionStatus) => void) => {
const sub = onConnectionChanged(fn);
return () => {
sub.unsubscribe();
};
},
} satisfies Record<string, MainEventRegister>;

View File

@@ -0,0 +1,4 @@
export { nbstoreEvents, nbstoreHandlers } from './handlers';
export * from './storage';
export { dbEvents as dbEventsV1, dbHandlers as dbHandlersV1 } from './v1';
export { universalId } from '@affine/nbstore';

View File

@@ -0,0 +1,127 @@
import {
parseUniversalId,
SpaceStorage,
type SpaceType,
type StorageType,
} from '@affine/nbstore';
import { Subject } from 'rxjs';
import { applyUpdate, Doc as YDoc } from 'yjs';
import { logger } from '../logger';
import { SqliteBlobStorage } from './blob';
import { NativeDBConnection } from './db';
import { SqliteDocStorage } from './doc';
import { SqliteSyncStorage } from './sync';
export class SqliteSpaceStorage extends SpaceStorage {
get connection() {
const docStore = this.get('doc');
if (!docStore) {
throw new Error('doc store not found');
}
const connection = docStore.connection;
if (!(connection instanceof NativeDBConnection)) {
throw new Error('doc store connection is not a Sqlite connection');
}
return connection;
}
async getDBPath() {
return this.connection.getDBPath();
}
async getWorkspaceName() {
const docStore = this.tryGet('doc');
if (!docStore) {
return null;
}
const doc = await docStore.getDoc(docStore.spaceId);
if (!doc) {
return null;
}
const ydoc = new YDoc();
applyUpdate(ydoc, doc.bin);
return ydoc.getMap('meta').get('name') as string;
}
async checkpoint() {
await this.connection.inner.checkpoint();
}
}
const STORE_CACHE = new Map<string, SqliteSpaceStorage>();
export interface ConnectionStatus {
peer: string;
spaceType: SpaceType;
spaceId: string;
storage: StorageType;
status: string;
error?: Error;
}
const CONNECTION$ = new Subject<ConnectionStatus>();
process.on('beforeExit', () => {
CONNECTION$.complete();
STORE_CACHE.forEach(store => {
store.destroy().catch(err => {
logger.error('[nbstore] destroy store failed', err);
});
});
});
export function onConnectionChanged(fn: (payload: ConnectionStatus) => void) {
return CONNECTION$.subscribe({ next: fn });
}
export function getStorage(universalId: string) {
return STORE_CACHE.get(universalId);
}
export async function ensureStorage(universalId: string) {
const { peer, type, id } = parseUniversalId(universalId);
let store = STORE_CACHE.get(universalId);
if (!store) {
const opts = {
peer,
type,
id,
};
store = new SqliteSpaceStorage([
new SqliteDocStorage(opts),
new SqliteBlobStorage(opts),
new SqliteSyncStorage(opts),
]);
store.on('connection', ({ storage, status, error }) => {
CONNECTION$.next({
peer,
spaceType: type,
spaceId: id,
storage,
status,
error,
});
logger.info(
`[nbstore] status changed: ${status}, spaceType: ${type}, spaceId: ${id}, storage: ${storage}`
);
if (error) {
logger.error(`[nbstore] connection error: ${error}`);
}
});
await store.connect();
STORE_CACHE.set(universalId, store);
}
return store;
}

View File

@@ -0,0 +1,70 @@
import {
type DocClock,
type DocClocks,
share,
SyncStorage,
} from '@affine/nbstore';
import { NativeDBConnection } from './db';
export class SqliteSyncStorage extends SyncStorage {
override connection = share(
new NativeDBConnection(this.peer, this.spaceType, this.spaceId)
);
get db() {
return this.connection.inner;
}
override async getPeerRemoteClocks(peer: string) {
const records = await this.db.getPeerRemoteClocks(peer);
return records.reduce((clocks, { docId, timestamp }) => {
clocks[docId] = timestamp;
return clocks;
}, {} as DocClocks);
}
override async getPeerRemoteClock(peer: string, docId: string) {
return this.db.getPeerRemoteClock(peer, docId);
}
override async setPeerRemoteClock(peer: string, clock: DocClock) {
await this.db.setPeerRemoteClock(peer, clock.docId, clock.timestamp);
}
override async getPeerPulledRemoteClock(peer: string, docId: string) {
return this.db.getPeerPulledRemoteClock(peer, docId);
}
override async getPeerPulledRemoteClocks(peer: string) {
const records = await this.db.getPeerPulledRemoteClocks(peer);
return records.reduce((clocks, { docId, timestamp }) => {
clocks[docId] = timestamp;
return clocks;
}, {} as DocClocks);
}
override async setPeerPulledRemoteClock(peer: string, clock: DocClock) {
await this.db.setPeerPulledRemoteClock(peer, clock.docId, clock.timestamp);
}
override async getPeerPushedClocks(peer: string) {
const records = await this.db.getPeerPushedClocks(peer);
return records.reduce((clocks, { docId, timestamp }) => {
clocks[docId] = timestamp;
return clocks;
}, {} as DocClocks);
}
override async getPeerPushedClock(peer: string, docId: string) {
return this.db.getPeerPushedClock(peer, docId);
}
override async setPeerPushedClock(peer: string, clock: DocClock) {
await this.db.setPeerPushedClock(peer, clock.docId, clock.timestamp);
}
override async clearClocks() {
await this.db.clearClocks();
}
}

View File

@@ -2,7 +2,7 @@ import type { InsertRow } from '@affine/native';
import { SqliteConnection } from '@affine/native';
import type { ByteKVBehavior } from '@toeverything/infra/storage';
import { logger } from '../logger';
import { logger } from '../../logger';
/**
* A base class for SQLite DB adapter that provides basic methods around updates & blobs

View File

@@ -1,5 +1,6 @@
import { logger } from '../logger';
import type { SpaceType } from './types';
import type { SpaceType } from '@affine/nbstore';
import { logger } from '../../logger';
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
import { openWorkspaceDatabase } from './workspace-db-adapter';

View File

@@ -1,7 +1,8 @@
import { mainRPC } from '../main-rpc';
import type { MainEventRegister } from '../type';
import type { SpaceType } from '@affine/nbstore';
import { mainRPC } from '../../main-rpc';
import type { MainEventRegister } from '../../type';
import { ensureSQLiteDB } from './ensure-db';
import type { SpaceType } from './types';
export * from './ensure-db';

View File

@@ -1,12 +1,12 @@
import type { SpaceType } from '@affine/nbstore';
import { AsyncLock } from '@toeverything/infra/utils';
import { Subject } from 'rxjs';
import { applyUpdate, Doc as YDoc } from 'yjs';
import { logger } from '../logger';
import { getWorkspaceMeta } from '../workspace/meta';
import { logger } from '../../logger';
import { getWorkspaceMeta } from '../../workspace/meta';
import { SQLiteAdapter } from './db-adapter';
import { mergeUpdate } from './merge-update';
import type { SpaceType } from './types';
const TRIM_SIZE = 1;

View File

@@ -2,17 +2,17 @@ import path from 'node:path';
import fs from 'fs-extra';
import { ensureSQLiteDB } from '../db/ensure-db';
import { logger } from '../logger';
import { ensureSQLiteDB } from '../nbstore/v1/ensure-db';
import type { WorkspaceMeta } from '../type';
import {
getDeletedWorkspacesBasePath,
getWorkspaceBasePath,
getWorkspaceBasePathV1,
getWorkspaceMeta,
} from './meta';
export async function deleteWorkspace(id: string) {
const basePath = await getWorkspaceBasePath('workspace', id);
const basePath = await getWorkspaceBasePathV1('workspace', id);
const movedPath = path.join(await getDeletedWorkspacesBasePath(), `${id}`);
try {
const db = await ensureSQLiteDB('workspace', id);
@@ -30,7 +30,7 @@ export async function storeWorkspaceMeta(
meta: Partial<WorkspaceMeta>
) {
try {
const basePath = await getWorkspaceBasePath('workspace', workspaceId);
const basePath = await getWorkspaceBasePathV1('workspace', workspaceId);
await fs.ensureDir(basePath);
const metaPath = path.join(basePath, 'meta.json');
const currentMeta = await getWorkspaceMeta('workspace', workspaceId);

View File

@@ -1,9 +1,9 @@
import path from 'node:path';
import type { SpaceType } from '@affine/nbstore';
import fs from 'fs-extra';
import { isWindows } from '../../shared/utils';
import type { SpaceType } from '../db/types';
import { logger } from '../logger';
import { mainRPC } from '../main-rpc';
import type { WorkspaceMeta } from '../type';
@@ -22,7 +22,7 @@ export async function getWorkspacesBasePath() {
return path.join(await getAppDataPath(), 'workspaces');
}
export async function getWorkspaceBasePath(
export async function getWorkspaceBasePathV1(
spaceType: SpaceType,
workspaceId: string
) {
@@ -33,6 +33,34 @@ export async function getWorkspaceBasePath(
);
}
export async function getSpaceBasePath(spaceType: SpaceType) {
return path.join(
await getAppDataPath(),
spaceType === 'userspace' ? 'userspaces' : 'workspaces'
);
}
export function escapeFilename(name: string) {
// replace all special characters with '_' and replace repeated '_' with a single '_' and remove trailing '_'
return name
.replaceAll(/[\\/!@#$%^&*()+~`"':;,?<>|]/g, '_')
.replaceAll(/_+/g, '_')
.replace(/_+$/, '');
}
export async function getSpaceDBPath(
peer: string,
spaceType: SpaceType,
id: string
) {
return path.join(
await getSpaceBasePath(spaceType),
escapeFilename(peer),
id,
'storage.db'
);
}
export async function getDeletedWorkspacesBasePath() {
return path.join(await getAppDataPath(), 'deleted-workspaces');
}
@@ -42,7 +70,7 @@ export async function getWorkspaceDBPath(
workspaceId: string
) {
return path.join(
await getWorkspaceBasePath(spaceType, workspaceId),
await getWorkspaceBasePathV1(spaceType, workspaceId),
'storage.db'
);
}
@@ -52,7 +80,7 @@ export async function getWorkspaceMetaPath(
workspaceId: string
) {
return path.join(
await getWorkspaceBasePath(spaceType, workspaceId),
await getWorkspaceBasePathV1(spaceType, workspaceId),
'meta.json'
);
}
@@ -66,7 +94,7 @@ export async function getWorkspaceMeta(
workspaceId: string
): Promise<WorkspaceMeta> {
try {
const basePath = await getWorkspaceBasePath(spaceType, workspaceId);
const basePath = await getWorkspaceBasePathV1(spaceType, workspaceId);
const metaPath = await getWorkspaceMetaPath(spaceType, workspaceId);
if (
!(await fs