/**
 * Gradient-Boosted Tree (GBT) regression example.
 *
 * Loads a libsvm-format dataset, indexes categorical features with
 * VectorIndexer, trains a GBT regressor inside a Pipeline, and reports
 * RMSE on a random 30% hold-out split.
 */
object GBTRegressor extends App {

  val conf = new SparkConf().setAppName("GBTRegressor")
  // Run in local mode with 4 simulated worker threads.
  conf.setMaster("local[4]")
  // Create the SparkContext.
  val sc = new SparkContext(conf)
  val spark = SparkSession.builder().getOrCreate()
  sc.setLogLevel("Error")

  val data = spark.read.format("libsvm").load("F:\\data\\sample_libsvm_data.txt")

  // Identify categorical features (at most 4 distinct values) and index them,
  // producing the "indexedFeatures" column consumed by the GBT stage.
  // (This definition was missing from the original snippet even though
  // `featureIndexer` was referenced below.)
  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(data)

  // 70/30 train/test split (also missing from the original snippet).
  val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

  // Train a GBT model.
  val gbt = new GBTRegressor()
    .setLabelCol("label")
    .setFeaturesCol("indexedFeatures")
    .setMaxIter(10)

  // Chain indexer and GBT in a Pipeline.
  val pipeline = new Pipeline()
    .setStages(Array(featureIndexer, gbt))

  // Train model. This also runs the indexer.
  val model = pipeline.fit(trainingData)

  // Make predictions.
  val predictions = model.transform(testData)
  predictions.show(3)
  /**
   * +-----+--------------------+--------------------+----------+
   * |label|            features|     indexedFeatures|prediction|
   * +-----+--------------------+--------------------+----------+
   * |  0.0|(692,[95,96,97,12...|(692,[95,96,97,12...|       0.0|
   * |  0.0|(692,[122,123,148...|(692,[122,123,148...|       0.0|
   * |  0.0|(692,[124,125,126...|(692,[124,125,126...|       0.0|
   * +-----+--------------------+--------------------+----------+
   */

  // Select example rows to display.
  predictions.select("prediction", "label", "features").show(5)
  /**
   * +----------+-----+--------------------+
   * |prediction|label|            features|
   * +----------+-----+--------------------+
   * |       0.0|  0.0|(692,[95,96,97,12...|
   * |       0.0|  0.0|(692,[122,123,148...|
   * |       0.0|  0.0|(692,[124,125,126...|
   * +----------+-----+--------------------+
   */

  // Select (prediction, true label) and compute test error.
  val evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
  val rmse = evaluator.evaluate(predictions)
  println("Root Mean Squared Error (RMSE) on test data = " + rmse)
  // Root Mean Squared Error (RMSE) on test data = 0.19245008972987526

  // The GBT stage is the second pipeline stage (index 1, after the indexer).
  val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
  println("Learned regression GBT model:\n" + gbtModel.toDebugString)
  /**
   * Learned regression GBT model:
   * GBTRegressionModel (uid=gbtr_3d59f66ce413) with 10 trees
   *   Tree 0 (weight 1.0):
   *     If (feature 406 <= 72.0)
   *      If (feature 99 in {0.0,3.0})
   *       Predict: 0.0
   *      Else (feature 99 not in {0.0,3.0})
   *       Predict: 1.0
   *     Else (feature 406 > 72.0)
   *      Predict: 1.0
   *   Tree 1 (weight 0.1):
   *     Predict: 0.0
   *   ...
   *   Tree 9 (weight 0.1):
   *     Predict: 0.0
   */
}
2. 生存回归
2.1 介绍
在spark.ml中, 我们实现了加速失效时间模型 (Accelerated Failure Time, AFT), 它是一种针对截尾数据的参数化生存回归模型。它描述了一个对数生存时间的模型, 因此也常被称为生存分析的对数线性模型。与比例风险模型不同, AFT模型中每个实例对目标函数的贡献是独立的, 因此更容易并行化。
/**
 * Created by admin on 2018/4/26.
 * Survival regression (accelerated failure time model) example.
 *
 * NOTE(review): despite the object name and the surrounding prose about the
 * AFT model, the body below actually trains an IsotonicRegression model —
 * the tutorial's AFT snippet appears to have been replaced by the isotonic
 * example during extraction. Confirm which algorithm is intended; the
 * visible logic is preserved here as written.
 */
object AFTSurvivalRegression extends App {

  val conf = new SparkConf().setAppName("AFTSurvivalRegression")
  // Run in local mode with 4 simulated worker threads.
  conf.setMaster("local[4]")
  // Create the SparkContext.
  val sc = new SparkContext(conf)
  val spark = SparkSession.builder().getOrCreate()
  sc.setLogLevel("Error")

  // The original snippet referenced `dataset` without defining it; load the
  // standard isotonic-regression sample in the same style as the GBT example
  // above. TODO confirm the actual data path.
  val dataset = spark.read.format("libsvm")
    .load("F:\\data\\sample_isotonic_regression_libsvm_data.txt")

  // Trains an isotonic regression model.
  val ir = new IsotonicRegression()
  val model = ir.fit(dataset)

  println(s"Boundaries in increasing order: ${model.boundaries}")
  // Boundaries in increasing order: [0.0,0.0]
  println(s"Predictions associated with the boundaries: ${model.predictions}")
  // Predictions associated with the boundaries: [0.0,1.0]
}