refactor(server): use @nestjs-cls/transactional to impl database transaction (#9759)

This commit is contained in:
fengmk2
2025-01-21 06:43:29 +00:00
parent 07c32d016d
commit 90b4dc5c00
8 changed files with 180 additions and 132 deletions

View File

@@ -26,6 +26,8 @@
"@google-cloud/opentelemetry-cloud-monitoring-exporter": "^0.20.0",
"@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1",
"@google-cloud/opentelemetry-resource-util": "^2.4.0",
"@nestjs-cls/transactional": "^2.4.4",
"@nestjs-cls/transactional-adapter-prisma": "^1.2.7",
"@nestjs/apollo": "^12.2.2",
"@nestjs/common": "^10.4.15",
"@nestjs/core": "^10.4.15",

View File

@@ -326,7 +326,7 @@ test('should grant member with read permission and Pending status by default', a
t.true(
updatedSpy.calledOnceWith({
workspaceId: workspace.id,
count: 2,
count: 1,
})
);
});

View File

@@ -7,6 +7,9 @@ import {
Module,
} from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { ClsPluginTransactional } from '@nestjs-cls/transactional';
import { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
import { PrismaClient } from '@prisma/client';
import { get } from 'lodash-es';
import { ClsModule } from 'nestjs-cls';
@@ -55,6 +58,14 @@ export const FunctionalityModules = [
return randomUUID();
},
},
plugins: [
// https://papooch.github.io/nestjs-cls/plugins/available-plugins/transactional/prisma-adapter
new ClsPluginTransactional({
adapter: new TransactionalAdapterPrisma({
prismaInjectionToken: PrismaClient,
}),
}),
],
}),
ConfigModule.forRoot(),
RuntimeModule,

View File

@@ -1,4 +1,6 @@
import { Inject, Logger } from '@nestjs/common';
import { TransactionHost } from '@nestjs-cls/transactional';
import type { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
import { PrismaClient } from '@prisma/client';
import { Config } from '../base';
@@ -16,4 +18,11 @@ export class BaseModel {
@Inject(PrismaClient)
protected readonly db!: PrismaClient;
@Inject(TransactionHost)
private readonly txHost!: TransactionHost<TransactionalAdapterPrisma>;
protected get tx() {
return this.txHost.tx;
}
}

View File

@@ -1,8 +1,8 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import { Feature } from '@prisma/client';
import { z } from 'zod';
import { PrismaTransaction } from '../base';
import { BaseModel } from './base';
import { Features, FeatureType } from './common';
@@ -20,7 +20,7 @@ type FeatureConfigs<T extends FeatureNames> = z.infer<
@Injectable()
export class FeatureModel extends BaseModel {
async get<T extends FeatureNames>(name: T) {
const feature = await this.getLatest(this.db, name);
const feature = await this.getLatest(name);
// All features are hardcoded in the codebase
// It would be a fatal error if the feature is not found in DB.
@@ -43,6 +43,7 @@ export class FeatureModel extends BaseModel {
};
}
@Transactional()
async upsert<T extends FeatureNames>(name: T, configs: FeatureConfigs<T>) {
const shape = this.getConfigShape(name);
const parseResult = shape.safeParse(configs);
@@ -58,37 +59,33 @@ export class FeatureModel extends BaseModel {
// TODO(@forehalo):
// could be a simple upsert operation, but we got useless `version` column in the database
// will be fixed when `version` column gets deprecated
const feature = await this.db.$transaction(async tx => {
const latest = await this.getLatest(tx, name);
const latest = await this.getLatest(name);
if (!latest) {
return await tx.feature.create({
data: {
type: FeatureType.Feature,
feature: name,
configs: parsedConfigs,
},
});
} else {
return await tx.feature.update({
where: { id: latest.id },
data: {
configs: parsedConfigs,
},
});
}
});
let feature: Feature;
if (!latest) {
feature = await this.tx.feature.create({
data: {
type: FeatureType.Feature,
feature: name,
configs: parsedConfigs,
},
});
} else {
feature = await this.tx.feature.update({
where: { id: latest.id },
data: {
configs: parsedConfigs,
},
});
}
this.logger.verbose(`Feature ${name} upserted`);
return feature as Feature & { configs: FeatureConfigs<T> };
}
private async getLatest<T extends FeatureNames>(
client: PrismaTransaction,
name: T
) {
return client.feature.findFirst({
private async getLatest<T extends FeatureNames>(name: T) {
return this.tx.feature.findFirst({
where: { feature: name },
orderBy: { version: 'desc' },
});

View File

@@ -1,4 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import {
type WorkspacePage as Page,
type WorkspacePageUserPermission as PageUserPermission,
@@ -87,6 +88,7 @@ export class PageModel extends BaseModel {
/**
* Grant the page member with the given permission.
*/
@Transactional()
async grantMember(
workspaceId: string,
pageId: string,
@@ -105,50 +107,48 @@ export class PageModel extends BaseModel {
// If the user is already accepted and the new permission is owner, we need to revoke old owner
if (!data || data.type !== permission) {
return await this.db.$transaction(async tx => {
if (data) {
// Update the permission
data = await tx.workspacePageUserPermission.update({
where: {
workspaceId_pageId_userId: {
workspaceId,
pageId,
userId,
},
},
data: { type: permission },
});
} else {
// Create a new permission
data = await tx.workspacePageUserPermission.create({
data: {
if (data) {
// Update the permission
data = await this.tx.workspacePageUserPermission.update({
where: {
workspaceId_pageId_userId: {
workspaceId,
pageId,
userId,
type: permission,
// page permission does not require invitee to accept, the accepted field will be deprecated later.
accepted: true,
},
});
}
},
data: { type: permission },
});
} else {
// Create a new permission
data = await this.tx.workspacePageUserPermission.create({
data: {
workspaceId,
pageId,
userId,
type: permission,
// page permission does not require invitee to accept, the accepted field will be deprecated later.
accepted: true,
},
});
}
// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await tx.workspacePageUserPermission.updateMany({
where: {
workspaceId,
pageId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} page ${pageId} to user ${userId}`
);
}
return data;
});
// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await this.tx.workspacePageUserPermission.updateMany({
where: {
workspaceId,
pageId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} page ${pageId} to user ${userId}`
);
}
return data;
}
// nothing to do

View File

@@ -1,4 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import {
type Workspace,
WorkspaceMemberStatus,
@@ -37,7 +38,7 @@ export class WorkspaceModel extends BaseModel {
* Create a new workspace for the user, default to private.
*/
async create(userId: string) {
const workspace = await this.db.workspace.create({
const workspace = await this.tx.workspace.create({
data: {
public: false,
permissions: {
@@ -58,7 +59,7 @@ export class WorkspaceModel extends BaseModel {
* Update the workspace with the given data.
*/
async update(workspaceId: string, data: UpdateWorkspaceInput) {
await this.db.workspace.update({
await this.tx.workspace.update({
where: {
id: workspaceId,
},
@@ -78,7 +79,7 @@ export class WorkspaceModel extends BaseModel {
}
async delete(workspaceId: string) {
await this.db.workspace.deleteMany({
await this.tx.workspace.deleteMany({
where: {
id: workspaceId,
},
@@ -125,13 +126,14 @@ export class WorkspaceModel extends BaseModel {
/**
* Grant the workspace member with the given permission and status.
*/
@Transactional()
async grantMember(
workspaceId: string,
userId: string,
permission: Permission = Permission.Read,
status: WorkspaceMemberStatus = WorkspaceMemberStatus.Pending
): Promise<WorkspaceUserPermission> {
const data = await this.db.workspaceUserPermission.findUnique({
const data = await this.tx.workspaceUserPermission.findUnique({
where: {
workspaceId_userId: {
workspaceId,
@@ -143,7 +145,7 @@ export class WorkspaceModel extends BaseModel {
if (!data) {
// Create a new permission
// TODO(fengmk2): should we check the permission here? Like owner can't be pending?
const created = await this.db.workspaceUserPermission.create({
const created = await this.tx.workspaceUserPermission.create({
data: {
workspaceId,
userId,
@@ -151,41 +153,42 @@ export class WorkspaceModel extends BaseModel {
status,
},
});
this.logger.log(
`Granted workspace ${workspaceId} member ${userId} with permission ${permission}`
);
await this.notifyMembersUpdated(workspaceId);
return created;
}
// If the user is already accepted and the new permission is owner, we need to revoke old owner
if (data.status === WorkspaceMemberStatus.Accepted || data.accepted) {
return await this.db.$transaction(async tx => {
const updated = await tx.workspaceUserPermission.update({
where: {
workspaceId_userId: { workspaceId, userId },
},
data: { type: permission },
});
// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await tx.workspaceUserPermission.updateMany({
where: {
workspaceId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} to ${userId}`
);
}
return updated;
const updated = await this.tx.workspaceUserPermission.update({
where: {
workspaceId_userId: { workspaceId, userId },
},
data: { type: permission },
});
// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await this.tx.workspaceUserPermission.updateMany({
where: {
workspaceId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} to ${userId}`
);
}
return updated;
}
// If the user is not accepted, we can update the status directly
const allowedStatus = this.getAllowedStatusSource(data.status);
if (allowedStatus.includes(status)) {
const updated = await this.db.workspaceUserPermission.update({
const updated = await this.tx.workspaceUserPermission.update({
where: { workspaceId_userId: { workspaceId, userId } },
data: {
status,
@@ -204,7 +207,7 @@ export class WorkspaceModel extends BaseModel {
* Get the workspace member invitation.
*/
async getMemberInvitation(invitationId: string) {
return await this.db.workspaceUserPermission.findUnique({
return await this.tx.workspaceUserPermission.findUnique({
where: {
id: invitationId,
},
@@ -220,7 +223,7 @@ export class WorkspaceModel extends BaseModel {
workspaceId: string,
status: WorkspaceMemberStatus = WorkspaceMemberStatus.Accepted
) {
const { count } = await this.db.workspaceUserPermission.updateMany({
const { count } = await this.tx.workspaceUserPermission.updateMany({
where: {
id: invitationId,
workspaceId: workspaceId,
@@ -348,7 +351,7 @@ export class WorkspaceModel extends BaseModel {
return false;
}
await this.db.workspaceUserPermission.deleteMany({
await this.tx.workspaceUserPermission.deleteMany({
where: {
workspaceId,
userId,
@@ -407,6 +410,7 @@ export class WorkspaceModel extends BaseModel {
/**
* Refresh the workspace member seat status.
*/
@Transactional()
async refreshMemberSeatStatus(workspaceId: string, memberLimit: number) {
const usedCount = await this.getMemberUsedCount(workspaceId);
const availableCount = memberLimit - usedCount;
@@ -414,43 +418,41 @@ export class WorkspaceModel extends BaseModel {
return;
}
return await this.db.$transaction(async tx => {
const members = await tx.workspaceUserPermission.findMany({
select: { id: true, status: true },
where: {
workspaceId,
status: {
in: [
WorkspaceMemberStatus.NeedMoreSeat,
WorkspaceMemberStatus.NeedMoreSeatAndReview,
],
},
const members = await this.tx.workspaceUserPermission.findMany({
select: { id: true, status: true },
where: {
workspaceId,
status: {
in: [
WorkspaceMemberStatus.NeedMoreSeat,
WorkspaceMemberStatus.NeedMoreSeatAndReview,
],
},
// find the oldest members first
orderBy: { createdAt: 'asc' },
});
const needChange = members.slice(0, availableCount);
const groups = groupBy(needChange, m => m.status);
const toPendings = groups.NeedMoreSeat;
if (toPendings) {
// NeedMoreSeat => Pending
await tx.workspaceUserPermission.updateMany({
where: { id: { in: toPendings.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.Pending },
});
}
const toUnderReviews = groups.NeedMoreSeatAndReview;
if (toUnderReviews) {
// NeedMoreSeatAndReview => UnderReview
await tx.workspaceUserPermission.updateMany({
where: { id: { in: toUnderReviews.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.UnderReview },
});
}
},
// find the oldest members first
orderBy: { createdAt: 'asc' },
});
const needChange = members.slice(0, availableCount);
const groups = groupBy(needChange, m => m.status);
const toPendings = groups.NeedMoreSeat;
if (toPendings) {
// NeedMoreSeat => Pending
await this.tx.workspaceUserPermission.updateMany({
where: { id: { in: toPendings.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.Pending },
});
}
const toUnderReviews = groups.NeedMoreSeatAndReview;
if (toUnderReviews) {
// NeedMoreSeatAndReview => UnderReview
await this.tx.workspaceUserPermission.updateMany({
where: { id: { in: toUnderReviews.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.UnderReview },
});
}
}
/**