feat(server): reduce duplidated merge with cache (#4975)

This commit is contained in:
liuyi
2023-11-22 04:09:06 +00:00
parent c69e542b98
commit 525b196cae
2 changed files with 111 additions and 34 deletions

View File

@@ -16,6 +16,7 @@ import {
transact,
} from 'yjs';
import { Cache } from '../../cache';
import { Config } from '../../config';
import { Metrics } from '../../metrics/metrics';
import { PrismaService } from '../../prisma';
@@ -58,17 +59,18 @@ const MAX_SEQ_NUM = 0x3fffffff; // u31
*/
@Injectable()
export class DocManager implements OnModuleInit, OnModuleDestroy {
protected logger = new Logger(DocManager.name);
private logger = new Logger(DocManager.name);
private job: NodeJS.Timeout | null = null;
private seqMap = new Map<string, number>();
private busy = false;
constructor(
protected readonly db: PrismaService,
@Inject('DOC_MANAGER_AUTOMATION')
protected readonly automation: boolean,
protected readonly config: Config,
protected readonly metrics: Metrics
private readonly automation: boolean,
private readonly db: PrismaService,
private readonly config: Config,
private readonly metrics: Metrics,
private readonly cache: Cache
) {}
onModuleInit() {
@@ -82,7 +84,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
this.destroy();
}
protected recoverDoc(...updates: Buffer[]): Promise<Doc> {
private recoverDoc(...updates: Buffer[]): Promise<Doc> {
const doc = new Doc();
const chunks = chunk(updates, 10);
@@ -95,11 +97,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
try {
applyUpdate(doc, u);
} catch (e) {
this.logger.error(
`Failed to apply update: ${updates
.map(u => u.toString('hex'))
.join('\n')}`
);
this.logger.error('Failed to apply update', e);
}
});
});
@@ -117,14 +115,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
});
}
protected async applyUpdates(
guid: string,
...updates: Buffer[]
): Promise<Doc> {
private async applyUpdates(guid: string, ...updates: Buffer[]): Promise<Doc> {
const doc = await this.recoverDoc(...updates);
// test jwst codec
if (
this.config.affine.canary &&
this.config.doc.manager.experimentalMergeWithJwstCodec &&
updates.length < 100 /* avoid overloading */
) {
@@ -149,7 +145,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
this.logger.warn(`jwst apply update failed for ${guid}: ${e}`);
log = true;
} finally {
if (log) {
if (log && this.config.node.dev) {
this.logger.warn(
`Updates: ${updates.map(u => u.toString('hex')).join('\n')}`
);
@@ -223,8 +219,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: () => {
this.logger.verbose(
`pushed update for workspace: ${workspaceId}, guid: ${guid}`
this.logger.debug(
`pushed 1 update for ${guid} in workspace ${workspaceId}`
);
resolve();
},
@@ -233,6 +229,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
reject(new Error('Failed to push update'));
},
});
}).then(() => {
return this.updateCachedUpdatesCount(workspaceId, guid, 1);
});
}
@@ -267,8 +265,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: () => {
this.logger.verbose(
`pushed updates for workspace: ${workspaceId}, guid: ${guid}`
this.logger.debug(
`pushed ${updates.length} updates for ${guid} in workspace ${workspaceId}`
);
resolve();
},
@@ -277,6 +275,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
reject(new Error('Failed to push update'));
},
});
}).then(() => {
return this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
});
}
@@ -363,21 +363,22 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* apply pending updates to snapshot
*/
protected async autoSquash() {
private async autoSquash() {
// find the first update and batch process updates with same id
const first = await this.db.update.findFirst({
select: {
id: true,
workspaceId: true,
},
});
const candidate = await this.getAutoSquashCandidate();
// no pending updates
if (!first) {
if (!candidate) {
return;
}
const { id, workspaceId } = first;
const { id, workspaceId } = candidate;
// acquire lock
const ok = await this.lockUpdatesForAutoSquash(workspaceId, id);
if (!ok) {
return;
}
try {
await this._get(workspaceId, id);
@@ -386,10 +387,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
`Failed to apply updates for workspace: ${workspaceId}, guid: ${id}`
);
this.logger.error(e);
} finally {
await this.unlockUpdatesForAutoSquash(workspaceId, id);
}
}
protected async upsert(
private async getAutoSquashCandidate() {
const cache = await this.getAutoSquashCandidateFromCache();
if (cache) {
return cache;
}
return this.db.update.findFirst({
select: {
id: true,
workspaceId: true,
},
});
}
private async upsert(
workspaceId: string,
guid: string,
doc: Doc,
@@ -426,7 +444,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
});
}
protected async _get(
private async _get(
workspaceId: string,
guid: string
): Promise<{ doc: Doc } | { snapshot: Buffer } | null> {
@@ -446,22 +464,25 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
* Squash updates into a single update and save it as snapshot,
* and delete the updates records at the same time.
*/
protected async squash(updates: Update[], snapshot: Snapshot | null) {
private async squash(updates: Update[], snapshot: Snapshot | null) {
if (!updates.length) {
throw new Error('No updates to squash');
}
const first = updates[0];
const last = updates[updates.length - 1];
const { id, workspaceId } = first;
const doc = await this.applyUpdates(
first.id,
snapshot ? snapshot.blob : Buffer.from([0, 0]),
...updates.map(u => u.blob)
);
const { id, workspaceId } = first;
await this.upsert(workspaceId, id, doc, last.seq);
this.logger.debug(
`Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}`
);
await this.db.update.deleteMany({
where: {
id,
@@ -471,6 +492,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
},
},
});
await this.updateCachedUpdatesCount(workspaceId, id, -updates.length);
return doc;
}
@@ -516,4 +539,56 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
return last + batch;
}
}
private async updateCachedUpdatesCount(
workspaceId: string,
guid: string,
count: number
) {
const result = await this.cache.mapIncrease(
`doc:manager:updates`,
`${workspaceId}::${guid}`,
count
);
if (result <= 0) {
await this.cache.mapDelete(
`doc:manager:updates`,
`${workspaceId}::${guid}`
);
}
}
private async getAutoSquashCandidateFromCache() {
const key = await this.cache.mapRandomKey('doc:manager:updates');
if (key) {
const count = await this.cache.mapGet<number>('doc:manager:updates', key);
if (typeof count === 'number' && count > 0) {
const [workspaceId, id] = key.split('::');
return { id, workspaceId };
}
}
return null;
}
private async lockUpdatesForAutoSquash(workspaceId: string, guid: string) {
return this.cache.setnx(
`doc:manager:updates-lock:${workspaceId}::${guid}`,
1,
{
ttl: 60 * 1000,
}
);
}
private async unlockUpdatesForAutoSquash(workspaceId: string, guid: string) {
return this.cache
.delete(`doc:manager:updates-lock:${workspaceId}::${guid}`)
.catch(e => {
// safe, the lock will be expired when ttl ends
this.logger.error('Failed to release updates lock', e);
});
}
}

View File

@@ -7,6 +7,7 @@ import { register } from 'prom-client';
import * as Sinon from 'sinon';
import { Doc as YDoc, encodeStateAsUpdate } from 'yjs';
import { CacheModule } from '../src/cache';
import { Config, ConfigModule } from '../src/config';
import { MetricsModule } from '../src/metrics';
import { DocManager, DocModule } from '../src/modules/doc';
@@ -18,6 +19,7 @@ const createModule = () => {
imports: [
PrismaModule,
MetricsModule,
CacheModule,
ConfigModule.forRoot(),
DocModule.forRoot(),
],