CompletableFuture with either a Mono or Flux. In some cases, the API is expanded to ease the use of common reactive patterns.

Reactor does not allow null values in streams. Any null value returned from the handler will be mapped to Mono#empty().

Retrying operations

All operations support Reactor's retry mechanism:

reactiveQueryGateway.query(query, ResponseType.class).retry(5);

This call will retry sending the query a maximum of five times when it fails.
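The retry semantics above can be pictured without Reactor on the classpath. The following is a plain-JDK sketch, not the Reactor implementation: the class RetrySketch and both of its methods are hypothetical names, and it assumes that retry(5) means one initial attempt plus up to five further attempts after failures.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {

    // Runs the call once, then re-runs it up to maxRetries more times while it keeps failing.
    // Assumption: this mirrors "retry a maximum of N times", i.e. at most N + 1 attempts in total.
    static <T> T callWithRetry(Supplier<T> call, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // every attempt failed, so surface the last error
    }

    // Hypothetical flaky query: fails the first `failures` times it is invoked, then answers.
    static Supplier<String> flakyQuery(int failures, AtomicInteger attempts) {
        return () -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IllegalStateException("query failed");
            }
            return "result";
        };
    }
}
```

With this sketch, a query that fails five times still succeeds under callWithRetry(query, 5), while one that fails six times propagates its last error to the caller.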
org.axonframework.extensions.reactor and artifact id axon-reactor-spring-boot-starter. The implementation of the extension can be found here.

ReactorCommandGateway

send - Sends the given command once the caller subscribes to the command result. Returns immediately.

The resulting Mono can be returned directly to the controller. If the command handler returns void, Mono<CommandHandlerResponseBody> should be replaced with Mono<Void>.
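The phrase "sends the given command once the caller subscribes" describes lazy dispatch: obtaining the result handle does nothing by itself. The sketch below illustrates that idea with plain JDK types only. LazySendSketch, dispatch and send are hypothetical stand-ins, not part of the extension, and a Supplier of a CompletableFuture is used here merely as a rough analogue of a cold Mono.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class LazySendSketch {

    // Counts how many commands have actually been dispatched.
    static final AtomicInteger DISPATCHED = new AtomicInteger();

    // Hypothetical eager dispatch: the command is handled as soon as this method is called.
    static CompletableFuture<String> dispatch(String command) {
        DISPATCHED.incrementAndGet();
        return CompletableFuture.completedFuture("handled " + command);
    }

    // Deferred variant: returns immediately and dispatches only when get() is invoked,
    // loosely mirroring "sent once the caller subscribes to the command result".
    static Supplier<CompletableFuture<String>> send(String command) {
        return () -> dispatch(command);
    }
}
```

Calling send("CreateOrder") leaves the dispatch counter untouched; the command is only dispatched when the returned supplier is asked for its result, which plays the role of subscribing.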
sendAll - This method uses the given Publisher of commands to dispatch incoming commands. The sendAll operation will keep sending commands until the input stream is canceled.

send and sendAll do not offer any backpressure yet. The only backpressure mechanism in place is that commands are sent sequentially: each command is sent only once the result of the previous command has arrived. A number of commands is prefetched from the incoming stream and stored in a buffer for sending (see Flux#concatMap). This slows down sending, but does not guarantee that the Subscriber will not be overwhelmed with commands if they are sent too fast.

ReactorQueryGateway

query - Sends the given query, expecting a response in the form of responseType from a single source.

scatterGather - Sends the given query, expecting a response in the form of responseType from several sources within a specified duration.

subscriptionQuery - Sends the given query, returns the initial result and keeps streaming incremental updates until a subscriber unsubscribes from the Flux.
ReactorQueryGateway is equivalent to:

subscriptionQueryMany - Sends the given query, returns the initial result and keeps streaming incremental updates until a subscriber unsubscribes from the Flux.
ReactorQueryGateway is equivalent to:

queryUpdates - Sends the given query and streams incremental updates until a subscriber unsubscribes from the Flux.
ReactorQueryGateway is equivalent to:

When using the regular QueryGateway, however, the subscription query needs to be closed manually.

EventGateway

Provides support for reactive return types such as Flux.

publish - Publishes the given events once the caller subscribes to the resulting Flux.
Two interceptor types are described below: ReactorMessageDispatchInterceptor and ReactorResultHandlerInterceptor.

ReactorMessageDispatchInterceptor should be used to centrally apply rules and validations for outgoing messages. Note that a ReactorMessageDispatchInterceptor is an implementation of the default MessageDispatchInterceptor interface used throughout the framework. Its implementation defaults the MessageDispatchInterceptor#handle(List<? extends M>) method to utilize the ReactorMessageDispatchInterceptor#intercept(Mono<M>) method. As such, a ReactorMessageDispatchInterceptor could be configured on a plain Axon gateway too. One example of how a message dispatch interceptor could be used is adding entries to a message's MetaData.

ReactorResultHandlerInterceptor should be used to centrally apply rules and validations for incoming messages, a.k.a. results. The interceptor is given the message that has been sent, and a Flux of results from that message, which is going to be intercepted. The message parameter can be useful if you want to apply a given result rule only for specific messages. Here are a couple of examples of how a message result interceptor could be used:

One example blocks results whose payload matches the blockedPayload String.
Another example applies its rule only when the queryName matches myBlockedQuery.
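The two interceptor roles can be sketched with plain JDK types to show the shape of the idea. This is an illustration under stated assumptions, not the extension's API: InterceptorSketch, Msg, ADD_META_DATA and DROP_BLOCKED are hypothetical names, a List stands in for the Flux of results, and the key1/value1 pair is made up for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

class InterceptorSketch {

    // Hypothetical stand-in for a message: a payload plus immutable metadata.
    static final class Msg {
        final String payload;
        final Map<String, String> metaData;

        Msg(String payload, Map<String, String> metaData) {
            this.payload = payload;
            this.metaData = Map.copyOf(metaData);
        }

        // Returns a new message with one extra metadata entry; the original is unchanged.
        Msg andMetaData(String key, String value) {
            Map<String, String> copy = new HashMap<>(metaData);
            copy.put(key, value);
            return new Msg(payload, copy);
        }
    }

    // Dispatch side: enrich every outgoing message with a key-value pair.
    static final UnaryOperator<Msg> ADD_META_DATA =
            msg -> msg.andMetaData("key1", "value1");

    // Result side: receives the sent message and its results,
    // and drops every result whose payload equals "blockedPayload".
    static final BiFunction<Msg, List<String>, List<String>> DROP_BLOCKED =
            (msg, results) -> results.stream()
                    .filter(result -> !"blockedPayload".equals(result))
                    .collect(Collectors.toList());
}
```

The result-side function also receives the sent message, so a rule could be narrowed to specific messages by inspecting it before filtering, in the spirit of the queryName check mentioned above.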