Kafka Notes (4): The auto.offset.reset Parameter


1. Parameter overview

  • Behavior for each setting, depending on whether a partition has a committed offset (the default is latest)
Value     Behavior
earliest  Resume from the committed offset; if no offset has been committed, consume from the beginning of the log
latest    Resume from the committed offset; if no offset has been committed, consume only newly produced data in the partition
none      Resume from the committed offset; if any partition has no committed offset, throw an exception
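The decision rule in the table can be sketched as a small plain-Java model. This is a simplification for illustration only, not Kafka's actual implementation; the log-start and log-end offsets are hypothetical inputs (the log start is 0 unless the log has been truncated).

```java
import java.util.OptionalLong;

public class OffsetResetDemo {

    // Given the group's committed offset for a partition (if any), return the
    // position a consumer would start from under each auto.offset.reset value.
    static long resolveStartOffset(OptionalLong committed, String policy,
                                   long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            // A committed offset always wins, regardless of the policy.
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest": return logStartOffset; // replay from the beginning
            case "latest":   return logEndOffset;   // only data produced from now on
            case "none":     throw new IllegalStateException(
                                 "Undefined offset with no reset policy");
            default:         throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // No committed offset: earliest starts at the log start, latest at the log end.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "earliest", 0, 40)); // 0
        System.out.println(resolveStartOffset(OptionalLong.empty(), "latest", 0, 40));   // 40
        // With a committed offset of 17, every policy resumes from 17.
        System.out.println(resolveStartOffset(OptionalLong.of(17), "none", 0, 40));      // 17
    }
}
```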
  • Create a new topic for testing
# Create a topic with 3 partitions and 1 replica
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 3 --topic test5
Created topic "test5".
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test5
Topic:test5    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test5    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test5    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
    Topic: test5    Partition: 2    Leader: 2    Replicas: 2    Isr: 2

2. Test code

  • Producer code
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;

/**
 * @Author haonan.bian
 * @Description Producer that sends a batch of test messages
 * @Date 2020-02-15 23:04
 **/
public class Producer {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC = "test5";
    //Number of messages to send
    private static final int BATCH = 30;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1"); //A write succeeds once the leader has persisted it

        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < BATCH; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(KAFKA_TOPIC, String.valueOf(i), "message-" + i);
                Future<RecordMetadata> future = producer.send(record);
                //Blocking on get() makes each send effectively synchronous
                RecordMetadata metadata = future.get();
                System.out.printf("partition=%s, offset=%s, value=%s  \n", metadata.partition(), metadata.offset(), "message-" + i);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
  • Consumer code
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * @Author haonan.bian
 * @Description Consumer
 * @Date 2020-02-15 23:06
 **/
public class Consumer {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC= "test5";
    public static void main(String args[]){
        try{
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group");
            //Whether offsets are committed automatically
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            //When every partition has a committed offset, resume from it; if any partition has none, throw an exception
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

            //When a partition has a committed offset, resume from it; otherwise consume from the beginning
            //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            //When a partition has a committed offset, resume from it; otherwise consume only newly produced data
            //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

            KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(KAFKA_TOPIC));
            while(true){
                ConsumerRecords<String,String> records=consumer.poll(100);
                if(records.count()==0){
                    System.out.println(System.currentTimeMillis()+"- waiting ...");
                }else{
                    records.forEach(
                            record->
                                    System.out.printf("partition: %s,offset: %s,key: %s,value: %s \n",
                                            record.partition(),record.offset(),record.key(),record.value()))
                    ;
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3. Testing the newly created topic

With enable.auto.commit set to false, send 30 messages to topic test5, then run the test code with auto.offset.reset set to none, earliest, and latest in turn.

  • none
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//Result: throws an exception
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1, test5-0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at com.hnbian.kafka.reset.Consumer.main(Consumer.java:60)
  • earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//Result: all messages are consumed
1581779920861- waiting ...
partition: 1,offset: 0,key: 4,value: message-4 
... 
partition: 1,offset: 9,key: 24,value: message-24 
partition: 1,offset: 10,key: 26,value: message-26 
partition: 2,offset: 0,key: 0,value: message-0 
partition: 2,offset: 1,key: 2,value: message-2 
...
partition: 2,offset: 5,key: 29,value: message-29 
partition: 0,offset: 0,key: 1,value: message-1 
... 
partition: 0,offset: 11,key: 27,value: message-27 
partition: 0,offset: 12,key: 28,value: message-28 
1581779920992- waiting ...
1581779921095- waiting ...
1581779921202- waiting ...
  • latest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//Result: no messages are consumed at startup; after the producer sends another 10 messages, those 10 are consumed
1581780075451- waiting ...
1581780075556- waiting ...
1581780075661- waiting ...
1581780075761- waiting ...
partition: 2,offset: 6,key: 0,value: message-0 
...
partition: 2,offset: 9,key: 9,value: message-9 
1581780075965- waiting ...
1581780076070- waiting ...
1581780076174- waiting ...
1581780076277- waiting ...
  • Conclusion: for a brand-new consumer group
Value     Result
none      Throws an exception because no previous offset exists for the group
earliest  Consumes from the beginning of the log
latest    Consumes only data produced after the consumer was created; earlier data is skipped

4. Testing with a committed offset on only one partition

  • After the previous test, topic test5 should hold 40 messages. First consume 5 messages from partition 0 and commit the offset:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
/**
 * @Author haonan.bian
 * @Description Consumes 5 messages from partition 0
 * @Date 2020-02-15 23:25
 **/
public class Consumer2 {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC= "test5";
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group");
        //Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        //Whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //When every partition has a committed offset, resume from it; if any partition has none, throw an exception
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        //When a partition has a committed offset, resume from it; otherwise consume from the beginning
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //When a partition has a committed offset, resume from it; otherwise consume only newly produced data
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);

        consumer.assign(Arrays.asList(new TopicPartition(KAFKA_TOPIC,0)));
        boolean b = true;
        while(b){
            //If no data arrives before the timeout expires, the next loop iteration polls again
            ConsumerRecords<String,String> records=consumer.poll(10000);

            records.forEach(record-> System.out.printf("partition: %s,offset: %s,key: %s,value: %s \n",
                    record.partition(),record.offset(),record.key(),record.value()));

            consumer.commitSync();
            b=false;
        }
    }
}

partition: 0,offset: 0,key: 1,value: message-1 
partition: 0,offset: 1,key: 5,value: message-5 
partition: 0,offset: 2,key: 7,value: message-7 
partition: 0,offset: 3,key: 8,value: message-8 
partition: 0,offset: 4,key: 11,value: message-11
  • Check the group's consumption status
[root@node1 kafka_2.12-1.0.2]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test5-group
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test5  0          5               17              12   -            -     -
Column meanings: TOPIC = topic name; PARTITION = partition id; CURRENT-OFFSET = committed offset (messages consumed so far); LOG-END-OFFSET = total messages in the partition; LAG = messages not yet consumed; CONSUMER-ID / HOST / CLIENT-ID = consumer id, host, and client id.
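The LAG column is just arithmetic over the other two offset columns, which a one-method sketch makes explicit:

```java
public class LagDemo {
    // LAG = LOG-END-OFFSET - CURRENT-OFFSET: how many messages the group
    // has not yet consumed on that partition.
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // Matches the describe output above: partition 0 committed 5 of 17.
        System.out.println(lag(17, 5)); // 12
    }
}
```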
  • none
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//Result: throws an exception, because only the offset for partition 0 was committed; the other two partitions have none
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1]
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at com.hnbian.kafka.reset.Consumer.main(Consumer.java:53)
  • earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//Result: the remaining 35 messages are consumed
1581780913111- waiting ...
partition: 2,offset: 0,key: 0,value: message-0 
...
partition: 2,offset: 8,key: 3,value: message-3 
partition: 2,offset: 9,key: 9,value: message-9 
partition: 1,offset: 0,key: 4,value: message-4 
...
partition: 1,offset: 11,key: 4,value: message-4 
partition: 1,offset: 12,key: 6,value: message-6 
partition: 0,offset: 5,key: 15,value: message-15 
partition: 0,offset: 6,key: 17,value: message-17 
...
partition: 0,offset: 16,key: 8,value: message-8 
1581780913244- waiting ...
1581780913350- waiting ...
1581780913451- waiting ...
  • latest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//Result: the remaining messages in partition 0 are consumed; partitions without a committed offset yield nothing

1581780970180- waiting ...
partition: 0,offset: 5,key: 15,value: message-15 
partition: 0,offset: 6,key: 17,value: message-17 
...
partition: 0,offset: 16,key: 8,value: message-8 
1581780970310- waiting ...
1581780970413- waiting ...
  • Conclusion: when only some partitions have committed offsets
Value     Result
none      Throws an exception as soon as any partition of the topic has no committed offset
earliest  Partitions with a committed offset resume from it; the rest are consumed from the beginning
latest    Partitions with a committed offset resume from it; the rest yield only newly produced data

5. Consuming all data, then producing more

After the previous test the topic already holds 40 messages. First consume all 40, then send another 10 to the topic, and check the result under each of the three settings.

Check the consumption status

[root@node1 kafka_2.12-1.0.2]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test5-group
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test5  2          10              10              0    -            -     -
test5  1          13              13              0    -            -     -
test5  0          17              17              0    -            -     -

Produce 10 more messages and check the consumption status again

[root@node1 kafka_2.12-1.0.2]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group test5-group
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test5  2          10              14              4    -            -     -
test5  1          13              15              2    -            -     -
test5  0          17              21              4    -            -     -
  • none
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//Result: the 10 newest messages are consumed
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4 
partition: 1,offset: 14,key: 6,value: message-6 
partition: 0,offset: 17,key: 1,value: message-1 
...
partition: 0,offset: 20,key: 8,value: message-8 
partition: 2,offset: 10,key: 0,value: message-0 
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...

  • earliest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//Result: the 10 newest messages are consumed
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4 
partition: 1,offset: 14,key: 6,value: message-6 
partition: 0,offset: 17,key: 1,value: message-1 
...
partition: 0,offset: 20,key: 8,value: message-8 
partition: 2,offset: 10,key: 0,value: message-0 
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...
  • latest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//Result: the 10 newest messages are consumed
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4 
partition: 1,offset: 14,key: 6,value: message-6 
partition: 0,offset: 17,key: 1,value: message-1 
...
partition: 0,offset: 20,key: 8,value: message-8 
partition: 2,offset: 10,key: 0,value: message-0 
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...
  • Conclusion: when every partition has a committed offset
Value     Result
none      Resumes from the committed offset on every partition; an exception would be thrown only if some partition had none
earliest  Resumes from the committed offset; without one, consumes from the beginning
latest    Resumes from the committed offset; without one, consumes only newly produced data

6. Consuming under a different group id

  • none
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//Result: throws an exception
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1, test5-0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at com.hnbian.kafka.reset.Consumer.main(Consumer.java:53)
  • earliest

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//Result: all messages are consumed

1581782956257- waiting ...
partition: 1,offset: 0,key: 4,value: message-4 
partition: 1,offset: 1,key: 6,value: message-6 
partition: 1,offset: 2,key: 10,value: message-10 
partition: 1,offset: 3,key: 12,value: message-12 
...
partition: 1,offset: 14,key: 6,value: message-6 
partition: 0,offset: 0,key: 1,value: message-1 
partition: 0,offset: 1,key: 5,value: message-5 
...
partition: 0,offset: 19,key: 7,value: message-7 
partition: 0,offset: 20,key: 8,value: message-8 
partition: 2,offset: 0,key: 0,value: message-0 
...
partition: 2,offset: 13,key: 9,value: message-9 
1581782956392- waiting ...
1581782956497- waiting ...
1581782956601- waiting ...
  • latest
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//Result: no messages consumed at first; after another 10 messages are sent, all 10 are consumed
1581783088831- waiting ...
1581783088932- waiting ...
partition: 2,offset: 14,key: 0,value: message-0 
...
1581783089161- waiting ...
1581783089262- waiting ...
  • Conclusion
    Consumer groups are independent of one another. Even when an existing group has already consumed data from a topic, the offsets it committed have no effect on a consumer created under a new group id.
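The independence of groups follows from how committed offsets are keyed: each one belongs to a (group, topic, partition) triple. A toy in-memory model, purely for illustration and not Kafka's actual storage format:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

public class GroupOffsetStore {
    // Committed offsets are keyed by (group, topic, partition),
    // so different groups never see each other's progress.
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    void commit(String group, String topic, int partition, long offset) {
        committed.put(key(group, topic, partition), offset);
    }

    OptionalLong fetch(String group, String topic, int partition) {
        Long offset = committed.get(key(group, topic, partition));
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        GroupOffsetStore store = new GroupOffsetStore();
        store.commit("test5-group", "test5", 0, 17);
        // The original group resumes from its committed offset...
        System.out.println(store.fetch("test5-group", "test5", 0)); // OptionalLong[17]
        // ...while a brand-new group finds nothing, so auto.offset.reset applies.
        System.out.println(store.fetch("new-group", "test5", 0));   // OptionalLong.empty
    }
}
```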

Author: hnbian
Copyright: unless otherwise noted, all posts on this blog are licensed under CC BY 4.0. Please credit hnbian when republishing.