Event Processors

Event handlers define the business logic to be performed when an event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a unit of work and possibly a transaction. However, they also ensure that correlation data can be correctly attached to all messages created during event processing, among other non-functional requirements.

Event processors have event handling components. Event handling components, in turn, have event handlers. Each event handling component belongs to a single event processor. The event processor manages all aspects of event processing including threading, error handling, sequencing, and transaction management.

Event processors come in roughly two forms:

Subscribing event processors subscribe to a source of events and are invoked by the thread managed by the publishing mechanism. Streaming event processors, on the other hand, pull their messages from a source using a thread that it manages itself.

For more specifics on either type, consult their respective sections here and here. The rest of this page describes the Event Processor’s common concepts and configuration options.

Assigning handlers to processors

All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name are considered as two instances of the same processor.

Each event handler will belong to an event handling component. An event handling component can include any number of event handlers. These event handling components are attached to a processor, whose name by default is the package name of the event handler’s class. Furthermore, the default processor implementation used by Axon is the PooledStreamingEventProcessor. The event processor type can be customized, as is shown in the subscribing and streaming sections.

There are two main approaches to configuring event processors and assigning handlers:

  • Declarative configuration - Using Axon’s configuration API to explicitly define which event handling components belong to which processor.

  • Autodetected configuration - Using Spring Boot to automatically detect and assign event handling components to processors based on their package name.

This section describes the process to register an event handler.

Declarative configuration

Assuming the declarative approach is used and we have the following event handling components:

  • org.axonframework.example.eventhandling.MyHandler

  • org.axonframework.example.eventhandling.MyOtherHandler

  • org.axonframework.example.eventhandling.module.ModuleHandler

To register these, we need to explicitly invoke the MessagingConfigurer#eventProcessing(Consumer<EventProcessingConfigurer>). In the example below, a subscribing processor is constructed, which can be replaced for a streaming processor if desired:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "my-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                .notCustomized()
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new MyHandler())
                                  .autodetected(c -> new ModuleHandler())
                                  .autodetected(c -> new MyOtherHandler());
    }
}

Autodetected configuration

Assuming the autodetected approach is used, let us consider that the following event handling components have been registered:

  • org.axonframework.example.eventhandling.MyHandler

  • org.axonframework.example.eventhandling.MyOtherHandler

  • org.axonframework.example.eventhandling.module.ModuleHandler

Without any intervention, this will trigger the creation of two Streaming Processors, namely:

  1. org.axonframework.example.eventhandling with two event handling components called MyHandler and MyOtherHandler.

  2. org.axonframework.example.eventhandling.module with the event handling component ModuleHandler.

Ordering event handling components within a processor

To order event handling components within an Event Processor, the order in which components are registered (as described in the Registering Event Handlers section) is guiding. Thus, the ordering in which an Event Processor will call handling components for event handling is the same as their insertion ordering in the Configuration API.

If we use an autodetected configuration like Spring as the mechanism for wiring everything, we can explicitly specify the event handler component ordering by adding the @Order annotation. This annotation is placed on the event handling component class, containing an integer value to specify the ordering.

Note that it is not possible to order event handlers belonging to different Event Processors. Each Event Processor acts as an isolated component without any intervention from other Event Processors.

Ordering Event Handlers within a Processor

Although we can place an order among event handling components within an Event Processor, separation of components is recommended.

Placing an overall ordering on event handling components means those components are inclined to interact with one another, introducing a form of coupling. Due to this, the event handling process will become complex to manage (for example, for new team members). Furthermore, embracing an ordering approach might lead to place all event handling components in a global ordering, decreasing processing speeds in general.

In all, you are free to use an ordering, but we recommend using it sparingly.

Processor lifecycle

Every EventProcessor exposes a common set of lifecycle methods to start, stop, and inspect state. These methods are available on all processor implementations, including the Subscribing and Streaming Event Processor.

Starting and stopping

start() and shutdown() both return a CompletableFuture<Void> that completes when the operation has fully finished.

In most scenarios, you do not need to invoke start() or shutdown() manually. Axon Framework manages the processor lifecycle automatically as part of application startup and shutdown. Only call these methods directly when you need explicit control, for example when triggering a programmatic replay or conditionally pausing processing.

Checking state

Two synchronous methods allow you to inspect the current state of a processor at any point:

  • isRunning(): returns true when the processor is active and processing events. Returns false after a clean shutdown() or before start() has been called.

  • isError(): returns true when the processor has gone into error mode due to an unhandled exception. Returns false after a clean shutdown(). See Error mode for more on how the Streaming Event Processor handles errors.

import org.axonframework.common.configuration.AxonConfiguration;
import org.axonframework.messaging.eventhandling.processing.EventProcessor;

public class ProcessorStateChecker {

    private final AxonConfiguration configuration;

    public ProcessorStateChecker(AxonConfiguration configuration) {
        this.configuration = configuration;
    }

    public void printState(String processorName) {
        EventProcessor processor = configuration.getComponent(EventProcessor.class, processorName);
        System.out.println("Running: " + processor.isRunning());
        System.out.println("Error:   " + processor.isError());
    }
}

Error handling

Errors are inevitable in any application. Depending on where they happen, you may want to respond differently.

By default, exceptions raised by event handlers are handled by the ErrorHandler. The default ErrorHandler used is the PropagatingErrorHandler, which rethrows any exceptions it catches.

In the case of a Streaming Event Processor, this means the processor will go into error mode, releasing any tokens and retrying at an incremental interval (starting at 1 second, up to max 60 seconds). A Subscribing Event Processor will report a publication error to the component that provided the event.

How the Event Processor deals with a rethrown exception differs per implementation. The behaviour for the Subscribing- and the Streaming Event Processor can respectively be found here and here.

We can configure a default ErrorHandler for all Event Processors or an ErrorHandler for specific processors:

  • Configuration API - Default

  • Configuration API - Specific

To register a default ErrorHandler for any Event Processor, you can do the following:

import jakarta.annotation.Nonnull;
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.processing.errorhandling.ErrorContext;
import org.axonframework.messaging.eventhandling.processing.errorhandling.ErrorHandler;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.defaults(
                defaults -> defaults.errorHandler(new CustomErrorHandler())
        ));
    }
}

To register a custom ErrorHandler for a specific (in this case Subscribing) Processor, have a look at the following example:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "my-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                .customized((c, subscribingConfig) -> subscribingConfig.errorHandler(
                                        new CustomErrorHandler()
                                ))
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new MyHandler());
    }
}

For providing a custom solution, the `ErrorHandler’s single method needs to be implemented:

public interface ErrorHandler {

    void handleError(@Nonnull ErrorContext errorContext) throws Exception;
}

Based on the provided ErrorContext object, you can decide to ignore the error, schedule retries, or rethrow the exception.

Dead-letter queue

Dead-Letter Queue not available in Axon Framework 5.0

The Dead-Letter Queue feature is not yet available in Axon Framework 5.0. It will be reintroduced in Axon Framework 5.1. The documentation below describes the Dead-Letter Queue concepts for reference and future use.

Although configuring an Error Handler helps you to deal with exceptions when processing events, event handling will typically still stop entirely. When you only log the error and allow processing to proceed, you will most likely end up with missing data until you fix the predicament and replay past events. If you instead propagate the exception so the event processor keeps retrying, the event processor will stall entirely when the cause is consistent.

Although this behavior is sufficient on many occasions, sometimes it is beneficial if we can unblock event handling by parking the problematic event. This is where the dead-letter queue comes in. It is a mechanism to park events until they can be processed successfully. You can find more information in the Dead-Letter Queue section.

General processor configuration

Alongside handler assignment and error handling, Event Processors allow configuration for other components too. For Subscribing and Streaming Event Processor specific options, their respective sections should be checked. The remainder of this page will cover the generic configuration options for each Event Processor.

Event handler interceptors

Since the Event Processor is the invoker of event handling methods, it is a spot to configure Message Handler Interceptors too. Since Event Processors are dedicated to event handling, the MessageHandlerInterceptor is required to deal with an EventMessage. Differently put, an EventHandlerInterceptor can be registered to Event Processors.

There are several approaches to registering interceptors, each with a different scope.

Default interceptors for all processors

Default handling interceptors can be attached to all Event Processors in your application through the MessagingConfigurer. This is explained in more detail here.

Interceptors for a specific processor instance

Interceptors can be registered for a specific Event Processor instance by name. Use the withInterceptor(MessageHandlerInterceptor<? super EventMessage>) operation as part of customizing a processor:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonConfig {

    public void registerProcessorSpecificInterceptor(MessagingConfigurer configurer) {
        // For SubscribingEventProcessor:
        configurer.eventProcessing()
                  .subscribing()
                  .customize("my-processor", config -> config.withInterceptor(new CustomEventHandlerInterceptor()));
        // For PooledStreamingEventProcessor:
        configurer.eventProcessing()
                  .pooledStreaming()
                  .customize("my-processor", config -> config.withInterceptor(new CustomEventHandlerInterceptor()));
    }
}

Interceptors for specific event handling components

Interceptors can be registered as part of the event handling component configuration itself, using intercepted() on the EventHandlingComponentsConfigurer. Calling intercepted() closes the component registration phase, so all components must be registered before calling it. Multiple intercepted() calls accumulate interceptors in registration order:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "my-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponents)
                                .notCustomized()
        );
    }

    private EventHandlingComponentsConfigurer.CompletePhase configureHandlingComponents(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected("orderHandler", cfg -> new OrderEventHandler())
                                  .autodetected("inventoryHandler", cfg -> new InventoryEventHandler())
                                  .intercepted(cfg -> new AuditLoggingInterceptor())
                                  .intercepted(cfg -> new TenantFilterInterceptor(tenantId));
    }
}

Component registration ends when the first .intercepted(…​) is called. Additional interceptors can still be added with further .intercepted(…​) calls and are applied in registration order.

Use withExceptionHandler() to handle exceptions thrown by event handlers in all components managed by the configurer. Return MessageStream.empty() to suppress the exception and allow event processing to continue. Return MessageStream.failed(error) (with the same or a different exception) to propagate the error to the event processor:

import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;

private EventHandlingComponentsConfigurer.CompletePhase configureHandlingComponents(
        EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
) {
    return componentConfigurer.autodetected("orderHandler", cfg -> new OrderEventHandler())
                              .withExceptionHandler((event, context, error) -> {
                                  log.warn("Handler failed for {}: {}", event.qualifiedName(), error.getMessage());
                                  return MessageStream.empty(); // suppress; use MessageStream.failed(error) to propagate
                              });
}

Annotated interceptors on the handler class

Methods on the handler class itself can be annotated with @EventHandlerInterceptor. These methods are automatically discovered and run before every @EventHandler method on the same component instance, making them ideal for cross-cutting concerns that share the component’s state (such as tenant filtering or component-level auditing).

Two styles are supported:

  • A before-interceptor is a void method with no MessageHandlerInterceptorChain parameter. It runs before the handler, and the chain automatically proceeds when the method returns normally. If the method throws, the handler is not invoked.

  • A surround-interceptor declares a MessageHandlerInterceptorChain parameter and returns a MessageStream. It controls whether the handler is invoked by calling chain.proceed(event, ctx). Returning without calling proceed short-circuits the chain.

import org.axonframework.messaging.eventhandling.EventMessage;
import org.axonframework.messaging.eventhandling.annotation.EventHandler;
import org.axonframework.messaging.eventhandling.annotation.EventHandlerInterceptor;

// Before-interceptor: runs before every @EventHandler on this component
public class AuditingEventHandler {

    private final AuditLog auditLog;

    @EventHandlerInterceptor
    void audit(EventMessage event) {
        auditLog.record(event.qualifiedName());
        // chain proceeds automatically after this returns
    }

    @EventHandler
    void on(OrderPlaced event) { ... }

    @EventHandler
    void on(OrderCancelled event) { ... }
}
import org.axonframework.messaging.core.MessageHandlerInterceptorChain;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.EventMessage;
import org.axonframework.messaging.eventhandling.annotation.EventHandler;
import org.axonframework.messaging.eventhandling.annotation.EventHandlerInterceptor;

// Surround-interceptor: short-circuit events intended for other tenants
public class TenantScopedEventHandler {

    private final TenantId tenantId;

    @EventHandlerInterceptor
    MessageStream<?> filterByTenant(
            EventMessage event,
            MessageHandlerInterceptorChain<EventMessage> chain,
            ProcessingContext ctx
    ) {
        if (!tenantId.equals(event.metaData().get("tenantId"))) {
            return MessageStream.empty(); // skip: not our tenant
        }
        return chain.proceed(event, ctx);
    }

    @EventHandler
    void on(OrderPlaced event) { ... }
}

@ExceptionHandler methods handle exceptions thrown by @EventHandler methods on the same component. When an event handler throws, the framework looks for a matching @ExceptionHandler on the same component before propagating the error. If the method returns normally, the exception is suppressed and event processing continues; if the method throws (or re-throws), the exception propagates.

Use the resultType attribute to narrow the handler to a specific exception type. Without it, the method is eligible for any exception that matches its declared parameter type (or any exception if there are no parameters).

import org.axonframework.messaging.eventhandling.annotation.EventHandler;
import org.axonframework.messaging.core.interception.annotation.ExceptionHandler;

public class MyEventHandler {

    @EventHandler
    void on(OrderPlaced event) {
        // may throw
    }

    @ExceptionHandler
    void onException(RuntimeException ex) {
        log.error("Handler failed: {}", ex.getMessage());
        // return normally to suppress the exception and continue processing
    }

    @ExceptionHandler(resultType = OptimisticLockingFailureException.class)
    void onConcurrencyFailure(OptimisticLockingFailureException ex) {
        // handle a specific exception type; other exceptions are not affected
    }
}

Message monitors

Any Event Processor instance provides the means to contain a Message Monitor. Message Monitors (discussed in more detail here) allow for monitoring the flow of messages throughout an Axon application. For Event Processors, the message monitor deals explicitly with the events flowing through the Event Processor towards the event handling functions.

To register a default MessagMonitor for events, you use the MessageConfigurer#registerEventMonitor operation as shown:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonConfig {

    public void registerEventMonitor(MessagingConfigurer configurer) {
        configurer.registerEventMonitor(config -> new CustomMessageMonitor());
    }
}