Subscribing Event Processor
The SubscribingEventProcessor
, or Subscribing Processor for short, is a type of Event Processor.
As any Event Processor, it serves as the technical aspect to handle events by invoking the event handlers written in an Axon application.
The Subscribing Processor defines itself by receiving the events from a SubscribableMessageSource
.
The SubscribableMessageSource
is an infrastructure component to register a Subscribing Processor too.
After registration to the SubscribableMessageSource
, the message source gives the events to the SubscribingEventProcessor
in the order they are received.
Examples of a SubscribableMessageSource
are the EventBus
or the AMQP Extension.
Both the EventBus
and the AMQP Extension are simple message bus solutions for events.
The simple bus solution makes the SubscribableMessageSource
and thus the Subscribing Processor an approach to only receive current events.
Operations like replaying are, therefore, not an option for any Subscribing Processor as long as the SubscribableMessageSource
follows this paradigm.
Furthermore, the message source will use the same thread that receives the events to invoke the registered Subscribing Processors.
When the EventBus
is, for example, used as the message source, this means that the event publishing thread is the same one handling the event in the Subscribing Processor.
Although this approach deserves a spot within the framework, most scenarios require further decoupling of components by separating the threads as well.
When, for example, an application requires event processing parallelization to get a higher performance, this can be a blocker.
This predicament is why the SubscribingEventProcessor
is not the default in Axon Framework.
Instead, the "Tracking Event Processor" (a Streaming Processor implementation) takes up that role. It provides greater flexibility for developers for configuring the event processor in greater detail.
Subscribing Processor Use Cases
Although the
SubscribingEventProcessor
does not support easy parallelization or replays, there are still scenarios when it is beneficial.
Configuring
Other than configuring that an app uses a Subscribing Event Processor, everything is covered here.
Firstly, to specify that a new Event Processors should default to a SubscribingEventProcessor
, you can use the usingSubscribingEventProcessors
method:
Axon Configuration API
public class AxonConfig {
// omitting other configuration methods...
public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
processingConfigurer.usingSubscribingEventProcessors();
}
}
Spring Boot auto configuration
@Configuration
public class AxonConfig {
// omitting other configuration methods...
@Bean
public ConfigurerModule processorDefaultConfigurerModule() {
return configurer -> configurer.eventProcessing(EventProcessingConfigurer::usingSubscribingEventProcessors);
}
}
For a specific Event Processor to be a Subscribing instance, registerSubscribingEventProcessor
is used:
Axon Configuration API
public class AxonConfig {
// omitting other configuration methods...
public void configureSubscribingProcessors(EventProcessingConfigurer processingConfigurer) {
// To configure a processor to be subscribing ...
processingConfigurer.registerSubscribingEventProcessor("my-processor")
// ... to define a specific SubscribableMessageSource ...
.registerSubscribingEventProcessor("my-processor", conf -> /* create/return SubscribableMessageSource */);
}
}
Spring Boot auto configuration - Java
@Configuration
public class AxonConfig {
// omitting other configuration methods...
@Bean
public ConfigurerModule subscribingProcessorsConfigurerModule() {
return configurer -> configurer.eventProcessing(
// To configure a processor to be subscribing ...
processingConfigurer -> processingConfigurer.registerSubscribingEventProcessor("my-processor")
// ... to define a specific SubscribableMessageSource ...
.registerSubscribingEventProcessor(
"my-processor",
conf -> /* create/return SubscribableMessageSource */
)
);
}
}
Spring Boot auto configuration - Properties file
A properties file allows the configuration of some fields on an Event Processor. Do note that the Java configuration provides more degrees of freedom.
axon.eventhandling.processors.my-processor.mode=subscribing
axon.eventhandling.processors.my-processor.source=eventBus
If the name of an event processor contains periods .
, use the map notation:
axon.eventhandling.processors[my.processor].mode=subscribing
axon.eventhandling.processors[my.processor].source=eventBus
Error mode
Whenever the error handler rethrows an exception, the SubscribingEventProcessor
will have it bubble up to the publishing component of the event.
Providing the exception to the event publisher allows the publishing component to deal with it accordingly.
Persistent streams
Persistent streams require Axon Server version 2024.1 or higher.
A Subscribing Processor can use a Persistent Stream as its message source. By using a persistent stream we allow a Subscribing Processor to process events in parallel and to replay events.
When a processor uses a persistent stream, it receives events from Axon Server. After processing (a batch of) events, it sends an acknowledgment back to Axon Server.
The persistent stream can be split in segments to allow for parallel processing within a single client or across multiple instances of the client. The number of segments can be changed dynamically. Axon Server distributes the segments across the subscribers to ensure that all segments are connected.
Events are assigned to a specific segment based on the sequencing policy for the persistent stream. Persistent streams support all the standard sequencing policies that also can be used for streaming processors.
Clients can provide a filter in the persistent stream definition. This reduces the number of events that the client receives from Axon Server. The expression used to filter events are the same as expressions used in the ad-hoc query option in Axon Server.
Persistent streams do not require a token store in the client. The state of the stream is maintained in Axon Server.
Configuration
For a specific Event Processor to be a Subscribing instance using a persistent stream, registerSubscribingEventProcessor
is used. The message source for the event processor must be a PersistentStreamMessageSource
.
The PersistentStreamMessageSource
requires a PersistentStreamProperties
to set the initial properties to create the persistent stream. The properties contain:
-
streamName
: The name of the persistent stream. -
segments
: The initial number of segments. -
sequencingPolicyName
: The sequencing policy name. -
sequencingPolicyParameters
: List of parameters for the sequencing policy. -
initialPosition
: First token to read. -
filter
: Filter for events on Axon Server side, usenull
to receive all events.
The sequencingPolicyParameters
must be set if the sequencing policy is PropertySequencingPolicy
or MetaDataSequencingPolicy
.
For the MetaDataSequencingPolicy
, the sequencingPolicyParameters
must contain the name of one or more of the event’s metadata fields. Events with the same value for these fields are passed in the same segment.
The PropertySequencingPolicy
requires 4 values in the sequencingPolicyParameters
list.
-
The serialization type for the events. Supported values are
JSON
orXML
. -
The payload type to apply the policy on.
-
An expression to extract the property value from the event payload. If the serialization type is
JSON
this must be aJsonPath
expression. ForXML
this must be anXpath
expression. -
A fallback policy, the name of a sequencing policy to use if the payload type does not match the type specified in the second parameter. This may be
PropertySequencingPolicy
to specify an expression for another payload type. In this case add the serialization type, payload type, expression and fallback policy parameters for the alternative payload type.
Axon Configuration API
public class AxonConfig {
// omitting other configuration methods...
public void configureSubscribingProcessors(EventProcessingConfigurer processingConfigurer) {
// To configure a processor to be subscribing ...
processingConfigurer.registerSubscribingEventProcessor("my-processor", conf -> /* create/return PersistentStreamMessageSource */);
}
}
Spring Boot auto configuration - Java
@Configuration
public class AxonConfig {
// omitting other configuration methods...
@Bean
public ConfigurerModule subscribingProcessorsConfigurerModule() {
return configurer -> configurer.eventProcessing(
// To configure a processor to be subscribing ...
processingConfigurer -> processingConfigurer.registerSubscribingEventProcessor(
"my-processor",
conf -> /* create/return PersistentStreamMessageSource */
)
);
}
}
Spring Boot auto configuration - Properties file
A properties file allows the configuration of some fields on an Event Processor. Do note that the Java configuration provides more degrees of freedom.
The source property for the processor must point to a Spring Bean which is a PersistentStreamMessageSource or a PersistentStreamMessageSourceDefinition. This bean can also be defined in the properties file.
axon.eventhandling.processors.my-processor.mode=subscribing
axon.eventhandling.processors.my-processor.source=my-persistent-stream
axon.axonserver.persistent-streams.my-persistent-stream.name=My Persistent Stream
axon.axonserver.persistent-streams.my-persistent-stream.batch-size=100
axon.axonserver.persistent-streams.my-persistent-stream.initial-segment-count=4