Spring Batch - Multi Stage Job Orchestration

Overview

A Spring Boot implementation of a multi-stage job workflow using Spring Batch.

GitHub: https://github.com/gitorko/project67

Spring Batch

Spring Batch is typically used to process data (read-process-write) in the background. Here we use it instead as a workflow orchestration engine for long-running jobs that are not batch-oriented.

As an example, take a travel booking flow in which a customer books a flight, a hotel, and a cab with a single click, while the actual bookings are made in a background flow. An event-based model could handle this, but it makes it difficult to track or restart a specific job, and to inspect the event queues to see which jobs are stuck. If a queue has a large backlog, we might even need a priority queue to move some events ahead of the rest. And when the flow has several steps, each step might need its own queue so that a failed step can be restarted on its own.

  1. JobBuilder - Creates a Job.
  2. StepBuilder - Creates a Step, which is part of a job. A step can perform a chunk-oriented task, a tasklet, or any other processing logic.
  3. JobStepBuilder - Creates a Step that encapsulates a job. This lets you run an entire job as a single step within another job (a nested job).
  4. FlowBuilder - Defines a Flow of steps that can be executed within a job. A Flow can contain multiple steps, decision points, and other nested flows. A Flow can be part of multiple jobs or nested within other flows.
  5. FlowJobBuilder - Creates a FlowJob, which wraps a Flow into a Job so that the flow can be executed as a job (a nested flow).
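
To make the nesting idea behind JobStepBuilder concrete, here is a minimal plain-Java sketch that needs no Spring Batch on the classpath. The Step, Job, and jobStep names below are hypothetical stand-ins, not the Spring Batch types; they only show how a whole job can be wrapped so that a parent job treats it as one step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the Spring Batch types.
class NestedJobSketch {

    /** A unit of work that records what it did in a shared trace. */
    interface Step {
        void execute(List<String> trace);
    }

    /** A job runs its steps in order. */
    static final class Job {
        private final String name;
        private final List<Step> steps;

        Job(String name, List<Step> steps) {
            this.name = name;
            this.steps = List.copyOf(steps);
        }

        void run(List<String> trace) {
            trace.add("start " + name);
            for (Step step : steps) {
                step.execute(trace);
            }
            trace.add("end " + name);
        }
    }

    /** Wraps a whole job so that a parent job can treat it as a single step. */
    static Step jobStep(Job job) {
        return job::run;
    }

    /** A leaf step that only records its own name. */
    static Step namedStep(String name) {
        return trace -> trace.add(name);
    }

    /** Builds a parent job out of two nested jobs and returns the execution trace. */
    static List<String> runExample() {
        Job flight = new Job("sendFlightNotificationJob",
                List.of(namedStep("sendFlightNotificationStep")));
        Job hotel = new Job("sendHotelNotificationJob",
                List.of(namedStep("sendHotelNotificationStep")));
        Job parent = new Job("notificationStartJob",
                List.of(jobStep(flight), jobStep(hotel)));
        List<String> trace = new ArrayList<>();
        parent.run(trace);
        return trace;
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

The parent job never needs to know that two of its steps are complete jobs, which is the property that makes nested jobs easy to compose.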

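The RunIdIncrementer generates a new value of the run.id job parameter for each execution of a job. Without it, Spring Batch treats a launch with identical parameters as the same job instance and will not run it again once it has completed. The incrementer therefore lets you run a job with the same configuration and parameters multiple times without conflicts.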
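
The idea can be sketched in plain Java as follows. This is a simplified model, not the Spring Batch class: it assumes the parameters are held in an ordinary Map and that run.id is stored as a Long. The RunIdSketch class and its next method are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the idea; NOT the Spring Batch RunIdIncrementer class.
class RunIdSketch {
    static final String RUN_ID_KEY = "run.id";

    /** Returns a copy of the parameters with run.id set to 1, or incremented by 1 if present. */
    static Map<String, Object> next(Map<String, Object> previous) {
        Map<String, Object> next = new HashMap<>(previous);
        Object last = previous.get(RUN_ID_KEY);
        long id = (last == null) ? 1L : ((Long) last) + 1L;
        next.put(RUN_ID_KEY, id);
        return next;
    }

    public static void main(String[] args) {
        // Same business parameter on both runs; only run.id differs.
        Map<String, Object> first = next(Map.of("customer", "ryan"));
        Map<String, Object> second = next(first);
        System.out.println(first.get(RUN_ID_KEY) + " then " + second.get(RUN_ID_KEY));
    }
}
```

Because each run carries a different run.id, two launches for the same customer count as two distinct sets of parameters.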

We will therefore use Spring Batch, whose Job and Step flows let us orchestrate the booking flow.

Features:

  1. Failed steps are retried, with transactional rollback.
  2. Jobs can be restarted.
  3. Flows can contain if-else logic.
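
The retry and if-else features can be pictured as a small loop: run a step, ask a decider what to do next, and go back to the step while the answer is RETRY. The sketch below models that loop in plain Java; it is not Spring Batch code. The RetryFlowSketch class is a hypothetical name, and the maxAttempts cap is an assumption added here to keep the loop bounded.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Plain-Java model of a step -> decider -> step loop; NOT Spring Batch code.
class RetryFlowSketch {

    /** The two outcomes the decider can return. */
    enum Decision { RETRY, COMPLETED }

    /** The if-else part of the flow: a failed step leads to RETRY, anything else to COMPLETED. */
    static Decision decide(boolean stepFailed) {
        return stepFailed ? Decision.RETRY : Decision.COMPLETED;
    }

    /**
     * Runs the step until the decider returns COMPLETED and returns the number of attempts.
     * The step reports success by returning true. maxAttempts is an assumed safety cap.
     */
    static int runWithRetry(BooleanSupplier step, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            boolean failed = !step.getAsBoolean();
            if (decide(failed) == Decision.COMPLETED) {
                return attempts;
            }
        }
        throw new IllegalStateException("Step still failing after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // Simulated step: fails on the first two attempts and succeeds on the third.
        BooleanSupplier flakyStep = () -> counter.incrementAndGet() >= 3;
        System.out.println("Attempts: " + runWithRetry(flakyStep, 5));
    }
}
```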

Start the application and trigger the booking flow with the request below; the jobs then run to completion.

curl --location 'http://localhost:8080/book-travel' \
--header 'Content-Type: application/json' \
--data '{
    "customer": "ryan"
}'
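
If you prefer to trigger the flow from Java rather than curl, the same request can be built with the JDK's built-in HTTP client. This is a minimal sketch: the BookTravelClient class and its buildRequest helper are hypothetical names, the JSON body is assembled by naive string concatenation, and the response format is not assumed, so it is simply printed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for illustration; it mirrors the curl call for /book-travel.
class BookTravelClient {

    /** Builds the POST request; the customer name is inserted without JSON escaping. */
    static HttpRequest buildRequest(String customer) {
        String body = "{\"customer\": \"" + customer + "\"}";
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/book-travel"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires the application to be running on localhost:8080.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("ryan"), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```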

You can also run a batch job that reads a CSV file and writes the data to another CSV file and to the database.

curl --location --request POST 'http://localhost:8080/employee-batch-job' \
--data ''

Code

// BookCabTask.java
package com.demo.project67.task;

import java.time.LocalDateTime;

import com.demo.project67.domain.BookingEvent;
import com.demo.project67.repository.BookingEventRepository;
import com.demo.project67.service.HelperUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class BookCabTask {

    final BookingEventRepository bookingEventRepository;

    // No @Transactional here: this method only creates the Tasklet. The step runs
    // execute() inside a transaction from the transaction manager given to the step.
    public Tasklet bookCab() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                // Job parameters are supplied when the job is launched.
                Long bookingId = (Long) chunkContext.getStepContext()
                        .getJobParameters()
                        .get("bookingId");
                log.info("Running bookCab, bookingId: {}", bookingId);
                String customer = (String) chunkContext.getStepContext()
                        .getJobParameters()
                        .get("customer");
                // Record the booking as an event.
                bookingEventRepository.save(
                        BookingEvent.builder()
                                .event("Cab Booked for customer " + customer)
                                .bookingId(bookingId)
                                .createdOn(LocalDateTime.now())
                                .build()
                );
                // Simulate a long-running booking call.
                HelperUtil.delay(10);
                log.info("Completed bookCab, bookingId: {}", bookingId);
                return RepeatStatus.FINISHED;
            }
        };
    }
}


// BookFlightTask.java
package com.demo.project67.task;

import java.time.LocalDateTime;

import com.demo.project67.domain.BookingEvent;
import com.demo.project67.repository.BookingEventRepository;
import com.demo.project67.service.HelperUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class BookFlightTask {
    final BookingEventRepository bookingEventRepository;

    public Tasklet bookFlight() {
        return (contribution, chunkContext) -> {
            Long bookingId = (Long) chunkContext.getStepContext()
                    .getJobParameters()
                    .get("bookingId");
            log.info("Running bookFlight, bookingId: {}", bookingId);
            String customer = (String) chunkContext.getStepContext()
                    .getJobParameters()
                    .get("customer");
            bookingEventRepository.save(
                    BookingEvent.builder()
                            .event("Flight Booked for customer " + customer)
                            .bookingId(bookingId)
                            .createdOn(LocalDateTime.now())
                            .build()
            );
            HelperUtil.delay(10);
            log.info("Completed bookFlight, bookingId: {}", bookingId);
            return RepeatStatus.FINISHED;
        };
    }
}


// FlightNotificationTask.java
package com.demo.project67.task;

import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

import com.demo.project67.domain.BookingEvent;
import com.demo.project67.repository.BookingEventRepository;
import com.demo.project67.service.HelperUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class FlightNotificationTask {
    final BookingEventRepository bookingEventRepository;
    // Shared by every job execution because this service is a singleton bean,
    // so only the first two attempts after application start-up fail.
    private final AtomicInteger attemptCounter = new AtomicInteger();

    /**
     * Retry in flow.
     * No @Transactional here: this method only creates the Tasklet, and the step
     * runs the tasklet inside a transaction from its own transaction manager.
     */
    public Tasklet sendingFlightNotificationTask() {
        return (contribution, chunkContext) -> {
            Long bookingId = (Long) chunkContext.getStepContext()
                    .getJobParameters()
                    .get("bookingId");
            log.info("Running sendingFlightNotificationTask, bookingId: {}, Attempt: {}", bookingId, attemptCounter.get());
            String customer = (String) chunkContext.getStepContext()
                    .getJobParameters()
                    .get("customer");
            bookingEventRepository.save(
                    BookingEvent.builder()
                            .event("Flight Notification Sent to customer " + customer + ", Attempt: " + attemptCounter.get())
                            .bookingId(bookingId)
                            .createdOn(LocalDateTime.now())
                            .build()
            );
            log.info("sendingFlightNotificationTask, bookingId: {}, Attempt: {}", bookingId, attemptCounter.get());
            HelperUtil.delay(10);
            // Simulate an error for the first 2 attempts
            if (attemptCounter.incrementAndGet() < 3) {
                log.error("Failed to send flight notification!");
                throw new RuntimeException("Failed to send flight notification!");
            }
            log.info("Completed sendingFlightNotificationTask, bookingId: {}, Attempt: {}", bookingId, attemptCounter.get());
            return RepeatStatus.FINISHED;
        };
    }
}


// NotificationWorkflow.java
package com.demo.project67.workflow;

import com.demo.project67.exception.NotificationExceptionHandler;
import com.demo.project67.task.FlightNotificationTask;
import com.demo.project67.task.HotelNotificationTask;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

/**
 * Shows 2 different ways of doing retry
 *
 * 2 Jobs with 1 Step each.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class NotificationWorkflow {

    final JobRepository jobRepository;
    final PlatformTransactionManager transactionManager;
    final HotelNotificationTask hotelNotificationTask;
    final FlightNotificationTask flightNotificationTask;
    final NotificationExceptionHandler notificationExceptionHandler;

    // Parent job: runs the two notification jobs one after the other, each wrapped as a step.
    @Bean(name = "notificationStartJob")
    public Job notificationStartJob(Job sendFlightNotificationJob, Job sendHotelNotificationJob) {
        return new JobBuilder("notificationStartJob", jobRepository)
                .start(sendFlightNotificationJobStep(sendFlightNotificationJob))
                .next(sendHotelNotificationJobStep(sendHotelNotificationJob))
                .build();
    }

    private Step sendFlightNotificationJobStep(Job sendFlightNotificationJob) {
        return new JobStepBuilder(new StepBuilder("sendFlightNotificationJobStep", jobRepository))
                .job(sendFlightNotificationJob)
                .build();
    }

    private Step sendHotelNotificationJobStep(Job sendHotelNotificationJob) {
        return new JobStepBuilder(new StepBuilder("sendHotelNotificationJobStep", jobRepository))
                .job(sendHotelNotificationJob)
                .build();
    }

    @Bean(name = "sendFlightNotificationJob")
    public Job sendFlightNotificationJob(Step sendFlightNotificationStep) {
        // Use a single decider instance so that every transition refers to the same decision state.
        JobExecutionDecider retryDecider = retryDecider();
        return new JobBuilder("sendFlightNotificationJob", jobRepository)
                .start(sendFlightNotificationStep)
                .next(retryDecider)
                .from(retryDecider).on("RETRY").to(sendFlightNotificationStep)
                .from(retryDecider).on("COMPLETED").end()
                .end()
                .build();
    }

    @Bean(name = "sendHotelNotificationJob")
    public Job sendHotelNotificationJob(Step sendHotelNotificationStep) {
        return new JobBuilder("sendHotelNotificationJob", jobRepository)
                .start(sendHotelNotificationStep)
                .build();
    }

    @Bean(name = "sendFlightNotificationStep")
    public Step sendNotificationStep() {
        return new StepBuilder("sendFlightNotificationStep", jobRepository)
                .tasklet(flightNotificationTask.sendingFlightNotificationTask(), transactionManager)
                .exceptionHandler(notificationExceptionHandler)
                .build();
    }

    @Bean(name = "sendHotelNotificationStep")
    public Step sendHotelNotificationStep() {
        return new StepBuilder("sendHotelNotificationStep", jobRepository)
                .tasklet(hotelNotificationTask.sendHotelNotificationTask(), transactionManager)
                .exceptionHandler(notificationExceptionHandler)
                .build();
    }

    // Routes the flow back to the step when the last step execution failed.
    private JobExecutionDecider retryDecider() {
        return (jobExecution, stepExecution) -> {
            if (stepExecution.getStatus() == BatchStatus.FAILED) {
                return new FlowExecutionStatus("RETRY");
            }
            return new FlowExecutionStatus("COMPLETED");
        };
    }
}

Postman

Import the Postman collection into Postman.

Postman Collection

Setup

# Project 67

Spring Batch - Multi Stage Job Orchestration

[https://gitorko.github.io/post/spring-batch-orchestration](https://gitorko.github.io/post/spring-batch-orchestration)

### Version

Check version

```bash
$ java --version
openjdk 21.0.3 2024-04-16 LTS
```

### Postgres DB

```bash
docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:14
docker ps
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
grant all PRIVILEGES ON DATABASE "test-db" to test;

docker stop pg-container
docker start pg-container
```

### Dev

To run the backend in dev mode.

```bash
./gradlew clean build
./gradlew bootRun
```

References

https://spring.io/projects/spring-batch
