Spring Integration - Basics
Overview
Spring Integration provides a framework to support Enterprise Integration Patterns.
Github: https://github.com/gitorko/project97
Spring Integration
- Messaging support - All communication is modeled as messages sent over channels, which keeps producers and consumers loosely coupled.
- Support for external systems - Adapters for FTP, the file system, RabbitMQ and other external systems.
Spring Integration provides a higher level of abstraction over Spring's support for remoting, messaging, and scheduling, so developers don't have to write the code that interacts with these systems and can focus on the business logic. If the source later changes from a file system to an FTP server, the changes required are minimal.
Spring Integration can be configured in different ways:
- XML approach to wire
- Bean annotation approach
- Java DSL approach (we will focus on the DSL approach as it's easier to read and maintain)
Terminology
- Inbound Adapter - Real world object -> Message
- Outbound Adapter - Message -> Real world object
- Inbound Gateway - Real world object -> Spring Integration -> Real world object
- Outbound Gateway - Spring Integration -> Real world object -> Spring Integration
Messages can be split, routed, transformed, wiretapped, enriched and aggregated.
- Message - Wrapper around a Java object; contains a payload & headers
- Message Channel - A conduit for transmitting messages between producers & consumers
  a. Point-to-Point channel - exactly one consumer receives each message sent to the channel
  b. Publish/Subscribe channel - broadcasts each message to every subscriber listening on that channel
- Message Transformer - Converts the payload or headers of a message into another form
- Message Filter - Decides whether a message is passed on or dropped
- Message Router - Determines what channel or channels (if any) should receive the message next
- Message Bridge - Connects two message channels or channel adapters
- Splitter - Breaks one message into several messages
- Aggregator - Combines several related messages into one message
- Handle - A ServiceActivator that handles the message.
- Adapter endpoints - Provide one-way integration.
- Gateway endpoints - Provide two-way request/response integration.
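To make the terminology concrete, here is a minimal plain-Java sketch of how a splitter, filter, transformer, router and aggregator relate to each other. It deliberately does not use the Spring Integration API; `PatternSketch`, its `Customer` record and the `unroutedChannel` name are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PatternSketch {

    // Hypothetical domain type; the real project has its own Customer class.
    record Customer(String name, String city) {}

    // Router: picks the name of the next channel from the message content.
    static String route(Customer c) {
        return switch (c.city()) {
            case "New York" -> "channelA";
            case "Bangalore" -> "channelB";
            default -> "unroutedChannel"; // assumption: a catch-all channel
        };
    }

    static Map<String, List<String>> run(List<Customer> batch) {
        Map<String, List<String>> byChannel = new LinkedHashMap<>();
        for (Customer c : batch) {                       // splitter: one item at a time
            if (c.city() == null) {
                continue;                                // filter: drop incomplete items
            }
            Customer upper = new Customer(c.name().toUpperCase(), c.city()); // transformer
            String channel = route(upper);               // router
            byChannel.computeIfAbsent(channel, k -> new ArrayList<>())
                    .add(upper.name());                  // aggregator: regroup the results
        }
        return byChannel;
    }

    public static void main(String[] args) {
        List<Customer> batch = Arrays.asList(
                new Customer("jack", "New York"),
                new Customer("raj", "Bangalore"),
                new Customer("amy", null),
                new Customer("li", "London"));
        // prints {channelA=[JACK], channelB=[RAJ], unroutedChannel=[LI]}
        System.out.println(run(batch));
    }
}
```

In Spring Integration each of these steps is a separate endpoint connected by channels, which is what allows them to be rearranged or replaced independently.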
Types of Message Channel:
- PollableChannel - Stores messages; consumers have to ask for them by polling, i.e. by calling the receive method.
  a. QueueChannel - Pollable, FIFO. If the channel has multiple consumers, only one of them receives each message. The receive call can be made blocking.
  b. PriorityChannel - Pollable; messages are ordered within the channel by priority. If the channel has multiple consumers, only one of them receives each message. The receive call can be made blocking.
  c. RendezvousChannel - Pollable, synchronous. Similar to QueueChannel but with zero capacity to buffer: a direct-handoff scenario in which the sender blocks until a consumer invokes the channel's receive() method.
- SubscribableChannel - Event driven; consumers call the subscribe method and messages are pushed to them.
  a. DirectChannel - Subscribable, point-to-point. Each message is handed to a single subscriber on the sender's thread, so the sender blocks until the handler returns.
  b. PublishSubscribeChannel - Subscribable, many subscribers; all of them get each message.
  c. FixedSubscriberChannel - Subscribable, with a single subscriber that cannot be unsubscribed.
- ExecutorChannel - Delegates to an instance of TaskExecutor to perform the dispatch
- FluxMessageChannel - Allows reactive consumption
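The difference between pollable and subscribable channels can be sketched in plain Java. This is a simplified model under stated assumptions, not the Spring Integration classes: `PollableSketch`, `DirectSketch` and `PubSubSketch` are hypothetical names, and the round-robin choice in `DirectSketch` is only one possible way of picking the single receiving handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ChannelSketch {

    // Pollable style: the channel buffers messages and the consumer pulls them.
    static class PollableSketch<T> {
        private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

        boolean send(T message) {
            return queue.offer(message);
        }

        // Waits up to timeoutMillis; returns null if nothing arrived in time.
        T receive(long timeoutMillis) {
            try {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    // Subscribable, point-to-point: send() itself invokes exactly one handler
    // on the caller's thread, so the sender waits until the handler returns.
    static class DirectSketch<T> {
        private final List<Consumer<T>> handlers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<T> handler) {
            handlers.add(handler);
        }

        void send(T message) {
            if (handlers.isEmpty()) {
                throw new IllegalStateException("no subscribers");
            }
            handlers.get(next++ % handlers.size()).accept(message);
        }
    }

    // Subscribable, publish-subscribe: every handler receives every message.
    static class PubSubSketch<T> {
        private final List<Consumer<T>> handlers = new ArrayList<>();

        void subscribe(Consumer<T> handler) {
            handlers.add(handler);
        }

        void send(T message) {
            handlers.forEach(h -> h.accept(message));
        }
    }
}
```

With two handlers subscribed, three messages sent through `DirectSketch` are shared between the handlers, while the same three messages sent through `PubSubSketch` reach both handlers in full.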
Code
package com.demo.project97.integration;

import java.util.Arrays;
import java.util.List;

import com.demo.project97.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@IntegrationComponentScan
public class BasicIntegration {

    // Splits a collection payload into single messages and prefixes each with "Hello "
    @Bean
    public IntegrationFlow performSplit() {
        return IntegrationFlow.from("inputChannel1")
                .split()
                .transform("Hello "::concat)
                .handle(message -> {
                    log.info("performSplit: {}", message);
                })
                .get();
    }

    // Groups customers by city and releases a group once it holds more than two messages
    @Bean
    public IntegrationFlow performAggregate(SimpleMessageStore messageStore) {
        return IntegrationFlow.from("inputChannel2")
                .split()
                .aggregate(a ->
                        a.correlationStrategy(m -> {
                                    Customer customer = (Customer) m.getPayload();
                                    return customer.getCity();
                                })
                                .releaseStrategy(g -> g.size() > 2)
                                .messageStore(messageStore))
                .handle(message -> {
                    log.info("performAggregate: {}", message);
                })
                .get();
    }

    // Routes each customer to channelA or channelB based on the city
    @Bean
    public IntegrationFlow performRoute() {
        return IntegrationFlow.from("inputChannel3")
                .split()
                .log()
                .route(Customer.class, m -> m.getCity(), m -> m
                        .channelMapping("New York", "channelA")
                        .channelMapping("Bangalore", "channelB"))
                .get();
    }

    @Bean
    public IntegrationFlow handleNewYork() {
        return IntegrationFlow.from("channelA")
                .handle(message -> {
                    log.info("handleNewYork: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow handleBangalore() {
        return IntegrationFlow.from("channelB")
                .handle(message -> {
                    log.info("handleBangalore: {}", message);
                })
                .get();
    }

    // Same routing as performRoute, but each city maps to a sub-flow instead of a channel
    @Bean
    public IntegrationFlow performSubFlow(IntegrationFlow subFlowNewYork, IntegrationFlow subFlowBangalore) {
        return IntegrationFlow.from("inputChannel4")
                .split()
                .log()
                .route(Customer.class, m -> m.getCity(), m -> m
                        .subFlowMapping("New York", subFlowNewYork)
                        .subFlowMapping("Bangalore", subFlowBangalore))
                .get();
    }

    @Bean
    public IntegrationFlow subFlowNewYork() {
        return f -> f.handle(m -> log.info("subFlowNewYork: {}", m));
    }

    @Bean
    public IntegrationFlow subFlowBangalore() {
        return f -> f.handle(m -> log.info("subFlowBangalore: {}", m));
    }

    // Polls polledChannel every 5 seconds, taking at most 10 messages per poll
    @Bean
    public IntegrationFlow performBridge() {
        return IntegrationFlow.from("polledChannel")
                .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
                .handle(message -> {
                    log.info("performBridge: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow readInputChannel5_sub1() {
        return IntegrationFlow.from("inputChannel5_sub1")
                .handle(message -> {
                    log.info("readInputChannel5_sub1: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow readInputChannel5_sub2() {
        return IntegrationFlow.from("inputChannel5_sub2")
                .handle(message -> {
                    log.info("readInputChannel5_sub2: {}", message);
                })
                .get();
    }

    // Registers one sub-flow per city; each upper-cases the customer name
    // before the results are aggregated again
    @Bean
    public IntegrationFlow performDynamicBridge() {
        List<String> cities = Arrays.asList("New York", "Bangalore", "London");
        return IntegrationFlow.from("inputChannel6")
                .split()
                .route(Customer.class, m -> m.getCity(), m -> {
                    cities.forEach(city -> {
                        m.subFlowMapping(city, subFlow -> subFlow.publishSubscribeChannel(c -> {
                            c.ignoreFailures(true);
                            c.subscribe(s -> s.handle(h -> {
                                Customer customer = (Customer) h.getPayload();
                                customer.setName(customer.getName().toUpperCase());
                                log.info("Handle: {}", customer);
                            }));
                        }).bridge());
                    });
                })
                .aggregate()
                .handle(m -> {
                    log.info("performDynamicBridge: {}", m);
                })
                .get();
    }

    @Bean
    public MessageChannel inputChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel3() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel4() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel5() {
        return new PublishSubscribeChannel();
    }

    // @BridgeFrom subscribes this channel to the publish-subscribe inputChannel5
    @Bean
    @BridgeFrom("inputChannel5")
    public MessageChannel inputChannel5_sub1() {
        return new DirectChannel();
    }

    @Bean
    @BridgeFrom("inputChannel5")
    public MessageChannel inputChannel5_sub2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel6() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel polledChannel() {
        return new QueueChannel();
    }

    @Bean
    public SimpleMessageStore messageStore() {
        return new SimpleMessageStore();
    }

    @Bean
    public MessageChannel channelA() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel channelB() {
        return new DirectChannel();
    }
}
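The correlation and release strategies used in performAggregate can be hard to picture from the DSL alone. The following plain-Java sketch models the idea under stated assumptions: `AggregatorSketch` and its `Customer` record are hypothetical, messages are grouped by a correlation key, and a group is emitted as soon as the release condition holds. For simplicity the sketch forgets a group once it is released; how Spring Integration treats messages that arrive for an already released group depends on the aggregator configuration and is not modeled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

public class AggregatorSketch<T> {

    // Hypothetical domain type used only in this example.
    record Customer(String name, String city) {}

    private final Function<T, Object> correlationStrategy;
    private final Predicate<List<T>> releaseStrategy;
    // Plays the role of the message store: pending groups keyed by correlation key.
    private final Map<Object, List<T>> store = new HashMap<>();

    public AggregatorSketch(Function<T, Object> correlationStrategy,
                            Predicate<List<T>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Adds the message to its group; returns the group if it can be released,
    // or null while the group is still incomplete.
    public List<T> onMessage(T message) {
        Object key = correlationStrategy.apply(message);
        List<T> group = store.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (releaseStrategy.test(group)) {
            store.remove(key); // simplification: released groups are forgotten
            return group;
        }
        return null;
    }

    public int pendingGroups() {
        return store.size();
    }

    public static void main(String[] args) {
        // Same rules as performAggregate: correlate by city, release at more than two.
        AggregatorSketch<Customer> aggregator =
                new AggregatorSketch<>(Customer::city, group -> group.size() > 2);
        aggregator.onMessage(new Customer("a", "New York"));
        aggregator.onMessage(new Customer("b", "Bangalore"));
        aggregator.onMessage(new Customer("c", "New York"));
        List<Customer> released = aggregator.onMessage(new Customer("d", "New York"));
        System.out.println(released);                   // the three New York customers
        System.out.println(aggregator.pendingGroups()); // Bangalore is still waiting
    }
}
```

The point to notice is that a city with fewer than three customers never satisfies `g.size() > 2`, so its group stays in the store and nothing is logged for it.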
package com.demo.project97.integration;

import java.io.File;
import java.util.List;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileNameGenerator;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class FileIntegration {

    private final DataTransformer dataTransformer;

    /**
     * Checks the folder for new *.txt files every 5 seconds,
     * converts each file to customers and sends every customer to fileChannel1
     */
    @Bean
    public IntegrationFlow readFile(MessageChannel fileChannel1) {
        return IntegrationFlow.from(
                        Files.inboundAdapter(new File("/tmp/src"))
                                .autoCreateDirectory(true)
                                .preventDuplicates(true)
                                .patternFilter("*.txt"),
                        e -> e.poller(Pollers.fixedRate(5000))
                )
                .transform(dataTransformer, "convertFileToCustomers")
                .handle(message -> {
                    @SuppressWarnings("unchecked")
                    List<Customer> customers = (List<Customer>) message.getPayload();
                    log.info("Customers: {}", customers);
                    for (Customer c : customers) {
                        fileChannel1.send(MessageBuilder.withPayload(c).build());
                    }
                })
                .get();
    }

    /**
     * Reads fileChannel2, converts the record to a string and appends it to a file
     */
    @Bean
    public IntegrationFlow readResultChannelWriteToFile() {
        return IntegrationFlow.from("fileChannel2")
                .transform(dataTransformer, "convertDbRecordToString")
                .handle(Files.outboundAdapter(new File("/tmp/des"))
                        .autoCreateDirectory(true)
                        .fileNameGenerator(fileNameGenerator())
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true))
                .get();
    }

    // Builds the target file name from the "file-name" message header
    private FileNameGenerator fileNameGenerator() {
        return new FileNameGenerator() {
            @Override
            public String generateFileName(Message<?> message) {
                return message.getHeaders().get("file-name").toString().concat(".txt");
            }
        };
    }

    @Bean
    public MessageChannel fileChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel fileChannel2() {
        return new DirectChannel();
    }
}
package com.demo.project97.integration;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.demo.project97.domain.Customer;
import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jpa.dsl.Jpa;
import org.springframework.integration.jpa.support.PersistMode;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class JPAIntegration {

    private final EntityManagerFactory entityManager;

    /**
     * Polls the customer table every 10 seconds
     */
    @Bean
    public IntegrationFlow readFromDbAdapter() {
        return IntegrationFlow.from(Jpa.inboundAdapter(this.entityManager)
                        .jpaQuery("from Customer where phone is not null")
                        .maxResults(2)
                        .expectSingleResult(false)
                        .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
                .handle(message -> {
                    log.info("readFromDbAdapter: {}", message);
                })
                .get();
    }

    /**
     * Starts the flow when the id for customer is pushed to dbChannel1
     * Forwards the customer that was found to dbChannel2 and dbChannel3
     */
    @Bean
    public IntegrationFlow readFromDbGateway(MessageChannel dbChannel2, MessageChannel dbChannel3) {
        return IntegrationFlow.from("dbChannel1")
                .handle(Jpa.retrievingGateway(this.entityManager)
                        .jpaQuery("from Customer c where c.id = :id")
                        .expectSingleResult(true)
                        .parameterExpression("id", "payload[id]"))
                .handle(message -> {
                    log.info("readFromDbGateway: {}", message);
                    Customer payload = (Customer) message.getPayload();
                    log.info("readFromDbGateway Customer: {}", payload);
                    dbChannel2.send(message);
                    dbChannel3.send(message);
                })
                .get();
    }

    /**
     * Reads dbChannel2 and updates phone number
     * Doesn't return anything
     */
    @Bean
    public IntegrationFlow updateDbAdapter() {
        return IntegrationFlow.from("dbChannel2")
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .jpaQuery("update Customer c set c.phone = '88888' where c.id =:id")
                        .parameterExpression("id", "payload.id"), e -> e.transactional())
                .get();
    }

    /**
     * Reads dbChannel3 and prefixes the customer name with 'Mr. '
     * The gateway replies with a message, which is logged by the next handler
     */
    @Bean
    public IntegrationFlow updateDbGateway() {
        return IntegrationFlow.from("dbChannel3")
                .handle(Jpa.updatingGateway(this.entityManager)
                        .jpaQuery("update Customer c set c.name = CONCAT('Mr. ',c.name) where c.id =:id")
                        .parameterExpression("id", "payload.id"), e -> e.transactional())
                .handle(message -> {
                    log.info("updateDbGateway: {}", message);
                })
                .get();
    }

    /**
     * Polls the customer table every 10 seconds for names starting with 'Mr.'
     * and deletes the customers that were found
     */
    @Bean
    public IntegrationFlow deleteRecord() {
        return IntegrationFlow.from(Jpa.inboundAdapter(this.entityManager)
                        .jpaQuery("from Customer where name like 'Mr.%'")
                        .maxResults(2)
                        .expectSingleResult(false)
                        .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .persistMode(PersistMode.DELETE)
                        .parameterExpression("id", "payload.id")
                        .entityClass(Customer.class), e -> e.transactional())
                .get();
    }

    /**
     * Reads the fileChannel1 and persists all customers
     */
    @Bean
    public IntegrationFlow readFileChannelWriteToDb() {
        return IntegrationFlow.from("fileChannel1")
                .handle(Jpa.outboundAdapter(this.entityManager)
                                .entityClass(Customer.class)
                                .persistMode(PersistMode.PERSIST),
                        e -> e.transactional(true))
                .get();
    }

    /**
     * Reads rabbitmqChannel1 and updates the phone number of the customer with the given name
     */
    @Bean
    public IntegrationFlow readRabbitmqChannelUpdateDb() {
        return IntegrationFlow.from("rabbitmqChannel1")
                .handle(Jpa.outboundAdapter(this.entityManager)
                                .jpaQuery("update Customer c set c.phone = :phone where c.name =:name")
                                .parameterExpression("phone", "payload.phone")
                                .parameterExpression("name", "payload.name")
                        , e -> e.transactional())
                .get();
    }

    @SneakyThrows
    public void sleep(int seconds) {
        TimeUnit.SECONDS.sleep(seconds);
    }

    @Bean
    public MessageChannel dbChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel dbChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel dbChannel3() {
        return new DirectChannel();
    }
}
package com.demo.project97.integration;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitMQIntegration {

    private final ConnectionFactory connectionFactory;
    private final DataTransformer dataTransformer;

    @Bean
    public IntegrationFlow readFromQueue(MessageChannel rabbitmqChannel1) {
        return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "phone-queue")
                        .errorChannel("errorChannel")
                        .errorMessageStrategy(new RabbitMQIntegration.MyFatalExceptionStrategy()))
                .transform(dataTransformer, "convertQueuePayloadToCustomer")
                .handle(message -> {
                    log.info("readFromQueue: {}", message);
                    rabbitmqChannel1.send(message);
                })
                .get();
    }

    public static class MyFatalExceptionStrategy implements ErrorMessageStrategy {
        @Override
        public ErrorMessage buildErrorMessage(Throwable payload, AttributeAccessor attributes) {
            throw new AmqpRejectAndDontRequeueException("Error In Message!");
        }
    }

    @Bean
    public MessageChannel rabbitmqChannel1() {
        DirectChannel channel = new DirectChannel();
        channel.setDatatypes(Customer.class);
        return channel;
    }

    @Bean
    public Queue inboundQueue() {
        return new Queue("phone-queue", true, false, false);
    }
}
Postman
Import the Postman collection into Postman.
Setup
# Project97

Spring Integration

[https://gitorko.github.io/spring-integration-basics/](https://gitorko.github.io/spring-integration-basics/)

### Version

Check version

```bash
$ java --version
openjdk 17.0.3 2022-04-19 LTS
```

### RabbitMQ

Run the docker command to start a rabbitmq instance

```bash
docker run -d --hostname my-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
```

Login to rabbitmq console [http://localhost:8080](http://localhost:8080)

```
username: guest
password: guest
```

### Postgres DB

```
docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
docker ps
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
grant all PRIVILEGES ON DATABASE "test-db" to test;

docker stop pg-container
docker start pg-container
```

### Dev

To run the code.

```bash
./gradlew clean build
./gradlew bootRun
```