diff --git a/packages/backend/server/src/modules/doc/manager.ts b/packages/backend/server/src/modules/doc/manager.ts index c97027f2cd..e82932a8e7 100644 --- a/packages/backend/server/src/modules/doc/manager.ts +++ b/packages/backend/server/src/modules/doc/manager.ts @@ -16,6 +16,7 @@ import { transact, } from 'yjs'; +import { Cache } from '../../cache'; import { Config } from '../../config'; import { Metrics } from '../../metrics/metrics'; import { PrismaService } from '../../prisma'; @@ -58,17 +59,18 @@ const MAX_SEQ_NUM = 0x3fffffff; // u31 */ @Injectable() export class DocManager implements OnModuleInit, OnModuleDestroy { - protected logger = new Logger(DocManager.name); + private logger = new Logger(DocManager.name); private job: NodeJS.Timeout | null = null; private seqMap = new Map(); private busy = false; constructor( - protected readonly db: PrismaService, @Inject('DOC_MANAGER_AUTOMATION') - protected readonly automation: boolean, - protected readonly config: Config, - protected readonly metrics: Metrics + private readonly automation: boolean, + private readonly db: PrismaService, + private readonly config: Config, + private readonly metrics: Metrics, + private readonly cache: Cache ) {} onModuleInit() { @@ -82,7 +84,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { this.destroy(); } - protected recoverDoc(...updates: Buffer[]): Promise { + private recoverDoc(...updates: Buffer[]): Promise { const doc = new Doc(); const chunks = chunk(updates, 10); @@ -95,11 +97,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { try { applyUpdate(doc, u); } catch (e) { - this.logger.error( - `Failed to apply update: ${updates - .map(u => u.toString('hex')) - .join('\n')}` - ); + this.logger.error('Failed to apply update', e); } }); }); @@ -117,14 +115,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }); } - protected async applyUpdates( - guid: string, - ...updates: Buffer[] - ): Promise { + private async applyUpdates(guid: string, ...updates: Buffer[]): Promise { const doc = await this.recoverDoc(...updates); // test jwst codec if ( + this.config.affine.canary && this.config.doc.manager.experimentalMergeWithJwstCodec && updates.length < 100 /* avoid overloading */ ) { @@ -149,7 +145,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { this.logger.warn(`jwst apply update failed for ${guid}: ${e}`); log = true; } finally { - if (log) { + if (log && this.config.node.dev) { this.logger.warn( `Updates: ${updates.map(u => u.toString('hex')).join('\n')}` ); @@ -223,8 +219,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { .pipe(retry(retryTimes)) // retry until seq num not conflict .subscribe({ next: () => { - this.logger.verbose( - `pushed update for workspace: ${workspaceId}, guid: ${guid}` + this.logger.debug( + `pushed 1 update for ${guid} in workspace ${workspaceId}` ); resolve(); }, @@ -233,6 +229,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { reject(new Error('Failed to push update')); }, }); + }).then(() => { + return this.updateCachedUpdatesCount(workspaceId, guid, 1); }); } @@ -267,8 +265,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { .pipe(retry(retryTimes)) // retry until seq num not conflict .subscribe({ next: () => { - this.logger.verbose( - `pushed updates for workspace: ${workspaceId}, guid: ${guid}` + this.logger.debug( + `pushed ${updates.length} updates for ${guid} in workspace ${workspaceId}` ); resolve(); }, @@ -277,6 +275,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { reject(new Error('Failed to push update')); }, }); + }).then(() => { + return this.updateCachedUpdatesCount(workspaceId, guid, updates.length); }); } @@ -363,21 +363,22 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { /** * apply pending updates to snapshot */ - protected async autoSquash() { + private async autoSquash() { // find the first update and batch process updates with same id - const first = await this.db.update.findFirst({ - select: { - id: true, - workspaceId: true, - }, - }); + const candidate = await this.getAutoSquashCandidate(); // no pending updates - if (!first) { + if (!candidate) { return; } - const { id, workspaceId } = first; + const { id, workspaceId } = candidate; + // acquire lock + const ok = await this.lockUpdatesForAutoSquash(workspaceId, id); + + if (!ok) { + return; + } try { await this._get(workspaceId, id); @@ -386,10 +387,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { `Failed to apply updates for workspace: ${workspaceId}, guid: ${id}` ); this.logger.error(e); + } finally { + await this.unlockUpdatesForAutoSquash(workspaceId, id); } } - protected async upsert( + private async getAutoSquashCandidate() { + const cache = await this.getAutoSquashCandidateFromCache(); + + if (cache) { + return cache; + } + + return this.db.update.findFirst({ + select: { + id: true, + workspaceId: true, + }, + }); + } + + private async upsert( workspaceId: string, guid: string, doc: Doc, @@ -426,7 +444,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }); } - protected async _get( + private async _get( workspaceId: string, guid: string ): Promise<{ doc: Doc } | { snapshot: Buffer } | null> { @@ -446,22 +464,25 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { * Squash updates into a single update and save it as snapshot, * and delete the updates records at the same time. */ - protected async squash(updates: Update[], snapshot: Snapshot | null) { + private async squash(updates: Update[], snapshot: Snapshot | null) { if (!updates.length) { throw new Error('No updates to squash'); } const first = updates[0]; const last = updates[updates.length - 1]; + const { id, workspaceId } = first; + const doc = await this.applyUpdates( first.id, snapshot ? snapshot.blob : Buffer.from([0, 0]), ...updates.map(u => u.blob) ); - const { id, workspaceId } = first; - await this.upsert(workspaceId, id, doc, last.seq); + this.logger.debug( + `Squashed ${updates.length} updates for ${id} in workspace ${workspaceId}` + ); await this.db.update.deleteMany({ where: { id, @@ -471,6 +492,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { }, }, }); + + await this.updateCachedUpdatesCount(workspaceId, id, -updates.length); return doc; } @@ -516,4 +539,56 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { return last + batch; } } + + private async updateCachedUpdatesCount( + workspaceId: string, + guid: string, + count: number + ) { + const result = await this.cache.mapIncrease( + `doc:manager:updates`, + `${workspaceId}::${guid}`, + count + ); + + if (result <= 0) { + await this.cache.mapDelete( + `doc:manager:updates`, + `${workspaceId}::${guid}` + ); + } + } + + private async getAutoSquashCandidateFromCache() { + const key = await this.cache.mapRandomKey('doc:manager:updates'); + + if (key) { + const count = await this.cache.mapGet('doc:manager:updates', key); + if (typeof count === 'number' && count > 0) { + const [workspaceId, id] = key.split('::'); + return { id, workspaceId }; + } + } + + return null; + } + + private async lockUpdatesForAutoSquash(workspaceId: string, guid: string) { + return this.cache.setnx( + `doc:manager:updates-lock:${workspaceId}::${guid}`, + 1, + { + ttl: 60 * 1000, + } + ); + } + + private async unlockUpdatesForAutoSquash(workspaceId: string, guid: string) { + return this.cache + .delete(`doc:manager:updates-lock:${workspaceId}::${guid}`) + .catch(e => { + // safe, the lock will be expired when ttl ends + this.logger.error('Failed to release updates lock', e); + }); + } } diff --git a/packages/backend/server/tests/doc.spec.ts b/packages/backend/server/tests/doc.spec.ts index 5f18bedbed..2ca8ced837 100644 --- a/packages/backend/server/tests/doc.spec.ts +++ b/packages/backend/server/tests/doc.spec.ts @@ -7,6 +7,7 @@ import { register } from 'prom-client'; import * as Sinon from 'sinon'; import { Doc as YDoc, encodeStateAsUpdate } from 'yjs'; +import { CacheModule } from '../src/cache'; import { Config, ConfigModule } from '../src/config'; import { MetricsModule } from '../src/metrics'; import { DocManager, DocModule } from '../src/modules/doc'; @@ -18,6 +19,7 @@ const createModule = () => { imports: [ PrismaModule, MetricsModule, + CacheModule, ConfigModule.forRoot(), DocModule.forRoot(), ],