CompletableFuture - Basics

Overview

Asynchronous programming in Java using CompletableFuture.

GitHub: https://github.com/gitorko/project83

Basics

JUnit test methods demonstrating how to use CompletableFuture.

  1package com.demo.project83;
  2
  3import static org.awaitility.Awaitility.await;
  4
  5import java.util.ArrayList;
  6import java.util.Date;
  7import java.util.List;
  8import java.util.Objects;
  9import java.util.concurrent.CompletableFuture;
 10import java.util.concurrent.ExecutorService;
 11import java.util.concurrent.Executors;
 12import java.util.concurrent.Future;
 13import java.util.concurrent.TimeUnit;
 14import java.util.concurrent.TimeoutException;
 15import java.util.concurrent.atomic.AtomicInteger;
 16import java.util.stream.Collectors;
 17
 18import lombok.SneakyThrows;
 19import lombok.extern.slf4j.Slf4j;
 20import org.junit.jupiter.api.Assertions;
 21import org.junit.jupiter.api.Test;
 22
 23@Slf4j
 24public class CompletableFutureTest {
 25
 26    static AtomicInteger counter = new AtomicInteger();
 27
 28    /**
 29     * get() is blocking call. So main thread has to wait.
 30     * Old way with Future. Dont use it.
 31     */
 32    @Test
 33    @SneakyThrows
 34    void blockingChain_test() {
 35        counter = new AtomicInteger();
 36        List<Future<String>> futureLst = new ArrayList<>();
 37        ExecutorService executor = Executors.newCachedThreadPool();
 38        for (int i = 0; i < 5; i++) {
 39            int finalI = i;
 40            Future<String> future = executor.submit(() -> greetHello("Jack_" + finalI));
 41            futureLst.add(future);
 42        }
 43        for (Future<String> future : futureLst) {
 44            //get is blocking the main thread here.
 45            String message = future.get();
 46            finishedGreetHello(message);
 47        }
 48        executor.shutdown();
 49        Assertions.assertEquals(5, counter.get());
 50    }
 51
 52    /**
 53     * Callback attached so non blocking.
 54     *
 55     * Ability to provide call back functionality.
 56     * You can manually set the return response on a CompletableFuture which you cant do on Future. You can cancel it as well. 
 57     * You can chain & combine CompletableFutures which is not possible with Future.
 58     * Exception handling support in CompletableFutures which is not available in Future.
 59     *
 60     * Although chaining can be done manually but not advised to use this approach.
 61     * This example is for reference only.
 62     */
 63    @Test
 64    @SneakyThrows
 65    void nonBlockingChain_test() {
 66        counter = new AtomicInteger();
 67        ExecutorService executor = Executors.newCachedThreadPool();
 68        for (int i = 0; i < 5; i++) {
 69            int finalI = i;
 70            executor.submit(() -> {
 71                CompletableFutureTest.greetHelloChain("Jack_" + finalI, new CompletableFuture<>());
 72            });
 73        }
 74        //Give enough time for all threads to complete and return back with results.
 75        TimeUnit.SECONDS.sleep(10);
 76        executor.shutdown();
 77        Assertions.assertEquals(5, counter.get());
 78    }
 79
 80    /**
 81     * When function does not return anything then use CompletableFuture.runAsync()
 82     * returns CompletableFuture<Void>
 83     */
 84    @Test
 85    @SneakyThrows
 86    void runAsync_test() {
 87        counter = new AtomicInteger();
 88        for (int i = 0; i < 5; i++) {
 89            int finalI = i;
 90            CompletableFuture.runAsync(() -> {
 91                greetHello("Jack_" + finalI);
 92            }).thenRun(() -> {
 93                counter.incrementAndGet();
 94                log.info("Completed!");
 95            });
 96        }
 97        //Give enough time for all threads to complete and return back with results.
 98        TimeUnit.SECONDS.sleep(5);
 99        Assertions.assertEquals(5, counter.get());
100    }
101
102    @Test
103    @SneakyThrows
104    void runAsync_test_await() {
105        counter = new AtomicInteger();
106        for (int i = 0; i < 5; i++) {
107            int finalI = i;
108            CompletableFuture.runAsync(() -> {
109                greetHello("Jack_" + finalI);
110            }).thenRun(() -> {
111                counter.incrementAndGet();
112                log.info("Completed!");
113            });
114        }
115        await().atMost(2, TimeUnit.SECONDS).until(() -> counter.get() == 5);
116        Assertions.assertEquals(5, counter.get());
117    }
118
119    /**
120     * Returns CompletableFuture<T>
121     */
122    @Test
123    @SneakyThrows
124    void supplyAsync_test() {
125        counter = new AtomicInteger();
126        for (int i = 0; i < 5; i++) {
127            int finalI = i;
128            CompletableFuture.supplyAsync(() -> {
129                return greetHello("Jack_" + finalI);
130            }).thenAccept(message -> {
131                counter.incrementAndGet();
132                log.info("Greeting: {}", message);
133            });
134        }
135        //Give enough time for all threads to complete and return back with results.
136        TimeUnit.SECONDS.sleep(5);
137        Assertions.assertEquals(5, counter.get());
138    }
139
140    /**
141     * thenApply will return a nested CompletionStage.
142     * thenApply returns CompletionStage & return value of the function.
143     */
144    @Test
145    @SneakyThrows
146    void thenApply_test() {
147        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
148            //Do some computation & return the result
149            return "hello ";
150        }).thenApply(message -> {
151            return message + "world";
152        }).thenApply(message -> {
153            return message.toUpperCase();
154        });
155        // Returns type CompletionStage<CompletionStage<CompletionStage<String>>>.
156        Assertions.assertEquals("HELLO WORLD", completableFuture.get());
157    }
158
159    /**
160     * thenAccept will return a single CompletionStage, flattening effect like a flatMap
161     * thenAccept takes a Consumer and returns a Void & only the completion state.
162     */
163    @Test
164    @SneakyThrows
165    void thenAccept_test() {
166        counter = new AtomicInteger();
167        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
168            //Do some computation & return the result
169            return "hello world";
170        }).thenAccept(message -> {
171            log.info("Got Message: {}", message);
172        }).thenRun(() -> {
173            counter.incrementAndGet();
174            log.info("Cant access previous result, just running!");
175        });
176        completableFuture.get();
177        Assertions.assertEquals(1, counter.get());
178    }
179
180    /**
181     * thenCompose() combines two futures where one future is dependent on the other
182     * thenCompose will return a single CompletionStage, flattening effect like a flatMap
183     */
184    @Test
185    @SneakyThrows
186    void thenCompose_test() {
187        //Notice the flattened return type. Combines 2 dependent future.
188        CompletableFuture<String> completableFuture = getGreeting("Jack")
189                .thenCompose(message -> CompletableFutureTest.transformMessage(message));
190        Assertions.assertEquals("HELLO JACK", completableFuture.get());
191    }
192
193    /**
194     * thenCombine() combines two independent futures.
195     */
196    @Test
197    @SneakyThrows
198    void thenCombine_test() {
199        //Combines the 2 independent futures.
200        CompletableFuture<String> completableFuture = getGreeting("Jack")
201                .thenCombine(CompletableFutureTest.getCurrentDate(), (message, currentDate) -> {
202                    return CompletableFutureTest.addDateToMessage(message, currentDate);
203                });
204        Assertions.assertTrue(completableFuture.get().contains("Hello Jack was sent on"));
205    }
206
207    @Test
208    @SneakyThrows
209    void exceptionally_test() {
210        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
211            //Do some computation & return the result
212            return "Stage 0";
213        }).thenApply(result -> {
214            return result + " -> Stage 1";
215        }).exceptionally(ex -> {
216            return "Error in stage 1 : " + ex.getMessage();
217        }).thenApply(result -> {
218            if (true) {
219                throw new RuntimeException("My custom error!");
220            }
221            return result + " -> Stage 2";
222        }).exceptionally(ex -> {
223            return "Error in stage 2 : " + ex.getMessage();
224        });
225        Assertions.assertTrue(completableFuture.get().contains("Error in stage 2"));
226    }
227
228    @Test
229    @SneakyThrows
230    void allOf_test() {
231        counter = new AtomicInteger();
232        List<CompletableFuture<Void>> tasks = getListOfTasks();
233        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
234        allTasks.get();
235        log.info("Waited for all tasks to complete and then returned!");
236        Assertions.assertEquals(3, counter.get());
237    }
238
239    @Test
240    @SneakyThrows
241    void anyOf_test() {
242        counter = new AtomicInteger();
243        List<CompletableFuture<Void>> tasks = getListOfTasks();
244        CompletableFuture<Object> allTasks = CompletableFuture.anyOf(tasks.get(0), tasks.get(1), tasks.get(2));
245        allTasks.get();
246        log.info("Waited for any one task to complete and then returned!");
247        Assertions.assertTrue(counter.get() >= 1);
248    }
249
250    @Test
251    @SneakyThrows
252    void allOf_withTimeLimit_test() {
253        counter = new AtomicInteger();
254        List<CompletableFuture<Void>> tasks = getListOfTasks();
255        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
256        try {
257            allTasks.get(4, TimeUnit.SECONDS);
258        } catch (TimeoutException ex) {
259            log.error("timeout!", ex);
260        }
261        log.info("Waited for 4 seconds and returned!");
262        Assertions.assertTrue(counter.get() >= 2);
263    }
264
265    @Test
266    @SneakyThrows
267    void allOf_iterate() {
268        List<String> names = List.of("Jack", "Adam", "Ram", "Ajay");
269        List<CompletableFuture<String>> customersFuture = names.stream()
270                .map(userName -> checkName(userName))
271                .collect(Collectors.toList());
272
273        CompletableFuture<Void> allFutures = CompletableFuture.allOf(customersFuture.toArray(new CompletableFuture[customersFuture.size()]));
274
275        CompletableFuture<List<String>> allCustomersFuture = allFutures.thenApply(v -> customersFuture.stream()
276                .map(pageContentFuture -> pageContentFuture.join())
277                .filter(Objects::isNull)
278                .collect(Collectors.toList()));
279
280        List<String> customers = allCustomersFuture.get();
281        Assertions.assertEquals(2, customers.size());
282    }
283
284    private static CompletableFuture<String> checkName(String userName) {
285        return CompletableFuture.supplyAsync(() -> {
286            if (userName.startsWith("A")) return userName;
287            return null;
288        });
289    }
290
291    private static String greetHello(String name) {
292        log.info("Got name: {}", name);
293        return "Hello " + name;
294    }
295
296    private static void finishedGreetHello(String result) {
297        counter.incrementAndGet();
298        log.info("Finished greet chain: {}", result);
299    }
300
301    private static void greetHelloChain(String name, CompletableFuture<String> completableFuture) {
302        log.info("Got name: {}", name);
303        completableFuture.complete("Hello " + name);
304        completableFuture.whenComplete(CompletableFutureTest::finishedGreetHelloChain);
305    }
306
307    private static void finishedGreetHelloChain(String result, Throwable t) {
308        counter.incrementAndGet();
309        log.info("Finished greet chain: {}", result);
310    }
311
312    private static CompletableFuture<String> getGreeting(String userName) {
313        return CompletableFuture.supplyAsync(() -> {
314            return "Hello " + userName;
315        });
316    }
317
318    private static CompletableFuture<Date> getCurrentDate() {
319        return CompletableFuture.supplyAsync(() -> {
320            return new Date();
321        });
322    }
323
324    private static CompletableFuture<String> transformMessage(String message) {
325        return CompletableFuture.supplyAsync(() -> {
326            return message.toUpperCase();
327        });
328    }
329
330    private static String addDateToMessage(String message, Date currentDate) {
331        return message + " was sent on  " + currentDate;
332    }
333
334    //Each task is delayed by few seconds
335    private static List<CompletableFuture<Void>> getListOfTasks() {
336        List<CompletableFuture<Void>> tasks = new ArrayList<>();
337        tasks.add(CompletableFuture.supplyAsync(() -> {
338            return greetHello("Jack");
339        }).thenAccept(message -> {
340            counter.incrementAndGet();
341            try {
342                TimeUnit.SECONDS.sleep(1);
343            } catch (InterruptedException e) {
344            }
345            log.info("Greeting: {}", message);
346        }));
347        tasks.add(CompletableFuture.supplyAsync(() -> {
348            return greetHello("Raj");
349        }).thenAccept(message -> {
350            counter.incrementAndGet();
351            try {
352                TimeUnit.SECONDS.sleep(3);
353            } catch (InterruptedException e) {
354            }
355            log.info("Greeting: {}", message);
356        }));
357        tasks.add(CompletableFuture.supplyAsync(() -> {
358            return greetHello("Dan");
359        }).thenAccept(message -> {
360            counter.incrementAndGet();
361            try {
362                TimeUnit.SECONDS.sleep(5);
363            } catch (InterruptedException e) {
364            }
365            log.info("Greeting: {}", message);
366        }));
367        return tasks;
368    }
369
370}

References

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html

comments powered by Disqus