专注大数据培训
我们一直在领跑

一文吃透数据倾斜

前言

本文是介绍的是开发spark极其核心的地方,可以说懂得解决spark数据倾斜是区分一个spark工程师是否足够专业的标准,在面试中以及实际开发中,几乎天天面临的都是这个问题。

原理以及现象

先来解释一下,出现什么现象的时候我们认定他为数据倾斜,以及他数据倾斜发生的原理是什么?

比如一个spark任务中,绝多数task任务运行速度很快,但是就是有那么几个task任务运行极其缓慢,慢慢的可能就接着报内存溢出的问题了,那么这个时候我们就可以认定他是数据倾斜了。

接下来说一下发生数据倾斜的底层理论,其实可以非常肯定的说,数据倾斜就是发生在shuffle类的算子中,在进行shuffle的时候,必须将各个节点的相同的key拉到某个节点上的一个task来进行处理,比如按照key进行聚合和join操作等,这个时候其中某一个key数量量特别大,于是就发生了数据倾斜了。

数据倾斜示意图

分组聚合

分组聚合逻辑中,需要把相同key的数据发往下游同一个task,如果某个或某几个key的数量特别大,则会导致下游的某个或某几个task所要处理的数据量特别大,也就是要处理的任务负载特别大

JOIN计算

join计算中,A表和B表中相同key的数据,需要发往下游同一个task,如果A表中或B表中,某个key或某几个key的数量特别大,则会导致下游的某个或某几个task所要处理的数据量特别大,也就是要处理的任务负载特别大

定位数据倾斜的代码

上面我们知道了数据倾斜的底层原理,那么就好定位代码了,所以我就可以改写这段代码,让spark任务来正常运行了。

我们知道了导致数据倾斜的问题就是shuffle算子,所以我们先去找到代码中的shuffle的算子,比如distinct、groupBYkey、reduceBykey、aggergateBykey、join、cogroup、repartition等,那么问题一定就出现在这里。

找到shuffle类的算子之后,我们知道一个application分为job,那么一个job又划分为多个stage,stage的划分就是根据shuffle类的算子,也可以说是宽依赖来划分的,所以这个时候我们在spark UI界面上点击查看stage,如下图:

可以看到94这一行和91这一行,执行时间明显比其他的执行时间要长太多了,我们就可以肯定一定是这里发生了数据倾斜,然后我们就找到了发生数据倾斜的stage了,然后根据stage划分原理,我们就可以推算出来发生倾斜的那个stage对应的代码中的哪一部分了。

这个时候我们找到了数据倾斜发生的地方了,但是我们还需要知道到底是哪个key数据量特别大导致的数据倾斜,于是接下来来聊一聊这个问题。

找到这个key的算法,我们可以使用采样的方式,对,就是当初虐了我们千百遍的概率论与数理统计的课上讲的采样算法。

代码如下:

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

现在我来简单说一下他的原理,他就是从所有key中,把其中每一个key随机取出来一部分,然后进行一个百分比的推算,学过采样算法的都知道,这是用局部取推算整体,虽然有点不准确,但是在整体概率上来说,我们只需要大概之久可以定位那个最多的key了。

解决数据倾斜的方案

上面我们聊了数据倾斜发生的原理以及如何定位是哪个key发生了数据倾斜,这个时候我们就开始着手解决这个问题了,我把分为七中解决方案,每个方案都有对应的情况,读者可以针对自己的情况来灵活运用。

解决方案一: 提高shuffle 的并行度。

他的原理很简单,我们知道在rduceBykey中有一个shuffle read task的值默认为200,也就是说用两百个task来处理任务,对于我们一个很大的集群来说,每个task的任务中需要处理的key也是比较多的,这个时候我们把这个数量给提高以爱,比如我么设置reduceBYkey(1000),这个时候task的数量就多了,然后分配到每个task中的key就少了,于是说并行度就提高了。但是总体来说,这种解决办法对于某一个数量特别大的key来说效果甚为,只能说key多的时候,我们可以有一定的程度上环境数据倾斜的问题,所以这种方法也不是我们要找到的最好的办法,他也是有一定的局限性。

任重而道远,我们还需要继续寻找。

解决方案二:两阶段聚合(局部聚合+全局聚合)

用这个方法就可以解决大部分聚合运算场景的数据倾斜。

假如我们有一个rdd,他的其中某一个key数量比较大,我们要进行shuffle的时候,速度比较慢。

比如这个key就是hello,他的条数已经有1万条。

单一的进行shuffle肯定是耗时非常长。所以我们给他打上10以内的随机前缀,例如下面这种形式。

0_hello,1
1_hello,1
2_hello,1
3_hello,1
0_hello,1
2_hello,1
3_hello,1
.....

然后这个时候进行局部的预聚合,比如reduceBykey,于是经过局部的聚合后,我们得到了下面这种

0_hello,3000
1_hello,2000
2_hello,2500
3_hello,2500 

然后在去掉之前加的随机前缀,在进行聚合,reduceBykey

hello,10000

于是通过这种把key进行拆分的方式,我们把key分配给了一些task去执行任务,经过实验数据表明,这种方法可以提高数倍效率,不知道您看明白了没?

这里我给出具体的一些代码,可以参考:

object WordCountAggTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    val array = Array("you you","you you","you you",
      "you you",
      "you you",
      "you you",
      "you you",
      "jump jump")
    val rdd = sc.parallelize(array,8)
    rdd.flatMap( line => line.split(" "))
      .map(word =>{
        val prefix = (new util.Random).nextInt(3)
        (prefix+"_"+word,1)
      }).reduceByKey(_+_)
       .map( wc =>{
         val newWord=wc._1.split("_")(1)
         val count=wc._2
         (newWord,count)
       }).reduceByKey(_+_)
      .foreach( wc =>{
        println("单词:"+wc._1 + " 次数:"+wc._2)
      })

  }
}

当然了,这种方法也是有局限性的,他适用于聚合类的shuffle操作,如果对于join操作,还是不行的,所以我们接着探究更深入的方法。

解决方案三:将reduce join转整map join

这种方法是有假定的前提的条件的,比如有两个rdd进行join操作,其中一个rdd的数据量不是很大,比如低于1个G的情况。

具体操作是就是选择两个rdd中那个比较数据量小的,然后我们把它拉到driver端,再然后通过广播变量的方式给他广播出去,这个时候再进行join 的话,因为数据都是在同一Executor中,所以shuffle 中不会有数据的传输,也就避免了数据倾斜,所以这种方式很好。

给出参考代码:

object MapJoinTest {
 
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    val lista=Array(
      Tuple2("001","令狐冲"),
      Tuple2("002","任盈盈")
    )
     //数据量小一点
    val listb=Array(
      Tuple2("001","一班"),
      Tuple2("002","二班")
    )
    val listaRDD = sc.parallelize(lista)
    val listbRDD = sc.parallelize(listb)
    //val result: RDD[(String, (String, String))] = listaRDD.join(listbRDD)
     //设置广播变量
    val listbBoradcast = sc.broadcast(listbRDD.collect())
    listaRDD.map(  tuple =>{
      val key = tuple._1
      val name = tuple._2
      val map = listbBoradcast.value.toMap
      val className = map.get(key)
      (key,(name,className))
    }).foreach( tuple =>{
      println("班级号"+tuple._1 + " 姓名:"+tuple._2._1 + " 班级名:"+tuple._2._2.get)
    })
  }
}

当然了,这种方法也是有缺陷的,比如两个rdd都非常大,比如超过了10个G,这个时候我们就不能用这种方法了,因为数据量太大了,广播变量还是需要太大的消耗,我们还需要继续探索更深层次的解决办法。于是就有下面这种不可思议的解决办法。

解决方案四:采样倾斜key并分拆join操作

注意:再讲解本方法之前,我还需要说明一点,如果您没看懂就多看一次,因为这种法方法太不可思议了,真的要明白还是需要一点想想力的,我尽量把它表达的明白些。

在方法五中,我们通过广播的方式可以解决大小表join(小表低于5G以下)的操作,但是这个方法是解决超过10个G以上两个大表JOIN,比如50个G的数据都可以。

实现思路:

1.对包含少数几个数据量过大的key的那个Rdd,通过sample算子采样出一份样本来,然后统计以下每个key的数量,计算出来数据量最大的是哪几个key。

2.然后将这几个key对应的数据从原来的rdd中拆分出来,形成一个单独的rdd,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个rdd。

3.接着将需要join的另外一个rdd,也过滤出来那几个倾斜的key对应的数据并形成一个单独的rdd,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个rdd。

4.再将附加了随机前缀的独立rdd与另外一个膨胀n倍的独立rdd进行join,此时就可以将原先相同的key打算成n份,分散到多个task中去进行join了。

5.而另外两个普通的rdd就照常join即可。

6.最后将两次join的结果使用union算子合并起来即可,就是最终join结果。

不知道您看懂了没有?确实比较复杂,所以,我体贴地给你画了个图,你仔细品,没品出来找涛哥

总结

数据倾斜是在shuffle中产生的,shuffle过程中造成了下游task的数据任务不均衡

数据倾斜常见的场景就是那些需要shuffle的场景,比如reduce,reducebykey,join,groupbykey…..

对于数据倾斜的通用做法是,增大shuffle的并行度,即增加下游task数量,此法不一定能解决,什么情况可以解决,什么情况不可以解决?细品,没品出来找涛哥

上面通用做法解决不了的时候,下面的做法就很牛逼了

1,对于聚合运算场景,那就分两阶段聚合

2,对于join场景,如果是大小表,则使用map端join思想;如果两表都很大,则仔细回到前文品“分拆join大法”

只要灵活运用上述思想,基本上能解决任何数据倾斜场景。文中代码示例是用spark,但是对于mapreduce,也是一样的思想;而至于hive,不用单独讨论,因为hive底层要么就是mapreduce,要么就是spark

欢迎分享,转载有奖:多易教育 » 一文吃透数据倾斜