
Spring Kafka Transaction Sample

The following Spring Boot application is an example of chaining database and Kafka transactions. This blog post is based on this GitLab repository.

Presentation

Spring Application behaviour

The application 1️⃣ consumes a message from Apache Kafka: the incoming message. Then the application has to do two things atomically:

  • 2️⃣ update the database
  • 3️⃣ produce an outgoing message

Finally, 4️⃣ the outgoing message can be consumed by another application.
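
A minimal sketch of such a listener is shown below. This is an illustration only: the bean, topic, and method names are assumptions, not the ones used in the repository, and the transaction wiring is detailed in the next sections.

    import org.springframework.jdbc.core.JdbcTemplate
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.kafka.core.KafkaTemplate
    import org.springframework.stereotype.Component
    import org.springframework.transaction.annotation.Transactional

    // Hypothetical consume-process-produce listener; names are illustrative
    @Component
    class SampleListener(
      private val jdbcTemplate: JdbcTemplate,
      private val kafkaTemplate: KafkaTemplate<String, String>
    ) {

      @KafkaListener(id = "sample", topics = ["incoming.topic"])
      @Transactional
      fun listen(incoming: String) {
        // 2️⃣ update the database
        jdbcTemplate.update("insert into mytable (data) values (?)", incoming)
        // 3️⃣ produce the outgoing message
        kafkaTemplate.send("outgoing.topic", incoming)
      }
    }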

What can go wrong?

During this process, what can possibly go wrong?

  • the update of the database goes wrong 2️⃣
  • the production of the outgoing message goes wrong 3️⃣
  • the consumer encounters an issue and throws an exception
  • the application encounters an issue during the process (e.g. it can crash or be killed)

In order to solve those potential issues, this project leverages the ability to chain Kafka and database transactions.

Database Transaction First

The DBTransactionCommittedFirst class shows an example of a DB transaction committed first.

The listener container starts the Kafka transaction and the @Transactional annotation starts the DB transaction. The DB transaction is committed first; if the Kafka transaction fails to commit, the record will be redelivered, so the DB update should be idempotent (see "Examples of Kafka Transactions with Other Transaction Managers" in the Spring Kafka reference documentation).

The KafkaTemplate will synchronize its transaction with the DB transaction, and its commit/rollback occurs after the database one.
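
The wiring behind this could look roughly like the following sketch: a DataSourceTransactionManager bean named dstm (the qualifier visible in the logs below) for the @Transactional listener, and a KafkaTransactionManager for the listener container. This is an assumption about the setup; with Spring Boot, setting spring.kafka.producer.transaction-id-prefix is typically enough for the auto-configured container factory to pick up the Kafka transaction manager.

    import javax.sql.DataSource
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.jdbc.datasource.DataSourceTransactionManager
    import org.springframework.kafka.core.ProducerFactory
    import org.springframework.kafka.transaction.KafkaTransactionManager

    @Configuration
    class TransactionManagersConfig {

      // DB transaction manager, referenced by the listener as @Transactional("dstm")
      @Bean
      fun dstm(dataSource: DataSource) = DataSourceTransactionManager(dataSource)

      // Kafka transaction manager used by the listener container: the container opens
      // the Kafka transaction before the @Transactional DB transaction is started
      @Bean
      fun kafkaTransactionManager(producerFactory: ProducerFactory<String, String>) =
        KafkaTransactionManager(producerFactory)
    }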

Database transaction committed first

As we can see, the incoming message can be re-consumed if the production of the outgoing message fails. However, thanks to max.block.ms, we can quickly detect that the Kafka producer is unable to produce the message (a producer configuration sketch follows the list below). In that case:

  • the whole transaction is rolled back
  • the incoming message is saved into a DLT topic
  • the offset of the incoming message is committed
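
A transactional producer factory with a short max.block.ms could be configured as sketched below. The timeout value and the bootstrap address are illustrative assumptions; the actual values used by the project may differ (and can also be set in application.yml).

    import org.apache.kafka.clients.producer.ProducerConfig
    import org.apache.kafka.common.serialization.StringSerializer
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.kafka.core.DefaultKafkaProducerFactory
    import org.springframework.kafka.core.ProducerFactory

    @Configuration
    class ProducerFactoryConfig {

      @Bean
      fun producerFactory(): ProducerFactory<String, String> {
        val props = mapOf<String, Any>(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
          // Fail fast: send() blocks at most 2 seconds when the broker is unreachable
          ProducerConfig.MAX_BLOCK_MS_CONFIG to 2000
        )
        return DefaultKafkaProducerFactory<String, String>(props).apply {
          // Enables Kafka transactions; produces transactional ids such as tx-0
          setTransactionIdPrefix("tx-")
        }
      }
    }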

A test suite covering each potential failure case has been implemented in the test directory src/test/kotlin/spring/transaction/sample/db/transaction/committed/first

o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@436ffa8c]]
o.s.j.d.DataSourceTransactionManager     : Creating new transaction with name [spring.transaction.sample.db.transaction.committed.first.DBTransactionCommittedFirst.listen1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'dstm'
o.s.j.d.DataSourceTransactionManager     : Acquired Connection [HikariProxyConnection@696812105 wrapping org.postgresql.jdbc.PgConnection@5256fbc] for JDBC transaction
o.s.j.d.DataSourceTransactionManager     : Switching JDBC Connection [HikariProxyConnection@696812105 wrapping org.postgresql.jdbc.PgConnection@5256fbc] to manual commit
o.s.t.i.TransactionInterceptor           : Getting transaction for [spring.transaction.sample.db.transaction.committed.first.DBTransactionCommittedFirst.listen1]
Consuming message happy_path - topic db.transaction.happy.incoming - offset 0
Inserting into DB
o.s.jdbc.core.JdbcTemplate               : Executing SQL statement [insert into mytable (data) values ('happy_path')]
Sending to KAFKA
org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-0, transactionalId=tx-0] Resetting the last seen epoch of partition db.transaction.happy.outgoing-0 to 0 since the associated topicId changed from null to GQVLTjHdQBGGMjpMKWnARw
o.s.t.i.TransactionInterceptor           : Completing transaction for [spring.transaction.sample.db.transaction.committed.first.DBTransactionCommittedFirst.listen1]
o.s.j.d.DataSourceTransactionManager     : Initiating transaction commit
o.s.j.d.DataSourceTransactionManager     : Committing JDBC transaction on Connection [HikariProxyConnection@696812105 wrapping org.postgresql.jdbc.PgConnection@5256fbc]
o.s.j.d.DataSourceTransactionManager     : Releasing JDBC Connection [HikariProxyConnection@696812105 wrapping org.postgresql.jdbc.PgConnection@5256fbc] after transaction
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Discovered group coordinator localhost:52474 (id: 0 rack: null)
o.s.jdbc.core.JdbcTemplate               : Executing SQL query [SELECT data from mytable]
o.s.jdbc.datasource.DataSourceUtils      : Fetching JDBC Connection from DataSource
o.s.k.t.KafkaTransactionManager          : Initiating transaction commit

Kafka Transaction First

The NestedKafkaTransactionCommittedFirst class shows an example of a Kafka transaction committed first.

Kafka transaction committed first

Here we commit the Kafka transaction first, and only commit the DB transaction if the Kafka transaction is successful.
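
One way to obtain that ordering is to keep the DB transaction as the outer transaction and run the send in a nested, local Kafka transaction that commits before the listener method returns, for example with KafkaTemplate.executeInTransaction. The following is a hedged sketch (inside a listener bean like the one sketched in the Presentation section); the repository's NestedKafkaTransactionCommittedFirst may be structured differently.

    // Sketch: outer DB transaction, nested Kafka transaction committed first
    @KafkaListener(id = "kafkaFirst", topics = ["kafka.transaction.happy.test.incoming"])
    @Transactional("dstm")
    fun listen1(incoming: String) {
      jdbcTemplate.update("insert into mytable (data) values (?)", incoming)
      // The nested Kafka transaction commits when the lambda returns; if it fails,
      // the exception propagates and the outer DB transaction is rolled back
      kafkaTemplate.executeInTransaction { operations ->
        operations.send("kafka.transaction.happy.test.outgoing", incoming)
      }
    }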

A test suite covering each potential failure case has been implemented in the test directory src/test/kotlin/spring/transaction/sample/kafka/transaction/committed/first

o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Invoking InitProducerId for the first time in order to acquire a producer ID
org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-0, transactionalId=tx-0] Cluster ID: ki0qvTQ6TmmPKoQiqGc_8g
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Discovered transaction coordinator localhost:52437 (id: 0 rack: null)
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-tx-0, transactionalId=tx-0] ProducerId set to 0 with epoch 3
org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-0, transactionalId=tx-0] Resetting the last seen epoch of partition kafka.transaction.happy.test.incoming-0 to 0 since the associated topicId changed from null to iyBcsb3sRnCzW0Dz3l8lWg
o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@70330f49]]
o.s.j.d.DataSourceTransactionManager     : Creating new transaction with name [spring.transaction.sample.kafka.transaction.committed.first.NestedKafkaTransactionCommittedFirst.listen1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'dstm'
o.s.j.d.DataSourceTransactionManager     : Acquired Connection [HikariProxyConnection@737186320 wrapping org.postgresql.jdbc.PgConnection@5911dd25] for JDBC transaction
o.s.j.d.DataSourceTransactionManager     : Switching JDBC Connection [HikariProxyConnection@737186320 wrapping org.postgresql.jdbc.PgConnection@5911dd25] to manual commit
o.s.t.i.TransactionInterceptor           : Getting transaction for [spring.transaction.sample.kafka.transaction.committed.first.NestedKafkaTransactionCommittedFirst.listen1]
Consuming message happy_path - topic kafka.transaction.happy.test.incoming - offset 0
Inserting into DB
o.s.jdbc.core.JdbcTemplate               : Executing SQL statement [insert into mytable (data) values ('happy_path')]
Sending to KAFKA
org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-0, transactionalId=tx-0] Resetting the last seen epoch of partition kafka.transaction.happy.test.outgoing-0 to 0 since the associated topicId changed from null to Qzavf0KTSVKpsM7cOdKVLQ
o.s.t.i.TransactionInterceptor           : Completing transaction for [spring.transaction.sample.kafka.transaction.committed.first.NestedKafkaTransactionCommittedFirst.listen1]
o.s.j.d.DataSourceTransactionManager     : Initiating transaction commit
o.s.j.d.DataSourceTransactionManager     : Committing JDBC transaction on Connection [HikariProxyConnection@737186320 wrapping org.postgresql.jdbc.PgConnection@5911dd25]
o.s.j.d.DataSourceTransactionManager     : Releasing JDBC Connection [HikariProxyConnection@737186320 wrapping org.postgresql.jdbc.PgConnection@5911dd25] after transaction

In this case, a crash of the application at the wrong moment may cause the database update to be lost: the outgoing message has already been committed to Kafka while the DB transaction has not been committed yet.

Appendices

Kafka Transaction

Kafka Transaction was designed for the consume-process-produce pattern. The purpose is to ensure that there are no duplicates in Kafka’s log (idempotence) and that messages are written atomically across multiple partitions (transactions). This allows stream processing applications to do transactional message processing.

To enable it:

  • Idempotent Producers

    • Set enable.idempotence = true on the producer. This ensures Kafka’s partitions are free of duplicate records
    • Set a unique transactional.id for the producer
  • Downstream consumers read only committed transactional records with isolation.level = read_committed, as sketched below
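
For the downstream consumer 4️⃣, the property could be set like this (a sketch; the bootstrap address and group id are assumptions):

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory

    // Sketch: consumer factory for a downstream application that must only see
    // records belonging to committed Kafka transactions
    val downstreamConsumerFactory = DefaultKafkaConsumerFactory<String, String>(
      mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ConsumerConfig.GROUP_ID_CONFIG to "downstream-app",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed"
      )
    )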

Error Handler

When transactions are being used, no error handler is configured by default, so that the exception rolls back the transaction. Error handling for transactional containers is handled by the AfterRollbackProcessor. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back.

In this project, an AfterRollbackProcessor is created to avoid retries during the tests, and a DeadLetterPublishingRecoverer is added to it. Therefore, a failing message is acknowledged and published, within a Kafka transaction, to a DLT topic.

    @Bean
    fun rollbackProcessor(kafkaTemplate: KafkaTemplate<String, String>): AfterRollbackProcessor<Any?, Any?> {
      // Publishes the failed record to the "<original topic>.DLT" dead-letter topic
      val deadLetterPublishingRecoverer = DeadLetterPublishingRecoverer(kafkaTemplate)
      // FixedBackOff(0L, 0L): no delay and no retry attempts before recovering
      return DefaultAfterRollbackProcessor<Any?, Any?>(deadLetterPublishingRecoverer, FixedBackOff(0L, 0L))
    }
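
The processor then has to be registered on the listener container factory. Assuming a manually defined factory bean (with Spring Boot, the auto-configured factory could be customized instead), that could look like:

    @Bean
    fun kafkaListenerContainerFactory(
      consumerFactory: ConsumerFactory<String, String>,
      rollbackProcessor: AfterRollbackProcessor<Any?, Any?>
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
      val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
      factory.setConsumerFactory(consumerFactory)
      // No retries: after the rollback, the failed record goes straight to the DLT
      factory.setAfterRollbackProcessor(rollbackProcessor)
      return factory
    }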

Having a failing Kafka Producer

A too-large record

Using a Kafka producer interceptor, we implemented a ToxicProducerInterceptor. It mutates the record so that it becomes larger than the request size limit set in the application.yml file.
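
Such an interceptor could look roughly like the following sketch; the actual ToxicProducerInterceptor in the repository may differ, and the 2 MB padding size is an assumption.

    import org.apache.kafka.clients.producer.ProducerInterceptor
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.clients.producer.RecordMetadata

    // Sketch of a "toxic" interceptor: inflates the record value beyond the configured
    // request size limit so that the send fails with a RecordTooLargeException
    class ToxicProducerInterceptor : ProducerInterceptor<String, String> {

      override fun onSend(record: ProducerRecord<String, String>): ProducerRecord<String, String> =
        ProducerRecord(
          record.topic(),
          record.key(),
          // Assumption: 2 MB of padding, larger than the limit set in application.yml
          record.value() + "x".repeat(2 * 1024 * 1024)
        )

      override fun onAcknowledgement(metadata: RecordMetadata?, exception: Exception?) = Unit

      override fun close() = Unit

      override fun configure(configs: MutableMap<String, *>?) = Unit
    }

The interceptor would then be registered on the producer through the interceptor.classes property.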

Using Toxiproxy

To disconnect the application from the Kafka broker, we implemented a ToxiStrimziKafkaContainer. A Toxiproxy container is used to cut the bandwidth of the Kafka node.
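
The underlying idea, using the Testcontainers Toxiproxy module, could be sketched as follows. The image version, ports, and host names are assumptions; the real ToxiStrimziKafkaContainer wraps a Strimzi Kafka container.

    import eu.rekawek.toxiproxy.ToxiproxyClient
    import eu.rekawek.toxiproxy.model.ToxicDirection
    import org.testcontainers.containers.Network
    import org.testcontainers.containers.ToxiproxyContainer

    val network: Network = Network.newNetwork()

    val toxiproxy = ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
      .withNetwork(network)

    fun cutKafkaBandwidth() {
      toxiproxy.start()
      val client = ToxiproxyClient(toxiproxy.host, toxiproxy.controlPort)
      // The proxy listens inside the toxiproxy container and forwards to the Kafka node
      val proxy = client.createProxy("kafka", "0.0.0.0:8666", "kafka:9092")
      // Bandwidth toxic with a rate of 0 KB/s: the application can no longer reach the broker
      proxy.toxics().bandwidth("cut-bandwidth", ToxicDirection.DOWNSTREAM, 0L)
    }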

Avoid Poison Pill

We use the ErrorHandlingDeserializer with delegate key and value deserializers:

  kafka:
    consumer:
      # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

      properties:
        # Delegate deserializers
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

Now, when either the key or value delegate fails to deserialize a poison pill, the ErrorHandlingDeserializer returns a null value and adds a DeserializationException in a header containing the cause and the raw bytes.

If the ConsumerRecord contains a DeserializationException header for either the key or the value, the container’s ErrorHandler is called with the failed ConsumerRecord, and the record is not passed to the listener (the class or method annotated with @KafkaListener).

By default, the container’s error handler is the SeekToCurrentErrorHandler. By configuring the LoggingErrorHandler, we can log the content of the poison pill.