I have a Spring Boot (2.2.4.RELEASE) application where my DB operation is annotated with @Transactional, and I have a CustomApplicationListener class.
My listener method is invoked twice during a single transactional call: once when I publish the event, and again after the transactional method completes.
As per the documentation, a @TransactionalEventListener method should be invoked only after the transaction commits, but I get the callback twice.
The actual DB transaction method:
    @KafkaListener(id = "${kafka.topic}", topics = "${kafka.topic}",
            autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:1}",
            containerFactory = "kafkaListenerContainerFactory",
            errorHandler = "errorHandler")
    @Transactional(transactionManager = "chainedKafkaTransactionManager")
    public void listen(@Payload String payload,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
            @Header(KafkaHeaders.OFFSET) long offset) throws Exception {
        LOGGER.info(">>>>>> Received transactional message(in-transaction) from topic[{}] and payload[{}]", topic, payload);
        send(payload);
    }

    private void send(String payload) throws Exception {
        jdbcTemplate.update("insert into task(title) values ('" + payload + "')");
        // I get a callback as and when this line executes.
        customEventPublisher.publish(payload + "=> Consumed & DB insert is success");
        System.out.println(kafkaTemplate.inTransaction());
    }
Listener method:
    @TransactionalEventListener
    public void onApplicationEvent(CustomEvent event) {
        LOGGER.info("Receiving after DB commit so transaction is in sync...");
        kafkaTemplate.send(replyTopic, event.getMsg());
        LOGGER.info("published after DB commit in sync...");
    }
Can you tell me which part of my configuration is wrong here?
Comment From: wilkinsona
Thanks for the report. Unfortunately, it's hard to say what is wrong based on the information that you have provided as we can't see everything that is involved. If you would like us to spend some more time investigating, please spend some time providing a complete yet minimal sample that reproduces the problem. You can share it with us by pushing it to a separate repository on GitHub or by zipping it up and attaching it to this issue.
Comment From: tamilsmani
You can find the code at the paths below.
KafaTransactionConfiguration.java receives the message from the Kafka topic and contains the listener method: https://github.com/tamilsmani/kafka-demo/blob/master/src/main/java/com/example/config/KafaTransactionConfiguration.java
Our main Spring Boot class is at: https://github.com/tamilsmani/kafka-demo/blob/master/src/main/java/com/example/KafkaTranactionDemoApplication.java
As I said earlier, the listener method that is called twice is in: https://github.com/tamilsmani/kafka-demo/blob/master/src/main/java/com/example/event/CustomEventListener.java
Can you tell me which part of my configuration is wrong here?
Comment From: wilkinsona
Thanks for the sample.
Unfortunately, I'm not sure what I need to do with it to reproduce the problem. The code doesn't seem to compile due to some missing Jackson dependencies. Upon correcting that, I can run it but it doesn't seem to reproduce the behaviour that you have described. Uncommenting the send call in the main method results in a failure as MySQL isn't available.
If you would like us to spend some more time investigating, can you please spend some time minimising the sample by stripping out everything that isn't necessary to reproduce the problem? It would also be useful if it used an in-memory database. Failing that some instructions on the necessary MySQL configuration would be good.
Comment From: tamilsmani
Hi Wilkinsona,
I have created a standalone, runnable application to test the scenario. Please run the code below; it uses an embedded DB and has @TransactionalEventListener configured. You can see in the console log that the listener is called twice.
https://github.com/tamilsmani/kafka-demo/blob/master/src/main/java/com/example/SpringEventListenerDemoApplication.java
Comment From: wilkinsona
Thank you. I can see what is happening now. The problem is that your CustomEventListener is both an ApplicationListener and has an event-listening method due to @TransactionalEventListener on onApplicationEvent(). This means that onApplicationEvent is invoked both as part of the ApplicationListener contract (when the event is published) and the @TransactionalEventListener contract (when the transaction completes). If you only want the listener to be invoked when the transaction completes, it should not implement ApplicationListener:
    @Component
    public class CustomEventListener {

        private static final Logger LOGGER = LoggerFactory.getLogger(CustomEventListener.class);

        @TransactionalEventListener
        public void onApplicationEvent(CustomEvent event) {
            LOGGER.info("Event Received - {}", event.getMsg());
        }

    }
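To illustrate why a single method ends up being called twice, here is a rough plain-Java sketch of the two dispatch paths. It is a toy model only and not how Spring implements event multicasting: ToyPublisher, ImmediateListener, registerAfterCommit and commit() are hypothetical names invented for this illustration. It assumes that one path invokes listeners at publish time (standing in for the ApplicationListener contract) and the other defers them until commit (standing in for @TransactionalEventListener).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DoubleDispatchDemo {

    /** Hypothetical stand-in for the ApplicationListener contract: called at publish time. */
    interface ImmediateListener {
        void onEvent(String event);
    }

    /** Toy publisher (an assumption for illustration, NOT Spring's implementation). */
    static class ToyPublisher {
        private final List<ImmediateListener> immediate = new ArrayList<>();
        private final List<Consumer<String>> afterCommit = new ArrayList<>();
        private final List<Runnable> pending = new ArrayList<>();

        void registerImmediate(ImmediateListener listener) {
            immediate.add(listener);
        }

        void registerAfterCommit(Consumer<String> listener) {
            afterCommit.add(listener);
        }

        /** Immediate listeners run now; after-commit listeners are queued. */
        void publish(String event) {
            for (ImmediateListener listener : immediate) {
                listener.onEvent(event);
            }
            for (Consumer<String> listener : afterCommit) {
                pending.add(() -> listener.accept(event));
            }
        }

        /** Simulates transaction commit: runs every queued callback once. */
        void commit() {
            List<Runnable> toRun = new ArrayList<>(pending);
            pending.clear();
            toRun.forEach(Runnable::run);
        }
    }

    /** Runs one simulated transaction and returns the order of what happened. */
    public static List<String> run(boolean alsoRegisteredAsImmediate) {
        List<String> log = new ArrayList<>();
        ToyPublisher publisher = new ToyPublisher();

        // One handler method, potentially reachable through both paths.
        ImmediateListener handler = event -> log.add("handled: " + event);
        if (alsoRegisteredAsImmediate) {
            publisher.registerImmediate(handler);
        }
        publisher.registerAfterCommit(handler::onEvent);

        log.add("begin");
        publisher.publish("task-1");
        log.add("commit");
        publisher.commit();
        return log;
    }

    public static void main(String[] args) {
        // Registered on both paths: handled at publish time and again after commit.
        System.out.println(run(true));   // [begin, handled: task-1, commit, handled: task-1]
        // Registered on the after-commit path only: handled once, after commit.
        System.out.println(run(false));  // [begin, commit, handled: task-1]
    }
}
```

The first run mirrors the reported behaviour, and the second mirrors the listener once it no longer implements ApplicationListener.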
If you have any further questions, please follow up on Stack Overflow or Gitter. As mentioned in the guidelines for contributing, we prefer to use GitHub issues only for bugs and enhancements.
Comment From: tamilsmani
Thanks, that resolved it.