Jenkins - Data Processing
Overview
Spring Boot application integrated with a Jenkins pipeline for data processing jobs.
GitHub: https://github.com/gitorko/project84
Jenkins
Jenkins is mostly used to set up CI/CD or build pipelines. Here we use it to set up a data pipeline that orchestrates data processing jobs.
Requirement
Let's consider a company that sells paint.
- STAGE1: Orders from the field arrive as files on the company's FTP server. They have to be processed and uploaded to the DB. As these are large files, the ability to re-run jobs after fixing the files is required.
- STAGE2: Material details from suppliers arrive as files on the FTP server. They have to be processed and uploaded to the DB. As these are large files, the ability to re-run jobs after fixing the files is required.
- STAGE3: Once the orders and materials are uploaded to the DB, and if the orders can be fulfilled, the paint color and quantity need to be grouped by city.
- STAGE4: An additional buffer needs to be added to cover any shortages. This is a small job and can run in parallel.
- STAGE5: Sales rep bonus points need to be added if the offer is active. This is a small job and can run in parallel.
- STAGE6: The order needs to be sent to the factory in each city.
The features of Jenkins that make it friendly for data processing are:
- Log view - Ability to look at logs across the different stages.
- Graph view - Ability to look at a run graphically.
- Input - Ability to provide input to a job at run time.
- Scheduling - Ability to schedule jobs at periodic intervals.
- Parallel execution - Ability to run jobs in parallel.
- Agent load distribution - Ability to run jobs on other agent machines to distribute the load.
- Time to complete - Ability to see which job is running and the history of runs.
- Time taken - Ability to view the time taken by each stage over long periods to identify trends in execution.
- Re-run - Ability to re-run a particular stage of a failed job.
- Slack - Ability to notify users on Slack after job completion.
- Pull from Maven - Ability to download the jar from a Maven repository.
- Plugin support - Numerous plugins are available for Jenkins.
Code
The backend job does the processing. It takes the stage to run as an argument, processes that stage, and writes the results to a Postgres DB.
- Ensure that each stage can run in isolation and updates just one table. Two stages should never update the same table.
- Ensure that it throws a runtime exception on failure. The exception makes the Spring Boot process exit with a non-zero status, which is what marks the Jenkins stage as failed.
- Ensure logging is in place to identify the issue.
- Ensure each stage can be re-run many times. This is done by resetting (deleting) the stage's data before loading it again.
- Change the value of BASE_PATH to match your environment.
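Since a failing stage is diagnosed from the Jenkins console log, it helps when a bad input row fails with a message that names the row. A minimal sketch, assuming the order-file layout that stage1 reads below (color,quantity,city,salesRep,orderDate with an ISO yyyy-MM-dd date). `OrderLineParser`, the `OrderLine` record and the positive-quantity rule are illustrative additions, not part of the project, and the sample values are made up:

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;

// Hypothetical helper: validates one line of order-file.txt before it reaches the database.
final class OrderLineParser {

    // Plain record standing in for the OrderDetail entity.
    record OrderLine(String color, double quantity, String city, String salesRep, LocalDate orderDate) {}

    private OrderLineParser() {}

    // Parses one line; every failure is an IllegalArgumentException that names the line number.
    static OrderLine parse(String line, int lineNumber) {
        // -1 keeps trailing empty fields, so extra trailing commas count towards the field check.
        String[] fields = line.split(",", -1);
        if (fields.length != 5) {
            throw new IllegalArgumentException(
                    "line " + lineNumber + ": expected 5 fields but found " + fields.length + " -> " + line);
        }
        double quantity;
        LocalDate orderDate;
        try {
            quantity = Double.parseDouble(fields[1].trim());
            orderDate = LocalDate.parse(fields[4].trim());
        } catch (NumberFormatException | DateTimeParseException ex) {
            throw new IllegalArgumentException("line " + lineNumber + ": " + ex.getMessage() + " -> " + line, ex);
        }
        // Illustrative extra rule (an assumption): reject zero, negative and NaN quantities.
        if (!(quantity > 0)) {
            throw new IllegalArgumentException("line " + lineNumber + ": quantity must be positive -> " + line);
        }
        return new OrderLine(fields[0].trim(), quantity, fields[2].trim(), fields[3].trim(), orderDate);
    }

    public static void main(String[] args) {
        System.out.println(parse("red,120,CityA,rep1,2023-01-15", 1));
        try {
            parse("blue,abc,CityB,rep2,2023-01-15", 2);
        } catch (IllegalArgumentException ex) {
            System.out.println("Rejected: " + ex.getMessage());
        }
    }
}
```

Inside a stage, such an exception would be caught by the existing `catch (Exception ex)` block, logged, and rethrown, so the row number ends up in the console output.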
```java
package com.demo.project84;

import java.io.BufferedReader;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.JpaRepository;

@SpringBootApplication
@RequiredArgsConstructor
@Slf4j
public class Main implements CommandLineRunner {
    // Folder that holds order-file.txt and material-file.txt; change it to match your machine.
    private static final String BASE_PATH = "/Users/asurendra/code/pet/project84/";

    final OrderRepo orderRepo;
    final MaterialRepo materialRepo;
    final ProcessedRepo processedRepo;
    final BonusRepo bonusRepo;
    final FactoryRepo factoryRepo;

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }

    @Override
    public void run(String... args) {
        if (args.length == 0) {
            throw new IllegalArgumentException("Missing argument, expected one of STAGE1..STAGE6");
        }
        String caseType = args[0];
        switch (caseType) {
            case "STAGE1" -> stage1();
            case "STAGE2" -> stage2();
            case "STAGE3" -> stage3();
            case "STAGE4" -> stage4();
            case "STAGE5" -> stage5();
            case "STAGE6" -> stage6();
            default -> throw new IllegalStateException("Unexpected value: " + caseType);
        }
    }

    /**
     * Load order file to db. Each line: color,quantity,city,salesRep,orderDate (yyyy-MM-dd).
     */
    private void stage1() {
        log.info("Loading orders to db");
        try {
            // Reset the table first so the stage can be re-run safely.
            orderRepo.deleteAll();
            Path path = Paths.get(BASE_PATH + "order-file.txt");
            try (BufferedReader reader = Files.newBufferedReader(path)) {
                String line;
                // readLine() returning null is the reliable end-of-file check.
                while ((line = reader.readLine()) != null) {
                    if (line.isBlank()) {
                        continue;
                    }
                    log.info(line);
                    String[] split = line.split(",");
                    OrderDetail order = OrderDetail.builder()
                            .color(split[0])
                            .quantity(Double.valueOf(split[1]))
                            .city(split[2])
                            .salesRep(split[3])
                            .orderDate(LocalDate.parse(split[4]))
                            .build();
                    orderRepo.save(order);
                }
            }
            log.info("Loading orders completed");
        } catch (Exception ex) {
            log.error("ERROR: stage1", ex);
            throw new RuntimeException("ERROR: stage1", ex);
        }
    }

    /**
     * Load material file to db. Each line: color,quantity,date (yyyy-MM-dd).
     */
    private void stage2() {
        log.info("Loading materials to db");
        try {
            materialRepo.deleteAll();
            Path path = Paths.get(BASE_PATH + "material-file.txt");
            try (BufferedReader reader = Files.newBufferedReader(path)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.isBlank()) {
                        continue;
                    }
                    log.info(line);
                    String[] split = line.split(",");
                    MaterialDetail material = MaterialDetail.builder()
                            .color(split[0])
                            .quantity(Double.valueOf(split[1]))
                            .orderDate(LocalDate.parse(split[2]))
                            .build();
                    materialRepo.save(material);
                }
            }
            log.info("Loading materials completed");
        } catch (Exception ex) {
            log.error("ERROR: stage2", ex);
            throw new RuntimeException("ERROR: stage2", ex);
        }
    }

    /**
     * Process orders if they can be fulfilled, grouping quantity by color and city.
     */
    private void stage3() {
        log.info("Processing orders");
        try {
            // Clear this stage's output and the downstream results derived from it,
            // so a re-run never leaves stale bonus/factory rows behind.
            processedRepo.deleteAll();
            bonusRepo.deleteAll();
            factoryRepo.deleteAll();
            // Available material per color (several rows of one color are added up).
            Map<String, Double> cache = new HashMap<>();
            materialRepo.findAll().forEach(m -> cache.merge(m.getColor(), m.getQuantity(), Double::sum));
            // Total ordered quantity per "color:city".
            Map<String, Double> result = new HashMap<>();
            List<OrderDetail> orders = orderRepo.findAll();
            for (OrderDetail order : orders) {
                double balance = cache.getOrDefault(order.getColor(), 0.0);
                if (order.getQuantity() > balance) {
                    log.error("ERROR: stage3, not enough material for order {}", order);
                    throw new RuntimeException("ERROR: stage3, will not be able to complete all orders!");
                }
                cache.put(order.getColor(), balance - order.getQuantity());
                result.merge(order.getColor() + ":" + order.getCity(), order.getQuantity(), Double::sum);
            }
            result.forEach((k, v) -> {
                String[] split = k.split(":");
                processedRepo.save(ProcessedDetail.builder()
                        .color(split[0])
                        .quantity(v)
                        .processDate(LocalDate.now())
                        .city(split[1])
                        .build());
            });
            log.info("Processing orders completed");
        } catch (Exception ex) {
            log.error("ERROR: stage3", ex);
            throw new RuntimeException("ERROR: stage3", ex);
        }
    }

    /**
     * Add buffer to order quantity to ensure no shortage.
     */
    private void stage4() {
        log.info("Adding buffer");
        try {
            factoryRepo.deleteAll();
            List<ProcessedDetail> processedDetail = processedRepo.findAll();
            processedDetail.forEach(p -> {
                double quantity = p.getQuantity();
                // Larger orders get a larger buffer; orders of 100 or less are sent as-is.
                double buffer;
                if (quantity > 500) {
                    buffer = 0.30;
                } else if (quantity > 200) {
                    buffer = 0.20;
                } else if (quantity > 100) {
                    buffer = 0.10;
                } else {
                    buffer = 0.0;
                }
                factoryRepo.save(FactoryDetail.builder()
                        .color(p.getColor())
                        .city(p.getCity())
                        .quantity(quantity + quantity * buffer)
                        .processDate(LocalDate.now())
                        .build());
            });
            log.info("Adding buffer completed");
        } catch (Exception ex) {
            log.error("ERROR: stage4", ex);
            throw new RuntimeException("ERROR: stage4", ex);
        }
    }

    /**
     * Add bonus points for sales rep.
     */
    private void stage5() {
        log.info("Adding Sales bonus");
        try {
            bonusRepo.deleteAll();
            // Total ordered quantity per sales rep.
            Map<String, Double> result = new HashMap<>();
            for (OrderDetail order : orderRepo.findAll()) {
                result.merge(order.getSalesRep(), order.getQuantity(), Double::sum);
            }
            result.forEach((k, v) -> {
                // One award per rep: 15 points above 500, 5 points above 200.
                int points = v > 500 ? 15 : v > 200 ? 5 : 0;
                if (points > 0) {
                    bonusRepo.save(BonusDetail.builder()
                            .salesRep(k)
                            .bonusPoints(points)
                            .orderDate(LocalDate.now())
                            .build());
                }
            });
            log.info("Adding Sales bonus completed");
        } catch (Exception ex) {
            log.error("ERROR: stage5", ex);
            throw new RuntimeException("ERROR: stage5", ex);
        }
    }

    /**
     * Notify factory to start production.
     */
    private void stage6() {
        log.info("Notifying factory");
        try {
            // Send the buffered quantities produced by stage4.
            List<FactoryDetail> factoryDetail = factoryRepo.findAll();
            factoryDetail.forEach(f -> log.info("Notifying factory: {}", f));
            log.info("Notifying factory completed");
        } catch (Exception ex) {
            log.error("ERROR: stage6", ex);
            throw new RuntimeException("ERROR: stage6", ex);
        }
    }
}

interface BonusRepo extends JpaRepository<BonusDetail, Long> {
}

interface MaterialRepo extends JpaRepository<MaterialDetail, Long> {
}

interface OrderRepo extends JpaRepository<OrderDetail, Long> {
}

interface ProcessedRepo extends JpaRepository<ProcessedDetail, Long> {
}

interface FactoryRepo extends JpaRepository<FactoryDetail, Long> {
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class BonusDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String salesRep;
    private Integer bonusPoints;
    private LocalDate orderDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class MaterialDetail implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private LocalDate orderDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class OrderDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private String city;
    private String salesRep;
    private LocalDate orderDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class ProcessedDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private String city;
    private LocalDate processDate;
}

@Entity
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class FactoryDetail {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(name = "id")
    private Long id;
    private String color;
    private Double quantity;
    private String city;
    private LocalDate processDate;
}
```
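Keeping a business rule out of the repository code makes it easy to check before a long run. As a sketch, the buffer tiers used by stage4 (30% above 500, 20% above 200, 10% above 100) can be pulled into a pure function, assuming quantities of 100 or less go to the factory unchanged. `BufferRule` is a hypothetical helper, not a class in the project:

```java
// Hypothetical helper: the stage4 buffer tiers as a pure function, testable without Spring or a DB.
final class BufferRule {

    private BufferRule() {}

    // Returns the quantity to send to the factory, including the safety buffer.
    static double withBuffer(double quantity) {
        double rate;
        if (quantity > 500) {
            rate = 0.30;
        } else if (quantity > 200) {
            rate = 0.20;
        } else if (quantity > 100) {
            rate = 0.10;
        } else {
            rate = 0.0; // assumption: small quantities are sent as-is
        }
        return quantity + quantity * rate;
    }

    public static void main(String[] args) {
        for (double q : new double[] {50, 150, 300, 1000}) {
            System.out.printf("%.1f -> %.1f%n", q, withBuffer(q));
        }
    }
}
```

Note that the thresholds are strict, so a quantity of exactly 500 falls into the 20% tier.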
The properties file
```yaml
spring:
  main:
    banner-mode: "off"
    web-application-type: none
  datasource:
    driver-class-name: org.postgresql.Driver
    url: jdbc:postgresql://localhost:5432/test-db
    username: test
    password: test@123
  jpa:
    show-sql: false
    hibernate.ddl-auto: update
    properties.hibernate.temp.use_jdbc_metadata_defaults: false
    database-platform: org.hibernate.dialect.PostgreSQLDialect
```
Jenkins
To set up Jenkins, download jenkins.war from https://www.jenkins.io/download/ and start the server. Once the server starts, the console log shows the initial admin password. You need it only once, to set up Jenkins the first time.
```bash
java -jar jenkins.war
```
Open http://localhost:8080 in a browser.
Alternatively, set up Jenkins via Docker:
```bash
docker run --name my-jenkins -p 8080:8080 -p 50000:50000 jenkins/jenkins:lts-jdk11
```
Follow the setup steps to finish the configuration.
Install the 'Pipeline Implementation for Blue Ocean' plugin to view the pipeline graphs.
The admin user used in this guide:
```
user: admin
pwd: admin@123
```
Go to the Dashboard, click 'New Item' and create a pipeline. Enter the script below, click 'Build Now' and check that the build succeeds.
```groovy
pipeline {
    agent any

    stages {
        stage('STAGE1') {
            steps {
                echo 'STAGE1..'
            }
        }
        stage('STAGE2') {
            steps {
                echo 'STAGE2..'
            }
        }
    }
}
```
If the setup is correct, this test job should succeed.
Now create six stage pipeline jobs and one master pipeline job. All stage jobs use the same script below; only the stage name passed as the input parameter changes:
- stage1-job - STAGE1
- stage2-job - STAGE2
- stage3-job - STAGE3
- stage4-job - STAGE4
- stage5-job - STAGE5
- stage6-job - STAGE6
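```groovy
pipeline {
    agent any

    stages {
        stage('STAGE1') {
            steps {
                // Folder with the built jar; change it to match your machine.
                dir("/Users/asurendra/code/pet/project84/build/libs") {
                    // Pass the stage that matches this job (STAGE1 ... STAGE6).
                    sh "java -jar project84-1.0.0.jar STAGE1"
                }
            }
        }
    }
}
```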
data-job-pipeline job
```groovy
pipeline {
    agent any
    parameters {
        booleanParam(name: "BONUS_OFFER", defaultValue: true)
    }
    stages {
        stage('STAGE1') {
            steps {
                build job: 'stage1-job'
            }
        }
        stage('STAGE2') {
            steps {
                build job: 'stage2-job'
            }
        }
        stage('STAGE3') {
            steps {
                build job: 'stage3-job'
            }
        }
        stage("FORK") {
            parallel {
                stage('STAGE4') {
                    steps {
                        build job: 'stage4-job'
                    }
                }
                stage('STAGE5') {
                    // If bonus points are counted for sales, run this job.
                    when { expression { params.BONUS_OFFER } }
                    steps {
                        build job: 'stage5-job'
                    }
                }
            }
        }
        stage('STAGE6') {
            steps {
                build job: 'stage6-job'
            }
        }
    }
}
```
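To make the execution order of data-job-pipeline concrete, here is the same flow in plain Java: STAGE1 to STAGE3 in sequence, STAGE4 and STAGE5 in parallel (STAGE5 only when BONUS_OFFER is set), and STAGE6 once both branches are done. This only illustrates the semantics, it is not how Jenkins runs jobs; `PipelineSketch` and the `runStage` callback (standing in for `build job: 'stageN-job'`) are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration of the data-job-pipeline stage order.
final class PipelineSketch {

    private PipelineSketch() {}

    static void run(boolean bonusOffer, Consumer<String> runStage) {
        // Sequential stages: each must finish before the next starts.
        runStage.accept("STAGE1");
        runStage.accept("STAGE2");
        runStage.accept("STAGE3");

        // FORK: STAGE4 always runs, STAGE5 only when the bonus offer is on.
        CompletableFuture<Void> stage4 = CompletableFuture.runAsync(() -> runStage.accept("STAGE4"));
        CompletableFuture<Void> stage5 = bonusOffer
                ? CompletableFuture.runAsync(() -> runStage.accept("STAGE5"))
                : CompletableFuture.completedFuture(null);

        // Wait for both branches; join() throws a CompletionException if either failed,
        // so STAGE6 is never reached after a failed branch.
        CompletableFuture.allOf(stage4, stage5).join();

        runStage.accept("STAGE6");
    }

    public static void main(String[] args) {
        List<String> executed = Collections.synchronizedList(new ArrayList<>());
        run(false, stage -> {
            System.out.println("Running " + stage);
            executed.add(stage);
        });
        System.out.println("Executed: " + executed);
    }
}
```

As with the FORK stage, a failure in one branch does not cancel the other (Jenkins' default when `failFast` is not set), but it does stop STAGE6 from running.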
Click 'Build with Parameters' and select the BONUS_OFFER checkbox if the bonus offer applies. STAGE5 runs only when the box is checked; otherwise it is skipped.
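When STAGE5 does run, stage5() totals each sales rep's order quantity and awards bonus points by tier. A sketch of that rule as a pure function, assuming a rep receives only the higher tier (15 points above 500, 5 points above 200) rather than both; `BonusRule` and its `Order` record are hypothetical stand-ins for the entity code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the stage5 bonus tiers, testable without Spring or a DB.
final class BonusRule {

    // Minimal stand-in for OrderDetail.
    record Order(String salesRep, double quantity) {}

    private BonusRule() {}

    // Returns bonus points per sales rep; reps below the lowest tier are left out.
    static Map<String, Integer> bonusPoints(List<Order> orders) {
        Map<String, Double> totals = new HashMap<>();
        for (Order order : orders) {
            totals.merge(order.salesRep(), order.quantity(), Double::sum);
        }
        Map<String, Integer> points = new HashMap<>();
        totals.forEach((rep, total) -> {
            // Assumption: one award per rep, the higher tier wins.
            int award = total > 500 ? 15 : total > 200 ? 5 : 0;
            if (award > 0) {
                points.put(rep, award);
            }
        });
        return points;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order("rep1", 300), new Order("rep1", 250),
                new Order("rep2", 150), new Order("rep2", 100),
                new Order("rep3", 50));
        System.out.println(bonusPoints(orders));
    }
}
```

If the tiers are meant to stack, the rule would instead add 5 and 15 for totals above 500.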
Monitor the job
Once the job completes, click 'Pipeline graph' to see the path the run took graphically. The graph also shows the time taken by each stage, which is useful for monitoring the job over long periods.
Re-run the job with the bonus offer checkbox cleared; once it completes, the graph shows the STAGE5 node as skipped.
Look at the 'Console Output' to track each job's log output. You can also drill down into each stage job to see the logs specific to it.
Now let's make a stage fail, fix the issue, and re-run the stage.
Modify material-file.txt, reduce the quantity to 10, and run the 'data-job-pipeline' job.
Since there is no longer enough material to fulfill the orders, STAGE3 fails and the pipeline stops. You can now look at the logs to identify the issue.
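The failure comes from the check in stage3: material is pooled per color, each order draws from that pool, and the first order that cannot be covered aborts the run. A database-free sketch of that check; `FulfillmentCheck`, its records and the sample data are hypothetical, and it assumes an order may use up exactly the remaining quantity of a color:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the stage3 fulfillment check.
final class FulfillmentCheck {

    record Material(String color, double quantity) {}

    record Order(String color, double quantity, String city) {}

    private FulfillmentCheck() {}

    // Returns the total quantity per "color:city", or throws if any order cannot be covered.
    static Map<String, Double> process(List<Material> materials, List<Order> orders) {
        Map<String, Double> available = new HashMap<>();
        for (Material m : materials) {
            available.merge(m.color(), m.quantity(), Double::sum);
        }
        Map<String, Double> grouped = new TreeMap<>();
        for (Order order : orders) {
            // A color with no material at all counts as zero available.
            double balance = available.getOrDefault(order.color(), 0.0);
            if (order.quantity() > balance) {
                throw new IllegalStateException("Cannot fulfill " + order + ", only " + balance + " left");
            }
            available.put(order.color(), balance - order.quantity());
            grouped.merge(order.color() + ":" + order.city(), order.quantity(), Double::sum);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order("red", 40, "CityA"), new Order("red", 30, "CityB"));
        System.out.println(process(List.of(new Material("red", 100)), orders));
        try {
            // Same orders after the material quantity is reduced to 10.
            process(List.of(new Material("red", 10)), orders);
        } catch (IllegalStateException ex) {
            System.out.println("STAGE3 would fail: " + ex.getMessage());
        }
    }
}
```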
Fix the file by changing the value back to what it was. Click 'Restart from Stage' and select STAGE2; the material file has to be loaded again, so the restart begins at STAGE2.
Once the job succeeds, you will notice that it did not run the STAGE1 job, only STAGE2 onwards.
You can even schedule this job to run daily.
Setup
````markdown
# Project84

Jenkins Pipeline + Data processing

[https://gitorko.github.io/jenkins-data-processing/](https://gitorko.github.io/jenkins-data-processing/)

### Version

Check version

```bash
$ java --version
openjdk 17.0.3 2022-04-19 LTS
```

### Postgres DB

```
docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
docker ps
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
grant all PRIVILEGES ON DATABASE "test-db" to test;

docker stop pg-container
docker start pg-container
```

### Dev

Build the project and test that the jar works.

```bash
./gradlew clean build
cd build/libs
java -jar project84-1.0.0.jar STAGE1
java -jar project84-1.0.0.jar STAGE2
java -jar project84-1.0.0.jar STAGE3
java -jar project84-1.0.0.jar STAGE4
java -jar project84-1.0.0.jar STAGE5
java -jar project84-1.0.0.jar STAGE6
```

To truncate the tables

```sql
truncate order_detail;
truncate material_detail;
truncate processed_detail;
truncate bonus_detail;
truncate factory_detail;
```
````