Infrastructure

When it comes to dispatching queries, as explained in the Dispatching queries section, there are several implementations for actually sending query messages. The next sections provide an overview of the available implementations, as well as how to set up query dispatching infrastructure with Axon.

All query operations are async-native, returning CompletableFuture or Publisher types. This enables efficient asynchronous processing without blocking threads.

Query gateway

The query gateway is a convenient interface towards the query dispatching mechanism. While you are not required to use a gateway to dispatch queries, it is generally the easiest option to do so.

Axon provides a QueryGateway interface and the DefaultQueryGateway implementation. The query gateway provides methods for:

  • Point-to-point queries: query() for single results, queryMany() for multiple results (both return CompletableFuture)

  • Streaming queries: streamingQuery() returns a Publisher for streaming large result sets

  • Subscription queries: subscriptionQuery() returns a Publisher combining initial result and updates

All methods support an optional ProcessingContext parameter for correlation data propagation when dispatching from within a message handler.

The query gateway automatically handles message construction, dispatch interceptors, and result conversion. It’s configured with access to the query bus and dispatch interceptors.

Query bus

The query bus is the mechanism that dispatches queries to query handlers. Queries are registered using the query name (based on MessageType). Each query name can have only one registered handler.

DistributedQueryBus

The DistributedQueryBus, together with the AxonServerQueryBusConnector, is the default query bus when Axon Server is available. It connects to Axon Server to send and receive queries in a distributed way, allowing queries to be handled by any node in the cluster.

The DistributedQueryBus combined with the AxonServerQueryBusConnector provides:

  • Distributed query handling: Queries are routed to the appropriate handler across connected applications

  • Load balancing: When the same application is deployed across multiple instances, Axon Server can distribute queries among those instances

  • Subscription query support: Full support for subscription queries with update distribution across nodes

  • Query prioritization: Support for query priority to ensure critical queries are processed first

  • Local handler shortcut: When enabled (default), queries for which a local handler is registered are executed locally without going through the connector, improving performance by avoiding network overhead

Other QueryBusConnector implementations will follow in the future. Through those, distributing query messages without Axon Server will become a possibility.

Configuring the distributed query bus

The DistributedQueryBus can be configured using the DistributedQueryBusConfiguration class, which provides a fluent API for customizing query processing behavior.

Available configuration options:

  • Query threads: Number of threads used for query processing (default: 10)

  • Query queue capacity: Capacity of the priority queue for query processing tasks (default: 1000)

  • Custom executor service: Provide a custom ExecutorService for query processing

  • Local query handler preference: Enable or disable the local handler shortcut (default: enabled)

The local handler shortcut, when enabled, allows the distributed query bus to use local query handlers directly when available, bypassing remote dispatch. This improves performance by avoiding network overhead when a handler is available locally. Only when no local handler is available will the query be dispatched remotely through the connector.

Configuration examples
  • Configuration API

  • Spring Boot

Customize the distributed query bus configuration:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.queryhandling.distributed.DistributedQueryBusConfiguration;

public class AxonConfig {
    public void configureQueryBus(MessagingConfigurer configurer) {
        // Customize the configuration
        DistributedQueryBusConfiguration config = new DistributedQueryBusConfiguration()
                .queryThreads(20)                        // Set number of query processing threads
                .queryQueueCapacity(2000)                // Set queue capacity
                .preferLocalQueryHandler(true);         // Enable local handler shortcut (default)

        // Register the custom configuration
        configurer.registerComponent(DistributedQueryBusConfiguration.class, c -> config);
    }
}

To disable the local handler shortcut:

DistributedQueryBusConfiguration config = new DistributedQueryBusConfiguration()
        .preferLocalQueryHandler(false);  // Force all queries through connector

When using Spring Boot with Axon Server, configure the query bus through application properties:

# Configure query processing threads
axon.axonserver.query-threads=20

# Or configure programmatically
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.axonframework.messaging.queryhandling.distributed.DistributedQueryBusConfiguration;

@Configuration
public class AxonConfig {

    @Bean
    public DistributedQueryBusConfiguration queryBusConfiguration() {
        return new DistributedQueryBusConfiguration()
                .queryThreads(20)
                .preferLocalQueryHandler(true);
    }
}
Default configuration
  • Configuration API

  • Spring Boot

Declare dependencies:

<!--somewhere in the POM file-->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-server-connector</artifactId>
    <version>${axon.version}</version>
</dependency>
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-messaging</artifactId>
    <version>${axon.version}</version>
</dependency>

Configure your application:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonApp {

    public static void main(String[] args) {
        // Returns a Configurer instance with default components configured.
        // `DistributedQueryBus` with `AxonServerQueryBusConnector` is configured as Query Bus by default.
        MessagingConfigurer configurer = MessagingConfigurer.create();
    }
}

By simply declaring dependency to axon-spring-boot-starter, Axon will automatically configure the AxonServerQueryBusConnector:

<!--somewhere in the POM file-->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>${axon.version}</version>
</dependency>
Excluding the Axon Server Connector

If you exclude the axon-server-connector dependency you will fall back to the non-AxonServer query bus option, the SimpleQueryBus (see below).

SimpleQueryBus

The SimpleQueryBus is a local, non-distributed query bus implementation that processes queries in the dispatching thread by default. It’s used when Axon Server is not available or when you explicitly configure it.

The SimpleQueryBus provides:

  • Local query handling: All queries are handled within the same JVM.

  • Simple routing: Routes queries to their registered handler based on query name.

  • Subscription query support: Full support for subscription queries within the same JVM.

  • Transaction management: Integrates with a TransactionManager for transactional query handling.

To configure a SimpleQueryBus (instead of an AxonServerQueryBus):

  • Configuration API

  • Spring Boot

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.queryhandling.SimpleQueryBus;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;

public class AxonConfig {
    // omitting other configuration methods...
    public void configureQueryBus(MessagingConfigurer configurer) {
        configurer.registerQueryBus(
                config ->  new SimpleQueryBus(config.getComponent(UnitOfWorkFactory.class))
        );
    }
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.axonframework.messaging.queryhandling.QueryBus;
import org.axonframework.messaging.queryhandling.SimpleQueryBus;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;

@Configuration
public class AxonConfig {
    // omitting other configuration methods...
    @Bean
    public QueryBus queryBus(UnitOfWorkFactory unitOfWorkFactory) {
        return new SimpleQueryBus(unitOfWorkFactory);
    }
}

Subscription query infrastructure

Subscription queries allow clients to receive an initial result and then continue receiving updates as long as the subscription is active. The subscription query infrastructure consists of several components working together.

QueryUpdateEmitter

The QueryUpdateEmitter is responsible for emitting updates to active subscription queries. The QueryUpdateEmitter is context-aware and must be created from the ProcessingContext:

import org.axonframework.messaging.queryhandling.QueryUpdateEmitter;
import org.axonframework.messaging.eventhandling.EventHandler;

@Component
public class CardSummaryProjection {

    @EventHandler
    public void on(CardRedeemedEvent event, ProcessingContext context) {
        // Create a context-aware emitter
        QueryUpdateEmitter emitter = QueryUpdateEmitter.forContext(context);

        // Update the model
        CardSummary summary = updateModel(event);

        // Emit update to subscription queries
        emitter.emit(
            FetchCardSummaryQuery.class,
            query -> query.cardSummaryId().equals(event.cardId()),
            summary
        );
    }
}

The emitter filters subscription queries based on the provided predicate and emits the update only to matching subscriptions. This allows fine-grained control over which subscribers receive which updates.

Automatic emitter creation for annotated methods

When using annotated message handlers (@EventHandler, @QueryHandler, etc.), Axon Framework automatically handles the QueryUpdateEmitter creation for you through parameter resolution. You can simply inject it as a method parameter without explicitly creating it from the ProcessingContext:

@EventHandler
public void on(CardRedeemedEvent event, QueryUpdateEmitter emitter) {
    // Axon automatically provides the context-aware emitter
    emitter.emit(FetchCardSummaryQuery.class,
                 query -> query.getCardSummaryId().equals(event.getCardId()),
                 updateModel(event));
}

Explicitly creating the emitter using QueryUpdateEmitter#forContext(ProcessingContext) is only necessary when manually registering query handlers or when you need direct control over the emitter creation.

Subscription lifecycle

Subscription queries follow this lifecycle:

  1. Subscription creation: Client calls QueryGateway#subscriptionQuery(Object, Class<R>), which returns a Publisher<R>.

  2. Initial result: The query is dispatched to a handler, and the initial result is emitted.

  3. Update buffering: Updates are buffered until the Publisher is subscribed to.

  4. Update streaming: Once subscribed, buffered and new updates are streamed to the client.

  5. Completion: The subscription completes when:

    • The client cancels the subscription (by disposing the Publisher).

    • The server calls QueryUpdateEmitter.complete() to signal no more updates.

    • An error occurs, completing the subscription exceptionally.

import org.axonframework.messaging.queryhandling.gateway.QueryGateway;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class QueryDispatcher {

    public void dispatchFetchCard(QueryGateway queryGateway) {
        String cardId = "...";
        // Client-side subscription query
        Publisher<CardSummary> results = queryGateway.subscriptionQuery(
                new FetchCardSummaryQuery(cardId),
                CardSummary.class
        );

        // Subscribe using Reactor (requires reactor-core dependency)
        Disposable subscription = Flux.from(results)
                                      .doOnNext(summary -> System.out.println("Received: " + summary))
                                      .doOnComplete(() -> System.out.println("No more updates"))
                                      .doOnError(error -> System.err.println("Error: " + error))
                                      .subscribe();

        // Later: cancel the subscription
        subscription.dispose();
    }
}

Update buffer

The QueryBus maintains an update buffer for each subscription query. Updates emitted before the client subscribes to the Publisher are stored in this buffer. Once the client subscribes, buffered updates are delivered first, followed by new updates.

The buffer size is configurable when creating a subscription query:

// Default buffer size based on Reactor's Queues.SMALL_BUFFER_SIZE constant
Publisher<CardSummary> results = queryGateway.subscriptionQuery(
    new FetchCardSummaryQuery(cardId),
    CardSummary.class
);

// Custom buffer size
Publisher<CardSummary> results = queryGateway.subscriptionQuery(
    new FetchCardSummaryQuery(cardId),
    CardSummary.class,
    512  // buffer size
);

If the buffer fills up before the client subscribes, attempting to add more updates will complete the subscription exceptionally. Choose an appropriate buffer size based on your expected update rate and subscription delay.