mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-11 20:08:37 +00:00
fix: optimize DB pull (#2589)
This commit is contained in:
@@ -176,7 +176,7 @@ describe('ensureSQLiteDB', () => {
|
||||
expect(fileExists).toBe(true);
|
||||
registeredHandlers.get('before-quit')?.forEach(fn => fn());
|
||||
await delay(100);
|
||||
expect(workspaceDB.db?.open).toBe(false);
|
||||
expect(workspaceDB.db).toBe(null);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -197,7 +197,7 @@ describe('workspace handlers', () => {
|
||||
const list = await dispatch('workspace', 'list');
|
||||
expect(list.map(([id]) => id)).toEqual([ids[0]]);
|
||||
// deleted db should be closed
|
||||
expect(dbs[1].db?.open).toBe(false);
|
||||
expect(dbs[1].db).toBe(null);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -436,6 +436,8 @@ describe('dialog handlers', () => {
|
||||
});
|
||||
|
||||
test('moveDBFile (valid)', async () => {
|
||||
const sendStub = vi.fn();
|
||||
browserWindow.webContents.send = sendStub;
|
||||
const newPath = path.join(SESSION_DATA_PATH, 'xxx');
|
||||
const showOpenDialog = vi.fn(() => {
|
||||
return { filePaths: [newPath] };
|
||||
@@ -444,13 +446,19 @@ describe('dialog handlers', () => {
|
||||
|
||||
const id = v4();
|
||||
const { ensureSQLiteDB } = await import('../db/ensure-db');
|
||||
await ensureSQLiteDB(id);
|
||||
const db = await ensureSQLiteDB(id);
|
||||
const res = await dispatch('dialog', 'moveDBFile', id);
|
||||
expect(showOpenDialog).toBeCalled();
|
||||
assert(res.filePath);
|
||||
expect(path.dirname(res.filePath)).toBe(newPath);
|
||||
expect(res.filePath.endsWith('.affine')).toBe(true);
|
||||
// should also send workspace meta change event
|
||||
expect(sendStub).toBeCalledWith('workspace:onMetaChange', {
|
||||
workspaceId: id,
|
||||
meta: { id, secondaryDBPath: res.filePath, mainDBPath: db.path },
|
||||
});
|
||||
electronModule.dialog = {};
|
||||
browserWindow.webContents.send = () => {};
|
||||
});
|
||||
|
||||
test('moveDBFile (canceled)', async () => {
|
||||
@@ -469,3 +477,18 @@ describe('dialog handlers', () => {
|
||||
electronModule.dialog = {};
|
||||
});
|
||||
});
|
||||
|
||||
describe('applicationMenu', () => {
|
||||
// test some basic IPC events
|
||||
test('applicationMenu event', async () => {
|
||||
const { applicationMenuSubjects } = await import('../application-menu');
|
||||
const sendStub = vi.fn();
|
||||
browserWindow.webContents.send = sendStub;
|
||||
applicationMenuSubjects.newPageAction.next();
|
||||
expect(sendStub).toHaveBeenCalledWith(
|
||||
'applicationMenu:onNewPageAction',
|
||||
undefined
|
||||
);
|
||||
browserWindow.webContents.send = () => {};
|
||||
});
|
||||
});
|
||||
|
||||
1
apps/electron/layers/main/src/db/__tests__/.gitignore
vendored
Normal file
1
apps/electron/layers/main/src/db/__tests__/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
tmp
|
||||
147
apps/electron/layers/main/src/db/__tests__/ensure-db.spec.ts
Normal file
147
apps/electron/layers/main/src/db/__tests__/ensure-db.spec.ts
Normal file
@@ -0,0 +1,147 @@
|
||||
import path from 'node:path';
|
||||
|
||||
import fs from 'fs-extra';
|
||||
import { v4 } from 'uuid';
|
||||
import { afterEach, beforeEach, expect, test, vi } from 'vitest';
|
||||
|
||||
const tmpDir = path.join(__dirname, 'tmp');
|
||||
|
||||
const registeredHandlers = new Map<
|
||||
string,
|
||||
((...args: any[]) => Promise<any>)[]
|
||||
>();
|
||||
|
||||
const SESSION_DATA_PATH = path.join(tmpDir, 'affine-test');
|
||||
const DOCUMENTS_PATH = path.join(tmpDir, 'affine-test-documents');
|
||||
|
||||
const electronModule = {
|
||||
app: {
|
||||
getPath: (name: string) => {
|
||||
if (name === 'sessionData') {
|
||||
return SESSION_DATA_PATH;
|
||||
} else if (name === 'documents') {
|
||||
return DOCUMENTS_PATH;
|
||||
}
|
||||
throw new Error('not implemented');
|
||||
},
|
||||
name: 'affine-test',
|
||||
on: (name: string, callback: (...args: any[]) => any) => {
|
||||
const handlers = registeredHandlers.get(name) || [];
|
||||
handlers.push(callback);
|
||||
registeredHandlers.set(name, handlers);
|
||||
},
|
||||
addEventListener: (...args: any[]) => {
|
||||
// @ts-ignore
|
||||
electronModule.app.on(...args);
|
||||
},
|
||||
removeEventListener: () => {},
|
||||
},
|
||||
shell: {} as Partial<Electron.Shell>,
|
||||
dialog: {} as Partial<Electron.Dialog>,
|
||||
};
|
||||
|
||||
const runHandler = (key: string) => {
|
||||
registeredHandlers.get(key)?.forEach(handler => handler());
|
||||
};
|
||||
|
||||
// dynamically import handlers so that we can inject local variables to mocks
|
||||
vi.doMock('electron', () => {
|
||||
return electronModule;
|
||||
});
|
||||
|
||||
const constructorStub = vi.fn();
|
||||
const destroyStub = vi.fn();
|
||||
|
||||
vi.doMock('../secondary-db', () => {
|
||||
return {
|
||||
SecondaryWorkspaceSQLiteDB: class {
|
||||
constructor(...args: any[]) {
|
||||
constructorStub(...args);
|
||||
}
|
||||
|
||||
destroy = destroyStub;
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
runHandler('before-quit');
|
||||
await fs.remove(tmpDir);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
test('can get a valid WorkspaceSQLiteDB', async () => {
|
||||
const { ensureSQLiteDB } = await import('../ensure-db');
|
||||
const workspaceId = v4();
|
||||
const db0 = await ensureSQLiteDB(workspaceId);
|
||||
expect(db0).toBeDefined();
|
||||
expect(db0.workspaceId).toBe(workspaceId);
|
||||
|
||||
const db1 = await ensureSQLiteDB(v4());
|
||||
expect(db1).not.toBe(db0);
|
||||
expect(db1.workspaceId).not.toBe(db0.workspaceId);
|
||||
|
||||
// ensure that the db is cached
|
||||
expect(await ensureSQLiteDB(workspaceId)).toBe(db0);
|
||||
});
|
||||
|
||||
test('db should be destroyed when app quits', async () => {
|
||||
const { ensureSQLiteDB } = await import('../ensure-db');
|
||||
const workspaceId = v4();
|
||||
const db0 = await ensureSQLiteDB(workspaceId);
|
||||
const db1 = await ensureSQLiteDB(v4());
|
||||
|
||||
expect(db0.db).not.toBeNull();
|
||||
expect(db1.db).not.toBeNull();
|
||||
|
||||
runHandler('before-quit');
|
||||
|
||||
expect(db0.db).toBeNull();
|
||||
expect(db1.db).toBeNull();
|
||||
});
|
||||
|
||||
test('if db has a secondary db path, we should also poll that', async () => {
|
||||
const { ensureSQLiteDB } = await import('../ensure-db');
|
||||
const { appContext } = await import('../../context');
|
||||
const { storeWorkspaceMeta } = await import('../../workspace');
|
||||
const workspaceId = v4();
|
||||
await storeWorkspaceMeta(appContext, workspaceId, {
|
||||
secondaryDBPath: path.join(tmpDir, 'secondary.db'),
|
||||
});
|
||||
|
||||
const db = await ensureSQLiteDB(workspaceId);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1500);
|
||||
|
||||
// not sure why but we still need to wait with real timer
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
|
||||
expect(constructorStub).toBeCalledTimes(1);
|
||||
expect(constructorStub).toBeCalledWith(path.join(tmpDir, 'secondary.db'), db);
|
||||
|
||||
// if secondary meta is changed
|
||||
await storeWorkspaceMeta(appContext, workspaceId, {
|
||||
secondaryDBPath: path.join(tmpDir, 'secondary2.db'),
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1500);
|
||||
expect(constructorStub).toBeCalledTimes(2);
|
||||
expect(destroyStub).toBeCalledTimes(1);
|
||||
|
||||
// if secondary meta is changed (but another workspace)
|
||||
await storeWorkspaceMeta(appContext, v4(), {
|
||||
secondaryDBPath: path.join(tmpDir, 'secondary3.db'),
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(1500);
|
||||
expect(constructorStub).toBeCalledTimes(2);
|
||||
expect(destroyStub).toBeCalledTimes(1);
|
||||
|
||||
// if primary is destroyed, secondary should also be destroyed
|
||||
db.destroy();
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
expect(destroyStub).toBeCalledTimes(2);
|
||||
});
|
||||
@@ -0,0 +1,101 @@
|
||||
import path from 'node:path';
|
||||
|
||||
import fs from 'fs-extra';
|
||||
import { v4 } from 'uuid';
|
||||
import { afterEach, expect, test, vi } from 'vitest';
|
||||
import * as Y from 'yjs';
|
||||
|
||||
import type { AppContext } from '../../context';
|
||||
import { dbSubjects } from '../subjects';
|
||||
|
||||
const tmpDir = path.join(__dirname, 'tmp');
|
||||
|
||||
const testAppContext: AppContext = {
|
||||
appDataPath: path.join(tmpDir, 'test-data'),
|
||||
appName: 'test',
|
||||
};
|
||||
|
||||
afterEach(async () => {
|
||||
if (process.platform !== 'win32') {
|
||||
// hmmm ....
|
||||
await fs.remove(tmpDir);
|
||||
}
|
||||
});
|
||||
|
||||
function getTestUpdates() {
|
||||
const testYDoc = new Y.Doc();
|
||||
const yText = testYDoc.getText('test');
|
||||
yText.insert(0, 'hello');
|
||||
const updates = Y.encodeStateAsUpdate(testYDoc);
|
||||
|
||||
return updates;
|
||||
}
|
||||
test('can create new db file if not exists', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
const db = await openWorkspaceDatabase(testAppContext, workspaceId);
|
||||
const dbPath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
`workspaces/${workspaceId}`,
|
||||
`storage.db`
|
||||
);
|
||||
expect(await fs.exists(dbPath)).toBe(true);
|
||||
db.destroy();
|
||||
});
|
||||
|
||||
test('on applyUpdate (from self), will not trigger update', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
const onUpdate = vi.fn();
|
||||
|
||||
const db = await openWorkspaceDatabase(testAppContext, workspaceId);
|
||||
db.update$.subscribe(onUpdate);
|
||||
db.applyUpdate(getTestUpdates(), 'self');
|
||||
expect(onUpdate).not.toHaveBeenCalled();
|
||||
db.destroy();
|
||||
});
|
||||
|
||||
test('on applyUpdate (from renderer), will trigger update', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
const onUpdate = vi.fn();
|
||||
const onExternalUpdate = vi.fn();
|
||||
|
||||
const db = await openWorkspaceDatabase(testAppContext, workspaceId);
|
||||
db.update$.subscribe(onUpdate);
|
||||
const sub = dbSubjects.externalUpdate.subscribe(onExternalUpdate);
|
||||
db.applyUpdate(getTestUpdates(), 'renderer');
|
||||
expect(onUpdate).toHaveBeenCalled(); // not yet updated
|
||||
sub.unsubscribe();
|
||||
db.destroy();
|
||||
});
|
||||
|
||||
test('on applyUpdate (from external), will trigger update & send external update event', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
const onUpdate = vi.fn();
|
||||
const onExternalUpdate = vi.fn();
|
||||
|
||||
const db = await openWorkspaceDatabase(testAppContext, workspaceId);
|
||||
db.update$.subscribe(onUpdate);
|
||||
const sub = dbSubjects.externalUpdate.subscribe(onExternalUpdate);
|
||||
db.applyUpdate(getTestUpdates(), 'external');
|
||||
expect(onUpdate).toHaveBeenCalled();
|
||||
expect(onExternalUpdate).toHaveBeenCalled();
|
||||
sub.unsubscribe();
|
||||
db.destroy();
|
||||
});
|
||||
|
||||
test('on destroy, check if resources have been released', async () => {
|
||||
const { openWorkspaceDatabase } = await import('../workspace-db-adapter');
|
||||
const workspaceId = v4();
|
||||
const db = await openWorkspaceDatabase(testAppContext, workspaceId);
|
||||
const updateSub = {
|
||||
complete: vi.fn(),
|
||||
next: vi.fn(),
|
||||
};
|
||||
db.update$ = updateSub as any;
|
||||
db.destroy();
|
||||
expect(db.db).toBe(null);
|
||||
expect(updateSub.complete).toHaveBeenCalled();
|
||||
});
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
from,
|
||||
fromEvent,
|
||||
interval,
|
||||
merge,
|
||||
Observable,
|
||||
} from 'rxjs';
|
||||
import {
|
||||
@@ -36,6 +37,7 @@ function getWorkspaceDB$(id: string) {
|
||||
db$Map.set(
|
||||
id,
|
||||
from(openWorkspaceDatabase(appContext, id)).pipe(
|
||||
shareReplay(1),
|
||||
switchMap(db => {
|
||||
return startPollingSecondaryDB(db).pipe(
|
||||
ignoreElements(),
|
||||
@@ -57,7 +59,6 @@ function getWorkspaceDB$(id: string) {
|
||||
return db$Map.get(id)!;
|
||||
}
|
||||
|
||||
// fixme: this function has issue on registering multiple times...
|
||||
function startPollingSecondaryDB(db: WorkspaceSQLiteDB) {
|
||||
const meta$ = getWorkspaceMeta$(db.workspaceId);
|
||||
const secondaryDB$ = meta$.pipe(
|
||||
@@ -65,28 +66,43 @@ function startPollingSecondaryDB(db: WorkspaceSQLiteDB) {
|
||||
distinctUntilChanged(),
|
||||
filter((p): p is string => !!p),
|
||||
switchMap(path => {
|
||||
const secondaryDB = new SecondaryWorkspaceSQLiteDB(path, db);
|
||||
return new Observable<SecondaryWorkspaceSQLiteDB>(observer => {
|
||||
const secondaryDB = new SecondaryWorkspaceSQLiteDB(path, db);
|
||||
observer.next(secondaryDB);
|
||||
return () => {
|
||||
logger.info(
|
||||
'[ensureSQLiteDB] close secondary db connection',
|
||||
secondaryDB.path
|
||||
);
|
||||
secondaryDB.destroy();
|
||||
};
|
||||
});
|
||||
})
|
||||
}),
|
||||
takeUntil(db.update$.pipe(last())),
|
||||
shareReplay(1)
|
||||
);
|
||||
|
||||
const firstDelayedTick$ = defer(() => {
|
||||
return new Promise<number>(resolve =>
|
||||
setTimeout(() => {
|
||||
resolve(0);
|
||||
}, 1000)
|
||||
);
|
||||
});
|
||||
|
||||
// pull every 30 seconds
|
||||
const poll$ = interval(30000).pipe(
|
||||
const poll$ = merge(firstDelayedTick$, interval(30000)).pipe(
|
||||
switchMap(() => secondaryDB$),
|
||||
tap({
|
||||
next: secondaryDB => {
|
||||
secondaryDB.pull();
|
||||
},
|
||||
}),
|
||||
takeUntil(db.update$.pipe(last())),
|
||||
shareReplay(1)
|
||||
);
|
||||
|
||||
return poll$.pipe(takeUntil(db.update$.pipe(last())), shareReplay(1));
|
||||
return poll$;
|
||||
}
|
||||
|
||||
export function ensureSQLiteDB(id: string) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import * as Y from 'yjs';
|
||||
import type { AppContext } from '../context';
|
||||
import { logger } from '../logger';
|
||||
import type { YOrigin } from '../type';
|
||||
import { mergeUpdateWorker } from '../workers';
|
||||
import { getWorkspaceMeta } from '../workspace';
|
||||
import { BaseSQLiteAdapter } from './base-db-adapter';
|
||||
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
|
||||
@@ -26,6 +27,7 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
) {
|
||||
super(path);
|
||||
this.setupAndListen();
|
||||
logger.debug('[SecondaryWorkspaceSQLiteDB] created', this.workspaceId);
|
||||
}
|
||||
|
||||
close() {
|
||||
@@ -34,10 +36,10 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
}
|
||||
|
||||
override destroy() {
|
||||
this.flushUpdateQueue();
|
||||
this.unsubscribers.forEach(unsub => unsub());
|
||||
this.db?.close();
|
||||
this.yDoc.destroy();
|
||||
this.close();
|
||||
}
|
||||
|
||||
get workspaceId() {
|
||||
@@ -74,13 +76,13 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
|
||||
// wrap the fn with connect and close
|
||||
// it only works for sync functions
|
||||
run = (fn: () => void) => {
|
||||
run = <T extends (...args: any[]) => any>(fn: T) => {
|
||||
try {
|
||||
if (this.runCounter === 0) {
|
||||
this.connect();
|
||||
}
|
||||
this.runCounter++;
|
||||
fn();
|
||||
return fn();
|
||||
} catch (err) {
|
||||
logger.error(err);
|
||||
} finally {
|
||||
@@ -166,19 +168,24 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
* - get blobs and put new blobs to upstream
|
||||
* - disconnect
|
||||
*/
|
||||
pull() {
|
||||
this.run(() => {
|
||||
async pull() {
|
||||
const start = performance.now();
|
||||
const updates = this.run(() => {
|
||||
// TODO: no need to get all updates, just get the latest ones (using a cursor, etc)?
|
||||
const updates = this.getUpdates().map(update => update.data);
|
||||
Y.transact(this.yDoc, () => {
|
||||
updates.forEach(update => {
|
||||
this.applyUpdate(update, 'self');
|
||||
});
|
||||
});
|
||||
logger.debug('pull external updates', this.path, updates.length);
|
||||
|
||||
this.syncBlobs();
|
||||
return this.getUpdates().map(update => update.data);
|
||||
});
|
||||
|
||||
const merged = await mergeUpdateWorker(updates);
|
||||
this.applyUpdate(merged, 'self');
|
||||
|
||||
logger.debug(
|
||||
'pull external updates',
|
||||
this.path,
|
||||
updates.length,
|
||||
(performance.now() - start).toFixed(2),
|
||||
'ms'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import * as Y from 'yjs';
|
||||
import type { AppContext } from '../context';
|
||||
import { logger } from '../logger';
|
||||
import type { YOrigin } from '../type';
|
||||
import { mergeUpdateWorker } from '../workers';
|
||||
import { getWorkspaceMeta } from '../workspace';
|
||||
import { BaseSQLiteAdapter } from './base-db-adapter';
|
||||
import { dbSubjects } from './subjects';
|
||||
@@ -21,6 +22,7 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
|
||||
override destroy() {
|
||||
this.db?.close();
|
||||
this.db = null;
|
||||
this.yDoc.destroy();
|
||||
|
||||
// when db is closed, we can safely remove it from ensure-db list
|
||||
@@ -31,15 +33,15 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
return this.yDoc.getMap('space:meta').get('name') as string;
|
||||
};
|
||||
|
||||
override connect() {
|
||||
async init() {
|
||||
const db = super.connect();
|
||||
|
||||
if (!this.firstConnected) {
|
||||
this.yDoc.on('update', (update: Uint8Array, origin: YOrigin) => {
|
||||
if (origin !== 'self') {
|
||||
if (origin === 'renderer') {
|
||||
this.addUpdateToSQLite([update]);
|
||||
} else if (origin === 'external') {
|
||||
this.addUpdateToSQLite([update]);
|
||||
}
|
||||
if (origin === 'external') {
|
||||
logger.debug('external update', this.workspaceId);
|
||||
dbSubjects.externalUpdate.next({
|
||||
workspaceId: this.workspaceId,
|
||||
@@ -50,12 +52,10 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
}
|
||||
|
||||
const updates = this.getUpdates();
|
||||
const merged = await mergeUpdateWorker(updates.map(update => update.data));
|
||||
|
||||
// to initialize the yDoc, we need to apply all updates from the db
|
||||
Y.transact(this.yDoc, () => {
|
||||
updates.forEach(update => {
|
||||
this.applyUpdate(update.data, 'self');
|
||||
});
|
||||
});
|
||||
this.applyUpdate(merged, 'self');
|
||||
|
||||
this.firstConnected = true;
|
||||
this.update$.next();
|
||||
@@ -100,6 +100,6 @@ export async function openWorkspaceDatabase(
|
||||
) {
|
||||
const meta = await getWorkspaceMeta(context, workspaceId);
|
||||
const db = new WorkspaceSQLiteDB(meta.mainDBPath, workspaceId);
|
||||
await db.connect();
|
||||
await db.init();
|
||||
return db;
|
||||
}
|
||||
|
||||
@@ -21,9 +21,18 @@ export function registerEvents() {
|
||||
// register events
|
||||
for (const [namespace, namespaceEvents] of Object.entries(allEvents)) {
|
||||
for (const [key, eventRegister] of Object.entries(namespaceEvents)) {
|
||||
const subscription = eventRegister((...args: any) => {
|
||||
const subscription = eventRegister((...args: any[]) => {
|
||||
const chan = `${namespace}:${key}`;
|
||||
logger.info('[ipc-event]', chan, args);
|
||||
logger.info(
|
||||
'[ipc-event]',
|
||||
chan,
|
||||
args.filter(
|
||||
a =>
|
||||
a !== undefined &&
|
||||
typeof a !== 'function' &&
|
||||
typeof a !== 'object'
|
||||
)
|
||||
);
|
||||
getActiveWindows().forEach(win => win.webContents.send(chan, ...args));
|
||||
});
|
||||
app.on('before-quit', () => {
|
||||
|
||||
35
apps/electron/layers/main/src/workers/index.ts
Normal file
35
apps/electron/layers/main/src/workers/index.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
import path from 'node:path';
|
||||
import { Worker } from 'node:worker_threads';
|
||||
|
||||
import { mergeUpdate } from './merge-update';
|
||||
|
||||
export function mergeUpdateWorker(updates: Uint8Array[]) {
|
||||
// fallback to main thread if worker is disabled (in vitest)
|
||||
if (process.env.USE_WORKER !== 'true') {
|
||||
return mergeUpdate(updates);
|
||||
}
|
||||
return new Promise<Uint8Array>((resolve, reject) => {
|
||||
// it is intended to have "./workers" in the path
|
||||
const workerFile = path.join(__dirname, './workers/merge-update.worker.js');
|
||||
|
||||
// convert updates to SharedArrayBuffer[s]
|
||||
const sharedArrayBufferUpdates = updates.map(update => {
|
||||
const buffer = new SharedArrayBuffer(update.byteLength);
|
||||
const view = new Uint8Array(buffer);
|
||||
view.set(update);
|
||||
return view;
|
||||
});
|
||||
|
||||
const worker = new Worker(workerFile, {
|
||||
workerData: sharedArrayBufferUpdates,
|
||||
});
|
||||
|
||||
worker.on('message', resolve);
|
||||
worker.on('error', reject);
|
||||
worker.on('exit', code => {
|
||||
if (code !== 0) {
|
||||
reject(new Error(`Worker stopped with exit code ${code}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
11
apps/electron/layers/main/src/workers/merge-update.ts
Normal file
11
apps/electron/layers/main/src/workers/merge-update.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import * as Y from 'yjs';
|
||||
|
||||
export function mergeUpdate(updates: Uint8Array[]) {
|
||||
const yDoc = new Y.Doc();
|
||||
Y.transact(yDoc, () => {
|
||||
for (const update of updates) {
|
||||
Y.applyUpdate(yDoc, update);
|
||||
}
|
||||
});
|
||||
return Y.encodeStateAsUpdate(yDoc);
|
||||
}
|
||||
14
apps/electron/layers/main/src/workers/merge-update.worker.ts
Normal file
14
apps/electron/layers/main/src/workers/merge-update.worker.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { parentPort, workerData } from 'node:worker_threads';
|
||||
|
||||
import { mergeUpdate } from './merge-update';
|
||||
|
||||
function getMergeUpdate(updates: Uint8Array[]) {
|
||||
const update = mergeUpdate(updates);
|
||||
const buffer = new SharedArrayBuffer(update.byteLength);
|
||||
const view = new Uint8Array(buffer);
|
||||
view.set(update);
|
||||
|
||||
return update;
|
||||
}
|
||||
|
||||
parentPort?.postMessage(getMergeUpdate(workerData));
|
||||
1
apps/electron/layers/main/src/workspace/__tests__/.gitignore
vendored
Normal file
1
apps/electron/layers/main/src/workspace/__tests__/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
tmp
|
||||
@@ -0,0 +1,208 @@
|
||||
import path from 'node:path';
|
||||
|
||||
import fs from 'fs-extra';
|
||||
import { v4 } from 'uuid';
|
||||
import { afterEach, describe, expect, test, vi } from 'vitest';
|
||||
|
||||
import type { AppContext } from '../../context';
|
||||
|
||||
const tmpDir = path.join(__dirname, 'tmp');
|
||||
|
||||
const testAppContext: AppContext = {
|
||||
appDataPath: path.join(tmpDir, 'test-data'),
|
||||
appName: 'test',
|
||||
};
|
||||
|
||||
vi.doMock('../../context', () => ({
|
||||
appContext: testAppContext,
|
||||
}));
|
||||
|
||||
vi.doMock('../../db/ensure-db', () => ({
|
||||
ensureSQLiteDB: async () => ({
|
||||
destroy: () => {},
|
||||
}),
|
||||
}));
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.remove(tmpDir);
|
||||
});
|
||||
|
||||
describe('list workspaces', () => {
|
||||
test('listWorkspaces (valid)', async () => {
|
||||
const { listWorkspaces } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
const meta = {
|
||||
id: workspaceId,
|
||||
};
|
||||
await fs.ensureDir(workspacePath);
|
||||
await fs.writeJSON(path.join(workspacePath, 'meta.json'), meta);
|
||||
const workspaces = await listWorkspaces(testAppContext);
|
||||
expect(workspaces).toEqual([[workspaceId, meta]]);
|
||||
});
|
||||
|
||||
test('listWorkspaces (without meta json file)', async () => {
|
||||
const { listWorkspaces } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
await fs.ensureDir(workspacePath);
|
||||
const workspaces = await listWorkspaces(testAppContext);
|
||||
expect(workspaces).toEqual([
|
||||
[
|
||||
workspaceId,
|
||||
// meta file will be created automatically
|
||||
{ id: workspaceId, mainDBPath: path.join(workspacePath, 'storage.db') },
|
||||
],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('delete workspace', () => {
|
||||
test('deleteWorkspace', async () => {
|
||||
const { deleteWorkspace } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
await fs.ensureDir(workspacePath);
|
||||
await deleteWorkspace(testAppContext, workspaceId);
|
||||
expect(await fs.pathExists(workspacePath)).toBe(false);
|
||||
// removed workspace will be moved to delete-workspaces
|
||||
expect(
|
||||
await fs.pathExists(
|
||||
path.join(testAppContext.appDataPath, 'delete-workspaces', workspaceId)
|
||||
)
|
||||
).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getWorkspaceMeta', () => {
|
||||
test('can get meta', async () => {
|
||||
const { getWorkspaceMeta } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
const meta = {
|
||||
id: workspaceId,
|
||||
};
|
||||
await fs.ensureDir(workspacePath);
|
||||
await fs.writeJSON(path.join(workspacePath, 'meta.json'), meta);
|
||||
expect(await getWorkspaceMeta(testAppContext, workspaceId)).toEqual(meta);
|
||||
});
|
||||
|
||||
test('can create meta if not exists', async () => {
|
||||
const { getWorkspaceMeta } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
await fs.ensureDir(workspacePath);
|
||||
expect(await getWorkspaceMeta(testAppContext, workspaceId)).toEqual({
|
||||
id: workspaceId,
|
||||
mainDBPath: path.join(workspacePath, 'storage.db'),
|
||||
});
|
||||
expect(
|
||||
await fs.pathExists(path.join(workspacePath, 'meta.json'))
|
||||
).toBeTruthy();
|
||||
});
|
||||
|
||||
test('can migrate meta if db file is a link', async () => {
|
||||
const { getWorkspaceMeta } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
await fs.ensureDir(workspacePath);
|
||||
const sourcePath = path.join(tmpDir, 'source.db');
|
||||
await fs.writeFile(sourcePath, 'test');
|
||||
|
||||
await fs.ensureSymlink(sourcePath, path.join(workspacePath, 'storage.db'));
|
||||
|
||||
expect(await getWorkspaceMeta(testAppContext, workspaceId)).toEqual({
|
||||
id: workspaceId,
|
||||
mainDBPath: path.join(workspacePath, 'storage.db'),
|
||||
secondaryDBPath: sourcePath,
|
||||
});
|
||||
|
||||
expect(
|
||||
await fs.pathExists(path.join(workspacePath, 'meta.json'))
|
||||
).toBeTruthy();
|
||||
});
|
||||
});
|
||||
|
||||
test('storeWorkspaceMeta', async () => {
|
||||
const { storeWorkspaceMeta } = await import('../handlers');
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
await fs.ensureDir(workspacePath);
|
||||
const meta = {
|
||||
id: workspaceId,
|
||||
mainDBPath: path.join(workspacePath, 'storage.db'),
|
||||
};
|
||||
await storeWorkspaceMeta(testAppContext, workspaceId, meta);
|
||||
expect(await fs.readJSON(path.join(workspacePath, 'meta.json'))).toEqual(
|
||||
meta
|
||||
);
|
||||
await storeWorkspaceMeta(testAppContext, workspaceId, {
|
||||
secondaryDBPath: path.join(tmpDir, 'test.db'),
|
||||
});
|
||||
expect(await fs.readJSON(path.join(workspacePath, 'meta.json'))).toEqual({
|
||||
...meta,
|
||||
secondaryDBPath: path.join(tmpDir, 'test.db'),
|
||||
});
|
||||
});
|
||||
|
||||
test('getWorkspaceMeta observable', async () => {
|
||||
const { storeWorkspaceMeta } = await import('../handlers');
|
||||
const { getWorkspaceMeta$ } = await import('../index');
|
||||
|
||||
const workspaceId = v4();
|
||||
const workspacePath = path.join(
|
||||
testAppContext.appDataPath,
|
||||
'workspaces',
|
||||
workspaceId
|
||||
);
|
||||
|
||||
const metaChange = vi.fn();
|
||||
|
||||
const meta$ = getWorkspaceMeta$(workspaceId);
|
||||
|
||||
meta$.subscribe(metaChange);
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
|
||||
expect(metaChange).toHaveBeenCalledWith({
|
||||
id: workspaceId,
|
||||
mainDBPath: path.join(workspacePath, 'storage.db'),
|
||||
});
|
||||
|
||||
await storeWorkspaceMeta(testAppContext, workspaceId, {
|
||||
secondaryDBPath: path.join(tmpDir, 'test.db'),
|
||||
});
|
||||
|
||||
expect(metaChange).toHaveBeenCalledWith({
|
||||
id: workspaceId,
|
||||
mainDBPath: path.join(workspacePath, 'storage.db'),
|
||||
secondaryDBPath: path.join(tmpDir, 'test.db'),
|
||||
});
|
||||
});
|
||||
@@ -42,7 +42,6 @@ export async function deleteWorkspace(context: AppContext, id: string) {
|
||||
try {
|
||||
const db = await ensureSQLiteDB(id);
|
||||
db.destroy();
|
||||
// TODO: should remove DB connection first
|
||||
return await fs.move(basePath, movedPath, {
|
||||
overwrite: true,
|
||||
});
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { from, merge } from 'rxjs';
|
||||
import { map } from 'rxjs/operators';
|
||||
import { merge } from 'rxjs';
|
||||
import { filter, map } from 'rxjs/operators';
|
||||
|
||||
import { appContext } from '../context';
|
||||
import type {
|
||||
@@ -35,7 +35,10 @@ export const workspaceHandlers = {
|
||||
// used internally. Get a stream of workspace id -> meta
|
||||
export const getWorkspaceMeta$ = (workspaceId: string) => {
|
||||
return merge(
|
||||
from(getWorkspaceMeta(appContext, workspaceId)),
|
||||
workspaceSubjects.meta.pipe(map(meta => meta.meta))
|
||||
getWorkspaceMeta(appContext, workspaceId),
|
||||
workspaceSubjects.meta.pipe(
|
||||
map(meta => meta.meta),
|
||||
filter(meta => meta.id === workspaceId)
|
||||
)
|
||||
);
|
||||
};
|
||||
|
||||
@@ -23,6 +23,7 @@ export const config = () => {
|
||||
JSON.stringify(process.env[key] ?? ''),
|
||||
]),
|
||||
['process.env.NODE_ENV', `"${mode}"`],
|
||||
['process.env.USE_WORKER', '"true"'],
|
||||
]);
|
||||
|
||||
if (DEV_SERVER_URL) {
|
||||
@@ -31,7 +32,10 @@ export const config = () => {
|
||||
|
||||
return {
|
||||
main: {
|
||||
entryPoints: [resolve(root, './layers/main/src/index.ts')],
|
||||
entryPoints: [
|
||||
resolve(root, './layers/main/src/index.ts'),
|
||||
resolve(root, './layers/main/src/workers/merge-update.worker.ts'),
|
||||
],
|
||||
outdir: resolve(root, './dist/layers/main'),
|
||||
bundle: true,
|
||||
target: `node${NODE_MAJOR_VERSION}`,
|
||||
|
||||
Reference in New Issue
Block a user