-
Notifications
You must be signed in to change notification settings - Fork 4.3k
fix: Redact SSO PII before deletion #38425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 13 commits
9a178e3
8d57698
2688ac8
ff4b57e
417aa3d
542b5be
1b46be6
74d655b
08b491f
bbb5643
07b82ff
15bcdc0
2a9fba8
dd7ac9c
5ca020f
cdb49a2
bd3c108
7528c08
2af3cb4
9a8ba84
a75fb7f
0cbee49
c902e56
5b3312e
9aa4192
36192df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,10 +6,14 @@ | |
| from completion.test_utils import CompletionWaffleTestMixin | ||
| from django.test import TestCase | ||
| from django.test.utils import override_settings | ||
| from social_django.models import UserSocialAuth | ||
|
|
||
| from common.djangoapps.student.models import CourseEnrollment | ||
| from common.djangoapps.student.tests.factories import UserFactory | ||
| from openedx.core.djangoapps.user_api.accounts.utils import retrieve_last_sitewide_block_completed | ||
| from openedx.core.djangoapps.user_api.accounts.signals import get_redacted_social_auth_uid | ||
| from openedx.core.djangoapps.user_api.accounts.utils import ( | ||
| retrieve_last_sitewide_block_completed, | ||
| ) | ||
| from openedx.core.djangolib.testing.utils import skip_unless_lms | ||
| from xmodule.modulestore.tests.django_utils import ( | ||
| SharedModuleStoreTestCase, # lint-amnesty, pylint: disable=wrong-import-order | ||
|
|
@@ -133,3 +137,142 @@ def test_retrieve_last_sitewide_block_completed(self): | |
| ) | ||
|
|
||
| assert empty_block_url is None | ||
|
|
||
|
|
||
| @skip_unless_lms | ||
| class RedactUserSocialAuthPIITest(TestCase): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note: You have much of what you need, so hopefully this is minor refactoring and clean-up.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. signal tests moved to test_signals.py using mock.patch, and utils tests now call redact_and_delete_social_auth directly without any signal setup/teardown. |
||
| """ | ||
| Tests for SSO PII redaction before deletion. | ||
| """ | ||
|
|
||
| def setUp(self): | ||
| super().setUp() | ||
| self.user = UserFactory.create(username='testuser', email='testuser@example.com') | ||
|
|
||
| def create_social_auth(self, provider='google-oauth2', uid='user@example.com', extra_data=None): | ||
| """ | ||
| Helper method to create UserSocialAuth instances for testing. | ||
| """ | ||
| if extra_data is None: | ||
| extra_data = { | ||
| 'email': 'user@example.com', | ||
| 'name': 'Test User', | ||
| 'id': '123456789', | ||
| } | ||
| return UserSocialAuth.objects.create( | ||
| user=self.user, | ||
| provider=provider, | ||
| uid=uid, | ||
| extra_data=extra_data, | ||
| ) | ||
|
|
||
| def test_get_redacted_social_auth_uid_format(self): | ||
| """ | ||
| Test that get_redacted_social_auth_uid returns the expected string format. | ||
|
|
||
| This is the single source of truth for the redacted uid format. | ||
| If this test breaks, the bulk retirement Concat/Cast in utils.py and | ||
| retire_user.py must also be updated to match. | ||
| """ | ||
| assert get_redacted_social_auth_uid(42) == 'redacted-before-delete-42@safe.com' | ||
| assert get_redacted_social_auth_uid(1) == 'redacted-before-delete-1@safe.com' | ||
|
|
||
| def test_delete_redacts_user_social_auth_pii(self): | ||
| """ | ||
| Test that deleting social auth redacts uid and extra_data before removal. | ||
| """ | ||
| social_auth = self.create_social_auth() | ||
| social_auth_id = social_auth.id | ||
|
|
||
| captured_states = [] | ||
|
|
||
| def capture_state_before_delete(sender, instance, **kwargs): # pylint: disable=unused-argument | ||
| instance.refresh_from_db() | ||
| captured_states.append({ | ||
| 'id': instance.id, | ||
| 'uid': instance.uid, | ||
| 'extra_data': dict(instance.extra_data) if instance.extra_data else {}, | ||
| }) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think using a Also, If this were needed, you've got a lot of code redundancy. You could use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored to use CaptureQueriesContext to assert UPDATE precedes DELETE, and moved the signal tests to a new test_signals.py. |
||
|
|
||
| from django.db.models.signals import pre_delete | ||
|
robrap marked this conversation as resolved.
Outdated
|
||
|
|
||
| pre_delete.connect(capture_state_before_delete, sender=UserSocialAuth) | ||
| try: | ||
| social_auth.delete() | ||
| finally: | ||
| pre_delete.disconnect(capture_state_before_delete, sender=UserSocialAuth) | ||
|
|
||
| assert captured_states == [{ | ||
| 'id': social_auth_id, | ||
| 'uid': get_redacted_social_auth_uid(social_auth_id), | ||
| 'extra_data': {}, | ||
| }] | ||
| assert not UserSocialAuth.objects.filter(id=social_auth_id).exists() | ||
|
|
||
| def test_delete_already_redacted_user_social_auth_is_idempotent(self): | ||
| """ | ||
| Test that deleting an already redacted social auth keeps the redacted state. | ||
| """ | ||
| social_auth = self.create_social_auth() | ||
| social_auth.uid = get_redacted_social_auth_uid(social_auth.pk) | ||
| social_auth.extra_data = {} | ||
| social_auth.save(update_fields=['uid', 'extra_data']) | ||
| social_auth_id = social_auth.id | ||
|
|
||
| captured_states = [] | ||
|
|
||
| def capture_state_before_delete(sender, instance, **kwargs): # pylint: disable=unused-argument | ||
| instance.refresh_from_db() | ||
| captured_states.append((instance.uid, instance.extra_data)) | ||
|
|
||
| from django.db.models.signals import pre_delete | ||
|
|
||
| pre_delete.connect(capture_state_before_delete, sender=UserSocialAuth) | ||
| try: | ||
| social_auth.delete() | ||
| finally: | ||
| pre_delete.disconnect(capture_state_before_delete, sender=UserSocialAuth) | ||
|
|
||
| assert captured_states == [ | ||
| (get_redacted_social_auth_uid(social_auth_id), {}), | ||
| ] | ||
| assert not UserSocialAuth.objects.filter(id=social_auth_id).exists() | ||
|
|
||
| def test_delete_redacts_multiple_sso_providers(self): | ||
| """ | ||
| Test that deletion redacts multiple SSO providers before removal. | ||
| """ | ||
| auths = [ | ||
| self.create_social_auth( | ||
| provider='google-oauth2', | ||
| uid='google@example.com', | ||
| extra_data={'email': 'google@example.com', 'name': 'Google User'} | ||
| ), | ||
| self.create_social_auth( | ||
| provider='tpa-saml', | ||
| uid='saml@example.com', | ||
| extra_data={'email': 'saml@example.com', 'name': 'SAML User', 'uid': 'saml-uid'} | ||
| ), | ||
| ] | ||
| # Save IDs before deletion (they become None after delete) | ||
| auth_ids = [auth.pk for auth in auths] | ||
|
|
||
| captured_states = [] | ||
|
|
||
| def capture_state_before_delete(sender, instance, **kwargs): # pylint: disable=unused-argument | ||
| instance.refresh_from_db() | ||
| captured_states.append((instance.provider, instance.uid, instance.extra_data)) | ||
|
|
||
| from django.db.models.signals import pre_delete | ||
|
|
||
| pre_delete.connect(capture_state_before_delete, sender=UserSocialAuth) | ||
| try: | ||
| for auth in auths: | ||
| auth.delete() | ||
| finally: | ||
| pre_delete.disconnect(capture_state_before_delete, sender=UserSocialAuth) | ||
|
|
||
| assert sorted(captured_states) == sorted([ | ||
| ('google-oauth2', get_redacted_social_auth_uid(auth_ids[0]), {}), | ||
| ('tpa-saml', get_redacted_social_auth_uid(auth_ids[1]), {}), | ||
| ]) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,13 +9,16 @@ | |
| import pytest | ||
| from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user | ||
| from django.core.management import CommandError, call_command | ||
| from django.db.models.signals import pre_delete | ||
| from social_django.models import UserSocialAuth | ||
|
|
||
| from common.djangoapps.student.tests.factories import UserFactory # lint-amnesty, pylint: disable=wrong-import-order | ||
| from openedx.core.djangoapps.user_api.accounts.tests.retirement_helpers import ( # lint-amnesty, pylint: disable=unused-import, wrong-import-order | ||
| setup_retirement_states, # noqa: F401 | ||
| ) | ||
| from openedx.core.djangolib.testing.utils import skip_unless_lms # lint-amnesty, pylint: disable=wrong-import-order | ||
|
|
||
| from ...accounts.signals import get_redacted_social_auth_uid | ||
| from ...models import UserRetirementStatus | ||
|
|
||
| pytestmark = pytest.mark.django_db | ||
|
|
@@ -107,3 +110,99 @@ def test_retire_with_username_email_userfile(setup_retirement_states): # lint-a | |
| with pytest.raises(CommandError, match=r'You cannot use userfile option with username and user_email'): | ||
| call_command('retire_user', user_file=user_file, username=username, user_email=user_email) | ||
| remove_user_file() | ||
|
|
||
|
|
||
| @skip_unless_lms | ||
| def test_retire_user_redacts_sso_pii_before_deletion(setup_retirement_states): # lint-amnesty, pylint: disable=redefined-outer-name, unused-argument # noqa: F811 | ||
|
ktyagiapphelix2u marked this conversation as resolved.
Outdated
|
||
| """ | ||
| Test that SSO PII is redacted before UserSocialAuth records are deleted during retirement. | ||
|
|
||
| This test verifies the order of operations by capturing the record's state | ||
| at the moment of deletion to ensure it was already redacted. | ||
| """ | ||
| user = UserFactory.create(username='sso-user', email='sso-user@example.com') | ||
| social_auth = UserSocialAuth.objects.create( | ||
| user=user, | ||
| provider='google-oauth2', | ||
| uid='sso-user@example.com', | ||
| extra_data={ | ||
| 'email': 'sso-user@example.com', | ||
| 'name': 'SSO Test User', | ||
| 'id': '123456789', | ||
| } | ||
| ) | ||
| social_auth_id = social_auth.id | ||
|
|
||
| captured_states = [] | ||
|
|
||
| def capture_state_before_delete(sender, instance, **kwargs): # pylint: disable=unused-argument | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. switched to CaptureQueriesContext for UPDATE-before-DELETE assertions, and disconnected the safety-net pre_delete signal handler around both tests so they'd fail if retire_user itself stopped redacting. |
||
| """Capture the database state seen by the pre_delete signal.""" | ||
| instance.refresh_from_db() | ||
| captured_states.append({ | ||
| 'id': instance.id, | ||
| 'uid': instance.uid, | ||
| 'extra_data': dict(instance.extra_data) if instance.extra_data else {}, | ||
| }) | ||
|
|
||
| pre_delete.connect(capture_state_before_delete, sender=UserSocialAuth) | ||
| try: | ||
| call_command('retire_user', username=user.username, user_email=user.email) | ||
| finally: | ||
| pre_delete.disconnect(capture_state_before_delete, sender=UserSocialAuth) | ||
|
|
||
| # Verify that at the moment of deletion, the record was already redacted | ||
| assert captured_states == [{ | ||
| 'id': social_auth_id, | ||
| 'uid': get_redacted_social_auth_uid(social_auth_id), | ||
| 'extra_data': {}, | ||
| }], \ | ||
| "SSO records should be redacted before deletion" | ||
|
|
||
| # Verify deletion completed | ||
| assert not UserSocialAuth.objects.filter(id=social_auth_id).exists() | ||
|
|
||
| retired_user_status = UserRetirementStatus.objects.filter(original_username=user.username).first() | ||
| assert retired_user_status is not None | ||
| assert retired_user_status.original_email == 'sso-user@example.com' | ||
|
|
||
|
|
||
| @skip_unless_lms | ||
| def test_retire_user_redacts_each_social_auth_before_bulk_deletion(setup_retirement_states): # lint-amnesty, pylint: disable=redefined-outer-name, unused-argument # noqa: F811 | ||
| """ | ||
| Test that each UserSocialAuth record is redacted before bulk deletion during retirement. | ||
| """ | ||
| user = UserFactory.create(username='multi-sso-user', email='multi-sso@example.com') | ||
| google_auth = UserSocialAuth.objects.create( | ||
| user=user, | ||
| provider='google-oauth2', | ||
| uid='google-multi@example.com', | ||
| extra_data={'email': 'google-multi@example.com', 'name': 'Google User'} | ||
| ) | ||
| saml_auth = UserSocialAuth.objects.create( | ||
| user=user, | ||
| provider='tpa-saml', | ||
| uid='saml-multi@example.com', | ||
| extra_data={'email': 'saml-multi@example.com', 'name': 'SAML User', 'uid': 'saml-123'} | ||
| ) | ||
| # Save IDs before deletion (they become None after delete) | ||
| google_auth_id = google_auth.id | ||
| saml_auth_id = saml_auth.id | ||
|
|
||
| captured_states = [] | ||
|
|
||
| def capture_state_before_delete(sender, instance, **kwargs): # pylint: disable=unused-argument | ||
| """Capture the database state seen by the pre_delete signal.""" | ||
| instance.refresh_from_db() | ||
| extra = dict(instance.extra_data) if instance.extra_data else {} | ||
| captured_states.append((instance.provider, instance.uid, extra)) | ||
|
|
||
| pre_delete.connect(capture_state_before_delete, sender=UserSocialAuth) | ||
| try: | ||
| call_command('retire_user', username=user.username, user_email=user.email) | ||
| finally: | ||
| pre_delete.disconnect(capture_state_before_delete, sender=UserSocialAuth) | ||
|
|
||
| assert sorted(captured_states) == sorted([ | ||
| ('google-oauth2', get_redacted_social_auth_uid(google_auth_id), {}), | ||
| ('tpa-saml', get_redacted_social_auth_uid(saml_auth_id), {}), | ||
| ]) | ||
Uh oh!
There was an error while loading. Please reload this page.