feat(server): impl doc history (#5004)

This commit is contained in:
liuyi
2023-11-22 07:56:59 +00:00
parent 946b7b4004
commit d1476495ae
18 changed files with 783 additions and 35 deletions

View File

@@ -362,6 +362,14 @@ export interface AFFiNEConfig {
*/
experimentalMergeWithJwstCodec: boolean;
};
history: {
/**
* How long the buffer time of creating a new history snapshot when doc get updated.
*
* in {ms}
*/
interval: number;
};
};
payment: {

View File

@@ -209,6 +209,9 @@ export const getDefaultAFFiNEConfig: () => AFFiNEConfig = () => {
updatePollInterval: 3000,
experimentalMergeWithJwstCodec: false,
},
history: {
interval: 1000 * 60 * 10 /* 10 mins */,
},
},
payment: {
stripe: {

View File

@@ -10,3 +10,4 @@ import { Metrics } from './metrics';
controllers: [MetricsController],
})
export class MetricsModule {}
export { Metrics };

View File

@@ -25,4 +25,7 @@ export class Metrics implements OnModuleDestroy {
authCounter = metricsCreator.counter('auth');
authFailCounter = metricsCreator.counter('auth_fail', ['reason']);
docHistoryCounter = metricsCreator.counter('doc_history_created');
docRecoverCounter = metricsCreator.counter('doc_history_recovered');
}

View File

@@ -0,0 +1,230 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { Cron, CronExpression } from '@nestjs/schedule';
import type { Snapshot } from '@prisma/client';
import { Config } from '../../config';
import { Metrics } from '../../metrics';
import { PrismaService } from '../../prisma';
import { SubscriptionStatus } from '../payment/service';
import { Permission } from '../workspaces/types';
@Injectable()
export class DocHistoryManager {
private readonly logger = new Logger(DocHistoryManager.name);
constructor(
private readonly config: Config,
private readonly db: PrismaService,
private readonly metrics: Metrics
) {}
@OnEvent('doc:manager:snapshot:beforeUpdate')
async onDocUpdated(snapshot: Snapshot, forceCreate = false) {
const last = await this.last(snapshot.workspaceId, snapshot.id);
let shouldCreateHistory = false;
if (!last) {
// never created
shouldCreateHistory = true;
} else if (last.timestamp === snapshot.updatedAt) {
// no change
shouldCreateHistory = false;
} else if (
// force
forceCreate ||
// last history created before interval in configs
last.timestamp.getTime() <
snapshot.updatedAt.getTime() - this.config.doc.history.interval
) {
shouldCreateHistory = true;
}
if (shouldCreateHistory) {
await this.db.snapshotHistory
.create({
select: {
timestamp: true,
},
data: {
workspaceId: snapshot.workspaceId,
id: snapshot.id,
timestamp: snapshot.updatedAt,
blob: snapshot.blob,
state: snapshot.state,
expiredAt: await this.getExpiredDateFromNow(snapshot.workspaceId),
},
})
.catch(() => {
// safe to ignore
// only happens when duplicated history record created in multi processes
});
this.metrics.docHistoryCounter(1, {});
this.logger.log(
`History created for ${snapshot.id} in workspace ${snapshot.workspaceId}.`
);
}
}
async list(
workspaceId: string,
id: string,
before: Date = new Date(),
take: number = 10
) {
return this.db.snapshotHistory.findMany({
select: {
timestamp: true,
},
where: {
workspaceId,
id,
timestamp: {
lte: before,
},
// only include the ones has not expired
expiredAt: {
gt: new Date(),
},
},
orderBy: {
timestamp: 'desc',
},
take,
});
}
async count(workspaceId: string, id: string) {
return this.db.snapshotHistory.count({
where: {
workspaceId,
id,
expiredAt: {
gt: new Date(),
},
},
});
}
async get(workspaceId: string, id: string, timestamp: Date) {
return this.db.snapshotHistory.findUnique({
where: {
workspaceId_id_timestamp: {
workspaceId,
id,
timestamp,
},
expiredAt: {
gt: new Date(),
},
},
});
}
async last(workspaceId: string, id: string) {
return this.db.snapshotHistory.findFirst({
where: {
workspaceId,
id,
},
select: {
timestamp: true,
},
orderBy: {
timestamp: 'desc',
},
});
}
async recover(workspaceId: string, id: string, timestamp: Date) {
const history = await this.db.snapshotHistory.findUnique({
where: {
workspaceId_id_timestamp: {
workspaceId,
id,
timestamp,
},
},
});
if (!history) {
throw new Error('Given history not found');
}
const oldSnapshot = await this.db.snapshot.findUnique({
where: {
id_workspaceId: {
id,
workspaceId,
},
},
});
if (!oldSnapshot) {
// unreachable actually
throw new Error('Given Doc not found');
}
// save old snapshot as one history record
await this.onDocUpdated(oldSnapshot, true);
// WARN:
// we should never do the snapshot updating in recovering,
// which is not the solution in CRDT.
// let user revert in client and update the data in sync system
// `await this.db.snapshot.update();`
this.metrics.docRecoverCounter(1, {});
return history.timestamp;
}
/**
* @todo(@darkskygit) refactor with [Usage Control] system
*/
async getExpiredDateFromNow(workspaceId: string) {
const permission = await this.db.workspaceUserPermission.findFirst({
select: {
userId: true,
},
where: {
workspaceId,
type: Permission.Owner,
},
});
if (!permission) {
// unreachable actually
throw new Error('Workspace owner not found');
}
const sub = await this.db.userSubscription.findFirst({
select: {
id: true,
},
where: {
userId: permission.userId,
status: SubscriptionStatus.Active,
},
});
return new Date(
Date.now() +
1000 *
60 *
60 *
24 *
// 30 days for subscription user, 7 days for free user
(sub ? 30 : 7)
);
}
@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT /* everyday at 12am */)
async cleanupExpiredHistory() {
await this.db.snapshotHistory.deleteMany({
where: {
expiredAt: {
lte: new Date(),
},
},
});
}
}

View File

@@ -1,5 +1,6 @@
import { DynamicModule } from '@nestjs/common';
import { DocHistoryManager } from './history';
import { DocManager } from './manager';
export class DocModule {
@@ -14,12 +15,10 @@ export class DocModule {
provide: 'DOC_MANAGER_AUTOMATION',
useValue: automation,
},
{
provide: DocManager,
useClass: DocManager,
},
DocManager,
DocHistoryManager,
],
exports: [DocManager],
exports: [DocManager, DocHistoryManager],
};
}
@@ -36,4 +35,4 @@ export class DocModule {
}
}
export { DocManager };
export { DocHistoryManager, DocManager };

View File

@@ -1,5 +1,6 @@
import { DynamicModule, Type } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { ScheduleModule } from '@nestjs/schedule';
import { GqlModule } from '../graphql.module';
import { AuthModule } from './auth';
@@ -23,6 +24,7 @@ switch (SERVER_FLAVOR) {
break;
case 'graphql':
BusinessModules.push(
ScheduleModule.forRoot(),
GqlModule,
WorkspaceModule,
UsersModule,
@@ -34,6 +36,7 @@ switch (SERVER_FLAVOR) {
case 'allinone':
default:
BusinessModules.push(
ScheduleModule.forRoot(),
GqlModule,
WorkspaceModule,
UsersModule,

View File

@@ -16,9 +16,10 @@ import { PrismaService } from '../../prisma';
import { StorageProvide } from '../../storage';
import { DocID } from '../../utils/doc';
import { Auth, CurrentUser, Publicable } from '../auth';
import { DocManager } from '../doc';
import { DocHistoryManager, DocManager } from '../doc';
import { UserType } from '../users';
import { PermissionService, PublicPageMode } from './permission';
import { Permission } from './types';
@Controller('/api/workspaces')
export class WorkspacesController {
@@ -28,6 +29,7 @@ export class WorkspacesController {
@Inject(StorageProvide) private readonly storage: Storage,
private readonly permission: PermissionService,
private readonly docManager: DocManager,
private readonly historyManager: DocHistoryManager,
private readonly prisma: PrismaService
) {}
@@ -104,4 +106,47 @@ export class WorkspacesController {
res.send(update);
this.logger.debug(`workspaces doc api: ${format(process.hrtime(start))}`);
}
@Get('/:id/docs/:guid/histories/:timestamp')
@Auth()
async history(
@CurrentUser() user: UserType,
@Param('id') ws: string,
@Param('guid') guid: string,
@Param('timestamp') timestamp: string,
@Res() res: Response
) {
const docId = new DocID(guid, ws);
let ts;
try {
const timeNum = parseInt(timestamp);
if (Number.isNaN(timeNum)) {
throw new Error('Invalid timestamp');
}
ts = new Date(timeNum);
} catch (e) {
throw new Error('Invalid timestamp');
}
await this.permission.checkPagePermission(
docId.workspace,
docId.guid,
user.id,
Permission.Write
);
const history = await this.historyManager.get(
docId.workspace,
docId.guid,
ts
);
if (history) {
res.setHeader('content-type', 'application/octet-stream');
res.send(history.blob);
} else {
throw new NotFoundException('Doc history not found');
}
}
}

View File

@@ -0,0 +1,92 @@
import {
Args,
Field,
GraphQLISODateTime,
Int,
Mutation,
ObjectType,
Parent,
ResolveField,
Resolver,
} from '@nestjs/graphql';
import type { SnapshotHistory } from '@prisma/client';
import { DocID } from '../../utils/doc';
import { Auth, CurrentUser } from '../auth';
import { DocHistoryManager } from '../doc/history';
import { UserType } from '../users';
import { PermissionService } from './permission';
import { WorkspaceType } from './resolver';
import { Permission } from './types';
@ObjectType()
class DocHistoryType implements Partial<SnapshotHistory> {
@Field()
workspaceId!: string;
@Field()
id!: string;
@Field(() => GraphQLISODateTime)
timestamp!: Date;
}
@Resolver(() => WorkspaceType)
export class DocHistoryResolver {
constructor(
private readonly historyManager: DocHistoryManager,
private readonly permission: PermissionService
) {}
@ResolveField(() => [DocHistoryType])
async histories(
@Parent() workspace: WorkspaceType,
@Args('guid') guid: string,
@Args({ name: 'before', type: () => GraphQLISODateTime, nullable: true })
timestamp: Date = new Date(),
@Args({ name: 'take', type: () => Int, nullable: true })
take?: number
): Promise<DocHistoryType[]> {
const docId = new DocID(guid, workspace.id);
if (docId.isWorkspace) {
throw new Error('Invalid guid for listing doc histories.');
}
return this.historyManager
.list(workspace.id, docId.guid, timestamp, take)
.then(rows =>
rows.map(({ timestamp }) => {
return {
workspaceId: workspace.id,
id: docId.guid,
timestamp,
};
})
);
}
@Auth()
@Mutation(() => Date)
async recoverDoc(
@CurrentUser() user: UserType,
@Args('workspaceId') workspaceId: string,
@Args('guid') guid: string,
@Args({ name: 'timestamp', type: () => GraphQLISODateTime }) timestamp: Date
): Promise<Date> {
const docId = new DocID(guid, workspaceId);
if (docId.isWorkspace) {
throw new Error('Invalid guid for recovering doc from history.');
}
await this.permission.checkPagePermission(
docId.workspace,
docId.guid,
user.id,
Permission.Write
);
return this.historyManager.recover(docId.workspace, docId.guid, timestamp);
}
}

View File

@@ -3,6 +3,7 @@ import { Module } from '@nestjs/common';
import { DocModule } from '../doc';
import { UsersService } from '../users';
import { WorkspacesController } from './controller';
import { DocHistoryResolver } from './history.resolver';
import { PermissionService } from './permission';
import { PagePermissionResolver, WorkspaceResolver } from './resolver';
@@ -14,6 +15,7 @@ import { PagePermissionResolver, WorkspaceResolver } from './resolver';
PermissionService,
UsersService,
PagePermissionResolver,
DocHistoryResolver,
],
exports: [PermissionService],
})

View File

@@ -244,18 +244,20 @@ export class PermissionService {
permission = Permission.Read
) {
// check whether page is public
const count = await this.prisma.workspacePage.count({
where: {
workspaceId: ws,
pageId: page,
public: true,
},
});
if (permission === Permission.Read) {
const count = await this.prisma.workspacePage.count({
where: {
workspaceId: ws,
pageId: page,
public: true,
},
});
// page is public
// accessible
if (count > 0) {
return true;
// page is public
// accessible
if (count > 0) {
return true;
}
}
if (user) {

View File

@@ -192,6 +192,7 @@ type WorkspaceType {
"""Public pages of a workspace"""
publicPages: [WorkspacePage!]!
histories(guid: String!, before: DateTime, take: Int): [DocHistoryType!]!
}
type InvitationWorkspaceType {
@@ -232,6 +233,12 @@ enum PublicPageMode {
Edgeless
}
type DocHistoryType {
workspaceId: String!
id: String!
timestamp: DateTime!
}
type Query {
"""Get is owner of workspace"""
isOwner(workspaceId: String!): Boolean!
@@ -288,6 +295,7 @@ type Mutation {
publishPage(workspaceId: String!, pageId: String!, mode: PublicPageMode = Page): WorkspacePage!
revokePage(workspaceId: String!, pageId: String!): Boolean! @deprecated(reason: "use revokePublicPage")
revokePublicPage(workspaceId: String!, pageId: String!): WorkspacePage!
recoverDoc(workspaceId: String!, guid: String!, timestamp: DateTime!): DateTime!
"""Upload user avatar"""
uploadAvatar(avatar: Upload!): UserType!