CompletableFuture - Basics

Overview

Async Programming using CompletableFuture

GitHub: https://github.com/gitorko/project83

Basics

The test class below demonstrates, one method at a time, how to use the main CompletableFuture operations.

  1package com.demo.project83;
  2
  3import java.util.ArrayList;
  4import java.util.Date;
  5import java.util.List;
  6import java.util.Objects;
  7import java.util.concurrent.CompletableFuture;
  8import java.util.concurrent.ExecutorService;
  9import java.util.concurrent.Executors;
 10import java.util.concurrent.Future;
 11import java.util.concurrent.TimeUnit;
 12import java.util.concurrent.TimeoutException;
 13import java.util.concurrent.atomic.AtomicInteger;
 14import java.util.stream.Collectors;
 15
 16import lombok.SneakyThrows;
 17import lombok.extern.slf4j.Slf4j;
 18import org.junit.jupiter.api.Assertions;
 19import org.junit.jupiter.api.Test;
 20
 21@Slf4j
 22public class CompletableFutureTest {
 23
 24    static AtomicInteger counter = new AtomicInteger();
 25
 26    /**
 27     * get() is blocking call. So main thread has to wait.
 28     * Old way with Future. Dont use it.
 29     */
 30    @Test
 31    @SneakyThrows
 32    void blockingChain_test() {
 33        counter = new AtomicInteger();
 34        List<Future<String>> futureLst = new ArrayList<>();
 35        ExecutorService executor = Executors.newCachedThreadPool();
 36        for (int i = 0; i < 5; i++) {
 37            int finalI = i;
 38            Future<String> future = executor.submit(() -> greetHello("Jack_" + finalI));
 39            futureLst.add(future);
 40        }
 41        for (Future<String> future : futureLst) {
 42            //get is blocking the main thread here.
 43            String message = future.get();
 44            finishedGreetHello(message);
 45        }
 46        executor.shutdown();
 47        Assertions.assertEquals(5, counter.get());
 48    }
 49
 50    /**
 51     * Callback attached so non blocking.
 52     *
 53     * Ability to provide call back functionality.
 54     * You can manually set the return response on a CompletableFuture which you cant do on Future. You can cancel it as well. 
 55     * You can chain & combine CompletableFutures which is not possible with Future.
 56     * Exception handling support in CompletableFutures which is not available in Future.
 57     *
 58     * Although chaining can be done manually but not advised to use this approach.
 59     * This example is for reference only.
 60     */
 61    @Test
 62    @SneakyThrows
 63    void nonBlockingChain_test() {
 64        counter = new AtomicInteger();
 65        ExecutorService executor = Executors.newCachedThreadPool();
 66        for (int i = 0; i < 5; i++) {
 67            int finalI = i;
 68            executor.submit(() -> {
 69                CompletableFutureTest.greetHelloChain("Jack_" + finalI, new CompletableFuture<>());
 70            });
 71        }
 72        //Give enough time for all threads to complete and return back with results.
 73        TimeUnit.SECONDS.sleep(10);
 74        executor.shutdown();
 75        Assertions.assertEquals(5, counter.get());
 76    }
 77
 78    /**
 79     * When function does not return anything then use CompletableFuture.runAsync()
 80     * returns CompletableFuture<Void>
 81     */
 82    @Test
 83    @SneakyThrows
 84    void runAsync_test() {
 85        counter = new AtomicInteger();
 86        for (int i = 0; i < 5; i++) {
 87            int finalI = i;
 88            CompletableFuture.runAsync(() -> {
 89                greetHello("Jack_" + finalI);
 90            }).thenRun(() -> {
 91                counter.incrementAndGet();
 92                log.info("Completed!");
 93            });
 94        }
 95        //Give enough time for all threads to complete and return back with results.
 96        TimeUnit.SECONDS.sleep(5);
 97        Assertions.assertEquals(5, counter.get());
 98    }
 99
100    /**
101     * Returns CompletableFuture<T>
102     */
103    @Test
104    @SneakyThrows
105    void supplyAsync_test() {
106        counter = new AtomicInteger();
107        for (int i = 0; i < 5; i++) {
108            int finalI = i;
109            CompletableFuture.supplyAsync(() -> {
110                return greetHello("Jack_" + finalI);
111            }).thenAccept(message -> {
112                counter.incrementAndGet();
113                log.info("Greeting: {}", message);
114            });
115        }
116        //Give enough time for all threads to complete and return back with results.
117        TimeUnit.SECONDS.sleep(5);
118        Assertions.assertEquals(5, counter.get());
119    }
120
121    /**
122     * thenApply will return a nested CompletionStage.
123     * thenApply returns CompletionStage & return value of the function.
124     */
125    @Test
126    @SneakyThrows
127    void thenApply_test() {
128        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
129            //Do some computation & return the result
130            return "hello ";
131        }).thenApply(message -> {
132            return message + "world";
133        }).thenApply(message -> {
134            return message.toUpperCase();
135        });
136        // Returns type CompletionStage<CompletionStage<CompletionStage<String>>>.
137        Assertions.assertEquals("HELLO WORLD", completableFuture.get());
138    }
139
140    /**
141     * thenAccept will return a single CompletionStage, flattening effect like a flatMap
142     * thenAccept takes a Consumer and returns a Void & only the completion state.
143     */
144    @Test
145    @SneakyThrows
146    void thenAccept_test() {
147        counter = new AtomicInteger();
148        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
149            //Do some computation & return the result
150            return "hello world";
151        }).thenAccept(message -> {
152            log.info("Got Message: {}", message);
153        }).thenRun(() -> {
154            counter.incrementAndGet();
155            log.info("Cant access previous result, just running!");
156        });
157        completableFuture.get();
158        Assertions.assertEquals(1, counter.get());
159    }
160
161    /**
162     * thenCompose() combines two futures where one future is dependent on the other
163     * thenCompose will return a single CompletionStage, flattening effect like a flatMap
164     */
165    @Test
166    @SneakyThrows
167    void thenCompose_test() {
168        //Notice the flattened return type. Combines 2 dependent future.
169        CompletableFuture<String> completableFuture = getGreeting("Jack")
170                .thenCompose(message -> CompletableFutureTest.transformMessage(message));
171        Assertions.assertEquals("HELLO JACK", completableFuture.get());
172    }
173
174    /**
175     * thenCombine() combines two independent futures.
176     */
177    @Test
178    @SneakyThrows
179    void thenCombine_test() {
180        //Combines the 2 independent futures.
181        CompletableFuture<String> completableFuture = getGreeting("Jack")
182                .thenCombine(CompletableFutureTest.getCurrentDate(), (message, currentDate) -> {
183                    return CompletableFutureTest.addDateToMessage(message, currentDate);
184                });
185        Assertions.assertTrue(completableFuture.get().contains("Hello Jack was sent on"));
186    }
187
188    @Test
189    @SneakyThrows
190    void exceptionally_test() {
191        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
192            //Do some computation & return the result
193            return "Stage 0";
194        }).thenApply(result -> {
195            return result + " -> Stage 1";
196        }).exceptionally(ex -> {
197            return "Error in stage 1 : " + ex.getMessage();
198        }).thenApply(result -> {
199            if (true) {
200                throw new RuntimeException("My custom error!");
201            }
202            return result + " -> Stage 2";
203        }).exceptionally(ex -> {
204            return "Error in stage 2 : " + ex.getMessage();
205        });
206        Assertions.assertTrue(completableFuture.get().contains("Error in stage 2"));
207    }
208
209    @Test
210    @SneakyThrows
211    void allOf_test() {
212        counter = new AtomicInteger();
213        List<CompletableFuture<Void>> tasks = getListOfTasks();
214        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
215        allTasks.get();
216        log.info("Waited for all tasks to complete and then returned!");
217        Assertions.assertEquals(3, counter.get());
218    }
219
220    @Test
221    @SneakyThrows
222    void anyOf_test() {
223        counter = new AtomicInteger();
224        List<CompletableFuture<Void>> tasks = getListOfTasks();
225        CompletableFuture<Object> allTasks = CompletableFuture.anyOf(tasks.get(0), tasks.get(1), tasks.get(2));
226        allTasks.get();
227        log.info("Waited for any one task to complete and then returned!");
228        Assertions.assertTrue(counter.get() >= 1);
229    }
230
231    @Test
232    @SneakyThrows
233    void allOf_withTimeLimit_test() {
234        counter = new AtomicInteger();
235        List<CompletableFuture<Void>> tasks = getListOfTasks();
236        CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
237        try {
238            allTasks.get(4, TimeUnit.SECONDS);
239        } catch (TimeoutException ex) {
240            log.error("timeout!", ex);
241        }
242        log.info("Waited for 4 seconds and returned!");
243        Assertions.assertTrue(counter.get() >= 2);
244    }
245
246    @Test
247    @SneakyThrows
248    void allOf_iterate() {
249        List<String> names = List.of("Jack", "Adam", "Ram", "Ajay");
250        List<CompletableFuture<String>> customersFuture = names.stream()
251                .map(userName -> checkName(userName))
252                .collect(Collectors.toList());
253
254        CompletableFuture<Void> allFutures = CompletableFuture.allOf(customersFuture.toArray(new CompletableFuture[customersFuture.size()]));
255
256        CompletableFuture<List<String>> allCustomersFuture = allFutures.thenApply(v -> customersFuture.stream()
257                .map(pageContentFuture -> pageContentFuture.join())
258                .filter(Objects::isNull)
259                .collect(Collectors.toList()));
260
261        List<String> customers = allCustomersFuture.get();
262        Assertions.assertEquals(2, customers.size());
263    }
264
265    private static CompletableFuture<String> checkName(String userName) {
266        return CompletableFuture.supplyAsync(() -> {
267            if (userName.startsWith("A")) return userName;
268            return null;
269        });
270    }
271
272    private static String greetHello(String name) {
273        log.info("Got name: {}", name);
274        return "Hello " + name;
275    }
276
277    private static void finishedGreetHello(String result) {
278        counter.incrementAndGet();
279        log.info("Finished greet chain: {}", result);
280    }
281
282    private static void greetHelloChain(String name, CompletableFuture<String> completableFuture) {
283        log.info("Got name: {}", name);
284        completableFuture.complete("Hello " + name);
285        completableFuture.whenComplete(CompletableFutureTest::finishedGreetHelloChain);
286    }
287
288    private static void finishedGreetHelloChain(String result, Throwable t) {
289        counter.incrementAndGet();
290        log.info("Finished greet chain: {}", result);
291    }
292
293    private static CompletableFuture<String> getGreeting(String userName) {
294        return CompletableFuture.supplyAsync(() -> {
295            return "Hello " + userName;
296        });
297    }
298
299    private static CompletableFuture<Date> getCurrentDate() {
300        return CompletableFuture.supplyAsync(() -> {
301            return new Date();
302        });
303    }
304
305    private static CompletableFuture<String> transformMessage(String message) {
306        return CompletableFuture.supplyAsync(() -> {
307            return message.toUpperCase();
308        });
309    }
310
311    private static String addDateToMessage(String message, Date currentDate) {
312        return message + " was sent on  " + currentDate;
313    }
314
315    //Each task is delayed by few seconds
316    private static List<CompletableFuture<Void>> getListOfTasks() {
317        List<CompletableFuture<Void>> tasks = new ArrayList<>();
318        tasks.add(CompletableFuture.supplyAsync(() -> {
319            return greetHello("Jack");
320        }).thenAccept(message -> {
321            counter.incrementAndGet();
322            try {
323                TimeUnit.SECONDS.sleep(1);
324            } catch (InterruptedException e) {
325            }
326            log.info("Greeting: {}", message);
327        }));
328        tasks.add(CompletableFuture.supplyAsync(() -> {
329            return greetHello("Raj");
330        }).thenAccept(message -> {
331            counter.incrementAndGet();
332            try {
333                TimeUnit.SECONDS.sleep(3);
334            } catch (InterruptedException e) {
335            }
336            log.info("Greeting: {}", message);
337        }));
338        tasks.add(CompletableFuture.supplyAsync(() -> {
339            return greetHello("Dan");
340        }).thenAccept(message -> {
341            counter.incrementAndGet();
342            try {
343                TimeUnit.SECONDS.sleep(5);
344            } catch (InterruptedException e) {
345            }
346            log.info("Greeting: {}", message);
347        }));
348        return tasks;
349    }
350
351}

References

https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html

comments powered by Disqus