Customizing Event Message Format
In the previous sections, the KafkaMessageConverter<K, V> has been shown as a requirement for event production and consumption. K is the format of the message’s key, whereas V stands for the format of the message’s value. The extension provides a DefaultKafkaMessageConverter, which converts an Axon EventMessage to a Kafka ProducerRecord, and a ConsumerRecord back into an EventMessage. This DefaultKafkaMessageConverter uses String as the key and byte[] as the value of the message to de-/serialize.
Although it is the default, this implementation allows for some customization, such as how the EventMessage MetaData is mapped to Kafka headers. This is achieved by adjusting the "header value mapper" in the DefaultKafkaMessageConverter builder.
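To make the idea of a header value mapper concrete, here is a minimal sketch that uses only the Java standard library. The Header class, STRING_MAPPER and toHeaders are hypothetical stand-ins invented for illustration; they are not part of the extension, which works with Kafka's RecordHeader and a BiFunction<String, Object, RecordHeader> instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class HeaderMappingSketch {

    // Hypothetical stand-in for a Kafka header: a key plus raw bytes.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Example mapper (an assumption, not the extension's default):
    // null becomes an empty value, anything else its UTF-8 string form.
    static final BiFunction<String, Object, Header> STRING_MAPPER = (key, value) ->
            new Header(key, value == null
                    ? new byte[0]
                    : value.toString().getBytes(StandardCharsets.UTF_8));

    // Applies the mapper to every metadata entry, preserving iteration order.
    static List<Header> toHeaders(Map<String, Object> metaData,
                                  BiFunction<String, Object, Header> mapper) {
        List<Header> headers = new ArrayList<>();
        metaData.forEach((key, value) -> headers.add(mapper.apply(key, value)));
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> metaData = new LinkedHashMap<>();
        metaData.put("traceId", "abc-123");
        metaData.put("retries", 3);
        for (Header header : toHeaders(metaData, STRING_MAPPER)) {
            System.out.println(header.key + " = "
                    + new String(header.value, StandardCharsets.UTF_8));
        }
    }
}
```

Swapping the mapper changes how every metadata value is encoded without touching the rest of the conversion, which is presumably why the builder exposes it as a separate setting.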
The SequencingPolicy can be adjusted to change which record key is used. The default sequencing policy is the SequentialPerAggregatePolicy, which makes the aggregate identifier of an event the key of the ProducerRecord and ConsumerRecord.
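The relationship between a sequencing policy and the record key can be sketched with plain Java. Everything below is a simplified, hypothetical model: the Event class, the two policies and recordKey are illustrative names, and the sketch assumes that a policy yielding no identifier results in a record without a key.

```java
import java.util.function.Function;

final class RecordKeySketch {

    // Hypothetical, simplified event carrying only what the sketch needs.
    static final class Event {
        final String aggregateId; // null for events not tied to an aggregate
        final String payload;

        Event(String aggregateId, String payload) {
            this.aggregateId = aggregateId;
            this.payload = payload;
        }
    }

    // Per-aggregate idea: the sequence identifier is the aggregate identifier.
    static final Function<Event, Object> PER_AGGREGATE = event -> event.aggregateId;

    // Alternative for illustration: no identifier, so no key is derived.
    static final Function<Event, Object> NO_SEQUENCE = event -> null;

    // Derives the record key from the policy's result; null means "no key".
    static String recordKey(Event event, Function<Event, Object> policy) {
        Object identifier = policy.apply(event);
        return identifier == null ? null : identifier.toString();
    }

    public static void main(String[] args) {
        Event event = new Event("order-42", "OrderPlaced");
        System.out.println(recordKey(event, PER_AGGREGATE));
        System.out.println(recordKey(event, NO_SEQUENCE));
    }
}
```

With Kafka's default partitioning, records sharing a key land on the same partition, so keying by aggregate identifier keeps the events of one aggregate in order relative to each other.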
The format of an event message defines an API between the producer and the consumer of the message. This API may change over time, leading to incompatibility between the event class's structure on the receiving side and the event structure of a message containing the old format. Axon addresses the topic of Event Versioning by introducing Event Upcasters. The DefaultKafkaMessageConverter supports this by provisioning an EventUpcasterChain and running the upcasting process on the MetaData and Payload of individual messages converted from a ConsumerRecord, before those are passed to the Serializer and converted into Event instances.
Note that the KafkaMessageConverter feeds the upcasters with messages one-by-one, limiting it to one-to-one or one-to-many upcasting only. Upcasters performing a many-to-one or many-to-many operation thus won’t be able to operate inside the extension (yet).
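The one-by-one restriction is easier to see in a small model. The sketch below uses only the Java standard library; Rep, ADD_CURRENCY, SPLIT and upcastSingle are hypothetical names, and the chain is modelled as a list of functions over a stream rather than as Axon's EventUpcasterChain. Because each chain invocation starts from a stream holding exactly one message, an upcaster can rewrite or split that message, but it never sees a second message it could merge with.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SingleMessageUpcastSketch {

    // Hypothetical intermediate representation: type name, revision, raw payload.
    static final class Rep {
        final String type;
        final int revision;
        final String payload;

        Rep(String type, int revision, String payload) {
            this.type = type;
            this.revision = revision;
            this.payload = payload;
        }
    }

    // One-to-one: revision 0 of "OrderPlaced" gains a default currency field.
    static final Function<Rep, Stream<Rep>> ADD_CURRENCY = rep ->
            rep.type.equals("OrderPlaced") && rep.revision == 0
                    ? Stream.of(new Rep(rep.type, 1, rep.payload + ";currency=EUR"))
                    : Stream.of(rep);

    // One-to-many: a legacy "OrderPlacedAndPaid" is split into two events.
    static final Function<Rep, Stream<Rep>> SPLIT = rep ->
            rep.type.equals("OrderPlacedAndPaid")
                    ? Stream.of(new Rep("OrderPlaced", 0, rep.payload),
                                new Rep("OrderPaid", 0, rep.payload))
                    : Stream.of(rep);

    // Runs the chain on a stream holding exactly one message,
    // mirroring the per-record invocation described in the text.
    static List<Rep> upcastSingle(Rep rep, List<Function<Rep, Stream<Rep>>> chain) {
        Stream<Rep> stream = Stream.of(rep);
        for (Function<Rep, Stream<Rep>> upcaster : chain) {
            stream = stream.flatMap(upcaster);
        }
        return stream.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Rep legacy = new Rep("OrderPlacedAndPaid", 0, "id=7");
        for (Rep rep : upcastSingle(legacy, List.of(SPLIT, ADD_CURRENCY))) {
            System.out.println(rep.type + " r" + rep.revision + " " + rep.payload);
        }
    }
}
```

A many-to-one upcaster would need to buffer several input messages before emitting a result, which this per-message invocation cannot provide.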
Lastly, the Serializer used by the converter can be adjusted. See the Serializer section for more details on this.
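import java.util.function.BiFunction;

import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;

public class KafkaMessageConversationConfiguration {
    // ...
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
            Serializer serializer,
            SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
            BiFunction<String, Object, RecordHeader> headerValueMapper,
            EventUpcasterChain upcasterChain) {
        return DefaultKafkaMessageConverter.builder()
                .serializer(serializer)               // Hard requirement
                .sequencingPolicy(sequencingPolicy)   // Defaults to a "SequentialPerAggregatePolicy"
                .upcasterChain(upcasterChain)         // Defaults to an empty upcaster chain
                .headerValueMapper(headerValueMapper) // Defaults to "HeaderUtils#byteMapper()"
                .build();
    }
    // ...
}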
Make sure to use an identical KafkaMessageConverter on both the producing and consuming end; otherwise, exceptions upon deserialization should be expected. A CloudEventKafkaMessageConverter, which uses the Cloud Events spec, is also available.