Query Dispatchers

The dispatching of a query is the starting point of such a message. Queries have to be dispatched, just like any type of message, before they can be handled. To that end Axon provides two interfaces:

  1. The QueryBus, and

  2. the QueryGateway.

This page shows how and when to use these components. How to configure and specifics on the gateway and bus implementations are discussed here.

The query bus and query gateway

The QueryBus is the mechanism that dispatches queries to query handlers. Queries are registered using the query name.

For those familiar with previous iterations of Axon Framework, it does not support scatter-gather queries where multiple handlers respond to the same query. If you rely heavily on scatter-gather queries, please provide feedback at this GitHub issue.

As a workaround, if you need to collect information from multiple sources, consider using multiple separate queries or implementing an aggregating query handler that coordinates the data collection internally.

The QueryGateway, as described in more detail here, is a convenient interface towards the query dispatching mechanism. While you are not required to use a gateway to dispatch queries, it is generally the easiest option to do so. It abstracts certain aspects for you, like the necessity to wrap a Query payload in a Query Message.

The QueryBus, as explained in more detail here, is a low-level infrastructure component. For typical application code, prefer using the QueryGateway, which provides a more convenient API and handles message creation for you.

Axon Framework supports three types of queries:

Point-to-point queries

A point-to-point query represents a query request to a single query handler. If no handler is found for a given query, a NoHandlerForQueryException is thrown.

Query handler example

First, define the query message with the @Query annotation:

import org.axonframework.messaging.queryhandling.annotation.Query;

@Query(namespace = "inventory", name = "FetchItems")
public class FetchItemsQuery {
    private final String criteria;

    public FetchItemsQuery(String criteria) {
        this.criteria = criteria;
    }

    public String getCriteria() {
        return criteria;
    }
}

Then write the query handler:

@QueryHandler (1)
public List<String> query(FetchItemsQuery query) { (2)
    // return the query result based on given criteria
    return repository.findItems(query.getCriteria());
}
1 The @QueryHandler annotation marks this method as a query handler.
2 The first parameter must be the @Query annotated message class. The handler will process queries matching the query name defined by the @Query annotation (inventory.FetchItems in this case). If you want to use a different parameter type (for example, Map<String, Object>), you must specify queryName in the @QueryHandler annotation.

Dispatching with QueryGateway

To query our view model using the QueryGateway:

Single result query

// Query for a single result
CompletableFuture<String> result = queryGateway.query(
    new FetchItemQuery("item-123"),
    String.class
);

result.thenAccept(item -> System.out.println("Item: " + item));

Multiple results query

// Query for multiple results
CompletableFuture<List<String>> results = queryGateway.queryMany(
    new FetchItemsQuery("criteria"),
    String.class
);

results.thenAccept(items -> items.forEach(System.out::println));
Async-native API

All query methods in Axon 5 return CompletableFuture, allowing you to:

  • Chain additional operations after receiving results

  • Handle errors gracefully

  • Integrate with reactive frameworks

The result is returned asynchronously regardless of the QueryBus implementation. However, if a @QueryHandler annotated function’s return type is CompletableFuture, the result will be fully asynchronous.

Dispatching with ProcessingContext

When dispatching queries from within a message handler (such as an event handler or command handler), you should provide the ProcessingContext to maintain correlation data:

@EventHandler
public void on(OrderPlacedEvent event,
               ProcessingContext context,
               QueryGateway queryGateway) {
    // Dispatch query with ProcessingContext for correlation
    queryGateway.query(
        new FetchInventoryQuery(event.getProductId()),
        Inventory.class,
        context
    ).thenAccept(inventory -> {
        // Handle inventory result...
    });
}

Providing the ProcessingContext ensures that:

  • Correlation data (trace ID, correlation ID, causation ID) flows from one message to another

  • The query dispatch is part of the same processing lifecycle

  • Distributed tracing works correctly across message boundaries

When dispatching from outside a message handler (for example, from an HTTP endpoint), you can omit the ProcessingContext or pass null:

@RestController
public class ItemController {

    private final QueryGateway queryGateway;

    public ItemController(QueryGateway queryGateway) {
        this.queryGateway = queryGateway;
    }

    @GetMapping("/items/{id}")
    public CompletableFuture<ItemDTO> getItem(@PathVariable String id) {
        return queryGateway.query(
            new FetchItemQuery(id),
            ItemDTO.class
            // No ProcessingContext needed when dispatching from HTTP endpoint
        );
    }
}

Subscription queries

The subscription query allows a client to get the initial state of the model it wants to query, and to stay up-to-date as the queried view model changes. In short, it is an invocation of a point-to-point query with the possibility to be updated when the initial state changes. To update a subscription with changes to the model, we use the QueryUpdateEmitter component provided by Axon.

Query handler example

Let’s look at a query handler that provides the initial state:

@QueryHandler
public List<CardSummary> handle(FetchCardSummariesQuery query) {
    return entityManager
        .createNamedQuery("CardSummary.fetch", CardSummary.class)
        .setParameter("idStartsWith", query.getFilter().getIdStartsWith())
        .setFirstResult(query.getOffset())
        .setMaxResults(query.getLimit())
        .getResultList();
}

Emitting updates

Once our GiftCard gets redeemed, we update any component interested in the updated state. We achieve this by emitting an update using the QueryUpdateEmitter component within the event handler:

@EventHandler
public void on(RedeemedEvent event, QueryUpdateEmitter emitter) {
    (1)
    CardSummary summary = entityManager.find(CardSummary.class, event.getId());
    summary.setRemainingValue(summary.getRemainingValue() - event.getAmount());

    (2)
    emitter.emit(
        FetchCardSummariesQuery.class, (3)
        query -> event.getId().startsWith(query.getFilter().getIdStartsWith()), (4)
        summary (5)
    );
}
1 First, we update our view model by updating the existing card.
2 If there is a subscription query interested in updates about this specific GiftCard, we emit an update.
3 The first parameter is the type of the query (FetchCardSummariesQuery in our case) which corresponds to the query type in the query handler.
4 The second parameter is a predicate that selects which subscription queries to update. In our case, we only update subscription queries interested in the GiftCard that has been updated.
5 The third parameter is the actual update, which in our case is the card summary.

Issuing a subscription query

Once we have the query handling and emitting side implemented, we can issue a subscription query to get the initial state of the GiftCard and be updated once it is redeemed:

(1)
commandGateway.sendAndWait(new IssueCardCommand("gc1", amount));

(2)
FetchCardSummariesQuery query =
    new FetchCardSummariesQuery(offset, limit, filter);

(3)
Publisher<CardSummary> results = queryGateway.subscriptionQuery(
    query,
    CardSummary.class
);

(4)
Disposable subscription = Flux.from(results)
    .subscribe(
        cardSummary -> System.out.println("Received: " + cardSummary),
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Completed")
    );

(5)
commandGateway.sendAndWait(new RedeemCardCommand("gc1", amount));

(6)
// When done, cancel the subscription
subscription.dispose();
1 Issue a GiftCard with gc1 id and initial value of amount.
2 Create a subscription query to get the list of GiftCards and be updated once the state of GiftCard with id gc1 is changed.
3 Send the subscription query via the QueryGateway. The method returns a Publisher that combines both the initial results and any updates.
4 Subscribe to the Publisher to receive the initial state and all subsequent updates. The subscription is active until explicitly cancelled or the emitting side completes it.
5 When we issue a RedeemCardCommand, our event handler in the projection will be triggered, which will result in the emission of an update.
6 When done with the subscription, cancel it by disposing the subscription. This automatically closes the subscription query on the server side.
Subscription query as Publisher

In Axon 5, subscription queries return a single Publisher that streams both initial results and updates. The Publisher automatically combines initial state (0, 1, or N results) with any subsequent updates. When you cancel the subscription (by disposing the Disposable returned from subscribe()), it automatically closes the subscription query on the server side.

Mandatory dependency

The reactor-core dependency is mandatory for usage of subscription queries. Axon Framework uses Project Reactor internally to build the Publisher for subscription queries.

Streaming queries

The streaming query allows a client to stream large result sets without loading everything into memory at once. The streaming query relies on the reactive stream model, specifically the Publisher type.

The streaming query is flexible enough to handle any query return type. Any return type that is not a Publisher will automatically be converted to a Publisher. The Publisher will emit one or multiple items based on the query handler.

The QueryGateway provides the streamingQuery method to utilize the streaming query. It’s simple to use and requires just two parameters: the query payload and the expected response type class. Note that the streamingQuery method is lazy, meaning the query is sent once the Publisher is subscribed to.

Streaming query example

@QueryHandler
public List<CardSummary> handle(FetchCardSummariesQuery query) {
    return cardRepository.findAll(); (1)
}

public Publisher<CardSummary> consumer() {
    return queryGateway.streamingQuery(query, CardSummary.class); (2)
}
1 We query the cardRepository for all the cards. The repository can potentially return a result set containing thousands of items.
2 We use the queryGateway to issue the streaming query. If we used a point-to-point query with queryMany, we would get an extensive list transferred as a single result message over the network. This result can potentially cause a buffer overflow or maximum message size violation. Instead, we use streamingQuery, which converts our response to a stream and chunks the result into smaller messages containing the CardSummary instances.

Native streaming with Flux

For fine-grained control of the producing stream, you can use Project Reactor’s Flux as the return type:

@QueryHandler
public Flux<CardSummary> handle(FetchCardSummariesQuery query) {
    return reactiveCardRepository.findAll();
}

When using a Flux as the return type, you can control backpressure, stream cancellation, and implement more complex features like pagination.

Transaction leaking concerns

Once a consumer of the streaming query receives the Publisher to subscribe to, the transaction will be considered completed successfully. That means that any subsequent messages on the stream will not be part of the transaction, including errors. As the transaction is already over, an error will not be propagated to the parent transaction to invoke any rollback method.

This has the implication that the streaming query should not be used within a ProcessingContext (within message handlers or any other transactional methods) to chain other transactional actions (like sending a command or query).

Streaming back-pressure

Back-pressure (flow control) is an essential feature in reactive systems that allows consumers to control the data flow, ensuring they are not overwhelmed by the producer. The streaming query implements a pull-based back-pressure strategy, which means that the producer will emit data when the consumer is ready to receive it.

If you are using Axon Server, see the Axon Server flow control documentation for more information.

Cancellation

The streaming query can be implemented as an infinite stream. Hence, it’s important to cancel it once the client is not interested in receiving any more data.

The following sample shows how this could be achieved:

public Publisher<CardSummary> consumer() {
    return Flux.from(queryGateway.streamingQuery(query, CardSummary.class))
               .take(100)
               .takeUntil(message -> somePredicate.test(message));
}

The example above shows how the take operator limits the number of items to be emitted.

Error handling

A producer that produces an error by calling onError(Throwable) will terminate the handler execution. The consumer will, in turn, have its onError(Throwable) subscription handler called.

Note that exceptions do not flow upstream (from consumer to producer). If an error happens on the consumer side, the consumer error will trigger a cancel signal propagated to the producer. This signal will effectively cancel the stream without the producer knowing the reason.

Hence, it’s recommended to set a timeout on the query handler’s side in case of a finite stream. Essentially to protect against malfunctioning consumers or producers.

@QueryHandler
public Flux<CardSummary> handle(FetchCardSummariesQuery query) {
    return reactiveCardRepository.findAll()
                                 .timeout(Duration.ofSeconds(5));
}

The example above shows how the timeout operator is used to cancel a request if no responses have been observed during a five-second timespan.

Mandatory dependency

The reactor-core dependency is mandatory for usage of streaming queries. However, it is a compile time dependency and is not required for other Axon features.

Configuration

Both QueryGateway and QueryBus are automatically configured by Axon Framework.

Spring Boot configuration

When using Spring Boot with Axon, both components are available as beans and can be injected directly:

@Service
public class ItemService {

    private final QueryGateway queryGateway;

    public ItemService(QueryGateway queryGateway) {
        this.queryGateway = queryGateway;
    }

    public CompletableFuture<ItemDTO> findItem(String itemId) {
        return queryGateway.query(
            new FetchItemQuery(itemId),
            ItemDTO.class
        );
    }
}

Configuration API

When configuring Axon programmatically, you can access both components from the configuration:

ApplicationConfigurer configurer = DefaultConfigurer.defaultConfiguration();

configurer.messaging(messagingConfigurer -> {
    // QueryBus is registered by default, but can be customized
    messagingConfigurer.registerQueryBus(config -> new CustomQueryBus());
});

Configuration configuration = configurer.start();
QueryGateway queryGateway = configuration.getComponent(QueryGateway.class);
QueryBus queryBus = configuration.getComponent(QueryBus.class);

Summary

Axon Framework provides multiple ways to dispatch queries:

Query Type Use Case Key Features

Point-to-point

Single handler, single or multiple results

query() for single result, queryMany() for list results

Subscription

Initial state + updates over time

Reactive Flux for initial results and updates, requires reactor-core

Streaming

Large result sets, reactive streams

Back-pressure support, cancellation, chunked results, requires reactor-core

In most cases, you should use:

  • QueryGateway for dispatching queries from your application code

  • Provide ProcessingContext when dispatching from within message handlers

  • Omit ProcessingContext (or pass null) when dispatching from HTTP endpoints or scheduled tasks