Implementing Logistic Regression in Spark and Running It in a Cluster Environment


Spark MLlib machine learning

Scala code:

package Classification

/**
  * LogisticRegression Algorithm For Hive
  * Logistic regression against Hive in the test environment, with automated deployment
  * Created by wy on 2019/04/01
  */
import java.io.File

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

object LrForHive {
  // Suppress unnecessary log output in the terminal
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    // Initialize the application entry point
    val conf = new SparkConf().setAppName("LrForHive")
    conf.set("hive.cli.print.header", "false")   // suppress Hive column headers in query output

    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    // Read data from Hive
    val rawData = sqlContext.sql("select * from sospdm.sosp_ml_supervip_big_order_mem").rdd

    val data = rawData.map(x => x(0).toString).map {
      line => {
        val arr = line.split("#")
        val label = arr(1).toDouble
        val features = arr(0).split(",").map(_.toDouble)
        LabeledPoint(label, Vectors.dense(features))  // build a dense feature vector
      }
    }
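    // For illustration, a hypothetical input record "0.5,1.2,0.0,3.4#1"
    // (comma-separated features, then '#', then the label) would parse to
    // LabeledPoint(1.0, Vectors.dense(0.5, 1.2, 0.0, 3.4)).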

    data.cache()  // cache the parsed dataset for reuse across the tuning runs

    // Feature standardization
    val vectors = data.map(x => x.features)
    val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)  // fit the scaler to the feature vectors
    val scaledData = data.map(x => LabeledPoint(x.label, scaler.transform(x.features)))
    scaledData.cache()
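    // StandardScaler fitted with withMean and withStd rescales each feature x to
    // (x - mean) / stddev, putting all features on a comparable scale for SGD.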

    /** Hyperparameter tuning. MLlib's linear models offer two optimization techniques: SGD and L-BFGS (L-BFGS is available for logistic regression only, via LogisticRegressionWithLBFGS). */
    // Linear model
    // Helper that trains a model for the given settings (input data, regularization parameter, number of iterations, regularization form, step size)
    def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int,
                        updater: Updater, stepSize: Double) = {
      val lr = new LogisticRegressionWithSGD  // LogisticRegressionWithLBFGS would work here as well
      lr.optimizer
        .setNumIterations(numIterations)  // number of iterations
        .setStepSize(stepSize)            // step size
        .setRegParam(regParam)            // regularization parameter
        .setUpdater(updater)              // regularization form
      lr.run(input)                       // train on the input RDD
    }

    // Second helper: label is the parameter value under test, data is the dataset to predict on, model is the trained model
    def createMetrics(label: Double, data: RDD[LabeledPoint], model: ClassificationModel) = {
      val scoreAndLabels = data.map { point =>
        (model.predict(point.features), point.label)  // (prediction, label)
      }
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      (label, metrics.areaUnderROC())  // compute AUC
    }
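    // Note: predict on a LogisticRegressionModel returns the thresholded 0/1 class label
    // (default threshold 0.5), so the AUC above is computed from hard predictions. For a
    // score-based AUC one could call clearThreshold() on the model before predicting,
    // which makes predict return the raw probability instead.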

    // 1. Number of iterations
    val iterateResults = Seq(1, 5, 10, 50, 100).map { param =>
      // train
      val model = trainWithParams(scaledData, 0.0, param, new SimpleUpdater, 1.0)
      // evaluate on the training data, computing AUC
      createMetrics(param, scaledData, model)
    }
    iterateResults.foreach { case (param, auc) => println(f"$param iterations, AUC = ${auc * 100}%2.2f%%") }
    val (bestIterateLabel, maxIterateAuc) = iterateResults.maxBy(_._2)
    val bestIterateParam = bestIterateLabel.toInt
    println("max auc: " + maxIterateAuc + " best numIterations param: " + bestIterateParam)

    // 2. Step size: a larger step size converges faster, but one that is too large can overshoot and prevent convergence
    val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
      val model = trainWithParams(scaledData, 0.0, bestIterateParam, new SimpleUpdater, param)
      createMetrics(param, scaledData, model)
    }
    stepResults.foreach { case (param, auc) => println(f"$param stepSize, AUC = ${auc * 100}%2.2f%%") }
    val (bestStepParam, maxStepAuc) = stepResults.maxBy(_._2)
    println("max auc: " + maxStepAuc + " best stepSize param: " + bestStepParam)

    // 3.1 Regularization parameter (default 0.0) with L1 regularization: new L1Updater
    val regL1Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
      val model = trainWithParams(scaledData, param, bestIterateParam, new L1Updater, bestStepParam)
      createMetrics(param, scaledData, model)
    }
    regL1Results.foreach { case (param, auc) => println(f"$param regParam L1, AUC = ${auc * 100}%2.2f%%") }
    val (bestRegL1Param, maxRegL1Auc) = regL1Results.maxBy(_._2)
    println("max auc: " + maxRegL1Auc + " best L1 regParam: " + bestRegL1Param)

    // 3.2 Regularization parameter (default 0.0) with L2 regularization: new SquaredL2Updater
    val regL2Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
      val model = trainWithParams(scaledData, param, bestIterateParam, new SquaredL2Updater, bestStepParam)
      createMetrics(param, scaledData, model)
    }
    regL2Results.foreach { case (param, auc) => println(f"$param regParam L2, AUC = ${auc * 100}%2.2f%%") }
    val (bestRegL2Param, maxRegL2Auc) = regL2Results.maxBy(_._2)
    println("max auc: " + maxRegL2Auc + " best L2 regParam: " + bestRegL2Param)
    // 4. Regularization form: with the default new SimpleUpdater the regularization parameter has no effect,
    // so the best AUC after tuning the first two parameters is maxStepAuc. Compare the best AUCs from 3.1
    // and 3.2 against maxStepAuc; whichever is largest determines the best regularization form.
    var bestRegParam = 0.0
    var bestUpdaterID = 0
    if(maxStepAuc >= maxRegL1Auc ){
      if(maxStepAuc >= maxRegL2Auc){
        bestUpdaterID = 0
        bestRegParam = 0.0
      }
      else {
        bestUpdaterID = 2
        bestRegParam = bestRegL2Param
      }
    }
    else {
      if(maxRegL2Auc >= maxRegL1Auc){
        bestUpdaterID = 2
        bestRegParam = bestRegL2Param
      }
      else {
        bestUpdaterID = 1
        bestRegParam = bestRegL1Param
      }
    }
    val updaters = Seq(new SimpleUpdater, new L1Updater, new SquaredL2Updater)
    val bestUpdater = updaters(bestUpdaterID)
    // Best parameters:
    println("------------------ retrain the model with the tuned parameters ---------------------")
    println(f"best numIterations param: $bestIterateParam\n" +
      f"best stepSize param: $bestStepParam\n" +
      f"best regParam: $bestRegParam\n" +
      f"best regUpdater: $bestUpdater\n"
    )

    val upDateLrModel = trainWithParams(scaledData, bestRegParam, bestIterateParam, bestUpdater, bestStepParam)

    /**
    // Delete the model directory and its files
    def dirDel(path: File) {
      if (!path.exists())
        return
      else if (path.isFile) {
        path.delete()
        return
      }
      val file: Array[File] = path.listFiles()
      for (d <- file) {
        dirDel(d)
      }
      path.delete()
    }
    val path: File = new File("/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    dirDel(path)  // delete the previously saved model; saving a new model to an existing path throws an error
    // Save and load the model
    upDateLrModel.save(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    val loadLrModel = LogisticRegressionModel.load(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    */
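    // Note (sketch, same path assumed): java.io.File can only delete local paths. If the
    // model is saved to HDFS, as is typical on a cluster, the old directory could instead
    // be removed through the Hadoop FileSystem API:
    //   import org.apache.hadoop.fs.{FileSystem, Path}
    //   val fs = FileSystem.get(sc.hadoopConfiguration)
    //   fs.delete(new Path("/data/home/sospdm/tmp_wy/Model/upDateLrModel.model"), true)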

    // Extract the model's weight values
    val newRes = upDateLrModel.weights.toArray

    val rdd1 = sc.parallelize(Seq(newRes))  // wrap the weight array in a single-element RDD

    // Specify the schema of each field explicitly via StructType
    val schema = StructType(
      List(
        StructField("feature01", DoubleType, nullable = true),
        StructField("feature02", DoubleType, nullable = true),
        StructField("feature03", DoubleType, nullable = true),
        StructField("feature04", DoubleType, nullable = true),
        StructField("feature05", DoubleType, nullable = true),
        StructField("feature06", DoubleType, nullable = true),
        StructField("feature07", DoubleType, nullable = true),
        StructField("feature08", DoubleType, nullable = true),
        StructField("feature09", DoubleType, nullable = true),
        StructField("feature10", DoubleType, nullable = true),
        StructField("feature11", DoubleType, nullable = true),
        StructField("feature12", DoubleType, nullable = true),
        StructField("feature13", DoubleType, nullable = true),
        StructField("feature14", DoubleType, nullable = true)
      )
    )
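    // Since the 14 fields differ only in their index, the same schema could also be
    // generated instead of written out by hand:
    //   val schema = StructType((1 to 14).map(i => StructField(f"feature$i%02d", DoubleType, nullable = true)))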

    // Map rdd1 into a tuple of the 14 feature weights
    val resRDD = rdd1.map(f => (f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9), f(10),
      f(11), f(12), f(13)))

    val rowRDD = resRDD.map(x => Row(x._1.toDouble,x._2.toDouble,x._3.toDouble,
      x._4.toDouble,x._5.toDouble,x._6.toDouble,x._7.toDouble,x._8.toDouble,x._9.toDouble,
      x._10.toDouble,x._11.toDouble,x._12.toDouble,x._13.toDouble,x._14.toDouble))
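    // Alternatively, Row.fromSeq could build the Row directly from the weight array,
    // avoiding the hand-written 14-tuple:
    //   val rowRDD = rdd1.map(arr => Row.fromSeq(arr.toSeq))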
    // Convert to a DataFrame and register it as the temp view item_sim_c01
    val df = sqlContext.createDataFrame(rowRDD, schema)

    df.createOrReplaceTempView("item_sim_c01")

    // Write the result to Hive
    sqlContext.sql("drop table if exists sospdm.sosp_ml_supervip_big_order_features")
    sqlContext.sql("create table if not exists " +
      "sospdm.sosp_ml_supervip_big_order_features as select * from item_sim_c01")

    // Method 2: write one weight per row
    val rdd2 = sc.parallelize(newRes)
    val rowRdd2 = rdd2.map(p => Row(p))
    val schema2 = StructType(
      List(
        StructField("feature01", DoubleType, nullable = true)
      )
    )
    val df2 = sqlContext.createDataFrame(rowRdd2, schema2)
    df2.createOrReplaceTempView("item_sim_c02")
    // Write the result to Hive
    sqlContext.sql("drop table if exists sospdm.sosp_ml_supervip_big_order_features_02")
    sqlContext.sql("create table if not exists " +
      "sospdm.sosp_ml_supervip_big_order_features_02 as select * from item_sim_c02")

    sc.stop()
  }
}
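
To actually run this in the cluster environment, the job is packaged into a jar and submitted with spark-submit. A minimal sketch, assuming the class above is built into a jar named lr-for-hive.jar and the cluster runs YARN; the jar name and resource settings are illustrative and will differ per environment:

spark-submit \
  --class Classification.LrForHive \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 4g \
  --executor-cores 2 \
  lr-for-hive.jar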

Hive query results:

hive> select * from sosp_ml_supervip_big_order_features;
OK
0.6663199754057857      -0.010171216293572719   0.0023033400349458714   0.038338481430094495
-0.01642462221720575     0.024300063006121263    0.010833461995473337    0.7449827421313793  
-0.028370767756329837    -0.01679050770672618    -0.004508776927906877   -0.01072063206632886
-0.05246889909683635     0.0167997584085957
Time taken: 0.118 seconds, Fetched: 1 row(s)
hive> select * from sosp_ml_supervip_big_order_features_02;
OK
0.6663199754057857
-0.010171216293572719
0.0023033400349458714
0.038338481430094495
-0.01642462221720575
0.024300063006121263
0.010833461995473337
0.7449827421313793
-0.028370767756329837
-0.01679050770672618
-0.004508776927906877
-0.010720632066328865
-0.05246889909683635
0.0167997584085957
Time taken: 0.095 seconds, Fetched: 14 row(s)