Infrastructure
As explained in the Dispatching queries section, several implementations are available for sending query messages. The next sections give an overview of these implementations and show how to set up the query dispatching infrastructure with Axon.
All query operations are async-native, returning CompletableFuture or Publisher types. This enables efficient asynchronous processing without blocking threads.
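To make the non-blocking style concrete, here is a minimal sketch of composing work on a CompletableFuture result using only the JDK. The `fetchCardSummary` method is a hypothetical stand-in for any call that returns a CompletableFuture (for example a point-to-point query); it is an assumption for illustration and not part of Axon's API.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncResultSketch {

    // Hypothetical stand-in for a call that returns a CompletableFuture,
    // such as a point-to-point query. Not part of Axon's API.
    static CompletableFuture<String> fetchCardSummary(String cardId) {
        return CompletableFuture.supplyAsync(() -> "summary-of-" + cardId);
    }

    // Composes follow-up work on the future instead of blocking on it.
    static CompletableFuture<String> describeCard(String cardId) {
        return fetchCardSummary(cardId)
                .thenApply(summary -> "Card " + cardId + ": " + summary)
                // Map a failure to a fallback value rather than propagating it.
                .exceptionally(error -> "Card " + cardId + ": unavailable");
    }
}
```

The caller receives a future immediately and only needs `join()` or `get()` at the outermost boundary, if at all.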
Query gateway
The query gateway is a convenient interface to the query dispatching mechanism. You are not required to use a gateway to dispatch queries, but doing so is generally the easiest option.
Axon provides a QueryGateway interface and the DefaultQueryGateway implementation.
The query gateway provides methods for:
- Point-to-point queries: query() for single results, queryMany() for multiple results (both return CompletableFuture)
- Streaming queries: streamingQuery() returns a Publisher for streaming large result sets
- Subscription queries: subscriptionQuery() returns a Publisher combining initial result and updates
All methods support an optional ProcessingContext parameter for correlation data propagation when dispatching from within a message handler.
The query gateway automatically handles message construction, dispatch interceptors, and result conversion. It’s configured with access to the query bus and dispatch interceptors.
Query bus
The query bus is the mechanism that dispatches queries to query handlers.
Query handlers are registered under a query name (derived from the MessageType).
Each query name can have only one registered handler.
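The one-handler-per-query-name rule can be pictured with a small registry. This is a minimal sketch under stated assumptions: the class name, the use of plain String names, and the IllegalStateException are all hypothetical and do not reflect how Axon's QueryBus is implemented or which exception it raises.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class SingleHandlerRegistrySketch {

    private final Map<String, Function<Object, Object>> handlers = new ConcurrentHashMap<>();

    // Registers a handler; fails if the query name is already taken.
    void subscribe(String queryName, Function<Object, Object> handler) {
        Function<Object, Object> existing = handlers.putIfAbsent(queryName, handler);
        if (existing != null) {
            throw new IllegalStateException(
                    "A handler is already registered for query [" + queryName + "]");
        }
    }

    // Routes the payload to the single handler registered under the query name.
    Object dispatch(String queryName, Object payload) {
        Function<Object, Object> handler = handlers.get(queryName);
        if (handler == null) {
            throw new IllegalStateException(
                    "No handler registered for query [" + queryName + "]");
        }
        return handler.apply(payload);
    }
}
```

Because each name maps to exactly one handler, routing is a plain lookup and a query never has competing handlers to choose between.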
DistributedQueryBus
The DistributedQueryBus, together with the AxonServerQueryBusConnector, is the default query bus when Axon Server is available.
It connects to Axon Server to send and receive queries in a distributed way, allowing queries to be handled by any node in the cluster.
The DistributedQueryBus combined with the AxonServerQueryBusConnector provides:
- Distributed query handling: Queries are routed to the appropriate handler across connected applications
- Load balancing: When the same application is deployed across multiple instances, Axon Server can distribute queries among those instances
- Subscription query support: Full support for subscription queries with update distribution across nodes
- Query prioritization: Support for query priority to ensure critical queries are processed first
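As an illustration of what "critical queries are processed first" can mean, the following sketch orders pending queries by priority and keeps arrival order among equal priorities. It is an assumption-laden model, not Axon Server's scheduling logic: the class, the integer priority scale (higher means more urgent), and the sequence counter are all hypothetical.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class PrioritizedQueriesSketch {

    // A pending query with a hypothetical priority and arrival sequence number.
    private static final class Pending {
        final String name;
        final int priority;
        final long sequence;

        Pending(String name, int priority, long sequence) {
            this.name = name;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Highest priority first; ties are broken by arrival order.
    private final PriorityQueue<Pending> queue = new PriorityQueue<>(
            Comparator.comparingInt((Pending p) -> p.priority).reversed()
                      .thenComparingLong(p -> p.sequence));
    private long nextSequence;

    void submit(String name, int priority) {
        queue.add(new Pending(name, priority, nextSequence++));
    }

    // Returns the name of the next query to process, or null when none is pending.
    String next() {
        Pending pending = queue.poll();
        return pending == null ? null : pending.name;
    }
}
```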
Configuration API:
Declare dependencies:
<!--somewhere in the POM file-->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-server-connector</artifactId>
    <version>${axon.version}</version>
</dependency>
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-messaging</artifactId>
    <version>${axon.version}</version>
</dependency>
Configure your application:
import org.axonframework.messaging.core.configuration.MessagingConfigurer;

public class AxonApp {

    public static void main(String[] args) {
        // Returns a Configurer instance with default components configured.
        // `DistributedQueryBus` with `AxonServerQueryBusConnector` is configured as Query Bus by default.
        MessagingConfigurer configurer = MessagingConfigurer.create();
    }
}
Spring Boot:
Declaring a dependency on axon-spring-boot-starter is enough for Axon to automatically configure the AxonServerQueryBusConnector:
<!--somewhere in the POM file-->
<dependency>
    <groupId>org.axonframework</groupId>
    <artifactId>axon-spring-boot-starter</artifactId>
    <version>${axon.version}</version>
</dependency>
Excluding the Axon Server Connector
If you exclude the …
SimpleQueryBus
The SimpleQueryBus is a local, non-distributed query bus implementation that processes queries in the dispatching thread by default. It’s used when Axon Server is not available or when you explicitly configure it.
The SimpleQueryBus provides:
- Local query handling: All queries are handled within the same JVM.
- Simple routing: Routes queries to their registered handler based on query name.
- Subscription query support: Full support for subscription queries within the same JVM.
- Transaction management: Integrates with a TransactionManager for transactional query handling.
To configure a SimpleQueryBus (instead of the default DistributedQueryBus):
Configuration API:
import org.axonframework.messaging.core.configuration.MessagingConfigurer;
import org.axonframework.messaging.queryhandling.SimpleQueryBus;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;

public class AxonConfig {
    // omitting other configuration methods...

    public void configureQueryBus(MessagingConfigurer configurer) {
        configurer.registerQueryBus(
                config -> new SimpleQueryBus(config.getComponent(UnitOfWorkFactory.class))
        );
    }
}
Spring Boot:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.axonframework.messaging.queryhandling.QueryBus;
import org.axonframework.messaging.queryhandling.SimpleQueryBus;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;

@Configuration
public class AxonConfig {
    // omitting other configuration methods...

    @Bean
    public QueryBus queryBus(UnitOfWorkFactory unitOfWorkFactory) {
        return new SimpleQueryBus(unitOfWorkFactory);
    }
}
Subscription query infrastructure
Subscription queries allow clients to receive an initial result and then continue receiving updates as long as the subscription is active. The subscription query infrastructure consists of several components working together.
QueryUpdateEmitter
The QueryUpdateEmitter is responsible for emitting updates to active subscription queries. The QueryUpdateEmitter is context-aware and must be created from the ProcessingContext:
import org.axonframework.messaging.queryhandling.QueryUpdateEmitter;
import org.axonframework.messaging.eventhandling.EventHandler;
import org.springframework.stereotype.Component;

@Component
public class CardSummaryProjection {

    @EventHandler
    public void on(CardRedeemedEvent event, ProcessingContext context) {
        // Create a context-aware emitter
        QueryUpdateEmitter emitter = QueryUpdateEmitter.forContext(context);

        // Update the model
        CardSummary summary = updateModel(event);

        // Emit the update only to subscription queries for this card
        emitter.emit(
                FetchCardSummaryQuery.class,
                query -> query.cardSummaryId().equals(event.cardId()),
                summary
        );
    }
}
The emitter filters subscription queries based on the provided predicate and emits the update only to matching subscriptions. This allows fine-grained control over which subscribers receive which updates.
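The predicate-based filtering described above can be modelled in a few lines. This is a minimal sketch, not the QueryUpdateEmitter implementation: the class, its `register` method, and the returned delivery count are hypothetical and exist only to show how a predicate over the original query selects which subscribers receive an update.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class FilteringEmitterSketch<Q, U> {

    // One active subscription: the query that opened it and the sink for its updates.
    private static final class Subscription<K, V> {
        final K query;
        final Consumer<V> sink;

        Subscription(K query, Consumer<V> sink) {
            this.query = query;
            this.sink = sink;
        }
    }

    private final List<Subscription<Q, U>> subscriptions = new ArrayList<>();

    void register(Q query, Consumer<U> sink) {
        subscriptions.add(new Subscription<>(query, sink));
    }

    // Delivers the update only to subscriptions whose query matches the filter
    // and returns how many subscriptions received it.
    int emit(Predicate<Q> filter, U update) {
        int delivered = 0;
        for (Subscription<Q, U> subscription : subscriptions) {
            if (filter.test(subscription.query)) {
                subscription.sink.accept(update);
                delivered++;
            }
        }
        return delivered;
    }
}
```

With two subscriptions registered for different card identifiers, an update emitted with a predicate matching only the first identifier reaches only the first sink.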
Automatic emitter creation for annotated methods
When using annotated message handlers (…
Explicitly creating the emitter using …
Subscription lifecycle
Subscription queries follow this lifecycle:
- Subscription creation: The client calls QueryGateway#subscriptionQuery(Object, Class<R>), which returns a Publisher<R>.
- Initial result: The query is dispatched to a handler, and the initial result is emitted.
- Update buffering: Updates are buffered until the Publisher is subscribed to.
- Update streaming: Once subscribed, buffered and new updates are streamed to the client.
- Completion: The subscription completes when:
  - The client cancels the subscription (by disposing its subscription to the Publisher).
  - The server calls QueryUpdateEmitter.complete() to signal no more updates.
  - An error occurs, completing the subscription exceptionally.
import org.axonframework.messaging.queryhandling.gateway.QueryGateway;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class QueryDispatcher {

    public void dispatchFetchCard(QueryGateway queryGateway) {
        String cardId = "...";

        // Client-side subscription query
        Publisher<CardSummary> results = queryGateway.subscriptionQuery(
                new FetchCardSummaryQuery(cardId),
                CardSummary.class
        );

        // Subscribe using Reactor (requires reactor-core dependency)
        Disposable subscription = Flux.from(results)
                .doOnNext(summary -> System.out.println("Received: " + summary))
                .doOnComplete(() -> System.out.println("No more updates"))
                .doOnError(error -> System.err.println("Error: " + error))
                .subscribe();

        // Later: cancel the subscription
        subscription.dispose();
    }
}
Update buffer
The QueryBus maintains an update buffer for each subscription query. Updates emitted before the client subscribes to the Publisher are stored in this buffer. Once the client subscribes, buffered updates are delivered first, followed by new updates.
The buffer size is configurable when creating a subscription query:
// Default buffer size based on Reactor's Queues.SMALL_BUFFER_SIZE constant
Publisher<CardSummary> results = queryGateway.subscriptionQuery(
        new FetchCardSummaryQuery(cardId),
        CardSummary.class
);

// Custom buffer size
Publisher<CardSummary> resultsWithCustomBuffer = queryGateway.subscriptionQuery(
        new FetchCardSummaryQuery(cardId),
        CardSummary.class,
        512 // buffer size
);
If the buffer fills up before the client subscribes, attempting to add more updates will complete the subscription exceptionally. Choose an appropriate buffer size based on your expected update rate and subscription delay.
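The buffering behaviour described in this section (buffer until subscribed, replay buffered updates first, fail on overflow) can be sketched as follows. This is a simplified, single-threaded model under stated assumptions: the class and its methods are hypothetical, it is not Axon's or Reactor's implementation, and a real buffer would need to be thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

final class BoundedUpdateBufferSketch<U> {

    private final int capacity;
    private final Queue<U> buffer = new ArrayDeque<>();
    private Consumer<U> subscriber; // null until the client subscribes
    private boolean failed;

    BoundedUpdateBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns false when the update is rejected because the buffer has overflowed.
    boolean emit(U update) {
        if (failed) {
            return false;
        }
        if (subscriber != null) {
            subscriber.accept(update); // already subscribed: deliver directly
            return true;
        }
        if (buffer.size() >= capacity) {
            failed = true; // overflow before subscription: the subscription fails
            buffer.clear();
            return false;
        }
        buffer.add(update);
        return true;
    }

    // Replays buffered updates in order, then switches to direct delivery.
    void subscribe(Consumer<U> consumer) {
        if (failed) {
            throw new IllegalStateException("Update buffer overflowed before subscription");
        }
        U next;
        while ((next = buffer.poll()) != null) {
            consumer.accept(next);
        }
        subscriber = consumer;
    }

    boolean hasFailed() {
        return failed;
    }
}
```

In this model, a capacity of 2 accepts two updates before subscription and rejects the third, which mirrors why the buffer size should cover the number of updates expected between creating the subscription query and subscribing to its Publisher.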