feat: add editor record (#7938)

fix CLOUD-58, CLOUD-61, CLOUD-62, PD-1607, PD-1608
This commit is contained in:
darkskygit
2024-09-02 09:37:39 +00:00
parent d9cedf89e1
commit d93d39e29d
33 changed files with 622 additions and 55 deletions

View File

@@ -0,0 +1,21 @@
-- AlterTable
ALTER TABLE "snapshot_histories" ADD COLUMN "created_by" VARCHAR;
-- AlterTable
ALTER TABLE "snapshots" ADD COLUMN "created_by" VARCHAR,
ADD COLUMN "updated_by" VARCHAR;
-- AlterTable
ALTER TABLE "updates" ADD COLUMN "created_by" VARCHAR DEFAULT 'system';
-- AddForeignKey
ALTER TABLE "snapshots" ADD CONSTRAINT "snapshots_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "snapshots" ADD CONSTRAINT "snapshots_updated_by_fkey" FOREIGN KEY ("updated_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "updates" ADD CONSTRAINT "updates_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;
-- AddForeignKey
ALTER TABLE "snapshot_histories" ADD CONSTRAINT "snapshot_histories_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;

View File

@@ -33,6 +33,10 @@ model User {
aiSessions AiSession[]
updatedRuntimeConfigs RuntimeConfig[]
userSnapshots UserSnapshot[]
createdSnapshot Snapshot[] @relation("createdSnapshot")
updatedSnapshot Snapshot[] @relation("updatedSnapshot")
createdUpdate Update[] @relation("createdUpdate")
createdHistory SnapshotHistory[] @relation("createdHistory")
@@index([email])
@@map("users")
@@ -241,9 +245,16 @@ model Snapshot {
// the `updated_at` field will not record the time of record changed,
// but the created time of last seen update that has been merged into snapshot.
updatedAt DateTime @map("updated_at") @db.Timestamptz(3)
createdBy String? @map("created_by") @db.VarChar
updatedBy String? @map("updated_by") @db.VarChar
// should not delete origin snapshot even if user is deleted
// we only delete the snapshot if the workspace is deleted
createdByUser User? @relation(name: "createdSnapshot", fields: [createdBy], references: [id], onDelete: SetNull)
updatedByUser User? @relation(name: "updatedSnapshot", fields: [updatedBy], references: [id], onDelete: SetNull)
// @deprecated use updatedAt only
seq Int? @default(0) @db.Integer
seq Int? @default(0) @db.Integer
// we need to clear all hanging updates and snapshots before enable the foreign key on workspaceId
// workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
@@ -274,9 +285,14 @@ model Update {
id String @map("guid") @db.VarChar
blob Bytes @db.ByteA
createdAt DateTime @map("created_at") @db.Timestamptz(3)
// TODO(@darkskygit): fullfill old update, remove default value in next release
createdBy String? @default("system") @map("created_by") @db.VarChar
// will delete createor record if createor's account is deleted
createdByUser User? @relation(name: "createdUpdate", fields: [createdBy], references: [id], onDelete: SetNull)
// @deprecated use createdAt only
seq Int? @db.Integer
seq Int? @db.Integer
@@id([workspaceId, id, createdAt])
@@map("updates")
@@ -289,6 +305,10 @@ model SnapshotHistory {
blob Bytes @db.ByteA
state Bytes? @db.ByteA
expiredAt DateTime @map("expired_at") @db.Timestamptz(3)
createdBy String? @map("created_by") @db.VarChar
// will delete createor record if creator's account is deleted
createdByUser User? @relation(name: "createdHistory", fields: [createdBy], references: [id], onDelete: SetNull)
@@id([workspaceId, id, timestamp])
@@map("snapshot_histories")

View File

@@ -45,7 +45,12 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
return this.getDocSnapshot(spaceId, docId);
}
async pushDocUpdates(userId: string, docId: string, updates: Uint8Array[]) {
async pushDocUpdates(
userId: string,
docId: string,
updates: Uint8Array[],
editorId?: string
) {
if (!updates.length) {
return 0;
}
@@ -67,6 +72,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
docId,
bin,
timestamp,
editor: editorId,
});
return timestamp;
@@ -135,6 +141,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
docId,
bin: snapshot.blob,
timestamp: snapshot.updatedAt.getTime(),
editor: snapshot.userId,
};
}

View File

@@ -38,7 +38,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
async pushDocUpdates(
workspaceId: string,
docId: string,
updates: Uint8Array[]
updates: Uint8Array[],
editorId?: string
) {
if (!updates.length) {
return 0;
@@ -82,6 +83,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
blob: Buffer.from(update),
seq,
createdAt: new Date(createdAt),
createdBy: editorId || null,
};
}),
});
@@ -113,6 +115,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
return rows.map(row => ({
bin: row.blob,
timestamp: row.createdAt.getTime(),
editor: row.createdBy || undefined,
}));
}
@@ -216,6 +219,12 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
const histories = await this.db.snapshotHistory.findMany({
select: {
timestamp: true,
createdByUser: {
select: {
name: true,
avatarUrl: true,
},
},
},
where: {
workspaceId,
@@ -230,7 +239,10 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
take: query.limit,
});
return histories.map(h => h.timestamp.getTime());
return histories.map(h => ({
timestamp: h.timestamp.getTime(),
editor: h.createdByUser,
}));
}
async getDocHistory(workspaceId: string, docId: string, timestamp: number) {
@@ -253,13 +265,15 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
docId,
bin: history.blob,
timestamp,
editor: history.createdBy || undefined,
};
}
override async rollbackDoc(
spaceId: string,
docId: string,
timestamp: number
timestamp: number,
editorId?: string
): Promise<void> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
@@ -274,7 +288,14 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
}
// force create a new history record after rollback
await this.createDocHistory(fromSnapshot, true);
await this.createDocHistory(
{
...fromSnapshot,
// override the editor to the one who requested the rollback
editor: editorId,
},
true
);
// WARN:
// we should never do the snapshot updating in recovering,
// which is not the solution in CRDT.
@@ -331,6 +352,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
id: snapshot.docId,
timestamp: new Date(snapshot.timestamp),
blob: Buffer.from(snapshot.bin),
createdBy: snapshot.editor,
expiredAt: new Date(
Date.now() + (await this.options.historyMaxAge(snapshot.spaceId))
),
@@ -374,6 +396,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
docId,
bin: snapshot.blob,
timestamp: snapshot.updatedAt.getTime(),
// creator and editor may null if their account is deleted
editor: snapshot.updatedBy || snapshot.createdBy || undefined,
};
}
@@ -396,10 +420,10 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
try {
const result: { updatedAt: Date }[] = await this.db.$queryRaw`
INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at")
VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt})
INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at", "created_by", "updated_by")
VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt}, ${snapshot.editor}, ${snapshot.editor})
ON CONFLICT ("workspace_id", "guid")
DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt}
DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt}, "updated_by" = ${snapshot.editor}
WHERE "snapshots"."workspace_id" = ${spaceId} AND "snapshots"."guid" = ${docId} AND "snapshots"."updated_at" <= ${updatedAt}
RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
`;

View File

@@ -22,4 +22,4 @@ import { DocStorageOptions } from './options';
export class DocStorageModule {}
export { PgUserspaceDocStorageAdapter, PgWorkspaceDocStorageAdapter };
export { DocStorageAdapter } from './storage';
export { DocStorageAdapter, type Editor } from './storage';

View File

@@ -16,11 +16,13 @@ export interface DocRecord {
docId: string;
bin: Uint8Array;
timestamp: number;
editor?: string;
}
export interface DocUpdate {
bin: Uint8Array;
timestamp: number;
editor?: string;
}
export interface HistoryFilter {
@@ -28,6 +30,11 @@ export interface HistoryFilter {
limit?: number;
}
export interface Editor {
name: string;
avatarUrl: string | null;
}
export interface DocStorageOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}
@@ -61,7 +68,7 @@ export abstract class DocStorageAdapter extends Connection {
const updates = await this.getDocUpdates(spaceId, docId);
if (updates.length) {
const { timestamp, bin } = await this.squash(
const { timestamp, bin, editor } = await this.squash(
snapshot ? [snapshot, ...updates] : updates
);
@@ -70,6 +77,7 @@ export abstract class DocStorageAdapter extends Connection {
docId,
bin,
timestamp,
editor,
};
const success = await this.setDocSnapshot(newSnapshot);
@@ -91,7 +99,8 @@ export abstract class DocStorageAdapter extends Connection {
abstract pushDocUpdates(
spaceId: string,
docId: string,
updates: Uint8Array[]
updates: Uint8Array[],
editorId?: string
): Promise<number>;
abstract deleteDoc(spaceId: string, docId: string): Promise<void>;
@@ -99,7 +108,8 @@ export abstract class DocStorageAdapter extends Connection {
async rollbackDoc(
spaceId: string,
docId: string,
timestamp: number
timestamp: number,
editorId?: string
): Promise<void> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
@@ -114,7 +124,7 @@ export abstract class DocStorageAdapter extends Connection {
}
const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin);
await this.pushDocUpdates(spaceId, docId, [change]);
await this.pushDocUpdates(spaceId, docId, [change], editorId);
// force create a new history record after rollback
await this.createDocHistory(fromSnapshot, true);
}
@@ -127,7 +137,7 @@ export abstract class DocStorageAdapter extends Connection {
spaceId: string,
docId: string,
query: { skip?: number; limit?: number }
): Promise<number[]>;
): Promise<{ timestamp: number; editor: Editor | null }[]>;
abstract getDocHistory(
spaceId: string,
docId: string,
@@ -173,6 +183,7 @@ export abstract class DocStorageAdapter extends Connection {
return {
bin: finalUpdate,
timestamp: lastUpdate.timestamp,
editor: lastUpdate.editor,
};
}

View File

@@ -28,5 +28,6 @@ export {
DocStorageAdapter,
type DocStorageOptions,
type DocUpdate,
type Editor,
type HistoryFilter,
} from './doc';

View File

@@ -264,9 +264,11 @@ export class SpaceSyncGateway
};
}
@Auth()
@SubscribeMessage('space:push-doc-updates')
async onReceiveDocUpdates(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
message: PushDocUpdatesMessage
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
@@ -277,7 +279,8 @@ export class SpaceSyncGateway
const timestamp = await adapter.push(
spaceId,
docId,
updates.map(update => Buffer.from(update, 'base64'))
updates.map(update => Buffer.from(update, 'base64')),
user.id
);
// could be put in [adapter.push]
@@ -448,8 +451,10 @@ export class SpaceSyncGateway
});
}
@Auth()
@SubscribeMessage('client-update-v2')
async handleClientUpdateV2(
@CurrentUser() user: CurrentUser,
@MessageBody()
{
workspaceId,
@@ -462,7 +467,7 @@ export class SpaceSyncGateway
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
return this.onReceiveDocUpdates(client, {
return this.onReceiveDocUpdates(client, user, {
spaceType: SpaceType.Workspace,
spaceId: workspaceId,
docId: guid,
@@ -596,9 +601,9 @@ abstract class SyncSocketAdapter {
permission?: Permission
): Promise<void>;
push(spaceId: string, docId: string, updates: Buffer[]) {
push(spaceId: string, docId: string, updates: Buffer[], editorId: string) {
this.assertIn(spaceId);
return this.storage.pushDocUpdates(spaceId, docId, updates);
return this.storage.pushDocUpdates(spaceId, docId, updates, editorId);
}
get(spaceId: string, docId: string) {
@@ -621,9 +626,14 @@ class WorkspaceSyncAdapter extends SyncSocketAdapter {
super(SpaceType.Workspace, client, storage);
}
override push(spaceId: string, docId: string, updates: Buffer[]) {
override push(
spaceId: string,
docId: string,
updates: Buffer[],
editorId: string
) {
const id = new DocID(docId, spaceId);
return super.push(spaceId, id.guid, updates);
return super.push(spaceId, id.guid, updates, editorId);
}
override get(spaceId: string, docId: string) {

View File

@@ -16,6 +16,7 @@ import { PgWorkspaceDocStorageAdapter } from '../../doc';
import { Permission, PermissionService } from '../../permission';
import { DocID } from '../../utils/doc';
import { WorkspaceType } from '../types';
import { EditorType } from './workspace';
@ObjectType()
class DocHistoryType implements Partial<SnapshotHistory> {
@@ -27,6 +28,9 @@ class DocHistoryType implements Partial<SnapshotHistory> {
@Field(() => GraphQLISODateTime)
timestamp!: Date;
@Field(() => EditorType, { nullable: true })
editor!: EditorType | null;
}
@Resolver(() => WorkspaceType)
@@ -47,17 +51,18 @@ export class DocHistoryResolver {
): Promise<DocHistoryType[]> {
const docId = new DocID(guid, workspace.id);
const timestamps = await this.workspace.listDocHistories(
const histories = await this.workspace.listDocHistories(
workspace.id,
docId.guid,
{ before: timestamp.getTime(), limit: take }
);
return timestamps.map(timestamp => {
return histories.map(history => {
return {
workspaceId: workspace.id,
id: docId.guid,
timestamp: new Date(timestamp),
timestamp: new Date(history.timestamp),
editor: history.editor,
};
});
}
@@ -81,7 +86,8 @@ export class DocHistoryResolver {
await this.workspace.rollbackDoc(
docId.workspace,
docId.guid,
timestamp.getTime()
timestamp.getTime(),
user.id
);
return timestamp;

View File

@@ -1,8 +1,10 @@
import { Logger } from '@nestjs/common';
import {
Args,
Field,
Int,
Mutation,
ObjectType,
Parent,
Query,
ResolveField,
@@ -16,6 +18,7 @@ import { applyUpdate, Doc } from 'yjs';
import type { FileUpload } from '../../../fundamentals';
import {
CantChangeSpaceOwner,
DocNotFound,
EventEmitter,
InternalServerError,
MailService,
@@ -28,6 +31,7 @@ import {
UserNotFound,
} from '../../../fundamentals';
import { CurrentUser, Public } from '../../auth';
import type { Editor } from '../../doc';
import { Permission, PermissionService } from '../../permission';
import { QuotaManagementService, QuotaQueryType } from '../../quota';
import { WorkspaceBlobStorage } from '../../storage';
@@ -40,6 +44,30 @@ import {
} from '../types';
import { defaultWorkspaceAvatar } from '../utils';
@ObjectType()
export class EditorType implements Partial<Editor> {
@Field()
name!: string;
@Field(() => String, { nullable: true })
avatarUrl!: string | null;
}
@ObjectType()
class WorkspacePageMeta {
@Field(() => Date)
createdAt!: Date;
@Field(() => Date)
updatedAt!: Date;
@Field(() => EditorType, { nullable: true })
createdBy!: EditorType | null;
@Field(() => EditorType, { nullable: true })
updatedBy!: EditorType | null;
}
/**
* Workspace resolver
* Public apis rate limit: 10 req/m
@@ -155,6 +183,35 @@ export class WorkspaceResolver {
}));
}
@ResolveField(() => WorkspacePageMeta, {
description: 'Cloud page metadata of workspace',
complexity: 2,
})
async pageMeta(
@Parent() workspace: WorkspaceType,
@Args('pageId') pageId: string
) {
const metadata = await this.prisma.snapshot.findFirst({
where: { workspaceId: workspace.id, id: pageId },
select: {
createdAt: true,
updatedAt: true,
createdByUser: { select: { name: true, avatarUrl: true } },
updatedByUser: { select: { name: true, avatarUrl: true } },
},
});
if (!metadata) {
throw new DocNotFound({ spaceId: workspace.id, docId: pageId });
}
return {
createdAt: metadata.createdAt,
updatedAt: metadata.updatedAt,
createdBy: metadata.createdByUser || null,
updatedBy: metadata.updatedByUser || null,
};
}
@ResolveField(() => QuotaQueryType, {
name: 'quota',
description: 'quota of workspace',

View File

@@ -189,6 +189,7 @@ type DocHistoryNotFoundDataType {
}
type DocHistoryType {
editor: EditorType
id: String!
timestamp: DateTime!
workspaceId: String!
@@ -199,6 +200,11 @@ type DocNotFoundDataType {
spaceId: String!
}
type EditorType {
avatarUrl: String
name: String!
}
union ErrorDataUnion = AlreadyInSpaceDataType | BlobNotFoundDataType | CopilotMessageNotFoundDataType | CopilotPromptNotFoundDataType | CopilotProviderSideErrorDataType | DocAccessDeniedDataType | DocHistoryNotFoundDataType | DocNotFoundDataType | InvalidHistoryTimestampDataType | InvalidPasswordLengthDataType | InvalidRuntimeConfigTypeDataType | MissingOauthQueryParameterDataType | NotInSpaceDataType | RuntimeConfigNotFoundDataType | SameSubscriptionRecurringDataType | SpaceAccessDeniedDataType | SpaceNotFoundDataType | SpaceOwnerNotFoundDataType | SubscriptionAlreadyExistsDataType | SubscriptionNotExistsDataType | SubscriptionPlanNotFoundDataType | UnknownOauthProviderDataType | VersionRejectedDataType
enum ErrorNames {
@@ -875,6 +881,13 @@ type WorkspacePage {
workspaceId: String!
}
type WorkspacePageMeta {
createdAt: DateTime!
createdBy: EditorType
updatedAt: DateTime!
updatedBy: EditorType
}
type WorkspaceType {
"""Available features of workspace"""
availableFeatures: [FeatureType!]!
@@ -905,6 +918,9 @@ type WorkspaceType {
"""Owner of workspace"""
owner: UserType!
"""Cloud page metadata of workspace"""
pageMeta(pageId: String!): WorkspacePageMeta!
"""Permission of current signed in user in workspace"""
permission: Permission!

View File

@@ -48,6 +48,8 @@ const snapshot: Snapshot = {
seq: 0,
updatedAt: new Date(),
createdAt: new Date(),
createdBy: null,
updatedBy: null,
};
function getSnapshot(timestamp: number = Date.now()): DocRecord {

View File

@@ -177,6 +177,7 @@ test('should be able to merge updates as snapshot', async t => {
blob: Buffer.from(update),
seq: 1,
createdAt: new Date(Date.now() + 1),
createdBy: null,
},
],
});
@@ -199,6 +200,7 @@ test('should be able to merge updates as snapshot', async t => {
blob: appendUpdate,
seq: 2,
createdAt: new Date(),
createdBy: null,
},
});