CompletableFuture - Basics
Overview
Async Programming using CompletableFuture
Github: https://github.com/gitorko/project83
Basics
Methods demonstrating how to use CompletableFuture
1package com.demo.project83;
2
3import static org.awaitility.Awaitility.await;
4
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8import java.util.Objects;
9import java.util.concurrent.CompletableFuture;
10import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
12import java.util.concurrent.Future;
13import java.util.concurrent.TimeUnit;
14import java.util.concurrent.TimeoutException;
15import java.util.concurrent.atomic.AtomicInteger;
16import java.util.stream.Collectors;
17
18import lombok.SneakyThrows;
19import lombok.extern.slf4j.Slf4j;
20import org.junit.jupiter.api.Assertions;
21import org.junit.jupiter.api.Test;
22
23@Slf4j
24public class CompletableFutureTest {
25
26 static AtomicInteger counter = new AtomicInteger();
27
28 /**
29 * get() is blocking call. So main thread has to wait.
30 * Old way with Future. Dont use it.
31 */
32 @Test
33 @SneakyThrows
34 void blockingChain_test() {
35 counter = new AtomicInteger();
36 List<Future<String>> futureLst = new ArrayList<>();
37 ExecutorService executor = Executors.newCachedThreadPool();
38 for (int i = 0; i < 5; i++) {
39 int finalI = i;
40 Future<String> future = executor.submit(() -> greetHello("Jack_" + finalI));
41 futureLst.add(future);
42 }
43 for (Future<String> future : futureLst) {
44 //get is blocking the main thread here.
45 String message = future.get();
46 finishedGreetHello(message);
47 }
48 executor.shutdown();
49 Assertions.assertEquals(5, counter.get());
50 }
51
52 /**
53 * Callback attached so non blocking.
54 *
55 * Ability to provide call back functionality.
56 * You can manually set the return response on a CompletableFuture which you cant do on Future. You can cancel it as well.
57 * You can chain & combine CompletableFutures which is not possible with Future.
58 * Exception handling support in CompletableFutures which is not available in Future.
59 *
60 * Although chaining can be done manually but not advised to use this approach.
61 * This example is for reference only.
62 */
63 @Test
64 @SneakyThrows
65 void nonBlockingChain_test() {
66 counter = new AtomicInteger();
67 ExecutorService executor = Executors.newCachedThreadPool();
68 for (int i = 0; i < 5; i++) {
69 int finalI = i;
70 executor.submit(() -> {
71 CompletableFutureTest.greetHelloChain("Jack_" + finalI, new CompletableFuture<>());
72 });
73 }
74 //Give enough time for all threads to complete and return back with results.
75 TimeUnit.SECONDS.sleep(10);
76 executor.shutdown();
77 Assertions.assertEquals(5, counter.get());
78 }
79
80 /**
81 * When function does not return anything then use CompletableFuture.runAsync()
82 * returns CompletableFuture<Void>
83 */
84 @Test
85 @SneakyThrows
86 void runAsync_test() {
87 counter = new AtomicInteger();
88 for (int i = 0; i < 5; i++) {
89 int finalI = i;
90 CompletableFuture.runAsync(() -> {
91 greetHello("Jack_" + finalI);
92 }).thenRun(() -> {
93 counter.incrementAndGet();
94 log.info("Completed!");
95 });
96 }
97 //Give enough time for all threads to complete and return back with results.
98 TimeUnit.SECONDS.sleep(5);
99 Assertions.assertEquals(5, counter.get());
100 }
101
102 @Test
103 @SneakyThrows
104 void runAsync_test_await() {
105 counter = new AtomicInteger();
106 for (int i = 0; i < 5; i++) {
107 int finalI = i;
108 CompletableFuture.runAsync(() -> {
109 greetHello("Jack_" + finalI);
110 }).thenRun(() -> {
111 counter.incrementAndGet();
112 log.info("Completed!");
113 });
114 }
115 await().atMost(2, TimeUnit.SECONDS).until(() -> counter.get() == 5);
116 Assertions.assertEquals(5, counter.get());
117 }
118
119 /**
120 * Returns CompletableFuture<T>
121 */
122 @Test
123 @SneakyThrows
124 void supplyAsync_test() {
125 counter = new AtomicInteger();
126 for (int i = 0; i < 5; i++) {
127 int finalI = i;
128 CompletableFuture.supplyAsync(() -> {
129 return greetHello("Jack_" + finalI);
130 }).thenAccept(message -> {
131 counter.incrementAndGet();
132 log.info("Greeting: {}", message);
133 });
134 }
135 //Give enough time for all threads to complete and return back with results.
136 TimeUnit.SECONDS.sleep(5);
137 Assertions.assertEquals(5, counter.get());
138 }
139
140 /**
141 * thenApply will return a nested CompletionStage.
142 * thenApply returns CompletionStage & return value of the function.
143 */
144 @Test
145 @SneakyThrows
146 void thenApply_test() {
147 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
148 //Do some computation & return the result
149 return "hello ";
150 }).thenApply(message -> {
151 return message + "world";
152 }).thenApply(message -> {
153 return message.toUpperCase();
154 });
155 // Returns type CompletionStage<CompletionStage<CompletionStage<String>>>.
156 Assertions.assertEquals("HELLO WORLD", completableFuture.get());
157 }
158
159 /**
160 * thenAccept will return a single CompletionStage, flattening effect like a flatMap
161 * thenAccept takes a Consumer and returns a Void & only the completion state.
162 */
163 @Test
164 @SneakyThrows
165 void thenAccept_test() {
166 counter = new AtomicInteger();
167 CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
168 //Do some computation & return the result
169 return "hello world";
170 }).thenAccept(message -> {
171 log.info("Got Message: {}", message);
172 }).thenRun(() -> {
173 counter.incrementAndGet();
174 log.info("Cant access previous result, just running!");
175 });
176 completableFuture.get();
177 Assertions.assertEquals(1, counter.get());
178 }
179
180 /**
181 * thenCompose() combines two futures where one future is dependent on the other
182 * thenCompose will return a single CompletionStage, flattening effect like a flatMap
183 */
184 @Test
185 @SneakyThrows
186 void thenCompose_test() {
187 //Notice the flattened return type. Combines 2 dependent future.
188 CompletableFuture<String> completableFuture = getGreeting("Jack")
189 .thenCompose(message -> CompletableFutureTest.transformMessage(message));
190 Assertions.assertEquals("HELLO JACK", completableFuture.get());
191 }
192
193 /**
194 * thenCombine() combines two independent futures.
195 */
196 @Test
197 @SneakyThrows
198 void thenCombine_test() {
199 //Combines the 2 independent futures.
200 CompletableFuture<String> completableFuture = getGreeting("Jack")
201 .thenCombine(CompletableFutureTest.getCurrentDate(), (message, currentDate) -> {
202 return CompletableFutureTest.addDateToMessage(message, currentDate);
203 });
204 Assertions.assertTrue(completableFuture.get().contains("Hello Jack was sent on"));
205 }
206
207 @Test
208 @SneakyThrows
209 void exceptionally_test() {
210 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
211 //Do some computation & return the result
212 return "Stage 0";
213 }).thenApply(result -> {
214 return result + " -> Stage 1";
215 }).exceptionally(ex -> {
216 return "Error in stage 1 : " + ex.getMessage();
217 }).thenApply(result -> {
218 if (true) {
219 throw new RuntimeException("My custom error!");
220 }
221 return result + " -> Stage 2";
222 }).exceptionally(ex -> {
223 return "Error in stage 2 : " + ex.getMessage();
224 });
225 Assertions.assertTrue(completableFuture.get().contains("Error in stage 2"));
226 }
227
228 @Test
229 @SneakyThrows
230 void allOf_test() {
231 counter = new AtomicInteger();
232 List<CompletableFuture<Void>> tasks = getListOfTasks();
233 CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
234 allTasks.get();
235 log.info("Waited for all tasks to complete and then returned!");
236 Assertions.assertEquals(3, counter.get());
237 }
238
239 @Test
240 @SneakyThrows
241 void anyOf_test() {
242 counter = new AtomicInteger();
243 List<CompletableFuture<Void>> tasks = getListOfTasks();
244 CompletableFuture<Object> allTasks = CompletableFuture.anyOf(tasks.get(0), tasks.get(1), tasks.get(2));
245 allTasks.get();
246 log.info("Waited for any one task to complete and then returned!");
247 Assertions.assertTrue(counter.get() >= 1);
248 }
249
250 @Test
251 @SneakyThrows
252 void allOf_withTimeLimit_test() {
253 counter = new AtomicInteger();
254 List<CompletableFuture<Void>> tasks = getListOfTasks();
255 CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
256 try {
257 allTasks.get(4, TimeUnit.SECONDS);
258 } catch (TimeoutException ex) {
259 log.error("timeout!", ex);
260 }
261 log.info("Waited for 4 seconds and returned!");
262 Assertions.assertTrue(counter.get() >= 2);
263 }
264
265 @Test
266 @SneakyThrows
267 void allOf_iterate() {
268 List<String> names = List.of("Jack", "Adam", "Ram", "Ajay");
269 List<CompletableFuture<String>> customersFuture = names.stream()
270 .map(userName -> checkName(userName))
271 .collect(Collectors.toList());
272
273 CompletableFuture<Void> allFutures = CompletableFuture.allOf(customersFuture.toArray(new CompletableFuture[customersFuture.size()]));
274
275 CompletableFuture<List<String>> allCustomersFuture = allFutures.thenApply(v -> customersFuture.stream()
276 .map(pageContentFuture -> pageContentFuture.join())
277 .filter(Objects::isNull)
278 .collect(Collectors.toList()));
279
280 List<String> customers = allCustomersFuture.get();
281 Assertions.assertEquals(2, customers.size());
282 }
283
284 private static CompletableFuture<String> checkName(String userName) {
285 return CompletableFuture.supplyAsync(() -> {
286 if (userName.startsWith("A")) return userName;
287 return null;
288 });
289 }
290
291 private static String greetHello(String name) {
292 log.info("Got name: {}", name);
293 return "Hello " + name;
294 }
295
296 private static void finishedGreetHello(String result) {
297 counter.incrementAndGet();
298 log.info("Finished greet chain: {}", result);
299 }
300
301 private static void greetHelloChain(String name, CompletableFuture<String> completableFuture) {
302 log.info("Got name: {}", name);
303 completableFuture.complete("Hello " + name);
304 completableFuture.whenComplete(CompletableFutureTest::finishedGreetHelloChain);
305 }
306
307 private static void finishedGreetHelloChain(String result, Throwable t) {
308 counter.incrementAndGet();
309 log.info("Finished greet chain: {}", result);
310 }
311
312 private static CompletableFuture<String> getGreeting(String userName) {
313 return CompletableFuture.supplyAsync(() -> {
314 return "Hello " + userName;
315 });
316 }
317
318 private static CompletableFuture<Date> getCurrentDate() {
319 return CompletableFuture.supplyAsync(() -> {
320 return new Date();
321 });
322 }
323
324 private static CompletableFuture<String> transformMessage(String message) {
325 return CompletableFuture.supplyAsync(() -> {
326 return message.toUpperCase();
327 });
328 }
329
330 private static String addDateToMessage(String message, Date currentDate) {
331 return message + " was sent on " + currentDate;
332 }
333
334 //Each task is delayed by few seconds
335 private static List<CompletableFuture<Void>> getListOfTasks() {
336 List<CompletableFuture<Void>> tasks = new ArrayList<>();
337 tasks.add(CompletableFuture.supplyAsync(() -> {
338 return greetHello("Jack");
339 }).thenAccept(message -> {
340 counter.incrementAndGet();
341 try {
342 TimeUnit.SECONDS.sleep(1);
343 } catch (InterruptedException e) {
344 }
345 log.info("Greeting: {}", message);
346 }));
347 tasks.add(CompletableFuture.supplyAsync(() -> {
348 return greetHello("Raj");
349 }).thenAccept(message -> {
350 counter.incrementAndGet();
351 try {
352 TimeUnit.SECONDS.sleep(3);
353 } catch (InterruptedException e) {
354 }
355 log.info("Greeting: {}", message);
356 }));
357 tasks.add(CompletableFuture.supplyAsync(() -> {
358 return greetHello("Dan");
359 }).thenAccept(message -> {
360 counter.incrementAndGet();
361 try {
362 TimeUnit.SECONDS.sleep(5);
363 } catch (InterruptedException e) {
364 }
365 log.info("Greeting: {}", message);
366 }));
367 return tasks;
368 }
369
370}
References
comments powered by Disqus