mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-27 02:42:25 +08:00
fix(core): electron storage sync (#13213)
#### PR Dependency Tree * **PR #13213** 👈 This tree was auto-generated by [Charcoal](https://github.com/danerwilliams/charcoal) <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added version tracking for global state and cache updates, enabling synchronized updates across multiple windows. * Introduced a unique client identifier to prevent processing self-originated updates. * **Refactor** * Improved event broadcasting for global state and cache changes, ensuring more reliable and efficient update propagation. * **Chores** * Updated internal logic to support structured event formats and revision management for shared storage. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -1,25 +1,17 @@
|
||||
import type { MainEventRegister } from '../type';
|
||||
import { globalCacheStorage, globalStateStorage } from './storage';
|
||||
import { globalCacheUpdates$, globalStateUpdates$ } from './handlers';
|
||||
|
||||
export const sharedStorageEvents = {
|
||||
onGlobalStateChanged: (
|
||||
fn: (state: Record<string, unknown | undefined>) => void
|
||||
) => {
|
||||
const subscription = globalStateStorage.watchAll().subscribe(updates => {
|
||||
fn(updates);
|
||||
});
|
||||
return () => {
|
||||
subscription.unsubscribe();
|
||||
};
|
||||
const subscription = globalStateUpdates$.subscribe(fn);
|
||||
return () => subscription.unsubscribe();
|
||||
},
|
||||
onGlobalCacheChanged: (
|
||||
fn: (state: Record<string, unknown | undefined>) => void
|
||||
) => {
|
||||
const subscription = globalCacheStorage.watchAll().subscribe(updates => {
|
||||
fn(updates);
|
||||
});
|
||||
return () => {
|
||||
subscription.unsubscribe();
|
||||
};
|
||||
const subscription = globalCacheUpdates$.subscribe(fn);
|
||||
return () => subscription.unsubscribe();
|
||||
},
|
||||
} satisfies Record<string, MainEventRegister>;
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
import { Subject } from 'rxjs';
|
||||
|
||||
import type { NamespaceHandlers } from '../type';
|
||||
import { globalCacheStorage, globalStateStorage } from './storage';
|
||||
|
||||
// Subjects used by shared-storage/events.ts to broadcast updates to all renderer processes
|
||||
export const globalStateUpdates$ = new Subject<Record<string, any>>();
|
||||
export const globalCacheUpdates$ = new Subject<Record<string, any>>();
|
||||
|
||||
// Revision maps; main generates the next value each time
|
||||
const globalStateRevisions = new Map<string, number>();
|
||||
const globalCacheRevisions = new Map<string, number>();
|
||||
|
||||
function nextRev(revisions: Map<string, number>, key: string) {
|
||||
const r = (revisions.get(key) ?? 0) + 1;
|
||||
revisions.set(key, r);
|
||||
return r;
|
||||
}
|
||||
|
||||
export const sharedStorageHandlers = {
|
||||
getAllGlobalState: async () => {
|
||||
return globalStateStorage.all();
|
||||
@@ -8,22 +24,36 @@ export const sharedStorageHandlers = {
|
||||
getAllGlobalCache: async () => {
|
||||
return globalCacheStorage.all();
|
||||
},
|
||||
setGlobalState: async (_, key: string, value: any) => {
|
||||
return globalStateStorage.set(key, value);
|
||||
|
||||
setGlobalState: async (_e, key: string, value: any, sourceId?: string) => {
|
||||
const rev = nextRev(globalStateRevisions, key);
|
||||
globalStateStorage.set(key, value);
|
||||
globalStateUpdates$.next({ [key]: { v: value, r: rev, s: sourceId } });
|
||||
},
|
||||
delGlobalState: async (_, key: string) => {
|
||||
return globalStateStorage.del(key);
|
||||
delGlobalState: async (_e, key: string, sourceId?: string) => {
|
||||
const rev = nextRev(globalStateRevisions, key);
|
||||
globalStateStorage.del(key);
|
||||
globalStateUpdates$.next({ [key]: { v: undefined, r: rev, s: sourceId } });
|
||||
},
|
||||
clearGlobalState: async () => {
|
||||
return globalStateStorage.clear();
|
||||
clearGlobalState: async (_e, sourceId?: string) => {
|
||||
globalStateRevisions.clear();
|
||||
globalStateStorage.clear();
|
||||
globalStateUpdates$.next({ '*': { v: undefined, r: 0, s: sourceId } });
|
||||
},
|
||||
setGlobalCache: async (_, key: string, value: any) => {
|
||||
return globalCacheStorage.set(key, value);
|
||||
|
||||
setGlobalCache: async (_e, key: string, value: any, sourceId?: string) => {
|
||||
const rev = nextRev(globalCacheRevisions, key);
|
||||
globalCacheStorage.set(key, value);
|
||||
globalCacheUpdates$.next({ [key]: { v: value, r: rev, s: sourceId } });
|
||||
},
|
||||
delGlobalCache: async (_, key: string) => {
|
||||
return globalCacheStorage.del(key);
|
||||
delGlobalCache: async (_e, key: string, sourceId?: string) => {
|
||||
const rev = nextRev(globalCacheRevisions, key);
|
||||
globalCacheStorage.del(key);
|
||||
globalCacheUpdates$.next({ [key]: { v: undefined, r: rev, s: sourceId } });
|
||||
},
|
||||
clearGlobalCache: async () => {
|
||||
return globalCacheStorage.clear();
|
||||
clearGlobalCache: async (_e, sourceId?: string) => {
|
||||
globalCacheRevisions.clear();
|
||||
globalCacheStorage.clear();
|
||||
globalCacheUpdates$.next({ '*': { v: undefined, r: 0, s: sourceId } });
|
||||
},
|
||||
} satisfies NamespaceHandlers;
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
AFFINE_EVENT_CHANNEL_NAME,
|
||||
} from '../shared/type';
|
||||
|
||||
// Load persisted data from main process synchronously at preload time
|
||||
const initialGlobalState = ipcRenderer.sendSync(
|
||||
AFFINE_API_CHANNEL_NAME,
|
||||
'sharedStorage:getAllGlobalState'
|
||||
@@ -15,6 +16,9 @@ const initialGlobalCache = ipcRenderer.sendSync(
|
||||
'sharedStorage:getAllGlobalCache'
|
||||
);
|
||||
|
||||
// Unique id for this renderer instance, used to ignore self-originated broadcasts
|
||||
const CLIENT_ID: string = Math.random().toString(36).slice(2);
|
||||
|
||||
function invokeWithCatch(key: string, ...args: any[]) {
|
||||
ipcRenderer.invoke(AFFINE_API_CHANNEL_NAME, key, ...args).catch(err => {
|
||||
console.error(`Failed to invoke ${key}`, err);
|
||||
@@ -34,7 +38,23 @@ function createSharedStorageApi(
|
||||
memory.setAll(init);
|
||||
ipcRenderer.on(AFFINE_EVENT_CHANNEL_NAME, (_event, channel, updates) => {
|
||||
if (channel === `sharedStorage:${event}`) {
|
||||
for (const [key, value] of Object.entries(updates)) {
|
||||
for (const [key, raw] of Object.entries(updates)) {
|
||||
// support both legacy plain value and new { v, r, s } structure
|
||||
let value: any;
|
||||
let source: string | undefined;
|
||||
|
||||
if (raw && typeof raw === 'object' && 'v' in raw) {
|
||||
value = (raw as any).v;
|
||||
source = (raw as any).s;
|
||||
} else {
|
||||
value = raw;
|
||||
}
|
||||
|
||||
// Ignore our own broadcasts
|
||||
if (source && source === CLIENT_ID) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (value === undefined) {
|
||||
memory.del(key);
|
||||
} else {
|
||||
@@ -47,11 +67,11 @@ function createSharedStorageApi(
|
||||
return {
|
||||
del(key: string) {
|
||||
memory.del(key);
|
||||
invokeWithCatch(`sharedStorage:${api.del}`, key);
|
||||
invokeWithCatch(`sharedStorage:${api.del}`, key, CLIENT_ID);
|
||||
},
|
||||
clear() {
|
||||
memory.clear();
|
||||
invokeWithCatch(`sharedStorage:${api.clear}`);
|
||||
invokeWithCatch(`sharedStorage:${api.clear}`, CLIENT_ID);
|
||||
},
|
||||
get<T>(key: string): T | undefined {
|
||||
return memory.get(key);
|
||||
@@ -61,7 +81,7 @@ function createSharedStorageApi(
|
||||
},
|
||||
set(key: string, value: unknown) {
|
||||
memory.set(key, value);
|
||||
invokeWithCatch(`sharedStorage:${api.set}`, key, value);
|
||||
invokeWithCatch(`sharedStorage:${api.set}`, key, value, CLIENT_ID);
|
||||
},
|
||||
watch<T>(key: string, cb: (i: T | undefined) => void): () => void {
|
||||
const subscription = memory.watch(key).subscribe(i => cb(i as T));
|
||||
|
||||
Reference in New Issue
Block a user