Consuming Events from Kafka

Event messages in an Axon application can be consumed through either a Subscribing or a Tracking Event Processor. Both options are supported when consuming events from a Kafka topic, which from a set-up perspective translates to a SubscribableKafkaMessageSource or a StreamableKafkaMessageSource, respectively. Both are described in more detail later on; first, we shed light on the general requirements for event consumption in Axon through Kafka.

Both approaches use a similar mechanism to poll events with a Kafka Consumer, which comes down to a combination of a ConsumerFactory and a Fetcher. The extension provides a DefaultConsumerFactory, whose sole requirement is a Map of configuration properties. The Map contains the settings for the Kafka Consumer client, such as the addresses of the Kafka brokers. Please check the Kafka documentation for the possible settings and their values.

public class KafkaEventConsumptionConfiguration {
    // ...
    public ConsumerFactory<String, byte[]> consumerFactory(Map<String, Object> consumerConfiguration) {
        return new DefaultConsumerFactory<>(consumerConfiguration);
    }
    // ...
}
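A minimal sketch of the configuration Map handed to the DefaultConsumerFactory above could look as follows. The three keys are standard Kafka consumer settings; the helper class KafkaConsumerSettings and the broker address used in the usage line are hypothetical. The sketch leaves out group.id on the assumption that the message source supplies the group when it asks the factory for a Consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the property keys and deserializer class names are real Kafka settings.
final class KafkaConsumerSettings {

    private KafkaConsumerSettings() {
    }

    // Builds the Map handed to DefaultConsumerFactory for a <String, byte[]> Consumer.
    static Map<String, Object> forStringKeysAndByteValues(String bootstrapServers) {
        Map<String, Object> config = new HashMap<>();
        // Comma-separated host:port list of the Kafka brokers to connect to
        config.put("bootstrap.servers", bootstrapServers);
        // Deserializers matching the String key and byte[] value generics used in this section
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // No "group.id" here: this sketch assumes the message source supplies it
        return config;
    }
}
```

It would then be passed on as new DefaultConsumerFactory<String, byte[]>(KafkaConsumerSettings.forStringKeysAndByteValues("localhost:9092")).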

It is the Fetcher instance’s job to retrieve the actual messages from Kafka by directing a Consumer instance it receives from the message source. You can write your own implementation or use the provided AsyncFetcher to this end. The AsyncFetcher does not need to be started explicitly, as it is started by the message source when that source starts. It does need to be shut down, to ensure any thread pool or active connections are properly closed.

public class KafkaEventConsumptionConfiguration {
    // ...
    public Fetcher<?, ?, ?> fetcher(long timeoutMillis,
                                    ExecutorService executorService) {
        return AsyncFetcher.builder()
                           .pollTimeout(timeoutMillis)          // Defaults to "5000" milliseconds
                           .executorService(executorService)    // Defaults to a cached thread pool executor
                           .build();
    }
    // ...
}
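The lifecycle described for the AsyncFetcher (started through the message source, but shut down explicitly) can be pictured with the following standard-library sketch. It is not the AsyncFetcher implementation: PollingLoopSketch is a hypothetical class, and a Supplier stands in for a Kafka Consumer's poll call so the example runs without a broker.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of the fetcher lifecycle; this is NOT the AsyncFetcher source code.
final class PollingLoopSketch {

    // Mirrors the documented default of a cached thread pool
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private volatile boolean running = true;

    // Invoked on behalf of the message source when it starts; the fetcher is never started on its own.
    void start(Supplier<List<String>> poll, Consumer<String> sink) {
        executorService.submit(() -> {
            while (running && !Thread.currentThread().isInterrupted()) {
                // poll is expected to block for up to its poll timeout, like a Kafka Consumer's poll
                poll.get().forEach(sink);
            }
        });
    }

    // Must be called explicitly: stops the loop and releases the thread pool.
    boolean shutdown() throws InterruptedException {
        running = false;
        executorService.shutdownNow();
        return executorService.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The point of the explicit shutdown() is that, without it, the polling loop keeps a pool thread busy indefinitely; shutting down the real fetcher serves the same purpose for its threads and connections.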

Consuming Events with a subscribable message source

Using the SubscribableKafkaMessageSource means you intend to use a SubscribingEventProcessor to consume the events in your event handlers.

When using this source, Kafka’s idea of pairing Consumer instances into "Consumer Groups" is used. This is enforced by making the groupId a hard requirement when constructing the source. Using a common groupId means that the workload of the event stream can be shared on Kafka’s terms, whereas a SubscribingEventProcessor typically works on its own, regardless of the number of application instances. The workload can be shared by running several application instances with the same groupId, or by adjusting the consumer count through the SubscribableKafkaMessageSource builder. The same benefit holds for resetting an event stream, which in Axon is reserved for the TrackingEventProcessor, but is now opened up through Kafka’s own APIs.
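How a consumer group spreads the workload can be illustrated with a simplified model. In this sketch every partition goes to exactly one Consumer in the group using plain round-robin; in reality the distribution is decided by the partition assignor configured on the Kafka consumer, so the exact mapping will differ. ConsumerGroupSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of partition assignment inside one consumer group.
final class ConsumerGroupSketch {

    private ConsumerGroupSketch() {
    }

    // Returns, per Consumer, the partitions it reads: partition p goes to Consumer (p % consumers).
    static List<List<Integer>> assign(int partitionCount, int consumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }
}
```

For example, two application instances that each use a consumerCount of 2 form a group of four Consumers; with six partitions, assign(6, 4) yields [[0, 4], [1, 5], [2], [3]]. A group with more Consumers than partitions leaves the surplus Consumers idle.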

Although the SubscribableKafkaMessageSource provides some of the niceties the TrackingEventProcessor normally provides, it does come with two catches:

  1. Axon’s approach of using a SequencingPolicy to deduce which thread receives which events is entirely lost. Which events your handlers receive depends on the topic-partition pairs that Kafka assigns to each Consumer. From a usage perspective, this means event message ordering is no longer guaranteed by Axon. It is thus the user’s job to ensure events are published to the right topic-partition pair.

  2. The API Axon provides for resets is entirely lost, since this API can only be correctly triggered through the TrackingEventProcessor#resetTokens operation.
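Since ordering is only preserved within a single partition, a common approach is to give every event of one aggregate the same record key, so that all of them end up in the same partition. The sketch below assumes the aggregate identifier is used as that key; PartitionChoiceSketch is hypothetical, and String#hashCode is a stand-in for Kafka's default partitioner, which hashes the serialized key with murmur2 instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of key-based routing; NOT Kafka's murmur2-based default partitioner.
final class PartitionChoiceSketch {

    private PartitionChoiceSketch() {
    }

    static int partitionFor(String aggregateIdentifier, int partitionCount) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(aggregateIdentifier.hashCode(), partitionCount);
    }

    // Routes events (given as aggregate identifiers, in publication order) to their partitions.
    static Map<Integer, List<String>> route(List<String> aggregateIds, int partitionCount) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String id : aggregateIds) {
            partitions.computeIfAbsent(partitionFor(id, partitionCount), p -> new ArrayList<>()).add(id);
        }
        return partitions;
    }
}
```

Whichever hash is used, the property that matters is determinism: the same key always maps to the same partition as long as the partition count does not change.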

Because of these catches, it is recommended that users are knowledgeable about the specifics of Kafka’s message consumption.

When it comes to configuring a SubscribableKafkaMessageSource as a message source for a SubscribingEventProcessor, there is one additional requirement besides creating and registering the source: the source should only start polling for events once all interested subscribing event processors have subscribed to it. To ensure the SubscribableKafkaMessageSource#start() operation is called at the right point in the configuration lifecycle, the KafkaMessageSourceConfigurer should be used:

public class KafkaEventConsumptionConfiguration {
    // ...
    public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        return kafkaMessageSourceConfigurer;
    }

    public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(List<String> topics,
                                                                                         String groupId,
                                                                                         ConsumerFactory<String, byte[]> consumerFactory,
                                                                                         Fetcher<String, byte[], EventMessage<?>> fetcher,
                                                                                         KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                         int consumerCount,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource = SubscribableKafkaMessageSource.<String, byte[]>builder()
                .topics(topics)                     // Defaults to a collection of "Axon.Events"
                .groupId(groupId)                   // Hard requirement
                .consumerFactory(consumerFactory)   // Hard requirement
                .fetcher(fetcher)                   // Hard requirement
                .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                .consumerCount(consumerCount)       // Defaults to a single Consumer
                .build();
        // Registering the source is required to tie into the Configurer's lifecycle, so the source starts at the right stage
        kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

    public void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
                                                 String processorName,
                                                 SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
        eventProcessingConfigurer.registerSubscribingEventProcessor(
                processorName,
                configuration -> subscribableKafkaMessageSource
        );
    }
    // ...
}

The KafkaMessageSourceConfigurer is an Axon ModuleConfiguration which ties into the start and shutdown phases of the application lifecycle. The SubscribableKafkaMessageSource should be registered with it, so that the configurer starts and stops the source. The KafkaMessageSourceConfigurer instance itself should be registered as a module with the main Configurer.

If only a single subscribing event processor will be subscribed to the Kafka message source, SubscribableKafkaMessageSource.Builder#autoStart() can be toggled on. This will start the SubscribableKafkaMessageSource upon the first subscription.
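Why autoStart() only suits a single processor can be shown with a small model. SourceStartSketch is hypothetical and only mimics the ordering concern: in this model, events are handed to whoever is subscribed at the moment the source starts, so with auto-start a second processor that subscribes later misses what the first one already received.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the start-up ordering concern; these names are not part of the Axon API.
final class SourceStartSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private final List<String> available;
    private final boolean autoStart;
    private boolean started;

    SourceStartSketch(List<String> available, boolean autoStart) {
        this.available = new ArrayList<>(available);
        this.autoStart = autoStart;
    }

    void subscribe(Consumer<String> processor) {
        subscribers.add(processor);
        if (autoStart && !started) {
            start(); // auto-start fires on the FIRST subscription only
        }
    }

    // Delivers every available event to the processors subscribed at this moment.
    void start() {
        started = true;
        for (String event : available) {
            subscribers.forEach(subscriber -> subscriber.accept(event));
        }
        available.clear();
    }
}
```

With autoStart enabled and two subscriptions, only the first processor sees the events; subscribing both and then calling start() explicitly, which is what the KafkaMessageSourceConfigurer arranges, delivers them to both.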

Consuming Events with a streamable message source

Using the StreamableKafkaMessageSource means you intend to use a TrackingEventProcessor to consume the events in your event handlers.

Whereas the subscribable Kafka message source uses Kafka’s idea of sharing the workload through multiple Consumer instances in the same "Consumer Group", the streamable approach does not use a consumer group and assigns all available partitions instead.

public class KafkaEventConsumptionConfiguration {
    // ...
    public StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource(List<String> topics,
                                                                                     ConsumerFactory<String, byte[]> consumerFactory,
                                                                                     Fetcher<String, byte[], KafkaEventMessage> fetcher,
                                                                                     KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                     int bufferCapacity) {
        return StreamableKafkaMessageSource.<String, byte[]>builder()
                .topics(topics)                                                 // Defaults to a collection of "Axon.Events"
                .consumerFactory(consumerFactory)                               // Hard requirement
                .fetcher(fetcher)                                               // Hard requirement
                .messageConverter(messageConverter)                             // Defaults to a "DefaultKafkaMessageConverter"
                .bufferFactory(
                        () -> new SortedKafkaMessageBuffer<>(bufferCapacity))   // Defaults to a "SortedKafkaMessageBuffer" with a buffer capacity of "1000"
                .build();
    }

    public void configureStreamableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
                                               String processorName,
                                               StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource) {
        eventProcessingConfigurer.registerTrackingEventProcessor(
                processorName,
                configuration -> streamableKafkaMessageSource
        );
    }
    // ...
}
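The bufferFactory above decides where fetched records wait until the tracking event processor consumes them. As a rough illustration, the sketch below models a bounded buffer that hands out records in sorted order. It assumes ordering by record timestamp, then partition, then offset; the exact ordering and blocking behaviour of the extension's SortedKafkaMessageBuffer may differ, and SortedBufferSketch is a hypothetical name. Unlike a blocking buffer, offer() here simply refuses new records once the capacity is reached.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical model of a bounded, sorted buffer; not the extension's SortedKafkaMessageBuffer.
final class SortedBufferSketch {

    // Minimal stand-in for a fetched Kafka record
    static final class Fetched {
        final long timestamp;
        final int partition;
        final long offset;

        Fetched(long timestamp, int partition, long offset) {
            this.timestamp = timestamp;
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final int capacity;
    // Assumed ordering: oldest timestamp first, ties broken by partition and then offset
    private final PriorityQueue<Fetched> queue = new PriorityQueue<>(
            Comparator.<Fetched>comparingLong(f -> f.timestamp)
                      .thenComparingInt(f -> f.partition)
                      .thenComparingLong(f -> f.offset));

    SortedBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns false instead of blocking when the buffer is full.
    boolean offer(Fetched fetched) {
        return queue.size() < capacity && queue.add(fetched);
    }

    // Returns the smallest buffered record, or null when the buffer is empty.
    Fetched poll() {
        return queue.poll();
    }
}
```

In this model, a bounded capacity keeps memory use predictable: when the processor falls behind, new records are held back instead of filling the heap.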

Note that, as with any tracking event processor, the progress on the event stream is stored in a TrackingToken. Using the StreamableKafkaMessageSource means a KafkaTrackingToken containing topic-partition to offset pairs is stored in the TokenStore. If no other TokenStore is provided and auto-configuration is used, a KafkaTokenStore is configured instead of an InMemoryTokenStore. By default, the KafkaTokenStore uses the __axon_token_store_updates topic. This should be a compacted topic, which should be created and configured automatically.
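Conceptually, the position such a token records is a map from topic-partition to offset. The following hypothetical TokenSketch models that idea with the standard library only; it is not the extension's KafkaTrackingToken, and the "topic-partition" string key format is an assumption made for readability. Every advance returns a new instance, and in this sketch an offset only ever moves forward.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a token's topic-partition to offset pairs.
final class TokenSketch {

    private final Map<String, Long> positions;

    TokenSketch(Map<String, Long> positions) {
        this.positions = Collections.unmodifiableMap(new HashMap<>(positions));
    }

    static TokenSketch empty() {
        return new TokenSketch(new HashMap<>());
    }

    // Returns a new token; this instance stays unchanged. Lower offsets never replace higher ones.
    TokenSketch advancedTo(String topic, int partition, long offset) {
        Map<String, Long> next = new HashMap<>(positions);
        next.merge(topic + "-" + partition, offset, Long::max);
        return new TokenSketch(next);
    }

    Map<String, Long> positions() {
        return positions;
    }
}
```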