Spark源码分析之RDD下的DecisionTree/RandomForest的数据结构

标签:#scala##spark##决策树##随机森林# 时间:2018/08/02 17:23:34 作者:小木

决策树算法中用到了许多数据结构,这里我们解析一下。

[TOC]

一、决策树元数据(DecisionTreeMetadata)

用以描述决策树输入数据的数据结构,主要是关于各种特种的描述等。其属性包括:

val numFeatures: Int,            //特征数量
val numExamples: Long,            //数据数量
val numClasses: Int,            //类别数量
val maxBins: Int,                //最大箱数
val featureArity: Map[Int, Int],    //特征元数
val unorderedFeatures: Set[Int],    //无序特征集合
val numBins: Array[Int],            //箱数
val impurity: Impurity,                //不纯度(指使用哪种信息差异公式计算)
val quantileStrategy: QuantileStrategy,    //分位数策略
val maxDepth: Int,            //最大数深度
val minInstancesPerNode: Int,            //每个节点最小的实例数
val minInfoGain: Double,                //最小的信息增益
val numTrees: Int,                        //数的数量
val numFeaturesPerNode: Int                //每个节点的特征数

我们对其中某些属性解释下。

maxBins:这个属性是最大箱数,它是所有属性中最大箱数的值。

numBins:这个是每个特征下的箱数,是一个数组。

impurity:这个是不纯度,在决策树中需要计算特征之间信息量的差异(特征的能力),有多种计算方法,这里就是表明要使用哪种计算的,现有的针对分类问题计算包括两种:信息增益(entropy)和GINI不纯度(gini),针对回归问题使用的是variance。

numClasses:这个属性是类别的数量,对于分类问题来说,这个值就是不同的类标签数量,如果是回归问题,这个属性就是0。

featureArity:这个是特征的元数。也就是特征下不同值的数量,是一个K-V结构,K是特征索引,V是特征下不同取值的数量。注意,这个K仅仅是针对分类特征来说,连续变量的特征没有元数。有一个需要特别注意的是,这里的属性变量应该是经过重新编码,比如特征总共有N个,那么特征应该从0到N-1编号过,因此,所有的特征属性值范围都应该在0到N-1之间。

quantileStrategy:分位数策略,包含三种,分别是sort/MinMax/ApproxHist

二、分割点(Split)

分割点就是一个特征索引和double数组组成的数据结构,它表明的是哪个特征下的分割点。可以通过这个而判断数据在分割点左边还是右边,表明一组数据划分的临界点。决策树需要对特征做划分,也就是把特征下的数据分到不同的箱子中,在分类特征变量下,一个箱子一般就包含了一个特征值,那么分割点其实就是各个数据点之间的中间点。而连续变量则需要离散化(参考:https://www.datalearner.com/blog/1051533040913424 ),离散化之后也会有一个double数组表明数据的划分结果,把这个数组每个值拿出来,和索引一起就可以组成Splits的数组了。

三、策略(Strategy)

这个类是用来存储决策树配置信息的。其包含的变量如下:

@Since("1.0.0") @BeanProperty var algo: Algo,        //学习目标,Classification/Regression
@Since("1.0.0") @BeanProperty var impurity: Impurity,    //信息差异的计算,共三种。针对分类问题计算包括两种:信息增益(entropy)和GINI不纯度(gini),针对回归问题使用的是variance。
@Since("1.0.0") @BeanProperty var maxDepth: Int,    //树的最大深度
@Since("1.2.0") @BeanProperty var numClasses: Int = 2,    //类标签数量
@Since("1.0.0") @BeanProperty var maxBins: Int = 32,    //最大的箱数
@Since("1.0.0") @BeanProperty var quantileCalculationStrategy: QuantileStrategy = Sort,    //分位数策略
@Since("1.0.0") @BeanProperty var categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](),    //类别特征信息
@Since("1.2.0") @BeanProperty var minInstancesPerNode: Int = 1,    //每个节点的最小实例数
@Since("1.2.0") @BeanProperty var minInfoGain: Double = 0.0,    //最小的信息增益
@Since("1.0.0") @BeanProperty var maxMemoryInMB: Int = 256,    //最大的内存
@Since("1.2.0") @BeanProperty var subsamplingRate: Double = 1,    //抽样率,这个是指定每个棵树用多少数据集进行训练的意思,默认为1,范围是0-1之间,值越小训练的速度越快,但是值越大训练的数据集比例越高,效果一般也越好。
@Since("1.2.0") @BeanProperty var useNodeIdCache: Boolean = false,    //是否缓存节点ID
@Since("1.2.0") @BeanProperty var checkpointInterval: Int = 10    //检测间隔

四、带标签的RDD(RDD[LabeledPoint])

这是Spark为机器学习开发的一个RDD数据对象,一个LabeledPoint是一个数据点,里面包含了这个数据的特征features(Vector)和标签label(Double)。

五、树形的RDD(RDD[TreePoint])

在决策树中,树的数据结构非常重要。在Spark中,实现了一个树形RDD的数据结构TreePoint。和LabeledPoint不同的是,TreePoint将特征向量features(Vector)转成了一个 binnedFeatures(Array[Int]),也就是把数据的特征值变成了某个箱子的索引(分箱的结果)。通过输入数据、元数据、特征值切分点来构造。逻辑是根据分类属性的元数和连续属性的箱子数来转换。核心的两个方法如下:

/**
   * Convert an input dataset into its TreePoint representation,
   * binning feature values in preparation for DecisionTree training.
   * @param input     Input dataset.
   * @param splits    Splits for features, of size (numFeatures, numSplits).
   * @param metadata  Learning and dataset metadata
   * @return  TreePoint dataset representation
   */
  def convertToTreeRDD(
      input: RDD[LabeledPoint],
      splits: Array[Array[Split]],
      metadata: DecisionTreeMetadata): RDD[TreePoint] = {
    // Construct arrays for featureArity for efficiency in the inner loop.
    // 构造一个数组,其长度是特征长度,元素的值是该特征对应的特征值的数量,如果是分类属性,直接获取元数据中的featureArity对应的值即可。如果是连续变量则获取该特征下切分点中对应的threshold,threshold就是表明如果值小于它则在树的左边,否则在树的右边
    val featureArity: Array[Int] = new Array[Int](metadata.numFeatures)
    var featureIndex = 0
    while (featureIndex < metadata.numFeatures) {
      featureArity(featureIndex) = metadata.featureArity.getOrElse(featureIndex, 0)
      featureIndex += 1
    }

    // thresholds是一个二维数组,是指连续属性下threshold的数组
    val thresholds: Array[Array[Double]] = featureArity.zipWithIndex.map { case (arity, idx) =>
      if (arity == 0) {
        splits(idx).map(_.asInstanceOf[ContinuousSplit].threshold)
      } else {
        Array.empty[Double]
      }
    }

    // 根据分类属性的数量和连续属性的threshold数组来转换RDD成树形,也就是找出每个特征下每个元素对应的箱子的ID
    input.map { x =>
      TreePoint.labeledPointToTreePoint(x, thresholds, featureArity)
    }
  }

  /**
   * Convert one LabeledPoint into its TreePoint representation.
   * 这个方法就是返回某个特征下某个数据点的箱子的ID
   * @param thresholds  For each feature, split thresholds for continuous features,
   *                    empty for categorical features.
   * @param featureArity  Array indexed by feature, with value 0 for continuous and numCategories
   *                      for categorical features.
   */
  private def labeledPointToTreePoint(
      labeledPoint: LabeledPoint,
      thresholds: Array[Array[Double]],
      featureArity: Array[Int]): TreePoint = {
    val numFeatures = labeledPoint.features.size
    val arr = new Array[Int](numFeatures)
    var featureIndex = 0
    while (featureIndex < numFeatures) {
      arr(featureIndex) =
        findBin(featureIndex, labeledPoint, featureArity(featureIndex), thresholds(featureIndex))
      featureIndex += 1
    }
    new TreePoint(labeledPoint.label, arr)
  }

  /**
   * Find discretized value for one (labeledPoint, feature).
   * 寻找变量值对应的箱子ID,如果是连续变量,则在上述的threshold数组中找,否则在特征元中找
   * NOTE: We cannot use Bucketizer since it handles split thresholds differently than the old
   *       (mllib) tree API.  We want to maintain the same behavior as the old tree API.
   *
   * @param featureArity  0 for continuous features; number of categories for categorical features.
   */
  private def findBin(
      featureIndex: Int,
      labeledPoint: LabeledPoint,
      featureArity: Int,
      thresholds: Array[Double]): Int = {
    val featureValue = labeledPoint.features(featureIndex)

    // 特征元数等于0表明是连续变量
    if (featureArity == 0) {
      val idx = java.util.Arrays.binarySearch(thresholds, featureValue)
      if (idx >= 0) {
        idx
      } else {
        -idx - 1
      }
    } else {
      // 如果是分类属性,先判断这个值是否在范围内,如果不在报错,否则就把这个值转成整形返回
      // Categorical feature bins are indexed by feature values.
      if (featureValue < 0 || featureValue >= featureArity) {
        throw new IllegalArgumentException(
          s"DecisionTree given invalid data:" +
            s" Feature $featureIndex is categorical with values in {0,...,${featureArity - 1}," +
            s" but a data point gives it value $featureValue.\n" +
            "  Bad data point: " + labeledPoint.toString)
      }
      featureValue.toInt
    }
  }

六、装袋的RDD(RDD[BaggedPoint])

装袋的数据点是指为了实现bagging方法而设计的,决策树算法是用的一棵树的随机森林实现的,随机森林是bagging集成方法,所以用到了这个BaggedPoint。这个RDD就是根据要求,将数据抽样成几份。比如如果我用三棵树作为随机森林的参数,每棵树用90%的数据训练,那么这里就是把数据抽样成三份,每份数据占原来的数据的90%。当然,我们可以指定抽样过程是有放回的抽样还是无放回的抽样。该数据结构最后返回的属性有两个:

datum: Datum        // 数据实例(Datum是泛型),是一个数据点的意思
subsampleWeights: Array[Double]        //数据的份数,是个向量,其维度是抽样的份数,取值的结果表明当前数据点在对应样本中出现的次数,因此,这里向量中取值是自然数。

举个例子,假设原始数据中包含这样的一个RDD的数据点features=(2.0, 2.0, 3.0),抽样3次之后该数据点变成了BaggedPoint(datum=(2.0, 2.0, 3.0),subsampleWeights=[1, 0, 4]),这个含义是在最终的三份抽样结果的数据中,在第一份样本中,这个数据点被抽中了1次,在第二份样本中,这个数据点被抽中了0次,第三份样本中,这个数据点被抽中了4次。

其主要的转化原理如下:

/** 需要给定如下参数:
 * input:输入的RDD数据集
 * subsamplingRate:每个样本需要抽样的数据占原始数据集的比例
 * numSubsamples:需要抽样的份数
 * withReplacement:是否有放回的抽样
 * seed:随机抽样的种子,对同一台机器来说,相同的种子会生成相同的随机数结果
 */
def convertToBaggedRDD[Datum] (
      input: RDD[Datum],
      subsamplingRate: Double,
      numSubsamples: Int,
      withReplacement: Boolean,
      seed: Long = Utils.random.nextLong()): RDD[BaggedPoint[Datum]] = {
    // 如果是有放回的抽样,那就直接抽样
    if (withReplacement) {
        convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples, seed)
    } else {

            // 无放回的抽样情况下,抽样比例为1的时候,抽样结果就是原数据
        if (numSubsamples == 1 && subsamplingRate == 1.0) {
            convertToBaggedRDDWithoutSampling(input)
        } else {
            // 无放回的抽样
            convertToBaggedRDDSamplingWithoutReplacement(input, subsamplingRate, numSubsamples, seed)
        }
    }
}
  // 无放回的抽样,无放回的抽样,每个样本只能抽样一次,因此,抽样份数subsampleWeights结果只能是1或者是0
  // 这里用随机数来确定抽样的对象,循环所有的数据,每个数据循环numSubsamples次,每次产生一个随机数,如果该值小于抽样率,这个数据点就被抽中了。这是轮盘赌的原理。
  private def convertToBaggedRDDSamplingWithoutReplacement[Datum] (
      input: RDD[Datum],
      subsamplingRate: Double,
      numSubsamples: Int,
      seed: Long): RDD[BaggedPoint[Datum]] = {
    input.mapPartitionsWithIndex { (partitionIndex, instances) =>
      // Use random seed = seed + partitionIndex + 1 to make generation reproducible.
      val rng = new XORShiftRandom
      rng.setSeed(seed + partitionIndex + 1)
      instances.map { instance =>
        val subsampleWeights = new Array[Double](numSubsamples)
        var subsampleIndex = 0
        while (subsampleIndex < numSubsamples) {
          val x = rng.nextDouble()
          subsampleWeights(subsampleIndex) = {
            if (x < subsamplingRate) 1.0 else 0.0
          }
          subsampleIndex += 1
        }
        new BaggedPoint(instance, subsampleWeights)
      }
    }
  }

  // 有放回的抽样,用的是泊松分布来产生下一个随机数
  private def convertToBaggedRDDSamplingWithReplacement[Datum] (
      input: RDD[Datum],
      subsample: Double,
      numSubsamples: Int,
      seed: Long): RDD[BaggedPoint[Datum]] = {
    input.mapPartitionsWithIndex { (partitionIndex, instances) =>
      // Use random seed = seed + partitionIndex + 1 to make generation reproducible.
      val poisson = new PoissonDistribution(subsample)
      poisson.reseedRandomGenerator(seed + partitionIndex + 1)
      instances.map { instance =>
        val subsampleWeights = new Array[Double](numSubsamples)
        var subsampleIndex = 0
        while (subsampleIndex < numSubsamples) {
          subsampleWeights(subsampleIndex) = poisson.sample()
          subsampleIndex += 1
        }
        new BaggedPoint(instance, subsampleWeights)
      }
    }
  }

  private def convertToBaggedRDDWithoutSampling[Datum] (
      input: RDD[Datum]): RDD[BaggedPoint[Datum]] = {
    input.map(datum => new BaggedPoint(datum, Array(1.0)))
  }

七、节点(Node)

这里的节点是org.apache.spark.mllib.tree.model.Node的节点,是决策树中用到的节点实现类。其包含的属性有:

@Since("1.0.0") val id: Int, // 节点ID,从1开始,1是根节点,2、3是左右节点
@Since("1.0.0") var predict: Predict,    // 该节点的预测结果
@Since("1.2.0") var impurity: Double,    // 当前节点的不纯度
@Since("1.0.0") var isLeaf: Boolean,    // 是否是叶子节点
@Since("1.0.0") var split: Option[Split],    // 这是一个切分点,表明应该往左边节点走还是往右边节点走
@Since("1.0.0") var leftNode: Option[Node],    // 左子节点
@Since("1.0.0") var rightNode: Option[Node],// 右子节点
@Since("1.0.0") var stats: Option[InformationGainStats]    // 当前节点的信息增益

这个数据节点没什么好说的,主要是注意里面用了递归的方法预测结果,一直到最终叶子节点为止会给出预测结果。

八、学习节点(LearningNode)

这个节点是一个变量,方便我们修改节点状态的,它可能是一个内部节点,也可能是一个叶子节点。最终它可以被转成一个正常的节点。

欢迎大家关注DataLearner官方微信,接受最新的AI技术推送