1. Avro 介绍 Avro 是一种数据序列化系统,由 Apache Foundation 开发并维护。它提供了丰富的数据结构类型,并且可以用于编码结构化数据。Avro 主要被用于 Apache Hadoop 这样的大数据处理系统中,它可以提供高效而且紧凑的序列化以及反序列化操作。
Avro 的特点包括:
基于 JSON 的模式定义 :Avro 使用 JSON 格式定义数据结构,这使得模式易于阅读和编写。
编码与解码时模式独立 :Avro 的一大特点是模式用于编码和解码,这允许对数据结构进行演化而无需修改所有读取数据的应用。这个特性对于长期存储大量数据的系统来说非常有用。
高效 :Avro 的二进制编码非常紧凑,这使得 Avro 特别适合于大数据场景。它没有其他一些序列化系统的大量开销,比如标记字段名。
支持多种语言 :Avro 支持许多编程语言,包括 Java, C, C++, C#, Python 和 Ruby 等。
总的来说,Avro 是一个强大的工具,可以有效地处理复杂的数据结构,并且它的设计理念适应于大数据处理,能有效地处理大规模数据的序列化和反序列化操作。
2. Avro 结合 kafka 在分布式数据流处理系统中,Avro 和 Kafka 的结合带来了很多重要的好处:
模式演化: Avro 支持模式演化,这就意味着你可以在不中断服务的情况下修改你的数据模式。这对于实时数据流处理系统来说非常重要,因为它允许系统在不停机的情况下进行修改和升级。
高效的序列化: Avro 提供了一种高效且紧凑的二进制数据格式,这对于 Kafka 这样的数据流处理系统来说非常重要。它可以帮助减少网络传输的开销和存储的需求。
跨语言支持: Avro 支持多种编程语言,这对于在多种语言编写的服务中使用 Kafka 的系统来说非常有利。这就意味着,无论服务是用 Java、Python 还是其他语言编写的,都可以通过 Avro 和 Kafka 进行有效的数据交换。
明确的数据契约: 由于 Avro 使用明确定义的模式,它提供了一种明确的数据契约,可以在生产者和消费者之间提供清晰的数据期望。这有助于提高系统的健壮性,并减少因数据不一致而产生的问题。
因此,结合 Avro 和 Kafka 可以帮助构建一个健壮、高效且易于维护和演化的实时数据流处理系统。
3. Avro json schema UserBehavior.avsc, 注意 : 创建的文件后缀名一定要是 avsc
1 2 3 4 5 6 7 8 9 10 11 12 13 这里是一个 avro 的json描述, 请你进行解读 { "namespace" : "com.hnbian" , "type" : "record" , "name" : "UserBehavior" , "fields" : [ { "name" : "userId" , "type" : "long" } , { "name" : "itemId" , "type" : "long" } , { "name" : "categoryId" , "type" : "int" } , { "name" : "behavior" , "type" : "string" } , { "name" : "timestamp" , "type" : "long" } ] }
添加 avro json schema 以后,编译 (mvn compile) 项目后将会自动在 com.hnbian 包下生成 UserBehavior.java 类
类型说明:
基本类型(Primitive Types):
null: 表示空值或无值
boolean: 表示布尔值,true 或 false
int: 表示32位整数
long: 表示64位整数
float: 表示单精度浮点数
double: 表示双精度浮点数
bytes: 表示字节序列
string: 表示 Unicode 字符序列
复合类型(Complex Types):
record: 表示一组带名字(名称必须唯一)的字段,每个字段都有自己的类型。这与大多数编程语言中的类或结构体类似。
enum: 表示一组有名字的值,类似于 Java 的枚举。
array: 表示一种类型的连续值。
map: 表示一组键值对,其中键是 string 类型,值可以是任意类型。
union: 表示可以是多种类型的值,例如,一个字段的类型可以是 int 或 null。
fixed: 表示一个具有固定数目的字节的值。
以下是一个增加了其他类型的简单示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 { "namespace" : "com.hnbian" , "type" : "record" , "name" : "UserBehavior" , "fields" : [ { "name" : "userId" , "type" : "long" } , { "name" : "itemId" , "type" : "long" } , { "name" : "categoryId" , "type" : "int" } , { "name" : "behavior" , "type" : "string" } , { "name" : "timestamp" , "type" : "long" } , { "name" : "isPurchased" , "type" : "boolean" } , { "name" : "itemPrice" , "type" : "double" } , { "name" : "comments" , "type" : { "type" : "array" , "items" : "string" } } , { "name" : "attributes" , "type" : { "type" : "map" , "values" : "string" } } , { "name" : "rating" , "type" : [ "null" , "int" ] } ] }
4. maven 依赖与编译 4.1 maven 依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > org.example</groupId > <artifactId > Kafka_avro_consumer</artifactId > <version > 1.0-SNAPSHOT</version > <properties > <maven.compiler.source > 8</maven.compiler.source > <maven.compiler.target > 8</maven.compiler.target > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > </properties > <dependencies > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-connector-kafka_2.11</artifactId > <version > 1.13.0</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-streaming-java_2.11</artifactId > <version > 1.13.0</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-clients_2.11</artifactId > <version > 1.13.0</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-avro</artifactId > <version > 1.13.0</version > </dependency > <dependency > <groupId > org.apache.avro</groupId > <artifactId > avro</artifactId > <version > 1.10.2</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.apache.avro</groupId > <artifactId > avro-maven-plugin</artifactId > <version > 1.8.2</version > <executions > <execution > <phase > generate-sources</phase > <goals > <goal > schema</goal > </goals > <configuration > <sourceDirectory > ${project.basedir}/src/main/avro/</sourceDirectory > <outputDirectory > ${project.basedir}/src/main/java/</outputDirectory > </configuration > </execution > </executions > </plugin > </plugins > </build > </project >
4.2 进行项目编译 编译后的 UserBehavior 类
5. 测试数据 1 2 3 4 5 6 7 543462,1715,1464116,pv,1511658000 662867,2244074,1575622,pv,1511658000 561558,3611281,965809,pv,1511658000 894923,3076029,1879194,pv,1511658000 834377,4541270,3738615,pv,1511658000 315321,942195,4339722,pv,1511658000 625915,1162383,570735,pv,1511658000
6. 自定义序列化类 包含定义序列化与反序列化两个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package com.hnbian;import org.apache.avro.io.BinaryDecoder;import org.apache.avro.io.BinaryEncoder;import org.apache.avro.io.DecoderFactory;import org.apache.avro.io.EncoderFactory;import org.apache.avro.specific.SpecificDatumReader;import org.apache.avro.specific.SpecificDatumWriter;import org.apache.kafka.common.serialization.Deserializer;import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayInputStream;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.util.Map;public class SimpleAvroSchemaJava implements Serializer <UserBehavior>, Deserializer<UserBehavior> { @Override public void configure (Map<String, ?> map, boolean b) { } @Override public byte [] serialize(String s, UserBehavior userBehavior) { SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter <>(userBehavior.getSchema()); ByteArrayOutputStream out = new ByteArrayOutputStream (); BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null ); try { writer.write(userBehavior, encoder); } catch (IOException e) { e.printStackTrace(); } return out.toByteArray(); } @Override public void close () {} @Override public UserBehavior deserialize (String s, byte [] bytes) { UserBehavior userBehavior = new UserBehavior (); ByteArrayInputStream arrayInputStream = new ByteArrayInputStream (bytes); SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader <>(userBehavior.getSchema()); BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null ); try { userBehavior = stockSpecificDatumReader.read(null , binaryDecoder); } catch (IOException e) { e.printStackTrace(); } return userBehavior; } }
7. 编写 kafka 生产者类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 package com.hnbian;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.io.BufferedReader;import java.io.FileReader;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class UserBehaviorProducerKafka { public static void main (String[] args) throws InterruptedException { List<UserBehavior> data = getData(); Properties props = new Properties (); props.setProperty("bootstrap.servers" , "node1:6667,node:6667,node3:6667" ); props.setProperty("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); props.setProperty("value.serializer" , SimpleAvroSchemaJava.class.getName()); KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer <>(props); for (UserBehavior userBehavior : data) { ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord <>("test" , userBehavior); userBehaviorProducer.send(producerRecord); System.out.println("数据写入成功" + data); Thread.sleep(1000 ); } } public static List<UserBehavior> getData () { ArrayList<UserBehavior> userBehaviors = new ArrayList <>(); try { BufferedReader br = new BufferedReader (new FileReader ("UserBehavior.csv" )); String line; while ((line = br.readLine()) != null ) { String[] split = line.split("," ); userBehaviors.add( new UserBehavior ( Long.parseLong(split[0 ]) , Long.parseLong(split[1 ]) , Integer.parseInt(split[2 ]) , split[3 ] , Long.parseLong(split[4 ])) ); } } catch (Exception e) { e.printStackTrace(); } return userBehaviors; } }
8. 编写kafka 消费者类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.hnbian;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections;import java.util.Properties;public class UserBehaviorConsumer { public static void main (String[] args) { Properties prop = new Properties (); prop.put("bootstrap.servers" , "node1:6667,node2:6667,node3:6667" ); prop.put("group.id" , "UserBehavior" ); prop.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); prop.put("value.deserializer" , SimpleAvroSchemaJava.class.getName()); KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer <>(prop); consumer.subscribe(Collections.singletonList("test" )); while (true ) { ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000 ); for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) { System.out.println(stringStockConsumerRecord.value()); } } } }
9. 异常 在反序列化时可能遇见一些异常信息如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 java.io.IOException: Failed to deserialize Avro record. at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:170 ) at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78 ) at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44 ) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142 ) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738 ) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94 ) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58 ) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99 ) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300 ) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704 ) at java.lang.Thread.run(Thread.java:748 ) Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -53 at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336 ) at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263 ) at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201 ) at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422 ) at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414 ) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181 ) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153 ) at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232 ) at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:122 ) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222 ) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175 ) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153 ) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145 ) at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167 )
请检查你的序列化与反序列化时使用的 schema 是否一致, 包括字段名称, 字段类型, 字段顺序等等。
10. 其他 在进行反序列化时还有一种情况, 如果Kafka中的数据字段比较多, 但是我们不需要那么多的数据, 只需要其中的某些字段, avro 在进行反序列化的定义schema 的时候可以按字段顺序反序列化部分字段, 但是中间不能有空出来的字段
如序列化 schema 如下:
1 2 3 4 5 6 7 8 9 10 11 12 { "namespace" : "com.hnbian" , "type" : "record" , "name" : "UserBehavior" , "fields" : [ {"name" : "userId" , "type" : "long" }, {"name" : "itemId" , "type" : "long" }, {"name" : "categoryId" , "type" : "int" }, {"name" : "behavior" , "type" : "string" }, {"name" : "timestamp" , "type" : "long" } ] }
在反序列化时, 可以定义
1 2 3 4 5 6 7 8 9 10 11 { "namespace" : "com.hnbian" , "type" : "record" , "name" : "UserBehavior" , "fields" : [ {"name" : "userId" , "type" : "long" }, {"name" : "itemId" , "type" : "long" }, {"name" : "categoryId" , "type" : "int" } ] }
但是不能如下定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "namespace" : "com.hnbian" , "type" : "record" , "name" : "UserBehavior" , "fields" : [ {"name" : "userId" , "type" : "long" }, {"name" : "itemId" , "type" : "long" }, {"name" : "behavior" , "type" : "string" }, {"name" : "timestamp" , "type" : "long" } ] }