Message Intercepting

There are two different types of interceptors: dispatch interceptors and handler interceptors.

  1. Dispatch interceptors - Interceptors that are invoked before a message is dispatched. At that point, it may not even be known that a handler exists for that message. They can modify the message (add metadata) or block dispatching entirely.

  2. Handler interceptors - Interceptors that are invoked just before the message handler is invoked. They can perform actions before and after handling, and are aware of the active processing context.

Let’s look at each interceptor in a bit more detail.

Message dispatch interceptors are invoked when a message is dispatched/published on a bus. They can alter the message by adding metadata or block it by not proceeding with the chain. These interceptors are always invoked on the thread that dispatches the message.

The MessageDispatchInterceptor interface defines a single method:

public interface MessageDispatchInterceptor<M extends Message> {

    @Nonnull
    MessageStream<?> interceptOnDispatch(
            @Nonnull M message,
            @Nullable ProcessingContext context,
            @Nonnull MessageDispatchInterceptorChain<M> interceptorChain
    );
}

In turn, message handler interceptors can take action both before and after message handling. Interceptors can even block message processing altogether, for example for security reasons.

The MessageHandlerInterceptor interface also defines a single method:

public interface MessageHandlerInterceptor<M extends Message> {

    @Nonnull
    MessageStream<?> interceptOnHandle(
            @Nonnull M message,
            @Nonnull ProcessingContext context,
            @Nonnull MessageHandlerInterceptorChain<M> interceptorChain
    );
}

Unlike dispatch interceptors, handler interceptors are invoked in the context of the message handler with an active ProcessingContext. This allows them to:

  • Register lifecycle callbacks (see ProcessingContext).

  • Attach correlation data that will propagate to messages created during handling.

  • Manage transactions around handling.

  • Access processing phase information.

Note that both interceptor types are much a like. Both intercept a message, both receive an interceptor specific chain, both have a ProcessingContext parameter, and both return a MessageStream that allows them to work seamlessly with Axon’s async-native architecture. The differences lie in the used interceptor chain implementation, which is unique per type of interceptor, and the nullability of ProcessingContext when dispatching.

As a ProcessingContext is only active when handling a message, a dispatch interceptor will only have a non-null context when dispatching happens from within a message handler. Subsequently, a handler interceptor will always have an active processing context since it wraps message handling, for which a ProcessingContext is a hard requirement.

Generic message interceptors

The sections below describe dispatch and handler interceptors for the specific types of message Axon Framework supports. However, you can register a generic Message interceptor with Axon, which will automatically be attached to all message-specific dispatching or handling components:

public void registerMessageHandlerInterceptor(MessagingConfigurer configurer) {
    configurer.registerMessageHandlerInterceptor(
            // The LoggingInterceptor is provided by Axon out of the box.
            config -> new LoggingInterceptor<>()
    );
}
Component-specific interceptors

Besides being able to register interceptors for your buses and processors, you are able to register them for specific handling components. Please check this chapter for more details on this.

Command interceptors

One of the advantages of using a command bus is the ability to undertake action based on all incoming commands. Examples are logging or authentication, which you might want to do regardless of the type of command.

Command dispatch interceptors

Command dispatch interceptors are invoked when a command is dispatched on a command bus. They can alter the command message by adding metadata or block the command by not proceeding with the chain. These interceptors are always invoked on the thread that dispatches the command.

Let’s create a dispatch interceptor that logs each command:

public class CommandLoggingDispatchInterceptor
        implements MessageDispatchInterceptor<CommandMessage> {

    private static final Logger logger =
            LoggerFactory.getLogger(CommandLoggingDispatchInterceptor.class);

    @Override
    public MessageStream<?> interceptOnDispatch(
            CommandMessage command,
            ProcessingContext context,
            MessageDispatchInterceptorChain<CommandMessage> chain
    ) {

        logger.info("Dispatching command: {}", command.type());

        // Proceed with the chain
        return chain.proceed(command, context);
    }
}

You can also modify the message before dispatching:

public class MetadataEnrichingInterceptor
        implements MessageDispatchInterceptor<CommandMessage> {

    @Override
    public MessageStream<?> interceptOnDispatch(
            CommandMessage command,
            ProcessingContext context,
            MessageDispatchInterceptorChain<CommandMessage> chain
    ) {
        // Add metadata
        CommandMessage enrichedCommand = command.andMetadata(
            Metadata.with("timestamp", String.valueOf(System.currentTimeMillis()))
        );

        return chain.proceed(enrichedCommand, context);
    }
}

Register the interceptor with a CommandBus:

public void registerCommandDispatchInterceptor(MessagingConfigurer configurer) {
    configurer.registerCommandDispatchInterceptor(
            config -> new CommandLoggingDispatchInterceptor()
    );
}

Structural validation

There is no point in processing a command if it does not contain all required information in the correct format. A command that lacks information should be blocked as early as possible, preferably even before a transaction has been started. Therefore, an interceptor should check all incoming commands for the availability of such information. This is called structural validation.

Axon Framework has support for the JSR 303 Bean Validation specification. This allows you to annotate the fields on commands with annotations like @NotEmpty and @Pattern. You need to include a JSR 303 implementation (such as Hibernate-Validator) on your classpath. Then, configure a BeanValidationInterceptor on your command bus, and it will automatically find and configure your validator implementation.

Interceptor Ordering

You want to spend as few resources on an invalid command as possible. Therefore, this interceptor is generally placed at the front of the interceptor chain. In some cases, a LoggingInterceptor (provided by Axon) or AuditingInterceptor (custom) might need to be placed first, with the validating interceptor immediately following it.

The BeanValidationInterceptor also implements MessageHandlerInterceptor, allowing you to configure it as a handler interceptor as well.

Command handler interceptors

Command handler interceptors can take action both before and after command processing. Interceptors can even block command processing altogether, for example for security reasons.

Let’s create a handler interceptor that performs authorization:

public class AuthorizationInterceptor
        implements MessageHandlerInterceptor<CommandMessage> {

    @Override
    public MessageStream<?> interceptOnHandle(
            CommandMessage command,
            ProcessingContext context,
            MessageHandlerInterceptorChain<CommandMessage> chain
    ) {

        String userId = command.metadata().get("userId");

        if (userId == null) {
            throw new SecurityException("No user ID in metadata");
        }

        if (!"authorized-user".equals(userId)) {
            throw new SecurityException("User not authorized");
        }

        // User is authorized, proceed
        return chain.proceed(command, context);
    }
}

You can also perform actions after handling by using the ProcessingContext:

public class LoggingInterceptor
        implements MessageHandlerInterceptor<CommandMessage> {

    private static final Logger logger =
            LoggerFactory.getLogger(LoggingInterceptor.class);

    @Override
    public MessageStream<?> interceptOnHandle(
            CommandMessage command,
            ProcessingContext context,
            MessageHandlerInterceptorChain<CommandMessage> chain
    ) {
        logger.info("Before handling: {}", command.type());

        // Register action to run after successful handling
        context.whenComplete(ctx -> {
            logger.info("Successfully handled: {}", command.type());
        });

        // Register error handler
        context.onError((ctx, phase, error) -> {
            logger.error("Error handling {}: {}", command.type(), error.getMessage());
        });

        return chain.proceed(command, context);
    }
}

Register the interceptor with a CommandBus:

public void registerCommandHandlerInterceptor(MessagingConfigurer configurer) {
    configurer.registerCommandHandlerInterceptor(
            config -> new AuthorizationInterceptor()
    );
}

Event interceptors

Similar to command messages, event messages can also be intercepted prior to publishing and handling.

Event dispatch interceptors

Any event dispatch interceptors registered to an event bus will be invoked when an event is published. They have the ability to alter the event message by adding metadata or provide logging capabilities. These interceptors are always invoked on the thread that published the event.

public class EventLoggingDispatchInterceptor
        implements MessageDispatchInterceptor<EventMessage> {

    private static final Logger logger =
            LoggerFactory.getLogger(EventLoggingDispatchInterceptor.class);

    @Override
    public MessageStream<?> interceptOnDispatch(
            EventMessage event,
            ProcessingContext context,
            MessageDispatchInterceptorChain<EventMessage> chain
    ) {

        logger.info("Publishing event: {}", event.type());
        return chain.proceed(event, context);
    }
}

Below we have an example on how to register an EventMessage-specific MessageDispatchInterceptor with Axon:

public void registerEventDispatchInterceptor(MessagingConfigurer configurer) {
    configurer.registerEventDispatchInterceptor(
            config -> new EventLoggingDispatchInterceptor()
    );
}

Event handler interceptors

Event handler interceptors can take action both before and after event handling.

public class EventSecurityInterceptor
        implements MessageHandlerInterceptor<EventMessage> {

    @Override
    public MessageStream<?> interceptOnHandle(
            EventMessage event,
            ProcessingContext context,
            MessageHandlerInterceptorChain<EventMessage> chain
    ) {
        String userId = event.metadata().get("userId");
        if (userId == null || !"authorized-user".equals(userId)) {
            throw new SecurityException("Unauthorized event");
        }

        return chain.proceed(event, context);
    }
}

To register the EventSecurityInterceptor as a EventMessage-specific MessageHandlerInterceptor, you can utilize the MessagingConfigurer as follows:

public void registerEventHandlerInterceptor(MessagingConfigurer configurer) {
    configurer.registerEventHandlerInterceptor(
            config -> new EventSecurityInterceptor()
    );
}

Query interceptors

One of the advantages of using a query bus is the ability to undertake action based on all incoming queries. Examples are logging or authentication, which you might want to do regardless of the type of query.

Query dispatch interceptors

Query dispatch interceptors are invoked when a query is dispatched on a query bus. They have the ability to alter the message by adding metadata or block the handler execution.

public class QueryLoggingDispatchInterceptor
        implements MessageDispatchInterceptor<QueryMessage> {

    private static final Logger logger =
            LoggerFactory.getLogger(QueryLoggingDispatchInterceptor.class);

    @Override
    public MessageStream<?> interceptOnDispatch(
            QueryMessage query,
            ProcessingContext context,
            MessageDispatchInterceptorChain<QueryMessage> chain
    ) {

        logger.info("Dispatching query: {}", query.type());
        return chain.proceed(query, context);
    }
}

Below we have an example on how to register an QueryMessage-specific MessageDispatchInterceptor with Axon:

public void registerQueryDispatchInterceptor(MessagingConfigurer configurer) {
    configurer.registerQueryDispatchInterceptor(
            config -> new QueryLoggingDispatchInterceptor()
    );
}

Structural validation

There is no point in processing a query if it does not contain all required information in the correct format. A query that lacks information should be blocked as early as possible. Therefore, an interceptor should check all incoming queries for the availability of such information.

Axon Framework has support for JSR 303 Bean Validation. This allows you to annotate the fields on queries with annotations like @NotEmpty and @Pattern. Configure a BeanValidationInterceptor on your query bus, and it will automatically find and configure your validator implementation.

Interceptor Ordering

You want to spend as few resources on invalid queries as possible. Therefore, this interceptor is generally placed at the front of the interceptor chain.

The BeanValidationInterceptor also implements MessageHandlerInterceptor, allowing you to configure it as a handler interceptor as well.

Query handler interceptors

Query handler interceptors can take action both before and after query processing.

public class QuerySecurityInterceptor
        implements MessageHandlerInterceptor<QueryMessage> {

    @Override
    public MessageStream<?> interceptOnHandle(
            QueryMessage query,
            ProcessingContext context,
            MessageHandlerInterceptorChain<QueryMessage> chain
    ) {
        String userId = query.metadata().get("userId");
        if (userId == null || !"authorized-user".equals(userId)) {
            throw new SecurityException("Unauthorized query");
        }

        return chain.proceed(query, context);
    }
}

To register the QuerySecurityInterceptor as a QueryMessage-specific MessageHandlerInterceptor, you can utilize the MessagingConfigurer as follows:

public void registerEventHandlerInterceptor(MessagingConfigurer configurer) {
    configurer.registerEventHandlerInterceptor(
            config -> new EventSecurityInterceptor()
    );
}