Reactor Gateways
The "Reactive Gateways" offer a reactive API wrapper around the command, query and event bus. Most of the operations are similar to those from non-reactive gateways, simply replacing the CompletableFuture
with either a Mono
or Flux
. In some cases, the API is expended to ease use of common reactive patterns.
Reactor doesn't allow null
values in streams. Any null value returned from the handler will be mapped to Mono#empty().
Retrying operations
All operations support Reactor's retry mechanism:
reactiveQueryGateway.query(query, ResponseType.class).retry(5);
This call will retry sending the query a maximum of five times when it fails.
Configuration in Spring Boot
This extension can be added as a Spring Boot starter dependency to your project using group id org.axonframework.extensions.reactor
and artifact id axon-reactor-spring-boot-starter
. The implementation of the extension can be found here.
Reactor Command Gateway
This section describes the methods on the ReactorCommandGateway
.
send
- Sends the given command once the caller subscribes to the command result. Returns immediately.
A common pattern is using the REST API to send a command. In this case it is recommend to for example use WebFlux, and return the command result Mono
directly to the controller:
Sending a command from a Spring WebFlux Controller.
If the command handling function returns type void
, Mono<CommandHandlerResponseBody>
should be replaced with Mono<Void>
Another common pattern is "send and forget":
Function that sends a command and returns immediately without waiting for the result.
sendAll
- This method uses the given Publisher
of commands to dispatch incoming commands.
This operation is available only in the Reactor extension. Use it to connect 3rd party streams that delivers commands.
Connects an external input stream directly to the Reactor Command Gateway.
The sendAll
operation will keep sending commands until the input stream is canceled.
send
and sendAll
do not offer any backpressure, yet. The only backpressure mechanism in place is that commands will be sent sequentially; thus once the result of a previous command arrives. The number of commands is prefetched from an incoming stream and stored in a buffer for sending (see Flux#concatMap). This slows down sending, but does not guarantee that the Subscriber will not be overwhelmed with commands if they are sent too fast.
Reactor Query Gateway
query
- Sends the given query
, expecting a response in the form of responseType
from a single source.
Recommended way of using the Reactor query gateway within a Spring REST controller.
scatterGather
- Sends the given query
, expecting a response in the form of responseType
from several sources within a specified duration
.
Sends a given query that stops after receiving three results, or after 5 seconds.
Subscription queries
Firstly, the Reactor API for subscription queries in Axon is not new.
However, we noticed several patterns which are often used, such as:
Concatenating initial results with query updates in a single stream, or
skipping the initial result all together.
As such, the Reactor Extension provides several methods to ease usage of these common patterns.
subscriptionQuery
- Sends the given query
, returns the initial result and keeps streaming incremental updates until a subscriber unsubscribes from the Flux
.
Note that this method should be used when the response type of the initial result and incremental update match.
The above invocation through the ReactorQueryGateway
is equivalent to:
subscriptionQueryMany
- Sends the given query
, returns the initial result and keeps streaming incremental updates until a subscriber unsubscribes from the Flux
.
This operation should be used when the initial result contains multiple instances of the response type which needs to be flattened. Additionally, the response type of the initial response and incremental updates need to match.
The above invocation through the ReactorQueryGateway
is equivalent to:
queryUpdates
- Sends the given query
and streams incremental updates until a subscriber unsubscribes from the Flux
.
This method could be used when subscriber is only interested in updates.
The above invocation through the ReactorQueryGateway
is equivalent to:
In the above shown methods, the subscription query is closed automatically after a subscriber has unsubscribed from the Flux
. When using the regular QueryGateway
, the subscription query needs to be closed manually however.
Reactor Event Gateway
Reactive variation of the EventGateway
. Provides support for reactive return types such as Flux
.
publish
- Publishes the given events
once the caller subscribes to the resulting Flux
.
This method returns events that were published. Note that the returned events may be different from those the user has published, granted an interceptor has been registered which modifies events.
Example of dispatcher modified events, returned to user as the result Flux
.
Interceptors
Axon provides a notion of interceptors. The Reactor gateways allow for similar interceptor logic, namely the ReactorMessageDispatchInterceptor
and ReactorResultHandlerInterceptor
.
These interceptors allow us to centrally define rules and filters that will be applied to a message stream.
Interceptors will be applied in order they have been registered to the given component.
Reactor Dispatch Interceptors
The ReactorMessageDispatchInterceptor
should be used to centrally apply rules and validations for outgoing messages. Note that a ReactorMessageDispatchInterceptor
is an implementation of the default MessageDispatchInterceptor
interface used throughout the framework. The implementation of this interface is described as follows:
It thus defaults the MessageDispatchInterceptor#handle(List<? extends M>
method to utilize the ReactorMessageDispatchInterceptor#intercept(Mono<M>)
method. As such, a ReactorMessageDispatchInterceptor
could thus be configured on a plain Axon gateway too. Here are a couple of examples how a message dispatch interceptor could be used:
Dispatch interceptor that adds key-value pairs to the message's MetaData
.
Dispatch interceptor that discards the message, based on a security flag in the Reactor Context.
Reactor Result Handler Interceptors
The ReactorResultHandlerInterceptor
should be used to centrally apply rules and validations for incoming messages, a.k.a. results. The implementation of this interface is described as follows:
The parameters are the message
that has been sent, and a Flux
of results
from that message, which is going to be intercepted. The message
parameter can be useful if you want to apply a given result rule only for specific messages. Here are a couple of examples how a message result interceptor could be used:
This type of interceptor is available only in the Reactor Extension.
Result interceptor which discards all results that have a payload matching blockedPayload
Result interceptor which validates that the query result does not contain an empty String
.
Result interceptor which discards all results where the queryName
matches myBlockedQuery
.
Result interceptor which limits the result waiting time to thirty seconds per message.
Result interceptor which limits the number of results to five entries, and logs all results.
Last updated