1. 广义线性模型 1. 算法介绍 与线性回归假设输出服从高斯分布不同, 广义线性模型(GLMs)指定先行模型的因变量 Y¡ 服从指数型分布。Spark的GeneralizedLinearRegression接口允许指定GLMs包括线性回归、泊松回归、逻辑回归等来处理多种预测问题。 目前spark.ml 仅支持指数型分布家族中的一部分如下:
Family
Response Type
Supported Links
Gaussian
Continuous
Identity*, Log, Inverse
Binomial
Binary
Logit*, Probit, CLogLog
Poisson
Count
Log*, Identity, Sqrt
Gamma
Continuous
Inverse*, Idenity, Log
Tweedie
Zero-inflated continuous
Power link function
*注意目前Spark在 GeneralizedLinearRegression 仅支持最多4096个特征, 如果特征超过4096个将会发生异常。对于线性回归和逻辑回归,如果模型特征数量不断增长, 则可通过LinerRegression 和logisticRegression来训练。
GLMs 桥求指数型分布可以为正则或者自然数形式。 自然指数分布为如下形式:$f_{Y(y|\theta,\tau)}=h(y,\tau) exp (\frac{\theta*y-A(\theta)}{d(\tau)})$ 其中 $\theta$ 是强度参数,$\tau$ 是分散度参数。
在 GLM 中响应变量 $Y_i$ 服从自然指数族分布:$y_i \sim f(.|\theta_i,\tau)$
其中强度参数 $\theta_i$ 与响应变量 $\mu_i$的期望值联系如下:$\mu_i = A’(\theta_i)$
其中 $A’(\theta_i)$ 由所选择的分布形式决定。
GLMs 同样允许指定连接函数,连接函数决定了响应变量期望值与现行预测期之间的关系:$g(\mu_i) = \eta_i = x_i^T \beta$
通常,连接函数选择:如 $A’ = g^{-1}$ 在强度参数矛现行预测期之间产生一个简单的关系。
这种情况下,连接函数也称为正则连接函数 $\theta_i = A’^{-1}(\mu_i) = g(g^{-1}(\eta_i)) = \eta_i$
Spark的GeneralizedLinearRegression接口提供汇总统计来诊断GLM模型的你和程度, 包括残差, P值。Akaike信息准则以及其它。
2. 参数说明
参数名称
类型
说明
featuresCol
字符串
特征列名
labelCol
字符串
标签列名
predictionCol
字符串
预测结果列名
family
字符串
模型中使用的误差分布类型 默认 :gaussian
fitIntercept
布尔
是否训练拦截对象 默认true
link
字符串
连接函数名,描述线性预测器和分布函数均值之间关系 在family 不是“tweedie” 时使用
linkPredictiongCol
字符串
连接函数(线性预测器列名)
maxIter
整数
最多迭代次数(>=0)
regParam
双精度
正则化参数(>=0)
solver
字符串
优化的求解算法
tol
双精度
迭代算法的收敛性
weightCol
字符串
列权重
3. 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 import hnbian.spark.utils.SparkUtils import org.apache.spark.ml.regression.GeneralizedLinearRegression import utils.FileUtils object GeneralizedLinearRegression extends App { val spark = SparkUtils .getSparkSession("GeneralizedLinearRegression" , 4 ) val filePath = FileUtils .getFilePath("sample_linear_regression_data.txt" ) val dataDF = spark.read.format("libsvm" ).load(filePath) dataDF.show(false ) val glr = new GeneralizedLinearRegression () .setFamily("gaussian" ) .setLink("identity" ) .setMaxIter(10 ) .setRegParam(0.3 ) .setFitIntercept(false ) dataDF.persist() val model = glr.fit(dataDF) model.transform(dataDF).show() dataDF.unpersist() println(s"Coefficients: ${model.coefficients} " ) println(s"Intercept: ${model.intercept} " ) val summary = model.summary summary.residuals().show(3 ) println(s"Coefficient Standard Errors: ${summary.coefficientStandardErrors.mkString(",")} " ) println(s"T Values: ${summary.tValues.mkString(",")} " ) println(s"P Values: ${summary.pValues.mkString(",")} " ) println(s"Dispersion: ${summary.dispersion} " ) println(s"Null Deviance: ${summary.nullDeviance} " ) println(s"Residual Degree Of Freedom Null: ${summary.residualDegreeOfFreedomNull} " ) println(s"Deviance: ${summary.deviance} " ) println(s"Residual Degree Of Freedom: ${summary.residualDegreeOfFreedom} " ) println(s"AIC: ${summary.aic} " ) }
2. 决策树回归 2.1 算法介绍 算法介绍:
决策树以及其集成算法是机器学习分类和回归问题中非常流行的算法, 因其已解释性、可处理类别特征、易扩展到多分类、不需要特征缩放等性质被广泛使用。 树集成算法如随机森林以及boosting 算法几乎是解决分类和回归问题中表现最优的算法。
决策树是一个贪心算法递归地将特征空间划分为两部分, 在同一个叶子节点的数据最后会拥有同样的标签。 每次划分通过贪心的以获得最大信息增益为目的, 从可选择的分裂方式中选择最佳的分裂节点, 节点不纯度有节点所含类别的同质性来衡量。 同居提供为分类提供两种不纯度衡量(基尼不纯度和熵), 为回归提供一种不纯度衡量(方差)
spark.ml支持二分类、多分类以及回归的决策树算法,适用于连续特征以及类别特征。 另外, 对于分类问题, 工具可以返回属于每种类别的概率(类别条件概率), 对于回归问题工具可以返回预测在偏置样本上的方差。
2.2 参数介绍
参数名称
类型
说明
featuresCol
字符串
特征列名
labelCol
字符串
标签列名
predictionCol
字符串
预测结果列名
varianceCol
字符串
预测的有偏样本偏差的列名
checkpointInterval
整数
设置检查点间隔(>=1),或不设置检查点(-1)
impurity
字符串
计算信息增益的准则(不区分大小写)
maxBins
整数
连续特征离散化的最大数量,以及选择每个节点分裂特征的方式
maxDepth
整数
树的最大深度(>=0)
minInfoGain
双精度
分裂节点时所需最小信息增益
minInstancesPerNode
整数
分裂后自节点最少包含的实例数量
seed
长整型
随机种子
2.3 调用示例 下面的例子导入LibSVM格式数据, 并将之划分为训练数据和测试数据。 使用第一部分数据进行训练, 剩下数据来测试。 训练之前我们使用了两种数据预处理芳芳来对特征进行转换, 并且添加了元数据到DataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 import org.apache.spark.{SparkConf , SparkContext }import org.apache.spark.sql.SparkSession object b_DecisionTreeRegressionModel extends App { val conf = new SparkConf ().setAppName("a_Tokenizer" ) conf.setMaster("local[4]" ) val sc = new SparkContext (conf) val spark = SparkSession .builder().getOrCreate() sc.setLogLevel("Error" ) val data = spark.read.format("libsvm" ).load("D:\\data\\sample_libsvm_data.txt" ) import org.apache.spark.ml.Pipeline import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.feature.VectorIndexer import org.apache.spark.ml.regression.DecisionTreeRegressionModel import org.apache.spark.ml.regression.DecisionTreeRegressor val featureIndexer = new VectorIndexer () .setInputCol("features" ) .setOutputCol("indexedFeatures" ) .setMaxCategories(4 ) .fit(data) val Array (trainingData, testData) = data.randomSplit(Array (0.7 , 0.3 )) val dt = new DecisionTreeRegressor () .setLabelCol("label" ) .setFeaturesCol("indexedFeatures" ) val pipeline = new Pipeline () .setStages(Array (featureIndexer, dt)) val model = pipeline.fit(trainingData) val predictions = model.transform(testData) predictions.show(3 ) predictions.select("prediction" , "label" , "features" ).show(3 ) val evaluator = new RegressionEvaluator () .setLabelCol("label" ) .setPredictionCol("prediction" ) .setMetricName("rmse" ) val rmse = evaluator.evaluate(predictions) println("Root Mean Squared Error (RMSE) on test data = " + rmse) val treeModel = model.stages(1 ).asInstanceOf[DecisionTreeRegressionModel ] println("Learned regression tree model:\n" + treeModel.toDebugString) }
3. 随机森林回归 3.1 算法介绍 随机森林是决策树的继承算法。随机森林包含多个决策树来降低过拟合的风险。随机森林同样具有已解释性、可处理类别特征、易扩展到多分类问题、不需特征缩放等特性。
随机森林分别训练一系列决策树,所以训练过程是并行的。因算法中加入随机过程, 所以每个决策树又有少量区别。 通过合并每个树的预测结果来减少预测的方差, 提高在测试集上的性能表现。
随机性提现:
1,每次迭代时, 对原始数据进行二次抽样来获得不同的训练数据
2,对于每个树节点, 考虑不同的随机特征子集来进行分裂。
除此之外, 决策时的训练过程和单独决策树训练过程相同。
对新实例进行预测时,随机森林需要整合其各个决策树的预测结果。回归和分类问题的整合方式略有不同。
分类问题采取投票制, 每个决策树投票给一个类别, 获得最多投票的类别为最终结果。
回归问题每个树得到的预测结果为实数, 最终的预测结果为各个树预测结果的平均值。
spark.ml 支持二分类、多分类以及回归的随机森林算法, 适用于连续特征以及类别特征。
3.2 参数介绍
参数名称
类型
说明
featuresCol
字符串
特征列名
labelCol
字符串
标签列名
predictionCol
字符串
预测结果列名
probabilityCol
字符串
类别条件概率预测结果列名
checkpointInterval
整数
设置检查点间隔(>=1), 或不设置检查点(-1)
featureSubsetStrategy
字符串
每次分裂候选特征数量
impurity
字符串
计算信息增益的准则(不区分大小写)
maxBins
整数
连续特征离散化的最大数量,以及选择每个节点分裂特征的方式
maxDepth
整数
树的最大深度(>=0)
minInfoGain
双精度
分裂节点时所需最小信息增益
minInstancesPerNode
整数
分裂后自节点最少包含的实例数量
numTrees
整数
训练的树的数量
seed
长整型
随机种子
subsamplingRate
双精度
学习一棵决策树使用的训练数据比例,范围[0,1]
thresholds
双精度数组
多分类预测的阀值,以调整预测结果在各个类别的概率
3.3 调用示例 下面的例子导入LibSVM格式数据, 并将之划分为训练数据和测试数据, 使用第一部分数据进行训练, 剩下数据来测试。 训练之前我们使用了两种数据预测处理方法对特征进行转换,并且添加了元数据到DataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 import org.apache.spark.{SparkConf , SparkContext }import org.apache.spark.sql.SparkSession object c_RandomForest extends App { val conf = new SparkConf ().setAppName("c_RandomForest" ) conf.setMaster("local[4]" ) val sc = new SparkContext (conf) val spark = SparkSession .builder().getOrCreate() sc.setLogLevel("Error" ) val data = spark.read.format("libsvm" ).load("F:\\data\\sample_libsvm_data.txt" ) import org.apache.spark.ml.Pipeline import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.feature.VectorIndexer import org.apache.spark.ml.regression.{RandomForestRegressionModel , RandomForestRegressor } val featureIndexer = new VectorIndexer () .setInputCol("features" ) .setOutputCol("indexedFeatures" ) .setMaxCategories(4 ) .fit(data) val Array (trainingData, testData) = data.randomSplit(Array (0.7 , 0.3 )) val rf = new RandomForestRegressor () .setLabelCol("label" ) .setFeaturesCol("indexedFeatures" ) val pipeline = new Pipeline () .setStages(Array (featureIndexer, rf)) val model = pipeline.fit(trainingData) val predictions = model.transform(testData) predictions.show(3 ) predictions.select("prediction" , "label" , "features" ).show(3 ) val evaluator = new RegressionEvaluator () .setLabelCol("label" ) .setPredictionCol("prediction" ) .setMetricName("rmse" ) val rmse = evaluator.evaluate(predictions) println("Root Mean Squared Error (RMSE) on test data = " + rmse) val rfModel = model.stages(1 ).asInstanceOf[RandomForestRegressionModel ] println("Learned regression forest model:\n" + rfModel.toDebugString) }