feat: add helper process (#2753)

This commit is contained in:
Peng Xiao
2023-06-13 10:01:43 +08:00
committed by GitHub
parent dff8a0db7d
commit 5ba2dff008
74 changed files with 1002 additions and 1048 deletions

View File

@@ -0,0 +1 @@
tmp

View File

@@ -0,0 +1,134 @@
import path from 'node:path';
import { setTimeout } from 'node:timers/promises';
import fs from 'fs-extra';
import { v4 } from 'uuid';
import { afterEach, beforeEach, expect, test, vi } from 'vitest';
const tmpDir = path.join(__dirname, 'tmp');
const appDataPath = path.join(tmpDir, 'app-data');
vi.doMock('../../main-rpc', () => ({
mainRPC: {
getPath: async () => appDataPath,
},
}));
const constructorStub = vi.fn();
const destroyStub = vi.fn();
destroyStub.mockReturnValue(Promise.resolve());
function existProcess() {
process.emit('beforeExit', 0);
}
vi.doMock('../secondary-db', () => {
return {
SecondaryWorkspaceSQLiteDB: class {
constructor(...args: any[]) {
constructorStub(...args);
}
connectIfNeeded = () => Promise.resolve();
pull = () => Promise.resolve();
destroy = destroyStub;
},
};
});
beforeEach(() => {
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(async () => {
existProcess();
// wait for the db to be closed on Windows
if (process.platform === 'win32') {
await setTimeout(200);
}
await fs.remove(tmpDir);
vi.useRealTimers();
});
test('can get a valid WorkspaceSQLiteDB', async () => {
const { ensureSQLiteDB } = await import('../ensure-db');
const workspaceId = v4();
const db0 = await ensureSQLiteDB(workspaceId);
expect(db0).toBeDefined();
expect(db0.workspaceId).toBe(workspaceId);
const db1 = await ensureSQLiteDB(v4());
expect(db1).not.toBe(db0);
expect(db1.workspaceId).not.toBe(db0.workspaceId);
// ensure that the db is cached
expect(await ensureSQLiteDB(workspaceId)).toBe(db0);
});
test('db should be destroyed when app quits', async () => {
const { ensureSQLiteDB } = await import('../ensure-db');
const workspaceId = v4();
const db0 = await ensureSQLiteDB(workspaceId);
const db1 = await ensureSQLiteDB(v4());
expect(db0.db).not.toBeNull();
expect(db1.db).not.toBeNull();
existProcess();
// wait the async `db.destroy()` to be called
await setTimeout(100);
expect(db0.db).toBeNull();
expect(db1.db).toBeNull();
});
test('db should be removed in db$Map after destroyed', async () => {
const { ensureSQLiteDB, db$Map } = await import('../ensure-db');
const workspaceId = v4();
const db = await ensureSQLiteDB(workspaceId);
await db.destroy();
await setTimeout(100);
expect(db$Map.has(workspaceId)).toBe(false);
});
test('if db has a secondary db path, we should also poll that', async () => {
const { ensureSQLiteDB } = await import('../ensure-db');
const { storeWorkspaceMeta } = await import('../../workspace');
const workspaceId = v4();
await storeWorkspaceMeta(workspaceId, {
secondaryDBPath: path.join(tmpDir, 'secondary.db'),
});
const db = await ensureSQLiteDB(workspaceId);
await setTimeout(10);
expect(constructorStub).toBeCalledTimes(1);
expect(constructorStub).toBeCalledWith(path.join(tmpDir, 'secondary.db'), db);
// if secondary meta is changed
await storeWorkspaceMeta(workspaceId, {
secondaryDBPath: path.join(tmpDir, 'secondary2.db'),
});
// wait the async `db.destroy()` to be called
await setTimeout(100);
expect(constructorStub).toBeCalledTimes(2);
expect(destroyStub).toBeCalledTimes(1);
// if secondary meta is changed (but another workspace)
await storeWorkspaceMeta(v4(), {
secondaryDBPath: path.join(tmpDir, 'secondary3.db'),
});
await vi.advanceTimersByTimeAsync(1500);
expect(constructorStub).toBeCalledTimes(2);
expect(destroyStub).toBeCalledTimes(1);
// if primary is destroyed, secondary should also be destroyed
await db.destroy();
await setTimeout(100);
expect(destroyStub).toBeCalledTimes(2);
});

View File

@@ -0,0 +1,99 @@
import path from 'node:path';
import fs from 'fs-extra';
import { v4 } from 'uuid';
import { afterEach, expect, test, vi } from 'vitest';
import * as Y from 'yjs';
import { dbSubjects } from '../subjects';
const tmpDir = path.join(__dirname, 'tmp');
const appDataPath = path.join(tmpDir, 'app-data');
vi.doMock('../../main-rpc', () => ({
mainRPC: {
getPath: async () => appDataPath,
},
}));
afterEach(async () => {
await fs.remove(tmpDir);
});
function getTestUpdates() {
const testYDoc = new Y.Doc();
const yText = testYDoc.getText('test');
yText.insert(0, 'hello');
const updates = Y.encodeStateAsUpdate(testYDoc);
return updates;
}
test('can create new db file if not exists', async () => {
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
const workspaceId = v4();
const db = await openWorkspaceDatabase(workspaceId);
const dbPath = path.join(
appDataPath,
`workspaces/${workspaceId}`,
`storage.db`
);
expect(await fs.exists(dbPath)).toBe(true);
await db.destroy();
});
test('on applyUpdate (from self), will not trigger update', async () => {
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
const workspaceId = v4();
const onUpdate = vi.fn();
const db = await openWorkspaceDatabase(workspaceId);
db.update$.subscribe(onUpdate);
db.applyUpdate(getTestUpdates(), 'self');
expect(onUpdate).not.toHaveBeenCalled();
await db.destroy();
});
test('on applyUpdate (from renderer), will trigger update', async () => {
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
const workspaceId = v4();
const onUpdate = vi.fn();
const onExternalUpdate = vi.fn();
const db = await openWorkspaceDatabase(workspaceId);
db.update$.subscribe(onUpdate);
const sub = dbSubjects.externalUpdate.subscribe(onExternalUpdate);
db.applyUpdate(getTestUpdates(), 'renderer');
expect(onUpdate).toHaveBeenCalled();
sub.unsubscribe();
await db.destroy();
});
test('on applyUpdate (from external), will trigger update & send external update event', async () => {
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
const workspaceId = v4();
const onUpdate = vi.fn();
const onExternalUpdate = vi.fn();
const db = await openWorkspaceDatabase(workspaceId);
db.update$.subscribe(onUpdate);
const sub = dbSubjects.externalUpdate.subscribe(onExternalUpdate);
db.applyUpdate(getTestUpdates(), 'external');
expect(onUpdate).toHaveBeenCalled();
expect(onExternalUpdate).toHaveBeenCalled();
sub.unsubscribe();
await db.destroy();
});
test('on destroy, check if resources have been released', async () => {
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
const workspaceId = v4();
const db = await openWorkspaceDatabase(workspaceId);
const updateSub = {
complete: vi.fn(),
next: vi.fn(),
};
db.update$ = updateSub as any;
await db.destroy();
expect(db.db).toBe(null);
expect(updateSub.complete).toHaveBeenCalled();
});

View File

@@ -0,0 +1,116 @@
import { SqliteConnection } from '@affine/native';
import { logger } from '../logger';
/**
* A base class for SQLite DB adapter that provides basic methods around updates & blobs
*/
export abstract class BaseSQLiteAdapter {
db: SqliteConnection | null = null;
abstract role: string;
constructor(public readonly path: string) {}
async connectIfNeeded() {
if (!this.db) {
this.db = new SqliteConnection(this.path);
await this.db.connect();
logger.info(`[SQLiteAdapter:${this.role}]`, 'connected:', this.path);
}
return this.db;
}
async destroy() {
const { db } = this;
this.db = null;
// log after close will sometimes crash the app when quitting
logger.info(`[SQLiteAdapter:${this.role}]`, 'destroyed:', this.path);
await db?.close();
}
async addBlob(key: string, data: Uint8Array) {
try {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.addBlob(key, data);
} catch (error) {
logger.error('addBlob', error);
}
}
async getBlob(key: string) {
try {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
const blob = await this.db.getBlob(key);
return blob?.data;
} catch (error) {
logger.error('getBlob', error);
return null;
}
}
async deleteBlob(key: string) {
try {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.deleteBlob(key);
} catch (error) {
logger.error(`${this.path} delete blob failed`, error);
}
}
async getBlobKeys() {
try {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return [];
}
return await this.db.getBlobKeys();
} catch (error) {
logger.error(`getBlobKeys failed`, error);
return [];
}
}
async getUpdates() {
try {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return [];
}
return await this.db.getUpdates();
} catch (error) {
logger.error('getUpdates', error);
return [];
}
}
// add a single update to SQLite
async addUpdateToSQLite(updates: Uint8Array[]) {
// batch write instead write per key stroke?
try {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
const start = performance.now();
await this.db.insertUpdates(updates);
logger.debug(
`[SQLiteAdapter][${this.role}] addUpdateToSQLite`,
'length:',
updates.length,
performance.now() - start,
'ms'
);
} catch (error) {
logger.error('addUpdateToSQLite', this.path, error);
}
}
}

View File

@@ -0,0 +1,140 @@
import type { Subject } from 'rxjs';
import { Observable } from 'rxjs';
import {
concat,
defer,
from,
fromEvent,
interval,
lastValueFrom,
merge,
} from 'rxjs';
import {
concatMap,
distinctUntilChanged,
filter,
ignoreElements,
last,
map,
shareReplay,
startWith,
switchMap,
take,
takeUntil,
tap,
} from 'rxjs/operators';
import { logger } from '../logger';
import { getWorkspaceMeta, workspaceSubjects } from '../workspace';
import { SecondaryWorkspaceSQLiteDB } from './secondary-db';
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
import { openWorkspaceDatabase } from './workspace-db-adapter';
// export for testing
export const db$Map = new Map<string, Observable<WorkspaceSQLiteDB>>();
// use defer to prevent `app` is undefined while running tests
const beforeQuit$ = defer(() => fromEvent(process, 'beforeExit'));
// return a stream that emit a single event when the subject completes
function completed<T>(subject: Subject<T>) {
return new Observable(subscriber => {
const sub = subject.subscribe({
complete: () => {
subscriber.next();
subscriber.complete();
},
});
return () => sub.unsubscribe();
});
}
function getWorkspaceDB$(id: string) {
if (!db$Map.has(id)) {
db$Map.set(
id,
from(openWorkspaceDatabase(id)).pipe(
tap({
next: db => {
logger.info(
'[ensureSQLiteDB] db connection established',
db.workspaceId
);
},
}),
switchMap(db =>
// takeUntil the polling stream, and then destroy the db
concat(
startPollingSecondaryDB(db).pipe(
ignoreElements(),
startWith(db),
takeUntil(merge(beforeQuit$, completed(db.update$))),
last(),
tap({
next() {
logger.info(
'[ensureSQLiteDB] polling secondary db complete',
db.workspaceId
);
},
})
),
defer(async () => {
try {
await db.destroy();
db$Map.delete(id);
return db;
} catch (err) {
logger.error('[ensureSQLiteDB] destroy db failed', err);
throw err;
}
})
).pipe(startWith(db))
),
shareReplay(1)
)
);
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return db$Map.get(id)!;
}
function startPollingSecondaryDB(db: WorkspaceSQLiteDB) {
return merge(
getWorkspaceMeta(db.workspaceId),
workspaceSubjects.meta.pipe(
map(({ meta }) => meta),
filter(meta => meta.id === db.workspaceId)
)
).pipe(
map(meta => meta?.secondaryDBPath),
filter((p): p is string => !!p),
distinctUntilChanged(),
switchMap(path => {
// on secondary db path change, destroy the old db and create a new one
const secondaryDB = new SecondaryWorkspaceSQLiteDB(path, db);
return new Observable<SecondaryWorkspaceSQLiteDB>(subscriber => {
subscriber.next(secondaryDB);
return () => secondaryDB.destroy();
});
}),
switchMap(secondaryDB => {
return interval(300000).pipe(
startWith(0),
concatMap(() => secondaryDB.pull()),
tap({
error: err => {
logger.error(`[ensureSQLiteDB] polling secondary db error`, err);
},
complete: () => {
logger.info('[ensureSQLiteDB] polling secondary db complete');
},
})
);
})
);
}
export function ensureSQLiteDB(id: string) {
return lastValueFrom(getWorkspaceDB$(id).pipe(take(1)));
}

View File

@@ -0,0 +1,48 @@
import { mainRPC } from '../main-rpc';
import type { MainEventRegister } from '../type';
import { ensureSQLiteDB } from './ensure-db';
import { dbSubjects } from './subjects';
export * from './ensure-db';
export * from './subjects';
export const dbHandlers = {
getDocAsUpdates: async (id: string) => {
const workspaceDB = await ensureSQLiteDB(id);
return workspaceDB.getDocAsUpdates();
},
applyDocUpdate: async (id: string, update: Uint8Array) => {
const workspaceDB = await ensureSQLiteDB(id);
return workspaceDB.applyUpdate(update);
},
addBlob: async (workspaceId: string, key: string, data: Uint8Array) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.addBlob(key, data);
},
getBlob: async (workspaceId: string, key: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.getBlob(key);
},
deleteBlob: async (workspaceId: string, key: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.deleteBlob(key);
},
getBlobKeys: async (workspaceId: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.getBlobKeys();
},
getDefaultStorageLocation: async () => {
return await mainRPC.getPath('sessionData');
},
};
export const dbEvents = {
onExternalUpdate: (
fn: (update: { workspaceId: string; update: Uint8Array }) => void
) => {
const sub = dbSubjects.externalUpdate.subscribe(fn);
return () => {
sub.unsubscribe();
};
},
} satisfies Record<string, MainEventRegister>;

View File

@@ -0,0 +1,11 @@
import * as Y from 'yjs';
export function mergeUpdate(updates: Uint8Array[]) {
const yDoc = new Y.Doc();
Y.transact(yDoc, () => {
for (const update of updates) {
Y.applyUpdate(yDoc, update);
}
});
return Y.encodeStateAsUpdate(yDoc);
}

View File

@@ -0,0 +1,215 @@
import assert from 'node:assert';
import type { SqliteConnection } from '@affine/native';
import { debounce } from 'lodash-es';
import * as Y from 'yjs';
import { logger } from '../logger';
import type { YOrigin } from '../type';
import { getWorkspaceMeta } from '../workspace';
import { BaseSQLiteAdapter } from './base-db-adapter';
import { mergeUpdate } from './merge-update';
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
const FLUSH_WAIT_TIME = 5000;
const FLUSH_MAX_WAIT_TIME = 10000;
export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
role = 'secondary';
yDoc = new Y.Doc();
firstConnected = false;
destroyed = false;
updateQueue: Uint8Array[] = [];
unsubscribers = new Set<() => void>();
constructor(
public override path: string,
public upstream: WorkspaceSQLiteDB
) {
super(path);
this.setupAndListen();
logger.debug('[SecondaryWorkspaceSQLiteDB] created', this.workspaceId);
}
override async destroy() {
await this.flushUpdateQueue();
this.unsubscribers.forEach(unsub => unsub());
this.yDoc.destroy();
await super.destroy();
this.destroyed = true;
}
get workspaceId() {
return this.upstream.workspaceId;
}
// do not update db immediately, instead, push to a queue
// and flush the queue in a future time
async addUpdateToUpdateQueue(db: SqliteConnection, update: Uint8Array) {
this.updateQueue.push(update);
await this.debouncedFlush();
}
async flushUpdateQueue() {
if (this.destroyed) {
return;
}
logger.debug(
'flushUpdateQueue',
this.workspaceId,
'queue',
this.updateQueue.length
);
const updates = [...this.updateQueue];
this.updateQueue = [];
await this.run(async () => {
await this.addUpdateToSQLite(updates);
});
}
// flush after 5s, but will not wait for more than 10s
debouncedFlush = debounce(this.flushUpdateQueue, FLUSH_WAIT_TIME, {
maxWait: FLUSH_MAX_WAIT_TIME,
});
runCounter = 0;
// wrap the fn with connect and close
async run<T extends (...args: any[]) => any>(
fn: T
): Promise<
(T extends (...args: any[]) => infer U ? Awaited<U> : unknown) | undefined
> {
try {
if (this.destroyed) {
return;
}
await this.connectIfNeeded();
this.runCounter++;
return await fn();
} catch (err) {
logger.error(err);
throw err;
} finally {
this.runCounter--;
if (this.runCounter === 0) {
// just close db, but not the yDoc
await super.destroy();
}
}
}
setupAndListen() {
if (this.firstConnected) {
return;
}
this.firstConnected = true;
const onUpstreamUpdate = (update: Uint8Array, origin: YOrigin) => {
if (origin === 'renderer') {
// update to upstream yDoc should be replicated to self yDoc
this.applyUpdate(update, 'upstream');
}
};
const onSelfUpdate = async (update: Uint8Array, origin: YOrigin) => {
// for self update from upstream, we need to push it to external DB
if (origin === 'upstream' && this.db) {
await this.addUpdateToUpdateQueue(this.db, update);
}
if (origin === 'self') {
this.upstream.applyUpdate(update, 'external');
}
};
// listen to upstream update
this.upstream.yDoc.on('update', onUpstreamUpdate);
this.yDoc.on('update', onSelfUpdate);
this.unsubscribers.add(() => {
this.upstream.yDoc.off('update', onUpstreamUpdate);
this.yDoc.off('update', onSelfUpdate);
});
this.run(() => {
// apply all updates from upstream
const upstreamUpdate = this.upstream.getDocAsUpdates();
// to initialize the yDoc, we need to apply all updates from the db
this.applyUpdate(upstreamUpdate, 'upstream');
})
.then(() => {
logger.debug('run success');
})
.catch(err => {
logger.error('run error', err);
});
}
applyUpdate = (data: Uint8Array, origin: YOrigin = 'upstream') => {
Y.applyUpdate(this.yDoc, data, origin);
};
// TODO: have a better solution to handle blobs
async syncBlobs() {
await this.run(async () => {
// skip if upstream db is not connected (maybe it is already closed)
const blobsKeys = await this.getBlobKeys();
if (!this.upstream.db || this.upstream.db?.isClose) {
return;
}
const upstreamBlobsKeys = await this.upstream.getBlobKeys();
// put every missing blob to upstream
for (const key of blobsKeys) {
if (!upstreamBlobsKeys.includes(key)) {
const blob = await this.getBlob(key);
if (blob) {
await this.upstream.addBlob(key, blob);
logger.debug('syncBlobs', this.workspaceId, key);
}
}
}
});
}
/**
* pull from external DB file and apply to embedded yDoc
* workflow:
* - connect to external db
* - get updates
* - apply updates to local yDoc
* - get blobs and put new blobs to upstream
* - disconnect
*/
async pull() {
const start = performance.now();
assert(this.upstream.db, 'upstream db should be connected');
const updates = await this.run(async () => {
// TODO: no need to get all updates, just get the latest ones (using a cursor, etc)?
await this.syncBlobs();
return (await this.getUpdates()).map(update => update.data);
});
if (!updates || this.destroyed) {
return;
}
const merged = mergeUpdate(updates);
this.applyUpdate(merged, 'self');
logger.debug(
'pull external updates',
this.path,
updates.length,
(performance.now() - start).toFixed(2),
'ms'
);
}
}
export async function getSecondaryWorkspaceDBPath(workspaceId: string) {
const meta = await getWorkspaceMeta(workspaceId);
return meta?.secondaryDBPath;
}

View File

@@ -0,0 +1,5 @@
import { Subject } from 'rxjs';
export const dbSubjects = {
externalUpdate: new Subject<{ workspaceId: string; update: Uint8Array }>(),
};

View File

@@ -0,0 +1,102 @@
import { Subject } from 'rxjs';
import * as Y from 'yjs';
import { logger } from '../logger';
import type { YOrigin } from '../type';
import { getWorkspaceMeta } from '../workspace';
import { BaseSQLiteAdapter } from './base-db-adapter';
import { mergeUpdate } from './merge-update';
import { dbSubjects } from './subjects';
export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
role = 'primary';
yDoc = new Y.Doc();
firstConnected = false;
update$ = new Subject<void>();
constructor(public override path: string, public workspaceId: string) {
super(path);
}
override async destroy() {
await super.destroy();
this.yDoc.destroy();
// when db is closed, we can safely remove it from ensure-db list
this.update$.complete();
this.firstConnected = false;
}
getWorkspaceName = () => {
return this.yDoc.getMap('space:meta').get('name') as string;
};
async init() {
const db = await super.connectIfNeeded();
if (!this.firstConnected) {
this.yDoc.on('update', async (update: Uint8Array, origin: YOrigin) => {
if (origin === 'renderer') {
await this.addUpdateToSQLite([update]);
} else if (origin === 'external') {
dbSubjects.externalUpdate.next({
workspaceId: this.workspaceId,
update,
});
await this.addUpdateToSQLite([update]);
logger.debug('external update', this.workspaceId);
}
});
}
const updates = await this.getUpdates();
const merged = mergeUpdate(updates.map(update => update.data));
// to initialize the yDoc, we need to apply all updates from the db
this.applyUpdate(merged, 'self');
this.firstConnected = true;
this.update$.next();
return db;
}
getDocAsUpdates = () => {
return Y.encodeStateAsUpdate(this.yDoc);
};
// non-blocking and use yDoc to validate the update
// after that, the update is added to the db
applyUpdate = (data: Uint8Array, origin: YOrigin = 'renderer') => {
// todo: trim the updates when the number of records is too large
// 1. store the current ydoc state in the db
// 2. then delete the old updates
// yjs-idb will always trim the db for the first time after DB is loaded
Y.applyUpdate(this.yDoc, data, origin);
};
override async addBlob(key: string, value: Uint8Array) {
this.update$.next();
const res = await super.addBlob(key, value);
return res;
}
override async deleteBlob(key: string) {
this.update$.next();
await super.deleteBlob(key);
}
override async addUpdateToSQLite(data: Uint8Array[]) {
this.update$.next();
await super.addUpdateToSQLite(data);
}
}
export async function openWorkspaceDatabase(workspaceId: string) {
const meta = await getWorkspaceMeta(workspaceId);
const db = new WorkspaceSQLiteDB(meta.mainDBPath, workspaceId);
await db.init();
logger.info(`openWorkspaceDatabase [${workspaceId}]`);
return db;
}

View File

@@ -0,0 +1,341 @@
import path from 'node:path';
import fs from 'fs-extra';
import { nanoid } from 'nanoid';
import { ensureSQLiteDB } from '../db/ensure-db';
import type { WorkspaceSQLiteDB } from '../db/workspace-db-adapter';
import { logger } from '../logger';
import { mainRPC } from '../main-rpc';
import {
getWorkspaceDBPath,
getWorkspaceMeta,
getWorkspacesBasePath,
listWorkspaces,
storeWorkspaceMeta,
} from '../workspace';
// NOTE:
// we are using native dialogs because HTML dialogs do not give full file paths
export async function revealDBFile(workspaceId: string) {
const meta = await getWorkspaceMeta(workspaceId);
if (!meta) {
return;
}
await mainRPC.showItemInFolder(meta.secondaryDBPath ?? meta.mainDBPath);
}
// provide a backdoor to set dialog path for testing in playwright
export interface FakeDialogResult {
canceled?: boolean;
filePath?: string;
filePaths?: string[];
}
// result will be used in the next call to showOpenDialog
// if it is being read once, it will be reset to undefined
let fakeDialogResult: FakeDialogResult | undefined = undefined;
function getFakedResult() {
const result = fakeDialogResult;
fakeDialogResult = undefined;
return result;
}
export function setFakeDialogResult(result: FakeDialogResult | undefined) {
fakeDialogResult = result;
// for convenience, we will fill filePaths with filePath if it is not set
if (result?.filePaths === undefined && result?.filePath !== undefined) {
result.filePaths = [result.filePath];
}
}
const ErrorMessages = [
'DB_FILE_ALREADY_LOADED',
'DB_FILE_PATH_INVALID',
'DB_FILE_INVALID',
'FILE_ALREADY_EXISTS',
'UNKNOWN_ERROR',
] as const;
type ErrorMessage = (typeof ErrorMessages)[number];
export interface SaveDBFileResult {
filePath?: string;
canceled?: boolean;
error?: ErrorMessage;
}
const extension = 'affine';
function getDefaultDBFileName(name: string, id: string) {
const fileName = `${name}_${id}.${extension}`;
// make sure fileName is a valid file name
return fileName.replace(/[/\\?%*:|"<>]/g, '-');
}
/**
* This function is called when the user clicks the "Save" button in the "Save Workspace" dialog.
*
* It will just copy the file to the given path
*/
export async function saveDBFileAs(
workspaceId: string
): Promise<SaveDBFileResult> {
try {
const db = await ensureSQLiteDB(workspaceId);
const ret =
getFakedResult() ??
(await mainRPC.showSaveDialog({
properties: ['showOverwriteConfirmation'],
title: 'Save Workspace',
showsTagField: false,
buttonLabel: 'Save',
filters: [
{
extensions: [extension],
name: '',
},
],
defaultPath: getDefaultDBFileName(db.getWorkspaceName(), workspaceId),
message: 'Save Workspace as a SQLite Database file',
}));
const filePath = ret.filePath;
if (ret.canceled || !filePath) {
return {
canceled: true,
};
}
await fs.copyFile(db.path, filePath);
logger.log('saved', filePath);
mainRPC.showItemInFolder(filePath);
return { filePath };
} catch (err) {
logger.error('saveDBFileAs', err);
return {
error: 'UNKNOWN_ERROR',
};
}
}
export interface SelectDBFileLocationResult {
filePath?: string;
error?: ErrorMessage;
canceled?: boolean;
}
export async function selectDBFileLocation(): Promise<SelectDBFileLocationResult> {
try {
const ret =
getFakedResult() ??
(await mainRPC.showOpenDialog({
properties: ['openDirectory'],
title: 'Set Workspace Storage Location',
buttonLabel: 'Select',
defaultPath: await mainRPC.getPath('documents'),
message: "Select a location to store the workspace's database file",
}));
const dir = ret.filePaths?.[0];
if (ret.canceled || !dir) {
return {
canceled: true,
};
}
return { filePath: dir };
} catch (err) {
logger.error('selectDBFileLocation', err);
return {
error: (err as any).message,
};
}
}
export interface LoadDBFileResult {
workspaceId?: string;
error?: ErrorMessage;
canceled?: boolean;
}
/**
* This function is called when the user clicks the "Load" button in the "Load Workspace" dialog.
*
* It will
* - symlink the source db file to a new workspace id to app-data
* - return the new workspace id
*
* eg, it will create a new folder in app-data:
* <app-data>/<app-name>/workspaces/<workspace-id>/storage.db
*
* On the renderer side, after the UI got a new workspace id, it will
* update the local workspace id list and then connect to it.
*
*/
export async function loadDBFile(): Promise<LoadDBFileResult> {
try {
const ret =
getFakedResult() ??
(await mainRPC.showOpenDialog({
properties: ['openFile'],
title: 'Load Workspace',
buttonLabel: 'Load',
filters: [
{
name: 'SQLite Database',
// do we want to support other file format?
extensions: ['db', 'affine'],
},
],
message: 'Load Workspace from a AFFiNE file',
}));
const filePath = ret.filePaths?.[0];
if (ret.canceled || !filePath) {
logger.info('loadDBFile canceled');
return { canceled: true };
}
// the imported file should not be in app data dir
if (filePath.startsWith(await getWorkspacesBasePath())) {
logger.warn('loadDBFile: db file in app data dir');
return { error: 'DB_FILE_PATH_INVALID' };
}
if (await dbFileAlreadyLoaded(filePath)) {
logger.warn('loadDBFile: db file already loaded');
return { error: 'DB_FILE_ALREADY_LOADED' };
}
const { SqliteConnection } = await import('@affine/native');
if (!(await SqliteConnection.validate(filePath))) {
// TODO: report invalid db file error?
return { error: 'DB_FILE_INVALID' }; // invalid db file
}
// copy the db file to a new workspace id
const workspaceId = nanoid(10);
const internalFilePath = await getWorkspaceDBPath(workspaceId);
await fs.ensureDir(await getWorkspacesBasePath());
await fs.copy(filePath, internalFilePath);
logger.info(`loadDBFile, copy: ${filePath} -> ${internalFilePath}`);
await storeWorkspaceMeta(workspaceId, {
id: workspaceId,
mainDBPath: internalFilePath,
secondaryDBPath: filePath,
});
return { workspaceId };
} catch (err) {
logger.error('loadDBFile', err);
return {
error: 'UNKNOWN_ERROR',
};
}
}
export interface MoveDBFileResult {
filePath?: string;
error?: ErrorMessage;
canceled?: boolean;
}
/**
* This function is called when the user clicks the "Move" button in the "Move Workspace Storage" setting.
*
* It will
* - copy the source db file to a new location
* - remove the old db external file
* - update the external db file path in the workspace meta
* - return the new file path
*/
export async function moveDBFile(
workspaceId: string,
dbFileDir?: string
): Promise<MoveDBFileResult> {
let db: WorkspaceSQLiteDB | null = null;
try {
db = await ensureSQLiteDB(workspaceId);
const meta = await getWorkspaceMeta(workspaceId);
const oldDir = meta.secondaryDBPath
? path.dirname(meta.secondaryDBPath)
: null;
const defaultDir = oldDir ?? (await mainRPC.getPath('documents'));
const newName = getDefaultDBFileName(db.getWorkspaceName(), workspaceId);
const newDirPath =
dbFileDir ??
(
getFakedResult() ??
(await mainRPC.showOpenDialog({
properties: ['openDirectory'],
title: 'Move Workspace Storage',
buttonLabel: 'Move',
defaultPath: defaultDir,
message: 'Move Workspace storage file',
}))
).filePaths?.[0];
// skips if
// - user canceled the dialog
// - user selected the same dir
if (!newDirPath || newDirPath === oldDir) {
return {
canceled: true,
};
}
const newFilePath = path.join(newDirPath, newName);
if (await fs.pathExists(newFilePath)) {
return {
error: 'FILE_ALREADY_EXISTS',
};
}
logger.info(`[moveDBFile] copy ${meta.mainDBPath} -> ${newFilePath}`);
await fs.copy(meta.mainDBPath, newFilePath);
// remove the old db file, but we don't care if it fails
if (meta.secondaryDBPath) {
await fs
.remove(meta.secondaryDBPath)
.then(() => {
logger.info(`[moveDBFile] removed ${meta.secondaryDBPath}`);
})
.catch(err => {
logger.error(
`[moveDBFile] remove ${meta.secondaryDBPath} failed`,
err
);
});
}
// update meta
await storeWorkspaceMeta(workspaceId, {
secondaryDBPath: newFilePath,
});
return {
filePath: newFilePath,
};
} catch (err) {
await db?.destroy();
logger.error('[moveDBFile]', err);
return {
error: 'UNKNOWN_ERROR',
};
}
}
async function dbFileAlreadyLoaded(path: string) {
const meta = await listWorkspaces();
const paths = meta.map(m => m[1].secondaryDBPath);
return paths.includes(path);
}

View File

@@ -0,0 +1,31 @@
import {
loadDBFile,
moveDBFile,
revealDBFile,
saveDBFileAs,
selectDBFileLocation,
setFakeDialogResult,
} from './dialog';
export const dialogHandlers = {
revealDBFile: async (workspaceId: string) => {
return revealDBFile(workspaceId);
},
loadDBFile: async () => {
return loadDBFile();
},
saveDBFileAs: async (workspaceId: string) => {
return saveDBFileAs(workspaceId);
},
moveDBFile: (workspaceId: string, dbFileLocation?: string) => {
return moveDBFile(workspaceId, dbFileLocation);
},
selectDBFileLocation: async () => {
return selectDBFileLocation();
},
setFakeDialogResult: async (
result: Parameters<typeof setFakeDialogResult>[0]
) => {
return setFakeDialogResult(result);
},
};

View File

@@ -0,0 +1,33 @@
import { dbEvents, dbHandlers } from './db';
import { dialogHandlers } from './dialog';
import { workspaceEvents, workspaceHandlers } from './workspace';
export const handlers = {
db: dbHandlers,
workspace: workspaceHandlers,
dialog: dialogHandlers,
};
export const events = {
db: dbEvents,
workspace: workspaceEvents,
};
export const getExposedMeta = () => {
const handlersMeta = Object.entries(handlers).map(
([namespace, namespaceHandlers]) => {
return [namespace, Object.keys(namespaceHandlers)] as [string, string[]];
}
);
const eventsMeta = Object.entries(events).map(
([namespace, namespaceHandlers]) => {
return [namespace, Object.keys(namespaceHandlers)] as [string, string[]];
}
);
return {
handlers: handlersMeta,
events: eventsMeta,
};
};

View File

@@ -0,0 +1,86 @@
import type { EventBasedChannel } from 'async-call-rpc';
import { AsyncCall } from 'async-call-rpc';
import { events, handlers } from './exposed';
import { logger } from './logger';
const createMessagePortMainChannel = (
connection: Electron.MessagePortMain
): EventBasedChannel => {
return {
on(listener) {
const f = (e: Electron.MessageEvent) => {
listener(e.data);
};
connection.on('message', f);
// MUST start the connection to receive messages
connection.start();
return () => {
connection.off('message', f);
};
},
send(data) {
connection.postMessage(data);
},
};
};
function setupRendererConnection(rendererPort: Electron.MessagePortMain) {
const flattenedHandlers = Object.entries(handlers).flatMap(
([namespace, namespaceHandlers]) => {
return Object.entries(namespaceHandlers).map(([name, handler]) => {
const handlerWithLog = async (...args: any[]) => {
try {
const start = performance.now();
const result = await handler(...args);
logger.info(
'[async-api]',
`${namespace}.${name}`,
args.filter(
arg => typeof arg !== 'function' && typeof arg !== 'object'
),
'-',
(performance.now() - start).toFixed(2),
'ms'
);
return result;
} catch (error) {
logger.error('[async-api]', `${namespace}.${name}`, error);
}
};
return [`${namespace}:${name}`, handlerWithLog];
});
}
);
const rpc = AsyncCall<PeersAPIs.RendererToHelper>(
Object.fromEntries(flattenedHandlers),
{
channel: createMessagePortMainChannel(rendererPort),
log: false,
}
);
for (const [namespace, namespaceEvents] of Object.entries(events)) {
for (const [key, eventRegister] of Object.entries(namespaceEvents)) {
const subscription = eventRegister((...args: any[]) => {
const chan = `${namespace}:${key}`;
rpc.postEvent(chan, ...args);
});
process.on('exit', () => {
subscription();
});
}
}
}
function main() {
process.parentPort.on('message', e => {
if (e.data.channel === 'renderer-connect' && e.ports.length === 1) {
const rendererPort = e.ports[0];
setupRendererConnection(rendererPort);
logger.info('[helper] renderer connected');
}
});
}
main();

View File

@@ -0,0 +1,3 @@
import log from 'electron-log';
export const logger = log.scope('helper');

View File

@@ -0,0 +1,33 @@
import { AsyncCall, type EventBasedChannel } from 'async-call-rpc';
import { getExposedMeta } from './exposed';
function createMessagePortMainChannel(
connection: Electron.ParentPort
): EventBasedChannel {
return {
on(listener) {
const f = (e: Electron.MessageEvent) => {
listener(e.data);
};
connection.on('message', f);
return () => {
connection.off('message', f);
};
},
send(data) {
connection.postMessage(data);
},
};
}
const helperToMainServer: PeersAPIs.HelperToMain = {
getMeta: () => getExposedMeta(),
};
export const mainRPC = AsyncCall<PeersAPIs.MainToHelper>(helperToMainServer, {
strict: {
unknownMessage: false,
},
channel: createMessagePortMainChannel(process.parentPort),
});

View File

@@ -0,0 +1,9 @@
export interface WorkspaceMeta {
id: string;
mainDBPath: string;
secondaryDBPath?: string; // assume there will be only one
}
export type YOrigin = 'self' | 'external' | 'upstream' | 'renderer';
export type MainEventRegister = (...args: any[]) => () => void;

View File

@@ -0,0 +1 @@
tmp

View File

@@ -0,0 +1,142 @@
import path from 'node:path';
import fs from 'fs-extra';
import { v4 } from 'uuid';
import { afterEach, describe, expect, test, vi } from 'vitest';
const tmpDir = path.join(__dirname, 'tmp');
const appDataPath = path.join(tmpDir, 'app-data');
vi.doMock('../../db/ensure-db', () => ({
ensureSQLiteDB: async () => ({
destroy: () => {},
}),
}));
vi.doMock('../../main-rpc', () => ({
mainRPC: {
getPath: async () => appDataPath,
},
}));
afterEach(async () => {
await fs.remove(tmpDir);
});
describe('list workspaces', () => {
test('listWorkspaces (valid)', async () => {
const { listWorkspaces } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
const meta = {
id: workspaceId,
};
await fs.ensureDir(workspacePath);
await fs.writeJSON(path.join(workspacePath, 'meta.json'), meta);
const workspaces = await listWorkspaces();
expect(workspaces).toEqual([[workspaceId, meta]]);
});
test('listWorkspaces (without meta json file)', async () => {
const { listWorkspaces } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
await fs.ensureDir(workspacePath);
const workspaces = await listWorkspaces();
expect(workspaces).toEqual([
[
workspaceId,
// meta file will be created automatically
{ id: workspaceId, mainDBPath: path.join(workspacePath, 'storage.db') },
],
]);
});
});
describe('delete workspace', () => {
test('deleteWorkspace', async () => {
const { deleteWorkspace } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
await fs.ensureDir(workspacePath);
await deleteWorkspace(workspaceId);
expect(await fs.pathExists(workspacePath)).toBe(false);
// removed workspace will be moved to deleted-workspaces
expect(
await fs.pathExists(
path.join(appDataPath, 'deleted-workspaces', workspaceId)
)
).toBe(true);
});
});
describe('getWorkspaceMeta', () => {
test('can get meta', async () => {
const { getWorkspaceMeta } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
const meta = {
id: workspaceId,
};
await fs.ensureDir(workspacePath);
await fs.writeJSON(path.join(workspacePath, 'meta.json'), meta);
expect(await getWorkspaceMeta(workspaceId)).toEqual(meta);
});
test('can create meta if not exists', async () => {
const { getWorkspaceMeta } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
await fs.ensureDir(workspacePath);
expect(await getWorkspaceMeta(workspaceId)).toEqual({
id: workspaceId,
mainDBPath: path.join(workspacePath, 'storage.db'),
});
expect(
await fs.pathExists(path.join(workspacePath, 'meta.json'))
).toBeTruthy();
});
test('can migrate meta if db file is a link', async () => {
const { getWorkspaceMeta } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
await fs.ensureDir(workspacePath);
const sourcePath = path.join(tmpDir, 'source.db');
await fs.writeFile(sourcePath, 'test');
await fs.ensureSymlink(sourcePath, path.join(workspacePath, 'storage.db'));
expect(await getWorkspaceMeta(workspaceId)).toEqual({
id: workspaceId,
mainDBPath: path.join(workspacePath, 'storage.db'),
secondaryDBPath: sourcePath,
});
expect(
await fs.pathExists(path.join(workspacePath, 'meta.json'))
).toBeTruthy();
});
});
test('storeWorkspaceMeta', async () => {
const { storeWorkspaceMeta } = await import('../handlers');
const workspaceId = v4();
const workspacePath = path.join(appDataPath, 'workspaces', workspaceId);
await fs.ensureDir(workspacePath);
const meta = {
id: workspaceId,
mainDBPath: path.join(workspacePath, 'storage.db'),
};
await storeWorkspaceMeta(workspaceId, meta);
expect(await fs.readJSON(path.join(workspacePath, 'meta.json'))).toEqual(
meta
);
await storeWorkspaceMeta(workspaceId, {
secondaryDBPath: path.join(tmpDir, 'test.db'),
});
expect(await fs.readJSON(path.join(workspacePath, 'meta.json'))).toEqual({
...meta,
secondaryDBPath: path.join(tmpDir, 'test.db'),
});
});

View File

@@ -0,0 +1,143 @@
import path from 'node:path';
import fs from 'fs-extra';
import { ensureSQLiteDB } from '../db/ensure-db';
import { logger } from '../logger';
import { mainRPC } from '../main-rpc';
import type { WorkspaceMeta } from '../type';
import { workspaceSubjects } from './subjects';
let _appDataPath = '';
async function getAppDataPath() {
if (_appDataPath) {
return _appDataPath;
}
_appDataPath = await mainRPC.getPath('sessionData');
return _appDataPath;
}
export async function listWorkspaces(): Promise<
[workspaceId: string, meta: WorkspaceMeta][]
> {
const basePath = await getWorkspacesBasePath();
try {
await fs.ensureDir(basePath);
const dirs = (
await fs.readdir(basePath, {
withFileTypes: true,
})
).filter(d => d.isDirectory());
const metaList = (
await Promise.all(
dirs.map(async dir => {
// ? shall we put all meta in a single file instead of one file per workspace?
return await getWorkspaceMeta(dir.name);
})
)
).filter((w): w is WorkspaceMeta => !!w);
return metaList.map(meta => [meta.id, meta]);
} catch (error) {
logger.error('listWorkspaces', error);
return [];
}
}
export async function deleteWorkspace(id: string) {
const basePath = await getWorkspaceBasePath(id);
const movedPath = path.join(await getDeletedWorkspacesBasePath(), `${id}`);
try {
const db = await ensureSQLiteDB(id);
await db.destroy();
return await fs.move(basePath, movedPath, {
overwrite: true,
});
} catch (error) {
logger.error('deleteWorkspace', error);
}
}
export async function getWorkspacesBasePath() {
return path.join(await getAppDataPath(), 'workspaces');
}
export async function getWorkspaceBasePath(workspaceId: string) {
return path.join(await getAppDataPath(), 'workspaces', workspaceId);
}
async function getDeletedWorkspacesBasePath() {
return path.join(await getAppDataPath(), 'deleted-workspaces');
}
export async function getWorkspaceDBPath(workspaceId: string) {
return path.join(await getWorkspaceBasePath(workspaceId), 'storage.db');
}
export async function getWorkspaceMetaPath(workspaceId: string) {
return path.join(await getWorkspaceBasePath(workspaceId), 'meta.json');
}
/**
* Get workspace meta, create one if not exists
* This function will also migrate the workspace if needed
*/
export async function getWorkspaceMeta(
workspaceId: string
): Promise<WorkspaceMeta> {
try {
const basePath = await getWorkspaceBasePath(workspaceId);
const metaPath = await getWorkspaceMetaPath(workspaceId);
if (!(await fs.exists(metaPath))) {
// since not meta is found, we will migrate symlinked db file if needed
await fs.ensureDir(basePath);
const dbPath = await getWorkspaceDBPath(workspaceId);
// todo: remove this after migration (in stable version)
const realDBPath = (await fs.exists(dbPath))
? await fs.realpath(dbPath)
: dbPath;
const isLink = realDBPath !== dbPath;
if (isLink) {
await fs.copy(realDBPath, dbPath);
}
// create one if not exists
const meta = {
id: workspaceId,
mainDBPath: dbPath,
secondaryDBPath: isLink ? realDBPath : undefined,
};
await fs.writeJSON(metaPath, meta);
return meta;
} else {
const meta = await fs.readJSON(metaPath);
return meta;
}
} catch (err) {
logger.error('getWorkspaceMeta failed', err);
throw err;
}
}
export async function storeWorkspaceMeta(
workspaceId: string,
meta: Partial<WorkspaceMeta>
) {
try {
const basePath = await getWorkspaceBasePath(workspaceId);
await fs.ensureDir(basePath);
const metaPath = path.join(basePath, 'meta.json');
const currentMeta = await getWorkspaceMeta(workspaceId);
const newMeta = {
...currentMeta,
...meta,
};
await fs.writeJSON(metaPath, newMeta);
workspaceSubjects.meta.next({
workspaceId,
meta: newMeta,
});
} catch (err) {
logger.error('storeWorkspaceMeta failed', err);
}
}

View File

@@ -0,0 +1,25 @@
import type { MainEventRegister, WorkspaceMeta } from '../type';
import { deleteWorkspace, getWorkspaceMeta, listWorkspaces } from './handlers';
import { workspaceSubjects } from './subjects';
export * from './handlers';
export * from './subjects';
export const workspaceEvents = {
onMetaChange: (
fn: (meta: { workspaceId: string; meta: WorkspaceMeta }) => void
) => {
const sub = workspaceSubjects.meta.subscribe(fn);
return () => {
sub.unsubscribe();
};
},
} satisfies Record<string, MainEventRegister>;
export const workspaceHandlers = {
list: async () => listWorkspaces(),
delete: async (id: string) => deleteWorkspace(id),
getMeta: async (id: string) => {
return getWorkspaceMeta(id);
},
};

View File

@@ -0,0 +1,7 @@
import { Subject } from 'rxjs';
import type { WorkspaceMeta } from '../type';
export const workspaceSubjects = {
meta: new Subject<{ workspaceId: string; meta: WorkspaceMeta }>(),
};