Skip to content

Commit 8d1d7d2

Browse files
Add unit tests for ROSA cluster tagging username resolution
Cover CloudTrail fallback lookups, automation-user exclusion, strategy ordering, and ambiguous cluster role matching introduced in PR #1007. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 0e2fb2e commit 8d1d7d2

3 files changed

Lines changed: 256 additions & 0 deletions

File tree

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from datetime import datetime, timezone
2+
from unittest.mock import Mock, patch
3+
4+
from cloud_governance.policy.policy_operations.aws.tag_cluster.tag_cluster_operations import TagClusterOperations
5+
6+
7+
def _make_tag_cluster_ops(**overrides):
8+
"""Build TagClusterOperations without calling __init__ (avoids real AWS calls)."""
9+
ops = TagClusterOperations.__new__(TagClusterOperations)
10+
ops.iam_users = overrides.get('iam_users', ['pragchau', 'cloud-governance-delete-user'])
11+
ops._automation_user = overrides.get('_automation_user', 'cloud-governance-user')
12+
ops.get_user_name_from_name_tag = Mock(return_value=overrides.get('name_tag_user'))
13+
ops._get_username_from_instance_id_and_time = Mock(
14+
return_value=overrides.get('instance_ct_user'))
15+
ops.cloudtrail = Mock()
16+
ops.cloudtrail.get_username_from_cluster_role = Mock(
17+
return_value=overrides.get('cluster_role_user', ''))
18+
ops._regional_cloudtrail = Mock()
19+
ops._regional_cloudtrail.get_username_from_resource_events = Mock(
20+
return_value=overrides.get('resource_events_user', ''))
21+
return ops
22+
23+
24+
class TestTagClusterOperationsGetUsername:
25+
LAUNCH_TIME = datetime(2026, 6, 30, 14, 6, 46, tzinfo=timezone.utc)
26+
RESOURCE_ID = 'i-0123456789abcdef0'
27+
CLUSTER_ID = 'z2r7p1f3o6m2h3y-t5bx8'
28+
29+
def test_returns_autoscaling_without_iam_validation(self):
30+
ops = _make_tag_cluster_ops(instance_ct_user='AutoScaling')
31+
result = ops.get_username(
32+
start_time=self.LAUNCH_TIME, resource_id=self.RESOURCE_ID,
33+
resource_type='AWS::EC2::Instance', tags=[], cluster_id=self.CLUSTER_ID)
34+
assert result == 'AutoScaling'
35+
36+
def test_skips_automation_user_from_instance_cloudtrail(self):
37+
ops = _make_tag_cluster_ops(
38+
instance_ct_user='cloud-governance-user',
39+
cluster_role_user='pragchau')
40+
result = ops.get_username(
41+
start_time=self.LAUNCH_TIME, resource_id=self.RESOURCE_ID,
42+
resource_type='AWS::EC2::Instance', tags=[], cluster_id=self.CLUSTER_ID)
43+
assert result == 'pragchau'
44+
ops.cloudtrail.get_username_from_cluster_role.assert_called_once()
45+
46+
def test_cluster_role_resolved_before_resource_events_fallback(self):
47+
ops = _make_tag_cluster_ops(
48+
instance_ct_user='178000000000',
49+
cluster_role_user='pragchau',
50+
resource_events_user='cloud-governance-delete-user')
51+
result = ops.get_username(
52+
start_time=self.LAUNCH_TIME, resource_id=self.RESOURCE_ID,
53+
resource_type='AWS::EC2::Instance', tags=[], cluster_id=self.CLUSTER_ID)
54+
assert result == 'pragchau'
55+
ops._regional_cloudtrail.get_username_from_resource_events.assert_not_called()
56+
57+
def test_resource_events_fallback_excludes_automation_user(self):
58+
ops = _make_tag_cluster_ops(
59+
instance_ct_user='178000000000',
60+
cluster_role_user='',
61+
resource_events_user='pragchau')
62+
result = ops.get_username(
63+
start_time=self.LAUNCH_TIME, resource_id=self.RESOURCE_ID,
64+
resource_type='AWS::EC2::Instance', tags=[], cluster_id=self.CLUSTER_ID)
65+
assert result == 'pragchau'
66+
call_kwargs = ops._regional_cloudtrail.get_username_from_resource_events.call_args.kwargs
67+
assert call_kwargs['exclude_users'] == {'cloud-governance-user'}
68+
69+
70+
class TestTagClusterOperationsClusterInstanceFallback:
71+
def test_get_username_from_cluster_instances_returns_tagged_peer(self):
72+
ops = TagClusterOperations.__new__(TagClusterOperations)
73+
ops.cluster_prefix = ['kubernetes.io/cluster/']
74+
ops.iam_users = ['pragchau']
75+
ops.ec2_operations = Mock()
76+
ops.ec2_operations.get_tag_value_from_tags = Mock(return_value='pragchau')
77+
resources = [[{
78+
'Tags': [
79+
{'Key': 'kubernetes.io/cluster/z2r7p1f3o6m2h3y-t5bx8', 'Value': 'owned'},
80+
{'Key': 'User', 'Value': 'pragchau'},
81+
]
82+
}]]
83+
result = ops.get_username_from_cluster_instances(
84+
resources=resources, cluster_name='z2r7p1f3o6m2h3y-t5bx8')
85+
assert result == 'pragchau'
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from datetime import datetime, timezone
2+
from unittest.mock import Mock
3+
4+
from cloud_governance.policy.policy_operations.aws.tag_non_cluster.non_cluster_operations import NonClusterOperations
5+
6+
7+
def _make_non_cluster_ops(**overrides):
8+
ops = NonClusterOperations.__new__(NonClusterOperations)
9+
ops.iam_users = overrides.get('iam_users', ['pragchau', 'cloud-governance-delete-user'])
10+
ops._automation_user = overrides.get('_automation_user', 'cloud-governance-user')
11+
ops.get_user_name_from_name_tag = Mock(return_value=overrides.get('name_tag_user'))
12+
ops._get_username_from_cloudtrail = Mock(return_value=overrides.get('cloudtrail_user'))
13+
ops.cloudtrail = Mock()
14+
ops.cloudtrail.get_username_from_resource_events = Mock(
15+
return_value=overrides.get('resource_events_user', ''))
16+
return ops
17+
18+
19+
class TestNonClusterOperationsGetUsername:
20+
START_TIME = datetime(2026, 6, 30, 14, 0, 0, tzinfo=timezone.utc)
21+
22+
def test_excludes_automation_user_from_primary_cloudtrail(self):
23+
ops = _make_non_cluster_ops(cloudtrail_user='cloud-governance-user')
24+
result = ops.get_username(
25+
start_time=self.START_TIME, resource_id='vol-abc123',
26+
resource_type='AWS::EC2::Volume', tags=[])
27+
assert result == ''
28+
ops.cloudtrail.get_username_from_resource_events.assert_called_once()
29+
30+
def test_returns_valid_user_from_primary_cloudtrail(self):
31+
ops = _make_non_cluster_ops(cloudtrail_user='pragchau')
32+
result = ops.get_username(
33+
start_time=self.START_TIME, resource_id='vol-abc123',
34+
resource_type='AWS::EC2::Volume', tags=[])
35+
assert result == 'pragchau'
36+
ops.cloudtrail.get_username_from_resource_events.assert_not_called()
37+
38+
def test_resource_events_fallback_excludes_automation_user(self):
39+
ops = _make_non_cluster_ops(
40+
cloudtrail_user='',
41+
resource_events_user='pragchau')
42+
result = ops.get_username(
43+
start_time=self.START_TIME, resource_id='vol-abc123',
44+
resource_type='AWS::EC2::Volume', tags=[])
45+
assert result == 'pragchau'
46+
call_kwargs = ops.cloudtrail.get_username_from_resource_events.call_args.kwargs
47+
assert call_kwargs['exclude_users'] == {'cloud-governance-user'}

tests/unittest/cloud_governance/common/clouds/aws/cloudtrail/test_cloudtrail_operations.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
from datetime import datetime, timezone
23
from unittest.mock import Mock, patch
34

45
import pytest
@@ -349,3 +350,126 @@ def test_arn_parsing_different_session_formats(self, mock_boto3):
349350
for arn, expected_username in test_cases:
350351
session_name = arn.split('/')[-1]
351352
assert session_name == expected_username, f"Failed to parse {arn}"
353+
354+
355+
class TestCloudTrailOperationsRosaTagging:
356+
"""Tests for ROSA cluster ownership resolution added in PR #1007."""
357+
358+
LAUNCH_TIME = datetime(2026, 6, 30, 14, 6, 46, tzinfo=timezone.utc)
359+
IAM_USERS = ['pragchau', 'cloud-governance-delete-user']
360+
361+
@patch('cloud_governance.common.clouds.aws.cloudtrail.cloudtrail_operations.boto3')
362+
def test_get_username_from_resource_events_skips_excluded_users(self, mock_boto3):
363+
cloudtrail_ops = CloudTrailOperations(region_name='us-east-1')
364+
cloudtrail_ops.get_full_responses = Mock(return_value=[
365+
{
366+
'EventName': 'CreateTags',
367+
'Username': 'cloud-governance-delete-user',
368+
'CloudTrailEvent': '{}',
369+
},
370+
{
371+
'EventName': 'CreateTags',
372+
'Username': 'pragchau',
373+
'CloudTrailEvent': '{}',
374+
},
375+
])
376+
result = cloudtrail_ops.get_username_from_resource_events(
377+
resource_id='i-0123456789abcdef0',
378+
iam_users=self.IAM_USERS,
379+
start_time=self.LAUNCH_TIME,
380+
exclude_users={'cloud-governance-delete-user'})
381+
assert result == 'pragchau'
382+
383+
@patch('cloud_governance.common.clouds.aws.cloudtrail.cloudtrail_operations.boto3')
384+
def test_get_username_from_cluster_role_direct_match(self, mock_boto3):
385+
mock_iam = Mock()
386+
mock_paginator = Mock()
387+
mock_paginator.paginate.return_value = [{
388+
'Roles': [{
389+
'RoleName': 'z2r7p1f3o6m2h3y-t5bx8-master-role',
390+
'Arn': 'arn:aws:iam::123456789012:role/z2r7p1f3o6m2h3y-t5bx8-master-role',
391+
'CreateDate': self.LAUNCH_TIME,
392+
}]
393+
}]
394+
mock_iam.get_paginator.return_value = mock_paginator
395+
mock_boto3.client.side_effect = lambda service, **kwargs: {
396+
'cloudtrail': Mock(),
397+
'iam': mock_iam,
398+
}[service]
399+
400+
cloudtrail_ops = CloudTrailOperations(region_name='us-east-1')
401+
cloudtrail_ops._CloudTrailOperations__get_username_from_role_cloudtrail = Mock(
402+
return_value='pragchau')
403+
404+
result = cloudtrail_ops.get_username_from_cluster_role(
405+
cluster_id='z2r7p1f3o6m2h3y-t5bx8',
406+
iam_users=self.IAM_USERS,
407+
launch_time=self.LAUNCH_TIME)
408+
assert result == 'pragchau'
409+
410+
@patch('cloud_governance.common.clouds.aws.cloudtrail.cloudtrail_operations.boto3')
411+
def test_get_username_from_cluster_role_ambiguous_temporal_match_returns_empty(
412+
self, mock_boto3):
413+
role_create = datetime(2026, 6, 30, 13, 0, 0, tzinfo=timezone.utc)
414+
mock_iam = Mock()
415+
mock_paginator = Mock()
416+
mock_paginator.paginate.return_value = [{
417+
'Roles': [
418+
{
419+
'RoleName': 'cluster-a-openshift-machine-api',
420+
'Arn': 'arn:aws:iam::123456789012:role/cluster-a-openshift-machine-api',
421+
'CreateDate': role_create,
422+
},
423+
{
424+
'RoleName': 'cluster-b-openshift-machine-api',
425+
'Arn': 'arn:aws:iam::123456789012:role/cluster-b-openshift-machine-api',
426+
'CreateDate': role_create,
427+
},
428+
]
429+
}]
430+
mock_iam.get_paginator.return_value = mock_paginator
431+
mock_boto3.client.side_effect = lambda service, **kwargs: {
432+
'cloudtrail': Mock(),
433+
'iam': mock_iam,
434+
}[service]
435+
436+
cloudtrail_ops = CloudTrailOperations(region_name='us-east-1')
437+
cloudtrail_ops._CloudTrailOperations__get_username_from_role_cloudtrail = Mock(
438+
side_effect=['pragchau', 'otheruser'])
439+
cloudtrail_ops._CloudTrailOperations__get_username_from_create_role_events = Mock(
440+
return_value='')
441+
442+
result = cloudtrail_ops.get_username_from_cluster_role(
443+
cluster_id='unknown-infra-id',
444+
iam_users=self.IAM_USERS + ['otheruser'],
445+
launch_time=self.LAUNCH_TIME)
446+
assert result == ''
447+
cloudtrail_ops._CloudTrailOperations__get_username_from_create_role_events.assert_called_once()
448+
449+
@patch('cloud_governance.common.clouds.aws.cloudtrail.cloudtrail_operations.boto3')
450+
def test_get_username_from_create_role_events_paginates_global_cloudtrail(
451+
self, mock_boto3):
452+
mock_global_ct = Mock()
453+
mock_global_ct.lookup_events.side_effect = [
454+
{'Events': [], 'NextToken': 'page2'},
455+
{'Events': [{
456+
'EventName': 'CreateRole',
457+
'Username': 'pragchau',
458+
'Resources': [{
459+
'ResourceType': 'AWS::IAM::Role',
460+
'ResourceName': 'rosa-test-openshift-machine-api',
461+
}],
462+
'CloudTrailEvent': '{}',
463+
}]},
464+
]
465+
mock_boto3.client.return_value = Mock()
466+
467+
cloudtrail_ops = CloudTrailOperations(region_name='us-west-2')
468+
cloudtrail_ops._CloudTrailOperations__global_cloudtrail = mock_global_ct
469+
470+
start = datetime(2026, 6, 30, 8, 0, 0, tzinfo=timezone.utc)
471+
end = datetime(2026, 6, 30, 14, 0, 0, tzinfo=timezone.utc)
472+
result = cloudtrail_ops._CloudTrailOperations__get_username_from_create_role_events(
473+
start_time=start, end_time=end, iam_users=self.IAM_USERS)
474+
assert result == 'pragchau'
475+
assert mock_global_ct.lookup_events.call_count == 2

0 commit comments

Comments
 (0)