[Spring Boot] RSocket using Spring Boot
As I was reading a book called “Reactive Spring”, I got an opportunity to learn about RSocket. Just by looking at the name of it, I thought it was another WebSocket but it wasn’t. As I was making a sample app, I thought it might be a good idea to write a few words so I can reference it in the future.
2. What is RSocket?
RSocket is an application protocol that provides Reactive Streams semantics. Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure.
The motivation was to replace HTTP with a protocol that is more efficient and flexible and has less overhead.
- Message-driven (Reactive)
- Binary Protocol (faster than text-based)
- Abstracts away the underlying transport (supports WebSockets, Aeron, TCP)
- There is no fixed client or server. Once a connection is established, both sides can act as a requester and a responder and talk to each other.
- Connections are multiplexed so there is no need to set up network connections over and over again.
- Provides back pressure (application level flow control), session resumption for long-lived streams (useful for mobile-to-server communication when network connections drop, switch, and reconnect frequently), and routing.
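To make the back pressure point a bit more concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow API, which follows the same Reactive Streams contract. It is not RSocket or Reactor code, and the class names are made up for illustration. The subscriber asks for one element at a time, so the publisher never pushes more than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration of Reactive Streams back pressure (not RSocket itself).
class BackPressureSketch {

    // Subscriber that signals demand for exactly one item at a time.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for the next element
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() sends onComplete after the pending items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

In RSocket the same request(n) signal travels over the network, so a slow consumer on one machine can hold back a fast producer on another.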
RSocket defines four interaction models:
- Request/Response: A requester sends a request and the responder returns a single value.
- Fire-and-Forget: A requester sends a request and the responder does not produce a response.
- Request/Stream: A requester sends a request and the responder returns multiple values.
- Channel: A requester sends multiple values and the responder returns multiple values (a bi-directional stream).
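The shapes of these four models can be written down as method signatures. The sketch below is a hypothetical interface built on plain JDK types (CompletableFuture and Stream stand in for Mono and Flux). It is not the rsocket-java or Spring API, and EchoResponder is a toy implementation that exists only to exercise it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

// Hypothetical interface: it only mirrors the shape of the four interaction models.
interface InteractionModels {
    CompletableFuture<String> requestResponse(String request); // one in, one out
    void fireAndForget(String request);                        // one in, nothing out
    Stream<String> requestStream(String request);              // one in, many out
    Stream<String> requestChannel(Stream<String> requests);    // many in, many out
}

// Toy in-memory responder used only to exercise the interface.
class EchoResponder implements InteractionModels {
    final List<String> received = new ArrayList<>();

    @Override
    public CompletableFuture<String> requestResponse(String request) {
        return CompletableFuture.completedFuture("echo:" + request);
    }

    @Override
    public void fireAndForget(String request) {
        received.add(request); // side effect only, the caller gets nothing back
    }

    @Override
    public Stream<String> requestStream(String request) {
        return Stream.of(1, 2, 3).map(i -> request + "#" + i);
    }

    @Override
    public Stream<String> requestChannel(Stream<String> requests) {
        return requests.map(s -> s.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        EchoResponder responder = new EchoResponder();
        System.out.println(responder.requestResponse("a").join());
        responder.requestStream("c").forEach(System.out::println);
    }
}
```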
RSocket can be used as a standard way for applications to communicate with each other across the network. This covers not only communication between microservices but also communication between different devices (mobile devices, set-top boxes, and others) and different backend systems.
3. Use Cases
For real-life use cases from Facebook and Alibaba, see “RSocket: Solving Real-World Architectural Challenges”, listed at the end of this article.
4. Project Setup
In this article, we are going to create two Spring Boot applications: one requester and one responder.
Before We Start …
The terms “requester” and “responder” can be confusing because, in the bi-directional model covered later, each application can be a requester and a responder at the same time. To keep things simple, from this point on I am going to refer to the requester as the application initiating the request and the responder as the application providing return values for the request.
For the full list of dependencies (pom.xml), refer to the repo.
First of all, we need the responder to host RSocket on a specific port. For this example, I will go with port 8181. This can be achieved with application configuration, for example by setting the spring.rsocket.server.port property to 8181.
Secondly, we need the requester to connect to the RSocket at port 8181. For this, let’s create a JavaConfig. This config class will grow over time when there is a need to communicate bi-directionally or when security comes into play. For now, let’s keep it to a minimum.
5. Interaction Models
For request/response, the requester makes a request with Mono<T> and the responder returns with Mono<T>. This is the easiest interaction, and most of us are familiar with this model because this is the way HTTP works.
One interesting thing to observe is how routing is done for @MessageMapping. This is completely optional, but I thought it would be nice to show that the requester can provide some identifier so that the responder can do something with the given identifier.
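To give an idea of what routing with an identifier means, here is a minimal plain-Java sketch that matches a route against a template and pulls out the variable. This is not Spring's route matcher. The template syntax, the dot separator, and the route name greet.{id} are assumptions made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustration only: a tiny route-template matcher, not Spring's implementation.
class RouteTemplateSketch {

    // Matches e.g. "greet.42" against "greet.{id}" and returns {id=42}.
    // Segments are separated by '.', and "{name}" captures one whole segment.
    static Optional<Map<String, String>> match(String template, String route) {
        String[] templateParts = template.split("\\.");
        String[] routeParts = route.split("\\.");
        if (templateParts.length != routeParts.length) {
            return Optional.empty();
        }
        Map<String, String> variables = new LinkedHashMap<>();
        for (int i = 0; i < templateParts.length; i++) {
            String part = templateParts[i];
            if (part.startsWith("{") && part.endsWith("}")) {
                // Variable segment: remember the value under the variable name.
                variables.put(part.substring(1, part.length() - 1), routeParts[i]);
            } else if (!part.equals(routeParts[i])) {
                return Optional.empty(); // literal segment does not match
            }
        }
        return Optional.of(variables);
    }

    public static void main(String[] args) {
        System.out.println(match("greet.{id}", "greet.42"));
    }
}
```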
Use your favorite REST client tool to call the requester’s endpoint. You will see that the responder returns the data sent by the requester with an additional string appended.
For fire-and-forget, the requester makes a request with Publisher<T> and the responder returns with Mono<Void>. The send() method is used instead of retrieveFlux() and it returns Mono<Void>.
For request/stream, the requester makes a request with Mono<T> and the responder returns with Flux<T>. This will make sense once the channel model is covered. We already demonstrated sending a request with Mono<T>. The channel examples will show how the responder returns with Flux<T>. This is the combination of the two.
For channel, the requester makes a request with Flux<T> and the responder returns Flux<T>.
For this example, using text/event-stream is not necessary. Therefore, produces = MediaType.TEXT_EVENT_STREAM_VALUE can be removed. I put it there so that the output looks nicer in the browser. Also, I limited the stream to 10 elements by using take(10). Removing this will give you an infinite stream.
The requester will send a stream of UUIDs, one UUID per second.
The responder will convert the UUIDs given by the requester to uppercase and return them one by one.
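The data flow of this channel example can be sketched with plain JDK streams. This is only an approximation: java.util.stream.Stream stands in for Flux, limit(10) stands in for take(10), and there is no one-second delay between elements. The class and method names are made up.

```java
import java.util.Locale;
import java.util.UUID;
import java.util.stream.Stream;

// Plain-JDK approximation of the channel example (no networking, no timing).
class UuidChannelSketch {

    // Requester side: an endless supply of UUID strings, capped like take(10).
    static Stream<String> requesterStream(int limit) {
        return Stream.generate(() -> UUID.randomUUID().toString()).limit(limit);
    }

    // Responder side: uppercase each incoming value, one by one.
    static Stream<String> responder(Stream<String> incoming) {
        return incoming.map(s -> s.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        responder(requesterStream(10)).forEach(System.out::println);
    }
}
```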
This time I will use a browser to demonstrate.
This case uses the same input and output types (Flux) as above. In this example, the requester and the responder will stream data back and forth until a flag is raised. The responder, in turn, will make a request to the requester.
The request-response streaming goes on as long as the requester responds with the STARTED flag. Once the requester responds with the STOPPED flag (after 3 seconds in this example), the responder will stop relaying.
First of all, let’s make an endpoint for the requester to start sending a stream.
Then, the code below will make a request to the responder.
Here is the ConditionFlag class, which will be used to end streaming.
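A minimal sketch of what such a class could look like is shown here. It is reconstructed from the log output further down (ConditionFlag(state=stopped) and the started/stopped status messages), so the single state field, the constant values, and the isStopped() helper are assumptions rather than the exact class from the repo. A real version that travels over RSocket would also need whatever the codec requires, for example a no-args constructor and setters for JSON.

```java
import java.util.Objects;

// Sketch of a flag object, assuming a single String field named "state".
class ConditionFlag {
    static final String STARTED = "started";
    static final String STOPPED = "stopped";

    private final String state;

    ConditionFlag(String state) {
        this.state = Objects.requireNonNull(state, "state");
    }

    String getState() {
        return state;
    }

    // Hypothetical convenience method, handy when filtering a stream of flags.
    boolean isStopped() {
        return STOPPED.equals(state);
    }

    @Override
    public String toString() {
        return "ConditionFlag(state=" + state + ")";
    }

    public static void main(String[] args) {
        System.out.println(new ConditionFlag(STOPPED));
    }
}
```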
For simplicity, the requester will respond back with the STOPPED flag after 3 seconds.
Lastly, we need to add the rsocketConnector part in the requester JavaConfig to mount a responder accessible to other requesters.
The responder needs to initiate a request to the health route on the requester. However, it keeps only the STOPPED instances from the returned stream. The responder will provide responses until healthFlux sees a STOPPED flag.
Simply copy and paste the ConditionFlag class from the requester.
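The stop condition described above can be approximated in plain Java. The sketch below is sequential and uses iterators instead of Flux, so it only shows the logic: relay a message, check the latest health flag, and stop once STOPPED shows up. The class name, method name, and sample messages are made up, and the flag values are assumed to be the lowercase strings seen in the logs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sequential approximation of "relay until the health stream reports STOPPED".
class RelayUntilStoppedSketch {
    static final String STARTED = "started";
    static final String STOPPED = "stopped";

    static List<String> relay(Iterator<String> messages, Iterator<String> health) {
        List<String> relayed = new ArrayList<>();
        while (messages.hasNext()) {
            relayed.add(messages.next()); // relay one message
            if (health.hasNext() && STOPPED.equals(health.next())) {
                break; // the requester reported STOPPED, so stop relaying
            }
        }
        return relayed;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("Hello #1", "Hello #2", "Hello #3", "Hello #4", "Hello #5");
        List<String> health = List.of(STARTED, STARTED, STARTED, STOPPED);
        System.out.println(relay(messages.iterator(), health.iterator()));
    }
}
```

With three STARTED flags followed by one STOPPED flag, four messages are relayed and the fifth is never sent.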
For this example, I captured the logs since the endpoint returns void. I cropped out the unnecessary parts of the logs that take up too much space.
11:59:39.408 io.jay.requester.RequesterController : Launching 1 clients
11:59:40.778 i.j.r.RequesterMessageController : STARTED?
11:59:41.783 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:40.760525Z
11:59:41.806 i.j.r.RequesterMessageController : Sending status started
11:59:42.796 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:41.782126Z
11:59:42.827 i.j.r.RequesterMessageController : Sending status started
11:59:43.808 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:42.795974Z
11:59:43.838 i.j.r.RequesterMessageController : Sending status started
11:59:44.822 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:43.807631Z
11:59:44.848 i.j.r.RequesterMessageController : Sending status stopped
11:59:40.711 io.jay.responder.ResponderController : Bi-directional Client #0
11:59:41.770 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:40.760525Z
11:59:42.786 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:41.782126Z
11:59:43.796 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:42.795974Z
11:59:44.811 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:43.807631Z
11:59:44.859 io.jay.responder.ResponderController : ConditionFlag(state=stopped)
6. Error Handling
For this error handling example, let’s create an endpoint for a request/response model that talks to a dummy route. When this dummy route throws an exception, the requester will simply return some string.
The dummy route will always return an error from the responder. Also, an exception handler can be implemented using @MessageExceptionHandler, similar to MVC’s exception handler. It is possible to handle only certain types of exceptions. For this example, I will just handle any runtime exception thrown by the responder.
The error was thrown by the responder with message “something bad happened” and it was propagated to the requester correctly because the same error message was printed in the logs.
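The requester-side idea, returning some string when the route fails, can be sketched with the JDK's CompletableFuture. This is an analogy, not the Reactor or Spring code from the project: dummyRoute() is a stand-in that always fails with the same message, and the "fallback: " prefix is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Plain-JDK analogy for "fall back to a string when the remote call fails".
class ErrorFallbackSketch {

    // Stand-in for the dummy route: always fails, like the responder in this section.
    static CompletableFuture<String> dummyRoute() {
        return CompletableFuture.failedFuture(new RuntimeException("something bad happened"));
    }

    // Requester side: turn any failure into a plain string instead of propagating it.
    static String callWithFallback() {
        return dummyRoute()
                .exceptionally(ex -> {
                    // Unwrap in case the failure arrives wrapped in a CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause()
                            : ex;
                    return "fallback: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(callWithFallback());
    }
}
```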
7. Metadata
A payload in RSocket consists of two parts: data and metadata. The metadata can be used to store different information about the message. In fact, we are going to encode security credentials in the metadata when applying security in the next section.
Custom metadata can be added when making a request by using the metadata() method of the RSocketRequester request spec.
To retrieve custom metadata, we need to add a JavaConfig.
Defining some constants.
Spring injects the metadata as headers automatically.
A specific header value can be retrieved by using @Header(NAME), and all headers can be retrieved by using @Headers.
Let us look at what kind of headers Spring injects by default. Note that “custom-header” is the custom metadata we added previously.
8. Applying Security
RSocket endpoints can be secured using Spring Security for simple authentication and JWT authentication.
In this example, we are going to create some restricted endpoints for the responder and the requester will request using simple authentication.
First of all, we need two dependencies.
Let’s now create a new endpoint that makes a request to a restricted route. Look at how the user credentials are passed in the metadata.
Up to this point, if you run the application and make a REST call to /auth, it will fail with
java.lang.IllegalArgumentException: No encoder for org.springframework.security.rsocket.metadata.UsernamePasswordMetadata
Therefore, we need to add a SimpleAuthenticationEncoder to the requester.
Add these dependencies.
Let’s add a new route endpoint.
From this point on, let’s run the application, see what kinds of errors pop up, and figure out how we can resolve each one.
If you run both requester and responder now, you will notice that every request will fail with an error.
Looking at the requester log, I found this:
io.rsocket.exceptions.RejectedSetupException: Access Denied
It seems that all route endpoints are restricted now after adding the security-related dependencies. Therefore, let’s create a JavaConfig to restrict the auth route only and allow the other routes.
With the security configuration in place, all endpoints besides auth will work again.
At this point, any request to the auth route fails because the responder does not have any users. So we are going to configure in-memory user details and also provide an authorization filter using simple authentication.
Now everything should work. Let’s run the app and find out. The auth endpoint fails again, this time with an error like this:
org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public reactor.core.publisher.Mono<java.lang.String> io.jay.responder.RestrictedController.authenticate(reactor.core.publisher.Mono<org.springframework.security.core.userdetails.UserDetails>): Payload content is missing: public reactor.core.publisher.Mono<java.lang.String> io.jay.responder.RestrictedController.authenticate(reactor.core.publisher.Mono<org.springframework.security.core.userdetails.UserDetails>)
The problem is that after going through authentication and authorization, we were not able to resolve @AuthenticationPrincipal. I have used this annotation before with spring-security and Spring MVC. Back then, I think it worked like a charm out of the box. But for an RSocket controller, we need to let spring-security know what this is. Again in the SecurityConfiguration, let’s add another bean at the end.
Finally now, everything should work. Let’s verify.
Lastly, I also want to point out that UsernamePasswordMetadata can be passed per request, like we have been doing, and also during setup of the connection.
Restart both apps and make a REST call to /auth. You will see a log like this.
In this article, we took a look at RSocket. As I learned more about it, it seemed more appealing. It was very well integrated with Spring. However, the documentation was poor and it was hard to find good answers on the internet as I was making this simple example. I hope to have a chance to apply this in real life sometime in the future. This also motivated me to learn about different ways for microservices to communicate with each other.
Full source can be found in the GitHub repository for this article.
References:
- RSocket: Solving Real-World Architectural Challenges (Ondrej Lehecka of Facebook, Robert Roeser of Netifi, and Andy Shi of Alibaba)
- Reactive Service to Service Communication with RSocket - Introduction
- Motivations | RSocket