spark 分区与分组的关系 spark按照key进行分区

百度一面之谈谈Spark RDD数据分区和分区规则

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

注意:

(1)只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None。

(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

一、获取RDD分区

代码实现

package com.bigdata.partitioner;

import org.apache.spark.Partitioner;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.Optional;

import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

import java.util.Arrays;

public class Test01_Partitioner {

public static void main(String[] args) {

// 1.创建配置对象

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

// 2. 创建sparkContext

JavaSparkContext sc = new JavaSparkContext(conf);

// 3. 编写代码

JavaPairRDD tupleRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("s", 1), new Tuple2<>("a", 3), new Tuple2<>("d", 2)));

// 获取分区器

Optional partitioner = tupleRDD.partitioner();

// Optional.empty 通过集合创建的rdd默认不带分区器

System.out.println(partitioner);

JavaPairRDD reduceByKeyRDD = tupleRDD.reduceByKey(new Function2() {

@Override

public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;

}

});

// 获取分区器

Optional partitioner1 = reduceByKeyRDD.partitioner();

// Optional[org.apache.spark.HashPartitioner@2] 走完shuffle之后,变为Hash分区器

System.out.println(partitioner1);

// 4. 关闭sc

sc.stop();

}

}

二、Hash分区

HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

按照单词首字母的hashCode值%分区个数。

A 万条数据

B 1万条

C 1万条

D 万条数据

E 1万条

F 1万条

G 万条数据

假设有3个分区(0、1、2)

A 万条数据

0分区 D 万条数据

G 万条数据

1分区 B 1万条

E 1万条

2分区 C 1万条

F 1万条

三、Ranger分区

RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

实现过程为:

第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;

第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的。

1)我们假设有万条数据要分4个区

2)从万条中抽个数(1,2,3, ….. )

3)对个数进行排序,然后均匀的分为4段

4)获取万条数据,每个值与4个分区的范围比较,放入合适分区

四、分区规则

从集合创建RDD

代码验证

package com.bigdata.partition;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class Test01_ListPartition {

public static void main(String[] args) {

// 1.创建配置对象

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

// 2. 创建sparkContext

JavaSparkContext sc = new JavaSparkContext(conf);

// 3. 编写代码

// 默认环境的核数

// 可以手动填写参数控制分区的个数

JavaRDD stringRDD = sc.parallelize(Arrays.asList("hello", "spark", "hello", "spark", "hello"),2);

// 数据分区的情况

// 0 => 1, => 3,4,5

// 利用整数除机制 左闭右开

// 0 => start 0*5/2 end 1*5/2

// 1 => start 1*5/2 end 2*5/2

stringRDD.saveAsTextFile("output");

// 4. 关闭sc

sc.stop();

}

}

从文件创建RDD

分区测试

package com.bigdata.partition;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;

public class Test02_FilePartition {

public static void main(String[] args) {

// 1.创建配置对象

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

// 2. 创建sparkContext

JavaSparkContext sc = new JavaSparkContext(conf);

// 3. 编写代码

// 默认填写的最小分区数 2和环境的核数取小的值 一般为2

JavaRDD lineRDD = sc.textFile("input/1.txt");

// 具体的分区个数需要经过公式计算

// 首先获取文件的总长度 totalSize

// 计算平均长度 goalSize = totalSize / numSplits

// 获取块大小 128M

// 计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize));

// 最后使用splitSize 按照倍原则切分整个文件 得到几个分区就是几个分区

// 实际开发中 只需要看文件总大小 / 填写的分区数 和块大小比较 谁小拿谁进行切分

lineRDD.saveAsTextFile("output");

// 数据会分配到哪个分区

// 如果切分的位置位于一行的中间 会在当前分区读完一整行数据

// 0 -> 1, -> -> -> 空

// 4. 关闭sc

sc.stop();

}

}

分区源码

注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量:start = split.getStart() end = start + split.getLength

①分区数量的计算方式:

totalSize =

goalSize = / 3 = 3(byte) 表示每个分区存储3字节的数据

分区数= totalSize/ goalSize = /3 => 3,3,4

4子节大于3子节的倍,符合hadoop切片倍的策略,因此会多创建一个分区,即一共有4个分区 3,3,3,1

②Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系

③数据读取位置计算是以偏移量为单位来进行计算的。

④数据分区的偏移量范围的计算

0 => [0,3] 1@@ => 1,2

1 => [3,6] 2@@ => 3

2 => [6,9] 3@@ => 4

3 => [9,9] => 无

五、总结

以上就是Spark的数据分区和分区规则,HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。RangePartitioner分区一定程度上解决了数据偏移问题,但是速度相对慢。

原文链接:,转发请注明来源!