Kafka is an alternative approach to distributing events, besides Axon Server which is default.
Kafka is a very popular system for publishing and consuming events. It's architecture is fundamentally different from most messaging systems, and combines speed with reliability.
To use the Kafka components from Axon, make sure the
axon-kafka module is available on the classpath.
When Event Messages are published to an Event Bus (or Event Store), they can be forwarded to a Kafka topic using the
KafkaPublisher. Publication of the messages to Kafka will happen in the same thread (and Unit of Work) that published the events to the Event Bus.
KafkaPublisher takes a
KafkaPublisherConfiguration instance, which provides the different values and settings required to publish events to Kafka.
KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte>builder().withProducerFactory(factory) // the factory for creating the actual client instances for sending events to kafka.withTopic("topic") // the topic to send the events to. Defaults to 'Axon.Events'.build();KafkaPublisher<String, byte> publisher = new KafkaPublisher<>(configuration); // create the publisher itselfpublisher.start(); // to start publishing all events
Axon provides a
DefaultProducerFactory, which attempts to reuse created instances to avoid continuous creation of new ones. It's creation uses a similar builder pattern. The builder requires a
configs Map, which are the settings to use for the Kafka client, such as the Kafka instance locations. Please check the Kafka guide for the possible settings and their values.
DefaultProducerFactory.builder(configs).withConfirmationMode(ConfirmationMode.WAIT_FOR_ACK) // either TRANSACTIONAL, WAIT_FOR_ACK or NONE (default).build();// or, to create a transactional ProducerFactoryDefaultProducerFactory.builder(configs).withTransactionalIdPrefix("myTxPrefix") // this will also set ConfirmationMode to TRANSACTIONAL.build();
Note that the
DefaultProducerFactory needs to be
shutDown properly, to ensure all producer instances are properly closed.
Messages can be consumed by Tracking Event Processors by configuring a
KafkaMessageSource. This message source uses a
Fetcher to retrieve the actual messages from Kafka. You can either use the
AsyncFetcher, or provide your own.
AsyncFetcher is initialized using a builder, which requires the Kafka Configuration to initialize the client. Please check the Kafka guide for the possible settings and their values.
// the fetcher only requires Kafka Client Configuration properties:AsyncFetcher.builder(configs).build();// but customization is possible:AsyncFetcher.builder(configs).withTopic("myTopic") // the Kafka topic to read from. Defaults to 'Axon.Events'.withPool(customThreadPool) // defaults to a cached thread pool.withPollTimeout(customTimeout, timeUnit) // defaults to 5 seconds.onRecordPublished(callback) // register behavior to execute on every incoming message.withBufferFactory(bufferFactory) // to customize the implementation of the buffers used to hold messages before they are consumed.build();
AsyncFetcher doesn't need to be explicitly started, as it will start when the first processors connect to it. It does need to be shut down, to ensure any thread pool or active connections are properly closed.
By default, Axon uses the
DefaultKafkaMessageConverter to convert an
EventMessage to a Kafka
ProducerRecord and an
ConsumerRecord back into an
EventMessage. This implementation already allows for some customization, such as how the
MetaData is mapped to Kafka headers. You can also choose which serializer should be used to fill the payload of the
For further customization, you can implement your own
KafkaMessageConverter, and wire it into the
KafkaPublisherConfiguration.<String, byte>builder() // the <String, byte> defines the type of key and payload, respectively.withMessageConverter(customConverter) // the converter needs to match the expected key and payload type.build();AsyncFetcher.builder(configs).withMessageConverter(customConverter).build();
Axon will automatically provide certain Kafka related components based on the availability of beans and/or properties.
To enable a KafkaPublisher, either provide a bean of type
ProducerFactory, or set
application.properties to have auto configuration configure a ProducerFactory with Transactional semantics. In either case,
application.properties should provide the necessary Kafka Client properties, available under the
axon.kafka prefix. If none are provided, default settings are used, and
localhost:9092 is used as the bootstrap server.
To enable a
KafkaMessageSource, either provide a bean of type
ConsumerFactory, or provide the
axon.kafka.consumer.group-id setting in
application.properties. Also make sure all necessary Kafka Client Configuration properties are available under the
Alternatively, you may provide your own
KafkaMessageSource bean(s), in which case Axon will not create the default KafkaMessageSource.