CompletableFuture - Basics
Overview
Async Programming using CompletableFuture
Github: https://github.com/gitorko/project83
Basics
Methods demonstrating how to use CompletableFuture
1package com.demo.project83;
2
3import java.util.ArrayList;
4import java.util.Date;
5import java.util.List;
6import java.util.Objects;
7import java.util.concurrent.CompletableFuture;
8import java.util.concurrent.ExecutorService;
9import java.util.concurrent.Executors;
10import java.util.concurrent.Future;
11import java.util.concurrent.TimeUnit;
12import java.util.concurrent.TimeoutException;
13import java.util.concurrent.atomic.AtomicInteger;
14import java.util.stream.Collectors;
15
16import lombok.SneakyThrows;
17import lombok.extern.slf4j.Slf4j;
18import org.junit.jupiter.api.Assertions;
19import org.junit.jupiter.api.Test;
20
21@Slf4j
22public class CompletableFutureTest {
23
24 static AtomicInteger counter = new AtomicInteger();
25
26 /**
27 * get() is blocking call. So main thread has to wait.
28 * Old way with Future. Dont use it.
29 */
30 @Test
31 @SneakyThrows
32 void blockingChain_test() {
33 counter = new AtomicInteger();
34 List<Future<String>> futureLst = new ArrayList<>();
35 ExecutorService executor = Executors.newCachedThreadPool();
36 for (int i = 0; i < 5; i++) {
37 int finalI = i;
38 Future<String> future = executor.submit(() -> greetHello("Jack_" + finalI));
39 futureLst.add(future);
40 }
41 for (Future<String> future : futureLst) {
42 //get is blocking the main thread here.
43 String message = future.get();
44 finishedGreetHello(message);
45 }
46 executor.shutdown();
47 Assertions.assertEquals(5, counter.get());
48 }
49
50 /**
51 * Callback attached so non blocking.
52 *
53 * Ability to provide call back functionality.
54 * You can manually set the return response on a CompletableFuture which you cant do on Future. You can cancel it as well.
55 * You can chain & combine CompletableFutures which is not possible with Future.
56 * Exception handling support in CompletableFutures which is not available in Future.
57 *
58 * Although chaining can be done manually but not advised to use this approach.
59 * This example is for reference only.
60 */
61 @Test
62 @SneakyThrows
63 void nonBlockingChain_test() {
64 counter = new AtomicInteger();
65 ExecutorService executor = Executors.newCachedThreadPool();
66 for (int i = 0; i < 5; i++) {
67 int finalI = i;
68 executor.submit(() -> {
69 CompletableFutureTest.greetHelloChain("Jack_" + finalI, new CompletableFuture<>());
70 });
71 }
72 //Give enough time for all threads to complete and return back with results.
73 TimeUnit.SECONDS.sleep(10);
74 executor.shutdown();
75 Assertions.assertEquals(5, counter.get());
76 }
77
78 /**
79 * When function does not return anything then use CompletableFuture.runAsync()
80 * returns CompletableFuture<Void>
81 */
82 @Test
83 @SneakyThrows
84 void runAsync_test() {
85 counter = new AtomicInteger();
86 for (int i = 0; i < 5; i++) {
87 int finalI = i;
88 CompletableFuture.runAsync(() -> {
89 greetHello("Jack_" + finalI);
90 }).thenRun(() -> {
91 counter.incrementAndGet();
92 log.info("Completed!");
93 });
94 }
95 //Give enough time for all threads to complete and return back with results.
96 TimeUnit.SECONDS.sleep(5);
97 Assertions.assertEquals(5, counter.get());
98 }
99
100 /**
101 * Returns CompletableFuture<T>
102 */
103 @Test
104 @SneakyThrows
105 void supplyAsync_test() {
106 counter = new AtomicInteger();
107 for (int i = 0; i < 5; i++) {
108 int finalI = i;
109 CompletableFuture.supplyAsync(() -> {
110 return greetHello("Jack_" + finalI);
111 }).thenAccept(message -> {
112 counter.incrementAndGet();
113 log.info("Greeting: {}", message);
114 });
115 }
116 //Give enough time for all threads to complete and return back with results.
117 TimeUnit.SECONDS.sleep(5);
118 Assertions.assertEquals(5, counter.get());
119 }
120
121 /**
122 * thenApply will return a nested CompletionStage.
123 * thenApply returns CompletionStage & return value of the function.
124 */
125 @Test
126 @SneakyThrows
127 void thenApply_test() {
128 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
129 //Do some computation & return the result
130 return "hello ";
131 }).thenApply(message -> {
132 return message + "world";
133 }).thenApply(message -> {
134 return message.toUpperCase();
135 });
136 // Returns type CompletionStage<CompletionStage<CompletionStage<String>>>.
137 Assertions.assertEquals("HELLO WORLD", completableFuture.get());
138 }
139
140 /**
141 * thenAccept will return a single CompletionStage, flattening effect like a flatMap
142 * thenAccept takes a Consumer and returns a Void & only the completion state.
143 */
144 @Test
145 @SneakyThrows
146 void thenAccept_test() {
147 counter = new AtomicInteger();
148 CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
149 //Do some computation & return the result
150 return "hello world";
151 }).thenAccept(message -> {
152 log.info("Got Message: {}", message);
153 }).thenRun(() -> {
154 counter.incrementAndGet();
155 log.info("Cant access previous result, just running!");
156 });
157 completableFuture.get();
158 Assertions.assertEquals(1, counter.get());
159 }
160
161 /**
162 * thenCompose() combines two futures where one future is dependent on the other
163 * thenCompose will return a single CompletionStage, flattening effect like a flatMap
164 */
165 @Test
166 @SneakyThrows
167 void thenCompose_test() {
168 //Notice the flattened return type. Combines 2 dependent future.
169 CompletableFuture<String> completableFuture = getGreeting("Jack")
170 .thenCompose(message -> CompletableFutureTest.transformMessage(message));
171 Assertions.assertEquals("HELLO JACK", completableFuture.get());
172 }
173
174 /**
175 * thenCombine() combines two independent futures.
176 */
177 @Test
178 @SneakyThrows
179 void thenCombine_test() {
180 //Combines the 2 independent futures.
181 CompletableFuture<String> completableFuture = getGreeting("Jack")
182 .thenCombine(CompletableFutureTest.getCurrentDate(), (message, currentDate) -> {
183 return CompletableFutureTest.addDateToMessage(message, currentDate);
184 });
185 Assertions.assertTrue(completableFuture.get().contains("Hello Jack was sent on"));
186 }
187
188 @Test
189 @SneakyThrows
190 void exceptionally_test() {
191 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
192 //Do some computation & return the result
193 return "Stage 0";
194 }).thenApply(result -> {
195 return result + " -> Stage 1";
196 }).exceptionally(ex -> {
197 return "Error in stage 1 : " + ex.getMessage();
198 }).thenApply(result -> {
199 if (true) {
200 throw new RuntimeException("My custom error!");
201 }
202 return result + " -> Stage 2";
203 }).exceptionally(ex -> {
204 return "Error in stage 2 : " + ex.getMessage();
205 });
206 Assertions.assertTrue(completableFuture.get().contains("Error in stage 2"));
207 }
208
209 @Test
210 @SneakyThrows
211 void allOf_test() {
212 counter = new AtomicInteger();
213 List<CompletableFuture<Void>> tasks = getListOfTasks();
214 CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
215 allTasks.get();
216 log.info("Waited for all tasks to complete and then returned!");
217 Assertions.assertEquals(3, counter.get());
218 }
219
220 @Test
221 @SneakyThrows
222 void anyOf_test() {
223 counter = new AtomicInteger();
224 List<CompletableFuture<Void>> tasks = getListOfTasks();
225 CompletableFuture<Object> allTasks = CompletableFuture.anyOf(tasks.get(0), tasks.get(1), tasks.get(2));
226 allTasks.get();
227 log.info("Waited for any one task to complete and then returned!");
228 Assertions.assertTrue(counter.get() >= 1);
229 }
230
231 @Test
232 @SneakyThrows
233 void allOf_withTimeLimit_test() {
234 counter = new AtomicInteger();
235 List<CompletableFuture<Void>> tasks = getListOfTasks();
236 CompletableFuture<Void> allTasks = CompletableFuture.allOf(tasks.get(0), tasks.get(1), tasks.get(2));
237 try {
238 allTasks.get(4, TimeUnit.SECONDS);
239 } catch (TimeoutException ex) {
240 log.error("timeout!", ex);
241 }
242 log.info("Waited for 4 seconds and returned!");
243 Assertions.assertTrue(counter.get() >= 2);
244 }
245
246 @Test
247 @SneakyThrows
248 void allOf_iterate() {
249 List<String> names = List.of("Jack", "Adam", "Ram", "Ajay");
250 List<CompletableFuture<String>> customersFuture = names.stream()
251 .map(userName -> checkName(userName))
252 .collect(Collectors.toList());
253
254 CompletableFuture<Void> allFutures = CompletableFuture.allOf(customersFuture.toArray(new CompletableFuture[customersFuture.size()]));
255
256 CompletableFuture<List<String>> allCustomersFuture = allFutures.thenApply(v -> customersFuture.stream()
257 .map(pageContentFuture -> pageContentFuture.join())
258 .filter(Objects::isNull)
259 .collect(Collectors.toList()));
260
261 List<String> customers = allCustomersFuture.get();
262 Assertions.assertEquals(2, customers.size());
263 }
264
265 private static CompletableFuture<String> checkName(String userName) {
266 return CompletableFuture.supplyAsync(() -> {
267 if (userName.startsWith("A")) return userName;
268 return null;
269 });
270 }
271
272 private static String greetHello(String name) {
273 log.info("Got name: {}", name);
274 return "Hello " + name;
275 }
276
277 private static void finishedGreetHello(String result) {
278 counter.incrementAndGet();
279 log.info("Finished greet chain: {}", result);
280 }
281
282 private static void greetHelloChain(String name, CompletableFuture<String> completableFuture) {
283 log.info("Got name: {}", name);
284 completableFuture.complete("Hello " + name);
285 completableFuture.whenComplete(CompletableFutureTest::finishedGreetHelloChain);
286 }
287
288 private static void finishedGreetHelloChain(String result, Throwable t) {
289 counter.incrementAndGet();
290 log.info("Finished greet chain: {}", result);
291 }
292
293 private static CompletableFuture<String> getGreeting(String userName) {
294 return CompletableFuture.supplyAsync(() -> {
295 return "Hello " + userName;
296 });
297 }
298
299 private static CompletableFuture<Date> getCurrentDate() {
300 return CompletableFuture.supplyAsync(() -> {
301 return new Date();
302 });
303 }
304
305 private static CompletableFuture<String> transformMessage(String message) {
306 return CompletableFuture.supplyAsync(() -> {
307 return message.toUpperCase();
308 });
309 }
310
311 private static String addDateToMessage(String message, Date currentDate) {
312 return message + " was sent on " + currentDate;
313 }
314
315 //Each task is delayed by few seconds
316 private static List<CompletableFuture<Void>> getListOfTasks() {
317 List<CompletableFuture<Void>> tasks = new ArrayList<>();
318 tasks.add(CompletableFuture.supplyAsync(() -> {
319 return greetHello("Jack");
320 }).thenAccept(message -> {
321 counter.incrementAndGet();
322 try {
323 TimeUnit.SECONDS.sleep(1);
324 } catch (InterruptedException e) {
325 }
326 log.info("Greeting: {}", message);
327 }));
328 tasks.add(CompletableFuture.supplyAsync(() -> {
329 return greetHello("Raj");
330 }).thenAccept(message -> {
331 counter.incrementAndGet();
332 try {
333 TimeUnit.SECONDS.sleep(3);
334 } catch (InterruptedException e) {
335 }
336 log.info("Greeting: {}", message);
337 }));
338 tasks.add(CompletableFuture.supplyAsync(() -> {
339 return greetHello("Dan");
340 }).thenAccept(message -> {
341 counter.incrementAndGet();
342 try {
343 TimeUnit.SECONDS.sleep(5);
344 } catch (InterruptedException e) {
345 }
346 log.info("Greeting: {}", message);
347 }));
348 return tasks;
349 }
350
351}
References
comments powered by Disqus