In this article, we explore the complexities of handling transactions across Kafka and relational databases in Spring Boot applications.
The traditional approach, using ChainedKafkaTransactionManager, was deprecated in 2021.
But worry not: the solution is elegantly simple, thanks to guidance from Gary Russell, the lead contributor to the Spring Kafka project.
We'll dive into consumer-initiated transactions and see how to configure our application properly for this use case. We will also demystify what happens "Under the Boot" by observing logs to understand how transactions are managed and committed.
The Solution
Well, the solution is simple. Fortunately, Gary Russell, the main Spring Kafka contributor, didn't leave us hanging here.
In the ticket in which he deprecated the ChainedKafkaTransactionManager, he also pointed us to the recommended approach for anyone who relied on the deprecated class:
For consumer-initiated transactions, annotate the listener method with @Transactional; the container (configured with a KTM) starts the Kafka transaction and the transaction interceptor starts the DB transaction. This provides virtually the same functionality as the Chained KTM in that the DB transaction will commit or roll back just before the Kafka transaction. For producer-initiated transactions, transaction synchronization already works; if another transaction is in process when the transactional KafkaTemplate is called, the template will synchronize the Kafka transaction with the existing transaction.
He also left documentation in the repository with examples of how to implement it.
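The rest of this article focuses on the consumer-initiated case, but for completeness, here is a minimal sketch of the producer-initiated case described in the quote above. The names (OrderService, OrderRepository, OrderEntity) are hypothetical and not part of the demo project:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical producer-initiated example: @Transactional starts the DB transaction,
// and because the KafkaTemplate is transactional, it synchronizes its Kafka
// transaction with the existing DB transaction.
@Service
public class OrderService {

    private final OrderRepository orderRepository;             // assumed JPA repository
    private final KafkaTemplate<String, String> kafkaTemplate; // transactional template

    public OrderService(OrderRepository orderRepository,
                        KafkaTemplate<String, String> kafkaTemplate) {
        this.orderRepository = orderRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Transactional
    public void placeOrder(OrderEntity order) {
        orderRepository.save(order);                       // DB write inside the JPA transaction
        kafkaTemplate.send("orders", order.toString());    // Kafka send synchronized with it
    }
}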
Enable Kafka Transaction Manager
First of all, we need to enable Spring Kafka's KafkaTransactionManager. We can do that by setting the transaction-id-prefix property in our application.properties:
spring.kafka.producer.transaction-id-prefix=tx-
By setting this property, Spring Boot will automatically configure the Kafka Transaction Manager (see the Spring Boot documentation).
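If you're curious what that auto-configuration amounts to, it is roughly equivalent to declaring a KafkaTransactionManager bean yourself, as in the sketch below. This is only an illustration; with the property set, Spring Boot does this for us, so the demo project doesn't need it:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTransactionConfig {

    // Roughly what Spring Boot auto-configures once a transaction-id-prefix is set:
    // a KafkaTransactionManager backed by the (now transactional) producer factory.
    @Bean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}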
Configure your Listener Containers to use Kafka Transaction Manager
Spring Boot will also configure our listener containers to use the auto-configured KafkaTransactionManager. This ensures that a Kafka transaction is automatically started whenever a listener receives a new message.
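Again, Boot performs this wiring for us, but conceptually it looks something like the sketch below. The exact setter depends on your Spring Kafka version (older versions use setTransactionManager instead of setKafkaAwareTransactionManager), and you do not need this class when relying on the auto-configuration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class ListenerContainerConfig {

    // Sketch of the wiring Boot does for us: the listener container factory is given
    // the KafkaTransactionManager, so the container starts a Kafka transaction
    // before handing each record to our listener method.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTransactionManager<?, ?> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // On older Spring Kafka versions: factory.getContainerProperties().setTransactionManager(...)
        factory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManager);
        return factory;
    }
}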
Using the @Transactional Annotation
When implementing our listener, we must make sure to also annotate the method with the @Transactional annotation. This tells Spring to begin a JPA transaction when a message is received.
@KafkaListener(id = "group1", topics = "topic1")
@Transactional("transactionManager")
public void listen1(String in) {
    log.info("Received from topic1: {}", in);

    log.info("Sending to topic2: {}", in.toUpperCase());
    kafkaTemplate.send("topic2", in.toUpperCase());

    log.info("Writing to database: {}", in);
    demoRepository.save(
            DemoEntity.builder()
                    .name(in)
                    .timestamp(System.currentTimeMillis())
                    .build()
    );
}
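For context, demoRepository and DemoEntity referenced above are just a plain Spring Data JPA repository and entity. A plausible shape is shown below, assuming Lombok for the builder and jakarta.persistence for Spring Boot 3 (older Boot versions use javax.persistence); the demo project may define them slightly differently:

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.jpa.repository.JpaRepository;

// Simple entity with the fields used by the listener (name and timestamp).
@Entity
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class DemoEntity {
    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private Long timestamp;
}

// Standard Spring Data repository; save() is all the listener needs.
interface DemoRepository extends JpaRepository<DemoEntity, Long> {
}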
The trick here is giving transactionManager as the value for the @Transactional annotation. This is because there will be two transaction managers available: transactionManager and kafkaTransactionManager. The transactionManager bean is an instance of JpaTransactionManager, while the kafkaTransactionManager bean is an instance of KafkaTransactionManager.

The reason we give transactionManager as the value for the @Transactional annotation is that our KafkaMessageListenerContainer is already creating transactions for us on consumption. Whenever a new message comes in, it automatically begins a Kafka transaction before invoking our method. Therefore, all we have to do is tell Spring Boot to also begin a transaction before our method runs, but this time with the JpaTransactionManager.
And that’s it!
There's nothing else we have to do. When receiving new messages, Spring Kafka will automatically begin a Kafka Transaction, and after that, the @Transactional annotation will create a JPA Transaction. If the JPA Transaction fails, the Kafka Transaction will also fail and be rolled back.
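An easy way to verify this is a listener that deliberately fails after both operations. This variant is not part of the demo project; the group id and topic are made up for illustration:

// Hypothetical failing listener: the exception makes the JPA transaction roll back,
// and because the container started the Kafka transaction, that transaction is
// aborted too, so the send to topic2 is never committed.
@KafkaListener(id = "group2", topics = "topic3")
@Transactional("transactionManager")
public void listenAndFail(String in) {
    kafkaTemplate.send("topic2", in.toUpperCase());
    demoRepository.save(
            DemoEntity.builder()
                    .name(in)
                    .timestamp(System.currentTimeMillis())
                    .build()
    );
    throw new IllegalStateException("Simulated failure after the database write");
}

Keep in mind that with default settings the failed record will be redelivered and retried, so in a real application you would pair this with an error handler or a dead-letter topic.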
Under the Boot
Let's not just take my word for it. Let's also look at the logs under different scenarios to see what's actually going on in our application.
First of all, let’s tell Spring Boot to change the logging level of a few classes:
logging:
  level:
    org:
      apache:
        kafka: error
      springframework:
        transaction: trace
        orm:
          jpa:
            JpaTransactionManager: trace
        kafka:
          transaction: trace
          listener:
            KafkaMessageListenerContainer: error
Here, we want to mute some of the noise generated by the org.springframework.kafka.listener.KafkaMessageListenerContainer class and the org.apache.kafka package, so we set them to error.

Then, we want to see more detail from the JpaTransactionManager class and from the org.springframework.transaction and org.springframework.kafka.transaction packages, so we set them to trace.
Now, when our application starts, we want to send a message to Kafka:
@Bean
public ApplicationRunner sendKafkaMessage(KafkaTemplate<String, String> template) {
    return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
}
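Note that we have to use executeInTransaction (or call the template from within an existing transaction) here: once the transaction-id-prefix is set, every producer created by the template is transactional, and a plain send() outside of any transaction would fail.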
This will trigger our listener method and we will be able to see what’s actually going on under the “Boot”.
@Transactional("transactionManager")
Let's start by using transactionManager as the value of our @Transactional annotation.
These are the logs that were generated:
14:28:33.833 DEBUG o.s.k.t.KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
14:28:33.834 DEBUG o.s.k.t.KafkaTransactionManager - Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@685b500f]]
14:28:33.839 DEBUG o.s.o.j.JpaTransactionManager - Creating new transaction with name [demo.kafka.demokafkatransaction.DemoListener.listen1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'transactionManager'
14:28:33.839 DEBUG o.s.o.j.JpaTransactionManager - Opened new EntityManager [SessionImpl(151636314<open>)] for JPA transaction
14:28:33.842 DEBUG o.s.o.j.JpaTransactionManager - Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@611e4b08]
14:28:33.842 INFO d.k.d.DemoListener - Received from topic1: test
14:28:33.842 INFO d.k.d.DemoListener - Sending to topic2: TEST
14:28:33.847 INFO d.k.d.DemoListener - Writing to database: test
14:28:33.848 DEBUG o.s.o.j.JpaTransactionManager - Found thread-bound EntityManager [SessionImpl(151636314<open>)] for JPA transaction
14:28:33.848 DEBUG o.s.o.j.JpaTransactionManager - Participating in existing transaction
14:28:33.856 DEBUG o.s.o.j.JpaTransactionManager - Initiating transaction commit
14:28:33.856 DEBUG o.s.o.j.JpaTransactionManager - Committing JPA transaction on EntityManager [SessionImpl(151636314<open>)]
14:28:33.868 DEBUG o.s.o.j.JpaTransactionManager - Closing JPA EntityManager [SessionImpl(151636314<open>)] after transaction
14:28:33.986 DEBUG o.s.k.t.KafkaTransactionManager - Initiating transaction commit
You can see that our logs start with the KafkaTransactionManager creating a new Kafka Transaction:
KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
Right after the Kafka Transaction is created, the JpaTransactionManager creates a Database Transaction as well:
JpaTransactionManager - Creating new transaction with name [demo.kafka.demokafkatransaction.DemoListener.listen1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'transactionManager'
Only after the Database Transaction is created do we see our actual implementation run:
DemoListener - Received from topic1: test
DemoListener - Sending to topic2: TEST
DemoListener - Writing to database: test
As we start writing into the Database, we can see that the JpaTransactionManager discovers that a Database Transaction is already running and takes part in this transaction:
JpaTransactionManager - Participating in existing transaction
For each operation in the database we would see a line like the one above. And this is important because we want all of the operations to run within the same transaction.
As we reach the end of our method, the JpaTransactionManager initiates the commit of its transaction:
JpaTransactionManager - Initiating transaction commit
And right after that, the KafkaTransactionManager also commits its own transaction:
KafkaTransactionManager - Initiating transaction commit
That's it. We can see through the logs that our Kafka Transaction wraps the Database Transaction: it starts before the Database Transaction starts and is committed after the Database Transaction is committed.
@Transactional("kafkaTransactionManager")
Now let's see what happens if we instead give our @Transactional annotation the kafkaTransactionManager bean as its value.
Once again, let’s analyze the generated logs after we run our application:
14:30:29.741 DEBUG o.s.k.t.KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
14:30:29.742 DEBUG o.s.k.t.KafkaTransactionManager - Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@f8fd0f4]]
14:30:29.751 DEBUG o.s.k.t.KafkaTransactionManager - Participating in existing transaction
14:30:29.751 INFO d.k.d.DemoListener - Received from topic1: test
14:30:29.751 INFO d.k.d.DemoListener - Sending to topic2: TEST
14:30:29.754 INFO d.k.d.DemoListener - Writing to database: test
14:30:29.757 DEBUG o.s.o.j.JpaTransactionManager - Creating new transaction with name [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
14:30:29.757 DEBUG o.s.o.j.JpaTransactionManager - Opened new EntityManager [SessionImpl(530662483<open>)] for JPA transaction
14:30:29.759 DEBUG o.s.o.j.JpaTransactionManager - Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@74c4c7d0]
14:30:29.767 DEBUG o.s.o.j.JpaTransactionManager - Initiating transaction commit
14:30:29.767 DEBUG o.s.o.j.JpaTransactionManager - Committing JPA transaction on EntityManager [SessionImpl(530662483<open>)]
14:30:29.780 DEBUG o.s.o.j.JpaTransactionManager - Closing JPA EntityManager [SessionImpl(530662483<open>)] after transaction
14:30:29.895 DEBUG o.s.k.t.KafkaTransactionManager - Initiating transaction commit
Our logs start once again with KafkaTransactionManager creating a new transaction:
KafkaTransactionManager - Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
However, we can already see a difference right after the Kafka Transaction is created. This time, the KafkaTransactionManager says that it’s participating in an existing transaction:
KafkaTransactionManager - Participating in existing transaction
This is because our Kafka Transaction was first created by the KafkaMessageListenerContainer. After its creation, the @Transactional annotation tries to create another transaction, but fortunately, it identifies that a transaction is already active and decides to participate in the active one.
Then, we can also see that no Database Transaction is created. Instead, our implementation starts executing right after the Kafka Transaction is created:
DemoListener - Received from topic1: test
DemoListener - Sending to topic2: TEST
DemoListener - Writing to database: test
A Database Transaction is only created once our implementation tries to write to the database:
JpaTransactionManager - Creating new transaction with name [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
The following log lines are very important: we can see that it is not our implementation that triggers the creation of a Database Transaction, but the save() method from the JpaRepository. And it creates a brand-new transaction instead of participating in an existing one, which means it will be committed as soon as the save() method completes:
JpaTransactionManager - Initiating transaction commit
If we were doing multiple operations in the database, we would see a new Database Transaction being created and committed for each one of them.
And finally, after all Database Transactions have been committed, we see that the KafkaTransactionManager commits its transaction:
KafkaTransactionManager - Initiating transaction commit
Even though the Kafka Transaction would still have failed if any of the Database Transactions had failed, the previously committed Database Transactions would not have been rolled back. This is not the behavior we are looking for.
Conclusion
In conclusion, handling distributed transactions in a microservice architecture backed by Apache Kafka and Spring Boot doesn't have to be a daunting task. Though the deprecation of ChainedKafkaTransactionManager may have initially created confusion, the solution provided by Spring Kafka offers a streamlined approach that gets us very close to atomicity across both Kafka and database transactions.
By configuring a few properties and using the @Transactional annotation wisely, we can ensure that the Kafka transaction is only committed if the database transaction is also committed.
In my next story, I want to explore the Transactional Outbox Pattern, a pattern designed to ensure even stronger guarantees for transactional consistency in a distributed environment. Stay tuned!
GitHub
https://github.com/raphaeldelio/demo-kafka-transaction
Source
- https://github.com/spring-projects/spring-kafka/issues/1699
- Examples of Kafka Transactions with Other Transaction Managers
- https://github.com/spring-projects/spring-kafka/blob/ad5e754bf1c0b9c2566fe1ab900d33467ae07dd9/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc
- https://docs.spring.io/spring-framework/reference/data-access/transaction/declarative/annotations.html