Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
注意:
(1)只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None。
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
一、获取RDD分区
代码实现
package com.bigdata.partitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.util.Arrays;
public class Test01_Partitioner {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaPairRDD
// 获取分区器
Optional
// Optional.empty 通过集合创建的rdd默认不带分区器
System.out.println(partitioner);
JavaPairRDD
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 获取分区器
Optional
// Optional[org.apache.spark.HashPartitioner@2] 走完shuffle之后,变为Hash分区器
System.out.println(partitioner1);
// 4. 关闭sc
sc.stop();
}
}
二、Hash分区
HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
按照单词首字母的hashCode值%分区个数。
A 万条数据
B 1万条
C 1万条
D 万条数据
E 1万条
F 1万条
G 万条数据
假设有3个分区(0、1、2)
A 万条数据
0分区 D 万条数据
G 万条数据
1分区 B 1万条
E 1万条
2分区 C 1万条
F 1万条
三、Ranger分区
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
实现过程为:
第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;
第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的。
1)我们假设有万条数据要分4个区
2)从万条中抽个数(1,2,3, ….. )
3)对个数进行排序,然后均匀的分为4段
4)获取万条数据,每个值与4个分区的范围比较,放入合适分区
四、分区规则
从集合创建RDD
代码验证
package com.bigdata.partition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
public class Test01_ListPartition {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
// 默认环境的核数
// 可以手动填写参数控制分区的个数
JavaRDD
// 数据分区的情况
// 0 => 1, => 3,4,5
// 利用整数除机制 左闭右开
// 0 => start 0*5/2 end 1*5/2
// 1 => start 1*5/2 end 2*5/2
stringRDD.saveAsTextFile("output");
// 4. 关闭sc
sc.stop();
}
}
从文件创建RDD
分区测试
package com.bigdata.partition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
public class Test02_FilePartition {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
// 默认填写的最小分区数 2和环境的核数取小的值 一般为2
JavaRDD
// 具体的分区个数需要经过公式计算
// 首先获取文件的总长度 totalSize
// 计算平均长度 goalSize = totalSize / numSplits
// 获取块大小 128M
// 计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
// 最后使用splitSize 按照倍原则切分整个文件 得到几个分区就是几个分区
// 实际开发中 只需要看文件总大小 / 填写的分区数 和块大小比较 谁小拿谁进行切分
lineRDD.saveAsTextFile("output");
// 数据会分配到哪个分区
// 如果切分的位置位于一行的中间 会在当前分区读完一整行数据
// 0 -> 1, -> -> -> 空
// 4. 关闭sc
sc.stop();
}
}
分区源码
注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量:start = split.getStart() end = start + split.getLength
①分区数量的计算方式:
totalSize =
goalSize = / 3 = 3(byte) 表示每个分区存储3字节的数据
分区数= totalSize/ goalSize = /3 => 3,3,4
4子节大于3子节的倍,符合hadoop切片倍的策略,因此会多创建一个分区,即一共有4个分区 3,3,3,1
②Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系
③数据读取位置计算是以偏移量为单位来进行计算的。
④数据分区的偏移量范围的计算
0 => [0,3] 1@@ => 1,2
1 => [3,6] 2@@ => 3
2 => [6,9] 3@@ => 4
3 => [9,9] => 无
五、总结
以上就是Spark的数据分区和分区规则,HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。RangePartitioner分区一定程度上解决了数据偏移问题,但是速度相对慢。