Flink系列 11. 介绍Flink中 ProcessFunction 的使用


1. ProcessFunction 介绍

1.1 介绍

Flink一般的转换算子是无法访问事件的时间戳信息和WaterMark信息的。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。而这在某些应用场景下,这些信息确极为重要。基于此 DataStream API 提供了一系列的 Low-Level 转换算子,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。

Process Function 是用来构建事件驱动的应用以及实现自定义的业务逻辑 ( 使用之前的 window 函数和转换算子无法实现 ) 。例如 Flink SQL 就是使用 Process Function 实现的。所有的 Process Function 都继承自 RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。

1.2 内部构件

ProcessFunction 是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:

  • events:数据流中的元素

  • state:状态,用于容错和一致性,仅用于keyed stream

  • timers:定时器,支持事件时间和处理时间,仅用于keyed stream

1.3 分类

Flink 提供了 8 个 Process Function:

  1. ProcessFunction:用于DataStream流数据处理

  2. KeyedProcessFunction:用于keyBy之后的KeyedStream流处理

  3. CoProcessFunction:用于connect连接的流处理

  4. ProcessJoinFunction:用于join流操作

  5. BroadcastProcessFunction:用于广播

  6. KeyedBroadcastProcessFunction:keyBy之后的广播

  7. ProcessWindowFunction:窗口增量聚合

  8. ProcessAllWindowFunction:全窗口增量聚合

2. ProcessFunction

2.1 ProcessFunction 介绍

  • 可以使用 steam.process(ProcessFunction)的方式使用 ProcessFunction
  • 从 ProcessFunction类图可见,它有RichFunction的特性open、close,也有两个重要的方法processElement和onTimer
ProcessFunction 继承图

2.2 ProcessFunction 源码

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

   private static final long serialVersionUID = 1L;

   /**
    * ProcessFunction处理数据的主要方法,处理输入流中的每个元素
    * 此函数可以使用Collector输出零个或多个元素,
    * 还可以使用Context参数更新内部状态或设置计时器。
    *
    * @param value 输入类型.
    * @param ctx 上下文
    * @param out 使用 collector 输出数据
    */
   public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

   /**
    * 在使用TimerService设置的计时器触发时调用。
    */
   public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

   /**
    * 上下文
    */
   public abstract class Context {

      /**
       * 当前正在处理的元素的时间戳或触发计时器的时间戳
       */
      public abstract Long timestamp();

      /**
       * 用于注册计时器和查询时间的TimerService
       */
      public abstract TimerService timerService();

      /**
       * 向OutputTag标识的侧输出流发出记录
       *
       * @param outputTag 指定侧输出流
       * @param value 发送的记录
       */
      public abstract <X> void output(OutputTag<X> outputTag, X value);
   }

   /**
    * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
    */
   public abstract class OnTimerContext extends Context {
      /**
       * 触发计时器的 TimeDomain 
       */
      public abstract TimeDomain timeDomain();
   }

}

2.3 测试代码

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestProcessFunction extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    .process(new CustomProcessFunction)
    .print("TestProcessFunction")
  env.execute()
}

class CustomProcessFunction extends ProcessFunction[Obj1,String]{

  /**
    * 处理流中的每个数据,返回 ID 大于 10 的数据与处理数据的 processTime
    * @param value
    * @param ctx
    * @param out
    */
  override def processElement(value: Obj1, ctx: ProcessFunction[Obj1, String]#Context, out: Collector[String]): Unit = {
    if(value.id > "10"){
      out.collect(s"${value.name},${ctx.timerService().currentProcessingTime()}")
    }
  }
}

3. KeyedProcessFunction

3.1 介绍

  • KeyedProcessFunction 用来处理 KeyedStream 中的数据。
  • KeyedProcessFunction[KEY, IN, OUT] 还额外提供了两个方法:
/**
 * 流中的每一个元素都会在这个方法中进行处理
 * 参数说明
 * Collector :将处理完成的数据输出,可能输出 0 到多个结果
 * Context:可以访问元素的时间戳,元素的 key,以及  TimerService 时间服务。
 *        : 还可以将结果输出到别的流(side outputs)。
 * I :输入数据类型
 */
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;


/**
 * onTimer是一个回调函数。当之前注册的定时器触发时调用。
 * 参数说明
 * timestamp:为定时器所设定的触发的时间戳。
 * Collector:为输出结果的集合。
 * OnTimerContext:和processElement的Context参数一样,提供上下文的一些信息,例如定时器触发的时间信息
 */
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

3.2 测试代码

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestKeyedProcessFunction extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)

  stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })
    .keyBy(_.id)
    .process(new CustomKeyedProcessFunction)
    .print("TestKeyedProcessFunction")
  env.execute()
}

/**
  * KeyedProcessFunction
  * String, 输入的 key 的数据类型
  * Obj1, 输入的数据类型
  * String 输出的数据类型
  */
class CustomKeyedProcessFunction extends KeyedProcessFunction[String, Obj1, String]{

  override def processElement(value: Obj1, ctx: KeyedProcessFunction[String, Obj1, String]#Context, out: Collector[String]): Unit = {
    println(s"当前 key:${ctx.getCurrentKey}")
    println(s"当前 ProcessingTime:${ctx.timerService().currentProcessingTime()}")

    out.collect(value.name)
  }
}

4. CoProcessFunction

  • DataStream API 提供了 CoProcessFunction 这样的 low-level 操作来对于两条输入流进行处理

  • CoProcessFunction 提供了操作每一个输入流的方法: processElement1()processElement2()

  • 类似于 ProcessFunction 这两种方法都通过 Context 对象来调用。这个 Context 对象可以访问事件数据,定时器时间戳,TimerService,以及 side outputs。CoProcessFunction 也提供了 onTimer()回调函数。

import com.hnbian.flink.common.Obj1
import com.hnbian.flink.common.Record
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestCoProcessFunction extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost",8888)

  private val stream1Obj: DataStream[Obj1] = stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    })

  val stream2Rec: DataStream[Record] = stream2.map(data => {
    val arr = data.split(",")
    Record(arr(0), arr(1), arr(2).toInt)
  })

  stream1Obj
    .connect(stream2Rec)
    .process(new CustomCoProcessFunction)
    .print()

  env.execute()
}

/**
  * 第一个流输入类型为 Obj1
  * 第二个流输入类型为 Record
  * 返回类型为 String
  */
class CustomCoProcessFunction extends CoProcessFunction[Obj1,Record,String]{
  override def processElement1(value: Obj1, ctx: CoProcessFunction[Obj1, Record, String]#Context, out: Collector[String]): Unit = {
    out.collect(s"processElement1:${value.name},${value.getClass}")
  }

  override def processElement2(value: Record, ctx: CoProcessFunction[Obj1, Record, String]#Context, out: Collector[String]): Unit = {
    out.collect(s"processElement2:${value.name},${value.getClass}")
  }
}

5. ProcessJoinFunction

ProcessJoinFunction 是一个接口,它定义了对在窗口中的两个流进行内连接操作时应用的函数。此函数会接收两个流中的匹配元素以及元素的时间戳,并产生一个结果。

这个接口通常与 IntervalJoin API一起使用,以在满足特定条件的情况下将两个流进行连接。这个条件通常是时间条件,即只有当两个元素的时间戳在指定的时间间隔内时,才会将它们进行连接。

leftKeyedStream
  // intervalJoin目前只支持Event Time
  .intervalJoin(rightKeyedStream)
  // 时间间隔,设定下界和上界
  .between(Time.minutes(-10),Time.seconds(0))
  // 不包含下界
  //.lowerBoundExclusive()
  // 不包含上界
  //.upperBoundExclusive()
  // 自定义ProcessJoinFunction 处理Join到的元素
  .process(ProcessJoinFunction)
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object TestProcessJoinFunction extends App {

  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost",8888)

  private val stream1Obj: DataStream[Obj1] = stream1
    .map(data => {
      val arr = data.split(",")
      Obj1(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
    override def extractTimestamp(element: Obj1) = element.time * 1000
  })

  val stream1Obj2: DataStream[Obj1] = stream2.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toLong)
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Obj1](Time.seconds(3)) {
    override def extractTimestamp(element: Obj1) = element.time * 1000
  })

  private val value: KeyedStream[Obj1, String]#IntervalJoined[Obj1, Obj1, String] = stream1Obj
    .keyBy(_.name)
    // 指定时间区间 join 数据
    .intervalJoin(stream1Obj2.keyBy(_.name))
    // 设置时间范围 从 EventTime前10分钟,到EventTime 时间
    .between(Time.minutes(-10), Time.seconds(0))

  value.process(new CustomProcessJoinFunction).print("TestProcessJoinFunction")

  env.execute()
}

class CustomProcessJoinFunction extends ProcessJoinFunction[Obj1,Obj1,(String,Obj1,Obj1)]{
  override def processElement
  (obj: Obj1,
   obj2: Obj1,
   ctx: ProcessJoinFunction[Obj1, Obj1, (String, Obj1, Obj1)]#Context,
   out: Collector[(String, Obj1, Obj1)]): Unit = {
      out.collect((obj.name,obj,obj2))
  }
}

6. BroadcastProcessFunction

BroadcastProcessFunction 是一个特殊的 CoProcessFunction,用于处理被广播的数据流。这种处理函数允许开发者创建一个或多个数据流,其中一个或多个可以被标记为广播流,并将它们的数据广播到所有并行实例。

import com.hnbian.flink.common.{Class,Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestBroadcastProcessFunction extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost",8888)


  private val StudentStream: DataStream[Student] = stream1
    .map(data => {
      val arr = data.split(",")
      Student(arr(0), arr(1), arr(2))
    })

  val descriptor = new MapStateDescriptor[String,  String]("classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  val ClassStream: DataStream[Class] = stream2.map(data => {
    val arr = data.split(",")
    Class(arr(0), arr(1))
  })
  val ClassBradoStream: BroadcastStream[Class] = ClassStream.broadcast(descriptor)

  StudentStream
    .connect(ClassBradoStream)
    .process(new CustomBroadcastProcessFunction)
    .print("TestBroadcastProcessFunction")

  env.execute()
}

/**
  * 参数
  * 未广播数据类型
  * 广播数据类型
  * 输出数据类型
  */
class CustomBroadcastProcessFunction extends BroadcastProcessFunction[Student,Class,String]{
  override def processElement(value: Student, ctx: BroadcastProcessFunction[Student, Class, String]#ReadOnlyContext, out: Collector[String]): Unit = {

    val classInfo = ctx.getBroadcastState(TestBroadcastProcessFunction.descriptor)

    val className: String = classInfo.get(value.classId)

    out.collect(s"stuId:${value.id}  stuName:${value.name} stuClassName:${className}")
  }

  override def processBroadcastElement(value: Class, ctx: BroadcastProcessFunction[Student, Class, String]#Context, out: Collector[String]): Unit = {

    val classInfo = ctx.getBroadcastState(TestBroadcastProcessFunction.descriptor)
    println("更新状态")
    classInfo.put(value.id,value.name)
  }
}

7. KeyedBroadcastProcessFunction

KeyedBroadcastProcessFunction 是用于处理带键的数据流和被广播的数据流的处理函数。KeyedBroadcastProcessFunction 继承自 BroadcastProcessFunction,并添加了对键控流(即带键的数据流)的处理。

import com.hnbian.flink.common.{Class, Student}
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TestKeyedBroadcastProcessFunction extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream1: DataStream[String] = env.socketTextStream("localhost",9999)
  val stream2: DataStream[String] = env.socketTextStream("localhost",8888)


  private val StudentStream: DataStream[Student] = stream1
    .map(data => {
      val arr = data.split(",")
      Student(arr(0), arr(1), arr(2))
    })

  val descriptor = new MapStateDescriptor[String,  String]("classInfo", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

  val ClassStream: DataStream[Class] = stream2.map(data => {
    val arr = data.split(",")
    Class(arr(0), arr(1))
  })
  val ClassBradoStream: BroadcastStream[Class] = ClassStream.broadcast(descriptor)

  StudentStream.keyBy(_.classId)
    .connect(ClassBradoStream)
    .process(new CustomKeyedBroadcastProcessFunction)
    .print("TestBroadcastProcessFunction")

  env.execute()
}

/**
  * key 类型
  * 未广播数据类型
  * 广播数据类型
  * 输出数据类型
  */
class CustomKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction[String,Student,Class,String]{
  override def processElement(value: Student, ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#ReadOnlyContext, out: Collector[String]): Unit = {

    println(s"processElement.key = ${ctx.getCurrentKey}")

    val classInfo = ctx.getBroadcastState(TestKeyedBroadcastProcessFunction.descriptor)

    val className: String = classInfo.get(value.classId)

    out.collect(s"stuId:${value.id}  stuName:${value.name} stuClassName:${className}")
  }

  override def processBroadcastElement(value: Class, ctx: KeyedBroadcastProcessFunction[String, Student, Class, String]#Context, out: Collector[String]): Unit = {
    val classInfo = ctx.getBroadcastState(TestKeyedBroadcastProcessFunction.descriptor)
    println("更新状态")
    classInfo.put(value.id,value.name)
  }
}

8. ProcessWindowFunction

ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性,但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。

import java.text.SimpleDateFormat
import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TestProcessWindowFunction extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 5 秒的窗口
  stream2
    .keyBy(_.id)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .process(new CuntomProcessFunction)
    .print("TestWindowFunction")

  environment.execute()

}

/**
  *
  * IN –输入值的类型。
  * OUT –输出值的类型。
  * KEY –密钥的类型。
  * W –窗口的类型
  */
class CuntomProcessFunction extends ProcessWindowFunction[Obj1, String, String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[Obj1], out: Collector[String]): Unit = {
    var count = 0
    val sdf = new SimpleDateFormat("HH:mm:ss")

    println(
      s"""
         |window key:${key},
         |开始时间:${sdf.format(context.window.getStart)},
         |结束时间:${sdf.format(context.window.getEnd)},
         |maxTime:${sdf.format(context.window.maxTimestamp())}
         |""".stripMargin)

    // 遍历,获得窗口所有数据
    for (obj <- elements) {
      println(obj.toString)
      count += 1
    }
    out.collect(s"Window ${context.window} , count : ${count}")
  }
}

9. ProcessAllWindowFunction

与 ProcessWindowFunction 功能类似,不过作用在 AllWindowedStream 之上。

import com.hnbian.flink.common.Obj1
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ArrayBuffer

object TestProcessWindowAllFunction extends App {
  val environment:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  val stream1: DataStream[String] = environment.socketTextStream("localhost",9999)

  val stream2: DataStream[Obj1] = stream1.map(data => {
    val arr = data.split(",")
    Obj1(arr(0), arr(1), arr(2).toInt)
  })

  // 设置一个窗口时间是 5 秒的窗口
  private val allValueStream: AllWindowedStream[Obj1, TimeWindow] = stream2
    .keyBy(_.id)
    .timeWindowAll(Time.seconds(5))

  allValueStream
    .process(new CustomProcessAllWindowFunction)
    .print("TestProcessWindowAllFunction")

  environment.execute()
}


/**
  * 定义一个ProcessAllWindowFunction
  * 把窗口内所有用户名 拼接成字符串 用 "," 分隔
  * 输入类型 obj1
  * 输出类型  元组(Long,String)
  * 窗口类型 TimeWindow
  */
class CustomProcessAllWindowFunction extends ProcessAllWindowFunction[Obj1, (Long,String), TimeWindow]{
  override def process(context: Context, elements: Iterable[Obj1], out: Collector[(Long, String)]): Unit = {

    println(s"start:${context.window.getStart}")
    println(s"end:${context.window.getEnd}")
    println(s"maxTimestamp:${context.window.maxTimestamp()}")
    val key =  context.window.getStart
    val value = new ArrayBuffer[String]()

    for (obj1<- elements){
      value.append(obj1.name)
    }
    // 把窗口内所有用户名 拼接成字符串 用 "," 分隔
    out.collect((key,value.mkString(",")))
  }
}

10. SideOutput(侧输出流)

大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。processfunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象, X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。


import com.hnbian.flink.common.Student
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * @Author haonan.bian
  * @Description //TODO
  * @Date 2021/1/21 11:38 
  **/
object TestSideOutput extends App {
  // 创建执行环境
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val stream1: DataStream[String] = env.socketTextStream("localhost", 9999)

  private val stream2: DataStream[Student] = stream1
    .map(data => {
      val arr = data.split(",")
      Student(arr(0), arr(1), arr(2))
    })


  val StudentStream: DataStream[Student] = stream2.process(new ProcessFunction[Student,Student] {
    override def processElement(value: Student, ctx: ProcessFunction[Student, Student]#Context, out: Collector[Student]) = {
      lazy val outputTag: OutputTag[Student] = new OutputTag[Student]("class2")
      if (value.classId == "2") {
        ctx.output(outputTag, value)
      }else{
        // 所有数据直接常规输出到主流
        out.collect(value)
      }
    }
  })

  StudentStream
    .getSideOutput(new OutputTag[Student]("class2"))
    .print("outPutTag")

  StudentStream.print("StudentStream")

  env.execute()
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Flink系列 12. 介绍Flink中 Timer 的使用 Flink系列 12. 介绍Flink中 Timer 的使用
1. Timer 介绍 Timer(定时器)是 Flink 提供的用于 Processing Time 或 Event Time 变化的机制。 Timer 是 Flink 内部的定时器,与 key 和 timestamp 相关,相同的 k
2020-08-29
下一篇 
java.lang.NumberFormatException  Not a version 9 异常处理 java.lang.NumberFormatException Not a version 9 异常处理
1. 异常说明今天写了一段 Flink 窗口 function 的代码运行的时候抛出一个 NumberFormatException: Not a version: 9 的异常,异常信息如下: Exception in thread "m
2020-08-25
  目录