Spark ML Feature Extraction, Transformation, and Processing Operators in Practice - Spark Business ML in Action


This column is a summary and distillation of the author's (Qin Kaixin) day-to-day work. The cases are drawn from real business environments, and tuning advice and cluster capacity-planning guidance for commercial deployments are included. Please keep following this series. Copyright notice: reproduction is prohibited, but you are welcome to learn from it. QQ email: 1120746959@qq.com; feel free to get in touch for any business exchange.

1 Categories of Feature Engineering

Spark's feature engineering falls into the following four areas:

  • Extraction: Extracting features from “raw” data
  • Transformation: Scaling, converting, or modifying features
  • Selection: Selecting a subset from a larger set of features
  • Locality Sensitive Hashing (LSH): This class of algorithms combines aspects of feature transformation with other algorithms.

Feature extraction, feature transformation, feature selection, and locality-sensitive hashing (LSH). The overall Spark ML feature engineering toolbox is outlined below:

    1: Feature Extractors
       TF-IDF
       Word2Vec
       CountVectorizer
       FeatureHasher
    2: Feature Transformers
        Tokenizer
        StopWordsRemover
        n-gram
        Binarizer
        PCA
        PolynomialExpansion
        Discrete Cosine Transform (DCT)
        StringIndexer
        IndexToString
        OneHotEncoder (Deprecated since 2.3.0)
        OneHotEncoderEstimator
        VectorIndexer
        Interaction
        Normalizer
        StandardScaler
        MinMaxScaler
        MaxAbsScaler
        Bucketizer
        ElementwiseProduct
        SQLTransformer
        VectorAssembler
        VectorSizeHint
        QuantileDiscretizer
        Imputer
    3: Feature Selectors
        VectorSlicer
        RFormula
        ChiSqSelector
    4: Locality Sensitive Hashing (LSH)
        LSH Operations:
            Feature Transformation
            Approximate Similarity Join
            Approximate Nearest Neighbor Search
        LSH Algorithms:
            Bucketed Random Projection for Euclidean Distance
            MinHash for Jaccard Distance

2 Starting with Feature Extraction

2.1 TF-IDF (every journey starts with a hard first step; this is the first example I really wrestled with)

Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining; it reflects how important a term in a document is relative to the whole corpus.

  • Let t denote a term, d a document, and D the corpus. The term frequency TF(t,d) is the number of times term t occurs in document d, and the document frequency DF(t,D) is the number of documents that contain t. If we measured importance by term frequency alone, we would over-weight terms that occur often in a document yet carry little real information, such as "a", "the", and "of". Likewise, a term that appears in most documents of the corpus cannot distinguish documents well. TF-IDF numerically quantifies how much information a term provides for telling documents apart. It is defined as $\mathrm{IDF}(t,D) = \log\frac{|D|+1}{\mathrm{DF}(t,D)+1}$, where |D| is the total number of documents in the corpus. Because of the log, a term that appears in every document gets an IDF of 0, and the +1 in numerator and denominator avoids division by zero. The TF-IDF score is then $\mathrm{TFIDF}(t,d,D) = \mathrm{TF}(t,d) \cdot \mathrm{IDF}(t,D)$. In Spark ML, TF-IDF is split into two parts: TF (with hashing) and IDF.

  • TF: HashingTF is a Transformer that, in text processing, takes sets of terms and converts them into fixed-length feature vectors. While hashing, the algorithm also counts the frequency of each term.

  • IDF: IDF is an Estimator; calling its fit() method on a dataset produces an IDFModel. The IDFModel takes feature vectors (produced by HashingTF) and rescales each feature, down-weighting terms that appear in many documents of the corpus.

  • Spark.mllib implements term-frequency counting with feature hashing: each raw feature is mapped to an index by a hash function, and we then only need to count the frequency of each index to obtain the term frequencies. This avoids building a global one-to-one term-to-index map, which is expensive for large corpora. The trade-off is hash collisions: different raw features may be mapped to the same index. The only way to lower the probability of collisions is to raise the dimensionality of the feature vector, i.e. the number of hash buckets; the default feature dimension is 2^20 = 1,048,576 (see the short sketch after this list).
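
A minimal sketch of that trade-off, assuming the spark.ml HashingTF API (the column names are just placeholders matching the example below): raising setNumFeatures reduces the chance that two distinct terms share a bucket, at the cost of a larger (but still sparse) vector.

    import org.apache.spark.ml.feature.HashingTF

    // Small hash space: collisions between distinct terms are more likely.
    val smallTF = new HashingTF()
      .setInputCol("words").setOutputCol("rawFeatures")
      .setNumFeatures(20)          // only 20 buckets

    // Larger hash space: collisions become rare, vectors stay sparse.
    val largeTF = new HashingTF()
      .setInputCol("words").setOutputCol("rawFeatures")
      .setNumFeatures(1 << 20)     // 1,048,576 buckets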

We start with a small set of sentences. First, the Tokenizer splits each sentence into individual words. For each sentence (bag of words), HashingTF converts the words into a feature vector, and finally IDF rescales that vector. This pipeline usually improves the performance of text features.

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{Tokenizer, HashingTF, IDF}
import spark.implicits._

val sentenceData = spark.createDataFrame(Seq(
  (0, "I I I Spark and I I Spark"),
  (0, "I wish wish wish wish wish classes"),
  (1, "Logistic regression regression regression regression regression I love it ")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
wordsData.show(false)

+-----+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------+
|label|sentence                                                                  |words                                                                              |
+-----+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------+
|0    |I I I Spark and I I Spark                                                 |[i, i, i, spark, and, i, i, spark]                                                 |
|0    |I wish wish wish wish wish classes                                        |[i, wish, wish, wish, wish, wish, classes]                                         |
|1    |Logistic regression regression regression regression regression I love it |[logistic, regression, regression, regression, regression, regression, i, love, it]|
+-----+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------+

Once the documents have been tokenized, HashingTF's transform() method hashes each sentence into a feature vector; here the number of hash buckets is set to 2000.
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(2000)
val featurizedData = hashingTF.transform(wordsData)


As shown below, each tokenized sequence has been turned into a sparse feature vector: every word is hashed to an index, and the value at that index is the number of times the word occurs in the document.
featurizedData.select("rawFeatures").show(false)
+----------------------------------------------------+
|rawFeatures                                         |
+----------------------------------------------------+
|(2000,[333,1105,1329],[1.0,2.0,5.0])                |
|(2000,[495,1329,1809],[5.0,1.0,1.0])                |
|(2000,[240,495,695,1329,1604],[1.0,1.0,5.0,1.0,1.0])|
+----------------------------------------------------+

featurizedData.rdd.foreach(println)

[0,I I I Spark and I I Spark,WrappedArray(i, i, i, spark, and, i, i, spark),(2000,[333,1105,1329],[1.0,2.0,5.0])]

[0,I wish wish wish wish wish classes,WrappedArray(i, wish, wish, wish, wish, wish, classes),(2000,[495,1329,1809],[5.0,1.0,1.0])]

[1,Logistic regression regression regression regression regression I love it ,WrappedArray(logistic, regression, regression, regression, regression, regression, i, love, it),(2000,[240,495,695,1329,1604],[1.0,1.0,5.0,1.0,1.0])]

This took some effort to untangle. Fortunately, after changing the example it becomes clear: "i" hashes to index 1329 and occurs 5 times in "I I I Spark and I I Spark", which is exactly the 5.0 at index 1329 in (2000,[333,1105,1329],[1.0,2.0,5.0]). With these hash buckets, index 1329 therefore identifies "i" throughout the corpus.

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)

scala> val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
idf: org.apache.spark.ml.feature.IDF = idf_18dec771e2e0


IDF is then used to rescale the raw term-frequency vectors so that they better reflect each word's power to distinguish documents. IDF is an Estimator: calling fit() on the term-frequency vectors produces an IDFModel.

scala>     val idfModel = idf.fit(featurizedData)
idfModel: org.apache.spark.ml.feature.IDFModel = idf_18dec771e2e0               


IDFModel is a Transformer; calling its transform() method yields the TF-IDF score for every term.
scala> val rescaledData = idfModel.transform(featurizedData)
rescaledData: org.apache.spark.sql.DataFrame = [label: int, sentence: string ... 3 more fields]

Each feature value has now been rescaled by how many documents the corresponding term appears in across the corpus. The resulting TF-IDF feature vectors can be fed into downstream machine learning algorithms.
scala>  rescaledData.select("features", "label").take(3).foreach(println)
[(2000,[333,1105,1329],[0.6931471805599453,1.3862943611198906,0.0]),0]
[(2000,[495,1329,1809],[1.4384103622589042,0.0,0.6931471805599453]),0]
[(2000,[240,495,695,1329,1604],[0.6931471805599453,0.28768207245178085,3.4657359027997265,0.0,0.6931471805599453]),1]

scala> rescaledData.rdd.foreach(println)

For a more detailed side-by-side comparison:
[0,I I I Spark and I I Spark,WrappedArray(i, i, i, spark, and, i, i, spark),(2000,[333,1105,1329],[1.0,2.0,5.0]),(2000,[333,1105,1329],[0.6931471805599453,1.3862943611198906,0.0])]

[0,I wish wish wish wish wish classes,WrappedArray(i, wish, wish, wish, wish, wish, classes),(2000,[495,1329,1809],[5.0,1.0,1.0]),(2000,[495,1329,1809],[1.4384103622589042,0.0,0.6931471805599453])]

[1,Logistic regression regression regression regression regression I love it ,WrappedArray(logistic, regression, regression, regression, regression, regression, i, love, it),(2000,[240,495,695,1329,1604],[1.0,1.0,5.0,1.0,1.0]),(2000,[240,495,695,1329,1604],[0.6931471805599453,0.28768207245178085,3.4657359027997265,0.0,0.6931471805599453])]

Note that in "I I I Spark and I I Spark" the word "i" appears in every document, so its TF-IDF value is 0, while "spark" appears twice in this document (and in no other document), giving it a TF-IDF value of 1.3862943611198906.
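
As a quick hand check (a small sketch, not part of the original transcript), these two numbers follow directly from the natural-log IDF formula with +1 smoothing given above:

    // |D| = 3 documents in this toy corpus.
    val numDocs = 3.0
    val idfI     = math.log((numDocs + 1) / (3 + 1)) // "i" appears in all 3 docs  -> 0.0
    val idfSpark = math.log((numDocs + 1) / (1 + 1)) // "spark" appears in 1 doc   -> ln 2 ≈ 0.6931
    println(5 * idfI)      // TF("i") = 5     -> 0.0
    println(2 * idfSpark)  // TF("spark") = 2 -> 1.3862943611198906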

2.2 Word2Vec (not yet sure how to use it well; still learning)

Word2Vec is an Estimator that takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a fixed-size vector. Word2VecModel then converts each document into a vector by averaging the vectors of its words; this vector can be used as a feature for prediction, document-similarity computation, and so on.

  • In the ml library, Word2Vec is implemented with the skip-gram model. Skip-gram's training objective is to learn distributed word vector representations by maximizing, given the vectors of the center words, the likelihood shown below:
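
For reference, the standard skip-gram objective is the average log-likelihood over a training word sequence $w_1, w_2, \ldots, w_T$ with window size $k$:

$$\frac{1}{T} \sum_{t=1}^{T} \sum_{-k \le j \le k,\ j \neq 0} \log p(w_{t+j} \mid w_t)$$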

     import org.apache.spark.ml.feature.Word2Vec
     import org.apache.spark.ml.linalg.Vector
     import org.apache.spark.sql.Row

     val documentDF = spark.createDataFrame(Seq(
       "Hi I I I Spark  Spark".split(" "),
       "I wish wish wish wish wish wish".split(" "),
       "Logistic regression".split(" ")
     ).map(Tuple1.apply)).toDF("text")

     val word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(3).setMinCount(0)
     val model = word2Vec.fit(documentDF)
     val result = model.transform(documentDF)

     Each document is transformed into a 3-dimensional feature vector; these vectors can then be fed into downstream machine learning algorithms.
     scala> result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
          |           println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
     Text: [Hi, I, I, I, Spark, , Spark] =>
     Vector: [-0.07306859535830361,-0.02478547128183501,-0.010775725756372723]

     Text: [I, wish, wish, wish, wish, wish, wish] =>
     Vector: [-0.033820114231535366,-0.13763525443417685,0.14657753705978394]

     Text: [Logistic, regression] =>
     Vector: [-0.10231713205575943,0.0494652334600687,0.014658251544460654]
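As a small follow-up (an assumed usage sketch, not from the original walkthrough), the fitted Word2VecModel also exposes the learned per-word vectors and a nearest-neighbour lookup:

     model.getVectors.show(false)           // one vectorSize-dimensional vector per vocabulary word
     model.findSynonyms("Spark", 2).show()  // the 2 words closest to "Spark" by cosine similarity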

2.3 CountVectorizer (count vectorizer)

CountVectorizer converts a document into a vector of term counts. When no a-priori dictionary is available, CountVectorizer acts as an Estimator: it extracts the vocabulary during fitting and produces a CountVectorizerModel, which holds the resulting vocabulary-based vector space.

    import spark.implicits._
    import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
    val df = spark.createDataFrame(Seq(
      (0, Array("a", "b", "c")),
      (1, Array( "b", "b", "c", "a"))
    )).toDF("id", "words")
    
    val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").setVocabSize(3).setMinDF(2).fit(df)
   
    After fitting, the model's vocabulary can be retrieved through the vocabulary member of the CountVectorizerModel:
    scala> cvModel.vocabulary
    res46: Array[String] = Array(b, a, c)
    
    cvModel.transform(df).show(false)
    
    +---+------------+-------------------------+
    |id |words       |features                 |
    +---+------------+-------------------------+
    |0  |[a, b, c]   |(3,[0,1,2],[1.0,1.0,1.0])|
    |1  |[b, b, c, a]|(3,[0,1,2],[2.0,1.0,1.0])|
    +---+------------+-------------------------+
    
    // Alternatively, a CountVectorizerModel can be built directly from an a-priori vocabulary:
    val cvm = new CountVectorizerModel(Array("a", "b", "c")).setInputCol("words").setOutputCol("features")
    cvm.transform(df).show(false)
    
    +---+------------+-------------------------+
    |id |words       |features                 |
    +---+------------+-------------------------+
    |0  |[a, b, c]   |(3,[0,1,2],[1.0,1.0,1.0])|
    |1  |[b, b, c, a]|(3,[0,1,2],[1.0,2.0,1.0])|
    +---+------------+-------------------------+
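
A note on the fitting parameters used above: setVocabSize(3) caps the vocabulary at the 3 most frequent terms, and setMinDF(2) keeps only terms that occur in at least 2 different documents (here a, b, and c all appear in both rows, so all three survive). A hedged sketch of the more permissive settings, using the same df as above:

    // Hypothetical variation: a large vocabulary cap and a minimal document-frequency
    // threshold let every term seen during fitting enter the vocabulary.
    val cvAll = new CountVectorizer()
      .setInputCol("words").setOutputCol("features")
      .setVocabSize(1 << 18)  // upper bound on vocabulary size
      .setMinDF(1)            // keep terms appearing in at least 1 document
      .fit(df)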

2.4 FeatureHasher (feature hashing)

  • Feature hashing projects a set of categorical or numerical features into a feature vector of specified dimension (typically substantially smaller than that of the original feature space). This is done using the hashing trick to map features to indices in the feature vector.

      import org.apache.spark.ml.feature.FeatureHasher

      val df = Seq(
       (2.0, true, "1", "foo"),
       (3.0, false, "2", "bar")
      ).toDF("real", "bool", "stringNum", "string")
      
      val hasher = new FeatureHasher().setInputCols("real", "bool", "stringNum", "string").setOutputCol("features")
      
      hasher.transform(df).show(false)
      
      +----+-----+---------+------+--------------------------------------------------------+
      |real|bool |stringNum|string|features                                                |
      +----+-----+---------+------+--------------------------------------------------------+
      |2.0 |true |1        |foo   |(262144,[174475,247670,257907,262126],[2.0,1.0,1.0,1.0])|
      |3.0 |false|2        |bar   |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.0])  |
      +----+-----+---------+------+--------------------------------------------------------+
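
To read the output: numeric columns are hashed on the column name and keep their original value (index 174475 holds 2.0 in the first row and 3.0 in the second, i.e. the real column), while string and boolean columns are treated as categorical, so each column=value pair gets its own index with value 1.0; the default output dimension is 2^18 = 262144. A hedged sketch of the two knobs worth knowing:

      // Hypothetical variation: shrink the hash space and force "real" to be
      // treated as a categorical feature rather than a raw numeric value.
      val hasher2 = new FeatureHasher()
        .setInputCols("real", "bool", "stringNum", "string")
        .setCategoricalCols(Array("real"))   // hash "real=2.0" / "real=3.0" with value 1.0
        .setNumFeatures(1 << 12)             // 4096 buckets instead of the default 2^18
        .setOutputCol("features")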
    

Summary

This post wraps up the feature-extraction topic rather hastily; feature transformation is the real main event, and the follow-up posts will go deeper.

秦凯新 (Qin Kaixin), Shenzhen