mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-12 12:28:42 +00:00
refactor(electron): cleanup secondary db logic (#6710)
This commit is contained in:
@@ -1,145 +1,38 @@
|
||||
import type { Subject } from 'rxjs';
|
||||
import {
|
||||
concat,
|
||||
defer,
|
||||
from,
|
||||
fromEvent,
|
||||
interval,
|
||||
lastValueFrom,
|
||||
merge,
|
||||
Observable,
|
||||
} from 'rxjs';
|
||||
import {
|
||||
concatMap,
|
||||
distinctUntilChanged,
|
||||
filter,
|
||||
ignoreElements,
|
||||
last,
|
||||
map,
|
||||
shareReplay,
|
||||
startWith,
|
||||
switchMap,
|
||||
take,
|
||||
takeUntil,
|
||||
tap,
|
||||
} from 'rxjs/operators';
|
||||
|
||||
import { logger } from '../logger';
|
||||
import { getWorkspaceMeta } from '../workspace/meta';
|
||||
import { workspaceSubjects } from '../workspace/subjects';
|
||||
import { SecondaryWorkspaceSQLiteDB } from './secondary-db';
|
||||
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
|
||||
import { openWorkspaceDatabase } from './workspace-db-adapter';
|
||||
|
||||
// export for testing
|
||||
export const db$Map = new Map<string, Observable<WorkspaceSQLiteDB>>();
|
||||
export const db$Map = new Map<string, Promise<WorkspaceSQLiteDB>>();
|
||||
|
||||
// use defer to prevent `app` is undefined while running tests
|
||||
const beforeQuit$ = defer(() => fromEvent(process, 'beforeExit'));
|
||||
|
||||
// return a stream that emit a single event when the subject completes
|
||||
function completed<T>(subject$: Subject<T>) {
|
||||
return new Observable(subscriber => {
|
||||
const sub = subject$.subscribe({
|
||||
complete: () => {
|
||||
subscriber.next();
|
||||
subscriber.complete();
|
||||
},
|
||||
});
|
||||
return () => sub.unsubscribe();
|
||||
});
|
||||
}
|
||||
|
||||
function getWorkspaceDB(id: string) {
|
||||
async function getWorkspaceDB(id: string) {
|
||||
let db = await db$Map.get(id);
|
||||
if (!db$Map.has(id)) {
|
||||
db$Map.set(
|
||||
id,
|
||||
from(openWorkspaceDatabase(id)).pipe(
|
||||
tap({
|
||||
next: db => {
|
||||
logger.info(
|
||||
'[ensureSQLiteDB] db connection established',
|
||||
db.workspaceId
|
||||
);
|
||||
},
|
||||
}),
|
||||
switchMap(db =>
|
||||
// takeUntil the polling stream, and then destroy the db
|
||||
concat(
|
||||
startPollingSecondaryDB(db).pipe(
|
||||
ignoreElements(),
|
||||
startWith(db),
|
||||
takeUntil(merge(beforeQuit$, completed(db.update$))),
|
||||
last(),
|
||||
tap({
|
||||
next() {
|
||||
logger.info(
|
||||
'[ensureSQLiteDB] polling secondary db complete',
|
||||
db.workspaceId
|
||||
);
|
||||
},
|
||||
})
|
||||
),
|
||||
defer(async () => {
|
||||
try {
|
||||
await db.destroy();
|
||||
db$Map.delete(id);
|
||||
return db;
|
||||
} catch (err) {
|
||||
logger.error('[ensureSQLiteDB] destroy db failed', err);
|
||||
throw err;
|
||||
}
|
||||
})
|
||||
).pipe(startWith(db))
|
||||
),
|
||||
shareReplay(1)
|
||||
)
|
||||
);
|
||||
const promise = openWorkspaceDatabase(id);
|
||||
db$Map.set(id, promise);
|
||||
const _db = (db = await promise);
|
||||
const cleanup = () => {
|
||||
db$Map.delete(id);
|
||||
_db
|
||||
.destroy()
|
||||
.then(() => {
|
||||
logger.info('[ensureSQLiteDB] db connection closed', _db.workspaceId);
|
||||
})
|
||||
.catch(err => {
|
||||
logger.error('[ensureSQLiteDB] destroy db failed', err);
|
||||
});
|
||||
};
|
||||
|
||||
db.update$.subscribe({
|
||||
complete: cleanup,
|
||||
});
|
||||
|
||||
process.on('beforeExit', cleanup);
|
||||
}
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
return db$Map.get(id)!;
|
||||
}
|
||||
|
||||
function startPollingSecondaryDB(db: WorkspaceSQLiteDB) {
|
||||
return merge(
|
||||
getWorkspaceMeta(db.workspaceId),
|
||||
workspaceSubjects.meta$.pipe(
|
||||
map(({ meta }) => meta),
|
||||
filter(meta => meta.id === db.workspaceId)
|
||||
)
|
||||
).pipe(
|
||||
map(meta => meta?.secondaryDBPath),
|
||||
filter((p): p is string => !!p),
|
||||
distinctUntilChanged(),
|
||||
switchMap(path => {
|
||||
// on secondary db path change, destroy the old db and create a new one
|
||||
const secondaryDB = new SecondaryWorkspaceSQLiteDB(path, db);
|
||||
return new Observable<SecondaryWorkspaceSQLiteDB>(subscriber => {
|
||||
subscriber.next(secondaryDB);
|
||||
return () => {
|
||||
secondaryDB.destroy().catch(err => {
|
||||
subscriber.error(err);
|
||||
});
|
||||
};
|
||||
});
|
||||
}),
|
||||
switchMap(secondaryDB => {
|
||||
return interval(300000).pipe(
|
||||
startWith(0),
|
||||
concatMap(() => secondaryDB.pull()),
|
||||
tap({
|
||||
error: err => {
|
||||
logger.error(`[ensureSQLiteDB] polling secondary db error`, err);
|
||||
},
|
||||
complete: () => {
|
||||
logger.info('[ensureSQLiteDB] polling secondary db complete');
|
||||
},
|
||||
})
|
||||
);
|
||||
})
|
||||
);
|
||||
return db!;
|
||||
}
|
||||
|
||||
export function ensureSQLiteDB(id: string) {
|
||||
return lastValueFrom(getWorkspaceDB(id).pipe(take(1)));
|
||||
return getWorkspaceDB(id);
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
import { mainRPC } from '../main-rpc';
|
||||
import type { MainEventRegister } from '../type';
|
||||
import { ensureSQLiteDB } from './ensure-db';
|
||||
import { dbSubjects } from './subjects';
|
||||
|
||||
export * from './ensure-db';
|
||||
export * from './subjects';
|
||||
|
||||
export const dbHandlers = {
|
||||
getDocAsUpdates: async (workspaceId: string, subdocId?: string) => {
|
||||
@@ -17,7 +15,12 @@ export const dbHandlers = {
|
||||
subdocId?: string
|
||||
) => {
|
||||
const workspaceDB = await ensureSQLiteDB(workspaceId);
|
||||
return workspaceDB.applyUpdate(update, 'renderer', subdocId);
|
||||
return workspaceDB.addUpdateToSQLite([
|
||||
{
|
||||
data: update,
|
||||
docId: subdocId,
|
||||
},
|
||||
]);
|
||||
},
|
||||
addBlob: async (workspaceId: string, key: string, data: Uint8Array) => {
|
||||
const workspaceDB = await ensureSQLiteDB(workspaceId);
|
||||
@@ -40,17 +43,4 @@ export const dbHandlers = {
|
||||
},
|
||||
};
|
||||
|
||||
export const dbEvents = {
|
||||
onExternalUpdate: (
|
||||
fn: (update: {
|
||||
workspaceId: string;
|
||||
update: Uint8Array;
|
||||
docId?: string;
|
||||
}) => void
|
||||
) => {
|
||||
const sub = dbSubjects.externalUpdate$.subscribe(fn);
|
||||
return () => {
|
||||
sub.unsubscribe();
|
||||
};
|
||||
},
|
||||
} satisfies Record<string, MainEventRegister>;
|
||||
export const dbEvents = {} satisfies Record<string, MainEventRegister>;
|
||||
|
||||
@@ -1,304 +0,0 @@
|
||||
import assert from 'node:assert';
|
||||
|
||||
import type { InsertRow } from '@affine/native';
|
||||
import { debounce } from 'lodash-es';
|
||||
import { applyUpdate, Doc as YDoc } from 'yjs';
|
||||
|
||||
import { logger } from '../logger';
|
||||
import type { YOrigin } from '../type';
|
||||
import { getWorkspaceMeta } from '../workspace/meta';
|
||||
import { BaseSQLiteAdapter } from './base-db-adapter';
|
||||
import type { WorkspaceSQLiteDB } from './workspace-db-adapter';
|
||||
|
||||
const FLUSH_WAIT_TIME = 5000;
|
||||
const FLUSH_MAX_WAIT_TIME = 10000;
|
||||
|
||||
// todo: trim db when it is too big
|
||||
export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
role = 'secondary';
|
||||
yDoc = new YDoc();
|
||||
firstConnected = false;
|
||||
destroyed = false;
|
||||
|
||||
updateQueue: { data: Uint8Array; docId?: string }[] = [];
|
||||
|
||||
unsubscribers = new Set<() => void>();
|
||||
|
||||
constructor(
|
||||
public override path: string,
|
||||
public upstream: WorkspaceSQLiteDB
|
||||
) {
|
||||
super(path);
|
||||
this.init();
|
||||
logger.debug('[SecondaryWorkspaceSQLiteDB] created', this.workspaceId);
|
||||
}
|
||||
|
||||
getDoc(docId?: string) {
|
||||
if (!docId) {
|
||||
return this.yDoc;
|
||||
}
|
||||
// this should be pretty fast and we don't need to cache it
|
||||
for (const subdoc of this.yDoc.subdocs) {
|
||||
if (subdoc.guid === docId) {
|
||||
return subdoc;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
override async destroy() {
|
||||
await this.flushUpdateQueue();
|
||||
this.unsubscribers.forEach(unsub => unsub());
|
||||
this.yDoc.destroy();
|
||||
await super.destroy();
|
||||
this.destroyed = true;
|
||||
}
|
||||
|
||||
get workspaceId() {
|
||||
return this.upstream.workspaceId;
|
||||
}
|
||||
|
||||
// do not update db immediately, instead, push to a queue
|
||||
// and flush the queue in a future time
|
||||
async addUpdateToUpdateQueue(update: InsertRow) {
|
||||
this.updateQueue.push(update);
|
||||
await this.debouncedFlush();
|
||||
}
|
||||
|
||||
async flushUpdateQueue() {
|
||||
if (this.destroyed) {
|
||||
return;
|
||||
}
|
||||
logger.debug(
|
||||
'flushUpdateQueue',
|
||||
this.workspaceId,
|
||||
'queue',
|
||||
this.updateQueue.length
|
||||
);
|
||||
const updates = [...this.updateQueue];
|
||||
this.updateQueue = [];
|
||||
await this.run(async () => {
|
||||
await this.addUpdateToSQLite(updates);
|
||||
});
|
||||
}
|
||||
|
||||
// flush after 5s, but will not wait for more than 10s
|
||||
debouncedFlush = debounce(this.flushUpdateQueue, FLUSH_WAIT_TIME, {
|
||||
maxWait: FLUSH_MAX_WAIT_TIME,
|
||||
});
|
||||
|
||||
runCounter = 0;
|
||||
|
||||
// wrap the fn with connect and close
|
||||
async run<T extends (...args: any[]) => any>(
|
||||
fn: T
|
||||
): Promise<
|
||||
(T extends (...args: any[]) => infer U ? Awaited<U> : unknown) | undefined
|
||||
> {
|
||||
try {
|
||||
if (this.destroyed) {
|
||||
return;
|
||||
}
|
||||
await this.connectIfNeeded();
|
||||
this.runCounter++;
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
logger.error(err);
|
||||
throw err;
|
||||
} finally {
|
||||
this.runCounter--;
|
||||
if (this.runCounter === 0) {
|
||||
// just close db, but not the yDoc
|
||||
await super.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setupListener(docId?: string) {
|
||||
logger.debug(
|
||||
'SecondaryWorkspaceSQLiteDB:setupListener',
|
||||
this.workspaceId,
|
||||
docId
|
||||
);
|
||||
const doc = this.getDoc(docId);
|
||||
const upstreamDoc = this.upstream.getDoc(docId);
|
||||
if (!doc || !upstreamDoc) {
|
||||
logger.warn(
|
||||
'[SecondaryWorkspaceSQLiteDB] setupListener: doc not found',
|
||||
docId
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const onUpstreamUpdate = (update: Uint8Array, origin: YOrigin) => {
|
||||
logger.debug(
|
||||
'SecondaryWorkspaceSQLiteDB:onUpstreamUpdate',
|
||||
origin,
|
||||
this.workspaceId,
|
||||
docId,
|
||||
update.length
|
||||
);
|
||||
if (origin === 'renderer' || origin === 'self') {
|
||||
// update to upstream yDoc should be replicated to self yDoc
|
||||
this.applyUpdate(update, 'upstream', docId);
|
||||
}
|
||||
};
|
||||
|
||||
const onSelfUpdate = async (update: Uint8Array, origin: YOrigin) => {
|
||||
logger.debug(
|
||||
'SecondaryWorkspaceSQLiteDB:onSelfUpdate',
|
||||
origin,
|
||||
this.workspaceId,
|
||||
docId,
|
||||
update.length
|
||||
);
|
||||
// for self update from upstream, we need to push it to external DB
|
||||
if (origin === 'upstream') {
|
||||
await this.addUpdateToUpdateQueue({
|
||||
data: update,
|
||||
docId,
|
||||
});
|
||||
}
|
||||
|
||||
if (origin === 'self') {
|
||||
this.upstream.applyUpdate(update, 'external', docId);
|
||||
}
|
||||
};
|
||||
|
||||
const onSubdocs = ({ added }: { added: Set<YDoc> }) => {
|
||||
added.forEach(subdoc => {
|
||||
this.setupListener(subdoc.guid);
|
||||
});
|
||||
};
|
||||
|
||||
doc.subdocs.forEach(subdoc => {
|
||||
this.setupListener(subdoc.guid);
|
||||
});
|
||||
|
||||
// listen to upstream update
|
||||
this.upstream.yDoc.on('update', onUpstreamUpdate);
|
||||
doc.on('update', (update, origin) => {
|
||||
onSelfUpdate(update, origin).catch(err => {
|
||||
logger.error(err);
|
||||
});
|
||||
});
|
||||
doc.on('subdocs', onSubdocs);
|
||||
|
||||
this.unsubscribers.add(() => {
|
||||
this.upstream.yDoc.off('update', onUpstreamUpdate);
|
||||
doc.off('update', (update, origin) => {
|
||||
onSelfUpdate(update, origin).catch(err => {
|
||||
logger.error(err);
|
||||
});
|
||||
});
|
||||
doc.off('subdocs', onSubdocs);
|
||||
});
|
||||
}
|
||||
|
||||
init() {
|
||||
if (this.firstConnected) {
|
||||
return;
|
||||
}
|
||||
this.firstConnected = true;
|
||||
this.setupListener();
|
||||
// apply all updates from upstream
|
||||
// we assume here that the upstream ydoc is already sync'ed
|
||||
const syncUpstreamDoc = (docId?: string) => {
|
||||
const update = this.upstream.getDocAsUpdates(docId);
|
||||
if (update) {
|
||||
this.applyUpdate(update, 'upstream');
|
||||
}
|
||||
};
|
||||
syncUpstreamDoc();
|
||||
this.upstream.yDoc.subdocs.forEach(subdoc => {
|
||||
syncUpstreamDoc(subdoc.guid);
|
||||
});
|
||||
}
|
||||
|
||||
applyUpdate = (
|
||||
data: Uint8Array,
|
||||
origin: YOrigin = 'upstream',
|
||||
docId?: string
|
||||
) => {
|
||||
const doc = this.getDoc(docId);
|
||||
if (doc) {
|
||||
applyUpdate(this.yDoc, data, origin);
|
||||
} else {
|
||||
logger.warn(
|
||||
'[SecondaryWorkspaceSQLiteDB] applyUpdate: doc not found',
|
||||
docId
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: have a better solution to handle blobs
|
||||
async syncBlobs() {
|
||||
await this.run(async () => {
|
||||
// skip if upstream db is not connected (maybe it is already closed)
|
||||
const blobsKeys = await this.getBlobKeys();
|
||||
if (!this.upstream.db || this.upstream.db?.isClose) {
|
||||
return;
|
||||
}
|
||||
const upstreamBlobsKeys = await this.upstream.getBlobKeys();
|
||||
// put every missing blob to upstream
|
||||
for (const key of blobsKeys) {
|
||||
if (!upstreamBlobsKeys.includes(key)) {
|
||||
const blob = await this.getBlob(key);
|
||||
if (blob) {
|
||||
await this.upstream.addBlob(key, blob);
|
||||
logger.debug('syncBlobs', this.workspaceId, key);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* pull from external DB file and apply to embedded yDoc
|
||||
* workflow:
|
||||
* - connect to external db
|
||||
* - get updates
|
||||
* - apply updates to local yDoc
|
||||
* - get blobs and put new blobs to upstream
|
||||
* - disconnect
|
||||
*/
|
||||
async pull() {
|
||||
const start = performance.now();
|
||||
assert(this.upstream.db, 'upstream db should be connected');
|
||||
const rows = await this.run(async () => {
|
||||
// TODO: no need to get all updates, just get the latest ones (using a cursor, etc)?
|
||||
await this.syncBlobs();
|
||||
return await this.getAllUpdates();
|
||||
});
|
||||
|
||||
if (!rows || this.destroyed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// apply root doc first
|
||||
rows.forEach(row => {
|
||||
if (!row.docId) {
|
||||
this.applyUpdate(row.data, 'self');
|
||||
}
|
||||
});
|
||||
|
||||
rows.forEach(row => {
|
||||
if (row.docId) {
|
||||
this.applyUpdate(row.data, 'self', row.docId);
|
||||
}
|
||||
});
|
||||
|
||||
logger.debug(
|
||||
'pull external updates',
|
||||
this.path,
|
||||
rows.length,
|
||||
(performance.now() - start).toFixed(2),
|
||||
'ms'
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export async function getSecondaryWorkspaceDBPath(workspaceId: string) {
|
||||
const meta = await getWorkspaceMeta(workspaceId);
|
||||
return meta?.secondaryDBPath;
|
||||
}
|
||||
@@ -1,9 +0,0 @@
|
||||
import { Subject } from 'rxjs';
|
||||
|
||||
export const dbSubjects = {
|
||||
externalUpdate$: new Subject<{
|
||||
workspaceId: string;
|
||||
update: Uint8Array;
|
||||
docId?: string;
|
||||
}>(),
|
||||
};
|
||||
@@ -1,20 +1,16 @@
|
||||
import type { InsertRow } from '@affine/native';
|
||||
import { debounce } from 'lodash-es';
|
||||
import { Subject } from 'rxjs';
|
||||
import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs';
|
||||
import { applyUpdate, Doc as YDoc } from 'yjs';
|
||||
|
||||
import { logger } from '../logger';
|
||||
import type { YOrigin } from '../type';
|
||||
import { getWorkspaceMeta } from '../workspace/meta';
|
||||
import { BaseSQLiteAdapter } from './base-db-adapter';
|
||||
import { dbSubjects } from './subjects';
|
||||
import { mergeUpdate } from './merge-update';
|
||||
|
||||
const TRIM_SIZE = 500;
|
||||
|
||||
export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
role = 'primary';
|
||||
yDoc = new YDoc();
|
||||
firstConnected = false;
|
||||
|
||||
update$ = new Subject<void>();
|
||||
|
||||
@@ -27,131 +23,30 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
|
||||
override async destroy() {
|
||||
await super.destroy();
|
||||
this.yDoc.destroy();
|
||||
|
||||
// when db is closed, we can safely remove it from ensure-db list
|
||||
this.update$.complete();
|
||||
this.firstConnected = false;
|
||||
}
|
||||
|
||||
getDoc(docId?: string) {
|
||||
if (!docId) {
|
||||
return this.yDoc;
|
||||
}
|
||||
// this should be pretty fast and we don't need to cache it
|
||||
for (const subdoc of this.yDoc.subdocs) {
|
||||
if (subdoc.guid === docId) {
|
||||
return subdoc;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
getWorkspaceName = () => {
|
||||
return this.yDoc.getMap('meta').get('name') as string;
|
||||
getWorkspaceName = async () => {
|
||||
const ydoc = new YDoc();
|
||||
const updates = await this.getUpdates();
|
||||
updates.forEach(update => {
|
||||
applyUpdate(ydoc, update.data);
|
||||
});
|
||||
return ydoc.getMap('meta').get('name') as string;
|
||||
};
|
||||
|
||||
setupListener(docId?: string) {
|
||||
logger.debug('WorkspaceSQLiteDB:setupListener', this.workspaceId, docId);
|
||||
const doc = this.getDoc(docId);
|
||||
if (doc) {
|
||||
const onUpdate = async (update: Uint8Array, origin: YOrigin) => {
|
||||
logger.debug(
|
||||
'WorkspaceSQLiteDB:onUpdate',
|
||||
this.workspaceId,
|
||||
docId,
|
||||
update.length
|
||||
);
|
||||
const insertRows = [{ data: update, docId }];
|
||||
if (origin === 'renderer') {
|
||||
await this.addUpdateToSQLite(insertRows);
|
||||
} else if (origin === 'external') {
|
||||
dbSubjects.externalUpdate$.next({
|
||||
workspaceId: this.workspaceId,
|
||||
update,
|
||||
docId,
|
||||
});
|
||||
await this.addUpdateToSQLite(insertRows);
|
||||
logger.debug('external update', this.workspaceId);
|
||||
}
|
||||
};
|
||||
doc.subdocs.forEach(subdoc => {
|
||||
this.setupListener(subdoc.guid);
|
||||
});
|
||||
const onSubdocs = ({ added }: { added: Set<YDoc> }) => {
|
||||
logger.info('onSubdocs', this.workspaceId, docId, added);
|
||||
added.forEach(subdoc => {
|
||||
this.setupListener(subdoc.guid);
|
||||
});
|
||||
};
|
||||
|
||||
doc.on('update', (update, origin) => {
|
||||
onUpdate(update, origin).catch(err => {
|
||||
logger.error(err);
|
||||
});
|
||||
});
|
||||
doc.on('subdocs', onSubdocs);
|
||||
} else {
|
||||
logger.error('setupListener: doc not found', docId);
|
||||
}
|
||||
}
|
||||
|
||||
async init() {
|
||||
const db = await super.connectIfNeeded();
|
||||
|
||||
if (!this.firstConnected) {
|
||||
this.setupListener();
|
||||
}
|
||||
|
||||
const updates = await this.getAllUpdates();
|
||||
|
||||
// apply root first (without ID).
|
||||
// subdoc will be available after root is applied
|
||||
updates.forEach(update => {
|
||||
if (!update.docId) {
|
||||
this.applyUpdate(update.data, 'self');
|
||||
}
|
||||
});
|
||||
|
||||
// then, for all subdocs, apply the updates
|
||||
updates.forEach(update => {
|
||||
if (update.docId) {
|
||||
this.applyUpdate(update.data, 'self', update.docId);
|
||||
}
|
||||
});
|
||||
|
||||
this.firstConnected = true;
|
||||
this.update$.next();
|
||||
|
||||
await this.tryTrim();
|
||||
return db;
|
||||
}
|
||||
|
||||
// unlike getUpdates, this will return updates in yDoc
|
||||
getDocAsUpdates = (docId?: string) => {
|
||||
const doc = docId ? this.getDoc(docId) : this.yDoc;
|
||||
if (doc) {
|
||||
return encodeStateAsUpdate(doc);
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
// non-blocking and use yDoc to validate the update
|
||||
// after that, the update is added to the db
|
||||
applyUpdate = (
|
||||
data: Uint8Array,
|
||||
origin: YOrigin = 'renderer',
|
||||
docId?: string
|
||||
) => {
|
||||
// todo: trim the updates when the number of records is too large
|
||||
// 1. store the current ydoc state in the db
|
||||
// 2. then delete the old updates
|
||||
// yjs-idb will always trim the db for the first time after DB is loaded
|
||||
const doc = this.getDoc(docId);
|
||||
if (doc) {
|
||||
applyUpdate(doc, data, origin);
|
||||
} else {
|
||||
logger.warn('[WorkspaceSQLiteDB] applyUpdate: doc not found', docId);
|
||||
}
|
||||
// getUpdates then encode
|
||||
getDocAsUpdates = async (docId?: string) => {
|
||||
const updates = await this.getUpdates(docId);
|
||||
return mergeUpdate(updates.map(row => row.data));
|
||||
};
|
||||
|
||||
override async addBlob(key: string, value: Uint8Array) {
|
||||
@@ -167,28 +62,21 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter {
|
||||
|
||||
override async addUpdateToSQLite(data: InsertRow[]) {
|
||||
this.update$.next();
|
||||
data.forEach(row => {
|
||||
this.trimWhenNecessary(row.docId)?.catch(err => {
|
||||
logger.error('trimWhenNecessary failed', err);
|
||||
});
|
||||
});
|
||||
await super.addUpdateToSQLite(data);
|
||||
}
|
||||
|
||||
trimWhenNecessary = debounce(async (docId?: string) => {
|
||||
if (this.firstConnected) {
|
||||
const count = (await this.db?.getUpdatesCount(docId)) ?? 0;
|
||||
if (count > TRIM_SIZE) {
|
||||
logger.debug(`trim ${this.workspaceId}:${docId} ${count}`);
|
||||
const update = this.getDocAsUpdates(docId);
|
||||
if (update) {
|
||||
const insertRows = [{ data: update, docId }];
|
||||
await this.db?.replaceUpdates(docId, insertRows);
|
||||
logger.debug(`trim ${this.workspaceId}:${docId} successfully`);
|
||||
}
|
||||
private readonly tryTrim = async (docId?: string) => {
|
||||
const count = (await this.db?.getUpdatesCount(docId)) ?? 0;
|
||||
if (count > TRIM_SIZE) {
|
||||
logger.debug(`trim ${this.workspaceId}:${docId} ${count}`);
|
||||
const update = await this.getDocAsUpdates(docId);
|
||||
if (update) {
|
||||
const insertRows = [{ data: update, docId }];
|
||||
await this.db?.replaceUpdates(docId, insertRows);
|
||||
logger.debug(`trim ${this.workspaceId}:${docId} successfully`);
|
||||
}
|
||||
}
|
||||
}, 1000);
|
||||
};
|
||||
}
|
||||
|
||||
export async function openWorkspaceDatabase(workspaceId: string) {
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import path from 'node:path';
|
||||
|
||||
import { ValidationResult } from '@affine/native';
|
||||
import { WorkspaceVersion } from '@toeverything/infra/blocksuite';
|
||||
import fs from 'fs-extra';
|
||||
@@ -11,10 +9,9 @@ import {
|
||||
migrateToLatest,
|
||||
migrateToSubdocAndReplaceDatabase,
|
||||
} from '../db/migration';
|
||||
import type { WorkspaceSQLiteDB } from '../db/workspace-db-adapter';
|
||||
import { logger } from '../logger';
|
||||
import { mainRPC } from '../main-rpc';
|
||||
import { listWorkspaces, storeWorkspaceMeta } from '../workspace';
|
||||
import { storeWorkspaceMeta } from '../workspace';
|
||||
import {
|
||||
getWorkspaceDBPath,
|
||||
getWorkspaceMeta,
|
||||
@@ -47,12 +44,6 @@ export interface SelectDBFileLocationResult {
|
||||
canceled?: boolean;
|
||||
}
|
||||
|
||||
export interface MoveDBFileResult {
|
||||
filePath?: string;
|
||||
error?: ErrorMessage;
|
||||
canceled?: boolean;
|
||||
}
|
||||
|
||||
// provide a backdoor to set dialog path for testing in playwright
|
||||
export interface FakeDialogResult {
|
||||
canceled?: boolean;
|
||||
@@ -68,7 +59,7 @@ export async function revealDBFile(workspaceId: string) {
|
||||
if (!meta) {
|
||||
return;
|
||||
}
|
||||
await mainRPC.showItemInFolder(meta.secondaryDBPath ?? meta.mainDBPath);
|
||||
await mainRPC.showItemInFolder(meta.mainDBPath);
|
||||
}
|
||||
|
||||
// result will be used in the next call to showOpenDialog
|
||||
@@ -120,7 +111,10 @@ export async function saveDBFileAs(
|
||||
name: '',
|
||||
},
|
||||
],
|
||||
defaultPath: getDefaultDBFileName(db.getWorkspaceName(), workspaceId),
|
||||
defaultPath: getDefaultDBFileName(
|
||||
await db.getWorkspaceName(),
|
||||
workspaceId
|
||||
),
|
||||
message: 'Save Workspace as a SQLite Database file',
|
||||
}));
|
||||
const filePath = ret.filePath;
|
||||
@@ -213,11 +207,6 @@ export async function loadDBFile(): Promise<LoadDBFileResult> {
|
||||
return { error: 'DB_FILE_PATH_INVALID' };
|
||||
}
|
||||
|
||||
if (await dbFileAlreadyLoaded(originalPath)) {
|
||||
logger.warn('loadDBFile: db file already loaded');
|
||||
return { error: 'DB_FILE_ALREADY_LOADED' };
|
||||
}
|
||||
|
||||
const { SqliteConnection } = await import('@affine/native');
|
||||
|
||||
const validationResult = await SqliteConnection.validate(originalPath);
|
||||
@@ -294,100 +283,3 @@ export async function loadDBFile(): Promise<LoadDBFileResult> {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is called when the user clicks the "Move" button in the "Move Workspace Storage" setting.
|
||||
*
|
||||
* It will
|
||||
* - copy the source db file to a new location
|
||||
* - remove the old db external file
|
||||
* - update the external db file path in the workspace meta
|
||||
* - return the new file path
|
||||
*/
|
||||
export async function moveDBFile(
|
||||
workspaceId: string,
|
||||
dbFileDir?: string
|
||||
): Promise<MoveDBFileResult> {
|
||||
let db: WorkspaceSQLiteDB | null = null;
|
||||
try {
|
||||
db = await ensureSQLiteDB(workspaceId);
|
||||
const meta = await getWorkspaceMeta(workspaceId);
|
||||
|
||||
const oldDir = meta.secondaryDBPath
|
||||
? path.dirname(meta.secondaryDBPath)
|
||||
: null;
|
||||
const defaultDir = oldDir ?? (await mainRPC.getPath('documents'));
|
||||
|
||||
const newName = getDefaultDBFileName(db.getWorkspaceName(), workspaceId);
|
||||
|
||||
const newDirPath =
|
||||
dbFileDir ??
|
||||
(
|
||||
getFakedResult() ??
|
||||
(await mainRPC.showOpenDialog({
|
||||
properties: ['openDirectory'],
|
||||
title: 'Move Workspace Storage',
|
||||
buttonLabel: 'Move',
|
||||
defaultPath: defaultDir,
|
||||
message: 'Move Workspace storage file',
|
||||
}))
|
||||
).filePaths?.[0];
|
||||
|
||||
// skips if
|
||||
// - user canceled the dialog
|
||||
// - user selected the same dir
|
||||
if (!newDirPath || newDirPath === oldDir) {
|
||||
return {
|
||||
canceled: true,
|
||||
};
|
||||
}
|
||||
|
||||
const newFilePath = path.join(newDirPath, newName);
|
||||
|
||||
if (await fs.pathExists(newFilePath)) {
|
||||
return {
|
||||
error: 'FILE_ALREADY_EXISTS',
|
||||
};
|
||||
}
|
||||
|
||||
logger.info(`[moveDBFile] copy ${meta.mainDBPath} -> ${newFilePath}`);
|
||||
|
||||
await fs.copy(meta.mainDBPath, newFilePath);
|
||||
|
||||
// remove the old db file, but we don't care if it fails
|
||||
if (meta.secondaryDBPath) {
|
||||
await fs
|
||||
.remove(meta.secondaryDBPath)
|
||||
.then(() => {
|
||||
logger.info(`[moveDBFile] removed ${meta.secondaryDBPath}`);
|
||||
})
|
||||
.catch(err => {
|
||||
logger.error(
|
||||
`[moveDBFile] remove ${meta.secondaryDBPath} failed`,
|
||||
err
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
// update meta
|
||||
await storeWorkspaceMeta(workspaceId, {
|
||||
secondaryDBPath: newFilePath,
|
||||
});
|
||||
|
||||
return {
|
||||
filePath: newFilePath,
|
||||
};
|
||||
} catch (err) {
|
||||
await db?.destroy();
|
||||
logger.error('[moveDBFile]', err);
|
||||
return {
|
||||
error: 'UNKNOWN_ERROR',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async function dbFileAlreadyLoaded(path: string) {
|
||||
const meta = await listWorkspaces();
|
||||
const paths = meta.map(m => m[1].secondaryDBPath);
|
||||
return paths.includes(path);
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import {
|
||||
loadDBFile,
|
||||
moveDBFile,
|
||||
revealDBFile,
|
||||
saveDBFileAs,
|
||||
selectDBFileLocation,
|
||||
@@ -17,9 +16,6 @@ export const dialogHandlers = {
|
||||
saveDBFileAs: async (workspaceId: string) => {
|
||||
return saveDBFileAs(workspaceId);
|
||||
},
|
||||
moveDBFile: (workspaceId: string, dbFileLocation?: string) => {
|
||||
return moveDBFile(workspaceId, dbFileLocation);
|
||||
},
|
||||
selectDBFileLocation: async () => {
|
||||
return selectDBFileLocation();
|
||||
},
|
||||
|
||||
@@ -12,7 +12,7 @@ function setupRendererConnection(rendererPort: Electron.MessagePortMain) {
|
||||
try {
|
||||
const start = performance.now();
|
||||
const result = await handler(...args);
|
||||
logger.info(
|
||||
logger.debug(
|
||||
'[async-api]',
|
||||
`${namespace}.${name}`,
|
||||
args.filter(
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
export interface WorkspaceMeta {
|
||||
id: string;
|
||||
mainDBPath: string;
|
||||
secondaryDBPath?: string; // assume there will be only one
|
||||
}
|
||||
|
||||
export type YOrigin = 'self' | 'external' | 'upstream' | 'renderer';
|
||||
|
||||
@@ -52,26 +52,12 @@ export async function getWorkspaceMeta(
|
||||
.then(() => true)
|
||||
.catch(() => false))
|
||||
) {
|
||||
// since not meta is found, we will migrate symlinked db file if needed
|
||||
await fs.ensureDir(basePath);
|
||||
const dbPath = await getWorkspaceDBPath(workspaceId);
|
||||
|
||||
// todo: remove this after migration (in stable version)
|
||||
const realDBPath = (await fs
|
||||
.access(dbPath)
|
||||
.then(() => true)
|
||||
.catch(() => false))
|
||||
? await fs.realpath(dbPath)
|
||||
: dbPath;
|
||||
const isLink = realDBPath !== dbPath;
|
||||
if (isLink) {
|
||||
await fs.copy(realDBPath, dbPath);
|
||||
}
|
||||
// create one if not exists
|
||||
const meta = {
|
||||
id: workspaceId,
|
||||
mainDBPath: dbPath,
|
||||
secondaryDBPath: isLink ? realDBPath : undefined,
|
||||
};
|
||||
await fs.writeJSON(metaPath, meta);
|
||||
return meta;
|
||||
|
||||
Reference in New Issue
Block a user