[Spring Boot] RSocket using Spring Boot

1. Introduction

As I was reading a book called “Reactive Spring”, I got an opportunity to learn about RSocket. Just by looking at the name of it, I thought it was another WebSocket but it wasn’t. As I was making a sample app, I thought it might be a good idea to write a few words so I can reference it in the future.

2. What is RSocket?

RSocket is an application protocol that provides Reactive Streams semantics. Reactive Streams is a standard for asynchronous stream processing with non-blocking back pressure.

Motivation

The motivation was to replace HTTP with a protocol that is more efficient and flexible and has less overhead.

RSocket Characteristics

  • Message-driven (Reactive)
  • Binary Protocol (faster than text-based)
  • Transport-agnostic: the underlying transport is abstracted away (supports WebSockets, Aeron, TCP)
  • There is no fixed client or server role. Once a connection is established, both sides can act as requester and responder and talk to each other.
  • Connections are multiplexed so there is no need to set up network connections over and over again.
  • Provides back pressure (application level flow control), session resumption for long-lived streams (useful for mobile-to-server communication when network connections drop, switch, and reconnect frequently), and routing.
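To make the back pressure item a little more concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. This is not RSocket code. The names BackPressureDemo and RangePublisher are made up for illustration, and the publisher is deliberately simplified (it is synchronous and skips the validation a spec-compliant publisher needs). The point is only that the publisher never emits more items than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class, not part of RSocket or Spring.
class BackPressureDemo {

    /** Publishes 1..max, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int max;

        RangePublisher(int max) {
            this.max = max;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items: the subscriber's demand is the upper bound.
                    for (long i = 0; i < n && !done; i++) {
                        if (next > max) {
                            done = true;
                            subscriber.onComplete();
                            return;
                        }
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes to a publisher holding `available` items and requests only `demand` of them. */
    static List<Integer> receive(int available, int demand) {
        List<Integer> received = new ArrayList<>();
        new RangePublisher(available).subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand); // signal how much we can handle
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onComplete() {
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // Ten items are available, but the subscriber only asked for three.
        System.out.println(receive(10, 3)); // prints [1, 2, 3]
    }
}
```

In RSocket the same demand signal travels over the network between the two peers, which is why it is described as application-level flow control.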

Interaction Models

  • Request/Response: A requester sends a request and the responder returns a single value.
  • Fire-and-Forget: A requester sends a request and the responder does not produce a response.
  • Request/Stream: A requester sends a request and the responder returns a stream of values.
  • Channel: A requester sends a stream of values and the responder returns a stream of values (bi-directional streaming).

RSocket can be used as a standard way for applications to communicate with each other across the network. This covers not only communication between microservices but also communication between different devices (mobile devices, set-top boxes, others) and different backend systems.

3. Use Cases

Refer to this link, which presents some real-life use cases from Facebook and Alibaba.

4. Project Setup

In this article, we are going to create two Spring Boot applications: one requester and one responder.

Before We Start …

The terms “requester” and “responder” can be confusing because, when we look at the bi-directional model later, each application can be a requester and a responder at the same time. To keep things simple, from this point on I am going to refer to the application initiating a request as the requester and to the application providing return values for that request as the responder.

Application Setup

For the full list of dependencies (pom.xml), refer to this repo.

First of all, we need the responder to host RSocket on a specific port. For this example, I will go with port 8181. This can be achieved with application configuration.
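As a rough idea of what that configuration could look like, the fragment below uses Spring Boot's spring.rsocket.server.port property in the responder's application.properties. The file format is an assumption. The same key works in application.yml.

```properties
# Responder application: start an RSocket server (TCP transport by default) on port 8181
spring.rsocket.server.port=8181
```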

Secondly, we need the requester to connect to the RSocket server at port 8181. For this, let’s create a JavaConfig. This config class will grow over time when there is a need to communicate bi-directionally or when security comes into play. For now, let’s keep it to a minimum.
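A minimal sketch of such a config class is shown below. It assumes Spring Framework 5.3 or later, where RSocketRequester.Builder offers tcp(host, port), and it assumes the responder runs on localhost. The class name RequesterConfiguration is made up for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;

// Hypothetical class name; lives in the requester application.
@Configuration
public class RequesterConfiguration {

    // Spring Boot auto-configures the RSocketRequester.Builder that is injected here.
    @Bean
    public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        // Host and port are assumptions matching the responder settings in this article.
        return builder.tcp("localhost", 8181);
    }
}
```

The connection is established lazily, when the first request is made through the returned RSocketRequester.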

5. Interaction Models

Request/Response

For request/response, the requester makes a request with Mono<T> and the responder returns with Mono<T>.

Requester

Responder

This is the easiest interaction, and most of us are already familiar with it because this is the way HTTP works.

One interesting thing to observe is how routing is done for @MessageMapping. This is completely optional, but I thought it would be nice to show that the requester can provide some identifier so that responder can do something with the given identifier.
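The sketch below shows one way the two sides could fit together, including a route variable as the identifier. The route name request-response.{id}, the REST path, the class names, and the message text are all assumptions for illustration. The two classes belong to two different applications and are shown together only for brevity.

```java
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// Requester application: exposes a REST endpoint and forwards the call over RSocket.
@RestController
class RequestResponseRequester {

    private final RSocketRequester requester;

    RequestResponseRequester(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("/request-response/{id}")
    Mono<String> requestResponse(@PathVariable("id") String id) {
        return requester
                .route("request-response.{id}", id) // the id is expanded into the route
                .data("Hello from requester")
                .retrieveMono(String.class);        // expect exactly one value back
    }
}

// Responder application: handles the route and appends a string to the received data.
@Controller
class RequestResponseResponder {

    @MessageMapping("request-response.{id}")
    Mono<String> requestResponse(@DestinationVariable("id") String id,
                                 @Payload Mono<String> data) {
        return data.map(d -> d + " (handled by responder for id " + id + ")");
    }
}
```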

Result

Use your favorite REST client tool to call the requester’s endpoint. The response shows that the responder appended an additional string to the data sent by the requester.

Request/Response result

Fire-and-Forget

For fire-and-forget, the requester makes a request with Publisher<T> and the responder returns with Mono<Void>.

Requester

For Fire-and-Forget, the send() method is used instead of retrieveMono() or retrieveFlux() and it returns Mono<Void>.
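A small sketch of this, with the matching responder handler included for completeness. The route name fire-and-forget, the REST path, and the class names are assumptions. The two classes live in separate applications.

```java
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// Requester application
@RestController
class FireAndForgetRequester {

    private final RSocketRequester requester;

    FireAndForgetRequester(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("/fire-and-forget")
    Mono<Void> fireAndForget() {
        return requester
                .route("fire-and-forget")
                .data("ping")
                .send(); // completes once the frame is sent; no response is awaited
    }
}

// Responder application
@Controller
class FireAndForgetResponder {

    @MessageMapping("fire-and-forget")
    Mono<Void> fireAndForget(Mono<String> data) {
        // Consume the payload and signal completion without producing a value.
        return data
                .doOnNext(d -> System.out.println("Received: " + d))
                .then();
    }
}
```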

Responder

Result

Fire-and-Forget Result

Request/Stream

For request/stream, the requester makes a request with Mono<T> and the responder returns with Flux<T>.

This will make sense once the channel model is covered. We have already demonstrated sending a request with Mono<T>, and the channel examples will show how the responder returns Flux<T>. Request/stream is the combination of the two.

Channel Streaming

For channel, the requester makes a request with Flux<T> and the responder returns Flux<T>.

Requester

For this example, using text/event-stream is not necessary. Therefore, produces = MediaType.TEXT_EVENT_STREAM_VALUE can be removed. I put it there so that the output looks nicer in the browser. Also, I limited the stream to 10 elements by using take(10). Removing this will give you an infinite stream.

The requester will send a stream of UUIDs, one UUID per second.

Responder

The responder will convert the UUIDs given by the requester to uppercase and return one by one.
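Put together, the channel example might look roughly like the sketch below. The route name channel, the REST path, and the class names are assumptions, and the placement of take(10) on the outgoing UUID stream is a guess. The two classes live in separate applications.

```java
import java.time.Duration;
import java.util.UUID;

import org.springframework.http.MediaType;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

// Requester application
@RestController
class ChannelRequester {

    private final RSocketRequester requester;

    ChannelRequester(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping(value = "/channel", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    Flux<String> channel() {
        // One random UUID per second, limited to 10 elements.
        Flux<String> uuids = Flux.interval(Duration.ofSeconds(1))
                .map(tick -> UUID.randomUUID().toString())
                .take(10);

        return requester
                .route("channel")
                .data(uuids)                 // a Flux as input makes this a channel request
                .retrieveFlux(String.class); // and a Flux comes back
    }
}

// Responder application
@Controller
class ChannelResponder {

    @MessageMapping("channel")
    Flux<String> channel(Flux<String> uuids) {
        // Transform each incoming element and emit it as soon as it arrives.
        return uuids.map(String::toUpperCase);
    }
}
```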

Result

This time I will use a browser to demonstrate.

Streaming Result

Channel Bi-directional

This case uses the same input and output types (Flux) as above. In this example, the requester and the responder will pass a stream of data back and forth until a flag is raised. In addition, the responder will make a request of its own to the requester.

The flow will look like this. The request-response streaming will continue as long as the requester responds with the STARTED flag. Once the requester responds with the STOPPED flag (after 3 seconds in this example), the responder will stop relaying.

Requester

First of all, let’s make an endpoint for the requester to start sending a stream.

Then, the code below will make a request to the responder.

Here is the ConditionFlag class which will be used to end streaming.
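As a rough idea of its shape, here is a plain-Java sketch. The field name state and the lowercase values started and stopped are inferred from the log line ConditionFlag(state=stopped) shown later in this article. Everything else, including the hasStopped() helper, is an assumption. The no-argument constructor and the setter are there because JSON-style deserializers commonly need them.

```java
import java.util.Objects;

// Sketch of a flag message exchanged between requester and responder.
class ConditionFlag {

    static final String STARTED = "started";
    static final String STOPPED = "stopped";

    private String state;

    // No-arg constructor for deserialization.
    public ConditionFlag() {
    }

    public ConditionFlag(String state) {
        this.state = state;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    // Hypothetical convenience helper: true once the flag carries the STOPPED state.
    boolean hasStopped() {
        return STOPPED.equalsIgnoreCase(state);
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof ConditionFlag
                && Objects.equals(state, ((ConditionFlag) other).state);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(state);
    }

    @Override
    public String toString() {
        return "ConditionFlag(state=" + state + ")";
    }

    public static void main(String[] args) {
        System.out.println(new ConditionFlag(STOPPED)); // prints ConditionFlag(state=stopped)
    }
}
```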

For simplicity, the requester will respond back with STOPPED flag after 3 seconds.

Lastly, we need to add rsocketConnector part in the requester JavaConfig to mount a responder accessible to other requesters.

Responder

The responder needs to initiate a request to the route health on the requester. From the resulting stream it keeps only the STOPPED instances and filters out everything else. The responder will provide responses until healthFlux sees a STOPPED value.

Simply, copy and paste the ConditionFlag from the requester.

Result

For this example, I captured the logs since the endpoint returns void. I cropped out the unnecessary parts of the logs that take up too much space.

Requester Log

11:59:39.408  io.jay.requester.RequesterController : Launching 1 clients
11:59:40.778 i.j.r.RequesterMessageController : STARTED?
11:59:41.783 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:40.760525Z
11:59:41.806 i.j.r.RequesterMessageController : Sending status started
11:59:42.796 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:41.782126Z
11:59:42.827 i.j.r.RequesterMessageController : Sending status started
11:59:43.808 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:42.795974Z
11:59:43.838 i.j.r.RequesterMessageController : Sending status started
11:59:44.822 io.jay.requester.RequesterController : Hello Client #0 @ 2022-01-06T02:59:43.807631Z
11:59:44.848 i.j.r.RequesterMessageController : Sending status stopped

Responder Log

11:59:40.711  io.jay.responder.ResponderController  : Bi-directional Client #0
11:59:41.770 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:40.760525Z
11:59:42.786 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:41.782126Z
11:59:43.796 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:42.795974Z
11:59:44.811 io.jay.responder.ResponderController : Hello Client #0 @ 2022-01-06T02:59:43.807631Z
11:59:44.859 io.jay.responder.ResponderController : ConditionFlag(state=stopped)

6. Error Handling

Requester

For this error handling example, let’s create an endpoint for a request/response model that talks to a dummy route. When this dummy route throws an exception, the requester will simply return some string.

Responder

The dummy route will always return an error from the responder. Also, an exception handler can be implemented using @MessageExceptionHandler similar to MVC’s exception handler. It is possible to handle certain types of exception. For this example, I will just handle any runtime exceptions thrown by the responder.
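A sketch of both sides follows. The route name error, the REST path, the fallback string, and the class names are assumptions. The exception message is the one quoted in the result below. The route throws directly so that the @MessageExceptionHandler method is invoked, and the handler passes the error on so that the requester still sees it. The two classes live in separate applications.

```java
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

// Requester application
@RestController
class ErrorRequester {

    private final RSocketRequester requester;

    ErrorRequester(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("/error-demo")
    Mono<String> callFailingRoute() {
        return requester
                .route("error")
                .data("trigger")
                .retrieveMono(String.class)
                .doOnError(e -> System.err.println("Responder failed: " + e.getMessage()))
                .onErrorReturn("fallback value"); // return some string instead of failing
    }
}

// Responder application
@Controller
class ErrorResponder {

    @MessageMapping("error")
    Mono<String> alwaysFails(String data) {
        throw new IllegalStateException("something bad happened");
    }

    // Handles runtime exceptions thrown by handlers in this controller.
    @MessageExceptionHandler(RuntimeException.class)
    Mono<String> handleRuntimeException(RuntimeException e) {
        System.err.println("Handling: " + e.getMessage());
        return Mono.error(e); // propagate the error to the requester
    }
}
```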

Result

The error was thrown by the responder with message “something bad happened” and it was propagated to the requester correctly because the same error message was printed in the logs.

Requester log
Error handling result

7. Metadata

A payload in RSocket consists of two parts: data and metadata. The metadata can be used to carry additional information about the message. In fact, we are going to encode security credentials in the metadata when applying security in the next section.

Requester

Custom metadata can be added when making a request using metadata().

Responder

To retrieve custom metadata, we need to add a JavaConfig.

Defining some constants.

Spring injects the metadata as headers automatically.

A specific header value can be retrieved by using @Header(NAME), and all headers can be retrieved by using @Headers.
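The responder side could be sketched as follows. The MIME type message/x.custom.header, the route name metadata, and the class names are assumptions. Only the header name custom-header comes from this article. The customizer registers the MIME type with the metadata extractor so that Spring exposes the decoded value as a header.

```java
import java.util.Map;

import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import org.springframework.util.MimeType;
import reactor.core.publisher.Mono;

@Configuration
class MetadataConfiguration {

    static final String CUSTOM_HEADER = "custom-header";

    // Assumed MIME type for the custom metadata entry.
    static final MimeType CUSTOM_HEADER_MIME_TYPE = MimeType.valueOf("message/x.custom.header");

    @Bean
    RSocketStrategiesCustomizer customMetadataExtractor() {
        // Decode metadata of this MIME type as a String and expose it under CUSTOM_HEADER.
        return strategies -> strategies.metadataExtractorRegistry(registry ->
                registry.metadataToExtract(CUSTOM_HEADER_MIME_TYPE, String.class, CUSTOM_HEADER));
    }
}

@Controller
class MetadataResponder {

    @MessageMapping("metadata")
    Mono<String> metadata(@Header(MetadataConfiguration.CUSTOM_HEADER) String customHeader,
                          @Headers Map<String, Object> headers) {
        // Print every header Spring injected, including the custom one.
        headers.forEach((name, value) -> System.out.println(name + " = " + value));
        return Mono.just("custom-header was: " + customHeader);
    }
}
```

On the requester side, the value would then be attached with a call along the lines of requester.route("metadata").metadata("some value", MimeType.valueOf("message/x.custom.header")), using the same MIME type.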

Let us look at what kind of headers Spring injects by default. Note that “custom-header” is the custom metadata we added previously.

Debug output

8. Applying Security

RSocket endpoints can be secured using Spring Security for simple authentication and JWT authentication.

In this example, we are going to create some restricted endpoints for the responder and the requester will request using simple authentication.

Requester

First of all, we need two dependencies.

Let’s now create a new endpoint that sends a request to a restricted route. Look at how the user credentials are passed as metadata.

Up to this point, if you run the application and make a REST call to /auth, it will fail with

java.lang.IllegalArgumentException: No encoder for org.springframework.security.rsocket.metadata.UsernamePasswordMetadata

Therefore, we need to add a SimpleAuthenticationEncoder to the requester.
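A sketch of the requester side, combining the /auth endpoint with the encoder registration. The username, password, payload, and class names are placeholders, not values from the original project. The route name auth and the REST path /auth are taken from this article.

```java
import io.rsocket.metadata.WellKnownMimeType;
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.security.rsocket.metadata.SimpleAuthenticationEncoder;
import org.springframework.security.rsocket.metadata.UsernamePasswordMetadata;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@Configuration
class RequesterSecurityConfiguration {

    // Registers the encoder that knows how to write UsernamePasswordMetadata.
    @Bean
    RSocketStrategiesCustomizer simpleAuthenticationEncoder() {
        return strategies -> strategies.encoder(new SimpleAuthenticationEncoder());
    }
}

@RestController
class AuthRequester {

    private static final MimeType AUTHENTICATION_MIME_TYPE =
            MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());

    private final RSocketRequester requester;

    AuthRequester(RSocketRequester requester) {
        this.requester = requester;
    }

    @GetMapping("/auth")
    Mono<String> auth() {
        // Placeholder credentials; they must match a user known to the responder.
        UsernamePasswordMetadata credentials = new UsernamePasswordMetadata("user", "password");

        return requester
                .route("auth")
                .metadata(credentials, AUTHENTICATION_MIME_TYPE) // credentials travel as metadata
                .data("hello")                                   // placeholder payload
                .retrieveMono(String.class);
    }
}
```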

Responder

Add these dependencies.

Let’s add a new route endpoint.

From this point on, let’s run the application, see what kinds of errors pop up, and figure out how we can resolve each one.

If you run both requester and responder now, you will notice that every request will fail with an error.

Unexpected error

Looking at the requester log, I found this:

io.rsocket.exceptions.RejectedSetupException: Access Denied

Seems like all route endpoints are restricted now after adding the security related dependencies. Therefore, let’s create a JavaConfig to restrict auth route only and allow other routes.

With the security configuration in place, all endpoints besides auth will work again.

Works again!!

At this point, any request to route auth fails because the responder does not have any user. So we are going to configure in-memory authenticated user details and also provide an authorization filter using simple authentication.
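A sketch of what the responder's SecurityConfiguration might contain at this stage, covering the route restriction, the in-memory user, and simple authentication. The username, password, and role are placeholders and must match what the requester sends. User.withDefaultPasswordEncoder() is deprecated and intended for demos only, which is the assumption here.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.rsocket.EnableRSocketSecurity;
import org.springframework.security.config.annotation.rsocket.RSocketSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.rsocket.core.PayloadSocketAcceptorInterceptor;

@Configuration
@EnableRSocketSecurity
class SecurityConfiguration {

    // In-memory user store with a single placeholder user.
    @Bean
    MapReactiveUserDetailsService userDetailsService() {
        UserDetails user = User.withDefaultPasswordEncoder()
                .username("user")
                .password("password")
                .roles("USER")
                .build();
        return new MapReactiveUserDetailsService(user);
    }

    // Only the "auth" route requires authentication; everything else stays open.
    @Bean
    PayloadSocketAcceptorInterceptor authorization(RSocketSecurity security) {
        return security
                .authorizePayload(authorize -> authorize
                        .route("auth").authenticated()
                        .anyExchange().permitAll())
                .simpleAuthentication(Customizer.withDefaults())
                .build();
    }
}
```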

Now everything should work. Let’s run the app and find out. The auth endpoint fails again, this time with an error like this:

org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public reactor.core.publisher.Mono<java.lang.String> io.jay.responder.RestrictedController.authenticate(reactor.core.publisher.Mono<org.springframework.security.core.userdetails.UserDetails>): Payload content is missing: public reactor.core.publisher.Mono<java.lang.String> io.jay.responder.RestrictedController.authenticate(reactor.core.publisher.Mono<org.springframework.security.core.userdetails.UserDetails>)

The problem is that after going through authentication and authorization, the @AuthenticationPrincipal parameter could not be resolved. I have used this annotation before with spring-security and Spring MVC, and as far as I remember it worked like a charm out of the box. For an RSocket controller, however, we need to let spring-security know how to resolve it. Again in the SecurityConfiguration, let’s add another bean at the end.
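The extra bean might look like the sketch below, which follows the pattern from the Spring Security reference documentation. The handler signature matches the one in the error message above. The configuration class name and the response text are assumptions, and the bean is shown in its own class only to keep the sketch self-contained.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.messaging.handler.invocation.reactive.AuthenticationPrincipalArgumentResolver;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;

@Configuration
class PrincipalResolverConfiguration {

    // Replaces the default message handler with one that can resolve @AuthenticationPrincipal.
    @Bean
    RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
        RSocketMessageHandler handler = new RSocketMessageHandler();
        handler.getArgumentResolverConfigurer()
                .addCustomResolver(new AuthenticationPrincipalArgumentResolver());
        handler.setRSocketStrategies(strategies);
        return handler;
    }
}

@Controller
class RestrictedController {

    @MessageMapping("auth")
    Mono<String> authenticate(@AuthenticationPrincipal Mono<UserDetails> user) {
        // The principal is available once simple authentication has succeeded.
        return user.map(u -> "Authenticated as " + u.getUsername());
    }
}
```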

Finally now, everything should work. Let’s verify.

AuthenticationPrincipal after simple authentication

Lastly, I also want to point out that UsernamePasswordMetadata can be passed per request like we have been doing and also during setup of RSocketRequester.

Requester

Responder

Result

Restart both apps and make a REST call to /auth. You will see a log like this.

9. Conclusion

In this article, we took a look at RSocket. As I learned more about it, it seemed more appealing. It was very well integrated with Spring. However, the documentation was poor and it was hard to find good answers on the internet as I was making this simple example. I hope to have a chance to apply this in real life sometime in the future. This also motivated me to learn about different ways for microservices to communicate with each other.
