이전에 분산락을 구현하기 위해 Redisson의 Pub/Sub 방식을 이용한 RLock을 활용했습니다.
redisson은 lock 구현을 위해 hash set 자료구조에 lock을 두고 순차적인 lock 제공을 위해 PubSub 채널을 이용합니다.
redisson은 두 개의 자료구조(키)에 대한 조작을 어떻게 원자성있게 관리할까요?
Lock 인터페이스 사용
// Lock 생성
RLock lock = redissonClient.getLock(couponId.toString());
// Lock 획득
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
// Lock 반납
lock.unlock();
redisson은 getLock()을 통해 Lock 객체를 획득하고, tryLock()을 이용해 락 획득여부를 얻을 수 있으며 unLock()을 이용해 안전하게 락을 반납합니다.
Lock 생성
redissonClient를 이용해 생성한 RLock은 RedissonLock을 통해 생성됩니다.
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = this.getServiceManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
상속받은 RedissonBaseLock에서 name을 이용해 entryName을 결정하고
LockLeaseTime 결정과 pubSub 채널을 구독할 준비를 합니다.
String getChannelName() {
return prefixName("redisson_lock__channel", this.getRawName());
}
lock 채널 생성시 채널명에 "redisson_lock__channel" 을 prefix로 붙이는 내용도 확인할 수 있습니다.
Lock 획득
redisson 인터페이스 내부의 Lock은 어떤 방식으로 작동할까요?
MultyLock, SpinLock이 아닌 일반적인 Lock의 경우 다음의 과정을 거칩니다.
Lock 획득 시도
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
대기시간, 임대시간, 단위, 쓰레드아이디를 이용해 Lock을 획득하려 합니다.
-- KEYS[1]: key
-- ARGV[1]: 임대시간
-- ARGV[2]: 락 이름
if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1))
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
내부에서는 lua script를 이용해 원자적으로 Lock을 생성하거나(nil), 해당 키에 부여된 TTL을 리턴합니다.
Lock 획득 성공
if (ttl == null) {
return true;
}
락 획득 성공시 tryLock() 메서드의 리턴은 true로 락 획득을 성공했다고 응답합니다.
Lock 획득 실패
락 획득 실패시 내부에서는 대기시간 내 Lock 획득을 위한 처리를 진행합니다.
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
}
대기시간이 지났다면 실패처리를 진행하고
boolean currentTime;
try {
time -= System.currentTimeMillis() - current;
if (time > 0L) {
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
boolean var32 = true;
return var32;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var31 = false;
return var31;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
boolean var16 = false;
return var16;
}
this.acquireFailed(waitTime, unit, threadId);
currentTime = false;
} finally {
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
}
대기시간이 남아있다면, 다음과 같이 대기시간이 부족할 때 까지 락 획득을 시도합니다.
((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
위 코드의 경우 subscribeFuture을 이용해 RedissonLockEntry를 획득하고getLatch()로 Pub/Sub 내부 락이 해제되는 이벤트를 받는 대기 객체를 가져옵니다.
이후 tryAcquire(ttl, TimeUnit.MILLISECONDS)로 ttl만큼 락 획득을 대기합니다.
this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
최종적으로 락 획득 성공 또는 실패 이후 Pub/Sub 채널 구독을 해제합니다.
Lock 제거
lock.unlock();
-- KEYS[1]: rawName
-- KEYS[2]: channelName
-- KEYS[3]: unlockLatchName
local val = redis.call('get', KEYS[3]);
if val ~= false
then return tonumber(val);
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0)
then redis.call('pexpire', KEYS[1], ARGV[2]);
redis.call('set', KEYS[3], 0, 'px', ARGV[5]);
return 0;
else redis.call('del', KEYS[1]);
redis.call(ARGV[4], KEYS[2], ARGV[1]);
redis.call('set', KEYS[3], 1, 'px', ARGV[5]);
return 1;
end;
unLock 과정 또한 lua script로 원자성을 보장해 실행됩니다.
락을 소유한 스레드인지 확인하고
락 카운터도 줄이는 과정도 진행합니다.
락 카운터를 줄인 이후에도 카운터가 0보다 크다면 락 해제를 진행하지 않고 TTL만 연장합니다.
락 카운터가 0이라면 락 키 삭제, pub/sub 채널에 락 해제 메세지 전송 및 락 해제상태 저장 작업을 진행합니다.
링크드인을 둘러보다 spinLock 방식과 PubSub 방식의 락 구현 사이에서 과연 PubSub이 성능이 더 좋은게 맞을까? 하는 내용을 보게 되었습니다.
상황에 따라 성능이 다를 수 있지만 내가 사용하는 PubSub방식의 락이 어떤 방식으로 동작하는지 알아볼 수 있었습니다.
'Java & Kotlin > Spring' 카테고리의 다른 글
| Feign에서 multipart/form-data json 조금 더 잘 다루기 (0) | 2024.03.10 |
|---|---|
| ELK 스택을 사용해 Springboot 로그 수집하기 (0) | 2023.11.05 |
| [Spring] ExceptionHandlerLoggedImpl 에러가 나타난다면 (0) | 2022.08.24 |
| [Spring] AOP Joinpoint의 getThis()와 getTarget() (0) | 2022.08.20 |
| [Spring] kotlin spring lazy loading 삽질기 (0) | 2022.08.13 |
| [Spring] kotlin + Spring 시작하기 (0) | 2022.08.10 |