1. 梯度提升树回归
1.1 算法简介
梯度提升树是一种决策树的继承算法。它通过反复迭代训练决策树来最小化损失函数。 决策树类似,梯度提升树具有可处理类别特征,易扩展到多分类问题, 不需要特征缩放等性质。 spark.ml 通过使用现有 decision tree 工具来实现。
梯度提升树依次迭代训练一系列的决策树。在一次迭代中, 算法使用现有的集成来对每个训练实例的类别进行预测, 然后将预测结果与真实值标签值进行比较。通过重新标记, 来赋予预测结果不好的实例更好的权重。 所以, 在下次迭代中, 决策树会对先前的错误进行修正。
对实例标签进行重新标记的机制由损失函数来指定, 每次迭代过程中, 梯度迭代树在训练数据上进一步减少损失函数的值。spark.ml为分类问题提供一种损失函数(Log Loss), 为回归问题提供两种损失函数(平方误差与绝对误差)。
Spark.ml 支持二分类以及回归的随机森林算法, 适用于连续特征以及类别特征。
*注意梯度提升树目前不支持多分类问题。
1.2 参数介绍
参数名称 | 类型 | 说明 |
---|---|---|
featuresCol | 字符串 | 特征列名 |
labelCol | 字符串 | 标签列名 |
predictionCol | 字符串 | 预测结果列名 |
checkpointInterval | 整数 | 设置检查点间隔(>=1),或不设置检查点(-1) |
impurity | 字符串 | 计算信息增益的准则(不区分大小写) |
lossType | 字符串 | 损失函数类型 |
maxBins | 整数 | 连续特征离散化的最大数量,以及选择每个节点分裂特征的方式 |
maxDepth | 整数 | 树的最大深度(>=0) |
maxIter | 整数 | 迭代次数(>=0) |
minInfoGain | 双精度 | 分裂节点时所需最小信息增益 |
minInstancesPerNode | 整数 | 分裂后自节点最少包含的实例数量 |
seed | 长整型 | 随机种子 |
subsamplingRate | 双精度 | 学习一棵决策树使用的训练数据比例,范围[0,1] |
stepSize | 双精度 | 每次迭代优化步长 |
1.3 示例
下面的例子中,GBTRegressor仅迭代了一次,在实际操作中是不现实的。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
object GBTRegressor extends App{
val conf = new SparkConf().setAppName("GBTRegressor")
//设置master local[4] 指定本地模式开启模拟worker线程数
conf.setMaster("local[4]")
//创建sparkContext文件
val sc = new SparkContext(conf)
val spark = SparkSession.builder().getOrCreate()
sc.setLogLevel("Error")
val data = spark.read.format("libsvm").load("F:\\data\\sample_libsvm_data.txt")
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
// 自动识别分类特征,并为它们编制索引。
//设置maxCategories,所以具有> 4个不同值的特征被视为连续的。
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(data)
// 将数据分成训练和测试集(30%用于测试)。
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a GBT model.
val gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10)
// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, gbt))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
predictions.show(3)
/**
+-----+--------------------+--------------------+----------+
|label| features| indexedFeatures|prediction|
+-----+--------------------+--------------------+----------+
| 0.0|(692,[95,96,97,12...|(692,[95,96,97,12...| 0.0|
| 0.0|(692,[122,123,148...|(692,[122,123,148...| 0.0|
| 0.0|(692,[124,125,126...|(692,[124,125,126...| 0.0|
+-----+--------------------+--------------------+----------+
*/
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
/**
+----------+-----+--------------------+
|prediction|label| features|
+----------+-----+--------------------+
| 0.0| 0.0|(692,[95,96,97,12...|
| 0.0| 0.0|(692,[122,123,148...|
| 0.0| 0.0|(692,[124,125,126...|
+----------+-----+--------------------+
*/
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)
//Root Mean Squared Error (RMSE) on test data = 0.19245008972987526
val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println("Learned regression GBT model:\n" + gbtModel.toDebugString)
/**
Learned regression GBT model:
GBTRegressionModel (uid=gbtr_3d59f66ce413) with 10 trees
Tree 0 (weight 1.0):
If (feature 406 <= 72.0)
If (feature 99 in {0.0,3.0})
Predict: 0.0
Else (feature 99 not in {0.0,3.0})
Predict: 1.0
Else (feature 406 > 72.0)
Predict: 1.0
Tree 1 (weight 0.1):
Predict: 0.0
Tree 2 (weight 0.1):
Predict: 0.0
Tree 3 (weight 0.1):
Predict: 0.0
Tree 4 (weight 0.1):
Predict: 0.0
Tree 5 (weight 0.1):
Predict: 0.0
Tree 6 (weight 0.1):
Predict: 0.0
Tree 7 (weight 0.1):
Predict: 0.0
Tree 8 (weight 0.1):
Predict: 0.0
Tree 9 (weight 0.1):
Predict: 0.0
*/
}
2. 生存回归
2.1 介绍
在spark.ml中, 我们实施加速失效时间模型( Acceleratedfailure time ), 对于截尾数据它是一个参数化生存回归的模型。它描述了一个有对数生存时间的模型, 所以它也常被称为生存分析的对数线性模型。 与比例危险模型不同, 因AFT模型中每个实例对目标函数的贡献是独立的, 其更容易并行化。
2.2 参数介绍
参数名称 | 类型 | 说明 |
---|---|---|
censorCol | 字符串型 | 检查器列名 |
featuresCol | 字符串 | 特征列名 |
labelCol | 字符串 | 标签列名 |
quantilesCol | 字符串 | 分位数列名 |
fitIntercept | 布尔 | 是否训练拦截对象 |
maxIter | 整数 | 迭代次数(>=0) |
quantileProbabilities | 双精度数组 | 分位数概率数组 |
stepSize | 双精度 | 每次迭代优化步长 |
tol | 双精度 | 迭代算法的收敛性 |
2.3 调用示例
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
/**
* Created by admin on 2018/4/26.
* 生存回归(加速失效时间模型)
*/
object AFTSurvivalRegression extends App{
val conf = new SparkConf().setAppName("AFTSurvivalRegression")
//设置master local[4] 指定本地模式开启模拟worker线程数
conf.setMaster("local[4]")
//创建sparkContext文件
val sc = new SparkContext(conf)
val spark = SparkSession.builder().getOrCreate()
sc.setLogLevel("Error")
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression
val training = spark.createDataFrame(Seq(
(1.218, 1.0, Vectors.dense(1.560, -0.605)),
(2.949, 0.0, Vectors.dense(0.346, 2.158)),
(3.627, 0.0, Vectors.dense(1.380, 0.231)),
(0.273, 1.0, Vectors.dense(0.520, 1.151)),
(4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
.setQuantileProbabilities(quantileProbabilities)
.setQuantilesCol("quantiles")
val model = aft.fit(training)
// Print the coefficients, intercept and scale parameter for AFT survival regression
println(s"Coefficients: ${model.coefficients} Intercept: " +
s"${model.intercept} Scale: ${model.scale}")
//Coefficients: [-0.4963111466650682,0.19844437699933642] Intercept: 2.638094615104006 Scale: 1.547234557436469
model.transform(training).show(false)
/**
+-----+------+--------------+------------------+---------------------------------------+
|label|censor|features |prediction |quantiles |
+-----+------+--------------+------------------+---------------------------------------+
|1.218|1.0 |[1.56,-0.605] |5.718979487634987 |[1.1603238947151624,4.9954560102747525]|
|2.949|0.0 |[0.346,2.158] |18.076521181495483|[3.66754584547177,15.789611866277758] |
|3.627|0.0 |[1.38,0.231] |7.381861804239103 |[1.4977061305190842,6.447962612338967] |
|0.273|1.0 |[0.52,1.151] |13.577612501425332|[2.754762148150695,11.859872224069742] |
|4.199|0.0 |[0.795,-0.226]|9.013097744073871 |[1.8286676321297772,7.872826505878406] |
+-----+------+--------------+------------------+---------------------------------------+
*/
}
3. 保序回归
3.1 算法介绍
保序回归是回归算法的一种。
保序回归给定一个有限的实数集合 $Y=y_1,y_2,y_3…y_n$ 代表观察到的响应,以及$X=x_1,x_2,x_3…x_n$ 代表未知的响应值,训练一个模型来最小化下列方程 $f(x) = \sum_{i=1}^n w_i(y_i-x_i)^2$ 其中 $x_1 \le X_2\le X_3…\le X_n$,$w_i$ 为权重是正值。其结果方程为保序回归, 而其解是唯一的。 它可以被视为有顺序约束的最小二乘法问题。 实际上保序回归在拟合原始数据点时是一个单调函数。 我们实现池旁者算法, 它使用并行包旭算法。 训练数据是DataFrame格式, 包含标签、特征值以及权重三列。 另外包旭算法还有一个参数名为isotonic, 其默认值为真, 它指定保序回归为保序(单调递增) 或者反序(单调递减)。
训练返回一个保序回归模型, 可以被用来预测已知或未知特征的标签。 保序回归的结果是分段线性函数, 预测规则如下:
1,如果预测输入与训练中的特征值完全匹配, 则返回响应标签。 如果一个特征值对应多个预测标签, 则返回其中一个, 具体是哪一个未指定。
2,如果预测输入比训练中的特征值都高(或者都低,)则响应返回最高特征值或者最低特征值对应标签。 如果一个特征值对应多个预测标签值, 则响应返回最高值或最低值。
3, 如果预测输入落入两个特征值之间, 则预测将会是一个分段线性函数, 其值有两个最近特征值的预测值计算得到。 如果一个特征对应多个预测标签值, 则使用上述两种情况中的处理方式解决。
3.2 参数介绍
参数名称 | 类型 | 说明 |
---|---|---|
featuresCol | 字符串 | 特征列名 |
labelCol | 字符串 | 标签列名 |
predictionCol | 字符串 | 预测结果列名 |
weightCol | 字符串 | 列权重 |
featuresIndex | 整数 | 当特征列维向量时提供索引值,否则不进行处理 |
isotonic | 布尔 | 输出序列为保序/增序(真)或者反序/降序(假) |
3.3 调用示例
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
/**
* 六、保序回归
*/
object IsotonicRegression extends App{
val conf = new SparkConf().setAppName("IsotonicRegression")
//设置master local[4] 指定本地模式开启模拟worker线程数
conf.setMaster("local[4]")
//创建sparkContext文件
val sc = new SparkContext(conf)
val spark = SparkSession.builder().getOrCreate()
sc.setLogLevel("Error")
val dataset = spark.read.format("libsvm").load("D:\\data\\sample_libsvm_data.txt")
import org.apache.spark.ml.regression.IsotonicRegression
// Trains an isotonic regression model.
val ir = new IsotonicRegression()
val model = ir.fit(dataset)
println(s"Boundaries in increasing order: ${model.boundaries}")
// Boundaries in increasing order: [0.0,0.0]
println(s"Predictions associated with the boundaries: ${model.predictions}")
//Predictions associated with the boundaries: [0.0,1.0]
// Makes predictions.
model.transform(dataset).show()
/**
+-----+--------------------+----------+
|label| features|prediction|
+-----+--------------------+----------+
| 0.0|(692,[127,128,129...| 0.0|
| 1.0|(692,[158,159,160...| 0.0|
| 1.0|(692,[124,125,126...| 0.0|
+-----+--------------------+----------+
*/
}