Spring Integration - Basics

Overview

Spring Integration provides a framework to support Enterprise Integration Patterns.

GitHub: https://github.com/gitorko/project97

Spring Integration

  1. Messaging support - Components communicate by sending messages over channels instead of calling each other directly, which keeps them loosely coupled.
  2. Support for external systems - Adapters for FTP, the file system, RabbitMQ, and other external systems.

Spring Integration provides a higher level of abstraction over Spring’s support for remoting, messaging, and scheduling, so developers don't have to write the code that interacts with these systems and can focus on the business logic. If the source later changes from a file system to an FTP server, the required changes are minimal.

Spring Integration can be configured in different ways:

  1. XML wiring
  2. Bean annotations
  3. Java DSL (we will focus on the DSL approach, as it's easier to read and maintain)

Terminology

  1. Inbound Adapter - Real world object -> Message
  2. Outbound Adapter - Message -> Real world object
  3. Inbound Gateway - Real world object -> Spring Integration -> Real world object
  4. Outbound Gateway - Spring Integration -> Real world object -> Spring Integration
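
The difference between the one-way adapter and the two-way gateway can be sketched in plain Java. This is only a conceptual model under stated assumptions: `AdapterVsGatewaySketch` and its methods are hypothetical names, and none of this is Spring Integration API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model, not Spring Integration classes.
class AdapterVsGatewaySketch {

    // Inbound adapter (one-way): wraps an external value as a message
    // and hands it to the flow. Nothing is returned to the outside world.
    static void inboundAdapter(String externalValue, Consumer<String> flow) {
        flow.accept("message:" + externalValue);
    }

    // Inbound gateway (two-way): hands the request to the flow and
    // returns the flow's reply to the external caller.
    static String inboundGateway(String externalRequest, Function<String, String> flow) {
        return flow.apply("message:" + externalRequest);
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        // One-way: the flow just consumes the message.
        inboundAdapter("order-1", seen::add);
        // Two-way: the flow produces a reply that comes back to the caller.
        String reply = inboundGateway("order-2", m -> m.toUpperCase(Locale.ROOT));
        seen.add(reply);
        return seen;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Outbound adapters and gateways follow the same split, with the flow as the caller and the external system as the callee.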

Messages can be split, routed, transformed, wiretapped, enriched, and aggregated.

  1. Message - A wrapper around a Java object; it contains a payload & headers.
  2. Message Channel - A conduit for transmitting messages between producers & consumers.

    a. Point-to-Point channel - At most one consumer receives each message sent to the channel.
    b. Publish/Subscribe channel - Broadcasts each message to every subscriber listening on the channel.

  3. Message Transformer - Converts a message's payload or headers into another form
  4. Message Filter - Decides whether a message is passed on or dropped
  5. Message Router - Determines what channel or channels (if any) should receive the message next
  6. Message Bridge - Connects two message channels or channel adapters
  7. Splitter - Breaks one message into several messages
  8. Aggregator - Combines several related messages into one message
  9. Handler - A service activator that handles the message (handle() in the Java DSL).
  10. Adapter endpoints - Provide one-way integration.
  11. Gateway endpoints - Provide two-way request/response integration.
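
To make the first two terms concrete, here is a minimal plain-Java sketch of a message (payload plus headers) and the two delivery styles. `Msg`, `PointToPoint`, and `PubSub` are hypothetical names used for illustration; they are simplified models, not Spring Integration classes, and the round-robin choice of consumer is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of a message: a payload plus a map of headers.
record Msg<T>(T payload, Map<String, Object> headers) {}

// Point-to-point: each message is delivered to exactly one consumer
// (chosen round-robin in this sketch).
class PointToPoint<T> {
    private final List<Consumer<Msg<T>>> consumers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<Msg<T>> consumer) {
        consumers.add(consumer);
    }

    void send(Msg<T> message) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumer subscribed");
        }
        consumers.get(next).accept(message);
        next = (next + 1) % consumers.size();
    }
}

// Publish/subscribe: every subscriber receives every message.
class PubSub<T> {
    private final List<Consumer<Msg<T>>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Msg<T>> subscriber) {
        subscribers.add(subscriber);
    }

    void send(Msg<T> message) {
        subscribers.forEach(s -> s.accept(message));
    }
}

class ChannelStylesDemo {
    // Sends two messages through each style and records who received what.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        PointToPoint<String> p2p = new PointToPoint<>();
        p2p.subscribe(m -> log.add("p2p-A:" + m.payload()));
        p2p.subscribe(m -> log.add("p2p-B:" + m.payload()));

        PubSub<String> pubSub = new PubSub<>();
        pubSub.subscribe(m -> log.add("pub-A:" + m.payload()));
        pubSub.subscribe(m -> log.add("pub-B:" + m.payload()));

        for (String city : List.of("New York", "Bangalore")) {
            Msg<String> message = new Msg<>(city, Map.of("source", "demo"));
            p2p.send(message);
            pubSub.send(message);
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each point-to-point message shows up once in the log, while each publish/subscribe message shows up once per subscriber.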

Types of Message Channel:

  1. PollableChannel - Stores messages. Consumers have to ask for messages by polling, i.e. by calling the receive method.

    a. QueueChannel - Pollable, FIFO. If the channel has multiple consumers, only one of them receives each message. Receiving can be made blocking.
    b. PriorityChannel - Pollable. Messages are ordered within the channel by priority. If the channel has multiple consumers, only one of them receives each message. Receiving can be made blocking.
    c. RendezvousChannel - Pollable, synchronous. Similar to QueueChannel but with zero capacity to buffer: a direct hand-off in which the sender blocks until a consumer invokes the channel’s receive() method.

  2. SubscribableChannel - Event driven. Consumers have to subscribe to get the messages, i.e. call the subscribe method.

    a. DirectChannel - Subscribable. Each message is dispatched to a single subscriber on the sender's thread, so the sender blocks until the subscriber has handled the message.
    b. PublishSubscribeChannel - Subscribable, many subscribers, all of which receive each message.
    c. FixedSubscriberChannel - Subscribable, with a single subscriber that cannot be unsubscribed.

  3. ExecutorChannel - Delegates to an instance of TaskExecutor to perform the dispatch

  4. FluxMessageChannel - Allows reactive consumption
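
The buffering behaviour described for the three pollable channels can be illustrated with the JDK's own concurrent queues. The sketch below uses those queues as stand-ins and does not touch Spring Integration; `PollableQueueSketch` is a hypothetical name, and treating the JDK queues as analogues of the Spring channels is the assumption being illustrated.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicReference;

class PollableQueueSketch {

    // FIFO buffering (QueueChannel-like): items come out in arrival order.
    static List<Integer> fifoOrder() {
        Queue<Integer> queue = new LinkedBlockingQueue<>();
        queue.addAll(List.of(3, 1, 2));
        return drain(queue);
    }

    // Priority ordering (PriorityChannel-like): highest value first in this sketch.
    static List<Integer> priorityOrder() {
        Queue<Integer> queue = new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.addAll(List.of(1, 3, 2));
        return drain(queue);
    }

    // Zero-capacity hand-off (RendezvousChannel-like): nothing is buffered,
    // so a send only succeeds when a receiver is there to take the item.
    static String rendezvous() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        // No receiver is waiting yet, so a non-blocking offer is rejected.
        boolean acceptedWithoutReceiver = queue.offer("dropped");

        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("hello"); // blocks until the consumer takes the item
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acceptedWithoutReceiver + ":" + received.get();
    }

    // Polls until the queue is empty and returns the items in polled order.
    private static List<Integer> drain(Queue<Integer> queue) {
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fifoOrder());
        System.out.println(priorityOrder());
        System.out.println(rendezvous());
    }
}
```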

Code

// BasicIntegration.java
package com.demo.project97.integration;

import java.util.Arrays;
import java.util.List;

import com.demo.project97.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@IntegrationComponentScan
public class BasicIntegration {

    @Bean
    public IntegrationFlow performSplit() {
        return IntegrationFlow.from("inputChannel1")
                .split()
                .transform("Hello "::concat)
                .handle(message -> {
                    log.info("performSplit: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performAggregate(SimpleMessageStore messageStore) {
        return IntegrationFlow.from("inputChannel2")
                .split()
                .aggregate(a ->
                        a.correlationStrategy(m -> {
                            Customer customer = (Customer) m.getPayload();
                            return customer.getCity();
                        })
                                .releaseStrategy(g -> g.size() > 2)
                                .messageStore(messageStore))
                .handle(message -> {
                    log.info("performAggregate: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performRoute() {
        return IntegrationFlow.from("inputChannel3")
                .split()
                .log()
                .route(Customer.class, m -> m.getCity(), m -> m
                        .channelMapping("New York", "channelA")
                        .channelMapping("Bangalore", "channelB"))
                .get();
    }

    @Bean
    public IntegrationFlow handleNewYork() {
        return IntegrationFlow.from("channelA")
                .handle(message -> {
                    log.info("handleNewYork: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow handleBangalore() {
        return IntegrationFlow.from("channelB")
                .handle(message -> {
                    log.info("handleBangalore: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performSubFlow(IntegrationFlow subFlowNewYork, IntegrationFlow subFlowBangalore) {
        return IntegrationFlow.from("inputChannel4")
                .split()
                .log()
                .route(Customer.class, m -> m.getCity(), m -> m
                        .subFlowMapping("New York", subFlowNewYork)
                        .subFlowMapping("Bangalore", subFlowBangalore))
                .get();
    }

    @Bean
    public IntegrationFlow subFlowNewYork() {
        return f -> f.handle(m -> log.info("subFlowNewYork: {}", m));
    }

    @Bean
    public IntegrationFlow subFlowBangalore() {
        return f -> f.handle(m -> log.info("subFlowBangalore: {}", m));
    }

    @Bean
    public IntegrationFlow performBridge() {
        return IntegrationFlow.from("polledChannel")
                .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
                .handle(message -> {
                    log.info("performBridge: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow readInputChannel5_sub1() {
        return IntegrationFlow.from("inputChannel5_sub1")
                .handle(message -> {
                    log.info("readInputChannel5_sub1: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow readInputChannel5_sub2() {
        return IntegrationFlow.from("inputChannel5_sub2")
                .handle(message -> {
                    log.info("readInputChannel5_sub2: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performDynamicBridge() {
        List<String> cities = Arrays.asList("New York", "Bangalore", "London");
        return IntegrationFlow.from("inputChannel6")
                .split()
                .route(Customer.class, m -> m.getCity(), m -> {
                    cities.forEach(city -> {
                        m.subFlowMapping(city, subFlow -> subFlow.publishSubscribeChannel(c -> {
                            c.ignoreFailures(true);
                            c.subscribe(s -> s.handle(h -> {
                                Customer customer = (Customer) h.getPayload();
                                customer.setName(customer.getName().toUpperCase());
                                log.info("Handle: {}", customer);
                            }));
                        }).bridge());
                    });
                })
                .aggregate()
                .handle(m -> {
                    log.info("performDynamicBridge: {}", m);
                })
                .get();
    }

    @Bean
    public MessageChannel inputChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel3() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel4() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel5() {
        return new PublishSubscribeChannel();
    }

    @Bean
    @BridgeFrom("inputChannel5")
    public MessageChannel inputChannel5_sub1() {
        return new DirectChannel();
    }

    @Bean
    @BridgeFrom("inputChannel5")
    public MessageChannel inputChannel5_sub2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel6() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel polledChannel() {
        return new QueueChannel();
    }

    @Bean
    public SimpleMessageStore messageStore() {
        return new SimpleMessageStore();
    }

    @Bean
    public MessageChannel channelA() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel channelB() {
        return new DirectChannel();
    }
}
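
The performAggregate flow above correlates customers by city and releases a group once it holds more than two messages. The plain-Java sketch below models only that correlation and release logic, without Spring Integration. `Person` and `CityAggregatorSketch` are hypothetical names, and starting a fresh group after a release is a simplification of the sketch; how late messages are treated in a real aggregator is not modeled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the Customer domain object.
record Person(String name, String city) {}

class CityAggregatorSketch {
    // Open groups, keyed by the correlation key (the city).
    private final Map<String, List<Person>> groups = new LinkedHashMap<>();

    // Adds the person to the group for their city and returns the whole
    // group once the release condition (size > 2) is met.
    Optional<List<Person>> accept(Person person) {
        List<Person> group = groups.computeIfAbsent(person.city(), k -> new ArrayList<>());
        group.add(person);
        if (group.size() > 2) {
            groups.remove(person.city()); // simplification: start a new group next time
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Feeds five people through the aggregator and returns the names in each released group.
    static List<List<String>> demo() {
        CityAggregatorSketch aggregator = new CityAggregatorSketch();
        List<Person> input = List.of(
                new Person("Ann", "New York"),
                new Person("Raj", "Bangalore"),
                new Person("Bob", "New York"),
                new Person("Cy", "New York"),
                new Person("Dee", "Bangalore"));

        List<List<String>> released = new ArrayList<>();
        for (Person person : input) {
            aggregator.accept(person)
                    .ifPresent(group -> released.add(group.stream().map(Person::name).toList()));
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the New York group reaches three members, so it is the only one released; the two Bangalore customers stay in their open group.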
// FileIntegration.java
package com.demo.project97.integration;

import java.io.File;
import java.util.List;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileNameGenerator;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class FileIntegration {

    private final DataTransformer dataTransformer;

    /**
     * Checks /tmp/src every 5 seconds for new *.txt files,
     * converts each file to a list of customers,
     * and sends each customer to fileChannel1.
     */
    @Bean
    public IntegrationFlow readFile(MessageChannel fileChannel1) {
        return IntegrationFlow.from(
                        Files.inboundAdapter(new File("/tmp/src"))
                                .autoCreateDirectory(true)
                                .preventDuplicates(true)
                                .patternFilter("*.txt"),
                        e -> e.poller(Pollers.fixedRate(5000))
                )
                .transform(dataTransformer, "convertFileToCustomers")
                .handle(message -> {
                    @SuppressWarnings("unchecked")
                    List<Customer> customers = (List<Customer>) message.getPayload();
                    log.info("Customers: {}", customers);
                    for (Customer c : customers) {
                        fileChannel1.send(MessageBuilder.withPayload(c).build());
                    }
                })
                .get();
    }

    /**
     * Reads fileChannel2, converts the record to a string,
     * and appends it as a new line to a file under /tmp/des.
     */
    @Bean
    public IntegrationFlow readResultChannelWriteToFile() {
        return IntegrationFlow.from("fileChannel2")
                .transform(dataTransformer, "convertDbRecordToString")
                .handle(Files.outboundAdapter(new File("/tmp/des"))
                        .autoCreateDirectory(true)
                        .fileNameGenerator(fileNameGenerator())
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true))
                .get();
    }

    /**
     * Builds the target file name from the "file-name" header.
     * The header must be present on the message, otherwise this throws a NullPointerException.
     */
    private FileNameGenerator fileNameGenerator() {
        return new FileNameGenerator() {
            @Override
            public String generateFileName(Message<?> message) {
                return message.getHeaders().get("file-name").toString().concat(".txt");
            }
        };
    }

    @Bean
    public MessageChannel fileChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel fileChannel2() {
        return new DirectChannel();
    }
}
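
The readFile flow above combines three ideas: poll a directory, keep only files matching *.txt, and skip files that an earlier poll already returned. The sketch below shows that combination with the JDK alone. `TxtFilePollerSketch` is a hypothetical name, and the in-memory set of seen files is an assumption about how duplicate prevention could work, not a description of the inbound adapter's internals.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TxtFilePollerSketch {
    private final Path directory;
    // Files already handed out by an earlier poll.
    private final Set<Path> seen = new HashSet<>();

    TxtFilePollerSketch(Path directory) {
        this.directory = directory;
    }

    // One poll: returns the names of *.txt files not returned by an earlier poll.
    List<String> poll() {
        List<String> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, "*.txt")) {
            for (Path file : stream) {
                if (seen.add(file)) {
                    fresh.add(file.getFileName().toString());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Directory iteration order is unspecified, so sort for stable output.
        Collections.sort(fresh);
        return fresh;
    }

    // Polls a temporary directory twice, adding a file in between.
    static List<List<String>> demo() {
        try {
            Path dir = Files.createTempDirectory("src");
            TxtFilePollerSketch poller = new TxtFilePollerSketch(dir);

            Files.writeString(dir.resolve("a.txt"), "x");
            Files.writeString(dir.resolve("b.csv"), "x"); // filtered out by the pattern
            List<String> first = poller.poll();

            Files.writeString(dir.resolve("c.txt"), "x");
            List<String> second = poller.poll(); // a.txt is not returned again

            return List.of(first, second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the real flow the poller schedule (every 5 seconds) triggers each poll; here the two polls are simply called in sequence.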
// JPAIntegration.java
package com.demo.project97.integration;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.demo.project97.domain.Customer;
import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jpa.dsl.Jpa;
import org.springframework.integration.jpa.support.PersistMode;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class JPAIntegration {

    private final EntityManagerFactory entityManager;

    /**
     * Reads the customer table every 10 seconds
     */
    @Bean
    public IntegrationFlow readFromDbAdapter() {
        return IntegrationFlow.from(Jpa.inboundAdapter(this.entityManager)
                        .jpaQuery("from Customer where phone is not null")
                        .maxResults(2)
                        .expectSingleResult(false)
                        .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
                .handle(message -> {
                    log.info("readFromDbAdapter: {}", message);
                })
                .get();
    }

    /**
     * Starts the flow when the id for customer is pushed to dbChannel1,
     * then forwards the retrieved customer to dbChannel2 and dbChannel3
     */
    @Bean
    public IntegrationFlow readFromDbGateway(MessageChannel dbChannel2, MessageChannel dbChannel3) {
        return IntegrationFlow.from("dbChannel1")
                .handle(Jpa.retrievingGateway(this.entityManager)
                        .jpaQuery("from Customer c where c.id = :id")
                        .expectSingleResult(true)
                        .parameterExpression("id", "payload[id]"))
                .handle(message -> {
                    log.info("readFromDbGateway: {}", message);
                    Customer payload = (Customer) message.getPayload();
                    log.info("readFromDbGateway Customer: {}", payload);
                    dbChannel2.send(message);
                    dbChannel3.send(message);
                })
                .get();
    }

    /**
     * Reads dbChannel2 and updates the phone number.
     * Does not return anything (one-way adapter)
     */
    @Bean
    public IntegrationFlow updateDbAdapter() {
        return IntegrationFlow.from("dbChannel2")
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .jpaQuery("update Customer c set c.phone = '88888' where c.id =:id")
                        .parameterExpression("id", "payload.id"), e -> e.transactional())
                .get();
    }

    /**
     * Reads dbChannel3 and prefixes the customer name with 'Mr. '.
     * The gateway's reply is passed on to the next handler
     */
    @Bean
    public IntegrationFlow updateDbGateway() {
        return IntegrationFlow.from("dbChannel3")
                .handle(Jpa.updatingGateway(this.entityManager)
                        .jpaQuery("update Customer c set c.name = CONCAT('Mr. ',c.name) where c.id =:id")
                        .parameterExpression("id", "payload.id"), e -> e.transactional())
                .handle(message -> {
                    log.info("updateDbGateway: {}", message);
                })
                .get();
    }

    /**
     * Polls the customer table every 10 seconds for names starting with 'Mr.'
     * and deletes the matching customers
     */
    @Bean
    public IntegrationFlow deleteRecord() {
        return IntegrationFlow.from(Jpa.inboundAdapter(this.entityManager)
                        .jpaQuery("from Customer where name like 'Mr.%'")
                        .maxResults(2)
                        .expectSingleResult(false)
                        .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .persistMode(PersistMode.DELETE)
                        .parameterExpression("id", "payload.id")
                        .entityClass(Customer.class), e -> e.transactional())
                .get();
    }

    /**
     * Reads the fileChannel1 and persists all customers
     */
    @Bean
    public IntegrationFlow readFileChannelWriteToDb() {
        return IntegrationFlow.from("fileChannel1")
                .handle(Jpa.outboundAdapter(this.entityManager)
                                .entityClass(Customer.class)
                                .persistMode(PersistMode.PERSIST),
                        e -> e.transactional(true))
                .get();
    }

    /**
     * Reads rabbitmqChannel1 and updates the phone number of the customer with the matching name
     */
    @Bean
    public IntegrationFlow readRabbitmqChannelUpdateDb() {
        return IntegrationFlow.from("rabbitmqChannel1")
                .handle(Jpa.outboundAdapter(this.entityManager)
                                .jpaQuery("update Customer c set c.phone = :phone where c.name =:name")
                                .parameterExpression("phone", "payload.phone")
                                .parameterExpression("name", "payload.name")
                        , e -> e.transactional())
                .get();
    }

    @SneakyThrows
    public void sleep(int seconds) {
        TimeUnit.SECONDS.sleep(seconds);
    }

    @Bean
    public MessageChannel dbChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel dbChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel dbChannel3() {
        return new DirectChannel();
    }
}
// RabbitMQIntegration.java
package com.demo.project97.integration;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitMQIntegration {

    private final ConnectionFactory connectionFactory;
    private final DataTransformer dataTransformer;

    @Bean
    public IntegrationFlow readFromQueue(MessageChannel rabbitmqChannel1) {
        return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "phone-queue")
                .errorChannel("errorChannel")
                .errorMessageStrategy(new RabbitMQIntegration.MyFatalExceptionStrategy()))
                .transform(dataTransformer, "convertQueuePayloadToCustomer")
                .handle(message -> {
                    log.info("readFromQueue: {}", message);
                    rabbitmqChannel1.send(message);
                })
                .get();
    }

    public static class MyFatalExceptionStrategy implements ErrorMessageStrategy {
        @Override
        public ErrorMessage buildErrorMessage(Throwable payload, AttributeAccessor attributes) {
            throw new AmqpRejectAndDontRequeueException("Error In Message!");
        }
    }

    @Bean
    public MessageChannel rabbitmqChannel1() {
        DirectChannel channel = new DirectChannel();
        channel.setDatatypes(Customer.class);
        return channel;
    }

    @Bean
    public Queue inboundQueue() {
        return new Queue("phone-queue", true, false, false);
    }
}

Postman

Import the Postman collection into Postman.

Postman Collection

Setup

# Project97

Spring Integration

[https://gitorko.github.io/spring-integration-basics/](https://gitorko.github.io/spring-integration-basics/)

### Version

Check version

```bash
$ java --version
openjdk 17.0.3 2022-04-19 LTS
```

### RabbitMQ

Run the docker command to start a RabbitMQ instance

```bash
docker run -d --hostname my-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
```

Log in to the RabbitMQ console [http://localhost:8080](http://localhost:8080)

```
username: guest
password: guest
```

### Postgres DB

```
docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
docker ps
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
grant all PRIVILEGES ON DATABASE "test-db" to test;

docker stop pg-container
docker start pg-container
```

### Dev

To run the code.

```bash
./gradlew clean build
./gradlew bootRun
```

References

https://spring.io/projects/spring-integration

https://www.enterpriseintegrationpatterns.com/
