Jenkins - Data Processing

Overview

Jenkins is mostly used to set up CI/CD pipelines. Here we will use it to set up a data pipeline that orchestrates data processing jobs.

GitHub: https://github.com/gitorko/project84

Requirement

Let's consider a company that sells paint.

  • STAGE1: Orders from the field arrive as files on the company's FTP server. They have to be processed and loaded into the DB. As these are large files, the ability to re-run jobs after fixing the files is required.
  • STAGE2: Material supply data from suppliers arrives as files on the FTP server. It has to be processed and loaded into the DB. As these are large files, the ability to re-run jobs after fixing the files is required.
  • STAGE3: Once the orders and materials are loaded into the DB, if the orders can be fulfilled, the paint color and quantity need to be grouped by city.
  • STAGE4: An additional buffer needs to be added to cover any shortages. This is a small job and can run in parallel with STAGE5.
  • STAGE5: Sales rep bonus points need to be added if the offer is active. This is a small job and can run in parallel with STAGE4.
  • STAGE6: The order needs to be sent to the factory in each city.
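The dependencies implied by these requirements form a small graph: STAGE3 needs STAGE1 and STAGE2, STAGE4 and STAGE5 both need STAGE3, and STAGE6 comes after both of them. The sketch below is a hypothetical Java model (the Stage enum and StageLevels class are not part of project84) that assigns each stage a level; stages on the same level do not depend on each other and could run in parallel.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the six stages; dependencies are assumed from the requirements.
enum Stage {
    STAGE1, STAGE2, STAGE3, STAGE4, STAGE5, STAGE6;

    /** Stages that must finish before this one can start. */
    List<Stage> dependsOn() {
        return switch (this) {
            case STAGE1, STAGE2 -> List.of();
            case STAGE3 -> List.of(STAGE1, STAGE2);
            case STAGE4, STAGE5 -> List.of(STAGE3);
            case STAGE6 -> List.of(STAGE4, STAGE5);
        };
    }
}

class StageLevels {
    /** Level = 1 + highest level among dependencies; same level means no mutual dependency. */
    static Map<Stage, Integer> levels() {
        Map<Stage, Integer> levels = new EnumMap<>(Stage.class);
        // Declaration order is already a valid topological order, so one pass is enough.
        for (Stage stage : Stage.values()) {
            int level = 0;
            for (Stage dep : stage.dependsOn()) {
                level = Math.max(level, levels.get(dep) + 1);
            }
            levels.put(stage, level);
        }
        return levels;
    }
}
```

In this model STAGE4 and STAGE5 share a level, which is why they can be forked. STAGE1 and STAGE2 also share a level; the pipeline in this post simply runs them one after the other, which is equally valid.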

The features of Jenkins that make it friendly for data processing are:

  1. Log view - Ability to look at logs across different stages.
  2. Graph view - Ability to look at a run graphically.
  3. Input - Ability to provide input to a job at run time.
  4. Scheduling - Ability to schedule jobs at periodic intervals.
  5. Parallel execution - Ability to run jobs in parallel.
  6. Agent load distribution - Ability to run a job on other agent machines to distribute the load.
  7. Time to complete - Ability to see which job is running and the history of runs.
  8. Time taken - Ability to view the time taken by each stage over long periods to identify execution trends.
  9. Re-run - Ability to re-run a particular stage of a failed job.
  10. Slack - Ability to notify users on Slack after a job completes.
  11. Pull from Maven - Ability to download the jar from Maven.
  12. Plugin support - Numerous plugins are available for Jenkins.

Code

The backend job does the processing. It takes the stage name as an argument, processes that stage and writes the results to a Postgres DB.

  1. Ensure that each stage can run in isolation and updates just one table. Two stages should never update the same table.
  2. Ensure that it throws a runtime exception on failure, so that the Jenkins step fails.
  3. Ensure logging is added so that issues can be identified.
  4. Ensure each stage can be re-run many times. This is done by resetting (deleting) the stage's data before loading it again.
  5. Change the value of BASE_PATH to match your environment.
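Requirements 2 and 4 can be illustrated without Spring or a database. In this hypothetical sketch, IdempotentStage owns one in-memory list that stands in for a table: it clears the list before loading (like deleteAll()), so running it twice leaves the same rows, and it wraps any failure in a RuntimeException that keeps the original cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: one stage, one "table" (an in-memory list standing in for a DB table).
class IdempotentStage {
    final List<String> table = new ArrayList<>();

    void run(Supplier<List<String>> source) {
        try {
            table.clear();               // reset, same role as repo.deleteAll()
            table.addAll(source.get());  // load the (possibly fixed) input again
        } catch (Exception ex) {
            // Fail loudly and keep the root cause for the logs.
            throw new RuntimeException("ERROR: stage failed", ex);
        }
    }
}
```

The job below follows the same pattern with deleteAll() on its repository. There the reset and the inserts are separate repository calls without a surrounding transaction, so a failure mid-file can leave a partially loaded table; re-running the stage simply clears it again.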
```java
package com.demo.project84;

import java.io.BufferedReader;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.JpaRepository;

@SpringBootApplication
@RequiredArgsConstructor
@Slf4j
public class Main implements CommandLineRunner {
    // Directory containing order-file.txt and material-file.txt; change it to match your machine.
    private static final String BASE_PATH = "/Users/asurendra/code/pet/project84/";

    final OrderRepo orderRepo;
    final MaterialRepo materialRepo;
    final ProcessedRepo processedRepo;
    final BonusRepo bonusRepo;
    final FactoryRepo factoryRepo;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    /**
     * Runs exactly one stage, selected by the first argument (STAGE1..STAGE6).
     * Exceptions propagate out of SpringApplication.run, so the process exits
     * with a non-zero status and the Jenkins sh step fails.
     */
    @Override
    public void run(String... args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException("Missing argument, expected one of STAGE1..STAGE6");
        }
        String caseType = args[0];
        switch (caseType) {
            case "STAGE1":
                stage1();
                break;
            case "STAGE2":
                stage2();
                break;
            case "STAGE3":
                stage3();
                break;
            case "STAGE4":
                stage4();
                break;
            case "STAGE5":
                stage5();
                break;
            case "STAGE6":
                stage6();
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + caseType);
        }
    }

    /**
     * Load order file to db.
     */
    private void stage1() {
        log.info("Loading orders to db");
        try {
            // Reset first so the stage can be re-run after fixing the file.
            orderRepo.deleteAll();
            Path path = Paths.get(BASE_PATH + "order-file.txt");
            try (BufferedReader reader = Files.newBufferedReader(path)) {
                String line;
                // readLine() returns null at end of file; ready() is not a reliable EOF check.
                while ((line = reader.readLine()) != null) {
                    if (line.isBlank()) {
                        continue;
                    }
                    log.info(line);
                    // Format: color,quantity,city,salesRep,orderDate (yyyy-MM-dd)
                    String[] split = line.split(",");
                    OrderDetail order = OrderDetail.builder()
                            .color(split[0])
                            .quantity(Double.valueOf(split[1]))
                            .city(split[2])
                            .salesRep(split[3])
                            .orderDate(LocalDate.parse(split[4]))
                            .build();
                    orderRepo.save(order);
                }
            }
            log.info("Loading orders completed");
        } catch (Exception ex) {
            log.error("ERROR: stage1", ex);
            throw new RuntimeException("ERROR: stage1", ex);
        }
    }

    /**
     * Load material file to db.
     */
    private void stage2() {
        log.info("Loading materials to db");
        try {
            materialRepo.deleteAll();
            Path path = Paths.get(BASE_PATH + "material-file.txt");
            try (BufferedReader reader = Files.newBufferedReader(path)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.isBlank()) {
                        continue;
                    }
                    log.info(line);
                    // Format: color,quantity,date (yyyy-MM-dd)
                    String[] split = line.split(",");
                    MaterialDetail material = MaterialDetail.builder()
                            .color(split[0])
                            .quantity(Double.valueOf(split[1]))
                            .orderDate(LocalDate.parse(split[2]))
                            .build();
                    materialRepo.save(material);
                }
            }
            log.info("Loading materials completed");
        } catch (Exception ex) {
            log.error("ERROR: stage2", ex);
            throw new RuntimeException("ERROR: stage2", ex);
        }
    }

    /**
     * Process orders if they can be fulfilled.
     */
    private void stage3() {
        log.info("Processing orders");
        try {
            // Reset this stage's table and the downstream tables rebuilt by STAGE4 and STAGE5.
            processedRepo.deleteAll();
            bonusRepo.deleteAll();
            factoryRepo.deleteAll();
            // Available stock per color.
            Map<String, Double> cache = new HashMap<>();
            materialRepo.findAll().forEach(m -> cache.put(m.getColor(), m.getQuantity()));
            // Total fulfillable quantity per "color:city".
            Map<String, Double> result = new HashMap<>();
            List<OrderDetail> orders = orderRepo.findAll();
            for (OrderDetail order : orders) {
                // A color missing from the material file counts as zero stock.
                double balance = cache.getOrDefault(order.getColor(), 0.0);
                if (order.getQuantity() > balance) {
                    log.error("ERROR: stage3, not enough {} to complete all orders!", order.getColor());
                    throw new RuntimeException("ERROR: stage3, will not be able to complete all orders!");
                }
                cache.put(order.getColor(), balance - order.getQuantity());
                result.merge(order.getColor() + ":" + order.getCity(), order.getQuantity(), Double::sum);
            }
            result.forEach((k, v) -> {
                String[] split = k.split(":");
                processedRepo.save(ProcessedDetail.builder()
                        .color(split[0])
                        .quantity(v)
                        .processDate(LocalDate.now())
                        .city(split[1])
                        .build());
            });
            log.info("Processing orders completed");
        } catch (Exception ex) {
            log.error("ERROR: stage3", ex);
            throw new RuntimeException("ERROR: stage3", ex);
        }
    }

    /**
     * Add buffer to order quantity to ensure no shortage.
     */
    private void stage4() {
        log.info("Adding buffer");
        try {
            factoryRepo.deleteAll();
            List<ProcessedDetail> processedDetail = processedRepo.findAll();
            processedDetail.forEach(p -> {
                double quantity = p.getQuantity();
                // Tiered buffer: 30% above 500, 20% above 200, 10% above 100, none otherwise.
                double rate = quantity > 500 ? 0.30 : quantity > 200 ? 0.20 : quantity > 100 ? 0.10 : 0.0;
                factoryRepo.save(FactoryDetail.builder()
                        .color(p.getColor())
                        .quantity(quantity + quantity * rate)
                        .city(p.getCity())
                        .processDate(LocalDate.now())
                        .build());
            });
            log.info("Adding buffer completed");
        } catch (Exception ex) {
            log.error("ERROR: stage4", ex);
            throw new RuntimeException("ERROR: stage4", ex);
        }
    }

    /**
     * Add bonus points for sales rep.
     */
    private void stage5() {
        log.info("Adding Sales bonus");
        try {
            bonusRepo.deleteAll();
            // Total ordered quantity per sales rep.
            Map<String, Double> result = new HashMap<>();
            for (OrderDetail order : orderRepo.findAll()) {
                result.merge(order.getSalesRep(), order.getQuantity(), Double::sum);
            }

            result.forEach((k, v) -> {
                // Note: a rep above 500 receives both a 5-point and a 15-point entry.
                if (v > 200) {
                    bonusRepo.save(BonusDetail.builder()
                            .salesRep(k)
                            .bonusPoints(5)
                            .orderDate(LocalDate.now())
                            .build());
                }
                if (v > 500) {
                    bonusRepo.save(BonusDetail.builder()
                            .salesRep(k)
                            .bonusPoints(15)
                            .orderDate(LocalDate.now())
                            .build());
                }
            });
            log.info("Adding Sales bonus completed");
        } catch (Exception ex) {
            log.error("ERROR: stage5", ex);
            throw new RuntimeException("ERROR: stage5", ex);
        }
    }

    /**
     * Notify factory to start production.
     */
    private void stage6() {
        log.info("Notifying factory");
        try {
            // Send the buffered quantities written by STAGE4.
            List<FactoryDetail> factoryDetail = factoryRepo.findAll();
            factoryDetail.forEach(f -> log.info("Notifying factory: {}", f));
            log.info("Notifying factory completed");
        } catch (Exception ex) {
            log.error("ERROR: stage6", ex);
            throw new RuntimeException("ERROR: stage6", ex);
        }
    }
}

interface BonusRepo extends JpaRepository<BonusDetail, Long> {
}

interface MaterialRepo extends JpaRepository<MaterialDetail, Long> {
}

interface OrderRepo extends JpaRepository<OrderDetail, Long> {
}

interface ProcessedRepo extends JpaRepository<ProcessedDetail, Long> {
}

interface FactoryRepo extends JpaRepository<FactoryDetail, Long> {
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class BonusDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String salesRep;
    private Integer bonusPoints;
    private LocalDate orderDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class MaterialDetail implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private LocalDate orderDate;

}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class OrderDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private String city;
    private String salesRep;
    private LocalDate orderDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class ProcessedDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private String city;
    private LocalDate processDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class FactoryDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private String city;
    private LocalDate processDate;
}
```
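The STAGE3 and STAGE4 rules can be pulled out into pure functions and unit-tested without Postgres. This is a hypothetical refactoring sketch: the Order record and OrderMath class are illustrative stand-ins, not part of project84. It treats an order that consumes exactly the remaining stock as fulfillable, and uses the 30% / 20% / 10% buffer tiers from stage4(), with no buffer at or below 100.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OrderDetail.
record Order(String color, double quantity, String city) {}

class OrderMath {
    /**
     * STAGE3 rule: deduct each order from the stock of its color and sum
     * quantities per "color:city". Throws if any order cannot be fulfilled.
     */
    static Map<String, Double> groupFulfillable(List<Order> orders, Map<String, Double> stock) {
        Map<String, Double> remaining = new LinkedHashMap<>(stock);
        Map<String, Double> grouped = new LinkedHashMap<>();
        for (Order o : orders) {
            double balance = remaining.getOrDefault(o.color(), 0.0); // unknown color = no stock
            if (o.quantity() > balance) {
                throw new IllegalStateException("Not enough " + o.color() + " for " + o);
            }
            remaining.put(o.color(), balance - o.quantity());
            grouped.merge(o.color() + ":" + o.city(), o.quantity(), Double::sum);
        }
        return grouped;
    }

    /** STAGE4 rule: 30% above 500, 20% above 200, 10% above 100, no buffer otherwise. */
    static double withBuffer(double quantity) {
        double rate = quantity > 500 ? 0.30 : quantity > 200 ? 0.20 : quantity > 100 ? 0.10 : 0.0;
        return quantity + quantity * rate;
    }
}
```

Keeping the rules pure means the Spring stages only have to load rows, call these functions and save the results, and the shortage scenario used later in this post can be checked in a plain unit test.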

The properties file

```yaml
spring:
  main:
    banner-mode: "off"
    web-application-type: none
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://localhost:5432/test-db
    username: test
    password: test@123
  jpa:
    show-sql: false
    hibernate.ddl-auto: update
    properties.hibernate.temp.use_jdbc_metadata_defaults: false
    database-platform: org.hibernate.dialect.PostgreSQLDialect
```

Jenkins

To set up Jenkins, download jenkins.war from https://www.jenkins.io/download/ and start the server. Once the server starts, the initial admin password is printed in the console log. You need it to set up Jenkins the first time; this is a one-time activity.

```shell
java -jar jenkins.war
```

Then open the URL below:

http://localhost:8080/

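Alternatively, run Jenkins via Docker. Note that the stage jobs below run java -jar from a directory on the host, so with this setup the jar directory and a matching JDK must also be available inside the container.

```shell
docker run --name my-jenkins -p 8080:8080 -p 50000:50000 jenkins/jenkins:lts-jdk11
```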

Follow the steps in the setup wizard to finish the configuration.

Install the 'Pipeline Implementation for Blue Ocean' plugin to view pipeline graphs.

The admin credentials used in this example:

```text
user: admin
pwd: admin@123
```

Go to the Dashboard, click 'New Item' and create a Pipeline job. Enter the script below, click 'Build Now' and make sure the build succeeds.

```groovy
pipeline {
    agent any

    stages {
        stage('STAGE1') {
            steps {
                echo 'STAGE1..'
            }
        }
        stage('STAGE2') {
            steps {
                echo 'STAGE2..'
            }
        }
    }
}
```

If the setup is correct, this test job succeeds.

Now create six pipeline jobs, one per stage, and a master pipeline job. All stage jobs use the same script below; only the stage parameter changes:

  • stage1-job - STAGE1
  • stage2-job - STAGE2
  • stage3-job - STAGE3
  • stage4-job - STAGE4
  • stage5-job - STAGE5
  • stage6-job - STAGE6

In each job, change the stage name and the jar argument accordingly.

```groovy
pipeline {
    agent any

    stages {
        // Use the job's stage name here (STAGE1..STAGE6).
        stage('STAGE1') {
            steps {
                // Directory containing the jar built by ./gradlew clean build; adjust to your machine.
                dir("/Users/asurendra/code/pet/project84/build/libs") {
                    sh "java -jar project84-1.0.0.jar STAGE1"
                }
            }
        }
    }
}
```

The master job, data-job-pipeline, triggers the stage jobs in order and runs STAGE4 and STAGE5 in parallel:

```groovy
pipeline {
    agent any
    parameters {
        booleanParam(name: "BONUS_OFFER", defaultValue: true)
    }
    stages {
        stage('STAGE1') {
            steps {
                build job: 'stage1-job'
            }
        }
        stage('STAGE2') {
            steps {
                build job: 'stage2-job'
            }
        }
        stage('STAGE3') {
            steps {
                build job: 'stage3-job'
            }
        }
        stage("FORK") {
            parallel {
                stage('STAGE4') {
                    steps {
                        build job: 'stage4-job'
                    }
                }
                stage('STAGE5') {
                    // Run this job only if bonus points are counted for sales.
                    when { expression { params.BONUS_OFFER } }
                    steps {
                        build job: 'stage5-job'
                    }
                }
            }
        }
        stage('STAGE6') {
            steps {
                build job: 'stage6-job'
            }
        }
    }
}
```
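The FORK stage waits for both parallel branches before STAGE6 starts, and the when block skips STAGE5 when BONUS_OFFER is false. As a rough mental model only, the hypothetical PipelineModel class below mimics that control flow in plain Java with CompletableFuture; it does not call Jenkins, and each stage just records its name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-process model of data-job-pipeline's control flow.
class PipelineModel {
    // Thread-safe because STAGE4 and STAGE5 may record concurrently.
    final List<String> executed = new CopyOnWriteArrayList<>();

    private void stage(String name) {
        executed.add(name); // stands in for: build job: '<stage>-job'
    }

    void run(boolean bonusOffer) {
        stage("STAGE1");
        stage("STAGE2");
        stage("STAGE3");
        // FORK: both branches start, order between them is not guaranteed.
        CompletableFuture<Void> stage4 = CompletableFuture.runAsync(() -> stage("STAGE4"));
        CompletableFuture<Void> stage5 = bonusOffer
                ? CompletableFuture.runAsync(() -> stage("STAGE5"))
                : CompletableFuture.completedFuture(null); // like when { ... } evaluating to false
        CompletableFuture.allOf(stage4, stage5).join(); // join point before STAGE6
        stage("STAGE6");
    }
}
```

If a branch throws, allOf(...).join() throws a CompletionException once all branches have finished, so STAGE6 does not run; a failing parallel branch in Jenkins likewise fails the enclosing FORK stage.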

Click 'Build with Parameters' and set the BONUS_OFFER checkbox. If the bonus offer applies, STAGE5 is executed; otherwise it is skipped.

Monitor the job

Once the job completes, click 'Pipeline graph' to see the path taken. It also shows the time each stage took to complete, which is useful for monitoring the job over long periods.

Re-run the job with the bonus offer checkbox cleared; once it completes, the graph shows the STAGE5 node as skipped.

The 'Console Output' tracks each job's log output. You can drill down into each stage job to see the log specific to it.

Now let's make a stage fail, fix the issue and re-run from that stage.

Edit material-file.txt and reduce the quantity to 10, then run the 'data-job-pipeline' job.

Since there is not enough material to fulfill the orders, STAGE3 fails and so does the pipeline. You can now look at the logs and identify the issue.

Fix the file by changing the value back to what it was. Click 'Restart from Stage' and select STAGE2. We need to load the material file again, hence restarting at STAGE2.

Once the job succeeds, you will notice that it didn't run the STAGE1 job; only STAGE2 onwards ran.
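'Restart from Stage' skips the stages before the one you pick and runs that stage and everything after it again. The hypothetical RestartModel below captures just that selection over the top-level stages of data-job-pipeline; it is an illustration, not how Jenkins implements the feature.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of 'Restart from Stage' over data-job-pipeline's top-level stages.
class RestartModel {
    static final List<String> STAGES = List.of("STAGE1", "STAGE2", "STAGE3", "FORK", "STAGE6");

    /** Returns the stages that run again when restarting from {@code start}. */
    static List<String> restartFrom(String start) {
        int index = STAGES.indexOf(start);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown stage: " + start);
        }
        return new ArrayList<>(STAGES.subList(index, STAGES.size()));
    }
}
```

Restarting at STAGE2 is only safe because the stages are idempotent: STAGE2 deletes and reloads material_detail, and STAGE3 rebuilds processed_detail from scratch.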

You can also schedule this job to run daily, for example via the 'Build periodically' build trigger in the job configuration.

Setup

Project61

Jenkins Pipeline + Data processing

https://gitorko.github.io/jenkins-data-processing/

Version

Check version

```shell
$ java --version
openjdk 17.0.3 2022-04-19 LTS
```

Postgres DB

```shell
docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
docker ps
# Open psql in the container, then run the SQL statements below inside it.
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
GRANT ALL PRIVILEGES ON DATABASE "test-db" TO test;

# Stop and start the container later as needed.
docker stop pg-container
docker start pg-container
```

Dev

Build the project and test that the jar works.

```shell
./gradlew clean build
cd build/libs
java -jar project84-1.0.0.jar STAGE1
java -jar project84-1.0.0.jar STAGE2
java -jar project84-1.0.0.jar STAGE3
java -jar project84-1.0.0.jar STAGE4
java -jar project84-1.0.0.jar STAGE5
java -jar project84-1.0.0.jar STAGE6
```

To truncate the tables:

```sql
truncate order_detail;
truncate material_detail;
truncate processed_detail;
truncate bonus_detail;
truncate factory_detail;
```

References

https://github.com/jenkinsci/docker

https://www.jenkins.io/doc/book/pipeline/
