Add aws instance type to affinity terms in the pod template #3783

Open · wants to merge 1 commit into base: master
105 changes: 67 additions & 38 deletions paasta_tools/cli/cmds/spark_run.py
@@ -1,4 +1,5 @@
import argparse
import copy
import json
import logging
import os
@@ -14,6 +15,7 @@
from typing import Optional
from typing import Tuple
from typing import Union
from collections import defaultdict

import yaml
from boto3.exceptions import Boto3Error
@@ -75,27 +77,9 @@
POD_TEMPLATE_PATH = "/nail/tmp/spark-pt-{file_uuid}.yaml"
DEFAULT_RUNTIME_TIMEOUT = "12h"

POD_TEMPLATE = """
apiVersion: v1
kind: Pod
metadata:
labels:
spark: {spark_pod_label}
spec:
dnsPolicy: Default
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 95
podAffinityTerm:
labelSelector:
matchExpressions:
- key: spark
operator: In
values:
- {spark_pod_label}
topologyKey: topology.kubernetes.io/hostname
"""
POD_TEMPLATE = defaultdict(
dict, {"apiVersion": "v1", "kind": "Pod", "spec": {"dnsPolicy": "Default"}}
)

deprecated_opts = {
"j": "spark.jars",
@@ -265,6 +249,11 @@ def add_subparser(subparsers):
default=default_spark_pool,
)

list_parser.add_argument(
"--aws-instance-types",
help="AWS instance types for executor, seperate by comma(,)",
Review comment (Member):
small wording edit:

Suggested change
help="AWS instance types for executor, seperate by comma(,)",
help="AWS instance types for executor, separated by commas (,)",

it might also be nice to have argparse handle the splitting for us with something like:

Suggested change
help="AWS instance types for executor, seperate by comma(,)",
help="AWS instance types for executor, separated by commas (,)",
type=lambda instances: [instance for instance in instances.split(",")],
)

list_parser.add_argument(
"-w",
"--work-dir",
@@ -522,6 +511,47 @@ def should_enable_compact_bin_packing(disable_compact_bin_packing, cluster_manag
return True


# inplace add a low priority podAffinityTerm for compact bin packing
def add_compact_bin_packing_affinity_term(pod: Dict, spark_pod_label: str):
Review comment (Member):
suggestion: i'd probably rename pod here to pod_template to reduce confusion

suggestion: if y'all ever want to get rid of the incompletely typed Dict here, a possible option would be to use the models from the kubernetes client (e.g., https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodTemplate.md) internally and then serialize to yaml at the very end :)

suggestion: imo, it's a little preferable to not mutate inputs in-place since pure functions are generally easier to work with/test - but it's not a particularly big deal :)

suggestion (if this remains an impure function): typing this as def add_compact_bin_packing_affinity_term(pod: Dict, spark_pod_label: str) -> None and removing the return would reduce confusion

(same points apply to add_node_affinity_terms() below)
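A rough sketch (not code from this PR) of how those suggestions could combine: the parameter renamed to `pod_template`, no in-place mutation, and a typed return value. The function name `with_compact_bin_packing_affinity_term` is hypothetical, and nested keys are created with `setdefault` so the sketch does not rely on the template being a defaultdict:

```python
import copy
from typing import Any, Dict


def with_compact_bin_packing_affinity_term(
    pod_template: Dict[str, Any], spark_pod_label: str
) -> Dict[str, Any]:
    """Return a copy of pod_template with a low-priority podAffinityTerm added.

    Hypothetical pure-function variant of the PR's helper, for illustration only.
    """
    result = copy.deepcopy(pod_template)  # leave the caller's template untouched
    result.setdefault("metadata", {}).setdefault("labels", {})["spark"] = spark_pod_label
    affinity = result.setdefault("spec", {}).setdefault("affinity", {})
    affinity.setdefault("podAffinity", {}).setdefault(
        "preferredDuringSchedulingIgnoredDuringExecution", []
    ).append(
        {
            "weight": 95,
            "podAffinityTerm": {
                "labelSelector": {
                    "matchExpressions": [
                        {"key": "spark", "operator": "In", "values": [spark_pod_label]}
                    ]
                },
                "topologyKey": "topology.kubernetes.io/hostname",
            },
        }
    )
    return result
```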

pod["metadata"]["labels"]["spark"] = spark_pod_label
pod["spec"]["affinity"]["podAffinity"].setdefault(
"preferredDuringSchedulingIgnoredDuringExecution", []
).append(
{
"weight": 95,
"podAffinityTerm": {
"labelSelector": {
"matchExpressions": [
{
"key": "spark",
"operator": "In",
"values": [spark_pod_label],
}
]
},
"topologyKey": "topology.kubernetes.io/hostname",
},
}
)
return pod


# inplace add nodeAffinity for node selection
def add_node_affinity_terms(pod: Dict, instance_types: str):
pod["spec"]["affinity"]["nodeAffinity"][
"requiredDuringSchedulingIgnoredDuringExecution"
].setdefault("nodeSelectorTerms", []).extend(
[
{
"key": "node.kubernetes.io/instance-type",
Review comment (Member):
just curious: do we want users to specify an instance type (e.g., g4dn.xlarge vs g4dn.2xlarge) or would we be fine having them specify a family (e.g., g4dn) and letting karpenter spin up the most optimal instance type for the given requests at the time?

"operator": "In",
"values": instance_types.split(","),
},
]
)
return pod


def get_docker_run_cmd(
container_name,
volumes,
@@ -694,11 +724,8 @@ def get_spark_env(

def _parse_user_spark_args(
spark_args: Optional[str],
pod_template_path: str,
enable_compact_bin_packing: bool = False,
enable_spark_dra: bool = False,
) -> Dict[str, str]:

user_spark_opts = {}
if spark_args:
for spark_arg in spark_args.split():
@@ -713,9 +740,6 @@ def _parse_user_spark_args(
sys.exit(1)
user_spark_opts[fields[0]] = fields[1]

if enable_compact_bin_packing:
user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path

if enable_spark_dra:
if (
"spark.dynamicAllocation.enabled" in user_spark_opts
@@ -1286,21 +1310,26 @@ def paasta_spark_run(args):

volumes = instance_config.get_volumes(system_paasta_config.get_volumes())
app_base_name = get_spark_app_name(args.cmd or instance_config.get_cmd())

if args.enable_compact_bin_packing:
document = POD_TEMPLATE.format(
spark_pod_label=limit_size_with_hash(f"exec-{app_base_name}"),
)
parsed_pod_template = yaml.safe_load(document)
with open(pod_template_path, "w") as f:
yaml.dump(parsed_pod_template, f)


user_spark_opts = _parse_user_spark_args(
args.spark_args,
pod_template_path,
args.enable_compact_bin_packing,
args.enable_dra,
)

if "spark.kubernetes.executor.podTemplateFile" not in user_spark_opts:
# update pod_template only if the user does not provide one
pod_template = copy.deepcopy(POD_TEMPLATE)

spark_pod_label = limit_size_with_hash(f"exec-{app_base_name}")
if args.enable_compact_bin_packing:
add_compact_bin_packing_affinity_term(pod_template, spark_pod_label)
if args.aws_instance_types:
add_node_affinity_terms(pod_template, args.aws_instance_types)

with open(pod_template_path, "w") as f:
yaml.dump(pod_template, f)
user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path


args.cmd = _auto_add_timeout_for_job(args.cmd, args.timeout_job_runtime)
