Spring Integration - Basics
Overview
Spring Integration provides a framework to support Enterprise Integration Patterns.
Github: https://github.com/gitorko/project97
Spring Integration
- Messaging support - All communication is treated as asynchronous messages sent between different channels. This provides loose coupling.
- Support of external system - Adapters for ftp,file system, rabbitMQ and other external systems.
A higher level of abstraction over Spring’s support for remoting, messaging, and scheduling is provided so that developers dont have to write the code to interact with these systems but focus only on the business logic. At any point if the source changes from a file system to an ftp server, the changes required will be very minimal.
Spring integration provides different ways to configure:
- XML approach to wire
- Bean annotation approach
- Java DSL approach (We will focus on DSL approach as its easier to read and maintain)
Terminology
- Message - Wrapper that can wrap a java object, contains payload & headers
- Message Channel - A conduit for transmitting messages between producers & consumers a. Point-to-Point channel - one consumer should receive each message from a channel b. Publish/Subscribe channel - Will broadcast the message to any subscriber listening to that channel.
- Message Transform
- Message Filter
- Message Router - Determines what channel or channels (if any) should receive the message next
- Message Bridge - Connects two message channels or channel adapters
- Splitter
- Aggregator
- Handle - ServiceActivator that handle the message.
- Adapter endpoints - Provide one-way integration.
- Gateway endpoints - Provide two-way request/response integration.
Types of Message Channel:
PollableChannel - Store messages, You need to ask for messages by polling i.e call receive method
a. QueueChannel - Pollable, FIFO, Channel has multiple consumers, only one of them will receive, can be made blocking. b. PriorityChannel - Pollable, Messages to be ordered within the channel based on priority, Channel has multiple consumers, only one of them will receive, can be made blocking. c. RendezvousChannel - Pollable, Synchronous, Similar to QueueChannel but zero capacity to buffer, direct-handoff scenario, wherein a sender blocks until consumer invokes the channel’s receive() method
SubscribableChannel - Event driven, need to subscribe to get the messages. i.e call the subscribe method.
a. Direct Channel - Subscribable, single subscriber, single thread behaviour blocking the sender thread until the message is subscribed. b. Publish Subscribe Channel - Subscribable, many subscribers, all will get the message. c. FixedSubscriberChannel - Subscribable, single subscriber, subscriber that cannot be unsubscribed
ExecutorChannel - Delegates to an instance of TaskExecutor to perform the dispatch
FluxChannel - Allows reactive consumption
Code
1package com.demo.project97.integration;
2
3import java.util.Arrays;
4import java.util.List;
5
6import com.demo.project97.domain.Customer;
7import lombok.extern.slf4j.Slf4j;
8import org.springframework.context.annotation.Bean;
9import org.springframework.integration.annotation.BridgeFrom;
10import org.springframework.integration.annotation.IntegrationComponentScan;
11import org.springframework.integration.channel.DirectChannel;
12import org.springframework.integration.channel.PublishSubscribeChannel;
13import org.springframework.integration.channel.QueueChannel;
14import org.springframework.integration.dsl.IntegrationFlow;
15import org.springframework.integration.dsl.IntegrationFlows;
16import org.springframework.integration.dsl.Pollers;
17import org.springframework.integration.store.SimpleMessageStore;
18import org.springframework.messaging.MessageChannel;
19import org.springframework.messaging.PollableChannel;
20import org.springframework.stereotype.Component;
21
22@Slf4j
23@Component
24@IntegrationComponentScan
25public class BasicIntegration {
26
27 @Bean
28 public IntegrationFlow performSplit() {
29 return IntegrationFlows.from("inputChannel1")
30 .split()
31 .transform("Hello "::concat)
32 .handle(message -> {
33 log.info("performSplit: {}", message);
34 })
35 .get();
36 }
37
38 @Bean
39 public IntegrationFlow performAggregate(SimpleMessageStore messageStore) {
40 return IntegrationFlows.from("inputChannel2")
41 .split()
42 .aggregate(a ->
43 a.correlationStrategy(m -> {
44 Customer customer = (Customer) m.getPayload();
45 return customer.getCity();
46 })
47 .releaseStrategy(g -> g.size() > 2)
48 .messageStore(messageStore))
49 .handle(message -> {
50 log.info("performAggregate: {}", message);
51 })
52 .get();
53 }
54
55 @Bean
56 public IntegrationFlow performRoute() {
57 return IntegrationFlows.from("inputChannel3")
58 .split()
59 .log()
60 .route(Customer.class, m -> m.getCity(), m -> m
61 .channelMapping("New York", "channelA")
62 .channelMapping("Bangalore", "channelB"))
63 .get();
64 }
65
66 @Bean
67 public IntegrationFlow handleNewYork() {
68 return IntegrationFlows.from("channelA")
69 .handle(message -> {
70 log.info("handleNewYork: {}", message);
71 })
72 .get();
73 }
74
75 @Bean
76 public IntegrationFlow handleBangalore() {
77 return IntegrationFlows.from("channelB")
78 .handle(message -> {
79 log.info("handleBangalore: {}", message);
80 })
81 .get();
82 }
83
84 @Bean
85 public IntegrationFlow performSubFlow(IntegrationFlow subFlowNewYork, IntegrationFlow subFlowBangalore) {
86 return IntegrationFlows.from("inputChannel4")
87 .split()
88 .log()
89 .route(Customer.class, m -> m.getCity(), m -> m
90 .subFlowMapping("New York", subFlowNewYork)
91 .subFlowMapping("Bangalore", subFlowBangalore))
92 .get();
93 }
94
95 @Bean
96 public IntegrationFlow subFlowNewYork() {
97 return f -> f.handle(m -> log.info("subFlowNewYork: {}", m));
98 }
99
100 @Bean
101 public IntegrationFlow subFlowBangalore() {
102 return f -> f.handle(m -> log.info("subFlowBangalore: {}", m));
103 }
104
105 @Bean
106 public IntegrationFlow performBridge() {
107 return IntegrationFlows.from("polledChannel")
108 .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
109 .handle(message -> {
110 log.info("performBridge: {}", message);
111 })
112 .get();
113 }
114
115 @Bean
116 public IntegrationFlow readInputChannel5_sub1() {
117 return IntegrationFlows.from("inputChannel5_sub1")
118 .handle(message -> {
119 log.info("readInputChannel5_sub1: {}", message);
120 })
121 .get();
122 }
123
124 @Bean
125 public IntegrationFlow readInputChannel5_sub2() {
126 return IntegrationFlows.from("inputChannel5_sub2")
127 .handle(message -> {
128 log.info("readInputChannel5_sub2: {}", message);
129 })
130 .get();
131 }
132
133 @Bean
134 public IntegrationFlow performDynamicBridge() {
135 List<String> cities = Arrays.asList("New York", "Bangalore", "London");
136 return IntegrationFlows.from("inputChannel6")
137 .split()
138 .route(Customer.class, m -> m.getCity(), m -> {
139 cities.forEach(city -> {
140 m.subFlowMapping(city, subFlow -> subFlow.publishSubscribeChannel(c -> {
141 c.ignoreFailures(true);
142 c.subscribe(s -> s.handle(h -> {
143 Customer customer = (Customer) h.getPayload();
144 customer.setName(customer.getName().toUpperCase());
145 log.info("Handle: {}", customer);
146 }));
147 }).bridge());
148 });
149 })
150 .aggregate()
151 .handle(m -> {
152 log.info("performDynamicBridge: {}", m);
153 })
154 .get();
155 }
156
157 @Bean
158 public MessageChannel inputChannel1() {
159 return new DirectChannel();
160 }
161
162 @Bean
163 public MessageChannel inputChannel2() {
164 return new DirectChannel();
165 }
166
167 @Bean
168 public MessageChannel inputChannel3() {
169 return new DirectChannel();
170 }
171
172 @Bean
173 public MessageChannel inputChannel4() {
174 return new DirectChannel();
175 }
176
177 @Bean
178 public MessageChannel inputChannel5() {
179 return new PublishSubscribeChannel();
180 }
181
182 @Bean
183 @BridgeFrom("inputChannel5")
184 public MessageChannel inputChannel5_sub1() {
185 return new DirectChannel();
186 }
187
188 @Bean
189 @BridgeFrom("inputChannel5")
190 public MessageChannel inputChannel5_sub2() {
191 return new DirectChannel();
192 }
193
194 @Bean
195 public MessageChannel inputChannel6() {
196 return new DirectChannel();
197 }
198
199 @Bean
200 public PollableChannel polledChannel() {
201 return new QueueChannel();
202 }
203
204 @Bean
205 public SimpleMessageStore messageStore() {
206 return new SimpleMessageStore();
207 }
208
209 @Bean
210 public MessageChannel channelA() {
211 return new DirectChannel();
212 }
213
214 @Bean
215 public MessageChannel channelB() {
216 return new DirectChannel();
217 }
218}
1package com.demo.project97.integration;
2
3import java.io.File;
4import java.util.List;
5
6import com.demo.project97.domain.Customer;
7import lombok.RequiredArgsConstructor;
8import lombok.extern.slf4j.Slf4j;
9import org.springframework.context.annotation.Bean;
10import org.springframework.integration.channel.DirectChannel;
11import org.springframework.integration.dsl.IntegrationFlow;
12import org.springframework.integration.dsl.IntegrationFlows;
13import org.springframework.integration.file.FileNameGenerator;
14import org.springframework.integration.file.dsl.Files;
15import org.springframework.integration.file.support.FileExistsMode;
16import org.springframework.messaging.Message;
17import org.springframework.messaging.MessageChannel;
18import org.springframework.messaging.support.MessageBuilder;
19import org.springframework.stereotype.Component;
20
21@Component
22@RequiredArgsConstructor
23@Slf4j
24public class FileIntegration {
25
26 private final DataTransformer dataTransformer;
27
28 /**
29 * Detects if new file present in folder, checks every 5 seconds
30 * Write the file name to fileChannel1
31 */
32 @Bean
33 public IntegrationFlow readFile(MessageChannel fileChannel1) {
34 return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/src"))
35 .autoCreateDirectory(true)
36 .preventDuplicates(true)
37 .patternFilter("*.txt")
38 .get(), poller -> poller.poller(pm -> pm.fixedRate(5000)))
39 .transform(dataTransformer, "convertFileToCustomers")
40 .handle(message -> {
41 List<Customer> customers = (List<Customer>) message.getPayload();
42 log.info("Customers: {}", customers);
43 for (Customer c: customers) {
44 fileChannel1.send(MessageBuilder.withPayload(c).build());
45 }
46 })
47 .get();
48 }
49
50 @Bean
51 public IntegrationFlow readResultChannelWriteToFile() {
52 return IntegrationFlows.from("fileChannel2")
53 .transform(dataTransformer, "convertDbRecordToString")
54 .handle(Files.outboundAdapter(new File("/tmp/des"))
55 .autoCreateDirectory(true)
56 .fileNameGenerator(fileNameGenerator())
57 .fileExistsMode(FileExistsMode.APPEND)
58 .appendNewLine(true)
59 .get())
60 .get();
61 }
62
63 private FileNameGenerator fileNameGenerator() {
64 return new FileNameGenerator() {
65 @Override
66 public String generateFileName(Message<?> message) {
67 return message.getHeaders().get("file-name").toString().concat(".txt");
68 }
69 };
70 }
71
72 @Bean
73 public MessageChannel fileChannel1() {
74 return new DirectChannel();
75 }
76
77 @Bean
78 public MessageChannel fileChannel2() {
79 return new DirectChannel();
80 }
81}
1package com.demo.project97.integration;
2
3import java.time.Duration;
4import java.util.concurrent.TimeUnit;
5import javax.persistence.EntityManagerFactory;
6
7import com.demo.project97.domain.Customer;
8import lombok.RequiredArgsConstructor;
9import lombok.SneakyThrows;
10import lombok.extern.slf4j.Slf4j;
11import org.springframework.context.annotation.Bean;
12import org.springframework.integration.channel.DirectChannel;
13import org.springframework.integration.dsl.IntegrationFlow;
14import org.springframework.integration.dsl.IntegrationFlows;
15import org.springframework.integration.dsl.Pollers;
16import org.springframework.integration.jpa.dsl.Jpa;
17import org.springframework.integration.jpa.support.PersistMode;
18import org.springframework.messaging.MessageChannel;
19import org.springframework.stereotype.Component;
20
21@Component
22@RequiredArgsConstructor
23@Slf4j
24public class JPAIntegration {
25
26 private final EntityManagerFactory entityManager;
27
28 /**
29 * Continuously reads the customer table every 10 seconds
30 */
31 @Bean
32 public IntegrationFlow readFromDbAdapter() {
33 return IntegrationFlows.from(Jpa.inboundAdapter(this.entityManager)
34 .jpaQuery("from Customer where phone is not null")
35 .maxResults(2)
36 .expectSingleResult(false)
37 .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
38 .handle(message -> {
39 log.info("readFromDbAdapter: {}", message);
40 })
41 .get();
42 }
43
44 /**
45 * Starts the flow when the id for customer is pushed to dbChannel1
46 */
47 @Bean
48 public IntegrationFlow readFromDbGateway(MessageChannel dbChannel2, MessageChannel dbChannel3) {
49 return IntegrationFlows.from("dbChannel1")
50 .handle(Jpa.retrievingGateway(this.entityManager)
51 .jpaQuery("from Customer c where c.id = :id")
52 .expectSingleResult(true)
53 .parameterExpression("id", "payload[id]"))
54 .handle(message -> {
55 log.info("readFromDbGateway: {}", message);
56 Customer payload = (Customer) message.getPayload();
57 log.info("readFromDbGateway Customer: {}", payload);
58 dbChannel2.send(message);
59 dbChannel3.send(message);
60 })
61 .get();
62 }
63
64 /**
65 * Reads dbChannel2 and updates phone number
66 * Doesnt return anything
67 */
68 @Bean
69 public IntegrationFlow updateDbAdapter() {
70 return IntegrationFlows.from("dbChannel2")
71 .handle(Jpa.outboundAdapter(this.entityManager)
72 .jpaQuery("update Customer c set c.phone = '88888' where c.id =:id")
73 .parameterExpression("id", "payload.id"), e -> e.transactional())
74 .get();
75 }
76
77 /**
78 * Reads dbChannel2 and updates phone number
79 * Doesnt return anything
80 */
81 @Bean
82 public IntegrationFlow updateDbGateway() {
83 return IntegrationFlows.from("dbChannel3")
84 .handle(Jpa.updatingGateway(this.entityManager)
85 .jpaQuery("update Customer c set c.name = CONCAT('Mr. ',c.name) where c.id =:id")
86 .parameterExpression("id", "payload.id"), e -> e.transactional())
87 .handle(message -> {
88 log.info("updateDbGateway: {}", message);
89 })
90 .get();
91 }
92
93 /**
94 * Reads dbChannel3 and deletes the customer
95 */
96 @Bean
97 public IntegrationFlow deleteRecord() {
98 return IntegrationFlows.from(Jpa.inboundAdapter(this.entityManager)
99 .jpaQuery("from Customer where name like 'Mr.%'")
100 .maxResults(2)
101 .expectSingleResult(false)
102 .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
103 .handle(Jpa.outboundAdapter(this.entityManager)
104 .persistMode(PersistMode.DELETE)
105 .parameterExpression("id", "payload.id")
106 .entityClass(Customer.class), e -> e.transactional())
107 .get();
108 }
109
110 /**
111 * Reads the fileChannel1 and persists all customers
112 */
113 @Bean
114 public IntegrationFlow readFileChannelWriteToDb() {
115 return IntegrationFlows.from("fileChannel1")
116 .handle(Jpa.outboundAdapter(this.entityManager)
117 .entityClass(Customer.class)
118 .persistMode(PersistMode.PERSIST)
119 .get(), e -> e.transactional())
120 .get();
121 }
122
123 /**
124 * Reads the fileChannel1 and persists all customers
125 */
126 @Bean
127 public IntegrationFlow readRabbitmqChannelUpdateDb() {
128 return IntegrationFlows.from("rabbitmqChannel1")
129 .handle(Jpa.outboundAdapter(this.entityManager)
130 .jpaQuery("update Customer c set c.phone = :phone where c.name =:name")
131 .parameterExpression("phone", "payload.phone")
132 .parameterExpression("name", "payload.name")
133 , e -> e.transactional())
134 .get();
135 }
136
137 @SneakyThrows
138 public void sleep(int seconds) {
139 TimeUnit.SECONDS.sleep(seconds);
140 }
141
142 @Bean
143 public MessageChannel dbChannel1() {
144 return new DirectChannel();
145 }
146
147 @Bean
148 public MessageChannel dbChannel2() {
149 return new DirectChannel();
150 }
151
152 @Bean
153 public MessageChannel dbChannel3() {
154 return new DirectChannel();
155 }
156
157
158}
1package com.demo.project97.integration;
2
3import com.demo.project97.domain.Customer;
4import lombok.RequiredArgsConstructor;
5import lombok.extern.slf4j.Slf4j;
6import org.springframework.amqp.AmqpRejectAndDontRequeueException;
7import org.springframework.amqp.core.Queue;
8import org.springframework.amqp.rabbit.connection.ConnectionFactory;
9import org.springframework.context.annotation.Bean;
10import org.springframework.core.AttributeAccessor;
11import org.springframework.integration.amqp.dsl.Amqp;
12import org.springframework.integration.channel.DirectChannel;
13import org.springframework.integration.dsl.IntegrationFlow;
14import org.springframework.integration.dsl.IntegrationFlows;
15import org.springframework.integration.support.ErrorMessageStrategy;
16import org.springframework.messaging.MessageChannel;
17import org.springframework.messaging.support.ErrorMessage;
18import org.springframework.stereotype.Component;
19
20@Component
21@RequiredArgsConstructor
22@Slf4j
23public class RabbitMQIntegration {
24
25 private final ConnectionFactory connectionFactory;
26 private final DataTransformer dataTransformer;
27
28 @Bean
29 public IntegrationFlow readFromQueue(MessageChannel rabbitmqChannel1) {
30 return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "phone-queue")
31 .errorChannel("errorChannel")
32 .errorMessageStrategy(new RabbitMQIntegration.MyFatalExceptionStrategy()))
33 .transform(dataTransformer, "convertQueuePayloadToCustomer")
34 .handle(message -> {
35 log.info("readFromQueue: {}", message);
36 rabbitmqChannel1.send(message);
37 })
38 .get();
39 }
40
41 public static class MyFatalExceptionStrategy implements ErrorMessageStrategy {
42 @Override
43 public ErrorMessage buildErrorMessage(Throwable payload, AttributeAccessor attributes) {
44 throw new AmqpRejectAndDontRequeueException("Error In Message!");
45 }
46 }
47
48 @Bean
49 public MessageChannel rabbitmqChannel1() {
50 DirectChannel channel = new DirectChannel();
51 channel.setDatatypes(Customer.class);
52 return channel;
53 }
54
55 @Bean
56 public Queue inboundQueue() {
57 return new Queue("phone-queue", true, false, false);
58 }
59}
Setup
Project97
Spring Integration
https://gitorko.github.io/spring-integration-basics/
Version
Check version
1$java --version
2openjdk 17.0.3 2022-04-19 LTS
RabbitMQ
Run the docker command to start a rabbitmq instance
1docker run -d --hostname my-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
Login to rabbitmq console http://localhost:8080
1username:guest
2password: guest
Postgres DB
1docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
2docker ps
3docker exec -it pg-container psql -U postgres -W postgres
4CREATE USER test WITH PASSWORD 'test@123';
5CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
6grant all PRIVILEGES ON DATABASE "test-db" to test;
7
8docker stop pg-container
9docker start pg-container
Dev
To run the code.
1./gradlew clean build
2./gradlew bootRun
Postman
Import the postman collection to postman