Publishing Events to Kafka
When Event Messages are published to an Event Bus (or Event Store), they can be forwarded to a Kafka topic using the KafkaPublisher
. To achieve this it will utilize a Kafka Producer
, retrieved through Axon’s ProducerFactory
. The KafkaPublisher
in turn receives the events to publish from a KafkaEventPublisher
.
Since the KafkaEventPublisher
is an event message handler in Axon terms, we can provide it to any Event Processor to receive the published events. The choice of event processor which brings differing characteristics for event publication to Kafka:
-
Subscribing Event Processor - publication of messages to Kafka will occur in the same thread (and Unit of Work) which published the events to the event bus. This approach ensures failure to publish to Kafka enforces failure of the initial event publication on the event bus
-
Tracking Event Processor - publication of messages to Kafka is run in a different thread (and Unit of Work) than the one which published the events to the event bus. This approach ensures the event has been published on the event bus regardless of whether publication to Kafka works
When setting up event publication it is also important to take into account which ConfirmationMode
is used. The ConfirmationMode
influences the process of actually producing an event message on a Kafka topic, but also what kind of Producer
the ProducerFactory
will instantiate:
-
TRANSACTIONAL - This will require the
Producer
to start, commit and (in case of failure) rollback the transaction of publishing an event message. Alongside this, it will create a pool ofProducer
instances in theProducerFactory
to avoid continuous creation of new ones, requiring the user to provide a "transactional id prefix" to uniquely identify everyProducer
in the pool. -
WAIT_FOR_ACK - Setting "WAIT_FOR_ACK" as the
ConfirmationMode
will require theProducer
instance to wait for a default of 1 second (configurable on theKafkaPublisher
) until the event message publication has been acknowledged. Alongside this, it will create a single, shareableProducer
instance from within theProducerFactory
. -
NONE - This is the default mode, which only ensures a single, shareable
Producer
instance from within theProducerFactory
.
Configuring event publication to Kafka
It is a several step process to configure Event publication to Kafka, which starts with the ProducerFactory
. Axon provides the DefaultProducerFactory
implementation of the ProducerFactory
, which should be instantiated through the provided DefaultProducerFactory.Builder
.
The builder has one hard requirement, which is the Producer
configuration Map
. The Map
contains the settings to use for the Kafka Producer
client, such as the Kafka instance locations. Please check the Kafka documentation for the possible settings and their values.
public class KafkaEventPublicationConfiguration {
// ...
public ProducerFactory<String, byte[]> producerFactory(Duration closeTimeout,
int producerCacheSize,
Map<String, Object> producerConfiguration,
ConfirmationMode confirmationMode,
String transactionIdPrefix) {
return DefaultProducerFactory.<String, byte[]>builder()
.closeTimeout(closeTimeout) // Defaults to "30" seconds
.producerCacheSize(producerCacheSize) // Defaults to "10"; only used for "TRANSACTIONAL" mode
.configuration(producerConfiguration) // Hard requirement
.confirmationMode(confirmationMode) // Defaults to a Confirmation Mode of "NONE"
.transactionalIdPrefix(transactionIdPrefix) // Hard requirement when in "TRANSACTIONAL" mode
.build();
}
// ...
}
The second infrastructure component to introduce is the KafkaPublisher
, which has a hard requirement on the ProducerFactory
. Additionally, this would be the place to define the Kafka topics upon which Axon event messages will be published. You can set a function from event to Optional<String>
. You can use this to only publish certain events, or put different events to different topics. Its not uncommon for Kafka topics to only contain one type of message. Note that the KafkaPublisher
needs to be shutDown
properly, to ensure all Producer
instances are properly closed.
public class KafkaEventPublicationConfiguration {
// ...
public KafkaPublisher<String, byte[]> kafkaPublisher(String topic,
ProducerFactory<String, byte[]> producerFactory,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter,
int publisherAckTimeout) {
return KafkaPublisher.<String, byte[]>builder()
.topicResolver(m -> Optional.of(topic)) // Defaults to "Axon.Events" for all events
.producerFactory(producerFactory) // Hard requirement
.messageConverter(kafkaMessageConverter) // Defaults to a "DefaultKafkaMessageConverter"
.publisherAckTimeout(publisherAckTimeout) // Defaults to "1000" milliseconds; only used for "WAIT_FOR_ACK" mode
.build();
}
// ...
}
Lastly, we need to provide Axon’s event messages to the KafkaPublisher
. To that end a KafkaEventPublisher
should be instantiated through the builder pattern. Remember to add the KafkaEventPublisher
to an event processor implementation of your choice. It is recommended to use the KafkaEventPublisher#DEFAULT_PROCESSING_GROUP
as the processing group name of the event processor to distinguish it from other event processors.
public class KafkaEventPublicationConfiguration {
// ...
public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher) {
return KafkaEventPublisher.<String, byte[]>builder()
.kafkaPublisher(kafkaPublisher) // Hard requirement
.build();
}
public void registerPublisherToEventProcessor(EventProcessingConfigurer eventProcessingConfigurer,
KafkaEventPublisher<String, byte[]> kafkaEventPublisher) {
String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
.assignHandlerTypesMatching(
processingGroup,
clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
)
.registerSubscribingEventProcessor(processingGroup);
// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
}
// ...
}
Topic partition publication considerations
Kafka ensures message ordering on a topic-partition level, not on an entire topic. To control events of a certain group to be placed in a dedicated partition, based on aggregate identifier for example, the message converter’s SequencingPolicy
can be utilized.
The topic-partition pair events have been published in also has impact on event consumption. This extension mitigates any ordering concerns with the streamable solution, by ensuring a Consumer
always receives all events of a topic to be able to perform a complete ordering. This guarantee is however not given when using the subscribable event consumption approach. The subscribable stream leaves all the ordering specifics in the hands of Kafka, which means the events should be published on a consistent partition to ensure ordering.