Persistent Streams
A Subscribing Processor can use a persistent stream as its event source. By using a persistent stream, a Subscribing Processor can process events in parallel and replay events.
When a processor uses a persistent stream, it receives events from Axon Server. After processing a batch of events, it sends an acknowledgment back to Axon Server.
The persistent stream can be split into segments to allow for parallel processing within a single client or across multiple instances of the client. The number of segments can be changed dynamically. Axon Server distributes the segments across the subscribers to ensure that all segments are connected.
Events are assigned to a specific segment based on the sequencing policy for the persistent stream. Persistent streams support all the standard sequencing policies that can also be used for streaming processors.
|
If no sequencing policy is specified for a persistent stream, the configuration defaults to |
Clients can provide a filter in the persistent stream definition. This reduces the number of events that the client receives from Axon Server. The filter expression uses the same syntax as the ad-hoc query option in Axon Server.
Persistent streams do not require a token store in the client like streaming processors do. The state of the stream is maintained in Axon Server.
Configuration
For a specific Event Processor to use a persistent stream as its source, the event source must be a PersistentStreamEventSource.
You can configure this using the declarative Configuration API.
See the examples below, or consult the general processor configuration section for further options.
Each persistent stream must be identified by a unique name within your Axon Server environment.
The PersistentStreamEventSource requires a PersistentStreamProperties to define the initial stream creation settings:
-
streamName: the unique identifier of the persistent stream in Axon Server; reusing an existing name joins the same server-side stream. -
segments: the initial number of segments for parallel processing. -
sequencingPolicyName: the name of the sequencing policy; use the constants onPersistentStreamSequencingPolicy(for example,SEQUENTIAL_PER_AGGREGATE_POLICY). -
sequencingPolicyParameters: list of parameters for the sequencing policy; required forPropertySequencingPolicyandMetadataSequencingPolicy. -
initialPosition: the position to start reading from, which is either a global event sequence number,"HEAD"(start from the latest event), or"TAIL"(start from the beginning of the event store). -
filter: an optional server-side filter expression in Axon Server Query Language; usenullto receive all events.
For the MetadataSequencingPolicy, the sequencingPolicyParameters must contain the name of one or more event metadata fields.
Events with the same value for these fields are routed to the same segment.
The PropertySequencingPolicy requires 4 values in the sequencingPolicyParameters list:
-
The serialization format of the events:
JSONorXML(currently only JSON or XML supported). -
The payload type to apply the policy on.
-
An expression to extract the sequencing value from the event payload (
JsonPathfor JSON,XPathfor XML). -
A fallback policy name to use when the event payload type does not match; this may be another
PropertySequencingPolicyto chain additional payload-type rules.
Declarative configuration
When using Axon’s Configuration API directly, retrieve the infrastructure components via the Configuration parameter and pass them to the PersistentStreamEventSource constructor:
import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import io.axoniq.framework.axonserver.connector.api.AxonServerConfiguration;
import io.axoniq.framework.axonserver.connector.api.AxonServerConnectionManager;
import io.axoniq.framework.axonserver.connector.event.PersistentStreamEventSource;
import io.axoniq.framework.axonserver.connector.event.PersistentStreamScheduledExecutorBuilder;
import io.axoniq.framework.axonserver.connector.event.PersistentStreamSequencingPolicy;
import org.axonframework.common.configuration.Configuration;
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.conversion.EventConverter;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;
import java.util.Collections;
import java.util.concurrent.Executors;
public class AxonConfig {
public void configureEventProcessing(MessagingConfigurer configurer) {
configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
this::configureSubscribingProcessor
));
}
private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
SubscribingEventProcessorsConfigurer subscribingConfigurer
) {
return subscribingConfigurer.processor(
"example-processor",
config -> config.eventHandlingComponents(this::configureHandlingComponent)
.customized((c, subscribingConfig) -> subscribingConfig.eventSource(
buildPersistentStreamSource(c)
))
);
}
private PersistentStreamEventSource buildPersistentStreamSource(Configuration c) {
String streamName = "example-persistent-stream";
int segmentCount = 4;
int batchSize = 1024;
PersistentStreamProperties properties = new PersistentStreamProperties(
streamName,
segmentCount,
PersistentStreamSequencingPolicy.SEQUENTIAL_PER_AGGREGATE_POLICY,
Collections.emptyList(),
"HEAD",
null // no server-side filter
);
return new PersistentStreamEventSource(
streamName,
c.getComponent(AxonServerConnectionManager.class),
c.getComponent(AxonServerConfiguration.class),
c.getComponent(EventConverter.class),
properties,
PersistentStreamScheduledExecutorBuilder.defaultFactory().build(segmentCount, streamName),
c.getComponent(UnitOfWorkFactory.class),
batchSize
);
}
private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
) {
return componentConfigurer.autodetected(c -> new AnnotatedEventHandlingClass());
}
}
Replaying events
Persistent stream backed event processors allow replaying events in a similar way as streaming event processors do. However, due to the nature of subscribing processors and the server-side implementation of persistent streams there are a few limitations compared to streaming processors:
-
the
ReplayTokenthat might be accessed from the processing context or as annotated handler parameter always reports the next token position as thetoken at resetinstead to the real position at which the reset was triggered (so a replay startet a position10will have a replay token for an event at position5report position6astoken at reset, the event at position6will report position7astoken at reset) -
@ReplayContextannotated handler parameters are not supported (they will be alwaysnull) -
@ResetHandlerannotated methods (used to run pre-reset logic) are not supported (they will simply be ignored)