博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark RDD弹性分布数据集详解
阅读量:3960 次
发布时间:2019-05-24

本文共 3723 字,大约阅读时间需要 12 分钟。

RDD简介

RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。

RDD(Resilient Distributed Dataset),即弹性分布式数据集,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并且还能控制数据的分区。

RDD创建方式

方式一:从系统加载数据创建RDD

Spark可以从Hadoop支持的任何存储源中加载数据去创建RDD,包括本地文件系统和HDFS等文件系统。我们通过Spark中的SparkContext对象调用textFile()方法加载数据创建RDD。

val lineRDD = sc.textFile("f:/person.txt")

在这里插入图片描述

方式二:通过并行集合创建RDD

Spark可以通过并行集合创建RDD。即从一个已经存在的集合、数组上,通过SparkContext对象调用parallelize()方法创建RDD。

val arr = Array(1,3,5,7,9)val arrRDD=sc.parallelize(arr)

在这里插入图片描述

RDD的处理过程

Spark用Scala语言实现了RDD的API,可以通过调用API对RDD进行操作处理。RDD经过一系列的“转换”操作,每一次转换都会产生不同的RDD,以供给下一次“转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理,并输出到外部数据源中,若是中间的数据结果需要复用,则可以进行缓存处理,将数据缓存到内存中。

在这里插入图片描述
转换算子

RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,具体如下。

转换算子 相关说明
filter(func) 筛选出满足函数func的元素,并返回一个新的数据集
map(func) 将每个元素传递到函数func中,返回的结果是一个新的数据集
flatMap(func) 与map()相似,但是每个输入的元素都可以映射到0或者多个输出结果
groupByKey() 应用于(Key,Value)键值对的数据集时,返回一个新的(Key,Iterable )形式的数据集
reduceByKey(func) 应用于(Key,Value)键值对的数据集时,返回一个新的(Key,Value)形式的数据集。其中,每个Value值是将每个Key键传递到函数func中进行聚合后的结果

行动算子

行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,通过一张表来列举一些常用行动算子操作的API,具体如下。

转换算子 相关说明
count() 返回数据集中的元素个数
first() 返回数组的第一个元素
take(n) 以数组的形式返回数组集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
collect() 以数组的形式返回数据集中的所有元素
foreach(func) 将数据集中的每个元素传递到函数func中运行

RDD的分区

分区的作用:

在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升程序的整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。
分区的方式:
Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

RDD依赖关系

窄依赖

  • 窄依赖是指父RDD的每一个分区最多被一个子RDD的分区使用,即OneToOneDependencies。
  • 窄依赖的表现一般分为两类,第一类表现为一个父RDD的分区对应于一个子RDD的分区;第二类表现为多个父RDD的分区对应于一个子RDD的分区。
  • 一个父RDD的一个分区不可能对应一个子RDD的多个分区。

RDD做map、filter和union算子操作时,是属于窄依赖的第一类表现;而RDD做join算子操作(对输入进行协同划分)时,是属于窄依赖表现的第二类。输入协同划分是指多个父RDD的某一个分区的所有Key,被划分到子RDD的同一分区。当子RDD做算子操作,因为某个分区操作失败导致数据丢失时,只需要重新对父RDD中对应的分区做算子操作即可恢复数据。

宽依赖

宽依赖是指子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区,即OneToManyDependecies。父RDD做groupByKey和join(输入未协同划分)算子操作时,子RDD的每一个分区都会依赖于所有父RDD的所有分区。当子RDD做算子操作,因为某个分区操作失败导致数据丢失时,则需要重新对父RDD中的所有分区进行算子操作才能恢复数据。

RDD的机制

持久化机制

在Spark中,RDD是采用惰性求值,即每次调用行动算子操作,都会从头开始计算,这对迭代计算来说代价很大,因为迭代计算经常需要多次重复的使用同一组数据集,所以,为了避免重复计算的开销,让Spark对数据集进行持久化操作。

RDD的持久化操作有两种方法,分别是cache()方法和persist()方法。

  • persist()方法的存储级别是通过StorageLevel对象设置的。
  • cache()方法的存储级别是使用默认的存储级别(即StorageLevel.MEMORY_ONLY)。

容错机制

当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式和设置检查点(checkpoint)方式。

  1. 血统方式
    • 根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。
  2. 设置检查点方式
    1. 本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个检查点进行容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做检查点的RDD开始重新计算,不需要从头到尾的计算,从而减少开销。

Spark的任务调度

DGA

DAG(Directed Acyclic Graph)叫做有向无环图,Spark中的RDD通过一系列的转换算子操作和行动算子操作形成了一个DAG。DAG是一种非常重要的图论数据结构。如果一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图就是有向无环图。

根据RDD之间依赖关系的不同可将DAG划分成不同的Stage(调度阶段)。对窄依赖来说,RDD分区的转换处理是在一个线程里完成,所以窄依赖会被Spark划分到同一个Stage中;而对宽依赖来说,由于有Shuffle存在,所以只能在父RDD处理完成后,下一个Stage才能开始接下来的计算,因此宽依赖是划分Stage的依据,当RDD进行转换操作,遇到宽依赖类型的转换操作时,就划为一个Stage。

RDD在Spark中的运行流程

  1. RDD Objects:当RDD对象创建后,SparkContext会根据RDD对象构建DAG有向无环图,然后将Task提交给DAGScheduler。
  2. DAGScheduler:将作业的DAG划分成不同Stage,每个Stage都是TaskSet任务集合,并以TaskSet为单位提交给TaskScheduler。
  3. TaskScheduler:通过TaskSetManager管理Task,并通过集群中的资源管理器把Task发给集群中Worker的Executor。
  4. Worker:Spark集群中的Worker接收到Task后,把Task运行在Executor进程中,一个进程中可以有多个线程在工作,从而可以处理多个数据分区。

Spark WordCount单词统计

转载地址:http://tgmzi.baihongyu.com/

你可能感兴趣的文章