diff --git a/packages/backend/server/src/fundamentals/index.ts b/packages/backend/server/src/fundamentals/index.ts index 9c125c7cca..729ea3f9ee 100644 --- a/packages/backend/server/src/fundamentals/index.ts +++ b/packages/backend/server/src/fundamentals/index.ts @@ -18,13 +18,7 @@ export type { GraphqlContext } from './graphql'; export { CryptoHelper, URLHelper } from './helpers'; export { MailService } from './mailer'; export { CallCounter, CallTimer, metrics } from './metrics'; -export { - BucketService, - LockGuard, - MUTEX_RETRY, - MUTEX_WAIT, - MutexService, -} from './mutex'; +export { type ILocker, Lock, Locker, MutexService } from './mutex'; export { getOptionalModuleMetadata, GlobalExceptionFilter, diff --git a/packages/backend/server/src/fundamentals/mutex/bucket.ts b/packages/backend/server/src/fundamentals/mutex/bucket.ts deleted file mode 100644 index 446676cb39..0000000000 --- a/packages/backend/server/src/fundamentals/mutex/bucket.ts +++ /dev/null @@ -1,15 +0,0 @@ -export class BucketService { - private readonly bucket = new Map(); - - get(key: string) { - return this.bucket.get(key); - } - - set(key: string, value: string) { - this.bucket.set(key, value); - } - - delete(key: string) { - this.bucket.delete(key); - } -} diff --git a/packages/backend/server/src/fundamentals/mutex/index.ts b/packages/backend/server/src/fundamentals/mutex/index.ts index 5c50fba547..f8d136f5ce 100644 --- a/packages/backend/server/src/fundamentals/mutex/index.ts +++ b/packages/backend/server/src/fundamentals/mutex/index.ts @@ -1,14 +1,14 @@ import { Global, Module } from '@nestjs/common'; -import { BucketService } from './bucket'; +import { Locker } from './local-lock'; import { MutexService } from './mutex'; @Global() @Module({ - providers: [BucketService, MutexService], - exports: [BucketService, MutexService], + providers: [MutexService, Locker], + exports: [MutexService, Locker], }) export class MutexModule {} -export { BucketService, MutexService }; -export { LockGuard, MUTEX_RETRY, MUTEX_WAIT } from './mutex'; +export { Locker, MutexService }; +export { type Locker as ILocker, Lock } from './lock'; diff --git a/packages/backend/server/src/fundamentals/mutex/local-lock.ts b/packages/backend/server/src/fundamentals/mutex/local-lock.ts new file mode 100644 index 0000000000..c0460f5fb7 --- /dev/null +++ b/packages/backend/server/src/fundamentals/mutex/local-lock.ts @@ -0,0 +1,28 @@ +import { Injectable } from '@nestjs/common'; + +import { Cache } from '../cache'; +import { Lock, Locker as ILocker } from './lock'; + +@Injectable() +export class Locker implements ILocker { + constructor(private readonly cache: Cache) {} + + async lock(owner: string, key: string): Promise { + const lockKey = `MutexLock:${key}`; + const prevOwner = await this.cache.get(lockKey); + + if (prevOwner && prevOwner !== owner) { + throw new Error(`Lock for resource [${key}] has been holder by others`); + } + + const acquired = await this.cache.set(lockKey, owner); + + if (acquired) { + return new Lock(async () => { + await this.cache.delete(lockKey); + }); + } + + throw new Error(`Failed to acquire lock for resource [${key}]`); + } +} diff --git a/packages/backend/server/src/fundamentals/mutex/lock.ts b/packages/backend/server/src/fundamentals/mutex/lock.ts new file mode 100644 index 0000000000..bf0a971293 --- /dev/null +++ b/packages/backend/server/src/fundamentals/mutex/lock.ts @@ -0,0 +1,23 @@ +import { Logger } from '@nestjs/common'; + +import { retryable } from '../utils/promise'; + +export class Lock implements AsyncDisposable { + private readonly logger = new Logger(Lock.name); + + constructor(private readonly dispose: () => Promise) {} + + async release() { + await retryable(() => this.dispose()).catch(e => { + this.logger.error('Failed to release lock', e); + }); + } + + async [Symbol.asyncDispose]() { + await this.release(); + } +} + +export interface Locker { + lock(owner: string, key: string): Promise; +} diff --git a/packages/backend/server/src/fundamentals/mutex/mutex.ts b/packages/backend/server/src/fundamentals/mutex/mutex.ts index ffdd8eb889..8d58c88eb0 100644 --- a/packages/backend/server/src/fundamentals/mutex/mutex.ts +++ b/packages/backend/server/src/fundamentals/mutex/mutex.ts @@ -1,24 +1,12 @@ import { randomUUID } from 'node:crypto'; -import { setTimeout } from 'node:timers/promises'; import { Inject, Injectable, Logger, Scope } from '@nestjs/common'; +import { ModuleRef } from '@nestjs/core'; import { CONTEXT } from '@nestjs/graphql'; import type { GraphqlContext } from '../graphql'; -import { BucketService } from './bucket'; - -export class LockGuard - implements AsyncDisposable -{ - constructor( - private readonly mutex: M, - private readonly key: string - ) {} - - async [Symbol.asyncDispose]() { - return this.mutex.unlock(this.key); - } -} +import { retryable } from '../utils/promise'; +import { Locker } from './local-lock'; export const MUTEX_RETRY = 5; export const MUTEX_WAIT = 100; @@ -29,7 +17,7 @@ export class MutexService { constructor( @Inject(CONTEXT) private readonly context: GraphqlContext, - private readonly bucket: BucketService + private readonly ref: ModuleRef ) {} protected getId() { @@ -64,33 +52,22 @@ export class MutexService { * @param key resource key * @returns LockGuard */ - async lock(key: string): Promise { - const id = this.getId(); - const fetchLock = async (retry: number): Promise => { - if (retry === 0) { - this.logger.error( - `Failed to fetch lock ${key} after ${MUTEX_RETRY} retry` - ); - return undefined; - } - const current = this.bucket.get(key); - if (current && current !== id) { - this.logger.warn( - `Failed to fetch lock ${key}, retrying in ${MUTEX_WAIT} ms` - ); - await setTimeout(MUTEX_WAIT * (MUTEX_RETRY - retry + 1)); - return fetchLock(retry - 1); - } - this.bucket.set(key, id); - return new LockGuard(this, key); - }; - - return fetchLock(MUTEX_RETRY); - } - - async unlock(key: string): Promise { - if (this.bucket.get(key) === this.getId()) { - this.bucket.delete(key); + async lock(key: string) { + try { + return await retryable( + () => { + const locker = this.ref.get(Locker, { strict: false }); + return locker.lock(this.getId(), key); + }, + MUTEX_RETRY, + MUTEX_WAIT + ); + } catch (e) { + this.logger.error( + `Failed to lock resource [${key}] after retry ${MUTEX_RETRY} times`, + e + ); + return undefined; } } } diff --git a/packages/backend/server/src/fundamentals/utils/promise.ts b/packages/backend/server/src/fundamentals/utils/promise.ts new file mode 100644 index 0000000000..81881a1205 --- /dev/null +++ b/packages/backend/server/src/fundamentals/utils/promise.ts @@ -0,0 +1,44 @@ +import { defer, retry } from 'rxjs'; + +export class RetryablePromise extends Promise { + constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: any) => void + ) => void, + retryTimes: number = 3, + retryIntervalInMs: number = 300 + ) { + super((resolve, reject) => { + defer(() => new Promise(executor)) + .pipe( + retry({ + count: retryTimes, + delay: retryIntervalInMs, + }) + ) + .subscribe({ + next: v => { + resolve(v); + }, + error: e => { + reject(e); + }, + }); + }); + } +} + +export function retryable( + asyncFn: () => Promise, + retryTimes = 3, + retryIntervalInMs = 300 +): Promise { + return new RetryablePromise( + (resolve, reject) => { + asyncFn().then(resolve).catch(reject); + }, + retryTimes, + retryIntervalInMs + ); +} diff --git a/packages/backend/server/src/plugins/redis/index.ts b/packages/backend/server/src/plugins/redis/index.ts index 1ca721147a..0fa6cedb30 100644 --- a/packages/backend/server/src/plugins/redis/index.ts +++ b/packages/backend/server/src/plugins/redis/index.ts @@ -1,27 +1,14 @@ import { Global, Provider, Type } from '@nestjs/common'; -import { CONTEXT } from '@nestjs/graphql'; import { Redis, type RedisOptions } from 'ioredis'; import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis'; -import { - BucketService, - Cache, - type GraphqlContext, - MutexService, - SessionCache, -} from '../../fundamentals'; +import { Cache, Locker, SessionCache } from '../../fundamentals'; import { ThrottlerStorage } from '../../fundamentals/throttler'; import { SocketIoAdapterImpl } from '../../fundamentals/websocket'; import { Plugin } from '../registry'; import { RedisCache } from './cache'; -import { - CacheRedis, - MutexRedis, - SessionRedis, - SocketIoRedis, - ThrottlerRedis, -} from './instances'; -import { MutexRedisService } from './mutex'; +import { CacheRedis, SessionRedis, SocketIoRedis } from './instances'; +import { RedisMutexLocker } from './mutex'; import { createSockerIoAdapterImpl } from './ws-adapter'; function makeProvider(token: Type, impl: Type): Provider { @@ -44,7 +31,7 @@ const throttlerStorageProvider: Provider = { useFactory: (redis: Redis) => { return new ThrottlerStorageRedisService(redis); }, - inject: [ThrottlerRedis], + inject: [SessionRedis], }; // socket io @@ -58,23 +45,14 @@ const socketIoRedisAdapterProvider: Provider = { // mutex const mutexRedisAdapterProvider: Provider = { - provide: MutexService, - useFactory: (redis: Redis, ctx: GraphqlContext, bucket: BucketService) => { - return new MutexRedisService(redis, ctx, bucket); - }, - inject: [MutexRedis, CONTEXT, BucketService], + provide: Locker, + useClass: RedisMutexLocker, }; @Global() @Plugin({ name: 'redis', - providers: [ - CacheRedis, - SessionRedis, - ThrottlerRedis, - SocketIoRedis, - MutexRedis, - ], + providers: [CacheRedis, SessionRedis, SocketIoRedis], overrides: [ cacheProvider, sessionCacheProvider, diff --git a/packages/backend/server/src/plugins/redis/instances.ts b/packages/backend/server/src/plugins/redis/instances.ts index 8fbd13b0c6..3c093e15f3 100644 --- a/packages/backend/server/src/plugins/redis/instances.ts +++ b/packages/backend/server/src/plugins/redis/instances.ts @@ -34,13 +34,6 @@ export class CacheRedis extends Redis { } } -@Injectable() -export class ThrottlerRedis extends Redis { - constructor(config: Config) { - super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 1 }); - } -} - @Injectable() export class SessionRedis extends Redis { constructor(config: Config) { @@ -54,10 +47,3 @@ export class SocketIoRedis extends Redis { super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 3 }); } } - -@Injectable() -export class MutexRedis extends Redis { - constructor(config: Config) { - super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 4 }); - } -} diff --git a/packages/backend/server/src/plugins/redis/mutex.ts b/packages/backend/server/src/plugins/redis/mutex.ts index 9006507f08..8f63019f65 100644 --- a/packages/backend/server/src/plugins/redis/mutex.ts +++ b/packages/backend/server/src/plugins/redis/mutex.ts @@ -1,22 +1,13 @@ -import { setTimeout } from 'node:timers/promises'; - import { Injectable, Logger } from '@nestjs/common'; -import Redis, { Command } from 'ioredis'; +import { Command } from 'ioredis'; -import { - BucketService, - type GraphqlContext, - LockGuard, - MUTEX_RETRY, - MUTEX_WAIT, - MutexService, -} from '../../fundamentals'; +import { ILocker, Lock } from '../../fundamentals'; +import { SessionRedis } from './instances'; const lockScript = `local key = KEYS[1] local clientId = ARGV[1] -local releaseTime = ARGV[2] -if redis.call("get", key) == clientId or redis.call("set", key, clientId, "NX", "PX", releaseTime) then +if redis.call("get", key) == clientId or redis.call("set", key, clientId, "NX", "EX", 60) then return 1 else return 0 @@ -31,66 +22,31 @@ else end`; @Injectable() -export class MutexRedisService extends MutexService { - constructor( - private readonly redis: Redis, - context: GraphqlContext, - bucket: BucketService - ) { - super(context, bucket); - this.logger = new Logger(MutexRedisService.name); - } +export class RedisMutexLocker implements ILocker { + private readonly logger = new Logger(RedisMutexLocker.name); + constructor(private readonly redis: SessionRedis) {} - override async lock( - key: string, - releaseTimeInMS: number = 200 - ): Promise { - const clientId = this.getId(); - this.logger.debug(`Client ${clientId} lock try to lock ${key}`); - const releaseTime = releaseTimeInMS.toString(); + async lock(owner: string, key: string): Promise { + const lockKey = `MutexLock:${key}`; + this.logger.debug(`Client ${owner} is trying to lock resource ${key}`); - const fetchLock = async (retry: number): Promise => { - if (retry === 0) { - this.logger.error( - `Failed to fetch lock ${key} after ${MUTEX_RETRY} retry` - ); - return undefined; - } - try { - const success = await this.redis.sendCommand( - new Command('EVAL', [lockScript, '1', key, clientId, releaseTime]) - ); - if (success === 1) { - return new LockGuard(this, key); - } else { - this.logger.warn( - `Failed to fetch lock ${key}, retrying in ${MUTEX_WAIT} ms` - ); - await setTimeout(MUTEX_WAIT * (MUTEX_RETRY - retry + 1)); - return fetchLock(retry - 1); - } - } catch (error: any) { - this.logger.error( - `Unexpected error when fetch lock ${key}: ${error.message}` - ); - return undefined; - } - }; - - return fetchLock(MUTEX_RETRY); - } - - override async unlock(key: string, ignoreUnlockFail = false): Promise { - const clientId = this.getId(); - const result = await this.redis.sendCommand( - new Command('EVAL', [unlockScript, '1', key, clientId]) + const success = await this.redis.sendCommand( + new Command('EVAL', [lockScript, '1', lockKey, owner]) ); - if (result === 0) { - if (!ignoreUnlockFail) { - throw new Error(`Failed to release lock ${key}`); - } else { - this.logger.warn(`Failed to release lock ${key}`); - } + + if (success === 1) { + return new Lock(async () => { + const result = await this.redis.sendCommand( + new Command('EVAL', [unlockScript, '1', lockKey, owner]) + ); + + // TODO(@darksky): lock expired condition is not handled + if (result === 0) { + throw new Error(`Failed to release lock ${key}`); + } + }); } + + throw new Error(`Failed to acquire lock for resource [${key}]`); } }