`StreamingEventProcessor`, or Streaming Processor for short, is a type of Event Processor. Like any Event Processor, it is the technical component that handles events by invoking the event handlers written in an Axon application.
`StreamableMessageSource` is an infrastructure component through which we can open a stream of events. The source can also specify positions on the event stream, so-called Tracking Tokens, used as start positions when opening an event stream. An example of a `StreamableMessageSource` is the `EventStore`, such as Axon Server or an RDBMS-backed event store.
A Streaming Processor uses separate threads to consume events from the `StreamableMessageSource`. Using separate threads decouples the `StreamingEventProcessor` from other operations (e.g., event publication or command handling), allowing for a cleaner separation within any application. Using separate threads also allows for parallelization of the event load, either within a single JVM or across several.
A Streaming Processor opens an event stream through the `StreamableMessageSource`. The first time the stream is opened, it will, by default, begin at the tail (the oldest, very first token) of the stream. While traversing the stream, the processor keeps track of its progress by storing the Tracking Tokens, or tokens for short, that accompany the events. This approach works because each token specifies the event's position on the stream.
**Head or Tail?**

The oldest (very first) token is located at the tail of the stream, and the latest (newest) token is positioned at the head of the stream.
**Default Event Processor**

Which `EventProcessor` type becomes the default processor depends on the event message source available in your application. In the majority of use cases, an Event Store is present. As the Event Store is a type of `StreamableMessageSource`, the default will switch to the Tracking Event Processor. If the application only has an Event Bus configured, the framework will lack a `StreamableMessageSource`. It will fall back to the Subscribing Event Processor as the default in these scenarios. This implementation will use the configured `EventBus` as its message source.
To configure a processor as a `TrackingEventProcessor`, you can invoke the `registerTrackingEventProcessor` method on the `EventProcessingConfigurer`. Alternatively, when using Spring Boot, the processor mode can be selected in a properties file through the map notation.
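In a Spring Boot properties file, such a map-notation entry could look as follows (the processing group name `my-processor` is a placeholder; the property key follows Axon's Spring Boot conventions):

```properties
# "my-processor" is a placeholder for your processing group's name.
axon.eventhandling.processors.my-processor.mode=tracking
```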
For further configuration of a Tracking Processor, the `TrackingEventProcessorConfiguration` can be used. When invoking the `registerTrackingEventProcessor` method, you can provide a tracking processor configuration object, or you can register the configuration instance explicitly.
To configure a processor as a `PooledStreamingEventProcessor`, you can invoke the `registerPooledStreamingEventProcessor` method on the `EventProcessingConfigurer`. Alternatively, when using Spring Boot, the processor mode can be selected in a properties file through the map notation.
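The corresponding map-notation entry in a Spring Boot properties file could look like this (again assuming a processing group named `my-processor`, and that the `pooled` mode value is available in your Axon version):

```properties
# "my-processor" is a placeholder for your processing group's name.
axon.eventhandling.processors.my-processor.mode=pooled
```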
For further configuration of a Pooled Streaming Processor, the `PooledStreamingProcessorConfiguration` can be used. When invoking the `registerPooledStreamingEventProcessor` method, you can provide a pooled streaming processor configuration object, or you can register the configuration instance explicitly.
Upon event handling failure, the `TrackingEventProcessor` will retry processing the event using an incremental back-off period. It will start at 1 second and double after each attempt until it reaches the maximum wait time of 60 seconds per attempt. This back-off time ensures that in a distributed environment, when another node is able to process events, it will have the opportunity to claim the token required to process the event.
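The described schedule can be sketched as follows; this is a self-contained illustration of the doubling back-off, not Axon's actual retry code:

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSketch {

    // Incremental back-off as described: start at 1 second,
    // double per attempt, cap at 60 seconds.
    static List<Long> backOffSeconds(int attempts) {
        List<Long> waits = new ArrayList<>();
        long wait = 1;
        for (int i = 0; i < attempts; i++) {
            waits.add(wait);
            wait = Math.min(wait * 2, 60);
        }
        return waits;
    }

    public static void main(String[] args) {
        // Per-attempt wait times, showing the 60-second cap.
        System.out.println(backOffSeconds(8)); // [1, 2, 4, 8, 16, 32, 60, 60]
    }
}
```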
The `PooledStreamingEventProcessor` simply aborts the failed part of the process. The Pooled Streaming Processor can deal with this because its threading mode differs from the Tracking Processor's. As such, the chance is high that the failed process will be picked up quickly by another thread within the same JVM. This chance increases further whenever the PSEP instance is distributed over several application instances.
A Streaming Processor keeps track of its progress through the `TrackingToken`, the "token" for short. Such a token accompanies each message a streaming processor receives through its event stream. It is this token that lets the processor remember its position on the stream and coordinate, through claims, which instance handles which events.
The Streaming Processor updates its `TrackingToken` after handling batches of events. Maintaining this progress requires CRUD operations, for which the Streaming Processor uses the `TokenStore`. Furthermore, to be allowed to process events, a processor instance must first claim the `TrackingToken`. The processor will update this claim every time it has finished handling a batch of events. This so-called "claim extension" is, just as the updating and saving of tokens, delegated to the Token Store. Hence, the Streaming Processor achieves collaboration among instances/threads through token claims.
On start-up, a Streaming Processor uses its `StreamableMessageSource` to open a stream of events. It requires a `TrackingToken` to open this stream, which it fetches from the `TokenStore`. However, when a Streaming Processor starts for the first time, there is no `TrackingToken` present to open the stream with yet.
**A Saga's Streaming Processor initial position**

A Streaming Processor dedicated to a Saga will default the initial token to the head of the stream. The default initial token position ensures that the Saga does not react to events from the past, as in most cases this would introduce unwanted side effects.
- The `TokenStore` has (accidentally) been cleared between application runs, thus losing the stored tokens.
- An `InMemoryTokenStore` was used, and hence the processor could never persist the token to begin with.
The initial token used by a `StreamingEventProcessor` is configurable for every processor instance. When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`. The message source, in turn, gives three possibilities to build a token, namely:
- `createHeadToken()` - Creates a token from the head of the event stream.
- `createTailToken()` - Creates a token from the tail of the event stream. Creating tail tokens is the default for most Streaming Processors.
- `createTokenSince(Duration)` - Creates a token that tracks all events after a given point in time. If there is an event precisely at that given moment in time, it will also be taken into account.
You can also use the `StreamableMessageSource` input parameter to create an initial token entirely by yourself if you want to start from a different position.
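To illustrate the shape of such an initial-token builder function, the following sketch uses hypothetical stand-ins for `StreamableMessageSource` and `TrackingToken` (the interfaces and stream positions below are made up for the example, not Axon's actual classes):

```java
import java.time.Duration;
import java.util.function.Function;

public class InitialTokenSketch {

    // Hypothetical stand-in for Axon's TrackingToken.
    interface TrackingToken { long position(); }

    // Hypothetical stand-in for Axon's StreamableMessageSource.
    interface MessageSource {
        TrackingToken createHeadToken();            // newest event on the stream
        TrackingToken createTailToken();            // oldest event on the stream
        TrackingToken createTokenSince(Duration d); // events after a point in time
    }

    // The builder function a processor is configured with: given the
    // message source, decide where the very first stream should start.
    static TrackingToken initialToken(MessageSource source,
                                      Function<MessageSource, TrackingToken> builder) {
        return builder.apply(source);
    }

    // An example source with made-up stream positions.
    static MessageSource exampleSource() {
        return new MessageSource() {
            public TrackingToken createHeadToken() { return () -> 100; }
            public TrackingToken createTailToken() { return () -> 0; }
            public TrackingToken createTokenSince(Duration d) { return () -> 42; }
        };
    }

    public static void main(String[] args) {
        // Start at the head instead of the default tail:
        TrackingToken token = initialToken(exampleSource(), MessageSource::createHeadToken);
        System.out.println(token.position()); // 100
    }
}
```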
- `tokenClaimInterval` - Defines how long to wait between attempts to claim a segment. A processor uses this value to steal token claims from other processor threads. This value defaults to 5000 milliseconds.
- `eventAvailabilityTimeout` - Defines the time to "wait for events" before extending the claim. Only the Tracking Event Processor uses this. This value defaults to 1000 milliseconds.
- `claimExtensionThreshold` - The threshold after which the claim is extended in the absence of events. Only the Pooled Streaming Event Processor uses this. This value defaults to 5000 milliseconds.
The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingToken`s. The streaming processor will use the store to construct, fetch, and claim tokens.
If no Token Store is explicitly configured, an `InMemoryTokenStore` is used. The `InMemoryTokenStore` is not recommended in most production scenarios, since it cannot maintain progress through application shutdowns. Unintentionally using the `InMemoryTokenStore` counts as one of the unexpected scenarios where the framework creates an initial token on each application start-up.
- `InMemoryTokenStore` - A `TokenStore` implementation that keeps the tokens in memory. This implementation does not suffice as a production-ready store in most applications.
- `JpaTokenStore` - A `TokenStore` implementation using JPA to store the tokens with. It expects that a table is constructed based on `org.axonframework.eventhandling.tokenstore.jpa.TokenEntry`. It is easily auto-configurable with, for example, Spring Boot.
- `JdbcTokenStore` - A `TokenStore` implementation using JDBC to store the tokens with. It expects that the schema is constructed through a `TokenTableFactory`; a database-specific `TokenTableFactory` implementation can be chosen here.
- `MongoTokenStore` - A `TokenStore` implementation using MongoDB to store the tokens with.
**Where to store Tokens?**

Where possible, we recommend using a token store that keeps the tokens in the same database in which the event handlers update the view models. This way, changes to the view model can be stored atomically with the changed tokens. Furthermore, it guarantees exactly-once processing semantics.
To configure a `TokenStore` for all processors, the `registerTokenStore(Function<Configuration, TokenStore>)` method on the `EventProcessingConfigurer` can be used. To configure a `TokenStore` for a specific processor, use the `registerTokenStore` overload that additionally accepts the processor's name.
In a Spring Boot environment, the `TokenStore` implementation is chosen based on the dependencies available, in the following order:

1. If a `TokenStore` bean is defined, that bean is used.
2. Otherwise, if an `EntityManager` is available, the `JpaTokenStore` is used.
3. Otherwise, if a `DataSource` is defined, the `JdbcTokenStore` is used.
4. Lastly, the `InMemoryTokenStore` is used as a fallback.
More fine-grained customization of the `TokenStore` is possible through the `EventProcessingConfigurer`.
You can define the number of segments used through the `initialSegmentCount` property. Only when a streaming processor starts for the first time can it initialize the number of segments to use. This requirement follows from the fact that each token represents a single segment. Tokens, in turn, can only be initialized if they are not present yet, as is explained in more detail here. The default `initialSegmentCount` for the `TrackingEventProcessor` and the `PooledStreamingEventProcessor` is one and sixteen, respectively.
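In a Spring Boot environment, both the segment and thread counts can be set through the processor properties map. The property keys below are assumptions based on Axon's Spring Boot property names, and `my-processor` is a placeholder:

```properties
# "my-processor" is a placeholder for your processing group's name.
axon.eventhandling.processors.my-processor.initial-segment-count=4
axon.eventhandling.processors.my-processor.thread-count=4
```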
**Parallel Processing and Subscribing Event Processors**

Note that Subscribing Event Processors don't manage their own threads. Therefore, it is not possible to configure how they should receive their events. Effectively, they will always work on a sequential-per-aggregate basis, as that is generally the level of concurrency in the command handling component.
The processor uses the `SequencingPolicy` for this. The `SequencingPolicy` is a function that returns a value for any given message. If the return value of the `SequencingPolicy` function is equal for two distinct event messages, it means that those messages must be processed sequentially. By default, Axon components will use the `SequentialPerAggregatePolicy`, which makes events published by the same aggregate instance be handled sequentially. Check out this section to understand how to influence the sequencing policy.
The `SequencingPolicy` controls this order. The `SequencingPolicy` defines whether events must be handled sequentially, in parallel, or a combination of both. A policy returns a sequence identifier for a given event. If the `SequencingPolicy` returns a different value for two events, they may be processed concurrently. Note that if the policy returns a `null` sequence identifier, the event may be processed in parallel with any other event.
**Parallel Processing and Sagas**

A saga instance is never invoked concurrently by multiple threads. Therefore, the `SequencingPolicy` is irrelevant for a saga. Axon will ensure each saga instance receives the events it needs to process in the order they have been published on the event bus.
- `SequentialPerAggregatePolicy` - The default policy. It will force domain events that were raised from the same aggregate to be handled sequentially. Thus, events from different aggregates may be handled concurrently. This policy is typically suitable for Event Handling Components that update details from aggregates in databases.
- `FullConcurrencyPolicy` - This policy tells Axon that this Event Processor may handle all events concurrently. This means that there is no relationship between the events that requires them to be processed in a particular order.
- `SequentialPolicy` - This policy tells Axon to process all events sequentially. Handling of an event will start only when the handling of the previous event has finished.
- `PropertySequencingPolicy` - When configuring this policy, the user is required to provide a property name or property extractor function. This implementation provides a flexible solution to set up a custom sequencing policy based on a common value present in your events. Note that this policy only reacts to properties present in the event class.
- `MetaDataSequencingPolicy` - When configuring this policy, the user is required to provide a `metaDataKey` to be used. This implementation provides a flexible solution to set up a custom sequencing policy based on a common value present in your events' metadata.
To configure the `SequencingPolicy` in a properties file, we should provide the bean name of the policy.
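Assuming a processing group named `my-processor` and a `SequencingPolicy` bean named `mySequencingPolicy` (both placeholders), such an entry could look like this:

```properties
# "my-processor" and "mySequencingPolicy" are placeholders for your
# processing group's name and the bean name of your SequencingPolicy.
axon.eventhandling.processors.my-processor.sequencing-policy=mySequencingPolicy
```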
You can also define a custom policy by implementing the `SequencingPolicy` interface. This interface defines a single method, `getSequenceIdentifierFor(T)`, that returns the sequence identifier for a given event.
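The following self-contained sketch mimics that contract; the `SequencingPolicy` interface and `OrderEvent` record below are simplified stand-ins for illustration, not Axon's actual classes:

```java
public class SequencingPolicySketch {

    // Simplified stand-in for Axon's SequencingPolicy interface.
    interface SequencingPolicy<T> {
        Object getSequenceIdentifierFor(T event);
    }

    // Hypothetical event type carrying the property we sequence on.
    record OrderEvent(String orderId, String payload) {}

    // Sequence by order id: events of the same order are handled in order.
    static final SequencingPolicy<OrderEvent> BY_ORDER_ID = OrderEvent::orderId;

    static boolean sameSequence(OrderEvent a, OrderEvent b) {
        return BY_ORDER_ID.getSequenceIdentifierFor(a)
                          .equals(BY_ORDER_ID.getSequenceIdentifierFor(b));
    }

    public static void main(String[] args) {
        OrderEvent created = new OrderEvent("order-1", "created");
        OrderEvent shipped = new OrderEvent("order-1", "shipped");
        OrderEvent other = new OrderEvent("order-2", "created");
        System.out.println(sameSequence(created, shipped)); // true: must run sequentially
        System.out.println(sameSequence(created, other));   // false: may run concurrently
    }
}
```

Events sharing a sequence identifier (here, the order id) must be handled in order, while events with different identifiers may be handled concurrently.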
You can configure a `StreamingEventProcessor` to use several threads. The following sections describe the threading differences between the Tracking and the Pooled Streaming Event Processor, followed by samples of configuring multiple threads for the TEP and PSEP, respectively.
**Thread and Segment Count**

Adjusting the number of threads will not automatically parallelize a Streaming Processor. A segment claim is required to let a thread process any events. Hence, increasing the thread count should be paired with adjusting the segment count.
The Tracking Event Processor uses a `ThreadFactory` to start the process of claiming segments. It will use a single thread per segment it is able to claim, until the processor exhausts the configured number of threads. Each thread will open its own stream with the `StreamableMessageSource` and process events at its own speed. Other segment operations, like split and merge, are processed by the thread owning the segment being operated on.
The `PooledStreamingEventProcessor` uses two thread pools instead of the single fixed set of threads used by the `TrackingEventProcessor`. The first thread pool is in charge of opening a stream with the event source, claiming as many segments as possible, and delegating all the work. This is the task of the `Coordinator`. The coordinator defaults to using a `ScheduledExecutorService` with a single thread, which suffices in most scenarios.
The second thread pool handles the events from all segments the `Coordinator` of the pooled streaming processor could claim. The processor constructs a `WorkPackage` for each claimed segment and provides these packages the events to handle. The work packages will, in turn, invoke the Event Handling Components to process the events. These packages run within the second thread pool, the so-called "worker executor" pool. The worker pool also defaults to a `ScheduledExecutorService` with a single thread.
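As a toy illustration of this threading model (a deliberate simplification, not Axon's implementation), the sketch below runs a single-threaded coordinator that "claims" segments and delegates each segment's work to a shared worker pool:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PooledThreadingSketch {

    static Set<String> run() {
        ScheduledExecutorService coordinator = Executors.newScheduledThreadPool(1);
        ScheduledExecutorService workers = Executors.newScheduledThreadPool(4);
        Set<String> handled = ConcurrentHashMap.newKeySet();

        // Coordinator thread: claim segments 0..3 and delegate their events.
        coordinator.submit(() -> {
            for (int segment = 0; segment < 4; segment++) {
                String event = "event-for-segment-" + segment;
                // Each "work package" processes its segment's events on the worker pool.
                workers.submit(() -> handled.add(event));
            }
        });

        coordinator.shutdown();
        try {
            // Let the coordinator finish delegating, then drain the workers.
            coordinator.awaitTermination(5, TimeUnit.SECONDS);
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run().size()); // 4: every segment's event was handled
    }
}
```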
The maximum number of segments a pooled streaming processor may claim, `maxClaimedSegments`, is configurable if required (the default is `Short.MAX_VALUE`). The fact that the TEP can only claim a single segment per thread highlights a problem of that implementation: when using the tracking processor, events will go unprocessed if there are more segments than threads, since each event belongs to a single segment. Furthermore, it makes dynamic scaling tougher, since you cannot adjust the number of threads at runtime. Here we see significant benefits in using the PSEP instead of the TEP, since it completely drops the "one segment per thread" policy. As such, partial processing is never a problem. Additionally, the `ScheduledExecutorService` is configurable, which allows sharing the executor between different processor instances. Thus, the PSEP provides a higher level of flexibility towards optimizing the total number of threads used within an application. The freedom in thread pool configuration is helpful when, for example, the number of different Event Processors in a single application increases.
**Which Streaming Processor should I use?**

In most scenarios, the `PooledStreamingEventProcessor` is the recommended processor implementation. We conclude this based on the segment-to-thread-count ratio, its ability to share thread pools, and the lower number of opened event streams.

The `TrackingEventProcessor` will still be ideal if you anticipate the processing speed between segments to differ significantly. Also, if the application does not have many processor instances, the need to share thread pools is loosened.
A Streaming Processor registers its claims in the `TokenStore`. By default, it will use the JVM's name (usually a combination of the hostname and process ID) as the identifier of the node owning a claim.
Sometimes it is desirable to release a segment claimed by a `StreamingEventProcessor`. For this, the `StreamingEventProcessor` provides the `releaseSegment(int segmentId)` and `releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)` methods. When invoking `releaseSegment`, the `StreamingEventProcessor` will "let go of" the segment for some time.
When invoking split and merge operations directly on a `StreamingEventProcessor`, with a component like the `StreamingProcessorController`, there are a couple of points to consider. When invoking the split or merge operation on a `StreamingEventProcessor`, that processor should be in charge of the segment you want to split or merge. Thus, either the streaming processor already has a claim on the segment(s), or it can claim the segment(s). Without the claims, the processor will simply fail the split or merge operation.
A merge involves the provided `segmentId` and the segment the framework will merge it with. We can calculate the segment identifier the provided `segmentId` will be merged with through the `Segment#mergeableSegmentId` method.
**Segment Selection Considerations**

When splitting or merging through Axon Server, it chooses the most appropriate segment to split or merge for you. When using the Axon Framework API directly, the developer should deduce the segment to split or segments to merge by themselves:

- Split: for fair balancing, a split is ideally performed on the largest segment.
- Merge: for fair balancing, a merge is ideally performed on the smallest segment.
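Assuming Axon's power-of-two segmentation scheme, in which a segment is an identifier plus a bitmask of the form 2^n - 1, the counterpart segment differs from the provided one only in the highest bit of the mask. The sketch below illustrates that arithmetic; it is an illustration based on that assumption, not Axon's source:

```java
public class MergeableSegmentSketch {

    // A segment is a (segmentId, mask) pair, where mask is 2^n - 1.
    // The counterpart to merge with differs only in the highest mask bit.
    static int mergeableSegmentId(int segmentId, int mask) {
        int parentMask = mask >>> 1;        // the mask after a merge
        int highestBit = mask ^ parentMask; // the bit dropped by merging
        return segmentId ^ highestBit;
    }

    public static void main(String[] args) {
        // With four segments (mask 3): 0 merges with 2, and 1 merges with 3.
        System.out.println(mergeableSegmentId(0, 3)); // 2
        System.out.println(mergeableSegmentId(1, 3)); // 3
        // A segment with mask 0 is the only segment left; it merges with itself.
        System.out.println(mergeableSegmentId(0, 0)); // 0
    }
}
```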
Replays are supported by every `StreamingEventProcessor`. The following sections describe how to initiate a replay and what replay API the framework provides.
To initiate a replay, the `StreamingEventProcessor` provides the `resetTokens()` method in a couple of variants:

- `resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier)` - Resets the `TrackingToken` to the result of the `initialTrackingTokenSupplier` function.
- `resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialTrackingTokenSupplier, R resetContext)` - Resets the `TrackingToken` to the result of the `initialTrackingTokenSupplier` function, providing the `resetContext` to the event handlers.
- `resetTokens(TrackingToken startPosition)` - Resets the `TrackingToken` to the provided `startPosition`.
**Partial Replays**

A replay does not always have to start "from the beginning of time"; partially replaying the event stream suffices for a lot of applications. To perform a so-called "partial replay," you should provide a token pointing at a specific point in time. The `createTokenSince(Duration)` method can be used for this.

Be mindful that when initiating a partial replay, the event handlers may receive an event from the middle of model construction. Hence, event handlers need to be "aware" that some earlier events might not have been handled at all. Making the event handlers lenient (e.g., able to deal with missing data) or performing ad-hoc manual replays can help in that area.
Note that a streaming processor should be shut down before invoking the `resetTokens` operation. Once the reset is successful, the processor can be started up again.