feat(native): move sqlite operation into Rust (#2497)

Co-authored-by: Peng Xiao <pengxiao@outlook.com>
This commit is contained in:
LongYinan
2023-06-07 14:52:19 +08:00
committed by GitHub
parent 541011ba90
commit d28c887237
36 changed files with 1910 additions and 545 deletions

View File

@@ -17,22 +17,6 @@ yarn dev # or yarn prod for production build
## Troubleshooting
### better-sqlite3 error
When running tests or starting electron, you may encounter the following error:
> Error: The module 'apps/electron/node_modules/better-sqlite3/build/Release/better_sqlite3.node'
This is due to the fact that the `better-sqlite3` package is built for the Node.js version in Electron & in your machine. To fix this, run the following command based on different cases:
```sh
# for running unit tests, we are not using Electron's node:
yarn rebuild better-sqlite3
# for running Electron, we are using Electron's node:
yarn postinstall
```
## Credits
Most of the boilerplate code is generously borrowed from the following

View File

@@ -1,5 +1,6 @@
import assert from 'node:assert';
import path from 'node:path';
import { setTimeout } from 'node:timers/promises';
import fs from 'fs-extra';
import { v4 } from 'uuid';
@@ -13,8 +14,6 @@ const registeredHandlers = new Map<
((...args: any[]) => Promise<any>)[]
>();
const delay = (ms: number) => new Promise(r => setTimeout(r, ms));
type WithoutFirstParameter<T> = T extends (_: any, ...args: infer P) => infer R
? (...args: P) => R
: T;
@@ -72,11 +71,14 @@ const nativeTheme = {
themeSource: 'light',
};
function compareBuffer(a: Uint8Array | null, b: Uint8Array | null) {
function compareBuffer(
a: Uint8Array | null | undefined,
b: Uint8Array | null | undefined
) {
if (
(a === null && b === null) ||
a === null ||
b === null ||
(a == null && b == null) ||
a == null ||
b == null ||
a.length !== b.length
) {
return false;
@@ -105,11 +107,11 @@ const electronModule = {
handlers.push(callback);
registeredHandlers.set(name, handlers);
},
addEventListener: (...args: any[]) => {
addListener: (...args: any[]) => {
// @ts-ignore
electronModule.app.on(...args);
},
removeEventListener: () => {},
removeListener: () => {},
},
BrowserWindow: {
getAllWindows: () => {
@@ -135,7 +137,6 @@ beforeEach(async () => {
const { registerEvents } = await import('../events');
registerEvents();
await fs.mkdirp(SESSION_DATA_PATH);
await import('../db/ensure-db');
registeredHandlers.get('ready')?.forEach(fn => fn());
});
@@ -143,7 +144,10 @@ beforeEach(async () => {
afterEach(async () => {
// reset registered handlers
registeredHandlers.get('before-quit')?.forEach(fn => fn());
// wait for the db to be closed on Windows
if (process.platform === 'win32') {
await setTimeout(200);
}
await fs.remove(SESSION_DATA_PATH);
});
@@ -175,7 +179,7 @@ describe('ensureSQLiteDB', () => {
const fileExists = await fs.pathExists(file);
expect(fileExists).toBe(true);
registeredHandlers.get('before-quit')?.forEach(fn => fn());
await delay(100);
await setTimeout(100);
expect(workspaceDB.db).toBe(null);
});
});
@@ -254,7 +258,7 @@ describe('db handlers', () => {
test('get non existent blob', async () => {
const workspaceId = v4();
const bin = await dispatch('db', 'getBlob', workspaceId, 'non-existent-id');
expect(bin).toBeNull();
expect(bin).toBeUndefined();
});
test('list blobs (empty)', async () => {

View File

@@ -1,4 +1,5 @@
import path from 'node:path';
import { setTimeout } from 'node:timers/promises';
import fs from 'fs-extra';
import { v4 } from 'uuid';
@@ -30,18 +31,20 @@ const electronModule = {
handlers.push(callback);
registeredHandlers.set(name, handlers);
},
addEventListener: (...args: any[]) => {
addListener: (...args: any[]) => {
// @ts-ignore
electronModule.app.on(...args);
},
removeEventListener: () => {},
removeListener: () => {},
},
shell: {} as Partial<Electron.Shell>,
dialog: {} as Partial<Electron.Dialog>,
};
const runHandler = (key: string) => {
registeredHandlers.get(key)?.forEach(handler => handler());
const runHandler = async (key: string) => {
await Promise.all(
(registeredHandlers.get(key) ?? []).map(handler => handler())
);
};
// dynamically import handlers so that we can inject local variables to mocks
@@ -51,6 +54,7 @@ vi.doMock('electron', () => {
const constructorStub = vi.fn();
const destroyStub = vi.fn();
destroyStub.mockReturnValue(Promise.resolve());
vi.doMock('../secondary-db', () => {
return {
@@ -59,6 +63,10 @@ vi.doMock('../secondary-db', () => {
constructorStub(...args);
}
connectIfNeeded = () => Promise.resolve();
pull = () => Promise.resolve();
destroy = destroyStub;
},
};
@@ -69,7 +77,7 @@ beforeEach(() => {
});
afterEach(async () => {
runHandler('before-quit');
await runHandler('before-quit');
await fs.remove(tmpDir);
vi.useRealTimers();
});
@@ -98,12 +106,24 @@ test('db should be destroyed when app quits', async () => {
expect(db0.db).not.toBeNull();
expect(db1.db).not.toBeNull();
runHandler('before-quit');
await runHandler('before-quit');
// wait the async `db.destroy()` to be called
await setTimeout(100);
expect(db0.db).toBeNull();
expect(db1.db).toBeNull();
});
test('db should be removed in db$Map after destroyed', async () => {
const { ensureSQLiteDB, db$Map } = await import('../ensure-db');
const workspaceId = v4();
const db = await ensureSQLiteDB(workspaceId);
await db.destroy();
await setTimeout(100);
expect(db$Map.has(workspaceId)).toBe(false);
});
test('if db has a secondary db path, we should also poll that', async () => {
const { ensureSQLiteDB } = await import('../ensure-db');
const { appContext } = await import('../../context');
@@ -115,10 +135,7 @@ test('if db has a secondary db path, we should also poll that', async () => {
const db = await ensureSQLiteDB(workspaceId);
await vi.advanceTimersByTimeAsync(1500);
// not sure why but we still need to wait with real timer
await new Promise(resolve => setTimeout(resolve, 100));
await setTimeout(10);
expect(constructorStub).toBeCalledTimes(1);
expect(constructorStub).toBeCalledWith(path.join(tmpDir, 'secondary.db'), db);
@@ -128,7 +145,8 @@ test('if db has a secondary db path, we should also poll that', async () => {
secondaryDBPath: path.join(tmpDir, 'secondary2.db'),
});
await vi.advanceTimersByTimeAsync(1500);
// wait the async `db.destroy()` to be called
await setTimeout(100);
expect(constructorStub).toBeCalledTimes(2);
expect(destroyStub).toBeCalledTimes(1);
@@ -141,7 +159,7 @@ test('if db has a secondary db path, we should also poll that', async () => {
expect(destroyStub).toBeCalledTimes(1);
// if primary is destroyed, secondary should also be destroyed
db.destroy();
await new Promise(resolve => setTimeout(resolve, 100));
await db.destroy();
await setTimeout(100);
expect(destroyStub).toBeCalledTimes(2);
});

View File

@@ -16,10 +16,7 @@ const testAppContext: AppContext = {
};
afterEach(async () => {
if (process.platform !== 'win32') {
// hmmm ....
await fs.remove(tmpDir);
}
await fs.remove(tmpDir);
});
function getTestUpdates() {
@@ -40,7 +37,7 @@ test('can create new db file if not exists', async () => {
`storage.db`
);
expect(await fs.exists(dbPath)).toBe(true);
db.destroy();
await db.destroy();
});
test('on applyUpdate (from self), will not trigger update', async () => {
@@ -52,7 +49,7 @@ test('on applyUpdate (from self), will not trigger update', async () => {
db.update$.subscribe(onUpdate);
db.applyUpdate(getTestUpdates(), 'self');
expect(onUpdate).not.toHaveBeenCalled();
db.destroy();
await db.destroy();
});
test('on applyUpdate (from renderer), will trigger update', async () => {
@@ -67,7 +64,7 @@ test('on applyUpdate (from renderer), will trigger update', async () => {
db.applyUpdate(getTestUpdates(), 'renderer');
expect(onUpdate).toHaveBeenCalled(); // not yet updated
sub.unsubscribe();
db.destroy();
await db.destroy();
});
test('on applyUpdate (from external), will trigger update & send external update event', async () => {
@@ -83,7 +80,7 @@ test('on applyUpdate (from external), will trigger update & send external update
expect(onUpdate).toHaveBeenCalled();
expect(onExternalUpdate).toHaveBeenCalled();
sub.unsubscribe();
db.destroy();
await db.destroy();
});
test('on destroy, check if resources have been released', async () => {
@@ -95,7 +92,7 @@ test('on destroy, check if resources have been released', async () => {
next: vi.fn(),
};
db.update$ = updateSub as any;
db.destroy();
await db.destroy();
expect(db.db).toBe(null);
expect(updateSub.complete).toHaveBeenCalled();
});

View File

@@ -1,120 +1,76 @@
import { SqliteConnection } from '@affine/native';
import assert from 'assert';
import type { Database } from 'better-sqlite3';
import sqlite from 'better-sqlite3';
import { logger } from '../logger';
const schemas = [
`CREATE TABLE IF NOT EXISTS "updates" (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data BLOB NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
)`,
`CREATE TABLE IF NOT EXISTS "blobs" (
key TEXT PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
)`,
];
interface UpdateRow {
id: number;
data: Buffer;
timestamp: string;
}
interface BlobRow {
key: string;
data: Buffer;
timestamp: string;
}
/**
* A base class for SQLite DB adapter that provides basic methods around updates & blobs
*/
export abstract class BaseSQLiteAdapter {
db: Database | null = null;
db: SqliteConnection | null = null;
abstract role: string;
constructor(public path: string) {}
ensureTables() {
assert(this.db, 'db is not connected');
this.db.exec(schemas.join(';'));
constructor(public readonly path: string) {
logger.info(`[SQLiteAdapter]`, 'path:', path);
}
// todo: what if SQLite DB wrapper later is not sync?
connect(): Database | undefined {
if (this.db) {
return this.db;
async connectIfNeeded() {
if (!this.db) {
this.db = new SqliteConnection(this.path);
await this.db.connect();
}
logger.log(`[SQLiteAdapter][${this.role}] open db`, this.path);
const db = (this.db = sqlite(this.path));
this.ensureTables();
return db;
return this.db;
}
destroy() {
this.db?.close();
async destroy() {
const { db } = this;
this.db = null;
await db?.close();
}
addBlob(key: string, data: Uint8Array) {
async addBlob(key: string, data: Uint8Array) {
try {
assert(this.db, 'db is not connected');
const statement = this.db.prepare(
'INSERT INTO blobs (key, data) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET data = ?'
);
statement.run(key, data, data);
return key;
assert(this.db, `${this.path} is not connected`);
await this.db.addBlob(key, data);
} catch (error) {
logger.error('addBlob', error);
}
}
getBlob(key: string) {
async getBlob(key: string) {
try {
assert(this.db, 'db is not connected');
const statement = this.db.prepare('SELECT data FROM blobs WHERE key = ?');
const row = statement.get(key) as BlobRow;
if (!row) {
return null;
}
return row.data;
assert(this.db, `${this.path} is not connected`);
const blob = await this.db.getBlob(key);
return blob?.data;
} catch (error) {
logger.error('getBlob', error);
return null;
}
}
deleteBlob(key: string) {
async deleteBlob(key: string) {
try {
assert(this.db, 'db is not connected');
const statement = this.db.prepare('DELETE FROM blobs WHERE key = ?');
statement.run(key);
assert(this.db, `${this.path} is not connected`);
await this.db.deleteBlob(key);
} catch (error) {
logger.error('deleteBlob', error);
logger.error(`${this.path} delete blob failed`, error);
}
}
getBlobKeys() {
async getBlobKeys() {
try {
assert(this.db, 'db is not connected');
const statement = this.db.prepare('SELECT key FROM blobs');
const rows = statement.all() as BlobRow[];
return rows.map(row => row.key);
assert(this.db, `${this.path} is not connected`);
return await this.db.getBlobKeys();
} catch (error) {
logger.error('getBlobKeys', error);
logger.error(`getBlobKeys failed`, error);
return [];
}
}
getUpdates() {
async getUpdates() {
try {
assert(this.db, 'db is not connected');
const statement = this.db.prepare('SELECT * FROM updates');
const rows = statement.all() as UpdateRow[];
return rows;
assert(this.db, `${this.path} is not connected`);
return await this.db.getUpdates();
} catch (error) {
logger.error('getUpdates', error);
return [];
@@ -122,22 +78,12 @@ export abstract class BaseSQLiteAdapter {
}
// add a single update to SQLite
addUpdateToSQLite(updates: Uint8Array[]) {
async addUpdateToSQLite(db: SqliteConnection, updates: Uint8Array[]) {
// batch write instead write per key stroke?
try {
assert(this.db, 'db is not connected');
const start = performance.now();
const statement = this.db.prepare(
'INSERT INTO updates (data) VALUES (?)'
);
const insertMany = this.db.transaction(updates => {
for (const d of updates) {
statement.run(d);
}
});
insertMany(updates);
await db.connect();
await db.insertUpdates(updates);
logger.debug(
`[SQLiteAdapter][${this.role}] addUpdateToSQLite`,
'length:',

View File

@@ -1,12 +1,14 @@
import { app } from 'electron';
import type { Subject } from 'rxjs';
import { Observable } from 'rxjs';
import {
concat,
defer,
firstValueFrom,
from,
fromEvent,
interval,
lastValueFrom,
merge,
Observable,
} from 'rxjs';
import {
distinctUntilChanged,
@@ -17,41 +19,83 @@ import {
shareReplay,
startWith,
switchMap,
take,
takeUntil,
tap,
} from 'rxjs/operators';
import { appContext } from '../context';
import { logger } from '../logger';
import { getWorkspaceMeta$ } from '../workspace';
import { getWorkspaceMeta, workspaceSubjects } from '../workspace';
import { SecondaryWorkspaceSQLiteDB } from './secondary-db';
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
import { openWorkspaceDatabase } from './workspace-db-adapter';
const db$Map = new Map<string, Observable<WorkspaceSQLiteDB>>();
// export for testing
export const db$Map = new Map<string, Observable<WorkspaceSQLiteDB>>();
// use defer to prevent `app` is undefined while running tests
const beforeQuit$ = defer(() => fromEvent(app, 'before-quit'));
// return a stream that emit a single event when the subject completes
function completed<T>(subject: Subject<T>) {
return new Observable(subscriber => {
const sub = subject.subscribe({
complete: () => {
subscriber.next();
subscriber.complete();
},
});
return () => sub.unsubscribe();
});
}
function getWorkspaceDB$(id: string) {
if (!db$Map.has(id)) {
db$Map.set(
id,
from(openWorkspaceDatabase(appContext, id)).pipe(
shareReplay(1),
switchMap(db => {
return startPollingSecondaryDB(db).pipe(
ignoreElements(),
startWith(db),
takeUntil(beforeQuit$),
tap({
complete: () => {
logger.info('[ensureSQLiteDB] close db connection');
db.destroy();
db$Map.delete(id);
},
})
);
tap({
next: db => {
logger.info(
'[ensureSQLiteDB] db connection established',
db.workspaceId
);
},
}),
switchMap(db =>
// takeUntil the polling stream, and then destroy the db
concat(
startPollingSecondaryDB(db).pipe(
ignoreElements(),
startWith(db),
takeUntil(merge(beforeQuit$, completed(db.update$))),
last(),
tap({
next() {
logger.info(
'[ensureSQLiteDB] polling secondary db complete',
db.workspaceId
);
},
})
),
defer(async () => {
try {
await db.destroy();
db$Map.delete(id);
logger.info(
'[ensureSQLiteDB] db connection destroyed',
db.workspaceId
);
return db;
} catch (err) {
logger.error('[ensureSQLiteDB] destroy db failed', err);
throw err;
}
})
).pipe(startWith(db))
),
shareReplay(1)
)
);
@@ -60,51 +104,43 @@ function getWorkspaceDB$(id: string) {
}
function startPollingSecondaryDB(db: WorkspaceSQLiteDB) {
const meta$ = getWorkspaceMeta$(db.workspaceId);
const secondaryDB$ = meta$.pipe(
return merge(
getWorkspaceMeta(appContext, db.workspaceId),
workspaceSubjects.meta.pipe(
map(({ meta }) => meta),
filter(meta => meta.id === db.workspaceId)
)
).pipe(
map(meta => meta?.secondaryDBPath),
distinctUntilChanged(),
filter((p): p is string => !!p),
distinctUntilChanged(),
switchMap(path => {
return new Observable<SecondaryWorkspaceSQLiteDB>(observer => {
const secondaryDB = new SecondaryWorkspaceSQLiteDB(path, db);
observer.next(secondaryDB);
return () => {
logger.info(
'[ensureSQLiteDB] close secondary db connection',
secondaryDB.path
);
secondaryDB.destroy();
};
// on secondary db path change, destroy the old db and create a new one
const secondaryDB = new SecondaryWorkspaceSQLiteDB(path, db);
return new Observable<SecondaryWorkspaceSQLiteDB>(subscriber => {
subscriber.next(secondaryDB);
return () => secondaryDB.destroy();
});
}),
takeUntil(db.update$.pipe(last())),
shareReplay(1)
switchMap(secondaryDB => {
return interval(300000).pipe(
startWith(0),
tap({
next: () => {
secondaryDB.pull();
},
error: err => {
logger.error(`[ensureSQLiteDB] polling secondary db error`, err);
},
complete: () => {
logger.info('[ensureSQLiteDB] polling secondary db complete');
},
})
);
})
);
const firstDelayedTick$ = defer(() => {
return new Promise<number>(resolve =>
setTimeout(() => {
resolve(0);
}, 1000)
);
});
// pull every 30 seconds
const poll$ = merge(firstDelayedTick$, interval(30000)).pipe(
switchMap(() => secondaryDB$),
tap({
next: secondaryDB => {
secondaryDB.pull();
},
}),
takeUntil(db.update$.pipe(last())),
shareReplay(1)
);
return poll$;
}
export function ensureSQLiteDB(id: string) {
return firstValueFrom(getWorkspaceDB$(id));
return lastValueFrom(getWorkspaceDB$(id).pipe(take(1)));
}

View File

@@ -1,38 +0,0 @@
import type { Database } from 'better-sqlite3';
import sqlite from 'better-sqlite3';
import { logger } from '../logger';
export function isValidateDB(db: Database) {
// check if db has two tables, one for updates and one for blobs
const statement = db.prepare(
`SELECT name FROM sqlite_schema WHERE type='table'`
);
const rows = statement.all() as { name: string }[];
const tableNames = rows.map(row => row.name);
if (!tableNames.includes('updates') || !tableNames.includes('blobs')) {
return false;
}
}
export function isValidDBFile(path: string) {
let db: Database | null = null;
try {
db = sqlite(path);
// check if db has two tables, one for updates and one for blobs
const statement = db.prepare(
`SELECT name FROM sqlite_schema WHERE type='table'`
);
const rows = statement.all() as { name: string }[];
const tableNames = rows.map(row => row.name);
if (!tableNames.includes('updates') || !tableNames.includes('blobs')) {
return false;
}
return true;
} catch (error) {
logger.error('isValidDBFile', error);
return false;
} finally {
db?.close();
}
}

View File

@@ -1,3 +1,6 @@
import assert from 'node:assert';
import type { SqliteConnection } from '@affine/native';
import { debounce } from 'lodash-es';
import * as Y from 'yjs';
@@ -30,16 +33,12 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
logger.debug('[SecondaryWorkspaceSQLiteDB] created', this.workspaceId);
}
close() {
this.db?.close();
this.db = null;
}
override destroy() {
this.flushUpdateQueue();
override async destroy() {
const { db } = this;
await this.flushUpdateQueue(db);
this.unsubscribers.forEach(unsub => unsub());
this.db?.close();
this.yDoc.destroy();
await super.destroy();
}
get workspaceId() {
@@ -48,12 +47,15 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
// do not update db immediately, instead, push to a queue
// and flush the queue in a future time
addUpdateToUpdateQueue(update: Uint8Array) {
async addUpdateToUpdateQueue(db: SqliteConnection, update: Uint8Array) {
this.updateQueue.push(update);
this.debouncedFlush();
await this.debouncedFlush(db);
}
flushUpdateQueue() {
async flushUpdateQueue(db = this.db) {
if (!db) {
return; // skip if db is not connected
}
logger.debug(
'flushUpdateQueue',
this.workspaceId,
@@ -62,9 +64,8 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
);
const updates = [...this.updateQueue];
this.updateQueue = [];
this.connect();
this.addUpdateToSQLite(updates);
this.close();
await db.connect();
await this.addUpdateToSQLite(db, updates);
}
// flush after 5s, but will not wait for more than 10s
@@ -75,29 +76,31 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
runCounter = 0;
// wrap the fn with connect and close
// it only works for sync functions
run = <T extends (...args: any[]) => any>(fn: T) => {
async run<T extends (...args: any[]) => any>(
fn: T
): Promise<
(T extends (...args: any[]) => infer U ? Awaited<U> : unknown) | undefined
> {
try {
if (this.runCounter === 0) {
this.connect();
}
await this.connectIfNeeded();
this.runCounter++;
return fn();
return await fn();
} catch (err) {
logger.error(err);
} finally {
this.runCounter--;
if (this.runCounter === 0) {
this.close();
await super.destroy();
}
}
};
}
setupAndListen() {
if (this.firstConnected) {
return;
}
this.firstConnected = true;
const { db } = this;
const onUpstreamUpdate = (update: Uint8Array, origin: YOrigin) => {
if (origin === 'renderer') {
@@ -109,7 +112,7 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
const onSelfUpdate = (update: Uint8Array, origin: YOrigin) => {
// for self update from upstream, we need to push it to external DB
if (origin === 'upstream') {
this.addUpdateToUpdateQueue(update);
this.addUpdateToUpdateQueue(db!, update);
}
if (origin === 'self') {
@@ -126,13 +129,11 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
this.yDoc.off('update', onSelfUpdate);
});
this.run(() => {
this.run(async () => {
// apply all updates from upstream
const upstreamUpdate = this.upstream.getDocAsUpdates();
// to initialize the yDoc, we need to apply all updates from the db
this.applyUpdate(upstreamUpdate, 'upstream');
this.pull();
});
}
@@ -141,17 +142,17 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
};
// TODO: have a better solution to handle blobs
syncBlobs() {
this.run(() => {
async syncBlobs() {
await this.run(async () => {
// pull blobs
const blobsKeys = this.getBlobKeys();
const upstreamBlobsKeys = this.upstream.getBlobKeys();
const blobsKeys = await this.getBlobKeys();
const upstreamBlobsKeys = await this.upstream.getBlobKeys();
// put every missing blob to upstream
for (const key of blobsKeys) {
if (!upstreamBlobsKeys.includes(key)) {
const blob = this.getBlob(key);
const blob = await this.getBlob(key);
if (blob) {
this.upstream.addBlob(key, blob);
await this.upstream.addBlob(key, blob);
logger.debug('syncBlobs', this.workspaceId, key);
}
}
@@ -170,12 +171,17 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
*/
async pull() {
const start = performance.now();
const updates = this.run(() => {
assert(this.upstream.db, 'upstream db should be connected');
const updates = await this.run(async () => {
// TODO: no need to get all updates, just get the latest ones (using a cursor, etc)?
this.syncBlobs();
return this.getUpdates().map(update => update.data);
await this.syncBlobs();
return (await this.getUpdates()).map(update => update.data);
});
if (!updates) {
return;
}
const merged = await mergeUpdateWorker(updates);
this.applyUpdate(merged, 'self');

View File

@@ -1,4 +1,4 @@
import type { Database } from 'better-sqlite3';
import type { SqliteConnection } from '@affine/native';
import { Subject } from 'rxjs';
import * as Y from 'yjs';
@@ -21,38 +21,38 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
super(path);
}
override destroy() {
this.db?.close();
this.db = null;
override async destroy() {
await super.destroy();
this.yDoc.destroy();
// when db is closed, we can safely remove it from ensure-db list
this.update$.complete();
this.firstConnected = false;
}
getWorkspaceName = () => {
return this.yDoc.getMap('space:meta').get('name') as string;
};
async init(): Promise<Database | undefined> {
const db = super.connect();
async init() {
const db = await super.connectIfNeeded();
if (!this.firstConnected) {
this.yDoc.on('update', (update: Uint8Array, origin: YOrigin) => {
this.yDoc.on('update', async (update: Uint8Array, origin: YOrigin) => {
if (origin === 'renderer') {
this.addUpdateToSQLite([update]);
await this.addUpdateToSQLite(db, [update]);
} else if (origin === 'external') {
this.addUpdateToSQLite([update]);
logger.debug('external update', this.workspaceId);
dbSubjects.externalUpdate.next({
workspaceId: this.workspaceId,
update,
});
await this.addUpdateToSQLite(db, [update]);
logger.debug('external update', this.workspaceId);
}
});
}
const updates = this.getUpdates();
const updates = await this.getUpdates();
const merged = await mergeUpdateWorker(updates.map(update => update.data));
// to initialize the yDoc, we need to apply all updates from the db
@@ -78,19 +78,19 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
Y.applyUpdate(this.yDoc, data, origin);
};
override addBlob(key: string, value: Uint8Array) {
const res = super.addBlob(key, value);
override async addBlob(key: string, value: Uint8Array) {
const res = await super.addBlob(key, value);
this.update$.next();
return res;
}
override deleteBlob(key: string) {
override async deleteBlob(key: string) {
super.deleteBlob(key);
this.update$.next();
}
override addUpdateToSQLite(data: Uint8Array[]) {
super.addUpdateToSQLite(data);
override async addUpdateToSQLite(db: SqliteConnection, data: Uint8Array[]) {
super.addUpdateToSQLite(db, data);
this.update$.next();
}
}
@@ -102,5 +102,6 @@ export async function openWorkspaceDatabase(
const meta = await getWorkspaceMeta(context, workspaceId);
const db = new WorkspaceSQLiteDB(meta.mainDBPath, workspaceId);
await db.init();
logger.info(`openWorkspaceDatabase [${workspaceId}]`);
return db;
}

View File

@@ -7,7 +7,6 @@ import { nanoid } from 'nanoid';
import { appContext } from '../context';
import { ensureSQLiteDB } from '../db/ensure-db';
import { isValidDBFile } from '../db/helper';
import type { WorkspaceSQLiteDB } from '../db/workspace-db-adapter';
import { logger } from '../logger';
import {
@@ -208,7 +207,9 @@ export async function loadDBFile(): Promise<LoadDBFileResult> {
return { error: 'DB_FILE_ALREADY_LOADED' };
}
if (!isValidDBFile(filePath)) {
const { SqliteConnection } = await import('@affine/native');
if (!(await SqliteConnection.validate(filePath))) {
// TODO: report invalid db file error?
return { error: 'DB_FILE_INVALID' }; // invalid db file
}
@@ -305,7 +306,9 @@ export async function moveDBFile(
// remove the old db file, but we don't care if it fails
if (meta.secondaryDBPath) {
fs.remove(meta.secondaryDBPath);
fs.remove(meta.secondaryDBPath).catch(err => {
logger.error(`[moveDBFile] remove ${meta.secondaryDBPath} failed`, err);
});
}
// update meta

View File

@@ -172,37 +172,3 @@ test('storeWorkspaceMeta', async () => {
secondaryDBPath: path.join(tmpDir, 'test.db'),
});
});
test('getWorkspaceMeta observable', async () => {
const { storeWorkspaceMeta } = await import('../handlers');
const { getWorkspaceMeta$ } = await import('../index');
const workspaceId = v4();
const workspacePath = path.join(
testAppContext.appDataPath,
'workspaces',
workspaceId
);
const metaChange = vi.fn();
const meta$ = getWorkspaceMeta$(workspaceId);
meta$.subscribe(metaChange);
await new Promise(resolve => setTimeout(resolve, 100));
expect(metaChange).toHaveBeenCalledWith({
id: workspaceId,
mainDBPath: path.join(workspacePath, 'storage.db'),
});
await storeWorkspaceMeta(testAppContext, workspaceId, {
secondaryDBPath: path.join(tmpDir, 'test.db'),
});
expect(metaChange).toHaveBeenCalledWith({
id: workspaceId,
mainDBPath: path.join(workspacePath, 'storage.db'),
secondaryDBPath: path.join(tmpDir, 'test.db'),
});
});

View File

@@ -41,7 +41,7 @@ export async function deleteWorkspace(context: AppContext, id: string) {
);
try {
const db = await ensureSQLiteDB(id);
db.destroy();
await db.destroy();
return await fs.move(basePath, movedPath, {
overwrite: true,
});

View File

@@ -1,6 +1,3 @@
import { merge } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { appContext } from '../context';
import type {
MainEventListener,
@@ -31,14 +28,3 @@ export const workspaceHandlers = {
return getWorkspaceMeta(appContext, id);
},
} satisfies NamespaceHandlers;
// used internally. Get a stream of workspace id -> meta
export const getWorkspaceMeta$ = (workspaceId: string) => {
return merge(
getWorkspaceMeta(appContext, workspaceId),
workspaceSubjects.meta.pipe(
map(meta => meta.meta),
filter(meta => meta.id === workspaceId)
)
);
};

View File

@@ -10,15 +10,13 @@
"description": "AFFiNE App",
"homepage": "https://github.com/toeverything/AFFiNE",
"scripts": {
"dev": "yarn electron-rebuild && yarn cross-env DEV_SERVER_URL=http://localhost:8080 node scripts/dev.mjs",
"watch": "yarn electron-rebuild && yarn cross-env DEV_SERVER_URL=http://localhost:8080 node scripts/dev.mjs --watch",
"prod": "yarn electron-rebuild && yarn node scripts/dev.mjs",
"dev": "yarn cross-env DEV_SERVER_URL=http://localhost:8080 node scripts/dev.mjs",
"watch": "yarn cross-env DEV_SERVER_URL=http://localhost:8080 node scripts/dev.mjs --watch",
"prod": "yarn node scripts/dev.mjs",
"build-layers": "zx scripts/build-layers.mjs",
"generate-assets": "zx scripts/generate-assets.mjs",
"package": "electron-forge package",
"make": "electron-forge make",
"rebuild:for-unit-test": "yarn rebuild better-sqlite3",
"rebuild:for-electron": "yarn electron-rebuild",
"test": "playwright test"
},
"config": {
@@ -36,9 +34,7 @@
"@electron-forge/maker-squirrel": "^6.1.1",
"@electron-forge/maker-zip": "^6.1.1",
"@electron-forge/shared-types": "^6.1.1",
"@electron/rebuild": "^3.2.13",
"@electron/remote": "2.0.9",
"@types/better-sqlite3": "^7.6.4",
"@types/fs-extra": "^11.0.1",
"@types/uuid": "^9.0.1",
"cross-env": "7.0.3",
@@ -55,8 +51,7 @@
"zx": "^7.2.2"
},
"dependencies": {
"better-sqlite3": "^8.4.0",
"chokidar": "^3.5.3",
"cheerio": "^1.0.0-rc.12",
"electron-updater": "^5.3.0",
"lodash-es": "^4.17.21",
"nanoid": "^4.0.2",

View File

@@ -46,7 +46,7 @@ export const config = () => {
bundle: true,
target: `node${NODE_MAJOR_VERSION}`,
platform: 'node',
external: ['electron', 'yjs', 'better-sqlite3', 'electron-updater'],
external: ['electron', 'yjs', 'electron-updater'],
define: define,
format: 'cjs',
loader: {