axon-kafka
module is available on the classpath. Using the extension requires setting up and configuring Kafka following your project's requirements. How this is achieved is outside of the scope of this reference guide and should be found in Kafka's documentation.KafkaPublisher
. To achieve this it will utilize a Kafka Producer
, retrieved through Axon's ProducerFactory
. The KafkaPublisher
in turn receives the events to publish from a KafkaEventPublisher
.KafkaEventPublisher
is an event message handler in Axon terms, we can provide it to any Event Processor to receive the published events. The choice of event processor which brings differing characteristics for event publication to Kafka:ConfirmationMode
is used. The ConfirmationMode
influences the process of actually producing an event message on a Kafka topic, but also what kind of Producer
the ProducerFactory
will instantiate:Producer
to start, commit and (in case of failure) rollback the transaction of publishing an event message. Alongside this, it will create a pool of Producer
instances in the ProducerFactory
to avoid continuous creation of new ones, requiring the user to provide a "transactional id prefix" to uniquely identify every Producer
in the pool.ConfirmationMode
will require the Producer
instance to wait for a default of 1 second (configurable on the KafkaPublisher
) until the event message publication has been acknowledged. Alongside this, it will create a single, shareable Producer
instance from within the ProducerFactory
.Producer
instance from within the ProducerFactory
.ProducerFactory
. Axon provides the DefaultProducerFactory
implementation of the ProducerFactory
, which should be instantiated through the provided DefaultProducerFactory.Builder
.Producer
configuration Map
. The Map
contains the settings to use for the Kafka Producer
client, such as the Kafka instance locations. Please check the Kafka documentation for the possible settings and their values.KafkaPublisher
, which has a hard requirement on the ProducerFactory
. Additionally, this would be the place to define the Kafka topic upon which Axon event messages will be published. Note that the KafkaPublisher
needs to be shutDown
properly, to ensure all Producer
instances are properly closed.KafkaPublisher
. To that end a KafkaEventPublisher
should be instantiate through the builder pattern. Remember to add the KafkaEventPublisher
to an event processor implementation of your choice. It is recommended to use the KafkaEventPublisher#DEFAULT_PROCESSING_GROUP
as the processing group name of the event processor to distinguish it from other event processors.SequencingPolicy
can be utilized.Consumer
always receives all events of a topic to be able to perform a complete ordering. This guarantee is however not given when using the subscribable event consumption approach. The subscribable stream leaves all the ordering specifics in the hands of Kafka, which means the events should be published on a consistent partition to ensure ordering.Consumer
, which breaks down to a combination of a ConsumerFactory
and a Fetcher
. The extension provides a DefaultConsumerFactory
, whose sole requirement is a Map
of configuration properties. The Map
contains the settings to use for the Kafka Consumer
client, such as the Kafka instance locations. Please check the Kafka documentation for the possible settings and their values.Fetcher
instance's job to retrieve the actual messages from Kafka by directing a Consumer
instance it receives from the message source. You can draft up your own implementation or use the provided AsyncFetcher
to this end. The AsyncFetcher
doesn't need to be explicitly started, as it will react on the message source starting it. It does need to be shut down, to ensure any thread pool or active connections are properly closed.SubscribableKafkaMessageSource
means you are inclined to use a SubscribingEventProcessor
to consume the events in your event handlers.Consumer
instances into "Consumer Groups" is used. This is strengthened by making the groupId
upon source construction a hard requirement. To use a common groupId
essentially means that the event-stream-workload can be shared on Kafka's terms, whereas a SubscribingEventProcessor
typically works on it's own accord regardless of the number of instances. The workload sharing can be achieved by having several application instances with the same groupId
or by adjusting the consumer count through the SubscribableKafkaMessageSource
's builder. The same benefit holds for resetting an event stream, which in Axon is reserved to the TrackingEventProcessor
, but is now opened up through Kafka's own API's.SubscribableKafkaMessageSource
thus provides the niceties the tracking event processor normally provides, it does come with two catches:SequencingPolicy
to deduce which thread receives which events is entirely lost.Consumer
for the events your handlers receives.TrackingEventProcessor#resetTokens
operationSubscribableKafkaMessageSource
as a message source for a SubscribingEventProcessor
, there is one additional requirement beside source creation and registration. The source should only start with polling for events as soon as all interested subscribing event processors have been subscribed to it. To ensure the SubscribableKafkaMessageSource#start()
operation is called at the right point in the configuration lifecycle, the KafkaMessageSourceConfigurer
should be utilized:KafkaMessageSourceConfigurer
is an Axon ModuleConfiguration
which ties in to the start and end lifecycle of the application. It should receive the SubscribableKafkaMessageSource
as a source which should start and stop. The KafkaMessageSourceConfigurer
instance itself should be registered as a module to the main Configurer
.SubscribableKafkaMessageSource.Builder#autoStart()
can be toggled on. This will start the SubscribableKafkaMessageSource
upon the first subscription.StreamableKafkaMessageSource
means you are inclined to use a TrackingEventProcessor
to consume the events in your event handlers.Consumer
instances in the same "Consumer Group", the streamable approach enforces a unique consumer group per Consumer
instance. Axon requires uniquely identified consumer group/Consumer
pairs to (1) ensure event ordering and (2) to guarantee that each instance/thread receives the correct portion of the event stream during parallel processing. The distinct group id is derived by the StreamableKafkaMessageSource
through a groupIdPrefix
and a groupdIdSuffixFactory
, which are adjustable through the source's builder.TrackingToken
. Using the StreamableKafkaMessageSource
means a KafkaTrackingToken
containing topic-partition to offset pairs is stored in the TokenStore
.KafkaMessageConverter<K, V>
has been shown as a requirement for event production and consumption. The K
is the format of the message's key, where the V
stand for the message's value. The extension provides a DefaultKafkaMessageConverter
which converts an axon EventMessage
to a Kafka ProducerRecord
, and an ConsumerRecord
back into an EventMessage
. This DefaultKafkaMessageConverter
uses String
as the key and byte[]
as the value of the message to de-/serialize.EventMessage
's MetaData
is mapped to Kafka headers. This is achieved by adjusting the "header value mapper" in the DefaultKafkaMessageConverter
's builder.SequencingPolicy
can be adjusted to change the behaviour of the record key being used. The default sequencing policy is the SequentialPerAggregatePolicy
, which leads to the aggregate identifier of an event being the key of a ProducerRecord
and ConsumerRecord
.DefaultKafkaMessageConverter
supports this by provisioning an EventUpcasterChain
and run the upcasting process on the MetaData
and Payload
of individual messages converted from ConsumerRecord
before those are passed to the Serializer
and converted into Event
instances.KafkaMessageConverter
feeds the upcasters with messages one-by-one, limiting it to one-to-one or one-to-many upcasting only. Upcasters performing a many-to-one or many-to-many operation thus won't be able to operate inside the extension (yet).Serializer
used by the converter can be adjusted. See the Serializer section for more details on this.KafkaMessageConverter
on both the producing and consuming end, as otherwise exception upon deserialization should be expected.org.axonframework.extensions.kafka
and artifact id axon-kafka-spring-boot-starter
. When using the auto configuration, the following components will be created for you automatically:DefaultKafkaMessageConverter
using the configured eventSerializer
(which defaults to XStreamSerializer
).String
for the keys and a byte[]
for the record's valuesDefaultProducerFactory
using a String
for the keys and a byte[]
for the record's values.axon.kafka.publisher.confirmation-mode
should be adjusted to change this mode,axon.kafka.producer.transaction-id-prefix
property to be provided.axon.kafka.producer.transaction-id-prefix
is non-null and non-empty,KafkaPublisher
.Producer
instance from the ProducerFactory
to publish events to the configured Kafka topic.KafkaEventPublisher
. Used to provide events to the KafkaPublisher
and to assign a processor name__axon-kafka-event-publishing-group
to it. Defaults to a SubscribingEventProcessor
.TrackingEventProcessor
is desired, the axon.kafka.producer.event-processor-mode
should be set to tracking
DefaultConsumerFactory
using a String
for the keys and a byte[]
for the record's valuesAsyncFetcher
. To adjust the Fetcher
's poll timeout, the axon.kafka.fetcher.poll-timeout
can be set.StreamableKafkaMessageSource
which can be used for TrackingEventProcessor
instancesapplication.properties
file. The Kafka extension configuration specifics should be placed under prefix axon.kafka
. On this level, the bootstrapServers
(defaults to localhost:9092
) and default-topic
used by the producing and consuming side can be defined.DefaultProducerFactory
and DefaultConsumerFactory
expects a Map
of configuration properties, which correspond to Kafka Producer
and Consumer
specific properties respectively. As such, Axon itself passes along these properties without using them directly itself. The application.properties
file provides a number of named properties under the axon.kafka.producer.
and axon.kafka.consumer.
prefixes. If the property you are looking for is not predefined in Axon KafkaProperties
file, you are always able to introduce properties in a map style.Auto configuring aSubscribableKafkaMessageSource
The auto configuredStreamableKafkaMessageSource
can be toggled off by setting theaxon.kafka.consumer.event-processing-mode
tosubscribing
.Note that this does not create aSubscribableKafkaMessageSource
for you out of the box. To set up a subscribable message, we recommend to read this section.