Event Processors

Event handlers define the business logic to be performed when an event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a unit of work and possibly a transaction. However, they also ensure that correlation data can be correctly attached to all messages created during event processing, among other non-functional requirements.

The image below depicts a representation of the organization of Event Processors and Event Handlers:

Axon has a layered approach towards organizing event handlers. First, an event handler is positioned in a Processing Group. Each event handler, or "Event Handling Component," will only ever belong to a single Processing Group. The Processing Group provides a level of configurable non-functional requirements, like error handling and the sequencing policy.

The Event Processors, in turn, is in charge of the Processing Group. An Event Processor will control 1 to N Processing Groups, although there will be a one-to-one mapping in most cases. Similar to the Event Handling Component, a Processing Group will belong to a single Event Processor. This last layer allows the definition of the type of Event Processor used and concepts like the threading model and a more fine-grained degree of error handling.

Event Processors come in roughly two forms: Subscribing and Streaming.

Subscribing Event Processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism. Streaming Event Processors, on the other hand, pull their messages from a source using a thread that it manages itself.

For more specifics on either type, consult their respective sections here and here. The rest of this page dedicates itself to describing the Event Processor's common concepts and configuration options. Note that throughout, the EventProcessingConfigurer is used. The EventProcessingConfigurer is part of Axon's Configuration API, dedicated to configuring Event Processors.

Assigning handlers to processors

All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name are considered as two instances of the same processor.

All event handlers are attached to a processor whose name by default is the package name of the event handler's class. Furthermore, the default processor implementation used by Axon is the Tracking Event Processor. The (default) event processor used can be adjusted, as is shown in the subscribing and streaming sections.

Event handlers, or Event Handling Components, come in roughly two flavors: "regular" (singleton, stateless) event handlers and sagas. This section describes the process to register an event handler, whereas this page describes the saga registration process.

Now let us consider that the following event handlers have been registered:

  • org.axonframework.example.eventhandling.MyHandler

  • org.axonframework.example.eventhandling.MyOtherHandler

  • org.axonframework.example.eventhandling.module.ModuleHandler

Without any intervention, this will trigger the creation of two processors, namely:

  1. org.axonframework.example.eventhandling with two handlers called MyHandler and MyOtherHandler

  2. org.axonframework.example.eventhandling.module with the single handler ModuleHandler

Using the package name serves as a suitable default, but using dedicated names for an Event Processor and/or the Processing Group is recommended. The most straightforward approach to reaching a transparent naming scheme of your event handlers is by using the ProcessingGroup annotation. This annotation resembles the Processing Group level discussed in the introduction.

The ProcessingGroup annotation requires the insertion of a name and can only be set on the class. Let us adjust the previous sample by using this annotation instead of the package names for grouping handlers:

@ProcessingGroup("my-handlers")
class MyHandler {
    // left out event handling functions...
}

@ProcessingGroup("my-handlers")
class MyOtherHandler{
    // ...
}

@ProcessingGroup("module-handlers")
class ModuleHandler {
    // ...
}

Using the ProcessingGroup annotation as depicted, we again construct two processors:

  1. my-handlers with two handlers called MyHandler and MyOtherHandler

  2. module-handlers with the single handler ModuleHandler

If more control is required to group Event Handling Components, we recommend consulting the assignment rules section.

Event Handler Assignment Rules

The Configuration API allows you to configure other strategies for assigning event handling classes to processors or assigning specific handler instances to particular processors. We can separate these assignment rules into roughly two groups: Event Handler to Processing Group and Processing Group to Event Processor. Below is an exhaustive list of all the assignment rules the EventProcessingConfigurer exposes:

Event Handler to Processing Group

  • byDefaultAssignTo(String) - defines the default Processing Group name to assign an event handler to.

    It will only be taken into account if there are no more specifics rules and if the ProcessingGroup annotation is not present.

  • byDefaultAssignHandlerInstancesTo(Function<Object, String>) - defines a lambda invoked to assign an event handling instance to a desired Processing Group by returning that group's name.

    It will only be taken into account if there are no more specifics rules and if the ProcessingGroup annotation is not present.

  • byDefaultAssignHandlerTypesTo(Function<Class<?>, String>) - defines a lambda invoked to assign an event handler type to a desired Processing Group by returning that group's name.

    It will only be taken into account if there are no more specifics rules and if the ProcessingGroup annotation is not present.

  • assignHandlerInstancesMatching(String, Predicate<Object>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance.

    The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is undefined.

  • assignHandlerTypesMatching(String, Predicate<Class<?>>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type.

    The operation uses a natural priority of zero. If an instance matches several criteria, the outcome is undefined.

  • assignHandlerInstancesMatching(String, int, Predicate<Object>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handling instance.

    Uses the given priority to decide on rule-ordering. The higher the priority value, the more important the rule is.

    If an instance matches several criteria, the outcome is undefined.

  • assignHandlerTypesMatching(String, int, Predicate<Class<?>>) - assigns event handlers to the given Processing Group name based on a predicate ingesting an event handler type.

    Uses the given priority to decide on rule-ordering. The higher the priority, the more important the rule is.

    If an instance matches several criteria, the outcome is undefined.

Processing Group to Event Processor

  • assignProcessingGroup(String, String) - defines a given Processing Group name that belongs to the given Event Processor's name.

  • assignProcessingGroup(Function<String, String>) - defines a lambda invoked to assign a Processing Group name to the desired Event Processor by returning that processor's name.

Ordering Event Handlers within a processor

To order event handlers within an Event Processor, the order in which event handlers are registered (as described in the Registering Event Handlers section) is guiding. Thus, the ordering in which an Event Processor will call event handlers for event handling is the same as their insertion ordering in the Configuration API.

If we use Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the @Order annotation. This annotation is placed on the event handler class name, containing an integer value to specify the ordering.

Note that it is not possible to order event handlers belonging to different Event Processors. Each Event Processor acts as an isolated component without any intervention from other Event Processors.

Ordering Considerations

Although we can place an order among event handlers within an Event Processor, separation of event handlers is recommended.

Placing an overall ordering on event handlers means those components are inclined to interact with one another, introducing a form of coupling. Due to this, the event handling process will become complex to manage (e.g., for new team members). Furthermore, embracing an ordering approach might lead to place all event handlers in a global ordering, decreasing processing speeds in general.

In all, you are free to use an ordering, but we recommend using it sparingly.

Error Handling

Errors are inevitable in any application. Depending on where they happen, you may want to respond differently.

By default, exceptions raised by event handlers are caught in the Processing Group layer, logged, and processing continues with the following events. When an exception is thrown when a processor is trying to commit a transaction, update a token, or in any other part of the process, the exception will be propagated.

In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to max 60 seconds). A Subscribing Event Processor will report a publication error to the component that provided the event.

To change this behavior, both the Processing Group and Event Processor level allow customization on how to deal with exceptions:

Processing Group - Listener Invocation Error Handler

The component dealing with exceptions thrown from an event handling method is called the ListenerInvocationErrorHandler. By default, these exceptions are logged (with the LoggingErrorHandler implementation), and processing continues with the next handler or message.

The default ListenerInvocationErrorHandler used by each processing group can be customized. Furthermore, we can configure the error handling behavior per processing group:

public class AxonConfig {
    // ...
    public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) {
        // To configure a default ...
        processingConfigurer.registerDefaultListenerInvocationErrorHandler(conf -> /* create listener error handler */)
                            // ... or for a specific processing group: 
                            .registerListenerInvocationErrorHandler("my-processing-group", conf -> /* create listener error handler */);
    }
}

It is easy to implement custom error handling behavior. The error handling method to implement provides the exception, the event that was handled, and a reference to the handler that was handling the message:

public interface ListenerInvocationErrorHandler {

    void onError(Exception exception, 
                 EventMessage<?> event, 
                 EventMessageHandler eventHandler) throws Exception;
}

You can choose to retry, ignore or rethrow the exception. The exception will bubble up to the Event Processor level when rethrown.

Event Processor - Error Handler

Exceptions occurring outside an event handler's scope, or have bubbled up from there, are handled by the ErrorHandler. The default error handler is the PropagatingErrorHandler, which will rethrow any exceptions it catches.

How the Event Processor deals with a rethrown exception differ per implementation. The behaviour for the Subscribing- and the Streaming Event Processor can respectively be found here and here.

We can configure a default ErrorHandler for all Event Processors or an ErrorHandler for specific processors:

public class AxonConfig {
    // ...
    public void configureProcessingGroupErrorHandling(EventProcessingConfigurer processingConfigurer) {
        // To configure a default ...
        processingConfigurer.registerDefaultErrorHandler(conf -> /* create error handler */)
                            // ... or for a specific processor: 
                            .registerErrorHandler("my-processor", conf -> /* create error handler */);
    }
}

For providing a custom solution, the ErrorHandler's single method needs to be implemented:

public interface ErrorHandler {

    void handleError(ErrorContext errorContext) throws Exception;
}

Based on the provided ErrorContext object, you can decide to ignore the error, schedule retries, perform dead-letter-queue delivery, or rethrow the exception.

General processor configuration

Alongside handler assignment and error handling, Event Processors allow configuration for other components too. For Subscribing and Streaming Event Processor specific options, their respective sections should be checked. The remainder of this page will cover the generic configuration options for each Event Processor.

Event Processor Builders

The EventProcessingConfigurer provides access to a lot of configurable components for Event Processors. Sometimes it is easier or preferable to provide an entire function to construct an Event Processor, however. To that end, we can configure a custom EventProcessorBuilder:

@FunctionalInterface
interface EventProcessorBuilder {

    // Note: the `EventHandlerInvoker` is the component which holds the event handling functions.
    EventProcessor build(String name, 
                         Configuration configuration, 
                         EventHandlerInvoker eventHandlerInvoker);
}

The EventProcessorBuilder functional interface provides the event processor's name, the Configuration and the EventHandlerInvoker, and requires returning an EventProcessor instance. Note that any Axon component that an Event Processor requires (e.g., an EventStore) is retrievable from the Configuration.

The EventProcessingConfigurer provides two methods to configure an EventProcessorBuilder:

  1. registerEventProcessorFactory(EventProcessorBuilder) - allows you to define a default factory method that creates event processors for which no explicit factories are defined

  2. registerEventProcessor(String, EventProcessorBuilder) - defines the factory method to use to create a processor with given name

Event Handler Interceptors

Since the Event Processor is the invoker of event handling methods, it is a spot to configure Message Handler Interceptors too. Since Event Processors are dedicated to event handling, the MessageHandlerInterceptor is required to deal with an EventMessage. Differently put, an EventHandlerInterceptor can be registered to Event Processors.

The EventProcessingConfigurer provides two methods to configure MessageHandlerInterceptor instances:

  • registerDefaultHandlerInterceptor(BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>) - registers a default MessageHandlerInterceptor that will be configured on every Event Processor instance

  • registerHandlerInterceptor(String, Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>) - registers a MessageHandlerInterceptor that will be configured for the Event Processor matching the given String

Message Monitors

Any Event Processor instance provides the means to contain a Message Monitor. Message Monitors (discussed in more detail here) allow for monitoring the flow of messages throughout an Axon application. For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions.

The EventProcessingConfigurer provides two approaches towards configuring a MessageMonitor:

  • registerMessageMonitor(String, Function<Configuration, MessageMonitor<Message<?>>>) - registers the given MessageMonitor to the Event Processor matching the given String

  • registerMessageMonitorFactory(String, MessageMonitorFactory) - registers the given MessageMonitorFactory to construct a MessageMonitor for the Event Processor matching the given String

The MessageMonitorFactory provides a more fine-grained approach, used throughout the Configuration API, to construct a MessageMonitor:

@FunctionalInterface
public interface MessageMonitorFactory {

    MessageMonitor<Message<?>> create(Configuration configuration, 
                                      Class<?> componentType, 
                                      String componentName);
}

We can use the Configuration to retrieve the required dependencies to construct the MessageMonitor. The type and name reflect which infrastructure component the factory constructs a monitor for. Whenever you use the MessageMonitorFactory to construct a MessageMonitor for an Event Processor, the factory expects the componentType to be an EventProcessor implementation. The componentName, on the other hand, would resemble the name of the Event Processor.

Transaction Management

As components that deal with event handling, the Event Processor is a logical place to provide transaction configuration options. Note that in the majority of the scenarios, the defaults will suffice. This section simply serves to show these options to allow adjustment if the application requires it.

The first of these is the TransactionManager. Axon uses the TransactionManager to attach a transaction to every Unit of Work. Within a Spring environment, the TransactionManager defaults to a SpringTransactionManager, which uses Spring's PlatformTransactionManager under the hood. In non Spring environments, it would be wise to build a TransactionManager implement if transaction management is required, of course. Such an implementation only requires the definition of the TransactionManager#startTransaction() method. To adjust the transaction manager for an Event Processor, the registerTransactionManager(String, Function<Configuration, TransactionManager>) on the EventProcessingConfigurer should be used.

Secondly, you can adjust the desired RollbackConfiguration per Event Processor. It is the RollbackConfiguration that decide when a Unit of Work should rollback the transaction. The default RollbackConfiguration is to rollback on any type of Throwable; the Unit of Work page describes the other options you can choose. To adjust the default behaviour, the registerRollbackConfiguration(String, Function<Configuration, RollbackConfiguration>) function should be invoked on the EventProcessingConfigurer.

Last updated