Scatter Gather Pattern

Overview

The Scatter-Gather enterprise integration pattern is used for scenarios such as "best quote", where we request information from several suppliers and then decide which one offers the best price for the requested item.

Github: https://github.com/gitorko/project01

Scatter Gather Pattern

Suppose we have a book product and need to fetch its price from several sources, waiting at most 3 seconds in total. You could use Thread.sleep(), but then the caller always waits the full 3 seconds even if every task completes earlier. You could also call Thread.join() with a timeout on each thread, but the timeouts apply per thread, so the total wait is harder to bound.

Code

We can use a CountDownLatch to wait for the prices to be fetched. The caller waits at most 3 seconds (less if all tasks finish earlier) and then returns whatever prices have been fetched so far.

 1package com.demo.basics.designpatterns._24_scattergather.latch;
 2
 3import java.util.Map;
 4import java.util.concurrent.ConcurrentHashMap;
 5import java.util.concurrent.CountDownLatch;
 6import java.util.concurrent.ExecutorService;
 7import java.util.concurrent.Executors;
 8import java.util.concurrent.TimeUnit;
 9
10import lombok.AllArgsConstructor;
11import lombok.SneakyThrows;
12import org.junit.jupiter.api.Test;
13
14public class ScatterGatherLatchTest {
15
16    ExecutorService threadPool = Executors.newCachedThreadPool();
17
18    @Test
19    public void test() {
20        Map<String, Float> book1Prices = new ScatterGatherLatchTest().getPrices("book1");
21        System.out.println(book1Prices);
22    }
23
24    @SneakyThrows
25    private Map<String, Float> getPrices(String productId) {
26        Map<String, Float> prices = new ConcurrentHashMap<>();
27        CountDownLatch latch = new CountDownLatch(3);
28        threadPool.submit(new FetchData("http://amazon", productId, prices, latch));
29        threadPool.submit(new FetchData("http://ebay", productId, prices, latch));
30        threadPool.submit(new FetchData("http://flipkart", productId, prices, latch));
31        latch.await(3, TimeUnit.SECONDS);
32        threadPool.shutdown();
33        return prices;
34    }
35
36    @AllArgsConstructor
37    class FetchData implements Runnable {
38
39        String url;
40        String productId;
41        Map<String, Float> prices;
42        CountDownLatch latch;
43
44        @SneakyThrows
45        @Override
46        public void run() {
47            if (url.contains("amazon")) {
48                //http fetch from amazon
49                System.out.println("Fetching price from amazon!");
50                TimeUnit.SECONDS.sleep(2);
51                prices.put("amazon", 2.35f);
52                latch.countDown();
53            }
54
55            if (url.contains("ebay")) {
56                System.out.println("Fetching price from ebay!");
57                //http fetch from ebay
58                TimeUnit.SECONDS.sleep(4);
59                prices.put("ebay", 2.30f);
60                latch.countDown();
61            }
62
63            if (url.contains("flipkart")) {
64                System.out.println("Fetching price from flipkart!");
65                //http fetch from flipkart
66                TimeUnit.SECONDS.sleep(1);
67                prices.put("flipkart", 2.10f);
68                latch.countDown();
69            }
70        }
71    }
72}
73

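We can also use the ExecutorService.invokeAll method with a timeout. It returns when all tasks have completed or the timeout expires, and cancels any task that has not finished by then.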

 1package com.demo.basics.designpatterns._24_scattergather.invoke;
 2
 3import java.util.ArrayList;
 4import java.util.List;
 5import java.util.Map;
 6import java.util.concurrent.Callable;
 7import java.util.concurrent.ConcurrentHashMap;
 8import java.util.concurrent.ExecutorService;
 9import java.util.concurrent.Executors;
10import java.util.concurrent.TimeUnit;
11
12import lombok.AllArgsConstructor;
13import lombok.SneakyThrows;
14import org.junit.jupiter.api.Test;
15
16public class ScatterGatherInvokeTest {
17    ExecutorService threadPool = Executors.newCachedThreadPool();
18
19    @Test
20    public void test() {
21        Map<String, Float> book1Prices = new ScatterGatherInvokeTest().getPrices("book1");
22        System.out.println(book1Prices);
23    }
24
25    @SneakyThrows
26    private Map<String, Float> getPrices(String productId) {
27        Map<String, Float> prices = new ConcurrentHashMap<>();
28        List<Callable<Void>> tasks = new ArrayList<>();
29
30        tasks.add(new FetchData("http://amazon", productId, prices));
31        tasks.add(new FetchData("http://ebay", productId, prices));
32        tasks.add(new FetchData("http://flipkart", productId, prices));
33        threadPool.invokeAll(tasks, 3, TimeUnit.SECONDS);
34        threadPool.shutdown();
35        return prices;
36    }
37
38    @AllArgsConstructor
39    class FetchData implements Callable<Void> {
40
41        String url;
42        String productId;
43        Map<String, Float> prices;
44
45        @Override
46        @SneakyThrows
47        public Void call() throws Exception {
48            if (url.contains("amazon")) {
49                //http fetch from amazon
50                System.out.println("Fetching price from amazon!");
51                TimeUnit.SECONDS.sleep(2);
52                prices.put("amazon", 2.35f);
53            }
54
55            if (url.contains("ebay")) {
56                System.out.println("Fetching price from ebay!");
57                //http fetch from ebay
58                TimeUnit.SECONDS.sleep(4);
59                prices.put("ebay", 2.30f);
60            }
61
62            if (url.contains("flipkart")) {
63                System.out.println("Fetching price from flipkart!");
64                //http fetch from flipkart
65                TimeUnit.SECONDS.sleep(1);
66                prices.put("flipkart", 2.10f);
67            }
68            return null;
69        }
70    }
71}
72
73

We can also use CompletableFuture.

 1package com.demo.basics.designpatterns._24_scattergather.completable;
 2
 3import java.util.Map;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.ConcurrentHashMap;
 6import java.util.concurrent.ExecutorService;
 7import java.util.concurrent.Executors;
 8import java.util.concurrent.TimeUnit;
 9import java.util.concurrent.TimeoutException;
10
11import lombok.AllArgsConstructor;
12import lombok.SneakyThrows;
13import org.junit.jupiter.api.Test;
14
15public class ScatterGatherCompletableTest {
16    ExecutorService threadPool = Executors.newCachedThreadPool();
17
18    @Test
19    public void test() {
20        Map<String, Float> book1Prices = new ScatterGatherCompletableTest().getPrices("book1");
21        System.out.println(book1Prices);
22    }
23
24    @SneakyThrows
25    private Map<String, Float> getPrices(String productId) {
26        Map<String, Float> prices = new ConcurrentHashMap<>();
27
28        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new FetchData("http://amazon", productId, prices));
29        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new FetchData("http://ebay", productId, prices));
30        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new FetchData("http://flipkart", productId, prices));
31
32        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1,task2,task3);
33        try {
34            allTasks.get(3, TimeUnit.SECONDS);
35        } catch (TimeoutException ex) {
36            //Do Nothing!
37        }
38        return prices;
39    }
40
41    @AllArgsConstructor
42    class FetchData implements Runnable {
43
44        String url;
45        String productId;
46        Map<String, Float> prices;
47
48        @Override
49        @SneakyThrows
50        public void run() {
51            if (url.contains("amazon")) {
52                //http fetch from amazon
53                System.out.println("Fetching price from amazon!");
54                TimeUnit.SECONDS.sleep(2);
55                prices.put("amazon", 2.35f);
56            }
57
58            if (url.contains("ebay")) {
59                System.out.println("Fetching price from ebay!");
60                //http fetch from ebay
61                TimeUnit.SECONDS.sleep(4);
62                prices.put("ebay", 2.30f);
63            }
64
65            if (url.contains("flipkart")) {
66                System.out.println("Fetching price from flipkart!");
67                //http fetch from flipkart
68                TimeUnit.SECONDS.sleep(1);
69                prices.put("flipkart", 2.10f);
70            }
71        }
72    }
73}
74

Result

1{amazon=2.35, flipkart=2.1}

Setup

 1# Project 01
 2
 3Data Structure & Algorithms & Design Patterns
 4
 5[https://gitorko.github.io/grokking-the-coding-interview/](https://gitorko.github.io/grokking-the-coding-interview/)
 6[https://gitorko.github.io/design-patterns/](https://gitorko.github.io/design-patterns/)
 7
 8### Version
 9
10Check version
11
12```bash
13$java --version
14openjdk version "21.0.3" 2024-04-16 LTS
15```
16
17### Online code editor
18
19https://rustpad.io/
20
21https://collabedit.com/
22
23https://app.coderpad.io/
24
25https://codeshare.io/
26
27### Topic
28
2901. Number
3002. String
3103. Map & Set
3204. Heap
3305. Sliding window / Two pointer
3406. Matrix / Grid
3507. Backtracking
3608. Pre-Sum
3709. DP
3810. Link List
3911. Binary Tree / BST
4012. Interval
4113. Binary Search
4214. Topological Sort
4315. Stack & Monotonic Stack & Queue
4416. Graphs
4517. Thread
4618. Greedy
4719. Segment Tree
4820. Prefix Tree / Trie
4921. Cyclic sort
5022. Bit Manipulation
5125. Generic
comments powered by Disqus