Subscribing Event Processor
The SubscribingEventProcessor, or Subscribing Processor for short, is a type of Event Processor.
As any Event Processor, it serves as the technical aspect to handle events by invoking the event handlers written in an Axon application.
The Subscribing Processor defines itself by receiving the events from a SubscribableEventSource.
The SubscribableEventSource is an infrastructure component to register a Subscribing Processor too.
After registration to the SubscribableEventSource, the event source gives the events to the SubscribingEventProcessor in the order they are received.
The most practical use of a Subscribing Processor is with a persistent stream as its event source, which allows for parallel processing and replay capabilities.
Axon currently provides two main types of SubscribableEventSource implementations:
-
EventBus - An interface for publishing and subscribing to events. It delivers events to subscribers in the publishing thread.
-
Persistent Stream - A event source backed by Axon Server that persists events and supports parallel processing across segments.
When using a simple bus solution like the EventBus as the event source, the Subscribing Processor will only receive current events.
Operations like replaying are, therefore, not an option for any Subscribing Processor.
Furthermore, when the EventBus is used as the event source, the event publishing thread is the same one that handles the event in the Subscribing Processor.
Although this coupled approach deserves a spot within the framework, most scenarios require further decoupling of components by separating the threads as well.
When, for example, an application requires event processing parallelization to get a higher performance, this can be a blocker.
This predicament is why the SubscribingEventProcessor is not the default in Axon Framework.
Instead, the "Pooled Streaming Event Processor" (a Streaming Processor implementation) takes up that role. It provides greater flexibility for developers for configuring the event processor in greater detail, including parallel processing and event replay capabilities. However, when using a Subscribing Processor with a persistent stream as its event source, similar benefits can be achieved.
Subscribing Processor Use Cases
The
SubscribingEventProcessoris a sane option in roughly two scenarios:
When using
SubscribableEventSourcebacked by a persisted and replayable store, like persistent streams.When an entity should be updated within the same thread and transaction that published the event.
Note that when you aim to follow the CQRS paradigm, that scenario two is not recommended between command and query models. When following CQRS, publishing an event should be regarded as updating a command model. Handling an event to update an entity is seen as a query model update in virtually all scenarios. Hence, think twice before deciding on scenario two!
Configuring
To configure a Subscribing Event Processor you have three main approaches:
-
Declarative configuration - Using the Configuration API directly
-
Bean-based configuration - Using
EventProcessorDefinitionbeans in Spring Boot -
Properties-based configuration - Using Spring Boot properties files
The examples below show how to configure the "example-processor" with a specific event source. For generic configuration options for any Event Processor, we refer to the general processor configuration section.
Declarative configuration
When using Axon’s configuration API directly, you should invoke the MessagingConfigurer#eventProcessing(Consumer<EventProcessingConfigurer>) operation.
The EventProcessingConfigurer lambda guides users towards:
-
First, selecting the Event Processor type,
-
second, register a lambda to retrieve the Event Handling Components, and,
-
third, customization for the event processor, if any.
The example below shows how this approach is used to define a (1) SubscribingEventProcessor with the (2) AnnotatedEventHandlingClass as the single event handling component, and (3) with a specific event source customization:
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;
public class AxonConfig {
public void configureEventProcessing(MessagingConfigurer configurer) {
configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
this::configureSubscribingProcessor
));
}
private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
SubscribingEventProcessorsConfigurer subscribingConfigurer
) {
return subscribingConfigurer.processor(
"example-processor",
config -> config.eventHandlingComponents(this::configureHandlingComponent)
.customized((c, subscribingConfig) -> subscribingConfig.eventSource(
c.getComponent(EventBus.class)
))
);
}
private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
) {
return componentConfigurer.autodetected(c -> new AnnotatedEventHandlingClass());
}
}
Properties-based configuration - Spring Boot properties file
A properties file allows the configuration of several fields for the Subscribing Event Processor. If more flexibility is required, like concretely defining the event handling components to attach, we recommend use of the declarative configuration instead.
axon.eventhandling.processors.example-processor.mode=subscribing
axon.eventhandling.processors.example-processor.source=eventBus
If the name of an event processor contains periods ., use the map notation:
axon.eventhandling.processors[example.processor].mode=subscribing
axon.eventhandling.processors[example.processor].source=eventBus
Autodetected configuration - EventProcessorDefinition based configuration
When using Spring Boot, you can define EventProcessorDefinition beans to configure subscribing event processors.
This approach provides programmatic control over which event handlers are assigned to each processor and how each processor is configured.
import org.axonframework.extension.spring.config.EventHandlerSelector;
import org.axonframework.extension.spring.config.EventProcessorDefinition;
import org.axonframework.messaging.eventhandling.EventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventProcessorConfig {
@Bean
public EventProcessorDefinition exampleProcessorDefinition(EventBus eventBus) {
return EventProcessorDefinition.subscribing("example-processor")
.assigningHandlers(EventHandlerSelector.matchesNamespaceOnType(
"orders"
))
.customized(config -> config
.eventSource(eventBus)
);
}
}
The assigningHandlers method uses the EventHandlerSelector to receive an EventHandlerDescriptor that provides access to:
-
beanName()- The Spring bean name -
beanType()- The event handler class -
beanDefinition()- The Spring bean definition -
component()- The component builder
This allows flexible handler selection based on naming conventions, packages, types, or other criteria.
In the example above, we use the convenience EventHandlerSelector#matchesNamespaceOnType method that assigns handlers that are annotated with @Namespace("orders") to an SubscribingEventProcessor with the name "example-processor".
Event Handlers can describe their @Namespace on several levels, as described here.
When the Event Processor name and namespace are identical, you can use the EventProcessorDefinition.subscribingMatching() to skip the assigningHandlers method automatically:
import org.axonframework.extension.spring.config.EventProcessorDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventProcessorConfig {
@Bean
public EventProcessorDefinition exampleProcessorDefinition() {
return EventProcessorDefinition.subscribingMatching("orders")
.customized(config -> config
.eventSource(eventBus)
);
}
}
Use notCustomized() instead of customized() when you don’t need custom processor settings but still want explicit handler assignment:
import org.axonframework.extension.spring.config.EventProcessorDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventProcessorConfig {
@Bean
public EventProcessorDefinition exampleProcessorDefinition() {
return EventProcessorDefinition.subscribingMatching("orders")
.notCustomized();
}
}
Advanced configuration with interceptors and error handling:
import org.axonframework.extension.spring.config.EventProcessorDefinition;
import org.axonframework.messaging.eventhandling.EventBus;
import org.axonframework.messaging.interceptors.MessageHandlerInterceptor;
import org.axonframework.messaging.eventhandling.EventMessage;
import org.axonframework.messaging.eventhandling.processing.error.PropagatingErrorHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EventProcessorConfig {
@Bean
public EventProcessorDefinition advancedProcessorDefinition(
EventBus eventBus,
MessageHandlerInterceptor<? super EventMessage> interceptor) {
return EventProcessorDefinition.subscribing("advanced-processor")
.assigningHandlers(descriptor ->
descriptor.beanName().startsWith("advanced"))
.customized(config -> config
.eventSource(eventBus)
.withInterceptor(interceptor)
.errorHandler(PropagatingErrorHandler.INSTANCE));
}
}
|
Configuration precedence and handler assignment When using multiple configuration approaches:
The |
Error mode
Whenever the error handler rethrows an exception, the SubscribingEventProcessor will have it bubble up to the publishing component of the event.
Providing the exception to the event publisher allows the publishing component to deal with it accordingly.
Persistent streams
A Subscribing Processor can use a persistent stream as its event source. By using a persistent stream, a Subscribing Processor can process events in parallel and replay events.
For details on how to use persistent streams, consult the Persistent Streams reference page.