Event Processing

The Events generated by the application need to be dispatched to the components that update the query databases, search engines or any other resources that need them: the Event Handlers. It is the responsibility of the Event Bus to dispatch Event Messages to all interested components. On the receiving end, Event Processors are responsible for handling those events, which includes invoking the appropriate Event Handlers.

Publishing Events

In the vast majority of cases, the Aggregates will publish events by applying them. Occasionally, however, it is necessary to publish an event directly to the Event Bus, possibly from within another component. To publish an event, simply wrap the payload describing the event in an EventMessage. The GenericEventMessage.asEventMessage(Object) method allows you to wrap any object into an EventMessage. If the passed object is already an EventMessage, it is simply returned.

Event Bus

The EventBus is the mechanism that dispatches events to the subscribed event handlers. Axon provides two implementations of the Event Bus: SimpleEventBus and EmbeddedEventStore. While both implementations support subscribing and tracking processors (see Event Processors below), the EmbeddedEventStore persists events, which allows you to replay them at a later stage. The SimpleEventBus has volatile storage and 'forgets' events as soon as they have been published to subscribed components.

When using the Configuration API, the SimpleEventBus is used by default. To configure the EmbeddedEventStore instead, you need to supply an implementation of an EventStorageEngine, which does the actual storage of Events.

Configurer configurer = DefaultConfigurer.defaultConfiguration();
configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine());

Event Processors

Event Handlers define the business logic to be performed when an Event is received. Event Processors are the components that take care of the technical aspects of that processing. They start a Unit of Work and possibly a transaction, but also ensure that correlation data can be correctly attached to all messages created during Event processing.

Event Processors come in roughly two forms: Subscribing and Tracking. Subscribing Event Processors subscribe themselves to a source of Events and are invoked by the thread managed by the publishing mechanism. Tracking Event Processors, on the other hand, pull their messages from a source using a thread that they manage themselves.

Assigning handlers to processors

All processors have a name, which identifies a processor instance across JVM instances. Two processors with the same name can be considered two instances of the same processor.

By default, all Event Handlers are attached to a Processor whose name is the package name of the Event Handler's class.

For example, the following classes:

  • org.axonframework.example.eventhandling.MyHandler,

  • org.axonframework.example.eventhandling.MyOtherHandler, and

  • org.axonframework.example.eventhandling.module.MyHandler

will trigger the creation of two Processors:

  • org.axonframework.example.eventhandling with 2 handlers, and

  • org.axonframework.example.eventhandling.module with a single handler

The Configuration API allows you to configure other strategies for assigning classes to processors, or even assign specific instances to specific processors.
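
The default package-based rule described above can be illustrated with a small, framework-free sketch. This is not Axon code: the ProcessorAssignmentSketch class and its methods are hypothetical names used only to show how handler class names would group into processors under that rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: mimics the "processor name = package name" default.
// ProcessorAssignmentSketch is a hypothetical helper, not part of Axon.
class ProcessorAssignmentSketch {

    // Derives a processor name from a fully qualified class name.
    static String processorNameFor(String fullyQualifiedClassName) {
        int lastDot = fullyQualifiedClassName.lastIndexOf('.');
        // Assumption of this sketch: classes without a package map to an empty name.
        return lastDot < 0 ? "" : fullyQualifiedClassName.substring(0, lastDot);
    }

    // Groups handler class names by the processor they would be attached to.
    static Map<String, List<String>> assign(List<String> handlerClassNames) {
        Map<String, List<String>> processors = new TreeMap<>();
        for (String className : handlerClassNames) {
            processors.computeIfAbsent(processorNameFor(className), name -> new ArrayList<>())
                      .add(className);
        }
        return processors;
    }
}
```

Passing the three class names from the example to assign(...) yields two entries, one per package, matching the two Processors listed above.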

Ordering Event Handlers within a single Event Processor

Event Handlers within an Event Processor are invoked in the order in which they were registered (as described in the Registering Event Handlers section). In other words, an Event Processor calls its Event Handlers in their insertion order in the Configuration API.

If Spring is selected as the mechanism to wire everything, the ordering of the Event Handlers can be specified by adding the @Order annotation. This annotation should be placed at class level on your Event Handler class, with an integer value to specify the ordering.

Do note that it is not possible to order Event Handlers which are not a part of the same Event Processor.

Configuring processors

Processors take care of the technical aspects of handling an event, regardless of the business logic triggered by each event. However, the way "regular" (singleton, stateless) event handlers are configured is slightly different from Sagas, as different aspects are important for both types of handlers.

Event Handlers

By default, Axon will use Subscribing Event Processors. It is possible to change how Handlers are assigned and how processors are configured using the EventHandlingConfiguration class of the Configuration API.

The EventHandlingConfiguration class defines a number of methods that can be used to define how processors need to be configured.

  • registerEventProcessorFactory allows you to define a default factory method that creates Event Processors for which no explicit factories have been defined.

  • registerEventProcessor(String name, EventProcessorBuilder builder) defines the factory method to use to create a Processor with given name. Note that such Processor is only created if name is chosen as the processor for any of the available Event Handler beans.

  • registerTrackingProcessor(String name) defines that a processor with given name should be configured as a Tracking Event Processor, using default settings. It is configured with a TransactionManager and a TokenStore, both taken from the main configuration by default.

  • registerTrackingProcessor(String name, Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> sequencingPolicy) defines that a processor with given name should be configured as a Tracking Processor, and use the given TrackingEventProcessorConfiguration to read the configuration settings for multi-threading. The SequencingPolicy defines which expectations the processor has on sequential processing of events. See Parallel Processing below for more details.

  • usingTrackingProcessors() sets the default to Tracking Processors instead of Subscribing ones.

Sagas

Sagas are configured using the SagaConfiguration class. It provides static methods to initialize an instance either for Tracking Processing, or Subscribing.

To configure a Saga to run in subscribing mode, simply do:

SagaConfiguration<MySaga> sagaConfig = SagaConfiguration.subscribingSagaManager(MySaga.class);

If you don't want to use the default EventBus / Store as source for this Saga to get its messages from, you can define another source of messages as well:

SagaConfiguration.subscribingSagaManager(MySaga.class, c -> /* define source here */);

Another variant of the subscribingSagaManager() method allows you to pass a (builder for an) EventProcessingStrategy. By default, Sagas are invoked synchronously. This can be made asynchronous using this method. However, using Tracking Processors is the preferred way for asynchronous invocation.

To configure a Saga to use a Tracking Processor, simply do:

SagaConfiguration.trackingSagaManager(MySaga.class);

This will set the default properties, meaning a single Thread is used to process events. To change this:

SagaConfiguration.trackingSagaManager(MySaga.class)
                 // configure 4 threads
                 .configureTrackingProcessor(c -> TrackingEventProcessorConfiguration.forParallelProcessing(4));

The TrackingEventProcessorConfiguration has a few methods allowing you to specify how many segments will be created and which ThreadFactory should be used to create Processor threads. See Parallel Processing for more details.

If you are using Spring, you can configure your saga like this:

@Saga(configurationBean = "mySagaConfiguration")
public class MySaga {...}
...
// somewhere in configuration
@Bean
public SagaConfiguration<MySaga> mySagaConfiguration() {
    return SagaConfiguration.subscribingSagaManager(MySaga.class);
}

Check out the API documentation (JavaDoc) of the SagaConfiguration class for full details on how to configure event handling for a Saga.

Token Store

Tracking Processors, unlike Subscribing ones, need a Token Store to store their progress in. Each message a Tracking Processor receives through its Event Stream is accompanied by a Token. This Token allows the processor to reopen the Stream at any later point, picking up where it left off with the last Event.

The Configuration API takes the Token Store, as well as most other components that Processors need, from the global Configuration instance. If no TokenStore is explicitly defined, an InMemoryTokenStore is used, which is not recommended in production.

To configure a different Token Store, use Configurer.registerComponent(TokenStore.class, conf -> ... create token store ...)

Note that you can override the TokenStore to use with Tracking Processors in the respective EventHandlingConfiguration or SagaConfiguration that defines that Processor. Where possible, it is recommended to use a Token Store that stores tokens in the same database in which the Event Handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens, guaranteeing exactly-once processing semantics.
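
To make the role of the token concrete, here is a minimal, framework-free sketch of the idea described in this section. It assumes an in-memory list as the event stream and a plain integer position as the token; TokenStoreSketch and its methods are hypothetical and do not reflect Axon's TokenStore interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a token is modelled as the index of the next event to read.
class TokenStoreSketch {

    // Processor name -> stored token (assumption: 0 means "start of the stream").
    private final Map<String, Integer> tokens = new HashMap<>();

    int fetchToken(String processorName) {
        return tokens.getOrDefault(processorName, 0);
    }

    void storeToken(String processorName, int token) {
        tokens.put(processorName, token);
    }

    // Handles at most maxEvents events, resuming from the stored token
    // and storing the advanced token after each handled event.
    static List<String> process(String processorName, List<String> eventStream,
                                TokenStoreSketch tokenStore, int maxEvents) {
        List<String> handled = new ArrayList<>();
        int position = tokenStore.fetchToken(processorName);
        while (position < eventStream.size() && handled.size() < maxEvents) {
            handled.add(eventStream.get(position));
            position++;
            tokenStore.storeToken(processorName, position);
        }
        return handled;
    }
}
```

If a first call stops after two of three events, a later call with the same store and processor name resumes at the third event. With a store that lives only in memory, that position is lost on restart, which is why a persistent Token Store is preferable in production.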

Event Tracker Status

In some cases it might be useful to know the state of a Tracking Event Processor for each of its segments. One such case is when we want to rebuild our view model and need to check whether the Processor has caught up with all the events. For cases like these, the TrackingEventProcessor exposes the processingStatus() method, which returns a map where the key is the segment identifier and the value is the event processing status. Based on this status we can determine whether the Processor is caught up and/or is replaying, and we can verify the Tracking Token of its segments.
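
As a rough sketch of how such a per-segment map could be evaluated, the following framework-free example checks whether a rebuild has finished. The SegmentStatus interface is a hypothetical stand-in for the status value described above, and the rule "every segment is caught up and none is replaying" is an assumption of this sketch, not a documented Axon contract.

```java
import java.util.Map;

// Illustrative only: SegmentStatus is a stand-in for the per-segment status value.
class TrackerStatusSketch {

    interface SegmentStatus {
        boolean isCaughtUp();
        boolean isReplaying();
    }

    // Convenience factory for building stand-in status values.
    static SegmentStatus status(boolean caughtUp, boolean replaying) {
        return new SegmentStatus() {
            @Override public boolean isCaughtUp() { return caughtUp; }
            @Override public boolean isReplaying() { return replaying; }
        };
    }

    // Assumed rule: the rebuild is done when at least one segment is reported
    // and every reported segment is caught up and no longer replaying.
    static boolean rebuildFinished(Map<Integer, ? extends SegmentStatus> statusBySegment) {
        if (statusBySegment.isEmpty()) {
            return false; // nothing reported yet, so nothing can be concluded
        }
        return statusBySegment.values().stream()
                              .allMatch(s -> s.isCaughtUp() && !s.isReplaying());
    }
}
```

In a real application the map would come from the processor's processingStatus() method and be polled until the check returns true.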

Parallel Processing

As of Axon Framework 3.1, Tracking Processors can use multiple threads to process an Event Stream. They do so by claiming a so-called segment, identified by a number. Normally, a single thread will process a single Segment.

The number of Segments used can be defined. When a Processor starts for the first time, it can initialize a number of segments. This number defines the maximum number of threads that can process events simultaneously. Each node running a Tracking Processor will attempt to start its configured number of threads to process these segments.

Event Handlers may have specific expectations on the ordering of events. If this is the case, the processor must ensure these events are sent to these Handlers in that specific order. Axon uses the SequencingPolicy for this. The SequencingPolicy is essentially a function, that returns a value for any given message. If the return value of the SequencingPolicy function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the SequentialPerAggregatePolicy, which makes it so that Events published by the same Aggregate instance will be handled sequentially.

A Saga instance is never invoked concurrently by multiple threads. Therefore, a Sequencing Policy for a Saga is irrelevant. Axon will ensure each Saga instance receives the Events it needs to process in the order they have been published on the Event Bus.

Note

Note that Subscribing Processors don't manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the Command Handling component.

Multi-node processing

For tracking processors, it doesn't matter whether the Threads handling the events are all running on the same node, or on different nodes hosting the same (logical) TrackingProcessor. When two instances of TrackingProcessor, having the same name, are active on different machines, they are considered two instances of the same logical processor. They will 'compete' for segments of the Event Stream. Each instance will 'claim' a segment, preventing events assigned to that segment from being processed on the other nodes.

The TokenStore instance will use the JVM's name (usually a combination of the host name and process ID) as the default nodeId. This can be overridden in TokenStore implementations that support multi-node processing.
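
The 'compete and claim' behaviour can be pictured with a small in-memory sketch. It is purely illustrative: SegmentClaimSketch is a hypothetical class, the node identifiers are made up, and a real multi-node setup needs a shared, persistent store rather than a map inside one JVM.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: segment ownership tracked in a single in-memory map.
class SegmentClaimSketch {

    // Segment number -> identifier of the node that currently owns it.
    private final Map<Integer, String> owners = new ConcurrentHashMap<>();

    // Returns true if the node owns the segment after this call,
    // either because it just claimed it or because it already held it.
    boolean claim(int segment, String nodeId) {
        String currentOwner = owners.putIfAbsent(segment, nodeId);
        return currentOwner == null || currentOwner.equals(nodeId);
    }

    // Releases the segment, but only if the given node is its owner.
    void release(int segment, String nodeId) {
        owners.remove(segment, nodeId);
    }
}
```

With two nodes and two segments, whichever node claims segment 0 first keeps it, and the other node can only process segment 1 until the first claim is released.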

Distributing Events

In some cases, it is necessary to publish events to an external system, such as a message broker.

Spring AMQP

Axon provides out-of-the-box support to transfer Events to and from an AMQP message broker, such as RabbitMQ.

Forwarding events to an AMQP Exchange

The SpringAMQPPublisher forwards events to an AMQP Exchange. It is initialized with a SubscribableMessageSource, which is generally the EventBus or EventStore. Theoretically, this could be any source of Events that the publisher can subscribe to.

To configure the SpringAMQPPublisher, simply define an instance as a Spring Bean. There are a number of setter methods that allow you to specify the behavior you expect, such as transaction support, publisher acknowledgements (if supported by the broker), and the exchange name.

The default exchange name is 'Axon.EventBus'.

Note

Note that exchanges are not automatically created. You must still declare the Queues, Exchanges and Bindings you wish to use. Check the Spring documentation for more information.

Reading Events from an AMQP Queue

Spring has extensive support for reading messages from an AMQP Queue. However, this needs to be 'bridged' to Axon, so that these messages can be handled from Axon as if they are regular Event Messages.

The SpringAMQPMessageSource allows Event Processors to read messages from a Queue, instead of the Event Store or Event Bus. It acts as an adapter between Spring AMQP and the SubscribableMessageSource needed by these processors.

The easiest way to configure the SpringAMQPMessageSource is by defining a bean which overrides the default onMessage method and annotates it with @RabbitListener, as follows:

@Bean
public SpringAMQPMessageSource myMessageSource(Serializer serializer) {
    return new SpringAMQPMessageSource(serializer) {
        @RabbitListener(queues = "myQueue")
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            super.onMessage(message, channel);
        }
    };
}

Spring's @RabbitListener annotation tells Spring that this method needs to be invoked for each message on the given Queue ('myQueue' in the example). This method simply invokes the super.onMessage() method, which performs the actual publication of the Event to all the processors that have been subscribed to it.

To subscribe Processors to this MessageSource, pass the correct SpringAMQPMessageSource instance to the constructor of the Subscribing Processor:

// in an @Configuration file:
@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAMQPMessageSource myMessageSource) {
    ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}

Note that Tracking Processors are not compatible with the SpringAMQPMessageSource.

Asynchronous Event Processing

The recommended approach to handle Events asynchronously is by using a Tracking Event Processor. This implementation can guarantee processing of all events, even in case of a system failure (assuming the Events have been persisted).

However, it is also possible to handle Events asynchronously in a SubscribingEventProcessor. To achieve this, the SubscribingEventProcessor must be configured with an EventProcessingStrategy. This strategy can be used to change how invocations of the Event Listeners should be managed.

The default strategy (DirectEventProcessingStrategy) invokes these handlers in the thread that delivers the Events. This allows processors to use existing transactions.

The other Axon-provided strategy is the AsynchronousEventProcessingStrategy. It uses an Executor to asynchronously invoke the Event Listeners.

Even though the AsynchronousEventProcessingStrategy executes asynchronously, it is still desirable that certain events are processed sequentially. The SequencingPolicy defines whether events must be handled sequentially, in parallel or a combination of both. Policies return a sequence identifier of a given event. If the policy returns an equal identifier for two events, this means that they must be handled sequentially by the event handler. A null sequence identifier means the event may be processed in parallel with any other event.

Axon provides a number of common policies you can use:

  • The FullConcurrencyPolicy will tell Axon that this event handler may handle all events concurrently. This means that there is no relationship between the events that require them to be processed in a particular order.

  • The SequentialPolicy tells Axon that all events must be processed sequentially. Handling of an event will start when the handling of a previous event is finished.

  • SequentialPerAggregatePolicy will force domain events that were raised from the same aggregate to be handled sequentially. However, events from different aggregates may be handled concurrently. This is typically a suitable policy to use for event listeners that update details from aggregates in database tables.

Besides these provided policies, you can define your own. All policies must implement the SequencingPolicy interface. This interface defines a single method, getSequenceIdentifierFor, that returns the sequence identifier for a given event. Events for which an equal sequence identifier is returned must be processed sequentially. Events that produce a different sequence identifier may be processed concurrently. For performance reasons, policy implementations should return null if the event may be processed in parallel to any other event. This is faster, because Axon does not have to check for any restrictions on event processing.
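
The contract described above (equal identifiers mean sequential handling, null means no restriction) can be sketched without the framework as follows. The Event and Policy types are simplified, hypothetical stand-ins and do not match Axon's SequencingPolicy signature; the three constants only imitate the behaviour of the policies listed above.

```java
// Illustrative only: simplified stand-ins for events and sequencing policies.
class SequencingSketch {

    // Hypothetical minimal event: the aggregate it came from plus a payload.
    static final class Event {
        final String aggregateId;
        final String payload;

        Event(String aggregateId, String payload) {
            this.aggregateId = aggregateId;
            this.payload = payload;
        }
    }

    // Maps an event to a sequence identifier, or null for "no restriction".
    interface Policy {
        Object sequenceIdentifierFor(Event event);
    }

    // Every event may run in parallel with any other event.
    static final Policy FULL_CONCURRENCY = event -> null;

    // One shared identifier, so all events fall into the same sequence.
    private static final Object SINGLE_SEQUENCE = new Object();
    static final Policy SEQUENTIAL = event -> SINGLE_SEQUENCE;

    // Events of the same aggregate share an identifier.
    static final Policy SEQUENTIAL_PER_AGGREGATE = event -> event.aggregateId;

    // Two events must be handled one after the other when the policy
    // returns equal, non-null identifiers for them.
    static boolean mustRunSequentially(Policy policy, Event first, Event second) {
        Object a = policy.sequenceIdentifierFor(first);
        Object b = policy.sequenceIdentifierFor(second);
        return a != null && a.equals(b);
    }
}
```

A custom policy in this sketch is just another lambda, for example one that returns a customer identifier taken from the payload so that all events for one customer are handled in order.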

It is recommended to explicitly define an ErrorHandler when using the AsynchronousEventProcessingStrategy. The default ErrorHandler propagates exceptions, but in an asynchronous execution, there is nothing to propagate to, other than the Executor. This may result in Events not being processed. Instead, it is recommended to use an ErrorHandler that reports errors and allows processing to continue. The ErrorHandler is configured on the constructor of the SubscribingEventProcessor, where the EventProcessingStrategy is also provided.
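
The reasoning behind this recommendation can be shown with plain java.util.concurrent, independent of Axon. In the sketch below, the dispatch method and its Consumer-based handler are hypothetical; the point is only that a failure inside an Executor task has no caller to propagate to, so the sketch reports it and lets the remaining events be handled.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: report-and-continue error handling for asynchronous dispatch.
class AsyncErrorHandlingSketch {

    // Hands each event to the handler on a separate thread and returns
    // a description of every failure instead of letting it vanish.
    static List<String> dispatch(List<String> events, Consumer<String> handler) {
        List<String> reportedErrors = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (String event : events) {
            executor.execute(() -> {
                try {
                    handler.accept(event);
                } catch (RuntimeException failure) {
                    // Report the failure and carry on with the next event.
                    reportedErrors.add(event + ": " + failure.getMessage());
                }
            });
        }
        executor.shutdown();
        try {
            // Wait for the queued events to be handled before returning.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        return reportedErrors;
    }
}
```

A handler that fails on one event still sees the events queued after it, and the returned list tells the caller which event failed and why.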

Replaying events

When you want to rebuild projections (view models), replaying past events comes in handy. The idea is to start from the beginning of time and invoke all event handlers anew. The TrackingEventProcessor supports replaying of events. To trigger a replay, invoke the resetTokens() method on it. Note that the Tracking Event Processor must not be active when a reset starts. Hence it is wise to shut it down first, then reset it and, once the reset has succeeded, start it up again. It is possible to define a @ResetHandler, so you can do some preparation prior to resetting.

Let's take a look at how we can accomplish replaying. First, we'll see a simple projection class:

@ProcessingGroup("projections")
public class MyProjection {
    ...
    @EventHandler
    public void on(MyEvent event, ReplayStatus replayStatus) { // we can wire a ReplayStatus here so we can see whether this
                                                             // event is delivered to our handler as a 'REGULAR' event or
                                                             // 'REPLAY' event
        // do event handling
    }

    @AllowReplay(false) // it is possible to prevent some handlers from being replayed
    @EventHandler
    public void on(MyOtherEvent event) {
        // perform some side effect introducing functionality, like sending an e-mail, which we do not want to be replayed
    }    

    @ResetHandler
    public void onReset() { // will be called before replay starts
        // do pre-reset logic, like clearing out the Projection table for a clean slate
    }
    ...
}

And now, we can reset our TrackingEventProcessor:

configuration.eventProcessingConfiguration()
             .eventProcessorByProcessingGroup("projections", TrackingEventProcessor.class)
             .ifPresent(trackingEventProcessor -> {
                 trackingEventProcessor.shutDown();
                 trackingEventProcessor.resetTokens();
                 trackingEventProcessor.start();
             });

Event Interceptors

As with Command Messages, Event Messages can be intercepted prior to publishing and handling, to perform additional actions on all Events. This boils down to the same two types of interceptors for messages: the Dispatch Interceptor and the Handler Interceptor.

Dispatch Interceptors are invoked before an Event (Message) is published on the Event Bus. Handler Interceptors, on the other hand, are invoked just before the Event Handler is invoked with a given Event (Message) in the Event Processor. Examples of operations performed in an interceptor are logging or authentication, which you might want to do regardless of the type of Event.

Dispatch Interceptors

Any Message Dispatch Interceptors registered to an Event Bus will be invoked when an Event is published. They have the ability to alter the Event Message, by adding Meta Data for example, or they can provide you with overall logging capabilities for when an Event is published. These interceptors are always invoked on the thread that published the Event.

Let's create an Event Message Dispatch Interceptor which logs each Event message being published on an EventBus.

public class EventLoggingDispatchInterceptor implements MessageDispatchInterceptor<EventMessage<?>> {

    private static final Logger logger = LoggerFactory.getLogger(EventLoggingDispatchInterceptor.class);

    @Override
    public BiFunction<Integer, EventMessage<?>, EventMessage<?>> handle(List<? extends EventMessage<?>> messages) {
        return (index, event) -> {
            logger.info("Publishing event: [{}].", event);
            return event;
        };
    }
}

We can then register this dispatch interceptor with an EventBus by doing the following:

public class EventBusConfiguration {

    public EventBus configureEventBus(EventStorageEngine eventStorageEngine) {
        // Note that an EventStore is a more specific implementation of an EventBus
        EventBus eventBus = new EmbeddedEventStore(eventStorageEngine);
        eventBus.registerDispatchInterceptor(new EventLoggingDispatchInterceptor());
        return eventBus;
    }
}

Handler Interceptors

Message Handler Interceptors can take action both before and after Event processing. Interceptors can even block Event processing altogether, for example for security reasons.

Interceptors must implement the MessageHandlerInterceptor interface. This interface declares one method, handle, that takes two parameters: the current UnitOfWork and an InterceptorChain. The InterceptorChain is used to continue the dispatching process, whereas the UnitOfWork (1) gives you the message being handled and (2) lets you tie in logic prior to, during or after (event) message handling (see UnitOfWork for more information about the phases).

Unlike Dispatch Interceptors, Handler Interceptors are invoked in the context of the Event Handler. That means they can attach correlation data based on the Message being handled to the Unit of Work, for example. This correlation data will then be attached to Event Messages being created in the context of that Unit of Work.

Let's create a Message Handler Interceptor which will only allow the handling of Events that contain axonUser as the value of the userId field in the MetaData. If the userId is not present in the MetaData, an exception will be thrown, which prevents the Event from being handled. If the userId's value does not match axonUser, we will not proceed up the chain either. Authenticating the Event Message as shown in this example is a common use case of the MessageHandlerInterceptor.

public class MyEventHandlerInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {

    @Override
    public Object handle(UnitOfWork<? extends EventMessage<?>> unitOfWork, InterceptorChain interceptorChain) throws Exception {
        EventMessage<?> event = unitOfWork.getMessage();
        String userId = Optional.ofNullable(event.getMetaData().get("userId"))
                                .map(uId -> (String) uId)
                                .orElseThrow(IllegalEventException::new);
        if ("axonUser".equals(userId)) {
            return interceptorChain.proceed();
        }
        return null;
    }
}

We can register the handler interceptor with an EventProcessor like so:

public class EventProcessorConfiguration {

    public EventProcessingConfiguration eventProcessingConfiguration() {
        return new EventProcessingConfiguration()
                .registerTrackingEventProcessor("my-tracking-processor")
                .registerHandlerInterceptor("my-tracking-processor", configuration -> new MyEventHandlerInterceptor());
    }
}

Note

Unlike the CommandBus and QueryBus, which can both have Handler and Dispatch Interceptors, the EventBus can only have Dispatch Interceptors registered. This is because the sole purpose of the Event Bus is publishing Events, which makes it the place that is in control of Event Message dispatching. The EventProcessors are in charge of handling the Event Messages, and are therefore the place where Handler Interceptors are registered.
