Streaming Event Processors
The StreamingEventProcessor, or Streaming Processor for short, is a type of Event Processor. Like any Event Processor, it takes care of the technical aspects of handling events by invoking the event handlers written in an Axon application.
The Streaming Processor defines itself by receiving its events from a StreamableMessageSource. The StreamableMessageSource is an infrastructure component through which we can open a stream of events. The source can also specify positions on the event stream, so-called Tracking Tokens, used as start positions when opening an event stream. An example of a StreamableMessageSource is the EventStore, backed by, for example, Axon Server or an RDBMS.
Furthermore, Streaming Processors use separate threads to process the events retrieved from the StreamableMessageSource. Using separate threads decouples the StreamingEventProcessor from other operations (e.g., event publication or command handling), allowing for cleaner separation within any application. It also allows for parallelization of the event load, either within a single JVM or across several.
When starting a Streaming Processor, it will open an event stream through the configured StreamableMessageSource. The first time a stream is started, it will, by default, begin at the tail of the stream (the oldest, very first token). It keeps track of the event processing progress while traversing the stream. It does so by storing the Tracking Tokens, or tokens for short, accompanying the events. This works for tracking progress because each token specifies its event's position on the stream.
Head or Tail?
The oldest (very first) token is located at the tail of the stream, and the latest (newest) token is positioned at the head of the stream.
Maintaining the progress through tokens makes a Streaming Processor (1) able to deal with stopping and starting the processor, (2) more resilient against unintended shutdowns, and (3) able to replay events by adjusting the position of its tokens.
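To illustrate how a stored position provides these three abilities, consider the following minimal sketch. It is not Axon Framework code: the TokenProgressSketch class and its PositionStore are hypothetical stand-ins, and a plain list index plays the role of the token.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: none of these types belong to Axon Framework.
final class TokenProgressSketch {

    /** Hypothetical stand-in for a token store: remembers the last handled position. */
    static final class PositionStore {
        private long lastHandled = -1; // -1 means "nothing handled yet", so start at the tail

        long lastHandled() {
            return lastHandled;
        }

        void save(long position) {
            lastHandled = position;
        }
    }

    /** Handles at most maxEvents events, starting right after the stored position. */
    static List<String> run(List<String> stream, PositionStore store, int maxEvents) {
        List<String> handled = new ArrayList<>();
        long next = store.lastHandled() + 1;
        while (next < stream.size() && handled.size() < maxEvents) {
            handled.add(stream.get((int) next));
            store.save(next); // persist the progress after handling the event
            next++;
        }
        return handled;
    }

    /** Replay: move the stored position back to before the first event. */
    static void reset(PositionStore store) {
        store.save(-1);
    }

    public static void main(String[] args) {
        List<String> stream = List.of("e0", "e1", "e2", "e3");
        PositionStore store = new PositionStore();
        System.out.println(run(stream, store, 2));  // first run, stopped after two events
        System.out.println(run(stream, store, 10)); // a restart resumes after the stored position
        reset(store);
        System.out.println(run(stream, store, 10)); // a replay starts from the tail again
    }
}
```

The second run only sees the events the first run did not handle, and the run after the reset sees all of them again.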
All combined, the Streaming Processor allows for decoupling, parallelization, resiliency, and replay-ability. It is these features that make the Streaming Processor the logical choice for the majority of applications. Due to this, the "Tracking Event Processor," a type of Streaming Processor, is the default Event Processor.
Default Event Processor
Which EventProcessor type becomes the default processor depends on the event message source available in your application. In the majority of use cases, an Event Store is present. As the Event Store is a type of StreamableMessageSource, the default will switch to the Tracking Event Processor.
If the application only has an Event Bus configured, the framework will lack a StreamableMessageSource. It will fall back to the Subscribing Event Processor as the default in these scenarios. This implementation will use the configured EventBus as its SubscribableMessageSource.
There are two implementations of Streaming Processor available in Axon Framework:
    1. the Tracking Event Processor (TEP for short), and
    2. the Pooled Streaming Event Processor (PSEP for short).
Both implementations support the same set of operations, such as replaying events through a reset, parallelism, and tracking the progress with tokens. They diverge on their threading approach and work separation, as discussed in more detail in this section.

Configuring

The Streaming Processors have several additional components that you can configure, next to the base options. For other streaming processor features that are configurable, we refer to their respective sections for more details. This chapter will cover how to configure a Tracking or Pooled Streaming Processor respectively.

Configuring a Tracking Processor

Firstly, to specify that new event processors should default to a TrackingEventProcessor, you can invoke the usingTrackingEventProcessors method:
Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
            processingConfigurer.usingTrackingEventProcessors();
        }
    }
Spring Boot AutoConfiguration
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
            processingConfigurer.usingTrackingEventProcessors();
        }
    }
For a specific Event Processor to be a Tracking instance, registerTrackingEventProcessor is used:
Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureTrackingProcessors(EventProcessingConfigurer processingConfigurer) {
            // This configuration object allows for fine-grained control over the Tracking Processor
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing();

            // To configure a processor to be tracking ...
            processingConfigurer.registerTrackingEventProcessor("my-processor")
                                // ... to define a specific StreamableMessageSource ...
                                .registerTrackingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */
                                )
                                // ... to provide additional configuration ...
                                .registerTrackingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */,
                                        conf -> tepConfig
                                );
        }
    }
Spring Boot AutoConfiguration - Java
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureTrackingProcessors(EventProcessingConfigurer processingConfigurer) {
            // This configuration object allows for fine-grained control over the Tracking Processor
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing();

            // To configure a processor to be tracking ...
            processingConfigurer.registerTrackingEventProcessor("my-processor")
                                // ... to define a specific StreamableMessageSource ...
                                .registerTrackingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */
                                )
                                // ... to provide additional configuration ...
                                .registerTrackingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */,
                                        conf -> tepConfig
                                );
        }
    }
Spring Boot AutoConfiguration - Properties file
A properties file allows the configuration of some fields on an Event Processor. Do note that the Java configuration provides more degrees of freedom.
    axon.eventhandling.processors.my-processor.mode=tracking
    axon.eventhandling.processors.my-processor.source=eventStore
If the name of an event processor contains periods ., use the map notation:
    axon.eventhandling.processors[my.processor].mode=tracking
    axon.eventhandling.processors[my.processor].source=eventStore
For more fine-grained control when configuring a Tracking Processor, the TrackingEventProcessorConfiguration can be used. When invoking the registerTrackingEventProcessor method, you can provide a tracking processor configuration object, or you can register the configuration instance explicitly:
Axon Configuration API
    public class AxonConfig {
        // ...
        public void registerTrackingProcessorConfig(EventProcessingConfigurer processingConfigurer) {
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing();

            // To register a default tracking config ...
            processingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig)
                                // ... to register a config for a specific processor.
                                .registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
        }
    }
Spring Boot AutoConfiguration - Java
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void registerTrackingProcessorConfig(EventProcessingConfigurer processingConfigurer) {
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing();

            // To register a default tracking config ...
            processingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig)
                                // ... to register a config for a specific processor.
                                .registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
        }
    }

Configuring a Pooled Streaming Processor

Firstly, to specify that every new processor should default to a PooledStreamingEventProcessor, you can invoke the usingPooledStreamingEventProcessors method:
Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
            processingConfigurer.usingPooledStreamingEventProcessors();
        }
    }
Spring Boot AutoConfiguration
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
            processingConfigurer.usingPooledStreamingEventProcessors();
        }
    }
For a specific Event Processor to be a Pooled Streaming instance, registerPooledStreamingEventProcessor is used:
Axon Configuration API
    public class AxonConfig {
        // ...
        public void configurePooledStreamingProcessors(EventProcessingConfigurer processingConfigurer) {
            // This configuration object allows for fine-grained control over the Pooled Streaming Processor
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder/* ... */;

            // To configure a processor to be pooled streaming ...
            processingConfigurer.registerPooledStreamingEventProcessor("my-processor")
                                // ... to define a specific StreamableMessageSource ...
                                .registerPooledStreamingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */
                                )
                                // ... to provide additional configuration ...
                                .registerPooledStreamingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */, psepConfig
                                );
        }
    }
Spring Boot AutoConfiguration - Java
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configurePooledStreamingProcessors(EventProcessingConfigurer processingConfigurer) {
            // This configuration object allows for fine-grained control over the Pooled Streaming Processor
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder/* ... */;

            // To configure a processor to be pooled streaming ...
            processingConfigurer.registerPooledStreamingEventProcessor("my-processor")
                                // ... to define a specific StreamableMessageSource ...
                                .registerPooledStreamingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */
                                )
                                // ... to provide additional configuration ...
                                .registerPooledStreamingEventProcessor(
                                        "my-processor", conf -> /* create/return StreamableMessageSource */, psepConfig
                                );
        }
    }
Spring Boot AutoConfiguration - Properties file
A properties file allows the configuration of some fields on an Event Processor. Do note that the Java configuration provides more degrees of freedom.
    axon.eventhandling.processors.my-processor.mode=pooled
    axon.eventhandling.processors.my-processor.source=eventStore
If the name of an event processor contains periods ., use the map notation:
    axon.eventhandling.processors[my.processor].mode=pooled
    axon.eventhandling.processors[my.processor].source=eventStore
For more fine-grained control when configuring a Pooled Streaming Processor, the PooledStreamingProcessorConfiguration can be used. When invoking the registerPooledStreamingEventProcessor method, you can provide a pooled streaming processor configuration object, or you can register the configuration instance explicitly:
Axon Configuration API
    public class AxonConfig {
        // ...
        public void registerPooledStreamingProcessorConfig(EventProcessingConfigurer processingConfigurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder/* ... */;

            // To register a default pooled streaming config ...
            processingConfigurer.registerPooledStreamingEventProcessorConfiguration(psepConfig)
                                // ... to register a config for a specific processor.
                                .registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
        }
    }
Spring Boot AutoConfiguration - Java
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void registerPooledStreamingProcessorConfig(EventProcessingConfigurer processingConfigurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder/* ... */;

            // To register a default pooled streaming config ...
            processingConfigurer.registerPooledStreamingEventProcessorConfiguration(psepConfig)
                                // ... to register a config for a specific processor.
                                .registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
        }
    }

Error Mode

The error mode differs between the Tracking and the Pooled Streaming Event Processor.
Whenever the error handler rethrows an exception, a TrackingEventProcessor will retry processing the event using an incremental back-off period. It will start at 1 second and double after each attempt until it reaches the maximum wait time of 60 seconds per attempt. This back-off time ensures that in a distributed environment, when another node is able to process events, it will have the opportunity to claim the token required to process the event.
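The back-off schedule described above can be sketched as follows. This is a minimal illustration of the doubling-with-cap arithmetic only, not the TrackingEventProcessor's actual implementation; the BackOffSketch class and its constants are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the described schedule, not Axon's internal code.
final class BackOffSketch {

    static final long INITIAL_SECONDS = 1;
    static final long MAX_SECONDS = 60;

    /** Returns the wait time, in seconds, before each of the first 'attempts' retries. */
    static List<Long> waitTimes(int attempts) {
        List<Long> result = new ArrayList<>();
        long wait = INITIAL_SECONDS;
        for (int i = 0; i < attempts; i++) {
            result.add(wait);
            // Double the wait time after each attempt, capped at the maximum
            wait = Math.min(wait * 2, MAX_SECONDS);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(waitTimes(8)); // prints [1, 2, 4, 8, 16, 32, 60, 60]
    }
}
```

With these values the cap is reached on the seventh retry, after which every further attempt waits the full 60 seconds.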
The PooledStreamingEventProcessor simply aborts the failed part of the process. The Pooled Streaming Processor can deal with this since the threading mode is different from the Tracking Processor. As such, the chance is high the failed process will be picked up quickly by another thread within the same JVM. This chance increases further whenever the PSEP instance is distributed over several application instances.

Tracking Tokens

A vital attribute of the Streaming Event Processor is its capability to keep and maintain the processing progress. It does so through the TrackingToken, the "token" for short. Such a token accompanies each message a streaming processor receives through its event stream. It's this token that:
    1. specifies the position of the event on the overall stream, and
    2. is used by the Streaming Processor to open the event stream at the desired position on start-up.
Using tokens gives the Streaming Event Processor several benefits, like:
    Being able to reopen the stream at any later point, picking up where it left off with the last event.
    Dealing with unintended shutdowns without losing track of the last events they've handled.
    Collaboration over the event handling load from two perspectives: first, the tokens make sure only a single thread is actively processing specific events; second, they allow parallelization of the load over several threads or nodes of a Streaming Processor.
    Replaying events by adjusting the token position of that processor.
To be able to reopen the stream at a later point, we should keep the progress somewhere. The progress is kept by updating and saving the TrackingToken after handling batches of events. Keeping the progress requires CRUD operations, for which the Streaming Processor uses the TokenStore.
For a Streaming Processor to process any events, it needs "a claim" on a TrackingToken. The processor will update this claim every time it has finished handling a batch of events. This so-called "claim extension" is, just like the updating and saving of tokens, delegated to the Token Store. Hence, Streaming Processors achieve collaboration among instances/threads through token claims.
In the absence of a claim, a processor will actively try to retrieve one. If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim. Token stealing can, for example, happen if event processing is slow or has encountered exceptions.
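The claim, extend, and steal cycle can be sketched as below. This is a simplified model under stated assumptions, not the TokenStore's real logic: the TokenClaimSketch class, its claimTimeout field, and the owner names are all hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: a single token whose claim expires when it is not extended in time.
final class TokenClaimSketch {

    private final Duration claimTimeout;
    private String owner;         // null while the token is unclaimed
    private Instant lastExtended; // moment of the last successful claim or extension

    TokenClaimSketch(Duration claimTimeout) {
        this.claimTimeout = claimTimeout;
    }

    /** Claims the token if it is free, already owned by the candidate, or the claim has expired. */
    boolean tryClaim(String candidate, Instant now) {
        boolean free = owner == null;
        boolean mine = candidate.equals(owner);
        boolean expired = !free && lastExtended.plus(claimTimeout).isBefore(now);
        if (free || mine || expired) {
            owner = candidate;
            lastExtended = now; // claiming and extending both refresh the timestamp
            return true;
        }
        return false;
    }

    String owner() {
        return owner;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-12-01T10:00:00Z");
        TokenClaimSketch token = new TokenClaimSketch(Duration.ofSeconds(10));
        System.out.println(token.tryClaim("node-A", t0));                 // true: token was free
        System.out.println(token.tryClaim("node-B", t0.plusSeconds(5)));  // false: claim still valid
        System.out.println(token.tryClaim("node-A", t0.plusSeconds(8)));  // true: claim extension
        System.out.println(token.tryClaim("node-B", t0.plusSeconds(19))); // true: claim expired, stolen
    }
}
```

In this model a slow owner loses its claim simply by failing to call tryClaim again within the timeout, which is the situation the text describes as token stealing.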

Initial Tracking Token

The Streaming Processor uses a StreamableMessageSource to retrieve a stream of events that will open on start-up. It requires a TrackingToken to open this stream, which it will fetch from the TokenStore. However, if a Streaming Processor starts for the first time, there is no TrackingToken present to open the stream with yet.
Whenever this situation occurs, a Streaming Processor will construct an "initial token." By default, the initial token will start at the tail of the event stream. Thus, the processor will begin at the start and handle every event present in the message source. This start position is configurable, as described in the Token Configuration section.
A Saga's Streaming Processor initial position
A Streaming Processor dedicated to a Saga will default the initial token to the head of the stream. The default initial token position ensures that the Saga does not react to events from the past, as in most cases, this would introduce unwanted side effects.
Conceptually there are a couple of scenarios when a processor builds an initial token on application startup. The obvious one is already shared, namely when a processor starts for the first time. There are, however, also other situations when a token is built that might be unexpected, like:
    The TokenStore has (accidentally) been cleared between application runs, thus losing the stored tokens.
    The application running the processor starts in a new environment (e.g., test or acceptance) for the first time.
    An InMemoryTokenStore was used, and hence the processor could never persist the token to begin with.
    The application is (accidentally) pointing to another storage solution than expected.
Whenever a Streaming Processor's event handlers show unexpected behavior in the form of missed or reprocessed events, a new initial token might have been created. In those cases, we recommend validating whether any of the above situations occurred.

Token Configuration

There are a couple of things we can configure when it comes to tokens. We can separate these options into "initial token" and "token claim" configuration, as described in the following sections:
Initial Token
The initial token for a StreamingEventProcessor is configurable for every processor instance. When configuring the initial token builder function, the received input parameter is the StreamableMessageSource. The message source, in turn, gives three possibilities to build a token, namely:
    1. createHeadToken() - Creates a token from the head of the event stream.
    2. createTailToken() - Creates a token from the tail of the event stream. Creating tail tokens is the default value for most Streaming Processors.
    3. createTokenAt(Instant) / createTokenSince(Duration) - Creates a token that tracks all events after a given time. If there is an event precisely at that given moment in time, it will also be taken into account.
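The difference between these start positions can be sketched with plain list indexes. This is an illustration only: the InitialPositionSketch class is hypothetical, positions are indexes rather than TrackingTokens, the events are assumed to be ordered by timestamp, and treating "since" as "at (now minus duration)" is an assumption of this sketch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Illustrative only: positions are list indexes, not Axon TrackingTokens.
final class InitialPositionSketch {

    /** Tail: the very first event on the stream. */
    static int tail(List<Instant> timestamps) {
        return 0;
    }

    /** Head: just past the newest event, so only future events are handled. */
    static int head(List<Instant> timestamps) {
        return timestamps.size();
    }

    /** First position whose timestamp is at or after the given moment. */
    static int at(List<Instant> timestamps, Instant moment) {
        for (int i = 0; i < timestamps.size(); i++) {
            if (!timestamps.get(i).isBefore(moment)) {
                return i;
            }
        }
        return timestamps.size();
    }

    /** Assumption of this sketch: "since" means "at (now - duration)". */
    static int since(List<Instant> timestamps, Duration duration, Instant now) {
        return at(timestamps, now.minus(duration));
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-12-01T10:00:00Z");
        List<Instant> timestamps = List.of(t0, t0.plusSeconds(60), t0.plusSeconds(120));
        System.out.println(tail(timestamps));                    // 0
        System.out.println(head(timestamps));                    // 3
        System.out.println(at(timestamps, t0.plusSeconds(60)));  // 1: the event exactly at that moment is included
        System.out.println(since(timestamps, Duration.ofSeconds(90), t0.plusSeconds(120))); // 1
    }
}
```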
Of course, you can completely disregard the StreamableMessageSource input parameter and create a token by yourself. Consider the following snippets if you want to configure a different initial token:
Tracking Processor - Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureInitialTrackingToken(EventProcessingConfigurer processingConfigurer) {
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                       .andInitialTrackingToken(StreamableMessageSource::createHeadToken);

            processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
        }
    }
Tracking Processor - Spring Boot AutoConfiguration
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureInitialTrackingToken(EventProcessingConfigurer processingConfigurer) {
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                       .andInitialTrackingToken(StreamableMessageSource::createTailToken);

            processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
        }
    }
Pooled Streaming Processor - Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureInitialTrackingToken(EventProcessingConfigurer processingConfigurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder.initialToken(
                            messageSource -> messageSource.createTokenAt(Instant.parse("2020-12-01T10:15:30.00Z"))
                    );

            processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
        }
    }
Pooled Streaming Processor - Spring Boot AutoConfiguration
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureInitialTrackingToken(EventProcessingConfigurer processingConfigurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder.initialToken(
                            messageSource -> messageSource.createTokenSince(Duration.ofDays(31))
                    );

            processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
        }
    }
Token Claims
As described in the Tracking Tokens section, a streaming processor should claim a token before it is allowed to perform any processing work. There are several scenarios where a processor may keep the claim for too long. This can occur when, for example, the event handling process is slow or has encountered an exception.
In those scenarios, another processor can steal a token claim to proceed with processing. There are a couple of configurable values that influence this process:
    tokenClaimInterval - Defines how long to wait between attempts to claim a segment.
    A processor uses this value to steal token claims from other processor threads. This value defaults to 5000 milliseconds.
    eventAvailabilityTimeout - Defines the time to "wait for events" before extending the claim.
    Only the Tracking Event Processor uses this. The value defaults to 1000 milliseconds.
    claimExtensionThreshold - Threshold to extend the claim in the absence of events.
    Only the Pooled Streaming Event Processor uses this. The value defaults to 5000 milliseconds.
Consider the following snippets if you want to configure any of these values:
Tracking Processor - Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureTokenClaimValues(EventProcessingConfigurer processingConfigurer) {
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                       .andTokenClaimInterval(1000, TimeUnit.MILLISECONDS)
                                                       .andEventAvailabilityTimeout(2000, TimeUnit.MILLISECONDS);

            processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
        }
    }
Tracking Processor - Spring Boot AutoConfiguration
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureTokenClaimValues(EventProcessingConfigurer processingConfigurer) {
            TrackingEventProcessorConfiguration tepConfig =
                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                       .andTokenClaimInterval(1000, TimeUnit.MILLISECONDS)
                                                       .andEventAvailabilityTimeout(2000, TimeUnit.MILLISECONDS);

            processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
        }
    }
Pooled Streaming Processor - Axon Configuration API
    public class AxonConfig {
        // ...
        public void configureTokenClaimValues(EventProcessingConfigurer processingConfigurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder.tokenClaimInterval(2000)
                                                .claimExtensionThreshold(3000);

            processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
        }
    }
Pooled Streaming Processor - Spring Boot AutoConfiguration
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureTokenClaimValues(EventProcessingConfigurer processingConfigurer) {
            EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                    (config, builder) -> builder.tokenClaimInterval(2000)
                                                .claimExtensionThreshold(3000);

            processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
        }
    }

Token Store

The TokenStore provides the CRUD operations for the StreamingEventProcessor to interact with TrackingTokens. The streaming processor will use the store to construct, fetch and claim tokens.
When no token store is explicitly defined, an InMemoryTokenStore is used. The InMemoryTokenStore is not recommended in most production scenarios since it cannot maintain the progress through application shutdowns. Unintentionally using the InMemoryTokenStore is one of the unexpected scenarios in which the framework creates an initial token on each application start-up.
The framework provides a couple of TokenStore implementations:
    InMemoryTokenStore - A TokenStore implementation that keeps the tokens in memory.
    This implementation does not suffice as a production-ready store in most applications.
    JpaTokenStore - A TokenStore implementation using JPA to store the tokens with.
    Expects that a table is constructed based on the org.axonframework.eventhandling.tokenstore.jpa.TokenEntry.
    It is easily auto-configurable with, for example, Spring Boot.
    JdbcTokenStore - A TokenStore implementation using JDBC to store the tokens with.
    Expects that the schema is constructed through the JdbcTokenStore#createSchema(TokenTableFactory) method.
    Several TokenTableFactory implementations can be chosen here, like the GenericTokenTableFactory, PostgresTokenTableFactory or Oracle11TokenTableFactory.
    MongoTokenStore - A TokenStore implementation using Mongo to store the tokens with.
Where to store Tokens?
Where possible, we recommend using a token store that stores tokens in the same database in which the event handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens. Furthermore, this atomicity guarantees exactly-once processing semantics.
Note that you can configure the token store to use for a streaming processor in the EventProcessingConfigurer:
Axon Configuration API
To configure a TokenStore for all processors:
    public class AxonConfig {
        // ...
        public void registerTokenStore(EventProcessingConfigurer processingConfigurer) {
            TokenStore tokenStore = JpaTokenStore.builder()
                                                 // …
                                                 .build();

            processingConfigurer.registerTokenStore(config -> tokenStore);
        }
    }
Alternatively, to configure a TokenStore for a specific processor, use:
    public class AxonConfig {
        // ...
        public void registerTokenStore(EventProcessingConfigurer processingConfigurer, String processorName) {
            TokenStore tokenStore = JdbcTokenStore.builder()
                                                  // …
                                                  .build();

            processingConfigurer.registerTokenStore(processorName, config -> tokenStore);
        }
    }
Spring Boot AutoConfiguration
The default TokenStore implementation is defined based on dependencies available in Spring Boot, in the following order:
    1. If any TokenStore bean is defined, that bean is used.
    2. Otherwise, if an EntityManager is available, the JpaTokenStore is defined.
    3. Otherwise, if a DataSource is defined, the JdbcTokenStore is created.
    4. Lastly, the InMemoryTokenStore is used.
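The precedence in this list can be read as a simple chain of checks. The sketch below only restates that order; it is not the Spring Boot auto-configuration itself, and the TokenStoreDefaultSketch class with its boolean flags is a hypothetical simplification.

```java
// Illustrative only: restates the documented precedence, not Axon's auto-configuration code.
final class TokenStoreDefaultSketch {

    /** Returns the name of the TokenStore that would be chosen for the given context. */
    static String select(boolean tokenStoreBeanDefined, boolean entityManagerAvailable, boolean dataSourceDefined) {
        if (tokenStoreBeanDefined) {
            return "user-defined TokenStore bean";
        }
        if (entityManagerAvailable) {
            return "JpaTokenStore";
        }
        if (dataSourceDefined) {
            return "JdbcTokenStore";
        }
        return "InMemoryTokenStore";
    }

    public static void main(String[] args) {
        System.out.println(select(false, true, true));   // JpaTokenStore: JPA wins over JDBC
        System.out.println(select(false, false, false)); // InMemoryTokenStore: nothing else available
    }
}
```

The last case is the one to watch for: an application without any persistence dependency silently ends up with the in-memory store.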
To override the TokenStore, either define a bean in a Spring @Configuration class:
    @Configuration
    public class AxonConfig {
        // ...
        @Bean
        public TokenStore myTokenStore() {
            return JpaTokenStore.builder()
                                // …
                                .build();
        }
    }
Alternatively, inject the EventProcessingConfigurer, which allows more fine-grained customization:
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void registerTokenStore(EventProcessingConfigurer processingConfigurer) {
            TokenStore tokenStore = JdbcTokenStore.builder()
                                                  // …
                                                  .build();

            processingConfigurer.registerTokenStore(conf -> tokenStore)
                                // or, to define one for a specific processor:
                                .registerTokenStore("my-processor", conf -> tokenStore);
        }
    }

Parallel Processing

Streaming processors can use multiple threads to process an event stream. Using multiple threads allows the StreamingEventProcessor to more efficiently process batches of events. As described here, a streaming processor's thread requires a claim on a tracking token to process events.
Thus, to be able to parallelize the load, we require several tokens per processor. To that end, each token instance represents a segment of the event stream, wherein each segment is identified through a number. The stream segmentation approach ensures events aren't handled twice (or more), as that would otherwise introduce unintentional duplication. Due to this, the Streaming Processor's API references segment claims instead of token claims throughout.
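One way to picture how segmentation keeps an event from being handled twice is the sketch below. It assumes a bit-mask over the hash of a sequence identifier and a power-of-two segment count; the SegmentSketch class is hypothetical, and the exact matching rules inside Axon Framework may differ in detail.

```java
import java.util.Objects;

// Illustrative only: assumes a power-of-two segment count and a hash-mask scheme.
final class SegmentSketch {

    /** Returns the single segment (0 .. segmentCount - 1) responsible for the given identifier. */
    static int segmentFor(Object sequenceIdentifier, int segmentCount) {
        int mask = segmentCount - 1; // e.g. 4 segments give the mask 0b11
        return Objects.hashCode(sequenceIdentifier) & mask;
    }

    public static void main(String[] args) {
        int segmentCount = 4;
        // Integer identifiers hash to their own value, which keeps the example predictable
        for (int aggregateId = 0; aggregateId < 6; aggregateId++) {
            System.out.println("aggregate " + aggregateId + " -> segment " + segmentFor(aggregateId, segmentCount));
        }
    }
}
```

Because every identifier maps to exactly one segment, and each segment is claimed by one thread at a time, two threads never process the same event.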
You can define the number of segments used by adjusting the initialSegmentCount property. Only when a streaming processor starts for the first time can it initialize the number of segments to use. This requirement follows from the fact each token represents a single segment. Tokens, in turn, can only be initialized if they are not present yet, as is explained in more detail here.
Whenever the number of segments should be adjusted during runtime, you can use the split and merge functionality. To adjust the number of initial segments, consider the following sample:
Tracking Processor - Axon Configuration API
Tracking Processor - Spring Boot AutoConfiguration
Pooled Streaming Processor - Axon Configuration API
Pooled Streaming Processor - Spring Boot AutoConfiguration
Spring Boot AutoConfiguration - Properties File
The default number of segments for a TrackingEventProcessor is one.
1
public class AxonConfig {
2
// ...
3
public void configureSegmentCount(EventProcessingConfigurer processingConfigurer) {
4
TrackingEventProcessorConfiguration tepConfig =
5
TrackingEventProcessorConfiguration.forParallelProcessing(2)
6
.andInitialSegmentsCount(2);
7
8
processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
9
}
10
}
Copied!
The default number of segments for a TrackingEventProcessor is one.
1
@Configuration
2
public class AxonConfig {
3
// ...
4
@Autowired
5
public void configureSegmentCount(EventProcessingConfigurer processingConfigurer) {
6
TrackingEventProcessorConfiguration tepConfig =
7
TrackingEventProcessorConfiguration.forParallelProcessing(2)
8
.andInitialSegmentsCount(2);
9
10
processingConfigurer.registerTrackingEventProcessorConfiguration("my-processor", config -> tepConfig);
11
}
12
}
Copied!
The default number of segments for a PooledStreamingEventProcessor is sixteen.
1
public class AxonConfig {
2
// ...
3
public void configureSegmentCount(EventProcessingConfigurer processingConfigurer) {
4
EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
5
(config, builder) -> builder.initialSegmentCount(32);
6
7
processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
8
}
9
}
Copied!
The default number of segments for a PooledStreamingEventProcessor is sixteen.
1
@Configuration
2
public class AxonConfig {
3
// ...
4
@Autowired
5
public void configureSegmentCount(EventProcessingConfigurer processingConfigurer) {
6
EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
7
(config, builder) -> builder.initialSegmentCount(32);
8
9
processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
10
}
11
}
Copied!
The default number of segments for a TrackingEventProcessor and PooledStreamingEventProcessor is one and sixteen, respectively.
    axon.eventhandling.processors.my-processor.mode=pooled
    # Sets the initial number of segments
    axon.eventhandling.processors.my-processor.initialSegmentCount=32
Parallel Processing and Subscribing Event Processors
Note that Subscribing Event Processors do not manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the command handling component.
The Event Handling Components a processor is in charge of may have specific expectations on the event order. The ordering is guaranteed when only a single thread is processing events. Maintaining the ordering requires additional work when the stream is segmented for parallel processing, however. When this is the case, the processor must ensure it sends the events to these handlers in that specific order.
Axon uses the SequencingPolicy for this. The SequencingPolicy is a function that returns a value for any given message. If the return value of the SequencingPolicy function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the SequentialPerAggregatePolicy, making it so that events published by the same aggregate instance will be handled sequentially. Check out this section to understand how to influence the sequencing policy.
Each node running a streaming processor will attempt to start its configured number of threads to begin processing events. The number of segments that a single thread can claim differs between the Tracking and Pooled Streaming Event Processor. A tracking processor can claim only a single segment per thread, whereas the pooled streaming processor can claim any number of segments per thread. These approaches provide different pros and cons for each implementation, which this section explains further.
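The difference in claiming behavior can be illustrated with a small, framework-free sketch. It is not Axon's implementation; the class, the method names, and the maxClaimedSegments parameter are illustrative assumptions that only model the "one segment per thread" versus "any number of segments per thread" rule described above.

```java
public class SegmentClaimSketch {

    /** Tracking-style claiming: each thread holds at most one segment. */
    public static int claimedOnePerThread(int threads, int unclaimedSegments) {
        return Math.min(threads, unclaimedSegments);
    }

    /**
     * Pooled-style claiming: threads share the work, so the number of threads
     * does not limit the claim. Only an (assumed) upper bound per node does.
     */
    public static int claimedPooled(int maxClaimedSegments, int unclaimedSegments) {
        return Math.min(maxClaimedSegments, unclaimedSegments);
    }

    public static void main(String[] args) {
        // Two threads and four segments: two segments stay unclaimed on this node.
        System.out.println(claimedOnePerThread(2, 4));
        // A pooled node without a practical upper bound can claim all four.
        System.out.println(claimedPooled(Integer.MAX_VALUE, 4));
    }
}
```

In this model, a tracking-style node needs at least as many threads as there are segments to process the entire stream on its own, whereas a pooled-style node does not.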

Sequential Processing

Even though events are processed asynchronously from their publisher, it is often desirable to process certain events in their publishing order. In Axon, the SequencingPolicy controls this order. The SequencingPolicy defines whether events must be handled sequentially, in parallel, or a combination of both. Policies return a sequence identifier for a given event.
If the policy returns the same identifier for two events, they must be handled sequentially by the Event Handling Component. Thus, if the SequencingPolicy returns a different value for two events, they may be processed concurrently. Note that if the policy returns a null sequence identifier, the event may be processed in parallel with any other events.
Parallel Processing and Sagas
A saga instance is never invoked concurrently by multiple threads. Therefore, the SequencingPolicy is irrelevant for a saga. Axon will ensure each saga instance receives the events it needs to process in the order they have been published on the event bus.
Conceptually, the SequencingPolicy decides whether an event belongs to a given segment. Furthermore, Axon guarantees that events that are part of the same segment are processed sequentially.
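One way to picture how a sequence identifier could decide segment membership is to hash the identifier and mask it by the segment count. The sketch below is a simplified model under stated assumptions (a non-null identifier and a power-of-two segment count), not a copy of Axon's internals; the class and method names are hypothetical.

```java
public class SegmentMatchSketch {

    /**
     * Maps a sequence identifier onto one of segmentCount segments.
     * Assumes a non-null identifier and a segmentCount that is a power of two.
     */
    public static int segmentFor(Object sequenceIdentifier, int segmentCount) {
        int mask = segmentCount - 1;
        // Equal identifiers have equal hash codes, so they always land in the same segment.
        return sequenceIdentifier.hashCode() & mask;
    }

    public static void main(String[] args) {
        int first = segmentFor("order-42", 4);
        int second = segmentFor("order-42", 4);
        System.out.println(first == second); // same identifier, same segment
    }
}
```

Because all events with the same identifier end up in one segment, and a segment is processed sequentially, the ordering within that identifier is preserved. Events with different identifiers may still share a segment, in which case they are simply processed sequentially as well.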
The framework provides several policies you can use out of the box:
    SequentialPerAggregatePolicy - The default policy.
    It will force domain events that were raised from the same aggregate to be handled sequentially.
    Thus, events from different aggregates may be handled concurrently.
    This policy is typically suitable for Event Handling Components that update details from aggregates in databases.
    FullConcurrencyPolicy - This policy will tell Axon that this Event Processor may handle all events concurrently.
    This means that there is no relationship between the events that require them to be processed in a particular order.
    SequentialPolicy - This policy tells Axon that it must process all events sequentially.
    Handling of an event will start when the handling of a previous event has finished.
    PropertySequencingPolicy - When configuring this policy, the user is required to provide a property name or property extractor function.
    This implementation provides a flexible solution to set up a custom sequencing policy based on a standard value present in your events.
    Note that this policy only reacts to properties present in the event class.
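To make the differences between these policies concrete, the following framework-free sketch models each one as a plain function from an event to a sequence identifier. The Event class, its fields, and the constant names are hypothetical and exist only for illustration; the real policies operate on Axon's event messages.

```java
import java.util.function.Function;

public class SequencingPolicySketch {

    /** Hypothetical event shape, for illustration only. */
    public static final class Event {
        final String eventId;
        final String aggregateId;
        final String customerId;

        public Event(String eventId, String aggregateId, String customerId) {
            this.eventId = eventId;
            this.aggregateId = aggregateId;
            this.customerId = customerId;
        }
    }

    // Sequential per aggregate: events of one aggregate share an identifier.
    public static final Function<Event, Object> PER_AGGREGATE = e -> e.aggregateId;
    // Full concurrency: every event has its own identifier, so nothing is ordered.
    public static final Function<Event, Object> FULL_CONCURRENCY = e -> e.eventId;
    // Sequential: one shared identifier, so everything is ordered.
    public static final Function<Event, Object> SEQUENTIAL = e -> "all-events";
    // Property based: the identifier is taken from a property of the event.
    public static final Function<Event, Object> PER_CUSTOMER = e -> e.customerId;

    /** Two events must be handled sequentially when their identifiers are equal. */
    public static boolean mustBeSequential(Function<Event, Object> policy, Event a, Event b) {
        Object idA = policy.apply(a);
        return idA != null && idA.equals(policy.apply(b));
    }

    public static void main(String[] args) {
        Event first = new Event("e1", "agg-1", "c-1");
        Event second = new Event("e2", "agg-1", "c-2");
        System.out.println(mustBeSequential(PER_AGGREGATE, first, second));
        System.out.println(mustBeSequential(PER_CUSTOMER, first, second));
    }
}
```

The same pair of events is ordered under one policy and unordered under another, which is why the choice of policy should follow from what the Event Handling Component actually updates.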
Consider the following snippets when configuring a (custom) SequencingPolicy, shown for the Axon Configuration API, Spring Boot AutoConfiguration, and a Spring Boot properties file, respectively:
    public class AxonConfig {
        // ...
        public void configureSequencingPolicy(EventProcessingConfigurer processingConfigurer) {
            PropertySequencingPolicy<SomeEvent, String> mySequencingPolicy =
                    PropertySequencingPolicy.builder(SomeEvent.class, String.class)
                                            .propertyName("myProperty")
                                            .build();

            processingConfigurer.registerDefaultSequencingPolicy(config -> mySequencingPolicy)
                                // or, to define one for a specific processor:
                                .registerSequencingPolicy("my-processor", config -> mySequencingPolicy);
        }
    }
    @Configuration
    public class AxonConfig {
        // ...
        @Autowired
        public void configureSequencingPolicy(EventProcessingConfigurer processingConfigurer,
                                              SequencingPolicy<EventMessage<?>> mySequencingPolicy) {

            processingConfigurer.registerDefaultSequencingPolicy(config -> mySequencingPolicy)
                                // or, to define one for a specific processor:
                                .registerSequencingPolicy("my-processor", config -> mySequencingPolicy);
        }

        @Bean
        public SequencingPolicy<EventMessage<?>> mySequencingPolicy() {
            return new SequentialPolicy();
        }