mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-14 05:14:54 +00:00
feat(core): migration for created by and updated by fields (#12171)
This commit is contained in:
@@ -10,6 +10,7 @@ export { ServerScope } from './scopes/server';
|
||||
export { AuthService } from './services/auth';
|
||||
export { CaptchaService } from './services/captcha';
|
||||
export { DefaultServerService } from './services/default-server';
|
||||
export { DocCreatedByUpdatedBySyncService } from './services/doc-created-by-updated-by-sync';
|
||||
export { EventSourceService } from './services/eventsource';
|
||||
export { FetchService } from './services/fetch';
|
||||
export { GraphQLService } from './services/graphql';
|
||||
@@ -98,6 +99,10 @@ import { UserQuotaStore } from './stores/user-quota';
|
||||
import { UserSettingsStore } from './stores/user-settings';
|
||||
import { DocCreatedByService } from './services/doc-created-by';
|
||||
import { DocUpdatedByService } from './services/doc-updated-by';
|
||||
import { DocCreatedByUpdatedBySyncService } from './services/doc-created-by-updated-by-sync';
|
||||
import { WorkspacePermissionService } from '../permissions';
|
||||
import { DocsService } from '../doc';
|
||||
import { DocCreatedByUpdatedBySyncStore } from './stores/doc-created-by-updated-by-sync';
|
||||
|
||||
export function configureCloudModule(framework: Framework) {
|
||||
configureDefaultAuthProvider(framework);
|
||||
@@ -181,5 +186,15 @@ export function configureCloudModule(framework: Framework) {
|
||||
.entity(WorkspaceInvoices, [WorkspaceService, WorkspaceServerService])
|
||||
.service(SelfhostLicenseService, [SelfhostLicenseStore, WorkspaceService])
|
||||
.store(SelfhostLicenseStore, [WorkspaceServerService])
|
||||
.service(BlocksuiteWriterInfoService, [WorkspaceServerService]);
|
||||
.service(BlocksuiteWriterInfoService, [WorkspaceServerService])
|
||||
.service(DocCreatedByUpdatedBySyncService, [
|
||||
WorkspaceService,
|
||||
DocsService,
|
||||
WorkspacePermissionService,
|
||||
DocCreatedByUpdatedBySyncStore,
|
||||
])
|
||||
.store(DocCreatedByUpdatedBySyncStore, [
|
||||
WorkspaceServerService,
|
||||
WorkspaceService,
|
||||
]);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
import {
|
||||
catchErrorInto,
|
||||
effect,
|
||||
fromPromise,
|
||||
LiveData,
|
||||
onStart,
|
||||
Service,
|
||||
throwIfAborted,
|
||||
} from '@toeverything/infra';
|
||||
import { clamp } from 'lodash-es';
|
||||
import { combineLatest, exhaustMap, finalize, map } from 'rxjs';
|
||||
|
||||
import type { DocsService } from '../../doc';
|
||||
import type { WorkspacePermissionService } from '../../permissions';
|
||||
import type { WorkspaceService } from '../../workspace';
|
||||
import type { DocCreatedByUpdatedBySyncStore } from '../stores/doc-created-by-updated-by-sync';
|
||||
|
||||
/**
|
||||
* This service is used to sync createdBy and updatedBy data from the cloud to local doc properties.
|
||||
*
|
||||
* # When sync is needed
|
||||
*
|
||||
* 1. When the user is an owner or admin
|
||||
* 2. When the root doc sync is complete
|
||||
* 3. When a doc is missing createdBy data
|
||||
* 4. When workspace has not been marked as `DocCreatedByUpdatedBySynced`
|
||||
*/
|
||||
export class DocCreatedByUpdatedBySyncService extends Service {
|
||||
constructor(
|
||||
private readonly workspaceService: WorkspaceService,
|
||||
private readonly docsService: DocsService,
|
||||
private readonly workspacePermissionService: WorkspacePermissionService,
|
||||
private readonly docCreatedByUpdatedBySyncStore: DocCreatedByUpdatedBySyncStore
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
syncing$ = new LiveData(false);
|
||||
error$ = new LiveData<any>(null);
|
||||
// sync progress 0.0 - 1.0
|
||||
progress$ = new LiveData<number>(0);
|
||||
|
||||
sync = effect(
|
||||
exhaustMap(() => {
|
||||
return fromPromise(async (signal?: AbortSignal) => {
|
||||
let afterCursor: string | null = null;
|
||||
let finishedCount = 0;
|
||||
while (true) {
|
||||
const result =
|
||||
await this.docCreatedByUpdatedBySyncStore.getDocCreatedByUpdatedByList(
|
||||
afterCursor
|
||||
);
|
||||
throwIfAborted(signal);
|
||||
|
||||
for (const edge of result.workspace.docs.edges) {
|
||||
const docId = edge.node.id;
|
||||
const docRecord = this.docsService.list.doc$(docId).value;
|
||||
if (docRecord) {
|
||||
if (edge.node.creatorId) {
|
||||
docRecord.setCreatedBy(edge.node.creatorId);
|
||||
}
|
||||
if (edge.node.lastUpdaterId) {
|
||||
docRecord.setUpdatedBy(edge.node.lastUpdaterId);
|
||||
}
|
||||
}
|
||||
finishedCount++;
|
||||
}
|
||||
this.progress$.value = clamp(
|
||||
finishedCount / result.workspace.docs.totalCount,
|
||||
0,
|
||||
1
|
||||
);
|
||||
if (!result.workspace.docs.pageInfo.hasNextPage) {
|
||||
break;
|
||||
}
|
||||
afterCursor = result.workspace.docs.pageInfo.endCursor;
|
||||
}
|
||||
|
||||
this.docCreatedByUpdatedBySyncStore.setDocCreatedByUpdatedBySynced(
|
||||
true
|
||||
);
|
||||
}).pipe(
|
||||
catchErrorInto(this.error$),
|
||||
onStart(() => {
|
||||
this.syncing$.value = true;
|
||||
this.progress$.value = 0;
|
||||
this.error$.value = null;
|
||||
}),
|
||||
finalize(() => {
|
||||
this.syncing$.value = false;
|
||||
})
|
||||
);
|
||||
})
|
||||
);
|
||||
|
||||
private readonly workspaceRootDocSynced$ =
|
||||
this.workspaceService.workspace.engine.doc
|
||||
.docState$(this.workspaceService.workspace.id)
|
||||
.pipe(map(doc => doc.synced));
|
||||
|
||||
private readonly isOwnerOrAdmin$ =
|
||||
this.workspacePermissionService.permission.isOwnerOrAdmin$;
|
||||
|
||||
private readonly missingCreatedBy$ = this.docsService
|
||||
.propertyValues$('createdBy')
|
||||
.pipe(
|
||||
map(allDocsCreatedBy => {
|
||||
let missingCreatedBy = false;
|
||||
console.log(allDocsCreatedBy);
|
||||
for (const createdBy of allDocsCreatedBy.values()) {
|
||||
if (!createdBy) {
|
||||
missingCreatedBy = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return missingCreatedBy;
|
||||
})
|
||||
);
|
||||
|
||||
private readonly markedSynced$ =
|
||||
this.docCreatedByUpdatedBySyncStore.watchDocCreatedByUpdatedBySynced();
|
||||
|
||||
needSync$ = LiveData.from(
|
||||
combineLatest([
|
||||
this.workspaceRootDocSynced$,
|
||||
this.isOwnerOrAdmin$,
|
||||
this.missingCreatedBy$,
|
||||
this.markedSynced$,
|
||||
]).pipe(
|
||||
map(
|
||||
([
|
||||
workspaceRootDocSynced,
|
||||
isOwnerOrAdmin,
|
||||
missingCreatedBy,
|
||||
markedSynced,
|
||||
]) =>
|
||||
workspaceRootDocSynced &&
|
||||
isOwnerOrAdmin &&
|
||||
missingCreatedBy &&
|
||||
!markedSynced
|
||||
)
|
||||
),
|
||||
false
|
||||
);
|
||||
|
||||
override dispose(): void {
|
||||
super.dispose();
|
||||
this.sync.unsubscribe();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
import { getDocCreatedByUpdatedByListQuery } from '@affine/graphql';
|
||||
import { Store, yjsGetPath } from '@toeverything/infra';
|
||||
import type { Observable } from 'rxjs';
|
||||
|
||||
import type { WorkspaceService } from '../../workspace';
|
||||
import type { WorkspaceServerService } from '../services/workspace-server';
|
||||
|
||||
export class DocCreatedByUpdatedBySyncStore extends Store {
|
||||
constructor(
|
||||
private readonly workspaceServerService: WorkspaceServerService,
|
||||
private readonly workspaceService: WorkspaceService
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async getDocCreatedByUpdatedByList(afterCursor?: string | null) {
|
||||
if (!this.workspaceServerService.server) {
|
||||
throw new Error('Server not found');
|
||||
}
|
||||
|
||||
return await this.workspaceServerService.server.gql({
|
||||
query: getDocCreatedByUpdatedByListQuery,
|
||||
variables: {
|
||||
workspaceId: this.workspaceService.workspace.id,
|
||||
pagination: {
|
||||
first: 100,
|
||||
after: afterCursor,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
watchDocCreatedByUpdatedBySynced() {
|
||||
const rootYDoc = this.workspaceService.workspace.rootYDoc;
|
||||
return yjsGetPath(
|
||||
rootYDoc.getMap('affine:workspace-properties'),
|
||||
'docCreatedByUpdatedBySynced'
|
||||
) as Observable<boolean>;
|
||||
}
|
||||
|
||||
setDocCreatedByUpdatedBySynced(synced: boolean) {
|
||||
const rootYDoc = this.workspaceService.workspace.rootYDoc;
|
||||
rootYDoc
|
||||
.getMap('affine:workspace-properties')
|
||||
.set('docCreatedByUpdatedBySynced', synced);
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,9 @@ export class WorkspacePermission extends Entity {
|
||||
);
|
||||
isOwner$ = this.cache$.map(cache => cache?.isOwner ?? null);
|
||||
isAdmin$ = this.cache$.map(cache => cache?.isAdmin ?? null);
|
||||
isOwnerOrAdmin$ = this.cache$.map(
|
||||
cache => (cache?.isOwner ?? null) || (cache?.isAdmin ?? null)
|
||||
);
|
||||
isTeam$ = this.cache$.map(cache => cache?.isTeam ?? null);
|
||||
isRevalidating$ = new LiveData(false);
|
||||
|
||||
|
||||
@@ -46,7 +46,13 @@ import {
|
||||
} from '@toeverything/infra';
|
||||
import { isEqual } from 'lodash-es';
|
||||
import { map, Observable, switchMap, tap } from 'rxjs';
|
||||
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
|
||||
import {
|
||||
applyUpdate,
|
||||
type Array as YArray,
|
||||
Doc as YDoc,
|
||||
encodeStateAsUpdate,
|
||||
type Map as YMap,
|
||||
} from 'yjs';
|
||||
|
||||
import type { Server, ServersService } from '../../cloud';
|
||||
import {
|
||||
@@ -209,6 +215,13 @@ class CloudWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
|
||||
});
|
||||
}
|
||||
|
||||
const accountId = this.authService.session.account$.value?.id;
|
||||
await this.writeInitialDocProperties(
|
||||
workspaceId,
|
||||
docStorage,
|
||||
accountId ?? ''
|
||||
);
|
||||
|
||||
docStorage.connection.disconnect();
|
||||
blobStorage.connection.disconnect();
|
||||
|
||||
@@ -533,6 +546,45 @@ class CloudWorkspaceFlavourProvider implements WorkspaceFlavourProvider {
|
||||
};
|
||||
}
|
||||
|
||||
async writeInitialDocProperties(
|
||||
workspaceId: string,
|
||||
docStorage: DocStorage,
|
||||
creatorId: string
|
||||
) {
|
||||
try {
|
||||
const rootDocBuffer = await docStorage.getDoc(workspaceId);
|
||||
const rootDoc = new YDoc({ guid: workspaceId });
|
||||
if (rootDocBuffer) {
|
||||
applyUpdate(rootDoc, rootDocBuffer.bin);
|
||||
}
|
||||
|
||||
const docIds = (
|
||||
rootDoc.getMap('meta').get('pages') as YArray<YMap<string>>
|
||||
)
|
||||
?.map(page => page.get('id'))
|
||||
.filter(Boolean) as string[];
|
||||
|
||||
const propertiesDBBuffer = await docStorage.getDoc('db$docProperties');
|
||||
const propertiesDB = new YDoc({ guid: 'db$docProperties' });
|
||||
if (propertiesDBBuffer) {
|
||||
applyUpdate(propertiesDB, propertiesDBBuffer.bin);
|
||||
}
|
||||
|
||||
for (const docId of docIds) {
|
||||
const docProperties = propertiesDB.getMap(docId);
|
||||
docProperties.set('id', docId);
|
||||
docProperties.set('createdBy', creatorId);
|
||||
}
|
||||
|
||||
await docStorage.pushDocUpdate({
|
||||
docId: 'db$docProperties',
|
||||
bin: encodeStateAsUpdate(propertiesDB),
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('error to write initial doc properties', error);
|
||||
}
|
||||
}
|
||||
|
||||
private waitForLoaded() {
|
||||
return this.isRevalidating$.waitFor(loading => !loading);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user