Spark难点 | Join的实现原理

2,562 阅读4分钟

Join背景

当前SparkSQL支持三种join算法:Shuffle Hash Join、Broadcast Hash Join以及Sort Merge Join。其中前两者归根到底都属于Hash Join,只不过载Hash Join之前需要先Shuffle还是先Broadcast。其实,Hash Join算法来自于传统数据库,而Shuffle和Broadcast是大数据在分布式情况下的概念,两者结合的产物。因此可以说,大数据的根就是传统数据库。Hash Join是内核。

Spark Join的分类和实现机制

file

上图是Spark Join的分类和使用。

Hash Join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,参与join的两张表是order和item,join key分别是item.id以及order.i_id。现在假设Join采用的是hash join算法,整个过程会经历三步:

  • 确定Build Table以及Probe Table:这个概念比较重要,Build Table会被构建成以join key为key的hash table,而Probe Table使用join key在这张hash table表中寻找符合条件的行,然后进行join链接。Build表和Probe表是Spark决定的。通常情况下,小表会被作为Build Table,较大的表会被作为Probe Table。
  • 构建Hash Table:依次读取Build Table(item)的数据,对于每一条数据根据Join Key(item.id)进行hash,hash到对应的bucket中(类似于HashMap的原理),最后会生成一张HashTable,HashTable会缓存在内存中,如果内存放不下会dump到磁盘中。
  • 匹配:生成Hash Table后,在依次扫描Probe Table(order)的数据,使用相同的hash函数(在spark中,实际上就是要使用相同的partitioner)在Hash Table中寻找hash(join key)相同的值,如果匹配成功就将两者join在一起。

file

Broadcast Hash Join

当Join的一张表很小的时候,使用broadcast hash join。

Broadcast Hash Join的条件有以下几个:

  • 被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的信息,默认是10M;
  • 基表不能被广播,比如left outer join时,只能广播右表。

file

broadcast hash join可以分为两步:

  • broadcast阶段:将小表广播到所有的executor上,广播的算法有很多,最简单的是先发给driver,driver再统一分发给所有的executor,要不就是基于bittorrete的p2p思路;
  • hash join阶段:在每个executor上执行 hash join,小表构建为hash table,大表的分区数据匹配hash table中的数据。
Sort Merge Join

file

当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据进行排序。

首先将两张表按照join key进行重新shuffle,保证join key值相同的记录会被分在相应的分区,分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。可以看出,无论分区有多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢;因为两个序列都有有序的,从头遍历,碰到key相同的就输出,如果不同,左边小就继续取左边,反之取右边。从而大大提高了大数据量下sql join的稳定性。

整个过程分为三个步骤:

  • shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
  • sort阶段:对单个分区节点的两表数据,分别进行排序
  • merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则继续取更小一边的key。

file

经过上文的分析,很明显可以得出这几种join的代价关系:cost(Broadcast Hash Join)< cost(Shuffle Hash Join) < cost(Sort Merge Join),数据仓库设计时最好避免大表与大表的join查询,SparkSQL也可以根据内存资源、带宽资源适量将参数spark.sql. autoBroadcastJoinThreshold调大,让更多join实际执行为Broadcast Hash Join。

声明:本号所有文章除特殊注明,都为原创,公众号读者拥有优先阅读权,未经作者本人允许不得转载,否则追究侵权责任。

关注我的公众号,后台回复【JAVAPDF】获取200页面试题!5万人关注的大数据成神之路,不来了解一下吗?5万人关注的大数据成神之路,真的不来了解一下吗?5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》

大数据技术与架构