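1. Parameter description
The consumer setting auto.offset.reset controls where a consumer group starts reading each partition (the default is latest):
Value | Behavior
earliest | If the partition has a committed offset, consume from that offset; otherwise, consume from the beginning of the partition
latest | If the partition has a committed offset, consume from that offset; otherwise, consume only data newly produced to the partition
none | If every partition has a committed offset, resume from those offsets; if any partition has no committed offset, throw an exception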
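The table comes down to one decision per partition. The following is a minimal sketch of that decision as a standalone Java model, not Kafka client code. The class `OffsetResetModel`, the method `startOffset` and its `logStart`/`logEnd` parameters are hypothetical names, and no broker is involved.

```java
import java.util.OptionalLong;

// Hypothetical model of the auto.offset.reset decision for a single partition.
final class OffsetResetModel {

    /**
     * Returns the offset a consumer would start reading from.
     *
     * @param committed the group's committed offset, or empty if none exists
     * @param logStart  earliest offset still stored in the partition
     * @param logEnd    offset the next produced record will receive
     * @param policy    value of auto.offset.reset: "earliest", "latest" or "none"
     */
    static long startOffset(OptionalLong committed, long logStart, long logEnd, String policy) {
        // A committed offset takes priority; the policy only applies without one.
        // Simplification: the real client also applies the policy when the committed
        // offset is out of range (for example, removed by retention).
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            case "none":
                throw new IllegalStateException("no committed offset and auto.offset.reset=none");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..12 and no committed offset
        System.out.println(startOffset(OptionalLong.empty(), 0, 13, "earliest")); // prints 0
        System.out.println(startOffset(OptionalLong.empty(), 0, 13, "latest"));   // prints 13
    }
}
```

For example, `startOffset(OptionalLong.of(5), 0, 17, "none")` returns 5. The committed offset is used before the policy is ever consulted.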
Create topic test5 with 3 partitions and a replication factor of 1, then describe it:
[root@node1 kafka_2.12-1.0.2]
Created topic "test5".
[root@node1 kafka_2.12-1.0.2]
Topic:test5 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test5 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test5 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test5 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
2. Test code
Producer:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class Producer {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC = "test5";
    // Number of messages to send; keys are "0".."BATCH-1", values "message-0".."message-(BATCH-1)"
    private static final int BATCH = 30;

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for the partition leader's acknowledgement only
        properties.put("acks", "1");
        // metadata.broker.list and producer.type belong to the old Scala producer and are not
        // used by KafkaProducer; synchronous sending is done by calling get() on the Future below.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < BATCH; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(KAFKA_TOPIC, String.valueOf(i), "message-" + i);
                // Block until the broker acknowledges, then read partition and offset once
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("partition=%s, offset=%s, value=%s%n",
                        metadata.partition(), metadata.offset(), record.value());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
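The producer's default partitioner spreads the keys in the outputs below over the three partitions. For non-null keys, the Java client hashes the serialized key bytes with murmur2, clears the sign bit, and takes the result modulo the partition count. The sketch below re-implements that hash as a standalone model so you can explore the mapping without a cluster. The class `KeyPartitionModel` is hypothetical. Check it against `Utils.murmur2` in your client version before relying on it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical standalone model of keyed partition selection in the Kafka Java client.
final class KeyPartitionModel {

    // murmur2 hash over the key bytes (32-bit arithmetic, overflow intended)
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key serialized as UTF-8, as StringSerializer does by default
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit rather than using Math.abs, which fails for Integer.MIN_VALUE
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);
            System.out.printf("key=%s -> partition %d%n", key, partitionFor(key, 3));
        }
    }
}
```

You can compare its output with the key-to-partition mapping in the consumer output (for example, keys 1, 5, 7 and 8 appearing in partition 0).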
Consumer (auto commit is disabled and the code never commits, so running it does not change the group's committed offsets):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC = "test5";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group");
        // No automatic commits: repeated runs start from the same committed offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only takes effect when auto commit is enabled
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // The value under test: "none", "earliest" or "latest"
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(KAFKA_TOPIC));
            while (true) {
                // poll(long) is the API of the 1.0.x client used here
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.count() == 0) {
                    System.out.println(System.currentTimeMillis() + "- waiting ...");
                } else {
                    records.forEach(record -> System.out.printf("partition: %s,offset: %s,key: %s,value: %s%n",
                            record.partition(), record.offset(), record.key(), record.value()));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3. Test a newly created topic
With enable.auto.commit set to false, send 30 messages to topic test5. Then run the test code with auto.offset.reset set to none, earliest and latest in turn.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1, test5-0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at com.hnbian.kafka.reset.Consumer.main(Consumer.java:60)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
1581779920861- waiting ...
partition: 1,offset: 0,key: 4,value: message-4
...
partition: 1,offset: 9,key: 24,value: message-24
partition: 1,offset: 10,key: 26,value: message-26
partition: 2,offset: 0,key: 0,value: message-0
partition: 2,offset: 1,key: 2,value: message-2
...
partition: 2,offset: 5,key: 29,value: message-29
partition: 0,offset: 0,key: 1,value: message-1
...
partition: 0,offset: 11,key: 27,value: message-27
partition: 0,offset: 12,key: 28,value: message-28
1581779920992- waiting ...
1581779921095- waiting ...
1581779921202- waiting ...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
(While this consumer was running, 10 more messages were produced.)
1581780075451- waiting ...
1581780075556- waiting ...
1581780075661- waiting ...
1581780075761- waiting ...
partition: 2,offset: 6,key: 0,value: message-0
...
partition: 2,offset: 9,key: 9,value: message-9
1581780075965- waiting ...
1581780076070- waiting ...
1581780076174- waiting ...
1581780076277- waiting ...
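Value | Result
none | Throws an exception, because the consumer group has no previously committed offset
earliest | Consumes the data from the beginning
latest | Consumes only new data (produced after the consumer started); data produced earlier is not consumed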
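A toy log makes the latest row easier to see. In this hypothetical in-memory model (`LatestResetDemo` is not a Kafka class), a partition is a list whose index is the offset. A consumer without a committed offset takes the current log end as its position on its first poll. Strictly, the position is fixed when the consumer first resolves it inside `poll()`, not when the object is constructed; the stack trace above shows the reset running inside `poll`. The numbers mirror partition 2 in the latest run: offsets 0 to 5 existed beforehand and offsets 6 to 9 arrived afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition and a consumer using auto.offset.reset=latest.
final class LatestResetDemo {

    private final List<String> log = new ArrayList<>(); // list index == offset
    private int position = -1;                          // -1: position not resolved yet

    void append(String value) {
        log.add(value);
    }

    // Returns everything from the current position to the log end, then advances.
    List<String> poll() {
        if (position < 0) {
            // No committed offset: "latest" resolves the position to the current log end
            position = log.size();
        }
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return batch;
    }

    public static void main(String[] args) {
        LatestResetDemo partition2 = new LatestResetDemo();
        for (int i = 0; i < 6; i++) {
            partition2.append("old-" + i);     // offsets 0..5, produced before the consumer ran
        }
        System.out.println(partition2.poll()); // prints []
        for (int i = 0; i < 4; i++) {
            partition2.append("new-" + i);     // offsets 6..9, produced afterwards
        }
        System.out.println(partition2.poll()); // prints [new-0, new-1, new-2, new-3]
    }
}
```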
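4. Test with a committed offset on only one partition
After the previous test, topic test5 should hold 40 messages (the first 30 plus the 10 produced during the latest run). First consume 5 messages from partition 0 and commit the offset: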
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Properties;

public class Consumer2 {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092";
    private static final String KAFKA_TOPIC = "test5";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group");
        // Return at most 5 records per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Read only partition 0 (manual assignment instead of subscribe)
            consumer.assign(Arrays.asList(new TopicPartition(KAFKA_TOPIC, 0)));
            // A single poll (waiting up to 10 s) returns at most 5 records
            ConsumerRecords<String, String> records = consumer.poll(10000);
            records.forEach(record -> System.out.printf("partition: %s,offset: %s,key: %s,value: %s%n",
                    record.partition(), record.offset(), record.key(), record.value()));
            // Synchronously commit the position after the last returned record for test5-group
            consumer.commitSync();
        }
    }
}
partition: 0,offset: 0,key: 1,value: message-1
partition: 0,offset: 1,key: 5,value: message-5
partition: 0,offset: 2,key: 7,value: message-7
partition: 0,offset: 3,key: 8,value: message-8
partition: 0,offset: 4,key: 11,value: message-11
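Consumer2 read offsets 0 to 4 of partition 0 and then called `commitSync()`. The committed value is the next offset to read, not the last one read, which is why the group's CURRENT-OFFSET for partition 0 becomes 5. Here is a minimal model of that arithmetic. The class `CommitModel` is hypothetical and does not talk to Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the offsets a no-argument commitSync() stores after a poll.
final class CommitModel {

    // For each partition: highest offset returned by the poll, plus one.
    // Simplification: the real client commits its current position for every assigned
    // partition that has one, including partitions that returned no records.
    static Map<Integer, Long> offsetsToCommit(Map<Integer, List<Long>> returnedByPartition) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, List<Long>> entry : returnedByPartition.entrySet()) {
            long max = -1;
            for (long offset : entry.getValue()) {
                max = Math.max(max, offset);
            }
            if (max >= 0) {
                result.put(entry.getKey(), max + 1); // next offset to read
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, List<Long>> returned = new TreeMap<>();
        returned.put(0, Arrays.asList(0L, 1L, 2L, 3L, 4L)); // Consumer2's single poll
        System.out.println(offsetsToCommit(returned));       // prints {0=5}
    }
}
```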
Check the offsets of test5-group (only partition 0 has a committed offset):
[root@node1 kafka_2.12-1.0.2]
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test5 0 5 17 12 - - -
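Column | Meaning
TOPIC | Topic name
PARTITION | Partition id
CURRENT-OFFSET | Offset committed by the group (the number of messages consumed, since offsets start at 0)
LOG-END-OFFSET | Offset the next produced message will receive (the total number of messages, if none have been deleted)
LAG | Messages not yet consumed: LOG-END-OFFSET - CURRENT-OFFSET
CONSUMER-ID | Id of the consumer currently assigned the partition ("-" when no consumer is active)
HOST | Host IP of that consumer
CLIENT-ID | Client id of that consumer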
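LAG is plain subtraction of the two offset columns. The tiny helper below (hypothetical `LagCalc`, not part of the Kafka tooling) checks that arithmetic against the numbers in this post.

```java
// Hypothetical helper reproducing the LAG column of the consumer group description.
final class LagCalc {

    // LAG = LOG-END-OFFSET - CURRENT-OFFSET: messages the group has not consumed yet
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // test5 partition 0 after Consumer2 committed offset 5
        System.out.println(lag(5, 17)); // prints 12
        // Section 5, after producing 10 more messages: partitions 2, 1 and 0
        System.out.println(lag(10, 14) + lag(13, 15) + lag(17, 21)); // prints 10
    }
}
```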
Then run Consumer again with each value.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1]
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at com.hnbian.kafka.reset.Consumer.main(Consumer.java:53)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
1581780913111- waiting ...
partition: 2,offset: 0,key: 0,value: message-0
...
partition: 2,offset: 8,key: 3,value: message-3
partition: 2,offset: 9,key: 9,value: message-9
partition: 1,offset: 0,key: 4,value: message-4
...
partition: 1,offset: 11,key: 4,value: message-4
partition: 1,offset: 12,key: 6,value: message-6
partition: 0,offset: 5,key: 15,value: message-15
partition: 0,offset: 6,key: 17,value: message-17
...
partition: 0,offset: 16,key: 8,value: message-8
1581780913244- waiting ...
1581780913350- waiting ...
1581780913451- waiting ...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
1581780970180- waiting ...
partition: 0,offset: 5,key: 15,value: message-15
partition: 0,offset: 6,key: 17,value: message-17
...
partition: 0,offset: 16,key: 8,value: message-8
1581780970310- waiting ...
1581780970413- waiting ...
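Value | Result
none | Throws an exception if any partition of the topic has no committed offset
earliest | Partitions with a committed offset are consumed from that offset; partitions without one are consumed from the beginning
latest | Partitions with a committed offset are consumed from that offset; partitions without one only receive newly produced data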
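The none row is a check across every partition the consumer reads. A single partition without a committed offset makes the poll fail, and the exception lists all such partitions (here `test5-2` and `test5-1`). The sketch below models that check. `NonePolicyCheck` is hypothetical, and it throws `IllegalStateException` where the real client throws `NoOffsetForPartitionException`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of the auto.offset.reset=none check across a topic's partitions.
final class NonePolicyCheck {

    // Partitions (0..partitionCount-1) for which the group has no committed offset
    static Set<Integer> partitionsWithoutCommit(Map<Integer, Long> committed, int partitionCount) {
        Set<Integer> missing = new TreeSet<>();
        for (int p = 0; p < partitionCount; p++) {
            if (!committed.containsKey(p)) {
                missing.add(p);
            }
        }
        return missing;
    }

    // One partition without a committed offset is enough to fail
    static void checkNone(String topic, Map<Integer, Long> committed, int partitionCount) {
        Set<Integer> missing = partitionsWithoutCommit(committed, partitionCount);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("no committed offset for " + topic + " partitions " + missing);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new TreeMap<>();
        committed.put(0, 5L); // section 4: only partition 0 was committed
        System.out.println(partitionsWithoutCommit(committed, 3)); // prints [1, 2]
    }
}
```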
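5. Consume all data, then produce more
After the previous test the topic holds 40 messages. First consume all 40 and commit the offsets. Then send 10 more messages to the topic and check the result with each of the three values.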
Check the group's consumption status:
[root@node1 kafka_2.12-1.0.2]
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test5 2 10 10 0 - - -
test5 1 13 13 0 - - -
test5 0 17 17 0 - - -
Produce 10 messages and check again:
[root@node1 kafka_2.12-1.0.2]
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test5 2 10 14 4 - - -
test5 1 13 15 2 - - -
test5 0 17 21 4 - - -
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
1581782489971- waiting ...
partition: 1,offset: 13,key: 4,value: message-4
partition: 1,offset: 14,key: 6,value: message-6
partition: 0,offset: 17,key: 1,value: message-1
...
partition: 0,offset: 20,key: 8,value: message-8
partition: 2,offset: 10,key: 0,value: message-0
...
partition: 2,offset: 13,key: 9,value: message-9
1581782490102- waiting ...
1581782490205- waiting ...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
(Output identical to the none run above.)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
(Output identical to the none run above.)
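Value | Result
none | If every partition of the topic has a committed offset, consumption resumes from those offsets; if any partition has none, an exception is thrown
earliest | Partitions with a committed offset are consumed from that offset; partitions without one are consumed from the beginning
latest | Partitions with a committed offset are consumed from that offset; partitions without one only receive newly produced data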
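6. Consume with a different group name
Change ConsumerConfig.GROUP_ID_CONFIG to a group id that has not been used before, then run the consumer with each value.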
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test5-2, test5-1, test5-0]
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsets(Fetcher.java:425)
    at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:284)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1783)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1151)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at com.hnbian.kafka.reset.Consumer.main(Consumer.java:53)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
1581782956257- waiting ...
partition: 1,offset: 0,key: 4,value: message-4
partition: 1,offset: 1,key: 6,value: message-6
partition: 1,offset: 2,key: 10,value: message-10
partition: 1,offset: 3,key: 12,value: message-12
...
partition: 1,offset: 14,key: 6,value: message-6
partition: 0,offset: 0,key: 1,value: message-1
partition: 0,offset: 1,key: 5,value: message-5
...
partition: 0,offset: 19,key: 7,value: message-7
partition: 0,offset: 20,key: 8,value: message-8
partition: 2,offset: 0,key: 0,value: message-0
...
partition: 2,offset: 13,key: 9,value: message-9
1581782956392- waiting ...
1581782956497- waiting ...
1581782956601- waiting ...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
1581783088831- waiting ...
1581783088932- waiting ...
partition: 2,offset: 14,key: 0,value: message-0
...
1581783089161- waiting ...
1581783089262- waiting ...
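Conclusion: consumers in different groups are independent of each other. Another group may already have consumed the topic, but the offsets it committed have no effect on a consumer with a new group id. The new group starts according to its own auto.offset.reset setting.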
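One way to picture this: committed offsets are stored per consumer group, keyed by group, topic and partition (the broker keeps them in the internal `__consumer_offsets` topic). The sketch below models that keying with an in-memory map. `GroupOffsetStore` and the group name `another-group` are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model: committed offsets keyed by (group, topic, partition).
final class GroupOffsetStore {

    private final Map<List<Object>, Long> committed = new HashMap<>();

    // List equality and hashCode make a simple composite key
    private static List<Object> key(String group, String topic, int partition) {
        return Arrays.<Object>asList(group, topic, partition);
    }

    void commit(String group, String topic, int partition, long offset) {
        committed.put(key(group, topic, partition), offset);
    }

    OptionalLong lookup(String group, String topic, int partition) {
        Long offset = committed.get(key(group, topic, partition));
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        GroupOffsetStore store = new GroupOffsetStore();
        store.commit("test5-group", "test5", 0, 17);
        System.out.println(store.lookup("test5-group", "test5", 0));   // prints OptionalLong[17]
        // A group that never committed finds nothing, so its own auto.offset.reset decides
        System.out.println(store.lookup("another-group", "test5", 0)); // prints OptionalLong.empty
    }
}
```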