Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix namespaces deletion handling #159

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/cluster-secret/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: cluster-secret
description: ClusterSecret Operator
kubeVersion: '>= 1.25.0-0'
type: application
version: 0.5.2
version: 0.5.3
icon: https://clustersecret.com/assets/csninjasmall.png
sources:
- https://github.com/zakkg3/ClusterSecret
Expand Down
129 changes: 54 additions & 75 deletions src/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from kubernetes import client, config

from cache import Cache, MemoryCache
from kubernetes_utils import delete_secret, get_ns_list, sync_secret, patch_clustersecret_status, \
create_secret_metadata, secret_exists, get_custom_objects_by_kind
from kubernetes_utils import delete_secret, get_ns_list, sync_secret, patch_clustersecret_status, get_custom_objects_by_kind
from models import BaseClusterSecret

# In-memory dictionary for all ClusterSecrets in the Cluster. UID -> ClusterSecret Body
Expand Down Expand Up @@ -110,7 +109,6 @@ def on_field_data(
old: Dict[str, str],
new: Dict[str, str],
body: Dict[str, Any],
meta: kopf.Meta,
name: str,
uid: str,
logger: logging.Logger,
Expand All @@ -125,60 +123,20 @@ def on_field_data(
logger.debug(f'Updating Object body == {body}')
syncedns = body.get('status', {}).get('create_fn', {}).get('syncedns', [])

secret_type = body.get('type', 'Opaque')

cached_cluster_secret = csecs_cache.get_cluster_secret(uid)
if cached_cluster_secret is None:
logger.error('Received an event for an unknown ClusterSecret.')

updated_syncedns = syncedns.copy()
for ns in syncedns:
logger.info(f'Re Syncing secret {name} in ns {ns}')
ns_sec_body = client.V1Secret(
api_version='v1',
data={str(key): str(value) for key, value in new.items()},
kind='Secret',
metadata=create_secret_metadata(
name=name,
namespace=ns,
annotations={str(key): str(value) for key, value in meta.annotations.items()},
labels={str(key): str(value) for key, value in meta.labels.items()},
),
type=secret_type,
)
logger.debug(f'body: {ns_sec_body}')
# Ensuring the secret still exist.
if secret_exists(logger=logger, name=name, namespace=ns, v1=v1):
response = v1.replace_namespaced_secret(name=name, namespace=ns, body=ns_sec_body)
else:
try:
v1.read_namespace(name=ns)
except client.exceptions.ApiException as e:
if e.status != 404:
raise
response = f'Namespace {ns} not found'
updated_syncedns.remove(ns)
logger.info(f'Namespace {ns} not found while Syncing secret {name}')
else:
response = v1.create_namespaced_secret(namespace=ns, body=ns_sec_body)
logger.debug(response)

if updated_syncedns != syncedns:
# Patch synced_ns field
logger.debug(f'Patching clustersecret {name}')
body = patch_clustersecret_status(
logger=logger,
name=name,
new_status={'create_fn': {'syncedns': updated_syncedns}},
custom_objects_api=custom_objects_api,
)
sync_secret(logger, ns, body, v1)

# Updating the cache
csecs_cache.set_cluster_secret(BaseClusterSecret(
uid=uid,
name=name,
body=body,
synced_namespace=updated_syncedns,
synced_namespace=syncedns,
))


Expand Down Expand Up @@ -207,46 +165,67 @@ async def create_fn(
synced_namespace=matchedns,
))

# return for what ??? Should be deleted ??
# This return is mandatory! It's used to update the status of the CRD
# https://kopf.readthedocs.io/en/stable/results/
return {'syncedns': matchedns}


@kopf.on.create('', 'v1', 'namespaces')
async def namespace_watcher(logger: logging.Logger, meta: kopf.Meta, **_):
@kopf.on.delete('', 'v1', 'namespaces')
async def namespace_watcher(logger: logging.Logger, reason: kopf.Reason, meta: kopf.Meta, **_):
"""Watch for namespace events
"""
new_ns = meta.name
logger.debug(f'New namespace created: {new_ns} re-syncing')
ns_new_list = []
for cluster_secret in csecs_cache.all_cluster_secret():
obj_body = cluster_secret.body
name = cluster_secret.name

matcheddns = cluster_secret.synced_namespace

logger.debug(f'Old matched namespace: {matcheddns} - name: {name}')
ns_new_list = get_ns_list(logger, obj_body, v1)
logger.debug(f'new matched list: {ns_new_list}')
if new_ns in ns_new_list:
logger.debug(f'Cloning secret {name} into the new namespace {new_ns}')
if reason not in ["create", "delete"]:
logger.error(f'Function "namespace_watcher" was called with incorrect reason: {reason}')
return

ns_name = meta.name
logger.info(f'Namespace {"created" if reason == "create" else "deleted"}: {ns_name}. Re-syncing')

ns_list_new = []
for cached_cluster_secret in csecs_cache.all_cluster_secret():
body = cached_cluster_secret.body
name = cached_cluster_secret.name
ns_list_synced = cached_cluster_secret.synced_namespace
ns_list_new = get_ns_list(logger, body, v1)
ns_list_changed = False

logger.debug(f'ClusterSecret: {name}. Old matched namespaces: {ns_list_synced}')
logger.debug(f'ClusterSecret: {name}. New matched namespaces: {ns_list_new}')

if reason == "create" and ns_name in ns_list_new:
logger.info(f'Cloning secret {name} into the new namespace: {ns_name}')
sync_secret(
logger=logger,
namespace=new_ns,
body=obj_body,
namespace=ns_name,
body=body,
v1=v1,
)

# if there is a new matching ns, refresh cache
cluster_secret.synced_namespace = ns_new_list
csecs_cache.set_cluster_secret(cluster_secret)

# update ns_new_list on the object so then we also delete from there
patch_clustersecret_status(
logger=logger,
name=cluster_secret.name,
new_status={'create_fn': {'syncedns': ns_new_list}},
custom_objects_api=custom_objects_api,
)
ns_list_changed = True

if reason == "delete" and ns_name in ns_list_synced:
logger.info(f'Secret {name} removed from deleted namespace: {ns_name}')
# Ensure that deleted namespace will not come in new list - on moment when this event handled by kopf the namespace in kubernetes can still exists
if ns_name in ns_list_new:
ns_list_new.remove(ns_name)
ns_list_changed = True

# Update ClusterSecret only if there are changes in list of his namespaces
if ns_list_changed:
# Update in-memory cache
cached_cluster_secret.synced_namespace = ns_list_new
csecs_cache.set_cluster_secret(cached_cluster_secret)

# Update the list of synced namespaces in kubernetes object
logger.debug(f'Patching ClusterSecret: {name}')
patch_clustersecret_status(
logger=logger,
name=name,
new_status={'create_fn': {'syncedns': ns_list_new}},
custom_objects_api=custom_objects_api,
)
else:
logger.debug(f'There are no changes in the list of namespaces for ClusterSecret: {name}')


@kopf.on.startup()
Expand Down
45 changes: 18 additions & 27 deletions src/kubernetes_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def delete_secret(
logger.info(f'deleting secret {name} from namespace {namespace}')
try:
v1.delete_namespaced_secret(name, namespace)
except rest.ApiException as e:
except exceptions.ApiException as e:
if e.status == 404:
logger.warning(f'The namespace {namespace} may not exist anymore: Not found')
else:
Expand Down Expand Up @@ -172,6 +172,13 @@ def sync_secret(
annotations = cs_metadata.get('annotations', None)
labels = cs_metadata.get('labels', None)

try:
v1.read_namespace(name=namespace)
except exceptions.ApiException as e:
logger.debug(f'Namespace {namespace} not found while syncing secret {sec_name}. Never mind on this rare situation it will be handled in other place.')
if e.status == 404:
return

if 'data' not in body:
raise kopf.TemporaryError('Property data is missing.')

Expand Down Expand Up @@ -212,7 +219,7 @@ def sync_secret(
)
body.type = secret_type
body.data = data
logger.info(f'cloning secret in namespace {namespace}')
logger.info(f'Syncing secret {sec_name} in namespace {namespace}.')
logger.debug(f'V1Secret= {body}')

try:
Expand All @@ -221,42 +228,26 @@ def sync_secret(

# If nothing returned, the secret does not exist, creating it then
if metadata is None:
logger.info('Using create_namespaced_secret')
logger.info(f'Creating new secret {sec_name} in namespace {namespace}.')
logger.debug(f'response is {v1.create_namespaced_secret(namespace, body)}')
return

if metadata.annotations is None:
logger.info(
f'secret `{sec_name}` exist but it does not have annotations, so is not managed by ClusterSecret',
)
if metadata.annotations is None or metadata.annotations.get(CREATE_BY_ANNOTATION) is None:
logger.info(f'Secret {sec_name} already exist in namespace {namespace} and is not managed by ClusterSecret.')

# If we should not overwrite existing secrets
if not get_replace_existing():
logger.info(
f'secret `{sec_name}` will not be replaced. '
'You can enforce this by setting env REPLACE_EXISTING to true.',
)
logger.info(f'Secret {sec_name} in namespace {namespace} will not be replaced. You can enforce this by setting env REPLACE_EXISTING to true.')
return
elif metadata.annotations.get(CREATE_BY_ANNOTATION) is None:
logger.error(
f"secret `{sec_name}` already exist in namespace '{namespace}' and is not managed by ClusterSecret",
)

if not get_replace_existing():
logger.info(
f'secret `{sec_name}` will not be replaced. '
'You can enforce this by setting env REPLACE_EXISTING to true.',
)
return

logger.info(f'Replacing secret {sec_name}')
logger.info(f'Replacing secret {sec_name} in namespace {namespace}.')
v1.replace_namespaced_secret(
name=sec_name,
namespace=namespace,
body=body,
)
except rest.ApiException as e:
logger.error('Can not create a secret, it is base64 encoded? enable debug for details')
except exceptions.ApiException as e:
logger.error('Can not create a secret, it is base64 encoded? Enable debug for details.')
logger.debug(f'data: {data}')
logger.debug(f'Kube exception {e}')

Expand Down Expand Up @@ -349,6 +340,6 @@ def get_custom_objects_by_kind(
)

return custom_objects['items']
except rest.ApiException as e:
except exceptions.ApiException as e:
# Properly handle API exceptions
raise rest.ApiException(f'Error while retrieving custom objects: {e}')
raise exceptions.ApiException(f'Error while retrieving custom objects: {e}')
Loading