Spring - JobRunr

Overview

Spring Boot 3 integration with JobRunr

GitHub: https://github.com/gitorko/project59

JobRunr

JobRunr is a distributed job scheduler. If a service runs on many nodes, JobRunr ensures that a scheduled job runs on only one instance. With Spring's @Scheduled annotation, every instance starts the same job; you can use the ShedLock library to prevent this, but it requires extra code.
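To make the single-instance guarantee concrete, here is a toy model in plain Java. It is not JobRunr's implementation: the class and method names are hypothetical, and an in-memory map stands in for the shared database that JobRunr coordinates through. Several simulated nodes race to claim the same job, and only the first claim succeeds.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model only: the map stands in for the shared job store (a database in JobRunr's case).
class JobClaimSketch {

    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    /** Returns true only for the first node that claims the given job id. */
    boolean tryClaim(String jobId, String nodeId) {
        return owners.putIfAbsent(jobId, nodeId) == null;
    }

    /** Counts how many of the given nodes manage to claim the job. */
    static long countWinners(String jobId, List<String> nodeIds) {
        JobClaimSketch store = new JobClaimSketch();
        return nodeIds.parallelStream()
                .filter(nodeId -> store.tryClaim(jobId, nodeId))
                .count();
    }

    public static void main(String[] args) {
        long winners = countWinners("hourlyJob", List.of("node-1", "node-2", "node-3"));
        System.out.println("Nodes that run the job: " + winners); // prints 1
    }
}
```

With a plain @Scheduled method there is no shared claim step, so each of the three nodes would run the job.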

Types of Job

  1. Fire-and-forget
  2. Delayed
  3. Recurring

Advantages

  1. It lets you schedule background jobs using a lambda.
  2. Jobs can run on distributed nodes; as more nodes join, the work is spread across them.
  3. It serializes the lambda as JSON and stores it in the database.
  4. It has an automatic retry feature with an exponential back-off policy for failed jobs.
  5. It has a built-in dashboard that lets you monitor all jobs.
  6. It is self-maintaining: successful jobs are automatically deleted after a configurable amount of time, so there is no need for manual storage cleanup.
  7. The job details are stored in the database.
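The exponential back-off in point 4 can be sketched in a few lines of plain Java. This is an illustration, not JobRunr code: it assumes the delay before retry n is seed^n seconds, which is how the default policy is commonly described, so verify the exact formula against the JobRunr documentation. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: assumes delay = seed^attempt seconds; check the JobRunr docs for the exact policy.
class BackoffSketch {

    /** Delay in seconds before the given retry attempt (1-based). */
    static long delaySeconds(int seed, int attempt) {
        return (long) Math.pow(seed, attempt);
    }

    /** Delays for retries 1..maxRetries. */
    static List<Long> schedule(int seed, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            delays.add(delaySeconds(seed, attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Seed 3 with three retries: the waits grow quickly.
        System.out.println(schedule(3, 3)); // prints [3, 9, 27]
    }
}
```

Under this assumption, a small seed keeps early retries fast while later retries back off far enough to ride out a longer outage.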

Code

package com.demo.project59.controller;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicInteger;

import com.demo.project59.service.AppService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jobrunr.jobs.context.JobContext;
import org.jobrunr.scheduling.JobScheduler;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
@RequiredArgsConstructor
public class HomeController {

    final AppService appService;
    final JobScheduler jobScheduler;
    DateTimeFormatter dtFormat = DateTimeFormatter.ofPattern("ddMMyyyyHHmmss");

    @GetMapping("/invoke-job")
    public ResponseEntity<Void> invokeJob() {
        String jobTag = "job_" + dtFormat.format(LocalDateTime.now());
        // Each call enqueues a job that runs later on a background worker thread
        jobScheduler.enqueue(() -> {
            appService.doJobWithTag(jobTag);
        });
        jobScheduler.enqueue(() -> {
            appService.doFireAndForgetWork();
        });
        // JobContext.Null is a placeholder; JobRunr supplies the real context when the job runs
        jobScheduler.enqueue(() -> {
            appService.doDelayedWorkWithProgressBar(JobContext.Null);
        });
        log.info("Job enqueued!");
        return ResponseEntity.ok().build();
    }

    @GetMapping("/direct-call")
    public ResponseEntity<Void> directCall() {
        appService.directCall();
        log.info("Job submitted!");
        return ResponseEntity.ok().build();
    }

    @GetMapping("/retry-job")
    public ResponseEntity<Void> retryJob() {
        jobScheduler.enqueue(() -> {
            appService.doJobRetry();
        });
        log.info("Retry Job submitted!");
        return ResponseEntity.ok().build();
    }

    /**
     * When there is a pool of workers, they pick jobs from the queue and complete them.
     * This distributes the work across many nodes.
     */
    @GetMapping("/many-job/{count}")
    public ResponseEntity<Void> submitManyJobs(@PathVariable Integer count) {
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            String jobTag = "job_" + counter.getAndIncrement();
            jobScheduler.enqueue(() -> {
                appService.doJobWithTag(jobTag);
            });
        }
        log.info("Many Jobs submitted!");
        return ResponseEntity.ok().build();
    }

}

package com.demo.project59.service;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

import com.demo.project59.domain.Customer;
import com.demo.project59.repository.CustomerRepository;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jobrunr.jobs.annotations.Job;
import org.jobrunr.jobs.annotations.Recurring;
import org.jobrunr.jobs.context.JobContext;
import org.jobrunr.jobs.context.JobDashboardProgressBar;
import org.jobrunr.scheduling.JobScheduler;
import org.jobrunr.scheduling.cron.Cron;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class AppService {

    final CustomerRepository customerRepository;
    final JobScheduler jobScheduler;

    @SneakyThrows
    public void doDelayedWork() {
        log.info("Running doWork");
        TimeUnit.SECONDS.sleep(1);
        log.info("Completed doWork");
    }

    @Job(name = "doJob [jobTag: %0]")
    @SneakyThrows
    public void doJobWithTag(String jobTag) {
        log.info("Running doJob");
        TimeUnit.MINUTES.sleep(1);
        log.info("Completed doJob");
    }

    /**
     * Once the job fails, the next retry can be picked up by any other server in the pool.
     * So don't use an in-memory AtomicInteger to track the count; keep it in the database.
     */
    @Job(name = "doJobRetry", retries = 3)
    @SneakyThrows
    public void doJobRetry() {
        log.info("Running doJobRetry");
        Customer customer = customerRepository.findById(200L).orElseThrow();
        customer.setInvokeCount(customer.getInvokeCount() + 1);
        customerRepository.save(customer);
        log.info("Updated customer invoke count!");
        if (customer.getInvokeCount() < 3) {
            throw new RuntimeException("Will not work first 2 times");
        }
        log.info("Completed doJobRetry");
    }

    @Job(name = "doFireAndForgetWork")
    @SneakyThrows
    public void doFireAndForgetWork() {
        log.info("Running fireForget");
        TimeUnit.MINUTES.sleep(1);
        log.info("Completed fireForget");
    }

    @Job(name = "hourlyJob")
    @Recurring(id = "hourlyJob", cron = "0 * * * *", zoneId = "Asia/Kolkata")
    @SneakyThrows
    public void doHourlyWork() {
        log.info("Running hourlyJob");
        TimeUnit.SECONDS.sleep(5);
        log.info("Completed hourlyJob");
    }

    @Job(name = "doDelayedWorkWithProgressBar")
    @SneakyThrows
    public void doDelayedWorkWithProgressBar(JobContext jobContext) {
        JobDashboardProgressBar bar = jobContext.progressBar(100);
        for (int i = 0; i < 10; i++) {
            log.info("Running longJob step {}", i + 1);
            TimeUnit.SECONDS.sleep(5);
            // Progress by 10% after each completed step, reaching 100 on the last one
            bar.setProgress(10 * (i + 1));
            log.info("Completed longJob step {}", i + 1);
        }
    }

    public void directCall() {
        // Recurring
        jobScheduler.scheduleRecurrently(Cron.hourly(), () -> {
            doDelayedWork();
        });

        // Fire-and-forget
        jobScheduler.enqueue(() -> {
            doDelayedWork();
        });

        // Delayed job, scheduled to run in the future
        jobScheduler.schedule(Instant.now().plusSeconds(30), () -> {
            doDelayedWork();
        });
    }
}

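
The warning in the Javadoc of doJobRetry can be illustrated without JobRunr. In this sketch, each simulated node has its own in-memory AtomicInteger, while a shared map stands in for the customer row in the database. All class names are hypothetical. When each retry lands on a different node, only the shared counter reaches 3.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a shared map plays the role of the database row updated in doJobRetry.
class RetryCounterSketch {

    /** One simulated server with its own in-memory counter. */
    static class Node {
        final AtomicInteger localCount = new AtomicInteger();

        /** Runs one attempt; returns the shared count after incrementing both counters. */
        int attempt(Map<String, Integer> sharedStore) {
            localCount.incrementAndGet();                             // visible to this node only
            return sharedStore.merge("invokeCount", 1, Integer::sum); // visible to all nodes
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> sharedStore = new ConcurrentHashMap<>();
        Node[] nodes = {new Node(), new Node(), new Node()};

        // Each retry is picked up by a different node.
        for (Node node : nodes) {
            int shared = node.attempt(sharedStore);
            System.out.println("local=" + node.localCount.get() + " shared=" + shared);
        }
        // The local counters all stay at 1; the shared counter ends at 3.
    }
}
```

A job that checked only its local counter would never see the third attempt, which is why the service stores the invoke count on the Customer entity.
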
spring:
  main:
    banner-mode: "off"
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://localhost:5432/test-db
    username: test
    password: test@123
  jpa:
    show-sql: false
    hibernate.ddl-auto: create
    properties.hibernate.temp.use_jdbc_metadata_defaults: false
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    defer-datasource-initialization: true
  sql:
    init:
      mode: always
org:
  jobrunr:
    job-scheduler:
      enabled: true
    dashboard:
      enabled: true
      port: 8085
    background-job-server:
      enabled: true
      poll-interval-in-seconds: 5 #check for new work every 5 seconds
      worker-count: 10 #this value normally is defined by the amount of CPU's that are available
      delete-succeeded-jobs-after: 36 #succeeded jobs will go to the deleted state after 36 hours
      permanently-delete-deleted-jobs-after: 72 #deleted jobs will be deleted permanently after 72 hours
    database:
      table-prefix: "project59_" # allows to set a table prefix
    jobs:
      default-number-of-retries: 10 #the default number of retries for a failing job
      retry-back-off-time-seed: 3 #the default time seed for the exponential back-off policy.
      metrics:
        enabled: true #Micrometer integration
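
The two retention settings combine into a simple timeline, worked through below in plain Java. This is only arithmetic on the configured values (36 and 72 hours); it assumes the second timer starts when the job enters the deleted state, as the configuration comments suggest. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the retention arithmetic; assumes the 72-hour timer starts when the job enters the deleted state.
class RetentionSketch {

    static final Duration DELETE_SUCCEEDED_AFTER = Duration.ofHours(36);
    static final Duration PURGE_DELETED_AFTER = Duration.ofHours(72);

    /** When a succeeded job moves to the deleted state. */
    static Instant movedToDeletedAt(Instant succeededAt) {
        return succeededAt.plus(DELETE_SUCCEEDED_AFTER);
    }

    /** When the deleted job is removed from storage for good. */
    static Instant permanentlyRemovedAt(Instant succeededAt) {
        return movedToDeletedAt(succeededAt).plus(PURGE_DELETED_AFTER);
    }

    public static void main(String[] args) {
        Instant succeededAt = Instant.parse("2024-01-01T00:00:00Z");
        System.out.println(movedToDeletedAt(succeededAt));     // 2024-01-02T12:00:00Z
        System.out.println(permanentlyRemovedAt(succeededAt)); // 2024-01-05T12:00:00Z
    }
}
```

Under that assumption, a succeeded job remains in storage for 108 hours in total.
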

Open dashboard: http://localhost:8085/dashboard/

Setup

# Project 59

Spring Boot JobRunr

[https://gitorko.github.io/spring-jobrunr/](https://gitorko.github.io/spring-jobrunr/)

### Version

Check version

```bash
$ java --version
openjdk version "21.0.3" 2024-04-16 LTS
```

### Postgres DB

```
docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:14
docker ps
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
grant all PRIVILEGES ON DATABASE "test-db" to test;

docker stop pg-container
docker start pg-container
```

### Dev

To run the code

```bash
./gradlew clean build
./gradlew bootRun
```

Dashboard:

[http://localhost:8085/dashboard](http://localhost:8085/dashboard)

```bash
curl --location --request GET 'http://localhost:8080/invoke-job'
curl --location --request GET 'http://localhost:8080/direct-call'
curl --location --request GET 'http://localhost:8080/retry-job'
```

Create a pool of 3 servers

```bash
java -jar build/libs/project59-1.0.0.jar --server.port=8081 --org.jobrunr.dashboard.enabled=true
java -jar build/libs/project59-1.0.0.jar --server.port=8082 --org.jobrunr.dashboard.enabled=false
java -jar build/libs/project59-1.0.0.jar --server.port=8083 --org.jobrunr.dashboard.enabled=false
```

Submit 500 jobs that will be processed by the 3 servers

```bash
curl --location --request GET 'http://localhost:8081/many-job/500'
```
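
For a rough sense of how long the 500 jobs take, the following back-of-the-envelope sketch estimates the run time from the values used in this post: 3 servers, worker-count 10, and doJobWithTag sleeping for 1 minute. It ignores the poll interval and scheduling overhead, and the class and method names are hypothetical.

```java
// Back-of-the-envelope estimate; ignores polling delay and scheduling overhead.
class ThroughputSketch {

    /** Estimated minutes to drain the queue when every worker handles one job at a time. */
    static long estimateMinutes(int jobs, int servers, int workersPerServer, int minutesPerJob) {
        int parallelJobs = servers * workersPerServer;
        long waves = (jobs + parallelJobs - 1) / parallelJobs; // ceiling division
        return waves * minutesPerJob;
    }

    public static void main(String[] args) {
        System.out.println(estimateMinutes(500, 3, 10, 1)); // 17 (three servers)
        System.out.println(estimateMinutes(500, 1, 10, 1)); // 50 (a single server)
    }
}
```

The estimate drops roughly in proportion to the number of servers, which is the effect you should see on the dashboard as nodes join the pool.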

References

https://www.jobrunr.io/en/

https://www.jobrunr.io/en/documentation/configuration/spring/
