Spring Reactor - Basics

Overview

Reactive programming examples on how to use spring reactor.

Github: https://github.com/gitorko/project83

Spring Reactor

Spring Reactor is a library for building non-blocking, reactive applications in Java. Reactor is used in Spring WebFlux, which is the reactive web framework included in Spring 5.

Features

  1. Reactive Streams: Reactor is based on the Reactive Streams specification, which defines a standard for asynchronous stream processing with non-blocking backpressure.
  2. Mono and Flux: Mono represents a single value or an empty result (similar to Optional). Flux represents a stream of 0 to N elements.
  3. Functional API: Reactor provides a rich set of operators that allow you to manipulate, transform, and compose reactive streams in a functional style.
  4. Non-blocking: Reactor is designed to work in a non-blocking manner, making it suitable for applications that need to handle a large number of concurrent I/O operations.
  5. Backpressure: Reactor supports backpressure, a mechanism to ensure that a producer does not overwhelm a consumer with too much data.

Code

   1package com.demo.project83;
   2
   3import static com.demo.project83.common.HelperUtil.getCustomer;
   4import static com.demo.project83.common.HelperUtil.getCustomers;
   5import static com.demo.project83.common.HelperUtil.getName;
   6import static org.assertj.core.api.Assertions.assertThat;
   7import static org.junit.jupiter.api.Assertions.assertEquals;
   8
   9import java.nio.file.Files;
  10import java.nio.file.Path;
  11import java.nio.file.Paths;
  12import java.time.Duration;
  13import java.time.LocalTime;
  14import java.util.Arrays;
  15import java.util.Collection;
  16import java.util.HashMap;
  17import java.util.List;
  18import java.util.Map;
  19import java.util.Optional;
  20import java.util.UUID;
  21import java.util.concurrent.Callable;
  22import java.util.concurrent.CountDownLatch;
  23import java.util.concurrent.FutureTask;
  24import java.util.concurrent.TimeUnit;
  25import java.util.concurrent.atomic.AtomicLong;
  26import java.util.function.Consumer;
  27import java.util.function.Function;
  28import java.util.function.Supplier;
  29import java.util.stream.Collectors;
  30import java.util.stream.IntStream;
  31import java.util.stream.Stream;
  32
  33import com.demo.project83.common.CompanyVO;
  34import com.demo.project83.common.Customer;
  35import com.demo.project83.common.Employee;
  36import com.demo.project83.common.HelperUtil;
  37import com.demo.project83.common.MyFeed;
  38import com.demo.project83.common.MyListener;
  39import lombok.SneakyThrows;
  40import lombok.extern.slf4j.Slf4j;
  41import org.junit.jupiter.api.Assertions;
  42import org.junit.jupiter.api.BeforeAll;
  43import org.junit.jupiter.api.Test;
  44import org.reactivestreams.Subscription;
  45import org.springframework.data.domain.PageImpl;
  46import org.springframework.data.domain.PageRequest;
  47import reactor.blockhound.BlockHound;
  48import reactor.blockhound.BlockingOperationError;
  49import reactor.core.Disposable;
  50import reactor.core.Exceptions;
  51import reactor.core.publisher.BaseSubscriber;
  52import reactor.core.publisher.ConnectableFlux;
  53import reactor.core.publisher.Flux;
  54import reactor.core.publisher.FluxSink;
  55import reactor.core.publisher.GroupedFlux;
  56import reactor.core.publisher.Mono;
  57import reactor.core.publisher.ParallelFlux;
  58import reactor.core.scheduler.Schedulers;
  59import reactor.test.StepVerifier;
  60import reactor.test.scheduler.VirtualTimeScheduler;
  61import reactor.tools.agent.ReactorDebugAgent;
  62import reactor.util.function.Tuple2;
  63import reactor.util.function.Tuples;
  64import reactor.util.retry.Retry;
  65import reactor.util.retry.RetryBackoffSpec;
  66
  67/**
  68 * Reactive Streams Specification
  69 * 1. Asynchronous
  70 * 2. Non-Blocking
  71 * 3. Backpressure
  72 *
  73 * Publisher (Mono/Flux)
  74 *   - subscribe (data source, db, remote service)
  75 * Subscriber
  76 *   - onSubscribe
  77 *   - onNext
  78 *   - onError
  79 *   - onComplete
  80 * Subscription
  81 *   - request
  82 *   - cancel
  83 * Processor - Publisher + Subscriber
  84 *
  85 * Spring reactor is a Push + Pull data flow model
  86 *
  87 * Subscribers request for data. Publishers provide data
  88 * subscribers (downstream) and publishers (upstream)
  89 */
  90@Slf4j
  91public class ReactorTest {
  92
  93    @BeforeAll
  94    public static void init() {
  95        BlockHound.install();
  96    }
  97
  98    /**
  99     * ********************************************************************
 100     *  Mono
 101     * ********************************************************************
 102     */
 103
 104    @Test
 105    void test_stepVerifier() {
 106        Mono.just("jack")
 107                .as(StepVerifier::create)
 108                .expectNext("jack")
 109                .verifyComplete();
 110    }
 111
 112    @Test
 113    void test_mono() {
 114        //justOrEmpty
 115        Mono<String> mono = Mono.just("jack");
 116        mono.subscribe(System.out::println);
 117        StepVerifier.create(mono)
 118                .expectNext("jack")
 119                .verifyComplete();
 120    }
 121
 122    @Test
 123    void test_justOrEmpty() {
 124        //justOrEmpty
 125        Mono<String> mono = Mono.justOrEmpty("jack");
 126        mono.subscribe(System.out::println);
 127        StepVerifier.create(mono)
 128                .expectNext("jack")
 129                .verifyComplete();
 130    }
 131
 132    @Test
 133    void test_justOrEmpty_null() {
 134        //Note: Reactive Streams do not accept null values
 135        Mono<String> mono = Mono.justOrEmpty(null);
 136        mono.subscribe(System.out::println);
 137        StepVerifier.create(mono)
 138                .expectNextCount(0)
 139                .verifyComplete();
 140    }
 141
 142    /**
 143     * ********************************************************************
 144     *  log
 145     *  request(unbounded)
 146     *  default subscribe requests unbounded, all elements are requested
 147     * ********************************************************************
 148     */
 149    @Test
 150    void test_log() {
 151        //Note: Use log to look at transitions.
 152        Mono<String> mono = Mono.just("jack")
 153                .log();
 154        mono.subscribe(s -> {
 155            log.info("Got: {}", s);
 156        });
 157        StepVerifier.create(mono)
 158                .expectNext("jack")
 159                .verifyComplete();
 160    }
 161
 162    /**
 163     * ********************************************************************
 164     *  flux
 165     * ********************************************************************
 166     */
 167    @Test
 168    void test_flux() {
 169        Flux flux = Flux.just("jack", "raj");
 170        flux.subscribe(System.out::println);
 171        StepVerifier.create(flux)
 172                .expectNext("jack", "raj")
 173                .verifyComplete();
 174    }
 175
 176    /**
 177     * ********************************************************************
 178     *  Avoid blocking operations that hold thread
 179     * ********************************************************************
 180     */
 181    @Test
 182    void test_delayElements() {
 183        Flux flux = Flux.just("jack", "raj")
 184                .map(e -> {
 185                    log.info("Received: {}", e);
 186                    //Bad idea to do Thread.sleep or any blocking call.
 187                    //Use delayElements.
 188                    return e;
 189                }).delayElements(Duration.ofSeconds(1));
 190        flux.subscribe(System.out::println);
 191        //Test will wait for 1 second
 192        StepVerifier.create(flux)
 193                .expectNext("jack", "raj")
 194                .verifyComplete();
 195    }
 196
 197    @Test
 198    void test_delayElements_virtualTime() {
 199        VirtualTimeScheduler.getOrSet();
 200        Flux flux = Flux.just("jack", "raj")
 201                .map(e -> {
 202                    log.info("Received: {}", e);
 203                    //Bad idea to do Thread.sleep or any blocking call.
 204                    //Use delayElements.
 205                    return e;
 206                }).delayElements(Duration.ofDays(1));
 207        flux.subscribe(System.out::println);
 208        //Use virtual time as test cant wait for 1 day
 209        StepVerifier.withVirtualTime(() -> flux)
 210                .thenAwait(Duration.ofDays(2))
 211                .expectNext("jack", "raj")
 212                .verifyComplete();
 213    }
 214
 215    /**
 216     * ********************************************************************
 217     *  block
 218     *  Never use .block() as it blocks the thread.
 219     *  Can we used in tests but not in main code.
 220     * ********************************************************************
 221     */
 222    @Test
 223    void test_block() {
 224        String name = Mono.just("jack")
 225                .block();
 226        System.out.println(name);
 227    }
 228
 229    /**
 230     * ********************************************************************
 231     *  flux from array, list, stream
 232     * ********************************************************************
 233     */
 234    @Test
 235    void test_fromArray() {
 236        Integer[] arr = {1, 2, 3, 4, 5};
 237        Flux<Integer> flux = Flux.fromArray(arr);
 238        flux.subscribe(System.out::println);
 239        StepVerifier.create(flux)
 240                .expectNext(1, 2, 3, 4, 5)
 241                .verifyComplete();
 242    }
 243
 244    @Test
 245    void test_fromIterable() {
 246        Flux<String> flux = Flux.fromIterable(List.of("jack", "raj"));
 247        StepVerifier.create(flux)
 248                .expectNext("jack", "raj")
 249                .verifyComplete();
 250    }
 251
 252    @Test
 253    void test_fromStream() {
 254        Flux<Integer> flux = Flux.fromStream(() -> List.of(1, 2, 3, 4, 5).stream());
 255        StepVerifier.create(flux)
 256                .expectNext(1, 2, 3, 4, 5)
 257                .verifyComplete();
 258    }
 259
 260    /**
 261     * ********************************************************************
 262     *  flux range
 263     * ********************************************************************
 264     */
 265    @Test
 266    public void test_range() {
 267        Flux<Integer> flux = Flux.range(1, 5);
 268        flux.subscribe(n -> {
 269            log.info("number: {}", n);
 270        });
 271        StepVerifier.create(flux)
 272                .expectNext(1, 2, 3, 4, 5)
 273                .verifyComplete();
 274    }
 275
 276    /**
 277     * ********************************************************************
 278     *  map - synchronous by nature
 279     * ********************************************************************
 280     */
 281    @Test
 282    public void test_map() {
 283        Flux<String> flux1 = Flux.just("jack", "raj")
 284                .map(String::toUpperCase);
 285        StepVerifier
 286                .create(flux1)
 287                .expectNext("JACK", "RAJ")
 288                .verifyComplete();
 289
 290        Flux<Integer> flux2 = Flux.range(3, 2)
 291                .map(i -> i + 100);
 292        flux2.subscribe(System.out::println);
 293        StepVerifier.create(flux2)
 294                .expectNext(103, 104)
 295                .verifyComplete();
 296    }
 297
 298    /**
 299     * ********************************************************************
 300     *  flatMap - transform object 1-1 or 1-N in asynchronous fashion, returns back Mono/Flux. Use when there is delay/IO involved.
 301     *  map - transform an object 1-1 in fixed time in synchronous fashion. Use when there is no delay/IO involved.
 302     *
 303     * flatMap - processing is concurrent
 304     * so all threads can run at same time not guarantee of being sequential.
 305     * ********************************************************************
 306     */
 307    @Test
 308    void test_flatMap() {
 309        Flux flux1 = Flux.just("jack", "raj")
 310                .flatMap(HelperUtil::capitalizeReactive);
 311        flux1.subscribe(System.out::println);
 312        //No guarantee of order, jack can come first or raj can come first.
 313        StepVerifier.create(flux1)
 314                .expectSubscription()
 315                .expectNextCount(2)
 316                .verifyComplete();
 317
 318        //capitalize will happen in blocking fashion. If this function takes long or does I/O then it will be blocking
 319        //Use map only when there is no IO involved in the function
 320        Flux flux2 = Flux.just("jack", "raj")
 321                .map(HelperUtil::capitalize);
 322        flux2.subscribe(System.out::println);
 323        StepVerifier.create(flux2)
 324                .expectNext("JACK")
 325                .expectNext("RAJ")
 326                .verifyComplete();
 327
 328        Flux flux3 = Flux.fromIterable(getCustomers())
 329                .flatMap(HelperUtil::capitalizeCustomerName);
 330        flux1.subscribe(System.out::println);
 331        //No guarantee of order
 332        StepVerifier.create(flux3)
 333                .expectNextCount(5)
 334                .verifyComplete();
 335    }
 336
 337    /**
 338     * Here flatMap will run one after other as there is just 1 thread allocated for it.
 339     * You can also look at concatMap to do the same
 340     */
 341    @Test
 342    void test_flatMap_nonConcurrent() {
 343        Flux<Integer> flux = Flux.range(1, 10)
 344                .map(i -> i)
 345                .flatMap(i -> {
 346                    return Mono.just(i);
 347                }, 1);
 348        flux.subscribe(System.out::println);
 349        StepVerifier.create(flux)
 350                .expectSubscription()
 351                .expectNextCount(10)
 352                .verifyComplete();
 353    }
 354
 355    /**
 356     * ********************************************************************
 357     *  flatMap - object modification
 358     * ********************************************************************
 359     */
 360    @Test
 361    void test_objectModification() {
 362        //Modification of object in chain - done via flatMap
 363        //Ideally create a new object instead of modifying the existing object.
 364        Mono<Customer> mono = Mono.just(getCustomer())
 365                .flatMap(e -> {
 366                    e.setCity("paris");
 367                    return Mono.just(e);
 368                });
 369        StepVerifier.create(mono)
 370                .assertNext(e -> {
 371                    assertThat(e.getCity()).isEqualTo("paris");
 372                })
 373                .verifyComplete();
 374    }
 375
 376    @Test
 377    void test_objectModification_zipWith() {
 378        //Modification of object in chain - done via zipWith
 379        //The 2nd argument for zipWith is a combinator function that determines how the 2 mono are zipped
 380        Mono<Customer> mono = Mono.just(getCustomer())
 381                .zipWith(Mono.just("paris"), HelperUtil::changeCity);
 382        StepVerifier.create(mono)
 383                .assertNext(e -> {
 384                    assertThat(e.getCity()).isEqualTo("paris");
 385                })
 386                .verifyComplete();
 387    }
 388
 389    /**
 390     * ********************************************************************
 391     *  distinct
 392     * ********************************************************************
 393     */
 394    @Test
 395    void test_distinct_flux() {
 396        Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe", "Jack", "Jill", "jack"))
 397                .map(String::toUpperCase)
 398                .distinct();
 399        flux.subscribe(System.out::println);
 400        StepVerifier.create(flux)
 401                .expectNext("JACK", "JOE", "JILL")
 402                .verifyComplete();
 403    }
 404
 405    /**
 406     * ********************************************************************
 407     *  concatMap - works only on flux, same as flatMap but order is preserved, concatMap takes more time but ordering is preserved.
 408     *  flatMap - Takes less time but ordering is lost.
 409     * ********************************************************************
 410     */
 411    @Test
 412    @SneakyThrows
 413    void test_concatMap() {
 414        Flux flux1 = Flux.just("jack", "raj")
 415                .concatMap(HelperUtil::capitalizeReactive);
 416        flux1.subscribe(System.out::println);
 417        //Guarantee of order, jack will come first then raj.
 418        StepVerifier.create(flux1)
 419                .expectSubscription()
 420                .expectNext("JACK", "RAJ")
 421                .verifyComplete();
 422
 423        Flux flux2 = Flux.fromIterable(getCustomers())
 424                .concatMap(HelperUtil::capitalizeCustomerName);
 425        flux1.subscribe(System.out::println);
 426        StepVerifier.create(flux2)
 427                .expectSubscription()
 428                .expectNextCount(5)
 429                .verifyComplete();
 430    }
 431
 432    /**
 433     * ********************************************************************
 434     *  flatMapMany - similar to flatMap but flattens the flux
 435     * ********************************************************************
 436     */
 437    @Test
 438    void test_flatMapMany() {
 439        Flux<String> flux1 = Mono.just("the quick brown fox jumps over the lazy dog")
 440                .flatMapMany(e -> Flux.fromArray(e.toUpperCase().split("")))
 441                .distinct()
 442                .sort();
 443        flux1.subscribe(System.out::println);
 444        //26 letters in the alphabet
 445        StepVerifier.create(flux1)
 446                .expectNextCount(26)
 447                .expectComplete();
 448
 449        Flux<Integer> flux2 = Mono.just(List.of(1, 2, 3))
 450                .flatMapMany(it -> Flux.fromIterable(it));
 451        flux2.subscribe(System.out::println);
 452        StepVerifier
 453                .create(flux2)
 454                .expectNext(1, 2, 3)
 455                .verifyComplete();
 456
 457        Flux flux3 = Mono.just(getCustomers())
 458                .flatMapMany(e -> HelperUtil.capitalizeCustomerName(e));
 459        flux1.subscribe(System.out::println);
 460        StepVerifier.create(flux3)
 461                .expectSubscription()
 462                .expectNextCount(5)
 463                .verifyComplete();
 464    }
 465
 466    /**
 467     * ********************************************************************
 468     *  flatMapIterable - convert mono of list to flux
 469     * ********************************************************************
 470     */
 471    @Test
 472    void test_flatMapIterable() {
 473        Mono<List<Integer>> mono = Mono.just(Arrays.asList(1, 2, 3));
 474        Flux<Integer> flux = mono.flatMapIterable(list -> list);
 475        flux.subscribe(System.out::println);
 476        StepVerifier
 477                .create(flux)
 478                .expectNext(1, 2, 3)
 479                .verifyComplete();
 480    }
 481
 482    /**
 483     * ********************************************************************
 484     *  flatMapIterable - convert mono of map to flux
 485     * ********************************************************************
 486     */
 487    @Test
 488    void test_flatMapIterable2() {
 489        Mono<Map<String, String>> mono = Mono.just(Map.of("foo", "bar"));
 490        Flux<Map.Entry<String, String>> flux = mono.flatMapIterable(list -> list.entrySet());
 491        flux.subscribe(System.out::println);
 492        StepVerifier
 493                .create(flux)
 494                .expectNext(Map.entry("foo", "bar"))
 495                .verifyComplete();
 496    }
 497
 498    /**
 499     * ********************************************************************
 500     *  transform - accepts a Function functional interface. Used when similar transform is used in many places
 501     *  input is flux/mono
 502     *  output is flux/mono
 503     *  takes a flux/mono and returns a flux/mono
 504     * ********************************************************************
 505     */
 506    @Test
 507    void test_transform() {
 508        //Function defines input and output
 509        Function<Flux<String>, Flux<String>> upperCaseFunction = name -> name.map(String::toUpperCase);
 510        Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe"))
 511                .transform(upperCaseFunction);
 512        flux.subscribe(System.out::println);
 513        StepVerifier
 514                .create(flux)
 515                .expectNext("JACK", "JOE")
 516                .verifyComplete();
 517    }
 518
 519    /**
 520     * ********************************************************************
 521     *  switchIfEmpty - similar to defaultIfEmpty but return flux/mono
 522     *  defaultIfEmpty - return a fixed value.
 523     * ********************************************************************
 524     */
 525    @Test
 526    @SneakyThrows
 527    void test_defaultIfEmpty() {
 528        Flux<Object> flux1 = Flux.empty()
 529                .defaultIfEmpty("empty")
 530                .log();
 531        StepVerifier.create(flux1)
 532                .expectNext("empty")
 533                .expectComplete()
 534                .verify();
 535
 536        Flux<Object> flux2 = Flux.empty()
 537                .switchIfEmpty(Flux.just("empty"))
 538                .log();
 539        StepVerifier.create(flux2)
 540                .expectNext("empty")
 541                .expectComplete()
 542                .verify();
 543    }
 544
 545    @Test
 546    void test_optional() {
 547        var mono1 = getHello(true)
 548                .defaultIfEmpty("NONE");
 549        StepVerifier.create(mono1)
 550                .expectNext("HELLO")
 551                .verifyComplete();
 552
 553        var mono2 = getHello(false)
 554                .defaultIfEmpty("NONE");
 555        StepVerifier.create(mono2)
 556                .expectNext("NONE")
 557                .verifyComplete();
 558
 559        var mono3 = getOptionalHello(true)
 560                .filter(Optional::isPresent)
 561                .map(Optional::get);
 562        StepVerifier.create(mono3)
 563                .expectNext("HELLO")
 564                .verifyComplete();
 565
 566        var mono4 = getOptionalHello(false)
 567                .filter(Optional::isPresent)
 568                .map(Optional::get);
 569        StepVerifier.create(mono4)
 570                .expectNextCount(0)
 571                .verifyComplete();
 572    }
 573
 574    private Mono<String> getHello(Boolean flag) {
 575        if (flag) {
 576            return Mono.just("HELLO");
 577        } else {
 578            return Mono.empty();
 579        }
 580    }
 581
 582    private Mono<Optional<String>> getOptionalHello(Boolean flag) {
 583        if (flag) {
 584            return Mono.just(Optional.of("HELLO"));
 585        } else {
 586            return Mono.just(Optional.empty());
 587        }
 588    }
 589
 590    /**
 591     * ********************************************************************
 592     *  switchIfEmpty with Optional
 593     * ********************************************************************
 594     */
 595    @Test
 596    public void test_switchIfEmpty() {
 597        Mono<Optional<Customer>> c1 = Mono.justOrEmpty(Optional.empty());
 598        Mono<Optional<Customer>> c2 = Mono.just(Optional.of(getCustomer()));
 599
 600        Mono<Optional<Customer>> mono1 = c1
 601                .switchIfEmpty(Mono.just(Optional.of(new Customer())));
 602        StepVerifier.create(mono1)
 603                .expectNextCount(1)
 604                .expectComplete()
 605                .verify();
 606
 607        Mono<Optional<Customer>> mono2 = c2
 608                .switchIfEmpty(Mono.just(Optional.empty()));
 609        StepVerifier.create(mono2)
 610                .expectNextCount(1)
 611                .expectComplete()
 612                .verify();
 613    }
 614
 615    /**
 616     * ********************************************************************
 617     *  switchIfEmpty - Used as if-else block
 618     * ********************************************************************
 619     */
 620    @Test
 621    void test_switchIfEmpty_if_else() {
 622        final Customer customer = getCustomer();
 623        //No need to use Mono.defer on the switchIfEmpty
 624        Mono<String> mono = Mono.just(customer)
 625                .flatMap(e -> {
 626                    if (customer.getCity().equals("bangalore")) {
 627                        return Mono.just("Timezone:IST");
 628                    } else {
 629                        return Mono.empty();
 630                    }
 631                })
 632                .switchIfEmpty(Mono.just("Timezone:GMT"));
 633
 634        StepVerifier.create(mono)
 635                .expectNext("Timezone:GMT")
 636                .verifyComplete();
 637    }
 638
 639    /**
 640     * ********************************************************************
 641     *  filterWhen - returns Mono
 642     *  filter - returns object
 643     * ********************************************************************
 644     */
 645    @Test
 646    void test_filterWhen() {
 647        Flux<String> flux1 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").cache();
 648        Flux<String> flux2 = flux1.filterWhen(f -> Mono.just(f.equals("apple")));
 649        flux2.subscribe(System.out::println);
 650        StepVerifier.create(flux2)
 651                .expectNext("apple")
 652                .verifyComplete();
 653    }
 654
 655    /**
 656     * ********************************************************************
 657     *  filterWhen - returns Mono
 658     *  filter - returns object
 659     * ********************************************************************
 660     */
 661    @Test
 662    void test_filter() {
 663        Flux<String> flux1 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").cache();
 664        Flux<String> flux2 = flux1.filter(f -> f.equals("apple"));
 665        flux2.subscribe(System.out::println);
 666        StepVerifier.create(flux2)
 667                .expectNext("apple")
 668                .verifyComplete();
 669
 670        //Get even numbers
 671        Flux flux = Flux.just(1, 2, 3, 4, 5)
 672                .filter(i -> i % 2 == 0);
 673        flux.subscribe(System.out::println);
 674        StepVerifier.create(flux)
 675                .expectNext(2, 4)
 676                .verifyComplete();
 677    }
 678
 679    /**
 680     * ********************************************************************
 681     *  intersect (common) - compare 2 flux for common elements
 682     * ********************************************************************
 683     */
 684    @Test
 685    void test_intersect_inefficient() {
 686        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 687        //Without cache on flux2 it will subscribe many times.
 688        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").cache();
 689        Flux<String> commonFlux = flux1.filter(f -> {
 690            //toStream will block so should be avoided.
 691            //Inefficient - toStream will block so should be avoided.
 692            //Not for live stream or stream that can be subscribed only once.
 693            return flux2.toStream().anyMatch(e -> e.equals(f));
 694        });
 695        commonFlux.subscribe(System.out::println);
 696        StepVerifier.create(commonFlux)
 697                .expectNext("apple", "orange")
 698                .verifyComplete();
 699    }
 700
 701    /**
 702     * ********************************************************************
 703     *  intersect (common) - compare 2 flux for common elements
 704     * ********************************************************************
 705     */
 706    @Test
 707    void test_intersect_efficient_1() {
 708        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 709        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
 710        Flux<String> commonFlux = flux1
 711                .collect(Collectors.toSet())
 712                .flatMapMany(set -> {
 713                    return flux2
 714                            //Filter out matching
 715                            //Limitation is that you can only compare 1 value collected in set.
 716                            .filter(t -> set.contains(t));
 717                });
 718        commonFlux.subscribe(System.out::println);
 719        StepVerifier.create(commonFlux)
 720                .expectNext("apple", "orange")
 721                .verifyComplete();
 722    }
 723
 724    /**
 725     * ********************************************************************
 726     *  intersect (common) - using join operator
 727     * ********************************************************************
 728     */
 729    @Test
 730    void test_intersect_efficient_2() {
 731        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 732        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
 733
 734        Flux<String> commonFlux = flux1.join(flux2, s -> Flux.never(), s -> Flux.never(), Tuples::of)
 735                //Filter out matching
 736                .filter(t -> t.getT1().equals(t.getT2()))
 737                //Revert to single value
 738                .map(Tuple2::getT1)
 739                //Remove duplicates, if any
 740                .groupBy(f -> f)
 741                .map(GroupedFlux::key);
 742        commonFlux.subscribe(System.out::println);
 743        StepVerifier.create(commonFlux)
 744                .expectNext("apple", "orange")
 745                .verifyComplete();
 746    }
 747
 748    @Test
 749    void test_intersect_efficient_3() {
 750        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 751        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
 752
 753        Mono<List<String>> monoList1 = flux1.collectList();
 754        Mono<List<String>> monoList2 = flux2.collectList();
 755
 756        Flux<String> commonFlux = Mono.zip(monoList1, monoList2)
 757                .map(tuple -> {
 758                    List<String> list1 = tuple.getT1();
 759                    List<String> list2 = tuple.getT2();
 760                    list1.retainAll(list2);
 761                    return list1;
 762                }).flatMapIterable(e -> e);
 763
 764        commonFlux.subscribe(System.out::println);
 765        StepVerifier.create(commonFlux)
 766                .expectNext("apple", "orange")
 767                .verifyComplete();
 768    }
 769
 770    @Test
 771    void test_intersect_efficient_4() {
 772        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 773        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
 774
 775        Flux<String> commonFlux = flux2.filterWhen(element ->
 776                flux1.any(e -> e.equals(element))
 777        );
 778
 779        commonFlux.subscribe(System.out::println);
 780        StepVerifier.create(commonFlux)
 781                .expectNext("apple", "orange")
 782                .verifyComplete();
 783    }
 784
 785    @Test
 786    void test_intersect_efficient_5() {
 787        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 788        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
 789
 790        Flux<String> commonFlux = flux2.concatMap(element2 ->
 791                flux1.filter(element1 -> element1.equals(element2)).take(1)
 792        );
 793        commonFlux.subscribe(System.out::println);
 794        StepVerifier.create(commonFlux)
 795                .expectNext("apple", "orange")
 796                .verifyComplete();
 797    }
 798
 799    @Test
 800    void test_intersect_efficient_6() {
 801        Flux<String> flux1 = Flux.just("apple", "orange", "banana");
 802        Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
 803
 804        Flux<String> commonFlux = flux2.flatMap(element2 ->
 805                flux1.flatMap(element1 ->
 806                        element1.equals(element2) ? Flux.just(element1) : Flux.empty()
 807                ).take(1)
 808        );
 809        commonFlux.subscribe(System.out::println);
 810        StepVerifier.create(commonFlux)
 811                .expectNext("apple", "orange")
 812                .verifyComplete();
 813    }
 814
 815    /**
 816     * ********************************************************************
 817     *  startWith - add new element to flux.
 818     * ********************************************************************
 819     */
 820    @Test
 821    public void test_startWith() {
 822        Flux<Integer> flux1 = Flux.range(1, 3);
 823        Flux<Integer> flux2 = flux1.startWith(0);
 824        StepVerifier.create(flux2)
 825                .expectNext(0, 1, 2, 3)
 826                .verifyComplete();
 827    }
 828
 829    /**
 830     * ********************************************************************
 831     *  index
 832     * ********************************************************************
 833     */
 834    @Test
 835    void test_index() {
 836        //append a number to each element.
 837        Flux<Tuple2<Long, String>> flux = Flux
 838                .just("apple", "banana", "orange")
 839                .index();
 840        StepVerifier.create(flux)
 841                .expectNext(Tuples.of(0L, "apple"))
 842                .expectNext(Tuples.of(1L, "banana"))
 843                .expectNext(Tuples.of(2L, "orange"))
 844                .verifyComplete();
 845    }
 846
 847    /**
 848     * ********************************************************************
 849     * takeWhile
 850     * ********************************************************************
 851     */
 852    @Test
 853    void test_takeWhile() {
 854        Flux<Integer> flux = Flux.range(1, 10);
 855        Flux<Integer> takeWhile = flux.takeWhile(i -> i <= 5);
 856        StepVerifier
 857                .create(takeWhile)
 858                .expectNext(1, 2, 3, 4, 5)
 859                .verifyComplete();
 860    }
 861
 862    /**
 863     * ********************************************************************
 864     * skipWhile
 865     * ********************************************************************
 866     */
 867    @Test
 868    void test_skipWhile() {
 869        Flux<Integer> flux = Flux.range(1, 10);
 870        Flux<Integer> skipWhile = flux.skipWhile(i -> i <= 5);
 871        StepVerifier
 872                .create(skipWhile)
 873                .expectNext(6, 7, 8, 9, 10)
 874                .verifyComplete();
 875    }
 876
 877    /**
 878     * ********************************************************************
 879     *  collectList - flux to mono of list
 880     * ********************************************************************
 881     */
 882    @Test
 883    void test_collectList() {
 884        Mono<List<Integer>> flux = Flux
 885                .just(1, 2, 3)
 886                .collectList();
 887        StepVerifier.create(flux)
 888                .expectNext(Arrays.asList(1, 2, 3))
 889                .verifyComplete();
 890    }
 891
 892    /**
 893     * ********************************************************************
 894     * collectSortedList- flux to mono of list
 895     * ********************************************************************
 896     */
 897    @Test
 898    void test_collectSortedList() {
 899        Mono<List<Integer>> listMono2 = Flux
 900                .just(5, 2, 4, 1, 3)
 901                .collectSortedList();
 902        StepVerifier.create(listMono2)
 903                .expectNext(Arrays.asList(1, 2, 3, 4, 5))
 904                .verifyComplete();
 905    }
 906
 907    /**
 908     * ********************************************************************
 909     *  collectMap
 910     * ********************************************************************
 911     */
 912    @Test
 913    void test_collectMap() {
 914        Mono<Map<Object, Object>> flux = Flux.just("yellow:banana", "red:apple")
 915                .map(item -> item.split(":"))
 916                .collectMap(item -> item[0], item -> item[1]);
 917
 918        Map<Object, Object> map = new HashMap<>();
 919        flux.subscribe(map::putAll);
 920        map.forEach((key, value) -> System.out.println(key + " -> " + value));
 921
 922        StepVerifier.create(flux)
 923                .expectNext(Map.of("yellow", "banana", "red", "apple"))
 924                .verifyComplete();
 925    }
 926
 927    /**
 928     * ********************************************************************
 929     *  collectMultimap
 930     * ********************************************************************
 931     */
 932    @Test
 933    void test_collectMultimap() {
 934        Mono<Map<String, Collection<String>>> flux = Flux.just("yellow:banana", "red:grapes", "red:apple", "yellow:pineapple")
 935                .map(item -> item.split(":"))
 936                .collectMultimap(
 937                        item -> item[0],
 938                        item -> item[1]);
 939        Map<Object, Collection<String>> map = new HashMap<>();
 940        flux.subscribe(map::putAll);
 941        map.forEach((key, value) -> System.out.println(key + " -> " + value));
 942
 943        StepVerifier.create(flux)
 944                .expectNext(Map.of("red", List.of("grapes", "apple"), "yellow", List.of("banana", "pineapple")))
 945                .verifyComplete();
 946    }
 947
 948    /**
 949     * ********************************************************************
 950     * concat - subscribes to publishers in sequence, order guaranteed, static function
 951     * concatWith - subscribes to publishers in sequence, order guaranteed, instance function
 952     * ********************************************************************
 953     */
 954    @Test
 955    @SneakyThrows
 956    void test_concat() {
 957        Flux<String> flux1 = Flux.just("a", "b");
 958        Flux<String> flux2 = Flux.just("c", "d");
 959        Flux<String> flux3 = Flux.concat(flux1, flux2);
 960
 961        StepVerifier.create(flux3)
 962                .expectSubscription()
 963                .expectNext("a", "b", "c", "d")
 964                .verifyComplete();
 965
 966        Flux<String> flux4 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
 967        Flux<String> flux5 = Flux.just("c", "d");
 968        //Lazy will wait till first flux finishes.
 969        Flux<String> flux6 = Flux.concat(flux1, flux2).log();
 970
 971        StepVerifier.create(flux6)
 972                .expectSubscription()
 973                .expectNext("a", "b", "c", "d")
 974                .verifyComplete();
 975    }
 976
 977    /**
 978     * ********************************************************************
 979     * concat - subscribes to publishers in sequence, order guaranteed, static function
 980     * concatWith - subscribes to publishers in sequence, order guaranteed, instance function
 981     * ********************************************************************
 982     */
 983    @Test
 984    @SneakyThrows
 985    void test_concatWith() {
 986        Flux<String> flux1 = Flux.just("a", "b");
 987        Flux<String> flux2 = Flux.just("c", "d");
 988        Flux<String> flux3 = flux1.concatWith(flux2);
 989        StepVerifier.create(flux3)
 990                .expectSubscription()
 991                .expectNext("a", "b", "c", "d")
 992                .verifyComplete();
 993
 994        Mono<String> aFlux = Mono.just("a");
 995        Mono<String> bFlux = Mono.just("b");
 996        Flux<String> stringFlux = aFlux.concatWith(bFlux);
 997        stringFlux.subscribe(System.out::println);
 998        StepVerifier.create(stringFlux)
 999                .expectNext("a", "b")
1000                .verifyComplete();
1001    }
1002
1003    /**
1004     * ********************************************************************
1005     * concatDelayError - When one flux can throw an error
1006     * ********************************************************************
1007     */
1008    @Test
1009    void test_concatDelayError() {
1010        Flux<String> flux1 = Flux.just("a", "b", "c")
1011                .map(s -> {
1012                    if (s.equals("b")) {
1013                        throw new RuntimeException("error!");
1014                    }
1015                    return s;
1016                });
1017        Flux<String> flux2 = Flux.just("d", "e", "f");
1018        Flux<String> flux3 = Flux.concatDelayError(flux1, flux2);
1019
1020        StepVerifier.create(flux3)
1021                .expectSubscription()
1022                .expectNext("a", "d", "e", "f")
1023                .expectError()
1024                .verify();
1025    }
1026
1027    /**
1028     * ********************************************************************
1029     *  combineLatest - will change order based on time. Rarely used.
1030     * ********************************************************************
1031     */
1032    @Test
1033    void test_combineLatest() {
1034        Flux<String> flux1 = Flux.just("a", "b");
1035        Flux<String> flux2 = Flux.just("c", "d");
1036        Flux<String> flux3 = Flux.combineLatest(flux1, flux2, (s1, s2) -> s1 + s2)
1037                .log();
1038        StepVerifier.create(flux3)
1039                .expectSubscription()
1040                .expectNext("bc", "bd")
1041                .verifyComplete();
1042    }
1043
1044    /**
1045     * ********************************************************************
1046     *  merge - subscribes to publishers eagerly, order not guaranteed, static function
1047     * ********************************************************************
1048     */
1049    @Test
1050    @SneakyThrows
1051    void test_merge() {
1052        Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
1053        Flux<String> flux2 = Flux.just("c", "d");
1054        //Eager will not wait till first flux3 finishes.
1055        Flux<String> flux3 = Flux.merge(flux1, flux2);
1056
1057        StepVerifier.create(flux3)
1058                .expectSubscription()
1059                .expectNext("c", "d", "a", "b")
1060                .verifyComplete();
1061    }
1062
1063    /**
1064     * ********************************************************************
1065     *  mergeWith - subscribes to publishers in eagerly, order not guaranteed, instance function
1066     * ********************************************************************
1067     */
1068    @Test
1069    @SneakyThrows
1070    void test_mergeWith() {
1071        Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
1072        Flux<String> flux2 = Flux.just("c", "d");
1073        //Eager will not wait till first flux finishes.
1074        Flux<String> flux3 = flux1.mergeWith(flux2);
1075
1076        StepVerifier.create(flux3)
1077                .expectSubscription()
1078                .expectNext("c", "d", "a", "b")
1079                .verifyComplete();
1080
1081        Mono aMono = Mono.just("a");
1082        Mono bMono = Mono.just("b");
1083        Flux flux4 = aMono.mergeWith(bMono);
1084        StepVerifier.create(flux4)
1085                .expectNext("a", "b")
1086                .verifyComplete();
1087    }
1088
1089    /**
1090     * ********************************************************************
1091     *  mergeSequential - subscribes to publishers eagerly, result is sequential.
1092     *  concat          - subscribes to publishers in sequence, result is sequential.
1093     * ********************************************************************
1094     */
1095    @Test
1096    @SneakyThrows
1097    void test_mergeSequential() {
1098        Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
1099        Flux<String> flux2 = Flux.just("c", "d");
1100        Flux<String> flux3 = Flux.mergeSequential(flux1, flux2, flux1);
1101
1102        StepVerifier.create(flux3)
1103                .expectSubscription()
1104                .expectNext("a", "b", "c", "d", "a", "b")
1105                .verifyComplete();
1106    }
1107
1108    /**
1109     * ********************************************************************
1110     * mergeDelayError - when one flux can throw an error
1111     * ********************************************************************
1112     */
1113    @Test
1114    void test_mergeDelayError() {
1115        Flux<String> flux1 = Flux.just("a", "b")
1116                .map(s -> {
1117                    if (s.equals("b")) {
1118                        throw new RuntimeException("error");
1119                    }
1120                    return s;
1121                }).doOnError(e -> log.error("Error: {}", e));
1122
1123        Flux<String> flux2 = Flux.just("c", "d");
1124        Flux<String> flux3 = Flux.mergeDelayError(1, flux1, flux2, flux1);
1125
1126        StepVerifier.create(flux3)
1127                .expectSubscription()
1128                .expectNext("a", "c", "d", "a")
1129                .expectError()
1130                .verify();
1131    }
1132
1133    /**
1134     * ********************************************************************
1135     *  zip - subscribes to publishers in eagerly, waits for both flux to emit one element.
1136     *  2-8 flux can be zipped, returns a tuple, Static function
1137     * ********************************************************************
1138     */
1139    @Test
1140    void test_zip() {
1141        Flux<String> flux1 = Flux.just("red", "yellow");
1142        Flux<String> flux2 = Flux.just("apple", "banana");
1143        Flux<String> flux3 = Flux.zip(flux1, flux2)
1144                .map(tuple -> {
1145                    return (tuple.getT1() + " " + tuple.getT2());
1146                });
1147        flux3.subscribe(System.out::println);
1148        StepVerifier.create(flux3)
1149                .expectNext("red apple")
1150                .expectNext("yellow banana")
1151                .verifyComplete();
1152
1153        //Third argument is combinator lambda
1154        Flux<Integer> firstFlux = Flux.just(1, 2, 3);
1155        Flux<Integer> secondFlux = Flux.just(10, 20, 30, 40);
1156        //Define how the zip should happen
1157        Flux<Integer> zip = Flux.zip(firstFlux, secondFlux, (num1, num2) -> num1 + num2);
1158        StepVerifier
1159                .create(zip)
1160                .expectNext(11, 22, 33)
1161                .verifyComplete();
1162    }
1163
1164    /**
1165     * ********************************************************************
1166     *  zipWith - subscribes to publishers in eagerly, waits for both flux to emit one element.
1167     *  2-8 flux can be zipped, returns a tuple, Instance function
1168     *  When 2 different size flux are combined zipWith return the smaller item size new flux.
1169     * ********************************************************************
1170     */
1171    @Test
1172    void test_zipWith() {
1173        Flux<String> flux1 = Flux.just("red", "yellow");
1174        Flux<String> flux2 = Flux.just("apple", "banana");
1175        Flux<String> flux3 = flux1.zipWith(flux2)
1176                .map(tuple -> {
1177                    return (tuple.getT1() + " " + tuple.getT2());
1178                });
1179        StepVerifier.create(flux3)
1180                .expectNext("red apple")
1181                .expectNext("yellow banana")
1182                .verifyComplete();
1183
1184        Flux<String> flux4 = Flux.fromIterable(Arrays.asList("apple", "orange", "banana"))
1185                .zipWith(Flux.range(1, 5), (word, line) -> {
1186                    return line + ". " + word;
1187                });
1188        StepVerifier.create(flux4)
1189                .expectNext("1. apple")
1190                .expectNext("2. orange")
1191                .expectNext("3. banana")
1192                .verifyComplete();
1193    }
1194
1195    /**
1196     * ********************************************************************
1197     * Cant do zipWith to combine mono & flux.
1198     * Use the operator join
1199     * ********************************************************************
1200     */
1201    @Test
1202    void test_join() {
1203        Mono<String> mono = Mono.just("green");
1204        Flux<String> flux1 = Flux.just("apple", "banana")
1205                .join(mono, s -> Flux.never(), s -> Flux.never(), Tuples::of)
1206                .flatMap(tuple -> {
1207                    return Mono.just(tuple.getT2() + " " + tuple.getT1());
1208                });
1209        StepVerifier.create(flux1)
1210                .expectNext("green apple")
1211                .expectNext("green banana")
1212                .verifyComplete();
1213
1214        Flux<String> flux2 = Flux.just("apple", "banana")
1215                .zipWith(mono.cache().repeat())
1216                .flatMap(tuple -> {
1217                    return Mono.just(tuple.getT2() + " " + tuple.getT1());
1218                });
1219        StepVerifier.create(flux2)
1220                .expectNext("green apple")
1221                .expectNext("green banana")
1222                .verifyComplete();
1223    }
1224
1225    /**
1226     * ********************************************************************
1227     * error
1228     * ********************************************************************
1229     */
1230    @Test
1231    void test_onError() {
1232        Mono<String> mono1 = Mono.just("jack")
1233                .map(s -> {
1234                    throw new RuntimeException("ERROR");
1235                });
1236        mono1.subscribe(s -> log.info("name: {}", s), Throwable::printStackTrace);
1237        StepVerifier.create(mono1)
1238                .expectError(RuntimeException.class)
1239                .verify();
1240
1241        System.out.println("********************************************************************");
1242
1243        Mono<String> mono2 = Mono.just("jack")
1244                .flatMap(s -> {
1245                    return Mono.error(new RuntimeException("ERROR"));
1246                });
1247        mono2.subscribe(s -> log.info("name: {}", s), Throwable::printStackTrace);
1248
1249        StepVerifier.create(mono2)
1250                .expectError(RuntimeException.class)
1251                .verify();
1252    }
1253
1254    /**
1255     * ********************************************************************
1256     *  Error Recover Handling
1257     *  onErrorReturn - Return value on error
1258     * ********************************************************************
1259     */
1260    @Test
1261    void test_onErrorReturn() {
1262        Mono<Object> mono1 = Mono.error(new RuntimeException("error"))
1263                .onErrorReturn("Jack");
1264        StepVerifier.create(mono1)
1265                .expectNext("Jack")
1266                .verifyComplete();
1267    }
1268
1269    /**
1270     * ********************************************************************
1271     *  Error Recover Handling
1272     *  onErrorResume - Resume chain with new mono/flux.
1273     * ********************************************************************
1274     */
1275    @Test
1276    void test_onErrorResume() {
1277        Mono<Object> mono1 = Mono.error(new RuntimeException("error"))
1278                .onErrorResume(e -> Mono.just("Jack"));
1279        StepVerifier.create(mono1)
1280                .expectNext("Jack")
1281                .verifyComplete();
1282
1283        Mono<Object> mono2 = Mono.error(new RuntimeException("error"))
1284                .onErrorResume(s -> {
1285                    log.info("Inside on onErrorResume");
1286                    return Mono.just("Jack");
1287                })
1288                .log();
1289        StepVerifier.create(mono2)
1290                .expectNext("Jack")
1291                .verifyComplete();
1292    }
1293
1294    /**
1295     * ********************************************************************
1296     *  Error Recover Handling
1297     *  onErrorContinue - Continue chain even if error occurs
1298     * ********************************************************************
1299     */
1300    @Test
1301    void test_onErrorContinue() {
1302        Flux<String> flux =
1303                Flux.just("a", "b", "c")
1304                        .map(e -> {
1305                            if (e.equals("b")) {
1306                                throw new RuntimeException("error");
1307                            }
1308                            return e;
1309                        })
1310                        .concatWith(Mono.just("d"))
1311                        .onErrorContinue((ex, value) -> {
1312                            log.info("Exception: {}", ex);
1313                            log.info("value: {}", value);
1314                        });
1315        StepVerifier.create(flux)
1316                .expectNext("a", "c", "d")
1317                .verifyComplete();
1318    }
1319
1320    /**
1321     * ********************************************************************
1322     *  Error - Action
1323     *  doOnError - log the error, Side-effect operator.
1324     * ********************************************************************
1325     */
1326    @Test
1327    void test_doOnError() {
1328        Mono<Object> mono1 = Mono.error(new RuntimeException("error"))
1329                .doOnError(e -> log.error("Error: {}", e.getMessage()))
1330                .log();
1331        StepVerifier.create(mono1)
1332                .expectError(RuntimeException.class)
1333                .verify();
1334    }
1335
1336    /**
1337     * ********************************************************************
1338     *  Error - Action
1339     *  onErrorMap - Transform an error emitted
1340     * ********************************************************************
1341     */
1342    @Test
1343    void test_onErrorMap() {
1344        Flux flux = Flux.just("Jack", "Jill")
1345                .map(u -> {
1346                    if (u.equals("Jill")) {
1347                        //always do throw here, never do return.
1348                        throw new IllegalArgumentException("Not valid");
1349                    }
1350                    if (u.equals("Jack")) {
1351                        throw new ClassCastException("Not valid");
1352                    }
1353                    return u;
1354                }).onErrorMap(IllegalArgumentException.class, e -> {
1355                    log.info("Illegal Arg error");
1356                    throw new RuntimeException("Illegal Arg error!");
1357                }).onErrorMap(ClassCastException.class, e -> {
1358                    log.info("Class cast error");
1359                    throw new RuntimeException("Class cast error!");
1360                });
1361
1362        StepVerifier.create(flux)
1363                .expectErrorMessage("Class cast error!")
1364                .verify();
1365    }
1366
1367    /**
1368     * ********************************************************************
1369     *  retry
1370     * ********************************************************************
1371     */
1372    @Test
1373    void test_retry() {
1374        AtomicLong attemptCounter = new AtomicLong();
1375        Mono<String> mono = Mono.just("Jack")
1376                .flatMap(n -> {
1377                    return this.twoAttemptFunction(attemptCounter, n);
1378                })
1379                .retry(3);
1380        StepVerifier.create(mono)
1381                .assertNext(e -> {
1382                    assertThat(e).isEqualTo("Hello Jack");
1383                })
1384                .verifyComplete();
1385    }
1386
1387    private Mono<String> twoAttemptFunction(AtomicLong counter, String name) {
1388        Long attempt = counter.getAndIncrement();
1389        log.info("attempt value: {}", attempt);
1390        if (attempt < 2) {
1391            throw new RuntimeException("error");
1392        }
1393        return Mono.just("Hello " + name);
1394    }
1395
1396    /**
1397     * ********************************************************************
1398     *  retryWhen
1399     * ********************************************************************
1400     */
1401    @Test
1402    void test_retryWhen() {
1403        AtomicLong attemptCounter1 = new AtomicLong();
1404        RetryBackoffSpec retryFilter1 = Retry.backoff(3, Duration.ofSeconds(1))
1405                .filter(throwable -> throwable instanceof RuntimeException);
1406
1407        Mono<String> mono1 = Mono.just("Jack")
1408                .flatMap(e -> this.greetAfter2Failure(attemptCounter1, e))
1409                .retryWhen(retryFilter1);
1410        StepVerifier.create(mono1)
1411                .assertNext(e -> {
1412                    assertThat(e).isEqualTo("Hello Jack");
1413                })
1414                .verifyComplete();
1415
1416        AtomicLong attemptCounter2 = new AtomicLong();
1417        RetryBackoffSpec retryFilter2 = Retry.fixedDelay(1, Duration.ofSeconds(1))
1418                .filter(throwable -> throwable instanceof RuntimeException)
1419                .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
1420                        Exceptions.propagate(retrySignal.failure())
1421                ));
1422        Mono<String> mono2 = Mono.just("Jack")
1423                .flatMap(e -> this.greetAfter2Failure(attemptCounter2, e))
1424                .retryWhen(retryFilter2);
1425        StepVerifier.create(mono2)
1426                .expectErrorMessage("error")
1427                .verify();
1428    }
1429
1430    private Mono<String> greetAfter2Failure(AtomicLong attemptCounter, String name) {
1431        Long attempt = attemptCounter.getAndIncrement();
1432        log.info("attempt value: {}", attempt);
1433        if (attempt < 2) {
1434            throw new RuntimeException("error");
1435        }
1436        return Mono.just("Hello " + name);
1437    }
1438
1439    /**
1440     * ********************************************************************
1441     *  repeat - repeat an operation n times.
1442     * ********************************************************************
1443     */
1444    @Test
1445    void test_repeat() {
1446        Mono<List<String>> flux = Mono.defer(() -> {
1447                    return Mono.just("UUID " + UUID.randomUUID());
1448                })
1449                .repeat(5)
1450                .collectList();
1451        flux.subscribe(System.out::println);
1452
1453        StepVerifier.create(flux)
1454                .assertNext(e -> {
1455                    assertThat(e.size()).isEqualTo(6);
1456                })
1457                .verifyComplete();
1458    }
1459
1460    @Test
1461    void test_takeUntil() {
1462        AtomicLong counter = new AtomicLong();
1463        Mono<List<String>> flux = Mono.defer(() -> {
1464                    return Mono.just("UUID " + UUID.randomUUID());
1465                })
1466                .repeat()
1467                .takeUntil(e -> {
1468                    return counter.incrementAndGet() == 5;
1469                })
1470                .collectList();
1471
1472        StepVerifier.create(flux)
1473                .assertNext(e -> {
1474                    assertThat(e.size()).isEqualTo(5);
1475                })
1476                .verifyComplete();
1477    }
1478
1479    /**
1480     * ********************************************************************
1481     *  Subscribe onComplete, onError
1482     *  Never use this format of subscribe code, always use doOn operator
1483     * ********************************************************************
1484     */
1485    @Test
1486    void test_doOn() {
1487        Flux<Integer> numFlux = Flux.range(1, 5)
1488                .map(i -> {
1489                    if (i == 4) {
1490                        throw new RuntimeException("error");
1491                    }
1492                    return i;
1493                });
1494        numFlux.subscribe(s -> {
1495                    log.info("Number: {}", s);
1496                },
1497                Throwable::printStackTrace,
1498                () -> {
1499                    log.info("Done!");
1500                });
1501        StepVerifier.create(numFlux)
1502                .expectNext(1, 2, 3)
1503                .expectError(RuntimeException.class)
1504                .verify();
1505    }
1506
1507    /**
1508     * ********************************************************************
1509     * doOn - doOnSubscribe, doOnNext, doOnError, doFinally, doOnComplete
1510     * ********************************************************************
1511     */
1512    @Test
1513    void test_test_doOn_2() {
1514        Flux<Object> flux = Flux.error(new RuntimeException("error"))
1515                .doOnSubscribe(s -> System.out.println("Subscribed!"))
1516                .doOnRequest(s -> System.out.println("Requested!"))
1517                .doOnNext(p -> System.out.println("Next!"))
1518                .doOnComplete(() -> System.out.println("Completed!"))
1519                .doFinally((e) -> System.out.println("Signal: " + e))
1520                .doOnError((e) -> System.out.println("Error: " + e));
1521
1522        StepVerifier.create(flux)
1523                .expectError(RuntimeException.class)
1524                .verify();
1525
1526        StepVerifier.create(flux)
1527                .verifyError(RuntimeException.class);
1528
1529        Mono<Object> mono = Mono.error(new RuntimeException("error"))
1530                .doOnSubscribe(s -> System.out.println("Subscribed!"))
1531                .doOnRequest(s -> System.out.println("Requested!"))
1532                .doOnNext(p -> System.out.println("Next!"))
1533                .doFinally((e) -> System.out.println("Signal: " + e))
1534                .doOnError((e) -> System.out.println("Error: " + e))
1535                .doOnSuccess((e) -> System.out.println("Success!"));
1536
1537        StepVerifier.create(mono)
1538                .expectError(RuntimeException.class)
1539                .verify();
1540    }
1541
1542    @Test
1543    void test_doOn_3() {
1544        Flux flux = Flux.error(new RuntimeException("My Error"));
1545        flux.subscribe(
1546                onNext(),
1547                onError(),
1548                onComplete()
1549        );
1550    }
1551
1552    private Consumer<Object> onNext() {
1553        return o -> System.out.println("Received : " + o);
1554    }
1555
1556    private Consumer<Throwable> onError() {
1557        return e -> System.out.println("ERROR : " + e.getMessage());
1558    }
1559
1560    private Runnable onComplete() {
1561        return () -> System.out.println("Completed");
1562    }
1563
1564    /**
1565     * ********************************************************************
1566     *  StepVerifier - assertNext, thenRequest, thenCancel, expectError, expectErrorMessage
1567     * ********************************************************************
1568     */
1569    @Test
1570    void test_StepVerifier() {
1571        Flux flux1 = Flux.fromIterable(Arrays.asList("Jack", "Jill"));
1572        StepVerifier.create(flux1)
1573                .expectNextMatches(user -> user.equals("Jack"))
1574                .assertNext(user -> assertThat(user).isEqualTo("Jill"))
1575                .verifyComplete();
1576
1577        //Wait for 2 elements.
1578        StepVerifier.create(flux1)
1579                .expectNextCount(2)
1580                .verifyComplete();
1581
1582        //Request 1 value at a time, get 2 values then cancel.
1583        Flux flux2 = Flux.fromIterable(Arrays.asList("Jack", "Jill", "Raj"));
1584        StepVerifier.create(flux2, 1)
1585                .expectNext("JACK")
1586                .thenRequest(1)
1587                .expectNext("JILL")
1588                .thenCancel();
1589
1590        Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"));
1591        StepVerifier.create(mono1)
1592                .expectError(RuntimeException.class)
1593                .verify();
1594        StepVerifier.create(mono1)
1595                .expectErrorMessage("My Error")
1596                .verify();
1597    }
1598
1599    /**
1600     * ********************************************************************
1601     *  flux error propagate
1602     * ********************************************************************
1603     */
1604    @Test
1605    void test_propagate() {
1606        Flux flux = Flux.just("Jack", "Jill")
1607                .map(u -> {
1608                    try {
1609                        return HelperUtil.checkName(u);
1610                    } catch (HelperUtil.CustomException e) {
1611                        throw Exceptions.propagate(e);
1612                    }
1613                });
1614        flux.subscribe(System.out::println);
1615        StepVerifier.create(flux)
1616                .expectNext("JACK")
1617                .verifyError(HelperUtil.CustomException.class);
1618    }
1619
1620    /**
1621     * ********************************************************************
1622     *  subscribeOn - influences upstream (whole chain)
1623     * ********************************************************************
1624     */
1625    @Test
1626    void test_subscribeOn() {
1627        Flux numbFlux = Flux.range(1, 5)
1628                .map(i -> {
1629                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1630                    return i;
1631                }).subscribeOn(Schedulers.single())
1632                .map(i -> {
1633                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1634                    return i;
1635                });
1636        numbFlux.subscribe();
1637    }
1638
1639    @SneakyThrows
1640    @Test
1641    void test_subscribeOn_t1() {
1642        Flux<Integer> flux1 = Flux.range(0, 2)
1643                .map(i -> {
1644                    //will run on incoming thread
1645                    log.info("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
1646                    return i;
1647                });
1648        Runnable r1 = () -> flux1.subscribe(s -> {
1649            log.info("Received " + s + " via " + Thread.currentThread().getName());
1650        });
1651        Thread t1 = new Thread(r1, "t1");
1652        log.info("Program thread :: " + Thread.currentThread().getName());
1653        t1.start();
1654        t1.join();
1655    }
1656
1657    @SneakyThrows
1658    @Test
1659    void test_subscribeOn_t2() {
1660        Flux<Integer> flux2 = Flux.range(0, 2)
1661                .map(i -> {
1662                    //will run on incoming thread
1663                    log.info("Upstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1664                    return i;
1665                })
1666                .publishOn(Schedulers.single())
1667                .map(i -> {
1668                    //will run on new thread
1669                    log.info("Downstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1670                    return i;
1671                });
1672        Runnable r2 = () -> flux2.subscribe(s -> {
1673            log.info("Received {} via {}", s, Thread.currentThread().getName());
1674        });
1675        Thread t2 = new Thread(r2, "t2");
1676        log.info("Program thread {}" + Thread.currentThread().getName());
1677        t2.start();
1678        t2.join();
1679    }
1680
1681    @SneakyThrows
1682    @Test
1683    void test_subscribeOn_t3() {
1684        Flux<Integer> flux3 = Flux.range(0, 2)
1685                .map(i -> {
1686                    //will run on new thread
1687                    log.info("Upstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1688                    return i;
1689                })
1690                .subscribeOn(Schedulers.single())
1691                .map(i -> {
1692                    //will run on new thread
1693                    log.info("Downstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1694                    return i;
1695                });
1696        Runnable r3 = () -> flux3.subscribe(s -> {
1697            log.info("Received {} via {}", s, Thread.currentThread().getName());
1698        });
1699        Thread t3 = new Thread(r3, "t2");
1700        log.info("Program thread {}" + Thread.currentThread().getName());
1701        t3.start();
1702        t3.join();
1703    }
1704
1705    /**
1706     * ********************************************************************
1707     *  Schedulers
1708     *
1709     * parallel - for CPU intensive tasks (computation), thread pool workers = number of CPU cores
1710     * newParallel - same as above but new pool
1711     * boundedElastic - for IO intensive tasks (network calls), thread pool contains 10 * number of CPU cores
1712     * newBoundedElastic - same as above but new pool
1713     * immediate - keep the execution in the current thread
1714     * single - single reusable thread for all the callers
1715     * newSingle - same as above but new pool
1716     * elastic - unlimited threads (DON'T USE)
1717     *
1718     * We can have multiple publishOn methods which will keep switching the context.
1719     * The subscribeOn method can not do that. Only the very first subscribeOn method which is close to the source takes precedence.
1720     * ********************************************************************
1721     */
1722    @Test
1723    void test_Schedulers() {
1724        Flux numbFlux = Flux.range(1, 5)
1725                .map(i -> {
1726                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1727                    return i;
1728                }).subscribeOn(Schedulers.newSingle("my-thread"))
1729                .map(i -> {
1730                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1731                    return i;
1732                });
1733        numbFlux.subscribe();
1734    }
1735
1736    /**
1737     * ********************************************************************
1738     *  publishOn - influences downstream
1739     * ********************************************************************
1740     */
1741    @Test
1742    void test_publishOn() {
1743        Flux numbFlux = Flux.range(1, 5)
1744                .map(i -> {
1745                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1746                    return i;
1747                }).publishOn(Schedulers.single())
1748                .map(i -> {
1749                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1750                    return i;
1751                });
1752        numbFlux.subscribe();
1753    }
1754
1755    /**
1756     * ********************************************************************
1757     *  publishOn - influences downstream
1758     * ********************************************************************
1759     */
1760    @Test
1761    void test_publishOn_2() {
1762        Flux numbFlux = Flux.range(1, 5)
1763                .map(i -> {
1764                    log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1765                    return i;
1766                }).publishOn(Schedulers.newSingle("my-thread"))
1767                .map(i -> {
1768                    log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1769                    return i;
1770                });
1771        numbFlux.subscribe();
1772    }
1773
1774    /**
1775     * ********************************************************************
1776     *  fromSupplier - returns a value
1777     *  fromCallable - returns a value or exception
1778     *  fromRunnable - doesnt return value
1779     * ********************************************************************
1780     */
1781    @Test
1782    public void test_fromSupplier() {
1783        Supplier<String> stringSupplier = () -> getName();
1784        Mono<String> mono = Mono.fromSupplier(stringSupplier);
1785
1786        mono.subscribe(System.out::println);
1787    }
1788
1789    /**
1790     * ********************************************************************
1791     *  fromSupplier - returns a value
1792     *  fromCallable - returns a value or exception
1793     *  fromRunnable - doesnt return value
1794     * ********************************************************************
1795     */
1796    @Test
1797    public void test_fromCallable() {
1798        Callable<String> stringCallable = () -> getName();
1799        Mono<String> mono = Mono.fromCallable(stringCallable)
1800                .subscribeOn(Schedulers.boundedElastic());
1801        mono.subscribe(System.out::println);
1802    }
1803
1804    /**
1805     * ********************************************************************
1806     *  fromSupplier - returns a value
1807     *  fromCallable - returns a value or exception
1808     *  fromRunnable - doesnt return value
1809     * ********************************************************************
1810     */
1811    @Test
1812    public void test_fromRunnable() {
1813        Runnable stringCallable = () -> getName();
1814        Mono<Object> mono = Mono.fromRunnable(stringCallable)
1815                .subscribeOn(Schedulers.boundedElastic());
1816        mono.subscribe(System.out::println);
1817    }
1818
1819    /**
1820     * ********************************************************************
1821     *  fromCallable - read file may be blocking, we don't want to block main thread.
1822     * ********************************************************************
1823     */
1824    @Test
1825    @SneakyThrows
1826    void test_readFile_fromCallable() {
1827        Mono<List<String>> listMono = Mono.fromCallable(() -> Files.readAllLines(Path.of("src/test/resources/file.txt")))
1828                .subscribeOn(Schedulers.boundedElastic());
1829        listMono.subscribe(l -> log.info("Lines: {}", l));
1830
1831        StepVerifier.create(listMono)
1832                .expectSubscription()
1833                .thenConsumeWhile(l -> {
1834                    assertThat(l.isEmpty()).isFalse();
1835                    return true;
1836                })
1837                .verifyComplete();
1838    }
1839
1840    /**
1841     * ********************************************************************
1842     * Flux.using(
1843     *     resourceSupplier,
1844     *     (resource) -> return Publisher,
1845     *     (resource) -> clean this up
1846     * )
1847     *
1848     * share() creates a hot publisher, else it would be a cold publisher.
1849     * Cold publisher would read the file for each subscriber – that would mean opening and reading the same file many times.
1850     * ********************************************************************
1851     */
1852    @Test
1853    void test_readFile_using() {
1854        Path filePath = Paths.get("src/test/resources/file.txt");
1855        Flux<String> fileFlux = Flux.using(
1856                () -> Files.lines(filePath),
1857                Flux::fromStream,
1858                Stream::close
1859        );
1860        fileFlux.subscribe(l -> log.info("Lines: {}", l));
1861
1862        Flux<String> fileFlux2 = fileFlux
1863                .subscribeOn(Schedulers.newParallel("file-copy", 3))
1864                .share();
1865        fileFlux2.subscribe(l -> log.info("Lines: {}", l));
1866    }
1867
1868    /**
1869     * ********************************************************************
1870     *  ParallelFlux - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1871     * ********************************************************************
1872     */
1873    @Test
1874    void test_parallel() {
1875        log.info("Cores: {}", Runtime.getRuntime().availableProcessors());
1876        ParallelFlux<String> flux1 = Flux.just("apple", "orange", "banana")
1877                .parallel()
1878                .runOn(Schedulers.parallel())
1879                .map(HelperUtil::capitalizeString);
1880        StepVerifier.create(flux1)
1881                .expectNextCount(3)
1882                .verifyComplete();
1883
1884
1885        Flux<String> flux2 = Flux.just("apple", "orange", "banana")
1886                .flatMap(name -> {
1887                    return Mono.just(name)
1888                            .map(HelperUtil::capitalizeString)
1889                            .subscribeOn(Schedulers.parallel());
1890                });
1891        StepVerifier.create(flux2)
1892                .expectNextCount(3)
1893                .verifyComplete();
1894    }
1895
1896    /**
1897     * ********************************************************************
1898     *  flatMap Parallelism - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1899     * ********************************************************************
1900     */
1901    @Test
1902    void test_parallel_2() {
1903        Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1904                .flatMap(name -> {
1905                    return Mono.just(name)
1906                            .map(HelperUtil::capitalizeString)
1907                            .subscribeOn(Schedulers.parallel());
1908                });
1909        StepVerifier.create(flux1)
1910                .expectNextCount(3)
1911                .verifyComplete();
1912    }
1913
1914    /**
1915     * ********************************************************************
1916     *  flatMap - fire-forget jobs with subscribe, Will run async jobs
1917     * ********************************************************************
1918     */
1919    @SneakyThrows
1920    @Test
1921    void fireForgetTest() {
1922        CountDownLatch latch = new CountDownLatch(3);
1923        Flux<Object> flux1 = Flux.just("apple", "orange", "banana")
1924                .flatMap(fruit -> {
1925                    Mono.just(fruit)
1926                            .map(e -> HelperUtil.capitalizeStringLatch(e, latch))
1927                            .subscribeOn(Schedulers.parallel())
1928                            .subscribe();
1929                    return Mono.empty();
1930                });
1931        StepVerifier.create(flux1)
1932                .verifyComplete();
1933        latch.await(5, TimeUnit.SECONDS);
1934    }
1935
1936    /**
1937     * ********************************************************************
1938     *  flatMapSequential - Maintains order but executes in parallel
1939     * ********************************************************************
1940     */
1941    @Test
1942    void test_flatMapSequential() {
1943        Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1944                .flatMapSequential(name -> {
1945                    return Mono.just(name)
1946                            .map(HelperUtil::capitalizeString)
1947                            .subscribeOn(Schedulers.parallel());
1948                });
1949        StepVerifier.create(flux1)
1950                .expectNext("APPLE", "ORANGE", "BANANA")
1951                .verifyComplete();
1952    }
1953
1954    /**
1955     * ********************************************************************
1956     *  flatMapSequential - Maintains order but executes in parallel
1957     * ********************************************************************
1958     */
1959    @Test
1960    void test_flatMapSequential_2() {
1961        Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1962                .flatMapSequential(name -> {
1963                    return Mono.just(name)
1964                            .map(HelperUtil::capitalizeString)
1965                            .subscribeOn(Schedulers.parallel());
1966                }, 1)
1967                .log();
1968        StepVerifier.create(flux1)
1969                .expectNext("APPLE", "ORANGE", "BANANA")
1970                .verifyComplete();
1971    }
1972
1973    /**
1974     * ********************************************************************
1975     *  withVirtualTime - flux that emits every second.
1976     *  interval - blocks thread, so you will have to use sleep to see the output
1977     * ********************************************************************
1978     */
1979    @Test
1980    @SneakyThrows
1981    void test_withVirtualTime() {
1982        VirtualTimeScheduler.getOrSet();
1983        Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1))
1984                .log()
1985                .take(10);
1986        intervalFlux.subscribe(i -> log.info("Number: {}", i));
1987        TimeUnit.SECONDS.sleep(5);
1988        StepVerifier.withVirtualTime(() -> intervalFlux)
1989                .expectSubscription()
1990                .expectNoEvent(Duration.ofMillis(999))
1991                .thenAwait(Duration.ofSeconds(5))
1992                .expectNextCount(4)
1993                .thenCancel()
1994                .verify();
1995    }
1996
1997    /**
1998     * ********************************************************************
1999     *  flux that emits every day. Use of virtual time to simulate days.
2000     * ********************************************************************
2001     */
2002    @Test
2003    @SneakyThrows
2004    void test_withVirtualTime_2() {
2005        VirtualTimeScheduler.getOrSet();
2006        StepVerifier.withVirtualTime(this::getTake)
2007                .expectSubscription()
2008                .expectNoEvent(Duration.ofDays(1))
2009                .thenAwait(Duration.ofDays(1))
2010                .expectNext(0L)
2011                .thenAwait(Duration.ofDays(1))
2012                .expectNext(1L)
2013                .thenCancel()
2014                .verify();
2015    }
2016
2017    private Flux<Long> getTake() {
2018        return Flux.interval(Duration.ofDays(1))
2019                .log()
2020                .take(10);
2021    }
2022
2023    /**
2024     * ********************************************************************
2025     *  then - will just replay the source terminal signal, resulting in a Mono<Void> to indicate that this never signals any onNext.
2026     *  thenEmpty - not only returns a Mono<Void>, but it takes a Mono<Void> as a parameter. It represents a concatenation of the source completion signal then the second, empty Mono completion signal. In other words, it completes when A then B have both completed sequentially, and doesn't emit data.
2027     *  thenMany - waits for the source to complete then plays all the signals from its Publisher<R> parameter, resulting in a Flux<R> that will "pause" until the source completes, then emit the many elements from the provided publisher before replaying its completion signal as well.
2028     * ********************************************************************
2029     */
2030    @Test
2031    void test_thenManyChain() {
2032        Flux<String> names = Flux.just("Jack", "Jill");
2033        names.map(String::toUpperCase)
2034                .thenMany(HelperUtil.deleteFromDb())
2035                .thenMany(HelperUtil.saveToDb())
2036                .subscribe(System.out::println);
2037    }
2038
2039    @Test
2040    void test_thenEmpty() {
2041        Flux<String> names = Flux.just("Jack", "Jill");
2042        names.map(String::toUpperCase)
2043                .thenMany(HelperUtil.saveToDb())
2044                .thenEmpty(HelperUtil.sendMail())
2045                .subscribe(System.out::println);
2046    }
2047
2048    @Test
2049    void test_then() {
2050        Flux<String> names = Flux.just("Jack", "Jill");
2051        names.map(String::toUpperCase)
2052                .thenMany(HelperUtil.saveToDb())
2053                .then()
2054                .then(Mono.just("Ram"))
2055                .thenReturn("Done!")
2056                .subscribe(System.out::println);
2057    }
2058
2059    /**
2060     * ********************************************************************
2061     *  firstWithValue - first mono to return
2062     * ********************************************************************
2063     */
2064    @Test
2065    void test_monoFirst() {
2066        Mono<String> mono1 = Mono.just("Jack").delayElement(Duration.ofSeconds(1));
2067        Mono<String> mono2 = Mono.just("Jill");
2068        //Return the mono which returns its value faster
2069        Mono<String> mono3 = Mono.firstWithValue(mono1, mono2);
2070        mono3.subscribe(System.out::println);
2071        StepVerifier.create(mono3)
2072                .expectNext("Jill")
2073                .verifyComplete();
2074    }
2075
2076    /**
2077     * ********************************************************************
2078     *  buffer
2079     * ********************************************************************
2080     */
2081    @Test
2082    public void test_bufferGroup() {
2083        Flux<List<Integer>> flux1 = Flux
2084                .range(1, 7)
2085                .buffer(2);
2086        StepVerifier
2087                .create(flux1)
2088                .expectNext(Arrays.asList(1, 2))
2089                .expectNext(Arrays.asList(3, 4))
2090                .expectNext(Arrays.asList(5, 6))
2091                .expectNext(Arrays.asList(7))
2092                .verifyComplete();
2093    }
2094
2095    @Test
2096    @SneakyThrows
2097    void test_tickClock() {
2098        Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
2099        Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
2100        Flux.merge(fastClock, slowClock).subscribe(System.out::println);
2101        TimeUnit.SECONDS.sleep(5);
2102    }
2103
2104    @Test
2105    @SneakyThrows
2106    public void test_tickMergeClock() {
2107        Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
2108        Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
2109        Flux clock = Flux.merge(slowClock, fastClock);
2110        Flux feed = Flux.interval(Duration.ofSeconds(1)).map(tick -> LocalTime.now());
2111        clock.withLatestFrom(feed, (tick, time) -> tick + " " + time).subscribe(System.out::println);
2112        TimeUnit.SECONDS.sleep(15);
2113    }
2114
2115    @Test
2116    @SneakyThrows
2117    void test_tickZipClock() {
2118        Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
2119        Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
2120        fastClock.zipWith(slowClock, (tick, time) -> tick + " " + time).subscribe(System.out::println);
2121        TimeUnit.SECONDS.sleep(5);
2122    }
2123
2124    @Test
2125    @SneakyThrows
2126    void test_emitter() {
2127        MyFeed myFeed = new MyFeed();
2128        Flux feedFlux = Flux.create(emmiter -> {
2129            myFeed.register(new MyListener() {
2130                @Override
2131                public void priceTick(String msg) {
2132                    emmiter.next(msg);
2133                }
2134
2135                @Override
2136                public void error(Throwable error) {
2137                    emmiter.error(error);
2138                }
2139            });
2140        }, FluxSink.OverflowStrategy.LATEST);
2141        feedFlux.subscribe(System.out::println);
2142        TimeUnit.SECONDS.sleep(15);
2143        System.out.println("Sending message!");
2144        for (int i = 0; i < 10; i++) {
2145            myFeed.sendMessage("HELLO_" + i);
2146        }
2147    }
2148
2149    /**
2150     * ********************************************************************
2151     *  cancel subscription
2152     * ********************************************************************
2153     */
2154    @Test
2155    void test_monoCancelSubscription() {
2156        Mono<String> helloMono = Mono.just("Jack")
2157                .log()
2158                .map(String::toUpperCase);
2159        helloMono.subscribe(s -> {
2160                    log.info("Got: {}", s);
2161                },
2162                Throwable::printStackTrace,
2163                () -> log.info("Finished"),
2164                Subscription::cancel
2165        );
2166    }
2167
2168    /**
2169     * ********************************************************************
2170     *  cancel subscription after n elements
2171     * ********************************************************************
2172     */
2173    @Test
2174    void test_request() {
2175        //Jill won't be fetched as subscription will be cancelled after 2 elements
2176        Flux<String> namesMono = Flux.just("Jack", "Jane", "Jill")
2177                .log();
2178        namesMono.subscribe(s -> {
2179                    log.info("Got: {}", s);
2180                },
2181                Throwable::printStackTrace,
2182                () -> log.info("Finished"),
2183                subscription -> subscription.request(2));
2184    }
2185
2186    /**
2187     * ********************************************************************
2188     *  backpressure
2189     * ********************************************************************
2190     */
2191    @Test
2192    void test_fluxBackPressure() {
2193        Flux<Integer> fluxNumber = Flux.range(1, 5).log();
2194
2195        //Fetches 2 at a time.
2196        fluxNumber.subscribe(new BaseSubscriber<>() {
2197            private final int requestCount = 2;
2198            private int count = 0;
2199
2200            @Override
2201            protected void hookOnSubscribe(Subscription subscription) {
2202                request(requestCount);
2203            }
2204
2205            @Override
2206            protected void hookOnNext(Integer value) {
2207                count++;
2208                if (count >= requestCount) {
2209                    count = 0;
2210                    log.info("requesting next batch!");
2211                    request(requestCount);
2212                }
2213            }
2214        });
2215    }
2216
2217    /**
2218     * ********************************************************************
2219     *  onBackpressureDrop - fetches all in unbounded request, but stores in internal queue, drops elements not used
2220     * ********************************************************************
2221     */
2222    @Test
2223    void test_fluxBackPressureDrop() {
2224        Flux<Integer> fluxNumber = Flux.range(1, 15).log();
2225
2226        //Fetches 2 at a time.
2227        fluxNumber
2228                .onBackpressureDrop(item -> {
2229                    log.info("Dropped {}", item);
2230                })
2231                .subscribe(new BaseSubscriber<>() {
2232                    private final int requestCount = 2;
2233                    private int count = 0;
2234                    private int batch = 0;
2235
2236                    @Override
2237                    protected void hookOnSubscribe(Subscription subscription) {
2238                        request(requestCount);
2239                    }
2240
2241                    @Override
2242                    protected void hookOnNext(Integer value) {
2243                        if (batch > 2) {
2244                            return;
2245                        }
2246                        count++;
2247                        if (count >= requestCount) {
2248                            count = 0;
2249                            batch++;
2250                            log.info("requesting next batch {}", batch);
2251                            request(requestCount);
2252                        }
2253
2254                    }
2255                });
2256    }
2257
2258    /**
2259     * ********************************************************************
2260     *  onBackpressureBuffer - fetches all in unbounded request, but stores in internal queue, but doesnt drop unused items
2261     * ********************************************************************
2262     */
2263    @Test
2264    void test_fluxBackPressureBuffet() {
2265        Flux<Integer> fluxNumber = Flux.range(1, 15).log();
2266
2267        //Fetches 2 at a time.
2268        fluxNumber
2269                .onBackpressureBuffer()
2270                .subscribe(new BaseSubscriber<>() {
2271                    private final int requestCount = 2;
2272                    private int count = 0;
2273                    private int batch = 0;
2274
2275                    @Override
2276                    protected void hookOnSubscribe(Subscription subscription) {
2277                        request(requestCount);
2278                    }
2279
2280                    @Override
2281                    protected void hookOnNext(Integer value) {
2282                        if (batch > 2) {
2283                            return;
2284                        }
2285                        count++;
2286                        if (count >= requestCount) {
2287                            count = 0;
2288                            batch++;
2289                            log.info("requesting next batch {}", batch);
2290                            request(requestCount);
2291                        }
2292
2293                    }
2294                });
2295    }
2296
2297    /**
2298     * ********************************************************************
2299     *  onBackpressureError - To identify if receiver is overrun by items as producer is producing more elements than can be processed.
2300     * ********************************************************************
2301     */
2302    @Test
2303    void test_fluxBackPressureOnError() {
2304        Flux<Integer> fluxNumber = Flux.range(1, 15).log();
2305
2306        //Fetches 2 at a time.
2307        fluxNumber
2308                .onBackpressureError()
2309                .subscribe(new BaseSubscriber<>() {
2310                    private final int requestCount = 2;
2311                    private int count = 0;
2312                    private int batch = 0;
2313
2314                    @Override
2315                    protected void hookOnSubscribe(Subscription subscription) {
2316                        request(requestCount);
2317                    }
2318
2319                    @Override
2320                    protected void hookOnError(Throwable throwable) {
2321                        log.error("Error thrown is: {}", throwable.getMessage());
2322                    }
2323
2324                    @Override
2325                    protected void hookOnNext(Integer value) {
2326                        if (batch > 2) {
2327                            return;
2328                        }
2329                        count++;
2330                        if (count >= requestCount) {
2331                            count = 0;
2332                            batch++;
2333                            log.info("requesting next batch {}", batch);
2334                            request(requestCount);
2335                        }
2336
2337                    }
2338                });
2339    }
2340
2341    /**
2342     * ********************************************************************
2343     *  backpressure - limit rate
2344     * ********************************************************************
2345     */
2346    @Test
2347    void test_fluxBackPressureLimitRate() {
2348        Flux<Integer> fluxNumber = Flux.range(1, 5)
2349                .log()
2350                .limitRate(3);
2351        StepVerifier.create(fluxNumber)
2352                .expectNext(1, 2, 3, 4, 5)
2353                .verifyComplete();
2354    }
2355
2356    /**
2357     * ********************************************************************
2358     *  cold flux - producing/emitting only when a subscriber subscribes, generates new sets of values for each new subscription, eg: spotify
2359     *  hot flux - emitting happens even there is no subscriber. All the subscribers get the value from the single data producer irrespective of the time they started subscribing, eg: radio
2360     * ********************************************************************
2361     */
2362    @Test
2363    @SneakyThrows
2364    void test_connectableFlux() {
2365        ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
2366                .delayElements(Duration.ofSeconds(1))
2367                .publish();
2368        connectableFlux.connect();
2369
2370        TimeUnit.SECONDS.sleep(3);
2371        connectableFlux.subscribe(i -> {
2372            log.info("Sub1 Number: {}", i);
2373        });
2374
2375        TimeUnit.SECONDS.sleep(2);
2376        connectableFlux.subscribe(i -> {
2377            log.info("Sub2 Number: {}", i);
2378        });
2379
2380        ConnectableFlux<Integer> connectableFlux2 = Flux.range(1, 10)
2381                .delayElements(Duration.ofSeconds(1))
2382                .publish();
2383        StepVerifier.create(connectableFlux2)
2384                .then(connectableFlux2::connect)
2385                .thenConsumeWhile(i -> i <= 5)
2386                .expectNext(6, 7, 8, 9, 10)
2387                .expectComplete()
2388                .verify();
2389    }
2390
2391    /**
2392     * ********************************************************************
2393     *  hot flux - auto connect, min subscribers required before publisher emits
2394     * ********************************************************************
2395     */
2396    @Test
2397    @SneakyThrows
2398    void test_connectableAutoFlux() {
2399        //Hot Flux.
2400        Flux<Integer> connectableFlux = Flux.range(1, 5)
2401                .log()
2402                .delayElements(Duration.ofSeconds(1))
2403                .publish()
2404                .autoConnect(2);
2405
2406        //2 subscribers
2407        StepVerifier.create(connectableFlux)
2408                .then(connectableFlux::subscribe)
2409                .expectNext(1, 2, 3, 4, 5)
2410                .expectComplete()
2411                .verify();
2412    }
2413
2414    /**
2415     * ********************************************************************
2416     *  hot flux - ref count, if subscriber count goes down, publisher stops emitting
2417     * ********************************************************************
2418     */
2419    @Test
2420    @SneakyThrows
2421    void test_connectableFlux_1() {
2422        //Hot Flux.
2423        Flux<Integer> connectableFlux = Flux.range(1, 15)
2424                .delayElements(Duration.ofSeconds(1))
2425                .doOnCancel(() -> {
2426                    log.info("Received cancel");
2427                })
2428                .publish()
2429                .refCount(2);
2430
2431        //Min 2 subscribers required
2432        Disposable subscribe1 = connectableFlux.subscribe(e -> log.info("Sub1: " + e));
2433        Disposable subscribe2 = connectableFlux.subscribe(e -> log.info("Sub2: " + e));
2434        TimeUnit.SECONDS.sleep(3);
2435        subscribe1.dispose();
2436        subscribe2.dispose();
2437        TimeUnit.SECONDS.sleep(5);
2438    }
2439
2440    /**
2441     * ********************************************************************
2442     *  defer
2443     * ********************************************************************
2444     */
2445    @Test
2446    @SneakyThrows
2447    void test_defer() {
2448        Mono<UUID> just = Mono.just(UUID.randomUUID());
2449        Mono<UUID> deferJust = Mono.defer(() -> Mono.just(UUID.randomUUID()));
2450
2451        just.subscribe(l -> log.info("UUID: {}", l));
2452        just.subscribe(l -> log.info("UUID: {}", l));
2453        System.out.println();
2454        deferJust.subscribe(l -> log.info("UUID: {}", l));
2455        deferJust.subscribe(l -> log.info("UUID: {}", l));
2456    }
2457
2458    /**
2459     * ********************************************************************
2460     *  onSchedulersHook - if you have to use thread local
2461     * ********************************************************************
2462     */
2463    @Test
2464    public void test_onScheduleHook() {
2465        Runnable stringCallable = () -> getName();
2466        Schedulers.onScheduleHook("myHook", runnable -> {
2467            log.info("before scheduled runnable");
2468            return () -> {
2469                log.info("before execution");
2470                runnable.run();
2471                log.info("after execution");
2472            };
2473        });
2474        Mono.just("Hello world")
2475                .subscribeOn(Schedulers.single())
2476                .subscribe(System.out::println);
2477    }
2478
2479    /**
2480     * ********************************************************************
2481     *  checkpoint
2482     * ********************************************************************
2483     */
2484    @Test
2485    void test_checkpoint() {
2486        Flux flux = Flux.just("Jack", "Jill", "Joe")
2487                .checkpoint("before uppercase")
2488                .map(e -> e.toUpperCase())
2489                .checkpoint("after uppercase")
2490                .filter(e -> e.length() > 3)
2491                .checkpoint("after filter")
2492                .map(e -> new RuntimeException("Custom error!"));
2493        flux.subscribe(System.out::println);
2494    }
2495
2496    /**
2497     * ********************************************************************
2498     *  checkpoint
2499     * ********************************************************************
2500     */
2501    @Test
2502    void flux_test_debugAgent() {
2503        ReactorDebugAgent.init();
2504        ReactorDebugAgent.processExistingClasses();
2505        Flux flux = Flux.just("a")
2506                .concatWith(Flux.error(new IllegalArgumentException("My Error!")))
2507                .onErrorMap(ex -> {
2508                    log.error("Exception: {}", ex.getMessage());
2509                    return new IllegalStateException("New Error!");
2510                });
2511        flux.subscribe(System.out::println);
2512    }
2513
2514    /**
2515     * ********************************************************************
2516     * Flux.generate - programmatically create flux, synchronous, cant emit without downstream subscriber asking for it.
2517     * Flux.create - programmatically create flux, asynchronous, can emit more elements without downstream subscriber asking for it.
2518     * ********************************************************************
2519     */
2520    @Test
2521    void test_flux_generate() {
2522        Flux<Integer> flux = Flux.generate(() -> 1, (state, sink) -> {
2523            sink.next(state * 2);
2524            if (state == 10) {
2525                sink.complete();
2526            }
2527            return state + 1;
2528        });
2529        flux.subscribe(System.out::println);
2530        StepVerifier.create(flux)
2531                .expectNextCount(10)
2532                .verifyComplete();
2533        System.out.println();
2534    }
2535
2536    /**
2537     * ********************************************************************
2538     * Flux.generate - programmatically create flux, synchronous
2539     * Flux.create - programmatically create flux, asynchronous
2540     *
2541     * buffer - buffer if downstream cant keep up
2542     * drop - drop if downstream cant keep up
2543     * error - singal error when downstream cant keep up
2544     * ignore - ignore downstream backpressure requests
2545     * latest - downstream will only get latest
2546     * ********************************************************************
2547     */
2548    @Test
2549    void test_flux_create() {
2550        List<String> names = Arrays.asList("jack", "jill");
2551        Flux<String> flux = Flux.create(sink -> {
2552            names.forEach(sink::next);
2553            sink.complete();
2554        });
2555
2556        StepVerifier.create(flux)
2557                .expectNextCount(2)
2558                .verifyComplete();
2559
2560        Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
2561            IntStream.range(0, 5)
2562                    .peek(i -> System.out.println("going to emit - " + i))
2563                    .forEach(fluxSink::next);
2564            fluxSink.complete();
2565        });
2566
2567        StepVerifier.create(integerFlux)
2568                .expectNextCount(5)
2569                .verifyComplete();
2570
2571        Flux<Integer> integerFlux2 = Flux.create((FluxSink<Integer> fluxSink) -> {
2572            IntStream.range(0, 5)
2573                    .peek(i -> System.out.println("going to emit - " + i))
2574                    .forEach(fluxSink::next);
2575            fluxSink.complete();
2576        }, FluxSink.OverflowStrategy.DROP);
2577
2578        StepVerifier.create(integerFlux2)
2579                .expectNextCount(5)
2580                .verifyComplete();
2581    }
2582
2583    @Test
2584    void test_chain() {
2585        CompanyVO request = new CompanyVO();
2586        request.setName("Twitter");
2587        Mono.just(request)
2588                .map(HelperUtil::convertToEntity)
2589                .zipWith(HelperUtil.getNameSuffix(), HelperUtil::appendSuffix)
2590                .flatMap(HelperUtil::addCompanyOwner)
2591                .flatMap(HelperUtil::appendOrgIdToDepartment)
2592                .flatMap(HelperUtil::save)
2593                .subscribe(System.out::println);
2594    }
2595
2596    /**
2597     * ********************************************************************
2598     * expand 	Finding the shortest path in a graph. Searching file system. Finding neighbor nodes in a network.
2599     * expandDeep 	Finding all possible combinations.
2600     * ********************************************************************
2601     */
2602    @Test
2603    void test_expand() {
2604        Employee CEO = new Employee("CEO");
2605
2606        // Directors reporting to CEO
2607        Employee directorA = new Employee("Director of Dept A");
2608        Employee directorB = new Employee("Director of Dept B");
2609        CEO.addDirectReports(directorA, directorB);
2610
2611        // Managers reporting to directors
2612        Employee managerA1 = new Employee("Manager 1 of Dept A");
2613        Employee managerA2 = new Employee("Manager 2 of Dept A");
2614        Employee managerB1 = new Employee("Manager 1 of Dept B");
2615        Employee managerB2 = new Employee("Manager 2 of Dept B");
2616        directorA.addDirectReports(managerA1, managerA2);
2617        directorB.addDirectReports(managerB1, managerB2);
2618
2619        Mono.fromSupplier(() -> CEO)
2620                .expand(this::getDirectReports)
2621                .subscribe(System.out::println);
2622    }
2623
2624    @Test
2625    void test_expandDeep() {
2626        Employee CEO = new Employee("CEO");
2627
2628        // Directors reporting to CEO
2629        Employee directorA = new Employee("Director of Dept A");
2630        Employee directorB = new Employee("Director of Dept B");
2631        CEO.addDirectReports(directorA, directorB);
2632
2633        // Managers reporting to directors
2634        Employee managerA1 = new Employee("Manager 1 of Dept A");
2635        Employee managerA2 = new Employee("Manager 2 of Dept A");
2636        Employee managerB1 = new Employee("Manager 1 of Dept B");
2637        Employee managerB2 = new Employee("Manager 2 of Dept B");
2638        directorA.addDirectReports(managerA1, managerA2);
2639        directorB.addDirectReports(managerB1, managerB2);
2640
2641        Mono.fromSupplier(() -> CEO)
2642                .expandDeep(this::getDirectReports)
2643                .subscribe(System.out::println);
2644    }
2645
2646    private Flux<Employee> getDirectReports(Employee employee) {
2647        return Flux.fromIterable(employee.getDirectReports());
2648    }
2649
2650    @Test
2651    void test_fluxToMono() {
2652        Mono<List<String>> mono = Flux.just("jack", "raj").collectList();
2653        Flux<List<String>> flux = Flux.just("jack", "raj").collectList().flatMapMany(Flux::just);
2654
2655        StepVerifier.create(mono)
2656                .expectNextCount(1)
2657                .verifyComplete();
2658
2659        StepVerifier.create(flux)
2660                .expectNextCount(1)
2661                .verifyComplete();
2662
2663    }
2664
2665    @Test
2666    void test_compareMapWithList() {
2667        List<String> colors = List.of("red", "blue", "green");
2668        Map<String, String> fruitMap = Map.of("red", "apple", "green", "grapes");
2669        Mono<List<String>> flux1 = Mono.just(fruitMap)
2670                .flatMap(map -> {
2671                    return Flux.fromIterable(colors)
2672                            .flatMap(color -> {
2673                                if (map.containsKey(color)) {
2674                                    return Mono.just(map.get(color));
2675                                }
2676                                return Mono.empty();
2677                            }).collectList();
2678                });
2679        flux1.subscribe(System.out::println);
2680
2681        StepVerifier.create(flux1)
2682                .expectNext(List.of("apple", "grapes"))
2683                .verifyComplete();
2684
2685        Flux<String> flux2 = Mono.just(fruitMap)
2686                .flatMapMany(map ->
2687                        Flux.fromIterable(colors)
2688                                .flatMap(color -> {
2689                                    String fruit = fruitMap.get(color);
2690                                    return fruit != null ? Flux.just(fruit) : Flux.empty();
2691                                })
2692                );
2693        flux2.subscribe(System.out::println);
2694        StepVerifier.create(flux2)
2695                .expectNext("apple")
2696                .expectNext("grapes")
2697                .verifyComplete();
2698    }
2699
2700    /**
2701     * ********************************************************************
2702     * timeout - if response doesnt come in certain time then timeout.
2703     * ********************************************************************
2704     */
2705    @Test
2706    void test_timeout() {
2707        Mono<String> mono = Mono.just("jack")
2708                .delayElement(Duration.ofSeconds(5))
2709                .timeout(Duration.ofSeconds(1))
2710                .onErrorReturn("raj");
2711        StepVerifier.create(mono)
2712                .expectNext("raj")
2713                .verifyComplete();
2714    }
2715
2716    @Test
2717    void test_pageImpl() {
2718        List<String> names = List.of("Jack", "Raj", "Edward");
2719        PageRequest pageRequest = PageRequest.of(0, 5);
2720        Mono<PageImpl<String>> pageFlux = Flux.fromIterable(names)
2721                .collectList()
2722                .zipWith(Mono.just(names.size()))
2723                .map(t -> new PageImpl<>(t.getT1(), pageRequest, names.size()));
2724
2725        pageFlux.subscribe(System.out::println);
2726
2727        StepVerifier.create(pageFlux)
2728                .assertNext(e -> {
2729                    assertEquals(e.getNumberOfElements(), 3);
2730                    assertEquals("Jack", e.getContent().get(0));
2731                })
2732                .verifyComplete();
2733    }
2734
2735    @Test
2736    void test_function() {
2737        Function<String, Mono<String>> stringSupplier = p -> Mono.just("hello " + p);
2738        Mono<String> mono = Mono.defer(() -> stringSupplier.apply("jack"));
2739        StepVerifier.create(mono)
2740                .expectNext("hello jack")
2741                .verifyComplete();
2742    }
2743
2744    @SneakyThrows
2745    @Test
2746    void test_blockHound() {
2747        try {
2748            FutureTask<?> task = new FutureTask<>(() -> {
2749                TimeUnit.SECONDS.sleep(2);
2750                return "";
2751            });
2752            Schedulers.parallel().schedule(task);
2753            task.get(10, TimeUnit.SECONDS);
2754            Assertions.fail("should fail");
2755        } catch (Exception e) {
2756            Assertions.assertTrue(e.getCause() instanceof BlockingOperationError);
2757        }
2758    }
2759
2760    @Test
2761    void test_blockHound2() {
2762        Mono<String> mono = Mono.just("apple")
2763                .flatMap(name -> {
2764                    return Mono.just("name")
2765                            .map(n -> {
2766                                HelperUtil.sleep(1);
2767                                return n;
2768                            })
2769                            .subscribeOn(Schedulers.parallel());
2770                });
2771
2772        StepVerifier.create(mono)
2773                .expectError(BlockingOperationError.class);
2774    }
2775
2776}

References

https://projectreactor.io/

comments powered by Disqus