You can use the CommandGateway interface and the DefaultCommandGateway implementation provided by Axon. The command gateway provides a number of methods that allow you to send a command and wait for a result synchronously, with a timeout, or asynchronously.

You can also create a custom gateway using the CommandGatewayFactory. This allows you to define your application's interface using strong typing and to declare your own (checked) business exceptions. Axon will automatically generate an implementation for that interface at runtime.

A gateway can be configured with a RetryScheduler, CommandDispatchInterceptors, and CommandCallbacks.

The RetryScheduler
is capable of scheduling retries when command execution has failed. The IntervalRetryScheduler is an implementation that retries a given command at set intervals until it succeeds or a maximum number of retries has been reached. When a command fails due to an exception that is explicitly non-transient, no retries are done at all. Note that the retry scheduler is only invoked when a command fails due to a RuntimeException. Checked exceptions are regarded as "business exceptions" and will never trigger a retry. A RetryScheduler is typically used when dispatching commands on a Distributed Command Bus. If a node fails, the Retry Scheduler will cause a command to be dispatched to the next node capable of processing the command (see Distributing the Command Bus).

CommandDispatchInterceptors allow modification of CommandMessages prior to dispatching them to the Command Bus. In contrast to CommandDispatchInterceptors configured on the CommandBus, these interceptors are only invoked when messages are sent through this gateway. The interceptors can be used to attach meta data to a command or to do validation, for example.

CommandCallbacks are invoked for each command sent. This allows for some generic behavior for all Commands sent through this gateway, regardless of their type.

Parameters annotated with @MetaDataValue
will have their value assigned to the meta data field with the identifier passed as annotation parameter.

Parameters of type MetaData will be merged with the MetaData on the CommandMessage. Meta data defined by later parameters will overwrite the meta data of earlier parameters if their keys are equal.

Parameters of type CommandCallback will have their onSuccess or onFailure invoked after the Command is handled. You may pass in more than one callback, and callbacks may be combined with a return value. In that case, the invocations of the callback will always match the return value (or exception).

A method may also declare timeout parameters of type long (or int) and TimeUnit. In that case the method will block at most as long as these parameters indicate. How the method reacts to a timeout depends on the exceptions declared on the method (see below). Note that if other properties of the method prevent blocking altogether, a timeout will never occur.

A void
return type will cause the method to return immediately, unless there are other indications on the method that one would want to wait, such as a timeout or declared exceptions.

Return types of Future, CompletionStage and CompletableFuture will cause the method to return immediately. You can access the result of the Command Handler using the CompletableFuture instance returned from the method. Exceptions and timeouts declared on the method are ignored.

A checked exception that has not been declared on the method is wrapped in a CommandExecutionException, which is a RuntimeException.

When a timeout occurs, the default behavior is to return null
from the method. This can be changed by declaring a TimeoutException. If this exception is declared, a TimeoutException is thrown instead.

By declaring an InterruptedException on the method, the behavior on thread interruption is changed to throw that exception instead. The interrupt flag is removed when the exception is thrown, consistent with the Java specification.

The @MetaDataValue annotation on a parameter will have the value of that parameter added as a meta data value. The key of the meta data entry is provided as parameter to the annotation.

Methods annotated with @Timeout will block at most the indicated amount of time. This annotation is ignored if the method declares timeout parameters.

Classes annotated with @Timeout will cause all methods declared in that class to block at most the indicated amount of time, unless they are annotated with their own @Timeout annotation or specify timeout parameters.

If no command handler is available for a dispatched command, a NoHandlerForCommandException
exception is thrown. Subscribing multiple command handlers to the same command type will result in subscriptions replacing each other. In that case, the last subscription wins.

The CommandBus provides two methods to dispatch commands: dispatch(commandMessage, callback) and dispatch(commandMessage). The first parameter is a message containing the actual command to dispatch. The optional second parameter takes a callback that allows the dispatching component to be notified when command handling is completed. This callback has two methods: onSuccess() and onFailure(), which are called when command handling returned normally, or when it threw an exception, respectively.

If the calling component needs to wait for the result, it can use the FutureCallback. It is a combination of a Future (as defined in the java.util.concurrent package) and Axon's CommandCallback. Alternatively, consider using a Command Gateway.

If the outcome of a command is not of interest, the dispatch(commandMessage) method can be used.

The SimpleCommandBus
is, as the name suggests, the simplest implementation. It processes commands in the thread that dispatches them. After a command is processed, the modified aggregate(s) are saved and generated events are published in that same thread. In most scenarios, such as web applications, this implementation will suit your needs. The SimpleCommandBus is the implementation used by default in the Configuration API.

Like most CommandBus implementations, the SimpleCommandBus allows interceptors to be configured. CommandDispatchInterceptors are invoked when a command is dispatched on the Command Bus. CommandHandlerInterceptors are invoked before the actual command handler method, allowing you to modify or block the command. See Command Interceptors for more information.

CommandBus implementations.

The AsynchronousCommandBus
implementation executes Commands asynchronously from the thread that dispatches them. It uses an Executor to perform the actual handling logic on a different Thread.

By default, the AsynchronousCommandBus uses an unbounded cached thread pool. This means a new thread is created when a Command is dispatched and no idle thread is available. Threads that have finished processing a Command are reused for new commands. Threads are stopped if they haven't processed a command for 60 seconds.

Alternatively, an Executor instance may be provided to configure a different threading strategy.

The AsynchronousCommandBus should be shut down when stopping the application, to make sure any waiting threads are properly shut down. To shut down, call the shutdown() method. This will also shut down any provided Executor instance, if it implements the ExecutorService interface.

The SimpleCommandBus
has reasonable performance characteristics, especially when you've gone through the performance tips in Performance Tuning. However, the SimpleCommandBus needs locking to prevent multiple threads from concurrently accessing the same aggregate, which causes processing overhead and lock contention.

The DisruptorCommandBus takes a different approach to multithreaded processing. Instead of having multiple threads each doing the same process, there are multiple threads, each taking care of a piece of the process. The DisruptorCommandBus uses the Disruptor (http://lmax-exchange.github.io/disruptor/), a small framework for concurrent programming, to achieve much better performance. Instead of doing the processing in the calling thread, the tasks are handed off to two groups of threads that each take care of a part of the processing. The first group of threads will execute the command handler, changing an aggregate's state. The second group will store and publish the events to the Event Store.

While the DisruptorCommandBus easily outperforms the SimpleCommandBus by a factor of 4(!), there are a few limitations:

- The DisruptorCommandBus only supports Event Sourced Aggregates. This Command Bus also acts as a Repository for the aggregates processed by the Disruptor. To get a reference to the Repository, use createRepository(AggregateFactory).
- When a rollback of the Unit of Work occurs, the DisruptorCommandBus cannot guarantee that Commands are processed in the order they were dispatched. Furthermore, a rollback requires a retry of a number of other commands, causing unnecessary computations.

To construct a DisruptorCommandBus
instance, you need an EventStore. This component is explained in Repositories and Event Stores.

Optionally, you can provide a DisruptorConfiguration instance, which allows you to tweak the configuration to optimize performance for your specific environment:

- WaitStrategy: the most CPU-intensive option is the BusySpinWaitStrategy. To make the Command Bus claim less of the CPU and allow other threads to do processing, use the YieldingWaitStrategy. Finally, you can use the SleepingWaitStrategy and BlockingWaitStrategy to allow other processes a fair share of CPU. The latter is suitable if the Command Bus is not expected to be processing full-time. Defaults to the BlockingWaitStrategy.
- Executor: provides the threads for the DisruptorCommandBus. This executor must be able to provide at least 4 threads. 3 of the threads are claimed by the processing components of the DisruptorCommandBus. Extra threads are used to invoke callbacks and to schedule retries in case an Aggregate's state is detected to be corrupt. Defaults to a CachedThreadPool that provides threads from a thread group called "DisruptorCommandBus".
- The CommandHandlerInterceptors that are to be used in the invocation process. This is the process that calls the actual Command Handler method.
- The CommandHandlerInterceptors that are to be used in the publication process. This is the process that stores and publishes the generated events.
- Rescheduling when an Aggregate's state is detected to be corrupt: if false, the callback's onFailure() method will be invoked. If true (the default), the command will be rescheduled instead.
- When a serializer is configured, the DisruptorCommandBus will wrap all generated events in a SerializationAware message. The serialized form of the payload and meta data is attached before they are published to the Event Store.

CommandBus.

CommandBus by doing the following:

Fields on commands can be annotated with Bean Validation (JSR 303) annotations such as @NotEmpty
and @Pattern. You need to include a JSR 303 implementation (such as Hibernate-Validator) on your classpath. Then, configure a BeanValidationInterceptor on your Command Bus, and it will automatically find and configure your validator implementation. While it uses sensible defaults, you can fine-tune it to your specific needs.

Tip: You want to spend as few resources on an invalid command as possible. Therefore, this interceptor is generally placed at the very front of the interceptor chain. In some cases, a Logging or Auditing interceptor might need to be placed in front, with the validating interceptor immediately following it.
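The advice to put validation at the front of the chain can be illustrated with a small, framework-free sketch. Everything in it is a hypothetical stand-in rather than Axon's API: the RegisterUser command, the VALIDATE and ENRICH interceptors, and the dispatch helper only mimic how interceptors run in registration order, so that an invalid command is rejected before any later interceptor spends resources on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Framework-free sketch (hypothetical names, not Axon's API):
// interceptors run in registration order, so a validating interceptor
// registered first stops invalid commands before later steps run.
class ValidationFirstSketch {

    // Hypothetical command whose userName must not be empty.
    static final class RegisterUser {
        final String userName;

        RegisterUser(String userName) {
            this.userName = userName;
        }
    }

    // Records which interceptors were actually invoked.
    static final List<String> trace = new ArrayList<>();

    // Rejects structurally invalid commands.
    static final UnaryOperator<RegisterUser> VALIDATE = cmd -> {
        trace.add("validate");
        if (cmd.userName == null || cmd.userName.isEmpty()) {
            throw new IllegalArgumentException("userName must not be empty");
        }
        return cmd;
    };

    // Stands in for a later, potentially more expensive interceptor.
    static final UnaryOperator<RegisterUser> ENRICH = cmd -> {
        trace.add("enrich");
        return new RegisterUser(cmd.userName.trim());
    };

    // Passes the command through each interceptor in order.
    static RegisterUser dispatch(RegisterUser cmd,
                                 List<UnaryOperator<RegisterUser>> chain) {
        RegisterUser current = cmd;
        for (UnaryOperator<RegisterUser> interceptor : chain) {
            current = interceptor.apply(current);
        }
        return current;
    }
}
```

Dispatching a command with an empty userName through a chain of VALIDATE followed by ENRICH throws from the first interceptor, and the trace then contains only the validation step.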
The BeanValidationInterceptor also implements MessageHandlerInterceptor, allowing you to configure it as a Handler Interceptor as well.

Message Handler Interceptors must implement the MessageHandlerInterceptor interface. This interface declares one method, handle, that takes two parameters: the current UnitOfWork and an InterceptorChain. The InterceptorChain is used to continue the dispatching process, whereas the UnitOfWork (1) gives you the message being handled and (2) provides the possibility to tie in logic before, during or after (command) message handling (see UnitOfWork for more information about the phases).

To manage transactions around command handling, register a TransactionManagingInterceptor, which in turn is configured with a TransactionManager to start and commit (or roll back) the actual transaction.

Consider a Message Handler Interceptor that only allows the handling of commands that contain axonUser
as a value for the userId field in the MetaData. If the userId is not present in the meta data, an exception is thrown, which prevents the command from being handled. If the userId's value does not match axonUser, we also do not proceed up the chain.

Such a handler interceptor is registered with the CommandBus.

@CommandHandlerInterceptor Annotation

A Handler Interceptor can also be added as a @CommandHandlerInterceptor annotated method on the Aggregate/Entity. The difference between a method on an Aggregate and a "regular" Command Handler Interceptor is that with the annotation approach you can make decisions based on the current state of the given Aggregate. Some properties of an annotated Command Handler Interceptor are:

- It is possible to define an InterceptorChain as a parameter of the command handler interceptor method and use it to control command execution.
- By using the commandNamePattern attribute of the @CommandHandlerInterceptor annotation, we can intercept all commands matching the provided regular expression.

For example, a @CommandHandlerInterceptor method can prevent command execution if a command's state field does not match the Aggregate's state field.

When commands must be dispatched across JVM boundaries, the DistributedCommandBus
comes in. Unlike the other CommandBus implementations, the DistributedCommandBus does not invoke any handlers at all. All it does is form a "bridge" between Command Bus implementations on different JVMs. Each instance of the DistributedCommandBus on each JVM is called a "Segment".

Note: While the distributed command bus itself is part of the Axon Framework Core module, it requires components that you can find in one of the axon-distributed-commandbus-... modules. If you use Maven, make sure you have the appropriate dependencies set. The groupId and version are identical to those of the Core module.
The DistributedCommandBus relies on two components: a CommandBusConnector, which implements the communication protocol between the JVMs, and the CommandRouter, which chooses a destination for each incoming Command. This Router defines which segment of the Distributed Command Bus should be given a Command, based on a Routing Key calculated by a Routing Strategy. Two commands with the same Routing Key will always be routed to the same segment, as long as there is no change in the number and configuration of the segments. Generally, the identifier of the targeted aggregate is used as a routing key.

Two implementations of the RoutingStrategy are provided: the MetaDataRoutingStrategy, which uses a Meta Data property in the Command Message to find the routing key, and the AnnotationRoutingStrategy, which uses the @TargetAggregateIdentifier annotation on the Command Message's payload to extract the Routing Key. You can also provide your own implementation.

The JGroupsConnector uses (as the name already gives away) JGroups as the underlying discovery and dispatching mechanism. Describing the feature set of JGroups is beyond the scope of this reference guide, so please refer to the JGroups User Guide for more details.

The JGroupsConnector acts as both a CommandBusConnector and a CommandRouter.

Note: You can find the JGroups specific components for the DistributedCommandBus in the axon-distributed-commandbus-jgroups module.
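The rule that two commands with the same Routing Key end up on the same segment can be sketched with a generic consistent-hash ring. This is an illustration of the idea only, assuming a CRC32-based hash and virtual nodes; the RoutingKeySketch class and its methods are hypothetical and do not reproduce Axon's own ConsistentHash implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Generic consistent-hash ring (hypothetical sketch, not Axon's code):
// a routing key is mapped to the first segment at or after its hash.
class RoutingKeySketch {

    // Position on the ring -> name of the segment owning that position.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    // Places a segment on the ring at several positions (virtual nodes).
    void addSegment(String segmentName, int virtualNodes) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(segmentName + "#" + i), segmentName);
        }
    }

    // Finds the segment responsible for the given routing key.
    String segmentFor(String routingKey) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no segments registered");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(routingKey));
        // Wrap around to the start of the ring when no position follows.
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // CRC32 is chosen here only because it is in the standard library.
    static long hash(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

As long as the set of segments stays the same, repeated calls to segmentFor with the same aggregate identifier return the same segment, which is why the aggregate identifier is a natural routing key.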
Note: When using a Cache, it should be cleared out when the ConsistentHash changes to avoid potential data corruption (e.g. when commands don't specify a @TargetAggregateVersion and a new member quickly joins and leaves the JGroup, modifying the aggregate while it's still cached elsewhere).

To have the JGroupsConnector connect to the other segments, call its connect() method.

Note: It is not required that all segments have Command Handlers for the same type of Commands. You may use different segments for different Command Types altogether. The Distributed Command Bus will always choose a node to dispatch a Command to that has support for that specific type of Command.
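The behaviour described in the note, where only nodes that support a command type are considered, can be sketched as a filter followed by a stable choice. The CapabilityRoutingSketch class, its register and choose methods, and the node and command names are hypothetical; the simple modulo selection is a simplification for illustration and is not how Axon distributes load.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: pick a node for a command, considering only
// nodes that have a handler for that command type.
class CapabilityRoutingSketch {

    // Node name -> names of the commands it has handlers for.
    private final Map<String, Set<String>> capabilities = new LinkedHashMap<>();

    void register(String node, Set<String> commandNames) {
        capabilities.put(node, commandNames);
    }

    // Returns the chosen node, or empty when no node supports the command.
    Optional<String> choose(String commandName, String routingKey) {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : capabilities.entrySet()) {
            if (entry.getValue().contains(commandName)) {
                candidates.add(entry.getKey());
            }
        }
        if (candidates.isEmpty()) {
            return Optional.empty();
        }
        // Sort so the choice does not depend on registration order.
        Collections.sort(candidates);
        int index = Math.floorMod(routingKey.hashCode(), candidates.size());
        return Optional.of(candidates.get(index));
    }
}
```

With one node registered for a hypothetical CreateOrder command and another for ShipOrder, a ShipOrder command is always sent to the second node, whatever its routing key.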
In a Spring application, you can use the JGroupsConnectorFactoryBean. It automatically connects the Connector when the ApplicationContext is started, and does a proper disconnect when the ApplicationContext is shut down. Furthermore, it uses sensible defaults for a testing environment (but should not be considered production ready) and autowiring for the configuration.

Note: The SpringCloudCommandRouter uses the Spring Cloud specific ServiceInstance.Metadata field to inform all the nodes in the system of its message routing information. It is therefore important that the selected Spring Cloud implementation supports the usage of the ServiceInstance.Metadata field. If the desired Spring Cloud implementation does not support the modification of the ServiceInstance.Metadata (e.g. Consul), the SpringCloudHttpBackupCommandRouter is a viable solution. See the end of this chapter for configuration specifics on the SpringCloudHttpBackupCommandRouter.
The Spring Cloud setup uses a SpringCloudCommandRouter and a SpringHttpCommandBusConnector, which respectively fill the place of the CommandRouter and the CommandBusConnector for the DistributedCommandBus.

Note: When using the SpringCloudCommandRouter, make sure that your Spring application has heartbeat events enabled. The implementation leverages the heartbeat events published by a Spring Cloud application to check whether its knowledge of all the other nodes is up to date. Hence, if heartbeat events are disabled, the majority of the Axon applications within your cluster will not be aware of the entire setup, which poses issues for correct command routing.
The SpringCloudCommandRouter has to be created by providing the following:

- A DiscoveryClient - This can be provided by annotating your Spring Boot application with @EnableDiscoveryClient.
- A RoutingStrategy - The axon-messaging module currently provides several implementations, but a function call can suffice as well. For example, to route commands based on the aggregate identifier, use the AnnotationRoutingStrategy and annotate the field on the payload that identifies the aggregate with @TargetAggregateIdentifier.
- A Registration - If your Spring Boot application is annotated with the aforementioned @EnableDiscoveryClient, a Registration bean referencing the instance itself is created.

Optional parameters for the SpringCloudCommandRouter are:

- A Predicate<ServiceInstance>. This predicate is used to filter out ServiceInstances that the DiscoveryClient might encounter but that you know beforehand will not handle any command messages. This might be useful if you have several services within the Spring Cloud Discovery Service setup which you never want to take into account for command handling.
- A ConsistentHashChangeListener. Adding a consistent hash change listener gives you the opportunity to perform a specific task when new members have been added to the known command handlers set.

The SpringHttpCommandBusConnector requires three parameters for creation:

- A CommandBus. This is the Command Bus implementation that dispatches Commands to the local JVM. These commands may have been dispatched by instances on other JVMs or from the local one.
- A RestOperations object to perform the posting of a Command Message to another instance.
- A Serializer. The serializer is used to serialize the command messages before they are sent over the wire.

Note: The Spring Cloud Connector specific components for the DistributedCommandBus can be found in the axon-distributed-commandbus-springcloud module.
The SpringCloudCommandRouter and SpringHttpCommandBusConnector should then both be used for creating the DistributedCommandBus. In Spring Java config, that would look as follows:

Note: It is not required that all segments have Command Handlers for the same type of Commands. You may use different segments for different Command Types altogether. The Distributed Command Bus will always choose a node to dispatch a Command to that has support for that specific type of Command.
The SpringCloudCommandRouter uses the Metadata map contained in the Spring Cloud ServiceInstance to communicate the allowed message routing information throughout the distributed Axon environment. If the desired Spring Cloud implementation does not allow the modification of the ServiceInstance.Metadata field (e.g. Consul), one can choose to instantiate a SpringCloudHttpBackupCommandRouter instead of the SpringCloudCommandRouter.

The SpringCloudHttpBackupCommandRouter, as the name suggests, has a backup mechanism for when the ServiceInstance.Metadata field does not contain the expected message routing information. That backup mechanism provides an HTTP endpoint from which the message routing information can be retrieved, and adds the functionality to query that endpoint on other known nodes in the cluster to retrieve their message routing information. As such, the backup mechanism functions as a Spring Controller to receive requests at a specifiable endpoint and uses a RestTemplate to send requests to other nodes at that endpoint.

To use the SpringCloudHttpBackupCommandRouter instead of the SpringCloudCommandRouter, add the following Spring Java configuration (which replaces the SpringCloudCommandRouter method in our earlier example):