1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Properties;
public class Consumer2 { private static final String KAFKA_BOOTSTRAP_SERVERS = "hnode1:9092,hnode2:9092,hnode3:9092"; private static final String KAFKA_TOPIC= "test5"; public static void main(String[] args) Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test5-group"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
consumer.assign(Arrays.asList(new TopicPartition(KAFKA_TOPIC,0))); boolean b = true; while(b){ ConsumerRecords<String,String> records=consumer.poll(10000);
records.forEach(record-> System.out.printf("partition: %s,offset: %s,key: %s,value: %s \n", record.partition(),record.offset(),record.key(),record.value()));
consumer.commitSync(); b=false; } } }
// Sample output:
// partition: 0,offset: 0,key: 1,value: message-1
// partition: 0,offset: 1,key: 5,value: message-5
// partition: 0,offset: 2,key: 7,value: message-7
// partition: 0,offset: 3,key: 8,value: message-8
// partition: 0,offset: 4,key: 11,value: message-11
|