Persistent Streams

A Subscribing Processor can use a persistent stream as its event source. By using a persistent stream, a Subscribing Processor can process events in parallel and replay events.

When a processor uses a persistent stream, it receives events from Axon Server. After processing a batch of events, it sends an acknowledgment back to Axon Server.

The persistent stream can be split into segments to allow for parallel processing within a single client or across multiple instances of the client. The number of segments can be changed dynamically. Axon Server distributes the segments across the subscribers to ensure that all segments are connected.

Events are assigned to a specific segment based on the sequencing policy for the persistent stream. Persistent streams support all the standard sequencing policies that can also be used for streaming processors.

If no sequencing policy is specified for a persistent stream, the configuration defaults to SequentialPolicy. This policy instructs the persistent stream to process all events sequentially. Be sure to explicitly check and configure your sequencing policy to match your specific sequencing requirements. If you are using an aggregate-based approach with a non-DCB context in Axon Server, consider using the SequentialPerAggregatePolicy instead of the SequentialPolicy.

Clients can provide a filter in the persistent stream definition. This reduces the number of events that the client receives from Axon Server. The filter expression uses the same syntax as the ad-hoc query option in Axon Server.

Persistent streams do not require a token store in the client like streaming processors do. The state of the stream is maintained in Axon Server.

Configuration

For a specific Event Processor to use a persistent stream as its source, the event source must be a PersistentStreamEventSource. You can configure this using the declarative Configuration API.

See the examples below, or consult the general processor configuration section for further options.

Each persistent stream must be identified by a unique name within your Axon Server environment. The PersistentStreamEventSource requires a PersistentStreamProperties to define the initial stream creation settings:

  • streamName: the unique identifier of the persistent stream in Axon Server; reusing an existing name joins the same server-side stream.

  • segments: the initial number of segments for parallel processing.

  • sequencingPolicyName: the name of the sequencing policy; use the constants on PersistentStreamSequencingPolicy (for example, SEQUENTIAL_PER_AGGREGATE_POLICY).

  • sequencingPolicyParameters: list of parameters for the sequencing policy; required for PropertySequencingPolicy and MetadataSequencingPolicy.

  • initialPosition: the position to start reading from, which is either a global event sequence number, "HEAD" (start from the latest event), or "TAIL" (start from the beginning of the event store).

  • filter: an optional server-side filter expression in Axon Server Query Language; use null to receive all events.

For the MetadataSequencingPolicy, the sequencingPolicyParameters must contain the name of one or more event metadata fields. Events with the same value for these fields are routed to the same segment.

The PropertySequencingPolicy requires 4 values in the sequencingPolicyParameters list:

  1. The serialization format of the events: JSON or XML (currently only JSON or XML supported).

  2. The payload type to apply the policy on.

  3. An expression to extract the sequencing value from the event payload (JsonPath for JSON, XPath for XML).

  4. A fallback policy name to use when the event payload type does not match; this may be another PropertySequencingPolicy to chain additional payload-type rules.

Declarative configuration

When using Axon’s Configuration API directly, retrieve the infrastructure components via the Configuration parameter and pass them to the PersistentStreamEventSource constructor:

import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import io.axoniq.framework.axonserver.connector.api.AxonServerConfiguration;
import io.axoniq.framework.axonserver.connector.api.AxonServerConnectionManager;
import io.axoniq.framework.axonserver.connector.event.PersistentStreamEventSource;
import io.axoniq.framework.axonserver.connector.event.PersistentStreamScheduledExecutorBuilder;
import io.axoniq.framework.axonserver.connector.event.PersistentStreamSequencingPolicy;
import org.axonframework.common.configuration.Configuration;
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.conversion.EventConverter;
import org.axonframework.messaging.eventhandling.processing.subscribing.SubscribingEventProcessorsConfigurer;

import java.util.Collections;
import java.util.concurrent.Executors;

public class AxonConfig {

    public void configureEventProcessing(MessagingConfigurer configurer) {
        configurer.eventProcessing(eventConfigurer -> eventConfigurer.subscribing(
                this::configureSubscribingProcessor
        ));
    }

    private SubscribingEventProcessorsConfigurer configureSubscribingProcessor(
            SubscribingEventProcessorsConfigurer subscribingConfigurer
    ) {
        return subscribingConfigurer.processor(
                "example-processor",
                config -> config.eventHandlingComponents(this::configureHandlingComponent)
                                .customized((c, subscribingConfig) -> subscribingConfig.eventSource(
                                        buildPersistentStreamSource(c)
                                ))
        );
    }

    private PersistentStreamEventSource buildPersistentStreamSource(Configuration c) {
        String streamName = "example-persistent-stream";
        int segmentCount = 4;
        int batchSize = 1024;

        PersistentStreamProperties properties = new PersistentStreamProperties(
                streamName,
                segmentCount,
                PersistentStreamSequencingPolicy.SEQUENTIAL_PER_AGGREGATE_POLICY,
                Collections.emptyList(),
                "HEAD",
                null  // no server-side filter
        );

        return new PersistentStreamEventSource(
                streamName,
                c.getComponent(AxonServerConnectionManager.class),
                c.getComponent(AxonServerConfiguration.class),
                c.getComponent(EventConverter.class),
                properties,
                PersistentStreamScheduledExecutorBuilder.defaultFactory().build(segmentCount, streamName),
                c.getComponent(UnitOfWorkFactory.class),
                batchSize
        );
    }

    private EventHandlingComponentsConfigurer.AdditionalComponentPhase configureHandlingComponent(
            EventHandlingComponentsConfigurer.RequiredComponentPhase componentConfigurer
    ) {
        return componentConfigurer.autodetected(c -> new AnnotatedEventHandlingClass());
    }
}

Replaying events

Persistent stream backed event processors allow replaying events in a similar way as streaming event processors do. However, due to the nature of subscribing processors and the server-side implementation of persistent streams there are a few limitations compared to streaming processors:

  • the ReplayToken that might be accessed from the processing context or as annotated handler parameter always reports the next token position as the token at reset instead to the real position at which the reset was triggered (so a replay startet a position 10 will have a replay token for an event at position 5 report position 6 as token at reset, the event at position 6 will report position 7 as token at reset)

  • @ReplayContext annotated handler parameters are not supported (they will be always null)

  • @ResetHandler annotated methods (used to run pre-reset logic) are not supported (they will simply be ignored)

Persistent streams and the dead letter queue

Dead letter queues not supported for persistent streams in Axon Framework 5.2

The dead letter queue support for persistent streams is not yet available in Axon Framework 5.2. It will be introduced in a later version of the Framework.