1. 特征处理介绍
特征处理主要分三部分:
特征提取:从原始数据中提取特征
特征转换:特征的维度、特征的转化、特征的修改
特征选取:从大规模特征中选取一个子集
Spark 特征提取提供三种算法:分别是 TF-IDF、 Word2Vec 以及 CountVectorizer
2. TF-IDF(词频-逆向文件频率)
2.1 算法介绍
类别:transformer【转换器】
词频 – 逆向文件频率(TF-IDF(HashingTF and IDF)) 是一种在本文挖掘中广泛使用的特征向量化方法, 它可以体现一个文档中词语在语料库中的重要程度**。**
| t |
词语 |
| d |
文档 |
| D |
语料库 |
| TF(t,d) |
词频: 是词语t 在文档d中出现的次数 |
| DF(t,D) |
文件频率:包含词语t的文档个数 |
如果我们只使用词频来衡量重要性,很容易过度强调在文档中经常出现,却没有太多实际信息的词语,比如“a”,“the”以及“of”。
如果一个词语经常出现在语料库中,意味着它并不能很好的对文档进行区分。
TF-IDF就是在数值化文档信息,衡量词语能提供多少信息以区分文档。
其定义为:$IDF(t,D) = log \frac{|D|+1}{DF(t,D)+1} = \frac{文档总数+1}{文档频率+1}$
- 此处 |D| 是语料库中总的文档数。
- 公式中使用log函数,当词出现在所有文档中时,它的IDF值变为0。
- 加1是为了避免分母为0的情况。
TF-IDF 度量值表示为:$TFIDF(t,d,D) = TF(t,d) · IDF(t,D)$
在Spark ML库中,TF-IDF被分成两部分:TF (+hashing) 和 IDF。
**TF:**HashingTF 是一个Transformer,在文本处理中,接收词条的集合然后把这些集合转化成固定长度的特征向量。这个算法在哈希的同时会统计各个词条的词频。
**IDF:**IDF是一个Estimator,在一个数据集上应用它的fit()方法,产生一个IDFModel。 该IDFModel 接收特征向量(由HashingTF产生),然后计算每一个词在文档中出现的频次。IDF会减少那些在语料库中出现频率较高的词的权重。
Spark.mllib 中实现词频率统计使用特征hash的方式,原始特征通过hash函数,映射到一个索引值。后面只需要统计这些索引值的频率,就可以知道对应词的频率。这种方式避免设计一个全局1对1的词到索引的映射,这个映射在映射大量语料库时需要花费更长的时间。但需要注意,通过hash的方式可能会映射到同一个值的情况,即不同的原始特征通过Hash映射后是同一个值。为了降低这种情况出现的概率,我们只能对特征向量升维。i.e., 提高hash表的桶数,默认特征维度是 $2^{20} = 1,048,576$
在下面的代码段中,我们以一组句子开始。首先使用分解器Tokenizer把句子划分为单个词语。对每一个句子(词袋),我们使用HashingTF将句子转换为特征向量,最后使用IDF重新调整特征向量。这种转换通常可以提高使用文本特征的性能。
2.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| package hnbian.spark.ml.feature.extractors import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer} import org.apache.spark.sql.SparkSession
object TF_IDF extends App { val conf = new SparkConf().setAppName("TF_IDF") conf.setMaster("local[4]") val sc = new SparkContext(conf) val spark = SparkSession.builder().getOrCreate() sc.setLogLevel("Error") import spark.implicits._ val sentenceData = spark.createDataFrame(Seq( (0,"I heard about Spark and If love Spark"), (0,"I wish Java could Spark use case classes"), (1,"Logistic regression Spark models are neat"), )).toDF("label","sentence") sentenceData.show(false)
val tokenizer = new Tokenizer() .setInputCol("sentence") .setOutputCol("words") val wordsData = tokenizer.transform(sentenceData) wordsData.show(false)
val hashingTF = new HashingTF() .setInputCol("words") .setOutputCol("newFeatures") .setNumFeatures(2000) val featurizedData = hashingTF.transform(wordsData) featurizedData.show(false)
val idf = new IDF().setInputCol("newFeatures").setOutputCol("features") val idfModel = idf.fit(featurizedData) val rescaledData = idfModel.transform(featurizedData) rescaledData.show(false)
rescaledData.select("features", "label").take(3).foreach(println)
}
|
3. Word2Vec
3.1 算法介绍
类别:estimator【评估器】
Word2Vec 是一种著名的 词嵌入(Word Embedding) 方法,它可以计算每个单词在其给定语料库环境下的 分布式词向量(Distributed Representation,亦直接被称为词向量)。
词向量表示可以在一定程度上刻画每个单词的语义。如果词的语义相近,它们的词向量在向量空间中也相互接近,这使得词语的向量化建模更加精确,可以改善现有方法并提高鲁棒性。
词向量已被证明在许多自然语言处理问题中具有非常重要的作用。如:机器翻译,标注问题,实体识别等
Word2Vec是一个Estimator,它采用一系列代表文档的词语来训练word2vecmodel。
该模型将每个词语映射到一个固定大小的向量。word2vecmodel使用文档中每个词语的平均数来将文档转换为向量,然后这个向量可以作为预测的特征,来计算文档相似度计算等等。
Word2Vec具有两种模型:
- CBOW :思想是通过每个词的上下文窗口词词向量来预测中心词的词向量。
- Skip-gram:思想是通过每个中心词来预测其上下文窗口词,并根据预测结果来修正中心词的词向量。
在ml 库中,Word2Vec 的实现使用的是skip-gram模型。Skip-gram的训练目标是学习词表特征向量分布,其优化目标是在给定中心词的词向量情况下,最大化一下似然函数: $\frac{1}{T} \sum_{t=0}^1 \sum_{j=k-1}^{j=k} logp(W_{t+j}|W_t) $
其中 $w_1 … w_t$ 是一系列词序列,这里 $w_t$ 表示中心词,而 $w_{t+j}(j \in [-k,k])$ 是上下文窗口中的词。这里每个上下文窗口的词 $w_i$ 在给定中心词 $w_j$ 下的条件概率由 softmax 函数的形式计算,如下公式所示,其中 $u_w$ 和 $u_w$ 分别代表当前词的词向量以及当前上下文的词向量表示: $p(w_i|w_j) = \frac{exp(u_{w_i}^T u_{w_i})}{\sum_{l=1}^V exp(u_l^T u_{w_i})}$
因为 Skip-gram 模型使用的 softmax 计算较为复杂,所以 ml 与经典的额 Word2Vec 采用了相同的策略,使用 huffman 树来进行层次 softmax(hiercahical Softmax ) 方法来进行优化,使得 $log p (|w_i w_j)$ 计算复杂度从 $ O(V) $ 降到了 O(log(V))。
下面的代码段中,我们首先用一组文档,其中一个词语序列代表一个文档。对于每个文档,我们将其转换为一个特征向量。此特征向量可以被传递到一个学习算法。
3.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| package hnbian.spark.ml.feature.extractors import org.apache.spark.ml.feature.Word2Vec import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession
object Word2Vec extends App { val conf = new SparkConf().setAppName("Word2Vec") conf.setMaster("local[4]") val sc = new SparkContext(conf) val spark = SparkSession.builder().getOrCreate() sc.setLogLevel("Error") val documentDF = spark.createDataFrame(Seq( "Hi I heard about Spark".split(" "), "I wish Java could use case classes".split(" "), "Logistic regression models are neat".split(" ") ).map(Tuple1.apply)).toDF("text") documentDF.show(false)
val word2Vec = new Word2Vec(). setInputCol("text"). setOutputCol("result"). setVectorSize(5). setMinCount(0) val model = word2Vec.fit(documentDF) model.transform(documentDF).show(false)
}
|
4. CountVectorizer
4.1 算法介绍
类别:estimator【评估器】
CountVectorizer旨在通过计数来将一个文档转换为向量。当不存在先验字典时,CountVectorizer为Estimator取词汇进行训练,并生成一个 CountVectorizerModel 用于存储相应的词汇向量空间。该模型产生文档关于词语的稀疏表示,其表示可以传递给其他算法,例如LDA。
在CountVectorizerModel 的训练过程中,CountVectorizer将根据语料库中的词频排序从高到低进行选择,词汇表的最大含量由vocabsize超参数来指定,超参数minDF,则指定词汇表中的词语至少要在多少个不同文档中出现。
4.2 代码示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| package hnbian.spark.ml.feature.extractors import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
object CountVectorizer extends App { val conf = new SparkConf().setAppName("CountVectorizer") conf.setMaster("local[4]") val sc = new SparkContext(conf) val spark = SparkSession.builder().getOrCreate() sc.setLogLevel("Error") val df = spark.createDataFrame(Seq( (0, Array("a", "b", "c", "E", "E", "E", "d", "d")), (1, Array("a", "b", "b", "E", "c", "a")) )).toDF("id", "words") df.show(false)
val cvModel: CountVectorizerModel = new CountVectorizer() .setInputCol("words") .setOutputCol("features") .setVocabSize(3) .setMinDF(2) .fit(df) println(cvModel.vocabulary.toBuffer) cvModel.transform(df).show(false)
val cvm = new CountVectorizerModel(Array("a", "b", "c")). setInputCol("words"). setOutputCol("features") println(cvm.vocabulary.toBuffer) cvm.transform(df).show(false)
}
|