Spring Webflux & Reactive JDBC

Overview

Webflux integration with reactive JDBC, to allow non-blocking calls to database.

Github: https://github.com/gitorko/project64

Webflux JDBC

This approach provides alternate way to integrate existing relational database with webflux if the project is not ready to use R2DBC.

Code

 1package com.demo.project64;
 2
 3import java.time.Duration;
 4
 5import com.demo.project64.domain.Customer;
 6import com.demo.project64.repositoryservice.CustomerReactiveRepoService;
 7import lombok.extern.slf4j.Slf4j;
 8import org.springframework.boot.CommandLineRunner;
 9import org.springframework.boot.SpringApplication;
10import org.springframework.boot.autoconfigure.SpringBootApplication;
11import org.springframework.context.annotation.Bean;
12import reactor.core.publisher.Flux;
13
14@SpringBootApplication
15@Slf4j
16public class Main {
17
18    public static void main(String[] args) {
19        SpringApplication.run(Main.class, args);
20    }
21
22    @Bean
23    public CommandLineRunner seedData(CustomerReactiveRepoService customerReactiveService) {
24        return args -> {
25            log.info("Seeding data!");
26
27            Flux<String> names = Flux.just("raj", "david", "pam").delayElements(Duration.ofSeconds(1));
28            Flux<Integer> ages = Flux.just(25, 27, 30).delayElements(Duration.ofSeconds(1));
29            Flux<Customer> customers = Flux.zip(names, ages).map(tupple -> {
30                return new Customer(null, tupple.getT1(), tupple.getT2());
31            });
32
33            customerReactiveService.deleteAll().thenMany(customers.flatMap(c -> customerReactiveService.save(c))
34                    .thenMany(customerReactiveService.findAll())).subscribe(System.out::println);
35        };
36    }
37
38}
39
 1package com.demo.project64.repositoryservice;
 2
 3import java.util.Optional;
 4import java.util.concurrent.Callable;
 5
 6import org.springframework.beans.factory.annotation.Autowired;
 7import org.springframework.beans.factory.annotation.Qualifier;
 8import org.springframework.data.domain.Page;
 9import org.springframework.data.domain.Pageable;
10import org.springframework.data.jpa.repository.JpaRepository;
11import reactor.core.publisher.Flux;
12import reactor.core.publisher.Mono;
13import reactor.core.scheduler.Scheduler;
14import reactor.core.scheduler.Schedulers;
15
16public abstract class AbstractReactiveRepoService<T> {
17
18    @Qualifier("jdbcScheduler")
19    @Autowired
20    Scheduler jdbcScheduler;
21
22    public Mono<Page<T>> findAll(Pageable pageable) {
23        return asyncCallable(() -> getRepository().findAll(pageable));
24    }
25
26    public Flux<T> findAll() {
27        return asyncIterable(() -> getRepository().findAll().iterator());
28    }
29
30    public Mono<Optional<T>> findById(Long id) {
31        return asyncCallable(() -> getRepository().findById(id));
32    }
33
34    public Mono<T> save(T customer) {
35        return (Mono<T>) asyncCallable(() -> getRepository().save(customer));
36    }
37
38    public Mono<Void> delete(T object) {
39        return asyncCallable(() -> {
40            getRepository().delete(object);
41            return null;
42        });
43    }
44
45    public Mono<Void> deleteAll() {
46        return asyncCallable(() -> {
47            getRepository().deleteAll();
48            return null;
49        });
50    }
51
52    protected <T> Mono<T> asyncCallable(Callable<T> callable) {
53        return Mono.fromCallable(callable).subscribeOn(Schedulers.newParallel("jdbc-thread")).publishOn(jdbcScheduler);
54    }
55
56    protected <T> Flux<T> asyncIterable(Iterable<T> iterable) {
57        return Flux.fromIterable(iterable).subscribeOn(Schedulers.newParallel("jdbc-thread")).publishOn(jdbcScheduler);
58    }
59
60    protected abstract JpaRepository getRepository();
61
62}
 1package com.demo.project64.repositoryservice;
 2
 3import java.util.List;
 4
 5import com.demo.project64.domain.Customer;
 6import com.demo.project64.repository.CustomerRepository;
 7import lombok.RequiredArgsConstructor;
 8import lombok.extern.slf4j.Slf4j;
 9import org.springframework.data.jpa.repository.JpaRepository;
10import org.springframework.stereotype.Service;
11import reactor.core.publisher.Mono;
12
13@Service
14@RequiredArgsConstructor
15@Slf4j
16public class CustomerReactiveRepoService extends AbstractReactiveRepoService<Customer> {
17
18    private final CustomerRepository customerRepository;
19
20    @Override
21    protected JpaRepository getRepository() {
22        return customerRepository;
23    }
24
25    public Mono<List<Customer>> findByNameAndAge(String name, Integer age) {
26        return asyncCallable(() -> customerRepository.findByNameAndAge(name, age));
27    }
28
29}
30
31
 1package com.demo.project64.repository;
 2
 3import java.util.List;
 4
 5import com.demo.project64.domain.Customer;
 6import org.springframework.data.jpa.repository.JpaRepository;
 7
 8public interface CustomerRepository extends JpaRepository<Customer, Long> {
 9    List<Customer> findByNameAndAge(String name, Integer age);
10}
 1package com.demo.project64.controller;
 2
 3import java.util.List;
 4import java.util.Optional;
 5
 6import com.demo.project64.domain.Customer;
 7import com.demo.project64.domain.DownloadFile;
 8import com.demo.project64.service.CsvService;
 9import com.demo.project64.repositoryservice.CustomerReactiveRepoService;
10import com.demo.project64.repositoryservice.DownloadFileReactiveRepoService;
11import lombok.RequiredArgsConstructor;
12import org.springframework.core.io.FileSystemResource;
13import org.springframework.core.io.Resource;
14import org.springframework.http.CacheControl;
15import org.springframework.http.HttpHeaders;
16import org.springframework.http.MediaType;
17import org.springframework.http.ResponseEntity;
18import org.springframework.web.bind.annotation.GetMapping;
19import org.springframework.web.bind.annotation.PathVariable;
20import org.springframework.web.bind.annotation.PostMapping;
21import org.springframework.web.bind.annotation.RequestBody;
22import org.springframework.web.bind.annotation.RequestMapping;
23import org.springframework.web.bind.annotation.RequestParam;
24import org.springframework.web.bind.annotation.RestController;
25import reactor.core.publisher.Flux;
26import reactor.core.publisher.Mono;
27
28@RestController
29@RequestMapping("/api")
30@RequiredArgsConstructor
31public class HomeController {
32
33    private final CustomerReactiveRepoService customerReactiveService;
34    private final CsvService csvService;
35    private final DownloadFileReactiveRepoService downloadFileReactiveRepoService;
36
37    @GetMapping("/customers")
38    public Flux<Customer> getAllCustomer() {
39        return customerReactiveService.findAll();
40    }
41
42    @GetMapping("/customer/{customerId}")
43    public Mono<Optional<Customer>> findById(@PathVariable Long customerId) {
44        return customerReactiveService.findById(customerId);
45    }
46
47    @PostMapping(value = "/customer")
48    public Mono<Customer> save(@RequestBody Customer customer) {
49        return customerReactiveService.save(customer);
50    }
51
52    @GetMapping("/customer")
53    public Mono<List<Customer>> findById(@RequestParam String name, @RequestParam Integer age) {
54        return customerReactiveService.findByNameAndAge(name, age);
55    }
56
57    @GetMapping("/csv")
58    public Mono<DownloadFile> generateCsvFile() {
59        return csvService.generateCsvFile();
60    }
61
62    @GetMapping("/downloads")
63    public Flux<DownloadFile> findAllDownloads() {
64        return downloadFileReactiveRepoService.findAll();
65    }
66
67    @GetMapping(path = "/download/{id}", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
68    Mono<ResponseEntity<Resource>> downloadCsvFile(@PathVariable("id") Long id) {
69        return csvService.getFileResourcePath(id)
70                .filter(response -> csvService.isFileExists(response))
71                .flatMap(s -> {
72                    Resource resource = new FileSystemResource(s);
73                    return Mono.just(ResponseEntity.ok()
74                            .cacheControl(CacheControl.noCache())
75                            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + resource.getFilename())
76                            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE)
77                            .body(resource));
78
79                }).defaultIfEmpty(ResponseEntity.notFound().build());
80    }
81}
 1package com.demo.project64.config;
 2
 3import java.util.concurrent.Executors;
 4
 5import org.springframework.beans.factory.annotation.Value;
 6import org.springframework.context.annotation.Bean;
 7import org.springframework.context.annotation.Configuration;
 8import org.springframework.transaction.PlatformTransactionManager;
 9import org.springframework.transaction.support.TransactionTemplate;
10import reactor.core.scheduler.Scheduler;
11import reactor.core.scheduler.Schedulers;
12
13@Configuration
14class SchedulerConfig {
15
16    @Value("${spring.datasource.hikari.maximum-pool-size:100}")
17    private int connectionPoolSize;
18
19    @Bean
20    public Scheduler jdbcScheduler() {
21        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
22    }
23
24}
 1spring:
 2  main:
 3    banner-mode: "off"
 4  datasource:
 5    driver-class-name: org.postgresql.Driver
 6    host: localhost
 7    url: jdbc:postgresql://${spring.datasource.host}:5432/test-db
 8    username: test
 9    password: test@123
10  jpa:
11    show-sql: false
12    hibernate.ddl-auto: create-drop
13    properties.hibernate.temp.use_jdbc_metadata_defaults: false
14    database-platform: org.hibernate.dialect.PostgreSQLDialect
15    defer-datasource-initialization: true

Postman

Import the postman collection to postman

Postman Collection

Setup

 1# Project 64
 2
 3Spring WebFlux Reactive JDBC
 4
 5[https://gitorko.github.io/spring-webflux-reactive-jdbc/](https://gitorko.github.io/spring-webflux-reactive-jdbc/)
 6
 7### Version
 8
 9Check version
10
11```bash
12$java --version
13openjdk version "21.0.3" 2024-04-16 LTS
14```
15
16### Postgres DB
17
18```
19docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
20docker ps
21docker exec -it pg-container psql -U postgres -W postgres
22CREATE USER test WITH PASSWORD 'test@123';
23CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
24grant all PRIVILEGES ON DATABASE "test-db" to test;
25
26docker stop pg-container
27docker start pg-container
28```
29
30To seed large test data
31
32```
33INSERT INTO customer (name, age)
34SELECT
35    'John',
36    30
37FROM generate_series(1, 4000000);
38```
39
40### Dev
41
42To run the code.
43
44```bash
45./gradlew clean build
46./gradlew bootRun
47```

References

https://spring.io/blog/2018/12/07/reactive-programming-and-relational-databases

comments powered by Disqus