From 86537ac63baccdede382e164ed19717aa329de51 Mon Sep 17 00:00:00 2001
From: Yunhui Zhang
Date: Fri, 19 Jan 2024 14:18:12 -0800
Subject: [PATCH] Add aws instance type to affinity terms in the pod template

---
 paasta_tools/cli/cmds/spark_run.py | 105 ++++++++++++++++++-----------
 1 file changed, 67 insertions(+), 38 deletions(-)

diff --git a/paasta_tools/cli/cmds/spark_run.py b/paasta_tools/cli/cmds/spark_run.py
index f9145305d2..73c7350505 100644
--- a/paasta_tools/cli/cmds/spark_run.py
+++ b/paasta_tools/cli/cmds/spark_run.py
@@ -1,4 +1,5 @@
 import argparse
+import copy
 import json
 import logging
 import os
@@ -14,6 +15,7 @@
 from typing import Optional
 from typing import Tuple
 from typing import Union
+from collections import defaultdict
 
 import yaml
 from boto3.exceptions import Boto3Error
@@ -75,27 +77,9 @@
 POD_TEMPLATE_PATH = "/nail/tmp/spark-pt-{file_uuid}.yaml"
 DEFAULT_RUNTIME_TIMEOUT = "12h"
 
-POD_TEMPLATE = """
-apiVersion: v1
-kind: Pod
-metadata:
-  labels:
-    spark: {spark_pod_label}
-spec:
-  dnsPolicy: Default
-  affinity:
-    podAffinity:
-      preferredDuringSchedulingIgnoredDuringExecution:
-      - weight: 95
-        podAffinityTerm:
-          labelSelector:
-            matchExpressions:
-            - key: spark
-              operator: In
-              values:
-              - {spark_pod_label}
-          topologyKey: topology.kubernetes.io/hostname
-"""
+POD_TEMPLATE = defaultdict(
+    dict, {"apiVersion": "v1", "kind": "Pod", "spec": {"dnsPolicy": "Default"}}
+)
 
 deprecated_opts = {
     "j": "spark.jars",
@@ -265,6 +249,11 @@ def add_subparser(subparsers):
         default=default_spark_pool,
     )
 
+    list_parser.add_argument(
+        "--aws-instance-types",
+        help="AWS instance types for executors, separated by commas (,)",
+    )
+
     list_parser.add_argument(
         "-w",
         "--work-dir",
@@ -522,6 +511,47 @@ def should_enable_compact_bin_packing(disable_compact_bin_packing, cluster_manag
     return True
 
 
+# add a low-priority podAffinityTerm for compact bin packing (modifies pod in place)
+def add_compact_bin_packing_affinity_term(pod: Dict, spark_pod_label: str):
+    pod.setdefault("metadata", {}).setdefault("labels", {})["spark"] = spark_pod_label
+    pod["spec"].setdefault("affinity", {}).setdefault("podAffinity", {}).setdefault(
+        "preferredDuringSchedulingIgnoredDuringExecution", []
+    ).append(
+        {
+            "weight": 95,
+            "podAffinityTerm": {
+                "labelSelector": {
+                    "matchExpressions": [
+                        {
+                            "key": "spark",
+                            "operator": "In",
+                            "values": [spark_pod_label],
+                        }
+                    ]
+                },
+                "topologyKey": "topology.kubernetes.io/hostname",
+            },
+        }
+    )
+    return pod
+
+
+# add a required nodeAffinity term for instance type selection (modifies pod in place)
+def add_node_affinity_terms(pod: Dict, instance_types: str):
+    pod["spec"].setdefault("affinity", {}).setdefault("nodeAffinity", {}).setdefault(
+        "requiredDuringSchedulingIgnoredDuringExecution", {}
+    ).setdefault("nodeSelectorTerms", []).append(
+        {
+            "matchExpressions": [{
+                "key": "node.kubernetes.io/instance-type",
+                "operator": "In",
+                "values": instance_types.split(","),
+            }],
+        }
+    )
+    return pod
+
+
 def get_docker_run_cmd(
     container_name,
     volumes,
@@ -694,11 +724,8 @@ def get_spark_env(
 
 def _parse_user_spark_args(
     spark_args: Optional[str],
-    pod_template_path: str,
-    enable_compact_bin_packing: bool = False,
     enable_spark_dra: bool = False,
 ) -> Dict[str, str]:
-
     user_spark_opts = {}
     if spark_args:
         for spark_arg in spark_args.split():
@@ -713,9 +740,6 @@
                sys.exit(1)
            user_spark_opts[fields[0]] = fields[1]
 
-    if enable_compact_bin_packing:
-        user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path
-
     if enable_spark_dra:
         if (
             "spark.dynamicAllocation.enabled" in user_spark_opts
@@ -1286,21 +1310,26 @@ def paasta_spark_run(args):
     volumes = instance_config.get_volumes(system_paasta_config.get_volumes())
     app_base_name = get_spark_app_name(args.cmd or instance_config.get_cmd())
-
-    if args.enable_compact_bin_packing:
-        document = POD_TEMPLATE.format(
-            spark_pod_label=limit_size_with_hash(f"exec-{app_base_name}"),
-        )
-        parsed_pod_template = yaml.safe_load(document)
-        with open(pod_template_path, "w") as f:
-            yaml.dump(parsed_pod_template, f)
-
+
     user_spark_opts = _parse_user_spark_args(
         args.spark_args,
-        pod_template_path,
-        args.enable_compact_bin_packing,
         args.enable_dra,
     )
+
+    if "spark.kubernetes.executor.podTemplateFile" not in user_spark_opts:
+        # update pod_template only if the user does not provide one
+        pod_template = copy.deepcopy(POD_TEMPLATE)
+
+        spark_pod_label = limit_size_with_hash(f"exec-{app_base_name}")
+        if args.enable_compact_bin_packing:
+            add_compact_bin_packing_affinity_term(pod_template, spark_pod_label)
+        if args.aws_instance_types:
+            add_node_affinity_terms(pod_template, args.aws_instance_types)
+
+        with open(pod_template_path, "w") as f:
+            yaml.dump(dict(pod_template), f)
+        user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path
+
     args.cmd = _auto_add_timeout_for_job(args.cmd, args.timeout_job_runtime)
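
Illustration (not part of the commit): assuming a run with compact bin packing enabled and, for example, --aws-instance-types m5.4xlarge,r5.2xlarge, the template written to /nail/tmp/spark-pt-<uuid>.yaml and handed to Spark via spark.kubernetes.executor.podTemplateFile would look roughly like the sketch below. The spark label value stands in for whatever limit_size_with_hash produces for the app name; the instance types are example values.

    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        spark: exec-example-2f6c
    spec:
      dnsPolicy: Default
      affinity:
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 95
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: spark
                  operator: In
                  values:
                  - exec-example-2f6c
              topologyKey: topology.kubernetes.io/hostname
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: node.kubernetes.io/instance-type
                operator: In
                values:
                - m5.4xlarge
                - r5.2xlarge

The preferred podAffinity term nudges executors onto hosts that already run pods with the same spark label (compact bin packing), while the required nodeAffinity term restricts executors to the listed EC2 instance types.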