fix(server): inject correct locker to request scope mutex (#6140)

This commit is contained in:
liuyi
2024-03-19 02:16:35 +00:00
parent f18133af82
commit 4702c1a9ca
10 changed files with 154 additions and 183 deletions

View File

@@ -18,13 +18,7 @@ export type { GraphqlContext } from './graphql';
export { CryptoHelper, URLHelper } from './helpers'; export { CryptoHelper, URLHelper } from './helpers';
export { MailService } from './mailer'; export { MailService } from './mailer';
export { CallCounter, CallTimer, metrics } from './metrics'; export { CallCounter, CallTimer, metrics } from './metrics';
export { export { type ILocker, Lock, Locker, MutexService } from './mutex';
BucketService,
LockGuard,
MUTEX_RETRY,
MUTEX_WAIT,
MutexService,
} from './mutex';
export { export {
getOptionalModuleMetadata, getOptionalModuleMetadata,
GlobalExceptionFilter, GlobalExceptionFilter,

View File

@@ -1,15 +0,0 @@
export class BucketService {
private readonly bucket = new Map<string, string>();
get(key: string) {
return this.bucket.get(key);
}
set(key: string, value: string) {
this.bucket.set(key, value);
}
delete(key: string) {
this.bucket.delete(key);
}
}

View File

@@ -1,14 +1,14 @@
import { Global, Module } from '@nestjs/common'; import { Global, Module } from '@nestjs/common';
import { BucketService } from './bucket'; import { Locker } from './local-lock';
import { MutexService } from './mutex'; import { MutexService } from './mutex';
@Global() @Global()
@Module({ @Module({
providers: [BucketService, MutexService], providers: [MutexService, Locker],
exports: [BucketService, MutexService], exports: [MutexService, Locker],
}) })
export class MutexModule {} export class MutexModule {}
export { BucketService, MutexService }; export { Locker, MutexService };
export { LockGuard, MUTEX_RETRY, MUTEX_WAIT } from './mutex'; export { type Locker as ILocker, Lock } from './lock';

View File

@@ -0,0 +1,28 @@
import { Injectable } from '@nestjs/common';
import { Cache } from '../cache';
import { Lock, Locker as ILocker } from './lock';
@Injectable()
export class Locker implements ILocker {
constructor(private readonly cache: Cache) {}
async lock(owner: string, key: string): Promise<Lock> {
const lockKey = `MutexLock:${key}`;
const prevOwner = await this.cache.get<string>(lockKey);
if (prevOwner && prevOwner !== owner) {
throw new Error(`Lock for resource [${key}] has been holder by others`);
}
const acquired = await this.cache.set(lockKey, owner);
if (acquired) {
return new Lock(async () => {
await this.cache.delete(lockKey);
});
}
throw new Error(`Failed to acquire lock for resource [${key}]`);
}
}

View File

@@ -0,0 +1,23 @@
import { Logger } from '@nestjs/common';
import { retryable } from '../utils/promise';
export class Lock implements AsyncDisposable {
private readonly logger = new Logger(Lock.name);
constructor(private readonly dispose: () => Promise<void>) {}
async release() {
await retryable(() => this.dispose()).catch(e => {
this.logger.error('Failed to release lock', e);
});
}
async [Symbol.asyncDispose]() {
await this.release();
}
}
export interface Locker {
lock(owner: string, key: string): Promise<Lock>;
}

View File

@@ -1,24 +1,12 @@
import { randomUUID } from 'node:crypto'; import { randomUUID } from 'node:crypto';
import { setTimeout } from 'node:timers/promises';
import { Inject, Injectable, Logger, Scope } from '@nestjs/common'; import { Inject, Injectable, Logger, Scope } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { CONTEXT } from '@nestjs/graphql'; import { CONTEXT } from '@nestjs/graphql';
import type { GraphqlContext } from '../graphql'; import type { GraphqlContext } from '../graphql';
import { BucketService } from './bucket'; import { retryable } from '../utils/promise';
import { Locker } from './local-lock';
export class LockGuard<M extends MutexService = MutexService>
implements AsyncDisposable
{
constructor(
private readonly mutex: M,
private readonly key: string
) {}
async [Symbol.asyncDispose]() {
return this.mutex.unlock(this.key);
}
}
export const MUTEX_RETRY = 5; export const MUTEX_RETRY = 5;
export const MUTEX_WAIT = 100; export const MUTEX_WAIT = 100;
@@ -29,7 +17,7 @@ export class MutexService {
constructor( constructor(
@Inject(CONTEXT) private readonly context: GraphqlContext, @Inject(CONTEXT) private readonly context: GraphqlContext,
private readonly bucket: BucketService private readonly ref: ModuleRef
) {} ) {}
protected getId() { protected getId() {
@@ -64,33 +52,22 @@ export class MutexService {
* @param key resource key * @param key resource key
* @returns LockGuard * @returns LockGuard
*/ */
async lock(key: string): Promise<LockGuard | undefined> { async lock(key: string) {
const id = this.getId(); try {
const fetchLock = async (retry: number): Promise<LockGuard | undefined> => { return await retryable(
if (retry === 0) { () => {
this.logger.error( const locker = this.ref.get(Locker, { strict: false });
`Failed to fetch lock ${key} after ${MUTEX_RETRY} retry` return locker.lock(this.getId(), key);
); },
return undefined; MUTEX_RETRY,
} MUTEX_WAIT
const current = this.bucket.get(key); );
if (current && current !== id) { } catch (e) {
this.logger.warn( this.logger.error(
`Failed to fetch lock ${key}, retrying in ${MUTEX_WAIT} ms` `Failed to lock resource [${key}] after retry ${MUTEX_RETRY} times`,
); e
await setTimeout(MUTEX_WAIT * (MUTEX_RETRY - retry + 1)); );
return fetchLock(retry - 1); return undefined;
}
this.bucket.set(key, id);
return new LockGuard(this, key);
};
return fetchLock(MUTEX_RETRY);
}
async unlock(key: string): Promise<void> {
if (this.bucket.get(key) === this.getId()) {
this.bucket.delete(key);
} }
} }
} }

View File

@@ -0,0 +1,44 @@
import { defer, retry } from 'rxjs';
export class RetryablePromise<T> extends Promise<T> {
constructor(
executor: (
resolve: (value: T | PromiseLike<T>) => void,
reject: (reason?: any) => void
) => void,
retryTimes: number = 3,
retryIntervalInMs: number = 300
) {
super((resolve, reject) => {
defer(() => new Promise<T>(executor))
.pipe(
retry({
count: retryTimes,
delay: retryIntervalInMs,
})
)
.subscribe({
next: v => {
resolve(v);
},
error: e => {
reject(e);
},
});
});
}
}
export function retryable<Ret = unknown>(
asyncFn: () => Promise<Ret>,
retryTimes = 3,
retryIntervalInMs = 300
): Promise<Ret> {
return new RetryablePromise<Ret>(
(resolve, reject) => {
asyncFn().then(resolve).catch(reject);
},
retryTimes,
retryIntervalInMs
);
}

View File

@@ -1,27 +1,14 @@
import { Global, Provider, Type } from '@nestjs/common'; import { Global, Provider, Type } from '@nestjs/common';
import { CONTEXT } from '@nestjs/graphql';
import { Redis, type RedisOptions } from 'ioredis'; import { Redis, type RedisOptions } from 'ioredis';
import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis'; import { ThrottlerStorageRedisService } from 'nestjs-throttler-storage-redis';
import { import { Cache, Locker, SessionCache } from '../../fundamentals';
BucketService,
Cache,
type GraphqlContext,
MutexService,
SessionCache,
} from '../../fundamentals';
import { ThrottlerStorage } from '../../fundamentals/throttler'; import { ThrottlerStorage } from '../../fundamentals/throttler';
import { SocketIoAdapterImpl } from '../../fundamentals/websocket'; import { SocketIoAdapterImpl } from '../../fundamentals/websocket';
import { Plugin } from '../registry'; import { Plugin } from '../registry';
import { RedisCache } from './cache'; import { RedisCache } from './cache';
import { import { CacheRedis, SessionRedis, SocketIoRedis } from './instances';
CacheRedis, import { RedisMutexLocker } from './mutex';
MutexRedis,
SessionRedis,
SocketIoRedis,
ThrottlerRedis,
} from './instances';
import { MutexRedisService } from './mutex';
import { createSockerIoAdapterImpl } from './ws-adapter'; import { createSockerIoAdapterImpl } from './ws-adapter';
function makeProvider(token: Type, impl: Type<Redis>): Provider { function makeProvider(token: Type, impl: Type<Redis>): Provider {
@@ -44,7 +31,7 @@ const throttlerStorageProvider: Provider = {
useFactory: (redis: Redis) => { useFactory: (redis: Redis) => {
return new ThrottlerStorageRedisService(redis); return new ThrottlerStorageRedisService(redis);
}, },
inject: [ThrottlerRedis], inject: [SessionRedis],
}; };
// socket io // socket io
@@ -58,23 +45,14 @@ const socketIoRedisAdapterProvider: Provider = {
// mutex // mutex
const mutexRedisAdapterProvider: Provider = { const mutexRedisAdapterProvider: Provider = {
provide: MutexService, provide: Locker,
useFactory: (redis: Redis, ctx: GraphqlContext, bucket: BucketService) => { useClass: RedisMutexLocker,
return new MutexRedisService(redis, ctx, bucket);
},
inject: [MutexRedis, CONTEXT, BucketService],
}; };
@Global() @Global()
@Plugin({ @Plugin({
name: 'redis', name: 'redis',
providers: [ providers: [CacheRedis, SessionRedis, SocketIoRedis],
CacheRedis,
SessionRedis,
ThrottlerRedis,
SocketIoRedis,
MutexRedis,
],
overrides: [ overrides: [
cacheProvider, cacheProvider,
sessionCacheProvider, sessionCacheProvider,

View File

@@ -34,13 +34,6 @@ export class CacheRedis extends Redis {
} }
} }
@Injectable()
export class ThrottlerRedis extends Redis {
constructor(config: Config) {
super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 1 });
}
}
@Injectable() @Injectable()
export class SessionRedis extends Redis { export class SessionRedis extends Redis {
constructor(config: Config) { constructor(config: Config) {
@@ -54,10 +47,3 @@ export class SocketIoRedis extends Redis {
super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 3 }); super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 3 });
} }
} }
@Injectable()
export class MutexRedis extends Redis {
constructor(config: Config) {
super({ ...config.plugins.redis, db: (config.plugins.redis?.db ?? 0) + 4 });
}
}

View File

@@ -1,22 +1,13 @@
import { setTimeout } from 'node:timers/promises';
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import Redis, { Command } from 'ioredis'; import { Command } from 'ioredis';
import { import { ILocker, Lock } from '../../fundamentals';
BucketService, import { SessionRedis } from './instances';
type GraphqlContext,
LockGuard,
MUTEX_RETRY,
MUTEX_WAIT,
MutexService,
} from '../../fundamentals';
const lockScript = `local key = KEYS[1] const lockScript = `local key = KEYS[1]
local clientId = ARGV[1] local clientId = ARGV[1]
local releaseTime = ARGV[2]
if redis.call("get", key) == clientId or redis.call("set", key, clientId, "NX", "PX", releaseTime) then if redis.call("get", key) == clientId or redis.call("set", key, clientId, "NX", "EX", 60) then
return 1 return 1
else else
return 0 return 0
@@ -31,66 +22,31 @@ else
end`; end`;
@Injectable() @Injectable()
export class MutexRedisService extends MutexService { export class RedisMutexLocker implements ILocker {
constructor( private readonly logger = new Logger(RedisMutexLocker.name);
private readonly redis: Redis, constructor(private readonly redis: SessionRedis) {}
context: GraphqlContext,
bucket: BucketService
) {
super(context, bucket);
this.logger = new Logger(MutexRedisService.name);
}
override async lock( async lock(owner: string, key: string): Promise<Lock> {
key: string, const lockKey = `MutexLock:${key}`;
releaseTimeInMS: number = 200 this.logger.debug(`Client ${owner} is trying to lock resource ${key}`);
): Promise<LockGuard | undefined> {
const clientId = this.getId();
this.logger.debug(`Client ${clientId} lock try to lock ${key}`);
const releaseTime = releaseTimeInMS.toString();
const fetchLock = async (retry: number): Promise<LockGuard | undefined> => { const success = await this.redis.sendCommand(
if (retry === 0) { new Command('EVAL', [lockScript, '1', lockKey, owner])
this.logger.error(
`Failed to fetch lock ${key} after ${MUTEX_RETRY} retry`
);
return undefined;
}
try {
const success = await this.redis.sendCommand(
new Command('EVAL', [lockScript, '1', key, clientId, releaseTime])
);
if (success === 1) {
return new LockGuard(this, key);
} else {
this.logger.warn(
`Failed to fetch lock ${key}, retrying in ${MUTEX_WAIT} ms`
);
await setTimeout(MUTEX_WAIT * (MUTEX_RETRY - retry + 1));
return fetchLock(retry - 1);
}
} catch (error: any) {
this.logger.error(
`Unexpected error when fetch lock ${key}: ${error.message}`
);
return undefined;
}
};
return fetchLock(MUTEX_RETRY);
}
override async unlock(key: string, ignoreUnlockFail = false): Promise<void> {
const clientId = this.getId();
const result = await this.redis.sendCommand(
new Command('EVAL', [unlockScript, '1', key, clientId])
); );
if (result === 0) {
if (!ignoreUnlockFail) { if (success === 1) {
throw new Error(`Failed to release lock ${key}`); return new Lock(async () => {
} else { const result = await this.redis.sendCommand(
this.logger.warn(`Failed to release lock ${key}`); new Command('EVAL', [unlockScript, '1', lockKey, owner])
} );
// TODO(@darksky): lock expired condition is not handled
if (result === 0) {
throw new Error(`Failed to release lock ${key}`);
}
});
} }
throw new Error(`Failed to acquire lock for resource [${key}]`);
} }
} }