hadoop之MapReduce框架心跳机制分析

作者: 云计算机网 分类: 云计算知识 发布时间: 2016-05-19 20:34

1、概述

MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁。TaskTracker周期性地调用心跳RPC函数,汇报节点和任务运行状态信息。MapReduce框架中通过心跳机制可以实现给TaskTracker分配任务、使JobTracker能够及时获取各个节点的资源使用情况和任务运行状态信息、判断TaskTracker的死活。本文主要从JobTracker和TaskTracker通信双方的角度分别去分析他们之间的心跳通信机制。

2、TaskTracker端心跳机制

JobTracker和TaskTracker之前的心跳模式采取了“拉”的方式,JobTracker不会主动向各个TaskTracker发送心跳信息,而是各个TaskTracker主动向JobTracker发送信息,同时领取JobTracker返回心跳包的各种命令。

TaskTracker中有一个run方法,其维护了一个无限循环用于通过心跳发送任务运行状态信息和接收JobTracker通过心跳返回的命令信息。其代码结构大概如下:

/**   * The server retry loop.   * This while-loop attempts to connect to the JobTracker.  It only   * loops when the old TaskTracker has gone bad (its state is   * stale somehow) and we need to reinitialize everything.   */  public void run() {    try {      getUserLogManager().start();      startCleanupThreads();      boolean denied = false;      while (running && !shuttingDown && !denied) {        boolean staleState = false;        try {          // This while-loop attempts reconnects if we get network errors          while (running && !staleState && !shuttingDown && !denied) {            try {              State osState = offerService();              if (osState == State.STALE) {                staleState = true;              } else if (osState == State.DENIED) {                denied = true;              }....}

其中,用于处理心跳相关信息的服务函数offerService代码大体框架:

/**   * Main service loop.  Will stay in this loop forever.   */  State offerService() throws Exception {    long lastHeartbeat = System.currentTimeMillis();//上一次发心跳距现在时间   ////此循环主要根据控制完成task个数控制心跳间隔。    while (running && !shuttingDown) {      try {        long now = System.currentTimeMillis();//获得当前时间                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            // accelerate to account for multiple finished tasks up-front        //通过完成的任务数动态控制心跳间隔时间         long remaining =          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;        while (remaining > 0) {          // sleeps for the wait time or          // until there are *enough* empty slots to schedule tasks          synchronized (finishedCount) {            finishedCount.wait(remaining);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    // Recompute            now = System.currentTimeMillis();            remaining =              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    if (remaining <= 0) {              // Reset count              finishedCount.set(0);              break;            }          }        }...//发送心跳// Send the heartbeat and process the jobtracker's directives        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正想JobTracker发送心跳.....//开始处理JobTracker返回的命令TaskTrackerAction[] actions = heartbeatResponse.getActions();...//杀死一定时间没没有汇报进度的taskmarkUnresponsiveTasks();//当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间killOverflowingTasks(); 
  • 电脑在使用之后会随着时间变化运行速度就会变得越来越慢,这主要是因为留下了一些缓存或是一些垃圾文件所以就导致电脑运行速度变慢。这时候就需要对磁盘进行清理了,那么你知道磁盘清理的作用是什么吗?

      电脑在使用之后会随着时间变化运行速度就会变得越来越慢,这主要是因为留下了一些缓存或是一些垃圾文件所以就导致电脑运行速度变慢。这时候就需要对磁盘进行清理了,那么你知道磁盘清理的作用是什么吗?

      由于硬盘被划分成一个一个簇,然后里头分成各个扇区,文件的大小不同,在储存的时候系统会搜索最相应的大小,久而久之在文件和文件之间会形成一些碎片,较大的文件也可能被分散存储,产生碎片以后,在读取文件时需要更多的时间和查找,从而减慢操作速度,对硬盘也有一定损害,因此我们一般会定期进行一次磁盘清理。

      磁盘清理很简单,最简单的就是从“开始-所有程序-附件-系统工具-磁盘碎片整理工具”,然后选择需要清理的磁盘即可,如下图:

    Win7磁盘清理

      另外还值得一提的是,使用较久的电脑会产生各种系统垃圾,也会导致速度变慢,通常大家习惯于定期清理系统垃圾,虽然对于保持系统性能帮助不少,但普通的清理垃圾不能整理磁盘碎片,因此清理磁盘是一种比较好的实现硬盘加速方法,因此也是电脑提升性能比较可取的方法。

      因此我们只要知道,磁盘清理的最大目的就是提升硬盘性能速度,另外对于硬盘保护也有一定关系,不过不建议频繁进行磁盘清理,过于频繁也会对磁盘产生一些负面影响,一般半年到一年左右清理一次就足够了。

      以上就是小编为大家介绍磁盘清理的作用,你明白了吗?

  • 相关推荐:

  • 磁盘清理的作用
  • 京东618电脑数码榜风起云
  • 当当网卖身:英雄迟暮,
  • 有益于NAS的技术之二:文
  • VMware终端用户计算模式变
  • Unix与Linux之间的差异不可
  • 云计算从服务到公共资源
  • “京条计划”之下惊现京
  • 老司机专享丨“猎时行动
  • “河南百所高校走进景安
  • 网站内容禁止违规转载,转载授权联系中国云计算网