Spark ML 13.回归算法 2


1. 梯度提升树回归

1.1 算法简介

梯度提升树是一种决策树的继承算法。它通过反复迭代训练决策树来最小化损失函数。 决策树类似,梯度提升树具有可处理类别特征,易扩展到多分类问题, 不需要特征缩放等性质。 spark.ml 通过使用现有 decision tree 工具来实现。

梯度提升树依次迭代训练一系列的决策树。在一次迭代中, 算法使用现有的集成来对每个训练实例的类别进行预测, 然后将预测结果与真实值标签值进行比较。通过重新标记, 来赋予预测结果不好的实例更好的权重。 所以, 在下次迭代中, 决策树会对先前的错误进行修正。

对实例标签进行重新标记的机制由损失函数来指定, 每次迭代过程中, 梯度迭代树在训练数据上进一步减少损失函数的值。spark.ml为分类问题提供一种损失函数(Log Loss), 为回归问题提供两种损失函数(平方误差与绝对误差)。

Spark.ml 支持二分类以及回归的随机森林算法, 适用于连续特征以及类别特征。

*注意梯度提升树目前不支持多分类问题。

1.2 参数介绍

参数名称 类型 说明
featuresCol 字符串 特征列名
labelCol 字符串 标签列名
predictionCol 字符串 预测结果列名
checkpointInterval 整数 设置检查点间隔(>=1),或不设置检查点(-1)
impurity 字符串 计算信息增益的准则(不区分大小写)
lossType 字符串 损失函数类型
maxBins 整数 连续特征离散化的最大数量,以及选择每个节点分裂特征的方式
maxDepth 整数 树的最大深度(>=0)
maxIter 整数 迭代次数(>=0)
minInfoGain 双精度 分裂节点时所需最小信息增益
minInstancesPerNode 整数 分裂后自节点最少包含的实例数量
seed 长整型 随机种子
subsamplingRate 双精度 学习一棵决策树使用的训练数据比例,范围[0,1]
stepSize 双精度 每次迭代优化步长

1.3 示例

下面的例子中,GBTRegressor仅迭代了一次,在实际操作中是不现实的。


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

object GBTRegressor extends App{
  val conf = new SparkConf().setAppName("GBTRegressor")
  //设置master local[4] 指定本地模式开启模拟worker线程数
  conf.setMaster("local[4]")
  //创建sparkContext文件
  val sc = new SparkContext(conf)
  val spark = SparkSession.builder().getOrCreate()
  sc.setLogLevel("Error")
  val data = spark.read.format("libsvm").load("F:\\data\\sample_libsvm_data.txt")

  import org.apache.spark.ml.Pipeline
  import org.apache.spark.ml.evaluation.RegressionEvaluator
  import org.apache.spark.ml.feature.VectorIndexer
  import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

  // 自动识别分类特征,并为它们编制索引。
  //设置maxCategories,所以具有> 4个不同值的特征被视为连续的。
  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  // 将数据分成训练和测试集(30%用于测试)。
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  // Train a GBT model.
  val gbt = new GBTRegressor()
    .setLabelCol("label")
    .setFeaturesCol("indexedFeatures")
    .setMaxIter(10)

  // Chain indexer and GBT in a Pipeline.
  val pipeline = new Pipeline()
    .setStages(Array(featureIndexer, gbt))

  // Train model. This also runs the indexer.
  val model = pipeline.fit(trainingData)

  // Make predictions.
  val predictions = model.transform(testData)
  predictions.show(3)
  /**
+-----+--------------------+--------------------+----------+
|label|            features|     indexedFeatures|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|(692,[95,96,97,12...|(692,[95,96,97,12...|       0.0|
|  0.0|(692,[122,123,148...|(692,[122,123,148...|       0.0|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|       0.0|
+-----+--------------------+--------------------+----------+
    */
  // Select example rows to display.
  predictions.select("prediction", "label", "features").show(5)
  /**
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(692,[95,96,97,12...|
|       0.0|  0.0|(692,[122,123,148...|
|       0.0|  0.0|(692,[124,125,126...|
+----------+-----+--------------------+
    */
  // Select (prediction, true label) and compute test error.
  val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
  val rmse = evaluator.evaluate(predictions)

  println("Root Mean Squared Error (RMSE) on test data = " + rmse)
//Root Mean Squared Error (RMSE) on test data = 0.19245008972987526
  val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
  println("Learned regression GBT model:\n" + gbtModel.toDebugString)
  /**
Learned regression GBT model:
GBTRegressionModel (uid=gbtr_3d59f66ce413) with 10 trees
  Tree 0 (weight 1.0):
    If (feature 406 <= 72.0)
     If (feature 99 in {0.0,3.0})
      Predict: 0.0
     Else (feature 99 not in {0.0,3.0})
      Predict: 1.0
    Else (feature 406 > 72.0)
     Predict: 1.0
  Tree 1 (weight 0.1):
    Predict: 0.0
  Tree 2 (weight 0.1):
    Predict: 0.0
  Tree 3 (weight 0.1):
    Predict: 0.0
  Tree 4 (weight 0.1):
    Predict: 0.0
  Tree 5 (weight 0.1):
    Predict: 0.0
  Tree 6 (weight 0.1):
    Predict: 0.0
  Tree 7 (weight 0.1):
    Predict: 0.0
  Tree 8 (weight 0.1):
    Predict: 0.0
  Tree 9 (weight 0.1):
    Predict: 0.0
    */
}

2. 生存回归

2.1 介绍

在spark.ml中, 我们实施加速失效时间模型( Acceleratedfailure time ), 对于截尾数据它是一个参数化生存回归的模型。它描述了一个有对数生存时间的模型, 所以它也常被称为生存分析的对数线性模型。 与比例危险模型不同, 因AFT模型中每个实例对目标函数的贡献是独立的, 其更容易并行化。

2.2 参数介绍

参数名称 类型 说明
censorCol 字符串型 检查器列名
featuresCol 字符串 特征列名
labelCol 字符串 标签列名
quantilesCol 字符串 分位数列名
fitIntercept 布尔 是否训练拦截对象
maxIter 整数 迭代次数(>=0)
quantileProbabilities 双精度数组 分位数概率数组
stepSize 双精度 每次迭代优化步长
tol 双精度 迭代算法的收敛性

2.3 调用示例


import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * Created by admin on 2018/4/26.
  * 生存回归(加速失效时间模型)
  */
object AFTSurvivalRegression extends App{
  val conf = new SparkConf().setAppName("AFTSurvivalRegression")
  //设置master local[4] 指定本地模式开启模拟worker线程数
  conf.setMaster("local[4]")
  //创建sparkContext文件
  val sc = new SparkContext(conf)
  val spark = SparkSession.builder().getOrCreate()
  sc.setLogLevel("Error")

  import org.apache.spark.ml.linalg.Vectors
  import org.apache.spark.ml.regression.AFTSurvivalRegression

  val training = spark.createDataFrame(Seq(
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226))
  )).toDF("label", "censor", "features")
  val quantileProbabilities = Array(0.3, 0.6)
  val aft = new AFTSurvivalRegression()
    .setQuantileProbabilities(quantileProbabilities)
    .setQuantilesCol("quantiles")

  val model = aft.fit(training)

  // Print the coefficients, intercept and scale parameter for AFT survival regression
  println(s"Coefficients: ${model.coefficients} Intercept: " +
    s"${model.intercept} Scale: ${model.scale}")
  //Coefficients: [-0.4963111466650682,0.19844437699933642] Intercept: 2.638094615104006 Scale: 1.547234557436469
  model.transform(training).show(false)
  /**
+-----+------+--------------+------------------+---------------------------------------+
|label|censor|features      |prediction        |quantiles                              |
+-----+------+--------------+------------------+---------------------------------------+
|1.218|1.0   |[1.56,-0.605] |5.718979487634987 |[1.1603238947151624,4.9954560102747525]|
|2.949|0.0   |[0.346,2.158] |18.076521181495483|[3.66754584547177,15.789611866277758]  |
|3.627|0.0   |[1.38,0.231]  |7.381861804239103 |[1.4977061305190842,6.447962612338967] |
|0.273|1.0   |[0.52,1.151]  |13.577612501425332|[2.754762148150695,11.859872224069742] |
|4.199|0.0   |[0.795,-0.226]|9.013097744073871 |[1.8286676321297772,7.872826505878406] |
+-----+------+--------------+------------------+---------------------------------------+
    */
}

3. 保序回归

3.1 算法介绍

保序回归是回归算法的一种。

保序回归给定一个有限的实数集合 $Y=y_1,y_2,y_3…y_n$ 代表观察到的响应,以及$X=x_1,x_2,x_3…x_n$ 代表未知的响应值,训练一个模型来最小化下列方程 $f(x) = \sum_{i=1}^n w_i(y_i-x_i)^2$ 其中 $x_1 \le X_2\le X_3…\le X_n$,$w_i$ 为权重是正值。其结果方程为保序回归, 而其解是唯一的。 它可以被视为有顺序约束的最小二乘法问题。 实际上保序回归在拟合原始数据点时是一个单调函数。 我们实现池旁者算法, 它使用并行包旭算法。 训练数据是DataFrame格式, 包含标签、特征值以及权重三列。 另外包旭算法还有一个参数名为isotonic, 其默认值为真, 它指定保序回归为保序(单调递增) 或者反序(单调递减)。

训练返回一个保序回归模型, 可以被用来预测已知或未知特征的标签。 保序回归的结果是分段线性函数, 预测规则如下:

1,如果预测输入与训练中的特征值完全匹配, 则返回响应标签。 如果一个特征值对应多个预测标签, 则返回其中一个, 具体是哪一个未指定。

2,如果预测输入比训练中的特征值都高(或者都低,)则响应返回最高特征值或者最低特征值对应标签。 如果一个特征值对应多个预测标签值, 则响应返回最高值或最低值。

3, 如果预测输入落入两个特征值之间, 则预测将会是一个分段线性函数, 其值有两个最近特征值的预测值计算得到。 如果一个特征对应多个预测标签值, 则使用上述两种情况中的处理方式解决。

3.2 参数介绍

参数名称 类型 说明
featuresCol 字符串 特征列名
labelCol 字符串 标签列名
predictionCol 字符串 预测结果列名
weightCol 字符串 列权重
featuresIndex 整数 当特征列维向量时提供索引值,否则不进行处理
isotonic 布尔 输出序列为保序/增序(真)或者反序/降序(假)

3.3 调用示例

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
  * 六、保序回归
  */
object IsotonicRegression extends App{

  val conf = new SparkConf().setAppName("IsotonicRegression")
  //设置master local[4] 指定本地模式开启模拟worker线程数
  conf.setMaster("local[4]")
  //创建sparkContext文件
  val sc = new SparkContext(conf)
  val spark = SparkSession.builder().getOrCreate()
  sc.setLogLevel("Error")

  val dataset = spark.read.format("libsvm").load("D:\\data\\sample_libsvm_data.txt")


  import org.apache.spark.ml.regression.IsotonicRegression

  // Trains an isotonic regression model.
  val ir = new IsotonicRegression()
  val model = ir.fit(dataset)

  println(s"Boundaries in increasing order: ${model.boundaries}")
  //  Boundaries in increasing order: [0.0,0.0]
  println(s"Predictions associated with the boundaries: ${model.predictions}")
  //Predictions associated with the boundaries: [0.0,1.0]

  // Makes predictions.
  model.transform(dataset).show()
  /**
+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.0|(692,[127,128,129...|       0.0|
|  1.0|(692,[158,159,160...|       0.0|
|  1.0|(692,[124,125,126...|       0.0|
+-----+--------------------+----------+
    */
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark ML 14.聚类算法 Spark ML 14.聚类算法
1. K均值1.1 算法介绍:K 均值(K-means) 是一个常用的聚类算法来讲数据按预定的簇数进行剧集。k-means 算法的基本思想史: 以空间K个点为中心进行聚类, 对靠近他们的对象归类。 通过迭代的方法, 主次更新各聚类中心的值,
2019-02-25
下一篇 
Spark ML 12.回归算法 1 Spark ML 12.回归算法 1
1. 广义线性模型1. 算法介绍与线性回归假设输出服从高斯分布不同, 广义线性模型(GLMs)指定先行模型的因变量 Y¡ 服从指数型分布。Spark的GeneralizedLinearRegression接口允许指定GLMs包括线性回归、泊
2019-01-25
  目录