Spark ML 6.特征转换 2


1. 离散余弦变换

1.1 算法介绍

类别:transformer【转换器】

  • 离散余弦变(DCT, Discrete Cosine Transform )换是与傅里叶变换相关的一种变换, 它类似于离散傅里叶变换, 但是只使用实数。

  • 离散余弦变换相当于一个长度大概是它两倍的离散傅里叶变换,这个离散傅里叶变换是对一个实偶数进行的(因为一个实偶数的傅里叶变换扔是一个实偶数)。

  • 离散余弦变换经常被用于信号处理和图像处理方面。例如: 用于对信号和图像(包括静止图像和运动图像)进行有损数据压缩

1.2 代码示例


package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors


/**
  * @author hnbian
  *         @ Description 离散余弦变换
  *         @ Date 2018/12/28 9:26
  **/
object DCT extends App {
  val spark = SparkUtils.getSparkSession("DCT", 4)

  val data = Seq(
    Vectors.dense(0.0, 1.0, -2.0, 3.0),
    Vectors.dense(-1.0, 2.0, 4.0, -7.0),
    Vectors.dense(14.0, -2.0, -5.0, 1.0))
  //定义数据集
  val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
  //打印定义的数据集
  df.show(false)
  /**
    * +--------------------+
    * |features            |
    * +--------------------+
    * |[0.0,1.0,-2.0,3.0]  |
    * |[-1.0,2.0,4.0,-7.0] |
    * |[14.0,-2.0,-5.0,1.0]|
    * +--------------------+
    */
  val dct = new DCT()
    .setInputCol("features")
    .setOutputCol("featuresDCT")
    .setInverse(false)

  val dctDF = dct.transform(df)
  dctDF.show(false)
  /**
    * +--------------------+----------------------------------------------------------------+
    * |features            |featuresDCT                                                     |
    * +--------------------+----------------------------------------------------------------+
    * |[0.0,1.0,-2.0,3.0]  |[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
    * |[-1.0,2.0,4.0,-7.0] |[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677]  |
    * |[14.0,-2.0,-5.0,1.0]|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163]   |
    * +--------------------+----------------------------------------------------------------+
    */

}

2. 字符串-索引变换

2.1 算法介绍

类别:estimator【评估器】

  • 字符串-索引变换(String-Indexer) 转换器可以把一列类别型的特征(或标签)进行编码,使其数值化索引的范围是[0,numLables],按照标签出现频率排序, 所以出现最频繁的标签其指标为0。

  • 该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。

  • 如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码,如果下游的管道节点需要使用字符串-索引编码, 则必须输入和转换为字符串- 指标列名

假设我们有DataFeame数据含有id和category两列:

id category
0 a
1 b
2 c
3 a
4 a
5 c

category是有三种取值的字符串, 使用StringIndexer进行转换后我们可以得到如下输出

id category cateforyIndex
0 a 0.0
1 b 2.0
2 c 1.0
3 a 0.0
4 a 0.0
5 c 1.0

另外, 如果在转换新数据时出现了再训练中未出现的标签,StringIndexer将会报错(默认)或者跳过从未出现的标签实例。

2.2 代码示例

package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}

/**
  * @author hnbian
  *         @ Description 
  *         @ Date 2018/12/28 9:59
  **/
object StringIndexer extends App {
  val spark = SparkUtils.getSparkSession("StringIndexer", 4)
  //定义数据集
  val df1 = spark.createDataFrame(Seq(
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (5, "c"))).toDF("id", "category")
  //查看刚刚定义的数据集
  df1.show(false)
  /**
    * +---+--------+
    * |id |category|
    * +---+--------+
    * |0  |a       |
    * |1  |b       |
    * |2  |c       |
    * |3  |a       |
    * |4  |a       |
    * |5  |c       |
    * +---+--------+
    */
  //创建一个StringIndexer对象
  val indexer = new StringIndexer()
    .setInputCol("category") //设定输入列名
    .setOutputCol("categoryIndex") //设定输出列名

  //对这个DataFrame进行训练,产生StringIndexerModel对象(转换器)
  val stringIndexerModel = indexer.fit(df1)
  //使用训练出来的模型对数据集进行转换
  val indexed = stringIndexerModel.transform(df1)
  //查看转换后的数据集
  indexed.show(false)
  /**
    * +---+--------+-------------+
    * |id |category|categoryIndex|
    * +---+--------+-------------+
    * |0  |a       |0.0          |
    * |1  |b       |2.0          |
    * |2  |c       |1.0          |
    * |3  |a       |0.0          |
    * |4  |a       |0.0          |
    * |5  |c       |1.0          |
    * +---+--------+-------------+
    */

  /**
    * 从上看转换后的数据集我们可以看到,
    * StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,
    * 即出现最多的“a”(3次)被编号成0,“c”(2次)为1,出现最少的“b”(1次)为0。
    */

  /**
    * 如果我们使用StringIndexerModel去转换一个模型内没有出现过的包含“d”标签的DataFrame 会有什么效果?
    * 实际上,如果直接转换的话,Spark会抛出异常,报出“Unseen label: d”的错误。
    * 为了处理这种情况,在模型训练后,可以通过设置setHandleInvalid("skip")来忽略掉那些未出现的标签,
    * 这样,带有未出现标签的行将直接被过滤掉,所下所示:
    */

  //构建一个包含“d”标签的数据集
  val df2 = spark.createDataFrame(Seq(
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (4, "a"),
    (5, "d"))).toDF("id", "category")

  df2.show(false)
  /**
    * +---+--------+
    * |id |category|
    * +---+--------+
    * |0  |a       |
    * |1  |b       |
    * |2  |c       |
    * |3  |a       |
    * |4  |a       |
    * |5  |d       |
    * +---+--------+
    */
    //使用模型转换包含“d”标签的数据集
  val indexed2 = stringIndexerModel.transform(df2)
  //展示转换后的数据集
  //indexed2.show(false)
  //报错:Caused by: org.apache.spark.SparkException: Unseen label: d.  To handle unseen labels, set Param handleInvalid to keep.

  //需要先跳过之前模型训练时不包含的标签再进行转换
  val indexed3 = stringIndexerModel.setHandleInvalid("skip").transform(df2)
  indexed3.show(false)
  /**
    * +---+--------+-------------+
    * |id |category|categoryIndex|
    * +---+--------+-------------+
    * |0  |a       |0.0          |
    * |1  |b       |2.0          |
    * |2  |c       |1.0          |
    * |3  |a       |0.0          |
    * |4  |a       |0.0          |
    * +---+--------+-------------+
    */
  /**
    * 我们可以看到 在这次转换中c的次数已经跟b是相等的都只有一次但是c的索引值仍然是1,因为在模型训练时已经对c定义索引为1。
    */
}

3. 索引–字符串变换

3.1 算法说明

类别:estimator【评估器】

  • 与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。

  • IndexToString 主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。

    • 首先和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,
    • 然后输出到“categoryIndex”列上,构建出新的DataFrame。

3.2 代码示例

package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}


/**
  * @author hnbian
  *         @ Description 
  *         @ Date 2018/12/28 10:49
  **/
object IndexToString extends App {

  val spark = SparkUtils.getSparkSession("IndexToString", 4)
  //定义数据集
  val df = spark.createDataFrame(Seq(
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
  )).toDF("id", "category")
  //展示数据集
  df.show(false)
  /**
    * +---+--------+
    * |id |category|
    * +---+--------+
    * |0  |a       |
    * |1  |b       |
    * |2  |c       |
    * |3  |a       |
    * |4  |a       |
    * |5  |c       |
    * +---+--------+
    */
  //创建StringIndexer 评估器
  val stringIndexer = new StringIndexer()
    .setInputCol("category") //设置输入列
    .setOutputCol("categoryIndex")
  //设置输出列
  //使用刚刚定义的StringIndexer评估器训练数据获得模型
  val indexer = stringIndexer.fit(df)
  //使用模型转换数据
  val df2 = indexer.transform(df)
  df2.show(false)
  /**
    * +---+--------+-------------+
    * |id |category|categoryIndex|
    * +---+--------+-------------+
    * |0  |a       |0.0          |
    * |1  |b       |2.0          |
    * |2  |c       |1.0          |
    * |3  |a       |0.0          |
    * |4  |a       |0.0          |
    * |5  |c       |1.0          |
    * +---+--------+-------------+
    */
  //定义一个IndexToString 转换器
  val converter = new IndexToString()
    .setInputCol("categoryIndex") //设置读取“categoryIndex”上的标签索引
    .setOutputCol("originalCategory") //设置输出到“originalCategory”列
  //转换数据集字符型标签,然后再输出到“originalCategory”列上
  val converted = converter.transform(df2)
  //通过输出“originalCategory”列,可以看到数据集中原有的字符标签
  converted.show(false)
  /**
    * +---+--------+-------------+----------------+
    * |id |category|categoryIndex|originalCategory|
    * +---+--------+-------------+----------------+
    * |0  |a       |0.0          |a               |
    * |1  |b       |2.0          |b               |
    * |2  |c       |1.0          |c               |
    * |3  |a       |0.0          |a               |
    * |4  |a       |0.0          |a               |
    * |5  |c       |1.0          |c               |
    * +---+--------+-------------+----------------+
    */
}

4. 独热编码

4.1 算法说明

类别:estimator【评估器】

  • 将在Spark 3.0 版本删除此算法

  • 独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,

  • 原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,

  • 每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。

  • One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑回归等。

4.2 代码示例

package hnbian.spark.ml.feature.transforming

import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderEstimator, StringIndexer}

/**
  * @author hnbian
  *         @ Description 特征转换独热编码示例
  *         @ Date 2018/12/28 11:12
  **/
object OneHotEncoderEstimator extends App {

  import hnbian.spark.SparkUtils

  val spark = SparkUtils.getSparkSession("OneHotEncoder", 4)

  // 首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,
  // 在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:
  val df = spark.createDataFrame(Seq(
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c"),
    (6, "d"),
    (7, "d"),
    (8, "d"),
    (9, "d"),
    (10, "e"),
    (11, "e"),
    (12, "e"),
    (13, "e"),
    (14, "e")
  )).toDF("id", "category")
  df.show()
  /**
    * +---+--------+
    * | id|category|
    * +---+--------+
    * |  0|       a|
    * |  1|       b|
    * |  2|       c|
    * |  3|       a|
    * |  4|       a|
    * |  5|       c|
    * |  6|       d|
    * |  7|       d|
    * |  8|       d|
    * |  9|       d|
    * | 10|       e|
    * | 11|       e|
    * | 12|       e|
    * | 13|       e|
    * | 14|       e|
    * +---+--------+
    */
  //定义StringIndexer 评估器并训练出模型
  val indexerModel = new StringIndexer().
    setInputCol("category").
    setOutputCol("categoryIndex").
    fit(df)

  //对数据集进行转换
  val indexed = indexerModel.transform(df)
  //查看转换后的数据
  indexed.show(false)
  /**
    * a 3次,b 1次,c 2次,d 4次,e 5次
    * e 5次、d 4次、a 3次、c 2次、b 1次
    * +---+--------+-------------+
    * |id |category|categoryIndex|
    * +---+--------+-------------+
    * |0  |a       |2.0          |
    * |1  |b       |4.0          |
    * |2  |c       |3.0          |
    * |3  |a       |2.0          |
    * |4  |a       |2.0          |
    * |5  |c       |3.0          |
    * |6  |d       |1.0          |
    * |7  |d       |1.0          |
    * |8  |d       |1.0          |
    * |9  |d       |1.0          |
    * |10 |e       |0.0          |
    * |11 |e       |0.0          |
    * |12 |e       |0.0          |
    * |13 |e       |0.0          |
    * |14 |e       |0.0          |
    * +---+--------+-------------+
    */

  /**
    * 创建OneHotEncoder对象对处理后的DataFrame进行编码,
    * 可以看见,编码后的二进制特征呈稀疏向量形式,
    * 与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,
    * 若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。
    */
  //创建一个OneHotEncoderEstimator 评估器
  val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array("categoryIndex"))
    .setOutputCols(Array("categoryVec"))
    //.setDropLast(false) //默认为true 是否删除最后的类别,如共有五个类别输入,删除最后一个则剩下四个类别

  //使用评估器训练模型
  val encoderModel =  encoder.fit(indexed)
  //转换数据集并查看转换后的结果集
  encoderModel.transform(indexed).show(false)
  /**
    * a 3次,b 1次,c 2次,d 4次,e 5次
    * e 5次、d 4次、a 3次、c 2次、b 1次
    * +---+--------+-------------+-------------+
    * |id |category|categoryIndex|categoryVec  |
    * +---+--------+-------------+-------------+
    * |0  |a       |2.0          |(4,[2],[1.0])|
    * |1  |b       |4.0          |(4,[],[])    |
    * |2  |c       |3.0          |(4,[3],[1.0])|
    * |3  |a       |2.0          |(4,[2],[1.0])|
    * |4  |a       |2.0          |(4,[2],[1.0])|
    * |5  |c       |3.0          |(4,[3],[1.0])|
    * |6  |d       |1.0          |(4,[1],[1.0])|
    * |7  |d       |1.0          |(4,[1],[1.0])|
    * |8  |d       |1.0          |(4,[1],[1.0])|
    * |9  |d       |1.0          |(4,[1],[1.0])|
    * |10 |e       |0.0          |(4,[0],[1.0])|
    * |11 |e       |0.0          |(4,[0],[1.0])|
    * |12 |e       |0.0          |(4,[0],[1.0])|
    * |13 |e       |0.0          |(4,[0],[1.0])|
    * |14 |e       |0.0          |(4,[0],[1.0])|
    * +---+--------+-------------+-------------+
    */

  //.setDropLast(false) 设置为false 时打印数据
  /**
    * +---+--------+-------------+-------------+
    * |id |category|categoryIndex|categoryVec  |
    * +---+--------+-------------+-------------+
    * |0  |a       |2.0          |(5,[2],[1.0])|
    * |1  |b       |4.0          |(5,[4],[1.0])|
    * |2  |c       |3.0          |(5,[3],[1.0])|
    * |3  |a       |2.0          |(5,[2],[1.0])|
    * |4  |a       |2.0          |(5,[2],[1.0])|
    * |5  |c       |3.0          |(5,[3],[1.0])|
    * |6  |d       |1.0          |(5,[1],[1.0])|
    * |7  |d       |1.0          |(5,[1],[1.0])|
    * |8  |d       |1.0          |(5,[1],[1.0])|
    * |9  |d       |1.0          |(5,[1],[1.0])|
    * |10 |e       |0.0          |(5,[0],[1.0])|
    * |11 |e       |0.0          |(5,[0],[1.0])|
    * |12 |e       |0.0          |(5,[0],[1.0])|
    * |13 |e       |0.0          |(5,[0],[1.0])|
    * |14 |e       |0.0          |(5,[0],[1.0])|
    * +---+--------+-------------+-------------+
    */
}

5. 向量-索引变换

5.1 算法说明

类别:estimator【评估器】

  • 之前介绍的StringIndexer是针对单个类别型特征进行转换,

  • 倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了向量-索引变换(VectorIndexer)类来解决向量数据集中的类别性特征转换。

  • 通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。

  • 它的处理流程如下:

  1. 获得一个向量类型的输入以及maxCategories参数。

  2. 基于原始数值识别哪些特征需要被类别化, 其中最多maxCategories需要被类别化。

  3. 对于每一个类别特征计算0-based类别指标。

  4. 对类别特征进行索引然后将原始值转换为指标。

  • 索引后的类别特征可以帮助决策树等算法处理类别特征, 并得到较好的结果。

  • 在下面的例子中, 我们读入一个数据集, 然后使用VectorIndexer来决定哪些特征需要被作为非数值类型处理, 将非数值型特征转换为他们的索引。

5.2 代码示例

package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}

/**
  * @author hnbian
  *         @ Description
  *         @ Date 2018/12/28 12:09
  **/
object VectorIndexer extends App {
  val spark = SparkUtils.getSparkSession("VectorIndexer", 4)

  val data = Seq(
    Vectors.dense(-1.0, 1.0, 1.0),
    Vectors.dense(-1.0, 3.0, 1.0),
    Vectors.dense(0.0, 5.0, 1.0))
  //定义数据集
  val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
  //展示数据集
  df.show()
  /**
    * +--------------+
    * |      features|
    * +--------------+
    * |[-1.0,1.0,1.0]|
    * |[-1.0,3.0,1.0]|
    * | [0.0,5.0,1.0]|
    * +--------------+
    */
  //定义 VectorIndexer 评估器

  val indexer = new VectorIndexer().
    setInputCol("features").
    setOutputCol("indexed").
    setMaxCategories(2) //设置最大种类别为2,即每列种的元素类型不能大于2种,比如中间列为1,3,5 为3种

  //使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征
  val indexerModel = indexer.fit(df)
  /**
    * 即只有种类小于2的特征才被认为是类别型特征,否则被认为是连续型特征
    * 可以通过VectorIndexerModel的categoryMaps成员来获得被转换的特征及其映射,
    * 这里可以看到共有两个特征被转换,分别是下标为0和2的元素。
    * 可以看到,0号特征只有-1,0两种取值,分别被映射成0,1,而2号特征只有1种取值,被映射成0。
    */
  indexerModel.transform(df).show()
  /**
    * +--------------+-------------+
    * |      features|      indexed|
    * +--------------+-------------+
    * |[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
    * |[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
    * | [0.0,5.0,1.0]|[0.0,5.0,0.0]|
    * +--------------+-------------+
    */
  val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
  println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", "))
  //Chose 2 categorical features: 0, 2

}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark ML 7.特征转换 3 Spark ML 7.特征转换 3
1. 正则化1.1 算法说明类别:transformer【转换器】 正则化(Normalizer)是一个转换器, 它可以将多行向量输入转化为统一的形式。 参数为p (默认值:2) 来指定正则化中使用的p-norm。 正则化操作可以使输入数据
2019-01-03
下一篇 
Spark ML 5.特征转换 1 Spark ML 5.特征转换 1
1. 分词器1.1 算法介绍 类别:transformer【转换器】 Tokenizer Tokenization 将文本划分为单词。下面例子将展示如何把句子划分为单词。 RegexTokenizer基于正则表达式提供了更多的划分选项。默
2018-12-29
  目录