Message Correlation

In messaging systems it is common to group messages together, or correlate them. In Axon Framework a Command message might result in one or several Event messages and a Query message might result in one or several QueryResponse messages. Usually, correlation is implemented using a specific message property, a so called correlation identifier.

Correlation Data Provider

Messages in Axon Framework use the MetaData property to transport meta information about the message. The MetaData object is of type Map<String, Object> and is passed around with the message. In order to fill the MetaData of a new message produced within a Unit of Work, a so called CorrelationDataProvider can be used. It is the Unit of Work which is in charge of populating the MetaData of a new message based on this CorrelationDataProvider. Axon Framework currently provides a couple of implementations of this functional interface:

MessageOriginProvider

By default, the MessageOriginProvider is registered as the correlation data provider to use. It is responsible for two values to be passed around from one Message to another, namely the correlationId and traceId. The correlationId of a message always references the identifier of the message it originates from (i.e. the parent message). The traceId on the other hand references to the message identifier which started the chain of messages (i.e. the root message). When neither of these fields are present in the parent message when a new message is created, the message identifier will be used for both. To place this in an example, if you would handle a Command message which in turn publishes an Event message, the Event message's MetaData will be populated based on:

  • The Command message's identifier for the correlationId.

  • The Command message's presence of the traceId in the MetaData or otherwise the message's identifier for the traceId.

SimpleCorrelationDataProvider

The SimpleCorrelationDataProvider is configured to unconditionally copy values of specified keys from one message to metadata of the other. To do so, the constructor of the SimpleCorrelationDataProvider must be called with a list of keys which should be copied. Here is an example how to configure it to copy the myId and myId2 values.

public class Configuration {
public CorrelationDataProvider customCorrelationDataProvider() {
return new SimpleCorrelationDataProvider("myId", "myId2");
}
}

MultiCorrelationDataProvider

A MultiCorrelationDataProvider is capable to combine the effect of multiple correlation data providers. To do so, the constructor of the MultiCorrelationDataProvider must be called with a list of providers, as shown in the following sample:

public class Configuration {
public CorrelationDataProvider customCorrelationDataProviders() {
return new MultiCorrelationDataProvider<CommandMessage<?>>(
Arrays.asList(
new SimpleCorrelationDataProvider("someKey"),
new MessageOriginProvider()
)
);
}
}

Implementing custom correlation data provider

If the predefined providers don't fulfill your requirements, you can always implement your own CorrelationDataProvider. The class must implement the CorrelationDataProvider interface, as is depicted in the following example:

public class AuthCorrelationDataProvider implements CorrelationDataProvider {
private final Function<String, String> usernameProvider;
public AuthCorrelationDataProvider(Function<String, String> userProvider) {
this.usernameProvider = userProvider;
}
@Override
public Map<String, ?> correlationDataFor(Message<?> message) {
Map<String, Object> correlationData = new HashMap<>();
if (message instanceof CommandMessage<?>) {
if (message.getMetaData().containsKey("authorization")) {
String token = (String) message.getMetaData().get("authorization");
correlationData.put("username", usernameProvider.apply(token));
}
}
return correlationData;
}
}

Configuration

When the default MessageOriginProvider isn't sufficient for your use case, the (custom) correlation data providers need to be registered inside you application. If you are using the Axon Configuration API, make sure to call Configuration#configureCorrelationDataProviders method to register the correlated data providers. If you are relying on Spring Boot Autoconfiguration, just provide a factory method exposing the Spring Bean, or multiple if you need more than one provider. The following snippets shows some possible approaches of registration:

Axon Configuration API
Spring Boot Autoconfiguration
Axon Configuration API
public class Configuration {
public void configuring() {
Configurer configurer =
DefaultConfigurer.defaultConfiguration()
.configureCorrelationDataProviders(config -> Arrays.asList(
new SimpleCorrelationDataProvider("someKey"),
new MessageOriginProvider()
));
}
}
Spring Boot Autoconfiguration
@Configuration
public class CorrelationDataProviderConfiguration {
// Configuring a single CorrelationDataProvider will automatically override the default MessageOriginProvider
@Bean
public CorrelationDataProvider someKeyCorrelationProvider() {
return new SimpleCorrelationDataProvider("someKey");
}
@Bean
public CorrelationDataProvider messageOriginProvider() {
return new MessageOriginProvider();
}
}