1. 离散余弦变换
1.1 算法介绍
类别:transformer【转换器】
离散余弦变(DCT, Discrete Cosine Transform )换是与傅里叶变换相关的一种变换, 它类似于离散傅里叶变换, 但是只使用实数。
离散余弦变换相当于一个长度大概是它两倍的离散傅里叶变换,这个离散傅里叶变换是对一个实偶数进行的(因为一个实偶数的傅里叶变换扔是一个实偶数)。
离散余弦变换经常被用于信号处理和图像处理方面。例如: 用于对信号和图像(包括静止图像和运动图像)进行有损数据压缩
1.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian
* @ Description 离散余弦变换
* @ Date 2018/12/28 9:26
**/
object DCT extends App {
val spark = SparkUtils.getSparkSession("DCT", 4)
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
//定义数据集
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
//打印定义的数据集
df.show(false)
/**
* +--------------------+
* |features |
* +--------------------+
* |[0.0,1.0,-2.0,3.0] |
* |[-1.0,2.0,4.0,-7.0] |
* |[14.0,-2.0,-5.0,1.0]|
* +--------------------+
*/
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDF = dct.transform(df)
dctDF.show(false)
/**
* +--------------------+----------------------------------------------------------------+
* |features |featuresDCT |
* +--------------------+----------------------------------------------------------------+
* |[0.0,1.0,-2.0,3.0] |[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
* |[-1.0,2.0,4.0,-7.0] |[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677] |
* |[14.0,-2.0,-5.0,1.0]|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163] |
* +--------------------+----------------------------------------------------------------+
*/
}
2. 字符串-索引变换
2.1 算法介绍
类别:estimator【评估器】
字符串-索引变换(String-Indexer) 转换器可以把一列类别型的特征(或标签)进行编码,使其数值化索引的范围是[0,numLables],按照标签出现频率排序, 所以出现最频繁的标签其指标为0。
该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。
如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码,如果下游的管道节点需要使用字符串-索引编码, 则必须输入和转换为字符串- 指标列名
假设我们有DataFeame数据含有id和category两列:
id | category |
---|---|
0 | a |
1 | b |
2 | c |
3 | a |
4 | a |
5 | c |
category是有三种取值的字符串, 使用StringIndexer进行转换后我们可以得到如下输出
id | category | cateforyIndex |
---|---|---|
0 | a | 0.0 |
1 | b | 2.0 |
2 | c | 1.0 |
3 | a | 0.0 |
4 | a | 0.0 |
5 | c | 1.0 |
另外, 如果在转换新数据时出现了再训练中未出现的标签,StringIndexer将会报错(默认)或者跳过从未出现的标签实例。
2.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}
/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 9:59
**/
object StringIndexer extends App {
val spark = SparkUtils.getSparkSession("StringIndexer", 4)
//定义数据集
val df1 = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(5, "c"))).toDF("id", "category")
//查看刚刚定义的数据集
df1.show(false)
/**
* +---+--------+
* |id |category|
* +---+--------+
* |0 |a |
* |1 |b |
* |2 |c |
* |3 |a |
* |4 |a |
* |5 |c |
* +---+--------+
*/
//创建一个StringIndexer对象
val indexer = new StringIndexer()
.setInputCol("category") //设定输入列名
.setOutputCol("categoryIndex") //设定输出列名
//对这个DataFrame进行训练,产生StringIndexerModel对象(转换器)
val stringIndexerModel = indexer.fit(df1)
//使用训练出来的模型对数据集进行转换
val indexed = stringIndexerModel.transform(df1)
//查看转换后的数据集
indexed.show(false)
/**
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |0.0 |
* |1 |b |2.0 |
* |2 |c |1.0 |
* |3 |a |0.0 |
* |4 |a |0.0 |
* |5 |c |1.0 |
* +---+--------+-------------+
*/
/**
* 从上看转换后的数据集我们可以看到,
* StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,
* 即出现最多的“a”(3次)被编号成0,“c”(2次)为1,出现最少的“b”(1次)为0。
*/
/**
* 如果我们使用StringIndexerModel去转换一个模型内没有出现过的包含“d”标签的DataFrame 会有什么效果?
* 实际上,如果直接转换的话,Spark会抛出异常,报出“Unseen label: d”的错误。
* 为了处理这种情况,在模型训练后,可以通过设置setHandleInvalid("skip")来忽略掉那些未出现的标签,
* 这样,带有未出现标签的行将直接被过滤掉,所下所示:
*/
//构建一个包含“d”标签的数据集
val df2 = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(4, "a"),
(5, "d"))).toDF("id", "category")
df2.show(false)
/**
* +---+--------+
* |id |category|
* +---+--------+
* |0 |a |
* |1 |b |
* |2 |c |
* |3 |a |
* |4 |a |
* |5 |d |
* +---+--------+
*/
//使用模型转换包含“d”标签的数据集
val indexed2 = stringIndexerModel.transform(df2)
//展示转换后的数据集
//indexed2.show(false)
//报错:Caused by: org.apache.spark.SparkException: Unseen label: d. To handle unseen labels, set Param handleInvalid to keep.
//需要先跳过之前模型训练时不包含的标签再进行转换
val indexed3 = stringIndexerModel.setHandleInvalid("skip").transform(df2)
indexed3.show(false)
/**
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |0.0 |
* |1 |b |2.0 |
* |2 |c |1.0 |
* |3 |a |0.0 |
* |4 |a |0.0 |
* +---+--------+-------------+
*/
/**
* 我们可以看到 在这次转换中c的次数已经跟b是相等的都只有一次但是c的索引值仍然是1,因为在模型训练时已经对c定义索引为1。
*/
}
3. 索引–字符串变换
3.1 算法说明
类别:estimator【评估器】
与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。
IndexToString 主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。
- 首先和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,
- 然后输出到“categoryIndex”列上,构建出新的DataFrame。
3.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 10:49
**/
object IndexToString extends App {
val spark = SparkUtils.getSparkSession("IndexToString", 4)
//定义数据集
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
//展示数据集
df.show(false)
/**
* +---+--------+
* |id |category|
* +---+--------+
* |0 |a |
* |1 |b |
* |2 |c |
* |3 |a |
* |4 |a |
* |5 |c |
* +---+--------+
*/
//创建StringIndexer 评估器
val stringIndexer = new StringIndexer()
.setInputCol("category") //设置输入列
.setOutputCol("categoryIndex")
//设置输出列
//使用刚刚定义的StringIndexer评估器训练数据获得模型
val indexer = stringIndexer.fit(df)
//使用模型转换数据
val df2 = indexer.transform(df)
df2.show(false)
/**
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |0.0 |
* |1 |b |2.0 |
* |2 |c |1.0 |
* |3 |a |0.0 |
* |4 |a |0.0 |
* |5 |c |1.0 |
* +---+--------+-------------+
*/
//定义一个IndexToString 转换器
val converter = new IndexToString()
.setInputCol("categoryIndex") //设置读取“categoryIndex”上的标签索引
.setOutputCol("originalCategory") //设置输出到“originalCategory”列
//转换数据集字符型标签,然后再输出到“originalCategory”列上
val converted = converter.transform(df2)
//通过输出“originalCategory”列,可以看到数据集中原有的字符标签
converted.show(false)
/**
* +---+--------+-------------+----------------+
* |id |category|categoryIndex|originalCategory|
* +---+--------+-------------+----------------+
* |0 |a |0.0 |a |
* |1 |b |2.0 |b |
* |2 |c |1.0 |c |
* |3 |a |0.0 |a |
* |4 |a |0.0 |a |
* |5 |c |1.0 |c |
* +---+--------+-------------+----------------+
*/
}
4. 独热编码
4.1 算法说明
类别:estimator【评估器】
将在Spark 3.0 版本删除此算法
独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,
原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,
每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。
One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑回归等。
4.2 代码示例
package hnbian.spark.ml.feature.transforming
import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderEstimator, StringIndexer}
/**
* @author hnbian
* @ Description 特征转换独热编码示例
* @ Date 2018/12/28 11:12
**/
object OneHotEncoderEstimator extends App {
import hnbian.spark.SparkUtils
val spark = SparkUtils.getSparkSession("OneHotEncoder", 4)
// 首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,
// 在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c"),
(6, "d"),
(7, "d"),
(8, "d"),
(9, "d"),
(10, "e"),
(11, "e"),
(12, "e"),
(13, "e"),
(14, "e")
)).toDF("id", "category")
df.show()
/**
* +---+--------+
* | id|category|
* +---+--------+
* | 0| a|
* | 1| b|
* | 2| c|
* | 3| a|
* | 4| a|
* | 5| c|
* | 6| d|
* | 7| d|
* | 8| d|
* | 9| d|
* | 10| e|
* | 11| e|
* | 12| e|
* | 13| e|
* | 14| e|
* +---+--------+
*/
//定义StringIndexer 评估器并训练出模型
val indexerModel = new StringIndexer().
setInputCol("category").
setOutputCol("categoryIndex").
fit(df)
//对数据集进行转换
val indexed = indexerModel.transform(df)
//查看转换后的数据
indexed.show(false)
/**
* a 3次,b 1次,c 2次,d 4次,e 5次
* e 5次、d 4次、a 3次、c 2次、b 1次
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |2.0 |
* |1 |b |4.0 |
* |2 |c |3.0 |
* |3 |a |2.0 |
* |4 |a |2.0 |
* |5 |c |3.0 |
* |6 |d |1.0 |
* |7 |d |1.0 |
* |8 |d |1.0 |
* |9 |d |1.0 |
* |10 |e |0.0 |
* |11 |e |0.0 |
* |12 |e |0.0 |
* |13 |e |0.0 |
* |14 |e |0.0 |
* +---+--------+-------------+
*/
/**
* 创建OneHotEncoder对象对处理后的DataFrame进行编码,
* 可以看见,编码后的二进制特征呈稀疏向量形式,
* 与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,
* 若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。
*/
//创建一个OneHotEncoderEstimator 评估器
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("categoryIndex"))
.setOutputCols(Array("categoryVec"))
//.setDropLast(false) //默认为true 是否删除最后的类别,如共有五个类别输入,删除最后一个则剩下四个类别
//使用评估器训练模型
val encoderModel = encoder.fit(indexed)
//转换数据集并查看转换后的结果集
encoderModel.transform(indexed).show(false)
/**
* a 3次,b 1次,c 2次,d 4次,e 5次
* e 5次、d 4次、a 3次、c 2次、b 1次
* +---+--------+-------------+-------------+
* |id |category|categoryIndex|categoryVec |
* +---+--------+-------------+-------------+
* |0 |a |2.0 |(4,[2],[1.0])|
* |1 |b |4.0 |(4,[],[]) |
* |2 |c |3.0 |(4,[3],[1.0])|
* |3 |a |2.0 |(4,[2],[1.0])|
* |4 |a |2.0 |(4,[2],[1.0])|
* |5 |c |3.0 |(4,[3],[1.0])|
* |6 |d |1.0 |(4,[1],[1.0])|
* |7 |d |1.0 |(4,[1],[1.0])|
* |8 |d |1.0 |(4,[1],[1.0])|
* |9 |d |1.0 |(4,[1],[1.0])|
* |10 |e |0.0 |(4,[0],[1.0])|
* |11 |e |0.0 |(4,[0],[1.0])|
* |12 |e |0.0 |(4,[0],[1.0])|
* |13 |e |0.0 |(4,[0],[1.0])|
* |14 |e |0.0 |(4,[0],[1.0])|
* +---+--------+-------------+-------------+
*/
//.setDropLast(false) 设置为false 时打印数据
/**
* +---+--------+-------------+-------------+
* |id |category|categoryIndex|categoryVec |
* +---+--------+-------------+-------------+
* |0 |a |2.0 |(5,[2],[1.0])|
* |1 |b |4.0 |(5,[4],[1.0])|
* |2 |c |3.0 |(5,[3],[1.0])|
* |3 |a |2.0 |(5,[2],[1.0])|
* |4 |a |2.0 |(5,[2],[1.0])|
* |5 |c |3.0 |(5,[3],[1.0])|
* |6 |d |1.0 |(5,[1],[1.0])|
* |7 |d |1.0 |(5,[1],[1.0])|
* |8 |d |1.0 |(5,[1],[1.0])|
* |9 |d |1.0 |(5,[1],[1.0])|
* |10 |e |0.0 |(5,[0],[1.0])|
* |11 |e |0.0 |(5,[0],[1.0])|
* |12 |e |0.0 |(5,[0],[1.0])|
* |13 |e |0.0 |(5,[0],[1.0])|
* |14 |e |0.0 |(5,[0],[1.0])|
* +---+--------+-------------+-------------+
*/
}
5. 向量-索引变换
5.1 算法说明
类别:estimator【评估器】
之前介绍的StringIndexer是针对单个类别型特征进行转换,
倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了向量-索引变换(VectorIndexer)类来解决向量数据集中的类别性特征转换。
通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。
它的处理流程如下:
获得一个向量类型的输入以及maxCategories参数。
基于原始数值识别哪些特征需要被类别化, 其中最多maxCategories需要被类别化。
对于每一个类别特征计算0-based类别指标。
对类别特征进行索引然后将原始值转换为指标。
索引后的类别特征可以帮助决策树等算法处理类别特征, 并得到较好的结果。
在下面的例子中, 我们读入一个数据集, 然后使用VectorIndexer来决定哪些特征需要被作为非数值类型处理, 将非数值型特征转换为他们的索引。
5.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}
/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 12:09
**/
object VectorIndexer extends App {
val spark = SparkUtils.getSparkSession("VectorIndexer", 4)
val data = Seq(
Vectors.dense(-1.0, 1.0, 1.0),
Vectors.dense(-1.0, 3.0, 1.0),
Vectors.dense(0.0, 5.0, 1.0))
//定义数据集
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
//展示数据集
df.show()
/**
* +--------------+
* | features|
* +--------------+
* |[-1.0,1.0,1.0]|
* |[-1.0,3.0,1.0]|
* | [0.0,5.0,1.0]|
* +--------------+
*/
//定义 VectorIndexer 评估器
val indexer = new VectorIndexer().
setInputCol("features").
setOutputCol("indexed").
setMaxCategories(2) //设置最大种类别为2,即每列种的元素类型不能大于2种,比如中间列为1,3,5 为3种
//使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征
val indexerModel = indexer.fit(df)
/**
* 即只有种类小于2的特征才被认为是类别型特征,否则被认为是连续型特征
* 可以通过VectorIndexerModel的categoryMaps成员来获得被转换的特征及其映射,
* 这里可以看到共有两个特征被转换,分别是下标为0和2的元素。
* 可以看到,0号特征只有-1,0两种取值,分别被映射成0,1,而2号特征只有1种取值,被映射成0。
*/
indexerModel.transform(df).show()
/**
* +--------------+-------------+
* | features| indexed|
* +--------------+-------------+
* |[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
* |[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
* | [0.0,5.0,1.0]|[0.0,5.0,0.0]|
* +--------------+-------------+
*/
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", "))
//Chose 2 categorical features: 0, 2
}