sparkcore源码分析8从简单例子看transformation

作者: 云计算机网 分类: 云计算知识 发布时间: 2016-06-08 07:46

前面提到过spark自带的一个最简单的例子,也介绍了SparkContext的部分,这节介绍剩余的内容中的transformation。

object SparkPi {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("Spark Pi")    val spark = new SparkContext(conf)    val slices = if (args.length > 0) args(0).toInt else 2    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow    val count = spark.parallelize(1 until n, slices).map { i =>      val x = random * 2 - 1      val y = random * 2 - 1      if (x*x + y*y < 1) 1 else 0    }.reduce(_ + _)    println("Pi is roughly " + 4.0 * count / n)    spark.stop()  }}
调用SparkContext的parallelize方法。此方法在一个已经存在的Scala集合上创建出一个可以被并行操作的分布式数据集,也就是返回一个RDD。

大家都在书面化的说RDD是什么,我还不如用源码的方式展示更为直观。可以看出来,最基本的RDD其实就是sparkContext和它本身的依赖。

abstract class RDD[T: ClassTag](    @transient private var _sc: SparkContext,    @transient private var deps: Seq[Dependency[_]]  ) extends Serializable with Logging {  ...  ...}
我们顺着看parallelize方法的内容。seq是一个scala集合类型。numSlices是设置的并行度,有默认值(配置项或者driver获得的Executor注册上来的总的core)

override def defaultParallelism(): Int = {  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))}

def parallelize[T: ClassTag](      seq: Seq[T],      numSlices: Int = defaultParallelism): RDD[T] = withScope {    assertNotStopped()    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())  }

我们继续看ParallelCollectionRDD的内部。它重写了RDD的一些方法,并设置依赖为Nil

private[spark] class ParallelCollectionRDD[T: ClassTag](    @transient sc: SparkContext,    @transient data: Seq[T],    numSlices: Int,    locationPrefs: Map[Int, Seq[String]])    extends RDD[T](sc, Nil) {  // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets  // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split  // instead.  // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.  override def getPartitions: Array[Partition] = {    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray  }  override def compute(s: Partition, context: TaskContext): Iterator[T] = {    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)  }  override def getPreferredLocations(s: Partition): Seq[String] = {    locationPrefs.getOrElse(s.index, Nil)  }}
到这里,parallelize的操作就结束了。这其实就是人们常说的spark Transformation,并没有触发任务的调度。
  • 虚拟化整体采用率虽然看起来高,但是这个数字掩盖了另一个事实,即数据保护方面的顾虑阻碍了关键型应用程序的虚拟化,使这些公司在关键应用上的虚拟化程度比较低。

    Veeam Software指出了这个事实。

    Veeam产品战略高级总监Doug Hazelman表示:"几乎每家公司都有虚拟化行动,但是问题是他们的虚拟化程度大小。

    "

    Veeam调查了位于美国、英国、德国和法国的员工人数超过1000人的公司,访问了500位首席信息官,发现44%的首席信息官表示在一些任务关键型工作负荷上会避免采用虚拟化,因为他们担心虚拟化会影响备份和恢复。

    目前,许多公司只备份三分之二的(68%)的虚拟化数据。

    61%使用物理手段来进行备份和恢复的企业从现在开始会针对虚拟化改变他们的数据保护方式。

    Hazelman表示:"此次调查的目的是探寻人们在备份和数据恢复上的行为。

    我们的发现是,比较容易使用虚拟化的领域一般都采用虚拟化了,但是顶层的应用还没有虚拟化。

    这里面有一些数据保护方面的顾虑。

    "

    63%的回答者使用单一产品来同时备份他们的物理和虚拟服务器。

    在这种方式下,他们仍然视虚拟机为物理服务器,因而限制了虚拟化的使用程度。

    因此,这些企业在虚拟化的任务关键型工作负荷上没有得到应有的最佳的保护水平。

    Hazelman表示:"他们担心数据保护和数据恢复。

    如果你观察他们一贯的行为方式,你会发现这是不够的。

    他们在等待第三方解决方案来使数据保护更加成熟一些。

    "

    在被问到关于在虚拟环境中使用传统基于物理的备份工具的缺点时,超过一半的回答者(51%)指出这种方式太昂贵。

    除了这个,40%的回答者指出恢复速度太慢,还有40%的回答者指出缺点是需要安装软件代理。

    实际上,在使用传统备份工具的时候,IT部门指出标准恢复流程的执行(比如文件层次的恢复)变得更加复杂了。

    大部分回答者(66%)要么首先恢复整个虚拟机然后恢复单个文件,要么是保持两个备份,一个在系统层次上,一个在文件层次上。

    IT总监们也开始重新评估他们的数据保护方式。

    61%物理数据保护工具的使用者表示他们考虑针对虚拟化改变数据保护方式,59%的人计划部署专门的虚拟化解决方案来处理他们的虚拟服务器。

    他们这么做的原因很清楚:更快的恢复速度(63%),更快的备份速度(56%),更低的成本(54%)。#p#分页标题#e#

    Hazelman表示:"对VAR(增值分销商)而言,这是一个很大的机遇。

    他们应该在这方面教育他们的顾客。

    他们需要寻找虚拟领域内最佳的解决方案,无论是平台本身还是支持虚拟化的生态系统。

    对于像我们这样的独立软件厂商而言,我们的任务就是开发更好的产品。

    "

    (责任编辑:admin)

  • 相关推荐:

  • 存储分析:数据保护影响
  • 简化部署戴尔虚拟化解决
  • 深入分析:虚拟化技术已
  • 虚拟化分析:从概念到应
  • 云计算服务火热服务器未
  • 大数据处理分析的六大工
  • 2016年中国服务器行业发
  • 5.28黄金走势分析 操作建
  • 通常电脑故障排除的方法
  • 电脑开机自动断电原因分
  • 网站内容禁止违规转载,转载授权联系中国云计算网