1. 正则化
1.1 算法说明
类别:transformer【转换器】
正则化(Normalizer)是一个转换器, 它可以将多行向量输入转化为统一的形式。 参数为p (默认值:2) 来指定正则化中使用的p-norm。 正则化操作可以使输入数据标准化并提高后期学习算法的效果
范数对于数学的意义?1范数、2范数、无穷范数
几种范数的简单介绍
1.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.{FileUtils, SparkUtils}
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 15:09
**/
object Normalizer extends App {
val spark = SparkUtils.getSparkSession("Normalizer", 4)
import org.apache.spark.ml.feature.Normalizer
//获取文件路径
//val path = FileUtils.getFilePath("sample_libsvm_data_bak.txt")
//打印文件路径
//println(path)
//根据测试文件创建数据集
//val dataFrame = spark.read.format("libsvm").load(path)
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
dataFrame.show( false)
/**
* +---+--------------+
* |id |features |
* +---+--------------+
* |0 |[1.0,0.5,-1.0]|
* |1 |[2.0,1.0,1.0] |
* |2 |[4.0,10.0,2.0]|
* +---+--------------+
*/
// 使用 $L^1$ norm 正则化每个向量
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0) //默认2
// 将每一行的规整为1阶范数为1的向量,1阶范数即所有值绝对值之和。
val l1NormData = normalizer.transform(dataFrame)
l1NormData.show(false)
/**
* +---+--------------+------------------+
* |id |features |normFeatures |
* +---+--------------+------------------+
* |0 |[1.0,0.5,-1.0]|[0.4,0.2,-0.4] |
* |1 |[2.0,1.0,1.0] |[0.5,0.25,0.25] |
* |2 |[4.0,10.0,2.0]|[0.25,0.625,0.125]|
* +---+--------------+------------------+
*/
// 正则化每个向量到无穷阶范数 (无穷范数——向量中最大元素的绝对值)
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
/**
* +---+--------------+--------------+
* | id| features| normFeatures|
* +---+--------------+--------------+
* | 0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
* | 1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
* | 2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
* +---+--------------+--------------+
*/
}
2. 标准缩放
2.1 算法说明
类别:estimator【评估器】
标准缩放(StandardScaler) 处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或者是0均值,或者是0均值单位标准差。
主要有以下两个参数:
1.withStd:默认值为真。(是否将数据标准化到单位标准差)
2.withMean:默认为假。是否变换为0均值。(此方法将产出一个稠密输出, 所以不适用于稀疏输入。)
StandardScaler 是一个评估器, 可以调用它的fit()方法,训练数据集产生一个StandardScalerModel, 用来计算汇总统计。 然后产生的模型可以用来转换向量至统一的标准差或者零均值特征。 注意如果特征的标准差为零,则该特征在向量中返回的默认值为0.0
2.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian
* @ Description
* @ Date 2018/12/28 16:03
**/
object StandardScaler extends App {
val spark = SparkUtils.getSparkSession("StandardScaler",4)
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
dataFrame.show( false)
/**
* +---+--------------+
* |id |features |
* +---+--------------+
* |0 |[1.0,0.5,-1.0]|
* |1 |[2.0,1.0,1.0] |
* |2 |[4.0,10.0,2.0]|
* +---+--------------+
*/
//定义StandardScaler 评估器 并设置输入输出与相关参数
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true) //是否将数据标准化到单位标准差
.setWithMean(false) //是否变换为0均值
//训练数据产生一个模型
val scalerModel = scaler.fit(dataFrame)
//使用模型转换数据
val scaledData = scalerModel.transform(dataFrame)
//查看转换后的数据集
scaledData.show(false)
/**
* // 将每一列的标准差缩放到1
* +---+--------------+------------------------------------------------------------+
* |id |features |scaledFeatures |
* +---+--------------+------------------------------------------------------------+
* |0 |[1.0,0.5,-1.0]|[0.6546536707079771,0.09352195295828246,-0.6546536707079772]|
* |1 |[2.0,1.0,1.0] |[1.3093073414159542,0.18704390591656492,0.6546536707079772] |
* |2 |[4.0,10.0,2.0]|[2.6186146828319083,1.8704390591656492,1.3093073414159544] |
* +---+--------------+------------------------------------------------------------+
*/
}
3. 归一化
3.1 算法介绍
类别:estimator【评估器】
归一化(最大值-最小值缩放,MinMaxScaler)
MinMaxScaler作用是每一列即每一维特征。将每一维特征线性地映射到指定范围,通常是[0-1]。
MinMaxScaler计算数据集的汇总计量,并产生一个MinMaxScalerModel。该模型可以将独立的特征值转换到指定范围内。
注意: 因为零值转换后也可能变为非零值,所以即便为稀疏输入,输出也可能变为稠密向量。
有两个参数可以设置:
1.min:默认为0.0, 为转换后所有特征的下边界。
2.max:默认为1.0, 为转换后所有特征的上边界
对于特征E来说, 调整后的特征值如下:
$Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min $ 如果 $E_{max} == E_{min}$ 则 Rescaled = 0.5 * (max -min)
3.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian 特征转换归一化代码示例
* @ Description
* @ Date 2018/12/28 16:34
**/
object MinMaxScaler extends App {
val spark = SparkUtils.getSparkSession("MinMaxScaler", 4)
//定义数据集
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, 3.0)),
(1, Vectors.dense(2.0, 11.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
dataFrame.show(false)
/**
* +---+--------------+
* |id |features |
* +---+--------------+
* |0 |[1.0,0.5,-1.0]|
* |1 |[2.0,1.0,1.0] |
* |2 |[4.0,10.0,2.0]|
* +---+--------------+
*/
//定义MinMaxScaler评估器 设置输入输出列
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// 训练测试数据集并生成一个model
val scalerModel = scaler.fit(dataFrame)
// 使用model 对测试数据集进行转换
val scaledData = scalerModel.transform(dataFrame)
scaledData.show(false)
// 每维特征线性地映射,最小值映射到0,最大值映射到1。
/**
* +---+--------------+----------------------------+
* |id |features |scaledFeatures |
* +---+--------------+----------------------------+
* |0 |[1.0,0.5,3.0] |[0.0,0.0,1.0] |
* |1 |[2.0,11.0,1.0]|[0.3333333333333333,1.0,0.0]|
* |2 |[4.0,10.0,2.0]|[1.0,0.9047619047619048,0.5]|
* +---+--------------+----------------------------+
*/
}
4. 最大值-平均值缩放
4.1 算法介绍
类别:estimator【评估器】
最大值-平均值缩放(MaxAbsScaler )将每一维度的特征变换到[-1,1]的闭区间上,通过除以每一维度特征上的最大的绝对值,他不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。(因为不会转移/集中数据,所以不会破坏数据的稀疏性)
4.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.ml.feature.transforming.Normalizer.spark
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian
* @ Description 最大值-平均值缩放
* @ Date 2018/12/28 16:54
**/
object MaxAbsScaler extends App {
val spark = SparkUtils.getSparkSession("MaxAbsScaler", 4)
//创建数据集
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
dataFrame.show( false)
/**
* +---+--------------+
* |id |features |
* +---+--------------+
* |0 |[1.0,0.5,-1.0]|
* |1 |[2.0,1.0,1.0] |
* |2 |[4.0,10.0,2.0]|
* +---+--------------+
*/
//定义MaxAbsScaler 评估器
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// 使用评估器训练数据得到一个模型
val scalerModel = scaler.fit(dataFrame)
// 使用模型对数据进行转换
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
// 每一维的绝对值的最大值为[4, 10, 2]
/**
* +--------------+----------------+
* | features| scaledFeatures|
* +--------------+----------------+
* |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
* | [2.0,1.0,1.0]| [0.5,0.1,0.5]|
* |[4.0,10.0,2.0]| [1.0,1.0,1.0]|
* +--------------+----------------+
*/
}
5. 离散化重组
5.1 算法介绍
类别:transformer【转换器】
离散化重组(分箱操作,bucketizer) 将连续数值转换为离散类别,其中类别可由用户指定,比如:年龄是一个连续值特征,需要将其转换为离散类别(未成年人、青年人、中年人、老年人),这时就用到了bucketizer。
分类标准是自己设置的split 参数定义,具体参数说明如下:
split:如:double[] splits = {0, 18, 35,50, Double.PositiveInfinity} ;将年龄分为四类:0-18、18-35、35-50、50-Double.PositiveInfinity 四个阶段(若值为18 将会分到18-35这个分类内)
如果左右不确定可以设置为:Double.NegativeInfinity(负无穷), Double.PositiveInfinity(正无穷)
注意:
1.当不确定分裂的上下边界时,应当添加Double.Negativelnfinity和Double.Positivelnfinity以免越界
2.每个类别的数值严格要求是递增的,在类别空间外的数值将被归为错误一类
5.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.Bucketizer
/**
* @author hnbian 离散化重组,分箱操作
* @ Description
* @ Date 2018/12/28 17:11
**/
object Bucketizer extends App {
val spark = SparkUtils.getSparkSession("Bucketizer", 4)
//定义数据集
val df = spark.createDataFrame(Seq(
(0, 10),
(1, 15),
(2, 30),
(3, 20),
(4, 51),
(5, 60),
(6, 18)
)).toDF("id", "age")
df.show(false)
/**
* +---+---+
* |id |age|
* +---+---+
* |0 |10 |
* |1 |15 |
* |2 |30 |
* |3 |20 |
* |4 |51 |
* |5 |60 |
* |6 |18 |
* +---+---+
*/
val splits = Array(0,18,35,55,Double.PositiveInfinity)
//定义转换器 并设置splits
val bucketizer = new Bucketizer()
.setInputCol("age")
.setOutputCol("age_stage")
.setSplits(splits)
bucketizer.transform(df).show(false)
/**
* +---+---+---------+
* |0 |10 |0.0 |
* |1 |15 |0.0 |
* |2 |30 |1.0 |
* |3 |20 |1.0 |
* |4 |51 |2.0 |
* |5 |60 |3.0 |
* |6 |18 |1.0 |
* +---+---+---------+
* 18在分类数值上被算在后一个分类中
*/
}
6. 分数求解器
6.1 算法介绍
类别:estimator【评估器】
分位数离散器和Bucketizer(分箱处理)一样也是将连续数值转换为离散类别特征。
实际上分数求解器(QuantileDiscretizer) 继承自Bucketizer
参数说明:
numBuckets:自己定义数据分几个类别并完成离散化(而不是像分箱一样自己设置splits)
RelativeError:精度函数,如果设置为0,则计算最精确分位数,这是一个时间代价很高的操作
另外分类上下边界为正负无穷,覆盖所有实数。
6.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.QuantileDiscretizer
/**
* @author hnbian
* @ Description 分数求解器(与分箱一样对连续数据做离散化)
* @ Date 2018/12/28 17:31
**/
object QuantileDiscretizer extends App {
val spark = SparkUtils.getSparkSession("QuantileDiscretizer", 4)
//定义数据集
val df = spark.createDataFrame(Seq(
(0, 10),
(1, 15),
(2, 30),
(3, 20),
(4, 51),
(5, 60),
(6, 18)
)).toDF("id", "age")
df.show(false)
/**
* +---+---+
* |id |age|
* +---+---+
* |0 |10 |
* |1 |15 |
* |2 |30 |
* |3 |20 |
* |4 |51 |
* |5 |60 |
* |6 |18 |
* +---+---+
*/
//定义评估器 设置相关参数
val quantileDiscretizer = new QuantileDiscretizer()
.setInputCol("age")
.setOutputCol("age_stage")
.setNumBuckets(4) //将数据分为4类
.setRelativeError(0.1) //精度设置为0.1
//训练模型
val model = quantileDiscretizer.fit(df)
//使用模型转换数据并展示数据
model.transform(df).show(false)
/**
* +---+---+---------+
* |id |age|age_stage|
* +---+---+---------+
* |0 |10 |0.0 |
* |1 |15 |1.0 |
* |2 |30 |2.0 |
* |3 |20 |2.0 |
* |4 |51 |3.0 |
* |5 |60 |3.0 |
* |6 |18 |1.0 |
* +---+---+---------+
*/
}
7. 元素乘积
7.1 算法介绍
类别:transformer【转换器】
元素乘积(ElemtwiseProduct) 按提供的 “ weight ” 向量, 返回与输入向量元素级别的乘积。
即使说, 按提供的权重分别对输入数据进行缩放, 得到输入向量v以及权重向量w的Hadamard积。
7.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian
* @ Description 元素乘积
* @ Date 2018/12/28 17:49
**/
object ElemtwiseProduct extends App {
val spark = SparkUtils.getSparkSession("ElemtwiseProduct", 4)
// 创建一些向量数据; 也适用于稀疏向量
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
dataFrame.show()
/**
* +---+-------------+
* | id| vector|
* +---+-------------+
* | a|[1.0,2.0,3.0]|
* | b|[4.0,5.0,6.0]|
* +---+-------------+
*/
//定义转换向量
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
println(s"transformingVector = ${transformingVector}")
//transformingVector = [0.0,1.0,2.0]
//定义一个转换器
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// 使用转换器对数据集进行转换
transformer.transform(dataFrame).show()
/**
* 矩阵中的每个元素分别与转换向量中的元素相乘
* +---+-------------+-----------------+
* | id| vector|transformedVector|
* +---+-------------+-----------------+
* | a|[1.0,2.0,3.0]| [0.0,2.0,6.0]|
* | b|[4.0,5.0,6.0]| [0.0,5.0,12.0]|
* +---+-------------+-----------------+
*/
}
8. SQL转换器
8.1 算法介绍
类别:transformer【转换器】
SQL转换器 (SQLTransformer)工具用来转换由SQL定义的陈述。目前仅支持SQL语法如”SELECT …FROM THIS …”,其中”THIS”代表输入数据的基础表。选择语句指定输出中展示的字段、元素和表达式,支持Spark SQL中的所有选择语句。用户可以基于选择结果使用Spark SQL建立方程或者用户自定义函数。SQLTransformer支持语法示例如下:
SELECTa, a + b AS a_b FROM THIS
SELECTa, SQRT(b) AS b_sqrt FROM THIS where a > 5
SELECTa, b, SUM(c) AS c_sum FROM THIS GROUP BY a, b
示例:
假设我们有如下DataFrame包含id,v1,v2列:
id | v1 | v2 |
---|---|---|
0 | 1.0 | 3.0 |
2 | 2.0 | 5.0 |
SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM THIS
使用SQLTransformer语句” SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM THIS “转换后得到输出如下:
id | v1 | v2 | v3 | v4 |
---|---|---|---|---|
0 | 1.0 | 3.0 | 4.0 | 3.0 |
2 | 2.0 | 5.0 | 7.0 | 10.0 |
8.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.SQLTransformer
/**
* @author hnbian SQL转换器
* @ Description
* @ Date 2018/12/28 18:02
**/
object SQLTransformer extends App {
val spark = SparkUtils.getSparkSession("SQLTransformer", 4)
//定义数据集
val df = spark.createDataFrame(Seq(
(0, 1.0, 3.0),
(2, 2.0, 5.0))
).toDF("id", "v1", "v2")
//查看数据集
df.show(false)
/**
* +---+---+---+
* |id |v1 |v2 |
* +---+---+---+
* |0 |1.0|3.0|
* |2 |2.0|5.0|
* +---+---+---+
*/
//定义转换器,并设置转换sql语句
val sqlTrans = new SQLTransformer()
.setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
//转换数据并查看结果
sqlTrans.transform(df).show()
/**
* +---+---+---+---+----+
* | id| v1| v2| v3| v4|
* +---+---+---+---+----+
* | 0|1.0|3.0|4.0| 3.0|
* | 2|2.0|5.0|7.0|10.0|
* +---+---+---+---+----+
*/
}
9. 向量汇编
9.1 算法介绍
类别:transformer【转换器】
向量汇编(VectorAssembler)是一个转换器, 它将给定的若干列合并为一列向量。
它可以将原始特征和一系列通过其他转换器得到的特征合并为单一的特征向量, 来训练如逻辑回归和决策树等机器学习算法。
VectorAssembler可接受的输入类型:数值型, 布尔型, 向量型。 输入列的值将按指定顺序依次添加到一个新的向量中。
假设我们有如下DataFrame包含id,hour,mobile, userFeatures以及clicked列:
id | hour | mobile | userFeatures | clicked |
---|---|---|---|---|
0 | 18 | 1.0 | [0.0,12.0,0.5] | 1.0 |
userFeatures列中含有3个用户特征。 我们想将hour,mobile以及userFeatures 合并为一个新列, 将VectorAssembler的输入指定为hour,mobile 以及userFeatures, 输出指定为features , 通过转换我们得到以下结果:
id | hour | mobile | userFeatures | clicked | features |
---|---|---|---|---|---|
0 | 18 | 1.0 | [0.0,12.0,0.5] | 1.0 | [18.0,1.0,0.0,10.0,0.5] |
9.2 代码示例
package hnbian.spark.ml.feature.transforming
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
/**
* @author hnbian
* @ Description 向量汇编
* @ Date 2018/12/28 18:13
**/
object VectorAssembler extends App {
val spark = SparkUtils.getSparkSession("VectorAssembler", 4)
//定义数据集
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
//展示数据集
dataset.show()
/**
* +---+----+------+--------------+-------+
* | id|hour|mobile| userFeatures|clicked|
* +---+----+------+--------------+-------+
* | 0| 18| 1.0|[0.0,10.0,0.5]| 1.0|
* +---+----+------+--------------+-------+
*/
//定义转换器 并设置要合并的列名
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures")) //设置要合并的列名
.setOutputCol("features")
//转换数据集得到含有新向量的数据集
val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
//[[18.0,1.0,0.0,10.0,0.5],1.0]
//打印数据集
output.show(false)
/**
* +---+----+------+--------------+-------+-----------------------+
* |id |hour|mobile|userFeatures |clicked|features |
* +---+----+------+--------------+-------+-----------------------+
* |0 |18 |1.0 |[0.0,10.0,0.5]|1.0 |[18.0,1.0,0.0,10.0,0.5]|
* +---+----+------+--------------+-------+-----------------------+
*/
}