Spark ML 10.分类算法 2


1. 决策树

1.1 算法简介

决策树以及其继承算法是机器学习分类和回归问题中非常流行的算法,因其易解释性、可处理类别特征、易扩展到多分类问题、不需特征缩放等性质被广泛使用。决策树模式呈树形结构,其中:

  • 每个内部节点 代表一个属性上的测试
  • 每个分支 代表一个测试输出
  • 每个叶节点 代表一种类别

学习时:利用训练数据,根据损失函数最小化的原则建立决策树模型。

预测时:对新的数据,利用决策树模型进行分类。

树集成算法如随机森林以及boosting算法几乎是解决分类和回归问题中表现最优的算法。

决策树是一个贪心算法递归地将特征空间划分为两部分, 在同一个叶子节点的数据最后会拥有同样的标签。 每次划分通过贪心的以获得最大信息增益为目的, 从可选择的分裂方式中选择最佳的分类节点。 节点不纯度由节点所含类别的同质性来衡量。 工具为分类提供两种不纯度衡量(基尼不纯度和熵), 为回归提供一种不纯度衡量(方差)

spark.ml 支持二分类、多分类以及回归的决策树算法, 适用于连续特征以及类别特征。 另外,对于分类问题, 工具可以返回属于每种类别的概率(类别条件概率), 对于回归问题工具可以返回预测在偏置样本上的方差

决策树学习通常包括3个步骤:特征选择、决策树的生成和决策树的剪枝。

1.2 特征选择

特征选择在于选取对训练数据具有分类能力的特征,这样可以提高决策树学习的效率。通常特征选择的准则是信息增益(或信息增益比、基尼指数等),每次计算每个特征的信息增益,并比较它们的大小,选择信息增益最大(信息增益比最大、基尼指数最小)的特征。下面我们重点介绍一下特征选择的准则:信息增益。

首先定义信息论中广泛使用的一个度量标准——熵(entropy),它是表示随机变量不确定性的度量。熵越大,随机变量的不确定性就越大。而信息增益(informational entropy)表示得知某一特征后使得信息的不确定性减少的程度。简单的说,一个属性的信息增益就是由于使用这个属性分割样例而导致的期望熵降低。信息增益、信息增益比和基尼指数的具体定义如下:

信息增益:特征 A 对训练数据集 D 的信息增益($g(D,A)$)定义为:集合 D 的经验熵$H(D)$ 与特征 A 给定条件下 D 的经验条件熵 $H(D|A)$ 之差,即 $g(D,A) = H(D) - H(D|A)$

信息增益比:特征 A 对训练数据集 D 的信息增益比 $g_R(D,A)$ 定义为 其信息增益 $g(D,A)$ 与训练数据集 D 关于特征 A 的值的熵 $H_A(D)$ 之比,即 $g_R(D,A) = \frac{g(D,A)}{H_A(D)}$ 其中 $H_A(D) = - \sum_{i=1}^n \frac{D_i}{D} log2 \frac{D_i}{D}$ ,n 是特征 A 取值的个数。

基尼指数:分类问题中,假设有 K 个类,样本点属于第 K 类的概率为 $P_k$, 则概率分布的基尼指数定义为:$gini(p) = \sum_{k=1}^k P_k(1-P_k) = 1-\sum_{k-1}^k p_k^2$

1.3 决策树的生成

从根结点开始,对结点计算所有可能的特征的信息增益,选择信息增益最大的特征作为结点的特征,由该特征的不同取值建立子结点,再对子结点递归地调用以上方法,构建决策树;直到所有特征的信息增均很小或没有特征可以选择为止,最后得到一个决策树。

决策树需要有停止条件来终止其生长的过程。一般来说最低的条件是:当该节点下面的所有记录都属于同一类,或者当所有的记录属性都具有相同的值时。这两种条件是停止决策树的必要条件,也是最低的条件。在实际运用中一般希望决策树提前停止生长,限定叶节点包含的最低数据量,以防止由于过度生长造成的过拟合问题。

1.4 决策树的剪枝

决策树生成算法递归地产生决策树,直到不能继续下去为止。这样产生的树往往对训练数据的分类很准确,但对未知的测试数据的分类却没有那么准确,即出现过拟合现象。解决这个问题的办法是考虑决策树的复杂度,对已生成的决策树进行简化,这个过程称为剪枝。

决策树的兼职往往通过极小化决策树整体的损失函数来实现,

一般来说,损失函数可以进行如下定义 $C_a(T) = C(T) + a|T|$ ,

  • T 为 任意子树,

  • $C(T)$ 为对训练数据的预测误差(如基尼指数),

  • |T|为子树的叶节点个数,

  • $a \ge 0$ 为参数

  • $C_a(T)$ 为参数时 a 时的子树 T 的整体损失,参数 a 权衡训练数据的拟合程度与模型的复杂度,

  • 对于固定的a ,一定存在是损失函数 $C_a(T)$ 最小子树,将其表示为 $T_a$。

  • 当 a 大的时候,最优子树 $T_a$ 偏小

  • 当 a 小的时候,最优子树 $T_a$ 偏大

1.5 参数说明

参数名称 类型 说明
featuresCol 字符串 特征列名
labelCol 字符串 标签列名
predictionCol 字符串 预测结果列名
probabilityCol 字符串 类别条件概率预测结果列名
checkpointInterval 整数 设置检查点间隔(>=1), 或不设置检查点(-1)
featureSubsetStrategy 字符串 每次分裂候选特征数量
rawPredictionCol 字符串 原始预测
impurity 字符串 计算信息增益的准则(不区分大小写)
maxBins 整数 连续特征离散化的最大数量,以及选择每个节点分裂特征的方式
maxDepth 整数 树的最大深度(>=0)
minInfoGain 双精度 分裂节点时所需最小信息增益
minInstancesPerNode 整数 分裂后自节点最少包含的实例数量
numTrees 整数 训练的树的数量
seed 长整型 随机种子
subsamplingRate 双精度 学习一棵决策树使用的训练数据比例,范围[0,1]
thresholds 双精度数组 多分类预测的阀值,以调整预测结果在各个类别的概率

1.6 分类代码示例

我们以iris数据集为例进行分析。iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。决策树可以用于分类和回归,接下来我们将在代码中分别进行介绍。代码步骤:

  1. 导入spark.implicits._,使其支持把一个RDD隐式转换为一个DataFrame。
  2. 我们用case class定义一个schema:Iris,Iris就是我们需要的数据的结构;
  3. 读取文本文件,第一个map把每行的数据用“,”隔开,比如在我们的数据集中,每行被分成了5部分,前4部分是鸢尾花的4个特征,最后一部分是鸢尾花的分类;
  4. 我们这里把特征存储在Vector中,创建一个Iris模式的RDD,然后转化成dataframe;
  5. 然后把刚刚得到的数据注册成一个表iris,注册成这个表之后,我们就可以通过sql语句进行数据查询;
  6. 选出我们需要的数据后,我们可以把结果打印出来查看一下数据。
package hnbian.spark.ml.algorithms.classification


import hnbian.spark.utils.SparkUtils
import hnbian.spark.utils.FileUtils

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

/**
  * @author hnbian
  * @ Description 决策树分类代码示例
  * @ Date 2019/1/4 15:30
  **/
object DecisionTree extends App {

  val spark = SparkUtils.getSparkSession("DecisionTree", 4)

  val filePath = FileUtils.getFilePath("iris.txt")
  println(filePath)

  import spark.implicits._

  val data = spark.sparkContext
    .textFile(filePath)
    .map(_.split(","))
    .map(p => Iris(Vectors.dense(p(0).toDouble, p(1).toDouble, p(2).toDouble, p(3).toDouble), p(4).toString())).toDF()

  data.show(false)
  /**
    * +-----------------+-----------+
    * |features         |label      |
    * +-----------------+-----------+
    * |[5.1,3.5,1.4,0.2]|Iris-setosa|
    * |[4.9,3.0,1.4,0.2]|Iris-setosa|
    * |[4.7,3.2,1.3,0.2]|Iris-setosa|
    * |[4.6,3.1,1.5,0.2]|Iris-setosa|
    * |[5.0,3.6,1.4,0.2]|Iris-setosa|
    * |[5.4,3.9,1.7,0.4]|Iris-setosa|
    * |[4.6,3.4,1.4,0.3]|Iris-setosa|
    * +-----------------+-----------+
    */
  data.map(t => t(1) + ":" + t(0)).collect().foreach(println)
  //分别获取标签列和特征列,进行索引,并进行了重命名。
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)
  //labelIndexer.transform(data)

  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  //这里我们设置一个labelConverter,目的是把预测的类别重新转化成字符型的。
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)

  //接下来,我们把数据集随机分成训练集和测试集,其中训练集占70%。
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  //导入所需要的包
  import org.apache.spark.ml.classification.DecisionTreeClassificationModel
  import org.apache.spark.ml.classification.DecisionTreeClassifier
  import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

  //训练决策树模型,这里我们可以通过setter的方法来设置决策树的参数,
  // 也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。
  // 具体的可以设置的参数可以通过explainParams()来获取。
  val dtClassifier = new DecisionTreeClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")

  //在pipeline中进行设置
  val pipelinedClassifier = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, dtClassifier, labelConverter))
  //训练决策树模型
  val modelClassifier = pipelinedClassifier.fit(trainingData)
  //进行预测
  val predictionsClassifier = modelClassifier.transform(testData)
  //查看预测结果
  predictionsClassifier.select("predictedLabel", "label", "features").show(20)

  //模型评估
  val evaluatorClassifier = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")

  val accuracy = evaluatorClassifier.evaluate(predictionsClassifier)

  println(s"准确率 = ${accuracy * 100}%")
  //准确率 = 94.87179487179486%
  println(s"Test Error = ${(1.0 - accuracy)}")
  //Test Error = 0.0625
  val treeModelClassifier = modelClassifier.stages(2).asInstanceOf[DecisionTreeClassificationModel]
  // 查看决策树
  println("Learned classification tree model:\n" + treeModelClassifier.toDebugString)
  /**
  DecisionTreeClassificationModel (uid=dtc_24f632ec622a) of depth 5 with 15 nodes
  If (feature 2 <= 2.5999999999999996)
   Predict: 2.0
  Else (feature 2 > 2.5999999999999996)
   If (feature 2 <= 5.05)
    If (feature 3 <= 1.65)
     Predict: 0.0
    Else (feature 3 > 1.65)
     If (feature 0 <= 6.05)
      If (feature 1 <= 3.05)
       Predict: 1.0
      Else (feature 1 > 3.05)
       Predict: 0.0
     Else (feature 0 > 6.05)
      Predict: 0.0
   Else (feature 2 > 5.05)
    If (feature 3 <= 1.65)
     If (feature 0 <= 6.05)
      Predict: 0.0
     Else (feature 0 > 6.05)
      Predict: 1.0
    Else (feature 3 > 1.65)
     Predict: 1.0
    */


  spark.stop()
}

case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)

1.7 回归代码示例

package hnbian.sparkml.algorithms.regression

import hnbian.spark.ml.algorithms.classification.DecisionTreeClassifier.{data, labelIndexer}
import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
import utils.FileUtils

/**
  * @author hnbian 2019/1/17 17:05
  *  决策树回归代码示例
  */
object DecisionTreeRegressor extends App {
  val spark = SparkUtils.getSparkSession("DecisionTreeRegressor", 4)

  val filePath = FileUtils.getFilePath("iris.txt")
  println(filePath)

  import spark.implicits._

  //加载数据
  val data = spark.sparkContext
    .textFile(filePath)
    .map(_.split(","))
    .map(p => Iris(Vectors.dense(p(0).toDouble, p(1).toDouble, p(2).toDouble, p(3).toDouble), p(4).toString())).toDF()

  //展示数据
  data.show(false)
  /**
    * +-----------------+-----------+
    * |features         |label      |
    * +-----------------+-----------+
    * |[5.1,3.5,1.4,0.2]|Iris-setosa|
    * |[4.9,3.0,1.4,0.2]|Iris-setosa|
    * |[4.7,3.2,1.3,0.2]|Iris-setosa|
    * |[4.6,3.1,1.5,0.2]|Iris-setosa|
    * +-----------------+-----------+
    */
  //定义决策树实例
  val dtRegressor = new DecisionTreeRegressor()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")

  //分别获取标签列和特征列,进行索引,并进行了重命名。
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)
  //labelIndexer.transform(data)

  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  //这里我们设置一个labelConverter,目的是把预测的类别重新转化成字符型的。
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)

  //在pipeline中进行设置
  val pipelineRegressor = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtRegressor, labelConverter))

  //接下来,我们把数据集随机分成训练集和测试集,其中训练集占70%。
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
  //训练决策树模型
  val modelRegressor = pipelineRegressor.fit(trainingData)
  //预测
  val predictionsRegressor = modelRegressor.transform(testData)
  //展示预测结果
  predictionsRegressor.show()
  /**
    * +-----------------+---------------+------------+-----------------+----------+---------------+
    * |         features|          label|indexedLabel|  indexedFeatures|prediction| predictedLabel|
    * +-----------------+---------------+------------+-----------------+----------+---------------+
    * |[4.4,3.0,1.3,0.2]|    Iris-setosa|         2.0|[4.4,3.0,1.3,0.2]|       2.0|    Iris-setosa|
    * |[4.6,3.1,1.5,0.2]|    Iris-setosa|         2.0|[4.6,3.1,1.5,0.2]|       2.0|    Iris-setosa|
    * |[4.6,3.2,1.4,0.2]|    Iris-setosa|         2.0|[4.6,3.2,1.4,0.2]|       2.0|    Iris-setosa|
    * |[4.6,3.4,1.4,0.3]|    Iris-setosa|         2.0|[4.6,3.4,1.4,0.3]|       2.0|    Iris-setosa|
    * |[4.8,3.4,1.9,0.2]|    Iris-setosa|         2.0|[4.8,3.4,1.9,0.2]|       2.0|    Iris-setosa|
    * |[4.9,3.1,1.5,0.1]|    Iris-setosa|         2.0|[4.9,3.1,1.5,0.1]|       2.0|    Iris-setosa|
    * +-----------------+---------------+------------+-----------------+----------+---------------+
    */

  //模型评估
  val evaluatorRegressor = new RegressionEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("rmse")

  //获取标准误差
  val rmse = evaluatorRegressor.evaluate(predictionsRegressor)

  //打印标准误差
  println("Root Mean Squared Error (RMSE) on test data = " + rmse)
  //Root Mean Squared Error (RMSE) on test data = 0.1414213562373095

  val treeModelRegressor = modelRegressor.stages(2).asInstanceOf[DecisionTreeRegressionModel]
  //打印决策树
  println("Learned regression tree model:\n" + treeModelRegressor.toDebugString)
  /**
    *Learned regression tree model:
    *DecisionTreeRegressionModel (uid=dtr_a1688d438ba6) of depth 4 with 11 nodes
    *  If (feature 2 <= 2.45)
    *   Predict: 2.0
    *  Else (feature 2 > 2.45)
    *   If (feature 3 <= 1.75)
    *    If (feature 2 <= 4.95)
    *     If (feature 0 <= 4.95)
    *      Predict: 1.0
    *     Else (feature 0 > 4.95)
    *      Predict: 0.0
    *    Else (feature 2 > 4.95)
    *     If (feature 3 <= 1.65)
    *      Predict: 1.0
    *     Else (feature 3 > 1.65)
    *      Predict: 0.0
    *   Else (feature 3 > 1.75)
    *    Predict: 1.0
    */
}

case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)

2. 随机森林

2.1 算法简介

随机森林是由多个决策树构成的森林,算法分类结果由这些决策树投票得到,决策树在生成的过程当中分别在行方向和列方向上添加随机过程,行方向上构建决策树时采用放回抽样(bootstraping)得到训练数据,列方向上采用无放回随机抽样得到特征子集,并据此得到其最优切分点,这便是随机森林算法的基本原理。下图给出了随机森林算法分类原理,从图中可以看到,随机森林是一个组合模型,内部仍然是基于决策树,同单一的决策树分类不同的是,随机森林通过多个决策树投票结果进行分类,算法不容易出现过度拟合问题。随机森林同样具有易解释性、可处理类别特征、易扩展到多分类问题、不需特征缩放等性质。

2.2 随机森林在分布式环境下的优化策略

随机森林算法在单机环境下很容易实现,但在分布式环境下特别是在 Spark 平台上,传统单机形式的迭代方式必须要进行相应改进才能适用于分布式环境,这是因为在分布式环境下,数据也是分布式的(如下图 所示),算法设计不得当会生成大量的 IO 操作,例如频繁的网络数据传输,从而影响算法效率。

单机环境下数据存储 分布式环境下数据存储

因此,在 Spark 上进行随机森林算法的实现,需要进行一定的优化,Spark 中的随机森林算法主要实现了三个优化策略:

1. 切分点抽样统计:在单机环境下的决策树对连续变量进行切分点选择时,一般是通过对特征点进行排序,然后取相邻两个数之间的点作为切分点,这在单机环境下是可行的,但如果在分布式环境下如此操作的话,会带来大量的网络传输操作,特别是当数据量达到 PB 级时,算法效率将极为低下。为避免该问题,Spark 中的随机森林在构建决策树时,会对各分区采用一定的子特征策略进行抽样,然后生成各个分区的统计数据,并最终得到切分点。(如下图 所示)

2. 特征装箱(Binning):决策树的构建过程就是对特征的取值不断进行划分的过程,对于离散的特征,如果有 M 个值,最多2^(m-1) -1个划分,如果值是有序的,那么就最多 M-1 个划分。比如年龄特征,有老,中,少 3 个值,如果无序有2^(m-1) -1(((2^3-1)-1) = 3)个划分即 3 种划分:(老|中,少);(老,中|少);(老,少|中);如果是有序的,即按老,中,少的序,那么只有 m-1 个,即 2 种划分,(老|中,少);(老,中|少)。对于连续的特征,其实就是进行范围划分,而划分的点就是 split(切分点),划分出的区间就是 bin。对于连续特征,理论上 split 是无数的,在分布环境下不可能取出所有的值,因此它采用的是(1)中的切点抽样统计方法。如下图采用了不同的切分点之后的标签分类统计。

3. 逐层训练(level-wise training):单机版本的决策数生成过程是通过递归调用(本质上是深度优先)的方式构造树,在构造树的同时,需要移动数据,将同一个子节点的数据移动到一起。此方法在分布式数据结构上无法有效的执行,而且也无法执行,因为数据太大,无法放在一起,所以在分布式环境下采用的策略是逐层构建树节点(本质上是广度优先),这样遍历所有数据的次数等于所有树中的最大层数。每次遍历时,只需要计算每个节点所有切分点统计参数,遍历完后,根据节点的特征划分,决定是否切分,以及如何切分。如下图

2.3 参数说明

参数名称 类型 说明
featuresCol 字符串 特征列名
labelCol 字符串 标签列名
predictionCol 字符串 预测结果列名
probabilityCol 字符串 类别条件概率预测结果列名
checkpointInterval 整数 设置检查点间隔(>=1), 或不设置检查点(-1)
featureSubsetStrategy 字符串 每次分裂候选特征数量
rawPredictionCol 字符串 原始预测
impurity 字符串 计算信息增益的准则(不区分大小写)
maxBins 整数 连续特征离散化的最大数量,以及选择每个节点分裂特征的方式
maxDepth 整数 树的最大深度(>=0)
minInfoGain 双精度 分裂节点时所需最小信息增益
minInstancesPerNode 整数 分裂后自节点最少包含的实例数量
numTrees 整数 训练的树的数量
seed 长整型 随机种子
subsamplingRate 双精度 学习一棵决策树使用的训练数据比例,范围[0,1]
thresholds 双精度数组 多分类预测的阀值,以调整预测结果在各个类别的概率

2.4 分类代码示例

package hnbian.sparkml.algorithms.classification

import utils.FileUtils
import hnbian.spark.utils.SparkUtils
import hnbian.sparkml.utils.Evaluations
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

/**
  * @author hnbian
  * @ Description  随机森林代码示例
  * @ Date 2019/1/10 16:18
  **/
object RandomForest extends App {


  val spark = SparkUtils.getSparkSession("RandomForest", 4)

  val filePath = FileUtils.getFilePath("sample_libsvm_data.txt")

  val data = spark.read.format("libsvm").load(filePath)

  //打印数据
  data.show()
  /**
    * +-----+--------------------+
    * |label|            features|
    * +-----+--------------------+
    * |  1.0|(692,[125,126,153...|
    * |  1.0|(692,[127,128,154...|
    * |  1.0|(692,[154,155,156...|
    * |  1.0|(692,[152,153,154...|
    * |  0.0|(692,[127,128,129...|
    * +-----+--------------------+
    */
  //标签转为向量索引
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)

  //自动识别分类特征,并为它们编制索引。
  //设置maxCategories,所以具有> 4个不同值的特征被视为连续的。
  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  // 将数据分成训练和测试集(30%用于测试)。
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
  // 设置随机森林模型
  val rf = new RandomForestClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")
    .setNumTrees(3)

  // 将索引标签转换回原始标签。
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)

  // 将每个模型添加到工作流中
  val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

  // 训练模型
  val model = pipeline.fit(trainingData)

  // 进行预测
  val predictions = model.transform(testData)
  predictions.show(5)
  /**
    * +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
    * |label|            features|indexedLabel|     indexedFeatures|rawPrediction|probability|prediction|predictedLabel|
    * +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
    * |  0.0|(692,[100,101,102...|         1.0|(692,[100,101,102...|    [5.0,5.0]|  [0.5,0.5]|       0.0|           1.0|
    * |  0.0|(692,[121,122,123...|         1.0|(692,[121,122,123...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
    * |  0.0|(692,[124,125,126...|         1.0|(692,[124,125,126...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
    * |  0.0|(692,[125,126,127...|         1.0|(692,[125,126,127...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
    * |  0.0|(692,[126,127,128...|         1.0|(692,[126,127,128...|    [2.0,8.0]|  [0.2,0.8]|       1.0|           0.0|
    * +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
    */


  //模型评估
  val (accuracy, precision, recall, f1) = Evaluations.multiClassEvaluate(predictions)
  println("\n\n========= 评估结果 ==========")
  println(s"\n准确率:$accuracy")
  println(s"加权精确率:$precision")
  println(s"加权召回率:$recall")
  println(s"F1值:$f1")
  /**
    * 准确率:0.96875
    * 加权精确率:0.9711538461538461
    * 加权召回率:0.96875
    * F1值:0.9689743589743589
    */
  val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
  println("Learned classification forest model:\n" + rfModel.toDebugString)
  /**
    * Learned classification forest model:
    * RandomForestClassificationModel (uid=rfc_82f005014192) with 3 trees
    * Tree 0 (weight 1.0):
    * If (feature 512 <= 8.0)
    * If (feature 454 <= 12.0)
    * If (feature 486 <= 212.0)
    * Predict: 0.0
    * Else (feature 486 > 212.0)
    * Predict: 1.0
    * Else (feature 454 > 12.0)
    * Predict: 1.0
    * Else (feature 512 > 8.0)
    * Predict: 1.0
    *
    * Tree 1 (weight 1.0):
    * If (feature 462 <= 63.0)
    * If (feature 492 <= 190.5)
    * Predict: 1.0
    * Else (feature 492 > 190.5)
    * Predict: 0.0
    * Else (feature 462 > 63.0)
    * Predict: 0.0
    *
    * Tree 2 (weight 1.0):
    * If (feature 524 <= 21.0)
    * If (feature 429 <= 7.0)
    * Predict: 0.0
    * Else (feature 429 > 7.0)
    * Predict: 1.0
    * Else (feature 524 > 21.0)
    * Predict: 1.0
    */
}

3.3 回归代码示例

package hnbian.sparkml.algorithms.regression

import hnbian.spark.utils.SparkUtils
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import utils.FileUtils

/**
  * @author hnbian 2019/1/18 11:46
  *         随机森林回归代码示例
  */
object RandomForestRegressor extends App {
  val filePath = FileUtils.getFilePath("sample_libsvm_data.txt")
  val spark = SparkUtils.getSparkSession("RandomForestRegressor", 4)

  val data = spark.read.format("libsvm").load(filePath)

  data.show()
  /**
    * +-----+--------------------+
    * |label|            features|
    * +-----+--------------------+
    * |  1.0|(692,[125,126,153...|
    * |  1.0|(692,[127,128,154...|
    * |  1.0|(692,[154,155,156...|
    * |  1.0|(692,[152,153,154...|
    * +-----+--------------------+
    */
  //自动识别分类特征,并为它们编制索引。
  //设置maxCategories,所以具有> 4个不同值的特征被视为连续的。
  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  //将数据分成训练和测试集(30%用于测试)。
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  // Train a RandomForest model.
  val rf = new RandomForestRegressor()
    .setLabelCol("label")
    .setFeaturesCol("indexedFeatures")
    .setNumTrees(4)

  // 创建工作流并添加stage
  val pipeline = new Pipeline()
    .setStages(Array(featureIndexer, rf))

  // 训练模型
  val model = pipeline.fit(trainingData)

  // 进行预测
  val predictions = model.transform(testData)
  predictions.show(30)
  /**
    * +-----+--------------------+--------------------+----------+
    * |label|            features|     indexedFeatures|prediction|
    * +-----+--------------------+--------------------+----------+
    * |  0.0|(692,[100,101,102...|(692,[100,101,102...|       0.1|
    * |  0.0|(692,[123,124,125...|(692,[123,124,125...|       0.0|
    * |  0.0|(692,[124,125,126...|(692,[124,125,126...|       0.0|
    * +-----+--------------------+--------------------+----------+
    */
  // 选择(预测,真实标签)并计算测试错误。
  val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
  //打印标准误差
  val rmse = evaluator.evaluate(predictions)
  println("Root Mean Squared Error (RMSE) on test data = " + rmse)
  //Root Mean Squared Error (RMSE) on test data = 0.19695964928958382
  val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
  println("Learned regression forest model:\n" + rfModel.toDebugString)

  /**
    * Learned regression forest model:
    * RandomForestRegressionModel (uid=rfr_54d456fb4103) with 4 trees
    *   Tree 0 (weight 1.0):
    *     If (feature 434 <= 70.5)
    *      Predict: 0.0
    *     Else (feature 434 > 70.5)
    *      Predict: 1.0
    *   Tree 1 (weight 1.0):
    *     If (feature 489 <= 1.5)
    *      Predict: 0.0
    *     Else (feature 489 > 1.5)
    *      Predict: 1.0
    *   Tree 2 (weight 1.0):
    *     If (feature 434 <= 70.5)
    *      Predict: 0.0
    *     Else (feature 434 > 70.5)
    *      Predict: 1.0
    *   Tree 3 (weight 1.0):
    *     If (feature 490 <= 29.0)
    *      Predict: 0.0
    *     Else (feature 490 > 29.0)
    *      Predict: 1.0
    */

}

3. 梯度提升树

3.1 算法介绍

梯度提升树(Gradient Boosting Decison Tree)(GBDT)有很多简称,有GBT(Gradient Boosting Tree), GTB(Gradient Tree Boosting ), GBRT(Gradient Boosting Regression Tree), MART(Multiple Additive Regression Tree),其实都是指的同一种算法,本文统一简称GBDT。

梯度提升树是一种决策树的集成算法。 它通过反复迭代训练决策树来最小化损失函数。 与决策树类似,梯度提升树具有可处理类别特征、易扩展到多分类问题、不需要特征缩放等性质。 spark.ml通过使用现有 decision tree 工具来实现

梯度提升树依次迭代训练一系列的决策树。 在一次迭代中, 算法使用现有的集成来对每个训练示例的类别进行预测, 然后将预测结果与真实的标签值进行比较。 通过重新标记, 来赋予预测结果不好的实例更高的权重。 所以在下次迭代中, 决策树会对先前的错误进行修正。

对实例标签进行重新标记的机制由损失函数来指定。 每次迭代过程中,梯度迭代树在训练数据上进一步减少损失函数的值。 spark.ml为分类问题提供一种损失函数(Log Loss), 为回归问题提供两种损失函数(平方误差与绝对误差)

GBDT支持二分类以及回归的随机森林算法, 适用于连续特征以及类别特征。

3.2 梯度提升树与随机分林

梯度提升树随机森林 都是用于决策树树集成的算法,但训练过程是不同的。 有几个实际的权衡点:

  • 梯度提升树 一次训练一棵树,因此它们比随机森林需要更长时间的训练。 随机森林可以并行训练多棵树。
  • 使用具有GBDT训练较小(较浅)树比使用随机森林更有优势,并且训练较小树需要的时间更短。
  • 随机森林可能不太容易过度拟合。在随机森林中训练更多树可以降低过拟合的可能性,但是使用GBT训练更多树会增加过拟合的可能性。(在统计语言中,随机森林通过使用更多树来减少方差,而GBDT通过使用更多树来减少偏差。)
  • 随机森林更容易调整,因为性能随树数量增加而改善(对于GBDT来说,如果树木数量增长太大,性能可能会开始降低)。

简而言之,两种算法都很有效,并且应基于特定数据集来选择合适的算法。

3.3 偏差 和 方差 的区别

  偏差:描述的是预测值(估计值)的期望与真实值之间的差距。偏差越大,越偏离真实数据。
  方差:描述的是预测值的变化范围,离散程度,也就是离其期望值的距离。方差越大,数据的分布越分散。(如下图)

Low Variance(低方差) High Variance(高方差)
Low Bias
(低偏差)
Hish Bias
(高偏差)

3.4 参数列表

maxDepth 树的最大深度,0意味着只有一个叶节点,1意味着有一个内部节点+两个叶节点。 支持:>=0 默认:5
maxBins 用于离散连续特征的最大分桶数,用于每个节点特征分裂时分裂点的选择,分桶数越大意味着粒度越高。 支持:>=2并且>=任一类别特征的分类数 默认:32
minInstancesPerNode 分裂后每个子节点含有的最小样本数,如果分裂后左孩子或右孩子含有的样本数少于该值,则该分裂无效。 支持:>=1 默认:1
minInfoGain 树节点分裂时的最小信息增益。 支持:>=0.0 默认:0.0
maxMemoryInMB 每次会对一组节点进行切分,分组是按照树的层次逐步进行。每组需要切分的节点个数视内存大小而定,如果内存太小,每次只能切分一个节点。单位MB 默认:256MB
cacheNodeIds 如果为true,算法会为每个实例缓存树节点ID;如果为false,算法会将树传递给执行器用于匹配实例和树节点。缓存有利于加速训练深度较大的树,用户可以通过参数checkpointInterval设置缓存被检查的频率或者不检查。 默认:false
checkpointInterval 表示缓存的树节点ID的检查频率,当cacheNodeIds为true并且检查目录(checkpoint directory)通过sparkContext设置过才有效。 支持:>=1或者-1代表不检查,10意味着每10次迭代检查一次。 默认:10
Impurity 用于计算信息增益的准则。不支持通过GBTClassifier.setImpurity方法设置该值。 支持:entropy、gini 默认:gini
subsamplingRate 每一次迭代训练基学习器(决策树)时所使用的训练数据集的百分比。 支持:(0, 1] 默认:1.0
Seed 随机数种子 默认:this.getClass.getName.hashCode.toLong
maxIter 最大迭代次数 支持:>=0 默认:20
stepSize 学习率(learning rate/step size)参数,用于缩小(shrinking)每个基学习器的贡献。 支持:(0, 1] 默认:0.1
lossType GBT最小化的损失函数,不区分大小写。 支持:logistic 默认:logistic

3.5 关键参数

有三个关键参数需要仔细分析:loss(损失函数的类型)、numIterations(迭代次数)、learningRate(学习率)。可以通过下面的方式设置

// 定义GBTClassifier,注意在Spark中输出(预测列)都有默认的设置,可以不自己设置
  val gbtClassifier = new GBTClassifier()
    .setLabelCol("indexedLabel")// 输入label
    .setMaxIter(10) // 最大迭代次数(numIteration)
    .setStepSize(0.5) // 设置学习率(learningRate)
    .setImpurity("entropy") // 计算信息增益的准则 or "gini"
    .setLossType("logistic") // 损失函数的类型(loss)

1. loss (损失函数类型)

Spark中已经实现的损失函数类型有以下三种,注意每一种都只适合一类问题,要么是回归,要么是分类。
  分类只可选择 Log Loss,回归问题可选择平方误差和绝对值误差。分别又称为L2损失和L1损失。绝对值误差(L1损失)在处理带有离群值的数据时比L2损失更加具有鲁棒性。

Loss Task Formula Description
Log Loss Classification $2\sum_{i=1}^n log(1 + exp(-2y_iF((x_i))))$ Twice binomial negative log likelihood.
Squared Error Regression 也称为L2损失。回归任务的默认损失类型。
Absolute Error Regression 也称为L1损失。对于异常值比Squared Error更强大

2. numIterations(迭代次数)

GBDT迭代次数,每一次迭代将产生一棵树,因此numIterations也是算法中所包含的树的数目。增加numIterations会提高训练集数据预测准确率(注意是训练集数据上的准确率哦)。但是相应的会增加训练的时间。如何选择合适的参数防止过拟合,一定需要做验证。将数据分为两份,一份是训练集,一份是验证集。
  随着迭代次数的增加,一开始在验证集上预测误差会减小,迭代次数增大到一定程度后误差反而会增加,那么通过准确度vs.迭代次数曲线可以选择最合适的numIterations。

3. learningRate(学习率)

这个参数一般不需要调试,如果发现算法面对某个数据集,变现得极其不稳定,那么就要减小学习率再试一下,一般会有改善(稳定性变好)。小的学习率(步长)肯定会增加训练的时间。

3.6 代码示例

package hnbian.sparkml.algorithms.classification

import hnbian.spark.utils.SparkUtils
import hnbian.sparkml.utils.Evaluations
import utils.FileUtils
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

/**
  * @author hnbian
  * @ Description
  * @ Date 2019/1/11 10:30
  **/
object GBTClassifier extends App {

  val spark = SparkUtils.getSparkSession("GBTClassifier", 4)

  val filePath = FileUtils.getFilePath("sample_libsvm_data.txt")

  //加载数据
  val data = spark.read.format("libsvm").load(filePath)
  //展示加载出的数据
  data.show()
  /**
    * +-----+--------------------+
    * |label|            features|
    * +-----+--------------------+
    * |  1.0|(692,[125,126,153...|
    * |  1.0|(692,[127,128,154...|
    * |  1.0|(692,[154,155,156...|
    * |  1.0|(692,[152,153,154...|
    * |  0.0|(692,[127,128,129...|
    * |  1.0|(692,[158,159,160...|
    * +-----+--------------------+
    */

  //定义GBTClassifier,注意在Spark中输出(预测列)都有默认的设置,可以不自己设置
  val gbtClassifier = new GBTClassifier()
    .setLabelCol("indexedLabel")//输入label
    .setFeaturesCol("indexedFeatures")
    .setMaxIter(3) //最大迭代次数(numIteration)
    .setStepSize(0.5) // 设置学习率(learningRate)
    .setImpurity("entropy") //计算信息增益的准则 or "gini"
    .setLossType("logistic") // 损失函数的类型(loss)

  // 索引标签,将元数据添加到标签列。
  // Fit on 整个数据集包含索引中的所有标签。
  val labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data)

  //自动识别分类特征,并为它们编制索引。
  //设置maxCategories,所以具有> 4个不同值的特征被视为连续的。
  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  // 将数据分成训练和测试集(30%用于测试)。
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  // 将索引标签转换回原始标签。
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)

  // Chain indexers and GBT in a Pipeline.链式索引器和管道中的GBT。
  val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, gbtClassifier, labelConverter))

  // 训练模型。 这也运行索引。
  val model = pipeline.fit(trainingData)

  // 进行预测
  val predictions = model.transform(testData)
  predictions.show(3)
  /**
    * +-----+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
    * |label|            features|indexedLabel|     indexedFeatures|       rawPrediction|         probability|prediction|predictedLabel|
    * +-----+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
    * |  0.0|(692,[98,99,100,1...|         1.0|(692,[98,99,100,1...|[-1.9064626927749...|[0.02160633849263...|       1.0|           0.0|
    * |  0.0|(692,[100,101,102...|         1.0|(692,[100,101,102...|[-0.7639208415359...|[0.17830969280890...|       1.0|           0.0|
    * |  0.0|(692,[121,122,123...|         1.0|(692,[121,122,123...|[-1.9064626927749...|[0.02160633849263...|       1.0|           0.0|
    * +-----+--------------------+------------+--------------------+--------------------+--------------------+----------+--------------+
    */

  //模型评估
  val (accuracy, precision, recall, f1) = Evaluations.multiClassEvaluate(predictions)
  println("\n\n========= 评估结果 ==========")
  println(s"\n准确率:$accuracy")
  println(s"加权精确率:$precision")
  println(s"加权召回率:$recall")
  println(s"F1值:$f1")
  /**
    * ========= 评估结果=========
    *
    * 准确率:0.9428571428571428
    * 加权精确率:0.9488721804511278
    * 加权召回率:0.9428571428571428
    * F1值:0.9427637721755369
    */

  val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
  println("Learned classification GBT model:\n" + gbtModel.toDebugString)
  /**
    * Learned classification GBT model:
    * GBTClassificationModel (uid=gbtc_38531a8df50a) with 3 trees
    * Tree 0 (weight 1.0):
    * If (feature 406 <= 126.5)
    * If (feature 99 in {2.0})
    * Predict: -1.0
    * Else (feature 99 not in {2.0})
    * Predict: 1.0
    * Else (feature 406 > 126.5)
    * Predict: -1.0
    * Tree 1 (weight 0.5):
    * If (feature 434 <= 79.5)
    * If (feature 184 <= 253.5)
    * Predict: 0.47681168808847024
    * Else (feature 184 > 253.5)
    * Predict: -0.4768116880884694
    * Else (feature 434 > 79.5)
    * If (feature 351 <= 70.5)
    * Predict: -0.4768116880884702
    * Else (feature 351 > 70.5)
    * Predict: -0.47681168808847035
    * Tree 2 (weight 0.5):
    * If (feature 490 <= 27.5)
    * If (feature 99 in {2.0})
    * Predict: -0.3099993569763609
    * Else (feature 99 not in {2.0})
    * If (feature 100 <= 126.5)
    * Predict: 0.3099993569763608
    * Else (feature 100 > 126.5)
    * Predict: 0.30999935697636083
    * Else (feature 490 > 27.5)
    * If (feature 124 <= 188.5)
    * Predict: -0.3099993569763608
    * Else (feature 124 > 188.5)
    * Predict: -0.30999935697636083
    */
}

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Spark ML 11.分类算法 3 Spark ML 11.分类算法 3
1. 多层感知机1.1 算法介绍多层感知器 (MLP, Multilayer Perceptron) 是一种多层的前馈神经网络模型,所谓前馈型神经网络,指其从输入层开始只接收前一层的输入,并把计算结果输出到后一层,并不会给前一层有所反馈,整
2019-01-20
下一篇 
Spark ML 9.分类算法 1 Spark ML 9.分类算法 1
1. 逻辑回归逻辑回归(Logistic Regression)是预测分类的流程方法,它是 广义线性模型 的一个特例来预测结果分类的可能性。在spar.ml 逻辑回归中可以使用二项式逻辑回归来预测二进制结果,也可以通过多项式逻辑回归来预测多
2019-01-08
  目录