spark mllib
机器学习
scala代码:
package Classification
/**
* LogisticRegression Algorithm For Hive
* 逻辑回归测试环境连hive部署自动化
* Created by wy on 2019/04/01
*/
import java.io.File
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.optimization.{L1Updater, SimpleUpdater, SquaredL2Updater, Updater}
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
object LrForHive {
  // Suppress noisy framework logging on the console.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    // ---- Spark / Hive context initialisation --------------------------------
    val conf = new SparkConf().setAppName("LrForHive")
    // BUG FIX: the original key was "set hive.cli.print.header", which Hive
    // silently ignores — the property name must not carry the "set " prefix.
    conf.set("hive.cli.print.header", "false") // hide Hive column headers
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    // ---- Load training data from Hive ---------------------------------------
    // Each row holds one '#'-separated string: "<f1,f2,...,fn>#<label>".
    val rawRows = sqlContext.sql("select * from sospdm.sosp_ml_supervip_big_order_mem").rdd
    val data = rawRows.map(_(0).toString).map { line =>
      val parts = line.split("#")
      val label = parts(1).toDouble
      val features = parts(0).split(",").map(_.toDouble)
      LabeledPoint(label, Vectors.dense(features)) // dense feature vector
    }
    data.cache() // reused by the scaler fit and every training run below

    // ---- Feature standardisation (zero mean, unit variance) -----------------
    val vectors = data.map(_.features)
    val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
    val scaledData = data.map(p => LabeledPoint(p.label, scaler.transform(p.features)))
    scaledData.cache()

    /**
     * Hyper-parameter tuning for MLlib's SGD-based linear models
     * (L-BFGS is available only via LogisticRegressionWithLBFGS).
     */

    /**
     * Train a logistic-regression model with the given hyper-parameters.
     *
     * @param input         training set
     * @param regParam      regularisation strength
     * @param numIterations SGD iteration count
     * @param updater       regularisation form (Simple / L1 / SquaredL2)
     * @param stepSize      SGD step size
     */
    def trainWithParams(input: RDD[LabeledPoint], regParam: Double, numIterations: Int,
                        updater: Updater, stepSize: Double) = {
      val lr = new LogisticRegressionWithSGD
      lr.optimizer
        .setNumIterations(numIterations)
        .setStepSize(stepSize)
        .setRegParam(regParam)
        .setUpdater(updater)
      lr.run(input)
    }

    /**
     * Score `data` with `model` and return (param, AUC) so each tuning run
     * can be compared on area under the ROC curve.
     */
    def createMetrics(label: Double, data: RDD[LabeledPoint], model: ClassificationModel) = {
      val scoreAndLabels = data.map(point => (model.predict(point.features), point.label))
      (label, new BinaryClassificationMetrics(scoreAndLabels).areaUnderROC())
    }

    // ---- 1) iteration count --------------------------------------------------
    val iterateResults = Seq(1, 5, 10, 50, 100).map { param =>
      val model = trainWithParams(scaledData, 0.0, param, new SimpleUpdater, 1.0)
      createMetrics(param, scaledData, model)
    }
    iterateResults.foreach { case (param, auc) => println(f"$param iterations, AUC = ${auc * 100}%2.2f%%") }
    // maxBy returns the first maximum, matching the original strict-`>` scan.
    val (bestIterateRaw, maxIterateAuc) = iterateResults.maxBy(_._2)
    val bestIterateParam = bestIterateRaw.toInt
    println("max auc: " + maxIterateAuc + " best numIterations param: " + bestIterateParam)

    // ---- 2) step size --------------------------------------------------------
    // A large step converges faster but may overshoot the optimum.
    val stepResults = Seq(0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
      val model = trainWithParams(scaledData, 0.0, bestIterateParam, new SimpleUpdater, param)
      createMetrics(param, scaledData, model)
    }
    stepResults.foreach { case (param, auc) => println(f"$param stepSize, AUC = ${auc * 100}%2.2f%%") }
    val (bestStepParam, maxStepAuc) = stepResults.maxBy(_._2)
    println("max auc: " + maxStepAuc + " best stepSize param: " + bestStepParam)

    // ---- 3.1) regularisation strength with L1 (new L1Updater) ----------------
    val regL1Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
      val model = trainWithParams(scaledData, param, bestIterateParam, new L1Updater, bestStepParam)
      createMetrics(param, scaledData, model)
    }
    regL1Results.foreach { case (param, auc) => println(f"$param regParam L1, AUC = ${auc * 100}%2.2f%%") }
    val (bestRegL1Param, maxRegL1Auc) = regL1Results.maxBy(_._2)
    println("max auc: " + maxRegL1Auc + " best L1regParam: " + bestRegL1Param)

    // ---- 3.2) regularisation strength with L2 (new SquaredL2Updater) ---------
    val regL2Results = Seq(0.0, 0.001, 0.01, 0.1, 1.0, 10.0).map { param =>
      val model = trainWithParams(scaledData, param, bestIterateParam, new SquaredL2Updater, bestStepParam)
      createMetrics(param, scaledData, model)
    }
    regL2Results.foreach { case (param, auc) => println(f"$param regParam L2, AUC = ${auc * 100}%2.2f%%") }
    val (bestRegL2Param, maxRegL2Auc) = regL2Results.maxBy(_._2)
    println("max auc: " + maxRegL2Auc + " best L2regParam: " + bestRegL2Param)

    // ---- 4) regularisation form ----------------------------------------------
    // SimpleUpdater ignores regParam, so its best AUC is maxStepAuc; pick the
    // form with the highest AUC. Candidate order (Simple, L2, L1) reproduces
    // the original nested-if tie-breaking: ties prefer Simple, then L2.
    val candidates = Seq(
      (maxStepAuc, 0, 0.0),              // no regularisation
      (maxRegL2Auc, 2, bestRegL2Param),  // SquaredL2Updater
      (maxRegL1Auc, 1, bestRegL1Param)   // L1Updater
    )
    val (_, bestUpdaterID, bestRegParam) = candidates.maxBy(_._1)
    val Updaters = Seq(new SimpleUpdater, new L1Updater, new SquaredL2Updater)
    val bestUpdater = Updaters(bestUpdaterID)

    println("------------------更新模型训练参数---------------------")
    println(f"best numIterations param: $bestIterateParam\n" +
      f"best stepSize param: $bestStepParam\n" +
      f"best regParam: $bestRegParam\n" +
      f"best regUpdater: $bestUpdater\n"
    )
    // Retrain once with the tuned parameters.
    val upDateLrModel = trainWithParams(scaledData, bestRegParam, bestIterateParam, bestUpdater, bestStepParam)

    /**
    // Recursively delete the previous model directory — saving over an
    // existing path raises an error.
    def dirDel(path: File) {
      if (!path.exists())
        return
      else if (path.isFile) {
        path.delete()
        return
      }
      val file: Array[File] = path.listFiles()
      for (d <- file) {
        dirDel(d)
      }
      path.delete()
    }
    val path: File = new File("/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    dirDel(path)
    // Save and reload the model.
    upDateLrModel.save(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    val loadLrModel = LogisticRegressionModel.load(sc, "/data/home/sospdm/tmp_wy/Model/upDateLrModel.model")
    */

    // ---- Persist model weights to Hive ---------------------------------------
    val weights = upDateLrModel.weights.toArray

    // Method 1: a single row with one column per weight. The schema is built
    // from the actual weight count (the original hard-coded 14 columns and a
    // 14-tuple mapping); column names feature01..featureNN are unchanged.
    val schema = StructType(
      weights.indices.map(i => StructField(f"feature${i + 1}%02d", DoubleType, nullable = true)).toList
    )
    val rowRDD = sc.parallelize(Seq(Row.fromSeq(weights.toSeq)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.createOrReplaceTempView("item_sim_c01")
    sqlContext.sql("drop table if exists sospdm.sosp_ml_supervip_big_order_features")
    sqlContext.sql("create table if not exists " +
      "sospdm.sosp_ml_supervip_big_order_features as select * from item_sim_c01")

    // Method 2: one row per weight in a single column.
    val rdd2 = sc.parallelize(weights)
    val rowRdd2 = rdd2.map(w => Row(w))
    val schema2 = StructType(
      List(
        StructField("feature01", DoubleType, nullable = true)
      )
    )
    val df2 = sqlContext.createDataFrame(rowRdd2, schema2)
    df2.createOrReplaceTempView("item_sim_c02")
    sqlContext.sql("drop table if exists sospdm.sosp_ml_supervip_big_order_features_02")
    sqlContext.sql("create table if not exists " +
      "sospdm.sosp_ml_supervip_big_order_features_02 as select * from item_sim_c02")

    sc.stop()
  }
}
hive查询结果:
hive> select * from sosp_ml_supervip_big_order_features;
OK
0.6663199754057857 -0.010171216293572719 0.0023033400349458714 0.038338481430094495
-0.01642462221720575 0.024300063006121263 0.010833461995473337 0.7449827421313793
-0.028370767756329837 -0.01679050770672618 -0.004508776927906877 -0.01072063206632886
-0.05246889909683635 0.0167997584085957
Time taken: 0.118 seconds, Fetched: 1 row(s)
hive> select * from sosp_ml_supervip_big_order_features_02;
OK
0.6663199754057857
-0.010171216293572719
0.0023033400349458714
0.038338481430094495
-0.01642462221720575
0.024300063006121263
0.010833461995473337
0.7449827421313793
-0.028370767756329837
-0.01679050770672618
-0.004508776927906877
-0.010720632066328865
-0.05246889909683635
0.0167997584085957
Time taken: 0.095 seconds, Fetched: 14 row(s)