refactor(server): deprecate unstable redis manager (#4567)

This commit is contained in:
liuyi
2023-10-10 11:23:12 +08:00
committed by GitHub
parent 4a6cfedc4a
commit 0092a19812
9 changed files with 337 additions and 248 deletions

View File

@@ -0,0 +1,19 @@
/*
Warnings:
- A unique constraint covering the columns `[workspace_id,guid,seq]` on the table `updates` will be added. If there are existing duplicate values, this will fail.
- Added the required column `seq` to the `updates` table without a default value. This is not possible if the table is not empty.
*/
-- DropIndex: the old non-unique lookup index is superseded by the unique index created below.
DROP INDEX "updates_guid_workspace_id_idx";
-- AlterTable: snapshots gain a `seq` counter (defaulted to 0 for existing rows)
-- and an optional encoded doc state `state`.
ALTER TABLE "snapshots" ADD COLUMN "seq" INTEGER NOT NULL DEFAULT 0,
ADD COLUMN "state" BYTEA;
-- AlterTable: every pending update now carries its own sequence number. No default
-- is provided, so this migration fails if `updates` is not empty (see warning above).
ALTER TABLE "updates" ADD COLUMN "seq" INTEGER NOT NULL;
-- CreateIndex: enforce at most one update per (workspace, doc, seq) triple.
CREATE UNIQUE INDEX "updates_workspace_id_guid_seq_key" ON "updates"("workspace_id", "guid", "seq");

View File

@@ -134,6 +134,8 @@ model Snapshot {
id String @default(uuid()) @map("guid") @db.VarChar
workspaceId String @map("workspace_id") @db.VarChar
blob Bytes @db.ByteA
seq Int @default(0) @db.Integer
state Bytes? @db.ByteA
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6)
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz(6)
@@ -144,12 +146,13 @@ model Snapshot {
// backup during other update operation queue downtime
model Update {
objectId String @id @default(uuid()) @map("object_id") @db.VarChar
id String @map("guid") @db.VarChar
workspaceId String @map("workspace_id") @db.VarChar
id String @map("guid") @db.VarChar
seq Int @db.Integer
blob Bytes @db.ByteA
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz(6)
@@index([id, workspaceId])
@@unique([workspaceId, id, seq])
@@map("updates")
}

View File

@@ -5,7 +5,9 @@ import {
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';
import { Snapshot, Update } from '@prisma/client';
import { defer, retry } from 'rxjs';
import { applyUpdate, Doc, encodeStateAsUpdate, encodeStateVector } from 'yjs';
import { Config } from '../../config';
import { Metrics } from '../../metrics/metrics';
@@ -29,6 +31,8 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
return compare(yBinary, yBinary2, true);
}
const MAX_SEQ_NUM = 0x3fffffff; // u31
/**
 * Since we can't directly save every client update into the database (that would overload it),
 * we need to buffer the updates and merge them to reduce db writes.
@@ -36,13 +40,12 @@ function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
 * And also, if a new client joins, it would be nice to see the latest doc asap,
 * so we need to at least store a snapshot of the doc and return it quickly,
 * alongside all the updates that have not been applied to that snapshot (timestamp).
*
* @see [RedisUpdateManager](./redis-manager.ts) - redis backed manager
*/
@Injectable()
export class DocManager implements OnModuleInit, OnModuleDestroy {
protected logger = new Logger(DocManager.name);
private job: NodeJS.Timeout | null = null;
private seqMap = new Map<string, number>();
private busy = false;
constructor(
@@ -84,17 +87,14 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return doc;
}
protected yjsMergeUpdates(...updates: Buffer[]): Buffer {
protected applyUpdates(guid: string, ...updates: Buffer[]): Doc {
const doc = this.recoverDoc(...updates);
return Buffer.from(encodeStateAsUpdate(doc));
}
protected mergeUpdates(guid: string, ...updates: Buffer[]): Buffer {
const yjsResult = this.yjsMergeUpdates(...updates);
this.metrics.jwstCodecMerge(1, {});
let log = false;
// test jwst codec
if (this.config.doc.manager.experimentalMergeWithJwstCodec) {
const yjsResult = Buffer.from(encodeStateAsUpdate(doc));
let log = false;
try {
const jwstResult = jwstMergeUpdates(updates);
if (!compare(yjsResult, jwstResult)) {
@@ -121,7 +121,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
}
}
return yjsResult;
return doc;
}
/**
@@ -131,7 +131,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
this.job = setInterval(() => {
if (!this.busy) {
this.busy = true;
this.apply()
this.autoSquash()
.catch(() => {
/* we handle all errors in work itself */
})
@@ -161,185 +161,146 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
}
/**
* add update to manager for later processing like fast merging.
* add update to manager for later processing.
*/
async push(workspaceId: string, guid: string, update: Buffer) {
await this.db.update.create({
data: {
workspaceId,
id: guid,
blob: update,
},
await new Promise<void>((resolve, reject) => {
defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid);
await this.db.update.create({
data: {
workspaceId,
id: guid,
seq,
blob: update,
},
});
})
.pipe(retry(MAX_SEQ_NUM)) // retry until seq num not conflict
.subscribe({
next: () => {
this.logger.verbose(
`pushed update for workspace: ${workspaceId}, guid: ${guid}`
);
resolve();
},
error: reject,
});
});
}
this.logger.verbose(
`pushed update for workspace: ${workspaceId}, guid: ${guid}`
);
/**
* get the latest doc with all update applied.
*/
async get(workspaceId: string, guid: string): Promise<Doc | null> {
const result = await this._get(workspaceId, guid);
if (result) {
if ('doc' in result) {
return result.doc;
} else if ('snapshot' in result) {
return this.recoverDoc(result.snapshot);
}
}
return null;
}
/**
* get the latest doc binary with all update applied.
*/
async getBinary(workspaceId: string, guid: string): Promise<Buffer | null> {
const result = await this._get(workspaceId, guid);
if (result) {
if ('doc' in result) {
return Buffer.from(encodeStateAsUpdate(result.doc));
} else if ('snapshot' in result) {
return result.snapshot;
}
}
return null;
}
/**
 * Latest yjs state vector of a doc, with all pending updates applied.
 *
 * Pending updates force a {@link squash} so the returned vector reflects
 * them; otherwise the snapshot's stored `state` column is used directly.
 */
async getState(workspaceId: string, guid: string): Promise<Buffer | null> {
  const stored = await this.getSnapshot(workspaceId, guid);
  const pending = await this.getUpdates(workspaceId, guid);
  if (pending.length === 0) {
    return stored ? stored.state : null;
  }
  const doc = await this.squash(pending, stored);
  return Buffer.from(encodeStateVector(doc));
}
/**
* get the snapshot of the doc we've seen.
*/
async getSnapshot(
workspaceId: string,
guid: string
): Promise<Buffer | undefined> {
const snapshot = await this.db.snapshot.findFirst({
protected async getSnapshot(workspaceId: string, guid: string) {
return this.db.snapshot.findUnique({
where: {
workspaceId,
id: guid,
id_workspaceId: {
workspaceId,
id: guid,
},
},
});
return snapshot?.blob;
}
/**
* get pending updates
*/
async getUpdates(workspaceId: string, guid: string): Promise<Buffer[]> {
const updates = await this.db.update.findMany({
protected async getUpdates(workspaceId: string, guid: string) {
return this.db.update.findMany({
where: {
workspaceId,
id: guid,
},
orderBy: {
seq: 'asc',
},
});
return updates.map(update => update.blob);
}
/**
 * get the latest doc with all update applied.
 *
 * latest = snapshot + updates; returns `undefined` when neither exists.
 */
async getLatest(workspaceId: string, guid: string): Promise<Doc | undefined> {
  const snapshot = await this.getSnapshot(workspaceId, guid);
  const updates = await this.getUpdates(workspaceId, guid);
  // the snapshot, when present, must be applied before the pending updates
  const sources = snapshot ? [snapshot, ...updates] : updates;
  return sources.length ? this.recoverDoc(...sources) : undefined;
}
/**
* get the latest doc and convert it to update binary
*/
async getLatestUpdate(
workspaceId: string,
guid: string
): Promise<Buffer | undefined> {
const doc = await this.getLatest(workspaceId, guid);
return doc ? Buffer.from(encodeStateAsUpdate(doc)) : undefined;
}
/**
* apply pending updates to snapshot
*/
async apply() {
const updates = await this.db
.$transaction(async db => {
// find the first update and batch process updates with same id
const first = await db.update.findFirst({
orderBy: {
createdAt: 'asc',
},
});
protected async autoSquash() {
// find the first update and batch process updates with same id
const first = await this.db.update.findFirst({
orderBy: {
createdAt: 'asc',
},
});
// no pending updates
if (!first) {
return;
}
const { id, workspaceId } = first;
const updates = await db.update.findMany({
where: {
id,
workspaceId,
},
});
// no pending updates
if (!updates.length) {
return;
}
// remove update that will be merged later
await db.update.deleteMany({
where: {
id,
workspaceId,
},
});
return updates;
})
.catch(
// transaction failed, it's safe to ignore
e => {
this.logger.error(`Failed to fetch updates: ${e}`);
}
);
// Putting the update-merging logic outside the transaction makes the processing
// more complex, but it's better to do so: merging may take a lot of time,
// which could slow down the whole db.
if (!updates?.length) {
// no pending updates
if (!first) {
return;
}
const { id, workspaceId } = updates[0];
this.logger.verbose(
`applying ${updates.length} updates for workspace: ${workspaceId}, guid: ${id}`
);
const { id, workspaceId } = first;
try {
const snapshot = await this.db.snapshot.findFirst({
where: {
workspaceId,
id,
},
});
// merge updates
const merged = snapshot
? this.mergeUpdates(id, snapshot.blob, ...updates.map(u => u.blob))
: this.mergeUpdates(id, ...updates.map(u => u.blob));
// save snapshot
await this.upsert(workspaceId, id, merged);
await this._get(workspaceId, id);
} catch (e) {
// failed to merge updates, put them back
this.logger.error(`Failed to merge updates: ${e}`);
await this.db.update
.createMany({
data: updates.map(u => ({
id: u.id,
workspaceId: u.workspaceId,
blob: u.blob,
})),
})
.catch(e => {
// failed to recover, fallback TBD
this.logger.error(`Fetal: failed to put updates back to db: ${e}`);
});
this.logger.error(
`Failed to apply updates for workspace: ${workspaceId}, guid: ${id}`
);
this.logger.error(e);
}
}
protected async upsert(workspaceId: string, guid: string, blob: Buffer) {
protected async upsert(
workspaceId: string,
guid: string,
doc: Doc,
seq?: number
) {
const blob = Buffer.from(encodeStateAsUpdate(doc));
const state = Buffer.from(encodeStateVector(doc));
return this.db.snapshot.upsert({
where: {
id_workspaceId: {
@@ -351,10 +312,103 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
id: guid,
workspaceId,
blob,
state,
seq,
},
update: {
blob,
state,
},
});
}
/**
 * Load the stored snapshot and pending updates for a doc.
 *
 * When pending updates exist they are squashed into the snapshot and the
 * resulting live {@link Doc} is returned; otherwise only the raw snapshot
 * blob is handed back so callers can skip decoding.
 */
protected async _get(
  workspaceId: string,
  guid: string
): Promise<{ doc: Doc } | { snapshot: Buffer } | null> {
  const stored = await this.getSnapshot(workspaceId, guid);
  const pending = await this.getUpdates(workspaceId, guid);
  if (pending.length === 0) {
    return stored ? { snapshot: stored.blob } : null;
  }
  const doc = await this.squash(pending, stored);
  return { doc };
}
/**
 * Squash updates into a single update and save it as snapshot,
 * and delete the updates records at the same time.
 *
 * @param updates - pending update rows; expected in ascending `seq` order
 *   (getUpdates fetches them with `orderBy: { seq: 'asc' }`)
 * @param snapshot - current snapshot row, or `null` when none exists yet
 * @returns the merged yjs doc
 * @throws Error when called with an empty update list
 */
protected async squash(updates: Update[], snapshot: Snapshot | null) {
  if (!updates.length) {
    throw new Error('No updates to squash');
  }
  const first = updates[0];
  const last = updates[updates.length - 1];
  const doc = this.applyUpdates(
    first.id,
    // NOTE(review): [0, 0] presumably encodes an empty yjs update, used as the
    // base when there is no snapshot yet — confirm against yjs encoding.
    snapshot ? snapshot.blob : Buffer.from([0, 0]),
    ...updates.map(u => u.blob)
  );
  const { id, workspaceId } = first;
  // persist the merged doc first (tagged with the highest merged seq), then
  // delete exactly the rows that were merged; updates pushed concurrently
  // carry other seq values and are left untouched for the next run.
  await this.upsert(workspaceId, id, doc, last.seq);
  await this.db.update.deleteMany({
    where: {
      id,
      workspaceId,
      seq: {
        in: updates.map(u => u.seq),
      },
    },
  });
  return doc;
}
/**
 * Allocate the next sequence number for an incoming update of a doc.
 *
 * The counter lives on the snapshot row and is bumped atomically with a
 * relative `increment`; if the snapshot row does not exist yet (the update
 * throws), a process-local in-memory counter is used as a fallback.
 */
private async getUpdateSeq(workspaceId: string, guid: string) {
  try {
    const { seq } = await this.db.snapshot.update({
      select: {
        seq: true,
      },
      where: {
        id_workspaceId: {
          workspaceId,
          id: guid,
        },
      },
      data: {
        seq: {
          increment: 1,
        },
      },
    });
    // reset
    // NOTE(review): the wrapped value MAX_SEQ_NUM is still returned for the
    // current update; only subsequent calls restart from 1 — confirm this is
    // the intended wrap-around semantics.
    if (seq === MAX_SEQ_NUM) {
      await this.db.snapshot.update({
        where: {
          id_workspaceId: {
            workspaceId,
            id: guid,
          },
        },
        data: {
          seq: 0,
        },
      });
    }
    return seq;
  } catch {
    // No snapshot row yet (or the update failed): hand out seq numbers from an
    // in-memory map keyed by workspaceId + guid.
    // NOTE(review): this map is never cleared and never wraps at MAX_SEQ_NUM —
    // presumably acceptable for the short pre-snapshot window; verify.
    const last = this.seqMap.get(workspaceId + guid) ?? 0;
    this.seqMap.set(workspaceId + guid, last + 1);
    return last + 1;
  }
}
}

View File

@@ -23,6 +23,9 @@ const pushUpdateLua = `
redis.call('rpush', KEYS[2], ARGV[2])
`;
/**
* @deprecated unstable
*/
@Injectable()
export class RedisDocManager extends DocManager {
private readonly redis: Redis;
@@ -44,41 +47,15 @@ export class RedisDocManager extends DocManager {
override onModuleInit(): void {
if (this.automation) {
this.logger.log('Use Redis');
this.setup();
}
}
override async push(workspaceId: string, guid: string, update: Buffer) {
try {
const key = `${workspaceId}:${guid}`;
// @ts-expect-error custom command
this.redis.pushDocUpdate(pending, updates`${key}`, key, update);
this.logger.verbose(
`pushed update for workspace: ${workspaceId}, guid: ${guid}`
);
} catch (e) {
return await super.push(workspaceId, guid, update);
}
}
override async getUpdates(
workspaceId: string,
guid: string
): Promise<Buffer[]> {
try {
return this.redis.lrangeBuffer(updates`${workspaceId}:${guid}`, 0, -1);
} catch (e) {
return super.getUpdates(workspaceId, guid);
}
}
override async apply(): Promise<void> {
override async autoSquash(): Promise<void> {
// in case some updates fell back to the db
await super.apply();
await super.autoSquash();
// consume rest updates in redis queue
const pendingDoc = await this.redis.spop(pending).catch(() => null); // safe
if (!pendingDoc) {
@@ -127,13 +104,12 @@ export class RedisDocManager extends DocManager {
const snapshot = await this.getSnapshot(workspaceId, id);
// merge
const blob = snapshot
? this.mergeUpdates(id, snapshot, ...updates)
: this.mergeUpdates(id, ...updates);
const doc = snapshot
? this.applyUpdates(id, snapshot.blob, ...updates)
: this.applyUpdates(id, ...updates);
// update snapshot
await this.upsert(workspaceId, id, blob);
await this.upsert(workspaceId, id, doc, snapshot?.seq);
// delete merged updates
await this.redis

View File

@@ -138,7 +138,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
const guid = trimGuid(message.workspaceId, message.guid);
const doc = await this.docManager.getLatest(message.workspaceId, guid);
const doc = await this.docManager.get(message.workspaceId, guid);
if (!doc) {
endTimer();

View File

@@ -3,10 +3,9 @@ import { Module } from '@nestjs/common';
import { DocModule } from '../../doc';
import { PermissionService } from '../../workspaces/permission';
import { EventsGateway } from './events.gateway';
import { WorkspaceService } from './workspace';
@Module({
imports: [DocModule.forFeature()],
providers: [EventsGateway, PermissionService, WorkspaceService],
providers: [EventsGateway, PermissionService],
})
export class EventsModule {}

View File

@@ -1,48 +0,0 @@
import { Injectable } from '@nestjs/common';
import { Doc, encodeStateAsUpdate } from 'yjs';
import { DocManager } from '../../doc';
import { assertExists } from '../utils';
@Injectable()
export class WorkspaceService {
  constructor(private readonly docManager: DocManager) {}

  /**
   * Collect the workspace root doc and every transitively reachable subdoc,
   * each encoded as a yjs update binary.
   *
   * @param workspaceId - id of the workspace; the root doc's guid equals the
   *   workspaceId (set explicitly at workspace creation time)
   * @returns one `{ guid, update }` entry per distinct reachable doc
   */
  async getDocsFromWorkspaceId(workspaceId: string): Promise<
    Array<{
      guid: string;
      update: Buffer;
    }>
  > {
    const docs: Array<{
      guid: string;
      update: Buffer;
    }> = [];
    const queue: Array<[string, Doc]> = [];
    // Guard against shared subdocs and guid cycles: without it, a doc
    // reachable through two parents would be fetched and emitted twice,
    // and a cyclic reference would loop forever.
    const seen = new Set<string>();

    // Workspace Doc's guid is the same as workspaceId. This is achieved by
    // manually setting the doc guid to workspaceId when creating a workspace.
    const doc = await this.docManager.getLatest(workspaceId, workspaceId);
    if (doc) {
      queue.push([workspaceId, doc]);
      seen.add(workspaceId);
    }

    while (queue.length > 0) {
      const head = queue.pop();
      assertExists(head);
      const [guid, doc] = head;
      docs.push({
        guid: guid,
        update: Buffer.from(encodeStateAsUpdate(doc)),
      });
      for (const { guid } of doc.subdocs) {
        if (seen.has(guid)) {
          continue; // already fetched (or queued) this subdoc
        }
        seen.add(guid);
        const subDoc = await this.docManager.getLatest(workspaceId, guid);
        if (subDoc) {
          queue.push([guid, subDoc]);
        }
      }
    }
    return docs;
  }
}

View File

@@ -72,7 +72,7 @@ export class WorkspacesController {
throw new ForbiddenException('Permission denied');
}
const update = await this.docManager.getLatestUpdate(ws, id);
const update = await this.docManager.getBinary(ws, id);
if (!update) {
throw new NotFoundException('Doc not found');

View File

@@ -75,7 +75,8 @@ test('should poll when intervel due', async t => {
const interval = m.get(Config).doc.manager.updatePollInterval;
let resolve: any;
const fake = mock.method(manager, 'apply', () => {
// @ts-expect-error private method
const fake = mock.method(manager, 'autoSquash', () => {
return new Promise(_resolve => {
resolve = _resolve;
});
@@ -121,19 +122,22 @@ test('should merge update when intervel due', async t => {
id: '1',
workspaceId: '1',
blob: Buffer.from([0, 0]),
seq: 1,
},
{
id: '1',
workspaceId: '1',
blob: Buffer.from(update),
seq: 2,
},
],
});
await manager.apply();
// @ts-expect-error private method
await manager.autoSquash();
t.deepEqual(
(await manager.getLatestUpdate(ws.id, '1'))?.toString('hex'),
(await manager.getBinary(ws.id, '1'))?.toString('hex'),
Buffer.from(update.buffer).toString('hex')
);
@@ -148,13 +152,95 @@ test('should merge update when intervel due', async t => {
workspaceId: ws.id,
id: '1',
blob: appendUpdate,
seq: 3,
},
});
await manager.apply();
// @ts-expect-error private method
await manager.autoSquash();
t.deepEqual(
(await manager.getLatestUpdate(ws.id, '1'))?.toString('hex'),
(await manager.getBinary(ws.id, '1'))?.toString('hex'),
Buffer.from(encodeStateAsUpdate(doc)).toString('hex')
);
});
// Verifies that pushed updates receive consecutive seq numbers, and that
// updates written after the counter wraps around MAX_SEQ_NUM are not merged
// together with the pre-wrap batch in a single autoSquash run.
test('should have sequential update number', async t => {
  const db = m.get(PrismaService);
  const manager = m.get(DocManager);
  const doc = new YDoc();
  const text = doc.getText('content');
  const updates: Buffer[] = [];
  // capture each incremental yjs update emitted by the edits below
  doc.on('update', update => {
    updates.push(Buffer.from(update));
  });
  text.insert(0, 'hello');
  text.insert(5, 'world');
  text.insert(5, ' ');
  await Promise.all(updates.map(update => manager.push('2', '2', update)));
  // [1,2,3]
  // @ts-expect-error private method
  let records = await manager.getUpdates('2', '2');
  t.deepEqual(
    records.map(({ seq }) => seq),
    [1, 2, 3]
  );
  // @ts-expect-error private method
  await manager.autoSquash();
  // force the counter to MAX_SEQ_NUM - 1 so the next increment returns
  // MAX_SEQ_NUM and triggers getUpdateSeq's reset-to-0 path
  await db.snapshot.update({
    where: {
      id_workspaceId: {
        id: '2',
        workspaceId: '2',
      },
    },
    data: {
      seq: 0x3ffffffe,
    },
  });
  await Promise.all(updates.map(update => manager.push('2', '2', update)));
  // @ts-expect-error private method
  records = await manager.getUpdates('2', '2');
  // push a new update with new seq num
  await manager.push('2', '2', updates[0]);
  // let the manager ignore update with the new seq num
  // @ts-expect-error private method
  const stub = Sinon.stub(manager, 'getUpdates').resolves(records);
  // @ts-expect-error private method
  await manager.autoSquash();
  stub.restore();
  // @ts-expect-error private method
  records = await manager.getUpdates('2', '2');
  // should not merge in one run
  t.not(records.length, 0);
});
// The second push first receives the already-used seq 1 (which violates the
// unique [workspace_id, guid, seq] index), so push must retry and succeed
// with seq 2 — three getUpdateSeq calls total for two pushes.
test('should retry if seq num conflict', async t => {
  const manager = m.get(DocManager);
  // @ts-expect-error private method
  const stub = Sinon.stub(manager, 'getUpdateSeq');
  stub.onCall(0).resolves(1);
  // seq num conflict
  stub.onCall(1).resolves(1);
  stub.onCall(2).resolves(2);
  await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0])));
  await t.notThrowsAsync(() => manager.push('1', '1', Buffer.from([0, 0])));
  t.is(stub.callCount, 3);
});