Add aws instance type to affinity terms in the pod template #3783
base: master
@@ -1,4 +1,5 @@
import argparse
import copy
import json
import logging
import os

@@ -14,6 +15,7 @@
from typing import Optional
from typing import Tuple
from typing import Union
from collections import defaultdict

import yaml
from boto3.exceptions import Boto3Error

@@ -75,27 +77,9 @@
POD_TEMPLATE_PATH = "/nail/tmp/spark-pt-{file_uuid}.yaml"
DEFAULT_RUNTIME_TIMEOUT = "12h"

POD_TEMPLATE = """
apiVersion: v1
kind: Pod
metadata:
  labels:
    spark: {spark_pod_label}
spec:
  dnsPolicy: Default
  affinity:
    podAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 95
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: spark
              operator: In
              values:
              - {spark_pod_label}
          topologyKey: topology.kubernetes.io/hostname
"""
POD_TEMPLATE = defaultdict(
    dict, {"apiVersion": "v1", "kind": "Pod", "spec": {"dnsPolicy": "Default"}}
)

deprecated_opts = {
    "j": "spark.jars",
@@ -265,6 +249,11 @@ def add_subparser(subparsers):
        default=default_spark_pool,
    )

    list_parser.add_argument(
        "--aws-instance-types",
        help="AWS instance types for executor, seperate by comma(,)",
    )

    list_parser.add_argument(
        "-w",
        "--work-dir",
@@ -522,6 +511,47 @@ def should_enable_compact_bin_packing(disable_compact_bin_packing, cluster_manag
    return True


# inplace add a low priority podAffinityTerm for compact bin packing
def add_compact_bin_packing_affinity_term(pod: Dict, spark_pod_label: str):
suggestion: i'd probably rename
suggestion: if y'all ever want to get rid of the incompletely typed Dict here, a possible option would be to use the models from the kubernetes client (e.g., https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodTemplate.md) internally and then serialize to yaml at the very end :)
suggestion: imo, it's a little preferable to not mutate inputs in-place since pure functions are generally easier to work with/test - but it's not a particularly big deal :)
suggestion (if this remains an impure function): typing this as (same points apply to
    pod["metadata"]["labels"]["spark"] = spark_pod_label
    pod["spec"]["affinity"]["podAffinity"].setdefault(
        "preferredDuringSchedulingIgnoredDuringExecution", []
    ).append(
        {
            "weight": 95,
            "podAffinityTerm": {
                "labelSelector": {
                    "matchExpressions": [
                        {
                            "key": "spark",
                            "operator": "In",
                            "values": [spark_pod_label],
                        }
                    ]
                },
                "topologyKey": "topology.kubernetes.io/hostname",
            },
        }
    )
    return pod
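Picking up the reviewer's point above about not mutating inputs in-place, a minimal sketch of a pure variant is shown below. It is illustrative only and not part of this PR; the helper name with_compact_bin_packing_affinity_term is hypothetical.

import copy
from typing import Any, Dict

def with_compact_bin_packing_affinity_term(pod: Dict[str, Any], spark_pod_label: str) -> Dict[str, Any]:
    # Return a new pod dict instead of mutating the caller's template in place.
    new_pod = copy.deepcopy(pod)
    new_pod.setdefault("metadata", {}).setdefault("labels", {})["spark"] = spark_pod_label
    terms = (
        new_pod.setdefault("spec", {})
        .setdefault("affinity", {})
        .setdefault("podAffinity", {})
        .setdefault("preferredDuringSchedulingIgnoredDuringExecution", [])
    )
    terms.append(
        {
            "weight": 95,
            "podAffinityTerm": {
                "labelSelector": {
                    "matchExpressions": [
                        {"key": "spark", "operator": "In", "values": [spark_pod_label]}
                    ]
                },
                "topologyKey": "topology.kubernetes.io/hostname",
            },
        }
    )
    return new_pod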


# inplace add nodeAffinity for node selection
def add_node_affinity_terms(pod: Dict, instance_types: str):
    pod["spec"]["affinity"]["nodeAffinity"][
        "requiredDuringSchedulingIgnoredDuringExecution"
    ].setdefault("nodeSelectorTerms", []).extend(
        [
            {
                "key": "node.kubernetes.io/instance-type",
just curious: do we want users to specify an instance type (e.g., g4dn.xlarge vs g4dn.2xlarge) or would we be fine having them specify a family (e.g., g4dn) and letting karpenter spin up the most optimal instance type for the given requests at the time?
"operator": "In", | ||
"values": instance_types.split(","), | ||
}, | ||
] | ||
) | ||
return pod | ||


def get_docker_run_cmd(
    container_name,
    volumes,

@@ -694,11 +724,8 @@ def get_spark_env(

def _parse_user_spark_args(
    spark_args: Optional[str],
    pod_template_path: str,
    enable_compact_bin_packing: bool = False,
    enable_spark_dra: bool = False,
) -> Dict[str, str]:

    user_spark_opts = {}
    if spark_args:
        for spark_arg in spark_args.split():
@@ -713,9 +740,6 @@ def _parse_user_spark_args(
                sys.exit(1)
            user_spark_opts[fields[0]] = fields[1]

    if enable_compact_bin_packing:
        user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path

    if enable_spark_dra:
        if (
            "spark.dynamicAllocation.enabled" in user_spark_opts
@@ -1286,21 +1310,26 @@ def paasta_spark_run(args):

    volumes = instance_config.get_volumes(system_paasta_config.get_volumes())
    app_base_name = get_spark_app_name(args.cmd or instance_config.get_cmd())

    if args.enable_compact_bin_packing:
        document = POD_TEMPLATE.format(
            spark_pod_label=limit_size_with_hash(f"exec-{app_base_name}"),
        )
        parsed_pod_template = yaml.safe_load(document)
        with open(pod_template_path, "w") as f:
            yaml.dump(parsed_pod_template, f)

    user_spark_opts = _parse_user_spark_args(
        args.spark_args,
        pod_template_path,
        args.enable_compact_bin_packing,
        args.enable_dra,
    )

    if "spark.kubernetes.executor.podTemplateFile" not in user_spark_opts:
        # update pod_template only if use does not provide one
        pod_template = copy.deepcopy(POD_TEMPLATE)

        spark_pod_label = limit_size_with_hash(f"exec-{app_base_name}")
        if args.enable_compact_bin_packing:
            add_compact_bin_packing_affinity_term(pod_template, spark_pod_label)
        if args.aws_instance_types:
            add_node_affinity_terms(pod_template, args.aws_instance_types)

        with open(pod_template_path, "w") as f:
            yaml.dump(pod_template, f)
        user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path

    args.cmd = _auto_add_timeout_for_job(args.cmd, args.timeout_job_runtime)
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small wording edit:
it might also be nice to have arparse handle the splitting for us with something like:
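The reviewer's attached snippet is not included in this capture. As a rough illustration only (assuming the --aws-instance-types flag added in this PR), argparse can do the comma splitting via its type callback:

    # Sketch only: argparse splits the comma-separated value, so downstream code
    # would receive a list of instance types instead of a raw string.
    list_parser.add_argument(
        "--aws-instance-types",
        type=lambda s: s.split(","),
        default=[],
        help="AWS instance types for executors, separated by commas",
    )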