Class LocalityMulticastAMRMProxyPolicy
- All Implemented Interfaces:
FederationAMRMProxyPolicy, ConfigurableFederationPolicy
An implementation of the FederationAMRMProxyPolicy interface that
carefully multicasts the requests with the following behavior:
Host-localized ResourceRequests are always forwarded to the RM that
owns the corresponding node, based on the feedback of a
SubClusterResolver. If the SubClusterResolver cannot resolve
this node, we default to forwarding the ResourceRequest to the home
sub-cluster.
Rack-localized ResourceRequests are forwarded to the RMs that own
the corresponding rack. Note that in some deployments each rack could be
striped across multiple RMs; this policy respects that. If the
SubClusterResolver cannot resolve this rack, we default to forwarding
the ResourceRequest to the home sub-cluster.
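The node and rack routing rules can be sketched as follows. This is a minimal illustration, not the policy's code: plain String IDs stand in for SubClusterId, the hypothetical maps nodeToSubCluster and rackToSubClusters stand in for the SubClusterResolver, and the exclusion of inactive or zero-weight RMs (see Invariants) is left out.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: Strings stand in for SubClusterId, and plain maps stand in
// for the SubClusterResolver's node and rack lookups.
final class LocalityRouting {
  private LocalityRouting() {}

  /** A node belongs to exactly one sub-cluster; unresolved nodes go to the home sub-cluster. */
  static String routeNode(String node, Map<String, String> nodeToSubCluster, String home) {
    return nodeToSubCluster.getOrDefault(node, home);
  }

  /** A rack may be striped across several sub-clusters; unresolved racks go home. */
  static Set<String> routeRack(String rack, Map<String, Set<String>> rackToSubClusters,
      String home) {
    Set<String> owners = rackToSubClusters.get(rack);
    return (owners == null || owners.isEmpty()) ? Set.of(home) : owners;
  }
}
```

The rack lookup returns a set rather than a single ID because one rack may be striped across several RMs.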
ANY requests corresponding to node/rack-local requests are forwarded only to the set of RMs that own the corresponding localized requests. The number of containers listed in each ANY is proportional to the number of localized container requests (associated with this ANY via the same allocateRequestId).
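A minimal sketch of the proportional split, assuming each sub-cluster's share is rounded up, consistent with the rounding invariant listed under Invariants. The class AnySplit and its localizedCounts argument (localized container requests per sub-cluster for one allocateRequestId) are hypothetical, and the real policy's bookkeeping is more involved.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: splits one ANY request across the sub-clusters that own
// its localized requests, proportionally to their localized container counts.
final class AnySplit {
  private AnySplit() {}

  static Map<String, Integer> splitAnyByLocality(int anyTotal,
      Map<String, Integer> localizedCounts) {
    int totalLocalized = 0;
    for (int count : localizedCounts.values()) {
      totalLocalized += count;
    }
    Map<String, Integer> result = new TreeMap<>();
    if (totalLocalized == 0) {
      return result; // no localized requests: nothing to split proportionally
    }
    for (Map.Entry<String, Integer> e : localizedCounts.entrySet()) {
      // Round up so fractional containers are never dropped (may slightly over-ask).
      int share = (int) Math.ceil((double) anyTotal * e.getValue() / totalLocalized);
      if (share > 0) {
        result.put(e.getKey(), share);
      }
    }
    return result;
  }
}
```

For example, an ANY of 10 containers whose localized requests are split 3:1 between two sub-clusters yields 8 and 3, an excess of one container.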
ANY requests that are not associated with node/rack-local requests are split
among RMs based on the "weights" in the WeightedPolicyInfo configuration *and*
headroom information. The headroomAlpha parameter of the policy
configuration indicates how much headroom contributes to the splitting
choice. A value of 1.0f means the weights are interpreted only as 0/1
booleans and all splitting is based on the advertised headroom (falling back
to 1/N for RMs from which we have no headroom information). A headroomAlpha
value of 0.0f means headroom is ignored and all splitting decisions are
proportional to the "weights" in the configuration of the policy.
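One way to realize this behavior is a linear blend, weight_i = headroomAlpha * headroomShare_i + (1 - headroomAlpha) * configShare_i. This is an assumed formula chosen because it reproduces both endpoints described above; the policy's exact arithmetic may differ. The sketch simplifies headroom to one memory number per sub-cluster (the hypothetical headroomMemory map), gives sub-clusters without headroom information the 1/N fallback, and scales the known shares by the fraction of sub-clusters that reported, so the blended weights still sum to 1.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of blending advertised headroom with configured weights.
final class HeadroomBlend {
  private HeadroomBlend() {}

  static Map<String, Double> blendedWeights(Map<String, Double> configWeights,
      Map<String, Long> headroomMemory, double headroomAlpha) {
    // Only sub-clusters with a positive configured weight are eligible (the 0/1 filter).
    Map<String, Double> eligible = new TreeMap<>();
    double totalWeight = 0;
    for (Map.Entry<String, Double> e : configWeights.entrySet()) {
      if (e.getValue() > 0) {
        eligible.put(e.getKey(), e.getValue());
        totalWeight += e.getValue();
      }
    }
    int n = eligible.size();
    long totalHeadroom = 0;
    int known = 0;
    for (String sc : eligible.keySet()) {
      Long h = headroomMemory.get(sc);
      if (h != null) {
        totalHeadroom += h;
        known++;
      }
    }
    Map<String, Double> result = new TreeMap<>();
    for (Map.Entry<String, Double> e : eligible.entrySet()) {
      Long h = headroomMemory.get(e.getKey());
      double headroomShare = 1.0 / n; // fallback for RMs without headroom info
      if (h != null && totalHeadroom > 0) {
        // Scale by known/n so known and unknown shares together sum to 1.
        headroomShare = ((double) h / totalHeadroom) * ((double) known / n);
      }
      double configShare = e.getValue() / totalWeight;
      result.put(e.getKey(),
          headroomAlpha * headroomShare + (1 - headroomAlpha) * configShare);
    }
    return result;
  }
}
```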
ANY requests of size zero are forwarded to all known subclusters (i.e., subclusters where we scheduled containers before), as they may represent a user attempt to cancel a previous request (and since the policy is mostly stateless, it should forward them to all known RMs).
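The zero-size rule only needs a record of which sub-clusters have been sent requests. In this hypothetical sketch, KnownSubClusterTracker and its methods are illustrative names, and weightedTargets stands for whatever split the weights and headroom would otherwise produce.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: Strings stand in for SubClusterId.
final class KnownSubClusterTracker {
  // Every sub-cluster that has been sent a request so far.
  private final Set<String> known = new TreeSet<>();

  void recordSent(String subClusterId) {
    known.add(subClusterId);
  }

  /** A size-zero ANY may be a cancellation, so it goes to every known sub-cluster. */
  Set<String> targetsForAny(int numContainers, Set<String> weightedTargets) {
    return numContainers == 0 ? new TreeSet<>(known) : new TreeSet<>(weightedTargets);
  }
}
```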
Invariants:
The policy always excludes non-active RMs.
The policy always excludes RMs that do not appear in the policy configuration weights, or that have a weight of 0 (even if localized resources explicitly refer to them).
Apart from rounding fractional containers up to the next integer, the sum of requests made to multiple RMs at the ANY level "adds up" to the user request. The maximum possible excess in a given request is a number of containers less than or equal to the number of sub-clusters in the federation.
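The excess bound follows from the fact that rounding up adds strictly less than one container per sub-cluster. Assuming T requested containers, N sub-clusters, shares w_i >= 0 with sum 1, and each sub-cluster receiving the ceiling of T w_i, the excess E satisfies:

```latex
\sum_{i=1}^{N} \lceil T w_i \rceil \;<\; \sum_{i=1}^{N} \left( T w_i + 1 \right) \;=\; T + N
\quad\Longrightarrow\quad
E \;=\; \sum_{i=1}^{N} \lceil T w_i \rceil - T \;\le\; N - 1 .
```

For example, T = 10 with shares (0.75, 0.25) gives 8 + 3 = 11 containers, an excess of 1, which is within the stated bound.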
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
protected final class | LocalityMulticastAMRMProxyPolicy.AllocationBookkeeper | This helper class is used to book-keep the requests made to each subcluster, and maintain useful statistics to split ANY requests.
Field Summary
Fields
Modifier and Type | Field
static final int | DEFAULT_PRINT_RR_MAX
static final org.slf4j.Logger | LOG
static final String | PRINT_RR_MAX
Constructor Summary
Constructors
LocalityMulticastAMRMProxyPolicy()
Method Summary
Modifier and Type | Method | Description
protected ArrayList<Integer> | computeIntegerAssignment(int totalNum, ArrayList<Float> weightsList) | Split the integer into bins according to the weights.
protected SubClusterId | getSubClusterForUnResolvedRequest(LocalityMulticastAMRMProxyPolicy.AllocationBookkeeper bookKeeper, long allocationId) | For unit test to override.
void | notifyOfResponse(SubClusterId subClusterId, org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse response) | This method should be invoked to notify the policy about responses being received.
static String | prettyPrintRequests(List<org.apache.hadoop.yarn.api.records.ResourceRequest> response, int max) | Print a list of Resource Requests into a one-line string.
void | reinitialize(FederationPolicyInitializationContext policyContext) | This method is invoked to initialize or update the configuration of policies.
protected SubClusterId | routeNodeRequestIfNeeded(SubClusterId targetId, int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) | When a certain subcluster is too loaded, reroute node requests going there.
Map<SubClusterId,List<org.apache.hadoop.yarn.api.records.ResourceRequest>> | splitResourceRequests(List<org.apache.hadoop.yarn.api.records.ResourceRequest> resourceRequests, Set<SubClusterId> timedOutSubClusters) | Splits the ResourceRequests from the client across one or more sub-clusters based on the policy semantics (e.g., broadcast, load-based).
Methods inherited from class org.apache.hadoop.yarn.server.federation.policies.amrmproxy.AbstractAMRMProxyPolicy
validate
Methods inherited from class org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy
getActiveSubclusters, getIsDirty, getPolicyContext, getPolicyInfo, setPolicyContext, setPolicyInfo
Field Details
LOG
public static final org.slf4j.Logger LOG
PRINT_RR_MAX
public static final String PRINT_RR_MAX
DEFAULT_PRINT_RR_MAX
public static final int DEFAULT_PRINT_RR_MAX
Constructor Details
LocalityMulticastAMRMProxyPolicy
public LocalityMulticastAMRMProxyPolicy()
Method Details
prettyPrintRequests
public static String prettyPrintRequests(List<org.apache.hadoop.yarn.api.records.ResourceRequest> response, int max)
Print a list of Resource Requests into a one-line string.
- Parameters:
response - list of ResourceRequest
max - number of ResourceRequest to print
- Returns:
the printed one-line string
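The idea of printing at most max entries on one line can be sketched as below. Strings stand in for ResourceRequest, and the output format (including the "more" suffix) is a hypothetical choice, not the format the real method produces.

```java
import java.util.List;

// Hypothetical sketch: strings stand in for ResourceRequest, and the output
// format is illustrative, not the format the real method produces.
final class PrettyPrint {
  private PrettyPrint() {}

  /** Joins at most max entries into one line and summarizes the rest. */
  static String prettyPrint(List<String> requests, int max) {
    int shown = Math.max(0, Math.min(max, requests.size()));
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < shown; i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(requests.get(i));
    }
    if (requests.size() > shown) {
      if (shown > 0) {
        sb.append(", ");
      }
      sb.append("... (").append(requests.size() - shown).append(" more)");
    }
    return sb.append("]").toString();
  }
}
```

Capping the output keeps log lines bounded when an AM sends very large request lists.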
reinitialize
public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException
Description copied from interface: ConfigurableFederationPolicy
This method is invoked to initialize or update the configuration of policies. The implementor should provide try-and-swap semantics, and retain state if possible.
- Specified by:
reinitialize in interface ConfigurableFederationPolicy
- Overrides:
reinitialize in class AbstractConfigurableFederationPolicy
- Parameters:
policyContext - the new context to provide to implementor.
- Throws:
FederationPolicyInitializationException - in case the initialization fails.
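Try-and-swap semantics mean that a new configuration is fully validated before it replaces the current one, so a failed reinitialization leaves the policy in its previous state. A minimal sketch, assuming the configuration is just a map of sub-cluster weights; SwappablePolicy is hypothetical, and it throws IllegalArgumentException where the real method throws FederationPolicyInitializationException.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of try-and-swap reinitialization.
final class SwappablePolicy {
  // Readers always see either the old or the new configuration, never a mix.
  private final AtomicReference<Map<String, Double>> weights =
      new AtomicReference<>(Map.of());

  void reinitialize(Map<String, Double> newWeights) {
    // "Try": validate the candidate configuration without touching current state.
    if (newWeights == null || newWeights.isEmpty()) {
      throw new IllegalArgumentException("no sub-cluster weights configured");
    }
    for (Map.Entry<String, Double> e : newWeights.entrySet()) {
      if (e.getValue() == null || e.getValue() < 0) {
        throw new IllegalArgumentException("invalid weight for " + e.getKey());
      }
    }
    // "Swap": publish an immutable copy only after validation succeeded.
    weights.set(Map.copyOf(newWeights));
  }

  Map<String, Double> current() {
    return weights.get();
  }
}
```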
notifyOfResponse
public void notifyOfResponse(SubClusterId subClusterId, org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse response) throws org.apache.hadoop.yarn.exceptions.YarnException
Description copied from interface: FederationAMRMProxyPolicy
This method should be invoked to notify the policy about responses being received. This is useful for stateful policies that make decisions based on previous responses being received.
- Specified by:
notifyOfResponse in interface FederationAMRMProxyPolicy
- Overrides:
notifyOfResponse in class AbstractAMRMProxyPolicy
- Parameters:
subClusterId - the id of the subcluster sending the notification
response - the response received from one of the RMs
- Throws:
org.apache.hadoop.yarn.exceptions.YarnException - in case the response is not valid
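A stateful use of these notifications is remembering the latest headroom each RM advertised, which the ANY-splitting logic can then consult. In this hypothetical sketch a long stands in for the headroom carried by the response (in YARN, AllocateResponse exposes it through getAvailableResources()), and an IllegalArgumentException stands in for the YarnException thrown for invalid responses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: Strings stand in for SubClusterId, a long for the headroom.
final class HeadroomTracker {
  // Latest headroom (e.g. available memory) reported by each sub-cluster.
  private final Map<String, Long> headroomMemory = new ConcurrentHashMap<>();

  void notifyOfResponse(String subClusterId, long availableMemory) {
    if (availableMemory < 0) {
      throw new IllegalArgumentException("negative headroom from " + subClusterId);
    }
    headroomMemory.put(subClusterId, availableMemory); // newer responses overwrite older ones
  }

  /** Returns the last reported headroom, or null if the sub-cluster has not reported yet. */
  Long headroom(String subClusterId) {
    return headroomMemory.get(subClusterId);
  }
}
```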
splitResourceRequests
public Map<SubClusterId,List<org.apache.hadoop.yarn.api.records.ResourceRequest>> splitResourceRequests(List<org.apache.hadoop.yarn.api.records.ResourceRequest> resourceRequests, Set<SubClusterId> timedOutSubClusters) throws org.apache.hadoop.yarn.exceptions.YarnException
Description copied from interface: FederationAMRMProxyPolicy
Splits the ResourceRequests from the client across one or more sub-clusters based on the policy semantics (e.g., broadcast, load-based).
- Parameters:
resourceRequests - the list of ResourceRequests from the AM to be split
timedOutSubClusters - the set of sub-clusters that haven't had a successful heartbeat response for a while.
- Returns:
map of sub-cluster, as identified by SubClusterId, to the list of ResourceRequests that should be forwarded to it
- Throws:
org.apache.hadoop.yarn.exceptions.YarnException - in case the request is malformed or no viable sub-clusters can be found.
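Before any splitting, the candidate sub-clusters can be narrowed using the invariants listed at the top of this page. The sketch below assumes that timed-out sub-clusters are simply excluded as well, which is an assumption about how timedOutSubClusters is used; CandidateFilter is hypothetical, and it throws IllegalStateException where the real method throws YarnException.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: Strings stand in for SubClusterId.
final class CandidateFilter {
  private CandidateFilter() {}

  static Set<String> eligibleSubClusters(Set<String> active, Map<String, Double> weights,
      Set<String> timedOut) {
    Set<String> result = new TreeSet<>();
    for (String sc : active) {
      Double w = weights.get(sc);
      // Keep active sub-clusters with a positive configured weight that have not timed out.
      if (w != null && w > 0 && !timedOut.contains(sc)) {
        result.add(sc);
      }
    }
    if (result.isEmpty()) {
      throw new IllegalStateException("no viable sub-clusters");
    }
    return result;
  }
}
```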
getSubClusterForUnResolvedRequest
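protected SubClusterId getSubClusterForUnResolvedRequest(LocalityMulticastAMRMProxyPolicy.AllocationBookkeeper bookKeeper, long allocationId)
Chooses the sub-cluster for a request whose location cannot be resolved. Exposed so that unit tests can override it.
- Parameters:
bookKeeper - the allocation bookkeeper
allocationId - the allocation ID of the request
- Returns:
the SubClusterId to forward the request to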
computeIntegerAssignment
@VisibleForTesting protected ArrayList<Integer> computeIntegerAssignment(int totalNum, ArrayList<Float> weightsList) throws org.apache.hadoop.yarn.exceptions.YarnException
Split the integer into bins according to the weights.
- Parameters:
totalNum - total number of containers to split
weightsList - the weights for each subcluster
- Returns:
the container allocation after the split
- Throws:
org.apache.hadoop.yarn.exceptions.YarnException - if the split fails
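Splitting an integer into weighted bins so that the bins sum exactly to totalNum can be done with the largest-remainder method, sketched below. This is a swapped-in technique for illustration; the actual method may distribute the leftover containers differently. Non-positive weights receive nothing, and IllegalArgumentException stands in for the YarnException raised when no weight is positive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of weighted integer splitting (largest-remainder method).
final class IntegerAssignment {
  private IntegerAssignment() {}

  static List<Integer> computeIntegerAssignment(int totalNum, List<Float> weights) {
    List<Integer> bins = new ArrayList<>();
    if (weights.isEmpty()) {
      return bins;
    }
    double totalWeight = 0;
    for (float w : weights) {
      if (w > 0) {
        totalWeight += w;
      }
    }
    if (totalWeight == 0) {
      throw new IllegalArgumentException("no positive weight in " + weights);
    }
    // First pass: give each bin the floor of its exact proportional share.
    double[] remainders = new double[weights.size()];
    int assigned = 0;
    for (int i = 0; i < weights.size(); i++) {
      double exact = weights.get(i) > 0 ? (double) totalNum * weights.get(i) / totalWeight : 0;
      int floor = (int) Math.floor(exact);
      bins.add(floor);
      remainders[i] = exact - floor;
      assigned += floor;
    }
    // Second pass: hand leftover containers to the largest fractional remainders.
    for (int left = totalNum - assigned; left > 0; left--) {
      int best = 0;
      for (int i = 1; i < remainders.length; i++) {
        if (remainders[i] > remainders[best]) {
          best = i;
        }
      }
      bins.set(best, bins.get(best) + 1);
      remainders[best] = -1; // each bin receives at most one extra container
    }
    return bins;
  }
}
```

For example, splitting 7 containers with weights 0.5, 0.25, 0.25 first assigns 3, 1, 1 and then gives the two leftovers to the bins with remainder 0.75, producing 3, 2, 2.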
routeNodeRequestIfNeeded
protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId, int maxThreshold, Set<SubClusterId> activeAndEnabledSCs)
When a certain subcluster is too loaded, reroute node requests going there.
- Parameters:
targetId - current subClusterId where the request is sent
maxThreshold - threshold for the pending count
activeAndEnabledSCs - set of active and enabled sub-clusters
- Returns:
the target sub-cluster ID
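A possible rerouting rule is sketched below under stated assumptions: the hypothetical pendingCounts map supplies each sub-cluster's pending count, a target is kept if it is active and below maxThreshold (or has no reported count), and otherwise the active sub-cluster with the fewest pending requests is chosen. The real method's load metric and tie-breaking may differ.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: Strings stand in for SubClusterId.
final class NodeRerouter {
  private NodeRerouter() {}

  static String routeNodeRequestIfNeeded(String targetId, int maxThreshold,
      Set<String> activeAndEnabled, Map<String, Integer> pendingCounts) {
    Integer targetPending = pendingCounts.get(targetId);
    // Keep the original target if it is usable and not overloaded.
    if (activeAndEnabled.contains(targetId)
        && (targetPending == null || targetPending < maxThreshold)) {
      return targetId;
    }
    // Otherwise pick the active sub-cluster with the fewest pending requests.
    String best = null;
    int bestPending = Integer.MAX_VALUE;
    for (String sc : activeAndEnabled) {
      int pending = pendingCounts.getOrDefault(sc, 0);
      if (pending < bestPending) {
        best = sc;
        bestPending = pending;
      }
    }
    return best != null ? best : targetId; // nothing active: leave the request as-is
  }
}
```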