sparkstreaming源码分析2从简单例子看DStream上的operation

作者: 云计算机网 分类: 云计算知识 发布时间: 2016-05-26 14:10

博客地址: http://blog.csdn.net/yueqian_zhu/

先贴一下上一节的例子

object NetworkWordCount {  def main(args: Array[String]) {    if (args.length < 2) {      System.err.println("Usage: NetworkWordCount <hostname> <port>")      System.exit(1)    }    StreamingExamples.setStreamingLogLevels()    // Create the context with a 1 second batch size    val sparkConf = new SparkConf().setAppName("NetworkWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(1))    // Create a socket stream on target ip:port and count the    // words in input stream of  delimited text (eg. generated by 'nc')    // Note that no duplication in storage level only for running locally.    // Replication necessary in distributed scenario for fault tolerance.    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)    wordCounts.print()    ssc.start()    ssc.awaitTermination()  }}
这一节学习一下Dstream上的operation部分

1、调用socketTextStream方法,返回一个ReceiverInputDStream类型。它继承与InputDStream,InputDStream又继承于DStream

(1)设置本身的InputDStream到DStreamGraph中

(2)获取streamId

2、调用flatMap方法,返回一个flatMappedDStream。

看一下FlatMappedDStream的成员

private[streaming]class FlatMappedDStream[T: ClassTag, U: ClassTag](    parent: DStream[T],    flatMapFunc: T => Traversable[U]  ) extends DStream[U](parent.ssc) {  override def dependencies: List[DStream[_]] = List(parent)  override def slideDuration: Duration = parent.slideDuration  override def compute(validTime: Time): Option[RDD[U]] = {    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))  }}
与RDD的操作非常类似

dependencies:即调用flatmap操作的DStream,这里指ReceiverInputDStream

slideDuration:Dstream产生RDD的时间间隔,即批处理间隔

compute:根据参数得到一个RDD,继而在这个RDD上调用flatmap操作。flatmap操作的方法参数实际上附加到了RDD的身上。

2、调用map方法,其实是将map方法附加给了RDD。之后的reduceByKey同理。

3、调用print方法,它是一个输出操作。默认输出RDD的前10个元素。调用print方法得到一个ForEachDStream,并将这个ForEachDStream注册到DStreamGraph中。

至此,operation部分就结束了。此时,还没有真正执行起来,这需要调用StreamingContext的start方法才行。

  • 虚拟化整体采用率虽然看起来高,但是这个数字掩盖了另一个事实,即数据保护方面的顾虑阻碍了关键型应用程序的虚拟化,使这些公司在关键应用上的虚拟化程度比较低。

    Veeam Software指出了这个事实。

    Veeam产品战略高级总监Doug Hazelman表示:"几乎每家公司都有虚拟化行动,但是问题是他们的虚拟化程度大小。

    "

    Veeam调查了位于美国、英国、德国和法国的员工人数超过1000人的公司,访问了500位首席信息官,发现44%的首席信息官表示在一些任务关键型工作负荷上会避免采用虚拟化,因为他们担心虚拟化会影响备份和恢复。

    目前,许多公司只备份三分之二的(68%)的虚拟化数据。

    61%使用物理手段来进行备份和恢复的企业从现在开始会针对虚拟化改变他们的数据保护方式。

    Hazelman表示:"此次调查的目的是探寻人们在备份和数据恢复上的行为。

    我们的发现是,比较容易使用虚拟化的领域一般都采用虚拟化了,但是顶层的应用还没有虚拟化。

    这里面有一些数据保护方面的顾虑。

    "

    63%的回答者使用单一产品来同时备份他们的物理和虚拟服务器。

    在这种方式下,他们仍然视虚拟机为物理服务器,因而限制了虚拟化的使用程度。

    因此,这些企业在虚拟化的任务关键型工作负荷上没有得到应有的最佳的保护水平。

    Hazelman表示:"他们担心数据保护和数据恢复。

    如果你观察他们一贯的行为方式,你会发现这是不够的。

    他们在等待第三方解决方案来使数据保护更加成熟一些。

    "

    在被问到关于在虚拟环境中使用传统基于物理的备份工具的缺点时,超过一半的回答者(51%)指出这种方式太昂贵。

    除了这个,40%的回答者指出恢复速度太慢,还有40%的回答者指出缺点是需要安装软件代理。

    实际上,在使用传统备份工具的时候,IT部门指出标准恢复流程的执行(比如文件层次的恢复)变得更加复杂了。

    大部分回答者(66%)要么首先恢复整个虚拟机然后恢复单个文件,要么是保持两个备份,一个在系统层次上,一个在文件层次上。

    IT总监们也开始重新评估他们的数据保护方式。

    61%物理数据保护工具的使用者表示他们考虑针对虚拟化改变数据保护方式,59%的人计划部署专门的虚拟化解决方案来处理他们的虚拟服务器。

    他们这么做的原因很清楚:更快的恢复速度(63%),更快的备份速度(56%),更低的成本(54%)。#p#分页标题#e#

    Hazelman表示:"对VAR(增值分销商)而言,这是一个很大的机遇。

    他们应该在这方面教育他们的顾客。

    他们需要寻找虚拟领域内最佳的解决方案,无论是平台本身还是支持虚拟化的生态系统。

    对于像我们这样的独立软件厂商而言,我们的任务就是开发更好的产品。

    "

    (责任编辑:admin)

  • 相关推荐:

  • 存储分析:数据保护影响
  • 简化部署戴尔虚拟化解决
  • 深入分析:虚拟化技术已
  • 虚拟化分析:从概念到应
  • 云计算服务火热服务器未
  • 大数据处理分析的六大工
  • 2016年中国服务器行业发
  • 5.28黄金走势分析 操作建
  • 通常电脑故障排除的方法
  • 电脑开机自动断电原因分
  • 网站内容禁止违规转载,转载授权联系中国云计算网