How to work with Kafka consumers in Spring projects

This article is a collection of small recommendations, based on personal experience, on how to work with Kafka consumers in Spring.

For those just starting out with Kafka, I recommend first getting familiar with other introductory material and then returning here – Getting started

What Spring offers out of the box

In Spring, there are two ways to integrate a Kafka consumer into your code:

  1. Via the @KafkaListener annotation

  2. By implementing the MessageListener interface or one of its subinterfaces; this approach is not covered in this article

Typical processing code for a fictional topic looks like this:

@Service
class UserLikeActionKafkaListener {

    @KafkaListener(
        topics = ["user.action.like"]
    )
    fun onUserLikeAction(action: UserActionDto) {
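        // message handling logic goes here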

    }
}

The Spring Boot starter proposes the following scheme for working with Kafka:

  1. A global config is implemented that applies to all consumers/containers

  2. The KafkaListenerContainerFactory/ConsumerFactory factories are shared by all handlers

  3. To override the settings from point 1 for a specific handler, the @KafkaListener annotation parameters are used

What is wrong with this scheme

In production code, I encountered the following situations:

  1. One application works with several Kafka clusters

  2. Topics have heterogeneous message formats – XML/JSON/Avro – or, within the same format, the JSON key naming style differs (CamelCase vs snake_case)

  3. Topics sometimes receive invalid messages that cannot be processed (deserialization errors)

  4. Topic processing sometimes needs to be disabled temporarily

  5. The offset for a topic needs to be changed in the consumer group

  6. Settings need to be overridden for a single topic handler only

  7. Minimal monitoring and troubleshooting are required

Based on these situations, I will formalize the requirements for Kafka consumers:

  1. The application supports working with several clusters

  2. Topic handlers are autonomous and isolated. Each topic has its own deserialization rules

  3. A topic handler can be disabled and its offset changed without stopping the application completely

  4. Minimal monitoring of message processing and localization of problems

Analysis of requirements

Support for multiple clusters in the application

Out of the box, the starter cannot work with several clusters at once.

You could try to reuse the KafkaProperties settings class to describe the other clusters, but I don’t like this option for two reasons:

  1. Some of the Kafka parameters in this class are pulled out into separate fields. Why this was done is unclear, since everything ends up collected into a plain map anyway

  2. Sometimes one application processes several topics from the same cluster, and with more than one cluster the settings would be duplicated

Kafka cluster settings boil down to Map<String, Object>.
To add support for multiple clusters to the application, let’s make the following configuration:

@ConfigurationProperties("kafka")
data class KafkaClusterProperties(
    val clusters: Map<String, KafkaCluster>
)

class KafkaCluster {
    lateinit var bootstrapServers: List<String>
    var properties: Map<String, String> = emptyMap()
}

An example of the configuration (cluster settings are treated as general settings that apply to both the consumer and the producer):

kafka:
  clusters:
    prod:
      bootstrap-servers: ${embedded.kafka.brokerList}
      properties:
        client.id: my-application
        #security.protocol: SASL_PLAINTEXT
        #sasl.mechanism: PLAIN
        #sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="${embedded.kafka.saslPlaintext.user}" password="${embedded.kafka.saslPlaintext.password}";
        # All settings with the ssl.* prefix
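
With this structure, supporting another cluster is just one more key under clusters. A hypothetical example (the analytics cluster and its hosts are illustrative):

kafka:
  clusters:
    prod:
      bootstrap-servers: ${embedded.kafka.brokerList}
    analytics:
      bootstrap-servers: analytics-kafka-1:9092,analytics-kafka-2:9092
      properties:
        client.id: my-application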

Autonomy and isolation of topic handlers

By default, Spring creates KafkaListenerContainerFactory/ConsumerFactory factories that are shared by all containers.
What problems does this cause:

  1. Topics use different message and key formats (starting from version 2.8, DelegatingByTopicDeserializer is available)

  2. Topics sometimes require specific interceptors that depend on the message model

  3. Changing bean settings specific to one topic can break other handlers

  4. Topics have different numbers of partitions, and a separate container is created for the parallel processing of each partition.

A single container can read from multiple partitions, but it does so sequentially, switching between partitions in turn. In Kafka, the number of partitions in a topic is the unit of concurrency, so Spring creates a separate container per partition to speed up processing (the number of containers created depends on the concurrency parameter and cannot exceed the number of partitions).

The solution to these problems is to create separate KafkaListenerContainerFactory/ConsumerFactory factories for each topic.

But before that, let’s briefly return to the @KafkaListener annotation. Above, I already mentioned the scheme the starter offers out of the box (a common config for all consumers, with the option to override it in the annotation).

The result is a mixture – some settings live in the configuration and some in the code.
In practice, I usually see something like this:

@KafkaListener(
    topics = ["user.action.like"],
    groupId = "\${user.action.like.kafka.group-id}"
)
fun onUserLikeAction(action: UserActionDto) = Unit

Spring provides no way to apply ready-made application settings to a single container, and hardcoding them is a bad idea, so they are usually replaced with placeholders.

Therefore, I recommend not specifying these parameters in the annotation and keeping them in the settings as well:

class KafkaContainerProperties {
    lateinit var cluster: String
    var properties: Map<String, String> = emptyMap()
    var autoStartup: Boolean = true
    var batchListener: Boolean = false
    var concurrency: Int = 1
}
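
For example, the settings for the handler from the beginning of the article could be bound like this (the group name and values are illustrative; the keys under properties are standard Kafka consumer settings):

user.like.action.kafka.consumer:
  cluster: prod
  concurrency: 2
  auto-startup: true
  properties:
    group.id: my-application.user-action-like
    auto.offset.reset: earliest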

Now let’s return to setting up the factories:

const val USER_LIKE_ACTION_KAFKA_LISTENER_CONTAINER_FACTORY = "userLikeActionKafkaListenerContainerFactory"

@EnableKafka
@Configuration(proxyBeanMethods = true)
class KafkaConfiguration(
    private val kafkaClusterProperties: KafkaClusterProperties
) {

    @Bean
    @ConfigurationProperties("user.like.action.kafka.consumer")
    fun userLikeActionKafkaConsumerProperties() = KafkaContainerProperties()

    @Bean
    fun userLikeActionKafkaConsumerFactory(): DefaultKafkaConsumerFactory<String, UserActionDto> {
        return DefaultKafkaConsumerFactory(
            kafkaClusterProperties.buildConsumerProperties(userLikeActionKafkaConsumerProperties()),
            StringDeserializer(),
            buildDefaultJsonDeserializer<UserActionDto>()
        )
    }


    @Bean(USER_LIKE_ACTION_KAFKA_LISTENER_CONTAINER_FACTORY)
    fun userLikeActionKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, UserActionDto> {
        return with(userLikeActionKafkaConsumerProperties()) {
            ConcurrentKafkaListenerContainerFactory<String, UserActionDto>().also {
                it.consumerFactory = userLikeActionKafkaConsumerFactory()
                it.setConcurrency(concurrency)
                it.setAutoStartup(autoStartup)
                it.setBatchListener(batchListener)
            }
        }
    }
}
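
The buildDefaultJsonDeserializer helper is not shown here; a minimal sketch of what it might look like, based on Spring Kafka’s JsonDeserializer (disabling type headers is my assumption):

import org.springframework.kafka.support.serializer.JsonDeserializer

// A possible implementation: a JSON deserializer bound to a concrete target type
// that ignores the producer's __TypeId__ headers.
inline fun <reified T : Any> buildDefaultJsonDeserializer(): JsonDeserializer<T> =
    JsonDeserializer(T::class.java).apply {
        setUseTypeHeaders(false) // rely on the declared target type only
    }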

val FILE_PROPERTIES_NAMES = hashSetOf(
    SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
    SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
)


fun KafkaClusterProperties.getCluster(name: String): KafkaCluster {
    return this.clusters[name] ?: throw IllegalArgumentException("Kafka cluster '$name' not found")
}

fun KafkaClusterProperties.buildConsumerProperties(
    containerProperties: KafkaContainerProperties
): Map<String, Any> {
    val cluster = getCluster(containerProperties.cluster)

    val commonProperties = hashMapOf(
        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG to cluster.bootstrapServers,
    )

    val result = (commonProperties + cluster.properties + containerProperties.properties).mapValues {
        if (FILE_PROPERTIES_NAMES.contains(it.key)) {
            resourceToPath(it.value as String)
        } else {
            it.value
        }
    }

    return result
}


private fun resourceToPath(path: String): String {
    try {
        return ResourceUtils.getFile(path).absolutePath
    } catch (ex: IOException) {
        throw IllegalStateException("Resource '$path' must be on a file system", ex)
    }
}

Disabling topic processing/changing topic offset in the group

This problem is not so bad: Spring can disable listeners via the autoStartup parameter in the @KafkaListener annotation.

And it works as it should, provided you add it to the code in advance. With autoStartup=false, Spring will not start the containers that process the topic (in the examples above, this parameter lives in the KafkaContainerProperties class).

Sometimes you need to change the offset in the group (for example, when topic processing is stuck, the problem is solved by skipping the message).

This can only be done when the consumer group is completely stopped, and the autoStartup parameter comes in handy here.

But it won’t work with the starter. There, the group id is set for all consumers at once, not for a specific topic.

Stopping the handler of one topic achieves nothing, because the consumers of the other topics in the group keep working, and Kafka returns an error when you try to change the offset.

Therefore, the starter’s implementation is not very successful (this parameter has to be overridden in a custom settings class or in the @KafkaListener annotation).

spring:
  kafka:
    consumer:
      group-id: my-application
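
An individual container can also be stopped and started at runtime – without restarting the application – via Spring’s KafkaListenerEndpointRegistry and the listener id. A minimal sketch (my addition, the class name is illustrative):

import org.springframework.kafka.config.KafkaListenerEndpointRegistry
import org.springframework.stereotype.Service

@Service
class KafkaContainerSwitch(
    private val registry: KafkaListenerEndpointRegistry
) {
    // stops the container whose @KafkaListener id equals listenerId
    fun stop(listenerId: String) {
        registry.getListenerContainer(listenerId)?.stop()
    }

    // starts it again after the problem is fixed
    fun start(listenerId: String) {
        registry.getListenerContainer(listenerId)?.start()
    }
}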

For consumer groups, let’s establish the following rule – “one consumer group is fixed to exactly one topic.”

If more than one topic is attached to a consumer group, another performance problem arises – the more topics in one consumer group, the longer offset commits take.
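
Following this rule, each topic handler gets its own group in the configuration (the second handler and both group names are hypothetical):

user.like.action.kafka.consumer:
  properties:
    group.id: my-application.user-action-like

user.comment.action.kafka.consumer:
  properties:
    group.id: my-application.user-action-comment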

Monitoring

Spring supports two types of metrics: container and consumer.

Here I will consider only the container metric – spring.kafka.listener.

This metric is a timer, but it is also suitable for monitoring the number of processed messages (a timer consists of two parts: the call count and the total execution time).

The metric is informative, but there is one nuance related to the name tag. According to the documentation, the value of this tag is the bean name of the created container.

If you don’t configure the container specifically, its value will be org.springframework.kafka.KafkaListenerEndpointContainer#0, #1, and so on.
This is easily fixed by adding two parameters to the @KafkaListener annotation: id and idIsGroup=false.

The value of id becomes the value of the name tag, and idIsGroup=false is required to prevent Spring from overriding the consumer group name with the id value.

The final handler will be as follows:

@KafkaListener(
    id = "onUserLikeAction",
    idIsGroup = false,
    topics = ["user.action.like"],
    containerFactory = USER_LIKE_ACTION_KAFKA_LISTENER_CONTAINER_FACTORY,
)
fun onUserLikeAction(action: UserActionDto) = Unit

The id parameter also ensures a correctly formed thread name (in Spring, each created container is a separate thread).

When taking a thread dump, this lets you see the information in a complete form.

In addition to metrics, it is also useful to do the following things:

  1. Specify the message key even when it is not strictly required (for example, for message ordering). The message key is meta-information that helps you quickly understand which business entity (business operation) a message belongs to. This is especially useful when the topic carries a binary message format or an invalid message gets written.

  2. Each message relates in one way or another to a business operation. Adding such meta-information to message headers simplifies application maintenance, since it speeds up problem localization (see the sketch after this list)

  3. Connect tracing. It is better to start the trace at the deserialization stage
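
As an illustration of points 1 and 2, a hedged sketch of publishing a message with an explicit key and a meta-information header (the userId field and the header name are my assumptions):

import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.kafka.core.KafkaTemplate

fun publishUserAction(template: KafkaTemplate<String, UserActionDto>, action: UserActionDto) {
    // the key ties the message to a business entity (hypothetical userId field)
    val record = ProducerRecord("user.action.like", action.userId, action)
    // a header with business meta-information speeds up problem localization
    record.headers().add("x-business-operation", "user-like".toByteArray())
    template.send(record)
}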

Other recommendations

  1. The auto.offset.reset parameter takes effect in only two situations: when the group subscribes to the topic for the first time and when a new partition is created in the topic. To avoid message loss when partitions are created, this parameter must be set to earliest (if the first subscription requires a different value, replace it afterwards)

  2. Spring has several offset commit strategies, one of which is manual commit. Committing after every single message will significantly hurt processing performance.

  3. Kafka does not give you exactly-once processing semantics out of the box, so you need to decide in advance what should happen when a message is reprocessed

  4. Sooner or later, message processing errors will occur; they can be handled as follows (a sketch of the last option follows the list):

    • skip the message and, after the problem is fixed, replay the event

    • completely stop processing the topic/partition until the problem is resolved

    • redirect unprocessed messages to a retryable-topic and continue processing
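
As an illustration of the last option, a minimal sketch using Spring Kafka’s DeadLetterPublishingRecoverer and DefaultErrorHandler (spring-kafka 2.8+ assumed; the retry parameters are example values):

import org.springframework.kafka.core.KafkaOperations
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

fun buildDeadLetterErrorHandler(template: KafkaOperations<Any, Any>): DefaultErrorHandler {
    // after retries are exhausted, the record is published to <topic>.DLT by default
    val recoverer = DeadLetterPublishingRecoverer(template)
    // 3 retries with a 1-second pause between attempts
    return DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3L))
}

Such a handler is attached to a specific container factory via setCommonErrorHandler, which fits the per-topic factory scheme described above.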

Full code example/text on GitHub

Instead of a conclusion

Kafka, like other brokers, is a complex tool. Adding a broker to a product without a good reason will do no good.
