mirror of
https://github.com/toeverything/AFFiNE.git
synced 2026-02-04 08:38:34 +00:00
fix(server): handle expired lock re-release & external locker injection (#6145)
This commit is contained in:
@@ -6,7 +6,7 @@ import { MutexService } from './mutex';
|
||||
@Global()
|
||||
@Module({
|
||||
providers: [MutexService, Locker],
|
||||
exports: [MutexService, Locker],
|
||||
exports: [MutexService],
|
||||
})
|
||||
export class MutexModule {}
|
||||
|
||||
|
||||
@@ -14,11 +14,21 @@ export const MUTEX_WAIT = 100;
|
||||
@Injectable({ scope: Scope.REQUEST })
|
||||
export class MutexService {
|
||||
protected logger = new Logger(MutexService.name);
|
||||
private readonly locker: Locker;
|
||||
|
||||
constructor(
|
||||
@Inject(CONTEXT) private readonly context: GraphqlContext,
|
||||
private readonly ref: ModuleRef
|
||||
) {}
|
||||
) {
|
||||
// nestjs will always find and injecting the locker from local module
|
||||
// so the RedisLocker implemented by the plugin mechanism will not be able to overwrite the internal locker
|
||||
// we need to use find and get the locker from the `ModuleRef` manually
|
||||
//
|
||||
// NOTE: when a `constructor` execute in normal service, the Locker module we expect may not have been initialized
|
||||
// but in the Service with `Scope.REQUEST`, we will create a separate Service instance for each request
|
||||
// at this time, all modules have been initialized, so we able to get the correct Locker instance in `constructor`
|
||||
this.locker = this.ref.get(Locker, { strict: false });
|
||||
}
|
||||
|
||||
protected getId() {
|
||||
let id = this.context.req.headers['x-transaction-id'] as string;
|
||||
@@ -55,10 +65,7 @@ export class MutexService {
|
||||
async lock(key: string) {
|
||||
try {
|
||||
return await retryable(
|
||||
() => {
|
||||
const locker = this.ref.get(Locker, { strict: false });
|
||||
return locker.lock(this.getId(), key);
|
||||
},
|
||||
() => this.locker.lock(this.getId(), key),
|
||||
MUTEX_RETRY,
|
||||
MUTEX_WAIT
|
||||
);
|
||||
|
||||
@@ -4,19 +4,33 @@ import { Command } from 'ioredis';
|
||||
import { ILocker, Lock } from '../../fundamentals';
|
||||
import { SessionRedis } from './instances';
|
||||
|
||||
// === atomic mutex lock ===
|
||||
// acquire lock
|
||||
// return 1 if lock is acquired
|
||||
// return 0 if lock is not acquired
|
||||
const lockScript = `local key = KEYS[1]
|
||||
local clientId = ARGV[1]
|
||||
local owner = ARGV[1]
|
||||
|
||||
if redis.call("get", key) == clientId or redis.call("set", key, clientId, "NX", "EX", 60) then
|
||||
-- if lock is not exists or lock is owned by the owner
|
||||
-- then set lock to the owner and return 1, otherwise return 0
|
||||
-- if the lock is not released correctly due to unexpected reasons
|
||||
-- lock will be released after 60 seconds
|
||||
if redis.call("get", key) == owner or redis.call("set", key, owner, "NX", "EX", 60) then
|
||||
return 1
|
||||
else
|
||||
return 0
|
||||
end`;
|
||||
// release lock
|
||||
// return 1 if lock is released or lock is not exists
|
||||
// return 0 if lock is not owned by the owner
|
||||
const unlockScript = `local key = KEYS[1]
|
||||
local clientId = ARGV[1]
|
||||
local owner = ARGV[1]
|
||||
|
||||
if redis.call("get", key) == clientId then
|
||||
local value = redis.call("get", key)
|
||||
if value == owner then
|
||||
return redis.call("del", key)
|
||||
elseif value == nil then
|
||||
return 1
|
||||
else
|
||||
return 0
|
||||
end`;
|
||||
@@ -40,7 +54,6 @@ export class RedisMutexLocker implements ILocker {
|
||||
new Command('EVAL', [unlockScript, '1', lockKey, owner])
|
||||
);
|
||||
|
||||
// TODO(@darksky): lock expired condition is not handled
|
||||
if (result === 0) {
|
||||
throw new Error(`Failed to release lock ${key}`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user