1. 正则化
1.1 算法说明
类别:transformer【转换器】
正则化(Normalizer)是一个转换器, 它可以将多行向量输入转化为统一的形式。 参数为p (默认值:2) 来指定正则化中使用的p-norm。 正则化操作可以使输入数据标准化并提高后期学习算法的效果
范数对于数学的意义?1范数、2范数、无穷范数
几种范数的简单介绍
1.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.{FileUtils, SparkUtils} import org.apache.spark.ml.linalg.Vectors
object Normalizer extends App { val spark = SparkUtils.getSparkSession("Normalizer", 4) import org.apache.spark.ml.feature.Normalizer val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") dataFrame.show( false)
val normalizer = new Normalizer() .setInputCol("features") .setOutputCol("normFeatures") .setP(1.0) val l1NormData = normalizer.transform(dataFrame) l1NormData.show(false)
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity) println("Normalized using L^inf norm") lInfNormData.show()
}
|
2. 标准缩放
2.1 算法说明
类别:estimator【评估器】
标准缩放(StandardScaler) 处理的对象是每一列,也就是每一维特征,将特征标准化为单位标准差或者是0均值,或者是0均值单位标准差。
主要有以下两个参数:
1.withStd:默认值为真。(是否将数据标准化到单位标准差)
2.withMean:默认为假。是否变换为0均值。(此方法将产出一个稠密输出, 所以不适用于稀疏输入。)
StandardScaler 是一个评估器, 可以调用它的fit()方法,训练数据集产生一个StandardScalerModel, 用来计算汇总统计。 然后产生的模型可以用来转换向量至统一的标准差或者零均值特征。 注意如果特征的标准差为零,则该特征在向量中返回的默认值为0.0
2.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.StandardScaler import org.apache.spark.ml.linalg.Vectors
object StandardScaler extends App { val spark = SparkUtils.getSparkSession("StandardScaler",4) val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") dataFrame.show( false)
val scaler = new StandardScaler() .setInputCol("features") .setOutputCol("scaledFeatures") .setWithStd(true) .setWithMean(false) val scalerModel = scaler.fit(dataFrame) val scaledData = scalerModel.transform(dataFrame) scaledData.show(false)
}
|
3. 归一化
3.1 算法介绍
类别:estimator【评估器】
归一化(最大值-最小值缩放,MinMaxScaler)
MinMaxScaler作用是每一列即每一维特征。将每一维特征线性地映射到指定范围,通常是[0-1]。
MinMaxScaler计算数据集的汇总计量,并产生一个MinMaxScalerModel。该模型可以将独立的特征值转换到指定范围内。
注意: 因为零值转换后也可能变为非零值,所以即便为稀疏输入,输出也可能变为稠密向量。
有两个参数可以设置:
1.min:默认为0.0, 为转换后所有特征的下边界。
2.max:默认为1.0, 为转换后所有特征的上边界
对于特征E来说, 调整后的特征值如下:
$Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min $ 如果 $E_{max} == E_{min}$ 则 Rescaled = 0.5 * (max -min)
3.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.MinMaxScaler import org.apache.spark.ml.linalg.Vectors
object MinMaxScaler extends App { val spark = SparkUtils.getSparkSession("MinMaxScaler", 4) val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, 3.0)), (1, Vectors.dense(2.0, 11.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") dataFrame.show(false)
val scaler = new MinMaxScaler() .setInputCol("features") .setOutputCol("scaledFeatures") val scalerModel = scaler.fit(dataFrame) val scaledData = scalerModel.transform(dataFrame) scaledData.show(false)
}
|
4. 最大值-平均值缩放
4.1 算法介绍
类别:estimator【评估器】
最大值-平均值缩放(MaxAbsScaler )将每一维度的特征变换到[-1,1]的闭区间上,通过除以每一维度特征上的最大的绝对值,他不会平移整个分布,也不会破坏原来每一个特征向量的稀疏性。(因为不会转移/集中数据,所以不会破坏数据的稀疏性)
4.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package hnbian.spark.ml.feature.transforming import hnbian.spark.ml.feature.transforming.Normalizer.spark import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.MaxAbsScaler import org.apache.spark.ml.linalg.Vectors
object MaxAbsScaler extends App { val spark = SparkUtils.getSparkSession("MaxAbsScaler", 4) val dataFrame = spark.createDataFrame(Seq( (0, Vectors.dense(1.0, 0.5, -1.0)), (1, Vectors.dense(2.0, 1.0, 1.0)), (2, Vectors.dense(4.0, 10.0, 2.0)) )).toDF("id", "features") dataFrame.show( false)
val scaler = new MaxAbsScaler() .setInputCol("features") .setOutputCol("scaledFeatures") val scalerModel = scaler.fit(dataFrame) val scaledData = scalerModel.transform(dataFrame) scaledData.select("features", "scaledFeatures").show()
}
|
5. 离散化重组
5.1 算法介绍
类别:transformer【转换器】
离散化重组(分箱操作,bucketizer) 将连续数值转换为离散类别,其中类别可由用户指定,比如:年龄是一个连续值特征,需要将其转换为离散类别(未成年人、青年人、中年人、老年人),这时就用到了bucketizer。
分类标准是自己设置的split 参数定义,具体参数说明如下:
split:如:double[] splits = {0, 18, 35,50, Double.PositiveInfinity} ;将年龄分为四类:0-18、18-35、35-50、50-Double.PositiveInfinity 四个阶段(若值为18 将会分到18-35这个分类内)
如果左右不确定可以设置为:Double.NegativeInfinity(负无穷), Double.PositiveInfinity(正无穷)
注意:
1.当不确定分裂的上下边界时,应当添加Double.Negativelnfinity和Double.Positivelnfinity以免越界
2.每个类别的数值严格要求是递增的,在类别空间外的数值将被归为错误一类
5.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.Bucketizer
object Bucketizer extends App { val spark = SparkUtils.getSparkSession("Bucketizer", 4) val df = spark.createDataFrame(Seq( (0, 10), (1, 15), (2, 30), (3, 20), (4, 51), (5, 60), (6, 18) )).toDF("id", "age") df.show(false)
val splits = Array(0,18,35,55,Double.PositiveInfinity) val bucketizer = new Bucketizer() .setInputCol("age") .setOutputCol("age_stage") .setSplits(splits) bucketizer.transform(df).show(false)
}
|
6. 分数求解器
6.1 算法介绍
类别:estimator【评估器】
分位数离散器和Bucketizer(分箱处理)一样也是将连续数值转换为离散类别特征。
实际上分数求解器(QuantileDiscretizer) 继承自Bucketizer
参数说明:
numBuckets:自己定义数据分几个类别并完成离散化(而不是像分箱一样自己设置splits)
RelativeError:精度函数,如果设置为0,则计算最精确分位数,这是一个时间代价很高的操作
另外分类上下边界为正负无穷,覆盖所有实数。
6.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.QuantileDiscretizer
object QuantileDiscretizer extends App { val spark = SparkUtils.getSparkSession("QuantileDiscretizer", 4) val df = spark.createDataFrame(Seq( (0, 10), (1, 15), (2, 30), (3, 20), (4, 51), (5, 60), (6, 18) )).toDF("id", "age") df.show(false)
val quantileDiscretizer = new QuantileDiscretizer() .setInputCol("age") .setOutputCol("age_stage") .setNumBuckets(4) .setRelativeError(0.1) val model = quantileDiscretizer.fit(df) model.transform(df).show(false)
}
|
7. 元素乘积
7.1 算法介绍
类别:transformer【转换器】
元素乘积(ElemtwiseProduct) 按提供的 “ weight ” 向量, 返回与输入向量元素级别的乘积。
即使说, 按提供的权重分别对输入数据进行缩放, 得到输入向量v以及权重向量w的Hadamard积。
7.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.ElementwiseProduct import org.apache.spark.ml.linalg.Vectors
object ElemtwiseProduct extends App { val spark = SparkUtils.getSparkSession("ElemtwiseProduct", 4) val dataFrame = spark.createDataFrame(Seq( ("a", Vectors.dense(1.0, 2.0, 3.0)), ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector") dataFrame.show()
val transformingVector = Vectors.dense(0.0, 1.0, 2.0) println(s"transformingVector = ${transformingVector}") val transformer = new ElementwiseProduct() .setScalingVec(transformingVector) .setInputCol("vector") .setOutputCol("transformedVector") transformer.transform(dataFrame).show()
}
|
8. SQL转换器
8.1 算法介绍
类别:transformer【转换器】
SQL转换器 (SQLTransformer)工具用来转换由SQL定义的陈述。目前仅支持SQL语法如”SELECT …FROM THIS …”,其中”THIS”代表输入数据的基础表。选择语句指定输出中展示的字段、元素和表达式,支持Spark SQL中的所有选择语句。用户可以基于选择结果使用Spark SQL建立方程或者用户自定义函数。SQLTransformer支持语法示例如下:
1 2 3 4 5 6 7
| SELECTa, a + b AS a_b FROM THIS
SELECTa, SQRT(b) AS b_sqrt FROM THIS where a > 5
SELECTa, b, SUM(c) AS c_sum FROM THIS GROUP BY a, b
|
示例:
假设我们有如下DataFrame包含id,v1,v2列:
| id |
v1 |
v2 |
| 0 |
1.0 |
3.0 |
| 2 |
2.0 |
5.0 |
SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM THIS
使用SQLTransformer语句” SELECT *,(v1 + v2) AS v3, (v1 * v2) AS v4 FROM THIS “转换后得到输出如下:
| id |
v1 |
v2 |
v3 |
v4 |
| 0 |
1.0 |
3.0 |
4.0 |
3.0 |
| 2 |
2.0 |
5.0 |
7.0 |
10.0 |
8.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.SQLTransformer
object SQLTransformer extends App { val spark = SparkUtils.getSparkSession("SQLTransformer", 4) val df = spark.createDataFrame(Seq( (0, 1.0, 3.0), (2, 2.0, 5.0)) ).toDF("id", "v1", "v2") df.show(false)
val sqlTrans = new SQLTransformer() .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__") sqlTrans.transform(df).show()
}
|
9. 向量汇编
9.1 算法介绍
类别:transformer【转换器】
向量汇编(VectorAssembler)是一个转换器, 它将给定的若干列合并为一列向量。
它可以将原始特征和一系列通过其他转换器得到的特征合并为单一的特征向量, 来训练如逻辑回归和决策树等机器学习算法。
VectorAssembler可接受的输入类型:数值型, 布尔型, 向量型。 输入列的值将按指定顺序依次添加到一个新的向量中。
假设我们有如下DataFrame包含id,hour,mobile, userFeatures以及clicked列:
| id |
hour |
mobile |
userFeatures |
clicked |
| 0 |
18 |
1.0 |
[0.0,12.0,0.5] |
1.0 |
userFeatures列中含有3个用户特征。 我们想将hour,mobile以及userFeatures 合并为一个新列, 将VectorAssembler的输入指定为hour,mobile 以及userFeatures, 输出指定为features , 通过转换我们得到以下结果:
| id |
hour |
mobile |
userFeatures |
clicked |
features |
| 0 |
18 |
1.0 |
[0.0,12.0,0.5] |
1.0 |
[18.0,1.0,0.0,10.0,0.5] |
9.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package hnbian.spark.ml.feature.transforming import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.linalg.Vectors
object VectorAssembler extends App { val spark = SparkUtils.getSparkSession("VectorAssembler", 4) val dataset = spark.createDataFrame( Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0)) ).toDF("id", "hour", "mobile", "userFeatures", "clicked") dataset.show()
val assembler = new VectorAssembler() .setInputCols(Array("hour", "mobile", "userFeatures")) .setOutputCol("features") val output = assembler.transform(dataset) println(output.select("features", "clicked").first()) output.show(false)
}
|