The GenericEventMessage.asEventMessage(Object) method allows you to wrap any object into an EventMessage. If the passed object is already an EventMessage, it is simply returned.
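The wrap-or-return behavior described above can be sketched in plain Java. The `Message` and `SimpleMessage` types and the `asMessage` helper below are hypothetical stand-ins, not Axon classes; the sketch only illustrates that wrapping an object that is already a message is a no-op.

```java
// Hypothetical stand-in for a message type (not Axon's EventMessage).
interface Message<T> {
    T getPayload();
}

// Hypothetical stand-in for a generic message implementation.
final class SimpleMessage<T> implements Message<T> {
    private final T payload;

    SimpleMessage(T payload) {
        this.payload = payload;
    }

    @Override
    public T getPayload() {
        return payload;
    }

    // Wraps any object in a message; an object that already is a message is returned as-is.
    static Message<?> asMessage(Object event) {
        if (event instanceof Message) {
            return (Message<?>) event;
        }
        return new SimpleMessage<>(event);
    }
}
```

Calling `SimpleMessage.asMessage` twice on the same input therefore yields the same instance the second time, which is the property the text above describes.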
The EventBus is the mechanism that dispatches events to the subscribed event handlers. Axon provides two implementations of the Event Bus: SimpleEventBus and EmbeddedEventStore. While both implementations support subscribing and tracking processors (see Events Processors below), the EmbeddedEventStore persists events, which allows you to replay them at a later stage. The SimpleEventBus has a volatile storage and 'forgets' events as soon as they have been published to subscribed components.
SimpleEventBus is used by default. To configure the EmbeddedEventStore instead, you need to supply an implementation of a StorageEngine, which does the actual storage of Events.
org.axonframework.example.eventhandling with 2 handlers, and
org.axonframework.example.eventhandling.module with a single handler
@Order annotation. This annotation should be placed at the class level of your Event Handler class, adding an integer value to specify the ordering.
EventHandlingConfiguration class of the Configuration API.
EventHandlingConfiguration class defines a number of methods that can be used to define how processors need to be configured.
registerEventProcessorFactory allows you to define a default factory method that creates Event Processors for which no explicit factories have been defined.
registerEventProcessor(String name, EventProcessorBuilder builder) defines the factory method to use to create a Processor with the given name. Note that such a Processor is only created if name is chosen as the processor for any of the available Event Handler beans.
registerTrackingProcessor(String name) defines that a processor with the given name should be configured as a Tracking Event Processor, using default settings. It is configured with a TransactionManager and a TokenStore, both taken from the main configuration by default.
registerTrackingProcessor(String name, Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> sequencingPolicy) defines that a processor with the given name should be configured as a Tracking Processor, and use the given TrackingEventProcessorConfiguration to read the configuration settings for multi-threading. The SequencingPolicy defines which expectations the processor has on sequential processing of events. See Parallel Processing below for more details.
usingTrackingProcessors() sets the default to Tracking Processors instead of Subscribing ones.
SagaConfiguration class. It provides static methods to initialize an instance either for Tracking Processing, or Subscribing.
subscribingSagaManager() method allows you to pass a (builder for an) EventProcessingStrategy. By default, Sagas are invoked synchronously. This can be made asynchronous using this method. However, using Tracking Processors is the preferred way for asynchronous invocation.
TrackingProcessingConfiguration has a few methods allowing you to specify how many segments will be created and which ThreadFactory should be used to create Processor threads. See Parallel Processing for more details.
SagaConfiguration class for full details on how to configure event handling for a Saga.
InMemoryTokenStore is used, which is not recommended in production.
Configurer.registerComponent(TokenStore.class, conf -> ... create token store ...)
SagaConfiguration that defines that Processor. Where possible, it is recommended to use a Token Store that stores tokens in the same database as where the Event Handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens, guaranteeing exactly-once processing semantics.
processingStatus() method, which returns a map where the key is the segment identifier, and the value is the event processing status. Based on this status we can determine whether the Processor is caught up and/or is replaying, and we can verify the Tracking Token of its segments.
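As a rough illustration of how such a per-segment map might be inspected, the sketch below uses a hypothetical `SegmentStatus` class with `caughtUp` and `replaying` flags. It is a stand-in written for this example, not the status type Axon returns, and the rule "caught up means every segment is caught up and none is replaying" is an assumption.

```java
import java.util.Map;

// Hypothetical per-segment status; stands in for the processing status described above.
final class SegmentStatus {
    final boolean caughtUp;
    final boolean replaying;

    SegmentStatus(boolean caughtUp, boolean replaying) {
        this.caughtUp = caughtUp;
        this.replaying = replaying;
    }
}

final class ProcessorHealth {
    // A processor is treated as caught up only if it has at least one segment
    // and every segment is caught up and not replaying.
    static boolean isCaughtUp(Map<Integer, SegmentStatus> statusPerSegment) {
        return !statusPerSegment.isEmpty()
                && statusPerSegment.values().stream()
                        .allMatch(status -> status.caughtUp && !status.replaying);
    }
}
```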
SequencingPolicy for this. The SequencingPolicy is essentially a function that returns a value for any given message. If the return value of the SequencingPolicy function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the SequentialPerAggregatePolicy, which ensures that Events published by the same Aggregate instance will be handled sequentially.
Note: Subscribing Processors don't manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the Command Handling component.
TokenStore instance will use the JVM's name (usually a combination of the host name and process ID) as the default nodeId. This can be overridden in TokenStore implementations that support multi-node processing.
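For reference, a JVM's name can be read through the standard management API, as sketched below. This assumes that "the JVM's name" above refers to the runtime MXBean name. The `NodeIdSketch` class is hypothetical, and the exact format of the returned string is platform-dependent.

```java
import java.lang.management.ManagementFactory;

final class NodeIdSketch {
    // Returns the running JVM's name; on many platforms this looks like "<pid>@<hostname>".
    static String defaultNodeId() {
        return ManagementFactory.getRuntimeMXBean().getName();
    }
}
```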
Publishers to publish events to third-party messaging systems, and
MessageSources to read events from these systems into your Axon-based application.
SpringAMQPPublisher forwards events to an AMQP Exchange. It is initialized with a SubscribableMessageSource, which is generally the EventStore. Theoretically, this could be any source of Events that the publisher can subscribe to.
Note: Exchanges are not automatically created. You must still declare the Queues, Exchanges and Bindings you wish to use. Check the Spring documentation for more information.
SpringAMQPMessageSource allows Event Processors to read messages from a Queue, instead of the Event Store or Event Bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.
onMessage method and annotates it with @RabbitListener, as follows:
@RabbitListener annotation tells Spring that this method needs to be invoked for each message on the given Queue ('myQueue' in the example). This method simply invokes the super.onMessage() method, which performs the actual publication of the Event to all the processors that have been subscribed to it.
SpringAMQPMessageSource instance to the constructor of the Subscribing Processor:
axon-kafka module is available on the classpath.
KafkaPublisher. Publication of the messages to Kafka will happen in the same thread (and Unit of Work) that published the events to the Event Bus.
KafkaPublisherConfiguration instance, which provides the different values and settings required to publish events to Kafka.
DefaultProducerFactory, which attempts to reuse created instances to avoid continuous creation of new ones. Its creation uses a similar builder pattern. The builder requires a configs Map, which holds the settings to use for the Kafka client, such as the Kafka instance locations. Please check the Kafka guide for the possible settings and their values.
DefaultProducerFactory needs to be shutDown properly, to ensure all producer instances are properly closed.
KafkaMessageSource. This message source uses a Fetcher to retrieve the actual messages from Kafka. You can either use the AsyncFetcher, or provide your own.
AsyncFetcher is initialized using a builder, which requires the Kafka Configuration to initialize the client. Please check the Kafka guide for the possible settings and their values.
AsyncFetcher doesn't need to be explicitly started, as it will start when the first processors connect to it. It does need to be shut down, to ensure any thread pool or active connections are properly closed.
DefaultKafkaMessageConverter to convert an EventMessage to a Kafka
ConsumerRecord back into an EventMessage. This implementation already allows for some customization, such as how the MetaData is mapped to Kafka headers. You can also choose which serializer should be used to fill the payload of the
KafkaMessageConverter, and wire it into the
ProducerFactory, or set
application.properties to have auto configuration configure a ProducerFactory with Transactional semantics. In either case, application.properties should provide the necessary Kafka Client properties, available under the axon.kafka prefix. If none are provided, default settings are used, and localhost:9092 is used as the bootstrap server.
KafkaMessageSource, either provide a bean of type ConsumerFactory, or provide the
application.properties. Also make sure all necessary Kafka Client Configuration properties are available under the
KafkaMessageSource bean(s), in which case Axon will not create the default KafkaMessageSource.
SubscribingProcessor. To achieve this, the SubscribingProcessor must be configured with an EventProcessingStrategy. This strategy can be used to change how invocations of the Event Listeners should be managed.
DirectEventProcessingStrategy) invokes these handlers in the thread that delivers the Events. This allows processors to use existing transactions.
AsynchronousEventProcessingStrategy. It uses an Executor to asynchronously invoke the Event Listeners.
AsynchronousEventProcessingStrategy executes asynchronously, it is still desirable that certain events are processed sequentially. The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier of a given event. If the policy returns an equal identifier for two events, this means that they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.
FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that require them to be processed in a particular order.
SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished.
SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.
SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.
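A minimal sketch of the three policies described above, written in plain Java rather than against Axon's own types. `SequencingPolicySketch`, `OrderEvent` and the `SequencingPolicies` helpers are hypothetical names introduced for this example. Only the single-method shape and the "equal identifier means sequential, null means unrestricted" rule are taken from the text.

```java
// Hypothetical event type used only in this sketch.
final class OrderEvent {
    final String aggregateId;
    final String description;

    OrderEvent(String aggregateId, String description) {
        this.aggregateId = aggregateId;
        this.description = description;
    }
}

// Mirrors the shape described above: one method returning a sequence identifier.
interface SequencingPolicySketch<T> {
    Object getSequenceIdentifierFor(T event);
}

final class SequencingPolicies {
    private static final Object FULL_SEQUENCE = new Object();

    // Every event shares one identifier, so all events are handled sequentially.
    static <T> SequencingPolicySketch<T> sequential() {
        return event -> FULL_SEQUENCE;
    }

    // A null identifier places no ordering restriction on any event.
    static <T> SequencingPolicySketch<T> fullConcurrency() {
        return event -> null;
    }

    // Events of the same aggregate share an identifier; other aggregates may run concurrently.
    static SequencingPolicySketch<OrderEvent> perAggregate() {
        return event -> event.aggregateId;
    }

    // Two events must be handled in sequence only if both identifiers are non-null and equal.
    static <T> boolean mustBeSequential(SequencingPolicySketch<T> policy, T first, T second) {
        Object a = policy.getSequenceIdentifierFor(first);
        Object b = policy.getSequenceIdentifierFor(second);
        return a != null && a.equals(b);
    }
}
```

With the per-aggregate policy, two events for `order-1` must be handled in sequence, while an event for `order-2` may be handled concurrently with either of them.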
ErrorHandler when using the AsynchronousEventProcessingStrategy. The default ErrorHandler propagates exceptions, but in an asynchronous execution, there is nothing to propagate to, other than the Executor. This may result in Events not being processed. Instead, it is recommended to use an ErrorHandler that reports errors and allows processing to continue. The ErrorHandler is configured on the constructor of the SubscribingEventProcessor, where the EventProcessingStrategy is also provided.
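The "report and continue" approach recommended above can be illustrated with plain `java.util.concurrent` code. This is a sketch of the idea only: `AsyncReportingSketch` and its `processAll` method are hypothetical and do not use Axon's ErrorHandler or EventProcessingStrategy types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class AsyncReportingSketch {
    // Handles each event on a worker thread; failures are reported instead of propagated,
    // so one failing event does not prevent later events from being handled.
    static List<String> processAll(List<String> events, Consumer<String> handler) {
        List<String> reported = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (String event : events) {
            executor.execute(() -> {
                try {
                    handler.accept(event);
                } catch (RuntimeException e) {
                    // Report the error and carry on with the next event.
                    reported.add(event + ": " + e.getMessage());
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reported;
    }
}
```

If the exception were rethrown inside the task instead, it would only reach the executor's worker thread, which is the situation the text above warns about.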
TrackingEventProcessor supports replaying of events. In order to achieve that, you should invoke the resetTokens() method on it. It is important to know that the Tracking Event Processor must not be in active state when starting a reset. Hence it is wise to shut it down first, then reset it and, once this has succeeded, start it up again. It is possible to define a @ResetHandler, so you can do some preparation prior to resetting. Let's take a look at how we can accomplish replaying. First, we'll see one simple projecting class:
TrackingEventProcessor, thus specifying from which point in the event log it should start replaying the events.
TrackingEventProcessor to the beginning of the event stream. As of version 3.3, functionality for starting a TrackingEventProcessor from a custom position has been introduced. The TrackingEventProcessorConfiguration provides the option to set an initial token for a given
andInitialTrackingToken(Function<StreamableMessageSource, TrackingToken>) builder method. As an input parameter for the token builder function, we receive a StreamableMessageSource which gives us three possibilities to build a token:
1) From the head of event stream: createHeadToken().
2) From the tail of event stream: createTailToken().
3) From some point in time: createTokenSince(Duration) - Creates a token that tracks all events after given time. If there is an event exactly at the given time, it will be taken into account too.
StreamableMessageSource input parameter and create a token by yourself.
TrackingEventProcessorConfiguration with an initial token on "2007-12-03T10:15:30.00Z":
EventBus by doing the following:
MessageHandlerInterceptor interface. This interface declares one method, handle, that takes three parameters: the (Event) Message, the current
InterceptorChain is used to continue the dispatching process, whereas the UnitOfWork (1) gives you the message being handled and (2) provides the possibility to tie in logic before, during or after (event) message handling (see UnitOfWork for more information about the phases).
axonUser as a value for the userId field in the MetaData. If the userId is not present in the meta-data, an exception will be thrown which will prevent the Event from being handled. And if the userId's value does not match axonUser, we will also not proceed up the chain. Authenticating the Event Message as shown in this example is a regular use case of the
Note: Different from the
QueryBus, which both can have Handler and Dispatch Interceptors, the EventBus can only have registered Dispatch Interceptors. This is because publishing events, and thus controlling Event Message dispatching, is the sole purpose of the Event Bus. The EventProcessors are in charge of handling the Event Messages, and are therefore where the Handler Interceptors are registered.
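The userId check described for the handler interceptor can be sketched in plain Java as follows. The `EventSketch`, `ChainSketch` and `HandlerInterceptorSketch` types are hypothetical simplifications written for this example. Axon's own MessageHandlerInterceptor has a different signature, so treat this as an outline of the control flow rather than the framework's API.

```java
import java.util.Map;

// Hypothetical, simplified event carrying a payload and meta-data.
final class EventSketch {
    final Object payload;
    final Map<String, ?> metaData;

    EventSketch(Object payload, Map<String, ?> metaData) {
        this.payload = payload;
        this.metaData = metaData;
    }
}

// Hypothetical chain: calling proceed() continues to the next interceptor or the handler.
interface ChainSketch {
    Object proceed();
}

// Hypothetical interceptor contract for this sketch.
interface HandlerInterceptorSketch {
    Object handle(EventSketch event, ChainSketch chain);
}

final class UserIdInterceptor implements HandlerInterceptorSketch {
    @Override
    public Object handle(EventSketch event, ChainSketch chain) {
        Object userId = event.metaData.get("userId");
        if (userId == null) {
            // Missing userId: fail, which prevents the event from being handled.
            throw new IllegalStateException("userId is missing from the meta-data");
        }
        if ("axonUser".equals(userId)) {
            return chain.proceed();
        }
        // Any other user: do not proceed up the chain.
        return null;
    }
}
```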