refactor(server): use doc model on PgWorkspaceDocStorageAdapter (#9852)

close CLOUD-104
Author: fengmk2
Date: 2025-02-07 12:21:58 +00:00
Parent: 0b9d30b55a
Commit: 36ed81bcc6
5 changed files with 105 additions and 425 deletions
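The change set below moves every raw Prisma query in PgWorkspaceDocStorageAdapter (against the snapshot, update, and snapshotHistory tables) behind the shared DocModel (`this.models.doc`), and retires the deprecated u31 `seq` counter: updates are now ordered and deduplicated by unique millisecond timestamps instead. A minimal sketch of the resulting call pattern, with interfaces simplified from the diff (DocModelLike and pushDocUpdates here are illustrative stand-ins, not the real types):

    // Sketch only: shapes are simplified from the diff below, not the real types.
    interface DocUpdateInput {
      spaceId: string;
      docId: string;
      blob: Buffer;
      timestamp: number; // replaces the old `seq` column
      editorId?: string;
    }

    interface DocModelLike {
      exists(workspaceId: string, docId: string): Promise<boolean>;
      createUpdates(updates: DocUpdateInput[]): Promise<{ count: number }>;
    }

    async function pushDocUpdates(
      doc: DocModelLike,
      workspaceId: string,
      docId: string,
      blobs: Buffer[],
      editorId?: string
    ): Promise<boolean> {
      // the adapter no longer queries Prisma directly for existence checks
      const isNewDoc = !(await doc.exists(workspaceId, docId));
      const now = Date.now();
      await doc.createUpdates(
        blobs.map((blob, i) => ({
          spaceId: workspaceId,
          docId,
          blob,
          // strictly increasing per-update timestamps stand in for seq numbers
          timestamp: now + i + 1,
          editorId,
        }))
      );
      return isNewDoc;
    }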

----- changed file -----

@@ -243,7 +243,7 @@ test('should be able to get last history record', async t => {
   );
   t.truthy(history);
-  t.is(history?.timestamp.getTime(), timestamp + 9);
+  t.is(history?.timestamp, timestamp + 9);
 });

 test('should be able to recover from history', async t => {

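The assertion rewrite above tracks a type change in the doc model: histories returned through DocModel now expose a numeric timestamp (epoch milliseconds) rather than a Date, so the test compares numbers directly instead of calling `.getTime()`. A minimal sketch, assuming the simplified shape of DocHistorySimple from the models diff at the end of this commit:

    // Simplified from DocHistorySimple; not the full interface.
    interface DocHistorySimple {
      timestamp: number; // epoch milliseconds; previously a Date
    }

    function matchesRevision(history: DocHistorySimple, timestamp: number): boolean {
      return history.timestamp === timestamp; // no .getTime() needed anymore
    }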
----- changed file -----

@@ -41,10 +41,7 @@ test.after.always(async () => {
   await m?.close();
 });

-/**
- * @deprecated `seq` would be removed
- */
-test('should have sequential update number', async t => {
+test('should have timestamp update', async t => {
   const doc = new YDoc();
   const text = doc.getText('content');
   const updates: Buffer[] = [];
@@ -59,7 +56,6 @@ test('should have sequential update number', async t => {
   await adapter.pushDocUpdates('2', '2', updates);

-  // [1,2,3]
   let records = await db.update.findMany({
     where: {
       workspaceId: '2',
@@ -67,27 +63,16 @@ test('should have sequential update number', async t => {
     },
   });

+  let firstTimestamp = records[0].createdAt.getTime();
   t.deepEqual(
-    records.map(({ seq }) => seq),
-    [1, 2, 3]
+    records.map(({ createdAt }) => createdAt.getTime()),
+    [firstTimestamp, firstTimestamp + 1, firstTimestamp + 2]
   );

   // merge
   await adapter.getDoc('2', '2');

-  // fake the seq num is about to overflow
-  await db.snapshot.update({
-    where: {
-      workspaceId_id: {
-        id: '2',
-        workspaceId: '2',
-      },
-    },
-    data: {
-      seq: 0x3ffffffe,
-    },
-  });
-
+  // change timestamp again
   await adapter.pushDocUpdates('2', '2', updates);

   records = await db.update.findMany({
@@ -97,12 +82,13 @@ test('should have sequential update number', async t => {
     },
   });

+  firstTimestamp = records[0].createdAt.getTime();
   t.deepEqual(
-    records.map(({ seq }) => seq),
-    [0x3ffffffe + 1, 0x3ffffffe + 2, 0x3ffffffe + 3]
+    records.map(({ createdAt }) => createdAt.getTime()),
+    [firstTimestamp, firstTimestamp + 1, firstTimestamp + 2]
   );

-  // push a new update with new seq num
+  // push a new update
   await adapter.pushDocUpdates('2', '2', updates.slice(0, 1));

   // let the manager ignore update with the new seq num
@@ -174,7 +160,6 @@ test('should be able to merge updates as snapshot', async t => {
       id: '1',
       workspaceId: '1',
       blob: Buffer.from(update),
-      seq: 1,
       createdAt: new Date(Date.now() + 1),
       createdBy: null,
     },
@@ -197,7 +182,6 @@ test('should be able to merge updates as snapshot', async t => {
       workspaceId: '1',
       id: '1',
       blob: appendUpdate,
-      seq: 2,
       createdAt: new Date(),
       createdBy: null,
     },

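The rewritten test above reflects how pushDocUpdates now stamps updates (visible in the adapter diff below): updates are chunked into batches of 10, and the i-th update of batch `turn` gets `createdAt = now + turn * 10 + i + 1`, which yields unique, strictly increasing timestamps where seq numbers were used before. A small sketch of that numbering, under the assumption that `now` stays fixed for the whole call:

    // Reproduces the createdAt layout the assertions above rely on.
    function stampUpdates(now: number, count: number, batchCount = 10): number[] {
      const stamps: number[] = [];
      let turn = 0;
      for (let done = 0; done < count; turn++) {
        const batchSize = Math.min(batchCount, count - done);
        for (let i = 0; i < batchSize; i++) {
          const subSeq = turn * batchCount + i + 1;
          stamps.push(now + subSeq);
        }
        done += batchSize;
      }
      return stamps;
    }

    // Three updates in a single batch: [now + 1, now + 2, now + 3],
    // i.e. consecutive values, which is exactly what the test asserts.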
----- changed file -----

@@ -52,14 +52,12 @@ test('should create a batch updates on a doc', async t => {
       blob: Buffer.from('blob1'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 1,
     },
     {
       spaceId: workspace.id,
       docId,
       blob: Buffer.from('blob2'),
       timestamp: Date.now() + 1000,
-      seq: 2,
     },
   ]);
   t.is(updates.count, 2);
@@ -75,7 +73,6 @@ test('should create error when createdAt timestamp is not unique', async t => {
       blob: Buffer.from('blob1'),
       timestamp,
       editorId: user.id,
-      seq: 1,
     },
   ]);
   await t.throwsAsync(
@@ -86,7 +83,6 @@ test('should create error when createdAt timestamp is not unique', async t => {
         blob: Buffer.from('blob2'),
         timestamp,
         editorId: user.id,
-        seq: 2,
       },
     ]),
     {
@@ -105,7 +101,6 @@ test('should find updates by spaceId and docId', async t => {
       blob: Buffer.from('blob1'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 1,
     },
     {
       spaceId: workspace.id,
@@ -113,7 +108,6 @@ test('should find updates by spaceId and docId', async t => {
       blob: Buffer.from('blob2'),
       timestamp: Date.now() + 1000,
       editorId: user.id,
-      seq: 2,
     },
   ]);
   const foundUpdates = await t.context.doc.findUpdates(workspace.id, docId);
@@ -130,7 +124,6 @@ test('should find updates by spaceId and docId', async t => {
       blob: Buffer.from('blob3'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 3,
     },
   ]);
   count = await t.context.doc.getUpdateCount(workspace.id, docId);
@@ -147,14 +140,12 @@ test('should delete updates by spaceId, docId, and createdAts', async t => {
       blob: Buffer.from('blob1'),
       timestamp: timestamps[0],
       editorId: user.id,
-      seq: 1,
     },
     {
       spaceId: workspace.id,
       docId,
       blob: Buffer.from('blob2'),
       timestamp: timestamps[1],
-      seq: 2,
     },
   ]);
   let count = await t.context.doc.deleteUpdates(
@@ -181,7 +172,6 @@ test('should get global update count', async t => {
       blob: Buffer.from('blob1'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 1,
     },
     {
       spaceId: workspace.id,
@@ -189,7 +179,6 @@ test('should get global update count', async t => {
       blob: Buffer.from('blob2'),
       timestamp: Date.now() + 1000,
       editorId: user.id,
-      seq: 2,
     },
     {
       spaceId: workspace.id,
@@ -197,7 +186,6 @@ test('should get global update count', async t => {
       blob: Buffer.from('blob2'),
       timestamp: Date.now() + 1000,
       editorId: user.id,
-      seq: 2,
     },
   ]);
   const count = await t.context.doc.getGlobalUpdateCount();
@@ -449,7 +437,6 @@ test('should delete a doc, including histories, snapshots and updates', async t
       blob: Buffer.from('blob2'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 1,
     },
   ]);
   await t.context.doc.delete(workspace.id, docId);
@@ -490,7 +477,6 @@ test('should delete all docs in a workspace', async t => {
       blob: Buffer.from('blob2'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 1,
     },
   ]);
   await t.context.doc.upsert(snapshot2);
@@ -502,7 +488,6 @@ test('should delete all docs in a workspace', async t => {
       blob: Buffer.from('blob2'),
       timestamp: Date.now(),
       editorId: user.id,
-      seq: 1,
     },
   ]);
   const deletedCount = await t.context.doc.deleteAllByWorkspaceId(workspace.id);
@@ -555,7 +540,6 @@ test('should find all docs timestamps in a workspace', async t => {
       blob: Buffer.from('blob2'),
       timestamp: timestamp3,
       editorId: user.id,
-      seq: 1,
     },
   ]);
   await t.context.doc.upsert(snapshot2);
@@ -568,22 +552,30 @@ test('should find all docs timestamps in a workspace', async t => {
   });
 });

-test('should increase doc seq', async t => {
+test('should detect doc exists or not', async t => {
   const docId = randomUUID();
+  t.false(await t.context.doc.exists(workspace.id, docId));
   const snapshot = {
     spaceId: workspace.id,
-    docId,
+    docId: docId,
     blob: Buffer.from('blob1'),
     timestamp: Date.now(),
     editorId: user.id,
   };
   await t.context.doc.upsert(snapshot);
-  const seq1 = await t.context.doc.increaseSeq(workspace.id, docId, 88);
-  t.is(seq1, 88);
-  const seq2 = await t.context.doc.increaseSeq(workspace.id, docId, 2);
-  t.is(seq2, 90);
-  // hit max seq, then reset to zero
-  await t.context.doc.increaseSeq(workspace.id, docId, 0x3fffffff);
-  const seq3 = await t.context.doc.increaseSeq(workspace.id, docId, 1);
-  t.is(seq3, 1);
+  t.true(await t.context.doc.exists(workspace.id, docId));
 });
+
+test('should detect doc exists on only updates exists', async t => {
+  const docId = randomUUID();
+  await t.context.doc.createUpdates([
+    {
+      spaceId: workspace.id,
+      docId: docId,
+      blob: Buffer.from('blob2'),
+      timestamp: Date.now(),
+      editorId: user.id,
+    },
+  ]);
+  t.true(await t.context.doc.exists(workspace.id, docId));
+});

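Both new tests pin down the semantics of DocModel.exists(), added in the last file of this diff: a doc counts as existing when it has a merged snapshot, or when only unmerged updates are present. A sketch of that check against a Prisma client, with the table shapes assumed from the queries in this commit:

    import { PrismaClient } from '@prisma/client';

    async function docExists(db: PrismaClient, workspaceId: string, docId: string) {
      // a merged snapshot is enough...
      const snapshots = await db.snapshot.count({ where: { workspaceId, id: docId } });
      if (snapshots > 0) {
        return true;
      }
      // ...but pending updates alone also count as "exists"
      const updates = await db.update.count({ where: { workspaceId, id: docId } });
      return updates > 0;
    }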
----- changed file -----

@@ -1,5 +1,4 @@
 import { Injectable, Logger } from '@nestjs/common';
-import { PrismaClient } from '@prisma/client';
 import { chunk } from 'lodash-es';
import {
@@ -13,6 +12,7 @@ import {
   Mutex,
 } from '../../../base';
 import { retryable } from '../../../base/utils/promise';
+import { Models } from '../../../models';
 import { DocStorageOptions } from '../options';
 import {
   DocRecord,
@@ -44,7 +44,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
   private readonly logger = new Logger(PgWorkspaceDocStorageAdapter.name);

   constructor(
-    private readonly db: PrismaClient,
+    private readonly models: Models,
     private readonly mutex: Mutex,
     private readonly cache: Cache,
     private readonly event: EventBus,
@@ -63,7 +63,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
       return 0;
     }

-    const isNewDoc = !(await this.docExisted(workspaceId, docId));
+    const isNewDoc = !(await this.models.doc.exists(workspaceId, docId));

     let pendings = updates;
     let done = 0;
@@ -74,39 +74,25 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
       pendings = pendings.slice(done);
     }

-    // TODO(@forehalo): remove in next release
-    const lastSeq = await this.getUpdateSeq(
-      workspaceId,
-      docId,
-      updates.length
-    );
     let turn = 0;
     const batchCount = 10;

     for (const batch of chunk(pendings, batchCount)) {
       const now = Date.now();
-      await this.db.update.createMany({
-        data: batch.map((update, i) => {
+      await this.models.doc.createUpdates(
+        batch.map((update, i) => {
           const subSeq = turn * batchCount + i + 1;
-          // `seq` is the last seq num of the batch
-          // example for 11 batched updates, start from seq num 20
-          // seq for first update in the batch should be:
-          //   31     -      11       +   subSeq(0 * 10 + 0 + 1)   = 21
-          // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
-          const seq = lastSeq - updates.length + subSeq;
           const createdAt = now + subSeq;
           timestamp = Math.max(timestamp, createdAt);

           return {
-            workspaceId,
-            id: docId,
+            spaceId: workspaceId,
+            docId,
             blob: Buffer.from(update),
-            seq,
-            createdAt: new Date(createdAt),
-            createdBy: editorId || null,
+            timestamp: createdAt,
+            editorId,
           };
-        }),
-      });
+        })
+      );
       turn++;
       done += batch.length;
       await this.updateCachedUpdatesCount(workspaceId, docId, batch.length);
@@ -129,90 +115,28 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
   }

   protected async getDocUpdates(workspaceId: string, docId: string) {
-    const rows = await this.db.update.findMany({
-      where: {
-        workspaceId,
-        id: docId,
-      },
-      orderBy: {
-        createdAt: 'asc',
-      },
-    });
+    const rows = await this.models.doc.findUpdates(workspaceId, docId);

     return rows.map(row => ({
       bin: row.blob,
-      timestamp: row.createdAt.getTime(),
-      editor: row.createdBy || undefined,
+      timestamp: row.timestamp,
+      editor: row.editorId,
     }));
   }

   async deleteDoc(workspaceId: string, docId: string) {
-    const ident = { where: { workspaceId, id: docId } };
-
-    await this.db.$transaction([
-      this.db.snapshot.deleteMany(ident),
-      this.db.update.deleteMany(ident),
-      this.db.snapshotHistory.deleteMany(ident),
-    ]);
+    await this.models.doc.delete(workspaceId, docId);
   }

   async deleteSpace(workspaceId: string) {
-    const ident = { where: { workspaceId } };
-
-    await this.db.$transaction([
-      this.db.snapshot.deleteMany(ident),
-      this.db.update.deleteMany(ident),
-      this.db.snapshotHistory.deleteMany(ident),
-    ]);
+    await this.models.doc.deleteAllByWorkspaceId(workspaceId);
   }

   async getSpaceDocTimestamps(workspaceId: string, after?: number) {
-    const snapshots = await this.db.snapshot.findMany({
-      select: {
-        id: true,
-        updatedAt: true,
-      },
-      where: {
-        workspaceId,
-        ...(after
-          ? {
-              updatedAt: {
-                gt: new Date(after),
-              },
-            }
-          : {}),
-      },
-    });
-
-    const updates = await this.db.update.groupBy({
-      where: {
-        workspaceId,
-        ...(after
-          ? {
-              // [createdAt] in updates table is indexed, so it's fast
-              createdAt: {
-                gt: new Date(after),
-              },
-            }
-          : {}),
-      },
-      by: ['id'],
-      _max: {
-        createdAt: true,
-      },
-    });
-
-    const result: Record<string, number> = {};
-
-    snapshots.forEach(s => {
-      result[s.id] = s.updatedAt.getTime();
-    });
-
-    updates.forEach(u => {
-      if (u._max.createdAt) {
-        result[u.id] = u._max.createdAt.getTime();
-      }
-    });
-
-    return result;
+    return await this.models.doc.findTimestampsByWorkspaceId(
+      workspaceId,
+      after
+    );
   }
   protected async markUpdatesMerged(
@@ -220,18 +144,14 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
     workspaceId: string,
     docId: string,
     updates: DocUpdate[]
   ) {
-    const result = await this.db.update.deleteMany({
-      where: {
-        workspaceId,
-        id: docId,
-        createdAt: {
-          in: updates.map(u => new Date(u.timestamp)),
-        },
-      },
-    });
+    const count = await this.models.doc.deleteUpdates(
+      workspaceId,
+      docId,
+      updates.map(u => u.timestamp)
+    );

-    await this.updateCachedUpdatesCount(workspaceId, docId, -result.count);
-    return result.count;
+    await this.updateCachedUpdatesCount(workspaceId, docId, -count);
+    return count;
   }
   async listDocHistories(
@@ -239,45 +159,18 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
     workspaceId: string,
     docId: string,
     query: HistoryFilter
   ) {
-    const histories = await this.db.snapshotHistory.findMany({
-      select: {
-        timestamp: true,
-        createdByUser: {
-          select: {
-            name: true,
-            avatarUrl: true,
-          },
-        },
-      },
-      where: {
-        workspaceId,
-        id: docId,
-        timestamp: {
-          lt: query.before ? new Date(query.before) : new Date(),
-        },
-      },
-      orderBy: {
-        timestamp: 'desc',
-      },
+    return await this.models.doc.findHistories(workspaceId, docId, {
+      before: query.before,
       take: query.limit,
     });
-
-    return histories.map(h => ({
-      timestamp: h.timestamp.getTime(),
-      editor: h.createdByUser,
-    }));
   }

   async getDocHistory(workspaceId: string, docId: string, timestamp: number) {
-    const history = await this.db.snapshotHistory.findUnique({
-      where: {
-        workspaceId_id_timestamp: {
-          workspaceId,
-          id: docId,
-          timestamp: new Date(timestamp),
-        },
-      },
-    });
+    const history = await this.models.doc.getHistory(
+      workspaceId,
+      docId,
+      timestamp
+    );

     if (!history) {
       return null;
@@ -287,8 +180,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
       spaceId: workspaceId,
       docId,
       bin: history.blob,
-      timestamp,
-      editor: history.createdBy || undefined,
+      timestamp: history.timestamp,
+      editor: history.editor?.id,
     };
   }
@@ -342,7 +235,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
         // never created
         shouldCreateHistory = true;
       } else {
-        const lastHistoryTimestamp = last.timestamp.getTime();
+        const lastHistoryTimestamp = last.timestamp;
         if (lastHistoryTimestamp === snapshot.timestamp) {
           // no change
           shouldCreateHistory = false;
@@ -376,24 +269,22 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
       return false;
     }

-    await this.db.snapshotHistory
-      .create({
-        select: {
-          timestamp: true,
-        },
-        data: {
-          workspaceId: snapshot.spaceId,
-          id: snapshot.docId,
-          timestamp: new Date(snapshot.timestamp),
+    try {
+      await this.models.doc.createHistory(
+        {
+          spaceId: snapshot.spaceId,
+          docId: snapshot.docId,
+          timestamp: snapshot.timestamp,
           blob: Buffer.from(snapshot.bin),
-          createdBy: snapshot.editor,
-          expiredAt: new Date(Date.now() + historyMaxAge),
+          editorId: snapshot.editor,
         },
-      })
-      .catch(() => {
-        // safe to ignore
-        // only happens when duplicated history record created in multi processes
-      });
+        historyMaxAge
+      );
+    } catch (e) {
+      // safe to ignore
+      // only happens when duplicated history record created in multi processes
+      this.logger.error('Failed to create history record', e);
+    }

     metrics.doc
       .counter('history_created_counter', {
@@ -410,90 +301,35 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
   }

   protected async getDocSnapshot(workspaceId: string, docId: string) {
-    const snapshot = await this.db.snapshot.findUnique({
-      where: {
-        workspaceId_id: {
-          workspaceId,
-          id: docId,
-        },
-      },
-    });
+    const snapshot = await this.models.doc.get(workspaceId, docId);

     if (!snapshot) {
       return null;
     }

     return {
-      spaceId: workspaceId,
-      docId,
+      spaceId: snapshot.spaceId,
+      docId: snapshot.docId,
       bin: snapshot.blob,
-      timestamp: snapshot.updatedAt.getTime(),
+      timestamp: snapshot.timestamp,
       // creator and editor may null if their account is deleted
-      editor: snapshot.updatedBy || snapshot.createdBy || undefined,
+      editor: snapshot.editorId,
     };
   }

   protected async setDocSnapshot(snapshot: DocRecord) {
-    const { spaceId, docId, bin, timestamp } = snapshot;
-
-    if (this.isEmptyBin(bin)) {
+    if (this.isEmptyBin(snapshot.bin)) {
       return false;
     }

-    const updatedAt = new Date(timestamp);
-
-    // CONCERNS:
-    //   i. Because we save the real user's last seen action time as `updatedAt`,
-    //      it's possible to simply compare the `updatedAt` to determine if the snapshot is older than the one we are going to save.
-    //
-    //  ii. Prisma doesn't support `upsert` with additional `where` condition along side unique constraint.
-    //      In our case, we need to manually check the `updatedAt` to avoid overriding the newer snapshot.
-    //      where: { workspaceId_id: {}, updatedAt: { lt: updatedAt } }
-    //                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-    try {
-      const result: { updatedAt: Date }[] = await this.db.$queryRaw`
-        INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at", "created_by", "updated_by")
-        VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt}, ${snapshot.editor}, ${snapshot.editor})
-        ON CONFLICT ("workspace_id", "guid")
-        DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt}, "updated_by" = ${snapshot.editor}
-        WHERE "snapshots"."workspace_id" = ${spaceId} AND "snapshots"."guid" = ${docId} AND "snapshots"."updated_at" <= ${updatedAt}
-        RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
-      `;
-
-      // const result = await this.db.snapshot.upsert({
-      //   select: {
-      //     updatedAt: true,
-      //     seq: true,
-      //   },
-      //   where: {
-      //     workspaceId_id: {
-      //       workspaceId,
-      //       id: guid,
-      //     },
-      //     ⬇️ NOT SUPPORTED BY PRISMA YET
-      //     updatedAt: {
-      //       lt: updatedAt,
-      //     },
-      //   },
-      //   update: {
-      //     blob,
-      //     state,
-      //     updatedAt,
-      //   },
-      //   create: {
-      //     workspaceId,
-      //     id: guid,
-      //     blob,
-      //     state,
-      //     updatedAt,
-      //     seq,
-      //   },
-      // });
-
-      // if the condition `snapshot.updatedAt > updatedAt` is true, by which means the snapshot has already been updated by other process,
-      // the updates has been applied to current `doc` must have been seen by the other process as well.
-      // The `updatedSnapshot` will be `undefined` in this case.
-      const updatedSnapshot = result.at(0);
+    const updatedSnapshot = await this.models.doc.upsert({
+      spaceId: snapshot.spaceId,
+      docId: snapshot.docId,
+      blob: Buffer.from(snapshot.bin),
+      timestamp: snapshot.timestamp,
+      editorId: snapshot.editor,
+    });

     if (updatedSnapshot) {
       this.event.emit('doc.snapshot.updated', {
@@ -524,19 +360,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
   }

   protected async lastDocHistory(workspaceId: string, id: string) {
-    return this.db.snapshotHistory.findFirst({
-      where: {
-        workspaceId,
-        id,
-      },
-      select: {
-        timestamp: true,
-        state: true,
-      },
-      orderBy: {
-        timestamp: 'desc',
-      },
-    });
+    return this.models.doc.getLatestHistory(workspaceId, id);
   }

   // for auto merging
@@ -552,12 +376,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
       if (cachedCount > 0) {
         const [workspaceId, id] = key.split('::');
-        const count = await this.db.update.count({
-          where: {
-            workspaceId,
-            id,
-          },
-        });
+        const count = await this.models.doc.getUpdateCount(workspaceId, id);

         // FIXME(@forehalo): somehow the update count in cache is not accurate
         if (count === 0) {
@@ -593,87 +412,4 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
       );
     }
   }
-
-  private async docExisted(workspaceId: string, docId: string) {
-    const snapshot = await this.db.snapshot.count({
-      where: {
-        workspaceId,
-        id: docId,
-      },
-    });
-
-    if (snapshot > 0) {
-      return true;
-    }
-
-    const updates = await this.db.update.count({
-      where: {
-        workspaceId,
-        id: docId,
-      },
-    });
-
-    return updates > 0;
-  }
-
-  /**
-   * @deprecated
-   */
-  private readonly seqMap = new Map<string, number>();
-
-  /**
-   *
-   * @deprecated updates do not rely on seq number anymore
-   *
-   * keep in next release to avoid downtime when upgrading instances
-   */
-  private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) {
-    const MAX_SEQ_NUM = 0x3fffffff; // u31
-
-    try {
-      const { seq } = await this.db.snapshot.update({
-        select: {
-          seq: true,
-        },
-        where: {
-          workspaceId_id: {
-            workspaceId,
-            id: guid,
-          },
-        },
-        data: {
-          seq: {
-            increment: batch,
-          },
-        },
-      });
-
-      if (!seq) {
-        return batch;
-      }
-
-      // reset
-      if (seq >= MAX_SEQ_NUM) {
-        await this.db.snapshot.update({
-          select: {
-            seq: true,
-          },
-          where: {
-            workspaceId_id: {
-              workspaceId,
-              id: guid,
-            },
-          },
-          data: {
-            seq: 0,
-          },
-        });
-      }
-
-      return seq;
-    } catch {
-      // not existing snapshot just count it from 1
-      const last = this.seqMap.get(workspaceId + guid) ?? 0;
-      this.seqMap.set(workspaceId + guid, last + batch);
-      return last + batch;
-    }
-  }
 }

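One behavioral contract worth noting in the adapter diff above: the deleted raw SQL guarded snapshot writes with `"updated_at" <= ${updatedAt}` so a concurrent process could not be overwritten by an older snapshot, and the adapter now assumes `models.doc.upsert()` keeps that guard and returns a falsy value when the write loses the race (the `if (updatedSnapshot)` branch skips the event in that case). A sketch of the contract with an in-memory stand-in, assuming a simplified snapshot shape:

    interface SnapshotInput {
      spaceId: string;
      docId: string;
      blob: Buffer;
      timestamp: number;
      editorId?: string;
    }

    class InMemoryDocModel {
      private readonly snapshots = new Map<string, SnapshotInput>();

      // Returns the stored snapshot when the write wins, or null when a
      // newer snapshot is already in place, mirroring the removed
      // "updated_at <= excluded" condition of the raw upsert.
      upsert(snapshot: SnapshotInput): SnapshotInput | null {
        const key = `${snapshot.spaceId}::${snapshot.docId}`;
        const existing = this.snapshots.get(key);
        if (existing && existing.timestamp > snapshot.timestamp) {
          return null;
        }
        this.snapshots.set(key, snapshot);
        return snapshot;
      }
    }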
----- changed file -----

@@ -5,10 +5,7 @@ import type { Update } from '@prisma/client';
 import { BaseModel } from './base';
 import type { Doc, DocEditor } from './common';

-export interface DocRecord extends Doc {
-  // TODO: deprecated field, remove in the future
-  seq: number | null;
-}
+export interface DocRecord extends Doc {}

 export interface DocHistorySimple {
   timestamp: number;
@@ -53,7 +50,6 @@ export class DocModel extends BaseModel {
       blob: row.blob,
       timestamp: row.createdAt.getTime(),
       editorId: row.createdBy || undefined,
-      seq: row.seq,
     };
   }
@@ -64,7 +60,7 @@ export class DocModel extends BaseModel {
       blob: record.blob,
       createdAt: new Date(record.timestamp),
       createdBy: record.editorId || null,
-      seq: record.seq,
+      seq: null,
     };
   }
@@ -344,49 +340,21 @@ export class DocModel extends BaseModel {
   }

   /**
-   * @deprecated updates do not rely on seq number anymore
+   * Detect a doc exists or not, including updates
    */
-  async increaseSeq(workspaceId: string, docId: string, increment: number) {
-    const MAX_SEQ_NUM = 0x3fffffff; // u31
-
-    const { seq } = await this.db.snapshot.update({
-      select: {
-        seq: true,
-      },
+  async exists(workspaceId: string, docId: string) {
+    const count = await this.db.snapshot.count({
       where: {
-        workspaceId_id: {
-          workspaceId,
-          id: docId,
-        },
-      },
-      data: {
-        seq: {
-          increment,
-        },
+        workspaceId,
+        id: docId,
       },
     });

-    if (!seq) {
-      return increment;
+    if (count > 0) {
+      return true;
     }

-    // reset
-    if (seq >= MAX_SEQ_NUM) {
-      await this.db.snapshot.update({
-        select: {
-          seq: true,
-        },
-        where: {
-          workspaceId_id: {
-            workspaceId,
-            id: docId,
-          },
-        },
-        data: {
-          seq: 0,
-        },
-      });
-    }
-
-    return seq;
+    const updateCount = await this.getUpdateCount(workspaceId, docId);
+    return updateCount > 0;
   }

   /**