Spark ML 6.特征转换 2


1. 离散余弦变换

1.1 算法介绍

类别:transformer【转换器】

  • 离散余弦变(DCT, Discrete Cosine Transform )换是与傅里叶变换相关的一种变换, 它类似于离散傅里叶变换, 但是只使用实数。

  • 离散余弦变换相当于一个长度大概是它两倍的离散傅里叶变换,这个离散傅里叶变换是对一个实偶数进行的(因为一个实偶数的傅里叶变换扔是一个实偶数)。

  • 离散余弦变换经常被用于信号处理和图像处理方面。例如: 用于对信号和图像(包括静止图像和运动图像)进行有损数据压缩

1.2 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors


/**
* @author hnbian
* @ Description 离散余弦变换
* @ Date 2018/12/28 9:26
**/
object DCT extends App {
val spark = SparkUtils.getSparkSession("DCT", 4)

val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
//定义数据集
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
//打印定义的数据集
df.show(false)
/**
* +--------------------+
* |features |
* +--------------------+
* |[0.0,1.0,-2.0,3.0] |
* |[-1.0,2.0,4.0,-7.0] |
* |[14.0,-2.0,-5.0,1.0]|
* +--------------------+
*/
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)

val dctDF = dct.transform(df)
dctDF.show(false)
/**
* +--------------------+----------------------------------------------------------------+
* |features |featuresDCT |
* +--------------------+----------------------------------------------------------------+
* |[0.0,1.0,-2.0,3.0] |[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
* |[-1.0,2.0,4.0,-7.0] |[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677] |
* |[14.0,-2.0,-5.0,1.0]|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163] |
* +--------------------+----------------------------------------------------------------+
*/

}

2. 字符串-索引变换

2.1 算法介绍

类别:estimator【评估器】

  • 字符串-索引变换(String-Indexer) 转换器可以把一列类别型的特征(或标签)进行编码,使其数值化索引的范围是[0,numLables],按照标签出现频率排序, 所以出现最频繁的标签其指标为0。

  • 该过程可以使得相应的特征索引化,使得某些无法接受类别型特征的算法可以使用,并提高诸如决策树等机器学习算法的效率。

  • 如果输入的是数值型的,我们会把它转化成字符型,然后再对其进行编码,如果下游的管道节点需要使用字符串-索引编码, 则必须输入和转换为字符串- 指标列名

假设我们有DataFeame数据含有id和category两列:

id category
0 a
1 b
2 c
3 a
4 a
5 c

category是有三种取值的字符串, 使用StringIndexer进行转换后我们可以得到如下输出

id category cateforyIndex
0 a 0.0
1 b 2.0
2 c 1.0
3 a 0.0
4 a 0.0
5 c 1.0

另外, 如果在转换新数据时出现了再训练中未出现的标签,StringIndexer将会报错(默认)或者跳过从未出现的标签实例。

2.2 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.{StringIndexer, StringIndexerModel}

/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 9:59
**/
object StringIndexer extends App {
val spark = SparkUtils.getSparkSession("StringIndexer", 4)
//定义数据集
val df1 = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(5, "c"))).toDF("id", "category")
//查看刚刚定义的数据集
df1.show(false)
/**
* +---+--------+
* |id |category|
* +---+--------+
* |0 |a |
* |1 |b |
* |2 |c |
* |3 |a |
* |4 |a |
* |5 |c |
* +---+--------+
*/
//创建一个StringIndexer对象
val indexer = new StringIndexer()
.setInputCol("category") //设定输入列名
.setOutputCol("categoryIndex") //设定输出列名

//对这个DataFrame进行训练,产生StringIndexerModel对象(转换器)
val stringIndexerModel = indexer.fit(df1)
//使用训练出来的模型对数据集进行转换
val indexed = stringIndexerModel.transform(df1)
//查看转换后的数据集
indexed.show(false)
/**
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |0.0 |
* |1 |b |2.0 |
* |2 |c |1.0 |
* |3 |a |0.0 |
* |4 |a |0.0 |
* |5 |c |1.0 |
* +---+--------+-------------+
*/

/**
* 从上看转换后的数据集我们可以看到,
* StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,
* 即出现最多的“a”(3次)被编号成0,“c”(2次)为1,出现最少的“b”(1次)为0。
*/

/**
* 如果我们使用StringIndexerModel去转换一个模型内没有出现过的包含“d”标签的DataFrame 会有什么效果?
* 实际上,如果直接转换的话,Spark会抛出异常,报出“Unseen label: d”的错误。
* 为了处理这种情况,在模型训练后,可以通过设置setHandleInvalid("skip")来忽略掉那些未出现的标签,
* 这样,带有未出现标签的行将直接被过滤掉,所下所示:
*/

//构建一个包含“d”标签的数据集
val df2 = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(4, "a"),
(5, "d"))).toDF("id", "category")

df2.show(false)
/**
* +---+--------+
* |id |category|
* +---+--------+
* |0 |a |
* |1 |b |
* |2 |c |
* |3 |a |
* |4 |a |
* |5 |d |
* +---+--------+
*/
//使用模型转换包含“d”标签的数据集
val indexed2 = stringIndexerModel.transform(df2)
//展示转换后的数据集
//indexed2.show(false)
//报错:Caused by: org.apache.spark.SparkException: Unseen label: d. To handle unseen labels, set Param handleInvalid to keep.

//需要先跳过之前模型训练时不包含的标签再进行转换
val indexed3 = stringIndexerModel.setHandleInvalid("skip").transform(df2)
indexed3.show(false)
/**
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |0.0 |
* |1 |b |2.0 |
* |2 |c |1.0 |
* |3 |a |0.0 |
* |4 |a |0.0 |
* +---+--------+-------------+
*/
/**
* 我们可以看到 在这次转换中c的次数已经跟b是相等的都只有一次但是c的索引值仍然是1,因为在模型训练时已经对c定义索引为1。
*/
}

3. 索引–字符串变换

3.1 算法说明

类别:estimator【评估器】

  • 与StringIndexer相对应,IndexToString的作用是把标签索引的一列重新映射回原有的字符型标签。

  • IndexToString 主要使用场景一般都是和StringIndexer配合,先用StringIndexer将标签转化成标签索引,进行模型训练,然后在预测标签的时候再把标签索引转化成原有的字符标签。当然,你也可以另外定义其他的标签。

    • 首先和StringIndexer的实验相同,我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,
    • 然后输出到“categoryIndex”列上,构建出新的DataFrame。

3.2 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}


/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 10:49
**/
object IndexToString extends App {

val spark = SparkUtils.getSparkSession("IndexToString", 4)
//定义数据集
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
//展示数据集
df.show(false)
/**
* +---+--------+
* |id |category|
* +---+--------+
* |0 |a |
* |1 |b |
* |2 |c |
* |3 |a |
* |4 |a |
* |5 |c |
* +---+--------+
*/
//创建StringIndexer 评估器
val stringIndexer = new StringIndexer()
.setInputCol("category") //设置输入列
.setOutputCol("categoryIndex")
//设置输出列
//使用刚刚定义的StringIndexer评估器训练数据获得模型
val indexer = stringIndexer.fit(df)
//使用模型转换数据
val df2 = indexer.transform(df)
df2.show(false)
/**
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |0.0 |
* |1 |b |2.0 |
* |2 |c |1.0 |
* |3 |a |0.0 |
* |4 |a |0.0 |
* |5 |c |1.0 |
* +---+--------+-------------+
*/
//定义一个IndexToString 转换器
val converter = new IndexToString()
.setInputCol("categoryIndex") //设置读取“categoryIndex”上的标签索引
.setOutputCol("originalCategory") //设置输出到“originalCategory”列
//转换数据集字符型标签,然后再输出到“originalCategory”列上
val converted = converter.transform(df2)
//通过输出“originalCategory”列,可以看到数据集中原有的字符标签
converted.show(false)
/**
* +---+--------+-------------+----------------+
* |id |category|categoryIndex|originalCategory|
* +---+--------+-------------+----------------+
* |0 |a |0.0 |a |
* |1 |b |2.0 |b |
* |2 |c |1.0 |c |
* |3 |a |0.0 |a |
* |4 |a |0.0 |a |
* |5 |c |1.0 |c |
* +---+--------+-------------+----------------+
*/
}

4. 独热编码

4.1 算法说明

类别:estimator【评估器】

  • 将在Spark 3.0 版本删除此算法

  • 独热编码(One-Hot Encoding) 是指把一列类别性特征(或称名词性特征,nominal/categorical features)映射成一系列的二元连续特征的过程,

  • 原有的类别性特征有几种可能取值,这一特征就会被映射成几个二元连续特征,

  • 每一个特征代表一种取值,若该样本表现出该特征,则取1,否则取0。

  • One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑回归等。

4.2 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package hnbian.spark.ml.feature.transforming

import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderEstimator, StringIndexer}

/**
* @author hnbian
* @ Description 特征转换独热编码示例
* @ Date 2018/12/28 11:12
**/
object OneHotEncoderEstimator extends App {

import hnbian.spark.SparkUtils

val spark = SparkUtils.getSparkSession("OneHotEncoder", 4)

// 首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,
// 在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化:
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c"),
(6, "d"),
(7, "d"),
(8, "d"),
(9, "d"),
(10, "e"),
(11, "e"),
(12, "e"),
(13, "e"),
(14, "e")
)).toDF("id", "category")
df.show()
/**
* +---+--------+
* | id|category|
* +---+--------+
* | 0| a|
* | 1| b|
* | 2| c|
* | 3| a|
* | 4| a|
* | 5| c|
* | 6| d|
* | 7| d|
* | 8| d|
* | 9| d|
* | 10| e|
* | 11| e|
* | 12| e|
* | 13| e|
* | 14| e|
* +---+--------+
*/
//定义StringIndexer 评估器并训练出模型
val indexerModel = new StringIndexer().
setInputCol("category").
setOutputCol("categoryIndex").
fit(df)

//对数据集进行转换
val indexed = indexerModel.transform(df)
//查看转换后的数据
indexed.show(false)
/**
* a 3次,b 1次,c 2次,d 4次,e 5次
* e 5次、d 4次、a 3次、c 2次、b 1次
* +---+--------+-------------+
* |id |category|categoryIndex|
* +---+--------+-------------+
* |0 |a |2.0 |
* |1 |b |4.0 |
* |2 |c |3.0 |
* |3 |a |2.0 |
* |4 |a |2.0 |
* |5 |c |3.0 |
* |6 |d |1.0 |
* |7 |d |1.0 |
* |8 |d |1.0 |
* |9 |d |1.0 |
* |10 |e |0.0 |
* |11 |e |0.0 |
* |12 |e |0.0 |
* |13 |e |0.0 |
* |14 |e |0.0 |
* +---+--------+-------------+
*/

/**
* 创建OneHotEncoder对象对处理后的DataFrame进行编码,
* 可以看见,编码后的二进制特征呈稀疏向量形式,
* 与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,
* 若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。
*/
//创建一个OneHotEncoderEstimator 评估器
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("categoryIndex"))
.setOutputCols(Array("categoryVec"))
//.setDropLast(false) //默认为true 是否删除最后的类别,如共有五个类别输入,删除最后一个则剩下四个类别

//使用评估器训练模型
val encoderModel = encoder.fit(indexed)
//转换数据集并查看转换后的结果集
encoderModel.transform(indexed).show(false)
/**
* a 3次,b 1次,c 2次,d 4次,e 5次
* e 5次、d 4次、a 3次、c 2次、b 1次
* +---+--------+-------------+-------------+
* |id |category|categoryIndex|categoryVec |
* +---+--------+-------------+-------------+
* |0 |a |2.0 |(4,[2],[1.0])|
* |1 |b |4.0 |(4,[],[]) |
* |2 |c |3.0 |(4,[3],[1.0])|
* |3 |a |2.0 |(4,[2],[1.0])|
* |4 |a |2.0 |(4,[2],[1.0])|
* |5 |c |3.0 |(4,[3],[1.0])|
* |6 |d |1.0 |(4,[1],[1.0])|
* |7 |d |1.0 |(4,[1],[1.0])|
* |8 |d |1.0 |(4,[1],[1.0])|
* |9 |d |1.0 |(4,[1],[1.0])|
* |10 |e |0.0 |(4,[0],[1.0])|
* |11 |e |0.0 |(4,[0],[1.0])|
* |12 |e |0.0 |(4,[0],[1.0])|
* |13 |e |0.0 |(4,[0],[1.0])|
* |14 |e |0.0 |(4,[0],[1.0])|
* +---+--------+-------------+-------------+
*/

//.setDropLast(false) 设置为false 时打印数据
/**
* +---+--------+-------------+-------------+
* |id |category|categoryIndex|categoryVec |
* +---+--------+-------------+-------------+
* |0 |a |2.0 |(5,[2],[1.0])|
* |1 |b |4.0 |(5,[4],[1.0])|
* |2 |c |3.0 |(5,[3],[1.0])|
* |3 |a |2.0 |(5,[2],[1.0])|
* |4 |a |2.0 |(5,[2],[1.0])|
* |5 |c |3.0 |(5,[3],[1.0])|
* |6 |d |1.0 |(5,[1],[1.0])|
* |7 |d |1.0 |(5,[1],[1.0])|
* |8 |d |1.0 |(5,[1],[1.0])|
* |9 |d |1.0 |(5,[1],[1.0])|
* |10 |e |0.0 |(5,[0],[1.0])|
* |11 |e |0.0 |(5,[0],[1.0])|
* |12 |e |0.0 |(5,[0],[1.0])|
* |13 |e |0.0 |(5,[0],[1.0])|
* |14 |e |0.0 |(5,[0],[1.0])|
* +---+--------+-------------+-------------+
*/
}

5. 向量-索引变换

5.1 算法说明

类别:estimator【评估器】

  • 之前介绍的StringIndexer是针对单个类别型特征进行转换,

  • 倘若所有特征都已经被组织在一个向量中,又想对其中某些单个分量进行处理时,Spark ML提供了向量-索引变换(VectorIndexer)类来解决向量数据集中的类别性特征转换。

  • 通过为其提供maxCategories超参数,它可以自动识别哪些特征是类别型的,并且将原始值转换为类别索引。它基于不同特征值的数量来识别哪些特征需要被类别化,那些取值可能性最多不超过maxCategories的特征需要会被认为是类别型的。

  • 它的处理流程如下:

  1. 获得一个向量类型的输入以及maxCategories参数。

  2. 基于原始数值识别哪些特征需要被类别化, 其中最多maxCategories需要被类别化。

  3. 对于每一个类别特征计算0-based类别指标。

  4. 对类别特征进行索引然后将原始值转换为指标。

  • 索引后的类别特征可以帮助决策树等算法处理类别特征, 并得到较好的结果。

  • 在下面的例子中, 我们读入一个数据集, 然后使用VectorIndexer来决定哪些特征需要被作为非数值类型处理, 将非数值型特征转换为他们的索引。

5.2 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package hnbian.spark.ml.feature.transforming

import hnbian.spark.SparkUtils
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}

/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 12:09
**/
object VectorIndexer extends App {
val spark = SparkUtils.getSparkSession("VectorIndexer", 4)

val data = Seq(
Vectors.dense(-1.0, 1.0, 1.0),
Vectors.dense(-1.0, 3.0, 1.0),
Vectors.dense(0.0, 5.0, 1.0))
//定义数据集
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
//展示数据集
df.show()
/**
* +--------------+
* | features|
* +--------------+
* |[-1.0,1.0,1.0]|
* |[-1.0,3.0,1.0]|
* | [0.0,5.0,1.0]|
* +--------------+
*/
//定义 VectorIndexer 评估器

val indexer = new VectorIndexer().
setInputCol("features").
setOutputCol("indexed").
setMaxCategories(2) //设置最大种类别为2,即每列种的元素类型不能大于2种,比如中间列为1,3,5 为3种

//使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征
val indexerModel = indexer.fit(df)
/**
* 即只有种类小于2的特征才被认为是类别型特征,否则被认为是连续型特征
* 可以通过VectorIndexerModel的categoryMaps成员来获得被转换的特征及其映射,
* 这里可以看到共有两个特征被转换,分别是下标为0和2的元素。
* 可以看到,0号特征只有-1,0两种取值,分别被映射成0,1,而2号特征只有1种取值,被映射成0。
*/
indexerModel.transform(df).show()
/**
* +--------------+-------------+
* | features| indexed|
* +--------------+-------------+
* |[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
* |[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
* | [0.0,5.0,1.0]|[0.0,5.0,0.0]|
* +--------------+-------------+
*/
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " + categoricalFeatures.mkString(", "))
//Chose 2 categorical features: 0, 2

}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
  目录