Spring - RSocket
Overview
A Spring Boot client-server application using RSocket.
GitHub: https://github.com/gitorko/project02
RSocket
RSocket is a binary, message-passing protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte-stream transports.
Interaction Models
Type | Description |
---|---|
Request-Response | send one message and receive one back |
Request-Stream | send one message and receive a stream of messages back |
Channel | send streams of messages in both directions |
Fire-and-Forget | send a one-way message |
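To make the four interaction models concrete, here is a minimal plain-Java sketch of their shapes, with no RSocket or Spring dependency. The `Interactions` class and its method names are hypothetical and exist only for illustration; in a Spring RSocket application the single-value and multi-value cases are expressed with the Reactor types `Mono` and `Flux` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

// Hypothetical in-memory stand-in for a responder; this is not an RSocket API.
final class Interactions {

    // Records fire-and-forget messages so their effect is observable.
    final List<String> received = new ArrayList<>();

    // Fire-and-Forget: one message in, nothing comes back.
    void fireAndForget(String message) {
        received.add(message);
    }

    // Request-Response: one message in, exactly one message back.
    CompletableFuture<String> requestResponse(String name) {
        return CompletableFuture.completedFuture("Hello " + name);
    }

    // Request-Stream: one message in, a (possibly unbounded) stream back.
    Stream<String> requestStream(String name) {
        return Stream.iterate(1, i -> i + 1).map(i -> "Hello " + name + " #" + i);
    }

    // Channel: a stream of messages in, a stream of messages back.
    Stream<String> channel(Stream<String> names) {
        return names.map(n -> "Hello " + n);
    }
}
```

Because `requestStream` is unbounded, a caller has to bound it, for example with `limit(n)`, which is loosely analogous to a requester signalling how many items it wants.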
Key features of the RSocket protocol
- Reactive Streams semantics - back pressure lets a requester slow down a responder at the source, which reduces reliance on network-layer congestion control and the need for buffering at the network level or at any level.
- Request throttling - named "leasing" after the LEASE frame, which either end can send to limit the total number of requests the other end may make in a given period. Leases are renewed periodically.
- Session resumption - designed for loss of connectivity; requires some state to be maintained.
- Fragmentation - splitting and re-assembly of large messages.
- Keepalive - heartbeats.
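Back pressure can be illustrated without RSocket by using the JDK's own Reactive Streams interfaces in `java.util.concurrent.Flow`. The following is a minimal sketch, not RSocket code: the `BackPressureDemo` and `OneAtATime` names are hypothetical. In RSocket the same `request(n)` demand signal travels over the connection to the responder.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackPressureDemo {

    // A subscriber that pulls one item at a time: the publisher may not
    // deliver the next item until request(1) has been called again.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            seen.add(item);
            subscription.request(1); // signal readiness for one more item
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> run(int count) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        }
        return subscriber.seen;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

The subscriber, not the publisher, decides the pace: raising the argument to `request` lets more items flow per round trip, while never calling it stops delivery entirely.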
Differences
RSocket | gRPC | REST |
---|---|---|
Binary protocol (TCP, a file, WebSocket) | Runs over HTTP/2 (Protocol Buffers) | Typically runs over HTTP/1.1 |
Works at layers 5/6 of the OSI model | Works at layer 7 of the OSI model | |
Supports back-pressure handling | | |
Code
package com.demo.project02.rserver.controller;

import java.time.Duration;
import java.time.Instant;
import java.util.stream.Stream;

import com.demo.project02.rcommon.GreetingRequest;
import com.demo.project02.rcommon.GreetingResponse;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class GreetingController {

    /**
     * Fire-and-Forget - No response
     * Request-Response - Single value comes in, single value returned
     * Request-Stream - Single value comes in, multiple values returned
     * Channel - Multiple values come in, multiple values returned
     *
     * This handler is Request-Stream: one GreetingRequest in, an unbounded Flux of responses out.
     */
    @MessageMapping("greetings")
    Flux<GreetingResponse> greet(GreetingRequest greetingRequest) {
        // Lazily generate an endless sequence of greetings.
        var stream = Stream.generate(() -> new GreetingResponse("Hello " + greetingRequest.getName() + " @ " + Instant.now()));
        // Delay each element by one second.
        return Flux.fromStream(stream)
                .delayElements(Duration.ofSeconds(1));
    }
}
package com.demo.project02.rclient;

import java.time.Duration;

import com.demo.project02.rcommon.GreetingRequest;
import com.demo.project02.rcommon.GreetingResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@SpringBootApplication
@Slf4j
public class RclientApp {

    @SneakyThrows
    public static void main(String[] args) {
        SpringApplication.run(RclientApp.class, args);
        // Block on stdin so the JVM stays alive while responses stream in.
        System.in.read();
    }

    @Bean
    public CommandLineRunner onStart() {
        return (args) -> {
            log.info("On Start!");
        };
    }

    // Connects over TCP to localhost:8888; the reconnect policy retries with a fixed one-second delay.
    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder builder) {
        return builder
                .rsocketConnector(connector -> connector
                        .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
                .connectTcp("localhost", 8888);
    }

    // Once the application is ready, request the "greetings" stream and print each response.
    @Bean
    ApplicationListener<ApplicationReadyEvent> client(Mono<RSocketRequester> client) {
        return (event) -> {
            var greetingResponseFlux = client.flatMapMany(rSocketRequester -> {
                return rSocketRequester.route("greetings")
                        .data(new GreetingRequest("Jack"))
                        .retrieveFlux(GreetingResponse.class);
            });
            greetingResponseFlux.subscribe(System.out::println);
        };
    }
}
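The client's reconnect policy, `Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))`, retries a failed connection after the same pause each time. As a rough illustration of that idea in plain Java, here is a blocking sketch; the `FixedDelayRetry` class is hypothetical and is not how Reactor implements it (Reactor's version is non-blocking).

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper illustrating fixed-delay retry; not a Reactor API.
final class FixedDelayRetry {

    // Runs the action; after each failure, waits `delay` and tries again,
    // allowing at most `maxRetries` retries after the first attempt.
    static <T> T call(Callable<T> action, long maxRetries, Duration delay) {
        long retries = 0;
        while (true) {
            try {
                return action.call();
            } catch (Exception e) {
                if (retries++ >= maxRetries) {
                    throw new IllegalStateException("Retries exhausted after " + maxRetries, e);
                }
                sleep(delay);
            }
        }
    }

    private static void sleep(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }
}
```

A fixed delay keeps the behaviour predictable for a local demo; against a shared server, an increasing (backoff) delay is usually kinder to the peer.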
Setup
# Project 02

Spring - Rsocket

[https://gitorko.github.io/spring-rsocket/](https://gitorko.github.io/spring-rsocket/)

### Version

Check version

```bash
$ java --version
openjdk 21.0.3 2024-04-16 LTS
```

### Dev

To run the code.

```bash
./gradlew clean build

java -jar rserver/build/libs/rserver-1.0.0.jar
java -jar rclient/build/libs/rclient-1.0.0.jar

./gradlew :rserver:build
./gradlew :rclient:build
./gradlew :rcommon:build

./gradlew :rserver:bootRun
./gradlew :rclient:bootRun

./gradlew bootJar
```
References
https://docs.spring.io/spring-framework/reference/rsocket.html
https://medium.com/netifi/differences-between-grpc-and-rsocket-e736c954e60