Spring Reactor - Basics
Overview
Reactive programming examples on how to use spring reactor.
Github: https://github.com/gitorko/project83
Spring Reactor
Methods demonstrating how to use spring reactor
1package com.demo.project83;
2
3import static org.assertj.core.api.Assertions.assertThat;
4
5import java.nio.file.Files;
6import java.nio.file.Path;
7import java.time.Duration;
8import java.time.LocalTime;
9import java.util.ArrayList;
10import java.util.Arrays;
11import java.util.Collection;
12import java.util.Date;
13import java.util.HashMap;
14import java.util.List;
15import java.util.Map;
16import java.util.Optional;
17import java.util.Random;
18import java.util.concurrent.Callable;
19import java.util.concurrent.CountDownLatch;
20import java.util.concurrent.TimeUnit;
21import java.util.concurrent.atomic.AtomicLong;
22import java.util.function.Consumer;
23import java.util.function.Function;
24import java.util.function.Supplier;
25import java.util.stream.Stream;
26
27import lombok.SneakyThrows;
28import lombok.extern.slf4j.Slf4j;
29import org.junit.jupiter.api.Test;
30import org.reactivestreams.Subscription;
31import reactor.core.Disposable;
32import reactor.core.Exceptions;
33import reactor.core.publisher.BaseSubscriber;
34import reactor.core.publisher.ConnectableFlux;
35import reactor.core.publisher.Flux;
36import reactor.core.publisher.FluxSink;
37import reactor.core.publisher.Mono;
38import reactor.core.publisher.ParallelFlux;
39import reactor.core.scheduler.Schedulers;
40import reactor.test.StepVerifier;
41import reactor.test.scheduler.VirtualTimeScheduler;
42import reactor.tools.agent.ReactorDebugAgent;
43import reactor.util.function.Tuple2;
44import reactor.util.function.Tuples;
45import reactor.util.retry.Retry;
46import reactor.util.retry.RetryBackoffSpec;
47
48/**
49 * Reactive Streams Specification
50 *
51 * Publisher (Mono/Flux)
52 * - subscribe (data source, db, remote service)
53 * Subscriber
54 * - onSubscribe
55 * - onNext
56 * - onError
57 * - onComplete
58 * Subscription
59 * - request
60 * - cancel
61 * Processor - Publisher + Subscriber
62 *
63 * Spring reactor is a Push + Pull data flow model
64 */
65@Slf4j
66public class ReactorTest {
67
68 /**
69 * ********************************************************************
70 * mono
71 * ********************************************************************
72 */
73 @Test
74 void monoTest() {
75 //justOrEmpty
76 Mono<String> mono1 = Mono.justOrEmpty("apple");
77 mono1.subscribe(System.out::println);
78 StepVerifier.create(mono1)
79 .expectNext("apple")
80 .verifyComplete();
81
82 //Note: Reactive Streams do not accept null values
83 Mono<String> mono2 = Mono.justOrEmpty(null);
84 mono2.subscribe(System.out::println);
85 StepVerifier.create(mono2)
86 .verifyComplete();
87
88 //Default value if empty.
89 Mono<String> mono3 = mono2.defaultIfEmpty("Jill");
90 mono3.subscribe(System.out::println);
91 StepVerifier.create(mono3)
92 .expectNext("Jill")
93 .verifyComplete();
94
95 //Use log to look at transitions.
96 Mono<String> mono4 = Mono.just("apple").log();
97 mono4.subscribe(s -> {
98 log.info("Got: {}", s);
99 });
100 StepVerifier.create(mono4)
101 .expectNext("apple")
102 .verifyComplete();
103 }
104
105 /**
106 * ********************************************************************
107 * flux
108 * ********************************************************************
109 */
110 @Test
111 void fluxTest() {
112 Flux flux = Flux.just("apple", "orange");
113 flux.subscribe(System.out::println);
114 StepVerifier.create(flux)
115 .expectNext("apple", "orange")
116 .verifyComplete();
117 }
118
119 /**
120 * ********************************************************************
121 * flux - Avoid blocking calls that will hold thread
122 * ********************************************************************
123 */
124 @Test
125 void fluxSleepTest() {
126 Flux flux = Flux.just("apple", "orange").map(e -> {
127 log.info("Received: {}", e);
128 //Bad idea to do Thread.sleep or any blocking call.
129 //Instead use delayElements.
130 return e;
131 }).delayElements(Duration.ofSeconds(1));
132 flux.subscribe(System.out::println);
133 StepVerifier.create(flux)
134 .expectNext("apple", "orange")
135 .verifyComplete();
136 }
137
138 /**
139 * ********************************************************************
140 * block
141 * ********************************************************************
142 */
143 @Test
144 public void badTest() {
145 //Never use .block() as it blocks the thread.
146 String name = Mono.just("Jack")
147 .block();
148 System.out.println(name);
149 }
150
151 /**
152 * ********************************************************************
153 * filter - filter out elements that dont meet condition
154 * ********************************************************************
155 */
156 @Test
157 void fluxFilterTest() {
158 //Get even numbers
159 Flux flux = Flux.just(1, 2, 3, 4, 5)
160 .filter(i -> i % 2 == 0);
161 flux.subscribe(System.out::println);
162 StepVerifier.create(flux)
163 .expectNext(2, 4)
164 .verifyComplete();
165 }
166
167 /**
168 * ********************************************************************
169 * flux from array, list, stream
170 * ********************************************************************
171 */
172 @Test
173 public void fluxArrayTest() {
174 Integer[] arr = {2, 5, 7, 8};
175 Flux<Integer> flux1 = Flux.fromArray(arr);
176 flux1.subscribe(System.out::println);
177 StepVerifier.create(flux1)
178 .expectNext(2, 5, 7, 8)
179 .verifyComplete();
180
181 List<String> fruitsList = Arrays.asList("apple", "oranges", "grapes");
182 Flux<String> fruits = Flux.fromIterable(fruitsList);
183 StepVerifier.create(fruits)
184 .expectNext("apple", "oranges", "grapes")
185 .verifyComplete();
186
187 Stream<Integer> stream = List.of(1, 2, 3, 4, 5).stream();
188 Flux<Integer> flux2 = Flux.fromStream(() -> stream);
189 StepVerifier.create(flux2)
190 .expectNext(1, 2, 3, 4, 5)
191 .verifyComplete();
192
193 Flux<Integer> flux3 = Flux.fromIterable(List.of(1, 2, 3, 4, 5));
194 StepVerifier.create(flux3)
195 .expectNext(1, 2, 3, 4, 5)
196 .verifyComplete();
197 }
198
199 /**
200 * ********************************************************************
201 * flux range
202 * ********************************************************************
203 */
204 @Test
205 public void fluxRangeTest() {
206 Flux<Integer> numbers = Flux.range(1, 5);
207 numbers.subscribe(n -> {
208 log.info("number: {}", n);
209 });
210 StepVerifier.create(numbers)
211 .expectNext(1, 2, 3, 4, 5)
212 .verifyComplete();
213 }
214
215 /**
216 * ********************************************************************
217 * map - synchronous by nature
218 * ********************************************************************
219 */
220 @Test
221 public void fluxMapTest() {
222 Flux<Integer> flux1 = Flux.just("apple", "orange")
223 .log()
224 .map(i -> i.length());
225 StepVerifier
226 .create(flux1)
227 .expectNext(5, 6)
228 .verifyComplete();
229
230 Flux<Integer> flux2 = Flux.range(3, 2)
231 .map(i -> i + 100);
232 flux2.subscribe(System.out::println);
233 StepVerifier.create(flux2)
234 .expectNext(103, 104)
235 .verifyComplete();
236 }
237
238 /**
239 * ********************************************************************
240 * flatMap - transform object 1-1 or 1-N in asyncronous fashion, returns back Mono/Flux.
241 * map - transform an object 1-1 in fixed time in synchronous fashion
242 * ********************************************************************
243 */
244 @Test
245 void flatMapTest() {
246 Flux flux1 = Flux.just("apple", "orange")
247 .flatMap(ReactorTest::capitalizeReactive);
248 flux1.subscribe(System.out::println);
249 StepVerifier.create(flux1)
250 .expectSubscription()
251 .expectNext("APPLE")
252 .expectNext("ORANGE")
253 .verifyComplete();
254
255 //capitalize will happen in blocking fashion. If this function takes long or does I/O then it will be blocking
256 //Use map only when there is no IO involved in the function
257 Flux flux2 = Flux.just("apple", "orange")
258 .map(ReactorTest::capitalize);
259 flux2.subscribe(System.out::println);
260 StepVerifier.create(flux2)
261 .expectSubscription()
262 .expectNext("APPLE")
263 .expectNext("ORANGE")
264 .verifyComplete();
265 }
266
267 private static Mono<String> capitalizeReactive(String user) {
268 return Mono.just(user.toUpperCase());
269 }
270
271 private static String capitalize(String user) {
272 return user.toUpperCase();
273 }
274
275 /**
276 * ********************************************************************
277 * flatMap - object modification
278 * ********************************************************************
279 */
280 @Test
281 void objectModificationTest() {
282 //Modification of object in chain - done via flatMap
283 //Ideally create a new object instead of modifying the existing object.
284 Mono<String> mono1 = Mono.just("Apple")
285 .flatMap(ReactorTest::appendGreet);
286 StepVerifier.create(mono1)
287 .expectNext("Hello Apple")
288 .verifyComplete();
289
290 //Modification of object in chain - done via zipWith
291 //The 2nd argument for zipWith is a combinator function that determines how the 2 mono are zipped
292 Mono<String> mono2 = Mono.just("Apple")
293 .zipWith(Mono.just("Hello "), ReactorTest::getGreet);
294 StepVerifier.create(mono2)
295 .expectNext("Hello Apple")
296 .verifyComplete();
297 }
298
299 private static Mono<String> appendGreet(String name) {
300 return Mono.just("Hello " + name);
301 }
302
303 private static String getGreet(String name, String greet) {
304 return greet + name;
305 }
306
307 /**
308 * ********************************************************************
309 * flatMap - Find all distinct chars in the list of names, convert to uppercase
310 * ********************************************************************
311 */
312 @Test
313 void flatMapTest2() {
314 Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe", "Jill"))
315 .map(String::toUpperCase)
316 .flatMap(s -> splitString(s))
317 .distinct();
318 flux.subscribe(System.out::println);
319 StepVerifier.create(flux)
320 .expectNext("J", "A", "C", "K", "O", "E", "I", "L")
321 .verifyComplete();
322
323 }
324
325 private Flux<String> splitString(String name) {
326 return Flux.fromArray(name.split(""));
327 }
328
329 /**
330 * ********************************************************************
331 * concatMap - Same as flatMap but order is preserved, concatMap takes more time but ordering is preserved.
332 * flatMap - Takes less time but ordering is lost.
333 * ********************************************************************
334 */
335 @Test
336 @SneakyThrows
337 void concatMapTest() {
338 Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe", "Jill"))
339 .map(String::toUpperCase)
340 .concatMap(s -> splitStringAsync(s))
341 .distinct();
342
343 StepVerifier.create(flux)
344 .expectNext("J", "A", "C", "K", "O", "E", "I", "L")
345 .verifyComplete();
346 }
347
348 private Flux<String> splitStringAsync(String name) {
349 return Flux.fromArray(name.split(""))
350 .delayElements(Duration.ofMillis(new Random().nextInt(1000)));
351 }
352
353 /**
354 * ********************************************************************
355 * flatMapMany - similar to flatMap, but function returns flux
356 * ********************************************************************
357 */
358 @Test
359 void flatMapManyTest() {
360
361 Flux flux1 = Mono.just("Jack")
362 .flatMapMany(ReactorTest::capitalizeSplit);
363 flux1.subscribe(System.out::println);
364 StepVerifier.create(flux1)
365 .expectSubscription()
366 .expectNext("J", "A", "C", "K")
367 .verifyComplete();
368
369 Flux<String> flux2 = Mono.just("the quick brown fox jumps over the lazy dog")
370 .flatMapMany(ReactorTest::capitalizeSplit)
371 .distinct()
372 .sort();
373 flux2.subscribe(System.out::println);
374 //26 letters in the alphabet
375 StepVerifier.create(flux2)
376 .expectNext("A", "B", "C")
377 .expectComplete();
378
379 Mono<List<Integer>> mono = Mono.just(Arrays.asList(1, 2, 3));
380 Flux<Integer> flux3 = mono.flatMapMany(it -> Flux.fromIterable(it));
381 flux3.subscribe(System.out::println);
382 StepVerifier
383 .create(flux3)
384 .expectNext(1, 2, 3)
385 .verifyComplete();
386 }
387
388 private static Flux<String> capitalizeSplit(String user) {
389 return Flux.fromArray(user.toUpperCase().split(""));
390 }
391
392 /**
393 * ********************************************************************
394 * flatMapIterable - convert mono of list to flux
395 * ********************************************************************
396 */
397 @Test
398 void flatMapIterableTest() {
399 //Converts Mono of list to Flux.
400 Mono<List<Integer>> mono = Mono.just(Arrays.asList(1, 2, 3));
401 Flux<Integer> flux1 = mono.flatMapIterable(list -> list);
402 flux1.subscribe(System.out::println);
403 StepVerifier
404 .create(flux1)
405 .expectNext(1, 2, 3)
406 .verifyComplete();
407 }
408
409 /**
410 * ********************************************************************
411 * transform - accepts a Function functional interface. Used when similar transform is used in many places
412 * input is flux/mono
413 * output is flux/mono
414 * takes a flux/mono and returns a flux/mono
415 * ********************************************************************
416 */
417 @Test
418 void transformTest() {
419 //Function defines input and output
420 Function<Flux<String>, Flux<String>> filterMap = name -> name.map(String::toUpperCase);
421 Flux<String> flux1 = Flux.fromIterable(List.of("Jack", "Joe", "Jill"))
422 .transform(filterMap)
423 .filter(s -> s.length() > 3);
424 flux1.subscribe(System.out::println);
425 StepVerifier
426 .create(flux1)
427 .expectNext("JACK", "JILL")
428 .verifyComplete();
429 }
430
431 /**
432 * ********************************************************************
433 * switchIfEmpty - similar to defaultIfEmpty but return flux/mono
434 * defaultIfEmpty - return a fixed value.
435 * ********************************************************************
436 */
437 @Test
438 @SneakyThrows
439 void switchTest() {
440 Flux<Object> flux1 = emptyFlux()
441 .switchIfEmpty(Flux.just("empty!"))
442 .log();
443 StepVerifier.create(flux1)
444 .expectSubscription()
445 .expectNext("empty!")
446 .expectComplete()
447 .verify();
448
449 Flux<Object> flux2 = emptyFlux()
450 .defaultIfEmpty("empty!")
451 .log();
452 StepVerifier.create(flux2)
453 .expectSubscription()
454 .expectNext("empty!")
455 .expectComplete()
456 .verify();
457
458
459 }
460
461 private Flux<Object> emptyFlux() {
462 return Flux.empty();
463 }
464
465 /**
466 * ********************************************************************
467 * switchIfEmpty with Optional
468 * ********************************************************************
469 */
470 @Test
471 public void switchOptionalTest() {
472
473 Mono<String> mono1 = getHello1().map(e -> {
474 return e.get().toUpperCase();
475 }).switchIfEmpty(Mono.just("empty!"));
476 StepVerifier.create(mono1)
477 .expectNext("HELLO")
478 .expectComplete()
479 .verify();
480
481 Mono<String> mono2 = getHello2().map(e -> {
482 return e.get().toUpperCase();
483 }).switchIfEmpty(Mono.just("empty!"));
484 StepVerifier.create(mono2)
485 .expectNext("empty!")
486 .expectComplete()
487 .verify();
488 }
489
490 private Mono<Optional<String>> getHello1() {
491 return Mono.just(Optional.of("hello"));
492 }
493
494 //Optional evaluated to object or empty
495 private Mono<Optional<String>> getHello2() {
496 return Mono.justOrEmpty(Optional.empty());
497 }
498
499 /**
500 * ********************************************************************
501 * switchIfEmpty - Used as if-else block
502 * ********************************************************************
503 */
504 @Test
505 void switchIfElseTest() {
506 final String name = "Jack";
507 //No need to use Mono.defer on the switchIfEmpty
508 Mono<String> mono = Mono.just(name)
509 .flatMap(this::wishGoodMorning)
510 .switchIfEmpty(this.wishGoodNight(name));
511
512 StepVerifier.create(mono)
513 .expectNext("GOOD NIGHT JACK")
514 .verifyComplete();
515 }
516
517 private Mono<String> wishGoodMorning(String name) {
518 log.info("wishGoodMorning {}", name);
519 if (name.equals("Jack")) {
520 return Mono.empty();
521 } else {
522 return Mono.just("Good Morning " + name);
523 }
524 }
525
526 private Mono<String> wishGoodNight(String name) {
527 log.info("wishGoodNight {}", name);
528 return Mono.just("Good Night " + name)
529 .map(String::toUpperCase);
530 }
531
532 /**
533 * ********************************************************************
534 * intersect with filterWhen - compare 2 flux for common elements
535 * ********************************************************************
536 */
537 @Test
538 void fluxIntersectCommonTest() {
539 Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
540 //Without cache on flux2 it will subscribe many times.
541 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
542
543 Flux<String> commonFlux = flux1.filterWhen(f -> ReactorTest.checkList1(flux2, f));
544 commonFlux.subscribe(System.out::println);
545 StepVerifier.create(commonFlux)
546 .expectNext("apple", "orange")
547 .verifyComplete();
548 }
549
550 private static Mono<Boolean> checkList1(Flux<String> flux, String fruit) {
551 //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
552 return Mono.just(flux.toStream().anyMatch(e -> e.equals(fruit)));
553 }
554
555 /**
556 * ********************************************************************
557 * intersect with filter - compare 2 flux for common elements
558 * ********************************************************************
559 */
560 @Test
561 void fluxIntersectCommon2Test() {
562 Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
563 //Without cache on flux2 it will subscribe many times.
564 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
565 Flux<String> commonFlux = flux1.filter(f -> {
566 //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
567 return flux2.toStream().anyMatch(e -> e.equals(f));
568 });
569 commonFlux.subscribe(System.out::println);
570 StepVerifier.create(commonFlux)
571 .expectNext("apple", "orange")
572 .verifyComplete();
573 }
574
575 /**
576 * ********************************************************************
577 * intersect with filterWhen - compare 2 flux for diff
578 * ********************************************************************
579 */
580 @Test
581 void fluxIntersectDiffTest() {
582 Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
583 //Without cache on flux2 it will subscribe many times.
584 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
585
586 Flux<String> diffFlux = flux1.filterWhen(f -> ReactorTest.checkList2(flux2, f));
587 diffFlux.subscribe(System.out::println);
588 StepVerifier.create(diffFlux)
589 .expectNext("banana")
590 .verifyComplete();
591 }
592
593 private static Mono<Boolean> checkList2(Flux<String> flux, String fruit) {
594 //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
595 return Mono.just(flux.toStream().anyMatch(e -> e.equals(fruit))).map(hasElement -> !hasElement);
596 }
597
598 /**
599 * ********************************************************************
600 * intersect with filter - compare 2 flux for diff
601 * ********************************************************************
602 */
603 @Test
604 void fluxIntersectDiff2Test() {
605 Flux<String> flux1 = Flux.just("apple", "orange", "banana").log();
606 //Without cache on flux2 it will subscribe many times.
607 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").log().cache();
608 Flux<String> commonFlux = flux1.filter(f -> {
609 //toStream will block so should be avoided. Look at ReactorObjectTest for better approach.
610 return !flux2.toStream().anyMatch(e -> e.equals(f));
611 });
612 commonFlux.subscribe(System.out::println);
613 StepVerifier.create(commonFlux)
614 .expectNext("banana")
615 .verifyComplete();
616 }
617
618 /**
619 * ********************************************************************
620 * startWith - add new element to flux.
621 * ********************************************************************
622 */
623 @Test
624 public void startWithTest() {
625 Flux<Integer> flux = Flux.range(1, 3);
626 Flux<Integer> integerFlux = flux.startWith(0);
627 StepVerifier.create(integerFlux)
628 .expectNext(0, 1, 2, 3)
629 .verifyComplete();
630 }
631
632 /**
633 * ********************************************************************
634 * index
635 * ********************************************************************
636 */
637 @Test
638 void fluxIndexTest() {
639 //append a number to each element.
640 Flux<Tuple2<Long, String>> index = Flux
641 .just("apple", "banana", "orange")
642 .index();
643 StepVerifier.create(index)
644 .expectNext(Tuples.of(0L, "apple"))
645 .expectNext(Tuples.of(1L, "banana"))
646 .expectNext(Tuples.of(2L, "orange"))
647 .verifyComplete();
648 }
649
650 /**
651 * ********************************************************************
652 * takeWhile & skipWhile
653 * ********************************************************************
654 */
655 @Test
656 void takeWhileTest() {
657 Flux<Integer> numbersFlux = Flux.range(1, 10);
658 Flux<Integer> takeWhile = numbersFlux.takeWhile(i -> i <= 5);
659 StepVerifier
660 .create(takeWhile)
661 .expectNext(1, 2, 3, 4, 5)
662 .verifyComplete();
663
664 Flux<Integer> skipWhile = numbersFlux.skipWhile(i -> i <= 5);
665 StepVerifier
666 .create(skipWhile)
667 .expectNext(6, 7, 8, 9, 10)
668 .verifyComplete();
669 }
670
671 /**
672 * ********************************************************************
673 * collectList & collectSortedList- flux to mono of list
674 * ********************************************************************
675 */
676 @Test
677 void collectListTest() {
678 Flux<String> flux = Flux.just("apple", "orange");
679 Mono<List<String>> mono = flux.collectList();
680 mono.subscribe(System.out::println);
681 StepVerifier.create(mono)
682 .expectNext(Arrays.asList("apple", "orange"))
683 .verifyComplete();
684
685 Mono<List<Integer>> listMono1 = Flux
686 .just(1, 2, 3)
687 .collectList();
688 StepVerifier.create(listMono1)
689 .expectNext(Arrays.asList(1, 2, 3))
690 .verifyComplete();
691
692 Mono<List<Integer>> listMono2 = Flux
693 .just(5, 2, 4, 1, 3)
694 .collectSortedList();
695 StepVerifier.create(listMono2)
696 .expectNext(Arrays.asList(1, 2, 3, 4, 5))
697 .verifyComplete();
698 }
699
700 /**
701 * ********************************************************************
702 * collectList
703 * ********************************************************************
704 */
705 @Test
706 void collectListTest2() {
707 Mono<List<String>> monoList1 = Flux.just("banana", "apple")
708 .collectList();
709 StepVerifier.create(monoList1)
710 .expectNext(Arrays.asList("banana", "apple"))
711 .verifyComplete();
712
713 Mono<List<String>> monoList2 = Flux.just("banana", "apple")
714 .collectSortedList();
715 StepVerifier.create(monoList2)
716 .expectNext(Arrays.asList("apple", "banana"))
717 .verifyComplete();
718
719 //Dont use infinite flux, will never return.
720 //Flux.interval(Duration.ofMillis(1000)).collectList().subscribe();
721
722 List<String> list3 = new ArrayList<>();
723 monoList1.subscribe(list3::addAll);
724 list3.forEach(System.out::println);
725 }
726
727 /**
728 * ********************************************************************
729 * collectMap
730 * ********************************************************************
731 */
732 @Test
733 void collectMapTest() {
734 Mono<Map<Object, Object>> flux = Flux.just(
735 "yellow:banana",
736 "red:apple").collectMap(item -> item.split(":")[0], item -> item.split(":")[1]);
737
738 Map<Object, Object> map1 = new HashMap<>();
739 flux.subscribe(map1::putAll);
740 map1.forEach((key, value) -> System.out.println(key + " -> " + value));
741
742 StepVerifier.create(flux)
743 .expectNext(Map.of("yellow", "banana", "red", "apple"))
744 .verifyComplete();
745 }
746
747 /**
748 * ********************************************************************
749 * collectMultimap
750 * ********************************************************************
751 */
752 @Test
753 void collectMultimapTest() {
754 Mono<Map<String, Collection<Object>>> flux = Flux.just("yellow:banana", "red:grapes", "red:apple", "yellow:pineapple")
755 .collectMultimap(
756 item -> item.split(":")[0],
757 item -> item.split(":")[1]);
758 Map<Object, Collection<Object>> map1 = new HashMap<>();
759 flux.subscribe(map1::putAll);
760 map1.forEach((key, value) -> System.out.println(key + " -> " + value));
761
762 StepVerifier.create(flux)
763 .expectNext(Map.of("red", List.of("grapes", "apple"), "yellow", List.of("banana", "pineapple")))
764 .verifyComplete();
765 }
766
767 /**
768 * ********************************************************************
769 * concat - subscribes to publishers in sequence, order guaranteed, static function
770 * ********************************************************************
771 */
772 @Test
773 @SneakyThrows
774 void concatTest() {
775 Flux<String> flux1 = Flux.just("a", "b");
776 Flux<String> flux2 = Flux.just("c", "d");
777 Flux<String> flux3 = Flux.concat(flux1, flux2);
778
779 StepVerifier.create(flux3)
780 .expectSubscription()
781 .expectNext("a", "b", "c", "d")
782 .verifyComplete();
783
784 Flux<String> flux4 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
785 Flux<String> flux5 = Flux.just("c", "d");
786 //Lazy will wait till first flux finishes.
787 Flux<String> flux6 = Flux.concat(flux1, flux2).log();
788
789 StepVerifier.create(flux6)
790 .expectSubscription()
791 .expectNext("a", "b", "c", "d")
792 .verifyComplete();
793 }
794
795 /**
796 * ********************************************************************
797 * concatWith - subscribes to publishers in sequence, order guaranteed, instance function
798 * ********************************************************************
799 */
800 @Test
801 @SneakyThrows
802 void concatWithTest() {
803 Flux<String> flux1 = Flux.just("a", "b");
804 Flux<String> flux2 = Flux.just("c", "d");
805 Flux<String> flux3 = flux1.concatWith(flux2).log();
806 StepVerifier.create(flux3)
807 .expectSubscription()
808 .expectNext("a", "b", "c", "d")
809 .verifyComplete();
810
811 Mono<String> aFlux = Mono.just("a");
812 Mono<String> bFlux = Mono.just("b");
813 Flux<String> stringFlux = aFlux.concatWith(bFlux);
814 stringFlux.subscribe(System.out::println);
815 StepVerifier.create(stringFlux)
816 .expectNext("a", "b")
817 .verifyComplete();
818 }
819
820 @Test
821 void concatDelayErrorTest() {
822 Flux<String> flux1 = Flux.just("a", "b").map(s -> {
823 if (s.equals("b")) {
824 throw new IllegalArgumentException("error!");
825 }
826 return s;
827 });
828 Flux<String> flux2 = Flux.just("c", "d");
829 Flux<String> flux = Flux.concatDelayError(flux1, flux2)
830 .log();
831 StepVerifier.create(flux)
832 .expectSubscription()
833 .expectNext("a", "c", "d")
834 .expectError()
835 .verify();
836 }
837
838 /**
839 * ********************************************************************
840 * merge - subscribes to publishers eagerly, order not guaranteed, static function
841 * ********************************************************************
842 */
843 @Test
844 @SneakyThrows
845 void mergeTest() {
846 Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
847 Flux<String> flux2 = Flux.just("c", "d");
848 //Eager will not wait till first flux finishes.
849 Flux<String> flux = Flux.merge(flux1, flux2)
850 .log();
851 StepVerifier.create(flux)
852 .expectSubscription()
853 .expectNext("c", "d", "a", "b")
854 .verifyComplete();
855 }
856
857 /**
858 * ********************************************************************
859 * mergeWith - subscribes to publishers in eagerly, order not guaranteed, instance function
860 * ********************************************************************
861 */
862 @Test
863 @SneakyThrows
864 void mergeWithTest() {
865 Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
866 Flux<String> flux2 = Flux.just("c", "d");
867 //Eager will not wait till first flux finishes.
868 Flux<String> flux3 = flux1.mergeWith(flux2).log();
869 StepVerifier.create(flux3)
870 .expectSubscription()
871 .expectNext("c", "d", "a", "b")
872 .verifyComplete();
873
874 Mono aMono = Mono.just("a");
875 Mono bMono = Mono.just("b");
876 Flux flux4 = aMono.mergeWith(bMono);
877 StepVerifier.create(flux4)
878 .expectNext("a", "b")
879 .verifyComplete();
880 }
881
882 /**
883 * ********************************************************************
884 * mergeSequential - subscribes to publishers eagerly, result is sequential.
885 * concat - subscribes to publishers in sequence, result is sequential.
886 * ********************************************************************
887 */
888 @Test
889 @SneakyThrows
890 void mergeSequentialTest() {
891 Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
892 Flux<String> flux2 = Flux.just("c", "d");
893 Flux<String> flux = Flux.mergeSequential(flux1, flux2, flux1)
894 .log();
895 StepVerifier.create(flux)
896 .expectSubscription()
897 .expectNext("a", "b", "c", "d", "a", "b")
898 .verifyComplete();
899 }
900
901 @Test
902 void mergeDelayTest() {
903 Flux<String> flux1 = Flux.just("a", "b").map(s -> {
904 if (s.equals("b")) {
905 throw new IllegalArgumentException("error!");
906 }
907 return s;
908 }).doOnError(e -> log.error("Error: {}", e));
909
910 Flux<String> flux2 = Flux.just("c", "d");
911 Flux<String> flux = Flux.mergeDelayError(1, flux1, flux2, flux1)
912 .log();
913 StepVerifier.create(flux)
914 .expectSubscription()
915 .expectNext("a", "c", "d", "a")
916 .expectError()
917 .verify();
918 }
919
920 /**
921 * ********************************************************************
922 * zip - subscribes to publishers in eagerly, waits for both flux to emit one element. 2-8 flux can be zipped, returns a tuple, Static function
923 * ********************************************************************
924 */
925 @Test
926 void fluxZipTest() {
927 Flux<String> flux1 = Flux.just("red", "yellow");
928 Flux<String> flux2 = Flux.just("apple", "banana");
929 Flux<String> flux3 = Flux.zip(flux1, flux2)
930 .map(tuple -> {
931 return (tuple.getT1() + " " + tuple.getT2());
932 });
933 flux3.subscribe(System.out::println);
934 StepVerifier.create(flux3)
935 .expectNext("red apple")
936 .expectNext("yellow banana")
937 .verifyComplete();
938
939 //Third argument is combinator lambda
940 Flux<Integer> firstFlux = Flux.just(1, 2, 3);
941 Flux<Integer> secondFlux = Flux.just(10, 20, 30, 40);
942 //Define how the zip should happen
943 Flux<Integer> zip = Flux.zip(firstFlux, secondFlux, (num1, num2) -> num1 + num2);
944 StepVerifier
945 .create(zip)
946 .expectNext(11, 22, 33)
947 .verifyComplete();
948 }
949
950 /**
951 * ********************************************************************
952 * zipWith - subscribes to publishers in eagerly, waits for both flux to emit one element. 2-8 flux can be zipped, returns a tuple, Instance function
953 * ********************************************************************
954 */
955 @Test
956 void fluxZipWithTest() {
957 Flux<String> flux1 = Flux.just("red", "yellow");
958 Flux<String> flux2 = Flux.just("apple", "banana");
959 Flux<String> flux3 = flux1.zipWith(flux2)
960 .map(tuple -> {
961 return (tuple.getT1() + " " + tuple.getT2());
962 });
963 StepVerifier.create(flux3)
964 .expectNext("red apple")
965 .expectNext("yellow banana")
966 .verifyComplete();
967
968 Flux<String> flux4 = Flux.fromIterable(Arrays.asList("apple", "orange", "banana"))
969 .zipWith(Flux.range(1, 5), (word, line) -> {
970 return line + ". " + word;
971 });
972 StepVerifier.create(flux4)
973 .expectNext("1. apple")
974 .expectNext("2. orange")
975 .expectNext("3. banana")
976 .verifyComplete();
977 }
978
979 /**
980 * ********************************************************************
981 * Error Recover Handling
982 * onErrorReturn - Return value on error
983 * ********************************************************************
984 */
985 @Test
986 void onErrorReturnTest() {
987 Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"))
988 .onErrorReturn("Jack");
989 StepVerifier.create(mono1)
990 .expectNext("Jack")
991 .verifyComplete();
992 }
993
994 /**
995 * ********************************************************************
996 * Error Recover Handling
997 * onErrorResume - Resume chain with new mono/flux.
998 * ********************************************************************
999 */
1000 @Test
1001 void onErrorResumeTest() {
1002 Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"))
1003 .onErrorResume(e -> Mono.just("Jack"));
1004 StepVerifier.create(mono1)
1005 .expectNext("Jack")
1006 .verifyComplete();
1007
1008 Mono<Object> mono2 = Mono.error(new RuntimeException("My Error"))
1009 .onErrorResume(s -> {
1010 log.info("Inside on onErrorResume");
1011 return Mono.just("Jack");
1012 })
1013 .log();
1014 StepVerifier.create(mono2)
1015 .expectNext("Jack")
1016 .verifyComplete();
1017 }
1018
1019 /**
1020 * ********************************************************************
1021 * Error Recover Handling
1022 * onErrorContinue - Continue chain even if error occurs
1023 * ********************************************************************
1024 */
1025 @Test
1026 void onErrorContinueTest() {
1027 Flux<String> flux1 =
1028 Flux.just("a", "b", "c")
1029 .map(e -> {
1030 if (e.equals("b"))
1031 throw new RuntimeException("My Error!");
1032 return e;
1033 })
1034 .concatWith(Mono.just("d"))
1035 .onErrorContinue((ex, value) -> {
1036 log.info("Exception: {}", ex);
1037 log.info("value: {}", value);
1038 })
1039 .log();
1040 StepVerifier.create(flux1)
1041 .expectNext("a", "c", "d")
1042 .verifyComplete();
1043 }
1044
1045 /**
1046 * ********************************************************************
1047 * Error - Action
1048 * doOnError - log the error, Side-effect operator.
1049 * ********************************************************************
1050 */
1051 @Test
1052 void doOnErrorTest() {
1053 Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"))
1054 .doOnError(e -> log.error("Error: {}", e.getMessage()))
1055 .log();
1056 StepVerifier.create(mono1)
1057 .expectError(RuntimeException.class)
1058 .verify();
1059 }
1060
1061 /**
1062 * ********************************************************************
1063 * Error - Action
1064 * onErrorMap - Transform an error emitted
1065 * ********************************************************************
1066 */
1067 @Test
1068 void onErrorMapTest() {
1069 Flux flux1 = Flux.just("Jack", "Jill").map(u -> {
1070 if (u.equals("Jill")) {
1071 //always do throw here, never do return.
1072 throw new IllegalArgumentException("Not valid");
1073 }
1074 if (u.equals("Jack")) {
1075 throw new ClassCastException("Not valid");
1076 }
1077 return u;
1078 }).onErrorMap(IllegalArgumentException.class, e -> {
1079 log.info("Illegal Arg error");
1080 throw new RuntimeException("Illegal Arg error!");
1081 }).onErrorMap(ClassCastException.class, e -> {
1082 log.info("Class cast error");
1083 throw new RuntimeException("Class cast error!");
1084 });
1085
1086 StepVerifier.create(flux1)
1087 .expectErrorMessage("Class cast error!")
1088 .verify();
1089 }
1090
1091 /**
1092 * ********************************************************************
1093 * retry
1094 * ********************************************************************
1095 */
1096 @Test
1097 void retryTest() {
1098 Mono<String> mono = Mono.just("Jack")
1099 .flatMap(this::twoAttemptFunction)
1100 .retry(3);
1101 StepVerifier.create(mono)
1102 .assertNext(e -> {
1103 assertThat(e).isEqualTo("Hello Jack");
1104 })
1105 .verifyComplete();
1106 }
1107
1108 AtomicLong attemptCounter1 = new AtomicLong();
1109
1110 private Mono<String> twoAttemptFunction(String name) {
1111 Long attempt = attemptCounter1.getAndIncrement();
1112 log.info("attempt value: {}", attempt);
1113 if (attempt < 2) {
1114 throw new RuntimeException("FAILURE");
1115 }
1116 return Mono.just("Hello " + name);
1117 }
1118
1119 /**
1120 * ********************************************************************
1121 * retryWhen
1122 * ********************************************************************
1123 */
1124 @Test
1125 void retryWhenTest() {
1126 attemptCounter2 = new AtomicLong();
1127 RetryBackoffSpec retryFilter1 = Retry.backoff(3, Duration.ofSeconds(1))
1128 .filter(throwable -> throwable instanceof RuntimeException);
1129
1130 Mono<String> mono1 = Mono.just("Jack")
1131 .flatMap(this::greetAfter2Failure)
1132 .retryWhen(retryFilter1);
1133 StepVerifier.create(mono1)
1134 .assertNext(e -> {
1135 assertThat(e).isEqualTo("Hello Jack");
1136 })
1137 .verifyComplete();
1138
1139 attemptCounter2 = new AtomicLong();
1140 RetryBackoffSpec retryFilter2 = Retry.fixedDelay(1, Duration.ofSeconds(1))
1141 .filter(throwable -> throwable instanceof RuntimeException)
1142 .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
1143 Exceptions.propagate(retrySignal.failure())
1144 ));
1145 Mono<String> mono2 = Mono.just("Jack")
1146 .flatMap(this::greetAfter2Failure)
1147 .retryWhen(retryFilter2);
1148 StepVerifier.create(mono2)
1149 .expectErrorMessage("FAILURE")
1150 .verify();
1151 }
1152
1153 AtomicLong attemptCounter2;
1154
1155 private Mono<String> greetAfter2Failure(String name) {
1156 Long attempt = attemptCounter2.getAndIncrement();
1157 log.info("attempt value: {}", attempt);
1158 if (attempt < 2) {
1159 throw new RuntimeException("FAILURE");
1160 }
1161 return Mono.just("Hello " + name);
1162 }
1163
1164 /**
1165 * ********************************************************************
1166 * repeat - repeat an operation n times.
1167 * ********************************************************************
1168 */
1169 @Test
1170 void repeatTest() {
1171 Mono<List<String>> flux = getDate()
1172 .repeat(5)
1173 .collectList();
1174 StepVerifier.create(flux)
1175 .assertNext(e -> {
1176 assertThat(e.size()).isEqualTo(6);
1177 })
1178 .verifyComplete();
1179 }
1180
1181 private Mono<String> getDate() {
1182 return Mono.just("Time " + new Date());
1183 }
1184
1185 /**
1186 * ********************************************************************
1187 * doOn - doOnSubscribe, doOnNext, doOnError, doFinally, doOnComplete
1188 * ********************************************************************
1189 */
1190 @Test
1191 void doOnTest1() {
1192 Flux<Integer> numFlux = Flux.range(1, 5)
1193 .log()
1194 .map(i -> {
1195 if (i == 4) {
1196 throw new RuntimeException("Num Error!");
1197 }
1198 return i;
1199 });
1200 numFlux.subscribe(s -> {
1201 log.info("Number: {}", s);
1202 },
1203 Throwable::printStackTrace,
1204 () -> {
1205 log.info("Done!");
1206 });
1207 StepVerifier.create(numFlux)
1208 .expectNext(1, 2, 3)
1209 .expectError(RuntimeException.class)
1210 .verify();
1211 }
1212
1213 @Test
1214 void doOnTest2() {
1215 Flux<Object> flux = Flux.error(new RuntimeException("My Error"))
1216 .doOnSubscribe(s -> System.out.println("Subscribed!"))
1217 .doOnNext(p -> System.out.println("Next!"))
1218 .doOnComplete(() -> System.out.println("Completed!"))
1219 .doFinally((e) -> System.out.println("Signal: " + e))
1220 .doOnError((e) -> System.out.println("Error: " + e));
1221
1222 StepVerifier.create(flux)
1223 .expectError(RuntimeException.class)
1224 .verify();
1225
1226 StepVerifier.create(flux)
1227 .verifyError(RuntimeException.class);
1228 }
1229
1230 @Test
1231 void doOnTest3() {
1232 Flux flux = Flux.error(new RuntimeException("My Error"));
1233 flux.subscribe(
1234 onNext(),
1235 onError(),
1236 onComplete()
1237 );
1238 }
1239
1240 private static Consumer<Object> onNext() {
1241 return o -> System.out.println("Received : " + o);
1242 }
1243
1244 private static Consumer<Throwable> onError() {
1245 return e -> System.out.println("ERROR : " + e.getMessage());
1246 }
1247
1248 private static Runnable onComplete() {
1249 return () -> System.out.println("Completed");
1250 }
1251
1252 /**
1253 * ********************************************************************
1254 * StepVerifier - assertNext, thenRequest, thenCancel, expectError, expectErrorMessage
1255 * ********************************************************************
1256 */
1257 @Test
1258 void fluxStepVerifyTest() {
1259 Flux flux = Flux.fromIterable(Arrays.asList("Jack", "Jill"));
1260 StepVerifier.create(flux)
1261 .expectNextMatches(user -> user.equals("Jack"))
1262 .assertNext(user -> assertThat(user).isEqualTo("Jill"))
1263 .verifyComplete();
1264
1265 //Wait for 2 elements.
1266 StepVerifier.create(flux)
1267 .expectNextCount(2)
1268 .verifyComplete();
1269
1270 //Request 1 value at a time, get 2 values then cancel.
1271 Flux flux2 = Flux.fromIterable(Arrays.asList("Jack", "Jill", "Raj"));
1272 StepVerifier.create(flux2, 1)
1273 .expectNext("JACK")
1274 .thenRequest(1)
1275 .expectNext("JILL")
1276 .thenCancel();
1277
1278 Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"));
1279 StepVerifier.create(mono1)
1280 .expectError(RuntimeException.class)
1281 .verify();
1282 StepVerifier.create(mono1)
1283 .expectErrorMessage("My Error")
1284 .verify();
1285 }
1286
1287 /**
1288 * ********************************************************************
1289 * flux error propagate
1290 * ********************************************************************
1291 */
1292 @Test
1293 void errorPropagateTest() {
1294 Flux flux1 = Flux.just("Jack", "Jill").map(u -> {
1295 try {
1296 return ReactorTest.checkName(u);
1297 } catch (CustomException e) {
1298 throw Exceptions.propagate(e);
1299 }
1300 });
1301 flux1.subscribe(System.out::println);
1302 StepVerifier.create(flux1)
1303 .expectNext("JACK")
1304 .verifyError(CustomException.class);
1305 }
1306
1307 private static String checkName(String name) throws CustomException {
1308 if (name.equals("Jill")) {
1309 throw new CustomException();
1310 }
1311 return name.toUpperCase();
1312 }
1313
1314 protected static final class CustomException extends Exception {
1315 private static final long serialVersionUID = 0L;
1316 }
1317
1318 /**
1319 * ********************************************************************
1320 * subscribeOn - influences upstream (whole chain)
1321 * ********************************************************************
1322 */
1323 @Test
1324 void subscribeOnTest() {
1325 Flux numbFlux = Flux.range(1, 5)
1326 .map(i -> {
1327 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1328 return i;
1329 }).subscribeOn(Schedulers.single())
1330 .map(i -> {
1331 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1332 return i;
1333 });
1334 numbFlux.subscribe();
1335 }
1336
1337 /**
1338 * ********************************************************************
1339 * subscribeOn - influences upstream (whole chain)
1340 * ********************************************************************
1341 */
1342 @Test
1343 void subscribeOnTest2() {
1344 Flux numbFlux = Flux.range(1, 5)
1345 .map(i -> {
1346 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1347 return i;
1348 }).subscribeOn(Schedulers.newSingle("my-thread"))
1349 .map(i -> {
1350 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1351 return i;
1352 });
1353 numbFlux.subscribe();
1354 }
1355
1356 /**
1357 * ********************************************************************
1358 * publishOn - influences downstream
1359 * ********************************************************************
1360 */
1361 @Test
1362 void publishOnTest() {
1363 Flux numbFlux = Flux.range(1, 5)
1364 .map(i -> {
1365 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1366 return i;
1367 }).publishOn(Schedulers.single())
1368 .map(i -> {
1369 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1370 return i;
1371 });
1372 numbFlux.subscribe();
1373 }
1374
1375 /**
1376 * ********************************************************************
1377 * publishOn - influences downstream
1378 * ********************************************************************
1379 */
1380 @Test
1381 void publishOnTest2() {
1382 Flux numbFlux = Flux.range(1, 5)
1383 .map(i -> {
1384 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1385 return i;
1386 }).publishOn(Schedulers.newSingle("my-thread"))
1387 .map(i -> {
1388 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1389 return i;
1390 });
1391 numbFlux.subscribe();
1392 }
1393
1394 /**
1395 * ********************************************************************
1396 * fromSupplier - returns a value
1397 * fromCallable - returns a value or exception, runs blocking function on different thread
1398 * ********************************************************************
1399 */
1400 @Test
1401 public void monoSupplierTest() {
1402 Supplier<String> stringSupplier = () -> getName();
1403 Mono<String> mono = Mono.fromSupplier(stringSupplier)
1404 .log();
1405 mono.subscribe(System.out::println);
1406 }
1407
1408 /**
1409 * ********************************************************************
1410 * fromSupplier - returns a value
1411 * fromCallable - returns a value or exception, runs blocking function on different thread
1412 * ********************************************************************
1413 */
1414 @Test
1415 public void monoCallableTest() {
1416 Callable<String> stringCallable = () -> getName();
1417 Mono<String> mono = Mono.fromCallable(stringCallable)
1418 .log()
1419 .subscribeOn(Schedulers.boundedElastic());
1420 mono.subscribe(System.out::println);
1421 }
1422
1423 /**
1424 * ********************************************************************
1425 * fromCallable - read file may be blocking so we dont want to block main thread.
1426 * ********************************************************************
1427 */
1428 @Test
1429 @SneakyThrows
1430 void readFileTest() {
1431 Mono<List<String>> listMono = Mono.fromCallable(() -> Files.readAllLines(Path.of("src/test/resources/file.txt")))
1432 .log()
1433 .subscribeOn(Schedulers.boundedElastic());
1434
1435 listMono.subscribe(l -> log.info("Line: {}", l.size()));
1436 TimeUnit.SECONDS.sleep(5);
1437
1438 StepVerifier.create(listMono)
1439 .expectSubscription()
1440 .thenConsumeWhile(l -> {
1441 assertThat(l.isEmpty()).isFalse();
1442 return true;
1443 })
1444 .verifyComplete();
1445 }
1446
1447 /**
1448 * ********************************************************************
1449 * fromRunnable - runs blocking function on different thread, but doesnt return value
1450 * ********************************************************************
1451 */
1452 @Test
1453 public void monoRunnableTest() {
1454 Runnable stringCallable = () -> getName();
1455 Mono<Object> mono = Mono.fromRunnable(stringCallable)
1456 .log()
1457 .subscribeOn(Schedulers.boundedElastic());
1458 mono.subscribe(System.out::println);
1459 }
1460
1461 /**
1462 * ********************************************************************
1463 * ParallelFlux - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1464 * ********************************************************************
1465 */
1466 @Test
1467 void fluxParallelTest() {
1468 log.info("Cores: {}", Runtime.getRuntime().availableProcessors());
1469 ParallelFlux<String> flux1 = Flux.just("apple", "orange", "banana")
1470 .parallel()
1471 .runOn(Schedulers.parallel())
1472 .map(ReactorTest::capitalizeString)
1473 .log();
1474 StepVerifier.create(flux1)
1475 .expectNextCount(3)
1476 .verifyComplete();
1477
1478
1479 Flux<String> flux2 = Flux.just("apple", "orange", "banana")
1480 .flatMap(name -> {
1481 return Mono.just(name)
1482 .map(ReactorTest::capitalizeString)
1483 .subscribeOn(Schedulers.parallel());
1484 })
1485 .log();
1486 StepVerifier.create(flux2)
1487 .expectNextCount(3)
1488 .verifyComplete();
1489 }
1490
1491 /**
1492 * ********************************************************************
1493 * flatMap Parallelism - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1494 * ********************************************************************
1495 */
1496 @Test
1497 void fluxParallelWithFlatMapTest() {
1498 Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1499 .flatMap(name -> {
1500 return Mono.just(name)
1501 .map(ReactorTest::capitalizeString)
1502 .subscribeOn(Schedulers.parallel());
1503 })
1504 .log();
1505 StepVerifier.create(flux1)
1506 .expectNextCount(3)
1507 .verifyComplete();
1508 }
1509
1510 @SneakyThrows
1511 private static String capitalizeString(String element) {
1512 log.info("Capitalizing: {}", element);
1513 TimeUnit.SECONDS.sleep(1);
1514 return element.toUpperCase();
1515 }
1516
1517 /**
1518 * ********************************************************************
1519 * flatMap - fire-forget jobs with subscribe, Will run async jobs
1520 * ********************************************************************
1521 */
1522 @SneakyThrows
1523 @Test
1524 void fireForgetTest() {
1525 CountDownLatch latch = new CountDownLatch(3);
1526 Flux<Object> flux1 = Flux.just("apple", "orange", "banana")
1527 .flatMap(fruit -> {
1528 Mono.just(fruit)
1529 .map(e -> ReactorTest.capitalizeStringLatch(e, latch))
1530 .subscribeOn(Schedulers.parallel())
1531 .subscribe();
1532 return Mono.empty();
1533 })
1534 .log();
1535 StepVerifier.create(flux1)
1536 .verifyComplete();
1537 latch.await(5, TimeUnit.SECONDS);
1538 }
1539
1540 @SneakyThrows
1541 private static String capitalizeStringLatch(String element, CountDownLatch latch) {
1542 log.info("Capitalizing: {}", element);
1543 TimeUnit.SECONDS.sleep(1);
1544 latch.countDown();
1545 return element.toUpperCase();
1546 }
1547
1548 /**
1549 * ********************************************************************
1550 * flatMapSequential - Maintains order but executes in parallel
1551 * ********************************************************************
1552 */
1553 @Test
1554 void flatMapSequentialTest() {
1555 Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1556 .flatMapSequential(name -> {
1557 return Mono.just(name)
1558 .map(ReactorTest::capitalizeString)
1559 .subscribeOn(Schedulers.parallel());
1560 })
1561 .log();
1562 StepVerifier.create(flux1)
1563 .expectNext("APPLE", "ORANGE", "BANANA")
1564 .verifyComplete();
1565 }
1566
1567 /**
1568 * ********************************************************************
1569 * flatMapSequential - Maintains order but executes in parallel
1570 * ********************************************************************
1571 */
1572 @Test
1573 void flatMapSequentialConcurrencyTest() {
1574 Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1575 .flatMapSequential(name -> {
1576 return Mono.just(name)
1577 .map(ReactorTest::capitalizeString)
1578 .subscribeOn(Schedulers.parallel());
1579 }, 1)
1580 .log();
1581 StepVerifier.create(flux1)
1582 .expectNext("APPLE", "ORANGE", "BANANA")
1583 .verifyComplete();
1584 }
1585
1586 /**
1587 * ********************************************************************
1588 * withVirtualTime - flux that emits every second.
1589 * ********************************************************************
1590 */
1591 @Test
1592 @SneakyThrows
1593 void fluxIntervalTakeTest() {
1594 VirtualTimeScheduler.getOrSet();
1595 Flux<Long> interval = Flux.interval(Duration.ofSeconds(1))
1596 .log()
1597 .take(10);
1598 interval.subscribe(i -> log.info("Number: {}", i));
1599 TimeUnit.SECONDS.sleep(5);
1600 StepVerifier.withVirtualTime(() -> interval)
1601 .thenAwait(Duration.ofSeconds(5))
1602 .expectNextCount(4)
1603 .thenCancel()
1604 .verify();
1605 }
1606
1607 /**
1608 * ********************************************************************
1609 * flux that emits every day. Use of virtual time to simulate days.
1610 * ********************************************************************
1611 */
1612 @Test
1613 @SneakyThrows
1614 void fluxIntervalVirtualTimeTest() {
1615 VirtualTimeScheduler.getOrSet();
1616 StepVerifier.withVirtualTime(this::getTake)
1617 .expectSubscription()
1618 .expectNoEvent(Duration.ofDays(1))
1619 .thenAwait(Duration.ofDays(1))
1620 .expectNext(0L)
1621 .thenAwait(Duration.ofDays(1))
1622 .expectNext(1L)
1623 .thenCancel()
1624 .verify();
1625 }
1626
1627 private Flux<Long> getTake() {
1628 return Flux.interval(Duration.ofDays(1))
1629 .log()
1630 .take(10);
1631 }
1632
1633 /**
1634 * ********************************************************************
1635 * then - will just replay the source terminal signal, resulting in a Mono<Void> to indicate that this never signals any onNext.
1636 * thenEmpty - not only returns a Mono<Void>, but it takes a Mono<Void> as a parameter. It represents a concatenation of the source completion signal then the second, empty Mono completion signal. In other words, it completes when A then B have both completed sequentially, and doesn't emit data.
1637 * thenMany - waits for the source to complete then plays all the signals from its Publisher<R> parameter, resulting in a Flux<R> that will "pause" until the source completes, then emit the many elements from the provided publisher before replaying its completion signal as well.
1638 * ********************************************************************
1639 */
1640 @Test
1641 void thenManyChainTest() {
1642 Flux<String> names = Flux.just("Jack", "Jill");
1643 names.map(String::toUpperCase)
1644 .thenMany(ReactorTest.deleteFromDb())
1645 .thenMany(ReactorTest.saveToDb())
1646 .subscribe(System.out::println);
1647 }
1648
1649 private static Flux<String> deleteFromDb() {
1650 return Flux.just("Deleted from db").log();
1651 }
1652
1653 private static Flux<String> saveToDb() {
1654 return Flux.just("Saved to db").log();
1655 }
1656
1657 private static Mono<Void> sendMail() {
1658 return Mono.empty();
1659 }
1660
1661 @Test
1662 void thenEmptyTest() {
1663 Flux<String> names = Flux.just("Jack", "Jill");
1664 names.map(String::toUpperCase)
1665 .thenMany(ReactorTest.saveToDb())
1666 .thenEmpty(ReactorTest.sendMail())
1667 .subscribe(System.out::println);
1668 }
1669
1670 @Test
1671 void thenTest() {
1672 Flux<String> names = Flux.just("Jack", "Jill");
1673 names.map(String::toUpperCase)
1674 .thenMany(ReactorTest.saveToDb())
1675 .then()
1676 .then(Mono.just("Ram"))
1677 .thenReturn("Done!")
1678 .subscribe(System.out::println);
1679 }
1680
1681 /**
1682 * ********************************************************************
1683 * firstWithValue - first mono to return
1684 * ********************************************************************
1685 */
1686 @Test
1687 void monoFirstTest() {
1688 Mono<String> mono1 = Mono.just("Jack").delayElement(Duration.ofSeconds(1));
1689 Mono<String> mono2 = Mono.just("Jill");
1690 //Return the mono which returns its value faster
1691 Mono<String> mono3 = Mono.firstWithValue(mono1, mono2);
1692 mono3.subscribe(System.out::println);
1693 StepVerifier.create(mono3)
1694 .expectNext("Jill")
1695 .verifyComplete();
1696 }
1697
1698 /**
1699 * ********************************************************************
1700 * buffer
1701 * ********************************************************************
1702 */
1703 @Test
1704 public void bufferGroupTest() {
1705 Flux<List<Integer>> flux1 = Flux
1706 .range(1, 7)
1707 .buffer(2);
1708 StepVerifier
1709 .create(flux1)
1710 .expectNext(Arrays.asList(1, 2))
1711 .expectNext(Arrays.asList(3, 4))
1712 .expectNext(Arrays.asList(5, 6))
1713 .expectNext(Arrays.asList(7))
1714 .verifyComplete();
1715 }
1716
1717 @Test
1718 @SneakyThrows
1719 public void tickClockTest() {
1720 Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
1721 Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
1722 Flux.merge(fastClock, slowClock).subscribe(System.out::println);
1723 TimeUnit.SECONDS.sleep(5);
1724 }
1725
1726 @Test
1727 @SneakyThrows
1728 public void tickMergeClockTest() {
1729 Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
1730 Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
1731 Flux clock = Flux.merge(slowClock, fastClock);
1732 Flux feed = Flux.interval(Duration.ofSeconds(1)).map(tick -> LocalTime.now());
1733 clock.withLatestFrom(feed, (tick, time) -> tick + " " + time).subscribe(System.out::println);
1734 TimeUnit.SECONDS.sleep(15);
1735 }
1736
1737 @Test
1738 @SneakyThrows
1739 public void tickZipClockTest() {
1740 Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
1741 Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
1742 fastClock.zipWith(slowClock, (tick, time) -> tick + " " + time).subscribe(System.out::println);
1743 TimeUnit.SECONDS.sleep(5);
1744 }
1745
1746 @Test
1747 @SneakyThrows
1748 public void emitterTest() {
1749 MyFeed myFeed = new MyFeed();
1750 Flux feedFlux = Flux.create(emmiter -> {
1751 myFeed.register(new MyListener() {
1752 @Override
1753 public void priceTick(String msg) {
1754 emmiter.next(msg);
1755 }
1756
1757 @Override
1758 public void error(Throwable error) {
1759 emmiter.error(error);
1760 }
1761 });
1762 }, FluxSink.OverflowStrategy.LATEST);
1763 feedFlux.subscribe(System.out::println);
1764 TimeUnit.SECONDS.sleep(15);
1765 System.out.println("Sending message!");
1766 for (int i = 0; i < 10; i++) {
1767 myFeed.sendMessage("HELLO_" + i);
1768 }
1769 }
1770
1771 /**
1772 * ********************************************************************
1773 * cancel subscription
1774 * ********************************************************************
1775 */
1776 @Test
1777 void monoCancelSubscriptionTest() {
1778 Mono<String> helloMono = Mono.just("Jack")
1779 .log()
1780 .map(String::toUpperCase);
1781 helloMono.subscribe(s -> {
1782 log.info("Got: {}", s);
1783 },
1784 Throwable::printStackTrace,
1785 () -> log.info("Finished"),
1786 Subscription::cancel
1787 );
1788 }
1789
1790 /**
1791 * ********************************************************************
1792 * cancel subscription after n elements
1793 * ********************************************************************
1794 */
1795 @Test
1796 void monoCompleteSubscriptionRequestBoundedTest() {
1797 //Jill wont be fetched as subscription will be cancelled after 2 elements
1798 Flux<String> namesMono = Flux.just("Jack", "Jane", "Jill")
1799 .log()
1800 .map(String::toUpperCase);
1801 namesMono.subscribe(s -> {
1802 log.info("Got: {}", s);
1803 },
1804 Throwable::printStackTrace,
1805 () -> log.info("Finished"),
1806 subscription -> subscription.request(2));
1807 }
1808
1809 /**
1810 * ********************************************************************
1811 * backpressure
1812 * ********************************************************************
1813 */
1814 @Test
1815 void fluxBackPressureTest() {
1816 Flux<Integer> fluxNumber = Flux.range(1, 5).log();
1817
1818 //Fetches 2 at a time.
1819 fluxNumber.subscribe(new BaseSubscriber<>() {
1820 private int count = 0;
1821 private final int requestCount = 2;
1822
1823 @Override
1824 protected void hookOnSubscribe(Subscription subscription) {
1825 request(requestCount);
1826 }
1827
1828 @Override
1829 protected void hookOnNext(Integer value) {
1830 count++;
1831 if (count >= requestCount) {
1832 count = 0;
1833 log.info("requesting next batch!");
1834 request(requestCount);
1835 }
1836 }
1837 });
1838 }
1839
1840 /**
1841 * ********************************************************************
1842 * onBackpressureDrop - fetches all in unbounded request, but stores in internal queue, drops elements not used
1843 * ********************************************************************
1844 */
1845 @Test
1846 void fluxBackPressureDropTest() {
1847 Flux<Integer> fluxNumber = Flux.range(1, 15).log();
1848
1849 //Fetches 2 at a time.
1850 fluxNumber
1851 .onBackpressureDrop(item -> {
1852 log.info("Dropped {}", item);
1853 })
1854 .subscribe(new BaseSubscriber<>() {
1855 private int count = 0;
1856 private final int requestCount = 2;
1857 private int batch = 0;
1858
1859 @Override
1860 protected void hookOnSubscribe(Subscription subscription) {
1861 request(requestCount);
1862 }
1863
1864 @Override
1865 protected void hookOnNext(Integer value) {
1866 if (batch > 2) {
1867 return;
1868 }
1869 count++;
1870 if (count >= requestCount) {
1871 count = 0;
1872 batch++;
1873 log.info("requesting next batch {}", batch);
1874 request(requestCount);
1875 }
1876
1877 }
1878 });
1879 }
1880
1881 /**
1882 * ********************************************************************
1883 * onBackpressureBuffer - fetches all in unbounded request, but stores in internal queue, but doesnt drop unused items
1884 * ********************************************************************
1885 */
1886 @Test
1887 void fluxBackPressureBuffetTest() {
1888 Flux<Integer> fluxNumber = Flux.range(1, 15).log();
1889
1890 //Fetches 2 at a time.
1891 fluxNumber
1892 .onBackpressureBuffer()
1893 .subscribe(new BaseSubscriber<>() {
1894 private int count = 0;
1895 private final int requestCount = 2;
1896 private int batch = 0;
1897
1898 @Override
1899 protected void hookOnSubscribe(Subscription subscription) {
1900 request(requestCount);
1901 }
1902
1903 @Override
1904 protected void hookOnNext(Integer value) {
1905 if (batch > 2) {
1906 return;
1907 }
1908 count++;
1909 if (count >= requestCount) {
1910 count = 0;
1911 batch++;
1912 log.info("requesting next batch {}", batch);
1913 request(requestCount);
1914 }
1915
1916 }
1917 });
1918 }
1919
1920 /**
1921 * ********************************************************************
1922 * onBackpressureError - To identify if receiver is overrun by items as producer is producing more elements than can be processed.
1923 * ********************************************************************
1924 */
1925 @Test
1926 void fluxBackPressureOnErrorTest() {
1927 Flux<Integer> fluxNumber = Flux.range(1, 15).log();
1928
1929 //Fetches 2 at a time.
1930 fluxNumber
1931 .onBackpressureError()
1932 .subscribe(new BaseSubscriber<>() {
1933 private int count = 0;
1934 private final int requestCount = 2;
1935 private int batch = 0;
1936
1937 @Override
1938 protected void hookOnSubscribe(Subscription subscription) {
1939 request(requestCount);
1940 }
1941
1942 @Override
1943 protected void hookOnError(Throwable throwable) {
1944 log.error("Error thrown is: {}", throwable.getMessage());
1945 }
1946
1947 @Override
1948 protected void hookOnNext(Integer value) {
1949 if (batch > 2) {
1950 return;
1951 }
1952 count++;
1953 if (count >= requestCount) {
1954 count = 0;
1955 batch++;
1956 log.info("requesting next batch {}", batch);
1957 request(requestCount);
1958 }
1959
1960 }
1961 });
1962 }
1963
1964 /**
1965 * ********************************************************************
1966 * backpressure - limit rate
1967 * ********************************************************************
1968 */
1969 @Test
1970 void fluxBackPressureLimitRateTest() {
1971 Flux<Integer> fluxNumber = Flux.range(1, 5).log().limitRate(3);
1972 StepVerifier.create(fluxNumber)
1973 .expectNext(1, 2, 3, 4, 5)
1974 .verifyComplete();
1975 }
1976
1977 /**
1978 * ********************************************************************
1979 * hot flux
1980 * ********************************************************************
1981 */
1982 @Test
1983 @SneakyThrows
1984 void connectableFluxTest() {
1985 ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
1986 .delayElements(Duration.ofSeconds(1))
1987 .publish();
1988 connectableFlux.connect();
1989
1990 TimeUnit.SECONDS.sleep(3);
1991 connectableFlux.subscribe(i -> {
1992 log.info("Sub1 Number: {}", i);
1993 });
1994
1995 TimeUnit.SECONDS.sleep(2);
1996 connectableFlux.subscribe(i -> {
1997 log.info("Sub2 Number: {}", i);
1998 });
1999
2000 ConnectableFlux<Integer> connectableFlux2 = Flux.range(1, 10)
2001 .delayElements(Duration.ofSeconds(1))
2002 .publish();
2003 StepVerifier.create(connectableFlux2)
2004 .then(connectableFlux2::connect)
2005 .thenConsumeWhile(i -> i <= 5)
2006 .expectNext(6, 7, 8, 9, 10)
2007 .expectComplete()
2008 .verify();
2009 }
2010
2011 /**
2012 * ********************************************************************
2013 * hot flux - auto connect, min subscribers required before publisher emits
2014 * ********************************************************************
2015 */
2016 @Test
2017 @SneakyThrows
2018 void connectableAutoFluxTest() {
2019 //Hot Flux.
2020 Flux<Integer> connectableFlux = Flux.range(1, 5)
2021 .log()
2022 .delayElements(Duration.ofSeconds(1))
2023 .publish()
2024 .autoConnect(2);
2025
2026 //2 subscribers
2027 StepVerifier.create(connectableFlux)
2028 .then(connectableFlux::subscribe)
2029 .expectNext(1, 2, 3, 4, 5)
2030 .expectComplete()
2031 .verify();
2032 }
2033
2034 /**
2035 * ********************************************************************
2036 * hot flux - ref count, if subscriber count goes down, publisher stops emitting
2037 * ********************************************************************
2038 */
2039 @Test
2040 @SneakyThrows
2041 void connectableRefCountTest() {
2042 //Hot Flux.
2043 Flux<Integer> connectableFlux = Flux.range(1, 15)
2044 .delayElements(Duration.ofSeconds(1))
2045 .doOnCancel(() -> {
2046 log.info("Received cancel");
2047 })
2048 .publish()
2049 .refCount(2);
2050
2051 //Min 2 subscribers required
2052 Disposable subscribe1 = connectableFlux.subscribe(e -> log.info("Sub1: " + e));
2053 Disposable subscribe2 = connectableFlux.subscribe(e -> log.info("Sub2: " + e));
2054 TimeUnit.SECONDS.sleep(3);
2055 subscribe1.dispose();
2056 subscribe2.dispose();
2057 TimeUnit.SECONDS.sleep(5);
2058 }
2059
2060 /**
2061 * ********************************************************************
2062 * defer
2063 * ********************************************************************
2064 */
2065 @Test
2066 @SneakyThrows
2067 void deferTest() {
2068 Mono<Long> just = Mono.just(System.currentTimeMillis());
2069 Mono<Long> deferJust = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
2070
2071 just.subscribe(l -> log.info("Time: {}", l));
2072 TimeUnit.SECONDS.sleep(2);
2073 just.subscribe(l -> log.info("Time: {}", l));
2074
2075 deferJust.subscribe(l -> log.info("Time: {}", l));
2076 TimeUnit.SECONDS.sleep(2);
2077 deferJust.subscribe(l -> log.info("Time: {}", l));
2078
2079 }
2080
2081 /**
2082 * ********************************************************************
2083 * combineLatest - will change order based on time. Rarely used.
2084 * ********************************************************************
2085 */
2086 @Test
2087 void combineLatestTest() {
2088 Flux<String> flux1 = Flux.just("a", "b");
2089 Flux<String> flux2 = Flux.just("c", "d");
2090 Flux<String> flux3 = Flux.combineLatest(flux1, flux2, (s1, s2) -> s1 + s2)
2091 .log();
2092 StepVerifier.create(flux3)
2093 .expectSubscription()
2094 .expectNext("bc", "bd")
2095 .verifyComplete();
2096 }
2097
2098 /**
2099 * ********************************************************************
2100 * onSchedulersHook - if you have to use thread local
2101 * ********************************************************************
2102 */
2103 @Test
2104 public void schedulerHookTest() {
2105 Runnable stringCallable = () -> getName();
2106 Schedulers.onScheduleHook("myHook", runnable -> {
2107 log.info("before scheduled runnable");
2108 return () -> {
2109 log.info("before execution");
2110 runnable.run();
2111 log.info("after execution");
2112 };
2113 });
2114 Mono.just("Hello world")
2115 .subscribeOn(Schedulers.single())
2116 .subscribe(System.out::println);
2117 }
2118
2119 /**
2120 * ********************************************************************
2121 * checkpoint
2122 * ********************************************************************
2123 */
2124 @Test
2125 void checkpointTest() {
2126 Flux flux = Flux.just("Jack", "Jill", "Joe")
2127 .checkpoint("before uppercase")
2128 .map(e -> e.toUpperCase())
2129 .checkpoint("after uppercase")
2130 .filter(e -> e.length() > 3)
2131 .checkpoint("after filter")
2132 .map(e -> new RuntimeException("Custom error!"));
2133 flux.subscribe(System.out::println);
2134 }
2135
2136 /**
2137 * ********************************************************************
2138 * checkpoint
2139 * ********************************************************************
2140 */
2141 @Test
2142 void debugAgentTest() {
2143 ReactorDebugAgent.init();
2144 ReactorDebugAgent.processExistingClasses();
2145 Flux flux = Flux.just("a")
2146 .concatWith(Flux.error(new IllegalArgumentException("My Error!")))
2147 .onErrorMap(ex -> {
2148 log.error("Exception: {}", ex.getMessage());
2149 return new IllegalStateException("New Error!");
2150 });
2151 flux.subscribe(System.out::println);
2152 }
2153
2154 /**
2155 * ********************************************************************
2156 * Flux.generate - programmatically create flux, synchronous
2157 * ********************************************************************
2158 */
2159 @Test
2160 void fluxGenerateTest() {
2161 Flux<Integer> flux = Flux.generate(() -> 1, (state, sink) -> {
2162 sink.next(state * 2);
2163 if (state == 10) {
2164 sink.complete();
2165 }
2166 return state + 1;
2167 });
2168
2169 flux.subscribe(System.out::println);
2170
2171 StepVerifier.create(flux)
2172 .expectNextCount(10)
2173 .verifyComplete();
2174 }
2175
2176 /**
2177 * ********************************************************************
2178 * Flux.create - programmatically create flux, asynchronous
2179 * ********************************************************************
2180 */
2181 @Test
2182 void fluxCreateTest() {
2183 List<String> names = Arrays.asList("jack", "jill");
2184 Flux<String> flux = Flux.create(sink -> {
2185 names.forEach(sink::next);
2186 sink.complete();
2187 });
2188
2189 StepVerifier.create(flux)
2190 .expectNextCount(2)
2191 .verifyComplete();
2192 }
2193
2194 private String getName() {
2195 return "John";
2196 }
2197
2198}
2199
2200class MyFeed {
2201
2202 List<MyListener> listeners = new ArrayList<>();
2203
2204 public void register(MyListener listener) {
2205 listeners.add(listener);
2206 }
2207
2208 public void sendMessage(String msg) {
2209 listeners.forEach(e -> {
2210 e.priceTick(msg);
2211 });
2212 }
2213}
2214
2215interface MyListener {
2216 void priceTick(String msg);
2217
2218 void error(Throwable error);
2219}
Reactor chaining samples
1package com.demo.project83;
2
3import java.util.ArrayList;
4import java.util.List;
5
6import lombok.Builder;
7import lombok.Data;
8import lombok.extern.slf4j.Slf4j;
9import org.junit.jupiter.api.Test;
10import reactor.core.publisher.Mono;
11
12@Slf4j
13public class ReactorChainTest {
14
15 @Test
16 public void test() {
17 CompanyVO request = new CompanyVO();
18 request.setName("Twitter");
19 Mono.just(request)
20 .map(ReactorChainTest::convertToEntity)
21 .zipWith(ReactorChainTest.getNameSuffix(), ReactorChainTest::appendSuffix)
22 .flatMap(ReactorChainTest::addCompanyOwner)
23 .flatMap(ReactorChainTest::appendOrgIdToDepartment)
24 .flatMap(ReactorChainTest::save)
25 .subscribe(System.out::println);
26 }
27
28 private static Company appendSuffix(Company company, String nameSuffix) {
29 company.setName(company.name + " " + nameSuffix);
30 return company;
31 }
32
33 private static Mono<String> getOwnerName() {
34 return Mono.just("Jack");
35 }
36
37 private static Mono<String> getOrgId() {
38 return Mono.just("org1: ");
39 }
40
41 public static Company convertToEntity(CompanyVO companyVO) {
42 Company company = new Company();
43 company.setName(companyVO.getName().toUpperCase());
44 List<Department> departments = new ArrayList<>();
45 departments.add(Department.builder().name("department 1").build());
46 company.setDepartments(departments);
47 return company;
48 }
49
50 public static Mono<Company> save(Company company) {
51 log.info("Saved to db!");
52 return Mono.just(company);
53 }
54
55 public static Mono<Company> appendOrgIdToDepartment(Company company) {
56 return getOrgId().map(e -> {
57 company.getDepartments().forEach(d -> d.setName(e + " " + d.getName()));
58 return company;
59 });
60 }
61
62 public static Mono<String> getNameSuffix() {
63 return Mono.just(".Inc");
64 }
65
66 public static Mono<Company> addCompanyOwner(Company company) {
67 return getOwnerName().map(e -> {
68 company.setOwner(e);
69 return company;
70 });
71 }
72}
73
74@Data
75class CompanyVO {
76 String name;
77}
78
79@Data
80class Company {
81 String name;
82 List<Department> departments;
83 String owner;
84}
85
86@Data
87@Builder
88class Department {
89 String name;
90}
Reactor object samples
1package com.demo.project83;
2
3import static org.junit.jupiter.api.Assertions.assertEquals;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.stream.Collectors;
8
9import lombok.AllArgsConstructor;
10import lombok.Builder;
11import lombok.Data;
12import lombok.NoArgsConstructor;
13import lombok.RequiredArgsConstructor;
14import lombok.extern.slf4j.Slf4j;
15import org.junit.jupiter.api.BeforeEach;
16import org.junit.jupiter.api.Test;
17import reactor.core.publisher.Flux;
18import reactor.core.publisher.GroupedFlux;
19import reactor.core.publisher.Mono;
20import reactor.test.StepVerifier;
21import reactor.util.function.Tuple2;
22import reactor.util.function.Tuples;
23
24@Slf4j
25public class ReactorObjectTest {
26
27 Flux<ProjectDTO> fluxFromRequest;
28 Flux<ProjectEntity> fluxFromDb;
29
30 @BeforeEach
31 public void setup() {
32 fluxFromRequest = Flux.just(
33 ProjectDTO.builder().name("p3").build(),
34 ProjectDTO.builder().name("p1").build(),
35 ProjectDTO.builder().name("p5").build()
36 );
37
38 fluxFromDb = Flux.just(
39 ProjectEntity.builder().entityName("p5").build(),
40 ProjectEntity.builder().entityName("p4").build(),
41 ProjectEntity.builder().entityName("p1").build(),
42 ProjectEntity.builder().entityName("p2").build()
43 );
44 }
45
46 @Test
47 void fluxIntersectCommonApproach1() {
48 Flux<ProjectEntity> commonFlux = fluxFromDb.filter(f -> {
49 //Inefficient
50 //Not for live stream or stream that can be subscribed only once.
51 //toSteam converts your non-blocking asynchronous flux to a blocking stream API which will impact performance
52 return fluxFromRequest.toStream().anyMatch(e -> e.getName().equals(f.getEntityName()));
53 });
54 commonFlux.subscribe(System.out::println);
55 StepVerifier.create(commonFlux)
56 .expectNext(ProjectEntity.builder().entityName("p5").build())
57 .expectNext(ProjectEntity.builder().entityName("p1").build())
58 .verifyComplete();
59 }
60
61 @Test
62 void fluxIntersectCommonApproach2() {
63 Flux<ProjectEntity> commonFlux = fluxFromRequest
64 .map(dto -> dto.getName())
65 .collect(Collectors.toSet())
66 .flatMapMany(set -> {
67 return fluxFromDb
68 //Filter out matching
69 //Limitation is that you can only compare 1 value collected in set.
70 .filter(t -> set.contains(t.getEntityName()));
71 });
72 commonFlux.subscribe(System.out::println);
73 StepVerifier.create(commonFlux)
74 .expectNext(ProjectEntity.builder().entityName("p5").build())
75 .expectNext(ProjectEntity.builder().entityName("p1").build())
76 .verifyComplete();
77 }
78
79 @Test
80 void fluxIntersectCommonApproach3() {
81 Flux<ProjectEntity> commonFlux = fluxFromDb.join(fluxFromRequest, s -> Flux.never(), s -> Flux.never(), Tuples::of)
82 //Filter out matching
83 .filter(t -> t.getT1().getEntityName().equals(t.getT2().getName()))
84 //Revert to single value
85 .map(Tuple2::getT1)
86 //Remove duplicates, if any
87 .groupBy(f -> f)
88 .map(GroupedFlux::key);
89 commonFlux.subscribe(System.out::println);
90 StepVerifier.create(commonFlux)
91 .expectNext(ProjectEntity.builder().entityName("p1").build())
92 .expectNext(ProjectEntity.builder().entityName("p5").build())
93 .verifyComplete();
94 }
95
96 @Test
97 void postGetAllTest() {
98 DbService service = new DbService();
99 Flux<Post> postFlux = service.getAllPosts()
100 .flatMap(post -> {
101 Mono<List<Comment>> commentMono = service.getCommentByPostId(post.id).collectList();
102 return commentMono.map(comments -> Post.builder()
103 .id(post.id)
104 .message(post.message)
105 .user(post.user)
106 .comments(comments)
107 .build());
108 });
109 StepVerifier.create(postFlux)
110 .assertNext(post -> {
111 assertEquals("post 1", post.getMessage());
112 assertEquals(1, post.getComments().size());
113 })
114 .assertNext(post -> {
115 assertEquals("post 2", post.getMessage());
116 assertEquals(2, post.getComments().size());
117 })
118 .verifyComplete();
119 }
120
121 @Test
122 void postGetByIdTest() {
123 DbService service = new DbService();
124 Mono<List<Comment>> commentMono = service.getCommentByPostId(1l).collectList();
125 Mono<Post> getByIdMono = service.getPostById(1l).zipWith(commentMono, (post, comments) -> {
126 return Post.builder()
127 .id(post.id)
128 .message(post.message)
129 .user(post.user)
130 .comments(comments)
131 .build();
132 });
133 StepVerifier.create(getByIdMono)
134 .assertNext(post -> {
135 assertEquals("post 1", post.getMessage());
136 assertEquals(1, post.getComments().size());
137 })
138 .verifyComplete();
139 }
140
141}
142
143@Data
144@Builder
145class ProjectDTO {
146 String name;
147 String someExtraField;
148}
149
150@Data
151@Builder
152class ProjectEntity {
153 String entityName;
154 String dbField;
155}
156
157@Data
158@AllArgsConstructor
159@NoArgsConstructor
160@Builder
161class Post {
162 Long id;
163 String message;
164 String user;
165 @Builder.Default
166 List<Comment> comments = new ArrayList<>();
167}
168
169@Data
170@AllArgsConstructor
171@RequiredArgsConstructor
172@Builder
173class Comment {
174 Long id;
175 Long postId;
176 String comment;
177 String user;
178}
179
180class DbService {
181 Flux<Post> postFlux = Flux.fromIterable(List.of(
182 Post.builder().id(1l).message("post 1").user("jack").build(),
183 Post.builder().id(2l).message("post 2").user("jill").build()));
184
185 Flux<Comment> commentFlux = Flux.fromIterable(List.of(
186 Comment.builder().id(1l).postId(1l).comment("comment 1").user("adam").build(),
187 Comment.builder().id(2l).postId(2l).comment("comment 2").user("jane").build(),
188 Comment.builder().id(3l).postId(2l).comment("comment 3").user("raj").build()));
189
190 //Get all posts
191 Flux<Post> getAllPosts() {
192 return postFlux;
193 }
194
195 Mono<Post> getPostById(long id) {
196 //Take the first element in the flux.
197 return postFlux.filter(e -> e.id == id).next();
198 }
199
200 //Get all reviews associated with the post.
201 Flux<Comment> getCommentByPostId(long id) {
202 return commentFlux.filter(e -> e.postId == id);
203 }
204}
References
comments powered by Disqus