Event Processors
Event handlers define the business logic to be performed when an event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a unit of work and possibly a transaction. However, they also ensure that correlation data can be correctly attached to all messages created during event processing, among other non-functional requirements.
The image below depicts a representation of the organization of Event Processors and Event Handlers:
Axon has a layered approach towards organizing event handlers. First, an event handler is positioned in a Processing Group. Each event handler, or "Event Handling Component," will only ever belong to a single Processing Group. The Processing Group provides a level of configurable non-functional requirements, like error handling and the sequencing policy.
The Event Processors, in turn, is in charge of the Processing Group. An Event Processor will control 1 to N Processing Groups, although there will be a one-to-one mapping in most cases. Similar to the Event Handling Component, a Processing Group will belong to a single Event Processor. This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a more fine-grained degree of error handling.
Event Processors come in roughly two forms: Subscribing and Streaming.
Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism. Streaming Event Processors, on the other hand, pull their messages from a source using a thread that it manages itself.
For more specifics on either type, consult their respective sections here and here. The rest of this page dedicates itself to describing the Event Processor's common concepts and configuration options. Note that throughout, the EventProcessingConfigurer
is used. The EventProcessingConfigurer
is part of Axon's Configuration API, dedicated to configuring Event Processors.
Assigning handlers to processors
All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name are considered as two instances of the same processor.
All event handlers are attached to a processor whose name by default is the package name of the event handler's class. Furthermore, the default processor implementation used by Axon is the Tracking Event Processor. The (default) event processor used can be adjusted, as is shown in the subscribing and streaming sections.
Event handlers, or Event Handling Components, come in roughly two flavors: "regular" (singleton, stateless) event handlers and sagas. This section describes the process to register an event handler, whereas this page describes the saga registration process.
Now let us consider that the following event handlers have been registered:
org.axonframework.example.eventhandling.MyHandler
org.axonframework.example.eventhandling.MyOtherHandler
org.axonframework.example.eventhandling.module.ModuleHandler
Without any intervention, this will trigger the creation of two processors, namely:
org.axonframework.example.eventhandling
with two handlers calledMyHandler
andMyOtherHandler
org.axonframework.example.eventhandling.module
with the single handlerModuleHandler
Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the Processing Group is recommended. The most straightforward approach to reaching a transparent naming scheme of your event handlers is by using the ProcessingGroup
annotation. This annotation resembles the Processing Group level discussed in the introduction.
The ProcessingGroup
annotation requires the insertion of a name and can only be set on the class. Let us adjust the previous sample by using this annotation instead of the package names for grouping handlers:
Using the ProcessingGroup
annotation as depicted, we again construct two processors:
my-handlers
with two handlers calledMyHandler
andMyOtherHandler
module-handlers
with the single handlerModuleHandler
If more control is required to group Event Handling Components, we recommend consulting the assignment rules section.
Event Handler Assignment Rules
The Configuration API allows you to configure other strategies for assigning event handling classes to processors or assigning specific handler instances to particular processors. We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group to Event Processor. Below is an exhaustive list of all the assignment rules the EventProcessingConfigurer
exposes:
Event Handler to Processing Group
byDefaultAssignTo(String)
- defines the default Processing Group name to assign an event handler to.It will only be taken into account if there are no more specifics rules and if the
ProcessingGroup
annotation is not present.byDefaultAssignHandlerInstancesTo(Function<Object, String>)
- defines a lambda invoked to assign an event handling instance to a desired Processing Group by returning that group's name.It will only be taken into account if there are no more specifics rules and if the
ProcessingGroup
annotation is not present.byDefaultAssignHandlerTypesTo(Function<Class<?>, String>)
- defines a lambda invoked to assign an event handler type to a desired Processing Group by returning that group's name.It will only be taken into account if there are no more specifics rules and if the
ProcessingGroup
annotation is not present.assignHandlerInstancesMatching(String, Predicate<Object>)
- assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance.The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is undefined.
assignHandlerTypesMatching(String, Predicate<Class<?>>)
- assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type.The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is undefined.
assignHandlerInstancesMatching(String, int, Predicate<Object>)
- assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance.Uses the given priority to decide on rule-ordering. The higher the priority value, the more important the rule is.
If an instance matches several criteria, the outcome is undefined.
assignHandlerTypesMatching(String, int, Predicate<Class<?>>)
- assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type.Uses the given priority to decide on rule-ordering. The higher the priority, the more important the rule is.
If an instance matches several criteria, the outcome is undefined.
Processing Group to Event Processor
assignProcessingGroup(String, String)
- defines a given Processing Group name that belongs to the given Event Processor's name.assignProcessingGroup(Function<String, String>)
- defines a lambda invoked to assign a Processing Group name to the desired Event Processor by returning that processor's name.
Ordering Event Handlers within a processor
To order event handlers within an Event Processor, the order in which event handlers are registered (as described in the Registering Event Handlers section) is guiding. Thus, the ordering in which an Event Processor will call event handlers for event handling is the same as their insertion ordering in the Configuration API.
If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the @Order
annotation. This annotation is placed on the event handler class name, containing an integer
value to specify the ordering.
Note that it is not possible to order event handlers belonging to different Event Processors. Each Event Processor acts as an isolated component without any intervention from other Event Processors.
Ordering Considerations
Although we can place an order among event handlers within an Event Processor, separation of event handlers is recommended.
Placing an overall ordering on event handlers means those components are inclined to interact with one another, introducing a form of coupling. Due to this, the event handling process will become complex to manage (e.g., for new team members). Furthermore, embracing an ordering approach might lead to place all event handlers in a global ordering, decreasing processing speeds in general.
In all, you are free to use an ordering, but we recommend using it sparingly.
Error Handling
Errors are inevitable in any application. Depending on where they happen, you may want to respond differently.
By default, exceptions raised by event handlers are caught in the Processing Group layer, logged, and processing continues with the following events. When an exception is thrown when a processor is trying to commit a transaction, update a token, or in any other part of the process, the exception will be propagated.
In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to max 60 seconds). A Subscribing Event Processor will report a publication error to the component that provided the event.
To change this behavior, both the Processing Group and Event Processor level allow customization on how to deal with exceptions:
Processing Group - Listener Invocation Error Handler
The component dealing with exceptions thrown from an event handling method is called the ListenerInvocationErrorHandler
. By default, these exceptions are logged (with the LoggingErrorHandler
implementation), and processing continues with the next handler or message.
The default ListenerInvocationErrorHandler
used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group:
It is easy to implement custom error handling behavior. The error handling method to implement provides the exception, the event that was handled, and a reference to the handler that was handling the message:
You can choose to retry, ignore or rethrow the exception. The exception will bubble up to the Event Processor level when rethrown.
Event Processor - Error Handler
Exceptions occurring outside an event handler's scope, or have bubbled up from there, are handled by the ErrorHandler
. The default error handler is the PropagatingErrorHandler
, which will rethrow any exceptions it catches.
How the Event Processor deals with a rethrown exception differ per implementation. The behaviour for the Subscribing- and the Streaming Event Processor can respectively be found here and here.
We can configure a default ErrorHandler
for all Event Processors or an ErrorHandler
for specific processors:
For providing a custom solution, the ErrorHandler
's single method needs to be implemented:
Based on the provided ErrorContext
object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception.
General processor configuration
Alongside handler assignment and error handling, Event Processors allow configuration for other components too. For Subscribing and Streaming Event Processor specific options, their respective sections should be checked. The remainder of this page will cover the generic configuration options for each Event Processor.
Event Processor Builders
The EventProcessingConfigurer
provides access to a lot of configurable components for Event Processors. Sometimes it is easier or preferable to provide an entire function to construct an Event Processor, however. To that end, we can configure a custom EventProcessorBuilder
:
The EventProcessorBuilder
functional interface provides the event processor's name, the Configuration
and the EventHandlerInvoker
, and requires returning an EventProcessor
instance. Note that any Axon component that an Event Processor requires (e.g., an EventStore
) is retrievable from the Configuration
.
The EventProcessingConfigurer
provides two methods to configure an EventProcessorBuilder
:
registerEventProcessorFactory(EventProcessorBuilder)
- allows you to define a default factory method that creates event processors for which no explicit factories are definedregisterEventProcessor(String, EventProcessorBuilder)
- defines the factory method to use to create a processor with givenname
Event Handler Interceptors
Since the Event Processor is the invoker of event handling methods, it is a spot to configure Message Handler Interceptors too. Since Event Processors are dedicated to event handling, the MessageHandlerInterceptor
is required to deal with an EventMessage
. Differently put, an EventHandlerInterceptor can be registered to Event Processors.
The EventProcessingConfigurer
provides two methods to configure MessageHandlerInterceptor
instances:
registerDefaultHandlerInterceptor(BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>)
- registers a defaultMessageHandlerInterceptor
that will be configured on every Event Processor instanceregisterHandlerInterceptor(String, Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>)
- registers aMessageHandlerInterceptor
that will be configured for the Event Processor matching the givenString
Message Monitors
Any Event Processor instance provides the means to contain a Message Monitor. Message Monitors (discussed in more detail here) allow for monitoring the flow of messages throughout an Axon application. For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions.
The EventProcessingConfigurer
provides two approaches towards configuring a MessageMonitor
:
registerMessageMonitor(String, Function<Configuration, MessageMonitor<Message<?>>>)
- registers the givenMessageMonitor
to the Event Processor matching the givenString
registerMessageMonitorFactory(String, MessageMonitorFactory)
- registers the givenMessageMonitorFactory
to construct aMessageMonitor
for the Event Processor matching the givenString
The MessageMonitorFactory
provides a more fine-grained approach, used throughout the Configuration API, to construct a MessageMonitor
:
We can use the Configuration
to retrieve the required dependencies to construct the MessageMonitor
. The type and name reflect which infrastructure component the factory constructs a monitor for. Whenever you use the MessageMonitorFactory
to construct a MessageMonitor
for an Event Processor, the factory expects the componentType
to be an EventProcessor
implementation. The componentName
, on the other hand, would resemble the name of the Event Processor.
Transaction Management
As components that deal with event handling, the Event Processor is a logical place to provide transaction configuration options. Note that in the majority of the scenarios, the defaults will suffice. This section simply serves to show these options to allow adjustment if the application requires it.
The first of these is the TransactionManager
. Axon uses the TransactionManager
to attach a transaction to every Unit of Work. Within a Spring environment, the TransactionManager
defaults to a SpringTransactionManager
, which uses Spring's PlatformTransactionManager
under the hood. In non Spring environments, it would be wise to build a TransactionManager
implement if transaction management is required, of course. Such an implementation only requires the definition of the TransactionManager#startTransaction()
method. To adjust the transaction manager for an Event Processor, the registerTransactionManager(String, Function<Configuration, TransactionManager>)
on the EventProcessingConfigurer
should be used.
Secondly, you can adjust the desired RollbackConfiguration
per Event Processor. It is the RollbackConfiguration
that decide when a Unit of Work should rollback the transaction. The default RollbackConfiguration
is to rollback on any type of Throwable
; the Unit of Work page describes the other options you can choose. To adjust the default behaviour, the registerRollbackConfiguration(String, Function<Configuration, RollbackConfiguration>)
function should be invoked on the EventProcessingConfigurer
.
Last updated