Infrastructure

As explained in the Dispatching queries section, there are several implementations for sending query messages. The following sections give an overview of the available implementations and show how to set up query dispatching infrastructure with Axon.

All query operations are async-native, returning CompletableFuture or Publisher types. This enables efficient asynchronous processing without blocking threads.
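As a minimal sketch of what non-blocking composition looks like, the example below uses only the JDK's CompletableFuture. The fetchCardSummary method is a hypothetical stand-in for an asynchronous query call and is not part of Axon's API. Follow-up work is chained onto the future rather than waiting for the result.

```java
import java.util.concurrent.CompletableFuture;

class AsyncQuerySketch {

    // Hypothetical stand-in for an asynchronous query call; completes on another thread.
    static CompletableFuture<String> fetchCardSummary(String cardId) {
        return CompletableFuture.supplyAsync(() -> "summary-of-" + cardId);
    }

    // Chains follow-up work onto the future instead of blocking on get() or join().
    static CompletableFuture<Integer> summaryLength(String cardId) {
        return fetchCardSummary(cardId).thenApply(String::length);
    }

    public static void main(String[] args) {
        summaryLength("card-1")
                .thenAccept(length -> System.out.println("Length: " + length))
                .join(); // join only so this demo does not exit before the callback runs
    }
}
```

In application code the final join() would normally be absent; the caller returns the future and lets the framework or web layer complete the response.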

Query gateway

The query gateway is a convenient interface to the query dispatching mechanism. You are not required to use a gateway to dispatch queries, but it is generally the easiest way to do so.

Axon provides a QueryGateway interface and the DefaultQueryGateway implementation. The query gateway provides methods for:

  • Point-to-point queries: query() for single results, queryMany() for multiple results (both return CompletableFuture)

  • Streaming queries: streamingQuery() returns a Publisher for streaming large result sets

  • Subscription queries: subscriptionQuery() returns a Publisher combining initial result and updates

All methods support an optional ProcessingContext parameter for correlation data propagation when dispatching from within a message handler.

The query gateway automatically handles message construction, dispatch interceptors, and result conversion. It’s configured with access to the query bus and dispatch interceptors.

Query bus

The query bus is the mechanism that dispatches queries to query handlers. Query handlers are registered under a query name (based on the MessageType), and each query name can have only one registered handler.
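The one-handler-per-name rule can be illustrated with a small, JDK-only registry. This is a sketch and not Axon's QueryBus: the class name, the method names, and the choice to throw on a duplicate registration are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SingleHandlerRegistrySketch {

    private final Map<String, Function<Object, Object>> handlers = new ConcurrentHashMap<>();

    // Registers a handler under a query name; a second registration for the same name is rejected.
    void subscribe(String queryName, Function<Object, Object> handler) {
        if (handlers.putIfAbsent(queryName, handler) != null) {
            throw new IllegalStateException("Handler already registered for query: " + queryName);
        }
    }

    // Routes the payload to the single handler registered for the query name.
    CompletableFuture<Object> query(String queryName, Object payload) {
        Function<Object, Object> handler = handlers.get(queryName);
        if (handler == null) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("No handler for query: " + queryName));
        }
        return CompletableFuture.completedFuture(handler.apply(payload));
    }
}
```

Because the name is the only routing key, a query either reaches exactly one handler or fails; there is no ambiguity about which handler produces the result.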

DistributedQueryBus

The DistributedQueryBus, together with the AxonServerQueryBusConnector, is the default query bus when Axon Server is available. It connects to Axon Server to send and receive queries in a distributed way, allowing queries to be handled by any node in the cluster.

The DistributedQueryBus combined with the AxonServerQueryBusConnector provides:

  • Distributed query handling: Queries are routed to the appropriate handler across connected applications

  • Load balancing: When the same application is deployed across multiple instances, Axon Server can distribute queries among those instances

  • Subscription query support: Full support for subscription queries with update distribution across nodes

  • Query prioritization: Support for query priority to ensure critical queries are processed first
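To make the idea of query prioritization concrete, the sketch below orders pending queries with a JDK PriorityQueue. It does not show how Axon Server implements prioritization; the PendingQuery type and the convention that a higher number is served first are assumptions of this example.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class PrioritizedQueriesSketch {

    // Hypothetical pending query: a name plus a numeric priority.
    static final class PendingQuery {
        final String name;
        final int priority;

        PendingQuery(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Assumption of this sketch: the highest priority value is polled first.
    private final PriorityQueue<PendingQuery> queue =
            new PriorityQueue<>(Comparator.comparingInt((PendingQuery q) -> q.priority).reversed());

    void submit(String name, int priority) {
        queue.add(new PendingQuery(name, priority));
    }

    // Returns the name of the highest-priority pending query, or null when none is pending.
    String next() {
        PendingQuery query = queue.poll();
        return query == null ? null : query.name;
    }
}
```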

Other QueryBusConnector implementations will follow in the future. Through those, distributing query messages without Axon Server will become a possibility.

With the Configuration API, declare the dependencies:

<!--somewhere in the POM file-->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-server-connector</artifactId>
    <version>${axon.version}</version>
</dependency>
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-messaging</artifactId>
    <version>${axon.version}</version>
</dependency>

Configure your application:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonApp {

    public static void main(String[] args) {
        // Creates a MessagingConfigurer with default components configured.
        // `DistributedQueryBus` with `AxonServerQueryBusConnector` is configured as Query Bus by default.
        MessagingConfigurer configurer = MessagingConfigurer.create();
    }
}

With Spring Boot, simply declare a dependency on axon-spring-boot-starter and Axon will automatically configure the AxonServerQueryBusConnector:

<!--somewhere in the POM file-->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>${axon.version}</version>
</dependency>

Excluding the Axon Server Connector

If you exclude the axon-server-connector dependency, Axon falls back to the non-Axon Server query bus option, the SimpleQueryBus (see below).

SimpleQueryBus

The SimpleQueryBus is a local, non-distributed query bus implementation that processes queries in the dispatching thread by default. It’s used when Axon Server is not available or when you explicitly configure it.

The SimpleQueryBus provides:

  • Local query handling: All queries are handled within the same JVM.

  • Simple routing: Routes queries to their registered handler based on query name.

  • Subscription query support: Full support for subscription queries within the same JVM.

  • Transaction management: Integrates with a TransactionManager for transactional query handling.

To configure a SimpleQueryBus (instead of the DistributedQueryBus), register it through the Configuration API or, in a Spring Boot application, declare it as a bean. With the Configuration API:

import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.queryhandling.SimpleQueryBus;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;

public class AxonConfig {
    // omitting other configuration methods...
    public void configureQueryBus(MessagingConfigurer configurer) {
        configurer.registerQueryBus(
                config ->  new SimpleQueryBus(config.getComponent(UnitOfWorkFactory.class))
        );
    }
}

With Spring Boot:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.axonframework.messaging.queryhandling.QueryBus;
import org.axonframework.messaging.queryhandling.SimpleQueryBus;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;

@Configuration
public class AxonConfig {
    // omitting other configuration methods...
    @Bean
    public QueryBus queryBus(UnitOfWorkFactory unitOfWorkFactory) {
        return new SimpleQueryBus(unitOfWorkFactory);
    }
}

Subscription query infrastructure

Subscription queries allow clients to receive an initial result and then continue receiving updates as long as the subscription is active. The subscription query infrastructure consists of several components working together.

QueryUpdateEmitter

The QueryUpdateEmitter is responsible for emitting updates to active subscription queries. The QueryUpdateEmitter is context-aware and must be created from the ProcessingContext:

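import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.EventHandler;
import org.axonframework.messaging.queryhandling.QueryUpdateEmitter;
import org.springframework.stereotype.Component;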

@Component
public class CardSummaryProjection {

    @EventHandler
    public void on(CardRedeemedEvent event, ProcessingContext context) {
        // Create a context-aware emitter
        QueryUpdateEmitter emitter = QueryUpdateEmitter.forContext(context);

        // Update the model
        CardSummary summary = updateModel(event);

        // Emit update to subscription queries
        emitter.emit(
            FetchCardSummaryQuery.class,
            query -> query.cardSummaryId().equals(event.cardId()),
            summary
        );
    }
}

The emitter filters subscription queries based on the provided predicate and emits the update only to matching subscriptions. This allows fine-grained control over which subscribers receive which updates.
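The filtering behavior described above can be modeled without any framework code. The following JDK-only sketch keeps a list of active subscriptions and delivers an update only to those whose original query satisfies the predicate. The FilteringEmitterSketch and Subscription types are hypothetical and only mirror the idea; they are not Axon classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FilteringEmitterSketch<Q, U> {

    // One active subscription: the original query plus the updates delivered to it.
    static final class Subscription<Q, U> {
        final Q query;
        final List<U> received = new ArrayList<>();

        Subscription(Q query) {
            this.query = query;
        }
    }

    private final List<Subscription<Q, U>> subscriptions = new ArrayList<>();

    // Records a new active subscription for the given query.
    Subscription<Q, U> register(Q query) {
        Subscription<Q, U> subscription = new Subscription<>(query);
        subscriptions.add(subscription);
        return subscription;
    }

    // Delivers the update only to subscriptions whose query matches the filter.
    void emit(Predicate<? super Q> filter, U update) {
        for (Subscription<Q, U> subscription : subscriptions) {
            if (filter.test(subscription.query)) {
                subscription.received.add(update);
            }
        }
    }
}
```

With two subscriptions registered for different card identifiers, an emit call whose predicate matches only one identifier leaves the other subscription untouched.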

Automatic emitter creation for annotated methods

When using annotated message handlers (@EventHandler, @QueryHandler, etc.), Axon Framework automatically handles the QueryUpdateEmitter creation for you through parameter resolution. You can simply inject it as a method parameter without explicitly creating it from the ProcessingContext:

@EventHandler
public void on(CardRedeemedEvent event, QueryUpdateEmitter emitter) {
    // Axon automatically provides the context-aware emitter
    emitter.emit(FetchCardSummaryQuery.class,
                 query -> query.cardSummaryId().equals(event.cardId()),
                 updateModel(event));
}

Explicitly creating the emitter using QueryUpdateEmitter#forContext(ProcessingContext) is only necessary when manually registering query handlers or when you need direct control over the emitter creation.

Subscription lifecycle

Subscription queries follow this lifecycle:

  1. Subscription creation: Client calls QueryGateway#subscriptionQuery(Object, Class<R>), which returns a Publisher<R>.

  2. Initial result: The query is dispatched to a handler, and the initial result is emitted.

  3. Update buffering: Updates are buffered until the Publisher is subscribed to.

  4. Update streaming: Once subscribed, buffered and new updates are streamed to the client.

  5. Completion: The subscription completes when:

    • The client cancels the subscription (for example, by disposing the Disposable obtained when subscribing to the Publisher).

    • The server calls QueryUpdateEmitter.complete() to signal no more updates.

    • An error occurs, completing the subscription exceptionally.

import org.axonframework.messaging.queryhandling.gateway.QueryGateway;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class QueryDispatcher {

    public void dispatchFetchCard(QueryGateway queryGateway) {
        String cardId = "...";
        // Client-side subscription query
        Publisher<CardSummary> results = queryGateway.subscriptionQuery(
                new FetchCardSummaryQuery(cardId),
                CardSummary.class
        );

        // Subscribe using Reactor (requires reactor-core dependency)
        Disposable subscription = Flux.from(results)
                                      .doOnNext(summary -> System.out.println("Received: " + summary))
                                      .doOnComplete(() -> System.out.println("No more updates"))
                                      .doOnError(error -> System.err.println("Error: " + error))
                                      .subscribe();

        // Later: cancel the subscription
        subscription.dispose();
    }
}

Update buffer

The QueryBus maintains an update buffer for each subscription query. Updates emitted before the client subscribes to the Publisher are stored in this buffer. Once the client subscribes, buffered updates are delivered first, followed by new updates.

The buffer size is configurable when creating a subscription query:

// Default buffer size, based on Reactor's Queues.SMALL_BUFFER_SIZE constant
Publisher<CardSummary> defaultBuffered = queryGateway.subscriptionQuery(
    new FetchCardSummaryQuery(cardId),
    CardSummary.class
);

// Custom buffer size
Publisher<CardSummary> customBuffered = queryGateway.subscriptionQuery(
    new FetchCardSummaryQuery(cardId),
    CardSummary.class,
    512  // buffer size
);

If the buffer fills up before the client subscribes, attempting to add more updates will complete the subscription exceptionally. Choose an appropriate buffer size based on your expected update rate and subscription delay.
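The buffering rules described in this section (store updates until the client subscribes, replay them first, and fail on overflow) can be sketched with a bounded queue. This is an illustrative, JDK-only model and not the buffer Axon uses internally; the UpdateBufferSketch class and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

class UpdateBufferSketch<U> {

    private final int capacity;
    private final Queue<U> buffer = new ArrayDeque<>();
    private Consumer<U> subscriber; // null until the client subscribes
    private Throwable failure;      // set once the buffer overflows

    UpdateBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Called by the emitting side for every update.
    synchronized void emit(U update) {
        if (failure != null) {
            return; // subscription already failed; drop further updates
        }
        if (subscriber != null) {
            subscriber.accept(update); // client is subscribed: deliver directly
            return;
        }
        if (buffer.size() >= capacity) {
            buffer.clear();
            failure = new IllegalStateException("Update buffer overflow (capacity " + capacity + ")");
            return;
        }
        buffer.add(update);
    }

    // Called by the client; buffered updates are replayed first, later updates arrive live.
    synchronized void subscribe(Consumer<U> onNext, Consumer<Throwable> onError) {
        if (failure != null) {
            onError.accept(failure);
            return;
        }
        subscriber = onNext;
        while (!buffer.isEmpty()) {
            onNext.accept(buffer.poll());
        }
    }
}
```

In this model a capacity of 2 tolerates two updates before subscription, while a third update arriving before the client subscribes turns the subscription into a failed one. This is the trade-off to weigh when picking a buffer size.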