diff --git a/luigi/contrib/kubernetes.py b/luigi/contrib/kubernetes.py index 82dc769580..af37a9f96a 100644 --- a/luigi/contrib/kubernetes.py +++ b/luigi/contrib/kubernetes.py @@ -33,6 +33,7 @@ Written and maintained by Marco Capuccini (@mcapuccini). """ import logging +import os import time import uuid from datetime import datetime @@ -54,7 +55,7 @@ class kubernetes(luigi.Config): default="kubeconfig", description="Authorization method to access the cluster") kubeconfig_path = luigi.Parameter( - default="~/.kube/config", + default=os.getenv("KUBECONFIG", "~/.kube/config"), description="Path to kubeconfig file for cluster authentication") max_retrials = luigi.IntParameter( default=0, diff --git a/test/contrib/kubernetes_test.py b/test/contrib/kubernetes_test.py index 246ca777d6..4e95805f23 100644 --- a/test/contrib/kubernetes_test.py +++ b/test/contrib/kubernetes_test.py @@ -85,7 +85,7 @@ def test_success_job(self): def test_fail_job(self): fail = FailJob() - self.assertRaises(RuntimeError, fail.run) + self.assertRaises(AssertionError, fail.run) # Check for retrials kube_api = HTTPClient(KubeConfig.from_file("~/.kube/config")) # assumes minikube jobs = Job.objects(kube_api).filter(selector="luigi_task_id=" @@ -94,11 +94,14 @@ def test_fail_job(self): job = Job(kube_api, jobs.response["items"][0]) self.assertTrue("failed" in job.obj["status"]) self.assertTrue(job.obj["status"]["failed"] > fail.max_retrials) - self.assertTrue(job.obj['spec']['template']['metadata']['labels'] == fail.labels()) + self.assertTrue(job.obj['spec']['template']['metadata']['labels'].items() >= fail.labels.items()) + @mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__verify_job_has_started") @mock.patch.object(KubernetesJobTask, "_KubernetesJobTask__get_job_status") @mock.patch.object(KubernetesJobTask, "signal_complete") - def test_output(self, mock_signal, mock_job_status): + def test_output(self, mock_signal, mock_job_status, mock_verify_job): + # mock that the job has started + mock_verify_job.return_value = True # mock that the job succeeded mock_job_status.return_value = "succeeded" # create a kubernetes job @@ -106,6 +109,7 @@ def test_output(self, mock_signal, mock_job_status): # set logger and uu_name due to logging in __track_job() kubernetes_job._KubernetesJobTask__logger = logger kubernetes_job.uu_name = "test" + kubernetes_job._KubernetesJobTask__print_kubectl_hints = mock.Mock() # track the job (bc included in run method) kubernetes_job._KubernetesJobTask__track_job() # Make sure successful job signals