Spring Integration - Basics

Overview

Spring Integration provides a framework to support Enterprise Integration Patterns.

GitHub: https://github.com/gitorko/project97

Spring Integration

  1. Messaging support - All communication is modeled as messages sent over channels, and delivery can be asynchronous. Producers and consumers know only the channel, not each other, which provides loose coupling.
  2. Support for external systems - Adapters for FTP, the file system, RabbitMQ, and other external systems.

Spring Integration provides a higher level of abstraction over Spring's support for remoting, messaging, and scheduling, so developers don't have to write the code that interacts with these systems and can focus on the business logic. If the source later changes from a file system to an FTP server, only the adapter has to change, so the required changes are minimal.

Spring Integration can be configured in three ways:

  1. XML configuration
  2. Annotation-based bean configuration
  3. Java DSL (we will focus on the DSL approach as it is easier to read and maintain)

Terminology

  1. Message - A wrapper around any Java object; it consists of a payload and headers.
  2. Message Channel - A conduit for transmitting messages between producers and consumers.
    a. Point-to-Point channel - Each message is received by no more than one consumer.
    b. Publish/Subscribe channel - Each message is broadcast to every subscriber listening on the channel.
  3. Message Transformer - Converts a message's payload or headers from one form to another.
  4. Message Filter - Decides whether a message is passed on to the next channel or dropped.
  5. Message Router - Determines which channel or channels (if any) should receive the message next.
  6. Message Bridge - Connects two message channels or channel adapters.
  7. Splitter - Breaks one message into several messages, for example one per element of a list.
  8. Aggregator - Combines a group of related messages into a single message.
  9. Handle - A ServiceActivator that handles the message.
  10. Adapter endpoints - Provide one-way integration.
  11. Gateway endpoints - Provide two-way request/response integration.
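To make these terms concrete, here is a minimal plain-Java sketch of a message passing through a splitter, a filter, a transformer and a router. It deliberately does not use the Spring Integration API: `Msg`, `EndpointSketch`, the method names and the channel names `shortChannel` and `longChannel` are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: a payload plus headers.
final class Msg<T> {
    final T payload;
    final Map<String, Object> headers;

    Msg(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

class EndpointSketch {

    // Splitter: one message carrying a list becomes one message per element.
    static List<Msg<String>> split(Msg<List<String>> in) {
        List<Msg<String>> out = new ArrayList<>();
        for (String item : in.payload) {
            out.add(new Msg<>(item, in.headers));
        }
        return out;
    }

    // Filter: decides whether a message continues down the flow.
    static boolean accept(Msg<String> m) {
        return !m.payload.isEmpty();
    }

    // Transformer: converts the payload and keeps the headers.
    static Msg<String> transform(Msg<String> m) {
        return new Msg<>("Hello " + m.payload, m.headers);
    }

    // Router: picks the name of the next channel from the message content.
    static String route(Msg<String> m) {
        return m.payload.length() > 10 ? "longChannel" : "shortChannel";
    }

    // Runs split -> filter -> transform -> route and groups payloads by target channel.
    static Map<String, List<String>> run(List<String> names) {
        Map<String, List<String>> byChannel = new LinkedHashMap<>();
        Msg<List<String>> input = new Msg<>(names, Map.of());
        for (Msg<String> m : split(input)) {
            if (!accept(m)) {
                continue; // filtered out
            }
            Msg<String> transformed = transform(m);
            byChannel.computeIfAbsent(route(transformed), k -> new ArrayList<>())
                    .add(transformed.payload);
        }
        return byChannel;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Jack", "", "Alexander")));
    }
}
```

In a real flow each of these steps would be an endpoint connected to the next one by a channel; the sketch collapses the channels into direct method calls to keep the roles visible.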

Types of Message Channel:

  1. PollableChannel - Stores messages; consumers must ask for them by polling, i.e. by calling the receive() method.

    a. QueueChannel - Pollable, FIFO. If the channel has multiple consumers, only one of them receives each message. Can be made blocking.
    b. PriorityChannel - Pollable. Messages are ordered within the channel based on priority. If the channel has multiple consumers, only one of them receives each message. Can be made blocking.
    c. RendezvousChannel - Pollable, synchronous. Similar to QueueChannel but with zero capacity to buffer: a direct hand-off scenario in which the sender blocks until a consumer invokes the channel's receive() method.
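The three pollable behaviours can be tried out with plain JDK queues. As far as I understand the reference documentation, the default QueueChannel, PriorityChannel and RendezvousChannel are built on `LinkedBlockingQueue`, `PriorityBlockingQueue` and `SynchronousQueue` respectively, but treat that mapping as an assumption; the class `PollableSketch` and its methods are hypothetical, and the priority example uses natural ordering rather than a message priority header.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class PollableSketch {

    // Sends every item, then polls until the queue is empty (like repeated receive() calls).
    static <T> List<T> sendThenReceive(BlockingQueue<T> queue, List<T> items) {
        for (T item : items) {
            queue.offer(item);
        }
        List<T> received = new ArrayList<>();
        T next;
        while ((next = queue.poll()) != null) {
            received.add(next);
        }
        return received;
    }

    // FIFO buffering: messages come out in the order they were sent.
    static List<Integer> fifo(List<Integer> items) {
        return sendThenReceive(new LinkedBlockingQueue<>(), items);
    }

    // Priority ordering: here the natural ordering of Integer, smallest first.
    static List<Integer> byPriority(List<Integer> items) {
        return sendThenReceive(new PriorityBlockingQueue<>(), items);
    }

    // Zero capacity: with no consumer waiting, a non-blocking send is refused.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("ping");
    }

    // Direct hand-off: put() blocks until the consumer thread calls take().
    static String handOff(String payload) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(payload);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return received[0];
    }
}
```

The design point is that the sender and receiver never call each other: the queue decouples them, and only the queue type decides ordering and blocking behaviour.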

  2. SubscribableChannel - Event driven; consumers call the subscribe() method and messages are pushed to them.

    a. DirectChannel - Subscribable, point-to-point. Each message is dispatched to a single subscriber, and the handler runs on the sender's thread, so the sender is blocked until the handler returns.
    b. PublishSubscribeChannel - Subscribable, many subscribers; every subscriber gets each message.
    c. FixedSubscriberChannel - Subscribable, with a single subscriber that cannot be unsubscribed.
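The difference between dispatching to one subscriber and broadcasting to all of them can be sketched in a few lines of plain Java. This is a simplified, single-threaded model and not the Spring Integration implementation: `DirectDispatcher`, `BroadcastDispatcher` and the round-robin choice of subscriber are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SubscribableSketch {

    // Point-to-point dispatch: each message goes to exactly one subscriber,
    // chosen round-robin, and the handler runs on the sender's thread.
    static class DirectDispatcher {
        private final List<Consumer<String>> subscribers = new ArrayList<>();
        private int next = 0;

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            Consumer<String> target = subscribers.get(next % subscribers.size());
            next++;
            target.accept(payload); // returns only after the handler has finished
        }
    }

    // Publish/subscribe dispatch: every subscriber receives every message.
    static class BroadcastDispatcher {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
        }
    }

    // Three messages, two subscribers: each message is handled once.
    static List<String> directDemo() {
        List<String> log = new ArrayList<>();
        DirectDispatcher channel = new DirectDispatcher();
        channel.subscribe(p -> log.add("A:" + p));
        channel.subscribe(p -> log.add("B:" + p));
        channel.send("m1");
        channel.send("m2");
        channel.send("m3");
        return log; // [A:m1, B:m2, A:m3]
    }

    // One message, two subscribers: the message is handled twice.
    static List<String> broadcastDemo() {
        List<String> log = new ArrayList<>();
        BroadcastDispatcher channel = new BroadcastDispatcher();
        channel.subscribe(p -> log.add("A:" + p));
        channel.subscribe(p -> log.add("B:" + p));
        channel.send("m1");
        return log; // [A:m1, B:m1]
    }
}
```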

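  3. ExecutorChannel - A subscribable channel that delegates to an instance of TaskExecutor to perform the dispatch, so the handler can run on a different thread from the sender.

  4. FluxMessageChannel - Allows reactive consumption of messages.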
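Delegating the dispatch to an executor means the sender hands the message off and the handler runs on a worker thread. A minimal sketch of that idea with a plain JDK `ExecutorService` follows; the class `ExecutorDispatchSketch` and the thread name `dispatch-worker` are made up for the example, and nothing here is the Spring `TaskExecutor` API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorDispatchSketch {

    // Sends one message through an executor and reports which thread handled it.
    static String handlerThreadName() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "dispatch-worker"));
        AtomicReference<String> handledOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            // The "send": submit the dispatch and return to the caller immediately.
            executor.execute(() -> {
                handledOn.set(Thread.currentThread().getName());
                done.countDown();
            });
            // Wait only so that the demo can read the result.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return handledOn.get();
    }

    public static void main(String[] args) {
        System.out.println("sender:  " + Thread.currentThread().getName());
        System.out.println("handler: " + handlerThreadName());
    }
}
```

Compared with handling on the sender's thread, the trade-off is that the sender no longer waits for the handler, so failures in the handler do not propagate back to the caller of send.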

Code

package com.demo.project97.integration;

import java.util.Arrays;
import java.util.List;

import com.demo.project97.domain.Customer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@IntegrationComponentScan
public class BasicIntegration {

    @Bean
    public IntegrationFlow performSplit() {
        return IntegrationFlows.from("inputChannel1")
                .split()
                .transform("Hello "::concat)
                .handle(message -> {
                    log.info("performSplit: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performAggregate(SimpleMessageStore messageStore) {
        return IntegrationFlows.from("inputChannel2")
                .split()
                .aggregate(a -> a
                        .correlationStrategy(m -> {
                            Customer customer = (Customer) m.getPayload();
                            return customer.getCity();
                        })
                        .releaseStrategy(g -> g.size() > 2)
                        .messageStore(messageStore))
                .handle(message -> {
                    log.info("performAggregate: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performRoute() {
        return IntegrationFlows.from("inputChannel3")
                .split()
                .log()
                .route(Customer.class, m -> m.getCity(), m -> m
                        .channelMapping("New York", "channelA")
                        .channelMapping("Bangalore", "channelB"))
                .get();
    }

    @Bean
    public IntegrationFlow handleNewYork() {
        return IntegrationFlows.from("channelA")
                .handle(message -> {
                    log.info("handleNewYork: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow handleBangalore() {
        return IntegrationFlows.from("channelB")
                .handle(message -> {
                    log.info("handleBangalore: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performSubFlow(IntegrationFlow subFlowNewYork, IntegrationFlow subFlowBangalore) {
        return IntegrationFlows.from("inputChannel4")
                .split()
                .log()
                .route(Customer.class, m -> m.getCity(), m -> m
                        .subFlowMapping("New York", subFlowNewYork)
                        .subFlowMapping("Bangalore", subFlowBangalore))
                .get();
    }

    @Bean
    public IntegrationFlow subFlowNewYork() {
        return f -> f.handle(m -> log.info("subFlowNewYork: {}", m));
    }

    @Bean
    public IntegrationFlow subFlowBangalore() {
        return f -> f.handle(m -> log.info("subFlowBangalore: {}", m));
    }

    @Bean
    public IntegrationFlow performBridge() {
        return IntegrationFlows.from("polledChannel")
                .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
                .handle(message -> {
                    log.info("performBridge: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow readInputChannel5_sub1() {
        return IntegrationFlows.from("inputChannel5_sub1")
                .handle(message -> {
                    log.info("readInputChannel5_sub1: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow readInputChannel5_sub2() {
        return IntegrationFlows.from("inputChannel5_sub2")
                .handle(message -> {
                    log.info("readInputChannel5_sub2: {}", message);
                })
                .get();
    }

    @Bean
    public IntegrationFlow performDynamicBridge() {
        List<String> cities = Arrays.asList("New York", "Bangalore", "London");
        return IntegrationFlows.from("inputChannel6")
                .split()
                .route(Customer.class, m -> m.getCity(), m -> {
                    cities.forEach(city -> {
                        m.subFlowMapping(city, subFlow -> subFlow.publishSubscribeChannel(c -> {
                            c.ignoreFailures(true);
                            c.subscribe(s -> s.handle(h -> {
                                Customer customer = (Customer) h.getPayload();
                                customer.setName(customer.getName().toUpperCase());
                                log.info("Handle: {}", customer);
                            }));
                        }).bridge());
                    });
                })
                .aggregate()
                .handle(m -> {
                    log.info("performDynamicBridge: {}", m);
                })
                .get();
    }

    @Bean
    public MessageChannel inputChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel3() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel4() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel5() {
        return new PublishSubscribeChannel();
    }

    @Bean
    @BridgeFrom("inputChannel5")
    public MessageChannel inputChannel5_sub1() {
        return new DirectChannel();
    }

    @Bean
    @BridgeFrom("inputChannel5")
    public MessageChannel inputChannel5_sub2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inputChannel6() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel polledChannel() {
        return new QueueChannel();
    }

    @Bean
    public SimpleMessageStore messageStore() {
        return new SimpleMessageStore();
    }

    @Bean
    public MessageChannel channelA() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel channelB() {
        return new DirectChannel();
    }
}
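The `performAggregate` flow above groups customers by city (the correlation strategy) and releases a group once it holds more than two messages (the release strategy). The interplay of the two strategies can be sketched in plain Java as follows. This is a simplified model under stated assumptions, not the Spring Integration aggregator: `AggregatorSketch` is a hypothetical class, customers are encoded as `name@city` strings, and the sketch simply starts a fresh group after a release.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class AggregatorSketch<T> {

    private final Function<T, String> correlationStrategy;
    private final Predicate<List<T>> releaseStrategy;
    // Plays the role of the message store: groups waiting to be released.
    private final Map<String, List<T>> store = new HashMap<>();

    AggregatorSketch(Function<T, String> correlationStrategy, Predicate<List<T>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Adds the item to its group and returns the group if it is ready for release.
    Optional<List<T>> accept(T item) {
        String key = correlationStrategy.apply(item);
        List<T> group = store.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(item);
        if (releaseStrategy.test(group)) {
            store.remove(key); // simplification: a later item starts a new group
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Five customers in two cities; only New York reaches three members.
    static List<List<String>> demo() {
        AggregatorSketch<String> aggregator = new AggregatorSketch<>(
                s -> s.substring(s.indexOf('@') + 1), // correlate by city
                group -> group.size() > 2);           // release on the third message
        List<List<String>> released = new ArrayList<>();
        for (String customer : List.of("Jack@New York", "Raj@Bangalore",
                "Amy@New York", "Tom@New York", "Sam@Bangalore")) {
            aggregator.accept(customer).ifPresent(released::add);
        }
        return released;
    }
}
```

In this run the two Bangalore customers stay in the store because their group never satisfies the release condition, which is the same reason an aggregate flow can appear to "swallow" messages when too few arrive for a given key.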

package com.demo.project97.integration;

import java.io.File;
import java.util.List;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.FileNameGenerator;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class FileIntegration {

    private final DataTransformer dataTransformer;

    /**
     * Polls /tmp/src every 5 seconds for new *.txt files,
     * converts each file to a list of customers
     * and sends every customer to fileChannel1
     */
    @Bean
    public IntegrationFlow readFile(MessageChannel fileChannel1) {
        return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/src"))
                .autoCreateDirectory(true)
                .preventDuplicates(true)
                .patternFilter("*.txt")
                .get(), poller -> poller.poller(pm -> pm.fixedRate(5000)))
                .transform(dataTransformer, "convertFileToCustomers")
                .handle(message -> {
                    List<Customer> customers = (List<Customer>) message.getPayload();
                    log.info("Customers: {}", customers);
                    for (Customer c: customers) {
                        fileChannel1.send(MessageBuilder.withPayload(c).build());
                    }
                })
                .get();
    }

    /**
     * Reads fileChannel2 and appends each record as a line to a file in /tmp/des
     */
    @Bean
    public IntegrationFlow readResultChannelWriteToFile() {
        return IntegrationFlows.from("fileChannel2")
                .transform(dataTransformer, "convertDbRecordToString")
                .handle(Files.outboundAdapter(new File("/tmp/des"))
                        .autoCreateDirectory(true)
                        .fileNameGenerator(fileNameGenerator())
                        .fileExistsMode(FileExistsMode.APPEND)
                        .appendNewLine(true)
                        .get())
                .get();
    }

    // Builds the target file name from the "file-name" message header
    private FileNameGenerator fileNameGenerator() {
        return new FileNameGenerator() {
            @Override
            public String generateFileName(Message<?> message) {
                return message.getHeaders().get("file-name").toString().concat(".txt");
            }
        };
    }

    @Bean
    public MessageChannel fileChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel fileChannel2() {
        return new DirectChannel();
    }
}

package com.demo.project97.integration;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.persistence.EntityManagerFactory;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jpa.dsl.Jpa;
import org.springframework.integration.jpa.support.PersistMode;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class JPAIntegration {

    private final EntityManagerFactory entityManager;

    /**
     * Polls the customer table every 10 seconds (at most 2 rows per poll)
     */
    @Bean
    public IntegrationFlow readFromDbAdapter() {
        return IntegrationFlows.from(Jpa.inboundAdapter(this.entityManager)
                .jpaQuery("from Customer where phone is not null")
                .maxResults(2)
                .expectSingleResult(false)
                .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
                .handle(message -> {
                    log.info("readFromDbAdapter: {}", message);
                })
                .get();
    }

    /**
     * Starts the flow when the id for customer is pushed to dbChannel1
     * Forwards the retrieved customer to dbChannel2 and dbChannel3
     */
    @Bean
    public IntegrationFlow readFromDbGateway(MessageChannel dbChannel2, MessageChannel dbChannel3) {
        return IntegrationFlows.from("dbChannel1")
                .handle(Jpa.retrievingGateway(this.entityManager)
                        .jpaQuery("from Customer c where c.id = :id")
                        .expectSingleResult(true)
                        .parameterExpression("id", "payload[id]"))
                .handle(message -> {
                    log.info("readFromDbGateway: {}", message);
                    Customer payload = (Customer) message.getPayload();
                    log.info("readFromDbGateway Customer: {}", payload);
                    dbChannel2.send(message);
                    dbChannel3.send(message);
                })
                .get();
    }

    /**
     * Reads dbChannel2 and updates phone number
     * Does not return anything
     */
    @Bean
    public IntegrationFlow updateDbAdapter() {
        return IntegrationFlows.from("dbChannel2")
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .jpaQuery("update Customer c set c.phone = '88888' where c.id =:id")
                        .parameterExpression("id", "payload.id"), e -> e.transactional())
                .get();
    }

    /**
     * Reads dbChannel3 and prefixes the customer name with 'Mr. '
     * The gateway produces a reply, which the next handler logs
     */
    @Bean
    public IntegrationFlow updateDbGateway() {
        return IntegrationFlows.from("dbChannel3")
                .handle(Jpa.updatingGateway(this.entityManager)
                        .jpaQuery("update Customer c set c.name = CONCAT('Mr. ',c.name) where c.id =:id")
                        .parameterExpression("id", "payload.id"), e -> e.transactional())
                .handle(message -> {
                    log.info("updateDbGateway: {}", message);
                })
                .get();
    }

    /**
     * Polls every 10 seconds for customers whose name starts with 'Mr.'
     * and deletes them
     */
    @Bean
    public IntegrationFlow deleteRecord() {
        return IntegrationFlows.from(Jpa.inboundAdapter(this.entityManager)
                .jpaQuery("from Customer where name like 'Mr.%'")
                .maxResults(2)
                .expectSingleResult(false)
                .entityClass(Customer.class), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .persistMode(PersistMode.DELETE)
                        .parameterExpression("id", "payload.id")
                        .entityClass(Customer.class), e -> e.transactional())
                .get();
    }

    /**
     * Reads the fileChannel1 and persists all customers
     */
    @Bean
    public IntegrationFlow readFileChannelWriteToDb() {
        return IntegrationFlows.from("fileChannel1")
                .handle(Jpa.outboundAdapter(this.entityManager)
                        .entityClass(Customer.class)
                        .persistMode(PersistMode.PERSIST)
                        .get(), e -> e.transactional())
                .get();
    }

    /**
     * Reads rabbitmqChannel1 and updates the phone number
     * of the customer with the matching name
     */
    @Bean
    public IntegrationFlow readRabbitmqChannelUpdateDb() {
        return IntegrationFlows.from("rabbitmqChannel1")
                .handle(Jpa.outboundAdapter(this.entityManager)
                                .jpaQuery("update Customer c set c.phone = :phone where c.name =:name")
                                .parameterExpression("phone", "payload.phone")
                                .parameterExpression("name", "payload.name")
                        , e -> e.transactional())
                .get();
    }

    @SneakyThrows
    public void sleep(int seconds) {
        TimeUnit.SECONDS.sleep(seconds);
    }

    @Bean
    public MessageChannel dbChannel1() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel dbChannel2() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel dbChannel3() {
        return new DirectChannel();
    }
}

package com.demo.project97.integration;

import com.demo.project97.domain.Customer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitMQIntegration {

    private final ConnectionFactory connectionFactory;
    private final DataTransformer dataTransformer;

    @Bean
    public IntegrationFlow readFromQueue(MessageChannel rabbitmqChannel1) {
        return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "phone-queue")
                .errorChannel("errorChannel")
                .errorMessageStrategy(new RabbitMQIntegration.MyFatalExceptionStrategy()))
                .transform(dataTransformer, "convertQueuePayloadToCustomer")
                .handle(message -> {
                    log.info("readFromQueue: {}", message);
                    rabbitmqChannel1.send(message);
                })
                .get();
    }

    public static class MyFatalExceptionStrategy implements ErrorMessageStrategy {
        @Override
        public ErrorMessage buildErrorMessage(Throwable payload, AttributeAccessor attributes) {
            throw new AmqpRejectAndDontRequeueException("Error In Message!");
        }
    }

    @Bean
    public MessageChannel rabbitmqChannel1() {
        DirectChannel channel = new DirectChannel();
        channel.setDatatypes(Customer.class);
        return channel;
    }

    @Bean
    public Queue inboundQueue() {
        return new Queue("phone-queue", true, false, false);
    }
}

Setup

Project97

Spring Integration

https://gitorko.github.io/spring-integration-basics/

Version

Check version

$ java --version
openjdk 17.0.3 2022-04-19 LTS

RabbitMQ

Run the following docker command to start a RabbitMQ instance:

docker run -d --hostname my-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

Log in to the RabbitMQ console at http://localhost:8080

username: guest
password: guest

Postgres DB

docker run -p 5432:5432 --name pg-container -e POSTGRES_PASSWORD=password -d postgres:9.6.10
docker ps
docker exec -it pg-container psql -U postgres -W postgres
CREATE USER test WITH PASSWORD 'test@123';
CREATE DATABASE "test-db" WITH OWNER "test" ENCODING UTF8 TEMPLATE template0;
grant all PRIVILEGES ON DATABASE "test-db" to test;

docker stop pg-container
docker start pg-container

Dev

To run the code.

./gradlew clean build
./gradlew bootRun

Postman

Import the Postman collection into Postman.

Postman Collection

References

https://spring.io/projects/spring-integration

https://www.enterpriseintegrationpatterns.com/
