EventMessage. The GenericEventMessage.asEventMessage(Object) method allows you to wrap any object into an EventMessage. If the passed object is already an EventMessage, it is simply returned.

The EventBus
is the mechanism that dispatches events to the subscribed event handlers. Axon provides two implementations of the Event Bus: SimpleEventBus and EmbeddedEventStore. While both implementations support subscribing and tracking processors (see Event Processors below), the EmbeddedEventStore persists events, which allows you to replay them at a later stage. The SimpleEventBus has volatile storage and 'forgets' events as soon as they have been published to subscribed components.

SimpleEventBus is used by default. To configure the EmbeddedEventStore instead, you need to supply an implementation of a StorageEngine, which does the actual storage of Events.

org.axonframework.example.eventhandling.MyHandler, org.axonframework.example.eventhandling.MyOtherHandler, and org.axonframework.example.eventhandling.module.MyHandler
org.axonframework.example.eventhandling with 2 handlers, and org.axonframework.example.eventhandling.module with a single handler

@Order annotation. This annotation should be placed at the class level of your Event Handler class, adding an integer value to specify the ordering.

EventHandlingConfiguration class of the Configuration API.

The EventHandlingConfiguration class defines a number of methods that can be used to define how processors need to be configured.

registerEventProcessorFactory
allows you to define a default factory method that creates Event Processors for which no explicit factories have been defined.
registerEventProcessor(String name, EventProcessorBuilder builder) defines the factory method to use to create a Processor with the given name. Note that such a Processor is only created if name is chosen as the processor for any of the available Event Handler beans.
registerTrackingProcessor(String name) defines that a processor with the given name should be configured as a Tracking Event Processor, using default settings. It is configured with a TransactionManager and a TokenStore, both taken from the main configuration by default.
registerTrackingProcessor(String name, Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> sequencingPolicy) defines that a processor with the given name should be configured as a Tracking Processor, and use the given TrackingEventProcessorConfiguration to read the configuration settings for multi-threading. The SequencingPolicy defines which expectations the processor has on sequential processing of events. See Parallel Processing below for more details.
usingTrackingProcessors() sets the default to Tracking Processors instead of Subscribing ones.

SagaConfiguration
class. It provides static methods to initialize an instance for either Tracking or Subscribing processing.

The subscribingSagaManager() method allows you to pass a (builder for an) EventProcessingStrategy. By default, Sagas are invoked synchronously. This can be made asynchronous using this method. However, using Tracking Processors is the preferred way for asynchronous invocation.

TrackingProcessingConfiguration has a few methods allowing you to specify how many segments will be created and which ThreadFactory should be used to create Processor threads. See Parallel Processing for more details.

SagaConfiguration class for full details on how to configure event handling for a Saga.

InMemoryTokenStore is used, which is not recommended in production.

Configurer.registerComponent(TokenStore.class, conf -> ... create token store ...)
EventHandlingConfiguration or SagaConfiguration that defines that Processor. Where possible, it is recommended to use a Token Store that stores tokens in the same database as where the Event Handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens, guaranteeing exactly-once processing semantics.

TrackingEventProcessor exposes a processingStatus() method, which returns a map where the key is the segment identifier and the value is the event processing status. Based on this status, we can determine whether the Processor is caught up and/or is replaying, and we can verify the Tracking Token of its segments.

SequencingPolicy for this. The SequencingPolicy is essentially a function that returns a value for any given message. If the return value of the SequencingPolicy function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the SequentialPerAggregatePolicy, which ensures that Events published by the same Aggregate instance will be handled sequentially.

Note: Subscribing Processors don't manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the Command Handling component.
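The idea of a sequencing policy as "a function that returns a value for any given message" can be illustrated without any framework code. The following is a minimal sketch in plain Java, not Axon's implementation: SketchEvent, SequencingSketch, PER_AGGREGATE, mustBeSequential and laneFor are all hypothetical names introduced here for illustration. It assumes that a processor could route events with equal sequence identifiers to the same worker "lane", so that one worker handles them one after another.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for an event message; this is not an Axon type.
final class SketchEvent {
    final String aggregateId;
    final String payload;

    SketchEvent(String aggregateId, String payload) {
        this.aggregateId = aggregateId;
        this.payload = payload;
    }
}

final class SequencingSketch {

    // A policy is just a function from an event to a sequence identifier.
    // This one mirrors the sequential-per-aggregate idea: same aggregate, same identifier.
    static final Function<SketchEvent, Object> PER_AGGREGATE = event -> event.aggregateId;

    // Two events must be handled sequentially when the policy yields equal identifiers.
    static boolean mustBeSequential(SketchEvent a, SketchEvent b,
                                    Function<SketchEvent, Object> policy) {
        Object idA = policy.apply(a);
        Object idB = policy.apply(b);
        return idA != null && idA.equals(idB);
    }

    // Assumed routing scheme for this sketch: equal identifiers always map to the
    // same lane, so a single worker per lane preserves their relative order.
    static int laneFor(SketchEvent event, Function<SketchEvent, Object> policy, int laneCount) {
        return Math.floorMod(Objects.hashCode(policy.apply(event)), laneCount);
    }
}
```

With this sketch, two events from aggregate "order-1" always share a lane, while an event from "order-2" may or may not land in a different one. Different identifiers only permit concurrency; they do not guarantee it.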
TokenStore instance will use the JVM's name (usually a combination of the host name and process ID) as the default nodeId. This can be overridden in TokenStore implementations that support multi-node processing.

Publishers to publish events to third-party messaging systems, and MessageSources to read events from these systems into your Axon-based application.

SpringAMQPPublisher forwards events to an AMQP Exchange. It is initialized with a SubscribableMessageSource, which is generally the EventBus or EventStore. Theoretically, this could be any source of Events that the publisher can subscribe to.

Note: Exchanges are not automatically created. You must still declare the Queues, Exchanges and Bindings you wish to use. Check the Spring documentation for more information.
SpringAMQPMessageSource allows Event Processors to read messages from a Queue instead of the Event Store or Event Bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.

onMessage method and annotates it with @RabbitListener, as follows:

The @RabbitListener annotation tells Spring that this method needs to be invoked for each message on the given Queue ('myQueue' in the example). This method simply invokes the super.onMessage() method, which performs the actual publication of the Event to all the processors that have been subscribed to it.

SpringAMQPMessageSource instance to the constructor of the Subscribing Processor:

axon-kafka
module is available on the classpath.

The axon-kafka module is a new addition to the framework. Minor releases of the framework could include breaking changes to the APIs.

KafkaPublisher. Publication of the messages to Kafka will happen in the same thread (and Unit of Work) that published the events to the Event Bus.

The KafkaPublisher takes a KafkaPublisherConfiguration instance, which provides the different values and settings required to publish events to Kafka.

DefaultProducerFactory, which attempts to reuse created instances to avoid continuous creation of new ones. Its creation uses a similar builder pattern. The builder requires a configs Map, which holds the settings to use for the Kafka client, such as the Kafka instance locations. Please check the Kafka guide for the possible settings and their values.

The DefaultProducerFactory needs to be shutDown properly, to ensure all producer instances are properly closed.

KafkaMessageSource. This message source uses a Fetcher to retrieve the actual messages from Kafka. You can either use the AsyncFetcher, or provide your own.

The AsyncFetcher
is initialized using a builder, which requires the Kafka Configuration to initialize the client. Please check the Kafka guide for the possible settings and their values.

The AsyncFetcher doesn't need to be explicitly started, as it will start when the first processors connect to it. It does need to be shut down, to ensure any thread pool or active connections are properly closed.

DefaultKafkaMessageConverter to convert an EventMessage to a Kafka ProducerRecord and a ConsumerRecord back into an EventMessage. This implementation already allows for some customization, such as how the Message's MetaData is mapped to Kafka headers. You can also choose which serializer should be used to fill the payload of the ProducerRecord.

KafkaMessageConverter, and wire it into the KafkaPublisherConfiguration and AsyncFetcher:

ProducerFactory, or set axon.kafka.producer.transaction-id-prefix in application.properties to have auto configuration configure a ProducerFactory with Transactional semantics. In either case, application.properties should provide the necessary Kafka Client properties, available under the axon.kafka prefix. If none are provided, default settings are used, and localhost:9092 is used as the bootstrap server.

KafkaMessageSource, either provide a bean of type ConsumerFactory, or provide the axon.kafka.consumer.group-id setting in application.properties. Also make sure all necessary Kafka Client Configuration properties are available under the axon.kafka prefix.

KafkaMessageSource
bean(s), in which case Axon will not create the default KafkaMessageSource.

SubscribingProcessor. To achieve this, the SubscribingProcessor must be configured with an EventProcessingStrategy. This strategy can be used to change how invocations of the Event Listeners should be managed.

DirectEventProcessingStrategy) invokes these handlers in the thread that delivers the Events. This allows processors to use existing transactions.

AsynchronousEventProcessingStrategy. It uses an Executor to asynchronously invoke the Event Listeners.

AsynchronousEventProcessingStrategy
executes asynchronously, it is still desirable that certain events are processed sequentially. The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier for a given event. If the policy returns an equal identifier for two events, they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.

FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that requires them to be processed in a particular order.
SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of the previous event is finished.
SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.

SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.

ErrorHandler
when using the AsynchronousEventProcessingStrategy. The default ErrorHandler propagates exceptions, but in an asynchronous execution, there is nothing to propagate to, other than the Executor. This may result in Events not being processed. Instead, it is recommended to use an ErrorHandler that reports errors and allows processing to continue. The ErrorHandler is configured on the constructor of the SubscribingEventProcessor, where the EventProcessingStrategy is also provided.

The TrackingEventProcessor
supports replaying of events. To do so, invoke the resetTokens() method on it. Note that the Tracking Event Processor must not be active when a reset starts. Hence, it is wise to shut it down first, then reset it and, once this has succeeded, start it up again. It is possible to define a @ResetHandler, so you can do some preparation prior to resetting. Let's take a look at how we can accomplish replaying. First, we'll see one simple projecting class:

TrackingEventProcessor:

TrackingEventProcessor, thus specifying from which point in the event log it should start replaying the events.

TrackingEventProcessor to the beginning of the event stream. As of version 3.3, functionality for starting a TrackingEventProcessor from a custom position has been introduced. The TrackingEventProcessorConfiguration provides the option to set an initial token for a given TrackingEventProcessor through the andInitialTrackingToken(Function<StreamableMessageSource, TrackingToken>) builder method. As an input parameter for the token builder function, we receive a StreamableMessageSource, which gives us three possibilities to build a token:

1) From the head of the event stream: createHeadToken().
2) From the tail of the event stream: createTailToken().
3) From some point in time: createTokenAt(Instant) and createTokenSince(Duration). Each creates a token that tracks all events after the given time. If there is an event exactly at the given time, it will be taken into account too.

StreamableMessageSource input parameter and create a token by yourself.

TrackingEventProcessorConfiguration
with an initial token on "2007-12-03T10:15:30.00Z":

EventBus.

EventBus by doing the following:

MessageHandlerInterceptor interface. This interface declares one method, handle, that takes three parameters: the (Event) Message, the current UnitOfWork and an InterceptorChain. The InterceptorChain is used to continue the dispatching process, whereas the UnitOfWork (1) gives you the message being handled and (2) provides the possibility to tie in logic prior to, during or after (event) message handling (see UnitOfWork for more information about the phases).

axonUser
as a value for the userId field in the MetaData. If the userId is not present in the meta-data, an exception will be thrown, which will prevent the Event from being handled. And if the userId's value does not match axonUser, we will also not proceed up the chain. Authenticating the Event Message as shown in this example is a regular use case of the MessageHandlerInterceptor.

EventProcessor like so:

Note: Unlike the CommandBus and QueryBus, which can both have Handler and Dispatch Interceptors, the EventBus can only have Dispatch Interceptors registered. This is because publishing events, and thus controlling Event Message dispatching, is the sole purpose of the Event Bus. The EventProcessors are in charge of handling the Event Messages and are therefore where the Handler Interceptors are registered.
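The userId check described above can be sketched in plain Java to show the control flow of a handler interceptor. This is an illustrative sketch only and deliberately avoids Axon types: InterceptorSketch, Message, Chain, HandlerInterceptor and REQUIRE_AXON_USER are hypothetical names, and the simplified two-argument handle method is an assumption of this sketch rather than the framework's signature.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class InterceptorSketch {

    // Hypothetical stand-in for an event message carrying meta-data.
    static final class Message {
        final Object payload;
        final Map<String, Object> metaData;

        Message(Object payload, Map<String, Object> metaData) {
            this.payload = payload;
            this.metaData = Collections.unmodifiableMap(new HashMap<>(metaData));
        }
    }

    // Hypothetical stand-in for the interceptor chain: proceed() continues handling.
    interface Chain {
        Object proceed();
    }

    // Simplified interceptor contract, assumed for this sketch only.
    interface HandlerInterceptor {
        Object handle(Message message, Chain chain);
    }

    // Rejects messages without a userId and silently stops the chain for any
    // userId other than "axonUser", following the behaviour described in the text.
    static final HandlerInterceptor REQUIRE_AXON_USER = (message, chain) -> {
        Object userId = message.metaData.get("userId");
        if (userId == null) {
            throw new IllegalStateException("userId is missing from the meta-data");
        }
        if (!"axonUser".equals(userId)) {
            return null; // do not proceed up the chain
        }
        return chain.proceed();
    };
}
```

Only a message whose meta-data contains userId set to axonUser reaches chain.proceed(). A missing userId raises an exception, and any other value ends handling without invoking the rest of the chain.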