Message Correlation
In messaging systems it is common to group messages together or correlate them.
In Axon Framework a Command message might result in one or several Event messages and a Query message might result in one or several QueryResponse messages.
Usually, correlation is implemented using a specific message property, a so-called correlation identifier.
Correlation data provider
Messages in Axon Framework use the MetaData property to transport meta-information about the message.
The MetaData object is of type Map<String, Object> and is passed around with the message.
To fill the MetaData of a new message produced within a Unit of Work, a so-called CorrelationDataProvider can be used.
The Unit of Work is in charge of populating the MetaData of a new message based on this CorrelationDataProvider.
Axon Framework currently provides a couple of implementations of this functional interface:
MessageOriginProvider
By default, the MessageOriginProvider is registered as the correlation data provider to use.
It is responsible for passing two values from one Message to another, namely the correlationId and the traceId.
The correlationId of a message always references the identifier of the message it originates from (that is, the parent message).
The traceId, on the other hand, references the identifier of the message that started the chain of messages (that is, the root message).
When neither of these fields is present in the parent message when a new message is created, the parent message’s identifier will be used for both.
To place this in an example, if you handle a Command message which in turn publishes an Event message, the Event message’s MetaData will be populated based on:
- The Command message’s identifier for the correlationId.
- The traceId in the Command message’s MetaData if present, or otherwise the Command message’s identifier, for the traceId.
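The two rules above can be modelled in a few lines of plain Java. This is a simplified sketch rather than Axon's implementation: OriginSketch and its nested Msg class are hypothetical stand-ins for the framework's message type, and only the correlationId and traceId keys are taken from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the correlationId/traceId rules; not Axon's own classes.
final class OriginSketch {

    // Minimal stand-in for a message: an identifier plus its metadata.
    static final class Msg {
        final String identifier;
        final Map<String, Object> metaData;

        Msg(String identifier, Map<String, Object> metaData) {
            this.identifier = identifier;
            this.metaData = metaData;
        }
    }

    // Returns the metadata entries a new message receives from its parent.
    static Map<String, Object> correlationDataFor(Msg parent) {
        Map<String, Object> data = new HashMap<>();
        // correlationId: always the identifier of the direct parent.
        data.put("correlationId", parent.identifier);
        // traceId: inherited when the parent carries one, else the parent is the root.
        data.put("traceId", parent.metaData.getOrDefault("traceId", parent.identifier));
        return data;
    }

    public static void main(String[] args) {
        Msg command = new Msg("cmd-1", new HashMap<>());
        Msg event = new Msg("evt-1", correlationDataFor(command));
        Map<String, Object> followUp = correlationDataFor(event);
        System.out.println(followUp.get("correlationId")); // evt-1
        System.out.println(followUp.get("traceId"));       // cmd-1
    }
}
```

In this sketch the event published while handling cmd-1 receives cmd-1 for both values. A message produced while handling that event keeps cmd-1 as its traceId, while its correlationId moves on to evt-1.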
SimpleCorrelationDataProvider
The SimpleCorrelationDataProvider unconditionally copies the values of specified keys from the metadata of one message to the metadata of the next.
To do so, the constructor of the SimpleCorrelationDataProvider must be called with the keys which should be copied.
Here is an example of how to configure it to copy the myId and myId2 values.
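import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;

public class Configuration {
    // Copies the "myId" and "myId2" metadata entries to new messages
    public CorrelationDataProvider customCorrelationDataProvider() {
        return new SimpleCorrelationDataProvider("myId", "myId2");
    }
}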
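The copying behaviour described above can be illustrated without the framework. The following is a minimal sketch in plain Java, assuming that keys missing from the parent's metadata are simply skipped. KeyCopySketch and copyKeys are hypothetical names, not part of Axon.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of copying selected metadata keys; not Axon's own class.
final class KeyCopySketch {

    // Copies the given keys from the parent's metadata; keys the parent lacks are skipped.
    static Map<String, Object> copyKeys(Map<String, Object> parentMetaData, String... keys) {
        Map<String, Object> copied = new HashMap<>();
        for (String key : keys) {
            if (parentMetaData.containsKey(key)) {
                copied.put(key, parentMetaData.get(key));
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        Map<String, Object> parent = new HashMap<>();
        parent.put("myId", "42");
        parent.put("unrelated", "ignored");
        Map<String, Object> child = copyKeys(parent, "myId", "myId2");
        System.out.println(child); // {myId=42}
    }
}
```

Only the configured keys travel to the new message. Other metadata entries of the parent, such as unrelated in this sketch, are left behind.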
MultiCorrelationDataProvider
A MultiCorrelationDataProvider can combine the effect of multiple correlation data providers.
To do so, the constructor of the MultiCorrelationDataProvider must be called with a list of providers, as shown in the following sample:
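import java.util.Arrays;

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.correlation.MultiCorrelationDataProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;

public class Configuration {
    public CorrelationDataProvider customCorrelationDataProviders() {
        return new MultiCorrelationDataProvider<CommandMessage<?>>(
                Arrays.asList(
                        new SimpleCorrelationDataProvider("someKey"),
                        new MessageOriginProvider()
                )
        );
    }
}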
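To make the idea of combining providers concrete, here is a plain-Java sketch that merges the output of several providers into one map. It is a simplified model under stated assumptions: the Provider interface, the copying and constant helpers, and the rule that later providers win on a key clash are choices made for this sketch, not a description of Axon's internals.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of combining several providers; not Axon's own class.
final class MultiProviderSketch {

    // In this sketch a provider maps the parent's metadata to entries for the new message.
    interface Provider {
        Map<String, Object> dataFor(Map<String, Object> parentMetaData);
    }

    // Provider that copies a single key when the parent has it.
    static Provider copying(String key) {
        return parent -> {
            Map<String, Object> out = new HashMap<>();
            if (parent.containsKey(key)) {
                out.put(key, parent.get(key));
            }
            return out;
        };
    }

    // Provider that always contributes the same entry.
    static Provider constant(String key, Object value) {
        return parent -> {
            Map<String, Object> out = new HashMap<>();
            out.put(key, value);
            return out;
        };
    }

    // Applies every provider in order and merges the results; later providers win on key clashes.
    static Map<String, Object> combine(Map<String, Object> parent, List<Provider> providers) {
        Map<String, Object> combined = new HashMap<>();
        for (Provider provider : providers) {
            combined.putAll(provider.dataFor(parent));
        }
        return combined;
    }

    public static void main(String[] args) {
        Map<String, Object> parent = new HashMap<>();
        parent.put("someKey", "a");
        Map<String, Object> result =
                combine(parent, Arrays.asList(copying("someKey"), constant("origin", "sketch")));
        System.out.println(result.get("someKey")); // a
        System.out.println(result.get("origin"));  // sketch
    }
}
```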
Implementing a custom correlation data provider
If the predefined providers don’t fulfill your requirements, you can always implement your own CorrelationDataProvider.
The class must implement the CorrelationDataProvider interface, as depicted in the following example:
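import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.correlation.CorrelationDataProvider;

public class AuthCorrelationDataProvider implements CorrelationDataProvider {

    // Resolves an authorization token to a username
    private final Function<String, String> usernameProvider;

    public AuthCorrelationDataProvider(Function<String, String> userProvider) {
        this.usernameProvider = userProvider;
    }

    @Override
    public Map<String, ?> correlationDataFor(Message<?> message) {
        Map<String, Object> correlationData = new HashMap<>();
        // Only command messages carrying an "authorization" entry contribute a username
        if (message instanceof CommandMessage<?>) {
            if (message.getMetaData().containsKey("authorization")) {
                String token = (String) message.getMetaData().get("authorization");
                correlationData.put("username", usernameProvider.apply(token));
            }
        }
        return correlationData;
    }
}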
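The constructor above expects a Function<String, String> that turns an authorization token into a username. As an illustration of what could be passed in, the following sketch builds such a function from an in-memory table. The TokenLookupSketch class, the token table, and the "anonymous" fallback are all assumptions made for this example. A real application would more likely consult its security infrastructure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical token lookup; the token table and fallback name are illustrative only.
final class TokenLookupSketch {

    // Builds a function that resolves a token to a username, falling back to "anonymous".
    static Function<String, String> usernameProvider(Map<String, String> usersByToken) {
        return token -> usersByToken.getOrDefault(token, "anonymous");
    }

    public static void main(String[] args) {
        Map<String, String> usersByToken = new HashMap<>();
        usersByToken.put("token-123", "alice");
        Function<String, String> provider = usernameProvider(usersByToken);
        System.out.println(provider.apply("token-123")); // alice
        System.out.println(provider.apply("unknown"));   // anonymous
    }
}
```

With Axon on the classpath, the resulting function could be handed to the provider, for example new AuthCorrelationDataProvider(TokenLookupSketch.usernameProvider(usersByToken)).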
Configuration
When the default MessageOriginProvider isn’t sufficient for your use case, the (custom) correlation data providers need to be registered inside your application.
If you are using the Axon Configuration API, make sure to call the Configurer#configureCorrelationDataProviders method to register the correlation data providers.
If you are relying on Spring Boot Autoconfiguration, just provide a factory method exposing the provider as a Spring Bean, or multiple factory methods if you need more than one provider.
The following snippets show some possible approaches to registration:
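Configuration API:
import java.util.Arrays;

import org.axonframework.config.Configurer;
import org.axonframework.config.DefaultConfigurer;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;

public class Configuration {
    public void configuring() {
        Configurer configurer = DefaultConfigurer
                .defaultConfiguration()
                .configureCorrelationDataProviders(config -> Arrays.asList(
                        new SimpleCorrelationDataProvider("someKey"),
                        new MessageOriginProvider()
                ));
    }
}
Spring Boot: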
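import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.correlation.SimpleCorrelationDataProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CorrelationDataProviderConfiguration {

    // Configuring a single CorrelationDataProvider will automatically override the default MessageOriginProvider
    @Bean
    public CorrelationDataProvider someKeyCorrelationProvider() {
        return new SimpleCorrelationDataProvider("someKey");
    }

    @Bean
    public CorrelationDataProvider messageOriginProvider() {
        return new MessageOriginProvider();
    }
}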