Distributed Locking - Apache Ignite
Overview
Spring boot application with distributed locking using apache ignite
Github: https://github.com/gitorko/project04
Apache Ignite
Apache Ignite is a distributed database. It supports distributed locking mechanism.
Code
1package com.demo.project04.config;
2
3import java.util.ArrayList;
4import java.util.Collections;
5import java.util.List;
6
7import org.apache.ignite.Ignite;
8import org.apache.ignite.Ignition;
9import org.apache.ignite.cache.CacheAtomicityMode;
10import org.apache.ignite.configuration.CacheConfiguration;
11import org.apache.ignite.configuration.DataPageEvictionMode;
12import org.apache.ignite.configuration.DataRegionConfiguration;
13import org.apache.ignite.configuration.DataStorageConfiguration;
14import org.apache.ignite.configuration.IgniteConfiguration;
15import org.apache.ignite.kubernetes.configuration.KubernetesConnectionConfiguration;
16import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
17import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
18import org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes.TcpDiscoveryKubernetesIpFinder;
19import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
20import org.springframework.beans.factory.annotation.Value;
21import org.springframework.context.annotation.Bean;
22import org.springframework.context.annotation.Configuration;
23
24@Configuration
25public class IgniteConfig {
26
27 /**
28 * Override the node name for each instance at start using properties
29 */
30 @Value("${ignite.nodeName:node0}")
31 private String nodeName;
32
33 @Value("${ignite.kubernetes.enabled:false}")
34 private Boolean k8sEnabled;
35
36 private String k8sApiServer = "https://kubernetes.docker.internal:6443";
37 private String k8sServiceName = "project04";
38 private String k8sNameSpace = "default";
39
40 @Bean(name = "igniteInstance")
41 public Ignite igniteInstance() {
42 Ignite ignite = Ignition.start(igniteConfiguration());
43 return ignite;
44 }
45
46 @Bean(name = "igniteConfiguration")
47 public IgniteConfiguration igniteConfiguration() {
48 IgniteConfiguration cfg = new IgniteConfiguration();
49 /**
50 * Uniquely identify node in a cluster use consistent Id.
51 */
52 cfg.setConsistentId(nodeName);
53
54 cfg.setIgniteInstanceName("my-ignite-instance");
55 cfg.setPeerClassLoadingEnabled(true);
56 cfg.setLocalHost("127.0.0.1");
57 cfg.setMetricsLogFrequency(0);
58
59 cfg.setCommunicationSpi(tcpCommunicationSpi());
60 if (k8sEnabled) {
61 cfg.setDiscoverySpi(tcpDiscoverySpiKubernetes());
62 } else {
63 cfg.setDiscoverySpi(tcpDiscovery());
64 }
65 cfg.setDataStorageConfiguration(dataStorageConfiguration());
66 cfg.setCacheConfiguration(cacheConfiguration());
67 return cfg;
68 }
69
70 @Bean(name = "cacheConfiguration")
71 public CacheConfiguration[] cacheConfiguration() {
72 List<CacheConfiguration> cacheConfigurations = new ArrayList<>();
73 cacheConfigurations.add(getLockCacheConfig());
74 return cacheConfigurations.toArray(new CacheConfiguration[cacheConfigurations.size()]);
75 }
76
77 private CacheConfiguration getLockCacheConfig() {
78 /**
79 * Country cache to store key value pair
80 */
81 CacheConfiguration cacheConfig = new CacheConfiguration("lock-cache");
82 /**
83 * This cache will be stored in non-persistent data region
84 */
85 cacheConfig.setDataRegionName("my-data-region");
86 /**
87 * Needs to be transactional for getting distributed lock
88 */
89 cacheConfig.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
90 return cacheConfig;
91 }
92
93 /**
94 * Nodes discover each other over this port
95 */
96 private TcpDiscoverySpi tcpDiscovery() {
97 TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
98 TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
99 ipFinder.setAddresses(Collections.singletonList("127.0.0.1:47500..47509"));
100 tcpDiscoverySpi.setIpFinder(ipFinder);
101 tcpDiscoverySpi.setLocalPort(47500);
102 // Changing local port range. This is an optional action.
103 tcpDiscoverySpi.setLocalPortRange(9);
104 //tcpDiscoverySpi.setLocalAddress("localhost");
105 return tcpDiscoverySpi;
106 }
107
108 private TcpDiscoverySpi tcpDiscoverySpiKubernetes() {
109 TcpDiscoverySpi spi = new TcpDiscoverySpi();
110 KubernetesConnectionConfiguration kcfg = new KubernetesConnectionConfiguration();
111 kcfg.setNamespace(k8sNameSpace);
112 kcfg.setMasterUrl(k8sApiServer);
113 TcpDiscoveryKubernetesIpFinder ipFinder = new TcpDiscoveryKubernetesIpFinder(kcfg);
114 ipFinder.setServiceName(k8sServiceName);
115 spi.setIpFinder(ipFinder);
116 return spi;
117 }
118
119 /**
120 * Nodes communicate with each other over this port
121 */
122 private TcpCommunicationSpi tcpCommunicationSpi() {
123 TcpCommunicationSpi communicationSpi = new TcpCommunicationSpi();
124 communicationSpi.setMessageQueueLimit(1024);
125 communicationSpi.setLocalAddress("localhost");
126 communicationSpi.setLocalPort(48100);
127 communicationSpi.setSlowClientQueueLimit(1000);
128 return communicationSpi;
129 }
130
131 private DataStorageConfiguration dataStorageConfiguration() {
132 DataStorageConfiguration dsc = new DataStorageConfiguration();
133 DataRegionConfiguration defaultRegionCfg = new DataRegionConfiguration();
134 DataRegionConfiguration regionCfg = new DataRegionConfiguration();
135
136 defaultRegionCfg.setName("default-data-region");
137 defaultRegionCfg.setInitialSize(10 * 1024 * 1024); //10MB
138 defaultRegionCfg.setMaxSize(50 * 1024 * 1024); //50MB
139 defaultRegionCfg.setPersistenceEnabled(false);
140 defaultRegionCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_LRU);
141
142 regionCfg.setName("my-data-region");
143 regionCfg.setInitialSize(10 * 1024 * 1024); //10MB
144 regionCfg.setMaxSize(50 * 1024 * 1024); //50MB
145 regionCfg.setPersistenceEnabled(false);
146
147 dsc.setDefaultDataRegionConfiguration(defaultRegionCfg);
148 dsc.setDataRegionConfigurations(regionCfg);
149
150 return dsc;
151 }
152
153}
1package com.demo.project04.service;
2
3import java.time.Instant;
4import java.util.concurrent.TimeUnit;
5import java.util.concurrent.locks.Lock;
6
7import jakarta.annotation.PostConstruct;
8import lombok.RequiredArgsConstructor;
9import lombok.SneakyThrows;
10import lombok.extern.slf4j.Slf4j;
11import org.apache.ignite.Ignite;
12import org.apache.ignite.IgniteCache;
13import org.springframework.beans.factory.annotation.Value;
14import org.springframework.stereotype.Service;
15
16/**
17 * Interact with Ignite as key-value store (non-persistent store)
18 */
19@Service
20@RequiredArgsConstructor
21@Slf4j
22public class LockService {
23
24 final Ignite ignite;
25 IgniteCache<String, String> cache;
26
27 @Value("${ignite.nodeName:node0}")
28 private String nodeName;
29
30 @PostConstruct
31 public void postInit() {
32 cache = ignite.cache("lock-cache");
33 }
34
35 public String runJob(Integer seconds) {
36 log.info("Acquiring Lock by {}", nodeName);
37 Lock myLock = cache.lock("lock-01");
38 try {
39 myLock.lock();
40 return executeTask(seconds);
41 } finally {
42 myLock.unlock();
43 }
44 }
45
46 @SneakyThrows
47 private String executeTask(Integer seconds) {
48 log.info("Starting job by {}", nodeName);
49 log.info("Sleeping for {} secs", seconds);
50 TimeUnit.SECONDS.sleep(seconds);
51 log.info("Finished job by {}", nodeName);
52 return "Job completed by " + nodeName + " @ " + Instant.now();
53 }
54
55}
1apiVersion: apps/v1
2kind: StatefulSet
3metadata:
4 name: project04
5spec:
6 selector:
7 matchLabels:
8 app: project04
9 serviceName: "project04"
10 replicas: 1
11 template:
12 metadata:
13 labels:
14 app: project04
15 spec:
16 containers:
17 - name: project04
18 image: project04:1.0.0
19 imagePullPolicy: IfNotPresent
20 env:
21 - name: ignite.nodeName
22 valueFrom:
23 fieldRef:
24 fieldPath: metadata.name
25 ports:
26 - containerPort: 47100 # communication SPI port
27 - containerPort: 47500 # discovery SPI port
28 - containerPort: 49112 # dafault JMX port
29 - containerPort: 10800 # thin clients/JDBC driver port
30 - containerPort: 8080 # REST API
31 volumeMounts:
32 - mountPath: /ignite/data
33 name: ignite
34 resources:
35 limits:
36 cpu: "1"
37 memory: "500Mi"
38 volumes:
39 - name: ignite
40 persistentVolumeClaim:
41 claimName: ignite-pv-claim
42---
43apiVersion: v1
44kind: PersistentVolume
45metadata:
46 name: ignite-pv-volume
47 labels:
48 type: local
49 app: project04
50spec:
51 storageClassName: manual
52 capacity:
53 storage: 5Gi
54 accessModes:
55 - ReadWriteMany
56 hostPath:
57 path: "/tmp/data"
58---
59apiVersion: v1
60kind: PersistentVolumeClaim
61metadata:
62 name: ignite-pv-claim
63 labels:
64 app: project04
65spec:
66 storageClassName: manual
67 accessModes:
68 - ReadWriteMany
69 resources:
70 requests:
71 storage: 5Gi
72---
73kind: Service
74apiVersion: v1
75metadata:
76 name: project04
77spec:
78 ports:
79 - port: 8080
80 targetPort: 8080
81 name: http
82 selector:
83 app: project04
84 type: LoadBalancer
Postman
Import the postman collection to postman
Setup
1# Project 04
2
3Distributed Locking - Apache Ignite
4
5[https://gitorko.github.io/distributed-locking-apache-ignite/](https://gitorko.github.io/distributed-locking-apache-ignite/)
6
7### Version
8
9Check version
10
11```bash
12$java --version
13openjdk 21.0.3 2024-04-16 LTS
14```
15
16
17### Dev
18
19To run the code.
20
21```bash
22./gradlew clean build
23./gradlew bootRun
24./gradlew bootJar
25```
26
27To run many node instances
28
29```bash
30cd build/libs
31java -jar project04-1.0.0.jar --server.port=8081 --ignite.nodeName=node1
32java -jar project04-1.0.0.jar --server.port=8082 --ignite.nodeName=node2
33java -jar project04-1.0.0.jar --server.port=8083 --ignite.nodeName=node3
34
35```
36
37JVM tuning parameters
38
39```bash
40java -jar -Xms1024m -Xmx2048m -XX:MaxDirectMemorySize=256m -XX:+DisableExplicitGC -XX:+UseG1GC -XX:+ScavengeBeforeFullGC -XX:+AlwaysPreTouch project04-1.0.0.jar --server.port=8080 --ignite.nodeName=node0
41```
42
43
44Create a service account
45
46```bash
47kubectl apply -f - <<EOF
48apiVersion: v1
49kind: Secret
50metadata:
51 name: default-secret
52 annotations:
53 kubernetes.io/service-account.name: default
54type: kubernetes.io/service-account-token
55EOF
56```
57
58Edit the service account and update the last 2 lines
59
60```bash
61kubectl edit serviceaccounts default
62
63apiVersion: v1
64kind: ServiceAccount
65metadata:
66 creationTimestamp: "XXXX-XX-XXTXX:XX:XXZ"
67 name: default
68 namespace: default
69 resourceVersion: "XXXX"
70 uid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
71secrets:
72 - name: default-secret
73```
74Check if token is created
75
76```bash
77kubectl describe secret default
78```
79
80Provide admin role to the service account
81
82```bash
83kubectl apply -f - <<EOF
84apiVersion: rbac.authorization.k8s.io/v1
85kind: ClusterRoleBinding
86metadata:
87 name: admin-user
88roleRef:
89 apiGroup: rbac.authorization.k8s.io
90 kind: ClusterRole
91 name: cluster-admin
92subjects:
93- kind: ServiceAccount
94 name: default
95 namespace: default
96EOF
97```
98
99Build the docker image
100
101```bash
102docker build -f docker/Dockerfile --force-rm -t project04:1.0.0 .
103```
104
105Deploy to k8s
106
107```bash
108mkdir /tmp/data
109kubectl apply -f docker/deployment.yaml
110kubectl get pods -w
111
112kubectl config set-context --current --namespace=default
113kubectl get deployments
114kubectl scale statefulset project04 --replicas=3
115kubectl scale deployment project04 --replicas=1
116```
117
118Clean up
119
120```bash
121kubectl delete -f docker/deployment.yaml
122```
References
comments powered by Disqus