Event Processing
The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Handlers. It is the responsibility of the Event Bus to dispatch Event Messages to all components interested. On the receiving end, Event Processors are responsible for handling those events, which includes invocation of the appropriate Event Handlers.
Publishing Events
In the vast majority of cases, Aggregates publish events by applying them. Occasionally, however, it is necessary to publish an event directly to the Event Bus, possibly from within another component. To publish an event, wrap the payload describing the event in an EventMessage. The GenericEventMessage.asEventMessage(Object) method allows you to wrap any object into an EventMessage. If the passed object is already an EventMessage, it is simply returned.
Event Bus
The EventBus is the mechanism that dispatches events to the subscribed event handlers. Axon provides two implementations of the Event Bus: SimpleEventBus and EmbeddedEventStore. While both implementations support subscribing and tracking processors (see Event Processors below), the EmbeddedEventStore persists events, which allows you to replay them at a later stage. The SimpleEventBus has volatile storage and 'forgets' events as soon as they have been published to subscribed components.
When using the Configuration API, the SimpleEventBus is used by default. To configure the EmbeddedEventStore instead, you need to supply an implementation of a StorageEngine, which does the actual storage of Events.
Event Processors
Event Handlers define the business logic to be performed when an Event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a Unit of Work and possibly a transaction, but also ensure that correlation data can be correctly attached to all messages created during Event processing.
Event Processors come in roughly two forms: Subscribing and Tracking. Subscribing Event Processors subscribe themselves to a source of Events and are invoked by the thread managed by the publishing mechanism. Tracking Event Processors, on the other hand, pull their messages from a source using threads that they manage themselves.
Assigning handlers to processors
All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name can be considered two instances of the same processor.
All Event Handlers are attached to a Processor whose name is the package name of the Event Handler's class.
For example, the following classes:
- org.axonframework.example.eventhandling.MyHandler,
- org.axonframework.example.eventhandling.MyOtherHandler, and
- org.axonframework.example.eventhandling.module.MyHandler
will trigger the creation of two Processors:
- org.axonframework.example.eventhandling with 2 handlers, and
- org.axonframework.example.eventhandling.module with a single handler
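The default naming rule above can be pictured with a small plain-Java sketch. ProcessorNaming and its methods are hypothetical helpers written for illustration only; they are not part of Axon and merely mimic the "package name becomes processor name" rule.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustration only: ProcessorNaming is a hypothetical helper, not an Axon class.
final class ProcessorNaming {

    // Default rule described above: the processor name is the package of the handler class.
    static String processorNameFor(String handlerClassName) {
        int lastDot = handlerClassName.lastIndexOf('.');
        return lastDot < 0 ? "" : handlerClassName.substring(0, lastDot);
    }

    // Groups handler class names by processor name and counts the handlers per processor.
    static Map<String, Long> handlersPerProcessor(List<String> handlerClassNames) {
        return handlerClassNames.stream()
                .collect(Collectors.groupingBy(ProcessorNaming::processorNameFor,
                                               TreeMap::new,
                                               Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> handlers = Arrays.asList(
                "org.axonframework.example.eventhandling.MyHandler",
                "org.axonframework.example.eventhandling.MyOtherHandler",
                "org.axonframework.example.eventhandling.module.MyHandler");
        // Two processor names result: one with 2 handlers, one with 1 handler.
        System.out.println(handlersPerProcessor(handlers));
    }
}
```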
The Configuration API allows you to configure other strategies for assigning classes to processors, or even assign specific instances to specific processors.
Ordering Event Handlers within a single Event Processor
Event Handlers within an Event Processor are ordered by the order in which they are registered (as described in the Registering Event Handlers section). An Event Processor therefore calls its Event Handlers in their insertion order in the Configuration API.
If Spring is selected as the mechanism to wire everything, the ordering of the Event Handlers can be specified by adding the @Order annotation. This annotation should be placed at class level on your Event Handler class, with an integer value that specifies the ordering.
Do note that it is not possible to order Event Handlers which are not a part of the same Event Processor.
Configuring processors
Processors take care of the technical aspects of handling an event, regardless of the business logic triggered by each event. However, the way "regular" (singleton, stateless) event handlers are configured is slightly different from Sagas, as different aspects are important for each type of handler.
Event Handlers
By default, Axon will use Subscribing Event Processors. It is possible to change how Handlers are assigned and how processors are configured using the EventHandlingConfiguration class of the Configuration API.
The EventHandlingConfiguration class defines a number of methods that can be used to define how processors need to be configured.
- registerEventProcessorFactory allows you to define a default factory method that creates Event Processors for which no explicit factories have been defined.
- registerEventProcessor(String name, EventProcessorBuilder builder) defines the factory method to use to create a Processor with the given name. Note that such a Processor is only created if name is chosen as the processor for any of the available Event Handler beans.
- registerTrackingProcessor(String name) defines that a processor with the given name should be configured as a Tracking Event Processor, using default settings. It is configured with a TransactionManager and a TokenStore, both taken from the main configuration by default.
- registerTrackingProcessor(String name, Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> sequencingPolicy) defines that a processor with the given name should be configured as a Tracking Processor, and use the given TrackingEventProcessorConfiguration to read the configuration settings for multi-threading. The SequencingPolicy defines which expectations the processor has on sequential processing of events. See Parallel Processing below for more details.
- usingTrackingProcessors() sets the default to Tracking Processors instead of Subscribing ones.
Sagas
Sagas are configured using the SagaConfiguration class. It provides static methods to initialize an instance for either Tracking or Subscribing processing.
To configure a Saga to run in subscribing mode, simply do:
If you don't want to use the default EventBus / Store as source for this Saga to get its messages from, you can define another source of messages as well:
Another variant of the subscribingSagaManager() method allows you to pass a (builder for an) EventProcessingStrategy. By default, Sagas are invoked synchronously. This can be made asynchronous using this method. However, using Tracking Processors is the preferred way for asynchronous invocation.
To configure a Saga to use a Tracking Processor, simply do:
This will set the default properties, meaning a single Thread is used to process events. To change this:
The TrackingEventProcessorConfiguration has a few methods allowing you to specify how many segments will be created and which ThreadFactory should be used to create Processor threads. See Parallel Processing for more details.
If you are using Spring, you can configure your saga like this:
Check out the API documentation (JavaDoc) of the SagaConfiguration class for full details on how to configure event handling for a Saga.
Token Store
Tracking Processors, unlike Subscribing ones, need a Token Store to store their progress in. Each message a Tracking Processor receives through its Event Stream is accompanied by a Token. This Token allows the processor to reopen the Stream at any later point, picking up where it left off with the last Event.
The Configuration API takes the Token Store, as well as most other components Processors need, from the Global Configuration instance. If no TokenStore is explicitly defined, an InMemoryTokenStore is used, which is not recommended in production.
To configure a different Token Store, use Configurer.registerComponent(TokenStore.class, conf -> ... create token store ...).
Note that you can override the TokenStore to use with Tracking Processors in the respective EventHandlingConfiguration or SagaConfiguration that defines that Processor. Where possible, it is recommended to use a Token Store that stores tokens in the same database as the one in which the Event Handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens, guaranteeing exactly-once processing semantics.
Event Tracker Status
In some cases it might be useful to know the state of a Tracking Event Processor for each of its segments. One such case is when we want to rebuild our view model and need to check whether the Processor has caught up with all the events. For cases like these, the TrackingEventProcessor exposes the processingStatus() method, which returns a map where the key is the segment identifier and the value is the event processing status. Based on this status we can determine whether the Processor is caught up and/or is replaying, and we can verify the Tracking Token of its segments.
Parallel Processing
As of Axon Framework 3.1, Tracking Processors can use multiple threads to process an Event Stream. They do so by claiming a so-called segment, identified by a number. Normally, a single thread will process a single Segment.
The number of Segments used can be defined. When a Processor starts for the first time, it can initialize a number of segments. This number defines the maximum number of threads that can process events simultaneously. Each node running a Tracking Processor will attempt to start its configured number of threads to process these segments.
Event Handlers may have specific expectations on the ordering of events. If this is the case, the processor must ensure these events are sent to these Handlers in that specific order. Axon uses the SequencingPolicy for this. The SequencingPolicy is essentially a function that returns a value for any given message. If the return value of the SequencingPolicy function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the SequentialPerAggregatePolicy, which ensures that Events published by the same Aggregate instance are handled sequentially.
A Saga instance is never invoked concurrently by multiple threads. Therefore, a Sequencing Policy for a Saga is irrelevant. Axon will ensure each Saga instance receives the Events it needs to process in the order they have been published on the Event Bus.
Note
Note that Subscribing Processors don't manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the Command Handling component.
Multi-node processing
For tracking processors, it doesn't matter whether the Threads handling the events are all running on the same node, or on different nodes hosting the same (logical) TrackingProcessor. When two instances of TrackingProcessor, having the same name, are active on different machines, they are considered two instances of the same logical processor. They will 'compete' for segments of the Event Stream. Each instance will 'claim' a segment, preventing events assigned to that segment from being processed on the other nodes.
The TokenStore instance will use the JVM's name (usually a combination of the host name and process ID) as the default nodeId. This can be overridden in TokenStore implementations that support multi-node processing.
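The way nodes 'compete' for segments can be modelled in a few lines of plain Java. This is only a sketch of the claiming idea under simplifying assumptions (a single in-memory map, no expiry of claims); SegmentClaims and the node identifiers are hypothetical and do not represent how Axon's TokenStore is implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: SegmentClaims is hypothetical and is not Axon's TokenStore.
final class SegmentClaims {

    // segment id -> id of the node that currently owns the claim
    private final Map<Integer, String> owners = new ConcurrentHashMap<>();

    // Atomically claims the segment for nodeId; returns true if nodeId owns the claim afterwards.
    boolean claim(int segment, String nodeId) {
        String current = owners.putIfAbsent(segment, nodeId);
        return current == null || current.equals(nodeId);
    }

    // Releases the claim, but only if it is currently held by nodeId.
    void release(int segment, String nodeId) {
        owners.remove(segment, nodeId);
    }

    // Returns the owning node, or null when the segment is unclaimed.
    String ownerOf(int segment) {
        return owners.get(segment);
    }

    public static void main(String[] args) {
        SegmentClaims claims = new SegmentClaims();
        System.out.println(claims.claim(0, "node-A")); // true: segment 0 was free
        System.out.println(claims.claim(0, "node-B")); // false: node-A already holds segment 0
        System.out.println(claims.claim(1, "node-B")); // true: segment 1 was free
    }
}
```

Because a segment has at most one owner at a time, events assigned to that segment are handled by only one node, whichever machine the owning thread happens to run on.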
Distributing Events
In some cases, it is necessary to publish events to an external system, such as a message broker. Axon uses Publishers to publish events to third-party messaging systems, and MessageSources to read events from these systems into your Axon-based application.
At the moment, there is support for publishing (and reading) events via Spring AMQP and Kafka.
Spring AMQP
Axon provides out-of-the-box support to transfer Events to and from an AMQP message broker, such as RabbitMQ.
Forwarding events to an AMQP Exchange
The SpringAMQPPublisher forwards events to an AMQP Exchange. It is initialized with a SubscribableMessageSource, which is generally the EventBus or EventStore. Theoretically, this could be any source of Events that the publisher can subscribe to.
To configure the SpringAMQPPublisher, simply define an instance as a Spring Bean. There are a number of setter methods that allow you to specify the behavior you expect, such as Transaction support, publisher acknowledgements (if supported by the broker), and the exchange name.
The default exchange name is 'Axon.EventBus'.
Note
Note that exchanges are not automatically created. You must still declare the Queues, Exchanges and Bindings you wish to use. Check the Spring documentation for more information.
Reading Events from an AMQP Queue
Spring has extensive support for reading messages from an AMQP Queue. However, this needs to be 'bridged' to Axon, so that these messages can be handled from Axon as if they are regular Event Messages.
The SpringAMQPMessageSource allows Event Processors to read messages from a Queue, instead of the Event Store or Event Bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.
The easiest way to configure the SpringAMQPMessageSource is by defining a bean which overrides the default onMessage method and annotates it with @RabbitListener, as follows:
Spring's @RabbitListener annotation tells Spring that this method needs to be invoked for each message on the given Queue ('myQueue' in the example). This method simply invokes the super.onMessage() method, which performs the actual publication of the Event to all the processors that have been subscribed to it.
To subscribe Processors to this MessageSource, pass the correct SpringAMQPMessageSource instance to the constructor of the Subscribing Processor:
Note that Tracking Processors are not compatible with the SpringAMQPMessageSource.
Apache Kafka
Kafka is a very popular system for publishing and consuming events. Its architecture is fundamentally different from most messaging systems, and combines speed with reliability.
To use the Kafka components from Axon, make sure the axon-kafka module is available on the classpath.
The axon-kafka module is a new addition to the framework. Minor releases of the framework could include breaking changes to the APIs.
Publishing Events to a Kafka topic
When Event Messages are published to an Event Bus (or Event Store), they can be forwarded to a Kafka topic using the KafkaPublisher. Publication of the messages to Kafka will happen in the same thread (and Unit of Work) that published the events to the Event Bus.
The KafkaPublisher takes a KafkaPublisherConfiguration instance, which provides the different values and settings required to publish events to Kafka.
Axon provides a DefaultProducerFactory, which attempts to reuse created instances to avoid continuous creation of new ones. Its creation uses a similar builder pattern. The builder requires a configs Map, which holds the settings to use for the Kafka client, such as the Kafka instance locations. Please check the Kafka guide for the possible settings and their values.
Note that the DefaultProducerFactory needs to be shut down properly (shutDown), to ensure all producer instances are properly closed.
Consuming Events from a Kafka topic
Messages can be consumed by Tracking Event Processors by configuring a KafkaMessageSource. This message source uses a Fetcher to retrieve the actual messages from Kafka. You can either use the AsyncFetcher, or provide your own.
The AsyncFetcher is initialized using a builder, which requires the Kafka Configuration to initialize the client. Please check the Kafka guide for the possible settings and their values.
The AsyncFetcher doesn't need to be explicitly started, as it will start when the first processors connect to it. It does need to be shut down, to ensure any thread pool or active connections are properly closed.
Customizing message format
By default, Axon uses the DefaultKafkaMessageConverter to convert an EventMessage to a Kafka ProducerRecord and a ConsumerRecord back into an EventMessage. This implementation already allows for some customization, such as how the Message's MetaData is mapped to Kafka headers. You can also choose which serializer should be used to fill the payload of the ProducerRecord.
For further customization, you can implement your own KafkaMessageConverter, and wire it into the KafkaPublisherConfiguration and AsyncFetcher:
Configuration in Spring Boot
Axon will automatically provide certain Kafka related components based on the availability of beans and/or properties.
To enable a KafkaPublisher, either provide a bean of type ProducerFactory, or set axon.kafka.producer.transaction-id-prefix in application.properties to have auto configuration configure a ProducerFactory with Transactional semantics. In either case, application.properties should provide the necessary Kafka Client properties, available under the axon.kafka prefix. If none are provided, default settings are used, and localhost:9092 is used as the bootstrap server.
To enable a KafkaMessageSource, either provide a bean of type ConsumerFactory, or provide the axon.kafka.consumer.group-id setting in application.properties. Also make sure all necessary Kafka Client Configuration properties are available under the axon.kafka prefix.
Alternatively, you may provide your own KafkaMessageSource bean(s), in which case Axon will not create the default KafkaMessageSource.
Asynchronous Event Processing
The recommended approach to handle Events asynchronously is by using a Tracking Event Processor. This implementation can guarantee processing of all events, even in case of a system failure (assuming the Events have been persisted).
However, it is also possible to handle Events asynchronously in a SubscribingProcessor. To achieve this, the SubscribingProcessor must be configured with an EventProcessingStrategy. This strategy can be used to change how invocations of the Event Listeners should be managed.
The default strategy (DirectEventProcessingStrategy) invokes these handlers in the thread that delivers the Events. This allows processors to use existing transactions.
The other Axon-provided strategy is the AsynchronousEventProcessingStrategy. It uses an Executor to asynchronously invoke the Event Listeners.
Even though the AsynchronousEventProcessingStrategy executes asynchronously, it is still desirable that certain events are processed sequentially. The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier for a given event. If the policy returns an equal identifier for two events, this means that they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.
Axon provides a number of common policies you can use:
- The FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that require them to be processed in a particular order.
- The SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished.
- The SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.
Besides these provided policies, you can define your own. All policies must implement the SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.
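The contract described above (equal identifiers mean sequential handling, null means no restriction) can be illustrated with a plain-Java stand-in. OrderEvent, PER_CUSTOMER, FULL_CONCURRENCY and mustBeSequential are hypothetical names chosen for this sketch; the sketch models a policy as a java.util.function.Function and deliberately does not use Axon's own SequencingPolicy interface.

```java
import java.util.function.Function;

// Illustration only: a stand-in for the idea of a sequencing policy, not Axon's interface.
final class SequencingSketch {

    // Hypothetical event type carrying the values a custom policy might look at.
    static final class OrderEvent {
        final String customerId;
        final String orderId;

        OrderEvent(String customerId, String orderId) {
            this.customerId = customerId;
            this.orderId = orderId;
        }
    }

    // Custom policy: all events of the same customer share one sequence identifier.
    static final Function<OrderEvent, Object> PER_CUSTOMER = event -> event.customerId;

    // Full concurrency: a null identifier means "no ordering restriction".
    static final Function<OrderEvent, Object> FULL_CONCURRENCY = event -> null;

    // Two events must be handled sequentially when the policy yields equal, non-null identifiers.
    static boolean mustBeSequential(Function<OrderEvent, Object> policy, OrderEvent a, OrderEvent b) {
        Object idA = policy.apply(a);
        Object idB = policy.apply(b);
        return idA != null && idA.equals(idB);
    }

    public static void main(String[] args) {
        OrderEvent first = new OrderEvent("customer-1", "order-1");
        OrderEvent second = new OrderEvent("customer-1", "order-2");
        System.out.println(mustBeSequential(PER_CUSTOMER, first, second));     // same customer
        System.out.println(mustBeSequential(FULL_CONCURRENCY, first, second)); // no restriction
    }
}
```

A per-customer policy like this one is coarser than sequencing per aggregate: it trades some parallelism for the guarantee that a handler sees all events of one customer in order.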
It is recommended to explicitly define an ErrorHandler when using the AsynchronousEventProcessingStrategy. The default ErrorHandler propagates exceptions, but in an asynchronous execution, there is nothing to propagate to, other than the Executor. This may result in Events not being processed. Instead, it is recommended to use an ErrorHandler that reports errors and allows processing to continue. The ErrorHandler is configured on the constructor of the SubscribingEventProcessor, where the EventProcessingStrategy is also provided.
Replaying events
When you want to rebuild projections (view models), replaying past events comes in handy. The idea is to start from the beginning of time and invoke all event handlers anew. The TrackingEventProcessor supports replaying of events. To achieve that, invoke the resetTokens() method on it. It is important to know that the Tracking Event Processor must not be in an active state when starting a reset. Hence it is wise to shut it down first, then reset it and, once this has succeeded, start it up again. It is possible to define a @ResetHandler, so you can do some preparation prior to resetting. Let's take a look at how we can accomplish replaying. First, we'll see one simple projecting class:
And now, we can reset our TrackingEventProcessor:
(1) Since release 3.3 of the framework it is possible to provide a token position to be used when resetting a TrackingEventProcessor, thus specifying from which point in the event log it should start replaying the events.
Custom tracking token position
Prior to Axon release 3.3, you could only reset a TrackingEventProcessor to the beginning of the event stream. As of version 3.3, functionality for starting a TrackingEventProcessor from a custom position has been introduced. The TrackingEventProcessorConfiguration provides the option to set an initial token for a given TrackingEventProcessor through the andInitialTrackingToken(Function<StreamableMessageSource, TrackingToken>) builder method. As an input parameter for the token builder function, we receive a StreamableMessageSource, which gives us three possibilities to build a token:
1) From the head of the event stream: createHeadToken().
2) From the tail of the event stream: createTailToken().
3) From some point in time: createTokenAt(Instant) and createTokenSince(Duration). These create a token that tracks all events after the given time. If there is an event exactly at the given time, it will be taken into account too.
Of course, you can completely disregard the StreamableMessageSource input parameter and create a token by yourself.
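The "at or after" rule stated for createTokenAt(Instant) can be made concrete with a plain-Java sketch. TokenPositionSketch, eventsFrom and the sample timestamps are hypothetical and only model which events would still be seen; no Axon types are involved and this is not how a tracking token is represented internally.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: models the selection rule, not an actual tracking token.
final class TokenPositionSketch {

    // Returns the timestamps a processor would still see when starting at the given instant:
    // every event after it, plus any event recorded exactly at that instant.
    static List<Instant> eventsFrom(List<Instant> eventTimestamps, Instant start) {
        return eventTimestamps.stream()
                .filter(timestamp -> !timestamp.isBefore(start))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2007-12-03T10:15:30.00Z");
        List<Instant> timestamps = Arrays.asList(
                Instant.parse("2007-12-03T10:15:29.00Z"),  // before the start: skipped
                Instant.parse("2007-12-03T10:15:30.00Z"),  // exactly at the start: included
                Instant.parse("2007-12-03T10:15:31.00Z")); // after the start: included
        System.out.println(eventsFrom(timestamps, start));
    }
}
```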
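For example, a TrackingEventProcessorConfiguration with an initial token at "2007-12-03T10:15:30.00Z" is obtained by passing andInitialTrackingToken a function that calls createTokenAt(Instant) on the StreamableMessageSource with that instant.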
Event Interceptors
As with Command Messages, Event Messages can be intercepted prior to publishing and handling, to perform additional actions on all Events. This boils down to the same two types of interceptors for messages: the Dispatch Interceptor and the Handler Interceptor.
Dispatch Interceptors are invoked before an Event (Message) is published on the Event Bus. Handler Interceptors, on the other hand, are invoked just before the Event Handler is invoked with a given Event (Message) in the Event Processor. Examples of operations performed in an interceptor are logging or authentication, which you might want to do regardless of the type of Event.
Dispatch Interceptors
Any Message Dispatch Interceptors registered to an Event Bus will be invoked when an Event is published. They have the ability to alter the Event Message, by adding Meta Data for example, or they can provide you with overall logging capabilities for when an Event is published. These interceptors are always invoked on the thread that published the Event.
Let's create an Event Message Dispatch Interceptor which logs each Event message being published on an EventBus.
We can then register this dispatch interceptor with an EventBus by doing the following:
Handler Interceptors
Message Handler Interceptors can take action both before and after Event processing. Interceptors can even block Event processing altogether, for example for security reasons.
Interceptors must implement the MessageHandlerInterceptor interface. This interface declares one method, handle, that takes three parameters: the (Event) Message, the current UnitOfWork and an InterceptorChain. The InterceptorChain is used to continue the dispatching process, whereas the UnitOfWork (1) gives you the message being handled and (2) provides the possibility to tie in logic prior to, during or after (event) message handling (see UnitOfWork for more information about the phases).
Unlike Dispatch Interceptors, Handler Interceptors are invoked in the context of the Event Handler. That means they can attach correlation data based on the Message being handled to the Unit of Work, for example. This correlation data will then be attached to Event Messages being created in the context of that Unit of Work.
Let's create a Message Handler Interceptor which will only allow the handling of Events that contain axonUser as a value for the userId field in the MetaData. If the userId is not present in the meta-data, an exception will be thrown which will prevent the Event from being handled. And if the userId's value does not match axonUser, we will also not proceed up the chain. Authenticating the Event Message as shown in this example is a regular use case of the MessageHandlerInterceptor.
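The decision logic of such an interceptor can be sketched in plain Java. This is a simplified model under stated assumptions: the meta-data is represented as a Map, the rest of the interceptor chain as a Supplier, and UserIdInterceptorSketch is a hypothetical class that does not implement Axon's MessageHandlerInterceptor.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

// Illustration only: models the userId check, not an actual Axon handler interceptor.
final class UserIdInterceptorSketch {

    // Proceeds with the chain only when the meta-data carries userId = "axonUser".
    // Throws when userId is absent; returns null without proceeding when it does not match.
    static Object handle(Map<String, Object> metaData, Supplier<Object> chain) {
        Object userId = metaData.get("userId");
        if (userId == null) {
            throw new IllegalStateException("No userId present in the meta-data");
        }
        if (!"axonUser".equals(userId)) {
            return null; // do not proceed up the chain
        }
        return chain.get();
    }

    public static void main(String[] args) {
        Map<String, Object> metaData = Collections.<String, Object>singletonMap("userId", "axonUser");
        // The supplier stands in for the remaining interceptors and the Event Handler itself.
        System.out.println(handle(metaData, () -> "event handled"));
    }
}
```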
We can register the handler interceptor with an EventProcessor like so:
Note
Unlike the CommandBus and QueryBus, which can both have Handler and Dispatch Interceptors, the EventBus can only have Dispatch Interceptors registered. This is because publishing, and thus control over Event Message dispatching, is the sole purpose of the Event Bus. The EventProcessors are in charge of handling the Event Messages, and are therefore the place where Handler Interceptors are registered.