Spring Reactor - Basics

Overview

Reactive programming examples showing how to use Spring Reactor (Project Reactor).

Github: https://github.com/gitorko/project83

Spring Reactor

Test methods demonstrating how to use Spring Reactor.

   1package com.demo.project83;
   2
   3import static org.assertj.core.api.Assertions.assertThat;
   4
   5import java.nio.file.Files;
   6import java.nio.file.Path;
   7import java.time.Duration;
   8import java.time.LocalTime;
   9import java.util.ArrayList;
  10import java.util.Arrays;
  11import java.util.Collection;
  12import java.util.Date;
  13import java.util.HashMap;
  14import java.util.List;
  15import java.util.Map;
  16import java.util.Optional;
  17import java.util.Random;
  18import java.util.concurrent.Callable;
  19import java.util.concurrent.CountDownLatch;
  20import java.util.concurrent.TimeUnit;
  21import java.util.concurrent.atomic.AtomicLong;
  22import java.util.function.Consumer;
  23import java.util.function.Function;
  24import java.util.function.Supplier;
  25import java.util.stream.Stream;
  26
  27import lombok.SneakyThrows;
  28import lombok.extern.slf4j.Slf4j;
  29import org.junit.jupiter.api.Test;
  30import org.reactivestreams.Subscription;
  31import reactor.core.Disposable;
  32import reactor.core.Exceptions;
  33import reactor.core.publisher.BaseSubscriber;
  34import reactor.core.publisher.ConnectableFlux;
  35import reactor.core.publisher.Flux;
  36import reactor.core.publisher.FluxSink;
  37import reactor.core.publisher.Mono;
  38import reactor.core.publisher.ParallelFlux;
  39import reactor.core.scheduler.Schedulers;
  40import reactor.test.StepVerifier;
  41import reactor.test.scheduler.VirtualTimeScheduler;
  42import reactor.tools.agent.ReactorDebugAgent;
  43import reactor.util.function.Tuple2;
  44import reactor.util.function.Tuples;
  45import reactor.util.retry.Retry;
  46import reactor.util.retry.RetryBackoffSpec;
  47
  48/**
  49 * Reactive Streams Specification
  50 *
  51 * Publisher (Mono/Flux)
  52 *   - subscribe (data source, db, remote service)
  53 * Subscriber
  54 *   - onSubscribe
  55 *   - onNext
  56 *   - onError
  57 *   - onComplete
  58 * Subscription
  59 *   - request
  60 *   - cancel
  61 * Processor - Publisher + Subscriber
  62 *
  63 * Spring reactor is a Push + Pull data flow model
  64 */
  65@Slf4j
  66public class ReactorTest {
  67
  68    /**
  69     * ********************************************************************
  70     *  mono
  71     * ********************************************************************
  72     */
  73    @Test
  74    void monoTest() {
  75        //justOrEmpty
  76        Mono<String> mono1 = Mono.justOrEmpty("apple");
  77        mono1.subscribe(System.out::println);
  78        StepVerifier.create(mono1)
  79                .expectNext("apple")
  80                .verifyComplete();
  81
  82        //Note: Reactive Streams do not accept null values
  83        Mono<String> mono2 = Mono.justOrEmpty(null);
  84        mono2.subscribe(System.out::println);
  85        StepVerifier.create(mono2)
  86                .verifyComplete();
  87
  88        //Default value if empty.
  89        Mono<String> mono3 = mono2.defaultIfEmpty("Jill");
  90        mono3.subscribe(System.out::println);
  91        StepVerifier.create(mono3)
  92                .expectNext("Jill")
  93                .verifyComplete();
  94
  95        //Use log to look at transitions.
  96        Mono<String> mono4 = Mono.just("apple").log();
  97        mono4.subscribe(s -> {
  98            log.info("Got: {}", s);
  99        });
 100        StepVerifier.create(mono4)
 101                .expectNext("apple")
 102                .verifyComplete();
 103    }
 104
 105    /**
 106     * ********************************************************************
 107     *  flux
 108     * ********************************************************************
 109     */
 110    @Test
 111    void fluxTest() {
 112        Flux flux = Flux.just("apple", "orange");
 113        flux.subscribe(System.out::println);
 114        StepVerifier.create(flux)
 115                .expectNext("apple", "orange")
 116                .verifyComplete();
 117    }
 118
 119    /**
 120     * ********************************************************************
 121     *  flux - Avoid blocking calls that will hold thread
 122     * ********************************************************************
 123     */
 124    @Test
 125    void fluxSleepTest() {
 126        Flux flux = Flux.just("apple", "orange").map(e -> {
 127            log.info("Received: {}", e);
 128            //Bad idea to do Thread.sleep or any blocking call.
 129            //Instead use delayElements.
 130            return e;
 131        }).delayElements(Duration.ofSeconds(1));
 132        flux.subscribe(System.out::println);
 133        StepVerifier.create(flux)
 134                .expectNext("apple", "orange")
 135                .verifyComplete();
 136    }
 137
 138    /**
 139     * ********************************************************************
 140     *  block
 141     * ********************************************************************
 142     */
 143    @Test
 144    public void badTest() {
 145        //Never use .block() as it blocks the thread.
 146        String name = Mono.just("Jack")
 147                .block();
 148        System.out.println(name);
 149    }
 150
 151    /**
 152     * ********************************************************************
 153     *  filter - filter out elements that dont meet condition
 154     * ********************************************************************
 155     */
 156    @Test
 157    void fluxFilterTest() {
 158        //Get even numbers
 159        Flux flux = Flux.just(1, 2, 3, 4, 5)
 160                .filter(i -> i % 2 == 0);
 161        flux.subscribe(System.out::println);
 162        StepVerifier.create(flux)
 163                .expectNext(2, 4)
 164                .verifyComplete();
 165    }
 166
 167    /**
 168     * ********************************************************************
 169     *  flux from array, list, stream
 170     * ********************************************************************
 171     */
 172    @Test
 173    public void fluxArrayTest() {
 174        Integer[] arr = {2, 5, 7, 8};
 175        Flux<Integer> flux1 = Flux.fromArray(arr);
 176        flux1.subscribe(System.out::println);
 177        StepVerifier.create(flux1)
 178                .expectNext(2, 5, 7, 8)
 179                .verifyComplete();
 180
 181        List<String> fruitsList = Arrays.asList("apple", "oranges", "grapes");
 182        Flux<String> fruits = Flux.fromIterable(fruitsList);
 183        StepVerifier.create(fruits)
 184                .expectNext("apple", "oranges", "grapes")
 185                .verifyComplete();
 186
 187        Stream<Integer> stream = List.of(1, 2, 3, 4, 5).stream();
 188        Flux<Integer> flux2 = Flux.fromStream(() -> stream);
 189        StepVerifier.create(flux2)
 190                .expectNext(1, 2, 3, 4, 5)
 191                .verifyComplete();
 192
 193        Flux<Integer> flux3 = Flux.fromIterable(List.of(1, 2, 3, 4, 5));
 194        StepVerifier.create(flux3)
 195                .expectNext(1, 2, 3, 4, 5)
 196                .verifyComplete();
 197    }
 198
 199    /**
 200     * ********************************************************************
 201     *  flux range
 202     * ********************************************************************
 203     */
 204    @Test
 205    public void fluxRangeTest() {
 206        Flux<Integer> numbers = Flux.range(1, 5);
 207        numbers.subscribe(n -> {
 208            log.info("number: {}", n);
 209        });
 210        StepVerifier.create(numbers)
 211                .expectNext(1, 2, 3, 4, 5)
 212                .verifyComplete();
 213    }
 214
 215    /**
 216     * ********************************************************************
 217     *  map - synchronous by nature
 218     * ********************************************************************
 219     */
 220    @Test
 221    public void fluxMapTest() {
 222        Flux<Integer> flux1 = Flux.just("apple", "orange")
 223                .log()
 224                .map(i -> i.length());
 225        StepVerifier
 226                .create(flux1)
 227                .expectNext(5, 6)
 228                .verifyComplete();
 229
 230        Flux<Integer> flux2 = Flux.range(3, 2)
 231                .map(i -> i + 100);
 232        flux2.subscribe(System.out::println);
 233        StepVerifier.create(flux2)
 234                .expectNext(103, 104)
 235                .verifyComplete();
 236    }
 237
 238    /**
 239     * ********************************************************************
 240     *  flatMap - transform object 1-1 or 1-N in asyncronous fashion, returns back Mono/Flux.
 241     *  map - transform an object 1-1 in fixed time in synchronous fashion
 242     * ********************************************************************
 243     */
 244    @Test
 245    void flatMapTest() {
 246        Flux flux1 = Flux.just("apple", "orange")
 247                .flatMap(ReactorTest::capitalizeReactive);
 248        flux1.subscribe(System.out::println);
 249        StepVerifier.create(flux1)
 250                .expectSubscription()
 251                .expectNext("APPLE")
 252                .expectNext("ORANGE")
 253                .verifyComplete();
 254
 255        //capitalize will happen in blocking fashion. If this function takes long or does I/O then it will be blocking
 256        //Use map only when there is no IO involved in the function
 257        Flux flux2 = Flux.just("apple", "orange")
 258                .map(ReactorTest::capitalize);
 259        flux2.subscribe(System.out::println);
 260        StepVerifier.create(flux2)
 261                .expectSubscription()
 262                .expectNext("APPLE")
 263                .expectNext("ORANGE")
 264                .verifyComplete();
 265    }
 266
 267    private static Mono<String> capitalizeReactive(String user) {
 268        return Mono.just(user.toUpperCase());
 269    }
 270
 271    private static String capitalize(String user) {
 272        return user.toUpperCase();
 273    }
 274
 275    /**
 276     * ********************************************************************
 277     *  flatMap - object modification
 278     * ********************************************************************
 279     */
 280    @Test
 281    void objectModificationTest() {
 282        //Modification of object in chain - done via flatMap
 283        //Ideally create a new object instead of modifying the existing object.
 284        Mono<String> mono1 = Mono.just("Apple")
 285                .flatMap(ReactorTest::appendGreet);
 286        StepVerifier.create(mono1)
 287                .expectNext("Hello Apple")
 288                .verifyComplete();
 289
 290        //Modification of object in chain - done via zipWith
 291        //The 2nd argument for zipWith is a combinator function that determines how the 2 mono are zipped
 292        Mono<String> mono2 = Mono.just("Apple")
 293                .zipWith(Mono.just("Hello "), ReactorTest::getGreet);
 294        StepVerifier.create(mono2)
 295                .expectNext("Hello Apple")
 296                .verifyComplete();
 297    }
 298
 299    private static Mono<String> appendGreet(String name) {
 300        return Mono.just("Hello " + name);
 301    }
 302
 303    private static String getGreet(String name, String greet) {
 304        return greet + name;
 305    }
 306
 307    /**
 308     * ********************************************************************
 309     *  flatMap - Find all distinct chars in the list of names, convert to uppercase
 310     * ********************************************************************
 311     */
 312    @Test
 313    void flatMapTest2() {
 314        Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe", "Jill"))
 315                .map(String::toUpperCase)
 316                .flatMap(s -> splitString(s))
 317                .distinct();
 318        flux.subscribe(System.out::println);
 319        StepVerifier.create(flux)
 320                .expectNext("J", "A", "C", "K", "O", "E", "I", "L")
 321                .verifyComplete();
 322
 323    }
 324
 325    private Flux<String> splitString(String name) {
 326        return Flux.fromArray(name.split(""));
 327    }
 328
 329    /**
 330     * ********************************************************************
 331     *  concatMap - Same as flatMap but order is preserved, concatMap takes more time but ordering is preserved.
 332     *  flatMap - Takes less time but ordering is lost.
 333     * ********************************************************************
 334     */
 335    @Test
 336    @SneakyThrows
 337    void concatMapTest() {
 338        Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe", "Jill"))
 339                .map(String::toUpperCase)
 340                .concatMap(s -> splitStringAsync(s))
 341                .distinct();
 342
 343        StepVerifier.create(flux)
 344                .expectNext("J", "A", "C", "K", "O", "E", "I", "L")
 345                .verifyComplete();
 346    }
 347
 348    private Flux<String> splitStringAsync(String name) {
 349        return Flux.fromArray(name.split(""))
 350                .delayElements(Duration.ofMillis(new Random().nextInt(1000)));
 351    }
 352
 353    /**
 354     * ********************************************************************
 355     *  flatMapMany - similar to flatMap, but function returns flux
 356     * ********************************************************************
 357     */
 358    @Test
 359    void flatMapManyTest() {
 360
 361        Flux flux1 = Mono.just("Jack")
 362                .flatMapMany(ReactorTest::capitalizeSplit);
 363        flux1.subscribe(System.out::println);
 364        StepVerifier.create(flux1)
 365                .expectSubscription()
 366                .expectNext("J", "A", "C", "K")
 367                .verifyComplete();
 368
 369        Flux<String> flux2 = Mono.just("the quick brown fox jumps over the lazy dog")
 370                .flatMapMany(ReactorTest::capitalizeSplit)
 371                .distinct()
 372                .sort();
 373        flux2.subscribe(System.out::println);
 374        //26 letters in the alphabet
 375        StepVerifier.create(flux2)
 376                .expectNext("A", "B", "C")
 377                .expectComplete();
 378
 379        Mono<List<Integer>> mono = Mono.just(Arrays.asList(1, 2, 3));
 380        Flux<Integer> flux3 = mono.flatMapMany(it -> Flux.fromIterable(it));
 381        flux3.subscribe(System.out::println);
 382        StepVerifier
 383                .create(flux3)
 384                .expectNext(1, 2, 3)
 385                .verifyComplete();
 386    }
 387
 388    private static Flux<String> capitalizeSplit(String user) {
 389        return Flux.fromArray(user.toUpperCase().split(""));
 390    }
 391
 392    /**
 393     * ********************************************************************
 394     *  flatMapIterable - convert mono of list to flux
 395     * ********************************************************************
 396     */
 397    @Test
 398    void flatMapIterableTest() {
 399        //Converts Mono of list to Flux.
 400        Mono<List<Integer>> mono = Mono.just(Arrays.asList(1, 2, 3));
 401        Flux<Integer> flux1 = mono.flatMapIterable(list -> list);
 402        flux1.subscribe(System.out::println);
 403        StepVerifier
 404                .create(flux1)
 405                .expectNext(1, 2, 3)
 406                .verifyComplete();
 407    }
 408
 409    /**
 410     * ********************************************************************
 411     *  transform - accepts a Function functional interface. Used when similar transform is used in many places
 412     *  input is flux/mono
 413     *  output is flux/mono
 414     *  takes a flux/mono and returns a flux/mono
 415     * ********************************************************************
 416     */
 417    @Test
 418    void transformTest() {
 419        //Function defines input and output
 420        Function<Flux<String>, Flux<String>> filterMap = name -> name.map(String::toUpperCase);
 421        Flux<String> flux1 = Flux.fromIterable(List.of("Jack", "Joe", "Jill"))
 422                .transform(filterMap)
 423                .filter(s -> s.length() > 3);
 424        flux1.subscribe(System.out::println);
 425        StepVerifier
 426                .create(flux1)
 427                .expectNext("JACK", "JILL")
 428                .verifyComplete();
 429    }
 430
 431    /**
 432     * ********************************************************************
 433     *  switchIfEmpty - similar to defaultIfEmpty but return flux/mono
 434     *  defaultIfEmpty - return a fixed value.
 435     * ********************************************************************
 436     */
 437    @Test
 438    @SneakyThrows
 439    void switchTest() {
 440        Flux<Object> flux1 = emptyFlux()
 441                .switchIfEmpty(Flux.just("empty!"))
 442                .log();
 443        StepVerifier.create(flux1)
 444                .expectSubscription()
 445                .expectNext("empty!")
 446                .expectComplete()
 447                .verify();
 448
 449        Flux<Object> flux2 = emptyFlux()
 450                .defaultIfEmpty("empty!")
 451                .log();
 452        StepVerifier.create(flux2)
 453                .expectSubscription()
 454                .expectNext("empty!")
 455                .expectComplete()
 456                .verify();
 457
 458
 459    }
 460
 461    private Flux<Object> emptyFlux() {
 462        return Flux.empty();
 463    }
 464
 465    /**
 466     * ********************************************************************
 467     *  switchIfEmpty with Optional
 468     * ********************************************************************
 469     */
 470    @Test
 471    public void switchOptionalTest() {
 472
 473        Mono<String> mono1 = getHello1().map(e -> {
 474            return e.get().toUpperCase();
 475        }).switchIfEmpty(Mono.just("empty!"));
 476        StepVerifier.create(mono1)
 477                .expectNext("HELLO")
 478                .expectComplete()
 479                .verify();
 480
 481        Mono<String> mono2 = getHello2().map(e -> {
 482            return e.get().toUpperCase();
 483        }).switchIfEmpty(Mono.just("empty!"));
 484        StepVerifier.create(mono2)
 485                .expectNext("empty!")
 486                .expectComplete()
 487                .verify();
 488    }
 489
 490    private Mono<Optional<String>> getHello1() {
 491        return Mono.just(Optional.of("hello"));
 492    }
 493
 494    //Optional evaluated to object or empty
 495    private Mono<Optional<String>> getHello2() {
 496        return Mono.justOrEmpty(Optional.empty());
 497    }
 498
 499    /**
 500     * ********************************************************************
 501     *  switchIfEmpty - Used as if-else block
 502     * ********************************************************************
 503     */
 504    @Test
 505    void switchIfElseTest() {
 506        final String name = "Jack";
 507        //No need to use Mono.defer on the switchIfEmpty
 508        Mono<String> mono = Mono.just(name)
 509                .flatMap(this::wishGoodMorning)
 510                .switchIfEmpty(this.wishGoodNight(name));
 511
 512        StepVerifier.create(mono)
 513                .expectNext("GOOD NIGHT JACK")
 514                .verifyComplete();
 515    }
 516
 517    private Mono<String> wishGoodMorning(String name) {
 518        log.info("wishGoodMorning {}", name);
 519        if (name.equals("Jack")) {
 520            return Mono.empty();
 521        } else {
 522            return Mono.just("Good Morning " + name);
 523        }
 524    }
 525
 526    private Mono<String> wishGoodNight(String name) {
 527        log.info("wishGoodNight {}", name);
 528        return Mono.just("Good Night " + name)
 529                .map(String::toUpperCase);
 530    }
 531
 532    /**
 533     * ********************************************************************
 534     *  intersect with filterWhen - compare 2 flux for common elements
 535     * ********************************************************************
 536     */
 537    @Test
 538    void fluxIntersectCommonTest() {
 539        Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
 540        //Without cache on flux2 it will subscribe many times.
 541        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
 542
 543        Flux<String> commonFlux = flux1.filterWhen(f -> ReactorTest.checkList1(flux2, f));
 544        commonFlux.subscribe(System.out::println);
 545        StepVerifier.create(commonFlux)
 546                .expectNext("apple", "orange")
 547                .verifyComplete();
 548    }
 549
 550    private static Mono<Boolean> checkList1(Flux<String> flux, String fruit) {
 551        //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
 552        return Mono.just(flux.toStream().anyMatch(e -> e.equals(fruit)));
 553    }
 554
 555    /**
 556     * ********************************************************************
 557     *  intersect with filter - compare 2 flux for common elements
 558     * ********************************************************************
 559     */
 560    @Test
 561    void fluxIntersectCommon2Test() {
 562        Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
 563        //Without cache on flux2 it will subscribe many times.
 564        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
 565        Flux<String> commonFlux = flux1.filter(f -> {
 566            //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
 567            return flux2.toStream().anyMatch(e -> e.equals(f));
 568        });
 569        commonFlux.subscribe(System.out::println);
 570        StepVerifier.create(commonFlux)
 571                .expectNext("apple", "orange")
 572                .verifyComplete();
 573    }
 574
 575    /**
 576     * ********************************************************************
 577     *  intersect with filterWhen - compare 2 flux for diff
 578     * ********************************************************************
 579     */
 580    @Test
 581    void fluxIntersectDiffTest() {
 582        Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
 583        //Without cache on flux2 it will subscribe many times.
 584        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
 585
 586        Flux<String> diffFlux = flux1.filterWhen(f -> ReactorTest.checkList2(flux2, f));
 587        diffFlux.subscribe(System.out::println);
 588        StepVerifier.create(diffFlux)
 589                .expectNext("banana")
 590                .verifyComplete();
 591    }
 592
 593    private static Mono<Boolean> checkList2(Flux<String> flux, String fruit) {
 594        //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
 595        return Mono.just(flux.toStream().anyMatch(e -> e.equals(fruit))).map(hasElement -> !hasElement);
 596    }
 597
 598    /**
 599     * ********************************************************************
 600     *  intersect with filter - compare 2 flux for diff
 601     * ********************************************************************
 602     */
 603    @Test
 604    void fluxIntersectDiff2Test() {
 605        Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
 606        //Without cache on flux2 it will subscribe many times.
 607        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
 608        Flux<String> commonFlux = flux1.filter(f -> {
 609            //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
 610            return !flux2.toStream().anyMatch(e -> e.equals(f));
 611        });
 612        commonFlux.subscribe(System.out::println);
 613        StepVerifier.create(commonFlux)
 614                .expectNext("banana")
 615                .verifyComplete();
 616    }
 617
 618    /**
 619     * ********************************************************************
 620     *  startWith - add new element to flux.
 621     * ********************************************************************
 622     */
 623    @Test
 624    public void startWithTest() {
 625        Flux<Integer> flux = Flux.range(1, 3);
 626        Flux<Integer> integerFlux = flux.startWith(0);
 627        StepVerifier.create(integerFlux)
 628                .expectNext(0, 1, 2, 3)
 629                .verifyComplete();
 630    }
 631
 632    /**
 633     * ********************************************************************
 634     *  index
 635     * ********************************************************************
 636     */
 637    @Test
 638    void fluxIndexTest() {
 639        //append a number to each element.
 640        Flux<Tuple2<Long, String>> index = Flux
 641                .just("apple", "banana", "orange")
 642                .index();
 643        StepVerifier.create(index)
 644                .expectNext(Tuples.of(0L, "apple"))
 645                .expectNext(Tuples.of(1L, "banana"))
 646                .expectNext(Tuples.of(2L, "orange"))
 647                .verifyComplete();
 648    }
 649
 650    /**
 651     * ********************************************************************
 652     *  takeWhile & skipWhile
 653     * ********************************************************************
 654     */
 655    @Test
 656    void takeWhileTest() {
 657        Flux<Integer> numbersFlux = Flux.range(1, 10);
 658        Flux<Integer> takeWhile = numbersFlux.takeWhile(i -> i <= 5);
 659        StepVerifier
 660                .create(takeWhile)
 661                .expectNext(1, 2, 3, 4, 5)
 662                .verifyComplete();
 663
 664        Flux<Integer> skipWhile = numbersFlux.skipWhile(i -> i <= 5);
 665        StepVerifier
 666                .create(skipWhile)
 667                .expectNext(6, 7, 8, 9, 10)
 668                .verifyComplete();
 669    }
 670
 671    /**
 672     * ********************************************************************
 673     *  collectList & collectSortedList- flux to mono of list
 674     * ********************************************************************
 675     */
 676    @Test
 677    void collectListTest() {
 678        Flux<String> flux = Flux.just("apple", "orange");
 679        Mono<List<String>> mono = flux.collectList();
 680        mono.subscribe(System.out::println);
 681        StepVerifier.create(mono)
 682                .expectNext(Arrays.asList("apple", "orange"))
 683                .verifyComplete();
 684
 685        Mono<List<Integer>> listMono1 = Flux
 686                .just(1, 2, 3)
 687                .collectList();
 688        StepVerifier.create(listMono1)
 689                .expectNext(Arrays.asList(1, 2, 3))
 690                .verifyComplete();
 691
 692        Mono<List<Integer>> listMono2 = Flux
 693                .just(5, 2, 4, 1, 3)
 694                .collectSortedList();
 695        StepVerifier.create(listMono2)
 696                .expectNext(Arrays.asList(1, 2, 3, 4, 5))
 697                .verifyComplete();
 698    }
 699
 700    /**
 701     * ********************************************************************
 702     *  collectList
 703     * ********************************************************************
 704     */
 705    @Test
 706    void collectListTest2() {
 707        Mono<List<String>> monoList1 = Flux.just("banana", "apple")
 708                .collectList();
 709        StepVerifier.create(monoList1)
 710                .expectNext(Arrays.asList("banana", "apple"))
 711                .verifyComplete();
 712
 713        Mono<List<String>> monoList2 = Flux.just("banana", "apple")
 714                .collectSortedList();
 715        StepVerifier.create(monoList2)
 716                .expectNext(Arrays.asList("apple", "banana"))
 717                .verifyComplete();
 718
 719        //Dont use infinite flux, will never return.
 720        //Flux.interval(Duration.ofMillis(1000)).collectList().subscribe();
 721
 722        List<String> list3 = new ArrayList<>();
 723        monoList1.subscribe(list3::addAll);
 724        list3.forEach(System.out::println);
 725    }
 726
 727    /**
 728     * ********************************************************************
 729     *  collectMap
 730     * ********************************************************************
 731     */
 732    @Test
 733    void collectMapTest() {
 734        Mono<Map<Object, Object>> flux = Flux.just(
 735                "yellow:banana",
 736                "red:apple").collectMap(item -> item.split(":")[0], item -> item.split(":")[1]);
 737
 738        Map<Object, Object> map1 = new HashMap<>();
 739        flux.subscribe(map1::putAll);
 740        map1.forEach((key, value) -> System.out.println(key + " -> " + value));
 741
 742        StepVerifier.create(flux)
 743                .expectNext(Map.of("yellow", "banana", "red", "apple"))
 744                .verifyComplete();
 745    }
 746
 747    /**
 748     * ********************************************************************
 749     *  collectMultimap
 750     * ********************************************************************
 751     */
 752    @Test
 753    void collectMultimapTest() {
 754        Mono<Map<String, Collection<Object>>> flux = Flux.just("yellow:banana", "red:grapes", "red:apple", "yellow:pineapple")
 755                .collectMultimap(
 756                        item -> item.split(":")[0],
 757                        item -> item.split(":")[1]);
 758        Map<Object, Collection<Object>> map1 = new HashMap<>();
 759        flux.subscribe(map1::putAll);
 760        map1.forEach((key, value) -> System.out.println(key + " -> " + value));
 761
 762        StepVerifier.create(flux)
 763                .expectNext(Map.of("red", List.of("grapes", "apple"), "yellow", List.of("banana", "pineapple")))
 764                .verifyComplete();
 765    }
 766
 767    /**
 768     * ********************************************************************
 769     *  concat - subscribes to publishers in sequence, order guaranteed, static function
 770     * ********************************************************************
 771     */
 772    @Test
 773    @SneakyThrows
 774    void concatTest() {
 775        Flux<String> flux1 = Flux.just("a", "b");
 776        Flux<String> flux2 = Flux.just("c", "d");
 777        Flux<String> flux3 = Flux.concat(flux1, flux2);
 778
 779        StepVerifier.create(flux3)
 780                .expectSubscription()
 781                .expectNext("a", "b", "c", "d")
 782                .verifyComplete();
 783
 784        Flux<String> flux4 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
 785        Flux<String> flux5 = Flux.just("c", "d");
 786        //Lazy will wait till first flux finishes.
 787        Flux<String> flux6 = Flux.concat(flux1, flux2).log();
 788
 789        StepVerifier.create(flux6)
 790                .expectSubscription()
 791                .expectNext("a", "b", "c", "d")
 792                .verifyComplete();
 793    }
 794
 795    /**
 796     * ********************************************************************
 797     *  concatWith - subscribes to publishers in sequence, order guaranteed, instance function
 798     * ********************************************************************
 799     */
 800    @Test
 801    @SneakyThrows
 802    void concatWithTest() {
 803        Flux<String> flux1 = Flux.just("a", "b");
 804        Flux<String> flux2 = Flux.just("c", "d");
 805        Flux<String> flux3 = flux1.concatWith(flux2).log();
 806        StepVerifier.create(flux3)
 807                .expectSubscription()
 808                .expectNext("a", "b", "c", "d")
 809                .verifyComplete();
 810
 811        Mono<String> aFlux = Mono.just("a");
 812        Mono<String> bFlux = Mono.just("b");
 813        Flux<String> stringFlux = aFlux.concatWith(bFlux);
 814        stringFlux.subscribe(System.out::println);
 815        StepVerifier.create(stringFlux)
 816                .expectNext("a", "b")
 817                .verifyComplete();
 818    }
 819
 820    @Test
 821    void concatDelayErrorTest() {
 822        Flux<String> flux1 = Flux.just("a", "b").map(s -> {
 823            if (s.equals("b")) {
 824                throw new IllegalArgumentException("error!");
 825            }
 826            return s;
 827        });
 828        Flux<String> flux2 = Flux.just("c", "d");
 829        Flux<String> flux = Flux.concatDelayError(flux1, flux2)
 830                .log();
 831        StepVerifier.create(flux)
 832                .expectSubscription()
 833                .expectNext("a", "c", "d")
 834                .expectError()
 835                .verify();
 836    }
 837
 838    /**
 839     * ********************************************************************
 840     *  merge - subscribes to publishers eagerly, order not guaranteed, static function
 841     * ********************************************************************
 842     */
 843    @Test
 844    @SneakyThrows
 845    void mergeTest() {
 846        Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
 847        Flux<String> flux2 = Flux.just("c", "d");
 848        //Eager will not wait till first flux finishes.
 849        Flux<String> flux = Flux.merge(flux1, flux2)
 850                .log();
 851        StepVerifier.create(flux)
 852                .expectSubscription()
 853                .expectNext("c", "d", "a", "b")
 854                .verifyComplete();
 855    }
 856
 857    /**
 858     * ********************************************************************
 859     *  mergeWith - subscribes to publishers in eagerly, order not guaranteed, instance function
 860     * ********************************************************************
 861     */
 862    @Test
 863    @SneakyThrows
 864    void mergeWithTest() {
 865        Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
 866        Flux<String> flux2 = Flux.just("c", "d");
 867        //Eager will not wait till first flux finishes.
 868        Flux<String> flux3 = flux1.mergeWith(flux2).log();
 869        StepVerifier.create(flux3)
 870                .expectSubscription()
 871                .expectNext("c", "d", "a", "b")
 872                .verifyComplete();
 873
 874        Mono aMono = Mono.just("a");
 875        Mono bMono = Mono.just("b");
 876        Flux flux4 = aMono.mergeWith(bMono);
 877        StepVerifier.create(flux4)
 878                .expectNext("a", "b")
 879                .verifyComplete();
 880    }
 881
 882    /**
 883     * ********************************************************************
 884     *  mergeSequential - subscribes to publishers eagerly, result is sequential.
 885     *  concat          - subscribes to publishers in sequence, result is sequential.
 886     * ********************************************************************
 887     */
 888    @Test
 889    @SneakyThrows
 890    void mergeSequentialTest() {
 891        Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
 892        Flux<String> flux2 = Flux.just("c", "d");
 893        Flux<String> flux = Flux.mergeSequential(flux1, flux2, flux1)
 894                .log();
 895        StepVerifier.create(flux)
 896                .expectSubscription()
 897                .expectNext("a", "b", "c", "d", "a", "b")
 898                .verifyComplete();
 899    }
 900
 901    @Test
 902    void mergeDelayTest() {
 903        Flux<String> flux1 = Flux.just("a", "b").map(s -> {
 904            if (s.equals("b")) {
 905                throw new IllegalArgumentException("error!");
 906            }
 907            return s;
 908        }).doOnError(e -> log.error("Error: {}", e));
 909
 910        Flux<String> flux2 = Flux.just("c", "d");
 911        Flux<String> flux = Flux.mergeDelayError(1, flux1, flux2, flux1)
 912                .log();
 913        StepVerifier.create(flux)
 914                .expectSubscription()
 915                .expectNext("a", "c", "d", "a")
 916                .expectError()
 917                .verify();
 918    }
 919
 920    /**
 921     * ********************************************************************
 922     *  zip - subscribes to publishers in eagerly, waits for both flux to emit one element. 2-8 flux can be zipped, returns a tuple, Static function
 923     * ********************************************************************
 924     */
 925    @Test
 926    void fluxZipTest() {
 927        Flux<String> flux1 = Flux.just("red", "yellow");
 928        Flux<String> flux2 = Flux.just("apple", "banana");
 929        Flux<String> flux3 = Flux.zip(flux1, flux2)
 930                .map(tuple -> {
 931                    return (tuple.getT1() + " " + tuple.getT2());
 932                });
 933        flux3.subscribe(System.out::println);
 934        StepVerifier.create(flux3)
 935                .expectNext("red apple")
 936                .expectNext("yellow banana")
 937                .verifyComplete();
 938
 939        //Third argument is combinator lambda
 940        Flux<Integer> firstFlux = Flux.just(1, 2, 3);
 941        Flux<Integer> secondFlux = Flux.just(10, 20, 30, 40);
 942        //Define how the zip should happen
 943        Flux<Integer> zip = Flux.zip(firstFlux, secondFlux, (num1, num2) -> num1 + num2);
 944        StepVerifier
 945                .create(zip)
 946                .expectNext(11, 22, 33)
 947                .verifyComplete();
 948    }
 949
 950    /**
 951     * ********************************************************************
 952     *  zipWith - subscribes to publishers in eagerly, waits for both flux to emit one element. 2-8 flux can be zipped, returns a tuple, Instance function
 953     * ********************************************************************
 954     */
 955    @Test
 956    void fluxZipWithTest() {
 957        Flux<String> flux1 = Flux.just("red", "yellow");
 958        Flux<String> flux2 = Flux.just("apple", "banana");
 959        Flux<String> flux3 = flux1.zipWith(flux2)
 960                .map(tuple -> {
 961                    return (tuple.getT1() + " " + tuple.getT2());
 962                });
 963        StepVerifier.create(flux3)
 964                .expectNext("red apple")
 965                .expectNext("yellow banana")
 966                .verifyComplete();
 967
 968        Flux<String> flux4 = Flux.fromIterable(Arrays.asList("apple", "orange", "banana"))
 969                .zipWith(Flux.range(1, 5), (word, line) -> {
 970                    return line + ". " + word;
 971                });
 972        StepVerifier.create(flux4)
 973                .expectNext("1. apple")
 974                .expectNext("2. orange")
 975                .expectNext("3. banana")
 976                .verifyComplete();
 977    }
 978
 979    /**
 980     * ********************************************************************
 981     *  Error Recover Handling
 982     *  onErrorReturn - Return value on error
 983     * ********************************************************************
 984     */
 985    @Test
 986    void onErrorReturnTest() {
 987        Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"))
 988                .onErrorReturn("Jack");
 989        StepVerifier.create(mono1)
 990                .expectNext("Jack")
 991                .verifyComplete();
 992    }
 993
 994    /**
 995     * ********************************************************************
 996     *  Error Recover Handling
 997     *  onErrorResume - Resume chain with new mono/flux.
 998     * ********************************************************************
 999     */
1000    @Test
1001    void onErrorResumeTest() {
1002        Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"))
1003                .onErrorResume(e -> Mono.just("Jack"));
1004        StepVerifier.create(mono1)
1005                .expectNext("Jack")
1006                .verifyComplete();
1007
1008        Mono<Object> mono2 = Mono.error(new RuntimeException("My Error"))
1009                .onErrorResume(s -> {
1010                    log.info("Inside on onErrorResume");
1011                    return Mono.just("Jack");
1012                })
1013                .log();
1014        StepVerifier.create(mono2)
1015                .expectNext("Jack")
1016                .verifyComplete();
1017    }
1018
1019    /**
1020     * ********************************************************************
1021     *  Error Recover Handling
1022     *  onErrorContinue - Continue chain even if error occurs
1023     * ********************************************************************
1024     */
1025    @Test
1026    void onErrorContinueTest() {
1027        Flux<String> flux1 =
1028                Flux.just("a", "b", "c")
1029                        .map(e -> {
1030                            if (e.equals("b"))
1031                                throw new RuntimeException("My Error!");
1032                            return e;
1033                        })
1034                        .concatWith(Mono.just("d"))
1035                        .onErrorContinue((ex, value) -> {
1036                            log.info("Exception: {}", ex);
1037                            log.info("value: {}", value);
1038                        })
1039                        .log();
1040        StepVerifier.create(flux1)
1041                .expectNext("a", "c", "d")
1042                .verifyComplete();
1043    }
1044
1045    /**
1046     * ********************************************************************
1047     *  Error - Action
1048     *  doOnError - log the error, Side-effect operator.
1049     * ********************************************************************
1050     */
1051    @Test
1052    void doOnErrorTest() {
1053        Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"))
1054                .doOnError(e -> log.error("Error: {}", e.getMessage()))
1055                .log();
1056        StepVerifier.create(mono1)
1057                .expectError(RuntimeException.class)
1058                .verify();
1059    }
1060
1061    /**
1062     * ********************************************************************
1063     *  Error - Action
1064     *  onErrorMap - Transform an error emitted
1065     * ********************************************************************
1066     */
1067    @Test
1068    void onErrorMapTest() {
1069        Flux flux1 = Flux.just("Jack", "Jill").map(u -> {
1070            if (u.equals("Jill")) {
1071                //always do throw here, never do return.
1072                throw new IllegalArgumentException("Not valid");
1073            }
1074            if (u.equals("Jack")) {
1075                throw new ClassCastException("Not valid");
1076            }
1077            return u;
1078        }).onErrorMap(IllegalArgumentException.class, e -> {
1079            log.info("Illegal Arg error");
1080            throw new RuntimeException("Illegal Arg error!");
1081        }).onErrorMap(ClassCastException.class, e -> {
1082            log.info("Class cast error");
1083            throw new RuntimeException("Class cast error!");
1084        });
1085
1086        StepVerifier.create(flux1)
1087                .expectErrorMessage("Class cast error!")
1088                .verify();
1089    }
1090
1091    /**
1092     * ********************************************************************
1093     *  retry
1094     * ********************************************************************
1095     */
1096    @Test
1097    void retryTest() {
1098        Mono<String> mono = Mono.just("Jack")
1099                .flatMap(this::twoAttemptFunction)
1100                .retry(3);
1101        StepVerifier.create(mono)
1102                .assertNext(e -> {
1103                    assertThat(e).isEqualTo("Hello Jack");
1104                })
1105                .verifyComplete();
1106    }
1107
1108    AtomicLong attemptCounter1 = new AtomicLong();
1109
1110    private Mono<String> twoAttemptFunction(String name) {
1111        Long attempt = attemptCounter1.getAndIncrement();
1112        log.info("attempt value: {}", attempt);
1113        if (attempt < 2) {
1114            throw new RuntimeException("FAILURE");
1115        }
1116        return Mono.just("Hello " + name);
1117    }
1118
1119    /**
1120     * ********************************************************************
1121     *  retryWhen
1122     * ********************************************************************
1123     */
1124    @Test
1125    void retryWhenTest() {
1126        attemptCounter2 = new AtomicLong();
1127        RetryBackoffSpec retryFilter1 = Retry.backoff(3, Duration.ofSeconds(1))
1128                .filter(throwable -> throwable instanceof RuntimeException);
1129
1130        Mono<String> mono1 = Mono.just("Jack")
1131                .flatMap(this::greetAfter2Failure)
1132                .retryWhen(retryFilter1);
1133        StepVerifier.create(mono1)
1134                .assertNext(e -> {
1135                    assertThat(e).isEqualTo("Hello Jack");
1136                })
1137                .verifyComplete();
1138
1139        attemptCounter2 = new AtomicLong();
1140        RetryBackoffSpec retryFilter2 = Retry.fixedDelay(1, Duration.ofSeconds(1))
1141                .filter(throwable -> throwable instanceof RuntimeException)
1142                .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
1143                        Exceptions.propagate(retrySignal.failure())
1144                ));
1145        Mono<String> mono2 = Mono.just("Jack")
1146                .flatMap(this::greetAfter2Failure)
1147                .retryWhen(retryFilter2);
1148        StepVerifier.create(mono2)
1149                .expectErrorMessage("FAILURE")
1150                .verify();
1151    }
1152
1153    AtomicLong attemptCounter2;
1154
1155    private Mono<String> greetAfter2Failure(String name) {
1156        Long attempt = attemptCounter2.getAndIncrement();
1157        log.info("attempt value: {}", attempt);
1158        if (attempt < 2) {
1159            throw new RuntimeException("FAILURE");
1160        }
1161        return Mono.just("Hello " + name);
1162    }
1163
1164    /**
1165     * ********************************************************************
1166     *  repeat - repeat an operation n times.
1167     * ********************************************************************
1168     */
1169    @Test
1170    void repeatTest() {
1171        Mono<List<String>> flux = getDate()
1172                .repeat(5)
1173                .collectList();
1174        StepVerifier.create(flux)
1175                .assertNext(e -> {
1176                    assertThat(e.size()).isEqualTo(6);
1177                })
1178                .verifyComplete();
1179    }
1180
1181    private Mono<String> getDate() {
1182        return Mono.just("Time " + new Date());
1183    }
1184
1185    /**
1186     * ********************************************************************
1187     *  doOn - doOnSubscribe, doOnNext, doOnError, doFinally, doOnComplete
1188     * ********************************************************************
1189     */
1190    @Test
1191    void doOnTest1() {
1192        Flux<Integer> numFlux = Flux.range(1, 5)
1193                .log()
1194                .map(i -> {
1195                    if (i == 4) {
1196                        throw new RuntimeException("Num Error!");
1197                    }
1198                    return i;
1199                });
1200        numFlux.subscribe(s -> {
1201                    log.info("Number: {}", s);
1202                },
1203                Throwable::printStackTrace,
1204                () -> {
1205                    log.info("Done!");
1206                });
1207        StepVerifier.create(numFlux)
1208                .expectNext(1, 2, 3)
1209                .expectError(RuntimeException.class)
1210                .verify();
1211    }
1212
1213    @Test
1214    void doOnTest2() {
1215        Flux<Object> flux = Flux.error(new RuntimeException("My Error"))
1216                .doOnSubscribe(s -> System.out.println("Subscribed!"))
1217                .doOnNext(p -> System.out.println("Next!"))
1218                .doOnComplete(() -> System.out.println("Completed!"))
1219                .doFinally((e) -> System.out.println("Signal: " + e))
1220                .doOnError((e) -> System.out.println("Error: " + e));
1221
1222        StepVerifier.create(flux)
1223                .expectError(RuntimeException.class)
1224                .verify();
1225
1226        StepVerifier.create(flux)
1227                .verifyError(RuntimeException.class);
1228    }
1229
1230    @Test
1231    void doOnTest3() {
1232        Flux flux = Flux.error(new RuntimeException("My Error"));
1233        flux.subscribe(
1234                onNext(),
1235                onError(),
1236                onComplete()
1237        );
1238    }
1239
1240    private static Consumer<Object> onNext() {
1241        return o -> System.out.println("Received : " + o);
1242    }
1243
1244    private static Consumer<Throwable> onError() {
1245        return e -> System.out.println("ERROR : " + e.getMessage());
1246    }
1247
1248    private static Runnable onComplete() {
1249        return () -> System.out.println("Completed");
1250    }
1251
1252    /**
1253     * ********************************************************************
1254     *  StepVerifier - assertNext, thenRequest, thenCancel, expectError, expectErrorMessage
1255     * ********************************************************************
1256     */
1257    @Test
1258    void fluxStepVerifyTest() {
1259        Flux flux = Flux.fromIterable(Arrays.asList("Jack", "Jill"));
1260        StepVerifier.create(flux)
1261                .expectNextMatches(user -> user.equals("Jack"))
1262                .assertNext(user -> assertThat(user).isEqualTo("Jill"))
1263                .verifyComplete();
1264
1265        //Wait for 2 elements.
1266        StepVerifier.create(flux)
1267                .expectNextCount(2)
1268                .verifyComplete();
1269
1270        //Request 1 value at a time, get 2 values then cancel.
1271        Flux flux2 = Flux.fromIterable(Arrays.asList("Jack", "Jill", "Raj"));
1272        StepVerifier.create(flux2, 1)
1273                .expectNext("JACK")
1274                .thenRequest(1)
1275                .expectNext("JILL")
1276                .thenCancel();
1277
1278        Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"));
1279        StepVerifier.create(mono1)
1280                .expectError(RuntimeException.class)
1281                .verify();
1282        StepVerifier.create(mono1)
1283                .expectErrorMessage("My Error")
1284                .verify();
1285    }
1286
1287    /**
1288     * ********************************************************************
1289     *  flux error propagate
1290     * ********************************************************************
1291     */
1292    @Test
1293    void errorPropagateTest() {
1294        Flux flux1 = Flux.just("Jack", "Jill").map(u -> {
1295            try {
1296                return ReactorTest.checkName(u);
1297            } catch (CustomException e) {
1298                throw Exceptions.propagate(e);
1299            }
1300        });
1301        flux1.subscribe(System.out::println);
1302        StepVerifier.create(flux1)
1303                .expectNext("JACK")
1304                .verifyError(CustomException.class);
1305    }
1306
1307    private static String checkName(String name) throws CustomException {
1308        if (name.equals("Jill")) {
1309            throw new CustomException();
1310        }
1311        return name.toUpperCase();
1312    }
1313
1314    protected static final class CustomException extends Exception {
1315        private static final long serialVersionUID = 0L;
1316    }
1317
1318    /**
1319     * ********************************************************************
1320     *  subscribeOn - influences upstream (whole chain)
1321     * ********************************************************************
1322     */
1323    @Test
1324    void subscribeOnTest() {
1325        Flux numbFlux = Flux.range(1, 5)
1326                .map(i -> {
1327                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1328                    return i;
1329                }).subscribeOn(Schedulers.single())
1330                .map(i -> {
1331                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1332                    return i;
1333                });
1334        numbFlux.subscribe();
1335    }
1336
1337    /**
1338     * ********************************************************************
1339     *  subscribeOn - influences upstream (whole chain)
1340     * ********************************************************************
1341     */
1342    @Test
1343    void subscribeOnTest2() {
1344        Flux numbFlux = Flux.range(1, 5)
1345                .map(i -> {
1346                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1347                    return i;
1348                }).subscribeOn(Schedulers.newSingle("my-thread"))
1349                .map(i -> {
1350                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1351                    return i;
1352                });
1353        numbFlux.subscribe();
1354    }
1355
1356    /**
1357     * ********************************************************************
1358     *  publishOn - influences downstream
1359     * ********************************************************************
1360     */
1361    @Test
1362    void publishOnTest() {
1363        Flux numbFlux = Flux.range(1, 5)
1364                .map(i -> {
1365                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1366                    return i;
1367                }).publishOn(Schedulers.single())
1368                .map(i -> {
1369                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1370                    return i;
1371                });
1372        numbFlux.subscribe();
1373    }
1374
1375    /**
1376     * ********************************************************************
1377     *  publishOn - influences downstream
1378     * ********************************************************************
1379     */
1380    @Test
1381    void publishOnTest2() {
1382        Flux numbFlux = Flux.range(1, 5)
1383                .map(i -> {
1384                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1385                    return i;
1386                }).publishOn(Schedulers.newSingle("my-thread"))
1387                .map(i -> {
1388                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1389                    return i;
1390                });
1391        numbFlux.subscribe();
1392    }
1393
1394    /**
1395     * ********************************************************************
1396     *  fromSupplier - returns a value
1397     *  fromCallable - returns a value or exception, runs blocking function on different thread
1398     * ********************************************************************
1399     */
1400    @Test
1401    public void monoSupplierTest() {
1402        Supplier<String> stringSupplier = () -> getName();
1403        Mono<String> mono = Mono.fromSupplier(stringSupplier)
1404                .log();
1405        mono.subscribe(System.out::println);
1406    }
1407
1408    /**
1409     * ********************************************************************
1410     *  fromSupplier - returns a value
1411     *  fromCallable - returns a value or exception, runs blocking function on different thread
1412     * ********************************************************************
1413     */
1414    @Test
1415    public void monoCallableTest() {
1416        Callable<String> stringCallable = () -> getName();
1417        Mono<String> mono = Mono.fromCallable(stringCallable)
1418                .log()
1419                .subscribeOn(Schedulers.boundedElastic());
1420        mono.subscribe(System.out::println);
1421    }
1422
1423    /**
1424     * ********************************************************************
1425     *  fromCallable - read file may be blocking so we dont want to block main thread.
1426     * ********************************************************************
1427     */
1428    @Test
1429    @SneakyThrows
1430    void readFileTest() {
1431        Mono<List<String>> listMono = Mono.fromCallable(() -> Files.readAllLines(Path.of("src/test/resources/file.txt")))
1432                .log()
1433                .subscribeOn(Schedulers.boundedElastic());
1434
1435        listMono.subscribe(l -> log.info("Line: {}", l.size()));
1436        TimeUnit.SECONDS.sleep(5);
1437
1438        StepVerifier.create(listMono)
1439                .expectSubscription()
1440                .thenConsumeWhile(l -> {
1441                    assertThat(l.isEmpty()).isFalse();
1442                    return true;
1443                })
1444                .verifyComplete();
1445    }
1446
1447    /**
1448     * ********************************************************************
1449     *  fromRunnable - runs blocking function on different thread, but doesnt return value
1450     * ********************************************************************
1451     */
1452    @Test
1453    public void monoRunnableTest() {
1454        Runnable stringCallable = () -> getName();
1455        Mono<Object> mono = Mono.fromRunnable(stringCallable)
1456                .log()
1457                .subscribeOn(Schedulers.boundedElastic());
1458        mono.subscribe(System.out::println);
1459    }
1460
1461    /**
1462     * ********************************************************************
1463     *  ParallelFlux - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1464     * ********************************************************************
1465     */
1466    @Test
1467    void fluxParallelTest() {
1468        log.info("Cores: {}", Runtime.getRuntime().availableProcessors());
1469        ParallelFlux<String> flux1 = Flux.just("apple", "orange", "banana")
1470                .parallel()
1471                .runOn(Schedulers.parallel())
1472                .map(ReactorTest::capitalizeString)
1473                .log();
1474        StepVerifier.create(flux1)
1475                .expectNextCount(3)
1476                .verifyComplete();
1477
1478
1479        Flux<String> flux2 = Flux.just("apple", "orange", "banana")
1480                .flatMap(name -> {
1481                    return Mono.just(name)
1482                            .map(ReactorTest::capitalizeString)
1483                            .subscribeOn(Schedulers.parallel());
1484                })
1485                .log();
1486        StepVerifier.create(flux2)
1487                .expectNextCount(3)
1488                .verifyComplete();
1489    }
1490
1491    /**
1492     * ********************************************************************
1493     *  flatMap Parallelism - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1494     * ********************************************************************
1495     */
1496    @Test
1497    void fluxParallelWithFlatMapTest() {
1498        Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1499                .flatMap(name -> {
1500                    return Mono.just(name)
1501                            .map(ReactorTest::capitalizeString)
1502                            .subscribeOn(Schedulers.parallel());
1503                })
1504                .log();
1505        StepVerifier.create(flux1)
1506                .expectNextCount(3)
1507                .verifyComplete();
1508    }
1509
1510    @SneakyThrows
1511    private static String capitalizeString(String element) {
1512        log.info("Capitalizing: {}", element);
1513        TimeUnit.SECONDS.sleep(1);
1514        return element.toUpperCase();
1515    }
1516
1517    /**
1518     * ********************************************************************
1519     *  flatMap - fire-forget jobs with subscribe, Will run async jobs
1520     * ********************************************************************
1521     */
1522    @SneakyThrows
1523    @Test
1524    void fireForgetTest() {
1525        CountDownLatch latch = new CountDownLatch(3);
1526        Flux<Object> flux1 = Flux.just("apple", "orange", "banana")
1527                .flatMap(fruit -> {
1528                    Mono.just(fruit)
1529                            .map(e -> ReactorTest.capitalizeStringLatch(e, latch))
1530                            .subscribeOn(Schedulers.parallel())
1531                            .subscribe();
1532                    return Mono.empty();
1533                })
1534                .log();
1535        StepVerifier.create(flux1)
1536                .verifyComplete();
1537        latch.await(5, TimeUnit.SECONDS);
1538    }
1539
1540    @SneakyThrows
1541    private static String capitalizeStringLatch(String element, CountDownLatch latch) {
1542        log.info("Capitalizing: {}", element);
1543        TimeUnit.SECONDS.sleep(1);
1544        latch.countDown();
1545        return element.toUpperCase();
1546    }
1547
1548    /**
1549     * ********************************************************************
1550     *  flatMapSequential - Maintains order but executes in parallel
1551     * ********************************************************************
1552     */
1553    @Test
1554    void flatMapSequentialTest() {
1555        Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1556                .flatMapSequential(name -> {
1557                    return Mono.just(name)
1558                            .map(ReactorTest::capitalizeString)
1559                            .subscribeOn(Schedulers.parallel());
1560                })
1561                .log();
1562        StepVerifier.create(flux1)
1563                .expectNext("APPLE", "ORANGE", "BANANA")
1564                .verifyComplete();
1565    }
1566
1567    /**
1568     * ********************************************************************
1569     *  flatMapSequential - Maintains order but executes in parallel
1570     * ********************************************************************
1571     */
1572    @Test
1573    void flatMapSequentialConcurrencyTest() {
1574        Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1575                .flatMapSequential(name -> {
1576                    return Mono.just(name)
1577                            .map(ReactorTest::capitalizeString)
1578                            .subscribeOn(Schedulers.parallel());
1579                }, 1)
1580                .log();
1581        StepVerifier.create(flux1)
1582                .expectNext("APPLE", "ORANGE", "BANANA")
1583                .verifyComplete();
1584    }
1585
1586    /**
1587     * ********************************************************************
1588     *  withVirtualTime - flux that emits every second, verified without real waiting.
1589     *  The flux is created inside the supplier so that interval() binds to the virtual clock.
1590     * ********************************************************************
1591     */
1592    @Test
1593    void fluxIntervalTakeTest() {
1594        StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1))
1595                        .log()
1596                        .take(10))
1597                // Moves the virtual clock forward by five seconds; no wall-clock time is spent.
1598                .thenAwait(Duration.ofSeconds(5))
1599                .expectNextCount(4)
1600                .thenCancel()
1601                .verify();
1602    }
1606
1607    /**
1608     * ********************************************************************
1609     *  flux that emits every day. Use of virtual time to simulate days.
1610     * ********************************************************************
1611     */
1612    @Test
1613    void fluxIntervalVirtualTimeTest() {
1614        // withVirtualTime installs the VirtualTimeScheduler itself before it calls the supplier.
1615        StepVerifier.withVirtualTime(this::getTake)
1616                .expectSubscription()
1617                // Asserts a silent first day and advances the virtual clock by that day,
1618                // so the first tick is due immediately afterwards.
1619                .expectNoEvent(Duration.ofDays(1))
1620                .expectNext(0L)
1621                // One more virtual day brings the second tick.
1622                .thenAwait(Duration.ofDays(1))
1623                .expectNext(1L)
1624                .thenCancel()
1625                .verify();
1626    }
1626
1627    private Flux<Long> getTake() {
1628        Flux<Long> dailyTicks = Flux.interval(Duration.ofDays(1)).log(); // one tick per day, signals logged
1629        return dailyTicks.take(10); // keep only the first ten ticks
1630    }
1632
1633    /**
1634     * ********************************************************************
1635     *  then - ignores the source elements and replays only the terminal signal, as a Mono<Void>.
1636     *  thenEmpty - takes a Mono<Void> and returns a Mono<Void> that completes once the source and then that Mono have completed, one after the other; it never emits data.
1637     *  thenMany - waits for the source to complete, then plays every signal of the given Publisher<R>, including its completion, as a Flux<R>.
1638     * ********************************************************************
1639     */
1640    @Test
1641    void thenManyChainTest() {
1642        Flux<String> upperCased = Flux.just("Jack", "Jill").map(String::toUpperCase);
1643        Flux<String> afterDelete = upperCased.thenMany(ReactorTest.deleteFromDb());
1644        Flux<String> afterSave = afterDelete.thenMany(ReactorTest.saveToDb());
1645        afterSave.subscribe(System.out::println);
1646    }
1648
1649    private static Flux<String> deleteFromDb() { // stand-in for a database delete
1650        return Flux.just("Deleted from db").log(); // single confirmation message, signals logged
1651    }
1652
1653    private static Flux<String> saveToDb() { // stand-in for a database save
1654        return Flux.just("Saved to db").log(); // single confirmation message, signals logged
1655    }
1656
1657    private static Mono<Void> sendMail() { // stand-in for sending a mail
1658        return Mono.empty(); // completes without emitting a value
1659    }
1660
1661    @Test
1662    void thenEmptyTest() {
1663        // Save once the names have completed, then finish with the empty mail Mono.
1664        Flux<String> saved = Flux.just("Jack", "Jill")
1665                .map(String::toUpperCase)
1666                .thenMany(ReactorTest.saveToDb());
1667        saved.thenEmpty(ReactorTest.sendMail()).subscribe(System.out::println);
1668    }
1669
1670    @Test
1671    void thenTest() {
1672        Flux<String> saved = Flux.just("Jack", "Jill")
1673                .map(String::toUpperCase)
1674                .thenMany(ReactorTest.saveToDb());
1675        // then() keeps only the completion of the save; the chained Mono and thenReturn follow it.
1676        Mono<Void> completion = saved.then();
1677        Mono<String> done = completion.then(Mono.just("Ram")).thenReturn("Done!");
1678        done.subscribe(System.out::println);
1679    }
1680
1681    /**
1682     * ********************************************************************
1683     *  firstWithValue - emits the value of whichever source produces one first
1684     * ********************************************************************
1685     */
1686    @Test
1687    void monoFirstTest() {
1688        Mono<String> delayedJack = Mono.just("Jack").delayElement(Duration.ofSeconds(1));
1689        Mono<String> immediateJill = Mono.just("Jill");
1690        // Jill carries no delay, so her value arrives before Jack's.
1691        Mono<String> winner = Mono.firstWithValue(delayedJack, immediateJill);
1692        winner.subscribe(System.out::println);
1693        StepVerifier.create(winner).expectNext("Jill").verifyComplete();
1694    }
1697
1698    /**
1699     * ********************************************************************
1700     *  buffer - collects elements into lists of at most the given size
1701     * ********************************************************************
1702     */
1703    @Test
1704    public void bufferGroupTest() {
1705        Flux<List<Integer>> pairs = Flux.range(1, 7).buffer(2);
1706        // Seven elements in buffers of two leave a last buffer holding the single leftover.
1707        StepVerifier.create(pairs)
1708                .expectNext(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5, 6), Arrays.asList(7))
1709                .verifyComplete();
1710    }
1716
1717    @Test
1718    @SneakyThrows
1719    public void tickClockTest() {
1720        Flux<String> fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
1721        Flux<String> slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
1722        Disposable ticks = Flux.merge(fastClock, slowClock).subscribe(System.out::println);
1723        TimeUnit.SECONDS.sleep(5); // let a few ticks print
1724        ticks.dispose(); // the intervals never complete on their own, so stop them explicitly
1725    }
1725
1726    @Test
1727    @SneakyThrows
1728    public void tickMergeClockTest() {
1729        Flux<String> fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
1730        Flux<String> slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
1731        Flux<String> clock = Flux.merge(slowClock, fastClock);
1732        Flux<LocalTime> feed = Flux.interval(Duration.ofSeconds(1)).map(tick -> LocalTime.now());
1733        // Every clock tick is printed together with the most recent time seen on the feed.
1734        Disposable ticks = clock.withLatestFrom(feed, (tick, time) -> tick + " " + time).subscribe(System.out::println);
1735        TimeUnit.SECONDS.sleep(15);
1736        ticks.dispose(); // the intervals never complete on their own, so stop them explicitly
1737    }
1736
1737    @Test
1738    @SneakyThrows
1739    public void tickZipClockTest() {
1740        Flux<String> fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
1741        Flux<String> slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
1742        // zipWith pairs the n-th fast tick with the n-th slow tick.
1743        Disposable ticks = fastClock.zipWith(slowClock, (tick, time) -> tick + " " + time).subscribe(System.out::println);
1744        TimeUnit.SECONDS.sleep(5);
1745        ticks.dispose(); // the intervals never complete on their own, so stop them explicitly
1746    }
1745
1746    @Test
1747    @SneakyThrows
1748    public void emitterTest() {
1749        MyFeed myFeed = new MyFeed();
1750        Flux feedFlux = Flux.create(emmiter -> {
1751            myFeed.register(new MyListener() {
1752                @Override
1753                public void priceTick(String msg) {
1754                    emmiter.next(msg);
1755                }
1756
1757                @Override
1758                public void error(Throwable error) {
1759                    emmiter.error(error);
1760                }
1761            });
1762        }, FluxSink.OverflowStrategy.LATEST);
1763        feedFlux.subscribe(System.out::println);
1764        TimeUnit.SECONDS.sleep(15);
1765        System.out.println("Sending message!");
1766        for (int i = 0; i < 10; i++) {
1767            myFeed.sendMessage("HELLO_" + i);
1768        }
1769    }
1770
1771    /**
1772     * ********************************************************************
1773     *  cancel subscription - the subscription callback cancels instead of requesting
1774     * ********************************************************************
1775     */
1776    @Test
1777    void monoCancelSubscriptionTest() {
1778        Mono<String> upperCased = Mono.just("Jack").log().map(String::toUpperCase);
1779        upperCased.subscribe(
1780                value -> log.info("Got: {}", value),
1781                Throwable::printStackTrace,
1782                () -> log.info("Finished"),
1783                Subscription::cancel);
1784    }
1789
1790    /**
1791     * ********************************************************************
1792     *  bounded request - the subscriber asks for the first n elements only
1793     * ********************************************************************
1794     */
1795    @Test
1796    void monoCompleteSubscriptionRequestBoundedTest() {
1797        //Jill won't be fetched because only 2 elements are ever requested
1798        Flux<String> upperCased = Flux.just("Jack", "Jane", "Jill").log().map(String::toUpperCase);
1799        upperCased.subscribe(
1800                value -> log.info("Got: {}", value),
1801                Throwable::printStackTrace,
1802                () -> log.info("Finished"),
1803                subscription -> subscription.request(2));
1804    }
1808
1809    /**
1810     * ********************************************************************
1811     *  backpressure - the subscriber pulls elements in fixed-size batches
1812     * ********************************************************************
1813     */
1814    @Test
1815    void fluxBackPressureTest() {
1816        Flux<Integer> numbers = Flux.range(1, 5).log();
1817        // Asks for two elements up front and for two more whenever a batch has arrived.
1818        numbers.subscribe(new BaseSubscriber<>() {
1819            private final int batchSize = 2;
1820            private int received = 0;
1821
1822            @Override
1823            protected void hookOnSubscribe(Subscription subscription) {
1824                request(batchSize);
1825            }
1826
1827            @Override
1828            protected void hookOnNext(Integer value) {
1829                if (++received < batchSize) {
1830                    return;
1831                }
1832                received = 0;
1833                log.info("requesting next batch!");
1834                request(batchSize);
1835            }
1836        });
1837    }
1839
1840    /**
1841     * ********************************************************************
1842     *  onBackpressureDrop - requests unbounded from upstream and drops the elements downstream has not asked for
1843     * ********************************************************************
1844     */
1845    @Test
1846    void fluxBackPressureDropTest() {
1847        Flux<Integer> numbers = Flux.range(1, 15).log();
1848
1849        // Pulls two at a time and stops asking once three follow-up batches were requested.
1850        numbers
1851                .onBackpressureDrop(dropped -> log.info("Dropped {}", dropped))
1852                .subscribe(new BaseSubscriber<>() {
1853                    private final int batchSize = 2;
1854                    private int received = 0;
1855                    private int batch = 0;
1856
1857                    @Override
1858                    protected void hookOnSubscribe(Subscription subscription) {
1859                        request(batchSize);
1860                    }
1861
1862                    @Override
1863                    protected void hookOnNext(Integer value) {
1864                        if (batch > 2) {
1865                            return;
1866                        }
1867                        received++;
1868                        if (received < batchSize) {
1869                            return;
1870                        }
1871                        received = 0;
1872                        batch++;
1873                        log.info("requesting next batch {}", batch);
1874                        request(batchSize);
1875                    }
1876                });
1877    }
1880
1881    /**
1882     * ********************************************************************
1883     *  onBackpressureBuffer - requests unbounded from upstream and keeps the elements downstream has not asked for yet
1884     * ********************************************************************
1885     */
1886    @Test
1887    void fluxBackPressureBuffetTest() {
1888        Flux<Integer> numbers = Flux.range(1, 15).log();
1889
1890        // Pulls two at a time and stops asking once three follow-up batches were requested.
1891        numbers
1892                .onBackpressureBuffer()
1893                .subscribe(new BaseSubscriber<>() {
1894                    private final int batchSize = 2;
1895                    private int received = 0;
1896                    private int batch = 0;
1897
1898                    @Override
1899                    protected void hookOnSubscribe(Subscription subscription) {
1900                        request(batchSize);
1901                    }
1902
1903                    @Override
1904                    protected void hookOnNext(Integer value) {
1905                        if (batch > 2) {
1906                            return;
1907                        }
1908                        received++;
1909                        if (received < batchSize) {
1910                            return;
1911                        }
1912                        received = 0;
1913                        batch++;
1914                        log.info("requesting next batch {}", batch);
1915                        request(batchSize);
1916                    }
1917                });
1918    }
1919
1920    /**
1921     * ********************************************************************
1922     *  onBackpressureError - signals an error when the producer emits more elements than the receiver has requested.
1923     * ********************************************************************
1924     */
1925    @Test
1926    void fluxBackPressureOnErrorTest() {
1927        Flux<Integer> numbers = Flux.range(1, 15).log();
1928
1929        // Pulls two at a time and stops asking once three follow-up batches were requested.
1930        numbers
1931                .onBackpressureError()
1932                .subscribe(new BaseSubscriber<>() {
1933                    private final int batchSize = 2;
1934                    private int received = 0;
1935                    private int batch = 0;
1936
1937                    @Override
1938                    protected void hookOnSubscribe(Subscription subscription) {
1939                        request(batchSize);
1940                    }
1941
1942                    @Override
1943                    protected void hookOnError(Throwable throwable) {
1944                        log.error("Error thrown is: {}", throwable.getMessage());
1945                    }
1946
1947                    @Override
1948                    protected void hookOnNext(Integer value) {
1949                        if (batch > 2) {
1950                            return;
1951                        }
1952                        received++;
1953                        if (received < batchSize) {
1954                            return;
1955                        }
1956                        received = 0;
1957                        batch++;
1958                        log.info("requesting next batch {}", batch);
1959                        request(batchSize);
1960                    }
1961                });
1962    }
1963
1964    /**
1965     * ********************************************************************
1966     *  backpressure - limit rate
1967     * ********************************************************************
1968     */
1969    @Test
1970    void fluxBackPressureLimitRateTest() {
1971        Flux<Integer> fluxNumber = Flux.range(1, 5).log().limitRate(3);
1972        StepVerifier.create(fluxNumber)
1973                .expectNext(1, 2, 3, 4, 5)
1974                .verifyComplete();
1975    }
1976
1977    /**
1978     * ********************************************************************
1979     *  hot flux - emission starts on connect(), independent of when subscribers attach
1980     * ********************************************************************
1981     */
1982    @Test
1983    @SneakyThrows
1984    void connectableFluxTest() {
1985        ConnectableFlux<Integer> hotNumbers = Flux.range(1, 10)
1986                .delayElements(Duration.ofSeconds(1))
1987                .publish();
1988        hotNumbers.connect();
1989
1990        // First flux: connected up front, subscribers attach after three and after five seconds.
1991        TimeUnit.SECONDS.sleep(3);
1992        hotNumbers.subscribe(number -> log.info("Sub1 Number: {}", number));
1993        TimeUnit.SECONDS.sleep(2);
1994        hotNumbers.subscribe(number -> log.info("Sub2 Number: {}", number));
1995
1996        // Second flux: the verifier subscribes first and only then triggers connect().
1997        ConnectableFlux<Integer> verifiedNumbers = Flux.range(1, 10)
1998                .delayElements(Duration.ofSeconds(1))
1999                .publish();
2000        StepVerifier.create(verifiedNumbers)
2001                .then(verifiedNumbers::connect)
2002                .thenConsumeWhile(number -> number <= 5)
2003                .expectNext(6, 7, 8, 9, 10)
2004                .expectComplete()
2005                .verify();
2006    }
2010
2011    /**
2012     * ********************************************************************
2013     *  hot flux - auto connect, min subscribers required before publisher emits
2014     * ********************************************************************
2015     */
2016    @Test
2017    @SneakyThrows
2018    void connectableAutoFluxTest() {
2019        //Hot Flux.
2020        Flux<Integer> connectableFlux = Flux.range(1, 5)
2021                .log()
2022                .delayElements(Duration.ofSeconds(1))
2023                .publish()
2024                .autoConnect(2);
2025
2026        //2 subscribers
2027        StepVerifier.create(connectableFlux)
2028                .then(connectableFlux::subscribe)
2029                .expectNext(1, 2, 3, 4, 5)
2030                .expectComplete()
2031                .verify();
2032    }
2033
2034    /**
2035     * ********************************************************************
2036     *  hot flux - ref count, connects once the minimum number of subscribers is reached and disconnects when all of them have cancelled
2037     * ********************************************************************
2038     */
2039    @Test
2040    @SneakyThrows
2041    void connectableRefCountTest() {
2042        Flux<Integer> sharedNumbers = Flux.range(1, 15)
2043                .delayElements(Duration.ofSeconds(1))
2044                .doOnCancel(() -> log.info("Received cancel"))
2045                .publish()
2046                .refCount(2);
2047
2048        //Min 2 subscribers required
2049        Disposable first = sharedNumbers.subscribe(e -> log.info("Sub1: " + e));
2050        Disposable second = sharedNumbers.subscribe(e -> log.info("Sub2: " + e));
2051        TimeUnit.SECONDS.sleep(3);
2052        // Drop both subscribers, then keep the test alive a little longer to watch the log.
2053        first.dispose();
2054        second.dispose();
2055        TimeUnit.SECONDS.sleep(5);
2056    }
2059
2060    /**
2061     * ********************************************************************
2062     *  defer - the supplier runs again for every subscriber, whereas just() takes its value once at assembly
2063     * ********************************************************************
2064     */
2065    @Test
2066    @SneakyThrows
2067    void deferTest() {
2068        Mono<Long> eager = Mono.just(System.currentTimeMillis());
2069        Mono<Long> lazy = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
2070        Consumer<Long> printTime = l -> log.info("Time: {}", l);
2071        // eager: currentTimeMillis() was evaluated once, when the Mono was created above.
2072        eager.subscribe(printTime);
2073        TimeUnit.SECONDS.sleep(2);
2074        eager.subscribe(printTime);
2075        // lazy: currentTimeMillis() is evaluated inside the supplier on each subscription.
2076        lazy.subscribe(printTime);
2077        TimeUnit.SECONDS.sleep(2);
2078        lazy.subscribe(printTime);
2079    }
2080
2081    /**
2082     * ********************************************************************
2083     *  combineLatest - will change order based on time. Rarely used.
2084     * ********************************************************************
2085     */
2086    @Test
2087    void combineLatestTest() {
2088        Flux<String> left = Flux.just("a", "b");
2089        Flux<String> right = Flux.just("c", "d");
2090        // As the expectations show, only the latest left value "b" is combined with "c" and "d".
2091        Flux<String> combined = Flux.combineLatest(left, right, (l, r) -> l + r).log();
2092        StepVerifier.create(combined)
2093                .expectSubscription()
2094                .expectNext("bc", "bd")
2095                .verifyComplete();
2096    }
2097
2098    /**
2099     * ********************************************************************
2100     *  onScheduleHook - wraps every task handed to a scheduler, if you have to use thread local
2101     * ********************************************************************
2102     */
2103    @Test
2104    public void schedulerHookTest() {
2105        Schedulers.onScheduleHook("myHook", runnable -> {
2106            log.info("before scheduled runnable");
2107            return () -> {
2108                log.info("before execution");
2109                runnable.run();
2110                log.info("after execution");
2111            };
2112        });
2113        try {
2114            // block() waits for the task scheduled on single(), so it has run before the hook is removed.
2115            System.out.println(Mono.just("Hello world").subscribeOn(Schedulers.single()).block());
2116        } finally {
2117            Schedulers.resetOnScheduleHook("myHook"); // the hook is global; do not leak it into other tests
2118        }
2119    }
2118
2119    /**
2120     * ********************************************************************
2121     *  checkpoint - marks named points in the chain
2122     * ********************************************************************
2123     */
2124    @Test
2125    void checkpointTest() {
2126        // NOTE(review): the last map() emits the exception as a value instead of failing - confirm intent.
2127        Flux<RuntimeException> flux = Flux.just("Jack", "Jill", "Joe")
2128                .checkpoint("before uppercase")
2129                .map(String::toUpperCase)
2130                .checkpoint("after uppercase")
2131                .filter(name -> name.length() > 3)
2132                .checkpoint("after filter")
2133                .map(name -> new RuntimeException("Custom error!"));
2134        flux.subscribe(System.out::println);
2135    }
2135
2136    /**
2137     * ********************************************************************
2138     *  checkpoint
2139     * ********************************************************************
2140     */
2141    @Test
2142    void debugAgentTest() {
2143        ReactorDebugAgent.init();
2144        ReactorDebugAgent.processExistingClasses();
2145        Flux flux = Flux.just("a")
2146                .concatWith(Flux.error(new IllegalArgumentException("My Error!")))
2147                .onErrorMap(ex -> {
2148                    log.error("Exception: {}", ex.getMessage());
2149                    return new IllegalStateException("New Error!");
2150                });
2151        flux.subscribe(System.out::println);
2152    }
2153
2154    /**
2155     * ********************************************************************
2156     *  Flux.generate - programmatically create flux, synchronous
2157     * ********************************************************************
2158     */
2159    @Test
2160    void fluxGenerateTest() {
2161        // The state counts from 1; each round emits twice the state and the tenth round completes.
2162        Flux<Integer> doubled = Flux.generate(() -> 1, (counter, sink) -> {
2163            sink.next(counter * 2);
2164            if (counter == 10) {
2165                sink.complete();
2166            }
2167            return counter + 1;
2168        });
2169        doubled.subscribe(System.out::println);
2170        StepVerifier.create(doubled).expectNextCount(10).verifyComplete();
2171    }
2175
2176    /**
2177     * ********************************************************************
2178     *  Flux.create - programmatically create flux, asynchronous
2179     * ********************************************************************
2180     */
2181    @Test
2182    void fluxCreateTest() {
2183        List<String> names = Arrays.asList("jack", "jill");
2184        // Pushes every name through the sink, then signals completion.
2185        Flux<String> created = Flux.create(sink -> {
2186            for (String name : names) {
2187                sink.next(name);
2188            }
2189            sink.complete();
2190        });
2191        StepVerifier.create(created).expectNextCount(2).verifyComplete();
2192    }
2193
2194    private String getName() { // sample helper with a fixed result
2195        return "John";
2196    }
2197
2198}
2199
2200class MyFeed { // minimal in-memory feed that fans each message out to its listeners
2201
2202    List<MyListener> listeners = new ArrayList<>(); // listeners in registration order
2203
2204    public void register(MyListener listener) { // the listener receives every message sent afterwards
2205        listeners.add(listener);
2206    }
2207
2208    public void sendMessage(String msg) { // delivers msg to priceTick of each registered listener
2209        for (MyListener listener : listeners) {
2210            listener.priceTick(msg);
2211        }
2212    }
2213}
2214
2215interface MyListener { // callback contract registered with MyFeed
2216    void priceTick(String msg); // called by MyFeed.sendMessage with each message
2217
2218    void error(Throwable error); // failure callback; MyFeed itself never calls it in this file
2219}

Reactor chaining samples

 1package com.demo.project83;
 2
 3import java.util.ArrayList;
 4import java.util.List;
 5
 6import lombok.Builder;
 7import lombok.Data;
 8import lombok.extern.slf4j.Slf4j;
 9import org.junit.jupiter.api.Test;
10import reactor.core.publisher.Mono;
11
12@Slf4j
13public class ReactorChainTest {
14
15    @Test
16    public void test() {
17        CompanyVO request = new CompanyVO();
18        request.setName("Twitter");
19        Mono.just(request)
20                .map(ReactorChainTest::convertToEntity)
21                .zipWith(ReactorChainTest.getNameSuffix(), ReactorChainTest::appendSuffix)
22                .flatMap(ReactorChainTest::addCompanyOwner)
23                .flatMap(ReactorChainTest::appendOrgIdToDepartment)
24                .flatMap(ReactorChainTest::save)
25                .subscribe(System.out::println);
26    }
27
28    private static Company appendSuffix(Company company, String nameSuffix) {
29        company.setName(company.name + " " + nameSuffix);
30        return company;
31    }
32
33    private static Mono<String> getOwnerName() {
34        return Mono.just("Jack");
35    }
36
37    private static Mono<String> getOrgId() {
38        return Mono.just("org1: ");
39    }
40
41    public static Company convertToEntity(CompanyVO companyVO) {
42        Company company = new Company();
43        company.setName(companyVO.getName().toUpperCase());
44        List<Department> departments = new ArrayList<>();
45        departments.add(Department.builder().name("department 1").build());
46        company.setDepartments(departments);
47        return company;
48    }
49
50    public static Mono<Company> save(Company company) {
51        log.info("Saved to db!");
52        return Mono.just(company);
53    }
54
55    public static Mono<Company> appendOrgIdToDepartment(Company company) {
56        return getOrgId().map(e -> {
57            company.getDepartments().forEach(d -> d.setName(e + " " + d.getName()));
58            return company;
59        });
60    }
61
62    public static Mono<String> getNameSuffix() {
63        return Mono.just(".Inc");
64    }
65
66    public static Mono<Company> addCompanyOwner(Company company) {
67        return getOwnerName().map(e -> {
68            company.setOwner(e);
69            return company;
70        });
71    }
72}
73
74@Data
75class CompanyVO { // request-side company object; carries just the name
76    String name;
77}
78
79@Data
80class Company { // company entity that ReactorChainTest builds up step by step
81    String name;
82    List<Department> departments;
83    String owner;
84}
85
86@Data
87@Builder
88class Department { // department of a Company, described only by its name
89    String name;
90}

Reactor object samples

  1package com.demo.project83;
  2
  3import static org.junit.jupiter.api.Assertions.assertEquals;
  4
  5import java.util.ArrayList;
  6import java.util.List;
  7import java.util.stream.Collectors;
  8
  9import lombok.AllArgsConstructor;
 10import lombok.Builder;
 11import lombok.Data;
 12import lombok.NoArgsConstructor;
 13import lombok.RequiredArgsConstructor;
 14import lombok.extern.slf4j.Slf4j;
 15import org.junit.jupiter.api.BeforeEach;
 16import org.junit.jupiter.api.Test;
 17import reactor.core.publisher.Flux;
 18import reactor.core.publisher.GroupedFlux;
 19import reactor.core.publisher.Mono;
 20import reactor.test.StepVerifier;
 21import reactor.util.function.Tuple2;
 22import reactor.util.function.Tuples;
 23
 24@Slf4j
 25public class ReactorObjectTest {
 26
 27    Flux<ProjectDTO> fluxFromRequest;
 28    Flux<ProjectEntity> fluxFromDb;
 29    /** Rebuilds the sample data before each test: three requested projects and four stored ones. */
 30    @BeforeEach
 31    public void setup() {
 32        fluxFromRequest = Flux.just(
 33                ProjectDTO.builder().name("p3").build(),
 34                ProjectDTO.builder().name("p1").build(),
 35                ProjectDTO.builder().name("p5").build()
 36        );
 37
 38        fluxFromDb = Flux.just(
 39                ProjectEntity.builder().entityName("p5").build(),
 40                ProjectEntity.builder().entityName("p4").build(),
 41                ProjectEntity.builder().entityName("p1").build(),
 42                ProjectEntity.builder().entityName("p2").build()
 43        );
 44    }
 45    /** Intersection by re-reading the request flux for every db entity; matches come out in db order. */
 46    @Test
 47    void fluxIntersectCommonApproach1() {
 48        Flux<ProjectEntity> commonFlux = fluxFromDb.filter(f -> {
 49            //Inefficient
 50            //Not for live stream or stream that can be subscribed only once.
 51            //toStream converts your non-blocking asynchronous flux to a blocking stream API which will impact performance
 52            return fluxFromRequest.toStream().anyMatch(e -> e.getName().equals(f.getEntityName()));
 53        });
 54        commonFlux.subscribe(System.out::println);
 55        StepVerifier.create(commonFlux)
 56                .expectNext(ProjectEntity.builder().entityName("p5").build())
 57                .expectNext(ProjectEntity.builder().entityName("p1").build())
 58                .verifyComplete();
 59    }
 60    /** Intersection by collecting the requested names into a set first, then filtering the db flux. */
 61    @Test
 62    void fluxIntersectCommonApproach2() {
 63        Flux<ProjectEntity> commonFlux = fluxFromRequest
 64                .map(ProjectDTO::getName)
 65                .collect(Collectors.toSet())
 66                .flatMapMany(set -> {
 67                    return fluxFromDb
 68                            //Filter out matching
 69                            //Limitation is that you can only compare 1 value collected in set.
 70                            .filter(t -> set.contains(t.getEntityName()));
 71                });
 72        commonFlux.subscribe(System.out::println);
 73        StepVerifier.create(commonFlux)
 74                .expectNext(ProjectEntity.builder().entityName("p5").build())
 75                .expectNext(ProjectEntity.builder().entityName("p1").build())
 76                .verifyComplete();
 77    }
 78    /** Intersection via join: pairs db entities with request DTOs, keeps matching pairs, de-duplicates. */
 79    @Test
 80    void fluxIntersectCommonApproach3() {
 81        Flux<ProjectEntity> commonFlux = fluxFromDb.join(fluxFromRequest, s -> Flux.never(), s -> Flux.never(), Tuples::of)
 82                //Filter out matching
 83                .filter(t -> t.getT1().getEntityName().equals(t.getT2().getName()))
 84                //Revert to single value
 85                .map(Tuple2::getT1)
 86                //Remove duplicates, if any
 87                .groupBy(f -> f)
 88                .map(GroupedFlux::key);
 89        commonFlux.subscribe(System.out::println);
 90        StepVerifier.create(commonFlux)
 91                .expectNext(ProjectEntity.builder().entityName("p1").build())
 92                .expectNext(ProjectEntity.builder().entityName("p5").build())
 93                .verifyComplete();
 94    }
 95    /** Loads every post with its comments; flatMapSequential keeps the source order the assertions rely on. */
 96    @Test
 97    void postGetAllTest() {
 98        DbService service = new DbService();
 99        Flux<Post> postFlux = service.getAllPosts()
100                .flatMapSequential(post -> {
101                    Mono<List<Comment>> commentMono = service.getCommentByPostId(post.id).collectList();
102                    return commentMono.map(comments -> Post.builder()
103                            .id(post.id)
104                            .message(post.message)
105                            .user(post.user)
106                            .comments(comments)
107                            .build());
108                });
109        StepVerifier.create(postFlux)
110                .assertNext(post -> {
111                    assertEquals("post 1", post.getMessage());
112                    assertEquals(1, post.getComments().size());
113                })
114                .assertNext(post -> {
115                    assertEquals("post 2", post.getMessage());
116                    assertEquals(2, post.getComments().size());
117                })
118                .verifyComplete();
119    }
120    /** Loads post 1 and its comments separately and zips them into one Post. */
121    @Test
122    void postGetByIdTest() {
123        DbService service = new DbService();
124        Mono<List<Comment>> commentMono = service.getCommentByPostId(1L).collectList();
125        Mono<Post> getByIdMono = service.getPostById(1L).zipWith(commentMono, (post, comments) -> {
126            return Post.builder()
127                    .id(post.id)
128                    .message(post.message)
129                    .user(post.user)
130                    .comments(comments)
131                    .build();
132        });
133        StepVerifier.create(getByIdMono)
134                .assertNext(post -> {
135                    assertEquals("post 1", post.getMessage());
136                    assertEquals(1, post.getComments().size());
137                })
138                .verifyComplete();
139    }
140
141}
142
/**
 * Request-side project representation used by the flux intersection examples.
 * Lombok generates the accessors, equals/hashCode/toString ({@code @Data}) and a builder.
 */
@Data
@Builder
class ProjectDTO {
    //Project name; presumably the value matched against ProjectEntity.entityName - confirm with the tests above
    String name;
    //Additional request-only field; not read by the code visible in this section
    String someExtraField;
}
149
/**
 * Database-side project representation used by the flux intersection examples.
 * Lombok generates the accessors, equals/hashCode/toString ({@code @Data}) and a builder;
 * the generated equals is what lets the tests compare emitted items with freshly built ones.
 */
@Data
@Builder
class ProjectEntity {
    //Project name; the intersection tests filter and assert on this value
    String entityName;
    //Additional db-only field; left unset by the expectations visible in this section
    String dbField;
}
156
/**
 * A post together with the comments attached to it.
 * Lombok generates accessors, equals/hashCode/toString, both constructors and a builder.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
class Post {
    //Identifier used to look the post up and to find its comments
    Long id;
    //Post text
    String message;
    //Name of the author
    String user;
    //Builder.Default keeps this initializer when the builder is used without calling comments(...)
    @Builder.Default
    List<Comment> comments = new ArrayList<>();
}
168
169@Data
170@AllArgsConstructor
171@RequiredArgsConstructor
172@Builder
173class Comment {
174    Long id;
175    Long postId;
176    String comment;
177    String user;
178}
179
180class DbService {
181    Flux<Post> postFlux = Flux.fromIterable(List.of(
182            Post.builder().id(1l).message("post 1").user("jack").build(),
183            Post.builder().id(2l).message("post 2").user("jill").build()));
184
185    Flux<Comment> commentFlux = Flux.fromIterable(List.of(
186            Comment.builder().id(1l).postId(1l).comment("comment 1").user("adam").build(),
187            Comment.builder().id(2l).postId(2l).comment("comment 2").user("jane").build(),
188            Comment.builder().id(3l).postId(2l).comment("comment 3").user("raj").build()));
189
190    //Get all posts
191    Flux<Post> getAllPosts() {
192        return postFlux;
193    }
194
195    Mono<Post> getPostById(long id) {
196        //Take the first element in the flux.
197        return postFlux.filter(e -> e.id == id).next();
198    }
199
200    //Get all reviews associated with the post.
201    Flux<Comment> getCommentByPostId(long id) {
202        return commentFlux.filter(e -> e.postId == id);
203    }
204}

References

https://projectreactor.io/

comments powered by Disqus