# Spark ML下实现的多分类AdaBoost + NaiveBayes算法

### 1. Naive Bayes算法

朴素贝叶斯算法算是生成模型中一个最经典的分类算法之一了，常用的有Bernoulli和Multinomial两种。在文本分类上经常会用到这两种方法。在词袋模型中，对于一篇文档 $$d$$ 中出现的词 $$w\_0,w\_1,...,w\_n$$ , 这篇文章被分类为 $$c$$ 的概率为$$p(c|w\_0,w\_1,...,w\_n) = \frac{p(c,w\_0,w\_1,...,w\_n)}{p(w\_0,w\_1,...,w\_n)} = \frac{p(w\_0,w\_1,...,w\_n|c)\*p(c)}{p(w\_0,w\_1,...,w\_n)}$$ 对于一篇给定文章，分母为常数，基于朴素贝叶斯的各词在一篇文章中出现独立性假设，最后我们需要比较的就是在不同类别 $$c$$ 下 $$p(w\_0|c)p(w\_1|c)...p(w\_n|c)p(c)$$ 的大小。

naive bayes模型的参数就是在每个类别 $$c$$ 下各词出现的概率的 $$p(w\_0|c),p(w\_1|c),...,p(w\_n|c))$$ 和该类别出现的概率 $$p(c)$$ ，参数的估计通常就是根据训练样本进行词频的统计并计算相应概率，其中$$p(c) = \frac{count(c)}{count(doc)}$$，即为训练数据中c类别文章的总数量除以训练集中文章的总数量。针对 $$p(w\_i|c)$$ 的估计，Bernoulli和Multinomial略有不同。

#### Bernoulli

文章中某词 $$w\_i$$ 出现过，则记为1，所以$$p(w\_i|c) = \frac{count(w\_i,c)}{count(c)}$$ 即为在类别为c的训练集文章中出现词 $$w\_i$$ 的文章数量除以训练集中为别为c的文章总数量

#### Multinomial

这种情况下文章的词并不是非0即1的one hot特征，而是带有权重的数值特征，通常可以使用tf或者tf-idf值。$$p(w\_i|c) = \frac{T\_{ci}}{\sum\_{t}{T\_{ct}}}$$ 其中 $$T\_{ci}$$ *为类别c的训练文章中词* $$w\_i$$ *的所有权重和，* $$\sum{t}{T\_{ct}}$$ 为类别c的文章中所有词的权重之和。预测的时候对于词 $$w\_i$$ 计算该词在该文章中的权重 $$T\_i$$ ，使用 $$p(w\_i|c)^{T\_i}$$ 作为连乘部分的概率。不过实际上经常使用对数概率，所以可以将指数运算变为乘法运算，在代码中就可以利用矩阵相乘直接计算。

还有一些细节问题，例如数据稀疏，平滑处理等因为不是本文的重点，这里就不详细解释了。

### 2. Adaboost算法

作为一种boosting方法，adaboost在很多算法上都有着不俗的表现。不过在基于naive bayes的文档分类领域，貌似实际效果很一般。在stack overflow上也看到有人讨论，说adaboost对于多个弱分类器的提升效果很不错，但是naive bayes的文档分类通常已经有很不错的表现了，提升效果一般。不过不管效果提升怎么样，实现一下试试也没什么坏处，顺便还可以熟悉一下spark的相关操作。经典的adaboost算法适用于二分类的情况，但是我们的文本是多分类的情况，依靠多个二分类器表决不失为一种方法，但是比较麻烦，好在找到了介绍多分类adaboost算法的论文，照着论文依葫芦画瓢也不难。下面先分别多分类和二分类的adaboost

#### 2.1 二分类adaboost

对于给定的二类分类的训练数据集$$T = {(x\_1, y\_2),(x\_2, y\_2)...,(x\_N, y\_N)}$$ 其中每个 $$x$$ 是一个样本的特征向量， $$y\in{-1, +1}$$ ，算法流程如下：

1. 初始化各个样本的权重为$$D\_1 = (w\_{11}, w\_{12}, ... , w\_{1i}, ... , w\_{1N}), w\_{1i} = \frac{1}{N}, i = 1, 2, ... , N$$
2. 对于第m次迭代， $$m = 1, 2, ..., M$$ ：
   * 每次迭代使用带有当前权重 $$D\_m$$ 的样本进行训练，得到一个基本分类器 $$G\_m(x)$$&#x20;
   * 计算在分类器 $$D\_m$$ 下，训练样本分类结果的误差率 $$e\_m = \sum^{N}{i = 1}{w\_{mi}I(G\_m{x\_i} \neq{y\_i})}$$ ，因为每一步权重都做了归一化，所以分母不用再除以样本权重之和
   * 根据误差率 $$e\_m$$ 计算分类器 $$D\_m$$ 的系数 $$\alpha\_m = log\frac{1-e\_m}{e\_m}$$
   * 根据系数 $$\alpha\_m$$ *更新各样本的权重* $$D\_{m+1} = {w\_{m+1, 1}, w\_{m+1, 2}, ... , w\_{m+1, N}} w\_{m+1, i} = w\_{m, i}  exp(\alpha\_m  I(G\_m{x\_i} \neq{y\_i}))$$&#x20;
   * 对 $$D\_{m+1}$$ 做归一化处理， $$\sum{i = 1}^{N}{w\_{m+1, i}} = 1$$&#x20;
3. 最后对多个分类器 $$D\_m$$ 的结果进行加权表决， $$c(x) = argmax\_k\sum{m = 1}^{M}{\alpha\_m\*I(D\_m(x) = k)}$$&#x20;

注意到对于二分类的adaboost需要每次的分类误差率 $$e\_m \leq{\frac{1}{2}}$$ ，否则的话将会导致 $$\alpha\_m < 0$$ ，然后样本权重的更新将会朝着反方向进行。

#### 2.2 多分类adaboost

对于K分类的情况，算法基本与二分类的情况一致。但是要求每次的分类误差率 $$e\_m \leq{\frac{1}{2}}$$ 是非常困难的，联系到二分类误差率阈值选择 $$\frac{1}{2}$$ ，K分类的情况选择误差率为 $$\frac{K-1}{K}$$ ，然后 $$\alpha\_m$$ 的计算改为 $$\alpha\_m = log(\frac{1-e\_m}{e\_m}) + log(K-1)$$ 容易验证只要 $$e\_m \leq{\frac{K-1}{K}}$$ ，则有 $$\alpha\_m \geq{log(\frac{1-\frac{K-1}{K}}{\frac{K-1}{K}}) + log(K-1)} = log(\frac{1}{K-1}) + log(K-1) = 0$$ ，这种情况下，多分类adaboost对于被误分的样本的侧重加大了，因为 $$\alpha\_m$$ 因为添加了正项 $$log(K-1)$$ 而增大了。

adaboost的一种解释是模型为加法模型，损失函数为指数函数，学习算法为前向分步算法的分类算法，这个以后再另外写一篇。这里给出一个比较直观好懂的解释：

* 迭代过程中误差率小的模型具有大的模型系数，也就是说表现好的子模型在最后加权的时候具有更大的“话语权”
* 迭代过程中上一次被误分的样本在下一次训练时将会具有更大的权重，更容易被分类正确

### 3. Spark ML的使用

提到Spark ml就不得不提Spark mllib，两者的区别主要在于ml面向的数据是Dataset，而mllib面向的是rdd，Dataset相当于在底层rdd的基础上做了进一步的优化。而且ml中一系列算法更适合创建包含从数据清洗到特征工程再到模型训练等一系列工作的ML pipelines，这个类似于sklearn中的pipeline，非常简洁好用。

pipeline中的Transformer，Estimator，Stage等概念[官方文档](https://spark.apache.org/docs/latest/ml-pipeline.html)上写的很清楚，而且还有事例，就不在这里解释了。这里以naive bayes为例简单介绍一下怎么利用spark ml的pipelines进行机器学习模型的训练和预测。

首先是pipelines的创建：

```scala
#!scala
// pipeline for train 
def createPipiline(dataset: Dataset[_]): 
    Pipeline ={ 
    //step 1 sentence 拆成 words 
    val tokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setPattern(",") 
    //step 2 label 转化为以0开始的labelIndex 为了适应spark.ml 
    val indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndex").fit(dataset) 
    //step3 统计tf词频 
    val countModel = new CountVectorizer().setInputCol("words").setOutputCol("rawFeatures") 
    //step4 tf-idf 10 
    val idfModel = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
    //step5 normalize tf-idf vector 
    val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalizedFeatures") 
    //step6 naive bayes model 
    val naiveBayes = new NaiveBayes().setFeaturesCol("normalizedFeatures").setLabelCol("labelIndex").setWeightCol("obsWeights").setPredictionCol("prediction").setModelType("multinomial").setSmoothing(1.0) 
    //step7 predict label to real label 
    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(indexer.labels) 
    newPipeline().setStages(Array(tokenizer, indexer, countModel, idfModel, normalizer, naiveBayes, labelConverter)) }
```

这里注意到我在创建这个pipeline的时候还传入了训练数据，但是一般情况下训练数据是在拟合模型而不是在模型建立的时候就提前传入的。这里是因为最后面那个labelConverter的transformer需要使用indexer.labels这个参数，而indexer要获取这个参数就要提前拟合训练数据，也就是indexer的创建发生在整个pipeline的拟合之前，所以我就先穿入了训练数据集。注意到这里训练数据就相当于被训练了两次，所以可以先cache()操作一下。

pipeline创建好以后的使用就相对简单多了，传入数据就可以了。

```scala
#!scala
val pipeline = ModelUsage.createPipeline(dataRDDTrain) 
// train and test 
val combinedModel = pipeline.fit(dataRDDTrain) 
val predictResult = combinedModel.transform(dataRDDTest).select("predictedLabel", "label").rdd.map(row => (row.getDouble(0), row.getDouble(1))) 
val evaluator = newMulticlassMetrics(predictResult) 

println("confusionMatrix:") 
println(evaluator.confusionMatrix) 
println(evaluator.accuracy)
```

注意到ML拟合的结果都是Double类型的，比如说我一个label是55但是输出是55.0，评估模型准确度的时候注意一下就好，影响不大。

Spark ML的一个好处是数据dataset像水一样通过预先创建好的pipeline，可以指定每一个stage处理的column名，再添加生成的数据到新的一列。自始至终，这些中间数据都在结果的dataset里，想要哪些数据指定列名就可以了。这样的话就避免了每次都要处理数据使它们符合中间模型的输入结构，而且最后还要自己再整合需要的字段到一起。

由于我们的文章数据特点比较鲜明，没有任何参数调优，在4w(80% train 20% test)的四分类数据上就已经有了95%的正确率了。

### 4. 自定义扩展Spark ML

既然直接用现有naive bayes模型就已经有了95%的正确率，那要是加上adaboost呢？

直接实现adaboost算法很简单，但是毕竟spark ml的pipeline这么好用，而dataset这么好的封装加上这么多现有的类似StringIndxer等工具类transformer总不能全部重写吧。所以就想到怎么去自定义一个跟Spark ML兼容的model，上网查了查相关资料，在已有的naivebayes模型基础上进行了改进实现了与Spark ML兼容的adaboost naivebayes model。

**注意**

1. 由于我们的模型需要先拟合训练数据得到模型，随后才能使用模型，这里面分别涉及到estimator和transformer，因此我们需要分别实现这两个部分。
2. 我要实现的adaboost+naivebayes模型是一个概率模型，因此我的Estimator和Transformer分别继承自ProbabilisticClassifier和ProbabilisticClassificationModel，而不是最原始的Estimator和Transformer，这样就减少了很多不必要的代码重写，但是如果是想玩玩整整自己实现一个模型的话就要从最基本的一点点开始写了，可以参照上面第一篇文章所讲，这里就不多细说了。

当然可能会有疑问，既然可以继承ProbabilisticClassifier，那为什么不直接集成NaiveBayes不是更简单么？我一开始也是这样想的，但是发现Spark ML里NaiveBayes里大部分方法和属性都是私有或者受保护的，我要改就得修改Spark源码，但是我的Spark程序是在公司服务器运行的，总不能每次都让公司用我改过之后的Spark包吧。。。

#### 4.1 模型参数

首先，对于任何一个模型模型的训练，我们一般都会需要传递一些参数，这里利用scala的trait实现一个参数接口。

```scala
#!scala
trait AdaboostNaiveBayesParams extendsParams { 
    //进行adaboost时的最高迭代次数 
    final val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations") 
    def getMaxIter: Int =$(maxIter) 
        //进行adaboost时准确率变化小于某个阈值时迭代提前终止 
        final val threshold: DoubleParam = new DoubleParam(this, "threshold", "improvement threshold among iterations") 
    def getThreshold: Double =$(threshold) 
        //朴素Bayes的平滑系数 
        final val smoothing : DoubleParam = new DoubleParam(this, "smoothing", "naive bayes smooth") 
    def getSmoothing : Double =$(smoothing) 
        //朴素Bayes类型"multinomial"(default) and "bernoulli" 
        final val modelType : Param[String] = new Param[String](this, "modelType", "naive bayes model type") 
    def getModelType : String =$(modelType) 14 }       
```

这一部分没什么解释的，都是一些模型常用参数。

#### 4.2 模型Estimator

这一部分可以说是最重要的部分，Estimator拟合好了，Transformer基本属于调用一下就好了。先贴代码，再一行行解释。

```scala
#!scala
class AdaboostNaiveBayes(override val uid: String) extends ProbabilisticClassifier[Vector, AdaboostNaiveBayes, AdaboostNaiveBayesModel] with AdaboostNaiveBayesParams { 
    def this() = this(Identifiable.randomUID("AdaboostNaiveBayes")) 
    //model parameters assignment 
    def setMaxIter(value: Int): this.type =set(maxIter, value) 
    def setThreshold(value: Double): this.type =set(threshold, value)  
    def setSmoothing(value: Double): this.type =set(smoothing, value) 
    def setModelType(value: String): this.type =set(modelType, value) 

    setMaxIter(20) 
    setThreshold(0.02) 
    setSmoothing(1.0) 
    setModelType("multinomial") 

    //method used by fit()
    override protected def train(dataset: Dataset[_]): AdaboostNaiveBayesModel ={ 
        val datasetSize =dataset.count().toInt 
        val labelSize = dataset.select("label").distinct().count() 

        //各子模型及其权重 
        val modelWeights = newArray[Double]($(maxIter))  
        val modelArray = newArray[NaiveBayesModel]($(maxIter)) 
        var alpha = 0.0 
        //初始化各样本等权重 
        val dataWeight: (Double, Double, Double) => Double = (obsWeight: Double, labelIndex: Double, prediction: Double) =>{ 
            if (labelIndex ==prediction) { 
                obsWeight 
            }
            else{ 
                obsWeight *math.exp(alpha) 
            }
        } 
        val sqlfunc =udf(dataWeight) 
        //初始化还没有prediction 
        var temp = dataset.withColumn("obsWeights", lit(1.0)) 
        var i = 0 42 var error1 = 2.0 
        var error2 = 1.0//&& (error1 - error2) > $(threshold) 
        var weightSum = datasetSize.toDouble*datasetSize 

        while (i <$(maxIter)) { 
            val naiveBayes = new NaiveBayes().setFeaturesCol($(featuresCol)).setLabelCol($(labelCol)).setWeightCol("obsWeights").setPredictionCol($(predictionCol)).setModelType($(modelType)).setSmoothing($(smoothing)).fit(temp) 
            temp =naiveBayes.transform(temp).cache() 
            var error = temp.select("labelIndex", "prediction", "obsWeights").rdd.map(row =>{ 
                if (row(0) != row(1)) 
                    row.getDouble(2) 
                else 
                    0.0
            }).sum()/(datasetSize) 
        error1 =error2 
        error2 =error 
        alpha = Math.log((labelSize - 1) * (1 - error) /error) 
        modelWeights(i) =alpha 
        modelArray(i) =naiveBayes 
        //更新权重 
        temp = temp.withColumn("obsWeights", sqlfunc(col("obsWeights"), col($(labelCol)), col($(predictionCol)))); 
        weightSum = temp.select("obsWeights").rdd.map(row => (row.getDouble(0))).sum() 
        temp =temp.drop($(predictionCol), $(rawPredictionCol), $(probabilityCol)) 
        temp = temp.withColumn("obsWeights", col("obsWeights")/(weightSum/datasetSize))

        i += 1 } 
        newAdaboostNaiveBayesModel(uid, i, modelWeights, modelArray)  } 
        override def copy(extra: ParamMap): AdaboostNaiveBayes =defaultCopy(extra) }
```

1-3行是继承ProbabilisticClassifer和实现前面我们自己定义的AdaboostNaiveBayesParam参数接口，ProbabilisticClassifer的继承使用看看源码里NaiveBayes是怎么做的就可以照着学了。

5行是一个最基本的构造函数，分配给对象一个id值

77行是一个拷贝构造函数，这个必须要实现，最简单的可以直接像这里一样调用defaultCopy函数就好了。这个函数用来在引入新的参数的时候复制当前stage返回加入新参数后的一个新模型

8-11行是给模型设定初始参数用的，这几个函数没有定义在AdaboostNaiveBayesParam里是因为这些参数的传入只发生在模型拟合前，在预测的时候是不能设定的，所以对后面的Transformer应该是不可见的，因此只在这里定义。注意到这些函数的返回类型和模型类型一致，其实就是每一步都返回一个加入的参数的新的模型，这里就利用了之前的拷贝构造函数。

13-16行是给模型设定默认参数。

19行开始的train函数就是我们在对模型调用fit方法时使用的函数。返回的是一个AdaboostNaiveBayesModel，是我们随后需要定义的跟AdaboostNaiveBayes这个Estimator对应的Transformer。

21-22行分别获取数据集的数量和其中label的数量

25-26是初始化所有子模型及其权重，因为adaboost每一次迭代都会生成一个新的模型并计算该模型在最终结果投票时的权重。

30-38是一个自定义udf函数，对每个样本计算预测的label和真实label，并根据该样本的现有权重obsWeight进行更新，可以理解为如果分类正确，其权重不变，否则增大其权重。

40行 初始化所有样本为等权重，如果样本数据非常不平衡的话，可以尝试在这一步就引入偏差权重，我由于使用的数据各个类之间数量是一样的，所以全部初始话为1

41-44行初始化一些错误率等参数

46行开始进行adaboost迭代过程。

47-49行是在当前样本权重情况下调用普通的NaiveBayes进行训练的到当前迭代下的子模型

49行这个cache一定不能少，否则迭代的速度只能呵呵了，毕竟temp用到了非常多次的action。

51-57行是计算该模型的错误率

59-61行是更新误差，并计算该模型的权重alpha

63-64行是保存当前子模型和权重

66-69行是利用之前定义的udf函数更新所有样本的权重并对其进行归一化

74 行是利用计算得到的参数去构建一个AdaboostNaiveBayesModel，这里传入所有的子模型及其权重，i表示的是总迭代次数，就是子模型的数量。

#### 4.3 模型Transformer

这里要实现的AdaboostNaiveBayesModel是从ProbabilisticClassificationModel，因此要手动实现对应的必须要的几个方法。

代码如下：

```scala
#!scala
class AdaboostNaiveBayesModel(override val uid: String, val iternums: Int, val modelWeights: Array[Double], val modelArray: Array[NaiveBayesModel]) extendsProbabilisticClassificationModel[Vector, AdaboostNaiveBayesModel] with AdaboostNaiveBayesParams { 
    override val numClasses = modelArray(0).pi.size 
    private def multinomialCalculation(features: Vector): Vector = { 
        val result: Vector = new DenseVector(newArray(numClasses)) 
        for (i <- 0until iternums) { 
            val prob: Vector =modelArray(i).theta.multiply(features) 
            prob.foreachActive { (index, value) =>{ 
                prob.toArray(index) = value +modelArray(i).pi(index) 
            }   
        } 
        result.toArray(prob.argmax) = result(prob.argmax) +modelWeights(i) 
    } 
    result
} 
override def predictRaw(features: Vector): Vector ={ 
    multinomialCalculation(features) 
} 
override def raw2probabilityInPlace(rawPrediction: Vector): Vector = { 
    rawPrediction match { 
        case dv: DenseVector => 
            var i = 0 28 val size = dv.size 
            val maxLog = dv.values.max 
            for (i <- 0 until size) { 
                dv.values(i) = math.exp(dv.values(i) - maxLog)
            } 
            val probSum = dv.values.sum 
            for (i <- 0until size) { 
                dv.values(i) = dv.values(i) /probSum
            } 
            dv 
        case sv: SparseVector => throw new RuntimeException("Unexpected error in AdaboostNaiveBayesModel:  raw2probabilityInPlace encountered SparseVector")}}
    override def copy(extra: ParamMap) = {
        defaultCopy(extra)
    }
}
```

第5行是读取一下总的标签个数以供后面使用

44-46行是拷贝构造函数

20-22行是对一个输入计算它在各个label下的得分，这个得分的大小表示的是判断到该标签概率的大小，但是并不是概率值，因为我们的BayesModel模型参数是做了log变换的

24-43行是怎么讲结果向量转化为和为1的概率值，30-32行是个小技巧，我一开始好奇为什么一定要减掉maxLog，因为这个按理说并不会影响到后面的计算结果，后来发现这样能避免浮点数的问题，因为不减的话，会出现求完math.exp后值约为零的情况，导致后面的计算出现问题

这样就完成了概率模型需要的几个方法了，可以对一个输入给出一个概率向量，每个维度代表在这个类的概率。

### 5. 写在最后

利用自定义的adaboost+naivebayes模型，测试准确率从95%增加到了96.5%左右。由于训练数据比较好，95%已经很不错了，这里主要是通过写一个自定义模型学习一下Spark ML方面的知识。之前都只是听说过，从来没用过，学习一下还是很有必要的，毕竟不能总指望着单机就能搞定所有问题。

不过注意到这里我并不是从0开始造轮子，我是从ProbabilisticClassification继承过来加以修改的，如果想要做其他模型的修改还是推荐看上面的两篇文章，然后多看看Spark ML源码里类似的模型并根据自己的需要进行修改。

然后scala也是为了用Spark ML现学的，代码可以优化的地方估计很多。

这是本人第一篇博客，希望以后可以坚持写，作为对自己工作学习的总结笔记。

### 参考资料

\[1] [Multi-class AdaBoost](https://www.researchgate.net/profile/Trevor_Hastie/publication/228947999_Multi-class_AdaBoost/links/0c960521b946de42a9000000.pdf) - T Hastie, S Rosset, J Zhu, H Zou

\[2] [Extend Spark ML for your own model/transformer types](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types)

\[3] [spark的NaiveBayes实现源码](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://shangzhi-huang.gitbook.io/workspace/spark/spark-ml-xia-shi-xian-de-duo-fen-lei-adaboost-+-naivebayes-suan-fa.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
