对 kafka 中的消息进行 Avro 序列化


1. Avro 介绍

Avro 是一种数据序列化系统,由 Apache Foundation 开发并维护。它提供了丰富的数据结构类型,并且可以用于编码结构化数据。Avro 主要被用于 Apache Hadoop 这样的大数据处理系统中,它可以提供高效而且紧凑的序列化以及反序列化操作。

Avro 的特点包括:

  1. 基于 JSON 的模式定义:Avro 使用 JSON 格式定义数据结构,这使得模式易于阅读和编写。
  2. 编码与解码时模式独立:Avro 的一大特点是模式用于编码和解码,这允许对数据结构进行演化而无需修改所有读取数据的应用。这个特性对于长期存储大量数据的系统来说非常有用。
  3. 高效:Avro 的二进制编码非常紧凑,这使得 Avro 特别适合于大数据场景。它没有其他一些序列化系统的大量开销,比如标记字段名。
  4. 支持多种语言:Avro 支持许多编程语言,包括 Java, C, C++, C#, Python 和 Ruby 等。

总的来说,Avro 是一个强大的工具,可以有效地处理复杂的数据结构,并且它的设计理念适应于大数据处理,能有效地处理大规模数据的序列化和反序列化操作。

2. Avro 结合 kafka

在分布式数据流处理系统中,Avro 和 Kafka 的结合带来了很多重要的好处:

  1. 模式演化: Avro 支持模式演化,这就意味着你可以在不中断服务的情况下修改你的数据模式。这对于实时数据流处理系统来说非常重要,因为它允许系统在不停机的情况下进行修改和升级。
  2. 高效的序列化: Avro 提供了一种高效且紧凑的二进制数据格式,这对于 Kafka 这样的数据流处理系统来说非常重要。它可以帮助减少网络传输的开销和存储的需求。
  3. 跨语言支持: Avro 支持多种编程语言,这对于在多种语言编写的服务中使用 Kafka 的系统来说非常有利。这就意味着,无论服务是用 Java、Python 还是其他语言编写的,都可以通过 Avro 和 Kafka 进行有效的数据交换。
  4. 明确的数据契约: 由于 Avro 使用明确定义的模式,它提供了一种明确的数据契约,可以在生产者和消费者之间提供清晰的数据期望。这有助于提高系统的健壮性,并减少因数据不一致而产生的问题。

因此,结合 Avro 和 Kafka 可以帮助构建一个健壮、高效且易于维护和演化的实时数据流处理系统。

3. Avro json schema

UserBehavior.avsc, 注意 : 创建的文件后缀名一定要是 avsc

这里是一个 avro 的json描述, 请你进行解读
{
    "namespace": "com.hnbian", // 要生成  类的路径
    "type": "record", // 类型 avro 使用 record
    "name": "UserBehavior", // 会自动生成对应的类名 UserBehavior.java
    "fields": [ // 要指定的字段
        {"name": "userId", "type": "long"},
        {"name": "itemId",  "type": "long"},
        {"name": "categoryId", "type": "int"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}

添加 avro json schema 以后,编译 (mvn compile) 项目后将会自动在 com.hnbian 包下生成 UserBehavior.java 类

类型说明:

  • 除了上面写的类型, avro 还支持其他多种类型
  1. 基本类型(Primitive Types):
    • null: 表示空值或无值
    • boolean: 表示布尔值,true 或 false
    • int: 表示32位整数
    • long: 表示64位整数
    • float: 表示单精度浮点数
    • double: 表示双精度浮点数
    • bytes: 表示字节序列
    • string: 表示 Unicode 字符序列
  2. 复合类型(Complex Types):
    • record: 表示一组带名字(名称必须唯一)的字段,每个字段都有自己的类型。这与大多数编程语言中的类或结构体类似。
    • enum: 表示一组有名字的值,类似于 Java 的枚举。
    • array: 表示一种类型的连续值。
    • map: 表示一组键值对,其中键是 string 类型,值可以是任意类型。
    • union: 表示可以是多种类型的值,例如,一个字段的类型可以是 int 或 null。
    • fixed: 表示一个具有固定数目的字节的值。

以下是一个增加了其他类型的简单示例:

{
    "namespace": "com.hnbian",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId",  "type": "long"},
        {"name": "categoryId", "type": "int"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "isPurchased", "type": "boolean"},  // 新添加的 boolean 类型字段
        {"name": "itemPrice", "type": "double"},  // 新添加的 double 类型字段
        {
            "name": "comments", 
            "type": { // 新添加的 array 类型字段
                "type": "array", 
                "items": "string"
            }
        },
        {
            "name": "attributes", 
            "type": { // 新添加的 map 类型字段
                "type": "map", 
                "values": "string"
            }
        },
        {
            "name": "rating", 
            "type": ["null", "int"] // 新添加的 union 类型字段
        }
    ]
}

4. maven 依赖与编译

4.1 maven 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Kafka_avro_consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.2</version>
        </dependency>
    </dependencies>

    <!--使用插件可以在 Maven 构建生命周期中生成 Avro schemas 的 Java 类-->
    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>

</project>

4.2 进行项目编译

编译后的 UserBehavior 类

5. 测试数据

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000

6. 自定义序列化类

包含定义序列化与反序列化两个方法

package com.hnbian;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

/**
 * 自定义序列化和反序列化
 */
public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, UserBehavior userBehavior) {
        // 创建序列化执行器
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<>(userBehavior.getSchema());
        // 创建一个流 用存储序列化后的二进制文件
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // 创建二进制编码器
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // 数据入都流中
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return out.toByteArray();
    }

    @Override
    public void close() {}

    @Override
    public UserBehavior deserialize(String s, byte[] bytes) {
        // 用来保存结果数据
        UserBehavior userBehavior = new UserBehavior();
        // 创建输入流用来读取二进制文件
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // 创建输入序列化执行器
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<>(userBehavior.getSchema());
        // 创建二进制解码器
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // 数据读取
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 结果返回
        return userBehavior;
    }
}

7. 编写 kafka 生产者类

package com.hnbian;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * 生产测试数据
 **/
public class UserBehaviorProducerKafka {
    public static void main(String[] args) throws InterruptedException {
        // 从 CSV 文件中获取数据
        List<UserBehavior> data = getData();

        // 创建 Kafka 配置文件
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:6667,node:6667,node3:6667");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value.serializer 需要指定自定义的序列化类,否则将无效
        props.setProperty("value.serializer", SimpleAvroSchemaJava.class.getName());

        // 创建 Kafka 生产者
        KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<>(props);

        // 遍历数据
        for (UserBehavior userBehavior : data) {
            // 创建 Kafka 消息
            ProducerRecord<String, UserBehavior> producerRecord 
                = new ProducerRecord<>("test", userBehavior);
            // 将消息发送到 Kafka
            userBehaviorProducer.send(producerRecord);
            System.out.println("数据写入成功" + data);
            // 每次发送数据后,线程休眠1秒
            Thread.sleep(1000);
        }
    }

    // 从 CSV 文件中读取数据的函数
    public static List<UserBehavior> getData() {
        // 初始化一个存放 UserBehavior 对象的列表
        ArrayList<UserBehavior> userBehaviors = new ArrayList<>();
        try {
            // 创建一个从 CSV 文件读取数据的 BufferedReader 对象
            BufferedReader br = new BufferedReader(new FileReader("UserBehavior.csv"));
            String line;
            // 读取文件中的每一行数据
            while ((line = br.readLine()) != null) {
                // 使用逗号分隔每一行数据,得到一个字符串数组
                String[] split = line.split(",");
                // 使用字符串数组中的数据创建一个 UserBehavior 对象,并添加到列表中
                userBehaviors.add(
                    new UserBehavior(
                        Long.parseLong(split[0])
                        , Long.parseLong(split[1])
                        , Integer.parseInt(split[2])
                        , split[3]
                        , Long.parseLong(split[4]))
                );
            }
        } catch (Exception e) {
            // 打印出任何异常信息
            e.printStackTrace();
        }
        // 返回包含所有 UserBehavior 对象的列表
        return userBehaviors;
    }
}

8. 编写kafka 消费者类

package com.hnbian;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/** 消费数据 测试数据 **/
public class UserBehaviorConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "node1:6667,node2:6667,node3:6667");
        prop.put("group.id", "UserBehavior");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 设置反序列化类为自定义的avro反序列化类
        prop.put("value.deserializer", SimpleAvroSchemaJava.class.getName());

        KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000);
            for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) {
                System.out.println(stringStockConsumerRecord.value());
            }
        }
    }
}

9. 异常

在反序列化时可能遇见一些异常信息如下:

java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:170)
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -53
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:122)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)

请检查你的序列化与反序列化时使用的 schema 是否一致, 包括字段名称, 字段类型, 字段顺序等等。

10. 其他

在进行反序列化时还有一种情况, 如果Kafka中的数据字段比较多, 但是我们不需要那么多的数据, 只需要其中的某些字段, avro 在进行反序列化的定义schema 的时候可以按字段顺序反序列化部分字段, 但是中间不能有空出来的字段

如序列化 schema 如下:

{
    "namespace": "com.hnbian",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId",  "type": "long"},
        {"name": "categoryId", "type": "int"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}

在反序列化时, 可以定义

{
    "namespace": "com.hnbian",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId",  "type": "long"},
        {"name": "categoryId", "type": "int"}
    ]
}
# 减掉了最后两个字段

但是不能如下定义:

{
    "namespace": "com.hnbian",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId",  "type": "long"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}

# 减去了中间的 categoryId 字段
# 这时也会报反序列化异常 Failed to deserialize Avro record.

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
华为云编译好的 arm 架构Ambari 下载地址 华为云编译好的 arm 架构Ambari 下载地址
华为鲲鹏 aarch64 版本 Ambari HDP 下载地址 https://mirrors.huaweicloud.com/kunpeng/yum/el/7/bigdata/ ambarihttps://mirrors.huaweicl
2023-07-18
下一篇 
DolphinScheduler 部署文档 DolphinScheduler 部署文档
本文档部署了最新的 3.14 版本, 官网上的部署文档有些错误, 本文中进行了修复 1. DolphinScheduler 简介Apache DolphinScheduler 官网地址: https://dolphinscheduler
  目录