Distributed Locking - Postgres
Overview
Spring boot application with distributed locking using postgres
Github: https://github.com/gitorko/project05
Distributed Locking
When many service instances are running and each needs to acquire a lock before executing a critical section of code, they contend for that lock.
- Use optimistic locking to prevent two threads from acquiring the same lock.
- Use UNIQUE constraint to ensure same lock is present only once in the db.
- Even if a server crashes, its locks should be released automatically; they must not be held forever or require manual cleanup.
- Use virtual threads to clean up locks after the lock duration has elapsed.
- Only the node/server that acquired the lock can release the lock.
Postgres is not a distributed database. Here it is the services that are distributed; they coordinate through a lock that Postgres provides.
Code
1package com.demo.project05.service;
2
3import java.net.InetAddress;
4import java.time.LocalDateTime;
5import java.util.Optional;
6
7import com.demo.project05.domain.DistributedLock;
8import com.demo.project05.repository.DistributedLockRepository;
9import lombok.RequiredArgsConstructor;
10import lombok.SneakyThrows;
11import lombok.extern.slf4j.Slf4j;
12import org.springframework.stereotype.Service;
13import org.springframework.transaction.annotation.Propagation;
14import org.springframework.transaction.annotation.Transactional;
15
16/**
17 * Since @Transaction needs public scope we dont want to expose this class to other classes, hence it's an internal class.
18 */
19@Service
20@RequiredArgsConstructor
21@Slf4j
22public class InternalLockService {
23
24 final DistributedLockRepository repository;
25 final String lockedBy = getHostIdentifier();
26
27 @Transactional(propagation = Propagation.REQUIRES_NEW)
28 public boolean tryLock(String lockName, int expirySeconds) {
29 LocalDateTime lockUntil = LocalDateTime.now().plusSeconds(expirySeconds);
30 LocalDateTime now = LocalDateTime.now();
31 Optional<DistributedLock> lockOptional = repository.findUnlocked(lockName, now);
32 if (lockOptional.isPresent()) {
33 DistributedLock lock = lockOptional.get();
34 lock.setLockUntil(lockUntil);
35 lock.setLockAt(now);
36 lock.setLockBy(lockedBy);
37 repository.save(lock);
38 } else {
39 DistributedLock newLock = new DistributedLock();
40 newLock.setLockName(lockName);
41 newLock.setLockUntil(lockUntil);
42 newLock.setLockAt(now);
43 newLock.setLockBy(lockedBy);
44 repository.save(newLock);
45 }
46 return true;
47 }
48
49 @Transactional(propagation = Propagation.REQUIRES_NEW)
50 public boolean unlock(String lockName) {
51 Optional<DistributedLock> lockOptional = repository.findByLockName(lockName);
52 if (lockOptional.isPresent()) {
53 DistributedLock lock = lockOptional.get();
54 //Only the node that locked will be able to unlock
55 if (lockedBy.equals(lock.getLockBy())) {
56 lock.setLockUntil(null);
57 lock.setLockAt(null);
58 lock.setLockBy(null);
59 repository.save(lock);
60 }
61 return true;
62 }
63 return false;
64 }
65
66 @SneakyThrows
67 private String getHostIdentifier() {
68 // Get unique identifier for this instance, e.g., hostname or UUID
69 return InetAddress.getLocalHost().getHostName();
70 }
71}
1package com.demo.project05.service;
2
3import java.util.concurrent.TimeUnit;
4
5import lombok.RequiredArgsConstructor;
6import lombok.extern.slf4j.Slf4j;
7import org.springframework.dao.DataIntegrityViolationException;
8import org.springframework.orm.ObjectOptimisticLockingFailureException;
9import org.springframework.stereotype.Service;
10
11@Service
12@Slf4j
13@RequiredArgsConstructor
14public class DistributedLockService {
15
16 final InternalLockService internalLockService;
17
18 public boolean acquireLock(String lockName, int expirySeconds) {
19 log.info("Attempting to acquire lock: {}", lockName);
20 try {
21 boolean lockStatus = internalLockService.tryLock(lockName, expirySeconds);
22 scheduleUnlock(lockName, expirySeconds);
23 return lockStatus;
24 } catch (ObjectOptimisticLockingFailureException ex) {
25 log.error("Unable to acquire lock due to concurrent request");
26 return false;
27 } catch (DataIntegrityViolationException ex) {
28 log.error("Lock already exists");
29 return false;
30 } catch (Exception ex) {
31 log.error("Failed to acquire lock");
32 return false;
33 }
34 }
35
36 public boolean releaseLock(String lockName) {
37 try {
38 log.info("Attempting to release lock: {}", lockName);
39 return internalLockService.unlock(lockName);
40 } catch (Exception ex) {
41 log.error("Failed to release lock");
42 return false;
43 }
44 }
45
46 /**
47 * Auto cleanup job.
48 * Code will work even if this fails. But this will set the lock to null to make it more clear that it is unused.
49 * Even if server dies the lock will be released based on lock_until time
50 */
51 private void scheduleUnlock(String lockName, int expirySeconds) {
52 Thread.startVirtualThread(() -> {
53 try {
54 TimeUnit.SECONDS.sleep(expirySeconds);
55 releaseLock(lockName);
56 } catch (InterruptedException e) {
57 Thread.currentThread().interrupt();
58 }
59 });
60 }
61}
1package com.demo.project05.controller;
2
3import com.demo.project05.service.DistributedLockService;
4import lombok.RequiredArgsConstructor;
5import lombok.extern.slf4j.Slf4j;
6import org.springframework.web.bind.annotation.GetMapping;
7import org.springframework.web.bind.annotation.RequestParam;
8import org.springframework.web.bind.annotation.RestController;
9
10@RestController
11@RequiredArgsConstructor
12@Slf4j
13public class LockController {
14
15 final DistributedLockService distributedLockService;
16
17 @GetMapping("/try-lock")
18 public String tryLock(@RequestParam String lockName, @RequestParam int expirySeconds) {
19 boolean lockAcquired = distributedLockService.acquireLock(lockName, expirySeconds);
20 return lockAcquired ? "Lock acquired!" : "Failed to acquire lock!";
21 }
22
23 @GetMapping("/unlock")
24 public String unlock(@RequestParam String lockName) {
25 distributedLockService.releaseLock(lockName);
26 return "Lock released!";
27 }
28}
-- One row per named lock; the application updates the row in place when a lock is
-- acquired or released.
CREATE TABLE distributed_lock
(
    lock_id      BIGSERIAL,     -- surrogate key
    lock_name    VARCHAR(255),  -- name the application locks on
    lock_until   TIMESTAMP,     -- acquisition time plus the requested expiry; NULL once released
    lock_at      TIMESTAMP,     -- time the lock was acquired; NULL once released
    lock_by      VARCHAR(255),  -- host name of the node holding the lock; NULL once released
    lock_version BIGINT,        -- presumably the optimistic-locking version column; confirm against the entity
    PRIMARY KEY (lock_id),
    UNIQUE (lock_name)          -- ensures a given lock name is present only once
);
Postman
Import the Postman collection into Postman.
Setup
1# Project 05
2
3Distributed Locking - Postgres
4
5[https://gitorko.github.io/post/distributed-locking-postgres](https://gitorko.github.io/post/distributed-locking-postgres)
6
7### Version
8
9Check version
10
11```bash
12$java --version
13openjdk 21.0.3 2024-04-16 LTS
14```
15
16### Postgres DB
17
18```bash
19docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:14
20docker ps
21docker exec -it pg-container psql -U postgres -W postgres
22CREATE USER test WITH PASSWORD 'test@123';
23CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
24grant all PRIVILEGES ON DATABASE "test-db" to test;
25
26docker stop pg-container
27docker start pg-container
28```
29
30### Dev
31
32To run the backend in dev mode.
33
34```bash
35./gradlew clean build
36./gradlew bootRun
37
38```
39
References
comments powered by Disqus