Spark源码分析之RDD下的DecisionTree/RandomForest的数据结构
决策树算法中用到了许多数据结构,这里我们解析一下。
一、决策树元数据(DecisionTreeMetadata)
用以描述决策树输入数据的数据结构,主要是关于各种特种的描述等。其属性包括:
val numFeatures: Int, //特征数量
val numExamples: Long, //数据数量
val numClasses: Int, //类别数量
val maxBins: Int, //最大箱数
val featureArity: Map[Int, Int], //特征元数
val unorderedFeatures: Set[Int], //无序特征集合
val numBins: Array[Int], //箱数
val impurity: Impurity, //不纯度(指使用哪种信息差异公式计算)
val quantileStrategy: QuantileStrategy, //分位数策略
val maxDepth: Int, //最大数深度
val minInstancesPerNode: Int, //每个节点最小的实例数
val minInfoGain: Double, //最小的信息增益
val numTrees: Int, //数的数量
val numFeaturesPerNode: Int //每个节点的特征数
我们对其中某些属性解释下。
maxBins
:这个属性是最大箱数,它是所有属性中最大箱数的值。
numBins
:这个是每个特征下的箱数,是一个数组。
impurity
:这个是不纯度,在决策树中需要计算特征之间信息量的差异(特征的能力),有多种计算方法,这里就是表明要使用哪种计算的,现有的针对分类问题计算包括两种:信息增益(entropy)和GINI不纯度(gini),针对回归问题使用的是variance。
numClasses
:这个属性是类别的数量,对于分类问题来说,这个值就是不同的类标签数量,如果是回归问题,这个属性就是0。
featureArity
:这个是特征的元数。也就是特征下不同值的数量,是一个K-V结构,K是特征索引,V是特征下不同取值的数量。注意,这个K仅仅是针对分类特征来说,连续变量的特征没有元数。有一个需要特别注意的是,这里的属性变量应该是经过重新编码,比如特征总共有N个,那么特征应该从0到N-1编号过,因此,所有的特征属性值范围都应该在0到N-1之间。
quantileStrategy
:分位数策略,包含三种,分别是sort/MinMax/ApproxHist
二、分割点(Split)
分割点就是一个特征索引和double数组组成的数据结构,它表明的是哪个特征下的分割点。可以通过这个而判断数据在分割点左边还是右边,表明一组数据划分的临界点。决策树需要对特征做划分,也就是把特征下的数据分到不同的箱子中,在分类特征变量下,一个箱子一般就包含了一个特征值,那么分割点其实就是各个数据点之间的中间点。而连续变量则需要离散化(参考:https://www.datalearner.com/blog/1051533040913424 ),离散化之后也会有一个double数组表明数据的划分结果,把这个数组每个值拿出来,和索引一起就可以组成Splits的数组了。
三、策略(Strategy)
这个类是用来存储决策树配置信息的。其包含的变量如下:
@Since("1.0.0") @BeanProperty var algo: Algo, //学习目标,Classification/Regression
@Since("1.0.0") @BeanProperty var impurity: Impurity, //信息差异的计算,共三种。针对分类问题计算包括两种:信息增益(entropy)和GINI不纯度(gini),针对回归问题使用的是variance。
@Since("1.0.0") @BeanProperty var maxDepth: Int, //树的最大深度
@Since("1.2.0") @BeanProperty var numClasses: Int = 2, //类标签数量
@Since("1.0.0") @BeanProperty var maxBins: Int = 32, //最大的箱数
@Since("1.0.0") @BeanProperty var quantileCalculationStrategy: QuantileStrategy = Sort, //分位数策略
@Since("1.0.0") @BeanProperty var categoricalFeaturesInfo: Map[Int, Int] = Map[Int, Int](), //类别特征信息
@Since("1.2.0") @BeanProperty var minInstancesPerNode: Int = 1, //每个节点的最小实例数
@Since("1.2.0") @BeanProperty var minInfoGain: Double = 0.0, //最小的信息增益
@Since("1.0.0") @BeanProperty var maxMemoryInMB: Int = 256, //最大的内存
@Since("1.2.0") @BeanProperty var subsamplingRate: Double = 1, //抽样率,这个是指定每个棵树用多少数据集进行训练的意思,默认为1,范围是0-1之间,值越小训练的速度越快,但是值越大训练的数据集比例越高,效果一般也越好。
@Since("1.2.0") @BeanProperty var useNodeIdCache: Boolean = false, //是否缓存节点ID
@Since("1.2.0") @BeanProperty var checkpointInterval: Int = 10 //检测间隔
四、带标签的RDD(RDD[LabeledPoint])
这是Spark为机器学习开发的一个RDD数据对象,一个LabeledPoint是一个数据点,里面包含了这个数据的特征features(Vector)和标签label(Double)。
五、树形的RDD(RDD[TreePoint])
在决策树中,树的数据结构非常重要。在Spark中,实现了一个树形RDD的数据结构TreePoint。和LabeledPoint不同的是,TreePoint将特征向量features(Vector)转成了一个 binnedFeatures(Array[Int]),也就是把数据的特征值变成了某个箱子的索引(分箱的结果)。通过输入数据、元数据、特征值切分点来构造。逻辑是根据分类属性的元数和连续属性的箱子数来转换。核心的两个方法如下:
/**
* Convert an input dataset into its TreePoint representation,
* binning feature values in preparation for DecisionTree training.
* @param input Input dataset.
* @param splits Splits for features, of size (numFeatures, numSplits).
* @param metadata Learning and dataset metadata
* @return TreePoint dataset representation
*/
def convertToTreeRDD(
input: RDD[LabeledPoint],
splits: Array[Array[Split]],
metadata: DecisionTreeMetadata): RDD[TreePoint] = {
// Construct arrays for featureArity for efficiency in the inner loop.
// 构造一个数组,其长度是特征长度,元素的值是该特征对应的特征值的数量,如果是分类属性,直接获取元数据中的featureArity对应的值即可。如果是连续变量则获取该特征下切分点中对应的threshold,threshold就是表明如果值小于它则在树的左边,否则在树的右边
val featureArity: Array[Int] = new Array[Int](metadata.numFeatures)
var featureIndex = 0
while (featureIndex < metadata.numFeatures) {
featureArity(featureIndex) = metadata.featureArity.getOrElse(featureIndex, 0)
featureIndex += 1
}
// thresholds是一个二维数组,是指连续属性下threshold的数组
val thresholds: Array[Array[Double]] = featureArity.zipWithIndex.map { case (arity, idx) =>
if (arity == 0) {
splits(idx).map(_.asInstanceOf[ContinuousSplit].threshold)
} else {
Array.empty[Double]
}
}
// 根据分类属性的数量和连续属性的threshold数组来转换RDD成树形,也就是找出每个特征下每个元素对应的箱子的ID
input.map { x =>
TreePoint.labeledPointToTreePoint(x, thresholds, featureArity)
}
}
/**
* Convert one LabeledPoint into its TreePoint representation.
* 这个方法就是返回某个特征下某个数据点的箱子的ID
* @param thresholds For each feature, split thresholds for continuous features,
* empty for categorical features.
* @param featureArity Array indexed by feature, with value 0 for continuous and numCategories
* for categorical features.
*/
private def labeledPointToTreePoint(
labeledPoint: LabeledPoint,
thresholds: Array[Array[Double]],
featureArity: Array[Int]): TreePoint = {
val numFeatures = labeledPoint.features.size
val arr = new Array[Int](numFeatures)
var featureIndex = 0
while (featureIndex < numFeatures) {
arr(featureIndex) =
findBin(featureIndex, labeledPoint, featureArity(featureIndex), thresholds(featureIndex))
featureIndex += 1
}
new TreePoint(labeledPoint.label, arr)
}
/**
* Find discretized value for one (labeledPoint, feature).
* 寻找变量值对应的箱子ID,如果是连续变量,则在上述的threshold数组中找,否则在特征元中找
* NOTE: We cannot use Bucketizer since it handles split thresholds differently than the old
* (mllib) tree API. We want to maintain the same behavior as the old tree API.
*
* @param featureArity 0 for continuous features; number of categories for categorical features.
*/
private def findBin(
featureIndex: Int,
labeledPoint: LabeledPoint,
featureArity: Int,
thresholds: Array[Double]): Int = {
val featureValue = labeledPoint.features(featureIndex)
// 特征元数等于0表明是连续变量
if (featureArity == 0) {
val idx = java.util.Arrays.binarySearch(thresholds, featureValue)
if (idx >= 0) {
idx
} else {
-idx - 1
}
} else {
// 如果是分类属性,先判断这个值是否在范围内,如果不在报错,否则就把这个值转成整形返回
// Categorical feature bins are indexed by feature values.
if (featureValue < 0 || featureValue >= featureArity) {
throw new IllegalArgumentException(
s"DecisionTree given invalid data:" +
s" Feature $featureIndex is categorical with values in {0,...,${featureArity - 1}," +
s" but a data point gives it value $featureValue.\n" +
" Bad data point: " + labeledPoint.toString)
}
featureValue.toInt
}
}
六、装袋的RDD(RDD[BaggedPoint])
装袋的数据点是指为了实现bagging方法而设计的,决策树算法是用的一棵树的随机森林实现的,随机森林是bagging集成方法,所以用到了这个BaggedPoint。这个RDD就是根据要求,将数据抽样成几份。比如如果我用三棵树作为随机森林的参数,每棵树用90%的数据训练,那么这里就是把数据抽样成三份,每份数据占原来的数据的90%。当然,我们可以指定抽样过程是有放回的抽样还是无放回的抽样。该数据结构最后返回的属性有两个:
datum: Datum // 数据实例(Datum是泛型),是一个数据点的意思
subsampleWeights: Array[Double] //数据的份数,是个向量,其维度是抽样的份数,取值的结果表明当前数据点在对应样本中出现的次数,因此,这里向量中取值是自然数。
举个例子,假设原始数据中包含这样的一个RDD的数据点features=(2.0, 2.0, 3.0),抽样3次之后该数据点变成了BaggedPoint(datum=(2.0, 2.0, 3.0),subsampleWeights=[1, 0, 4]),这个含义是在最终的三份抽样结果的数据中,在第一份样本中,这个数据点被抽中了1次,在第二份样本中,这个数据点被抽中了0次,第三份样本中,这个数据点被抽中了4次。
其主要的转化原理如下:
/** 需要给定如下参数:
* input:输入的RDD数据集
* subsamplingRate:每个样本需要抽样的数据占原始数据集的比例
* numSubsamples:需要抽样的份数
* withReplacement:是否有放回的抽样
* seed:随机抽样的种子,对同一台机器来说,相同的种子会生成相同的随机数结果
*/
def convertToBaggedRDD[Datum] (
input: RDD[Datum],
subsamplingRate: Double,
numSubsamples: Int,
withReplacement: Boolean,
seed: Long = Utils.random.nextLong()): RDD[BaggedPoint[Datum]] = {
// 如果是有放回的抽样,那就直接抽样
if (withReplacement) {
convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples, seed)
} else {
// 无放回的抽样情况下,抽样比例为1的时候,抽样结果就是原数据
if (numSubsamples == 1 && subsamplingRate == 1.0) {
convertToBaggedRDDWithoutSampling(input)
} else {
// 无放回的抽样
convertToBaggedRDDSamplingWithoutReplacement(input, subsamplingRate, numSubsamples, seed)
}
}
}
// 无放回的抽样,无放回的抽样,每个样本只能抽样一次,因此,抽样份数subsampleWeights结果只能是1或者是0
// 这里用随机数来确定抽样的对象,循环所有的数据,每个数据循环numSubsamples次,每次产生一个随机数,如果该值小于抽样率,这个数据点就被抽中了。这是轮盘赌的原理。
private def convertToBaggedRDDSamplingWithoutReplacement[Datum] (
input: RDD[Datum],
subsamplingRate: Double,
numSubsamples: Int,
seed: Long): RDD[BaggedPoint[Datum]] = {
input.mapPartitionsWithIndex { (partitionIndex, instances) =>
// Use random seed = seed + partitionIndex + 1 to make generation reproducible.
val rng = new XORShiftRandom
rng.setSeed(seed + partitionIndex + 1)
instances.map { instance =>
val subsampleWeights = new Array[Double](numSubsamples)
var subsampleIndex = 0
while (subsampleIndex < numSubsamples) {
val x = rng.nextDouble()
subsampleWeights(subsampleIndex) = {
if (x < subsamplingRate) 1.0 else 0.0
}
subsampleIndex += 1
}
new BaggedPoint(instance, subsampleWeights)
}
}
}
// 有放回的抽样,用的是泊松分布来产生下一个随机数
private def convertToBaggedRDDSamplingWithReplacement[Datum] (
input: RDD[Datum],
subsample: Double,
numSubsamples: Int,
seed: Long): RDD[BaggedPoint[Datum]] = {
input.mapPartitionsWithIndex { (partitionIndex, instances) =>
// Use random seed = seed + partitionIndex + 1 to make generation reproducible.
val poisson = new PoissonDistribution(subsample)
poisson.reseedRandomGenerator(seed + partitionIndex + 1)
instances.map { instance =>
val subsampleWeights = new Array[Double](numSubsamples)
var subsampleIndex = 0
while (subsampleIndex < numSubsamples) {
subsampleWeights(subsampleIndex) = poisson.sample()
subsampleIndex += 1
}
new BaggedPoint(instance, subsampleWeights)
}
}
}
private def convertToBaggedRDDWithoutSampling[Datum] (
input: RDD[Datum]): RDD[BaggedPoint[Datum]] = {
input.map(datum => new BaggedPoint(datum, Array(1.0)))
}
七、节点(Node)
这里的节点是org.apache.spark.mllib.tree.model.Node的节点,是决策树中用到的节点实现类。其包含的属性有:
@Since("1.0.0") val id: Int, // 节点ID,从1开始,1是根节点,2、3是左右节点
@Since("1.0.0") var predict: Predict, // 该节点的预测结果
@Since("1.2.0") var impurity: Double, // 当前节点的不纯度
@Since("1.0.0") var isLeaf: Boolean, // 是否是叶子节点
@Since("1.0.0") var split: Option[Split], // 这是一个切分点,表明应该往左边节点走还是往右边节点走
@Since("1.0.0") var leftNode: Option[Node], // 左子节点
@Since("1.0.0") var rightNode: Option[Node],// 右子节点
@Since("1.0.0") var stats: Option[InformationGainStats] // 当前节点的信息增益
这个数据节点没什么好说的,主要是注意里面用了递归的方法预测结果,一直到最终叶子节点为止会给出预测结果。
八、学习节点(LearningNode)
这个节点是一个变量,方便我们修改节点状态的,它可能是一个内部节点,也可能是一个叶子节点。最终它可以被转成一个正常的节点。
欢迎大家关注DataLearner官方微信,接受最新的AI技术推送
