Spring - RSocket

Overview

A Spring Boot client-server application using RSocket.

GitHub: https://github.com/gitorko/project02

RSocket

RSocket is a binary, message-passing protocol for multiplexed, duplex communication over TCP, WebSocket, and other byte stream transports.

Interaction Models

| Type | Description |
|------|-------------|
| Request-Response | Send one message and receive one back |
| Request-Stream | Send one message and receive a stream of messages back |
| Channel | Send streams of messages in both directions |
| Fire-and-Forget | Send a one-way message |
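
The four models differ only in how many messages flow in each direction. As a rough plain-Java analogy (JDK types only; the RSocket and Spring APIs express the same shapes with Reactor's `Mono` and `Flux`), each model can be read as a method signature. `InteractionModels` and `InMemoryGreeter` are hypothetical names used purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

/** Hypothetical interface: one method per interaction model. */
interface InteractionModels {
    void fireAndForget(String message);                         // one message in, nothing back
    CompletableFuture<String> requestResponse(String name);     // one in, one back
    Stream<String> requestStream(String name);                  // one in, many back
    Stream<String> channel(Stream<String> names);               // many in, many back
}

/** Hypothetical in-memory implementation, no networking involved. */
class InMemoryGreeter implements InteractionModels {
    final List<String> log = new ArrayList<>(); // records fire-and-forget messages

    @Override
    public void fireAndForget(String message) {
        log.add(message);
    }

    @Override
    public CompletableFuture<String> requestResponse(String name) {
        return CompletableFuture.completedFuture("Hello " + name);
    }

    @Override
    public Stream<String> requestStream(String name) {
        // Endless, lazily evaluated stream; the caller decides how much to consume.
        return Stream.iterate(1, i -> i + 1).map(i -> "Hello " + name + " #" + i);
    }

    @Override
    public Stream<String> channel(Stream<String> names) {
        return names.map(n -> "Hello " + n);
    }
}

class InteractionModelsDemo {
    public static void main(String[] args) {
        InMemoryGreeter greeter = new InMemoryGreeter();
        greeter.fireAndForget("ping");
        System.out.println(greeter.requestResponse("Jack").join());
        greeter.requestStream("Jack").limit(3).forEach(System.out::println);
        greeter.channel(Stream.of("Jack", "Jill")).forEach(System.out::println);
    }
}
```

The `greetings` handler in the Code section has the request-stream shape: a single request in, a stream of responses out.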

Key features of RSocket protocol

  1. Reactive Streams - back pressure lets a requester slow down a responder at the source, reducing reliance on network-layer congestion control and the need for buffering at the network level or at any other level.
  2. Request throttling - called "leasing" after the LEASE frame, which either end can send to limit the total number of requests the other end may make in a given time. Leases are renewed periodically.
  3. Session resumption - designed for loss of connectivity; requires some state to be maintained.
  4. Fragmentation - splitting and re-assembly of large messages.
  5. Keepalive - heartbeats.
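
The back pressure idea in item 1 can be illustrated with the JDK's own `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract. This is a minimal, synchronous sketch and not how RSocket or Reactor implement it: `CountingPublisher` and `CollectingSubscriber` are hypothetical names, and the publisher skips parts of the contract (cancellation, rejecting non-positive demand) that a real implementation must honour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical source: emits 1, 2, 3, ... but only as many items as were requested. */
class CountingPublisher implements Flow.Publisher<Integer> {
    int produced = 0; // number of items generated so far

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // Demand-driven: generate exactly n items, never more.
                // (Simplification: a compliant publisher must also reject n <= 0.)
                for (long i = 0; i < n; i++) {
                    subscriber.onNext(++produced);
                }
            }

            @Override
            public void cancel() {
                // Simplification: nothing to release in this sketch.
            }
        });
    }
}

/** Hypothetical consumer: collects items and exposes its subscription so demand can be signalled step by step. */
class CollectingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription; // no demand signalled yet
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        throw new IllegalStateException(throwable);
    }

    @Override
    public void onComplete() {
        // This source never completes.
    }
}

class BackPressureDemo {
    public static void main(String[] args) {
        CountingPublisher publisher = new CountingPublisher();
        CollectingSubscriber subscriber = new CollectingSubscriber();
        publisher.subscribe(subscriber);

        System.out.println("produced before any demand: " + publisher.produced);
        subscriber.subscription.request(2);
        System.out.println("after request(2): " + subscriber.received);
        subscriber.subscription.request(1);
        System.out.println("after request(1): " + subscriber.received);
    }
}
```

Nothing is produced until the subscriber asks for it, which is the property that lets a slow consumer hold back a fast producer.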

Differences

| RSocket | gRPC | REST |
|---------|------|------|
| Binary protocol (TCP, a file, WebSockets) | Works on HTTP/2 (Protocol Buffers) | Works on HTTP/1.1 |
| Works at layers 5/6 of the OSI model | Works at layer 7 of the OSI model | |
| Supports back pressure handling | | |

Code

Server controller (rserver):

package com.demo.project02.rserver.controller;

import java.time.Duration;
import java.time.Instant;
import java.util.stream.Stream;

import com.demo.project02.rcommon.GreetingRequest;
import com.demo.project02.rcommon.GreetingResponse;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

@Controller
public class GreetingController {

    /**
     * Fire-and-Forget - No response
     * Request-Response - Single value comes in, single value returned
     * Request-Stream - Single value comes in, multiple values returned
     * Channel - Multiple values come in, multiple values returned
     *
     * This handler takes a single request and returns a Flux, so it is a Request-Stream endpoint.
     */
    @MessageMapping("greetings")
    Flux<GreetingResponse> greet(GreetingRequest greetingRequest) {
        // Lazily generate an endless sequence of greetings, each stamped with the current time.
        var stream = Stream.generate(() -> new GreetingResponse("Hello " + greetingRequest.getName() + " @ " + Instant.now()));
        // Emit one greeting per second.
        return Flux.fromStream(stream)
                .delayElements(Duration.ofSeconds(1));
    }
}

Client application (rclient):

package com.demo.project02.rclient;

import java.time.Duration;

import com.demo.project02.rcommon.GreetingRequest;
import com.demo.project02.rcommon.GreetingResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

@SpringBootApplication
@Slf4j
public class RclientApp {

    @SneakyThrows
    public static void main(String[] args) {
        SpringApplication.run(RclientApp.class, args);
        // Block on stdin so the process keeps running while responses stream in.
        System.in.read();
    }

    @Bean
    public CommandLineRunner onStart() {
        return args -> log.info("On Start!");
    }

    /**
     * Requester for the server at localhost:8888 over TCP.
     * Reconnect policy: retry once a second, effectively without limit.
     */
    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder builder) {
        return builder
                .rsocketConnector(connector -> connector
                        .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
                .connectTcp("localhost", 8888);
    }

    /**
     * Once the application is ready, sends one request to the "greetings" route
     * and prints each response as it arrives.
     */
    @Bean
    ApplicationListener<ApplicationReadyEvent> client(Mono<RSocketRequester> client) {
        return event -> {
            var greetingResponseFlux = client.flatMapMany(rSocketRequester ->
                    rSocketRequester.route("greetings")
                            .data(new GreetingRequest("Jack"))
                            .retrieveFlux(GreetingResponse.class));
            greetingResponseFlux.subscribe(System.out::println);
        };
    }
}
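
Both classes above import `GreetingRequest` and `GreetingResponse` from the shared `rcommon` module, which is not shown here. A minimal sketch of what they might look like, assuming plain POJOs: the only details taken from the code above are the single-String constructors and `getName()`. The `message` field name, the no-argument constructors, the setters, and `toString()` are assumptions, and the project itself may well generate these with Lombok instead.

```java
// Sketch only: in the project each class would be public, in its own file,
// under the package com.demo.project02.rcommon.

/** Request payload: carries the name to greet. */
class GreetingRequest {
    private String name;

    public GreetingRequest() {
        // Assumed: no-argument constructor so a decoder can instantiate the class.
    }

    public GreetingRequest(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "GreetingRequest(name=" + name + ")";
    }
}

/** Response payload: carries the greeting text (field name "message" is assumed). */
class GreetingResponse {
    private String message;

    public GreetingResponse() {
        // Assumed: no-argument constructor so a decoder can instantiate the class.
    }

    public GreetingResponse(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "GreetingResponse(message=" + message + ")";
    }
}

class GreetingDtoDemo {
    public static void main(String[] args) {
        GreetingRequest request = new GreetingRequest("Jack");
        GreetingResponse response = new GreetingResponse("Hello " + request.getName());
        System.out.println(response);
    }
}
```

A readable `toString()` matters here because the client prints each response with `System.out::println`.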

Setup

# Project 02

Spring - RSocket

[https://gitorko.github.io/spring-rsocket/](https://gitorko.github.io/spring-rsocket/)

### Version

Check version

```bash
$ java --version
openjdk 21.0.3 2024-04-16 LTS
```

### Dev

To build and run the code:

```bash
# Build all modules
./gradlew clean build

# Run the server and the client from the built jars (separate terminals)
java -jar rserver/build/libs/rserver-1.0.0.jar
java -jar rclient/build/libs/rclient-1.0.0.jar

# Build a single module
./gradlew :rserver:build
./gradlew :rclient:build
./gradlew :rcommon:build

# Or run directly from Gradle
./gradlew :rserver:bootRun
./gradlew :rclient:bootRun

# Build only the executable jars
./gradlew bootJar
```

References

https://docs.spring.io/spring-framework/reference/rsocket.html

https://medium.com/netifi/differences-between-grpc-and-rsocket-e736c954e60
