Class LocalityMulticastAMRMProxyPolicy

All Implemented Interfaces:
FederationAMRMProxyPolicy, ConfigurableFederationPolicy

public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy
An implementation of the FederationAMRMProxyPolicy interface that carefully multicasts the requests with the following behavior:

Host localized ResourceRequests are always forwarded to the RM that owns the corresponding node, based on the feedback of a SubClusterResolver. If the SubClusterResolver cannot resolve this node, we default to forwarding the ResourceRequest to the home sub-cluster.

Rack localized ResourceRequests are forwarded to the RMs that own the corresponding rack. Note that in some deployments each rack could be striped across multiple RMs; this policy respects that. If the SubClusterResolver cannot resolve this rack, we default to forwarding the ResourceRequest to the home sub-cluster.

ANY requests corresponding to node/rack local requests are forwarded only to the set of RMs that own the corresponding localized requests. The number of containers listed in each ANY is proportional to the number of localized container requests (associated with this ANY via the same allocateRequestId).

ANY requests that are not associated with node/rack local requests are split among RMs based on the "weights" in the WeightedPolicyInfo configuration *and* headroom information. The headroomAlpha parameter of the policy configuration indicates how much headroom contributes to the splitting choice. A value of 1.0f indicates that the weights are interpreted only as a 0/1 boolean, and all splitting is based on the advertised headroom (falling back to 1/N for RMs we have no headroom information from). A headroomAlpha value of 0.0f means headroom is ignored, and all splitting decisions are proportional to the "weights" in the configuration of the policy.
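The weights-and-headroom blending described above can be sketched as follows. This is an illustrative reconstruction, not the actual implementation: the class name, method, and the use of plain maps keyed by strings are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: blend configured weights with advertised headroom
// using headroomAlpha, as the policy description outlines. Names are
// illustrative, not the actual LocalityMulticastAMRMProxyPolicy code.
public class HeadroomWeightSketch {
  static Map<String, Float> splitShares(Map<String, Float> weights,
                                        Map<String, Float> headroom,
                                        float headroomAlpha) {
    // Sub-clusters with a weight of 0 are excluded entirely.
    Map<String, Float> active = new LinkedHashMap<>();
    for (Map.Entry<String, Float> e : weights.entrySet()) {
      if (e.getValue() > 0f) {
        active.put(e.getKey(), e.getValue());
      }
    }
    float weightSum = 0f, headroomSum = 0f;
    for (String sc : active.keySet()) {
      weightSum += active.get(sc);
      // Fall back to an equal 1/N share when no headroom was advertised.
      headroomSum += headroom.getOrDefault(sc, 1f / active.size());
    }
    Map<String, Float> shares = new LinkedHashMap<>();
    for (String sc : active.keySet()) {
      float wShare = active.get(sc) / weightSum;
      float hShare =
          headroom.getOrDefault(sc, 1f / active.size()) / headroomSum;
      // alpha = 1.0f -> headroom only; alpha = 0.0f -> weights only.
      shares.put(sc, headroomAlpha * hShare + (1 - headroomAlpha) * wShare);
    }
    return shares;
  }

  public static void main(String[] args) {
    Map<String, Float> weights = new LinkedHashMap<>();
    weights.put("sc1", 3f);
    weights.put("sc2", 1f);
    Map<String, Float> headroom = new LinkedHashMap<>();
    headroom.put("sc1", 10f);
    headroom.put("sc2", 30f);
    System.out.println(splitShares(weights, headroom, 0f)); // weights only
    System.out.println(splitShares(weights, headroom, 1f)); // headroom only
  }
}
```

With headroomAlpha = 0f, sc1 receives a 0.75 share (weight 3 of 4); with headroomAlpha = 1f, sc1 receives a 0.25 share (headroom 10 of 40), so the same weights yield opposite splits depending on alpha.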

ANY requests of zero size are forwarded to all known sub-clusters (i.e., sub-clusters where we scheduled containers before), as they may represent a user attempt to cancel a previous request (and since the policy is mostly stateless, it should forward to all known RMs).

Invariants:

The policy always excludes non-active RMs.

The policy always excludes RMs that do not appear in the policy configuration weights, or that have a weight of 0 (even if localized resources explicitly refer to them).

Barring the rounding up of fractional containers to the closest ceiling, the sum of requests made to multiple RMs at the ANY level adds up to the user request. The maximum possible excess in a given request is a number of containers less than or equal to the number of sub-clusters in the federation.
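The rounding bound in the last invariant can be illustrated with a small worked example. The class and method below are hypothetical, written only to show the arithmetic: each sub-cluster's fractional share is rounded up, so each sub-cluster contributes at most one excess container.

```java
// Worked example of the rounding invariant: an ANY request of 10
// containers split across three sub-clusters with fractional shares,
// where each share is rounded up to a whole container.
public class CeilingSplitExample {
  // Sum of per-sub-cluster ceilings of the fractional shares.
  static int ceilingTotal(int requested, double[] shares) {
    int total = 0;
    for (double s : shares) {
      total += (int) Math.ceil(requested * s);
    }
    return total;
  }

  public static void main(String[] args) {
    double[] shares = {0.34, 0.33, 0.33}; // fractional shares per sub-cluster
    int total = ceilingTotal(10, shares);
    // ceil(3.4) + ceil(3.3) + ceil(3.3) = 4 + 4 + 4 = 12: an excess of 2
    // containers, which is <= 3, the number of sub-clusters.
    System.out.println(total);
  }
}
```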

  • Field Details

  • Constructor Details

    • LocalityMulticastAMRMProxyPolicy

      public LocalityMulticastAMRMProxyPolicy()
  • Method Details

    • prettyPrintRequests

      public static String prettyPrintRequests(List<org.apache.hadoop.yarn.api.records.ResourceRequest> response, int max)
Print a list of ResourceRequests as a one-line string.
Parameters:
response - the list of ResourceRequests to print
max - the maximum number of ResourceRequests to print
      Returns:
      the printed one line string
    • reinitialize

      public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException
      Description copied from interface: ConfigurableFederationPolicy
This method is invoked to initialize or update the configuration of policies. The implementor should provide try-and-swap semantics, and retain state if possible.
      Specified by:
      reinitialize in interface ConfigurableFederationPolicy
      Overrides:
      reinitialize in class AbstractConfigurableFederationPolicy
      Parameters:
      policyContext - the new context to provide to implementor.
      Throws:
      FederationPolicyInitializationException - in case the initialization fails.
    • notifyOfResponse

      public void notifyOfResponse(SubClusterId subClusterId, org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse response) throws org.apache.hadoop.yarn.exceptions.YarnException
      Description copied from interface: FederationAMRMProxyPolicy
      This method should be invoked to notify the policy about responses being received. This is useful for stateful policies that make decisions based on previous responses being received.
      Specified by:
      notifyOfResponse in interface FederationAMRMProxyPolicy
      Overrides:
      notifyOfResponse in class AbstractAMRMProxyPolicy
      Parameters:
      subClusterId - the id of the subcluster sending the notification
      response - the response received from one of the RMs
      Throws:
      org.apache.hadoop.yarn.exceptions.YarnException - in case the response is not valid
    • splitResourceRequests

      public Map<SubClusterId,List<org.apache.hadoop.yarn.api.records.ResourceRequest>> splitResourceRequests(List<org.apache.hadoop.yarn.api.records.ResourceRequest> resourceRequests, Set<SubClusterId> timedOutSubClusters) throws org.apache.hadoop.yarn.exceptions.YarnException
      Description copied from interface: FederationAMRMProxyPolicy
      Splits the ResourceRequests from the client across one or more sub-clusters based on the policy semantics (e.g., broadcast, load-based).
      Parameters:
      resourceRequests - the list of ResourceRequests from the AM to be split
      timedOutSubClusters - the set of sub-clusters that haven't had a successful heart-beat response for a while.
      Returns:
      map of sub-cluster as identified by SubClusterId to the list of ResourceRequests that should be forwarded to it
      Throws:
      org.apache.hadoop.yarn.exceptions.YarnException - in case the request is malformed or no viable sub-clusters can be found.
    • getSubClusterForUnResolvedRequest

      protected SubClusterId getSubClusterForUnResolvedRequest(LocalityMulticastAMRMProxyPolicy.AllocationBookkeeper bookKeeper, long allocationId)
For unit tests to override.
Parameters:
bookKeeper - the allocation bookkeeper
allocationId - the allocation request id
      Returns:
      SubClusterId.
    • computeIntegerAssignment

      @VisibleForTesting protected ArrayList<Integer> computeIntegerAssignment(int totalNum, ArrayList<Float> weightsList) throws org.apache.hadoop.yarn.exceptions.YarnException
      Split the integer into bins according to the weights.
      Parameters:
      totalNum - total number of containers to split
      weightsList - the weights for each subcluster
      Returns:
      the container allocation after split
      Throws:
      org.apache.hadoop.yarn.exceptions.YarnException - if fails
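One common way to split an integer total into bins according to weights is largest-remainder rounding: take the floor of each proportional share, then hand the leftover containers to the bins with the largest fractional parts. The sketch below is an illustration of this contract (bins are non-negative and sum exactly to totalNum), not the actual computeIntegerAssignment implementation, which may use a different scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical sketch of splitting an integer total into per-sub-cluster
// bins proportional to weights, via largest-remainder rounding.
public class IntegerAssignmentSketch {
  static ArrayList<Integer> assign(int totalNum, ArrayList<Float> weights) {
    float sum = 0f;
    for (float w : weights) {
      sum += w;
    }
    int n = weights.size();
    int[] bins = new int[n];
    float[] remainders = new float[n];
    int assigned = 0;
    for (int i = 0; i < n; i++) {
      float exact = totalNum * weights.get(i) / sum;
      bins[i] = (int) Math.floor(exact);   // integer part of the share
      remainders[i] = exact - bins[i];     // fractional part left over
      assigned += bins[i];
    }
    // Hand out the leftover containers to the largest fractional parts.
    for (int leftover = totalNum - assigned; leftover > 0; leftover--) {
      int best = 0;
      for (int i = 1; i < n; i++) {
        if (remainders[i] > remainders[best]) {
          best = i;
        }
      }
      bins[best]++;
      remainders[best] = -1f; // do not pick the same bin twice
    }
    ArrayList<Integer> out = new ArrayList<>();
    for (int b : bins) {
      out.add(b);
    }
    return out;
  }

  public static void main(String[] args) {
    // Split 10 containers with weights 1:1:2.
    System.out.println(assign(10, new ArrayList<>(Arrays.asList(1f, 1f, 2f))));
  }
}
```

For weights 1:1:2 the exact shares are 2.5, 2.5, and 5.0; the floors assign 9 containers and the remaining one goes to a bin with fractional part 0.5, giving [3, 2, 5].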
    • routeNodeRequestIfNeeded

      protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId, int maxThreshold, Set<SubClusterId> activeAndEnabledSCs)
When a certain sub-cluster is too loaded, reroute Node requests going there.
Parameters:
targetId - the current sub-cluster the request is sent to
maxThreshold - the threshold for the pending container count
activeAndEnabledSCs - the set of active and enabled sub-clusters
Returns:
the target sub-cluster id
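The rerouting idea can be sketched as a simple threshold check: if the target sub-cluster is missing from the active-and-enabled set or its pending count exceeds maxThreshold, pick the least loaded alternative. This is a hypothetical illustration; the pendingCounts map and the least-loaded selection are assumptions, not the policy's actual bookkeeping.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of threshold-based rerouting for Node requests.
public class RerouteSketch {
  static String routeIfNeeded(String targetId, int maxThreshold,
                              Set<String> activeAndEnabled,
                              Map<String, Integer> pendingCounts) {
    if (!activeAndEnabled.contains(targetId)
        || pendingCounts.getOrDefault(targetId, 0) > maxThreshold) {
      // Target is unavailable or overloaded: pick the least loaded
      // active-and-enabled sub-cluster instead.
      String best = targetId;
      int bestPending = Integer.MAX_VALUE;
      for (String sc : activeAndEnabled) {
        int pending = pendingCounts.getOrDefault(sc, 0);
        if (pending < bestPending) {
          bestPending = pending;
          best = sc;
        }
      }
      return best;
    }
    return targetId; // target is healthy enough, keep the request there
  }

  public static void main(String[] args) {
    Set<String> scs = new TreeSet<>(Set.of("sc1", "sc2"));
    Map<String, Integer> pending = Map.of("sc1", 500, "sc2", 10);
    // sc1 exceeds the threshold of 100 pending, so the request moves to sc2.
    System.out.println(routeIfNeeded("sc1", 100, scs, pending));
  }
}
```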