About auto.offset.reset in Spring Kafka
Apache Kafka tracks a consumer's position in a topic with offsets. The auto.offset.reset parameter controls what the consumer does when no committed offset exists for its group (or the committed offset is out of range), and it accepts three values: earliest, latest, and none. If the parameter is not specified, latest is used by default.
In this article, I want to focus on the none value.
Earliest
This value is used when you need to consume messages from the very beginning of the topic. As a rule, it makes sense when you need to populate a database, migrate all the data from one place to another, and so on.
Latest
The default setting. It is used when we only care about fresh data, i.e. messages that arrive after the consumer starts. Nothing unusual here either.
None
The value this article is about. With none, no reset rule is applied for a new consumer group; instead, the consumer throws an exception whenever no committed offset exists.
The task before me was to write a microservice that transfers data from one Kafka cluster to another (an experienced reader may point out that Kafka Connect, Apache Kafka MirrorMaker, and similar tools exist, but implementing this as a microservice was a requirement I did not choose).
Among the special requirements: on the initial launch, the offset must be set to "current date – 24 hours", and consumption must start from that point (a lot of data arrives in a day).
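For reference, "current date – 24 hours" has to be expressed as epoch milliseconds, because that is the timestamp format the consumer's offsetsForTimes lookup expects. A minimal sketch using only the standard library:

```java
import java.time.Duration;
import java.time.Instant;

public class ResetTimestamp {
    // Compute "current date – 24 hours" as epoch milliseconds,
    // the format expected by Consumer#offsetsForTimes.
    public static long resetTimestampMillis() {
        return Instant.now().minus(Duration.ofHours(24)).toEpochMilli();
    }

    public static void main(String[] args) {
        // The result is always in the past, roughly 24h behind the current time
        System.out.println(resetTimestampMillis() < System.currentTimeMillis());
    }
}
```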
As always, an easier and faster solution comes to mind: add an initial-load flag and pass it when the program starts, which is what I did (imagine that we don't use any database, because the business didn't allocate money for one). But what if the application consumes data for an hour and then crashes, after the data has already been taken? Duplicates would appear in the topic we produce to. What then?
Yes, you could go to the trouble of removing the initial-load flag immediately after the first start, since by then the offset for the group would already exist. But that is a crutch of a solution and had to be discarded.
After thinking for a while, I decided to rewrite the code (in fact, I thought about rewriting it when I actually ran into the situation above: a consumer crash and duplicated data). I had always heard that auto.offset.reset accepts three values, two of which I had experience building applications with, but the third, none, remained a mystery to me.
It seemed like a no-brainer to use none. But while writing the code I got more and more frustrated. The thing is, there are no examples of using none anywhere, and I searched all of Google (or rather, the entire first page of results). The official documentation says:
You can choose none if you would rather set the initial offset yourself and are willing to handle out-of-range errors manually.
But there are no examples of correct usage, and other sources say roughly the same thing: "this is a niche parameter, suitable for special cases, and we will not cover it."
A day later I finally arrived at a solution, which I intend to share. I admit you may write code better than I do; I am far from a perfect programmer, and if you can suggest a better way to structure it (or a different solution entirely), I would be extremely grateful.
Code
First, set auto-offset-reset to none in application.yml (or application.properties).
spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: lalala-group-id
      auto-offset-reset: none
      enable-auto-commit: false
      client-id: lalala-client-id
I also disabled enable.auto.commit, since offsets will be committed manually.
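For readers not using Spring Boot's YAML binding, the same settings can be expressed as plain consumer Properties. This is a sketch using the standard Kafka consumer configuration key strings; the group and client ids are the placeholder values from above:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Mirror of the application.yml consumer section as raw Kafka config keys
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "lalala-group-id");
        props.put("auto.offset.reset", "none");   // throw if no committed offset exists
        props.put("enable.auto.commit", "false"); // offsets are committed manually
        props.put("client.id", "lalala-client-id");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```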
package org.example.service;

import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.*;

@Service
@RequiredArgsConstructor
@Log4j2
public class SettingService {

    private final ConsumerFactory<String, String> consumerFactory;

    @Value("${spring.kafka.topic.test}")
    String topic;

    @PostConstruct
    public void checkAndResetOffsetsIfNeeded() {
        Properties consumerProps = new Properties();
        // Reuse the configuration from the ConsumerFactory
        consumerProps.putAll(consumerFactory.getConfigurationProperties());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            // Create a temporary consumer and fetch the partitions of the target topic
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            Set<TopicPartition> topicPartitions = new HashSet<>();
            for (PartitionInfo partitionInfo : partitionInfos) {
                topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
            }
            // Assign the partitions to the consumer
            consumer.assign(topicPartitions);
            // Check whether the consumer group already has committed offsets for these partitions
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions);
            Instant resetTime = Instant.now().minus(Duration.ofHours(24));
            Map<TopicPartition, Long> latestOffsets = consumer.endOffsets(topicPartitions);
            for (TopicPartition topicPartition : topicPartitions) {
                // Look up the committed offset for this partition
                OffsetAndMetadata committedOffset = committedOffsets.get(topicPartition);
                Long latestOffset = latestOffsets.get(topicPartition);
                // If there is no committed offset yet, set the offset matching resetTime
                if (committedOffset == null || committedOffset.offset() == -1L) {
                    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer
                            .offsetsForTimes(Collections.singletonMap(topicPartition, resetTime.toEpochMilli()));
                    OffsetAndTimestamp offsetAndTimestamp = offsetsForTimes.get(topicPartition);
                    long newOffset = offsetAndTimestamp != null ? offsetAndTimestamp.offset() : latestOffset;
                    log.info("Resetting offset for partition {}", topicPartition.partition());
                    // Seek to the new offset and commit it explicitly, since auto-commit is disabled.
                    // The map overload commits exactly this partition, instead of every assigned
                    // partition that already has a position.
                    consumer.seek(topicPartition, newOffset);
                    consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(newOffset)));
                // If a committed offset already exists, do nothing
                } else {
                    log.info("Offset for partition {} is already committed at {}",
                            topicPartition.partition(), committedOffset.offset());
                }
            }
            log.info("closing consumer setting");
        }
    }
}
The KafkaListener itself is configured in the usual way:
@KafkaListener(
        groupId = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.topic.test}"
)
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    try {
        // some processing
        acknowledgment.acknowledge();
    } catch (ProducerException e) {
        log.error(e.getMessage());
        acknowledgment.nack(Duration.ofSeconds(3));
    }
}
The code above creates a temporary consumer before the KafkaListener starts, and that consumer sets the offsets. The method is invoked via the @PostConstruct annotation, so the temporary consumer does its work and is closed before our KafkaListener fires. There are no conflicts.
Calling the method via the CommandLineRunner interface does not work properly: the KafkaListener starts simultaneously with the temporary consumer and raises an offset-related exception (I don't remember exactly which one; experiment). I also tried disabling autostart, but that requires giving the KafkaListener an id parameter, and I didn't experiment with that; it would have meant even more code (and I don't know how easy it would be to scale the number of consumers).
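For completeness, the autostart route mentioned above would look roughly like this. This is an untested sketch, not part of the original solution; the listener id "translator" and the registry wiring are my own illustration:

```java
// Sketch: start the listener manually after the temporary consumer
// has committed the initial offsets. The id "translator" is illustrative.

@KafkaListener(
        id = "translator",
        autoStartup = "false",
        groupId = "${spring.kafka.consumer.group-id}",
        topics = "${spring.kafka.topic.test}"
)
public void listenTopic(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    // ... same processing as above
}

// Elsewhere, with an injected KafkaListenerEndpointRegistry:
// registry.getListenerContainer("translator").start();
```

The trade-off the text alludes to is real: every listener then needs a unique id and an explicit start call, which is extra code to maintain as the number of consumers grows.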
I also tried manipulating the offsets through error handling, but apparently I missed something and didn't manage to implement it (I don't rule out that such an approach is quite possible).
This is how you can work with none (as far as I know, this is the first published example of working with this value). If you already have examples of working with it, I'd be glad to take a look; leave them in the comments 🙂
All the best to everyone on Habr!