Event Processors

Event handlers define the business logic to be performed when an event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a unit of work and possibly a transaction. They also ensure that correlation data is correctly attached to all messages created during event processing, among other non-functional requirements.

Event processors have event handling components. Event handling components, in turn, have event handlers. Each event handling component belongs to a single event processor. The event processor manages all aspects of event processing including threading, error handling, sequencing, and transaction management.

Event processors come in roughly two forms:

  • Subscribing event processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism.

  • Streaming event processors, on the other hand, pull their messages from a source using threads that they manage themselves.
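The difference between the two forms comes down to which thread invokes the handler. The following is a minimal, framework-free sketch of that idea. The class and method names (ProcessorStylesSketch, handleOnPublisherThread, handleOnOwnThread) are hypothetical and not part of Axon's API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names exist in Axon Framework.
final class ProcessorStylesSketch {

    /** Subscribing style: the handler runs on whichever thread publishes the event. */
    static String handleOnPublisherThread(String event) {
        // The publisher calls the handler directly, so this is the caller's thread.
        return Thread.currentThread().getName();
    }

    /** Streaming style: a worker thread owned by the processor pulls events from a source. */
    static String handleOnOwnThread(String event) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        CompletableFuture<String> handlingThread = new CompletableFuture<>();

        Thread worker = new Thread(() -> {
            try {
                // Pull the next event from the source; the publisher is not involved.
                source.take();
                handlingThread.complete(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                handlingThread.completeExceptionally(e);
            }
        }, "streaming-worker");
        worker.setDaemon(true);
        worker.start();

        // Publishing only appends to the source; it does not invoke the handler.
        source.add(event);
        return handlingThread.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```

In the first method the reported thread is the caller's own thread, while in the second it is the worker thread that the sketch starts itself.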

For more specifics on either type, consult their respective sections here and here. The rest of this page describes the Event Processor’s common concepts and configuration options.

Assigning handlers to processors

All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name are considered as two instances of the same processor.

Each event handler will belong to an event handling component. An event handling component can include any number of event handlers. These event handling components are attached to a processor, whose name by default is the package name of the event handler’s class. Furthermore, the default processor implementation used by Axon is the PooledStreamingEventProcessor. The event processor type can be customized, as is shown in the subscribing and streaming sections.

There are two main approaches to configuring event processors and assigning handlers:

  • Declarative configuration - Using Axon’s configuration API to explicitly define which event handling components belong to which processor.

  • Autodetected configuration - Using Spring Boot to automatically detect and assign event handling components to processors based on their package name.

This section describes the process to register an event handler.

Declarative configuration

Assuming the declarative approach is used and we have the following event handling components:

  • org.axonframework.example.eventhandling.MyHandler

  • org.axonframework.example.eventhandling.MyOtherHandler

  • org.axonframework.example.eventhandling.module.ModuleHandler

To register these, we need to explicitly invoke MessagingConfigurer#eventProcessing(Consumer<EventProcessingConfigurer>). In the example below, a subscribing processor is constructed, which can be replaced with a streaming processor if desired:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "my-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                .notCustomized()
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new MyHandler())
                                  .autodetected(c -> new ModuleHandler())
                                  .autodetected(c -> new MyOtherHandler());
    }
}

Autodetected configuration

Assuming the autodetected approach is used, let us consider that the following event handling components have been registered:

  • org.axonframework.example.eventhandling.MyHandler

  • org.axonframework.example.eventhandling.MyOtherHandler

  • org.axonframework.example.eventhandling.module.ModuleHandler

Without any intervention, this will trigger the creation of two Streaming Processors, namely:

  1. org.axonframework.example.eventhandling with two event handling components called MyHandler and MyOtherHandler.

  2. org.axonframework.example.eventhandling.module with the event handling component ModuleHandler.
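The grouping above follows directly from the package-name default. As a minimal sketch of that rule, assuming the processor name is simply everything before the last dot of the fully qualified class name, the assignment can be reproduced without Axon or Spring. DefaultProcessorNames and its methods are hypothetical names used for illustration only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical illustration of the package-name default; not Axon's implementation.
final class DefaultProcessorNames {

    /** Derives the processor name from a fully qualified class name: the part before the last dot. */
    static String processorNameFor(String handlerClassName) {
        int lastDot = handlerClassName.lastIndexOf('.');
        return lastDot < 0 ? "" : handlerClassName.substring(0, lastDot);
    }

    /** Groups handler class names by the processor name derived from their package. */
    static Map<String, List<String>> groupByProcessor(List<String> handlerClassNames) {
        return handlerClassNames.stream()
                                .collect(Collectors.groupingBy(
                                        DefaultProcessorNames::processorNameFor,
                                        TreeMap::new,
                                        Collectors.toList()
                                ));
    }
}
```

Passing the three class names listed above yields two groups: one for org.axonframework.example.eventhandling holding MyHandler and MyOtherHandler, and one for org.axonframework.example.eventhandling.module holding ModuleHandler.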

Ordering event handling components within a processor

The order in which event handling components are registered (as described in the Registering Event Handlers section) determines their order within an Event Processor. In other words, an Event Processor calls its handling components in the same order in which they were added through the Configuration API.

If we use an autodetected configuration like Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the @Order annotation. This annotation is placed on the event handling component class, containing an integer value to specify the ordering.
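To illustrate how an integer order value translates into an invocation order, here is a minimal sketch that runs without Spring. The nested Order annotation is a hypothetical stand-in for Spring's @Order, and HandlerOrderingSketch, orderOf, and invocationOrder are illustrative names. The sketch assumes that lower values come first and that components without the annotation are sorted last, which matches Spring's lowest-precedence default.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration; in a Spring application you would use Spring's own @Order.
final class HandlerOrderingSketch {

    /** Stand-in for Spring's @Order so the sketch compiles on its own. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Order {
        int value();
    }

    @Order(2)
    static class MyOtherHandler { }

    @Order(1)
    static class MyHandler { }

    // No annotation: treated as lowest precedence and therefore sorted last.
    static class ModuleHandler { }

    static int orderOf(Class<?> componentType) {
        Order order = componentType.getAnnotation(Order.class);
        return order == null ? Integer.MAX_VALUE : order.value();
    }

    /** Returns the simple class names in the order the components would be invoked. */
    static List<String> invocationOrder(List<Class<?>> components) {
        List<Class<?>> sorted = new ArrayList<>(components);
        // List.sort is stable, so components with equal order keep their registration order.
        sorted.sort(Comparator.comparingInt(HandlerOrderingSketch::orderOf));
        return sorted.stream().map(Class::getSimpleName).collect(Collectors.toList());
    }
}
```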

Note that it is not possible to order event handlers belonging to different Event Processors. Each Event Processor acts as an isolated component without any intervention from other Event Processors.

Ordering Event Handlers within a Processor

Although we can place an order among event handling components within an Event Processor, separation of components is recommended.

Placing an overall ordering on event handling components means those components are inclined to interact with one another, introducing a form of coupling. As a result, the event handling process becomes harder to manage (for example, for new team members). Furthermore, embracing an ordering approach might lead to placing all event handling components in a global ordering, decreasing processing speed in general.

In all, you are free to use an ordering, but we recommend using it sparingly.

Error handling

Errors are inevitable in any application. Depending on where they happen, you may want to respond differently.

By default, exceptions raised by event handlers are handled by the ErrorHandler. The default ErrorHandler used is the PropagatingErrorHandler, which rethrows any exceptions it catches.

In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to a maximum of 60 seconds). A Subscribing Event Processor will report a publication error to the component that provided the event.
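The retry interval can be pictured with a small calculation. The text above only states the starting point of 1 second and the cap of 60 seconds, so the doubling step in this sketch is an assumption, and RetryIntervalSketch with its methods is a hypothetical name rather than Axon code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the doubling step is an assumption, not confirmed by the text above.
final class RetryIntervalSketch {

    static final long INITIAL_SECONDS = 1;
    static final long MAX_SECONDS = 60;

    /** Doubles the current interval, capped at the maximum. */
    static long nextInterval(long currentSeconds) {
        return Math.min(currentSeconds * 2, MAX_SECONDS);
    }

    /** Lists the wait time before each of the first retry attempts. */
    static List<Long> firstIntervals(int attempts) {
        List<Long> intervals = new ArrayList<>();
        long interval = INITIAL_SECONDS;
        for (int i = 0; i < attempts; i++) {
            intervals.add(interval);
            interval = nextInterval(interval);
        }
        return intervals;
    }
}
```

Under that assumption, firstIntervals(8) gives 1, 2, 4, 8, 16, 32, 60, and 60 seconds, so a persistently failing processor settles at one retry per minute.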

How the Event Processor deals with a rethrown exception differs per implementation. The behavior for the Subscribing and the Streaming Event Processor can be found here and here, respectively.

We can configure a default ErrorHandler for all Event Processors or an ErrorHandler for specific processors:

  • Configuration API - Default

  • Configuration API - Specific

To register a default ErrorHandler for any Event Processor, you can do the following:

import jakarta.annotation.Nonnull;
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.processing.errorhandling.ErrorContext;
import org.axonframework.messaging.eventhandling.processing.errorhandling.ErrorHandler;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.defaults(
                defaults -> defaults.errorHandler(new CustomErrorHandler())
        ));
    }
}

To register a custom ErrorHandler for a specific (in this case Subscribing) Processor, have a look at the following example:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "my-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                .customized((c, subscribingConfig) -> subscribingConfig.errorHandler(
                                        new CustomErrorHandler()
                                ))
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new MyHandler());
    }
}

To provide a custom solution, implement the ErrorHandler’s single method:

public interface ErrorHandler {

    void handleError(@Nonnull ErrorContext errorContext) throws Exception;
}

Based on the provided ErrorContext object, you can decide to ignore the error, schedule retries, or rethrow the exception.
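As a minimal sketch of such a decision, the handler below ignores one category of failure and rethrows everything else. To keep the example compilable on its own, ErrorContext and ErrorHandler are simplified local stand-ins: the real ErrorContext exposes more information and its accessor names may differ, so treat error() as an assumption. SelectiveErrorHandler is a hypothetical name.

```java
// Hypothetical illustration with local stand-in types; not Axon's actual classes.
final class ErrorHandlerSketch {

    /** Simplified stand-in for Axon's ErrorContext: it only exposes the failure. */
    static final class ErrorContext {
        private final Exception error;

        ErrorContext(Exception error) {
            this.error = error;
        }

        Exception error() {
            return error;
        }
    }

    /** Mirrors the single-method shape of the ErrorHandler interface. */
    interface ErrorHandler {
        void handleError(ErrorContext errorContext) throws Exception;
    }

    /** Ignores failures caused by invalid data and rethrows all other exceptions. */
    static final class SelectiveErrorHandler implements ErrorHandler {
        private int ignoredCount;

        @Override
        public void handleError(ErrorContext errorContext) throws Exception {
            Exception error = errorContext.error();
            if (error instanceof IllegalArgumentException) {
                // Ignore: log the problem and let event processing continue.
                ignoredCount++;
                System.err.println("Ignoring event with invalid data: " + error.getMessage());
                return;
            }
            // Rethrow: leave the reaction to the Event Processor.
            throw error;
        }

        int ignoredCount() {
            return ignoredCount;
        }
    }
}
```

Ignoring an error this way keeps the processor running, but the affected event is never handled, so it suits only failures that a retry cannot fix.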

Dead-letter queue

Dead-Letter Queue not available in Axon Framework 5.0

The Dead-Letter Queue feature is not yet available in Axon Framework 5.0. It will be reintroduced in Axon Framework 5.1. The documentation below describes the Dead-Letter Queue concepts for reference and future use.

Although configuring an Error Handler helps you to deal with exceptions when processing events, event handling will typically still stop entirely. When you only log the error and allow processing to proceed, you will most likely end up with missing data until you fix the predicament and replay past events. If you instead propagate the exception so the event processor keeps retrying, the event processor will stall entirely when the cause is consistent.

Although this behavior is sufficient on many occasions, sometimes it is beneficial if we can unblock event handling by parking the problematic event. This is where the dead-letter queue comes in. It is a mechanism to park events until they can be processed successfully. You can find more information in the Dead-Letter Queue section.
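The parking idea can be shown in a few lines. This is a conceptual sketch only, not the Dead-Letter Queue API: ParkingQueueSketch and its methods are hypothetical names, events are plain strings, and the queue lives in memory. It also ignores ordering between related events, which a real implementation would have to take into account.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Conceptual illustration of parking failed events; not Axon's Dead-Letter Queue API.
final class ParkingQueueSketch {

    private final Deque<String> parked = new ArrayDeque<>();
    private final Consumer<String> handler;

    ParkingQueueSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Handles an event and parks it when handling fails, so later events are not blocked. */
    void process(String event) {
        try {
            handler.accept(event);
        } catch (RuntimeException e) {
            parked.addLast(event);
        }
    }

    /** Retries parked events in order, stopping at the first one that still fails. */
    int retryParked() {
        int succeeded = 0;
        while (!parked.isEmpty()) {
            try {
                handler.accept(parked.peekFirst());
            } catch (RuntimeException e) {
                break;
            }
            parked.removeFirst();
            succeeded++;
        }
        return succeeded;
    }

    int parkedCount() {
        return parked.size();
    }
}
```

With this shape, a failing event no longer stalls the events behind it, and it can be handled once the underlying cause is fixed by calling retryParked().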

General processor configuration

Alongside handler assignment and error handling, Event Processors allow configuration for other components too. For Subscribing and Streaming Event Processor specific options, their respective sections should be checked. The remainder of this page will cover the generic configuration options for each Event Processor.

Event handler interceptors

Since the Event Processor is the invoker of event handling methods, it is also the place to configure Message Handler Interceptors. Since Event Processors are dedicated to event handling, the MessageHandlerInterceptor is required to deal with an EventMessage. Put differently, an EventHandlerInterceptor can be registered to Event Processors.

There are two approaches to registering interceptors with your event handling processes. Firstly, you can define default handling interceptors that will be attached to all Event Processors in your application. This can be achieved through the MessagingConfigurer and is explained in more detail here.

Secondly, a handler interceptor can be registered per specific Event Processor instance. For this you should use the withInterceptor(MessageHandlerInterceptor<? super EventMessage>) operation as part of customizing a processor based on its name as shown:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonConfig {

    public void registerProcessorSpecificInterceptor(MessagingConfigurer configurer) {
        // For SubscribingEventProcessor:
        configurer.eventProcessing()
                  .subscribing()
                  .customize("my-processor", config -> config.withInterceptor(new CustomEventHandlerInterceptor()));
        // For PooledStreamingEventProcessor:
        configurer.eventProcessing()
                  .pooledStreaming()
                  .customize("my-processor", config -> config.withInterceptor(new CustomEventHandlerInterceptor()));
    }
}
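Conceptually, an interceptor wraps the call to the event handler so that logic can run before and after it. The sketch below shows that wrapping with plain Java types. The Interceptor interface is a hypothetical stand-in and does not reflect the signature of Axon's MessageHandlerInterceptor, and the choice to run the first registered interceptor outermost is an assumption made for the example.

```java
import java.util.List;
import java.util.function.Consumer;

// Conceptual illustration with a stand-in interface; not Axon's interceptor API.
final class InterceptorChainSketch {

    /** Hypothetical interceptor: receives the event and a callback to continue the chain. */
    interface Interceptor {
        void intercept(String event, Consumer<String> proceed);
    }

    /** Wraps the handler so every interceptor runs around it, first registered outermost. */
    static Consumer<String> wrap(Consumer<String> handler, List<Interceptor> interceptors) {
        Consumer<String> chain = handler;
        // Build from the inside out so the first interceptor ends up as the outermost wrapper.
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            Interceptor interceptor = interceptors.get(i);
            Consumer<String> next = chain;
            chain = event -> interceptor.intercept(event, next);
        }
        return chain;
    }
}
```

An interceptor that never calls proceed prevents the handler from being invoked at all, which is how interceptors can veto or short-circuit handling.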

Message monitors

Any Event Processor instance provides the means to contain a Message Monitor. Message Monitors (discussed in more detail here) allow for monitoring the flow of messages throughout an Axon application. For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions.

To register a default MessageMonitor for events, you use the MessagingConfigurer#registerEventMonitor operation as shown:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonConfig {

    public void registerEventMonitor(MessagingConfigurer configurer) {
        configurer.registerEventMonitor(config -> new CustomMessageMonitor());
    }
}
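To give an idea of what a monitor such as the CustomMessageMonitor above might track, here is a minimal counting sketch. It uses local stand-in types rather than Axon's MessageMonitor interface, so the names CountingEventMonitorSketch, Callback, and onEventIngested are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Conceptual illustration with stand-in types; not Axon's MessageMonitor interface.
final class CountingEventMonitorSketch {

    /** Hypothetical callback used to report the outcome of handling one event. */
    interface Callback {
        void reportSuccess();

        void reportFailure(Throwable cause);
    }

    private final AtomicLong ingested = new AtomicLong();
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    /** Called when an event enters the processor; the returned callback records its outcome. */
    Callback onEventIngested(Object event) {
        ingested.incrementAndGet();
        return new Callback() {
            @Override
            public void reportSuccess() {
                succeeded.incrementAndGet();
            }

            @Override
            public void reportFailure(Throwable cause) {
                failed.incrementAndGet();
            }
        };
    }

    long ingested() {
        return ingested.get();
    }

    long succeeded() {
        return succeeded.get();
    }

    long failed() {
        return failed.get();
    }
}
```

The counters use AtomicLong because a processor may handle events on several threads at once.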