Subscribing Event Processor
The SubscribingEventProcessor, or Subscribing Processor for short, is a type of Event Processor.
Like any Event Processor, it is the technical component that handles events by invoking the event handlers written in an Axon application.
The Subscribing Processor defines itself by receiving the events from a SubscribableEventSource.
The SubscribableEventSource is an infrastructure component that a Subscribing Processor registers itself with.
After registration with the SubscribableEventSource, the event source hands the events to the SubscribingEventProcessor in the order they are received.
The most practical use of a Subscribing Processor is with a persistent stream as its event source, which allows for parallel processing and replay capabilities.
Axon currently provides two main types of SubscribableEventSource implementations:
-
EventBus - An interface for publishing and subscribing to events. It delivers events to subscribers in the publishing thread.
-
Persistent Stream - An event source backed by Axon Server that persists events and supports parallel processing across segments.
When using a simple bus solution like the EventBus as the event source, the Subscribing Processor will only receive current events.
Operations like replaying are, therefore, not an option for a Subscribing Processor that uses the EventBus as its event source.
Furthermore, when the EventBus is used as the event source, the event publishing thread is the same one that handles the event in the Subscribing Processor.
Although this coupled approach deserves a spot within the framework, most scenarios require further decoupling of components by separating the threads as well.
When, for example, an application requires parallel event processing to achieve higher performance, this coupling of publishing and handling threads can be a blocker.
This predicament is why the SubscribingEventProcessor is not the default in Axon Framework.
Instead, the "Pooled Streaming Event Processor" (a Streaming Processor implementation) takes up that role. It gives developers more flexibility to configure the event processor in detail, including parallel processing and event replay capabilities. However, a Subscribing Processor that uses a persistent stream as its event source can achieve similar benefits.
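The same-thread delivery described above can be illustrated with a minimal sketch in plain Java. The SameThreadBusSketch class and its subscribe and publish methods are hypothetical stand-ins for a bus-style event source; they are not Axon's EventBus API and only mimic the threading behavior under that assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a bus-style subscribable event source; not Axon's EventBus. */
final class SameThreadBusSketch {

    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    /** Registers a handler, similar in spirit to a Subscribing Processor registering with its source. */
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /** Hands the event to every subscriber on the caller's thread and returns only afterwards. */
    void publish(String event) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    /** Publishes one event and reports whether it was handled on the publishing thread. */
    static boolean handledOnPublishingThread() {
        SameThreadBusSketch bus = new SameThreadBusSketch();
        Thread[] handlingThread = new Thread[1];
        bus.subscribe(event -> handlingThread[0] = Thread.currentThread());
        bus.publish("order-placed");
        return handlingThread[0] == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("Handled on publishing thread: " + handledOnPublishingThread());
    }
}
```

Because publish only returns once every handler has run, a slow handler slows down the publisher, which is the coupling that the streaming processors and persistent streams avoid.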
Subscribing Processor Use Cases
The SubscribingEventProcessor is a sane option in roughly two scenarios:
-
When using a SubscribableEventSource backed by a persisted and replayable store, like persistent streams.
-
When an entity should be updated within the same thread and transaction that published the event.
Note that when you aim to follow the CQRS paradigm, scenario two is not recommended between command and query models. When following CQRS, publishing an event should be regarded as updating a command model. Handling an event to update an entity is seen as a query model update in virtually all scenarios. Hence, think twice before deciding on scenario two!
Configuring
To configure a Subscribing Event Processor you have roughly two approaches:
-
Declarative configuration, using Axon’s configuration API
-
Autodetected configuration, like Spring
Both solutions have examples drafted below, showing how to configure the "example-processor" with a specific event source. For generic configuration options for any Event Processor, we refer to the general processor configuration section.
Declarative configuration
When using Axon’s configuration API, you should invoke the MessagingConfigurer#eventProcessing(Consumer<EventProcessingConfigurer>) operation.
The EventProcessingConfigurer lambda guides users towards:
-
First, selecting the Event Processor type,
-
second, registering a lambda to retrieve the Event Handling Components, and,
-
third, customizing the event processor, if needed.
The example below shows how this approach is used to define a (1) SubscribingEventProcessor with the (2) AnnotatedEventHandlingClass as the single event handling component, and (3) with a specific event source customization:
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        // (1) Select the Subscribing Event Processor type.
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "example-processor",
                // (2) Register the event handling components for this processor.
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                // (3) Customize the processor with a specific event source.
                                .customized((c, subscribingConfig) -> subscribingConfig.eventSource(
                                        c.getComponent(EventBus.class)
                                ))
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new AnnotatedEventHandlingClass());
    }
}
Autodetected configuration - Spring Boot properties file
A properties file allows the configuration of several fields for the Subscribing Event Processor. If more flexibility is required, like concretely defining the event handling components to attach, we recommend use of the declarative configuration instead.
axon.eventhandling.processors.example-processor.mode=subscribing
axon.eventhandling.processors.example-processor.source=eventBus
If the name of an event processor contains periods (.), use the map notation:
axon.eventhandling.processors[example.processor].mode=subscribing
axon.eventhandling.processors[example.processor].source=eventBus
Error mode
Whenever the error handler rethrows an exception, the SubscribingEventProcessor will have it bubble up to the publishing component of the event.
Providing the exception to the event publisher allows the publishing component to deal with it accordingly.
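As a rough illustration of this behavior, the plain Java sketch below shows an exception thrown during handling reaching the code that published the event. The ErrorPropagationSketch class is hypothetical and is not part of Axon; it assumes a source that invokes its handlers on the publishing thread, as the EventBus is described to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical same-thread event source used to illustrate error propagation; not Axon API. */
final class ErrorPropagationSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /** Invokes handlers on the caller's thread, so a rethrown exception reaches the caller. */
    void publish(String event) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    /** Returns the message of the exception that reached the publisher, or null if none did. */
    static String publishAndCapture(String event) {
        ErrorPropagationSketch source = new ErrorPropagationSketch();
        // Stands in for an error handler that rethrows the handling failure.
        source.subscribe(e -> {
            throw new IllegalStateException("cannot handle " + e);
        });
        try {
            source.publish(event);
            return null;
        } catch (IllegalStateException ex) {
            // The publishing component decides how to react, for example by rolling back.
            return ex.getMessage();
        }
    }
}
```

The publisher sees the failure in the same call stack, which is what lets it fail the surrounding operation instead of continuing as if the event had been handled.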
Persistent streams
Persistent streams not available in Axon Framework 5.0
The Persistent Streams feature is not yet available in Axon Framework 5.0. It will be introduced in Axon Framework 5.2. The documentation below describes the Persistent Streams concepts for reference and future use.
A Subscribing Processor can use a Persistent Stream as its event source. Using a persistent stream allows a Subscribing Processor to process events in parallel and to replay events.
When a processor uses a persistent stream, it receives events from Axon Server. After processing (a batch of) events, it sends an acknowledgment back to Axon Server.
The persistent stream can be split into segments to allow for parallel processing within a single client or across multiple instances of the client. The number of segments can be changed dynamically. Axon Server distributes the segments across the subscribers to ensure that all segments are connected.
Events are assigned to a specific segment based on the sequencing policy for the persistent stream. Persistent streams support all the standard sequencing policies that can also be used for streaming processors.
Clients can provide a filter in the persistent stream definition. This reduces the number of events that the client receives from Axon Server. The expressions used to filter events are the same as those used in the ad-hoc query option in Axon Server.
Persistent streams do not require a token store in the client. The state of the stream is maintained in Axon Server.
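To make the idea of assigning events to segments more concrete, here is a minimal sketch in plain Java. It is an assumption for illustration only: this section does not describe the algorithm Axon Server actually uses, and the SegmentAssignmentSketch class and its segmentFor method are hypothetical. The sketch only shows the property that matters, namely that events sharing a sequencing identifier end up in the same segment.

```java
import java.util.Objects;

/** Illustrative only; not the segment assignment that Axon Server implements. */
final class SegmentAssignmentSketch {

    /**
     * Maps a sequencing identifier (for example an aggregate identifier)
     * to a segment index in the range [0, segmentCount).
     */
    static int segmentFor(Object sequencingIdentifier, int segmentCount) {
        if (segmentCount <= 0) {
            throw new IllegalArgumentException("segmentCount must be positive");
        }
        // floorMod keeps the result non-negative, also for negative hash codes.
        return Math.floorMod(Objects.hashCode(sequencingIdentifier), segmentCount);
    }
}
```

With such a mapping, all events for one identifier are handled sequentially by whichever subscriber owns that segment, while events for other identifiers can be handled in parallel.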
Configuration
For a specific Event Processor to be a Subscribing instance using a persistent stream, the declarative or autodetected configuration can be used.
The event source for the event processor must be a PersistentStreamMessageSource.
Each persistent stream must be identified by a unique name within your Axon Server environment. This name serves as the stream’s identifier, and it’s crucial to understand that creating a new stream with an existing name will overwrite the previous stream connection.
The PersistentStreamMessageSource requires a PersistentStreamProperties to set the initial properties to create the persistent stream.
The properties contain:
-
streamName: The name of the persistent stream. It’s a unique identifier of the connection with Axon Server. Usage of the same name will overwrite the existing connection.
-
segments: The initial number of segments.
-
sequencingPolicyName: The sequencing policy name.
-
sequencingPolicyParameters: List of parameters for the sequencing policy.
-
initialPosition: First token to read.
-
filter: Filter for events on Axon Server side, use null to receive all events.
The sequencingPolicyParameters must be set if the sequencing policy is PropertySequencingPolicy or MetadataSequencingPolicy.
For the MetadataSequencingPolicy, the sequencingPolicyParameters must contain the name of one or more of the event’s metadata fields.
Events with the same value for these fields are assigned to the same segment.
The PropertySequencingPolicy requires four values in the sequencingPolicyParameters list:
-
The serialization type for the events. Supported values are JSON or XML.
-
The payload type to apply the policy on.
-
An expression to extract the property value from the event payload. If the serialization type is JSON this must be a JsonPath expression. For XML this must be an XPath expression.
-
A fallback policy, the name of a sequencing policy to use if the payload type does not match the type specified in the second parameter. This may be PropertySequencingPolicy to specify an expression for another payload type. In this case add the serialization type, payload type, expression and fallback policy parameters for the alternative payload type.
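The four-value layout, and the chaining through a PropertySequencingPolicy fallback, might look as follows when building the sequencingPolicyParameters list. This is a sketch under assumptions: the payload type names, the JsonPath expressions, the use of fully qualified class names as payload types, and the fallback policy name SequentialPerAggregatePolicy are all illustrative and should be checked against the PersistentStreamSequencingPolicyProvider JavaDoc.

```java
import java.util.List;

/** Sketch of sequencingPolicyParameters lists; all concrete values are illustrative assumptions. */
final class PropertySequencingParametersSketch {

    /** Policy name taken from the text above. */
    static final String PROPERTY_POLICY = "PropertySequencingPolicy";

    /** Assumed fallback policy name; verify against PersistentStreamSequencingPolicyProvider. */
    static final String FALLBACK_POLICY = "SequentialPerAggregatePolicy";

    /** One payload type: serialization type, payload type, expression, fallback policy. */
    static List<String> singlePayloadType() {
        return List.of(
                "JSON",                          // serialization type
                "com.example.OrderPlacedEvent",  // hypothetical payload type
                "$.orderId",                     // JsonPath expression for the property
                FALLBACK_POLICY                  // used when the payload type does not match
        );
    }

    /** Two payload types: the first fallback is PropertySequencingPolicy, followed by four more values. */
    static List<String> chainedPayloadTypes() {
        return List.of(
                "JSON", "com.example.OrderPlacedEvent", "$.orderId", PROPERTY_POLICY,
                "JSON", "com.example.OrderShippedEvent", "$.orderId", FALLBACK_POLICY
        );
    }
}
```

Either list would be passed as the sequencingPolicyParameters value, with PropertySequencingPolicy as the sequencing policy name.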
Declarative configuration
import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import org.axonframework.axonserver.connector.event.axon.PersistentStreamMessageSource;
import org.axonframework.axonserver.connector.event.axon.PersistentStreamSequencingPolicyProvider;
import org.axonframework.common.configuration.Configuration;
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "example-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                .customized((c, subscribingConfig) -> subscribingConfig.eventSource(
                                        constructPersistentStream(c)
                                ))
        );
    }

    public PersistentStreamMessageSource constructPersistentStream(Configuration configuration) {
        String streamName = "example-persistent-stream-name";
        int segmentCount = 4;
        // Not required for sequential per aggregate policy.
        // See PersistentStreamSequencingPolicyProvider JavaDoc for other policies.
        List<String> sequencingPolicyParameters = new ArrayList<>();
        // Optional parameter to filter events on Axon Server's side.
        String filter = null;
        int batchSize = 1024;
        PersistentStreamProperties persistentStreamProperties = new PersistentStreamProperties(
                streamName,
                segmentCount,
                PersistentStreamSequencingPolicyProvider.SEQUENTIAL_PER_AGGREGATE_POLICY,
                sequencingPolicyParameters,
                PersistentStreamProperties.HEAD_POSITION,
                filter
        );
        return new PersistentStreamMessageSource(
                streamName,
                configuration,
                persistentStreamProperties,
                Executors.newScheduledThreadPool(4),
                batchSize
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new AnnotatedEventHandlingClass());
    }
}
Autodetected configuration - Spring Boot properties file
A properties file allows the configuration of some fields on an Event Processor. Do note that the Java configuration provides more degrees of freedom.
The source property for the processor must point to a Spring Bean which is a PersistentStreamMessageSource or a PersistentStreamMessageSourceDefinition.
This bean can also be defined in the properties file.
axon.eventhandling.processors.example-processor.mode=subscribing
axon.eventhandling.processors.example-processor.source=example-persistent-stream
axon.axonserver.persistent-streams.example-persistent-stream.name=Example Persistent Stream
axon.axonserver.persistent-streams.example-persistent-stream.batch-size=100
axon.axonserver.persistent-streams.example-persistent-stream.initial-segment-count=4