Producer Consumer

Overview

Implementations of the producer-consumer problem in Java: with a BlockingQueue, with wait/notify, and with explicit locks.

Github: https://github.com/gitorko/project01

Producer Consumer

Producer-consumer using ArrayBlockingQueue.

 1package com.demo.basics.concurrency._04_producerconsumer;
 2
 3import java.util.concurrent.ArrayBlockingQueue;
 4import java.util.concurrent.BlockingQueue;
 5
 6import lombok.SneakyThrows;
 7import org.junit.jupiter.api.Test;
 8
 9/**
10 * [Produce Consumer - EASY]()
11 *
12 * - blocking queue
13 */
14public class ProduceConsumer {
15
16    @SneakyThrows
17    @Test
18    public void test() {
19        // BlockingQueue<String> queue = new SynchronousQueue<>();
20        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
21
22        Runnable producer = () -> {
23            for (int i = 0; i < 20; i++) {
24                try {
25                    queue.put(String.valueOf(i));
26                    System.out.println("Published: " + i);
27                } catch (InterruptedException e) {
28                    e.printStackTrace();
29                }
30            }
31            try {
32                queue.put("END");
33            } catch (InterruptedException e) {
34                e.printStackTrace();
35            }
36
37        };
38
39        Runnable consumer = () -> {
40            while (true) {
41                try {
42                    //TimeUnit.SECONDS.sleep(3);
43                    String val = queue.take();
44                    if (val.equals("END")) break;
45                    System.out.println("Consumed: " + val);
46                } catch (InterruptedException e) {
47                    e.printStackTrace();
48                }
49            }
50        };
51
52        Thread p = new Thread(producer);
53        Thread c = new Thread(consumer);
54        p.start();
55        c.start();
56        System.out.println("Producer and Consumer has been started");
57        p.join();
58        c.join();
59        System.out.println("Completed");
60    }
61}

Producer-consumer using wait and notify.

 1package com.demo.basics.concurrency._04_producerconsumer;
 2
 3import java.util.LinkedList;
 4import java.util.Queue;
 5
 6import lombok.SneakyThrows;
 7import org.junit.jupiter.api.Test;
 8
 9/**
10 * [Producer Consumer - EASY]()
11 *
12 * - wait & notify
13 */
14public class PCWaitNotify {
15
16    @SneakyThrows
17    @Test
18    public void test() {
19        MyBlockingQueue<String> queue = new MyBlockingQueue<>();
20        Runnable producer = () -> {
21            for (int i = 0; i < 20; i++) {
22                queue.put(String.valueOf(i));
23                System.out.println("Published: " + i);
24            }
25            queue.put("END");
26        };
27
28        Runnable consumer = () -> {
29            while (true) {
30                String val = queue.take();
31                if (val.equals("END")) break;
32                System.out.println("Consumed: " + val);
33            }
34        };
35
36        Thread p = new Thread(producer);
37        Thread c = new Thread(consumer);
38        p.start();
39        c.start();
40        System.out.println("Producer and Consumer has been started");
41        p.join();
42        c.join();
43        System.out.println("Completed");
44    }
45
46    class MyBlockingQueue<E> {
47        private Queue<E> queue = new LinkedList<>();
48        private int size = 5;
49
50        public void put(E e) {
51            synchronized (queue) {
52                try {
53                    if (queue.size() == size) {
54                        queue.wait();
55                    }
56                    queue.add(e);
57                    queue.notifyAll();
58                } catch (InterruptedException ex) {
59                    ex.printStackTrace();
60                }
61            }
62        }
63
64        public E take() {
65            synchronized (queue) {
66                try {
67                    while (queue.size() == 0) {
68                        queue.wait();
69                    }
70                    E item = queue.remove();
71                    queue.notifyAll();
72                    return item;
73                } catch (InterruptedException e) {
74                    e.printStackTrace();
75                    return null;
76                }
77            }
78        }
79    }
80}
81
82

Producer-consumer using locks.

  1package com.demo.basics.concurrency._04_producerconsumer;
  2
  3import java.util.LinkedList;
  4import java.util.Queue;
  5import java.util.concurrent.CountDownLatch;
  6import java.util.concurrent.ExecutorService;
  7import java.util.concurrent.Executors;
  8import java.util.concurrent.locks.Condition;
  9import java.util.concurrent.locks.Lock;
 10import java.util.concurrent.locks.ReentrantLock;
 11
 12import lombok.SneakyThrows;
 13import org.junit.jupiter.api.Test;
 14
 15/**
 16 * [Producer Consumer - EASY]()
 17 *
 18 * - locks
 19 */
 20public class PCLock {
 21
 22    @SneakyThrows
 23    @Test
 24    public void test() {
 25
 26        MyBlockingQueue<String> queue = new MyBlockingQueue<>();
 27        ExecutorService executor = Executors.newFixedThreadPool(5);
 28        CountDownLatch latch = new CountDownLatch(2);
 29        Runnable producer = () -> {
 30            try {
 31                for (int i = 0; i < 20; i++) {
 32                    queue.put(String.valueOf(i));
 33                    System.out.println("Published: " + i);
 34                }
 35                queue.put("END");
 36            } finally {
 37                latch.countDown();
 38            }
 39        };
 40
 41        Runnable consumer = () -> {
 42            try {
 43                while (true) {
 44                    //TimeUnit.SECONDS.sleep(3);
 45                    String val = queue.take();
 46                    if (val.equals("END")) break;
 47                    System.out.println("Consumed: " + val);
 48                }
 49            } catch (Exception ex) {
 50                //Do Nothing
 51            } finally {
 52                latch.countDown();
 53            }
 54        };
 55        executor.submit(producer);
 56        executor.submit(consumer);
 57        latch.await();
 58
 59    }
 60
 61    class MyBlockingQueue<E> {
 62        private Queue<E> queue = new LinkedList<>();
 63        private int size = 5;
 64        private Lock lock = new ReentrantLock(true);
 65        private Condition notFull = lock.newCondition();
 66        private Condition notEmpty = lock.newCondition();
 67
 68        public void put(E e) {
 69            lock.lock();
 70            try {
 71                if (queue.size() == size) {
 72                    notFull.await();
 73                }
 74                queue.add(e);
 75                notEmpty.signalAll();
 76            } catch (InterruptedException ex) {
 77                ex.printStackTrace();
 78            } finally {
 79                lock.unlock();
 80            }
 81        }
 82
 83        public E take() {
 84            lock.lock();
 85            try {
 86                while (queue.size() == 0) {
 87                    notEmpty.await();
 88                }
 89                E item = queue.remove();
 90                notFull.signalAll();
 91                return item;
 92            } catch (InterruptedException e) {
 93                e.printStackTrace();
 94                return null;
 95            } finally {
 96                lock.unlock();
 97            }
 98        }
 99    }
100}
101
102
comments powered by Disqus