Spring Webflux & Reactive JDBC

Overview

Webflux integration with reactive JDBC, to allow non-blocking calls to database. R2DBC is still not recommended for production, hence this approach should help you integrate existing relational database with webflux.

Github: https://github.com/gitorko/project64

Code

 1package com.demo.project64;
 2
 3import java.time.Duration;
 4
 5import com.demo.project64.domain.Customer;
 6import com.demo.project64.repositoryservice.CustomerReactiveRepoService;
 7import lombok.extern.slf4j.Slf4j;
 8import org.springframework.boot.CommandLineRunner;
 9import org.springframework.boot.SpringApplication;
10import org.springframework.boot.autoconfigure.SpringBootApplication;
11import org.springframework.context.annotation.Bean;
12import reactor.core.publisher.Flux;
13
14@SpringBootApplication
15@Slf4j
16public class Main {
17
18    public static void main(String[] args) {
19        SpringApplication.run(Main.class, args);
20    }
21
22    @Bean
23    public CommandLineRunner seedData(CustomerReactiveRepoService customerReactiveService) {
24        return args -> {
25            log.info("Seeding data!");
26
27            Flux<String> names = Flux.just("raj", "david", "pam").delayElements(Duration.ofSeconds(1));
28            Flux<Integer> ages = Flux.just(25, 27, 30).delayElements(Duration.ofSeconds(1));
29            Flux<Customer> customers = Flux.zip(names, ages).map(tupple -> {
30                return new Customer(null, tupple.getT1(), tupple.getT2());
31            });
32
33            customerReactiveService.deleteAll().thenMany(customers.flatMap(c -> customerReactiveService.save(c))
34                    .thenMany(customerReactiveService.findAll())).subscribe(System.out::println);
35        };
36    }
37
38}
39
 1package com.demo.project64.repositoryservice;
 2
 3import java.util.Optional;
 4import java.util.concurrent.Callable;
 5
 6import org.springframework.beans.factory.annotation.Autowired;
 7import org.springframework.beans.factory.annotation.Qualifier;
 8import org.springframework.data.domain.Page;
 9import org.springframework.data.domain.Pageable;
10import org.springframework.data.jpa.repository.JpaRepository;
11import reactor.core.publisher.Flux;
12import reactor.core.publisher.Mono;
13import reactor.core.scheduler.Scheduler;
14import reactor.core.scheduler.Schedulers;
15
16public abstract class AbstractReactiveRepoService<T> {
17
18    @Qualifier("jdbcScheduler")
19    @Autowired
20    Scheduler jdbcScheduler;
21
22    public Mono<Page<T>> findAll(Pageable pageable) {
23        return asyncCallable(() -> getRepository().findAll(pageable));
24    }
25
26    public Mono<Page<T>> findAllBlocking(Pageable pageable) {
27        return Mono.just(getRepository().findAll(pageable));
28    }
29
30    public Flux<T> findAll() {
31        return asyncIterable(() -> getRepository().findAll().iterator());
32    }
33
34    public Mono<Optional<T>> findById(Long id) {
35        return asyncCallable(() -> getRepository().findById(id));
36    }
37
38    public Mono<T> save(T customer) {
39        return (Mono<T>) asyncCallable(() -> getRepository().save(customer));
40    }
41
42    public Mono<Void> delete(T object) {
43        return asyncCallable(() -> {
44            getRepository().delete(object);
45            return null;
46        });
47    }
48
49    public Mono<Void> deleteAll() {
50        return asyncCallable(() -> {
51            getRepository().deleteAll();
52            return null;
53        });
54    }
55
56    protected <T> Mono<T> asyncCallable(Callable<T> callable) {
57        return Mono.fromCallable(callable).subscribeOn(Schedulers.newParallel("jdbc-thread")).publishOn(jdbcScheduler);
58    }
59
60    protected <T> Flux<T> asyncIterable(Iterable<T> iterable) {
61        return Flux.fromIterable(iterable).subscribeOn(Schedulers.newParallel("jdbc-thread")).publishOn(jdbcScheduler);
62    }
63
64    protected abstract JpaRepository getRepository();
65
66}
 1package com.demo.project64.repositoryservice;
 2
 3import java.util.List;
 4
 5import com.demo.project64.domain.Customer;
 6import com.demo.project64.repository.CustomerRepository;
 7import lombok.RequiredArgsConstructor;
 8import lombok.extern.slf4j.Slf4j;
 9import org.springframework.data.jpa.repository.JpaRepository;
10import org.springframework.stereotype.Service;
11import reactor.core.publisher.Mono;
12
13@Service
14@RequiredArgsConstructor
15@Slf4j
16public class CustomerReactiveRepoService extends AbstractReactiveRepoService<Customer> {
17
18    private final CustomerRepository customerRepository;
19
20    @Override
21    protected JpaRepository getRepository() {
22        return customerRepository;
23    }
24
25    public Mono<List<Customer>> findByNameAndAge(String name, Integer age) {
26        return asyncCallable(() -> customerRepository.findByNameAndAge(name, age));
27    }
28
29}
30
31
 1package com.demo.project64.repository;
 2
 3import java.util.List;
 4
 5import com.demo.project64.domain.Customer;
 6import org.springframework.data.jpa.repository.JpaRepository;
 7
 8public interface CustomerRepository extends JpaRepository<Customer, Long> {
 9    List<Customer> findByNameAndAge(String name, Integer age);
10}
 1package com.demo.project64.controller;
 2
 3import java.util.List;
 4import java.util.Optional;
 5
 6import com.demo.project64.domain.Customer;
 7import com.demo.project64.domain.DownloadFile;
 8import com.demo.project64.service.CsvService;
 9import com.demo.project64.repositoryservice.CustomerReactiveRepoService;
10import com.demo.project64.repositoryservice.DownloadFileReactiveRepoService;
11import lombok.RequiredArgsConstructor;
12import org.springframework.core.io.FileSystemResource;
13import org.springframework.core.io.Resource;
14import org.springframework.http.CacheControl;
15import org.springframework.http.HttpHeaders;
16import org.springframework.http.MediaType;
17import org.springframework.http.ResponseEntity;
18import org.springframework.web.bind.annotation.GetMapping;
19import org.springframework.web.bind.annotation.PathVariable;
20import org.springframework.web.bind.annotation.PostMapping;
21import org.springframework.web.bind.annotation.RequestBody;
22import org.springframework.web.bind.annotation.RequestMapping;
23import org.springframework.web.bind.annotation.RequestParam;
24import org.springframework.web.bind.annotation.RestController;
25import reactor.core.publisher.Flux;
26import reactor.core.publisher.Mono;
27
28@RestController
29@RequestMapping("/api")
30@RequiredArgsConstructor
31public class HomeController {
32
33    private final CustomerReactiveRepoService customerReactiveService;
34    private final CsvService csvService;
35    private final DownloadFileReactiveRepoService downloadFileReactiveRepoService;
36
37    @GetMapping("/customers")
38    public Flux<Customer> getAllCustomer() {
39        return customerReactiveService.findAll();
40    }
41
42    @GetMapping("/customer/{customerId}")
43    public Mono<Optional<Customer>> findById(@PathVariable Long customerId) {
44        return customerReactiveService.findById(customerId);
45    }
46
47    @PostMapping(value = "/customer")
48    public Mono<Customer> save(@RequestBody Customer customer) {
49        return customerReactiveService.save(customer);
50    }
51
52    @GetMapping("/customer")
53    public Mono<List<Customer>> findById(@RequestParam String name, @RequestParam Integer age) {
54        return customerReactiveService.findByNameAndAge(name, age);
55    }
56
57    @GetMapping("/csv")
58    public Mono<DownloadFile> generateCsvFile() {
59        return csvService.generateCsvFile();
60    }
61
62    @GetMapping("/downloads")
63    public Flux<DownloadFile> findAllDownloads() {
64        return downloadFileReactiveRepoService.findAll();
65    }
66
67    @GetMapping(path = "/download/{id}", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
68    Mono<ResponseEntity<Resource>> downloadCsvFile(@PathVariable("id") Long id) {
69        return csvService.getFileResourcePath(id)
70                .filter(response -> csvService.isFileExists(response))
71                .flatMap(s -> {
72                    Resource resource = new FileSystemResource(s);
73                    return Mono.just(ResponseEntity.ok()
74                            .cacheControl(CacheControl.noCache())
75                            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + resource.getFilename())
76                            .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_OCTET_STREAM_VALUE)
77                            .body(resource));
78
79                }).defaultIfEmpty(ResponseEntity.notFound().build());
80    }
81}
 1package com.demo.project64.config;
 2
 3import java.util.concurrent.Executors;
 4
 5import org.springframework.beans.factory.annotation.Value;
 6import org.springframework.context.annotation.Bean;
 7import org.springframework.context.annotation.Configuration;
 8import org.springframework.transaction.PlatformTransactionManager;
 9import org.springframework.transaction.support.TransactionTemplate;
10import reactor.core.scheduler.Scheduler;
11import reactor.core.scheduler.Schedulers;
12
13@Configuration
14class SchedulerConfig {
15
16    @Value("${spring.datasource.hikari.maximum-pool-size:100}")
17    private int connectionPoolSize;
18
19    @Bean
20    public Scheduler jdbcScheduler() {
21        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
22    }
23
24}
 1spring:
 2  main:
 3    banner-mode: "off"
 4  datasource:
 5    driver-class-name: org.postgresql.Driver
 6    host: localhost
 7    url: jdbc:postgresql://${spring.datasource.host}:5432/test-db
 8    username: test
 9    password: test@123
10  jpa:
11    show-sql: false
12    hibernate.ddl-auto: create-drop
13    properties.hibernate.temp.use_jdbc_metadata_defaults: false
14    database-platform: org.hibernate.dialect.PostgreSQLDialect
15    defer-datasource-initialization: true

Setup

Project 64

Spring WebFlux Reactive JDBC

https://gitorko.github.io/spring-webflux-reactive-jdbc/

Version

Check version

1$java --version
2openjdk 17.0.3 2022-04-19 LTS

Postgres DB

1docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
2docker ps
3docker exec -it pg-container psql -U postgres -W postgres
4CREATE USER test WITH PASSWORD 'test@123';
5CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
6grant all PRIVILEGES ON DATABASE "test-db" to test;
7
8docker stop pg-container
9docker start pg-container

To seed large test data

1INSERT INTO customer (name, age)
2SELECT
3    'John',
4    30
5FROM generate_series(1, 4000000);

Dev

To run the code.

1./gradlew clean build
2./gradlew bootRun

Postman

Import the postman collection to postman

Postman Collection

References

https://spring.io/blog/2018/12/07/reactive-programming-and-relational-databases

comments powered by Disqus