-
Notifications
You must be signed in to change notification settings - Fork 234
External Cluster Environments #1244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 21 commits
49985f5
d491508
686ed39
7e90611
2daf2cc
ff5aab5
20a5c56
0d43bec
875fafa
5d1d732
19f92c7
e0df4e2
5e23382
c9b90ce
0e0ae6f
9a9416e
9ba9ab7
0a4c6fb
d8b55b6
94ab66e
3514129
4fbe8b7
51256b9
8107dac
26cba1a
55065ca
3282e14
047538e
7c06f2b
c909106
a1a9e46
8554343
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
"""Instantiates a static global factory and a single atomic client""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is more about the introduction of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, I think that's fine, I can move them into processproxies and go from there |
||
from enterprise_gateway.services.external.k8s_client_factory import KubernetesClientFactory | ||
|
||
KUBERNETES_CLIENT_FACTORY = KubernetesClientFactory() | ||
kubernetes_client = KUBERNETES_CLIENT_FACTORY.get_kubernetes_client() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
"""Contains factory to create kubernetes api client instances using a single confguration""" | ||
import os | ||
|
||
from kubernetes import client, config | ||
from traitlets.config import SingletonConfigurable | ||
|
||
from ..utils.envutils import is_env_true | ||
|
||
|
||
class KubernetesClientFactory(SingletonConfigurable): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I love this! Thank you for making this better! |
||
"""Manages kubernetes client creation from environment variables""" | ||
|
||
def __init__(self) -> None: | ||
"""Maintain a single configuration object and populate based on environment""" | ||
super().__init__() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't seem necessary at the moment and would recommend its removal unless we find it necessary. Sometimes the linting can demand this kind of thing (which is annoying). |
||
|
||
def get_kubernetes_client(self, get_remote_client: bool = True) -> client.ApiClient: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't |
||
"""Get kubernetes api client with appropriate configuration | ||
|
||
Args: | ||
get_remote_client (bool): Return a client for the remote cluster if configured. Else, return incluster config. Defaults to True. | ||
|
||
Returns: | ||
ApiClient: Kubernetes API client for appropriate cluster | ||
""" | ||
kubernetes_config: client.Configuration = client.Configuration() | ||
if os.getenv("KUBERNETES_SERVICE_HOST"): | ||
# Running inside cluster | ||
if is_env_true('EG_USE_REMOTE_CLUSTER') and get_remote_client: | ||
kubeconfig_path = os.getenv( | ||
'EG_REMOTE_CLUSTER_KUBECONFIG_PATH', '/etc/kube/config/kubeconfig' | ||
) | ||
context = os.getenv('EG_REMOTE_CLUSTER_CONTEXT', None) | ||
config.load_kube_config( | ||
client_configuration=kubernetes_config, | ||
config_file=kubeconfig_path, | ||
context=context, | ||
) | ||
else: | ||
config.load_incluster_config(client_configuration=kubernetes_config) | ||
else: | ||
config.load_kube_config(client_configuration=kubernetes_config) | ||
|
||
self.log.debug(f"Created kubernetes client for host {kubernetes_config.host}") | ||
return client.ApiClient(kubernetes_config) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,12 @@ | |
|
||
from __future__ import annotations | ||
|
||
import os | ||
from typing import Any | ||
|
||
from kubernetes import client | ||
|
||
from ..external.k8s_client import kubernetes_client | ||
from ..kernels.remotemanager import RemoteKernelManager | ||
from .k8s import KubernetesProcessProxy | ||
|
||
|
@@ -37,6 +39,13 @@ async def launch_process( | |
kwargs["env"]["KERNEL_CRD_VERSION"] = self.version | ||
kwargs["env"]["KERNEL_CRD_PLURAL"] = self.plural | ||
|
||
use_remote_cluster = os.getenv("EG_USE_REMOTE_CLUSTER") | ||
Shrinjay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if use_remote_cluster: | ||
kwargs["env"]["EG_USE_REMOTE_CLUSTER"] = 'true' | ||
kwargs["env"]["EG_REMOTE_CLUSTER_KUBECONFIG_PATH"] = os.getenv( | ||
"EG_REMOTE_CLUSTER_KUBECONFIG_PATH" | ||
) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm curious why this code is in |
||
await super().launch_process(kernel_cmd, **kwargs) | ||
return self | ||
|
||
|
@@ -48,7 +57,9 @@ def delete_managed_object(self, termination_stati: list[str]) -> bool: | |
|
||
Note: the caller is responsible for handling exceptions. | ||
""" | ||
delete_status = client.CustomObjectsApi().delete_namespaced_custom_object( | ||
delete_status = client.CustomObjectsApi( | ||
api_client=kubernetes_client | ||
).delete_namespaced_custom_object( | ||
self.group, | ||
self.version, | ||
self.kernel_namespace, | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -10,10 +10,14 @@ | |||||
from typing import Any | ||||||
|
||||||
import urllib3 | ||||||
from kubernetes import client, config | ||||||
import yaml | ||||||
from kubernetes import client | ||||||
from kubernetes.utils.create_from_yaml import create_from_yaml_single_item | ||||||
|
||||||
from ..external.k8s_client import kubernetes_client | ||||||
from ..kernels.remotemanager import RemoteKernelManager | ||||||
from ..sessions.kernelsessionmanager import KernelSessionManager | ||||||
from ..utils.envutils import is_env_true | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we refactor this filename to include an underscore for word separation?
Suggested change
|
||||||
from .container import ContainerProcessProxy | ||||||
|
||||||
urllib3.disable_warnings() | ||||||
|
@@ -29,8 +33,6 @@ | |||||
share_gateway_namespace = bool(os.environ.get("EG_SHARED_NAMESPACE", "False").lower() == "true") | ||||||
kpt_dir = os.environ.get("EG_POD_TEMPLATE_DIR", "/tmp") # noqa | ||||||
|
||||||
config.load_incluster_config() | ||||||
|
||||||
|
||||||
class KubernetesProcessProxy(ContainerProcessProxy): | ||||||
""" | ||||||
|
@@ -81,7 +83,7 @@ def get_container_status(self, iteration: int | None) -> str | None: | |||||
# is used for the assigned_ip. | ||||||
pod_status = None | ||||||
kernel_label_selector = "kernel_id=" + self.kernel_id + ",component=kernel" | ||||||
ret = client.CoreV1Api().list_namespaced_pod( | ||||||
ret = client.CoreV1Api(api_client=kubernetes_client).list_namespaced_pod( | ||||||
namespace=self.kernel_namespace, label_selector=kernel_label_selector | ||||||
) | ||||||
if ret and ret.items: | ||||||
|
@@ -117,7 +119,7 @@ def delete_managed_object(self, termination_stati: list[str]) -> bool: | |||||
# Deleting a Pod will return a v1.Pod if found and its status will be a PodStatus containing | ||||||
# a phase string property | ||||||
# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.21/#podstatus-v1-core | ||||||
v1_pod = client.CoreV1Api().delete_namespaced_pod( | ||||||
v1_pod = client.CoreV1Api(api_client=kubernetes_client).delete_namespaced_pod( | ||||||
namespace=self.kernel_namespace, body=body, name=self.container_name | ||||||
) | ||||||
status = None | ||||||
|
@@ -164,7 +166,7 @@ def terminate_container_resources(self) -> bool | None: # noqa | |||||
body = client.V1DeleteOptions( | ||||||
grace_period_seconds=0, propagation_policy="Background" | ||||||
) | ||||||
v1_status = client.CoreV1Api().delete_namespace( | ||||||
v1_status = client.CoreV1Api(api_client=kubernetes_client).delete_namespace( | ||||||
name=self.kernel_namespace, body=body | ||||||
) | ||||||
status = None | ||||||
|
@@ -227,7 +229,6 @@ def _determine_kernel_pod_name(self, **kwargs: dict[str, Any] | None) -> str: | |||||
return pod_name | ||||||
|
||||||
def _determine_kernel_namespace(self, **kwargs: dict[str, Any] | None) -> str: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (The following applies to lines L240-L254 below...)
((Just to note, in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
|
||||||
# Since we need the service account name regardless of whether we're creating the namespace or not, | ||||||
# get it now. | ||||||
service_account_name = KubernetesProcessProxy._determine_kernel_service_account_name( | ||||||
|
@@ -280,10 +281,17 @@ def _create_kernel_namespace(self, service_account_name: str) -> str: | |||||
|
||||||
# create the namespace | ||||||
try: | ||||||
client.CoreV1Api().create_namespace(body=body) | ||||||
client.CoreV1Api(api_client=kubernetes_client).create_namespace(body=body) | ||||||
self.delete_kernel_namespace = True | ||||||
self.log.info(f"Created kernel namespace: {namespace}") | ||||||
|
||||||
# If remote cluster is being used, service account may not be present, create before role binding | ||||||
# If creating service account is disabled, operator must manually create svc account | ||||||
if is_env_true('EG_USE_REMOTE_CLUSTER') and os.getenv('EG_CREATE_REMOTE_SVC_ACCOUNT'): | ||||||
Shrinjay marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
self._create_service_account_if_not_exists( | ||||||
namespace=namespace, service_account_name=service_account_name | ||||||
) | ||||||
|
||||||
# Now create a RoleBinding for this namespace for the default ServiceAccount. We'll reference | ||||||
# the ClusterRole, but that will only be applied for this namespace. This prevents the need for | ||||||
# creating a role each time. | ||||||
|
@@ -307,14 +315,66 @@ def _create_kernel_namespace(self, service_account_name: str) -> str: | |||||
body = client.V1DeleteOptions( | ||||||
grace_period_seconds=0, propagation_policy="Background" | ||||||
) | ||||||
client.CoreV1Api().delete_namespace(name=namespace, body=body) | ||||||
client.CoreV1Api(api_client=kubernetes_client).delete_namespace( | ||||||
name=namespace, body=body | ||||||
) | ||||||
self.log.warning(f"Deleted kernel namespace: {namespace}") | ||||||
else: | ||||||
reason = f"Error occurred creating namespace '{namespace}': {err}" | ||||||
self.log_and_raise(http_status_code=500, reason=reason) | ||||||
|
||||||
return namespace | ||||||
|
||||||
def _create_service_account_if_not_exists( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is strictly for external clusters, could we rename this to something like: |
||||||
self, namespace: str, service_account_name: str | ||||||
) -> None: | ||||||
"""If service account doesn't exist in target cluster, create one. Occurs if a remote cluster is being used.""" | ||||||
service_account_list_in_namespace: client.V1ServiceAccountList = client.CoreV1Api( | ||||||
api_client=kubernetes_client | ||||||
).list_namespaced_service_account(namespace=namespace) | ||||||
|
||||||
service_accounts_in_namespace: list[ | ||||||
client.V1ServiceAccount | ||||||
] = service_account_list_in_namespace.items | ||||||
service_account_names_in_namespace: list[str] = [ | ||||||
svcaccount.metadata.name for svcaccount in service_accounts_in_namespace | ||||||
] | ||||||
|
||||||
if service_account_name not in service_account_names_in_namespace: | ||||||
service_account_metadata = {"name": service_account_name} | ||||||
service_account_to_create: client.V1ServiceAccount = client.V1ServiceAccount( | ||||||
kind="ServiceAccount", metadata=service_account_metadata | ||||||
) | ||||||
|
||||||
client.CoreV1Api(api_client=kubernetes_client).create_namespaced_service_account( | ||||||
namespace=namespace, body=service_account_to_create | ||||||
) | ||||||
|
||||||
self.log.info( | ||||||
f"Created service account {service_account_name} in namespace {namespace}" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any way to access the "name" of the external cluster? Seems really helpful to include that here.
Suggested change
|
||||||
) | ||||||
|
||||||
def _create_role_if_not_exists(self, namespace: str) -> None: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A similar name change to that suggested previously would be nice here. |
||||||
"""If role doesn't exist in target cluster, create one. Occurs if a remote cluster is being used""" | ||||||
role_yaml_path = os.getenv('EG_REMOTE_CLUSTER_ROLE_PATH') | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should This line (and any validation) should be moved within the |
||||||
|
||||||
# Get Roles in remote cluster | ||||||
remote_cluster_roles: client.V1RoleList = client.RbacAuthorizationV1Api( | ||||||
api_client=kubernetes_client | ||||||
).list_namespaced_role(namespace=namespace) | ||||||
remote_cluster_role_names = [role.metadata.name for role in remote_cluster_roles.items] | ||||||
|
||||||
# If the kernel Role does not exist in the remote cluster. | ||||||
if kernel_cluster_role not in remote_cluster_role_names: | ||||||
with open(role_yaml_path) as f: | ||||||
role_yaml = yaml.safe_load(f) | ||||||
role_yaml["metadata"]["namespace"] = namespace | ||||||
create_from_yaml_single_item(yml_object=role_yaml, k8s_client=kubernetes_client) | ||||||
|
||||||
self.log.info(f"Created role {kernel_cluster_role} in namespace {namespace}") | ||||||
else: | ||||||
self.log.info(f"Found role {kernel_cluster_role} in namespace {namespace}") | ||||||
|
||||||
def _create_role_binding(self, namespace: str, service_account_name: str) -> None: | ||||||
# Creates RoleBinding instance for the given namespace. The role used will be the ClusterRole named by | ||||||
# EG_KERNEL_CLUSTER_ROLE. | ||||||
|
@@ -327,9 +387,17 @@ def _create_role_binding(self, namespace: str, service_account_name: str) -> Non | |||||
role_binding_name = kernel_cluster_role # use same name for binding as cluster role | ||||||
labels = {"app": "enterprise-gateway", "component": "kernel", "kernel_id": self.kernel_id} | ||||||
binding_metadata = client.V1ObjectMeta(name=role_binding_name, labels=labels) | ||||||
binding_role_ref = client.V1RoleRef( | ||||||
api_group="", kind="ClusterRole", name=kernel_cluster_role | ||||||
) | ||||||
|
||||||
# If remote cluster is used, we need to create a role on that cluster | ||||||
if is_env_true('EG_USE_REMOTE_CLUSTER'): | ||||||
self._create_role_if_not_exists(namespace=namespace) | ||||||
# We use namespaced roles on remote clusters rather than a ClusterRole | ||||||
binding_role_ref = client.V1RoleRef(api_group="", kind="Role", name=kernel_cluster_role) | ||||||
else: | ||||||
binding_role_ref = client.V1RoleRef( | ||||||
api_group="", kind="ClusterRole", name=kernel_cluster_role | ||||||
) | ||||||
|
||||||
binding_subjects = client.V1Subject( | ||||||
api_group="", kind="ServiceAccount", name=service_account_name, namespace=namespace | ||||||
) | ||||||
|
@@ -341,7 +409,7 @@ def _create_role_binding(self, namespace: str, service_account_name: str) -> Non | |||||
subjects=[binding_subjects], | ||||||
) | ||||||
|
||||||
client.RbacAuthorizationV1Api().create_namespaced_role_binding( | ||||||
client.RbacAuthorizationV1Api(api_client=kubernetes_client).create_namespaced_role_binding( | ||||||
namespace=namespace, body=body | ||||||
) | ||||||
self.log.info( | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
"""""Utilities to make checking environment variables easier""" | ||
import os | ||
|
||
|
||
def is_env_true(env_variable_name: str) -> bool: | ||
"""If environment variable is set and value is "TRUE" or "true", then return true. Else return false""" | ||
Shrinjay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return bool(os.getenv(env_variable_name, "False").lower() == "true") |
Uh oh!
There was an error while loading. Please reload this page.