What is headroom? It is defined as "the maximum resource an application can get". The headroom calculation is used mainly for reducer preemption in MapReduce. What happens if the headroom calculation is wrong?

In our production clusters, we observed that some MR jobs got stuck in a deadlock state. The scenario usually arises when some map tasks fail and the application master needs to reschedule them. Due to the headroom miscalculation, the application master believes there are enough resources to relaunch the failed map tasks, so it does not preempt any reduce tasks. In fact, all the resources are occupied by the reducers. As a result, the reducers wait for all map outputs before they can finish, while the failed map tasks wait for the reducers to finish and release resources so that they get a chance to rerun.

This issue is discussed in YARN-1198 and its sub-JIRAs YARN-1857, YARN-1680, and YARN-2008.

This issue partly comes from the fact that the headroom is calculated by the YARN resource scheduler, while the preemption decision is made by the application master, so that YARN stays agnostic to application details. If the resource manager does not report the headroom correctly, the application master can make a wrong preemption decision.

Here I would like to look into the code to see what happens under the hood.

The resource allocation protocol between the application master and the resource manager is defined in ApplicationMasterProtocol, in particular its allocate() method.

public interface ApplicationMasterProtocol {
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException;
}
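
On the application master side, this call usually goes through a client library rather than the raw protocol. Below is a minimal sketch, using the AMRMClient wrapper, of a single allocate heartbeat; the registration arguments and the progress value are hypothetical placeholders, and the code is only meaningful inside a running application master, but getAvailableResources() is exactly where the headroom discussed in this post arrives.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

public class HeadroomProbe {
  public static void main(String[] args) throws Exception {
    AMRMClient<ContainerRequest> amClient = AMRMClient.createAMRMClient();
    amClient.init(new Configuration());
    amClient.start();

    // Register before the first allocate(); host/port/URL are placeholders.
    amClient.registerApplicationMaster("localhost", 0, "");

    // Each allocate() call is one heartbeat; 0.5f is a made-up progress value.
    AllocateResponse response = amClient.allocate(0.5f);

    // The headroom computed by the resource scheduler is returned here.
    Resource headroom = response.getAvailableResources();
    System.out.println("headroom = " + headroom);

    amClient.stop();
  }
}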

When the ApplicationMasterService in the resource manager receives the allocate request from the application master, it calls the allocate() method of the YARN resource scheduler.

public class ApplicationMasterService extends AbstractService implements
    ApplicationMasterProtocol {
  private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
  private final AMLivelinessMonitor amLivelinessMonitor;
  private YarnScheduler rScheduler;
  private InetSocketAddress bindAddress;
  private Server server;
  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);
  private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
  private final AllocateResponse resync =
      recordFactory.newRecordInstance(AllocateResponse.class);
  private final RMContext rmContext;
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    ApplicationAttemptId appAttemptId = authorizeRequest();

    this.amLivelinessMonitor.receivedPing(appAttemptId);

    /* check if its in cache */
    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
      return resync;
    }
    synchronized (lock) {
      AllocateResponse lastResponse = lock.getAllocateResponse();
      if (!hasApplicationMasterRegistered(appAttemptId)) {
        String message =
            "Application Master is trying to allocate before registering for: "
                + appAttemptId.getApplicationId();
        LOG.error(message);
        RMAuditLogger.logFailure(
            this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
                .getUser(), AuditConstants.REGISTER_AM, "",
            "ApplicationMasterService", message,
            appAttemptId.getApplicationId(),
            appAttemptId);
        throw new InvalidApplicationMasterRequestException(message);
      }

      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
        /* old heartbeat */
        return lastResponse;
      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
        LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
        // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
        // Reboot is not useful since after AM reboots, it will send register
        // and
        // get an exception. Might as well throw an exception here.
        return resync;
      }

      // Send the status update to the appAttempt.
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppAttemptStatusupdateEvent(appAttemptId, request
              .getProgress()));

      List<ResourceRequest> ask = request.getAskList();
      List<ContainerId> release = request.getReleaseList();

      ResourceBlacklistRequest blacklistRequest =
          request.getResourceBlacklistRequest();
      List<String> blacklistAdditions =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
      List<String> blacklistRemovals =
          (blacklistRequest != null) ?
              blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;

      // sanity check
      try {
        RMServerUtils.validateResourceRequests(ask,
            rScheduler.getMaximumResourceCapability());
      } catch (InvalidResourceRequestException e) {
        LOG.warn("Invalid resource ask by application " + appAttemptId, e);
        throw e;
      }
      
      try {
        RMServerUtils.validateBlacklistRequest(blacklistRequest);
      }  catch (InvalidResourceBlacklistRequestException e) {
        LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
        throw e;
      }

      RMApp app =
          this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
      // In the case of work-preserving AM restart, it's possible for the
      // AM to release containers from the earlier attempt.
      if (!app.getApplicationSubmissionContext()
        .getKeepContainersAcrossApplicationAttempts()) {
        try {
          RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
        } catch (InvalidContainerReleaseException e) {
          LOG.warn("Invalid container release by application " + appAttemptId, e);
          throw e;
        }
      }

      // Send new requests to appAttempt.
      Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release, 
              blacklistAdditions, blacklistRemovals);

      if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
        LOG.info("blacklist are updated in Scheduler." +
            "blacklistAdditions: " + blacklistAdditions + ", " +
            "blacklistRemovals: " + blacklistRemovals);
      }
      RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
      AllocateResponse allocateResponse =
          recordFactory.newRecordInstance(AllocateResponse.class);
      if (!allocation.getContainers().isEmpty()) {
        allocateResponse.setNMTokens(allocation.getNMTokens());
      }

      // update the response with the deltas of node status changes
      List<RMNode> updatedNodes = new ArrayList<RMNode>();
      if(app.pullRMNodeUpdates(updatedNodes) > 0) {
        List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
        for(RMNode rmNode: updatedNodes) {
          SchedulerNodeReport schedulerNodeReport =  
              rScheduler.getNodeReport(rmNode.getNodeID());
          Resource used = BuilderUtils.newResource(0, 0);
          int numContainers = 0;
          if (schedulerNodeReport != null) {
            used = schedulerNodeReport.getUsedResource();
            numContainers = schedulerNodeReport.getNumContainers();
          }
          NodeReport report = BuilderUtils.newNodeReport(rmNode.getNodeID(),
              rmNode.getState(),
              rmNode.getHttpAddress(), rmNode.getRackName(), used,
              rmNode.getTotalCapability(), numContainers,
              rmNode.getHealthReport(),
              rmNode.getLastHealthReportTime());

          updatedNodeReports.add(report);
        }
        allocateResponse.setUpdatedNodes(updatedNodeReports);
      }

      allocateResponse.setAllocatedContainers(allocation.getContainers());
      allocateResponse.setCompletedContainersStatuses(appAttempt
          .pullJustFinishedContainers());
      allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
      allocateResponse.setAvailableResources(allocation.getResourceLimit());

      allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());

      // add preemption to the allocateResponse message (if any)
      allocateResponse
          .setPreemptionMessage(generatePreemptionMessage(allocation));

      /*
       * As we are updating the response inside the lock object so we don't
       * need to worry about unregister call occurring in between (which
       * removes the lock object).
       */
      lock.setAllocateResponse(allocateResponse);
      return allocateResponse;
    }    
  }
}

The resource limit, i.e., the headroom, is returned in the AllocateResponse. Taking the capacity scheduler as an example, the LeafQueue calculates the headroom for each application.

public class LeafQueue implements CSQueue {

  @Lock({LeafQueue.class, FiCaSchedulerApp.class})
  Resource computeUserLimitAndSetHeadroom(
      FiCaSchedulerApp application, Resource clusterResource, Resource required) {
    
    String user = application.getUser();
    
    /**
     * Headroom is
     *   min(min(userLimit, queueMaxCap) - userConsumed,
     *       queueMaxCap - usedResources)
     */

    Resource userLimit =                          // User limit
        computeUserLimit(application, clusterResource, required);

    //Max avail capacity needs to take into account usage by ancestor-siblings
    //which are greater than their base capacity, so we are interested in "max avail"
    //capacity
    float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
      resourceCalculator, clusterResource, this);

    Resource queueMaxCap =                        // Queue Max-Capacity
        Resources.multiplyAndNormalizeDown(
            resourceCalculator, 
            clusterResource, 
            absoluteMaxAvailCapacity,
            minimumAllocation);
    
    Resource userConsumed = getUser(user).getConsumedResources(); 
    Resource headroom =
      Resources.min(resourceCalculator, clusterResource,
        Resources.subtract(
            Resources.min(resourceCalculator, clusterResource,
                userLimit, queueMaxCap),
            userConsumed),
        Resources.subtract(queueMaxCap, usedResources));
    
    application.setHeadroom(headroom);
    metrics.setAvailableResourcesToUser(user, headroom);
    
    return userLimit;
  }
}
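
To make the formula concrete, here is a small worked example with made-up numbers (memory in MB); it only mirrors the min/subtract structure above with plain ints, not the real Resources calculator.

public class HeadroomExample {
  public static void main(String[] args) {
    // Hypothetical numbers, memory in MB.
    int userLimit = 40960;     // computed user limit
    int queueMaxCap = 32768;   // queue maximum capacity
    int userConsumed = 20480;  // resources consumed by this user
    int usedResources = 28672; // resources used by the whole queue

    // headroom = min(min(userLimit, queueMaxCap) - userConsumed,
    //                queueMaxCap - usedResources)
    int headroom = Math.min(
        Math.min(userLimit, queueMaxCap) - userConsumed, // 32768 - 20480 = 12288
        queueMaxCap - usedResources);                    // 32768 - 28672 = 4096

    System.out.println("headroom = " + headroom + " MB"); // prints 4096
  }
}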

Each application attempt, i.e., a SchedulerApplicationAttempt in the eyes of the scheduler, keeps a resourceLimit field that holds the headroom.

public class SchedulerApplicationAttempt implements Comparable {

  private Resource resourceLimit = Resource.newInstance(0, 0);

  public synchronized void setHeadroom(Resource globalLimit) {
    this.resourceLimit = globalLimit; 
  }

  /**
   * Get available headroom in terms of resources for the application's user.
   * @return available resource headroom
   */
  public synchronized Resource getHeadroom() {
    // Corner case to deal with applications being slightly over-limit
    if (resourceLimit.getMemory() < 0) {
      resourceLimit.setMemory(0);
    }
    
    return resourceLimit;
  }
}

When the application master receives the headroom, it stores it as availableResources in RMContainerRequestor.

public abstract class RMContainerRequestor extends RMCommunicator {

  private Resource availableResources;

  protected AllocateResponse makeRemoteRequest() throws IOException {
    ResourceBlacklistRequest blacklistRequest =
        ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
            new ArrayList<String>(blacklistRemovals));
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    try {
      allocateResponse = scheduler.allocate(allocateRequest);
    } catch (YarnException e) {
      throw new IOException(e);
    }
    lastResponseID = allocateResponse.getResponseId();
    availableResources = allocateResponse.getAvailableResources();
    lastClusterNmCount = clusterNmCount;
    clusterNmCount = allocateResponse.getNumClusterNodes();

    if (ask.size() > 0 || release.size() > 0) {
      LOG.info("getResources() for " + applicationId + ":" + " ask="
          + ask.size() + " release= " + release.size() + " newContainers="
          + allocateResponse.getAllocatedContainers().size()
          + " finishedContainers="
          + allocateResponse.getCompletedContainersStatuses().size()
          + " resourcelimit=" + availableResources + " knownNMs="
          + clusterNmCount);
    }

    ask.clear();
    release.clear();

    if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
      LOG.info("Update the blacklist for " + applicationId +
          ": blacklistAdditions=" + blacklistAdditions.size() +
          " blacklistRemovals=" +  blacklistRemovals.size());
    }
    blacklistAdditions.clear();
    blacklistRemovals.clear();
    return allocateResponse;
  }
}

Its subclass RMContainerAllocator uses availableResources to derive the memory limit.

public class RMContainerAllocator extends RMContainerRequestor
    implements ContainerAllocator {

  public int getMemLimit() {
    int headRoom = getAvailableResources() != null ?
        getAvailableResources().getMemory() : 0;
    return headRoom + assignedRequests.maps.size() * mapResourceReqt +
        assignedRequests.reduces.size() * reduceResourceReqt;
  }
}
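
In other words, getMemLimit() estimates the total memory the job could command: the headroom still available plus the memory already held by assigned maps and reduces. A worked example with hypothetical numbers:

public class MemLimitExample {
  public static void main(String[] args) {
    // Hypothetical numbers, memory in MB.
    int headRoom = 2048;           // headroom reported by the RM
    int assignedMaps = 2;          // running map containers
    int assignedReduces = 8;       // running reduce containers
    int mapResourceReqt = 1024;    // memory per map
    int reduceResourceReqt = 2048; // memory per reduce

    // memLimit = headroom + memory already held by maps and reduces
    int memLimit = headRoom
        + assignedMaps * mapResourceReqt        // 2 * 1024 = 2048
        + assignedReduces * reduceResourceReqt; // 8 * 2048 = 16384

    System.out.println("memLimit = " + memLimit + " MB"); // prints 20480
  }
}
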
RMContainerAllocator communicates with the resource manager via the heartbeat() method, where the memory limit is used in preemptReducesIfNeeded() and scheduleReduces().

public class RMContainerAllocator extends RMContainerRequestor
    implements ContainerAllocator {

  protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    List<Container> allocatedContainers = getResources();
    if (allocatedContainers.size() > 0) {
      scheduledRequests.assign(allocatedContainers);
    }

    int completedMaps = getJob().getCompletedMaps();
    int completedTasks = completedMaps + getJob().getCompletedReduces();
    if ((lastCompletedTasks != completedTasks) ||
          (scheduledRequests.maps.size() > 0)) {
      lastCompletedTasks = completedTasks;
      recalculateReduceSchedule = true;
    }

    if (recalculateReduceSchedule) {
      preemptReducesIfNeeded();
      scheduleReduces(
          getJob().getTotalMaps(), completedMaps,
          scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
          assignedRequests.maps.size(), assignedRequests.reduces.size(),
          mapResourceReqt, reduceResourceReqt,
          pendingReduces.size(), 
          maxReduceRampupLimit, reduceSlowStart);
      recalculateReduceSchedule = false;
    }

    scheduleStats.updateAndLogIfChanged("After Scheduling: ");
  }
}

Let us take a look at preemptReducesIfNeeded() first.

public class RMContainerAllocator extends RMContainerRequestor
    implements ContainerAllocator {

  private void preemptReducesIfNeeded() {
    if (reduceResourceReqt == 0) {
      return; //no reduces
    }
    //check if reduces have taken over the whole cluster and there are 
    //unassigned maps
    if (scheduledRequests.maps.size() > 0) {
      int memLimit = getMemLimit();
      int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
      //availableMemForMap must be sufficient to run at least 1 map
      if (availableMemForMap < mapResourceReqt) {
        //to make sure new containers are given to maps and not reduces
        //ramp down all scheduled reduces if any
        //(since reduces are scheduled at higher priority than maps)
        LOG.info("Ramping down all scheduled reduces:" + scheduledRequests.reduces.size());
        for (ContainerRequest req : scheduledRequests.reduces.values()) {
          pendingReduces.add(req);
        }
        scheduledRequests.reduces.clear();
        
        //preempt to make space for at least one map
        int preemptionLimit = Math.max(mapResourceReqt,
            (int) (maxReducePreemptionLimit * memLimit));

        int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
            preemptionLimit);
        
        int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
        toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
        
        LOG.info("Going to preempt " + toPreempt);
        assignedRequests.preemptReduce(toPreempt);
      }
    }
  }
}
As shown above, if the memLimit derived from the resource scheduler's headroom is too large, the condition "availableMemForMap < mapResourceReqt" may no longer hold; hence assignedRequests.preemptReduce(toPreempt) is never called and no reducer is preempted.
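
Plugging hypothetical numbers into the check makes the failure mode visible. In the sketch below, all cluster memory is held by reduces and one failed map must rerun; with the correct headroom of zero the preemption path fires, while an inflated headroom hides the shortage.

public class PreemptionCheckExample {
  public static void main(String[] args) {
    // Hypothetical job state, memory in MB: 8 reduces hold all 16384 MB.
    int assignedReduces = 8;
    int preemptionWaitingReduces = 0;
    int mapResourceReqt = 1024;
    int reduceResourceReqt = 2048;

    // Correct case: headroom is 0, so memLimit = 0 + 8 * 2048 = 16384.
    int memLimit = 0 + assignedReduces * reduceResourceReqt;
    int availableMemForMap = memLimit
        - (assignedReduces - preemptionWaitingReduces) * reduceResourceReqt; // 0
    System.out.println(availableMemForMap < mapResourceReqt); // true -> preempt

    // Buggy case: the scheduler wrongly reports 2048 MB of headroom.
    int wrongMemLimit = 2048 + assignedReduces * reduceResourceReqt;
    int wrongAvailableMemForMap = wrongMemLimit
        - (assignedReduces - preemptionWaitingReduces) * reduceResourceReqt; // 2048
    System.out.println(wrongAvailableMemForMap < mapResourceReqt); // false -> deadlock
  }
}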

The headroom is also used when deciding how to schedule reducers, i.e., how many maps and how many reduces should be launched to share the available resources.

public class RMContainerAllocator extends RMContainerRequestor
    implements ContainerAllocator {

  public void scheduleReduces(
      int totalMaps, int completedMaps,
      int scheduledMaps, int scheduledReduces,
      int assignedMaps, int assignedReduces,
      int mapResourceReqt, int reduceResourceReqt,
      int numPendingReduces,
      float maxReduceRampupLimit, float reduceSlowStart) {
    
    if (numPendingReduces == 0) {
      return;
    }
    
    int headRoom = getAvailableResources() != null ?
        getAvailableResources().getMemory() : 0;
    LOG.info("Recalculating schedule, headroom=" + headRoom);
    
    //check for slow start
    if (!getIsReduceStarted()) {//not set yet
      int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * 
                      totalMaps);
      if(completedMaps < completedMapsForReduceSlowstart) {
        LOG.info("Reduce slow start threshold not met. " +
              "completedMapsForReduceSlowstart " + 
            completedMapsForReduceSlowstart);
        return;
      } else {
        LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
        setIsReduceStarted(true);
      }
    }
    
    //if all maps are assigned, then ramp up all reduces irrespective of the
    //headroom
    if (scheduledMaps == 0 && numPendingReduces > 0) {
      LOG.info("All maps assigned. " +
          "Ramping up all remaining reduces:" + numPendingReduces);
      scheduleAllReduces();
      return;
    }

    float completedMapPercent = 0f;
    if (totalMaps != 0) {//support for 0 maps
      completedMapPercent = (float)completedMaps/totalMaps;
    } else {
      completedMapPercent = 1;
    }
    
    int netScheduledMapMem = 
        (scheduledMaps + assignedMaps) * mapResourceReqt;

    int netScheduledReduceMem = 
        (scheduledReduces + assignedReduces) * reduceResourceReqt;

    int finalMapMemLimit = 0;
    int finalReduceMemLimit = 0;
    
    // ramp up the reduces based on completed map percentage
    int totalMemLimit = getMemLimit();
    int idealReduceMemLimit = 
        Math.min(
            (int)(completedMapPercent * totalMemLimit),
            (int) (maxReduceRampupLimit * totalMemLimit));
    int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;

    // check if there aren't enough maps scheduled, give the free map capacity
    // to reduce
    if (idealMapMemLimit > netScheduledMapMem) {
      int unusedMapMemLimit = idealMapMemLimit - netScheduledMapMem;
      finalReduceMemLimit = idealReduceMemLimit + unusedMapMemLimit;
      finalMapMemLimit = totalMemLimit - finalReduceMemLimit;
    } else {
      finalMapMemLimit = idealMapMemLimit;
      finalReduceMemLimit = idealReduceMemLimit;
    }
    
    LOG.info("completedMapPercent " + completedMapPercent +
        " totalMemLimit:" + totalMemLimit +
        " finalMapMemLimit:" + finalMapMemLimit +
        " finalReduceMemLimit:" + finalReduceMemLimit + 
        " netScheduledMapMem:" + netScheduledMapMem +
        " netScheduledReduceMem:" + netScheduledReduceMem);
    
    int rampUp = 
        (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
    
    if (rampUp > 0) {
      rampUp = Math.min(rampUp, numPendingReduces);
      LOG.info("Ramping up " + rampUp);
      rampUpReduces(rampUp);
    } else if (rampUp < 0){
      int rampDown = -1 * rampUp;
      rampDown = Math.min(rampDown, scheduledReduces);
      LOG.info("Ramping down " + rampDown);
      rampDownReduces(rampDown);
    }
  }
}
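
The ramp-up arithmetic can also be traced with hypothetical numbers; the sketch below mirrors the idealReduceMemLimit / finalReduceMemLimit computation above with plain ints.

public class ScheduleReducesExample {
  public static void main(String[] args) {
    // Hypothetical numbers, memory in MB.
    int totalMemLimit = 20480;         // from getMemLimit()
    float completedMapPercent = 0.5f;  // half of the maps are done
    float maxReduceRampupLimit = 0.5f; // ramp-up cap
    int netScheduledMapMem = 8192;     // (scheduledMaps + assignedMaps) * mapResourceReqt
    int netScheduledReduceMem = 4096;  // (scheduledReduces + assignedReduces) * reduceResourceReqt
    int reduceResourceReqt = 2048;

    int idealReduceMemLimit = Math.min(
        (int) (completedMapPercent * totalMemLimit),   // 10240
        (int) (maxReduceRampupLimit * totalMemLimit)); // 10240
    int idealMapMemLimit = totalMemLimit - idealReduceMemLimit; // 10240

    // Maps do not need their full share, so reduces get the leftover 2048 MB.
    int finalReduceMemLimit = idealReduceMemLimit
        + Math.max(0, idealMapMemLimit - netScheduledMapMem); // 12288

    int rampUp = (finalReduceMemLimit - netScheduledReduceMem)
        / reduceResourceReqt; // (12288 - 4096) / 2048 = 4
    System.out.println("ramping up " + rampUp + " reduces");
  }
}

Note that totalMemLimit comes from getMemLimit(), so an overstated headroom inflates it here as well and can make the allocator ramp up even more reduces, aggravating the deadlock described earlier.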