Codersee

Apache Kafka With Spring Boot and Kotlin

1. Introduction

In this step-by-step guide, I will show you how to use Apache Kafka with Spring Boot and Kotlin.

When you finish this article, you will know precisely:

  • how to Produce and Consume String messages,
  • how to Publish and Consume JSON messages,
  • how to fix an infinite-loop problem with invalid JSON messages

Please keep in mind that you should have Apache Kafka up and running on your local machine. If you would like to learn how to set it up, check out my other articles under the Kafka tag.

2. Create New Project

Let’s start by creating a new Spring Boot project. As always, I highly encourage you to use the Spring Initializr page when creating a new one:

The screenshot presents Spring Initializr page with selected Kafka and Spring Web dependencies and other settings.

To be on the same page, I’ve selected Spring Boot version 2.7.4 with Jar packaging and Java 17. Additionally, we will need two more dependencies:

  • Spring for Apache Kafka: necessary to work with our Kafka node,
  • Spring Web: although this one is not necessary, I’ve added it so that we can expose a test REST API endpoint.

3. Publish/Consume String Messages

As the first example, let’s take a look at how to publish and consume simple String messages.
 


3.1. application.yaml Configuration

After we import our Kafka Spring Boot Kotlin project into our favorite IDE (like IntelliJ, for instance), let’s navigate to the application.yaml file and put the following:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

As we can see, these settings are responsible for setting both Kafka consumer and producer:

  • With bootstrap-servers, we point to the host and port where our Kafka listener can be reached. Of course, depending on your node configuration, this value may be different.
  • The auto-offset-reset property lets us specify the consumer behavior when there is no initial offset in Kafka or when the current offset no longer exists.
  • With key-deserializer and value-deserializer, we specify the fully qualified class name of the deserializer we want to use. As the name suggests, we want to use the StringDeserializer for both key and value deserialization.
  • Finally, the key-serializer and value-serializer do the same for the serialization process.
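For orientation, the Spring Boot keys above end up as plain Kafka client properties. The sketch below spells out that mapping as ordinary Kotlin maps, using only the values from the YAML. The function names are made up for illustration and are not part of Spring or Kafka.

```kotlin
// Hypothetical helpers: they only illustrate which Kafka client property
// each Spring Boot key from application.yaml corresponds to.
fun consumerClientProperties(): Map<String, String> = mapOf(
    "bootstrap.servers" to "localhost:8098",   // spring.kafka.consumer.bootstrap-servers
    "auto.offset.reset" to "earliest",         // spring.kafka.consumer.auto-offset-reset
    "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer"
)

fun producerClientProperties(): Map<String, String> = mapOf(
    "bootstrap.servers" to "localhost:8098",   // spring.kafka.producer.bootstrap-servers
    "key.serializer" to "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" to "org.apache.kafka.common.serialization.StringSerializer"
)

fun main() {
    consumerClientProperties().forEach { (key, value) -> println("consumer: $key=$value") }
    producerClientProperties().forEach { (key, value) -> println("producer: $key=$value") }
}
```

With earliest, a consumer group that has no committed offset starts reading from the oldest available message. The other common value, latest, starts from newly arriving messages only.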

3.2. Create Constants.kt File

As the next step, let’s create a Constants.kt file inside the config package:

const val EXAMPLE_TOPIC_NAME = "someTopicOne"

const val GROUP_ID = "groupId"

As we can see, we put two String constants here with the example topic name and group ID.

Although this is not necessary, it will help us keep our code a bit cleaner.

3.3. Implement String Messages Producer

Next, let’s implement an ExampleStringProducer class inside the producer package:

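import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class ExampleStringProducer(
  private val kafkaTemplate: KafkaTemplate<String, String>
) {

  // Publishes the message to EXAMPLE_TOPIC_NAME without an explicit key
  fun sendStringMessage(message: String) {
    kafkaTemplate.send(EXAMPLE_TOPIC_NAME, message)
  }

}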

As can be seen, in order to send messages we have to inject a KafkaTemplate instance. This generic class exposes methods for executing high-level operations, and as type parameters (<String, String> in our case) we have to specify the desired key and value types.

When it comes to the sendStringMessage function, it simply takes a String argument called message and sends it to the topic, called someTopicOne in our case.
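If you want to confirm that the broker accepted a message, one option is to wait for the result of send. The following is a sketch rather than part of the original project: the class name, the key parameter, and the 10-second timeout are assumptions. It relies only on send returning a java.util.concurrent.Future, so it should not depend on which concrete future type your Spring Kafka version uses.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

// Hypothetical variant of ExampleStringProducer that sends a keyed message
// and blocks until the broker acknowledges it.
@Component
class ExampleKeyedStringProducer(
  private val kafkaTemplate: KafkaTemplate<String, String>
) {

  private val logger = LoggerFactory.getLogger(this.javaClass)

  fun sendAndWait(key: String, message: String) {
    // send(topic, key, value) returns a Future; get() waits for the result
    // and throws if the send fails or the timeout elapses.
    val result = kafkaTemplate
      .send("someTopicOne", key, message)
      .get(10, TimeUnit.SECONDS)

    val metadata = result.recordMetadata
    logger.info("Sent to partition [${metadata.partition()}] at offset [${metadata.offset()}]")
  }
}
```

Blocking like this trades throughput for certainty, so it fits tests and low-volume paths better than hot ones.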

3.4. Create Messages Consumer

With that being done, let’s implement a consumer called ExampleConsumer:

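import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class ExampleConsumer {

  private val logger = LoggerFactory.getLogger(this.javaClass)

  // Invoked for every message published to EXAMPLE_TOPIC_NAME
  @KafkaListener(topics = [EXAMPLE_TOPIC_NAME], groupId = GROUP_ID)
  fun firstListener(message: String) {
    logger.info("Message received: [$message]")
  }

}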

This time, in order to make our firstListener function a target of a Kafka message listener on the specified topics, we have to mark it with the @KafkaListener annotation. Moreover, if we would like to handle multiple topics with this function, we could simply specify their names in the topics array.

It’s worth mentioning that with the groupId, we override the group ID property for the consumer factory, but for this particular listener only.

To summarize this code snippet: the incoming message will simply be logged to the output.
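A listener does not have to receive only the payload. As a sketch, assuming you also care about the key, partition, and offset, the method can take the whole ConsumerRecord instead. The class name and the group ID below are made up for this example.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Hypothetical listener: class name and group ID are assumptions for this sketch.
@Component
class ExampleRecordConsumer {

  private val logger = LoggerFactory.getLogger(this.javaClass)

  // A separate group ID means this listener gets its own copy of each message
  // instead of sharing partitions with firstListener.
  @KafkaListener(topics = ["someTopicOne"], groupId = "recordGroupId")
  fun recordListener(record: ConsumerRecord<String, String>) {
    logger.info(
      "Key [${record.key()}], partition [${record.partition()}], " +
        "offset [${record.offset()}], value [${record.value()}]"
    )
  }
}
```

Messages sent without a key, as in sendStringMessage, will show a null key here.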

3.5. Expose Test Controller

Finally, let’s expose a test REST endpoint, which we will use to send messages:

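import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController

@RestController
class ExampleController(
  private val exampleStringProducer: ExampleStringProducer
) {

  @PostMapping("/test")
  @ResponseStatus(HttpStatus.NO_CONTENT)
  fun sendTestMessage(
    @RequestBody requestBody: RequestBodyDto
  ) {
    exampleStringProducer.sendStringMessage(
      message = requestBody.message
    )
  }

  // Shape of the JSON request body: {"message": "..."}
  data class RequestBodyDto(val message: String)
}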

As we can see, this handler will be responding to the POST requests to the /test endpoint.

As a request body, we will send the message, which will then be passed to the sendStringMessage function. If everything works fine, we should get a 204 No Content response.

3.6. Test The Endpoint

In order to test our functionality, let’s run a couple of POST requests:

curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'

As a result, we should see our messages printed to the output, like these:

Message received: [Some Example Message]
Message received: [Another one]

4. Kafka Spring Boot Kotlin JSON Messages

With all of that being done, we know precisely how to send and retrieve String messages.

So as the next thing, let’s learn how to publish and consume JSON Kafka messages in our Spring Boot Kotlin project.

4.1. Create DTOs

This time, let’s start a bit differently (you will see why later) by creating two DTOs, ExampleDto and UserDto, inside the dto package:

// ExampleDto.kt file:
data class ExampleDto(val someMessage: String)

// UserDto.kt file:
data class UserDto(val id: Long, val name: String)

We will use them later to transport our messages.

4.2. Add More Constants

As the next step, let’s add two more topic names to Constants.kt:

const val EXAMPLE_TOPIC_NAME_TWO = "someTopicTwo"
const val EXAMPLE_TOPIC_NAME_THREE = "someTopicThree"

4.3. Implement JSON Messages Producer

Next, let’s create an ExampleJsonProducer class:

@Component
class ExampleJsonProducer(
  private val exampleDtoKafkaTemplate: KafkaTemplate<String, ExampleDto>,
  private val userDtoKafkaTemplate: KafkaTemplate<String, UserDto>
) {

  fun sendExampleDtoMessage(dto: ExampleDto) {
    exampleDtoKafkaTemplate.send(EXAMPLE_TOPIC_NAME_TWO, dto)
  }

  fun sendUserDtoMessage(dto: UserDto) {
    userDtoKafkaTemplate.send(EXAMPLE_TOPIC_NAME_THREE, dto)
  }
}

This time, we inject two different KafkaTemplates, each one responsible for handling a specific DTO type. Apart from that, everything looks almost the same as in paragraph 3, just with different constants.

4.4. Create JSON Messages Consumer

Next, let’s add two more listeners to the ExampleConsumer class:

@KafkaListener(topics = [EXAMPLE_TOPIC_NAME_TWO], groupId = GROUP_ID)
fun secondListener(message: ExampleDto) {
  logger.info("Message received: [$message]")
}

@KafkaListener(topics = [EXAMPLE_TOPIC_NAME_THREE], groupId = GROUP_ID)
fun thirdListener(message: UserDto) {
  logger.info("Message received: [$message]")
}

As we can clearly see, nothing changed except the topic names and the parameter types.

4.5. Edit ExampleController

One of the last things we have to do is make two more updates inside the ExampleController.

As the first one, we have to inject our new producer:

@RestController
class ExampleController(
  private val exampleStringProducer: ExampleStringProducer,
  private val exampleJsonProducer: ExampleJsonProducer
)

And add two more invocations inside the sendTestMessage function:

exampleJsonProducer.sendExampleDtoMessage(
  dto = ExampleDto(requestBody.message)
)
exampleJsonProducer.sendUserDtoMessage(
  dto = UserDto(
    id = Random.nextLong(0, 100),
    name = requestBody.message
  )
)

As we can see, when we query our endpoint with some payload, we expect that the provided message will be sent to our new topics. Moreover, in the case of UserDto, the id will be a randomly generated Long value.
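As a side note on the id above: Random.nextLong(0, 100) treats the upper bound as exclusive, so the generated values fall between 0 and 99. A tiny standalone check, with a made-up function name:

```kotlin
import kotlin.random.Random

// Hypothetical helper mirroring the id generation used in the controller.
fun randomUserId(): Long = Random.nextLong(0, 100)

fun main() {
    val ids = List(1_000) { randomUserId() }
    // The upper bound is exclusive, so 100 itself is never produced.
    println(ids.all { it in 0L..99L }) // prints "true"
}
```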

4.6. application.yaml File

As the last thing before testing, let’s edit the application.yaml file a bit:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

The only values we want to change here are value-deserializer for our consumer and value-serializer for our producer. In order to work with JSON payloads, we have to utilize the JsonDeserializer and JsonSerializer here.

4.7. Fix an infinite-loop problem

The title is a bit of a spoiler here (and the main reason we didn’t start paragraph 4 with a YAML file), but let’s run the application and query our endpoint:

curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'

After that, when we check the logs, we will see an infinite loop with a thrown exception.

If we stop the application (or you are a robot :D), we will see the following message (among others):

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

Well, this message is pretty descriptive, so when we apply this hint, the application.yaml file looks like this:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

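Basically, as a fix, we have to utilize the ErrorHandlingDeserializer class. When deserialization fails, the exception is thrown inside the consumer’s poll() method, before Spring ever sees the record, so the same record fails again on every poll. The ErrorHandlingDeserializer wraps the real deserializer, catches the exception, and passes the failure on to Spring’s error handling instead.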

To do so with a YAML config, we have to:

  • set org.springframework.kafka.support.serializer.ErrorHandlingDeserializer as a value-deserializer,
  • and add the properties.spring.deserializer.value.delegate.class property pointing to JsonDeserializer.
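To see why a delegating deserializer helps, here is a deliberately simplified model in plain Kotlin. It is not Spring’s implementation: every type and function name is invented for illustration, and the "JSON" check is only a stand-in. The point is that the wrapper catches the failure and returns it as a value, so the consumer can move past the bad record instead of failing on it again and again.

```kotlin
// Simplified, hypothetical model; none of these types exist in Spring Kafka.
sealed interface Outcome<out T>
data class Parsed<T>(val value: T) : Outcome<T>
data class Failed(val raw: String, val cause: Exception) : Outcome<Nothing>

// Stand-in for a JSON deserializer that throws on malformed input.
fun strictDeserialize(raw: String): String {
    require(raw.startsWith("{") && raw.endsWith("}")) { "Not a JSON object: $raw" }
    return raw
}

// Stand-in for an error-handling wrapper: it delegates, but never throws.
fun <T> errorHandling(delegate: (String) -> T): (String) -> Outcome<T> = { raw ->
    try {
        Parsed(delegate(raw))
    } catch (e: Exception) {
        Failed(raw, e)
    }
}

// Processes records in order and returns the offset it reached.
fun <T> consume(records: List<String>, deserialize: (String) -> Outcome<T>): Int {
    var offset = 0
    while (offset < records.size) {
        when (val outcome = deserialize(records[offset])) {
            is Parsed -> println("Message received: [${outcome.value}]")
            is Failed -> println("Skipping bad record: ${outcome.cause.message}")
        }
        offset++ // the offset advances whether or not parsing succeeded
    }
    return offset
}

fun main() {
    val records = listOf("""{"someMessage":"ok"}""", "not-json", """{"someMessage":"again"}""")
    println(consume(records, errorHandling(::strictDeserialize)))
}
```

In this model the malformed record is logged and skipped, and the loop still reaches the end of the list.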

4.8. Fix Trusted Packages Issue

And again, spoiler alert 😀

Let’s rerun the POST request and see what happens:

curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'

As we can clearly see, we got rid of the infinite loop, but we still can’t read messages.

And this time, the message is pretty descriptive as well:

The class 'com.codersee.kafkaexample.dto.ExampleDto' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

Similarly, to fix this issue we have to add a new property, properties.spring.json.trusted.packages, and either point it to the package where our DTOs live or set a wildcard.

In my case, the final application.yaml file looks as follows:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:8098
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: "com.codersee.kafkaexample.dto"
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:8098
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
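Everything nested under properties is handed to the Kafka client as flat, dot-separated keys. The following standalone sketch shows the two keys that the nested YAML boils down to. The flatten function is a made-up helper for illustration, not a Spring API.

```kotlin
// Hypothetical helper: flattens nested maps into dot-separated keys,
// mimicking how the nested YAML under `properties` reads as flat property names.
fun flatten(node: Map<String, Any>, prefix: String = ""): Map<String, String> {
    val result = linkedMapOf<String, String>()
    for ((key, value) in node) {
        val path = if (prefix.isEmpty()) key else "$prefix.$key"
        if (value is Map<*, *>) {
            // Nested section: recurse with the extended prefix
            result.putAll(flatten(value as Map<String, Any>, path))
        } else {
            result[path] = value.toString()
        }
    }
    return result
}

fun main() {
    val properties: Map<String, Any> = mapOf(
        "spring" to mapOf(
            "json" to mapOf("trusted" to mapOf("packages" to "com.codersee.kafkaexample.dto")),
            "deserializer" to mapOf(
                "value" to mapOf(
                    "delegate" to mapOf(
                        "class" to "org.springframework.kafka.support.serializer.JsonDeserializer"
                    )
                )
            )
        )
    )
    flatten(properties).forEach { (key, value) -> println("$key=$value") }
    // spring.json.trusted.packages=com.codersee.kafkaexample.dto
    // spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
}
```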

4.9. Final Testing

Finally, we can rerun our beloved Spring Boot Kafka with Kotlin application and try the request once again:

curl --location --request POST 'localhost:8080/test' \
--header 'Content-Type: application/json' \
--data-raw '{
    "message": "SomeMessage"
}'

As a result, we should see the following:

Message received: [SomeMessage]
Message received: [ExampleDto(someMessage=SomeMessage)]
Message received: [UserDto(id=26, name=SomeMessage)]

As we can see, all 3 Apache Kafka producers and consumers are working as expected (and if not, then let me know in the comment section).

5. Apache Kafka With Spring Boot and Kotlin Summary

And that would be all for this article about how to work with Apache Kafka With Spring Boot and Kotlin. If you’d like to see the whole project source code, then visit this GitHub repository.

I really hope that you enjoyed this step-by-step guide and will be delighted if you would like to share your feedback with me and the others in the comments section below.

Take care and have a great day! 😀

Author

Hi there! 👋

My name is Piotr and I've created Codersee to share my knowledge about Kotlin, Spring Framework, and other related topics through practical, step-by-step guides. Always eager to chat and exchange knowledge.
