Spring Webflux & Reactive JDBC
Overview
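Spring WebFlux integration with a blocking JDBC/JPA data layer: database calls are wrapped in reactive types and run on a dedicated scheduler, so the WebFlux event-loop threads are never blocked.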
Github: https://github.com/gitorko/project64
Webflux JDBC
This approach provides an alternative way to integrate an existing relational database with WebFlux when the project is not ready to adopt R2DBC.
Code
1package com.demo.project64;
2
3import java.time.Duration;
4
5import com.demo.project64.domain.Customer;
6import com.demo.project64.repositoryservice.CustomerReactiveRepoService;
7import lombok.extern.slf4j.Slf4j;
8import org.springframework.boot.CommandLineRunner;
9import org.springframework.boot.SpringApplication;
10import org.springframework.boot.autoconfigure.SpringBootApplication;
11import org.springframework.context.annotation.Bean;
12import reactor.core.publisher.Flux;
13
14@SpringBootApplication
15@Slf4j
16public class Main {
17
18 public static void main(String[] args) {
19 SpringApplication.run(Main.class, args);
20 }
21
22 @Bean
23 public CommandLineRunner seedData(CustomerReactiveRepoService customerReactiveService) {
24 return args -> {
25 log.info("Seeding data!");
26
27 Flux<String> names = Flux.just("raj", "david", "pam").delayElements(Duration.ofSeconds(1));
28 Flux<Integer> ages = Flux.just(25, 27, 30).delayElements(Duration.ofSeconds(1));
29 Flux<Customer> customers = Flux.zip(names, ages).map(tupple -> {
30 return new Customer(null, tupple.getT1(), tupple.getT2());
31 });
32
33 customerReactiveService.deleteAll().thenMany(customers.flatMap(c -> customerReactiveService.save(c))
34 .thenMany(customerReactiveService.findAll())).subscribe(System.out::println);
35 };
36 }
37
38}
39
1package com.demo.project64.repositoryservice;
2
3import java.util.Optional;
4import java.util.concurrent.Callable;
5
6import org.springframework.beans.factory.annotation.Autowired;
7import org.springframework.beans.factory.annotation.Qualifier;
8import org.springframework.data.domain.Page;
9import org.springframework.data.domain.Pageable;
10import org.springframework.data.jpa.repository.JpaRepository;
11import reactor.core.publisher.Flux;
12import reactor.core.publisher.Mono;
13import reactor.core.scheduler.Scheduler;
14import reactor.core.scheduler.Schedulers;
15
16public abstract class AbstractReactiveRepoService<T> {
17
18 @Qualifier("jdbcScheduler")
19 @Autowired
20 Scheduler jdbcScheduler;
21
22 public Mono<Page<T>> findAll(Pageable pageable) {
23 return asyncCallable(() -> getRepository().findAll(pageable));
24 }
25
26 public Flux<T> findAll() {
27 return asyncIterable(() -> getRepository().findAll().iterator());
28 }
29
30 public Mono<Optional<T>> findById(Long id) {
31 return asyncCallable(() -> getRepository().findById(id));
32 }
33
34 public Mono<T> save(T customer) {
35 return (Mono<T>) asyncCallable(() -> getRepository().save(customer));
36 }
37
38 public Mono<Void> delete(T object) {
39 return asyncCallable(() -> {
40 getRepository().delete(object);
41 return null;
42 });
43 }
44
45 public Mono<Void> deleteAll() {
46 return asyncCallable(() -> {
47 getRepository().deleteAll();
48 return null;
49 });
50 }
51
52 protected <T> Mono<T> asyncCallable(Callable<T> callable) {
53 return Mono.fromCallable(callable).subscribeOn(Schedulers.newParallel("jdbc-thread")).publishOn(jdbcScheduler);
54 }
55
56 protected <T> Flux<T> asyncIterable(Iterable<T> iterable) {
57 return Flux.fromIterable(iterable).subscribeOn(Schedulers.newParallel("jdbc-thread")).publishOn(jdbcScheduler);
58 }
59
60 protected abstract JpaRepository getRepository();
61
62}
1package com.demo.project64.repositoryservice;
2
3import java.util.List;
4
5import com.demo.project64.domain.Customer;
6import com.demo.project64.repository.CustomerRepository;
7import lombok.RequiredArgsConstructor;
8import lombok.extern.slf4j.Slf4j;
9import org.springframework.data.jpa.repository.JpaRepository;
10import org.springframework.stereotype.Service;
11import reactor.core.publisher.Mono;
12
13@Service
14@RequiredArgsConstructor
15@Slf4j
16public class CustomerReactiveRepoService extends AbstractReactiveRepoService<Customer> {
17
18 private final CustomerRepository customerRepository;
19
20 @Override
21 protected JpaRepository getRepository() {
22 return customerRepository;
23 }
24
25 public Mono<List<Customer>> findByNameAndAge(String name, Integer age) {
26 return asyncCallable(() -> customerRepository.findByNameAndAge(name, age));
27 }
28
29}
30
31
1package com.demo.project64.repository;
2
3import java.util.List;
4
5import com.demo.project64.domain.Customer;
6import org.springframework.data.jpa.repository.JpaRepository;
7
8public interface CustomerRepository extends JpaRepository<Customer, Long> {
9 List<Customer> findByNameAndAge(String name, Integer age);
10}
1package com.demo.project64.controller;
2
3import java.util.List;
4import java.util.Optional;
5
6import com.demo.project64.domain.Customer;
7import com.demo.project64.domain.DownloadFile;
8import com.demo.project64.service.CsvService;
9import com.demo.project64.repositoryservice.CustomerReactiveRepoService;
10import com.demo.project64.repositoryservice.DownloadFileReactiveRepoService;
11import lombok.RequiredArgsConstructor;
12import org.springframework.core.io.FileSystemResource;
13import org.springframework.core.io.Resource;
14import org.springframework.http.CacheControl;
15import org.springframework.http.HttpHeaders;
16import org.springframework.http.MediaType;
17import org.springframework.http.ResponseEntity;
18import org.springframework.web.bind.annotation.GetMapping;
19import org.springframework.web.bind.annotation.PathVariable;
20import org.springframework.web.bind.annotation.PostMapping;
21import org.springframework.web.bind.annotation.RequestBody;
22import org.springframework.web.bind.annotation.RequestMapping;
23import org.springframework.web.bind.annotation.RequestParam;
24import org.springframework.web.bind.annotation.RestController;
25import reactor.core.publisher.Flux;
26import reactor.core.publisher.Mono;
27
28@RestController
29@RequestMapping("/api")
30@RequiredArgsConstructor
31public class HomeController {
32
33 private final CustomerReactiveRepoService customerReactiveService;
34 private final CsvService csvService;
35 private final DownloadFileReactiveRepoService downloadFileReactiveRepoService;
36
37 @GetMapping("/customers")
38 public Flux<Customer> getAllCustomer() {
39 return customerReactiveService.findAll();
40 }
41
42 @GetMapping("/customer/{customerId}")
43 public Mono<Optional<Customer>> findById(@PathVariable Long customerId) {
44 return customerReactiveService.findById(customerId);
45 }
46
47 @PostMapping(value = "/customer")
48 public Mono<Customer> save(@RequestBody Customer customer) {
49 return customerReactiveService.save(customer);
50 }
51
52 @GetMapping("/customer")
53 public Mono<List<Customer>> findById(@RequestParam String name, @RequestParam Integer age) {
54 return customerReactiveService.findByNameAndAge(name, age);
55 }
56
57 @GetMapping("/csv")
58 public Mono<DownloadFile> generateCsvFile() {
59 return csvService.generateCsvFile();
60 }
61
62 @GetMapping("/downloads")
63 public Flux<DownloadFile> findAllDownloads() {
64 return downloadFileReactiveRepoService.findAll();
65 }
66
67 @GetMapping(path = "/download/{id}", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
68 Mono<ResponseEntity<Resource>> downloadCsvFile(@PathVariable("id") Long id) {
69 return csvService.getFileResourcePath(id)
70 .filter(response -> csvService.isFileExists(response))
71 .flatMap(s -> {
72 Resource resource = new FileSystemResource(s);
73 return Mono.just(ResponseEntity.ok()
74 .cacheControl(CacheControl.noCache())
75 .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + resource.getFilename())
76 .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE)
77 .body(resource));
78
79 }).defaultIfEmpty(ResponseEntity.notFound().build());
80 }
81}
1package com.demo.project64.config;
2
3import java.util.concurrent.Executors;
4
5import org.springframework.beans.factory.annotation.Value;
6import org.springframework.context.annotation.Bean;
7import org.springframework.context.annotation.Configuration;
8import org.springframework.transaction.PlatformTransactionManager;
9import org.springframework.transaction.support.TransactionTemplate;
10import reactor.core.scheduler.Scheduler;
11import reactor.core.scheduler.Schedulers;
12
13@Configuration
14class SchedulerConfig {
15
16 @Value("${spring.datasource.hikari.maximum-pool-size:100}")
17 private int connectionPoolSize;
18
19 @Bean
20 public Scheduler jdbcScheduler() {
21 return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
22 }
23
24}
# Spring Boot configuration for the demo (PostgreSQL + JPA).
spring:
  main:
    banner-mode: "off"
  datasource:
    driver-class-name: org.postgresql.Driver
    # Referenced by the url placeholder below.
    host: localhost
    url: jdbc:postgresql://${spring.datasource.host}:5432/test-db
    # Demo credentials; they match the user created in the README's Postgres setup.
    username: test
    password: test@123
  jpa:
    show-sql: false
    hibernate.ddl-auto: create-drop
    properties.hibernate.temp.use_jdbc_metadata_defaults: false
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    defer-datasource-initialization: true
Postman
Import the Postman collection into Postman.
Setup
1# Project 64
2
3Spring WebFlux Reactive JDBC
4
5[https://gitorko.github.io/spring-webflux-reactive-jdbc/](https://gitorko.github.io/spring-webflux-reactive-jdbc/)
6
7### Version
8
9Check version
10
11```bash
12$java --version
13openjdk version "21.0.3" 2024-04-16 LTS
14```
15
16### Postgres DB
17
18```
19docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
20docker ps
21docker exec -it pg-container psql -U postgres -W postgres
22CREATE USER test WITH PASSWORD 'test@123';
23CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
24grant all PRIVILEGES ON DATABASE "test-db" to test;
25
26docker stop pg-container
27docker start pg-container
28```
29
30To seed large test data
31
32```
33INSERT INTO customer (name, age)
34SELECT
35 'John',
36 30
37FROM generate_series(1, 4000000);
38```
39
40### Dev
41
42To run the code.
43
44```bash
45./gradlew clean build
46./gradlew bootRun
47```
References
https://spring.io/blog/2018/12/07/reactive-programming-and-relational-databases
comments powered by Disqus