Dead-Letter Queue

When configuring error handling for your event processors, you might want to consider a Dead-Letter Queue to park events that you were unable to handle.

Instead of either logging the error and continuing, or infinitely retrying the current event, a Dead-Letter Queue will park the event in the queue so you can decide to try and handle it again later. In addition, it will prevent handling of later events in the same sequence until the failed event is successfully processed.

Insight and Management
AxonIQ Console provides insight into the Dead-Letter Queue and tools for its management. It’s straightforward to see the dead letters in the queue and decide to retry them or remove them from the queue. You can find more information on the Dead-Letter Queue page of AxonIQ Console.

Note that you cannot share a dead-letter queue between different processing groups. Hence, each processing group you want to enable this behavior for should receive a unique dead-letter queue instance.

Dead-Letter Queues do not support Sagas

Currently, there is no support for using a dead-letter queue for sagas. We’ve taken this decision as we cannot support a sequenced dead lettering approach as we do for regular event handling.

Furthermore, we cannot do this, as a saga’s associations can vary widely between events. Due to this, the sequence of events may change, breaking this level of support. Hence, there’s no way of knowing whether a next event in the stream does or does not belong to a saga.

Event ordering

Axon Framework’s event processors maintain the ordering of events within the same sequence, even when you configure parallel processing. A perfect example when this is a requirement is the need to handle events of the same aggregate in their publishing order. Simply dead lettering one failed event would cause later events in the sequence to be applied to inconsistent state.

So it’s important that a dead-letter queue for events enqueues an event and any following events in the sequence. To that end, the supported dead-letter queue is a so-called SequencedDeadLetterQueue.

Integral to its design is to allow for queueing failed events and events that belong to a faulty sequence. It does so by maintaining a sequence identifier for each event, determined by the sequencing policy.

Implementations

We currently provide the following dead-letter queue implementations:

  • InMemorySequencedDeadLetterQueue - In-memory variant of the dead-letter queue. Useful for testing purposes, but as it does not persist dead letters, it is unsuited for production environments.

  • JpaSequencedDeadLetterQueue - JPA variant of the dead-letter queue. It constructs a dead_letter_entry table where it persists failed-events in. The JPA dead-letter queue is a suitable option for production environments by persisting the dead letters.

  • JdbcSequencedDeadLetterQueue - JDBC variant of the dead-letter queue. It constructs a dead_letter_entry table where it persists failed-events in. The JDBC dead-letter queue is a suitable option for production environments by persisting the dead letters.

  • MongoSequencedDeadLetterQueue - Mongo variant of the dead-letter queue, available via the Mongo Extension.

It constructs a deadletters collection where it persists failed-events in. The MongoDB dead-letter queue is a suitable option for production environments by persisting the dead letters.

Idempotency

Before configuring a SequencedDeadLetterQueue it is vital to validate whether your event handling functions are idempotent. As a processing group consists of several Event Handling Components (as explained in the intro of this chapter), some handlers may succeed in event handling while others will not. As a configured dead-letter queue does not stall event handling, a failure in one Event Handling Component does not cause a rollback for other event handlers. Furthermore, as the dead-letter support is on the processing group level, dead-letter processing will invoke all event handlers for that event within the processing group.

Thus, if your event handlers are not idempotent, processing letters may result in undesired side effects.

Hence, we strongly recommend making your event handlers idempotent when using the dead-letter queue.

The principle of exactly once delivery is no longer guaranteed; at-least-once delivery is the reality to cope with.

Configuration

A JpaSequencedDeadLetterQueue configuration example:

  • Configuration API

  • Spring Boot

public class AxonConfig {
    // omitting other configuration methods...
    public void configureDeadLetterQueue(EventProcessingConfigurer processingConfigurer) {
        // Replace "my-processing-group" for the processing group you want to configure the DLQ on.
        processingConfigurer.registerDeadLetterQueue(
                "my-processing-group",
                config -> JpaSequencedDeadLetterQueue.builder()
                                                     .processingGroup("my-processing-group")
                                                     .maxSequences(256)
                                                     .maxSequenceSize(256)
                                                     .entityManagerProvider(config.getComponent(EntityManagerProvider.class))
                                                     .transactionManager(config.getComponent(TransactionManager.class))
                                                     .serializer(config.serializer())
                                                     .build()
        );
    }
}
@Configuration
public class AxonConfig {
    // omitting other configuration methods...
    @Bean
    public ConfigurerModule deadLetterQueueConfigurerModule() {
        // Replace "my-processing-group" for the processing group you want to configure the DLQ on.
        return configurer -> configurer.eventProcessing().registerDeadLetterQueue(
                "my-processing-group",
                config -> JpaSequencedDeadLetterQueue.builder()
                                                     .processingGroup("my-processing-group")
                                                     .maxSequences(256)
                                                     .maxSequenceSize(256)
                                                     .entityManagerProvider(config.getComponent(EntityManagerProvider.class))
                                                     .transactionManager(config.getComponent(TransactionManager.class))
                                                     .serializer(config.serializer())
                                                     .build()
        );
    }
}

You can set the maximum number of saved sequences (defaults to 1024) and the maximum number of dead letters in a sequence (also defaults to 1024). If either of these thresholds is exceeded, the queue will throw a DeadLetterQueueOverflowException. This exception means the processing group will stop processing new events altogether. Thus, the processing group moves back to the behavior described at the start of the Error Handling section.

Configuration through a provider

To make it easier to use a dead-letter queue on multiple processing groups, it’s possible to set a dead-letter queue provider. The provider is a function that takes a processing group, and returns either null, meaning it will not be configured using a dead-letter queue, or a function that takes the Configuration and returns a new dead-letter queue.

Here is a JpaSequencedDeadLetterQueue configuration example that uses a collection to determine if a dead-letter queue should be created for a given processing group:

  • Configuration API

  • Spring Boot

public class AxonConfig {
    // omitting other configuration methods...
    public void configureDeadLetterQueue(EventProcessingConfigurer processingConfigurer) {
        processingConfigurer.registerDeadLetterQueueProvider(
                processingGroup -> {
                    //dlqEnabledGroups is a collection with the groups that should have a dlq
                    if (dlqEnabledGroups.contains(processingGrouping)) {
                        return config -> JpaSequencedDeadLetterQueue.builder()
                                                             .processingGroup(processingGroup)
                                                             .entityManagerProvider(config.getComponent(
                                                                     EntityManagerProvider.class
                                                             ))
                                                             .transactionManager(config.getComponent(
                                                                     TransactionManager.class
                                                             ))
                                                             .serializer(config.serializer())
                                                             .build();
                    } else {
                        return null;
                    }
                }
        );
    }
}
@Configuration
public class AxonConfig {
    // omitting other configuration methods...
    @Bean
    public ConfigurerModule deadLetterQueueConfigurerModule () {
        return configurer -> configurer.eventProcessing().registerDeadLetterQueueProvider(
                processingGroup -> {
                    //dlqEnabledGroups is a collection with the groups that should have a dlq
                    if (dlqEnabledGroups.contains(processingGrouping)) {
                        return config -> JpaSequencedDeadLetterQueue.builder()
                                                             .processingGroup(processingGroup)
                                                             .entityManagerProvider(config.getComponent(
                                                                     EntityManagerProvider.class
                                                             ))
                                                             .transactionManager(config.getComponent(
                                                                     TransactionManager.class
                                                             ))
                                                             .serializer(config.serializer())
                                                             .build();
                    } else {
                        return null;
                    }
                }
        );
    }
}

If you are using Spring Boot, a default dead-letter queue provider will be set if using JPA, JDBC, or Mongo. The default dead-letter queue provider will use the axon.eventhandling.processors.my-processor.dlq.enabled property to determine whether to return null or a dead-letter queue factory method. For example, by setting the axon.eventhandling.processors.my-processing-group.dlq.enabled to true you would enable the dead-letter queue for the my-processing-group processing group.

Processing sequences

Once you resolve the problem that led to dead lettering events, we can start processing the dead letters. We recommend using the SequencedDeadLetterProcessor interface for this, as it processes an entire dead-letter sequence instead of single dead-letter entries. It will thus ensure the event order is maintained during the retry.

The SequencedDeadLetterProcessor provides two operations to process dead letters:

  1. boolean processAny() - Process the oldest dead-letter sequence. Returns true if it processes a sequence successfully.

  2. boolean process(Predicate<DeadLetter<? extends EventMessage<?>>) - Process the oldest dead-letter sequence matching the predicate. Note that the predicate only filters based on a sequence’s first entry. Returns true if it processes a sequence successfully.

If the processing of a dead letter fails, the event will be offered to the dead-letter queue again. How the dead-lettering process reacts to this depends on the enqueue policy.

You can retrieve a SequencedDeadLetterProcessor from the EventProcessingConfiguration based on a processing group name if you have configured a dead-letter queue for this processing group. Below are a couple of examples of how to process dead-letter sequences:

  • Process the oldest dead-letter sequence matching ErrorEvent

  • Process the oldest dead-letter sequence in the queue

  • Process all dead-letter sequences in the queue

public class DeadletterProcessor {

    private EventProcessingConfiguration config;

    public void retryErrorEventSequence(String processingGroup) {
        config.sequencedDeadLetterProcessor(processingGroup)
              .ifPresent(letterProcessor -> letterProcessor.process(
                      deadLetter -> deadLetter.message().getPayload() instanceof ErrorEvent
              ));
    }
}
public class DeadletterProcessor {

    private EventProcessingConfiguration config;

    public void retryAnySequence(String processingGroup) {
        config.sequencedDeadLetterProcessor(processingGroup)
              .ifPresent(SequencedDeadLetterProcessor::processAny);
    }
}
public class DeadletterProcessor {

    private EventProcessingConfiguration config;

    public void retryAllSequences(String processingGroup) {
        Optional<SequencedDeadLetterProcessor<EventMessage<?>>> optionalLetterProcessor =
                config.sequencedDeadLetterProcessor(processingGroup);
        if (!optionalLetterProcessor.isPresent()) {
            return;
        }
        SequencedDeadLetterProcessor<EventMessage<?>> letterProcessor = optionalLetterProcessor.get();

        // Retrieve all the dead lettered event sequences:
       Iterable<Iterable<DeadLetter<? extends EventMessage<?>>>> deadLetterSequences =
               config.deadLetterQueue(processingGroup)
                     .map(SequencedDeadLetterQueue::deadLetters)
                     .orElseThrow(() -> new IllegalArgumentException("No such Processing Group"));

       // Iterate over all sequences:
       for (Iterable<DeadLetter<? extends EventMessage<?>>> sequence : deadLetterSequences) {
           Iterator<DeadLetter<? extends EventMessage<?>>> sequenceIterator = sequence.iterator();
           String firstLetterId = sequenceIterator.next()
                                                  .message()
                                                  .getIdentifier();

           // SequencedDeadLetterProcessor#process automatically retries an entire sequence.
           // Hence, we only need to filter on the first entry of the sequence:
          letterProcessor.process(deadLetter -> deadLetter.message().getIdentifier().equals(firstLetterId));
       }
    }
}

For some event handlers, it is beneficial to know if the event it is processing is dead-lettered. To that end, you can include a parameter of type DeadLetter<EventMessage<T>> to your event handling methods. The generic refers to the type of event handled by the event handler. The injected DeadLetter parameter exposes several attributes, like the cause() and diagnostics(), for example.

Do note that the DeadLetter parameter is nullable. When the injected DeadLetter is null, you deal with a non-dead-lettered event. If it is not null, the event handling occurs as a follow-up of invoking the process(Predicate<DeadLetter<? extends EventMessage<?>>) or processAny() methods on the SequencedDeadLetterProcessor.

For added clarity, here’s an event handler sample containing a DeadLetter parameter:

@ProcessingGroup("my-processing-group")
class MyProcessingGroup {
    // omitted  services and other event handlers for simplicity...
    @EventHandler
    public void on(SomeEvent event, DeadLetter<EventMessage<SomeEvent>> deadLetter) {
        if (deadLetter != null) {
            // dead-letter processing...
        } else {
            // regular event handling...
        }
    }
}

Attributes

A dead letter contains the following attributes:

attribute type description

message

EventMessage

The EventMessage for which handling failed. The message contains your event, among other Message properties.

cause

Optional<Cause>

The cause for the message to be dead lettered. Empty if the letter is enqueued because it is part of a sequence.

enqueuedAt

Instant

The moment in time when the event was enqueued in a dead-letter queue.

lastTouched

Instant

The moment in time when this letter was last touched. Will equal the enqueuedAt value if this letter is enqueued for the first time.

diagnostics

MetaData

The diagnostic MetaData concerning this letter. Filled through the enqueue policy.

Enqueue policy

By default, when you configure a dead-letter queue and event handling fails, the event is dead-lettered. However, you might not want all event failures to result in dead-lettered entries. Similarly, when letter processing fails, you might want to reconsider whether you want to enqueue the letter again.

To that end, you can configure a so-called EnqueuePolicy. The enqueue policy ingests a DeadLetter and a cause (Throwable) and returns an EnqueueDecision. The EnqueueDecision, in turn, describes if the framework should or should not enqueue the dead letter. It’s also possible to change the exception, for example to be sure that it will fit in the database, as the cause will be stored.

You can customize the dead-letter policy to exclude some events when handling fails. As a consequence, these events will be skipped. Note that Axon Framework invokes the policy on initial event handling and on dead-letter processing.

Reevaluating the policy after processing failed may be essential to ensure a dead letter isn’t stuck in the queue forever. To deal with this scenario, you can attach additional diagnostic information to the dead letter through the policy. For example to add a number of retries to the dead letter to base your decision on. See the sample EnqueuePolicy below for this:

public class CustomEnqueuePolicy implements EnqueuePolicy<EventMessage<?>> {

    @Override
    public EnqueueDecision<EventMessage<?>> decide(DeadLetter<? extends EventMessage<?>> letter, Throwable cause) {
        if (cause instanceof NullPointerException) {
            // It's pointless:
            return Decisions.doNotEnqueue();
        }

        final int retries = (int) letter.diagnostics().getOrDefault("retries", -1);
        if (letter.message().getPayload() instanceof ErrorEvent) {
            // Important and new entry:
            return Decisions.enqueue(cause);
        }
        if (retries < 10) {
            // Let's continue and increase retries:
            return Decisions.requeue(cause, l -> l.diagnostics().and("retries", retries + 1));
        }

        // Exhausted all retries:
        return Decisions.evict();
    }
}

The Decisions utility class provides the most reasonable decisions, but you are free to construct your own EnqueueDecision when necessary. See the following example for configuring our custom policy:

  • Configuration API

  • Spring Boot

public class AxonConfig {
    // omitting other configuration methods...
    public void configureEnqueuePolicy(EventProcessingConfigurer configurer) {
        // Replace "my-processing-group" for the processing group you want to configure the policy on.
        configurer.registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy());
    }
}
@Configuration
public class AxonConfig {
    // omitting other configuration methods...
    @Bean
    public ConfigurerModule enqueuePolicyConfigurerModule() {
        // Replace "my-processing-group" for the processing group you want to configure the policy on.
        return configurer -> configurer.eventProcessing()
                                       .registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy());
    }
}