Scatter Gather Pattern

Overview

Scatter Gather enterprise integration pattern is used for scenarios such as "best quote", where we need to request information from several suppliers and decide which one provides us with the best price for the requested item.

Github: https://github.com/gitorko/project62

Scatter Gather Pattern

So we have a book product and we need to fetch the price from various sources and at max we can wait for 3 seconds. You could use a Thread.sleep or Threads join() method but then if the tasks complete before 3 seconds the tasks will still wait for 3 seconds before returning.

Code

We can use a CountDownLatch to wait for the prices to be fetched. It will wait only for 3 seconds and return the prices fetched.

 1package com.demo.project62.scattergather.latch;
 2
 3import java.util.Map;
 4import java.util.concurrent.ConcurrentHashMap;
 5import java.util.concurrent.CountDownLatch;
 6import java.util.concurrent.ExecutorService;
 7import java.util.concurrent.Executors;
 8import java.util.concurrent.TimeUnit;
 9
10import lombok.AllArgsConstructor;
11import lombok.SneakyThrows;
12
13public class ScatterGatherLatch {
14
15    ExecutorService threadPool = Executors.newCachedThreadPool();
16
17    public static void main(String[] args) {
18        Map<String, Float> book1Prices = new ScatterGatherLatch().getPrices("book1");
19        System.out.println(book1Prices);
20    }
21
22    @SneakyThrows
23    private Map<String, Float> getPrices(String productId) {
24        Map<String, Float> prices = new ConcurrentHashMap<>();
25        CountDownLatch latch = new CountDownLatch(3);
26        threadPool.submit(new FetchData("http://amazon", productId, prices, latch));
27        threadPool.submit(new FetchData("http://ebay", productId, prices, latch));
28        threadPool.submit(new FetchData("http://flipkart", productId, prices, latch));
29        latch.await(3, TimeUnit.SECONDS);
30        threadPool.shutdown();
31        return prices;
32    }
33
34    @AllArgsConstructor
35    class FetchData implements Runnable {
36
37        String url;
38        String productId;
39        Map<String, Float> prices;
40        CountDownLatch latch;
41
42        @SneakyThrows
43        @Override
44        public void run() {
45            if (url.contains("amazon")) {
46                //http fetch from amazon
47                System.out.println("Fetching price from amazon!");
48                TimeUnit.SECONDS.sleep(2);
49                prices.put("amazon", 2.35f);
50                latch.countDown();
51            }
52
53            if (url.contains("ebay")) {
54                System.out.println("Fetching price from ebay!");
55                //http fetch from ebay
56                TimeUnit.SECONDS.sleep(4);
57                prices.put("ebay", 2.30f);
58                latch.countDown();
59            }
60
61            if (url.contains("flipkart")) {
62                System.out.println("Fetching price from flipkart!");
63                //http fetch from flipkart
64                TimeUnit.SECONDS.sleep(1);
65                prices.put("flipkart", 2.10f);
66                latch.countDown();
67            }
68        }
69    }
70}
71

We can also use the invokeAll method

 1package com.demo.project62.scattergather.invoke;
 2
 3import java.util.ArrayList;
 4import java.util.List;
 5import java.util.Map;
 6import java.util.concurrent.Callable;
 7import java.util.concurrent.ConcurrentHashMap;
 8import java.util.concurrent.ExecutorService;
 9import java.util.concurrent.Executors;
10import java.util.concurrent.TimeUnit;
11
12import lombok.AllArgsConstructor;
13import lombok.SneakyThrows;
14
15public class ScatterGatherInvoke {
16    ExecutorService threadPool = Executors.newCachedThreadPool();
17
18    public static void main(String[] args) {
19        Map<String, Float> book1Prices = new ScatterGatherInvoke().getPrices("book1");
20        System.out.println(book1Prices);
21    }
22
23    @SneakyThrows
24    private Map<String, Float> getPrices(String productId) {
25        Map<String, Float> prices = new ConcurrentHashMap<>();
26        List<Callable<Void>> tasks = new ArrayList<>();
27
28        tasks.add(new FetchData("http://amazon", productId, prices));
29        tasks.add(new FetchData("http://ebay", productId, prices));
30        tasks.add(new FetchData("http://flipkart", productId, prices));
31        threadPool.invokeAll(tasks, 3, TimeUnit.SECONDS);
32        threadPool.shutdown();
33        return prices;
34    }
35
36    @AllArgsConstructor
37    class FetchData implements Callable<Void> {
38
39        String url;
40        String productId;
41        Map<String, Float> prices;
42
43        @Override
44        @SneakyThrows
45        public Void call() throws Exception {
46            if (url.contains("amazon")) {
47                //http fetch from amazon
48                System.out.println("Fetching price from amazon!");
49                TimeUnit.SECONDS.sleep(2);
50                prices.put("amazon", 2.35f);
51            }
52
53            if (url.contains("ebay")) {
54                System.out.println("Fetching price from ebay!");
55                //http fetch from ebay
56                TimeUnit.SECONDS.sleep(4);
57                prices.put("ebay", 2.30f);
58            }
59
60            if (url.contains("flipkart")) {
61                System.out.println("Fetching price from flipkart!");
62                //http fetch from flipkart
63                TimeUnit.SECONDS.sleep(1);
64                prices.put("flipkart", 2.10f);
65            }
66            return null;
67        }
68    }
69}
70
71

We can also use the CompletableFuture.

 1package com.demo.project62.scattergather.completable;
 2
 3import java.util.Map;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.ConcurrentHashMap;
 6import java.util.concurrent.ExecutorService;
 7import java.util.concurrent.Executors;
 8import java.util.concurrent.TimeUnit;
 9import java.util.concurrent.TimeoutException;
10
11import lombok.AllArgsConstructor;
12import lombok.SneakyThrows;
13
14public class ScatterGatherCompletable {
15    ExecutorService threadPool = Executors.newCachedThreadPool();
16
17    public static void main(String[] args) {
18        Map<String, Float> book1Prices = new ScatterGatherCompletable().getPrices("book1");
19        System.out.println(book1Prices);
20    }
21
22    @SneakyThrows
23    private Map<String, Float> getPrices(String productId) {
24        Map<String, Float> prices = new ConcurrentHashMap<>();
25
26        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new FetchData("http://amazon", productId, prices));
27        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new FetchData("http://ebay", productId, prices));
28        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new FetchData("http://flipkart", productId, prices));
29
30        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1,task2,task3);
31        try {
32            allTasks.get(3, TimeUnit.SECONDS);
33        } catch (TimeoutException ex) {
34            //Do Nothing!
35        }
36        return prices;
37    }
38
39    @AllArgsConstructor
40    class FetchData implements Runnable {
41
42        String url;
43        String productId;
44        Map<String, Float> prices;
45
46        @Override
47        @SneakyThrows
48        public void run() {
49            if (url.contains("amazon")) {
50                //http fetch from amazon
51                System.out.println("Fetching price from amazon!");
52                TimeUnit.SECONDS.sleep(2);
53                prices.put("amazon", 2.35f);
54            }
55
56            if (url.contains("ebay")) {
57                System.out.println("Fetching price from ebay!");
58                //http fetch from ebay
59                TimeUnit.SECONDS.sleep(4);
60                prices.put("ebay", 2.30f);
61            }
62
63            if (url.contains("flipkart")) {
64                System.out.println("Fetching price from flipkart!");
65                //http fetch from flipkart
66                TimeUnit.SECONDS.sleep(1);
67                prices.put("flipkart", 2.10f);
68            }
69        }
70    }
71}
72

Result

1{amazon=2.35, flipkart=2.1}
comments powered by Disqus