From a456ab01db2313cc91c2f69b4aea5ea4add4c19e Mon Sep 17 00:00:00 2001 From: Nicola Date: Fri, 7 Feb 2025 17:44:43 +0100 Subject: [PATCH] cli: add user deletion --- invenio_app_rdm/cli.py | 78 ++++++++++++++++++++++++++++++++++++ tests/test_cli.py | 90 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 tests/test_cli.py diff --git a/invenio_app_rdm/cli.py b/invenio_app_rdm/cli.py index 80061c5b2..4b285ce3d 100644 --- a/invenio_app_rdm/cli.py +++ b/invenio_app_rdm/cli.py @@ -10,6 +10,7 @@ import click from flask.cli import with_appcontext from invenio_access.permissions import system_identity +from invenio_accounts.cli import users as cli_users from invenio_records_resources.proxies import current_service_registry from .fixtures import FixturesEngine, Pages @@ -93,3 +94,80 @@ def rebuild_all_indices(order): else: # success click.secho("Done.", fg="green") + + +@cli_users.command("delete") +@click.argument("user_id") +def user_delete(user_id): + """Delete a user, checking first the existence of RDM records.""" + from invenio_accounts.proxies import current_datastore + + user = current_datastore.get_user(user_id) + if not user: + click.secho(f"User {user_id} not found.", fg="red") + return + + if not click.confirm( + f'Are you sure you want to delete user "{user.id}/{user.username}/{user.email}"?' + ): + click.secho("Aborted.", fg="red") + return + + from invenio_access.utils import get_identity + + idty = get_identity(user) + + from invenio_rdm_records.proxies import current_rdm_records_service + + drafts = current_rdm_records_service.search_drafts(idty) + if drafts.total: + click.secho(f"Aborted. User still has {drafts.total} drafts", fg="red") + return + + records = current_rdm_records_service.search(idty) + if records.total: + click.secho(f"Aborted. User still has {records.total} records", fg="red") + return + + from invenio_communities.proxies import current_communities + + communities = current_communities.service.members.read_memberships(idty)[ + "memberships" + ] + if communities: + click.secho(f"Aborted. User part of communities: {communities}", fg="red") + return + + from invenio_requests import current_requests_service + + requests = current_requests_service.search_user_requests(idty) + if requests: + click.secho(f"Aborted. User part of communities: {communities}", fg="red") + return + + from invenio_accounts.models import ( + LoginInformation, + SessionActivity, + User, + userrole, + ) + from invenio_db import db + from invenio_oauthclient.models import RemoteAccount, RemoteToken, UserIdentity + from invenio_users_resources.proxies import current_users_service + + with db.session.begin_nested(): + d = db.session.query(userrole).filter(userrole.c.user_id == user_id) + d.delete(synchronize_session=False) + SessionActivity.query.filter(SessionActivity.user_id == user_id).delete() + UserIdentity.query.filter(UserIdentity.id_user == user_id).delete() + ra = RemoteAccount.query.filter(RemoteAccount.user_id == user_id).one_or_none() + if ra: + RemoteToken.query.filter(RemoteToken.id_remote_account == ra.id).delete() + ra.delete() + + LoginInformation.query.filter(LoginInformation.user_id == user_id).delete() + User.query.filter(User.id == user_id).delete() + + db.session.commit() + current_users_service.indexer.delete_by_id(user_id) + click.secho(f"Successfully deleted user.", fg="green") diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 000000000..88b5bb16d --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2025 CERN. +# +# Invenio App RDM is free software; you can redistribute it and/or modify it +# under the terms of the MIT License; see LICENSE file for more details. + +"""Test invenio-app-rdm CLI.""" + +import pytest +from invenio_access.utils import get_identity +from invenio_rdm_records.proxies import current_rdm_records_service + + +@pytest.fixture() +def user1_idty(users): + """Return user and identity.""" + user = users["user1"] + return user, get_identity(user) + + +def test_user_delete_non_existent(app, users, cli_runner): + """Test deleting a non-existent user.""" + result = cli_runner("users", "delete", "999") + assert "User 999 not found." in result.output + + +def test_user_delete_with_drafts(app, user1_idty, minimal_record, cli_runner): + """Test deleting user with drafts fails.""" + user, idty = user1_idty + # Create draft for user + current_rdm_records_service.create(idty, minimal_record) + + result = cli_runner("users", "delete", str(user.id)) + assert "Aborted. User still has 1 drafts" in result.output + + +# def test_user_delete_with_records(app, user1_idty, cli_runner, users): +# """Test deleting user with published records fails.""" +# user = users[0] +# # Create and publish record for user +# service = current_rdm_records_service +# draft = service.create(get_identity(user), {}) +# service.publish(get_identity(user), draft.id) + +# result = cli_runner("users", "delete", str(user.id)) +# assert "Aborted. User still has 1 records" in result.output + +# def test_user_delete_with_community(app, user1_idty, cli_runner, users): +# """Test deleting user that belongs to a community fails.""" +# user = users[0] +# # Add user to community +# community = current_communities.service.create(system_identity, { +# "title": "Test Community", +# "type": "organization" +# }) +# current_communities.service.members.add(system_identity, community.id, +# [{"user": user.id, "role": "member"}]) + +# result = cli_runner("users", "delete", str(user.id)) +# assert "Aborted. User part of communities:" in result.output + +# def test_user_delete_with_request(app, user1_idty, cli_runner, users): +# """Test deleting user with pending requests fails.""" +# user = users[0] +# # Create request for user +# current_requests_service.create(get_identity(user), { +# "title": "Test request", +# "type": "user-request" +# }) + +# result = cli_runner("users", "delete", str(user.id)) +# assert "Aborted. User part of communities:" in result.output + +# def test_user_delete_user_declines(app, user1_idty, cli_runner, users): +# """Test user deletion when user declines confirmation.""" +# user = users[0] +# result = cli_runner("users", "delete", str(user.id), input="n\n") +# assert "Aborted." in result.output +# # Verify user still exists +# assert current_datastore.get_user(user.id) + +# def test_user_delete_success(app, user1_idty, cli_runner, users): +# """Test successful user deletion.""" +# user = users[0] +# result = cli_runner("users", "delete", str(user.id), input="y\n") +# assert "Successfully deleted user." in result.output + +# # Verify user no longer exists +# assert not current_datastore.get_user(user.id)