Consuming Events from Kafka
Event messages in an Axon application can be consumed through either a Subscribing or a Tracking Event Processor. Both options are supported when consuming events from a Kafka topic, which from a set-up perspective translates to a SubscribableMessageSource or a StreamableKafkaMessageSource respectively. Both are described in more detail later on; first we shed light on the general requirements for consuming events from Kafka in an Axon application.
Both approaches use a similar mechanism to poll events with a Kafka Consumer, which breaks down to a combination of a ConsumerFactory and a Fetcher. The extension provides a DefaultConsumerFactory, whose sole requirement is a Map of configuration properties. The Map contains the settings to use for the Kafka Consumer client, such as the Kafka instance locations. Please check the Kafka documentation for the possible settings and their values.
public class KafkaEventConsumptionConfiguration {
    // ...
    public ConsumerFactory<String, byte[]> consumerFactory(Map<String, Object> consumerConfiguration) {
        return new DefaultConsumerFactory<>(consumerConfiguration);
    }
    // ...
}
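As an illustration, the consumerConfiguration Map could be assembled as follows. The property values (broker address, deserializers, offset behaviour) are assumptions for a local setup rather than defaults prescribed by the extension:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerProperties {

    public Map<String, Object> consumerConfiguration() {
        Map<String, Object> configuration = new HashMap<>();
        // Location of the Kafka broker(s) - assumed local instance
        configuration.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Key and value deserializers matching the <String, byte[]> generics used above
        configuration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configuration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // Start reading from the earliest available offset when no committed offset exists
        configuration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return configuration;
    }
}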
It is the Fetcher instance’s job to retrieve the actual messages from Kafka by directing a Consumer instance it receives from the message source. You can draft your own implementation or use the provided AsyncFetcher to this end. The AsyncFetcher does not need to be started explicitly, as it reacts to the message source starting it. It does, however, need to be shut down, to ensure any thread pool or active connections are properly closed.
public class KafkaEventConsumptionConfiguration {
    // ...
    public Fetcher<?, ?, ?> fetcher(long timeoutMillis,
                                    ExecutorService executorService) {
        return AsyncFetcher.builder()
                           .pollTimeout(timeoutMillis)       // Defaults to "5000" milliseconds
                           .executorService(executorService) // Defaults to a cached thread pool executor
                           .build();
    }
    // ...
}
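Since the Fetcher needs to be shut down explicitly, it is worth tying this to the application’s own shutdown. A minimal sketch, assuming the fetcher from the sample above and a shutdown() operation on the Fetcher interface, could look like this:

public class KafkaFetcherLifecycle {

    private final Fetcher<?, ?, ?> fetcher;

    public KafkaFetcherLifecycle(Fetcher<?, ?, ?> fetcher) {
        this.fetcher = fetcher;
    }

    // Invoke this when the application closes, for example from a shutdown hook
    // or the lifecycle callback of your dependency injection framework.
    public void stop() {
        fetcher.shutdown();
    }
}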
Consuming Events with a subscribable message source
Using the SubscribableKafkaMessageSource means you are inclined to use a SubscribingEventProcessor to consume the events in your event handlers.
When using this source, Kafka’s idea of pairing Consumer instances into "Consumer Groups" is used. This is enforced by making the groupId a hard requirement upon source construction. Using a common groupId essentially means that the event stream’s workload can be shared on Kafka’s terms, whereas a SubscribingEventProcessor typically works on its own accord regardless of the number of instances. This workload sharing can be achieved by having several application instances with the same groupId or by adjusting the consumer count through the SubscribableKafkaMessageSource builder. The same benefit holds for resetting an event stream, which in Axon is reserved for the TrackingEventProcessor, but is now opened up through Kafka’s own APIs.
Although the SubscribableKafkaMessageSource thus provides the niceties the tracking event processor normally provides, it does come with two catches:
- Axon’s approach of using the SequencingPolicy to deduce which thread receives which events is entirely lost. Which events your handlers receive thus depends on which topic-partition pairs are assigned to a Consumer. From a usage perspective this means event message ordering is no longer guaranteed by Axon. It is thus the user’s job to ensure events are published in the right topic-partition pair (see the sketch after this list).
- The API Axon provides for resets is entirely lost, since this API can only be correctly triggered through the TrackingEventProcessor#resetTokens operation.
Due to the above it is recommended that the user is knowledgeable about Kafka’s specifics on message consumption.
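On the producing side, Kafka’s default partitioner assigns a record to a partition based on the hash of its key, so publishing related events with the same key keeps them in the same partition and therefore in order. The following is a minimal, Kafka-client-level sketch of that idea; the topic name and the use of an aggregate identifier as record key are assumptions for illustration, not behaviour prescribed by the extension:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;

public class PartitionAwarePublisher {

    private final Producer<String, byte[]> producer;

    public PartitionAwarePublisher(Map<String, Object> producerConfiguration) {
        this.producer = new KafkaProducer<>(producerConfiguration);
    }

    // Events sharing the same key (here an aggregate identifier, purely as an example)
    // hash to the same partition and thus retain their relative order for consumers.
    public void publish(String aggregateIdentifier, byte[] serializedEvent) {
        producer.send(new ProducerRecord<>("Axon.Events", aggregateIdentifier, serializedEvent));
    }
}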
When it comes to configuring a SubscribableKafkaMessageSource as a message source for a SubscribingEventProcessor, there is one additional requirement besides source creation and registration. The source should only start polling for events once all interested subscribing event processors have subscribed to it. To ensure the SubscribableKafkaMessageSource#start() operation is called at the right point in the configuration lifecycle, the KafkaMessageSourceConfigurer should be utilized:
public class KafkaEventConsumptionConfiguration {
    // ...
    public KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        return kafkaMessageSourceConfigurer;
    }

    public SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(List<String> topics,
                                                                                         String groupId,
                                                                                         ConsumerFactory<String, byte[]> consumerFactory,
                                                                                         Fetcher<String, byte[], EventMessage<?>> fetcher,
                                                                                         KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                         int consumerCount,
                                                                                         KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource =
                SubscribableKafkaMessageSource.<String, byte[]>builder()
                                              .topics(topics)                     // Defaults to a collection of "Axon.Events"
                                              .groupId(groupId)                   // Hard requirement
                                              .consumerFactory(consumerFactory)   // Hard requirement
                                              .fetcher(fetcher)                   // Hard requirement
                                              .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                                              .consumerCount(consumerCount)       // Defaults to a single Consumer
                                              .build();
        // Registering the source is required to tie into the Configurer's lifecycle to start the source at the right stage
        kafkaMessageSourceConfigurer.registerSubscribableSource(configuration -> subscribableKafkaMessageSource);
        return subscribableKafkaMessageSource;
    }

    public void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
                                                 String processorName,
                                                 SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
        eventProcessingConfigurer.registerSubscribingEventProcessor(
                processorName,
                configuration -> subscribableKafkaMessageSource
        );
    }
    // ...
}
The KafkaMessageSourceConfigurer is an Axon ModuleConfiguration which ties into the start and end lifecycle of the application. It should receive the SubscribableKafkaMessageSource as a source which should start and stop. The KafkaMessageSourceConfigurer instance itself should be registered as a module to the main Configurer.
If only a single subscribing event processor will be subscribed to the Kafka message source, SubscribableKafkaMessageSource.Builder#autoStart() can be toggled on. This will start the SubscribableKafkaMessageSource upon the first subscription.
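As an illustration, assuming a single subscribing event processor, the builder could be configured as follows; apart from autoStart() (named in the text above) the properties and types simply mirror the earlier sample:

public SubscribableKafkaMessageSource<String, byte[]> autoStartingKafkaMessageSource(List<String> topics,
                                                                                     String groupId,
                                                                                     ConsumerFactory<String, byte[]> consumerFactory,
                                                                                     Fetcher<String, byte[], EventMessage<?>> fetcher) {
    return SubscribableKafkaMessageSource.<String, byte[]>builder()
                                         .topics(topics)
                                         .groupId(groupId)
                                         .consumerFactory(consumerFactory)
                                         .fetcher(fetcher)
                                         .autoStart() // Starts the source upon the first subscription
                                         .build();
}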
Consuming Events with a streamable message source
Using the StreamableKafkaMessageSource means you are inclined to use a TrackingEventProcessor to consume the events in your event handlers.
Whereas the subscribable Kafka message source uses Kafka’s idea of sharing the workload through multiple Consumer instances in the same "Consumer Group", the streamable approach does not use a consumer group and instead assigns all available partitions.
public class KafkaEventConsumptionConfiguration {
    // ...
    public StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource(List<String> topics,
                                                                                     ConsumerFactory<String, byte[]> consumerFactory,
                                                                                     Fetcher<String, byte[], KafkaEventMessage> fetcher,
                                                                                     KafkaMessageConverter<String, byte[]> messageConverter,
                                                                                     int bufferCapacity) {
        return StreamableKafkaMessageSource.<String, byte[]>builder()
                                           .topics(topics)                     // Defaults to a collection of "Axon.Events"
                                           .consumerFactory(consumerFactory)   // Hard requirement
                                           .fetcher(fetcher)                   // Hard requirement
                                           .messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
                                           .bufferFactory(
                                                   () -> new SortedKafkaMessageBuffer<>(bufferCapacity)) // Defaults to a "SortedKafkaMessageBuffer" with a buffer capacity of "1000"
                                           .build();
    }

    public void configureStreamableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
                                               String processorName,
                                               StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource) {
        eventProcessingConfigurer.registerTrackingEventProcessor(
                processorName,
                configuration -> streamableKafkaMessageSource
        );
    }
    // ...
}
Note that as with any tracking event processor, the progress on the event stream is stored in a TrackingToken. Using the StreamableKafkaMessageSource means a KafkaTrackingToken containing topic-partition to offset pairs is stored in the TokenStore. If no other TokenStore is provided, and auto-configuration is used, a KafkaTokenStore will be set instead of an InMemoryTokenStore. The KafkaTokenStore by default uses the __axon_token_store_updates topic. This should be a compacted topic, which should be created and configured automatically.
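If you prefer to manage token storage yourself, a different TokenStore can be registered for the processor through Axon’s EventProcessingConfigurer. A minimal sketch follows; the InMemoryTokenStore is used purely as a placeholder (progress is lost on restart, so it is suitable for tests only), and the processor name is an assumption for illustration:

import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;

public class KafkaTokenStoreConfiguration {

    // Registers a TokenStore for the given processor, overriding the default choice.
    public void configureTokenStore(EventProcessingConfigurer eventProcessingConfigurer,
                                    String processorName) {
        eventProcessingConfigurer.registerTokenStore(processorName, configuration -> new InMemoryTokenStore());
    }
}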