Token Store Migration

Axon Framework 5 adds a mask column to the token store table used by streaming event processors. This column is required: without it, the framework cannot start. If you are migrating from Axon Framework 4, the existing schema must be updated and the column must be populated with correct values.

Understanding the mask column

Streaming event processors divide the event stream into segments. Each row in the token store represents one segment for a given processor. In Axon Framework 5, a mask column is stored alongside the segment column.

The mask is a bitmask that, together with the segment ID, determines which events that segment processes. It is derived from how many times the root segment has been split and changes whenever the processor scales up or down.

For a processor with four equal-size segments, the token store contains:

processorName         segment  mask
inventory-projection  0        3
inventory-projection  1        3
inventory-projection  2        3
inventory-projection  3        3

For a processor with two segments:

processorName         segment  mask
inventory-projection  0        1
inventory-projection  1        1

The non-trivial case is when the number of segments is not a power of two. A processor that started with two segments and then had one of them split produces three segments:

processorName         segment  mask
inventory-projection  0        3
inventory-projection  1        1
inventory-projection  2        3

Segments 0 and 2 (the split pair) have mask 3, while segment 1 (which was never split) retains mask 1. The correct mask for each segment can be determined unambiguously from the complete set of segment IDs for that processor, but not from any individual segment ID in isolation. This is what makes the migration non-trivial: you must read all segment IDs for a processor together and compute their masks as a group.
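Concretely, a segment claims an event when masking a hash of the event's sequence identifier with the segment's mask yields the segment ID, i.e. (hash & mask) == segmentId. A minimal standalone sketch (the class and names below are illustrative, not framework API) shows that the three-segment set above partitions every hash value exactly once:

```java
import java.util.List;

public class SegmentMatch {

    // Illustrative stand-in for the segment matching rule: a segment
    // claims an event when the masked hash equals the segment ID.
    record Segment(int id, int mask) {
        boolean matches(int hash) {
            return (hash & mask) == id;
        }
    }

    public static void main(String[] args) {
        // The three-segment example: segments 0 and 2 are the split pair
        // (mask 3), segment 1 was never split (mask 1).
        List<Segment> segments = List.of(
                new Segment(0, 3), new Segment(1, 1), new Segment(2, 3));

        // Every hash value is claimed by exactly one segment.
        for (int hash = 0; hash < 16; hash++) {
            long claims = segments.stream().filter(s -> s.matches(hash)).count();
            if (claims != 1) {
                throw new AssertionError("hash " + hash + " claimed " + claims + " times");
            }
        }
        System.out.println("every hash claimed exactly once");
    }
}
```

Note how segment 1 (mask 1) claims all odd hashes, half the stream, while segments 0 and 2 each claim a quarter: a mixed-mask segment set still covers the stream without gaps or overlap.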

Choosing a migration strategy

Three strategies are available. Choose based on your constraints:

Strategy                     Description                                            Downtime
Scale to single segment      Scale all processors to one segment before             Brief outage
                             migrating. The mask is trivially 0.
Reset processors to latest   Delete all token entries and let Axon Framework 5      Brief outage
                             create them fresh at the latest position.
Programmatic mask migration  Compute and set the correct mask for every existing    Zero downtime possible
                             token entry.

Option 1: Scale to a single segment

This is the simplest strategy. Before the schema change, scale every processor down to a single segment, leaving only the root segment. The SQL migration is then a single ALTER TABLE.

Steps:

  1. Scale all event processors to a single segment by repeatedly merging segments until only segment 0 remains. See Splitting and merging segments for how to trigger a merge.

  2. Shut down the application.

  3. Apply the schema change:

    ALTER TABLE token_entry ADD COLUMN mask INTEGER NOT NULL DEFAULT 0;
  4. Upgrade to Axon Framework 5 and restart.

  5. Scale the processors back up to the desired number of segments.

After merging down to a single segment, every processor has exactly one row with segment = 0. The root segment always has both segment ID and mask equal to 0, so the default of 0 is correct.
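Before shutting down, you can sanity-check that the merge completed. A query along these lines (assuming the token_entry table and camelCase column names used in the examples above) should return no rows:

```sql
-- Lists processors that still hold more than one segment,
-- or whose single remaining segment is not the root (segment 0).
SELECT processorName, COUNT(*) AS segment_count
FROM token_entry
GROUP BY processorName
HAVING COUNT(*) > 1 OR MIN(segment) <> 0;
```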

Option 2: Reset processors to latest

If scaling processors down first is impractical, you can delete the existing token entries and let Axon Framework 5 reinitialize them at the latest position in the event stream. No mask computation is needed because the framework creates the new entries with correct mask values.

For this to be safe, the processors must be fully caught up before the restart, so that no events are missed when they reinitialize at the latest position.

Steps:

  1. Halt all writes to the application so that no new events are produced.

  2. Wait for all event processors to catch up to the latest position in the event stream.

  3. Shut down the application.

  4. Add the mask column and delete all existing token entries:

    ALTER TABLE token_entry ADD COLUMN mask INTEGER NOT NULL DEFAULT 0;
    DELETE FROM token_entry;
  5. Configure the processors to start at the latest position when no token entry exists, using LatestTrackingToken. Refer to Streaming Event Processor for the initial token configuration.

  6. Upgrade to Axon Framework 5 and restart. The processors find no existing tokens, initialize at the latest position, and create fresh entries with the correct mask values.

  7. Re-enable writes.

Events that arrive between step 1 (halting writes) and step 7 (re-enabling writes) are not processed by the reset processors, because they initialize at the latest position that existed at startup time. After writes are re-enabled, the processors continue from that point forward.

Option 3: Programmatic mask migration

This option computes the correct mask for every existing token entry and updates the table in place. It can run while the application is offline or be integrated into a schema migration tool such as Flyway or Liquibase. No scaling or token deletion is required.

The mask computation mirrors the split logic of org.axonframework.messaging.eventhandling.processing.streaming.segmenting.Segment. Given the full set of segment IDs for a processor, the algorithm reconstructs the segment hierarchy by simulating splits from the root and identifies the mask for each leaf segment.

Add the column

First, add the mask column as nullable so existing rows are not immediately rejected:

ALTER TABLE token_entry ADD COLUMN mask INTEGER;

Flyway Java migration

Add a versioned Flyway Java migration that reads the segment IDs, computes the masks, and writes them back:

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;

import java.sql.*;
import java.util.*;

public class V2__PopulateTokenStoreMask extends BaseJavaMigration {

    @Override
    public void migrate(Context context) throws Exception {
        Connection conn = context.getConnection();
        Map<String, List<Integer>> segmentsByProcessor = readSegments(conn);
        updateMasks(conn, segmentsByProcessor);
    }

    private Map<String, List<Integer>> readSegments(Connection conn) throws SQLException {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        try (PreparedStatement stmt = conn.prepareStatement(
                "SELECT processorName, segment FROM token_entry ORDER BY processorName, segment");
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                result.computeIfAbsent(rs.getString("processorName"), k -> new ArrayList<>())
                      .add(rs.getInt("segment"));
            }
        }
        return result;
    }

    private void updateMasks(Connection conn,
                             Map<String, List<Integer>> segmentsByProcessor) throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement(
                "UPDATE token_entry SET mask = ? WHERE processorName = ? AND segment = ?")) {
            for (Map.Entry<String, List<Integer>> entry : segmentsByProcessor.entrySet()) {
                String processorName = entry.getKey();
                int[] ids = entry.getValue().stream().mapToInt(Integer::intValue).toArray();
                for (SegmentInfo seg : computeSegments(ids)) {
                    stmt.setInt(1, seg.mask());
                    stmt.setString(2, processorName);
                    stmt.setInt(3, seg.segmentId());
                    stmt.addBatch();
                }
            }
            stmt.executeBatch();
        }
    }

    private static List<SegmentInfo> computeSegments(int[] segmentIds) {
        if (segmentIds.length == 0) {
            return List.of();
        }
        List<Integer> ids = Arrays.stream(segmentIds).boxed().toList();
        Set<SegmentInfo> result = new HashSet<>();
        computeSegments(new SegmentInfo(0, 0), ids, result);
        return result.stream()
                     .sorted(Comparator.comparingInt(SegmentInfo::segmentId))
                     .toList();
    }

    private static void computeSegments(SegmentInfo current, List<Integer> segmentIds,
                                        Set<SegmentInfo> result) {
        // Splitting a segment adds one bit to the mask: newMask = 2 * mask + 1.
        int newMask = (current.mask() << 1) | 1;
        // A split keeps the current segment ID and creates a sibling at
        // segmentId + mask + 1 (newMask ^ mask reduces to mask + 1 here).
        int sibling = current.segmentId() + (newMask ^ current.mask());
        if (segmentIds.contains(sibling)) {
            // The sibling exists, so this segment was split; descend into both halves.
            computeSegments(new SegmentInfo(current.segmentId(), newMask), segmentIds, result);
            computeSegments(new SegmentInfo(sibling, newMask), segmentIds, result);
        } else {
            // No sibling in the store: this is a leaf segment with its final mask.
            result.add(current);
        }
    }

    private record SegmentInfo(int segmentId, int mask) {}
}
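To gain confidence in the recursion before running it against production data, you can exercise the same split logic standalone. The sketch below duplicates the migration's private helpers (using segmentId + mask + 1 for the sibling ID, which is equivalent to newMask ^ mask for masks of the form 2^n - 1) and replays the three-segment example:

```java
import java.util.*;

public class MaskCheck {

    record SegmentInfo(int segmentId, int mask) {}

    // Same recursion as in the migration: walk the split tree from the root
    // and stop at segments whose split sibling is absent from the store.
    static void computeSegments(SegmentInfo current, List<Integer> ids, Set<SegmentInfo> result) {
        int newMask = (current.mask() << 1) | 1;
        int sibling = current.segmentId() + current.mask() + 1;
        if (ids.contains(sibling)) {
            computeSegments(new SegmentInfo(current.segmentId(), newMask), ids, result);
            computeSegments(new SegmentInfo(sibling, newMask), ids, result);
        } else {
            result.add(current);
        }
    }

    public static void main(String[] args) {
        Set<SegmentInfo> result = new HashSet<>();
        computeSegments(new SegmentInfo(0, 0), List.of(0, 1, 2), result);
        result.stream()
              .sorted(Comparator.comparingInt(SegmentInfo::segmentId))
              .forEach(System.out::println);
        // Expected leaves: segment 0 -> mask 3, segment 1 -> mask 1, segment 2 -> mask 3,
        // matching the three-segment table earlier in this page.
    }
}
```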

Apply the NOT NULL constraint

Once the migration has run successfully, enforce the constraint:

-- PostgreSQL
ALTER TABLE token_entry ALTER COLUMN mask SET NOT NULL;

-- MySQL / MariaDB
ALTER TABLE token_entry MODIFY COLUMN mask INTEGER NOT NULL;

-- SQL Server
ALTER TABLE token_entry ALTER COLUMN mask INTEGER NOT NULL;

Liquibase

For Liquibase, extract the computeSegments logic above into a helper class and invoke it from a CustomTaskChange implementation. Use a <changeSet> to add the nullable column first, then call the custom change to populate it, and finally apply the NOT NULL constraint in a third changeset.
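For illustration, the three changesets might look like the following sketch. The class name com.example.migration.PopulateTokenStoreMask is a placeholder for your own CustomTaskChange implementation:

```xml
<!-- 1. Add the column as nullable -->
<changeSet id="token-mask-1" author="migration">
    <addColumn tableName="token_entry">
        <column name="mask" type="INTEGER"/>
    </addColumn>
</changeSet>

<!-- 2. Populate masks via the custom change (placeholder class name) -->
<changeSet id="token-mask-2" author="migration">
    <customChange class="com.example.migration.PopulateTokenStoreMask"/>
</changeSet>

<!-- 3. Enforce NOT NULL once all rows are populated -->
<changeSet id="token-mask-3" author="migration">
    <addNotNullConstraint tableName="token_entry"
                          columnName="mask"
                          columnDataType="INTEGER"/>
</changeSet>
```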

Table and column names

The SQL examples above use the table name token_entry with the camelCase column name processorName; depending on your token store, your schema may follow either naming convention. Adjust the names to match your actual configuration:

Store type                                           Default table name  Column names
JPA with Spring Boot (SpringPhysicalNamingStrategy)  token_entry         processor_name, segment, mask
JDBC (TokenSchema defaults)                          TokenEntry          processorName, segment, mask

JPA column names depend on the Hibernate physical naming strategy configured in your application. Spring Boot’s default SpringPhysicalNamingStrategy converts camelCase field names to snake_case. Using PhysicalNamingStrategyStandardImpl keeps them as-is. Check your persistence configuration to confirm the exact column names in your database before running any migration script.