Skip to content

Commit 42eb344

Browse files
authored
Add support for setting Kubernetes affinity in pipeline configuration (#276)
* Add support for setting Kubernetes affinity in pipeline configuration * chore(ci): update upload/download-artifact actions to v4
1 parent 03693ee commit 42eb344

6 files changed

Lines changed: 138 additions & 2 deletions

File tree

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
tox -v
4545
4646
- name: Store coverage reports
47-
uses: actions/upload-artifact@v3
47+
uses: actions/upload-artifact@v4
4848
with:
4949
name: coverage-${{ matrix.python-version }}
5050
path: coverage.xml
@@ -59,7 +59,7 @@ jobs:
5959
with:
6060
fetch-depth: 0
6161

62-
- uses: actions/download-artifact@v3
62+
- uses: actions/download-artifact@v4
6363
with:
6464
name: coverage-3.9
6565
path: .

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## [Unreleased]
44

5+
- Add support for setting Kubernetes affinity in pipeline configuration
6+
57
## [0.8.0] - 2024-09-04
68

79
- Added support for python 3.11. Python 3.12 is still blocked by kedro-mlflow

docs/source/02_installation/02_configuration.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,25 @@ run_config:
104104
value: "gpu_workload"
105105
effect: "NoSchedule"
106106

107+
# Optional section allowing adjustment of the affinity for the nodes
108+
affinity:
109+
__default__:
110+
node_affinity:
111+
required_during_scheduling_ignored_during_execution:
112+
nodeSelectorTerms:
113+
- matchExpressions:
114+
- key: "node-type"
115+
operator: "In"
116+
values: ["general"]
117+
node_a:
118+
node_affinity:
119+
required_during_scheduling_ignored_during_execution:
120+
nodeSelectorTerms:
121+
- matchExpressions:
122+
- key: "gpu"
123+
operator: "In"
124+
values: ["true"]
125+
107126
# Optional section to allow mounting additional volumes (such as EmptyDir)
108127
# to specific nodes
109128
extra_volumes:

kedro_kubeflow/config.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,25 @@ def volume_validator(cls, value):
259259
return value
260260

261261

262+
class AffinityConfig(defaultdict):
263+
def __init__(self, default_factory=None, *args, **kwargs):
264+
super().__init__(default_factory, *args, **kwargs)
265+
266+
def is_set_for(self, node_name):
267+
return bool(self.get_for(node_name))
268+
269+
def get_for(self, node_name):
270+
node_value = self._get_or_default(node_name, None)
271+
if node_value is not None:
272+
return node_value
273+
return self._get_or_default("__default__", None)
274+
275+
def _get_or_default(self, node_name, default):
276+
if node_name in self:
277+
return self[node_name]
278+
return self.get("__default__", default)
279+
280+
262281
class RunConfig(BaseModel):
263282
def __init__(self, **kwargs):
264283
super().__init__(**kwargs)
@@ -293,6 +312,10 @@ def _validate_tolerations(cls, value):
293312
def _validate_extra_volumes(cls, value):
294313
return RunConfig._create_default_dict_with(value, [], defaultdict)
295314

315+
@validator("affinity", always=True)
316+
def _validate_affinity(cls, value):
317+
return RunConfig._create_default_dict_with(value, None, AffinityConfig)
318+
296319
image: str
297320
image_pull_policy: str = "IfNotPresent"
298321
root: Optional[str]
@@ -305,6 +328,7 @@ def _validate_extra_volumes(cls, value):
305328
retry_policy: Optional[Dict[str, Optional[RetryPolicyConfig]]]
306329
volume: Optional[VolumeConfig] = None
307330
extra_volumes: Optional[Dict[str, List[ExtraVolumeConfig]]] = None
331+
affinity: Optional[Dict[str, Any]] = None
308332
wait_for_completion: bool = False
309333
store_kedro_outputs_as_kfp_artifacts: bool = True
310334
max_cache_staleness: Optional[str] = None

kedro_kubeflow/generators/utils.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,29 @@ def create_pipeline_exit_handler(pipeline, image, image_pull_policy, run_config,
145145
return dsl.ExitHandler(customize_op(exit_container_op, image_pull_policy, run_config))
146146

147147

148+
def dict_to_v1affinity(affinity_dict):
149+
def convert_node_affinity(na):
150+
if not na:
151+
return None
152+
return k8s.V1NodeAffinity(**na)
153+
154+
def convert_pod_affinity(pa):
155+
if not pa:
156+
return None
157+
return k8s.V1PodAffinity(**pa)
158+
159+
def convert_pod_anti_affinity(paa):
160+
if not paa:
161+
return None
162+
return k8s.V1PodAntiAffinity(**paa)
163+
164+
return k8s.V1Affinity(
165+
node_affinity=convert_node_affinity(affinity_dict.get("node_affinity")),
166+
pod_affinity=convert_pod_affinity(affinity_dict.get("pod_affinity")),
167+
pod_anti_affinity=convert_pod_anti_affinity(affinity_dict.get("pod_anti_affinity")),
168+
)
169+
170+
148171
def customize_op(op, image_pull_policy, run_config: RunConfig):
149172
op.container.set_image_pull_policy(image_pull_policy)
150173
if run_config.volume and run_config.volume.owner is not None:
@@ -162,6 +185,12 @@ def customize_op(op, image_pull_policy, run_config: RunConfig):
162185
for toleration in run_config.tolerations[op.name]:
163186
op.add_toleration(k8s.V1Toleration(**toleration.dict()))
164187

188+
if run_config.affinity.is_set_for(op.name):
189+
affinity_dict = run_config.affinity.get_for(op.name)
190+
if affinity_dict:
191+
affinity_obj = dict_to_v1affinity(affinity_dict)
192+
op.add_affinity(affinity_obj)
193+
165194
if extra_volumes := run_config.extra_volumes[op.name]:
166195
op.add_pvolumes({ev.mount_path: dsl.PipelineVolume(volume=ev.as_v1volume()) for ev in extra_volumes})
167196
return op

tests/test_config.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,23 @@
2222
size: 3Gi
2323
access_modes: [ReadWriteOnce]
2424
keep: True
25+
affinity:
26+
__default__:
27+
node_affinity:
28+
required_during_scheduling_ignored_during_execution:
29+
nodeSelectorTerms:
30+
- matchExpressions:
31+
- key: "default-key"
32+
operator: "In"
33+
values: ["default-value"]
34+
node1:
35+
node_affinity:
36+
required_during_scheduling_ignored_during_execution:
37+
nodeSelectorTerms:
38+
- matchExpressions:
39+
- key: "node1-key"
40+
operator: "In"
41+
values: ["node1-value"]
2542
"""
2643

2744

@@ -264,3 +281,48 @@ def test_retry_policy_no_default(self):
264281
)
265282

266283
self.assertIsNone(cfg.run_config.retry_policy["node2"])
284+
285+
def test_affinity_node_specific(self):
286+
cfg = PluginConfig(**yaml.safe_load(CONFIG_YAML))
287+
affinity = cfg.run_config.affinity.get_for("node1")
288+
assert affinity is not None
289+
assert "node_affinity" in affinity
290+
assert (
291+
affinity["node_affinity"]["required_during_scheduling_ignored_during_execution"]["nodeSelectorTerms"][0][
292+
"matchExpressions"
293+
][0]["key"]
294+
== "node1-key"
295+
)
296+
297+
def test_affinity_default(self):
298+
cfg = PluginConfig(**yaml.safe_load(CONFIG_YAML))
299+
affinity = cfg.run_config.affinity.get_for("node2")
300+
assert affinity is not None
301+
assert "node_affinity" in affinity
302+
assert (
303+
affinity["node_affinity"]["required_during_scheduling_ignored_during_execution"]["nodeSelectorTerms"][0][
304+
"matchExpressions"
305+
][0]["key"]
306+
== "default-key"
307+
)
308+
309+
def test_affinity_is_set_for(self):
310+
cfg = PluginConfig(**yaml.safe_load(CONFIG_YAML))
311+
assert cfg.run_config.affinity.is_set_for("node1")
312+
assert cfg.run_config.affinity.is_set_for("node2") # falls back to default
313+
# Should be False if neither node nor default is set
314+
empty_affinity = PluginConfig(
315+
**yaml.safe_load(
316+
"""
317+
host: https://example.com
318+
run_config:
319+
image: "gcr.io/project-image/test"
320+
image_pull_policy: "Always"
321+
experiment_name: "Test Experiment"
322+
run_name: "test run"
323+
scheduled_run_name: "scheduled run"
324+
affinity: {}
325+
"""
326+
)
327+
)
328+
assert not empty_affinity.run_config.affinity.is_set_for("unknown_node")

0 commit comments

Comments
 (0)