Processing Context
The ProcessingContext is the core abstraction for managing the lifecycle of message processing in Axon Framework.
It coordinates all actions performed during the processing of a message, regardless of whether it’s a command, event, or query. As such, it is constructed by any message handling component in Axon Framework, so that you don’t have to.
While you’re unlikely to interact with it directly in most cases, understanding the ProcessingContext helps you leverage Axon’s lifecycle management capabilities when needed.
Overview
When Axon processes a message, it creates a ProcessingContext that:
-
Manages the processing lifecycle through distinct phases (pre-invocation, invocation, commit, etc.)
-
Stores resources needed during processing (database connections, entity managers, etc.)
-
Coordinates cleanup actions to release resources after processing
-
Provides access to framework components from the configuration
-
Supports async-native processing with CompletableFuture-based APIs
The ProcessingContext ensures that message processing is atomic. Processing either completes successfully or is rolled back entirely if something goes wrong.
In most cases, Axon’s building blocks automatically manage the ProcessingContext for you.
You typically only need to interact with it when:
-
Registering lifecycle callbacks for cleanup or transaction management not supported by Axon Framework out of the box.
-
Managing custom resources that span multiple processing steps.
-
Implementing advanced interceptors or custom infrastructure components.
-
Accessing Axon’s configured components through a single point of access.
Accessing the processing context
The ProcessingContext is injected as a parameter in your message handlers:
@CommandHandler
public OrderResult handle(PlaceOrderCommand command,
ProcessingContext context) {
// Access the context to register callbacks, manage resources, etc.
context.onCommit(ctx -> {
logger.info("Order placed successfully");
return CompletableFuture.completedFuture(null);
});
return processOrder(command);
}
@EventHandler
public void on(OrderPlacedEvent event,
ProcessingContext context) {
// Register cleanup action
context.doFinally(ctx -> releaseResources());
updateProjection(event);
}
Processing lifecycle
The ProcessingContext manages message processing through a series of phases.
Each phase serves a specific purpose in the processing lifecycle.
Lifecycle phases
The processing lifecycle consists of these phases, executed in order:
| Phase | Order | Purpose |
|---|---|---|
| Pre-Invocation | -10000 | Execute setup actions before the handler is invoked. Use for validation, security checks, or preparing resources. |
| Invocation | 0 | The main processing phase where your message handler executes. |
| Post-Invocation | 10000 | Execute actions after handler invocation but before commit. Use for post-processing logic that should happen before persistence. |
| Prepare-Commit | 20000 | Prepare resources for commit. Use for validation before the final commit. |
| Commit | 30000 | Commit changes. Use for persisting state, committing transactions, publishing events, or sending messages. |
| After-Commit | 40000 | Execute actions after successful commit. Use for notifications, logging, or triggering follow-up processes. |
Phase execution rules
-
Sequential execution: Phases execute strictly in order from lowest to highest.
-
Parallel actions: Multiple actions within the same phase may execute in parallel.
-
Phase completion: All actions in a phase must complete before moving to the next phase.
-
Failure handling: If any action fails, subsequent phases are skipped and error handlers execute.
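The rules above can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not Axon Framework's implementation: `PhaseRunnerSketch` and its `on` and `run` methods are hypothetical names, and phases are represented by their plain integer order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Hypothetical stand-in for a processing lifecycle; not an Axon Framework class.
class PhaseRunnerSketch {

    // Actions grouped by phase order; a TreeMap iterates its keys from lowest to highest.
    private final TreeMap<Integer, List<Supplier<CompletableFuture<?>>>> actions = new TreeMap<>();

    void on(int phaseOrder, Supplier<CompletableFuture<?>> action) {
        actions.computeIfAbsent(phaseOrder, k -> new ArrayList<>()).add(action);
    }

    // Chains the phases so that each one starts only after the previous one completed.
    CompletableFuture<Void> run() {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (List<Supplier<CompletableFuture<?>>> phaseActions : actions.values()) {
            // thenCompose is skipped when the previous phase failed, so later phases never start.
            // All actions of one phase are started together and may run concurrently;
            // allOf waits for every one of them before the phase counts as complete.
            chain = chain.thenCompose(ignored -> CompletableFuture.allOf(
                    phaseActions.stream().map(Supplier::get).toArray(CompletableFuture[]::new)));
        }
        return chain;
    }

    // Registers actions out of order to show that execution still follows the phase order.
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        PhaseRunnerSketch runner = new PhaseRunnerSketch();
        runner.on(30000, () -> CompletableFuture.runAsync(() -> log.add("commit")));
        runner.on(-10000, () -> CompletableFuture.runAsync(() -> log.add("pre-invocation")));
        runner.on(0, () -> CompletableFuture.runAsync(() -> log.add("invocation")));
        runner.run().join();
        return log;
    }

    // A failing invocation phase prevents the commit phase from running.
    static List<String> demoFailure() {
        List<String> log = new CopyOnWriteArrayList<>();
        PhaseRunnerSketch runner = new PhaseRunnerSketch();
        runner.on(0, () -> CompletableFuture.failedFuture(new IllegalStateException("handler failed")));
        runner.on(30000, () -> CompletableFuture.runAsync(() -> log.add("commit")));
        try {
            runner.run().join();
        } catch (CompletionException e) {
            log.add("failed: " + e.getCause().getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());        // [pre-invocation, invocation, commit]
        System.out.println(demoFailure()); // [failed: handler failed]
    }
}
```

The model leaves out error handlers and resources entirely; it only shows how ordering by phase, waiting for all actions in a phase, and skipping later phases on failure fit together.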
Registering lifecycle actions
You can register actions to execute in any phase:
@CommandHandler
public void handle(CreateOrderCommand command,
ProcessingContext context) {
// Asynchronous action registration...
context.onCommit(
ctx -> saveToDatabase(order).thenApply(result -> {
logger.info("Order saved: {}", result);
return null;
})
);
// Synchronous action registration...
context.runOnAfterCommit(
ctx -> notificationService.sendOrderConfirmation(command.orderId())
);
}
All lifecycle methods come in two variants:
-
Async variant (on*) - For actions that return CompletableFuture<?>
-
Sync variant (runOn*) - For simple actions with no return value
Available lifecycle methods:
public void registerPhaseHandlers(ProcessingContext context) {
// Async variants
context.onPreInvocation(ctx -> doAsyncSetup());
context.onInvocation(ctx -> handleAsync());
context.onPostInvocation(ctx -> doAsyncPostProcessing());
context.onPrepareCommit(ctx -> validateBeforeCommit());
context.onCommit(ctx -> commitTransaction());
context.onAfterCommit(ctx -> sendNotifications());
// Sync variants
context.runOnPreInvocation(ctx -> doSetup());
context.runOnInvocation(ctx -> handleSync());
context.runOnPostInvocation(ctx -> doPostProcessing());
context.runOnPrepareCommit(ctx -> validate());
context.runOnCommit(ctx -> commit());
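// Named sendNotification() because an unqualified notify() would resolve to Object.notify()
context.runOnAfterCommit(ctx -> sendNotification());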
}
For synchronous actions that need to return a value to be passed to a following step, wrap the result in a CompletableFuture:
public void registerSyncResultHandlerInFuture(ProcessingContext context) {
context.onCommit(ctx -> {
String result = performSyncOperation();
return CompletableFuture.completedFuture(result);
});
}
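One detail worth keeping in mind with this pattern: if the synchronous operation throws, the exception escapes the lambda directly instead of arriving as a failed future. The sketch below uses only the JDK; `SyncResultSketch` and `toFuture` are hypothetical names, and whether Axon Framework treats a thrown exception and a failed future identically is not covered here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, not part of Axon Framework.
class SyncResultSketch {

    // Runs a synchronous operation and reports its outcome as a future,
    // turning a thrown runtime exception into an exceptionally completed future.
    static <T> CompletableFuture<T> toFuture(Supplier<T> syncOperation) {
        try {
            return CompletableFuture.completedFuture(syncOperation.get());
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    public static void main(String[] args) {
        // The wrapped value is available to a following step through thenApply.
        String message = toFuture(() -> "saved")
                .thenApply(result -> "commit result: " + result)
                .join();
        System.out.println(message);

        CompletableFuture<Object> failed = toFuture(() -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(failed.isCompletedExceptionally());
    }
}
```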
Error and completion handlers
Beyond the standard phases, you can register handlers for errors and completion:
@EventHandler
public void on(OrderPlacedEvent event, ProcessingContext context) {
// Register error handler - invoked when any phase fails
context.onError((ctx, phase, error) -> {
logger.error("Processing failed in phase {}: {}", phase, error.getMessage());
rollbackChanges();
publishErrorEvent(event, error);
});
// Register completion handler - invoked when all phases succeed
context.whenComplete(ctx -> {
logger.info("Processing completed successfully");
updateMetrics();
});
// Register finally handler - invoked regardless of success or failure
context.doFinally(ctx -> {
releaseResources();
cleanupTempFiles();
});
processEvent(event);
}
Execution order for error scenarios:
-
Normal phases execute until one fails, then
-
onError handlers execute, then
-
whenComplete handlers are skipped, then
-
doFinally handlers execute.
Execution order for success scenarios:
-
All normal phases execute successfully, then
-
onError handlers are skipped, then
-
whenComplete handlers execute, then
-
doFinally handlers execute.
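A convenient way to remember both orderings is the analogy with Java's `try`/`catch`/`finally`. The sketch below is only that analogy in code: `CompletionOrderSketch` and `process` are hypothetical names, and the real handlers are registered on the context rather than written inline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the handler ordering described above; not Axon Framework code.
class CompletionOrderSketch {

    // Runs the given "phases" and records which kind of handler would fire, in order.
    static List<String> process(Runnable phases) {
        List<String> log = new ArrayList<>();
        try {
            phases.run();
            log.add("whenComplete"); // only reached when every phase succeeded
        } catch (RuntimeException e) {
            log.add("onError");      // only reached when a phase failed
        } finally {
            log.add("doFinally");    // reached in both cases
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(process(() -> { }));
        System.out.println(process(() -> {
            throw new IllegalStateException("phase failed");
        }));
    }
}
```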
Resource management
The ProcessingContext provides type-safe resource management for storing and retrieving resources needed during message processing. This allows Axon Framework to add resources in an early phase of the ProcessingContext, which are then accessed during a later phase of the context’s lifecycle.
Resource keys
Resources are identified using type-safe keys:
import org.axonframework.messaging.core.Context.ResourceKey;
// Define resource keys
ResourceKey<EntityManager> EM_KEY = ResourceKey.withLabel("EntityManager");
ResourceKey<Connection> DB_CONN = ResourceKey.withLabel("DatabaseConnection");
ResourceKey<List<String>> TAGS = ResourceKey.withLabel("Tags");
Using typed keys prevents casting errors and provides IDE auto-completion support.
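To see why a typed key removes the need for casts at the call site, consider the following self-contained sketch of the general pattern. The `Key` and `Resources` classes are hypothetical stand-ins and make no claim about how `ResourceKey` or the ProcessingContext are implemented; in this sketch, keys compare by identity and the label is informational only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a type-safe key/value store; not Axon Framework code.
class TypedKeySketch {

    // The type parameter T records which value type belongs to this key.
    static final class Key<T> {
        private final String label;

        Key(String label) {
            this.label = label;
        }

        @Override
        public String toString() {
            return label;
        }
    }

    static final class Resources {
        private final Map<Key<?>, Object> values = new HashMap<>();

        <T> void put(Key<T> key, T value) {
            values.put(key, value);
        }

        // The single unchecked cast lives here; put() guarantees it is safe.
        @SuppressWarnings("unchecked")
        <T> T get(Key<T> key) {
            return (T) values.get(key);
        }
    }

    static String demo() {
        Key<List<String>> tags = new Key<>("Tags");
        Key<Integer> retries = new Key<>("Retries");
        Resources resources = new Resources();
        resources.put(tags, List.of("new", "priority"));
        resources.put(retries, 3);
        // No casts at the call site: the key's type parameter determines the return type.
        List<String> storedTags = resources.get(tags);
        int storedRetries = resources.get(retries);
        return storedTags + " / " + storedRetries;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [new, priority] / 3
    }
}
```

Passing a value of the wrong type, such as `resources.put(retries, "three")`, is rejected by the compiler rather than failing at runtime.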
Storing and retrieving resources
@CommandHandler
public void handle(CreateOrderCommand command, ProcessingContext context) {
// Add or replace resource
EntityManager em = entityManagerFactory.createEntityManager();
context.putResource(EM_KEY, em);
// Get resource
EntityManager retrieved = context.getResource(EM_KEY);
// Check if resource exists
if (context.containsResource(EM_KEY)) {
// Resource is available
}
// Get or create resource (compute if absent)
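// openConnection() is a hypothetical helper that wraps dataSource.getConnection()
// and its checked SQLException, which a lambda cannot throw here
Connection conn = context.computeResourceIfAbsent(
DB_CONN, () -> openConnection()
);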
// Register cleanup
context.doFinally(ctx -> {
EntityManager manager = ctx.removeResource(EM_KEY);
if (manager != null) {
manager.close();
}
});
}
Resource lifecycle pattern
A common pattern for managing resources:
// Note that this is an advanced example!
// Axon Framework typically deals with transaction and connection details for you, so you don't have to.
// That makes this example useful ONLY if you are using a storage solution that is not supported by Axon Framework out of the box.
public class TransactionInterceptor implements MessageHandlerInterceptor<CommandMessage> {
private static final ResourceKey<Transaction> TX_KEY = ResourceKey.withLabel("Transaction");
@Override
public MessageStream<?> interceptOnHandle(
CommandMessage command,
ProcessingContext context,
MessageHandlerInterceptorChain<CommandMessage> chain
) {
// Create resource in pre-invocation
context.runOnPreInvocation(ctx -> {
Transaction tx = transactionManager.beginTransaction();
ctx.putResource(TX_KEY, tx);
});
// Commit in commit phase
context.onCommit(ctx -> {
Transaction tx = ctx.getResource(TX_KEY);
return tx.commitAsync();
});
// Rollback on error
context.onError((ctx, phase, error) -> {
Transaction tx = ctx.getResource(TX_KEY);
if (tx != null) {
tx.rollback();
}
});
// Cleanup in finally
context.doFinally(ctx -> {
Transaction tx = ctx.removeResource(TX_KEY);
if (tx != null) {
tx.close();
}
});
return chain.proceed(command, context);
}
}
Updating resources
You can update existing resources using the updateResource method:
@EventHandler
public void on(OrderPlacedEvent event, ProcessingContext context) {
// Process event...
// and update resource using a function
context.updateResource(TAGS, tags -> {
if (tags == null) {
tags = new ArrayList<>();
}
tags.add("processed");
return tags;
});
}
Accessing the current message
One of the default resources Axon Framework adds to the ProcessingContext for you is the entire Message:
@CommandHandler
public void handle(MyCommand command, ProcessingContext context) {
// Retrieve the current message
Message message = Message.fromContext(context);
// Cast to a specific message type if needed; this handler receives a command
CommandMessage commandMessage = (CommandMessage) message;
// Access message properties
String messageId = message.identifier();
Metadata metadata = message.metadata();
// Only events carry a timestamp, so check the type before casting
if (message instanceof EventMessage eventMessage) {
Instant timestamp = eventMessage.timestamp();
}
}
Accessing configured components
The ProcessingContext provides access to components registered in the Axon Framework configuration:
@EventHandler
public void on(OrderPlacedEvent event, ProcessingContext context) {
// Access framework components
EventBus eventBus = context.component(EventBus.class);
CommandGateway gateway = context.component(CommandGateway.class);
// Access named components
QueryBus queryBus = context.component(QueryBus.class, "myQueryBus");
// Use components
gateway.send(new ProcessOrderCommand(event.getOrderId()));
}
This is particularly useful in interceptors or custom infrastructure components where you need to access framework services.
Context propagation
When dispatching new messages from within a handler, it is strongly recommended that you do so with the active ProcessingContext to maintain correlation data and share resources.
Using EventAppender
EventAppender automatically uses the current processing context when publishing events:
@CommandHandler
public void handle(PlaceOrderCommand command,
EventAppender appender) {
// EventAppender uses the context automatically
// Metadata and correlation data from context are propagated
appender.append(new OrderPlacedEvent(command.getOrderId()));
}
Using CommandDispatcher
CommandDispatcher is automatically context-aware when injected into handlers:
@EventHandler
public CompletableFuture<Void> on(OrderPlacedEvent event,
CommandDispatcher commandDispatcher) {
// CommandDispatcher is already bound to the current ProcessingContext
// Correlation data propagates automatically
CommandResult result = commandDispatcher.send(
new ProcessOrderCommand(event.getOrderId())
);
// Return the CompletableFuture so the handler only completes when the command finishes
return result.toCompletableFuture()
.thenAccept(r -> logger.info("Command processed successfully"))
.exceptionally(ex -> {
logger.error("Command failed: {}", ex.getMessage());
return null;
});
}
Creating branched contexts
You can create a new context that shares resources from a parent context:
@EventHandler
public void on(OrderCreatedEvent event, ProcessingContext context) {
// Create branched context with additional resource
ProcessingContext enrichedContext = context.withResource(
CORRELATION_ID_KEY,
event.getOrderId()
);
// The enriched context has all resources from the original context
// plus the new correlation ID
// Lifecycle callbacks registered on either context affect both
}
Advanced usage
Interacting with the ProcessingContext directly already counts as an advanced use case for typical Axon Framework users. The usages in this section go a step further.
Custom phases
You can define custom phases to insert logic at specific points in the lifecycle. Because Axon Framework itself never registers operations in your custom phases, you can be certain that your tasks are the only ones running at that point in the lifecycle:
class CustomPhaseInterceptor implements MessageHandlerInterceptor<Message> {
@Nonnull
@Override
public MessageStream<?> interceptOnHandle(
@Nonnull Message message,
@Nonnull ProcessingContext context,
@Nonnull MessageHandlerInterceptorChain<Message> chain
) {
// Order between PRE_INVOCATION (order = -10000) and INVOCATION (order = 0) phase.
ProcessingLifecycle.Phase customPhase = () -> -5000;
context.on(customPhase, ctx -> {
// Custom logic here
performCustomSetup();
return CompletableFuture.completedFuture(null);
});
return chain.proceed(message, context);
}
}
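The position of a custom phase follows purely from its order value. The following sketch shows this with a hypothetical `Phase` interface that mirrors the lambda shape used above (a single method returning an int); it does not depend on Axon Framework, and the order values are the ones listed in the phase table.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of ordering phases by their numeric order; not Axon Framework code.
class CustomPhaseOrderSketch {

    @FunctionalInterface
    interface Phase {
        int order();
    }

    // Sorts the phases the way a lifecycle would visit them: lowest order first.
    static List<Integer> sortedOrders() {
        Phase preInvocation = () -> -10000;
        Phase invocation = () -> 0;
        Phase custom = () -> -5000; // lands between pre-invocation and invocation
        return Stream.of(invocation, custom, preInvocation)
                     .sorted(Comparator.comparingInt(Phase::order))
                     .map(Phase::order)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sortedOrders()); // [-10000, -5000, 0]
    }
}
```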
Checking lifecycle state
You can query the current state of the processing context:
@EventHandler
public void on(OrderPlacedEvent event, ProcessingContext context) {
if (context.isStarted()) {
// Processing has started
}
if (context.isError()) {
// An error occurred during processing
}
if (context.isCommitted()) {
// Processing committed successfully
}
if (context.isCompleted()) {
// Processing completed (success or failure)
}
// Process event...
}