Event Snapshots

Snapshotting

When aggregates live for a long time, and their state constantly changes, they will generate a large amount of events. Having to load all these events in to rebuild an aggregate’s state may have a big performance impact. The snapshot event is a domain event with a special purpose: it summarises an arbitrary amount of events into a single one. By regularly creating and storing a snapshot event, the event store does not have to return long lists of events. Just the latest snapshot events and all events that occurred after the snapshot was made.

For example, items in stock tend to change quite often. Each time an item is sold, an event reduces the stock by one. Every time a shipment of new items comes in, the stock is incremented by some larger number. If you sell a hundred items each day, you will produce at least 100 events per day. After a few days, your system will spend too much time reading in all these events just to find out whether it should raise an "ItemOutOfStockEvent". A single snapshot event could replace a lot of these events, just by storing the current number of items in stock.

To measure it to know!

AxonIQ Console can measure the number of events that were loaded during command handling, so you know if you need to create snapshots or not. It’s also able to track many more interesting metrics, such as the load time of the aggregate, and the time it took to store the events.

Creating a snapshot

Snapshot creation can be triggered by a number of factors, for example: the number of events created since the last snapshot, the time to initialize an aggregate exceeds a certain threshold, time-based, etc. Currently, Axon provides a mechanism that allows you to trigger snapshots based on an event count threshold.

The definition of when snapshots should be created, is provided by the SnapshotTriggerDefinition interface.

The EventCountSnapshotTriggerDefinition provides the mechanism to trigger snapshot creation when the number of events needed to load an aggregate exceeds a certain threshold. If the number of events needed to load an aggregate exceeds a certain configurable threshold, the trigger tells a Snapshotter to create a snapshot for the aggregate.

The snapshot trigger is configured on an event sourcing repository and has a number of properties that allow you to tweak triggering:

  • Snapshotter sets the actual snapshotter instance, responsible for creating and storing the actual snapshot event;

  • Trigger sets the threshold at which to trigger snapshot creation;

A Snapshotter is responsible for the actual creation of a snapshot. Typically, snapshotting is a process that should disturb the operational processes as little as possible. Therefore, it is recommended to run the snapshotter in a different thread. The Snapshotter interface declares a single method: scheduleSnapshot(), which takes the aggregate’s type and identifier as parameters.

Axon provides the AggregateSnapshotter, which creates and stores AggregateSnapshot instances. This is a special type of snapshot, since it contains the actual aggregate instance within it. The repositories provided by Axon are aware of this type of snapshot, and will extract the aggregate from it, instead of instantiating a new one. All events loaded after the snapshot events are streamed to the extracted aggregate instance.

Serializing a Snapshot Event

Do make sure the Serializer instance you use (which defaults to the XStreamSerializer) is capable of serializing your aggregate. The XStreamSerializer requires you to use either a Hotspot JVM, or your aggregate must either have an accessible default constructor or implement the Serializable interface.

The AbstractSnapshotter provides a basic set of properties that allow you to tweak the way snapshots are created:

  • EventStore sets the event store, which is used to load past events and store the snapshots. This event store must implement the SnapshotEventStore interface.

  • Executor sets the executor, such as a ThreadPoolExecutor that will provide the thread to process actual snapshot creation. By default, snapshots are created in the thread that calls the scheduleSnapshot() method, which is generally not recommended for production.

The AggregateSnapshotter provides one more property:

  • AggregateFactories is the property that allows you to set the factories that will create instances of your aggregates. Configuring multiple aggregate factories allows you to use a single Snapshotter to create snapshots for a variety of aggregate types. The EventSourcingRepository implementations and the AggregateConfiguration provide access to the AggregateFactory being used for a given Aggregate. Both provide the factory through the EventSourcingRepository#getAggregateFactory and AggregateConfiguration#aggregateFactory methods respectively. The result from either can be used to configure the same aggregate factories in the Snapshotter as the ones used by the Aggregate.

Snapshotter Configuration

If you use an executor that executes snapshot creation in another thread, make sure you configure the correct transaction management for your underlying event store, if necessary.

For both non-Spring and Spring users a default Snapshotter is provided. The former uses the Configuration API to provide a default AggregateSnapshotter, retrieving the aggregate factories from the registered Aggregates / AggregateConfiguration`s. Spring uses a `SpringAggregateSnapshotter, which will automatically looks up the right AggregateFactory instances from the application context when a snapshot needs to be created.

The @Revision annotation has a dedicated, automatically configured SnapshotFilter implementation. This implementation is used to filter out non-matching snapshots from the Repository’s loading process. So when the `@Revision annotation is used on an aggregate the snapshots will be filtered out automatically. When the`@Revision` on an aggregate is missing a RevisionSnapshotFilter is configured for revision null.

Axon Configuration API

AggregateConfigurer<GiftCard> giftCardConfigurer =
        AggregateConfigurer.defaultConfiguration(GiftCard.class)
                           .configureSnapshotTrigger(config -> new EventCountSnapshotTriggerDefinition(
                                   config.snapshotter(), 500
                           ));
Configurer configurer = DefaultConfigurer.defaultConfiguration()
                                         .configureAggregate(giftCardConfigurer);

Spring Boot auto configuration

It is possible to define a custom SnapshotTriggerDefinition for an aggregate as a Spring bean. In order to tie the SnapshotTriggerDefinition bean to an aggregate, use the snapshotTriggerDefinition attribute on @Aggregate annotation. Listing below shows how to define a custom EventCountSnapshotTriggerDefinition which will take a snapshot every 500 events.

Note that a Snapshotter instance, if not explicitly defined as a bean already, will be automatically configured for you. This means you can simply pass the Snapshotter as a parameter to your SnapshotTriggerDefinition.

@Bean
public SnapshotTriggerDefinition giftCardSnapshotTrigger(Snapshotter snapshotter) {
    return new EventCountSnapshotTriggerDefinition(snapshotter, 500);
}

...

@Aggregate(snapshotTriggerDefinition = "giftCardSnapshotTrigger")
public class GiftCard {...}

Storing snapshot events

When a snapshot is stored in the event store, it will automatically use that snapshot to summarize all prior events and return it in their place. All event store implementations allow for concurrent creation of snapshots. This means they allow snapshots to be stored while another process is adding events for the same aggregate. This allows the snapshotting process to run as a separate process altogether.

Snapshots as a replacement of your events?

Normally, you can archive all events once they are part of a snapshot event. Snapshotted events will never be read in again by the event store in regular operational scenarios. However, if you want to be able to reconstruct an aggregate state prior to the moment the snapshot was created, you must keep the events up to that date.

Axon provides a special type of snapshot event: the AggregateSnapshot, which stores an entire aggregate as a snapshot. The motivation is simple: your aggregate should only contain the state relevant to take business decisions. This is exactly the information you want captured in a snapshot. All event sourcing repositories provided by Axon recognize the AggregateSnapshot, and will extract the aggregate from it. Beware that using this snapshot event requires that the event serialization mechanism needs to be able to serialize the aggregate.

Filtering snapshot events

When enabling snapshotting, several snapshots would be stored per Aggregate instance in the event store. At a certain stage, some of these snapshot events are no longer being used by the application as newer versions took their place. Especially if these snapshot events portray an old format of the aggregate by using the AggregateSnapshot event would it be smart to no longer load these.

You could take the stance of dropping all the snapshots which are stored (for a given aggregate type), but this means snapshots will be recreated with a 100% certainty. It is also possible to filter out snapshot events when reading your Aggregate from the event store. To that end, a SnapshotFilter can be defined per Aggregate type or for the entire EventStore.

The SnapshotFilter is a functional interface, providing two main operations: allow(DomainEventData<?) and combine(SnapshotFilter). The former provides the DomainEventData which reflects the snapshot events. The latter allows combining several `SnapshotFilter`s together.

The following snippets show how to configure a SnapshotFilter:

Axon Configuration API

SnapshotFilter giftCardSnapshotFilter = snapshotData -> /* allow or disallow this snapshotData */;

AggregateConfigurer<GiftCard> giftCardConfigurer =
        AggregateConfigurer.defaultConfiguration(GiftCard.class)
                           .configureSnapshotFilter(config -> giftCardSnapshotFilter);
Configurer configurer = DefaultConfigurer.defaultConfiguration()
                                         .configureAggregate(giftCardConfigurer);

Spring Boot auto configuration

It is possible to define a custom SnapshotFilter for an aggregate as a Spring bean. In order to tie the SnapshotFilter bean to an aggregate, use the snapshotFilter attribute on @Aggregate annotation.

@Bean
public SnapshotFilter giftCardSnapshotFilter() {
    return snapshotData -> /* allow or disallow this snapshotData */;
}

...

@Aggregate(snapshotFilter = "giftCardSnapshotFilter")
public class GiftCard {...}

The above snippet would be feasible to follow if fine-grained control is required when filtering snapshots from the store. For example, when your snapshots are not based on the Aggregate class (which is the default). When this is not required, you can base yourself on the default SnapshotFilter - the RevisionSnapshotFilter.

To configure this SnapshotFilter, all you have to do is use the @Revision annotation on your Aggregate class. In doing so, the RevisionSnapshotFilter is set, filtering non-matching snapshots from the Repository’s loading process, based on the value maintained within the `@Revision annotation.

Through this, with every new production deployment of your application that adjusts the Aggregate state, you would only have to adjust the revision value in the annotation. Check out the following example for how to set this up:

// "1" is an example revision value.
// You're free to choose whatever value that fits your application's versioning scheme.
@Revision("1")
public class GiftCard {
    // Omitted aggregate internals for simplicity.
}

Initializing an aggregate based on a snapshot event

A snapshot event is an event like any other. That means a snapshot event is handled just like any other domain event. When using annotations to demarcate event handlers (@EventHandler), you can annotate a method that initializes full aggregate state based on a snapshot event. The code sample below shows how snapshot events are treated like any other domain event within the aggregate.

public class MyAggregate extends AbstractAnnotatedAggregateRoot {

    // ...

    @EventHandler
    protected void handleSomeStateChangeEvent(MyDomainEvent event) {
        // ...
    }

    @EventHandler
    protected void applySnapshot(MySnapshotEvent event) {
        // the snapshot event should contain all relevant state
        this.someState = event.someState;
        this.otherState = event.otherState;
    }
}

There is one type of snapshot event that is treated differently: the AggregateSnapshot. This type of snapshot event contains the actual aggregate. The aggregate factory recognizes this type of event and extracts the aggregate from the snapshot. Then, all other events are re-applied to the extracted snapshot. That means aggregates never need to be able to deal with AggregateSnapshot instances themselves.

Caching

A well-designed command handling module should pose no problems when implementing caching. Especially when using event sourcing, loading an aggregate from an Event Store can be an expensive operation. With a properly configured cache in place, loading an aggregate can be converted into a pure in-memory process.

To that end, Axon allows the configuration of a Cache object. The framework currently provides several implementations to choose from:

  • WeakReferenceCache - An in-memory cache solution. In most scenarios, this is a good start.

  • EhCacheAdapter - An AbstractCacheAdapter, wrapping EhCache into a usable solution for Axon. This can be used with major version 2, and is therefore deprecated.

  • EhCache3Adapter - An AbstractCacheAdapter, wrapping EhCache into a usable solution for Axon. This can be used only with major version 3. Which has a different group name than version 2.

  • JCacheAdapter - An AbstractCacheAdapter, wrapping JCache into a usable solution for Axon.

  • AbstractCacheAdapter - Abstract implementation towards supporting Axon’s Cache API. Helpful in writing an adapter for a cache implementation that Axon does not support out of the box.

Before configuring a Cache, please consider the following guidelines. They will help you get the most out of your caching solution:

  • Make sure the unit of work never needs to perform a rollback for functional reasons. A rollback means that an aggregate has reached an invalid state. Axon will automatically invalidate the cache entries involved. The following request will force the aggregate to be reconstructed from its events. If you use exceptions as a potential (functional) return value, you can configure a RollbackConfiguration on your command bus. By default, the configuration will roll back the unit of work on unchecked exceptions for command handlers and on all exceptions for event handlers.

  • All commands for a single aggregate must arrive on the machine with the aggregate in its cache. This requirement means that commands should be consistently routed to the same machine for as long as that machine is "healthy." Routing commands consistently prevents the cache from going stale. A hit on a stale cache will cause a command to be executed and fail when events are stored in the event store. By default, Axon’s distributed command bus components will use consistent hashing to route commands.

  • Configure a sensible time to live / time to idle. By default, caches tend to have a relatively short time to live, a matter of minutes. For a command handling component with consistent routing, a longer time-to-idle and time-to-live is usually better. This setting prevents the need to re-initialize an aggregate based on its events because its cache entry expired. The time-to-live of your cache should match the expected lifetime of your aggregate.

  • Cache data in-memory. For proper optimization, caches should keep data in-memory (and preferably on-heap) for best performance. This approach prevents the need to (re)serialize aggregates when storing to disk and even off-heap.

To configure a cache for your Aggregates, consider the following snippet:

Axon Configuration API

public class AxonConfig {
    // omitting other configuration methods...
    public void configureAggregateWithCache(Configurer configurer) {
        AggregateConfigurer<GiftCard> giftCardConfigurer =
                AggregateConfigurer.defaultConfiguration(GiftCard.class)
                                   .configureCache(config -> new WeakReferenceCache());

        configurer.configureAggregate(giftCardConfigurer);
    }
}

Spring Boot auto configuration

The Aggregate annotation allows specification of the cache bean:

@Aggregate(cache = "giftCardCache")
public class GiftCard {
    // state, command handlers and event sourcing handlers...
}

This approach does require the bean name to be present in the Application Context of course:

@Configuration
public class AxonConfig {
    // omitting other configuration methods...
    @Bean
    public Cache giftCardCache() {
        return new WeakReferenceCache();
    }
}