Kafka总结(四)kafka auto.offset.reset参数说明


1. 参数说明

  • 当各分区下有已提交的offset时(默认使用latest)
参数 说明
earliest 从提交的offset开始消费;无提交的offset时,从头开始消费
latest 从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none 从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  • 新建topic之后进行测试
1
2
3
4
5
6
7
8
# 创建一个topic 该topic有3个partition 和1个副本(replica)
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 3 --topic test5
Created topic "test5".
[root@node1 kafka_2.12-1.0.2]# bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test5
Topic:test5 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test5 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test5 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test5 Partition: 2 Leader: 2 Replicas: 2 Isr: 2

2. 测试代码

  • 发送数据代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;

/**
* @Author haonan.bian
* @Description //TODO
* @Date 2020-02-15 23:04
**/
/**
 * Sends a fixed number of test messages to the target topic and prints the
 * partition and offset assigned to each record.
 *
 * @Author haonan.bian
 * @Date 2020-02-15 23:04
 **/
public class Producer {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC = "test5";
    // Number of messages to send (constant was previously misspelled "bath").
    private static final int BATCH = 30;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
        // NOTE: "metadata.broker.list" and "producer.type" are settings of the
        // legacy Scala producer; the Java KafkaProducer ignores them and logs
        // a warning, so they are removed here.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("acks", "1"); // a write succeeds once the partition leader has persisted it

        // Parameterized type instead of the raw KafkaProducer; try-with-resources
        // guarantees the producer is closed even when send/get throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < BATCH; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(KAFKA_TOPIC, String.valueOf(i), "message-" + i);
                // Blocking on Future.get() makes each send effectively synchronous.
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata metadata = future.get(); // fetch once instead of twice
                System.out.printf("partition=%s, offset=%s, value=%s \n",
                        metadata.partition(), metadata.offset(), "message-" + i);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

  • 消费数据代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* @Author haonan.bian
* @Description 消费者
* @Date 2020-02-15 23:06
**/
/**
 * Consumer used to observe the effect of the auto.offset.reset setting
 * (none / earliest / latest) when subscribing to the test topic.
 *
 * @Author haonan.bian
 * @Date 2020-02-15 23:06
 **/
public class Consumer {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC = "test5";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group");
        // Auto-commit is disabled so offsets are only committed explicitly.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // none: resume from committed offsets; if any single partition has no
        // committed offset, throw NoOffsetForPartitionException.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        // earliest: resume from committed offsets; with no committed offset,
        // start from the beginning of the partition.
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // latest: resume from committed offsets; with no committed offset,
        // consume only records produced after this consumer started.
        //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // try-with-resources closes the consumer (leaving the group cleanly)
        // when poll() or deserialization throws; the original leaked it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(KAFKA_TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.count() == 0) {
                    System.out.println(System.currentTimeMillis() + "- waiting ...");
                } else {
                    records.forEach(record ->
                            System.out.printf("partition: %s,offset: %s,key: %s,value: %s \n",
                                    record.partition(), record.offset(), record.key(), record.value()));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. 测试新创建的topic

将 enable.auto.commit 设置为false,向topic test5中发送30条消息,分别将参数修改为none、earliest、latest运行测试代码

  • none
1
2
3
4
5
6
7
8
9
10
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//运行结果 抛出异常
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1, test5-0]
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at com.hnbian.kafka.reset.Consumer.main(Consumer.java:60)
  • earliest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//运行结果 消费完所有数据
1581779920861- waiting ...
partition: 1,offset: 0,key: 4,value: message-4
...
partition: 1,offset: 9,key: 24,value: message-24
partition: 1,offset: 10,key: 26,value: message-26
partition: 2,offset: 0,key: 0,value: message-0
partition: 2,offset: 1,key: 2,value: message-2
...
partition: 2,offset: 5,key: 29,value: message-29
partition: 0,offset: 0,key: 1,value: message-1
...
partition: 0,offset: 11,key: 27,value: message-27
partition: 0,offset: 12,key: 28,value: message-28
1581779920992- waiting ...
1581779921095- waiting ...
1581779921202- waiting ...
  • latest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

// 运行结果,启动之后未消费到消息,生产者再次发送10条消息后消费到10条消息
1581780075451- waiting ...
1581780075556- waiting ...
1581780075661- waiting ...
1581780075761- waiting ...
partition: 2,offset: 6,key: 0,value: message-0
...
partition: 2,offset: 9,key: 9,value: message-9
1581780075965- waiting ...
1581780076070- waiting ...
1581780076174- waiting ...
1581780076277- waiting ...

  • 结论:当新建消费组时
参数 结果
none 没有为消费者组找到先前的offset值时,抛出异常
earliest 从头消费数据
latest 消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。

4. 测试某一个分区有提交offset

  • 经过测试1 之后topic test5 中应该有40条数据,先消费 partition0 中的 5条数据并提交offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;
/**
* @Author haonan.bian
* @Description 消费 parititon 0 5条数据
* @Date 2020-02-15 23:25
**/
public class Consumer2 {
private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
private static final String KAFKA_TOPIC= "test5";
public static void main(String[] args)
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group");
//每次取出的数据最大数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
//是否开启自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

//topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);

consumer.assign(Arrays.asList(new TopicPartition(KAFKA_TOPIC,0)));
boolean b = true;
while(b){
//超时时间过后如果没有返回数据进行下一次轮询,
ConsumerRecords<String,String> records=consumer.poll(10000);

records.forEach(record-> System.out.printf("partition: %s,offset: %s,key: %s,value: %s \n",
record.partition(),record.offset(),record.key(),record.value()));

consumer.commitSync();
b=false;
}
}
}

partition: 0,offset: 0,key: 1,value: message-1
partition: 0,offset: 1,key: 5,value: message-5
partition: 0,offset: 2,key: 7,value: message-7
partition: 0,offset: 3,key: 8,value: message-8
partition: 0,offset: 4,key: 11,value: message-11
  • 查看topic 消费情况
1
2
3
[root@node1 kafka_2.12-1.0.2]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test5-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test5 0 5 17 12 - - -
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic名字 分区id 当前已消费的条数 总条数 未消费的条数 消费id 主机ip 客户端id
  • none
1
2
3
4
5
6
7
8
9
10
11
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//运行结果, 因为 刚刚只提交了partition0 的offset 剩余两个partition没提交所以抛出异常
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1]
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at com.hnbian.kafka.reset.Consumer.main(Consumer.java:53)

  • earliest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//运行结果 消费完剩下的35条数据
1581780913111- waiting ...
partition: 2,offset: 0,key: 0,value: message-0
...
partition: 2,offset: 8,key: 3,value: message-3
partition: 2,offset: 9,key: 9,value: message-9
partition: 1,offset: 0,key: 4,value: message-4
...
partition: 1,offset: 11,key: 4,value: message-4
partition: 1,offset: 12,key: 6,value: message-6
partition: 0,offset: 5,key: 15,value: message-15
partition: 0,offset: 6,key: 17,value: message-17
...
partition: 0,offset: 16,key: 8,value: message-8
1581780913244- waiting ...
1581780913350- waiting ...
1581780913451- waiting ...
  • latest
1
2
3
4
5
6
7
8
9
10
11
12
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//运行结果 消费到partition0 剩余的数据,未消费到未提交offset的partition的数据

1581780970180- waiting ...
partition: 0,offset: 5,key: 15,value: message-15
partition: 0,offset: 6,key: 17,value: message-17
...
partition: 0,offset: 16,key: 8,value: message-8
1581780970310- waiting ...
1581780970413- waiting ...

  • 结论:当消费组中仅部分分区已提交offset时
参数 结果
none 只要该topic下有一个分区不存在已提交的offset,就抛出异常。
earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

5. 将所有数据都消费完再生产数据

经过上个测试后 topic 里面已经有40条数据了,先消费完这40条数据后,再发送10条数据到topic,分别使用三个参数查看结果

查看消费情况

1
2
3
4
5
[root@node1 kafka_2.12-1.0.2]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092 --describe --group test5-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test5 2 10 10 0 - - -
test5 1 13 13 0 - - -
test5 0 17 17 0 - - -

生产10条数据并查看消费情况

1
2
3
4
5
6
[root@node1 kafka_2.12-1.0.2]# bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group test5-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test5 2 10 14 4 - - -
test5 1 13 15 2 - - -
test5 0 17 21 4 - - -

  • none
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//运行结果 消费完最新的10条数据
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4
partition: 1,offset: 14,key: 6,value: message-6
partition: 0,offset: 17,key: 1,value: message-1
...
partition: 0,offset: 20,key: 8,value: message-8
partition: 2,offset: 10,key: 0,value: message-0
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...


  • earliest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//运行结果 消费完最新的10条数据
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4
partition: 1,offset: 14,key: 6,value: message-6
partition: 0,offset: 17,key: 1,value: message-1
...
partition: 0,offset: 20,key: 8,value: message-8
partition: 2,offset: 10,key: 0,value: message-0
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...

  • latest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//运行结果

//运行结果 消费完最新的10条数据
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4
partition: 1,offset: 14,key: 6,value: message-6
partition: 0,offset: 17,key: 1,value: message-1
...
partition: 0,offset: 20,key: 8,value: message-8
partition: 2,offset: 10,key: 0,value: message-0
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...

  • 结论:当消费组中各分区都已提交offset时
参数 结果
none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。

6. 修改group 组名进行消费

  • none
1
2
3
4
5
6
7
8
9
10
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

//运行结果 抛出异常
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1, test5-0]
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at com.hnbian.kafka.reset.Consumer.main(Consumer.java:53)
  • earliest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//运行结果 消费完所有数据

1581782956257- waiting ...
partition: 1,offset: 0,key: 4,value: message-4
partition: 1,offset: 1,key: 6,value: message-6
partition: 1,offset: 2,key: 10,value: message-10
partition: 1,offset: 3,key: 12,value: message-12
...
partition: 1,offset: 14,key: 6,value: message-6
partition: 0,offset: 0,key: 1,value: message-1
partition: 0,offset: 1,key: 5,value: message-5
...
partition: 0,offset: 19,key: 7,value: message-7
partition: 0,offset: 20,key: 8,value: message-8
partition: 2,offset: 0,key: 0,value: message-0
...
partition: 2,offset: 13,key: 9,value: message-9
1581782956392- waiting ...
1581782956497- waiting ...
1581782956601- waiting ...

  • latest
1
2
3
4
5
6
7
8
9
10
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

//运行结果 未消费到数据 再次发送10条数据 全部消费到
1581783088831- waiting ...
1581783088932- waiting ...
partition: 2,offset: 14,key: 0,value: message-0
...
1581783089161- waiting ...
1581783089262- waiting ...

  • 结论
    组与组间的消费者是没有关系的。topic中已有分组消费数据,新建其他分组ID的消费者时,之前分组提交的offset对新建的分组消费不起作用。

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录