Repository and Event Store

The repository is the mechanism that provides access to aggregates. The repository acts as a gateway to the actual storage mechanism used to persist the data. In CQRS, the repositories only need to be able to find aggregates based on their unique identifier. Any other types of queries should be performed against the query database.

In the Axon Framework, all repositories must implement the Repository interface. This interface prescribes three methods: load(identifier, version), load(identifier) and newInstance(factoryMethod). The load methods allows you to load aggregates from the repository. The optional version parameter is used to detect concurrent modifications (see Advanced conflict detection and resolution). newInstance is used to register newly created aggregates in the repository.

Depending on your underlying persistence storage and auditing needs, there are a number of base implementations that provide basic functionality needed by most repositories. Axon Framework makes a distinction between repositories that save the current state of the aggregate (see Standard Repositories), and those that store the events of an aggregate (see Event Sourcing Repositories).

Note that the Repository interface does not prescribe a delete(identifier) method. Deleting aggregates is done by invoking the AggregateLifecycle.markDeleted() method from within an aggregate. Deleting an aggregate is a state migration like any other, with the only difference that it is irreversible in many cases. You should create your own meaningful method on your aggregate which sets the aggregate's state to "deleted". This also allows you to register any events that you would like to have published.

Standard repositories

Standard repositories store the actual state of an Aggregate. Upon each change, the new state will overwrite the old. This makes it possible for the query components of the application to use the same information the command component also uses. This could, depending on the type of application you are creating, be the simplest solution. If that is the case, Axon provides some building blocks that help you implement such a repository.

Axon provides one out-of-the-box implementation for a standard Repository: the GenericJpaRepository. It expects the Aggregate to be a valid JPA Entity. It is configured with an EntityManagerProvider which provides the EntityManager to manage the actual persistence, and a class specifying the actual type of Aggregate stored in the Repository. You also pass in the EventBus to which Events are to be published when the Aggregate invokes the static AggregateLifecycle.apply() method.

You can also easily implement your own repository. In that case, it is best to extend from the abstract LockingRepository. As aggregate wrapper type, it is recommended to use the AnnotatedAggregate. See the sources of GenericJpaRepository for an example.

Event Sourcing repositories

Aggregate roots that are able to reconstruct their state based on events may also be configured to be loaded by an Event Sourcing Repository. Those repositories do not store the aggregate itself, but the series of events generated by the aggregate. Based on these events, the state of an aggregate can be restored at any time.

The EventSourcingRepository implementation provides the basic functionality needed by any event sourcing repository in the AxonFramework. It depends on an EventStore (see Implementing your own Event Store), which abstracts the actual storage mechanism for the events.

Optionally, you can provide an Aggregate Factory. The AggregateFactory specifies how an aggregate instance is created. Once an aggregate has been created, the EventSourcingRepository can initialize it using the Events it loaded from the Event Store. Axon Framework comes with a number of AggregateFactory implementations that you may use. If they do not suffice, it is very easy to create your own implementation.

GenericAggregateFactory

The GenericAggregateFactory is a special AggregateFactory implementation that can be used for any type of Event Sourced Aggregate Root. The GenericAggregateFactory creates an instance of the Aggregate type the repository manages. The Aggregate class must be non-abstract and declare a default no-arg constructor that does no initialization at all.

The GenericAggregateFactory is suitable for most scenarios where aggregates do not need special injection of non-serializable resources.

SpringPrototypeAggregateFactory

Depending on your architectural choices, it might be useful to inject dependencies into your aggregates using Spring. You could, for example, inject query repositories into your aggregate to ensure the existence (or nonexistence) of certain values.

To inject dependencies into your aggregates, you need to configure a prototype bean of your aggregate root in the Spring context that also defines the SpringPrototypeAggregateFactory. Instead of creating regular instances of using a constructor, it uses the Spring Application Context to instantiate your aggregates. This will also inject any dependencies in your aggregate.

Implementing your own AggregateFactory

In some cases, the GenericAggregateFactory just doesn't deliver what you need. For example, you could have an abstract aggregate type with multiple implementations for different scenarios (e.g. PublicUserAccount and BackOfficeAccount both extending an Account). Instead of creating different repositories for each of the aggregates, you could use a single repository, and configure an AggregateFactory that is aware of the different implementations.

The bulk of the work the Aggregate Factory does is creating uninitialized Aggregate instances. It must do so using a given aggregate identifier and the first Event from the stream. Usually, this Event is a creation event which contains hints about the expected type of aggregate. You can use this information to choose an implementation and invoke its constructor. Make sure no Events are applied by that constructor; the aggregate must be uninitialized.

Initializing aggregates based on the events can be a time-consuming effort, compared to the direct aggregate loading of the simple repository implementations. The CachingEventSourcingRepository provides a cache from which aggregates can be loaded if available.

Event store implementations

Event Sourcing repositories need an event store to store and load events from aggregates. An Event Store offers the functionality of an Event Bus, with the addition that it persists published events, and is able to retrieve events based on an Aggregate Identifier.

Axon provides an event store out of the box, the EmbeddedEventStore. It delegates actual storage and retrieval of events to an EventStorageEngine.

There are multiple EventStorageEngine implementations available:

JpaEventStorageEngine

The JpaEventStorageEngine stores events in a JPA-compatible data source. The JPA Event Store stores events in so called entries. These entries contain the serialized form of an event, as well as some fields where meta-data is stored for fast lookup of these entries. To use the JpaEventStorageEngine, you must have the JPA (javax.persistence) annotations on your classpath.

By default, the event store needs you to configure your persistence context (e.g. as defined in META-INF/persistence.xml file) to contain the classes DomainEventEntry and SnapshotEventEntry (both in the org.axonframework.eventsourcing.eventstore.jpa package).

Below is an example configuration of a persistence context configuration:

<persistence xmlns="http://java.sun.com/xml/ns/persistence" version="1.0">
    <persistence-unit name="eventStore" transaction-type="RESOURCE_LOCAL"> (1)
        <class>org...eventstore.jpa.DomainEventEntry</class> (2)
        <class>org...eventstore.jpa.SnapshotEventEntry</class>
    </persistence-unit>
</persistence>
  1. In this sample, there is a specific persistence unit for the event store. You may, however, choose to add the third line to any other persistence unit configuration.

  2. This line registers the DomainEventEntry (the class used by the JpaEventStore) with the persistence context.

Note

Axon uses Locking to prevent two threads from accessing the same Aggregate. However, if you have multiple JVMs on the same database, this won't help you. In that case, you'd have to rely on the database to detect conflicts. Concurrent access to the event store will result in a Key Constraint Violation, as the table only allows a single Event for an aggregate with any sequence number. Inserting a second event for an existing aggregate with an existing sequence number will result in an error.

The JpaEventStorageEngine can detect this error and translate it to a ConcurrencyException. However, each database system reports this violation differently. If you register your DataSource with the JpaEventStore, it will try to detect the type of database and figure out which error codes represent a Key Constraint Violation. Alternatively, you may provide a PersistenceExceptionTranslator instance, which can tell if a given exception represents a Key Constraint Violation.

If no DataSource or PersistenceExceptionTranslator is provided, exceptions from the database driver are thrown as-is.

By default, the JPA Event Storage Engine requires an EntityManagerProvider implementation that returns the EntityManager instance for the EventStorageEngine to use. This also allows for application managed persistence contexts to be used. It is the EntityManagerProvider's responsibility to provide a correct instance of the EntityManager.

There are a few implementations of the EntityManagerProvider available, each for different needs. The SimpleEntityManagerProvider simply returns the EntityManager instance which is given to it at construction time. This makes the implementation a simple option for Container Managed Contexts. Alternatively, there is the ContainerManagedEntityManagerProvider, which returns the default persistence context, and is used by default by the Jpa Event Store.

If you have a persistence unit called "myPersistenceUnit" which you wish to use in the JpaEventStore, this is what the EntityManagerProvider implementation could look like:

public class MyEntityManagerProvider implements EntityManagerProvider {

    private EntityManager entityManager;

    @Override
    public EntityManager getEntityManager() {
        return entityManager;
    }

    @PersistenceContext(unitName = "myPersistenceUnit")
    public void setEntityManager(EntityManager entityManager) {
        this.entityManager = entityManager;
    }

By default, the JPA Event Store stores entries in DomainEventEntry and SnapshotEventEntry entities. While this will suffice in many cases, you might encounter a situation where the meta-data provided by these entities is not enough. Or you might want to store events of different aggregate types in different tables.

If that is the case, you can extend the JpaEventStorageEngine. It contains a number of protected methods that you can override to tweak its behavior.

Warning

Note that persistence providers, such as Hibernate, use a first-level cache on their EntityManager implementation. Typically, this means that all entities used or returned in queries are attached to the EntityManager. They are only cleared when the surrounding transaction is committed or an explicit "clear" is performed inside the transaction. This is especially the case when the Queries are executed in the context of a transaction.

To work around this issue, make sure to exclusively query for non-entity objects. You can use JPA's "SELECT new SomeClass(parameters) FROM ..." style queries to work around this issue. Alternatively, call EntityManager.flush() and EntityManager.clear() after fetching a batch of events. Failure to do so might result in OutOfMemoryExceptions when loading large streams of events.

JDBC Event Storage Engine

The JDBC event storage engine uses a JDBC Connection to store Events in a JDBC compatible data storage. Typically, these are relational databases. Theoretically, anything that has a JDBC driver could be used to back the JDBC Event Storage Engine.

Similar to its JPA counterpart, the JDBC Event Storage Engine stores Events in entries. By default, each Event is stored in a single Entry, which corresponds with a row in a table. One table is used for Events and another for the Snapshots.

The JdbcEventStorageEngine uses a ConnectionProvider to obtain connections. Typically, these connections can be obtained directly from a DataSource. However, Axon will bind these connections to a Unit of Work, so that a single connection is used in a Unit of Work. This ensures that a single transaction is used to store all events, even when multiple Units of Work are nested in the same thread.

Note

Spring users are recommended to use the SpringDataSourceConnectionProvider to attach a connection from a DataSource to an existing transaction.

MongoDB Event Storage Engine

MongoDB is a document based NoSQL store. Its scalability characteristics make it suitable for use as an Event Store. Axon provides the MongoEventStorageEngine, which uses MongoDB as backing database. It is contained in the Axon Mongo module (Maven artifactId axon-mongo).

Events are stored in two separate collections: one for the actual event streams and one for the snapshots.

By default, the MongoEventStorageEngine stores each event in a separate document. It is, however, possible to change the StorageStrategy used. The alternative provided by Axon is the DocumentPerCommitStorageStrategy, which creates a single document for all Events that have been stored in a single commit (i.e. in the same DomainEventStream).

Storing an entire commit in a single document has the advantage that a commit is stored atomically. Furthermore, it requires only a single roundtrip for any number of events. A disadvantage is that it becomes harder to query events directly in the database. When refactoring the domain model, for example, it is harder to "transfer" events from one aggregate to another if they are included in a "commit document".

The MongoDB doesn't take a lot of configuration. All it needs is a reference to the collections to store the Events in, and you're set to go. For production environments, you may want to double check the indexes on your collections.

Event Store Utilities

Axon provides a number of Event Storage Engines that may be useful in certain circumstances.

The SequenceEventStorageEngine is a wrapper around two other Event Storage Engines. When reading, it returns the events from both event storage engines. Appended events are only appended to the second event storage engine. This is useful in cases where two different implementations of Event Storage are used for performance reasons, for example. The first would be a larger, but slower event store, while the second is optimized for quick reading and writing.

There is also an EventStorageEngine implementation that keeps the stored events in memory: the InMemoryEventStorageEngine. While it probably outperforms any other event store out there, it is not really meant for long-term production use. However, it is very useful in short-lived tools or tests that require an event store.

Influencing the serialization process

Event Stores need a way to serialize the Event to prepare it for storage. By default, Axon uses the XStreamSerializer, which uses XStream to serialize Events into XML. XStream is reasonably fast and is more flexible than Java Serialization. Furthermore, the result of XStream serialization is human readable. Quite useful for logging and debugging purposes.

The XStreamSerializer can be configured. You can define aliases it should use for certain packages, classes or even fields. Besides being a nice way to shorten potentially long names, aliases can also be used when class definitions of events change. For more information about aliases, visit the XStream website.

Alternatively, Axon also provides the JacksonSerializer, which uses Jackson to serialize Events into JSON. While it produces a more compact serialized form, it does require that classes stick to the conventions (or configuration) required by Jackson.

Note

Configuring the serializer using Java code (or other JVM languages) is easy. However, configuring it in a Spring XML Application Context is not so trivial, due to its limitations to invoke methods. One of the options is to create a FactoryBean that creates an instance of an XStreamSerializer and configures it in code. Check the Spring Reference for more information.

You may also implement your own Serializer, simply by creating a class that implements Serializer, and configuring the Event Store to use that implementation instead of the default.

Event Upcasting

Due to the ever-changing nature of software applications it is likely that event definitions also change over time. Since the Event Store is considered a read and append-only data source, your application must be able to read all events, regardless of when they have been added. This is where upcasting comes in.

Originally a concept of object-oriented programming, where "a subclass gets cast to its superclass automatically when needed", the concept of upcasting can also be applied to event sourcing. To upcast an event means to transform it from its original structure to its new structure. Unlike OOP upcasting, event upcasting cannot be done in full automation because the structure of the new event is unknown to the old event. Manually written Upcasters have to be provided to specify how to upcast the old structure to the new structure.

Upcasters are classes that take one input event of revision x and output zero or more new events of revision x + 1. Moreover, upcasters are processed in a chain, meaning that the output of one upcaster is sent to the input of the next. This allows you to update events in an incremental manner, writing an Upcaster for each new event revision, making them small, isolated, and easy to understand.

Note

Perhaps the greatest benefit of upcasting is that it allows you to do non-destructive refactoring, i.e. the complete event history remains intact.

In this section we'll explain how to write an upcaster, describe the different (abstract) implementations of the Upcaster that come with Axon, and explain how the serialized representations of events affects how upcasters are written.

To allow an upcaster to see what version of serialized object they are receiving, the Event Store stores a revision number as well as the fully qualified name of the Event. This revision number is generated by a RevisionResolver, configured in the serializer. Axon provides several implementations of the RevisionResolver, such as the AnnotationRevisionResolver, which checks for an @Revision annotation on the Event payload, a SerialVersionUIDRevisionResolver that uses the serialVersionUID as defined by Java Serialization API and a FixedValueRevisionResolver, which always returns a predefined value. The latter is useful when injecting the current application version. This will allow you to see which version of the application generated a specific event.

Maven users can use the MavenArtifactRevisionResolver to automatically use the project version. It is initialized using the groupId and artifactId of the project to obtain the version for. Since this only works in JAR files created by Maven, the version cannot always be resolved by an IDE. If a version cannot be resolved, null is returned.

Writing an upcaster

Old version of the event:

@Revision("1.0")
public class ComplaintEvent {
    private String id;
    private String companyName;

    // Constructor, getter, setter...
}

New version of the event:

@Revision("2.0")
public class ComplaintEvent {
    private String id;
    private String companyName;
    private String complain; // New field

    // Constructor, getter, setter...
}

Upcaster:

// Upcaster from 1.0 revision to 2.0 revision
public class ComplaintEventUpcaster extends SingleEventUpcaster {
    private static SimpleSerializedType targetType = new SimpleSerializedType(ComplainEvent.class.getTypeName(), "1.0");

    @Override
    protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {
        return intermediateRepresentation.getType().equals(targetType);
    }

    @Override
    protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) {
        return intermediateRepresentation.upcastPayload(
                new SimpleSerializedType(targetType.getName(), "2.0"),
                org.dom4j.Document.class,
                document -> {
                    document.getRootElement().addElement("complaint");
                    document.getRootElement().element("complaint").setText("no complaint description"); // Default value
                    return document;
                }
        );
    }
}

Spring boot configuration:

@Configuration
public class AxonConfiguration {

    @Bean
    public SingleEventUpcaster myUpcaster() {
        return new ComplaintEventUpcaster();
    }

    @Bean
    public JpaEventStorageEngine eventStorageEngine(Serializer serializer,
                                                    DataSource dataSource,
                                                    SingleEventUpcaster myUpcaster,
                                                    EntityManagerProvider entityManagerProvider,
                                                    TransactionManager transactionManager) throws SQLException {
        return new JpaEventStorageEngine(serializer,
                myUpcaster::upcast,
                dataSource,
                entityManagerProvider,
                transactionManager);
    }
}

TODO - Describe

  • Upcasters work on intermediate representations

  • They update Stream to Stream

  • Abstract implementations for 1-to-1 upcasting

  • code examples

Content type conversion

An upcaster works on a given content type (e.g. dom4j Document). To provide extra flexibility between upcasters, content types between chained upcasters may vary. Axon will try to convert between the content types automatically by using ContentTypeConverters. It will search for the shortest path from type x to type y, perform the conversion and pass the converted value into the requested upcaster. For performance reasons, conversion will only be performed if the canUpcast method on the receiving upcaster yields true.

The ContentTypeConverters may depend on the type of serializer used. Attempting to convert a byte[] to a dom4j Document will not make any sense unless a Serializer was used that writes an event as XML. To make sure the UpcasterChain has access to the serializer-specific ContentTypeConverters, you can pass a reference to the serializer to the constructor of the UpcasterChain.

Tip

To achieve the best performance, ensure that all upcasters in the same chain (where one's output is another's input) work on the same content type.

If the content type conversion that you need is not provided by Axon you can always write one yourself using the ContentTypeConverter interface.

The XStreamSerializer supports Dom4J as well as XOM as XML document representations. The JacksonSerializer supports Jackson's JsonNode.

Snapshotting

When aggregates live for a long time, and their state constantly changes, they will generate a large amount of events. Having to load all these events in to rebuild an aggregate's state may have a big performance impact. The snapshot event is a domain event with a special purpose: it summarises an arbitrary amount of events into a single one. By regularly creating and storing a snapshot event, the event store does not have to return long lists of events. Just the last snapshot events and all events that occurred after the snapshot was made.

For example, items in stock tend to change quite often. Each time an item is sold, an event reduces the stock by one. Every time a shipment of new items comes in, the stock is incremented by some larger number. If you sell a hundred items each day, you will produce at least 100 events per day. After a few days, your system will spend too much time reading in all these events just to find out whether it should raise an "ItemOutOfStockEvent". A single snapshot event could replace a lot of these events, just by storing the current number of items in stock.

Creating a snapshot

Snapshot creation can be triggered by a number of factors, for example the number of events created since the last snapshot, the time to initialize an aggregate exceeds a certain threshold, time-based, etc. Currently, Axon provides a mechanism that allows you to trigger snapshots based on an event count threshold.

The definition of when snapshots should be created, is provided by the SnapshotTriggerDefinition interface.

The EventCountSnapshotTriggerDefinition provides the mechanism to trigger snapshot creation when the number of events needed to load an aggregate exceeds a certain threshold. If the number of events needed to load an aggregate exceeds a certain configurable threshold, the trigger tells a Snapshotter to create a snapshot for the aggregate.

The snapshot trigger is configured on an Event Sourcing Repository and has a number of properties that allow you to tweak triggering:

  • Snapshotter sets the actual snapshotter instance, responsible for creating and storing the actual snapshot event;

  • Trigger sets the threshold at which to trigger snapshot creation;

A Snapshotter is responsible for the actual creation of a snapshot. Typically, snapshotting is a process that should disturb the operational processes as little as possible. Therefore, it is recommended to run the snapshotter in a different thread. The Snapshotter interface declares a single method: scheduleSnapshot(), which takes the aggregate's type and identifier as parameters.

Axon provides the AggregateSnapshotter, which creates and stores AggregateSnapshot instances. This is a special type of snapshot, since it contains the actual aggregate instance within it. The repositories provided by Axon are aware of this type of snapshot, and will extract the aggregate from it, instead of instantiating a new one. All events loaded after the snapshot events are streamed to the extracted aggregate instance.

Note

Do make sure that the Serializer instance you use (which defaults to the XStreamSerializer) is capable of serializing your aggregate. The XStreamSerializer requires you to use either a Hotspot JVM, or your aggregate must either have an accessible default constructor or implement the Serializable interface.

The AbstractSnapshotter provides a basic set of properties that allow you to tweak the way snapshots are created:

  • EventStore sets the event store that is used to load past events and store the snapshots. This event store must implement the SnapshotEventStore interface.

  • Executor sets the executor, such as a ThreadPoolExecutor that will provide the thread to process actual snapshot creation. By default, snapshots are created in the thread that calls the scheduleSnapshot() method, which is generally not recommended for production.

The AggregateSnapshotter provides one more property:

  • AggregateFactories is the property that allows you to set the factories that will create instances of your aggregates. Configuring multiple aggregate factories allows you to use a single Snapshotter to create snapshots for a variety of aggregate types. The EventSourcingRepository implementations provide access to the AggregateFactory they use. This can be used to configure the same aggregate factories in the Snapshotter as the ones used in the repositories.

Note

If you use an executor that executes snapshot creation in another thread, make sure you configure the correct transaction management for your underlying event store, if necessary.

Spring users can use the SpringAggregateSnapshotter, which will automatically look up the right AggregateFactory from the Application Context when a snapshot needs to be created.

Storing Snapshot Events

When a snapshot is stored in the Event Store, it will automatically use that snapshot to summarize all prior events and return it in their place. All event store implementations allow for concurrent creation of snapshots. This means they allow snapshots to be stored while another process is adding Events for the same aggregate. This allows the snapshotting process to run as a separate process altogether.

Note

Normally, you can archive all events once they are part of a snapshot event. Snapshotted events will never be read in again by the event store in regular operational scenario's. However, if you want to be able to reconstruct aggregate state prior to the moment the snapshot was created, you must keep the events up to that date.

Axon provides a special type of snapshot event: the AggregateSnapshot, which stores an entire aggregate as a snapshot. The motivation is simple: your aggregate should only contain the state relevant to take business decisions. This is exactly the information you want captured in a snapshot. All Event Sourcing Repositories provided by Axon recognize the AggregateSnapshot, and will extract the aggregate from it. Beware that using this snapshot event requires that the event serialization mechanism needs to be able to serialize the aggregate.

Initializing an Aggregate based on a Snapshot Event

A snapshot event is an event like any other. That means a snapshot event is handled just like any other domain event. When using annotations to demarcate event handlers (@EventHandler), you can annotate a method that initializes full aggregate state based on a snapshot event. The code sample below shows how snapshot events are treated like any other domain event within the aggregate.

public class MyAggregate extends AbstractAnnotatedAggregateRoot {

    // ... code omitted for brevity

    @EventHandler
    protected void handleSomeStateChangeEvent(MyDomainEvent event) {
        // ...
    }

    @EventHandler
    protected void applySnapshot(MySnapshotEvent event) {
        // the snapshot event should contain all relevant state
        this.someState = event.someState;
        this.otherState = event.otherState;
    }
}

There is one type of snapshot event that is treated differently: the AggregateSnapshot. This type of snapshot event contains the actual aggregate. The aggregate factory recognizes this type of event and extracts the aggregate from the snapshot. Then, all other events are re-applied to the extracted snapshot. That means aggregates never need to be able to deal with AggregateSnapshot instances themselves.

Advanced conflict detection and resolution

One of the major advantages of being explicit about the meaning of changes, is that you can detect conflicting changes with more precision. Typically, these conflicting changes occur when two users are acting on the same data (nearly) simultaneously. Imagine two users, both looking at a specific version of the data. They both decide to make a change to that data. They will both send a command like "on version X of this aggregate, do that", where X is the expected version of the aggregate. One of them will have the changes actually applied to the expected version. The other user won't.

Instead of simply rejecting all incoming commands when aggregates have been modified by another process, you could check whether the user's intent conflicts with any unseen changes.

To detect conflict, pass a parameter of type ConflictResolver to the @CommandHandler method of your aggregate. This interface provides detectConflicts methods that allow you to define the types of events that are considered a conflict when executing that specific type of command.

Note

Note that the ConflictResolver will only contain any potentially conflicting events if the Aggregate was loaded with an expected version. Use @TargetAggregateVersion on a field of a command to indicate the expected version of the Aggregate.

If events matching the predicate are found, an exception is thrown (the optional second parameter of detectConflicts allows you to define the exception to throw). If none are found, processing continues as normal.

If no invocations to detectConflicts are made, and there are potentially conflicting events, the @CommandHandler will fail. This may be the case when an expected version is provided, but no ConflictResolver is available in the parameters of the @CommandHandler method.

Last updated