1

I’m encountering an issue with possible idempotencyBreachException in a service used by Kafka listener.

I’m not entirely sure how to handle this situation correctly. I have a lockExecutor that locks a given customerId and a cache used as an idempotencyRepository. In 99.99% of cases everything will work as expected, but I’m concerned about the scenario where an IdempotencyBreachException eventually occurs and got 2xdata saved in database. Should I simply log the error and handle it manually when it occur, or is there any better way to handle it?

I considered using @Retryable on the processing method, but this creates a problem: the cache key is already consumed, and I know I shouldn’t remove it. Additionally, I’m worried about how retries would interact with rollbacks in this context, as it will be concurrent processing. At the moment, the only solution that comes to mind is to log the exception and handle the case manually if necessary in the future.

@Service
@RequiredArgsConstructor
public class ProcessingEventsServiceImpl {

    private final IdempotencyRepository idempotencyRepository;
    private final TransactionalService transactionalService;
    private final LockExecutor lockExecutor;

    public void process(InboundData inbound) {

        final String key = inbound.getTransactionId();
        final String customerId = inbound.getCustomerId();
        final Provenance provenance = inbound.getDataProvenance();

        lockExecutor.run(customerId, () -> {

            if (idempotencyRepository.isKeyUsed(provenance, key)) {
                return;
            }

            transactionalService.executeTransactionProcess(inbound);
            try {
                idempotencyRepository.markKeyAsUsed(provenance, key);

            } catch (IdempotencyBreachException e) {
                log.error(...);
            }
        });
    }
}
@Component
@RequiredArgsConstructor
public class KafkaListener {

    private final ProcessingService processingService;
    private final DataMapper mapper;

    private static final Logger LOGGER =
            LoggerFactory.getLogger(KafkaListener.class);

    @KafkaListener(topics = "topic")
    public void onEvent(Event event) {
        var data = mapper.mapEventToModel(event);
        processingService.process(data);
    }
}
@Service
@RequiredArgsConstructor
public class TransactionalService {
    private final InboundRepository repository;
    private final InboundMapper mapper;

    @Transactional
    public void executeTransactionalProcess(TransferData inbound) {
        var entity = repository.saveOrUpdate(inbound);
        repository.save(entity);
    }
}

0

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.