Spring - JobRunr
Overview
Spring Boot 3 integration with JobRunr
Github: https://github.com/gitorko/project59
JobRunr
JobRunr is a distributed job scheduler. If a service runs on many nodes the JobRunr ensure that a scheduled job is run only on a single instance.
If you run a spring @Scheduled
annotation then all instances will start the same job, you can use shedlock library to prevent this but this requires extra code.
Types of Job
- Fire-Forget
- Delayed
- Recurring Job
Advantages
- It lets you schedule background jobs using lambda.
- The jobs can run on a distributed nodes, more node that join, the work gets distributed.
- It serializes the lambda as JSON and stores it in db.
- It also contains an automatic retry feature with an exponential back-off policy for failed jobs.
- There is also a built-in dashboard that allows you to monitor all jobs.
- It is self-maintaining, Successful jobs are automatically deleted after a configurable amount of time, so there is no need to perform manual storage cleanup.
- The job details are stored in db.
Code
1package com.demo.project59.controller;
2
3import java.time.LocalDateTime;
4import java.time.format.DateTimeFormatter;
5import java.util.concurrent.atomic.AtomicInteger;
6
7import com.demo.project59.service.AppService;
8import lombok.RequiredArgsConstructor;
9import lombok.extern.slf4j.Slf4j;
10import org.jobrunr.jobs.context.JobContext;
11import org.jobrunr.scheduling.JobScheduler;
12import org.springframework.http.ResponseEntity;
13import org.springframework.web.bind.annotation.GetMapping;
14import org.springframework.web.bind.annotation.PathVariable;
15import org.springframework.web.bind.annotation.RestController;
16
17@RestController
18@Slf4j
19@RequiredArgsConstructor
20public class HomeController {
21
22 final AppService appService;
23 final JobScheduler jobScheduler;
24 DateTimeFormatter dtFormat = DateTimeFormatter.ofPattern("ddMMyyyyHHmmss");
25
26 @GetMapping("/invoke-job")
27 public ResponseEntity invokeJob() {
28 String jobTag = "job_" + dtFormat.format(LocalDateTime.now());
29 //These will enqueue jobs, will run on a different thread
30 jobScheduler.enqueue(() -> {
31 appService.doJobWithTag(jobTag);
32 });
33 jobScheduler.enqueue(() -> {
34 appService.doFireAndForgetWork();
35 });
36 jobScheduler.enqueue(() -> {
37 appService.doDelayedWorkWithProgressBar(JobContext.Null);
38 });
39 log.info("Job enqueued!");
40 return ResponseEntity.ok().build();
41 }
42
43 @GetMapping("/direct-call")
44 public ResponseEntity diretCall() {
45 appService.directCall();
46 log.info("Job submitted!");
47 return ResponseEntity.ok().build();
48 }
49
50 @GetMapping("/retry-job")
51 public ResponseEntity retryJob() {
52 jobScheduler.enqueue(() -> {
53 appService.doJobRetry();
54 });
55 log.info("Retry Job submitted!");
56 return ResponseEntity.ok().build();
57 }
58
59 /**
60 * When there are pool of workers they will pick the jobs from queue and complete them.
61 * This distributes the work across many nodes.
62 */
63 @GetMapping("/many-job/{count}")
64 public ResponseEntity submitManyJobs(@PathVariable Integer count) {
65 AtomicInteger counter = new AtomicInteger();
66 for (int i = 0; i < count; i++) {
67 String jobTag = "job_" + counter.getAndIncrement();
68 jobScheduler.enqueue(() -> {
69 appService.doJobWithTag(jobTag);
70 });
71 }
72 log.info("Many Jobs submitted!");
73 return ResponseEntity.ok().build();
74 }
75
76}
77
1package com.demo.project59.service;
2
3import java.time.Instant;
4import java.util.concurrent.TimeUnit;
5
6import com.demo.project59.domain.Customer;
7import com.demo.project59.repository.CustomerRepository;
8import lombok.RequiredArgsConstructor;
9import lombok.SneakyThrows;
10import lombok.extern.slf4j.Slf4j;
11import org.jobrunr.jobs.annotations.Job;
12import org.jobrunr.jobs.annotations.Recurring;
13import org.jobrunr.jobs.context.JobContext;
14import org.jobrunr.jobs.context.JobDashboardProgressBar;
15import org.jobrunr.scheduling.JobScheduler;
16import org.jobrunr.scheduling.cron.Cron;
17import org.springframework.stereotype.Service;
18
19@Service
20@Slf4j
21@RequiredArgsConstructor
22public class AppService {
23
24 final CustomerRepository customerRepository;
25 final JobScheduler jobScheduler;
26
27 @SneakyThrows
28 public void doDelayedWork() {
29 log.info("Running doWork");
30 TimeUnit.SECONDS.sleep(1);
31 log.info("Completed doWork");
32 }
33
34 @Job(name = "doJob [jobTag: %0]")
35 @SneakyThrows
36 public void doJobWithTag(String jobTag) {
37 log.info("Running doJob");
38 TimeUnit.MINUTES.sleep(1);
39 log.info("Completed doJob");
40 }
41
42 /**
43 * Once the job fails it can be picked for next retry by any other server in the pool.
44 * So don't use AtomicInteger to track count
45 */
46 @Job(name = "doJobRetry", retries = 3)
47 @SneakyThrows
48 public void doJobRetry() {
49 log.info("Running doJobRetry");
50 Customer customer = customerRepository.findById(200l).orElseThrow();
51 customer.setInvokeCount(customer.getInvokeCount() + 1);
52 customerRepository.save(customer);
53 log.info("Updated customer invoke count!");
54 if (customer.getInvokeCount() < 3) {
55 throw new RuntimeException("Will not work first 2 times");
56 }
57 log.info("Completed doJobRetry");
58 }
59
60 @Job(name = "doFireAndForgetWork")
61 @SneakyThrows
62 public void doFireAndForgetWork() {
63 log.info("Running fireForget");
64 TimeUnit.MINUTES.sleep(1);
65 log.info("Completed fireForget");
66 }
67
68 @Job(name = "hourlyJob")
69 @Recurring(id = "hourlyJob", cron = "0 * * * *", zoneId = "Asia/Kolkata")
70 @SneakyThrows
71 public void doHourlyWork() {
72 log.info("Running hourlyJob");
73 TimeUnit.SECONDS.sleep(5);
74 log.info("Completed hourlyJob");
75 }
76
77 @Job(name = "doDelayedWorkWithProgressBar")
78 @SneakyThrows
79 public void doDelayedWorkWithProgressBar(JobContext jobContext) {
80 JobDashboardProgressBar bar = jobContext.progressBar(100);
81 for (int i = 0; i < 10; i++) {
82 log.info("Running longJob");
83 //progress by 10% each time
84 bar.setProgress(10 * i);
85 TimeUnit.SECONDS.sleep(5);
86 log.info("Completed fireForget");
87 }
88 }
89
90 public void directCall() {
91 //Recurring
92 jobScheduler.scheduleRecurrently(Cron.hourly(), () -> {
93 doDelayedWork();
94 });
95
96 //Fire Forget
97 jobScheduler.enqueue(() -> {
98 doDelayedWork();
99 });
100
101 //Delay job, scheduled to run in future
102 jobScheduler.schedule(Instant.now().plusSeconds(30), () -> {
103 doDelayedWork();
104 });
105 }
106}
1spring:
2 main:
3 banner-mode: "off"
4 datasource:
5 driver-class-name: org.postgresql.Driver
6 url: jdbc:postgresql://localhost:5432/test-db
7 username: test
8 password: test@123
9 jpa:
10 show-sql: false
11 hibernate.ddl-auto: create
12 properties.hibernate.temp.use_jdbc_metadata_defaults: false
13 database-platform: org.hibernate.dialect.PostgreSQLDialect
14 defer-datasource-initialization: true
15 sql:
16 init:
17 mode: always
18org:
19 jobrunr:
20 job-scheduler:
21 enabled: true
22 dashboard:
23 enabled: true
24 port: 8085
25 background-job-server:
26 enabled: true
27 poll-interval-in-seconds: 5 #check for new work every 5 seconds
28 worker-count: 10 #this value normally is defined by the amount of CPU's that are available
29 delete-succeeded-jobs-after: 36 #succeeded jobs will go to the deleted state after 36 hours
30 permanently-delete-deleted-jobs-after: 72 #deleted jobs will be deleted permanently after 72 hours
31 database:
32 table-prefix: "project59_" # allows to set a table prefix
33 jobs:
34 default-number-of-retries: 10 #the default number of retries for a failing job
35 retry-back-off-time-seed: 3 #the default time seed for the exponential back-off policy.
36 metrics:
37 enabled: true #Micrometer integration
Open dashboard: http://localhost:8000/dashboard/
Setup
1# Project 59
2
3Spring Boot JobRunr
4
5[https://gitorko.github.io/spring-jobrunr/](https://gitorko.github.io/spring-jobrunr/)
6
7### Version
8
9Check version
10
11```bash
12$java --version
13openjdk version "21.0.3" 2024-04-16 LTS
14```
15
16### Postgres DB
17
18```
19docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:14
20docker ps
21docker exec -it pg-container psql -U postgres -W postgres
22CREATE USER test WITH PASSWORD 'test@123';
23CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
24grant all PRIVILEGES ON DATABASE "test-db" to test;
25
26docker stop pg-container
27docker start pg-container
28```
29
30### Dev
31
32To run the code
33
34```bash
35./gradlew clean build
36./gradlew bootRun
37```
38
39Dashboard:
40
41[http://localhost:8085/dashboard](http://localhost:8085/dashboard)
42
43```bash
44curl --location --request GET 'http://localhost:8080/invoke-job'
45curl --location --request GET 'http://localhost:8080/direct-call'
46curl --location --request GET 'http://localhost:8080/retry-job'
47
48```
49
50Create a pool of 3 servers
51
52```bash
53java -jar build/libs/project59-1.0.0.jar --server.port=8081 --org.jobrunr.dashboard.enabled=true
54java -jar build/libs/project59-1.0.0.jar --server.port=8082 --org.jobrunr.dashboard.enabled=false
55java -jar build/libs/project59-1.0.0.jar --server.port=8083 --org.jobrunr.dashboard.enabled=false
56```
57
58
59Submit 500 jobs that will be processed by the 3 servers
60
61```bash
62curl --location --request GET 'http://localhost:8081/many-job/500'
63```
References
https://www.jobrunr.io/en/ https://www.jobrunr.io/en/documentation/configuration/spring/
comments powered by Disqus