Spring Reactor - Basics
Overview
Reactive programming examples that show how to use Spring Reactor.
Github: https://github.com/gitorko/project83
Spring Reactor
Spring Reactor is a library for building non-blocking, reactive applications in Java. Reactor is used in Spring WebFlux, which is the reactive web framework included in Spring 5.
Features
- Reactive Streams: Reactor is based on the Reactive Streams specification, which defines a standard for asynchronous stream processing with non-blocking backpressure.
- Mono and Flux: Mono represents a single value or an empty result (similar to Optional). Flux represents a stream of 0 to N elements.
- Functional API: Reactor provides a rich set of operators that allow you to manipulate, transform, and compose reactive streams in a functional style.
- Non-blocking: Reactor is designed to work in a non-blocking manner, making it suitable for applications that need to handle a large number of concurrent I/O operations.
- Backpressure: Reactor supports backpressure, a mechanism to ensure that a producer does not overwhelm a consumer with too much data.
Code
1package com.demo.project83;
2
3import static com.demo.project83.common.HelperUtil.getCustomer;
4import static com.demo.project83.common.HelperUtil.getCustomers;
5import static com.demo.project83.common.HelperUtil.getName;
6import static org.assertj.core.api.Assertions.assertThat;
7import static org.junit.jupiter.api.Assertions.assertEquals;
8
9import java.nio.file.Files;
10import java.nio.file.Path;
11import java.nio.file.Paths;
12import java.time.Duration;
13import java.time.LocalTime;
14import java.util.Arrays;
15import java.util.Collection;
16import java.util.HashMap;
17import java.util.List;
18import java.util.Map;
19import java.util.Optional;
20import java.util.UUID;
21import java.util.concurrent.Callable;
22import java.util.concurrent.CountDownLatch;
23import java.util.concurrent.FutureTask;
24import java.util.concurrent.TimeUnit;
25import java.util.concurrent.atomic.AtomicLong;
26import java.util.function.Consumer;
27import java.util.function.Function;
28import java.util.function.Supplier;
29import java.util.stream.Collectors;
30import java.util.stream.IntStream;
31import java.util.stream.Stream;
32
33import com.demo.project83.common.CompanyVO;
34import com.demo.project83.common.Customer;
35import com.demo.project83.common.Employee;
36import com.demo.project83.common.HelperUtil;
37import com.demo.project83.common.MyFeed;
38import com.demo.project83.common.MyListener;
39import lombok.SneakyThrows;
40import lombok.extern.slf4j.Slf4j;
41import org.junit.jupiter.api.Assertions;
42import org.junit.jupiter.api.BeforeAll;
43import org.junit.jupiter.api.Test;
44import org.reactivestreams.Subscription;
45import org.springframework.data.domain.PageImpl;
46import org.springframework.data.domain.PageRequest;
47import reactor.blockhound.BlockHound;
48import reactor.blockhound.BlockingOperationError;
49import reactor.core.Disposable;
50import reactor.core.Exceptions;
51import reactor.core.publisher.BaseSubscriber;
52import reactor.core.publisher.ConnectableFlux;
53import reactor.core.publisher.Flux;
54import reactor.core.publisher.FluxSink;
55import reactor.core.publisher.GroupedFlux;
56import reactor.core.publisher.Mono;
57import reactor.core.publisher.ParallelFlux;
58import reactor.core.scheduler.Schedulers;
59import reactor.test.StepVerifier;
60import reactor.test.scheduler.VirtualTimeScheduler;
61import reactor.tools.agent.ReactorDebugAgent;
62import reactor.util.function.Tuple2;
63import reactor.util.function.Tuples;
64import reactor.util.retry.Retry;
65import reactor.util.retry.RetryBackoffSpec;
66
67/**
68 * Reactive Streams Specification
69 * 1. Asynchronous
70 * 2. Non-Blocking
71 * 3. Backpressure
72 *
73 * Publisher (Mono/Flux)
74 * - subscribe (data source, db, remote service)
75 * Subscriber
76 * - onSubscribe
77 * - onNext
78 * - onError
79 * - onComplete
80 * Subscription
81 * - request
82 * - cancel
83 * Processor - Publisher + Subscriber
84 *
85 * Spring reactor is a Push + Pull data flow model
86 *
87 * Subscribers request for data. Publishers provide data
88 * subscribers (downstream) and publishers (upstream)
89 */
90@Slf4j
91public class ReactorTest {
92
93 @BeforeAll
94 public static void init() {
95 BlockHound.install();
96 }
97
98 /**
99 * ********************************************************************
100 * Mono
101 * ********************************************************************
102 */
103
104 @Test
105 void test_stepVerifier() {
106 Mono.just("jack")
107 .as(StepVerifier::create)
108 .expectNext("jack")
109 .verifyComplete();
110 }
111
112 @Test
113 void test_mono() {
114 //justOrEmpty
115 Mono<String> mono = Mono.just("jack");
116 mono.subscribe(System.out::println);
117 StepVerifier.create(mono)
118 .expectNext("jack")
119 .verifyComplete();
120 }
121
122 @Test
123 void test_justOrEmpty() {
124 //justOrEmpty
125 Mono<String> mono = Mono.justOrEmpty("jack");
126 mono.subscribe(System.out::println);
127 StepVerifier.create(mono)
128 .expectNext("jack")
129 .verifyComplete();
130 }
131
132 @Test
133 void test_justOrEmpty_null() {
134 //Note: Reactive Streams do not accept null values
135 Mono<String> mono = Mono.justOrEmpty(null);
136 mono.subscribe(System.out::println);
137 StepVerifier.create(mono)
138 .expectNextCount(0)
139 .verifyComplete();
140 }
141
142 /**
143 * ********************************************************************
144 * log
145 * request(unbounded)
146 * default subscribe requests unbounded, all elements are requested
147 * ********************************************************************
148 */
149 @Test
150 void test_log() {
151 //Note: Use log to look at transitions.
152 Mono<String> mono = Mono.just("jack")
153 .log();
154 mono.subscribe(s -> {
155 log.info("Got: {}", s);
156 });
157 StepVerifier.create(mono)
158 .expectNext("jack")
159 .verifyComplete();
160 }
161
162 /**
163 * ********************************************************************
164 * flux
165 * ********************************************************************
166 */
167 @Test
168 void test_flux() {
169 Flux flux = Flux.just("jack", "raj");
170 flux.subscribe(System.out::println);
171 StepVerifier.create(flux)
172 .expectNext("jack", "raj")
173 .verifyComplete();
174 }
175
176 /**
177 * ********************************************************************
178 * Avoid blocking operations that hold thread
179 * ********************************************************************
180 */
181 @Test
182 void test_delayElements() {
183 Flux flux = Flux.just("jack", "raj")
184 .map(e -> {
185 log.info("Received: {}", e);
186 //Bad idea to do Thread.sleep or any blocking call.
187 //Use delayElements.
188 return e;
189 }).delayElements(Duration.ofSeconds(1));
190 flux.subscribe(System.out::println);
191 //Test will wait for 1 second
192 StepVerifier.create(flux)
193 .expectNext("jack", "raj")
194 .verifyComplete();
195 }
196
197 @Test
198 void test_delayElements_virtualTime() {
199 VirtualTimeScheduler.getOrSet();
200 Flux flux = Flux.just("jack", "raj")
201 .map(e -> {
202 log.info("Received: {}", e);
203 //Bad idea to do Thread.sleep or any blocking call.
204 //Use delayElements.
205 return e;
206 }).delayElements(Duration.ofDays(1));
207 flux.subscribe(System.out::println);
208 //Use virtual time as test cant wait for 1 day
209 StepVerifier.withVirtualTime(() -> flux)
210 .thenAwait(Duration.ofDays(2))
211 .expectNext("jack", "raj")
212 .verifyComplete();
213 }
214
215 /**
216 * ********************************************************************
217 * block
218 * Never use .block() as it blocks the thread.
219 * Can we used in tests but not in main code.
220 * ********************************************************************
221 */
222 @Test
223 void test_block() {
224 String name = Mono.just("jack")
225 .block();
226 System.out.println(name);
227 }
228
229 /**
230 * ********************************************************************
231 * flux from array, list, stream
232 * ********************************************************************
233 */
234 @Test
235 void test_fromArray() {
236 Integer[] arr = {1, 2, 3, 4, 5};
237 Flux<Integer> flux = Flux.fromArray(arr);
238 flux.subscribe(System.out::println);
239 StepVerifier.create(flux)
240 .expectNext(1, 2, 3, 4, 5)
241 .verifyComplete();
242 }
243
244 @Test
245 void test_fromIterable() {
246 Flux<String> flux = Flux.fromIterable(List.of("jack", "raj"));
247 StepVerifier.create(flux)
248 .expectNext("jack", "raj")
249 .verifyComplete();
250 }
251
252 @Test
253 void test_fromStream() {
254 Flux<Integer> flux = Flux.fromStream(() -> List.of(1, 2, 3, 4, 5).stream());
255 StepVerifier.create(flux)
256 .expectNext(1, 2, 3, 4, 5)
257 .verifyComplete();
258 }
259
260 /**
261 * ********************************************************************
262 * flux range
263 * ********************************************************************
264 */
265 @Test
266 public void test_range() {
267 Flux<Integer> flux = Flux.range(1, 5);
268 flux.subscribe(n -> {
269 log.info("number: {}", n);
270 });
271 StepVerifier.create(flux)
272 .expectNext(1, 2, 3, 4, 5)
273 .verifyComplete();
274 }
275
276 /**
277 * ********************************************************************
278 * map - synchronous by nature
279 * ********************************************************************
280 */
281 @Test
282 public void test_map() {
283 Flux<String> flux1 = Flux.just("jack", "raj")
284 .map(String::toUpperCase);
285 StepVerifier
286 .create(flux1)
287 .expectNext("JACK", "RAJ")
288 .verifyComplete();
289
290 Flux<Integer> flux2 = Flux.range(3, 2)
291 .map(i -> i + 100);
292 flux2.subscribe(System.out::println);
293 StepVerifier.create(flux2)
294 .expectNext(103, 104)
295 .verifyComplete();
296 }
297
298 /**
299 * ********************************************************************
300 * flatMap - transform object 1-1 or 1-N in asynchronous fashion, returns back Mono/Flux. Use when there is delay/IO involved.
301 * map - transform an object 1-1 in fixed time in synchronous fashion. Use when there is no delay/IO involved.
302 *
303 * flatMap - processing is concurrent
304 * so all threads can run at same time not guarantee of being sequential.
305 * ********************************************************************
306 */
307 @Test
308 void test_flatMap() {
309 Flux flux1 = Flux.just("jack", "raj")
310 .flatMap(HelperUtil::capitalizeReactive);
311 flux1.subscribe(System.out::println);
312 //No guarantee of order, jack can come first or raj can come first.
313 StepVerifier.create(flux1)
314 .expectSubscription()
315 .expectNextCount(2)
316 .verifyComplete();
317
318 //capitalize will happen in blocking fashion. If this function takes long or does I/O then it will be blocking
319 //Use map only when there is no IO involved in the function
320 Flux flux2 = Flux.just("jack", "raj")
321 .map(HelperUtil::capitalize);
322 flux2.subscribe(System.out::println);
323 StepVerifier.create(flux2)
324 .expectNext("JACK")
325 .expectNext("RAJ")
326 .verifyComplete();
327
328 Flux flux3 = Flux.fromIterable(getCustomers())
329 .flatMap(HelperUtil::capitalizeCustomerName);
330 flux1.subscribe(System.out::println);
331 //No guarantee of order
332 StepVerifier.create(flux3)
333 .expectNextCount(5)
334 .verifyComplete();
335 }
336
337 /**
338 * Here flatMap will run one after other as there is just 1 thread allocated for it.
339 * You can also look at concatMap to do the same
340 */
341 @Test
342 void test_flatMap_nonConcurrent() {
343 Flux<Integer> flux = Flux.range(1, 10)
344 .map(i -> i)
345 .flatMap(i -> {
346 return Mono.just(i);
347 }, 1);
348 flux.subscribe(System.out::println);
349 StepVerifier.create(flux)
350 .expectSubscription()
351 .expectNextCount(10)
352 .verifyComplete();
353 }
354
355 /**
356 * ********************************************************************
357 * flatMap - object modification
358 * ********************************************************************
359 */
360 @Test
361 void test_objectModification() {
362 //Modification of object in chain - done via flatMap
363 //Ideally create a new object instead of modifying the existing object.
364 Mono<Customer> mono = Mono.just(getCustomer())
365 .flatMap(e -> {
366 e.setCity("paris");
367 return Mono.just(e);
368 });
369 StepVerifier.create(mono)
370 .assertNext(e -> {
371 assertThat(e.getCity()).isEqualTo("paris");
372 })
373 .verifyComplete();
374 }
375
376 @Test
377 void test_objectModification_zipWith() {
378 //Modification of object in chain - done via zipWith
379 //The 2nd argument for zipWith is a combinator function that determines how the 2 mono are zipped
380 Mono<Customer> mono = Mono.just(getCustomer())
381 .zipWith(Mono.just("paris"), HelperUtil::changeCity);
382 StepVerifier.create(mono)
383 .assertNext(e -> {
384 assertThat(e.getCity()).isEqualTo("paris");
385 })
386 .verifyComplete();
387 }
388
389 /**
390 * ********************************************************************
391 * distinct
392 * ********************************************************************
393 */
394 @Test
395 void test_distinct_flux() {
396 Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe", "Jack", "Jill", "jack"))
397 .map(String::toUpperCase)
398 .distinct();
399 flux.subscribe(System.out::println);
400 StepVerifier.create(flux)
401 .expectNext("JACK", "JOE", "JILL")
402 .verifyComplete();
403 }
404
405 /**
406 * ********************************************************************
407 * concatMap - works only on flux, same as flatMap but order is preserved, concatMap takes more time but ordering is preserved.
408 * flatMap - Takes less time but ordering is lost.
409 * ********************************************************************
410 */
411 @Test
412 @SneakyThrows
413 void test_concatMap() {
414 Flux flux1 = Flux.just("jack", "raj")
415 .concatMap(HelperUtil::capitalizeReactive);
416 flux1.subscribe(System.out::println);
417 //Guarantee of order, jack will come first then raj.
418 StepVerifier.create(flux1)
419 .expectSubscription()
420 .expectNext("JACK", "RAJ")
421 .verifyComplete();
422
423 Flux flux2 = Flux.fromIterable(getCustomers())
424 .concatMap(HelperUtil::capitalizeCustomerName);
425 flux1.subscribe(System.out::println);
426 StepVerifier.create(flux2)
427 .expectSubscription()
428 .expectNextCount(5)
429 .verifyComplete();
430 }
431
432 /**
433 * ********************************************************************
434 * flatMapMany - similar to flatMap but flattens the flux
435 * ********************************************************************
436 */
437 @Test
438 void test_flatMapMany() {
439 Flux<String> flux1 = Mono.just("the quick brown fox jumps over the lazy dog")
440 .flatMapMany(e -> Flux.fromArray(e.toUpperCase().split("")))
441 .distinct()
442 .sort();
443 flux1.subscribe(System.out::println);
444 //26 letters in the alphabet
445 StepVerifier.create(flux1)
446 .expectNextCount(26)
447 .expectComplete();
448
449 Flux<Integer> flux2 = Mono.just(List.of(1, 2, 3))
450 .flatMapMany(it -> Flux.fromIterable(it));
451 flux2.subscribe(System.out::println);
452 StepVerifier
453 .create(flux2)
454 .expectNext(1, 2, 3)
455 .verifyComplete();
456
457 Flux flux3 = Mono.just(getCustomers())
458 .flatMapMany(e -> HelperUtil.capitalizeCustomerName(e));
459 flux1.subscribe(System.out::println);
460 StepVerifier.create(flux3)
461 .expectSubscription()
462 .expectNextCount(5)
463 .verifyComplete();
464 }
465
466 /**
467 * ********************************************************************
468 * flatMapIterable - convert mono of list to flux
469 * ********************************************************************
470 */
471 @Test
472 void test_flatMapIterable() {
473 Mono<List<Integer>> mono = Mono.just(Arrays.asList(1, 2, 3));
474 Flux<Integer> flux = mono.flatMapIterable(list -> list);
475 flux.subscribe(System.out::println);
476 StepVerifier
477 .create(flux)
478 .expectNext(1, 2, 3)
479 .verifyComplete();
480 }
481
482 /**
483 * ********************************************************************
484 * flatMapIterable - convert mono of map to flux
485 * ********************************************************************
486 */
487 @Test
488 void test_flatMapIterable2() {
489 Mono<Map<String, String>> mono = Mono.just(Map.of("foo", "bar"));
490 Flux<Map.Entry<String, String>> flux = mono.flatMapIterable(list -> list.entrySet());
491 flux.subscribe(System.out::println);
492 StepVerifier
493 .create(flux)
494 .expectNext(Map.entry("foo", "bar"))
495 .verifyComplete();
496 }
497
498 /**
499 * ********************************************************************
500 * transform - accepts a Function functional interface. Used when similar transform is used in many places
501 * input is flux/mono
502 * output is flux/mono
503 * takes a flux/mono and returns a flux/mono
504 * ********************************************************************
505 */
506 @Test
507 void test_transform() {
508 //Function defines input and output
509 Function<Flux<String>, Flux<String>> upperCaseFunction = name -> name.map(String::toUpperCase);
510 Flux<String> flux = Flux.fromIterable(List.of("Jack", "Joe"))
511 .transform(upperCaseFunction);
512 flux.subscribe(System.out::println);
513 StepVerifier
514 .create(flux)
515 .expectNext("JACK", "JOE")
516 .verifyComplete();
517 }
518
519 /**
520 * ********************************************************************
521 * switchIfEmpty - similar to defaultIfEmpty but return flux/mono
522 * defaultIfEmpty - return a fixed value.
523 * ********************************************************************
524 */
525 @Test
526 @SneakyThrows
527 void test_defaultIfEmpty() {
528 Flux<Object> flux1 = Flux.empty()
529 .defaultIfEmpty("empty")
530 .log();
531 StepVerifier.create(flux1)
532 .expectNext("empty")
533 .expectComplete()
534 .verify();
535
536 Flux<Object> flux2 = Flux.empty()
537 .switchIfEmpty(Flux.just("empty"))
538 .log();
539 StepVerifier.create(flux2)
540 .expectNext("empty")
541 .expectComplete()
542 .verify();
543 }
544
545 @Test
546 void test_optional() {
547 var mono1 = getHello(true)
548 .defaultIfEmpty("NONE");
549 StepVerifier.create(mono1)
550 .expectNext("HELLO")
551 .verifyComplete();
552
553 var mono2 = getHello(false)
554 .defaultIfEmpty("NONE");
555 StepVerifier.create(mono2)
556 .expectNext("NONE")
557 .verifyComplete();
558
559 var mono3 = getOptionalHello(true)
560 .filter(Optional::isPresent)
561 .map(Optional::get);
562 StepVerifier.create(mono3)
563 .expectNext("HELLO")
564 .verifyComplete();
565
566 var mono4 = getOptionalHello(false)
567 .filter(Optional::isPresent)
568 .map(Optional::get);
569 StepVerifier.create(mono4)
570 .expectNextCount(0)
571 .verifyComplete();
572 }
573
574 private Mono<String> getHello(Boolean flag) {
575 if (flag) {
576 return Mono.just("HELLO");
577 } else {
578 return Mono.empty();
579 }
580 }
581
582 private Mono<Optional<String>> getOptionalHello(Boolean flag) {
583 if (flag) {
584 return Mono.just(Optional.of("HELLO"));
585 } else {
586 return Mono.just(Optional.empty());
587 }
588 }
589
590 /**
591 * ********************************************************************
592 * switchIfEmpty with Optional
593 * ********************************************************************
594 */
595 @Test
596 public void test_switchIfEmpty() {
597 Mono<Optional<Customer>> c1 = Mono.justOrEmpty(Optional.empty());
598 Mono<Optional<Customer>> c2 = Mono.just(Optional.of(getCustomer()));
599
600 Mono<Optional<Customer>> mono1 = c1
601 .switchIfEmpty(Mono.just(Optional.of(new Customer())));
602 StepVerifier.create(mono1)
603 .expectNextCount(1)
604 .expectComplete()
605 .verify();
606
607 Mono<Optional<Customer>> mono2 = c2
608 .switchIfEmpty(Mono.just(Optional.empty()));
609 StepVerifier.create(mono2)
610 .expectNextCount(1)
611 .expectComplete()
612 .verify();
613 }
614
615 /**
616 * ********************************************************************
617 * switchIfEmpty - Used as if-else block
618 * ********************************************************************
619 */
620 @Test
621 void test_switchIfEmpty_if_else() {
622 final Customer customer = getCustomer();
623 //No need to use Mono.defer on the switchIfEmpty
624 Mono<String> mono = Mono.just(customer)
625 .flatMap(e -> {
626 if (customer.getCity().equals("bangalore")) {
627 return Mono.just("Timezone:IST");
628 } else {
629 return Mono.empty();
630 }
631 })
632 .switchIfEmpty(Mono.just("Timezone:GMT"));
633
634 StepVerifier.create(mono)
635 .expectNext("Timezone:GMT")
636 .verifyComplete();
637 }
638
639 /**
640 * ********************************************************************
641 * filterWhen - returns Mono
642 * filter - returns object
643 * ********************************************************************
644 */
645 @Test
646 void test_filterWhen() {
647 Flux<String> flux1 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").cache();
648 Flux<String> flux2 = flux1.filterWhen(f -> Mono.just(f.equals("apple")));
649 flux2.subscribe(System.out::println);
650 StepVerifier.create(flux2)
651 .expectNext("apple")
652 .verifyComplete();
653 }
654
655 /**
656 * ********************************************************************
657 * filterWhen - returns Mono
658 * filter - returns object
659 * ********************************************************************
660 */
661 @Test
662 void test_filter() {
663 Flux<String> flux1 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").cache();
664 Flux<String> flux2 = flux1.filter(f -> f.equals("apple"));
665 flux2.subscribe(System.out::println);
666 StepVerifier.create(flux2)
667 .expectNext("apple")
668 .verifyComplete();
669
670 //Get even numbers
671 Flux flux = Flux.just(1, 2, 3, 4, 5)
672 .filter(i -> i % 2 == 0);
673 flux.subscribe(System.out::println);
674 StepVerifier.create(flux)
675 .expectNext(2, 4)
676 .verifyComplete();
677 }
678
679 /**
680 * ********************************************************************
681 * intersect (common) - compare 2 flux for common elements
682 * ********************************************************************
683 */
684 @Test
685 void test_intersect_inefficient() {
686 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
687 //Without cache on flux2 it will subscribe many times.
688 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple").cache();
689 Flux<String> commonFlux = flux1.filter(f -> {
690 //toStream will block so should be avoided.
691 //Inefficient - toStream will block so should be avoided.
692 //Not for live stream or stream that can be subscribed only once.
693 return flux2.toStream().anyMatch(e -> e.equals(f));
694 });
695 commonFlux.subscribe(System.out::println);
696 StepVerifier.create(commonFlux)
697 .expectNext("apple", "orange")
698 .verifyComplete();
699 }
700
701 /**
702 * ********************************************************************
703 * intersect (common) - compare 2 flux for common elements
704 * ********************************************************************
705 */
706 @Test
707 void test_intersect_efficient_1() {
708 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
709 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
710 Flux<String> commonFlux = flux1
711 .collect(Collectors.toSet())
712 .flatMapMany(set -> {
713 return flux2
714 //Filter out matching
715 //Limitation is that you can only compare 1 value collected in set.
716 .filter(t -> set.contains(t));
717 });
718 commonFlux.subscribe(System.out::println);
719 StepVerifier.create(commonFlux)
720 .expectNext("apple", "orange")
721 .verifyComplete();
722 }
723
724 /**
725 * ********************************************************************
726 * intersect (common) - using join operator
727 * ********************************************************************
728 */
729 @Test
730 void test_intersect_efficient_2() {
731 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
732 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
733
734 Flux<String> commonFlux = flux1.join(flux2, s -> Flux.never(), s -> Flux.never(), Tuples::of)
735 //Filter out matching
736 .filter(t -> t.getT1().equals(t.getT2()))
737 //Revert to single value
738 .map(Tuple2::getT1)
739 //Remove duplicates, if any
740 .groupBy(f -> f)
741 .map(GroupedFlux::key);
742 commonFlux.subscribe(System.out::println);
743 StepVerifier.create(commonFlux)
744 .expectNext("apple", "orange")
745 .verifyComplete();
746 }
747
748 @Test
749 void test_intersect_efficient_3() {
750 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
751 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
752
753 Mono<List<String>> monoList1 = flux1.collectList();
754 Mono<List<String>> monoList2 = flux2.collectList();
755
756 Flux<String> commonFlux = Mono.zip(monoList1, monoList2)
757 .map(tuple -> {
758 List<String> list1 = tuple.getT1();
759 List<String> list2 = tuple.getT2();
760 list1.retainAll(list2);
761 return list1;
762 }).flatMapIterable(e -> e);
763
764 commonFlux.subscribe(System.out::println);
765 StepVerifier.create(commonFlux)
766 .expectNext("apple", "orange")
767 .verifyComplete();
768 }
769
770 @Test
771 void test_intersect_efficient_4() {
772 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
773 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
774
775 Flux<String> commonFlux = flux2.filterWhen(element ->
776 flux1.any(e -> e.equals(element))
777 );
778
779 commonFlux.subscribe(System.out::println);
780 StepVerifier.create(commonFlux)
781 .expectNext("apple", "orange")
782 .verifyComplete();
783 }
784
785 @Test
786 void test_intersect_efficient_5() {
787 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
788 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
789
790 Flux<String> commonFlux = flux2.concatMap(element2 ->
791 flux1.filter(element1 -> element1.equals(element2)).take(1)
792 );
793 commonFlux.subscribe(System.out::println);
794 StepVerifier.create(commonFlux)
795 .expectNext("apple", "orange")
796 .verifyComplete();
797 }
798
799 @Test
800 void test_intersect_efficient_6() {
801 Flux<String> flux1 = Flux.just("apple", "orange", "banana");
802 Flux<String> flux2 = Flux.just("apple", "orange", "pumpkin", "papaya", "walnuts", "grapes", "pineapple");
803
804 Flux<String> commonFlux = flux2.flatMap(element2 ->
805 flux1.flatMap(element1 ->
806 element1.equals(element2) ? Flux.just(element1) : Flux.empty()
807 ).take(1)
808 );
809 commonFlux.subscribe(System.out::println);
810 StepVerifier.create(commonFlux)
811 .expectNext("apple", "orange")
812 .verifyComplete();
813 }
814
815 /**
816 * ********************************************************************
817 * startWith - add new element to flux.
818 * ********************************************************************
819 */
820 @Test
821 public void test_startWith() {
822 Flux<Integer> flux1 = Flux.range(1, 3);
823 Flux<Integer> flux2 = flux1.startWith(0);
824 StepVerifier.create(flux2)
825 .expectNext(0, 1, 2, 3)
826 .verifyComplete();
827 }
828
829 /**
830 * ********************************************************************
831 * index
832 * ********************************************************************
833 */
834 @Test
835 void test_index() {
836 //append a number to each element.
837 Flux<Tuple2<Long, String>> flux = Flux
838 .just("apple", "banana", "orange")
839 .index();
840 StepVerifier.create(flux)
841 .expectNext(Tuples.of(0L, "apple"))
842 .expectNext(Tuples.of(1L, "banana"))
843 .expectNext(Tuples.of(2L, "orange"))
844 .verifyComplete();
845 }
846
847 /**
848 * ********************************************************************
849 * takeWhile
850 * ********************************************************************
851 */
852 @Test
853 void test_takeWhile() {
854 Flux<Integer> flux = Flux.range(1, 10);
855 Flux<Integer> takeWhile = flux.takeWhile(i -> i <= 5);
856 StepVerifier
857 .create(takeWhile)
858 .expectNext(1, 2, 3, 4, 5)
859 .verifyComplete();
860 }
861
862 /**
863 * ********************************************************************
864 * skipWhile
865 * ********************************************************************
866 */
867 @Test
868 void test_skipWhile() {
869 Flux<Integer> flux = Flux.range(1, 10);
870 Flux<Integer> skipWhile = flux.skipWhile(i -> i <= 5);
871 StepVerifier
872 .create(skipWhile)
873 .expectNext(6, 7, 8, 9, 10)
874 .verifyComplete();
875 }
876
877 /**
878 * ********************************************************************
879 * collectList - flux to mono of list
880 * ********************************************************************
881 */
882 @Test
883 void test_collectList() {
884 Mono<List<Integer>> flux = Flux
885 .just(1, 2, 3)
886 .collectList();
887 StepVerifier.create(flux)
888 .expectNext(Arrays.asList(1, 2, 3))
889 .verifyComplete();
890 }
891
892 /**
893 * ********************************************************************
894 * collectSortedList- flux to mono of list
895 * ********************************************************************
896 */
897 @Test
898 void test_collectSortedList() {
899 Mono<List<Integer>> listMono2 = Flux
900 .just(5, 2, 4, 1, 3)
901 .collectSortedList();
902 StepVerifier.create(listMono2)
903 .expectNext(Arrays.asList(1, 2, 3, 4, 5))
904 .verifyComplete();
905 }
906
907 /**
908 * ********************************************************************
909 * collectMap
910 * ********************************************************************
911 */
912 @Test
913 void test_collectMap() {
914 Mono<Map<Object, Object>> flux = Flux.just("yellow:banana", "red:apple")
915 .map(item -> item.split(":"))
916 .collectMap(item -> item[0], item -> item[1]);
917
918 Map<Object, Object> map = new HashMap<>();
919 flux.subscribe(map::putAll);
920 map.forEach((key, value) -> System.out.println(key + " -> " + value));
921
922 StepVerifier.create(flux)
923 .expectNext(Map.of("yellow", "banana", "red", "apple"))
924 .verifyComplete();
925 }
926
927 /**
928 * ********************************************************************
929 * collectMultimap
930 * ********************************************************************
931 */
932 @Test
933 void test_collectMultimap() {
934 Mono<Map<String, Collection<String>>> flux = Flux.just("yellow:banana", "red:grapes", "red:apple", "yellow:pineapple")
935 .map(item -> item.split(":"))
936 .collectMultimap(
937 item -> item[0],
938 item -> item[1]);
939 Map<Object, Collection<String>> map = new HashMap<>();
940 flux.subscribe(map::putAll);
941 map.forEach((key, value) -> System.out.println(key + " -> " + value));
942
943 StepVerifier.create(flux)
944 .expectNext(Map.of("red", List.of("grapes", "apple"), "yellow", List.of("banana", "pineapple")))
945 .verifyComplete();
946 }
947
948 /**
949 * ********************************************************************
950 * concat - subscribes to publishers in sequence, order guaranteed, static function
951 * concatWith - subscribes to publishers in sequence, order guaranteed, instance function
952 * ********************************************************************
953 */
954 @Test
955 @SneakyThrows
956 void test_concat() {
957 Flux<String> flux1 = Flux.just("a", "b");
958 Flux<String> flux2 = Flux.just("c", "d");
959 Flux<String> flux3 = Flux.concat(flux1, flux2);
960
961 StepVerifier.create(flux3)
962 .expectSubscription()
963 .expectNext("a", "b", "c", "d")
964 .verifyComplete();
965
966 Flux<String> flux4 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
967 Flux<String> flux5 = Flux.just("c", "d");
968 //Lazy will wait till first flux finishes.
969 Flux<String> flux6 = Flux.concat(flux1, flux2).log();
970
971 StepVerifier.create(flux6)
972 .expectSubscription()
973 .expectNext("a", "b", "c", "d")
974 .verifyComplete();
975 }
976
977 /**
978 * ********************************************************************
979 * concat - subscribes to publishers in sequence, order guaranteed, static function
980 * concatWith - subscribes to publishers in sequence, order guaranteed, instance function
981 * ********************************************************************
982 */
983 @Test
984 @SneakyThrows
985 void test_concatWith() {
986 Flux<String> flux1 = Flux.just("a", "b");
987 Flux<String> flux2 = Flux.just("c", "d");
988 Flux<String> flux3 = flux1.concatWith(flux2);
989 StepVerifier.create(flux3)
990 .expectSubscription()
991 .expectNext("a", "b", "c", "d")
992 .verifyComplete();
993
994 Mono<String> aFlux = Mono.just("a");
995 Mono<String> bFlux = Mono.just("b");
996 Flux<String> stringFlux = aFlux.concatWith(bFlux);
997 stringFlux.subscribe(System.out::println);
998 StepVerifier.create(stringFlux)
999 .expectNext("a", "b")
1000 .verifyComplete();
1001 }
1002
1003 /**
1004 * ********************************************************************
1005 * concatDelayError - When one flux can throw an error
1006 * ********************************************************************
1007 */
1008 @Test
1009 void test_concatDelayError() {
1010 Flux<String> flux1 = Flux.just("a", "b", "c")
1011 .map(s -> {
1012 if (s.equals("b")) {
1013 throw new RuntimeException("error!");
1014 }
1015 return s;
1016 });
1017 Flux<String> flux2 = Flux.just("d", "e", "f");
1018 Flux<String> flux3 = Flux.concatDelayError(flux1, flux2);
1019
1020 StepVerifier.create(flux3)
1021 .expectSubscription()
1022 .expectNext("a", "d", "e", "f")
1023 .expectError()
1024 .verify();
1025 }
1026
1027 /**
1028 * ********************************************************************
1029 * combineLatest - will change order based on time. Rarely used.
1030 * ********************************************************************
1031 */
1032 @Test
1033 void test_combineLatest() {
1034 Flux<String> flux1 = Flux.just("a", "b");
1035 Flux<String> flux2 = Flux.just("c", "d");
1036 Flux<String> flux3 = Flux.combineLatest(flux1, flux2, (s1, s2) -> s1 + s2)
1037 .log();
1038 StepVerifier.create(flux3)
1039 .expectSubscription()
1040 .expectNext("bc", "bd")
1041 .verifyComplete();
1042 }
1043
1044 /**
1045 * ********************************************************************
1046 * merge - subscribes to publishers eagerly, order not guaranteed, static function
1047 * ********************************************************************
1048 */
1049 @Test
1050 @SneakyThrows
1051 void test_merge() {
1052 Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
1053 Flux<String> flux2 = Flux.just("c", "d");
1054 //Eager will not wait till first flux3 finishes.
1055 Flux<String> flux3 = Flux.merge(flux1, flux2);
1056
1057 StepVerifier.create(flux3)
1058 .expectSubscription()
1059 .expectNext("c", "d", "a", "b")
1060 .verifyComplete();
1061 }
1062
1063 /**
1064 * ********************************************************************
1065 * mergeWith - subscribes to publishers in eagerly, order not guaranteed, instance function
1066 * ********************************************************************
1067 */
1068 @Test
1069 @SneakyThrows
1070 void test_mergeWith() {
1071 Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
1072 Flux<String> flux2 = Flux.just("c", "d");
1073 //Eager will not wait till first flux finishes.
1074 Flux<String> flux3 = flux1.mergeWith(flux2);
1075
1076 StepVerifier.create(flux3)
1077 .expectSubscription()
1078 .expectNext("c", "d", "a", "b")
1079 .verifyComplete();
1080
1081 Mono aMono = Mono.just("a");
1082 Mono bMono = Mono.just("b");
1083 Flux flux4 = aMono.mergeWith(bMono);
1084 StepVerifier.create(flux4)
1085 .expectNext("a", "b")
1086 .verifyComplete();
1087 }
1088
1089 /**
1090 * ********************************************************************
1091 * mergeSequential - subscribes to publishers eagerly, result is sequential.
1092 * concat - subscribes to publishers in sequence, result is sequential.
1093 * ********************************************************************
1094 */
1095 @Test
1096 @SneakyThrows
1097 void test_mergeSequential() {
1098 Flux<String> flux1 = Flux.just("a", "b").delayElements(Duration.ofMillis(200));
1099 Flux<String> flux2 = Flux.just("c", "d");
1100 Flux<String> flux3 = Flux.mergeSequential(flux1, flux2, flux1);
1101
1102 StepVerifier.create(flux3)
1103 .expectSubscription()
1104 .expectNext("a", "b", "c", "d", "a", "b")
1105 .verifyComplete();
1106 }
1107
1108 /**
1109 * ********************************************************************
1110 * mergeDelayError - when one flux can throw an error
1111 * ********************************************************************
1112 */
1113 @Test
1114 void test_mergeDelayError() {
1115 Flux<String> flux1 = Flux.just("a", "b")
1116 .map(s -> {
1117 if (s.equals("b")) {
1118 throw new RuntimeException("error");
1119 }
1120 return s;
1121 }).doOnError(e -> log.error("Error: {}", e));
1122
1123 Flux<String> flux2 = Flux.just("c", "d");
1124 Flux<String> flux3 = Flux.mergeDelayError(1, flux1, flux2, flux1);
1125
1126 StepVerifier.create(flux3)
1127 .expectSubscription()
1128 .expectNext("a", "c", "d", "a")
1129 .expectError()
1130 .verify();
1131 }
1132
1133 /**
1134 * ********************************************************************
1135 * zip - subscribes to publishers in eagerly, waits for both flux to emit one element.
1136 * 2-8 flux can be zipped, returns a tuple, Static function
1137 * ********************************************************************
1138 */
1139 @Test
1140 void test_zip() {
1141 Flux<String> flux1 = Flux.just("red", "yellow");
1142 Flux<String> flux2 = Flux.just("apple", "banana");
1143 Flux<String> flux3 = Flux.zip(flux1, flux2)
1144 .map(tuple -> {
1145 return (tuple.getT1() + " " + tuple.getT2());
1146 });
1147 flux3.subscribe(System.out::println);
1148 StepVerifier.create(flux3)
1149 .expectNext("red apple")
1150 .expectNext("yellow banana")
1151 .verifyComplete();
1152
1153 //Third argument is combinator lambda
1154 Flux<Integer> firstFlux = Flux.just(1, 2, 3);
1155 Flux<Integer> secondFlux = Flux.just(10, 20, 30, 40);
1156 //Define how the zip should happen
1157 Flux<Integer> zip = Flux.zip(firstFlux, secondFlux, (num1, num2) -> num1 + num2);
1158 StepVerifier
1159 .create(zip)
1160 .expectNext(11, 22, 33)
1161 .verifyComplete();
1162 }
1163
1164 /**
1165 * ********************************************************************
1166 * zipWith - subscribes to publishers in eagerly, waits for both flux to emit one element.
1167 * 2-8 flux can be zipped, returns a tuple, Instance function
1168 * When 2 different size flux are combined zipWith return the smaller item size new flux.
1169 * ********************************************************************
1170 */
1171 @Test
1172 void test_zipWith() {
1173 Flux<String> flux1 = Flux.just("red", "yellow");
1174 Flux<String> flux2 = Flux.just("apple", "banana");
1175 Flux<String> flux3 = flux1.zipWith(flux2)
1176 .map(tuple -> {
1177 return (tuple.getT1() + " " + tuple.getT2());
1178 });
1179 StepVerifier.create(flux3)
1180 .expectNext("red apple")
1181 .expectNext("yellow banana")
1182 .verifyComplete();
1183
1184 Flux<String> flux4 = Flux.fromIterable(Arrays.asList("apple", "orange", "banana"))
1185 .zipWith(Flux.range(1, 5), (word, line) -> {
1186 return line + ". " + word;
1187 });
1188 StepVerifier.create(flux4)
1189 .expectNext("1. apple")
1190 .expectNext("2. orange")
1191 .expectNext("3. banana")
1192 .verifyComplete();
1193 }
1194
1195 /**
1196 * ********************************************************************
1197 * Cant do zipWith to combine mono & flux.
1198 * Use the operator join
1199 * ********************************************************************
1200 */
1201 @Test
1202 void test_join() {
1203 Mono<String> mono = Mono.just("green");
1204 Flux<String> flux1 = Flux.just("apple", "banana")
1205 .join(mono, s -> Flux.never(), s -> Flux.never(), Tuples::of)
1206 .flatMap(tuple -> {
1207 return Mono.just(tuple.getT2() + " " + tuple.getT1());
1208 });
1209 StepVerifier.create(flux1)
1210 .expectNext("green apple")
1211 .expectNext("green banana")
1212 .verifyComplete();
1213
1214 Flux<String> flux2 = Flux.just("apple", "banana")
1215 .zipWith(mono.cache().repeat())
1216 .flatMap(tuple -> {
1217 return Mono.just(tuple.getT2() + " " + tuple.getT1());
1218 });
1219 StepVerifier.create(flux2)
1220 .expectNext("green apple")
1221 .expectNext("green banana")
1222 .verifyComplete();
1223 }
1224
1225 /**
1226 * ********************************************************************
1227 * error
1228 * ********************************************************************
1229 */
1230 @Test
1231 void test_onError() {
1232 Mono<String> mono1 = Mono.just("jack")
1233 .map(s -> {
1234 throw new RuntimeException("ERROR");
1235 });
1236 mono1.subscribe(s -> log.info("name: {}", s), Throwable::printStackTrace);
1237 StepVerifier.create(mono1)
1238 .expectError(RuntimeException.class)
1239 .verify();
1240
1241 System.out.println("********************************************************************");
1242
1243 Mono<String> mono2 = Mono.just("jack")
1244 .flatMap(s -> {
1245 return Mono.error(new RuntimeException("ERROR"));
1246 });
1247 mono2.subscribe(s -> log.info("name: {}", s), Throwable::printStackTrace);
1248
1249 StepVerifier.create(mono2)
1250 .expectError(RuntimeException.class)
1251 .verify();
1252 }
1253
1254 /**
1255 * ********************************************************************
1256 * Error Recover Handling
1257 * onErrorReturn - Return value on error
1258 * ********************************************************************
1259 */
1260 @Test
1261 void test_onErrorReturn() {
1262 Mono<Object> mono1 = Mono.error(new RuntimeException("error"))
1263 .onErrorReturn("Jack");
1264 StepVerifier.create(mono1)
1265 .expectNext("Jack")
1266 .verifyComplete();
1267 }
1268
1269 /**
1270 * ********************************************************************
1271 * Error Recover Handling
1272 * onErrorResume - Resume chain with new mono/flux.
1273 * ********************************************************************
1274 */
1275 @Test
1276 void test_onErrorResume() {
1277 Mono<Object> mono1 = Mono.error(new RuntimeException("error"))
1278 .onErrorResume(e -> Mono.just("Jack"));
1279 StepVerifier.create(mono1)
1280 .expectNext("Jack")
1281 .verifyComplete();
1282
1283 Mono<Object> mono2 = Mono.error(new RuntimeException("error"))
1284 .onErrorResume(s -> {
1285 log.info("Inside on onErrorResume");
1286 return Mono.just("Jack");
1287 })
1288 .log();
1289 StepVerifier.create(mono2)
1290 .expectNext("Jack")
1291 .verifyComplete();
1292 }
1293
1294 /**
1295 * ********************************************************************
1296 * Error Recover Handling
1297 * onErrorContinue - Continue chain even if error occurs
1298 * ********************************************************************
1299 */
1300 @Test
1301 void test_onErrorContinue() {
1302 Flux<String> flux =
1303 Flux.just("a", "b", "c")
1304 .map(e -> {
1305 if (e.equals("b")) {
1306 throw new RuntimeException("error");
1307 }
1308 return e;
1309 })
1310 .concatWith(Mono.just("d"))
1311 .onErrorContinue((ex, value) -> {
1312 log.info("Exception: {}", ex);
1313 log.info("value: {}", value);
1314 });
1315 StepVerifier.create(flux)
1316 .expectNext("a", "c", "d")
1317 .verifyComplete();
1318 }
1319
1320 /**
1321 * ********************************************************************
1322 * Error - Action
1323 * doOnError - log the error, Side-effect operator.
1324 * ********************************************************************
1325 */
1326 @Test
1327 void test_doOnError() {
1328 Mono<Object> mono1 = Mono.error(new RuntimeException("error"))
1329 .doOnError(e -> log.error("Error: {}", e.getMessage()))
1330 .log();
1331 StepVerifier.create(mono1)
1332 .expectError(RuntimeException.class)
1333 .verify();
1334 }
1335
1336 /**
1337 * ********************************************************************
1338 * Error - Action
1339 * onErrorMap - Transform an error emitted
1340 * ********************************************************************
1341 */
1342 @Test
1343 void test_onErrorMap() {
1344 Flux flux = Flux.just("Jack", "Jill")
1345 .map(u -> {
1346 if (u.equals("Jill")) {
1347 //always do throw here, never do return.
1348 throw new IllegalArgumentException("Not valid");
1349 }
1350 if (u.equals("Jack")) {
1351 throw new ClassCastException("Not valid");
1352 }
1353 return u;
1354 }).onErrorMap(IllegalArgumentException.class, e -> {
1355 log.info("Illegal Arg error");
1356 throw new RuntimeException("Illegal Arg error!");
1357 }).onErrorMap(ClassCastException.class, e -> {
1358 log.info("Class cast error");
1359 throw new RuntimeException("Class cast error!");
1360 });
1361
1362 StepVerifier.create(flux)
1363 .expectErrorMessage("Class cast error!")
1364 .verify();
1365 }
1366
1367 /**
1368 * ********************************************************************
1369 * retry
1370 * ********************************************************************
1371 */
1372 @Test
1373 void test_retry() {
1374 AtomicLong attemptCounter = new AtomicLong();
1375 Mono<String> mono = Mono.just("Jack")
1376 .flatMap(n -> {
1377 return this.twoAttemptFunction(attemptCounter, n);
1378 })
1379 .retry(3);
1380 StepVerifier.create(mono)
1381 .assertNext(e -> {
1382 assertThat(e).isEqualTo("Hello Jack");
1383 })
1384 .verifyComplete();
1385 }
1386
1387 private Mono<String> twoAttemptFunction(AtomicLong counter, String name) {
1388 Long attempt = counter.getAndIncrement();
1389 log.info("attempt value: {}", attempt);
1390 if (attempt < 2) {
1391 throw new RuntimeException("error");
1392 }
1393 return Mono.just("Hello " + name);
1394 }
1395
1396 /**
1397 * ********************************************************************
1398 * retryWhen
1399 * ********************************************************************
1400 */
1401 @Test
1402 void test_retryWhen() {
1403 AtomicLong attemptCounter1 = new AtomicLong();
1404 RetryBackoffSpec retryFilter1 = Retry.backoff(3, Duration.ofSeconds(1))
1405 .filter(throwable -> throwable instanceof RuntimeException);
1406
1407 Mono<String> mono1 = Mono.just("Jack")
1408 .flatMap(e -> this.greetAfter2Failure(attemptCounter1, e))
1409 .retryWhen(retryFilter1);
1410 StepVerifier.create(mono1)
1411 .assertNext(e -> {
1412 assertThat(e).isEqualTo("Hello Jack");
1413 })
1414 .verifyComplete();
1415
1416 AtomicLong attemptCounter2 = new AtomicLong();
1417 RetryBackoffSpec retryFilter2 = Retry.fixedDelay(1, Duration.ofSeconds(1))
1418 .filter(throwable -> throwable instanceof RuntimeException)
1419 .onRetryExhaustedThrow(((retryBackoffSpec, retrySignal) ->
1420 Exceptions.propagate(retrySignal.failure())
1421 ));
1422 Mono<String> mono2 = Mono.just("Jack")
1423 .flatMap(e -> this.greetAfter2Failure(attemptCounter2, e))
1424 .retryWhen(retryFilter2);
1425 StepVerifier.create(mono2)
1426 .expectErrorMessage("error")
1427 .verify();
1428 }
1429
1430 private Mono<String> greetAfter2Failure(AtomicLong attemptCounter, String name) {
1431 Long attempt = attemptCounter.getAndIncrement();
1432 log.info("attempt value: {}", attempt);
1433 if (attempt < 2) {
1434 throw new RuntimeException("error");
1435 }
1436 return Mono.just("Hello " + name);
1437 }
1438
1439 /**
1440 * ********************************************************************
1441 * repeat - repeat an operation n times.
1442 * ********************************************************************
1443 */
1444 @Test
1445 void test_repeat() {
1446 Mono<List<String>> flux = Mono.defer(() -> {
1447 return Mono.just("UUID " + UUID.randomUUID());
1448 })
1449 .repeat(5)
1450 .collectList();
1451 flux.subscribe(System.out::println);
1452
1453 StepVerifier.create(flux)
1454 .assertNext(e -> {
1455 assertThat(e.size()).isEqualTo(6);
1456 })
1457 .verifyComplete();
1458 }
1459
1460 @Test
1461 void test_takeUntil() {
1462 AtomicLong counter = new AtomicLong();
1463 Mono<List<String>> flux = Mono.defer(() -> {
1464 return Mono.just("UUID " + UUID.randomUUID());
1465 })
1466 .repeat()
1467 .takeUntil(e -> {
1468 return counter.incrementAndGet() == 5;
1469 })
1470 .collectList();
1471
1472 StepVerifier.create(flux)
1473 .assertNext(e -> {
1474 assertThat(e.size()).isEqualTo(5);
1475 })
1476 .verifyComplete();
1477 }
1478
1479 /**
1480 * ********************************************************************
1481 * Subscribe onComplete, onError
1482 * Never use this format of subscribe code, always use doOn operator
1483 * ********************************************************************
1484 */
1485 @Test
1486 void test_doOn() {
1487 Flux<Integer> numFlux = Flux.range(1, 5)
1488 .map(i -> {
1489 if (i == 4) {
1490 throw new RuntimeException("error");
1491 }
1492 return i;
1493 });
1494 numFlux.subscribe(s -> {
1495 log.info("Number: {}", s);
1496 },
1497 Throwable::printStackTrace,
1498 () -> {
1499 log.info("Done!");
1500 });
1501 StepVerifier.create(numFlux)
1502 .expectNext(1, 2, 3)
1503 .expectError(RuntimeException.class)
1504 .verify();
1505 }
1506
1507 /**
1508 * ********************************************************************
1509 * doOn - doOnSubscribe, doOnNext, doOnError, doFinally, doOnComplete
1510 * ********************************************************************
1511 */
1512 @Test
1513 void test_test_doOn_2() {
1514 Flux<Object> flux = Flux.error(new RuntimeException("error"))
1515 .doOnSubscribe(s -> System.out.println("Subscribed!"))
1516 .doOnRequest(s -> System.out.println("Requested!"))
1517 .doOnNext(p -> System.out.println("Next!"))
1518 .doOnComplete(() -> System.out.println("Completed!"))
1519 .doFinally((e) -> System.out.println("Signal: " + e))
1520 .doOnError((e) -> System.out.println("Error: " + e));
1521
1522 StepVerifier.create(flux)
1523 .expectError(RuntimeException.class)
1524 .verify();
1525
1526 StepVerifier.create(flux)
1527 .verifyError(RuntimeException.class);
1528
1529 Mono<Object> mono = Mono.error(new RuntimeException("error"))
1530 .doOnSubscribe(s -> System.out.println("Subscribed!"))
1531 .doOnRequest(s -> System.out.println("Requested!"))
1532 .doOnNext(p -> System.out.println("Next!"))
1533 .doFinally((e) -> System.out.println("Signal: " + e))
1534 .doOnError((e) -> System.out.println("Error: " + e))
1535 .doOnSuccess((e) -> System.out.println("Success!"));
1536
1537 StepVerifier.create(mono)
1538 .expectError(RuntimeException.class)
1539 .verify();
1540 }
1541
1542 @Test
1543 void test_doOn_3() {
1544 Flux flux = Flux.error(new RuntimeException("My Error"));
1545 flux.subscribe(
1546 onNext(),
1547 onError(),
1548 onComplete()
1549 );
1550 }
1551
1552 private Consumer<Object> onNext() {
1553 return o -> System.out.println("Received : " + o);
1554 }
1555
1556 private Consumer<Throwable> onError() {
1557 return e -> System.out.println("ERROR : " + e.getMessage());
1558 }
1559
1560 private Runnable onComplete() {
1561 return () -> System.out.println("Completed");
1562 }
1563
1564 /**
1565 * ********************************************************************
1566 * StepVerifier - assertNext, thenRequest, thenCancel, expectError, expectErrorMessage
1567 * ********************************************************************
1568 */
1569 @Test
1570 void test_StepVerifier() {
1571 Flux flux1 = Flux.fromIterable(Arrays.asList("Jack", "Jill"));
1572 StepVerifier.create(flux1)
1573 .expectNextMatches(user -> user.equals("Jack"))
1574 .assertNext(user -> assertThat(user).isEqualTo("Jill"))
1575 .verifyComplete();
1576
1577 //Wait for 2 elements.
1578 StepVerifier.create(flux1)
1579 .expectNextCount(2)
1580 .verifyComplete();
1581
1582 //Request 1 value at a time, get 2 values then cancel.
1583 Flux flux2 = Flux.fromIterable(Arrays.asList("Jack", "Jill", "Raj"));
1584 StepVerifier.create(flux2, 1)
1585 .expectNext("JACK")
1586 .thenRequest(1)
1587 .expectNext("JILL")
1588 .thenCancel();
1589
1590 Mono<Object> mono1 = Mono.error(new RuntimeException("My Error"));
1591 StepVerifier.create(mono1)
1592 .expectError(RuntimeException.class)
1593 .verify();
1594 StepVerifier.create(mono1)
1595 .expectErrorMessage("My Error")
1596 .verify();
1597 }
1598
1599 /**
1600 * ********************************************************************
1601 * flux error propagate
1602 * ********************************************************************
1603 */
1604 @Test
1605 void test_propagate() {
1606 Flux flux = Flux.just("Jack", "Jill")
1607 .map(u -> {
1608 try {
1609 return HelperUtil.checkName(u);
1610 } catch (HelperUtil.CustomException e) {
1611 throw Exceptions.propagate(e);
1612 }
1613 });
1614 flux.subscribe(System.out::println);
1615 StepVerifier.create(flux)
1616 .expectNext("JACK")
1617 .verifyError(HelperUtil.CustomException.class);
1618 }
1619
1620 /**
1621 * ********************************************************************
1622 * subscribeOn - influences upstream (whole chain)
1623 * ********************************************************************
1624 */
1625 @Test
1626 void test_subscribeOn() {
1627 Flux numbFlux = Flux.range(1, 5)
1628 .map(i -> {
1629 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1630 return i;
1631 }).subscribeOn(Schedulers.single())
1632 .map(i -> {
1633 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1634 return i;
1635 });
1636 numbFlux.subscribe();
1637 }
1638
1639 @SneakyThrows
1640 @Test
1641 void test_subscribeOn_t1() {
1642 Flux<Integer> flux1 = Flux.range(0, 2)
1643 .map(i -> {
1644 //will run on incoming thread
1645 log.info("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
1646 return i;
1647 });
1648 Runnable r1 = () -> flux1.subscribe(s -> {
1649 log.info("Received " + s + " via " + Thread.currentThread().getName());
1650 });
1651 Thread t1 = new Thread(r1, "t1");
1652 log.info("Program thread :: " + Thread.currentThread().getName());
1653 t1.start();
1654 t1.join();
1655 }
1656
1657 @SneakyThrows
1658 @Test
1659 void test_subscribeOn_t2() {
1660 Flux<Integer> flux2 = Flux.range(0, 2)
1661 .map(i -> {
1662 //will run on incoming thread
1663 log.info("Upstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1664 return i;
1665 })
1666 .publishOn(Schedulers.single())
1667 .map(i -> {
1668 //will run on new thread
1669 log.info("Downstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1670 return i;
1671 });
1672 Runnable r2 = () -> flux2.subscribe(s -> {
1673 log.info("Received {} via {}", s, Thread.currentThread().getName());
1674 });
1675 Thread t2 = new Thread(r2, "t2");
1676 log.info("Program thread {}" + Thread.currentThread().getName());
1677 t2.start();
1678 t2.join();
1679 }
1680
1681 @SneakyThrows
1682 @Test
1683 void test_subscribeOn_t3() {
1684 Flux<Integer> flux3 = Flux.range(0, 2)
1685 .map(i -> {
1686 //will run on new thread
1687 log.info("Upstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1688 return i;
1689 })
1690 .subscribeOn(Schedulers.single())
1691 .map(i -> {
1692 //will run on new thread
1693 log.info("Downstream: Mapping for {} is done by thread {}", i, Thread.currentThread().getName());
1694 return i;
1695 });
1696 Runnable r3 = () -> flux3.subscribe(s -> {
1697 log.info("Received {} via {}", s, Thread.currentThread().getName());
1698 });
1699 Thread t3 = new Thread(r3, "t2");
1700 log.info("Program thread {}" + Thread.currentThread().getName());
1701 t3.start();
1702 t3.join();
1703 }
1704
1705 /**
1706 * ********************************************************************
1707 * Schedulers
1708 *
1709 * parallel - for CPU intensive tasks (computation), thread pool workers = number of CPU cores
1710 * newParallel - same as above but new pool
1711 * boundedElastic - for IO intensive tasks (network calls), thread pool contains 10 * number of CPU cores
1712 * newBoundedElastic - same as above but new pool
1713 * immediate - keep the execution in the current thread
1714 * single - single reusable thread for all the callers
1715 * newSingle - same as above but new pool
1716 * elastic - unlimited threads (DON'T USE)
1717 *
1718 * We can have multiple publishOn methods which will keep switching the context.
1719 * The subscribeOn method can not do that. Only the very first subscribeOn method which is close to the source takes precedence.
1720 * ********************************************************************
1721 */
1722 @Test
1723 void test_Schedulers() {
1724 Flux numbFlux = Flux.range(1, 5)
1725 .map(i -> {
1726 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1727 return i;
1728 }).subscribeOn(Schedulers.newSingle("my-thread"))
1729 .map(i -> {
1730 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1731 return i;
1732 });
1733 numbFlux.subscribe();
1734 }
1735
1736 /**
1737 * ********************************************************************
1738 * publishOn - influences downstream
1739 * ********************************************************************
1740 */
1741 @Test
1742 void test_publishOn() {
1743 Flux numbFlux = Flux.range(1, 5)
1744 .map(i -> {
1745 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1746 return i;
1747 }).publishOn(Schedulers.single())
1748 .map(i -> {
1749 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1750 return i;
1751 });
1752 numbFlux.subscribe();
1753 }
1754
1755 /**
1756 * ********************************************************************
1757 * publishOn - influences downstream
1758 * ********************************************************************
1759 */
1760 @Test
1761 void test_publishOn_2() {
1762 Flux numbFlux = Flux.range(1, 5)
1763 .map(i -> {
1764 log.info("Map1 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1765 return i;
1766 }).publishOn(Schedulers.newSingle("my-thread"))
1767 .map(i -> {
1768 log.info("Map2 Num: {}, Thread: {}", i, Thread.currentThread().getName());
1769 return i;
1770 });
1771 numbFlux.subscribe();
1772 }
1773
1774 /**
1775 * ********************************************************************
1776 * fromSupplier - returns a value
1777 * fromCallable - returns a value or exception
1778 * fromRunnable - doesnt return value
1779 * ********************************************************************
1780 */
1781 @Test
1782 public void test_fromSupplier() {
1783 Supplier<String> stringSupplier = () -> getName();
1784 Mono<String> mono = Mono.fromSupplier(stringSupplier);
1785
1786 mono.subscribe(System.out::println);
1787 }
1788
1789 /**
1790 * ********************************************************************
1791 * fromSupplier - returns a value
1792 * fromCallable - returns a value or exception
1793 * fromRunnable - doesnt return value
1794 * ********************************************************************
1795 */
1796 @Test
1797 public void test_fromCallable() {
1798 Callable<String> stringCallable = () -> getName();
1799 Mono<String> mono = Mono.fromCallable(stringCallable)
1800 .subscribeOn(Schedulers.boundedElastic());
1801 mono.subscribe(System.out::println);
1802 }
1803
1804 /**
1805 * ********************************************************************
1806 * fromSupplier - returns a value
1807 * fromCallable - returns a value or exception
1808 * fromRunnable - doesnt return value
1809 * ********************************************************************
1810 */
1811 @Test
1812 public void test_fromRunnable() {
1813 Runnable stringCallable = () -> getName();
1814 Mono<Object> mono = Mono.fromRunnable(stringCallable)
1815 .subscribeOn(Schedulers.boundedElastic());
1816 mono.subscribe(System.out::println);
1817 }
1818
1819 /**
1820 * ********************************************************************
1821 * fromCallable - read file may be blocking, we don't want to block main thread.
1822 * ********************************************************************
1823 */
1824 @Test
1825 @SneakyThrows
1826 void test_readFile_fromCallable() {
1827 Mono<List<String>> listMono = Mono.fromCallable(() -> Files.readAllLines(Path.of("src/test/resources/file.txt")))
1828 .subscribeOn(Schedulers.boundedElastic());
1829 listMono.subscribe(l -> log.info("Lines: {}", l));
1830
1831 StepVerifier.create(listMono)
1832 .expectSubscription()
1833 .thenConsumeWhile(l -> {
1834 assertThat(l.isEmpty()).isFalse();
1835 return true;
1836 })
1837 .verifyComplete();
1838 }
1839
1840 /**
1841 * ********************************************************************
1842 * Flux.using(
1843 * resourceSupplier,
1844 * (resource) -> return Publisher,
1845 * (resource) -> clean this up
1846 * )
1847 *
1848 * share() creates a hot publisher, else it would be a cold publisher.
1849 * Cold publisher would read the file for each subscriber – that would mean opening and reading the same file many times.
1850 * ********************************************************************
1851 */
1852 @Test
1853 void test_readFile_using() {
1854 Path filePath = Paths.get("src/test/resources/file.txt");
1855 Flux<String> fileFlux = Flux.using(
1856 () -> Files.lines(filePath),
1857 Flux::fromStream,
1858 Stream::close
1859 );
1860 fileFlux.subscribe(l -> log.info("Lines: {}", l));
1861
1862 Flux<String> fileFlux2 = fileFlux
1863 .subscribeOn(Schedulers.newParallel("file-copy", 3))
1864 .share();
1865 fileFlux2.subscribe(l -> log.info("Lines: {}", l));
1866 }
1867
1868 /**
1869 * ********************************************************************
1870 * ParallelFlux - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1871 * ********************************************************************
1872 */
1873 @Test
1874 void test_parallel() {
1875 log.info("Cores: {}", Runtime.getRuntime().availableProcessors());
1876 ParallelFlux<String> flux1 = Flux.just("apple", "orange", "banana")
1877 .parallel()
1878 .runOn(Schedulers.parallel())
1879 .map(HelperUtil::capitalizeString);
1880 StepVerifier.create(flux1)
1881 .expectNextCount(3)
1882 .verifyComplete();
1883
1884
1885 Flux<String> flux2 = Flux.just("apple", "orange", "banana")
1886 .flatMap(name -> {
1887 return Mono.just(name)
1888 .map(HelperUtil::capitalizeString)
1889 .subscribeOn(Schedulers.parallel());
1890 });
1891 StepVerifier.create(flux2)
1892 .expectNextCount(3)
1893 .verifyComplete();
1894 }
1895
1896 /**
1897 * ********************************************************************
1898 * flatMap Parallelism - Will complete in 1 sec even when 3 ops take 3 seconds in sequence
1899 * ********************************************************************
1900 */
1901 @Test
1902 void test_parallel_2() {
1903 Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1904 .flatMap(name -> {
1905 return Mono.just(name)
1906 .map(HelperUtil::capitalizeString)
1907 .subscribeOn(Schedulers.parallel());
1908 });
1909 StepVerifier.create(flux1)
1910 .expectNextCount(3)
1911 .verifyComplete();
1912 }
1913
1914 /**
1915 * ********************************************************************
1916 * flatMap - fire-forget jobs with subscribe, Will run async jobs
1917 * ********************************************************************
1918 */
1919 @SneakyThrows
1920 @Test
1921 void fireForgetTest() {
1922 CountDownLatch latch = new CountDownLatch(3);
1923 Flux<Object> flux1 = Flux.just("apple", "orange", "banana")
1924 .flatMap(fruit -> {
1925 Mono.just(fruit)
1926 .map(e -> HelperUtil.capitalizeStringLatch(e, latch))
1927 .subscribeOn(Schedulers.parallel())
1928 .subscribe();
1929 return Mono.empty();
1930 });
1931 StepVerifier.create(flux1)
1932 .verifyComplete();
1933 latch.await(5, TimeUnit.SECONDS);
1934 }
1935
1936 /**
1937 * ********************************************************************
1938 * flatMapSequential - Maintains order but executes in parallel
1939 * ********************************************************************
1940 */
1941 @Test
1942 void test_flatMapSequential() {
1943 Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1944 .flatMapSequential(name -> {
1945 return Mono.just(name)
1946 .map(HelperUtil::capitalizeString)
1947 .subscribeOn(Schedulers.parallel());
1948 });
1949 StepVerifier.create(flux1)
1950 .expectNext("APPLE", "ORANGE", "BANANA")
1951 .verifyComplete();
1952 }
1953
1954 /**
1955 * ********************************************************************
1956 * flatMapSequential - Maintains order but executes in parallel
1957 * ********************************************************************
1958 */
1959 @Test
1960 void test_flatMapSequential_2() {
1961 Flux<String> flux1 = Flux.just("apple", "orange", "banana")
1962 .flatMapSequential(name -> {
1963 return Mono.just(name)
1964 .map(HelperUtil::capitalizeString)
1965 .subscribeOn(Schedulers.parallel());
1966 }, 1)
1967 .log();
1968 StepVerifier.create(flux1)
1969 .expectNext("APPLE", "ORANGE", "BANANA")
1970 .verifyComplete();
1971 }
1972
1973 /**
1974 * ********************************************************************
1975 * withVirtualTime - flux that emits every second.
1976 * interval - blocks thread, so you will have to use sleep to see the output
1977 * ********************************************************************
1978 */
1979 @Test
1980 @SneakyThrows
1981 void test_withVirtualTime() {
1982 VirtualTimeScheduler.getOrSet();
1983 Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1))
1984 .log()
1985 .take(10);
1986 intervalFlux.subscribe(i -> log.info("Number: {}", i));
1987 TimeUnit.SECONDS.sleep(5);
1988 StepVerifier.withVirtualTime(() -> intervalFlux)
1989 .expectSubscription()
1990 .expectNoEvent(Duration.ofMillis(999))
1991 .thenAwait(Duration.ofSeconds(5))
1992 .expectNextCount(4)
1993 .thenCancel()
1994 .verify();
1995 }
1996
1997 /**
1998 * ********************************************************************
1999 * flux that emits every day. Use of virtual time to simulate days.
2000 * ********************************************************************
2001 */
2002 @Test
2003 @SneakyThrows
2004 void test_withVirtualTime_2() {
2005 VirtualTimeScheduler.getOrSet();
2006 StepVerifier.withVirtualTime(this::getTake)
2007 .expectSubscription()
2008 .expectNoEvent(Duration.ofDays(1))
2009 .thenAwait(Duration.ofDays(1))
2010 .expectNext(0L)
2011 .thenAwait(Duration.ofDays(1))
2012 .expectNext(1L)
2013 .thenCancel()
2014 .verify();
2015 }
2016
2017 private Flux<Long> getTake() {
2018 return Flux.interval(Duration.ofDays(1))
2019 .log()
2020 .take(10);
2021 }
2022
2023 /**
2024 * ********************************************************************
2025 * then - will just replay the source terminal signal, resulting in a Mono<Void> to indicate that this never signals any onNext.
2026 * thenEmpty - not only returns a Mono<Void>, but it takes a Mono<Void> as a parameter. It represents a concatenation of the source completion signal then the second, empty Mono completion signal. In other words, it completes when A then B have both completed sequentially, and doesn't emit data.
2027 * thenMany - waits for the source to complete then plays all the signals from its Publisher<R> parameter, resulting in a Flux<R> that will "pause" until the source completes, then emit the many elements from the provided publisher before replaying its completion signal as well.
2028 * ********************************************************************
2029 */
2030 @Test
2031 void test_thenManyChain() {
2032 Flux<String> names = Flux.just("Jack", "Jill");
2033 names.map(String::toUpperCase)
2034 .thenMany(HelperUtil.deleteFromDb())
2035 .thenMany(HelperUtil.saveToDb())
2036 .subscribe(System.out::println);
2037 }
2038
2039 @Test
2040 void test_thenEmpty() {
2041 Flux<String> names = Flux.just("Jack", "Jill");
2042 names.map(String::toUpperCase)
2043 .thenMany(HelperUtil.saveToDb())
2044 .thenEmpty(HelperUtil.sendMail())
2045 .subscribe(System.out::println);
2046 }
2047
2048 @Test
2049 void test_then() {
2050 Flux<String> names = Flux.just("Jack", "Jill");
2051 names.map(String::toUpperCase)
2052 .thenMany(HelperUtil.saveToDb())
2053 .then()
2054 .then(Mono.just("Ram"))
2055 .thenReturn("Done!")
2056 .subscribe(System.out::println);
2057 }
2058
2059 /**
2060 * ********************************************************************
2061 * firstWithValue - first mono to return
2062 * ********************************************************************
2063 */
2064 @Test
2065 void test_monoFirst() {
2066 Mono<String> mono1 = Mono.just("Jack").delayElement(Duration.ofSeconds(1));
2067 Mono<String> mono2 = Mono.just("Jill");
2068 //Return the mono which returns its value faster
2069 Mono<String> mono3 = Mono.firstWithValue(mono1, mono2);
2070 mono3.subscribe(System.out::println);
2071 StepVerifier.create(mono3)
2072 .expectNext("Jill")
2073 .verifyComplete();
2074 }
2075
2076 /**
2077 * ********************************************************************
2078 * buffer
2079 * ********************************************************************
2080 */
2081 @Test
2082 public void test_bufferGroup() {
2083 Flux<List<Integer>> flux1 = Flux
2084 .range(1, 7)
2085 .buffer(2);
2086 StepVerifier
2087 .create(flux1)
2088 .expectNext(Arrays.asList(1, 2))
2089 .expectNext(Arrays.asList(3, 4))
2090 .expectNext(Arrays.asList(5, 6))
2091 .expectNext(Arrays.asList(7))
2092 .verifyComplete();
2093 }
2094
2095 @Test
2096 @SneakyThrows
2097 void test_tickClock() {
2098 Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
2099 Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
2100 Flux.merge(fastClock, slowClock).subscribe(System.out::println);
2101 TimeUnit.SECONDS.sleep(5);
2102 }
2103
2104 @Test
2105 @SneakyThrows
2106 public void test_tickMergeClock() {
2107 Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
2108 Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
2109 Flux clock = Flux.merge(slowClock, fastClock);
2110 Flux feed = Flux.interval(Duration.ofSeconds(1)).map(tick -> LocalTime.now());
2111 clock.withLatestFrom(feed, (tick, time) -> tick + " " + time).subscribe(System.out::println);
2112 TimeUnit.SECONDS.sleep(15);
2113 }
2114
2115 @Test
2116 @SneakyThrows
2117 void test_tickZipClock() {
2118 Flux fastClock = Flux.interval(Duration.ofSeconds(1)).map(tick -> "fast tick " + tick);
2119 Flux slowClock = Flux.interval(Duration.ofSeconds(2)).map(tick -> "slow tick " + tick);
2120 fastClock.zipWith(slowClock, (tick, time) -> tick + " " + time).subscribe(System.out::println);
2121 TimeUnit.SECONDS.sleep(5);
2122 }
2123
2124 @Test
2125 @SneakyThrows
2126 void test_emitter() {
2127 MyFeed myFeed = new MyFeed();
2128 Flux feedFlux = Flux.create(emmiter -> {
2129 myFeed.register(new MyListener() {
2130 @Override
2131 public void priceTick(String msg) {
2132 emmiter.next(msg);
2133 }
2134
2135 @Override
2136 public void error(Throwable error) {
2137 emmiter.error(error);
2138 }
2139 });
2140 }, FluxSink.OverflowStrategy.LATEST);
2141 feedFlux.subscribe(System.out::println);
2142 TimeUnit.SECONDS.sleep(15);
2143 System.out.println("Sending message!");
2144 for (int i = 0; i < 10; i++) {
2145 myFeed.sendMessage("HELLO_" + i);
2146 }
2147 }
2148
2149 /**
2150 * ********************************************************************
2151 * cancel subscription
2152 * ********************************************************************
2153 */
2154 @Test
2155 void test_monoCancelSubscription() {
2156 Mono<String> helloMono = Mono.just("Jack")
2157 .log()
2158 .map(String::toUpperCase);
2159 helloMono.subscribe(s -> {
2160 log.info("Got: {}", s);
2161 },
2162 Throwable::printStackTrace,
2163 () -> log.info("Finished"),
2164 Subscription::cancel
2165 );
2166 }
2167
2168 /**
2169 * ********************************************************************
2170 * cancel subscription after n elements
2171 * ********************************************************************
2172 */
2173 @Test
2174 void test_request() {
2175 //Jill won't be fetched as subscription will be cancelled after 2 elements
2176 Flux<String> namesMono = Flux.just("Jack", "Jane", "Jill")
2177 .log();
2178 namesMono.subscribe(s -> {
2179 log.info("Got: {}", s);
2180 },
2181 Throwable::printStackTrace,
2182 () -> log.info("Finished"),
2183 subscription -> subscription.request(2));
2184 }
2185
2186 /**
2187 * ********************************************************************
2188 * backpressure
2189 * ********************************************************************
2190 */
2191 @Test
2192 void test_fluxBackPressure() {
2193 Flux<Integer> fluxNumber = Flux.range(1, 5).log();
2194
2195 //Fetches 2 at a time.
2196 fluxNumber.subscribe(new BaseSubscriber<>() {
2197 private final int requestCount = 2;
2198 private int count = 0;
2199
2200 @Override
2201 protected void hookOnSubscribe(Subscription subscription) {
2202 request(requestCount);
2203 }
2204
2205 @Override
2206 protected void hookOnNext(Integer value) {
2207 count++;
2208 if (count >= requestCount) {
2209 count = 0;
2210 log.info("requesting next batch!");
2211 request(requestCount);
2212 }
2213 }
2214 });
2215 }
2216
2217 /**
2218 * ********************************************************************
2219 * onBackpressureDrop - fetches all in unbounded request, but stores in internal queue, drops elements not used
2220 * ********************************************************************
2221 */
2222 @Test
2223 void test_fluxBackPressureDrop() {
2224 Flux<Integer> fluxNumber = Flux.range(1, 15).log();
2225
2226 //Fetches 2 at a time.
2227 fluxNumber
2228 .onBackpressureDrop(item -> {
2229 log.info("Dropped {}", item);
2230 })
2231 .subscribe(new BaseSubscriber<>() {
2232 private final int requestCount = 2;
2233 private int count = 0;
2234 private int batch = 0;
2235
2236 @Override
2237 protected void hookOnSubscribe(Subscription subscription) {
2238 request(requestCount);
2239 }
2240
2241 @Override
2242 protected void hookOnNext(Integer value) {
2243 if (batch > 2) {
2244 return;
2245 }
2246 count++;
2247 if (count >= requestCount) {
2248 count = 0;
2249 batch++;
2250 log.info("requesting next batch {}", batch);
2251 request(requestCount);
2252 }
2253
2254 }
2255 });
2256 }
2257
2258 /**
2259 * ********************************************************************
2260 * onBackpressureBuffer - fetches all in unbounded request, but stores in internal queue, but doesnt drop unused items
2261 * ********************************************************************
2262 */
2263 @Test
2264 void test_fluxBackPressureBuffet() {
2265 Flux<Integer> fluxNumber = Flux.range(1, 15).log();
2266
2267 //Fetches 2 at a time.
2268 fluxNumber
2269 .onBackpressureBuffer()
2270 .subscribe(new BaseSubscriber<>() {
2271 private final int requestCount = 2;
2272 private int count = 0;
2273 private int batch = 0;
2274
2275 @Override
2276 protected void hookOnSubscribe(Subscription subscription) {
2277 request(requestCount);
2278 }
2279
2280 @Override
2281 protected void hookOnNext(Integer value) {
2282 if (batch > 2) {
2283 return;
2284 }
2285 count++;
2286 if (count >= requestCount) {
2287 count = 0;
2288 batch++;
2289 log.info("requesting next batch {}", batch);
2290 request(requestCount);
2291 }
2292
2293 }
2294 });
2295 }
2296
2297 /**
2298 * ********************************************************************
2299 * onBackpressureError - To identify if receiver is overrun by items as producer is producing more elements than can be processed.
2300 * ********************************************************************
2301 */
2302 @Test
2303 void test_fluxBackPressureOnError() {
2304 Flux<Integer> fluxNumber = Flux.range(1, 15).log();
2305
2306 //Fetches 2 at a time.
2307 fluxNumber
2308 .onBackpressureError()
2309 .subscribe(new BaseSubscriber<>() {
2310 private final int requestCount = 2;
2311 private int count = 0;
2312 private int batch = 0;
2313
2314 @Override
2315 protected void hookOnSubscribe(Subscription subscription) {
2316 request(requestCount);
2317 }
2318
2319 @Override
2320 protected void hookOnError(Throwable throwable) {
2321 log.error("Error thrown is: {}", throwable.getMessage());
2322 }
2323
2324 @Override
2325 protected void hookOnNext(Integer value) {
2326 if (batch > 2) {
2327 return;
2328 }
2329 count++;
2330 if (count >= requestCount) {
2331 count = 0;
2332 batch++;
2333 log.info("requesting next batch {}", batch);
2334 request(requestCount);
2335 }
2336
2337 }
2338 });
2339 }
2340
2341 /**
2342 * ********************************************************************
2343 * backpressure - limit rate
2344 * ********************************************************************
2345 */
2346 @Test
2347 void test_fluxBackPressureLimitRate() {
2348 Flux<Integer> fluxNumber = Flux.range(1, 5)
2349 .log()
2350 .limitRate(3);
2351 StepVerifier.create(fluxNumber)
2352 .expectNext(1, 2, 3, 4, 5)
2353 .verifyComplete();
2354 }
2355
2356 /**
2357 * ********************************************************************
2358 * cold flux - producing/emitting only when a subscriber subscribes, generates new sets of values for each new subscription, eg: spotify
2359 * hot flux - emitting happens even there is no subscriber. All the subscribers get the value from the single data producer irrespective of the time they started subscribing, eg: radio
2360 * ********************************************************************
2361 */
2362 @Test
2363 @SneakyThrows
2364 void test_connectableFlux() {
2365 ConnectableFlux<Integer> connectableFlux = Flux.range(1, 10)
2366 .delayElements(Duration.ofSeconds(1))
2367 .publish();
2368 connectableFlux.connect();
2369
2370 TimeUnit.SECONDS.sleep(3);
2371 connectableFlux.subscribe(i -> {
2372 log.info("Sub1 Number: {}", i);
2373 });
2374
2375 TimeUnit.SECONDS.sleep(2);
2376 connectableFlux.subscribe(i -> {
2377 log.info("Sub2 Number: {}", i);
2378 });
2379
2380 ConnectableFlux<Integer> connectableFlux2 = Flux.range(1, 10)
2381 .delayElements(Duration.ofSeconds(1))
2382 .publish();
2383 StepVerifier.create(connectableFlux2)
2384 .then(connectableFlux2::connect)
2385 .thenConsumeWhile(i -> i <= 5)
2386 .expectNext(6, 7, 8, 9, 10)
2387 .expectComplete()
2388 .verify();
2389 }
2390
2391 /**
2392 * ********************************************************************
2393 * hot flux - auto connect, min subscribers required before publisher emits
2394 * ********************************************************************
2395 */
2396 @Test
2397 @SneakyThrows
2398 void test_connectableAutoFlux() {
2399 //Hot Flux.
2400 Flux<Integer> connectableFlux = Flux.range(1, 5)
2401 .log()
2402 .delayElements(Duration.ofSeconds(1))
2403 .publish()
2404 .autoConnect(2);
2405
2406 //2 subscribers
2407 StepVerifier.create(connectableFlux)
2408 .then(connectableFlux::subscribe)
2409 .expectNext(1, 2, 3, 4, 5)
2410 .expectComplete()
2411 .verify();
2412 }
2413
2414 /**
2415 * ********************************************************************
2416 * hot flux - ref count, if subscriber count goes down, publisher stops emitting
2417 * ********************************************************************
2418 */
2419 @Test
2420 @SneakyThrows
2421 void test_connectableFlux_1() {
2422 //Hot Flux.
2423 Flux<Integer> connectableFlux = Flux.range(1, 15)
2424 .delayElements(Duration.ofSeconds(1))
2425 .doOnCancel(() -> {
2426 log.info("Received cancel");
2427 })
2428 .publish()
2429 .refCount(2);
2430
2431 //Min 2 subscribers required
2432 Disposable subscribe1 = connectableFlux.subscribe(e -> log.info("Sub1: " + e));
2433 Disposable subscribe2 = connectableFlux.subscribe(e -> log.info("Sub2: " + e));
2434 TimeUnit.SECONDS.sleep(3);
2435 subscribe1.dispose();
2436 subscribe2.dispose();
2437 TimeUnit.SECONDS.sleep(5);
2438 }
2439
2440 /**
2441 * ********************************************************************
2442 * defer
2443 * ********************************************************************
2444 */
2445 @Test
2446 @SneakyThrows
2447 void test_defer() {
2448 Mono<UUID> just = Mono.just(UUID.randomUUID());
2449 Mono<UUID> deferJust = Mono.defer(() -> Mono.just(UUID.randomUUID()));
2450
2451 just.subscribe(l -> log.info("UUID: {}", l));
2452 just.subscribe(l -> log.info("UUID: {}", l));
2453 System.out.println();
2454 deferJust.subscribe(l -> log.info("UUID: {}", l));
2455 deferJust.subscribe(l -> log.info("UUID: {}", l));
2456 }
2457
2458 /**
2459 * ********************************************************************
2460 * onSchedulersHook - if you have to use thread local
2461 * ********************************************************************
2462 */
2463 @Test
2464 public void test_onScheduleHook() {
2465 Runnable stringCallable = () -> getName();
2466 Schedulers.onScheduleHook("myHook", runnable -> {
2467 log.info("before scheduled runnable");
2468 return () -> {
2469 log.info("before execution");
2470 runnable.run();
2471 log.info("after execution");
2472 };
2473 });
2474 Mono.just("Hello world")
2475 .subscribeOn(Schedulers.single())
2476 .subscribe(System.out::println);
2477 }
2478
2479 /**
2480 * ********************************************************************
2481 * checkpoint
2482 * ********************************************************************
2483 */
2484 @Test
2485 void test_checkpoint() {
2486 Flux flux = Flux.just("Jack", "Jill", "Joe")
2487 .checkpoint("before uppercase")
2488 .map(e -> e.toUpperCase())
2489 .checkpoint("after uppercase")
2490 .filter(e -> e.length() > 3)
2491 .checkpoint("after filter")
2492 .map(e -> new RuntimeException("Custom error!"));
2493 flux.subscribe(System.out::println);
2494 }
2495
2496 /**
2497 * ********************************************************************
2498 * checkpoint
2499 * ********************************************************************
2500 */
2501 @Test
2502 void flux_test_debugAgent() {
2503 ReactorDebugAgent.init();
2504 ReactorDebugAgent.processExistingClasses();
2505 Flux flux = Flux.just("a")
2506 .concatWith(Flux.error(new IllegalArgumentException("My Error!")))
2507 .onErrorMap(ex -> {
2508 log.error("Exception: {}", ex.getMessage());
2509 return new IllegalStateException("New Error!");
2510 });
2511 flux.subscribe(System.out::println);
2512 }
2513
2514 /**
2515 * ********************************************************************
2516 * Flux.generate - programmatically create flux, synchronous, cant emit without downstream subscriber asking for it.
2517 * Flux.create - programmatically create flux, asynchronous, can emit more elements without downstream subscriber asking for it.
2518 * ********************************************************************
2519 */
2520 @Test
2521 void test_flux_generate() {
2522 Flux<Integer> flux = Flux.generate(() -> 1, (state, sink) -> {
2523 sink.next(state * 2);
2524 if (state == 10) {
2525 sink.complete();
2526 }
2527 return state + 1;
2528 });
2529 flux.subscribe(System.out::println);
2530 StepVerifier.create(flux)
2531 .expectNextCount(10)
2532 .verifyComplete();
2533 System.out.println();
2534 }
2535
2536 /**
2537 * ********************************************************************
2538 * Flux.generate - programmatically create flux, synchronous
2539 * Flux.create - programmatically create flux, asynchronous
2540 *
2541 * buffer - buffer if downstream cant keep up
2542 * drop - drop if downstream cant keep up
2543 * error - singal error when downstream cant keep up
2544 * ignore - ignore downstream backpressure requests
2545 * latest - downstream will only get latest
2546 * ********************************************************************
2547 */
2548 @Test
2549 void test_flux_create() {
2550 List<String> names = Arrays.asList("jack", "jill");
2551 Flux<String> flux = Flux.create(sink -> {
2552 names.forEach(sink::next);
2553 sink.complete();
2554 });
2555
2556 StepVerifier.create(flux)
2557 .expectNextCount(2)
2558 .verifyComplete();
2559
2560 Flux<Integer> integerFlux = Flux.create((FluxSink<Integer> fluxSink) -> {
2561 IntStream.range(0, 5)
2562 .peek(i -> System.out.println("going to emit - " + i))
2563 .forEach(fluxSink::next);
2564 fluxSink.complete();
2565 });
2566
2567 StepVerifier.create(integerFlux)
2568 .expectNextCount(5)
2569 .verifyComplete();
2570
2571 Flux<Integer> integerFlux2 = Flux.create((FluxSink<Integer> fluxSink) -> {
2572 IntStream.range(0, 5)
2573 .peek(i -> System.out.println("going to emit - " + i))
2574 .forEach(fluxSink::next);
2575 fluxSink.complete();
2576 }, FluxSink.OverflowStrategy.DROP);
2577
2578 StepVerifier.create(integerFlux2)
2579 .expectNextCount(5)
2580 .verifyComplete();
2581 }
2582
2583 @Test
2584 void test_chain() {
2585 CompanyVO request = new CompanyVO();
2586 request.setName("Twitter");
2587 Mono.just(request)
2588 .map(HelperUtil::convertToEntity)
2589 .zipWith(HelperUtil.getNameSuffix(), HelperUtil::appendSuffix)
2590 .flatMap(HelperUtil::addCompanyOwner)
2591 .flatMap(HelperUtil::appendOrgIdToDepartment)
2592 .flatMap(HelperUtil::save)
2593 .subscribe(System.out::println);
2594 }
2595
2596 /**
2597 * ********************************************************************
2598 * expand Finding the shortest path in a graph. Searching file system. Finding neighbor nodes in a network.
2599 * expandDeep Finding all possible combinations.
2600 * ********************************************************************
2601 */
2602 @Test
2603 void test_expand() {
2604 Employee CEO = new Employee("CEO");
2605
2606 // Directors reporting to CEO
2607 Employee directorA = new Employee("Director of Dept A");
2608 Employee directorB = new Employee("Director of Dept B");
2609 CEO.addDirectReports(directorA, directorB);
2610
2611 // Managers reporting to directors
2612 Employee managerA1 = new Employee("Manager 1 of Dept A");
2613 Employee managerA2 = new Employee("Manager 2 of Dept A");
2614 Employee managerB1 = new Employee("Manager 1 of Dept B");
2615 Employee managerB2 = new Employee("Manager 2 of Dept B");
2616 directorA.addDirectReports(managerA1, managerA2);
2617 directorB.addDirectReports(managerB1, managerB2);
2618
2619 Mono.fromSupplier(() -> CEO)
2620 .expand(this::getDirectReports)
2621 .subscribe(System.out::println);
2622 }
2623
2624 @Test
2625 void test_expandDeep() {
2626 Employee CEO = new Employee("CEO");
2627
2628 // Directors reporting to CEO
2629 Employee directorA = new Employee("Director of Dept A");
2630 Employee directorB = new Employee("Director of Dept B");
2631 CEO.addDirectReports(directorA, directorB);
2632
2633 // Managers reporting to directors
2634 Employee managerA1 = new Employee("Manager 1 of Dept A");
2635 Employee managerA2 = new Employee("Manager 2 of Dept A");
2636 Employee managerB1 = new Employee("Manager 1 of Dept B");
2637 Employee managerB2 = new Employee("Manager 2 of Dept B");
2638 directorA.addDirectReports(managerA1, managerA2);
2639 directorB.addDirectReports(managerB1, managerB2);
2640
2641 Mono.fromSupplier(() -> CEO)
2642 .expandDeep(this::getDirectReports)
2643 .subscribe(System.out::println);
2644 }
2645
2646 private Flux<Employee> getDirectReports(Employee employee) {
2647 return Flux.fromIterable(employee.getDirectReports());
2648 }
2649
2650 @Test
2651 void test_fluxToMono() {
2652 Mono<List<String>> mono = Flux.just("jack", "raj").collectList();
2653 Flux<List<String>> flux = Flux.just("jack", "raj").collectList().flatMapMany(Flux::just);
2654
2655 StepVerifier.create(mono)
2656 .expectNextCount(1)
2657 .verifyComplete();
2658
2659 StepVerifier.create(flux)
2660 .expectNextCount(1)
2661 .verifyComplete();
2662
2663 }
2664
2665 @Test
2666 void test_compareMapWithList() {
2667 List<String> colors = List.of("red", "blue", "green");
2668 Map<String, String> fruitMap = Map.of("red", "apple", "green", "grapes");
2669 Mono<List<String>> flux1 = Mono.just(fruitMap)
2670 .flatMap(map -> {
2671 return Flux.fromIterable(colors)
2672 .flatMap(color -> {
2673 if (map.containsKey(color)) {
2674 return Mono.just(map.get(color));
2675 }
2676 return Mono.empty();
2677 }).collectList();
2678 });
2679 flux1.subscribe(System.out::println);
2680
2681 StepVerifier.create(flux1)
2682 .expectNext(List.of("apple", "grapes"))
2683 .verifyComplete();
2684
2685 Flux<String> flux2 = Mono.just(fruitMap)
2686 .flatMapMany(map ->
2687 Flux.fromIterable(colors)
2688 .flatMap(color -> {
2689 String fruit = fruitMap.get(color);
2690 return fruit != null ? Flux.just(fruit) : Flux.empty();
2691 })
2692 );
2693 flux2.subscribe(System.out::println);
2694 StepVerifier.create(flux2)
2695 .expectNext("apple")
2696 .expectNext("grapes")
2697 .verifyComplete();
2698 }
2699
2700 /**
2701 * ********************************************************************
2702 * timeout - if response doesnt come in certain time then timeout.
2703 * ********************************************************************
2704 */
2705 @Test
2706 void test_timeout() {
2707 Mono<String> mono = Mono.just("jack")
2708 .delayElement(Duration.ofSeconds(5))
2709 .timeout(Duration.ofSeconds(1))
2710 .onErrorReturn("raj");
2711 StepVerifier.create(mono)
2712 .expectNext("raj")
2713 .verifyComplete();
2714 }
2715
2716 @Test
2717 void test_pageImpl() {
2718 List<String> names = List.of("Jack", "Raj", "Edward");
2719 PageRequest pageRequest = PageRequest.of(0, 5);
2720 Mono<PageImpl<String>> pageFlux = Flux.fromIterable(names)
2721 .collectList()
2722 .zipWith(Mono.just(names.size()))
2723 .map(t -> new PageImpl<>(t.getT1(), pageRequest, names.size()));
2724
2725 pageFlux.subscribe(System.out::println);
2726
2727 StepVerifier.create(pageFlux)
2728 .assertNext(e -> {
2729 assertEquals(e.getNumberOfElements(), 3);
2730 assertEquals("Jack", e.getContent().get(0));
2731 })
2732 .verifyComplete();
2733 }
2734
2735 @Test
2736 void test_function() {
2737 Function<String, Mono<String>> stringSupplier = p -> Mono.just("hello " + p);
2738 Mono<String> mono = Mono.defer(() -> stringSupplier.apply("jack"));
2739 StepVerifier.create(mono)
2740 .expectNext("hello jack")
2741 .verifyComplete();
2742 }
2743
2744 @SneakyThrows
2745 @Test
2746 void test_blockHound() {
2747 try {
2748 FutureTask<?> task = new FutureTask<>(() -> {
2749 TimeUnit.SECONDS.sleep(2);
2750 return "";
2751 });
2752 Schedulers.parallel().schedule(task);
2753 task.get(10, TimeUnit.SECONDS);
2754 Assertions.fail("should fail");
2755 } catch (Exception e) {
2756 Assertions.assertTrue(e.getCause() instanceof BlockingOperationError);
2757 }
2758 }
2759
2760 @Test
2761 void test_blockHound2() {
2762 Mono<String> mono = Mono.just("apple")
2763 .flatMap(name -> {
2764 return Mono.just("name")
2765 .map(n -> {
2766 HelperUtil.sleep(1);
2767 return n;
2768 })
2769 .subscribeOn(Schedulers.parallel());
2770 });
2771
2772 StepVerifier.create(mono)
2773 .expectError(BlockingOperationError.class);
2774 }
2775
2776}
References
comments powered by Disqus