diff --git a/.gitignore b/.gitignore index 85142b006..a940ae11b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,11 +1,13 @@ # Mycodo files test/ tests/ +!mycodo/tests/ site/ env/ .venv /databases/flask_secret_key /mycodo/config_override.py +/mycodo/flask_session/ *.db *.pem *.bak diff --git a/CHANGELOG.md b/CHANGELOG.md index 69a084e6e..60d581d3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ sudo service mycodoflask restart ### Features - Add API endpoints: /log and /dependency ([#1430](https://github.com/kizniche/Mycodo/issues/1430)) + - Add ability to update (overwrite) a custom Function module without deleting it - Add Input: ENS160 ([#1434](https://github.com/kizniche/Mycodo/pull/1434)) - Add Output: Waveshare 8-Channel Raspberry Pi Relay Board B ([#1434](https://github.com/kizniche/Mycodo/pull/1434)) - Add Function: Camera: rpicam ([#1487](https://github.com/kizniche/Mycodo/pull/1487)) diff --git a/mycodo/mycodo_flask/forms/forms_settings.py b/mycodo/mycodo_flask/forms/forms_settings.py index 1ad64a619..af3daaa86 100644 --- a/mycodo/mycodo_flask/forms/forms_settings.py +++ b/mycodo/mycodo_flask/forms/forms_settings.py @@ -182,6 +182,11 @@ class ControllerDel(FlaskForm): delete_controller = SubmitField(TRANSLATIONS['delete']['title']) +class ControllerMod(FlaskForm): + update_controller_file = FileField() + update_controller = SubmitField(lazy_gettext('Replace')) + + # # Settings (Action) # diff --git a/mycodo/mycodo_flask/routes_settings.py b/mycodo/mycodo_flask/routes_settings.py index b2cf7e2ac..8f5abe908 100644 --- a/mycodo/mycodo_flask/routes_settings.py +++ b/mycodo/mycodo_flask/routes_settings.py @@ -122,6 +122,7 @@ def settings_function(): form_controller = forms_settings.Controller() form_controller_delete = forms_settings.ControllerDel() + form_controller_update = forms_settings.ControllerMod() # Get list of custom functions excluded_files = ['__init__.py', '__pycache__'] @@ -134,6 +135,8 @@ def settings_function(): utils_settings.settings_function_import(form_controller) elif form_controller_delete.delete_controller.data: utils_settings.settings_function_delete(form_controller_delete) + elif form_controller_update.update_controller.data: + utils_settings.settings_function_update(form_controller_delete, form_controller_update) return redirect(url_for('routes_settings.settings_function')) @@ -155,7 +158,8 @@ def settings_function(): return render_template('settings/function.html', dict_controllers=dict_controllers, form_controller=form_controller, - form_controller_delete=form_controller_delete) + form_controller_delete=form_controller_delete, + form_controller_update=form_controller_update) @blueprint.route('/settings/action', methods=('GET', 'POST')) diff --git a/mycodo/mycodo_flask/templates/settings/function.html b/mycodo/mycodo_flask/templates/settings/function.html index 5174886c4..0a7e0e68f 100644 --- a/mycodo/mycodo_flask/templates/settings/function.html +++ b/mycodo/mycodo_flask/templates/settings/function.html @@ -42,7 +42,7 @@

{{_('Imported Function Modules')}}

{% for each_controller in dict_controllers %} -
+ {{form_controller_delete.csrf_token}} {{form_controller_delete.controller_id(value=each_controller)}} @@ -50,8 +50,16 @@

{{_('Imported Function Modules')}}

{{each_controller}} {{dict_controllers[each_controller]['function_name']}} -
- {{form_controller_delete.delete_controller(class_='btn btn-primary btn-block', **{'onclick':'return confirm("Are you sure you want to delete this?")'})}} +
+
+ {{form_controller_delete.delete_controller(class_='btn btn-primary btn-block', **{'onclick':'return confirm("Are you sure you want to delete this?")'})}} +
+
+ +
+
+ {{form_controller_update.update_controller(class_='btn btn-primary btn-block', **{'onclick':'return confirm("Are you sure you want to update this? This will overwrite the existing module.")'})}} +
diff --git a/mycodo/mycodo_flask/utils/utils_settings.py b/mycodo/mycodo_flask/utils/utils_settings.py index c744411c5..586b36823 100644 --- a/mycodo/mycodo_flask/utils/utils_settings.py +++ b/mycodo/mycodo_flask/utils/utils_settings.py @@ -617,6 +617,128 @@ def settings_function_delete(form): flash_success_errors(error, action, url_for('routes_settings.settings_function')) +def settings_function_update(form_del, form): + """ + Receive a function module file, check it for errors, replace the existing module + """ + action = '{action} {controller}'.format( + action=gettext("Update"), + controller=TRANSLATIONS['controller']['title']) + error = [] + + controller_info = None + existing_controller_info = None + + try: + install_dir = os.path.abspath(INSTALL_DIRECTORY) + tmp_directory = os.path.join(install_dir, 'mycodo/functions/tmp_functions') + assure_path_exists(tmp_directory) + assure_path_exists(PATH_FUNCTIONS_CUSTOM) + tmp_name = 'tmp_function_testing.py' + full_path_tmp = os.path.join(tmp_directory, tmp_name) + + controller_device_name = form_del.controller_id.data + + if not form.update_controller_file.data: + error.append('No file present') + elif form.update_controller_file.data.filename == '': + error.append('No file name') + else: + form.update_controller_file.data.save(full_path_tmp) + + if not error: + # Load and validate the uploaded (new) module + try: + controller_info, status = load_module_from_file(full_path_tmp, 'functions') + if not controller_info or not hasattr(controller_info, 'FUNCTION_INFORMATION'): + error.append("Could not load FUNCTION_INFORMATION dictionary from " + "the uploaded controller module") + except Exception: + error.append("Could not load uploaded file as a python module:\n" + "{}".format(traceback.format_exc())) + + # Load the existing (old) module from disk to extract its function_name_unique + existing_file_path = os.path.join( + PATH_FUNCTIONS_CUSTOM, '{}.py'.format(controller_device_name.lower())) + try: + existing_controller_info, status = load_module_from_file(existing_file_path, 'functions') + if not existing_controller_info or not hasattr(existing_controller_info, 'FUNCTION_INFORMATION'): + error.append("Could not load FUNCTION_INFORMATION dictionary from " + "the existing controller module") + except Exception: + error.append("Could not load existing controller module as a python module:\n" + "{}".format(traceback.format_exc())) + + if not error: + if 'function_name_unique' not in controller_info.FUNCTION_INFORMATION: + error.append( + "'function_name_unique' not found in " + "FUNCTION_INFORMATION dictionary") + elif controller_info.FUNCTION_INFORMATION['function_name_unique'] == '': + error.append("'function_name_unique' is empty") + elif (controller_info.FUNCTION_INFORMATION['function_name_unique'].lower() != + existing_controller_info.FUNCTION_INFORMATION['function_name_unique'].lower()): + error.append( + "'function_name_unique' must match the existing module name '{}', " + "but '{}' was found".format( + existing_controller_info.FUNCTION_INFORMATION['function_name_unique'], + controller_info.FUNCTION_INFORMATION['function_name_unique'])) + + if 'function_name' not in controller_info.FUNCTION_INFORMATION: + error.append("'function_name' not found in FUNCTION_INFORMATION dictionary") + elif controller_info.FUNCTION_INFORMATION['function_name'] == '': + error.append("'function_name' is empty") + + if 'dependencies_module' in controller_info.FUNCTION_INFORMATION: + if not isinstance(controller_info.FUNCTION_INFORMATION['dependencies_module'], list): + error.append("'dependencies_module' must be a list of tuples") + else: + for each_dep in controller_info.FUNCTION_INFORMATION['dependencies_module']: + if not isinstance(each_dep, tuple): + error.append("'dependencies_module' must be a list of tuples") + elif len(each_dep) != 3: + error.append("'dependencies_module': tuples in list must have 3 items") + elif not each_dep[0] or not each_dep[1] or not each_dep[2]: + error.append( + "'dependencies_module': tuples in list must " + "not be empty") + elif each_dep[0] not in ['internal', 'pip-pypi', 'apt']: + error.append( + "'dependencies_module': first in tuple " + "must be 'internal', 'pip-pypi', " + "or 'apt'") + + if not error: + # Determine filename from the uploaded module's function_name_unique + unique_name = '{}.py'.format(controller_info.FUNCTION_INFORMATION['function_name_unique'].lower()) + + # Move module from temp directory to function directory, overwriting the existing module + full_path_final = os.path.join(PATH_FUNCTIONS_CUSTOM, unique_name) + os.rename(full_path_tmp, full_path_final) + + # Reload frontend to refresh the controllers + cmd = '{path}/mycodo/scripts/mycodo_wrapper frontend_reload 2>&1'.format( + path=install_dir) + subprocess.Popen(cmd, shell=True) + flash('Frontend reloaded to scan for updated Controller Modules', 'success') + + # Restart the backend if any Controller using this module is currently activated + controller_activated = CustomController.query.filter( + CustomController.device == controller_device_name, + CustomController.is_activated.is_(True)).count() + if controller_activated: + cmd = '{path}/mycodo/scripts/mycodo_wrapper daemon_restart 2>&1'.format( + path=install_dir) + subprocess.Popen(cmd, shell=True) + flash('Backend restarted to apply updated Controller Module', 'success') + + except Exception as err: + logger.exception("Function Update") + error.append("Exception: {}".format(err)) + + flash_success_errors(error, action, url_for('routes_settings.settings_function')) + + def settings_action_import(form): """ Receive an action module file, check it for errors, add it to Mycodo controller list diff --git a/mycodo/tests/software_tests/test_mycodo_flask/test_utils_settings.py b/mycodo/tests/software_tests/test_mycodo_flask/test_utils_settings.py new file mode 100644 index 000000000..eac522647 --- /dev/null +++ b/mycodo/tests/software_tests/test_mycodo_flask/test_utils_settings.py @@ -0,0 +1,289 @@ +# coding=utf-8 +"""Tests for settings function update utility function.""" +import os +import shutil +import tempfile + +import mock +import pytest +from unittest.mock import MagicMock, patch + + +# A minimal valid custom function module content +VALID_FUNCTION_CONTENT = b"""FUNCTION_INFORMATION = { + 'function_name_unique': 'MY_CUSTOM_FUNCTION', + 'function_name': 'My Custom Function', +} +""" + +VALID_FUNCTION_UNIQUE_NAME = 'MY_CUSTOM_FUNCTION' + + +class MockFileStorage: + """Mock Werkzeug FileStorage for testing file uploads.""" + + def __init__(self, filename, content): + self.filename = filename + self._content = content + + def save(self, dst): + with open(dst, 'wb') as f: + f.write(self._content) + + +def make_del_form(controller_id): + """Create a mock ControllerDel form (supplies controller_id).""" + form = MagicMock() + form.controller_id.data = controller_id + return form + + +def make_mod_form(file_storage): + """Create a mock ControllerMod form (supplies the replacement file).""" + form = MagicMock() + form.update_controller_file.data = file_storage + return form + + +@pytest.fixture() +def custom_functions_dir(): + """Create a temporary directory to act as PATH_FUNCTIONS_CUSTOM.""" + tmp_dir = tempfile.mkdtemp() + yield tmp_dir + shutil.rmtree(tmp_dir, ignore_errors=True) + + +@pytest.fixture() +def tmp_install_dir(): + """Create a temporary directory to act as INSTALL_DIRECTORY.""" + tmp_dir = tempfile.mkdtemp() + yield tmp_dir + shutil.rmtree(tmp_dir, ignore_errors=True) + + +@pytest.fixture() +def mock_mycodo_user(): + """ + Mock pwd.getpwnam and grp.getgrnam so that looking up the 'mycodo' + user/group resolves to the current process's UID/GID. This lets + os.chown succeed in CI without a real mycodo system account, while + still exercising the full assure_path_exists / set_user_grp code path. + """ + current_uid = os.getuid() + current_gid = os.getgid() + + mock_pw = MagicMock() + mock_pw.pw_uid = current_uid + + mock_gr = MagicMock() + mock_gr.gr_gid = current_gid + + with patch('mycodo.utils.system_pi.pwd.getpwnam', return_value=mock_pw), \ + patch('mycodo.utils.system_pi.grp.getgrnam', return_value=mock_gr): + yield + + +def write_existing_module(custom_functions_dir, content=None): + """Write the existing module file to PATH_FUNCTIONS_CUSTOM so the update function can load it.""" + if content is None: + content = VALID_FUNCTION_CONTENT + path = os.path.join(custom_functions_dir, 'my_custom_function.py') + with open(path, 'wb') as f: + f.write(content) + return path + + +class TestSettingsFunctionUpdate: + """Tests for the settings_function_update utility function.""" + + @mock.patch('subprocess.Popen') + def test_update_valid_module_no_activated_functions( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """A valid update with no activated functions: only frontend reload, no daemon restart.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + # Write the existing module that will be read from disk for comparison + write_existing_module(custom_functions_dir) + + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('my_custom_function.py', VALID_FUNCTION_CONTENT)) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.CustomController') as mock_cc: + mock_cc.query.filter.return_value.count.return_value = 0 + settings_function_update(form_del, form) + + # File should be written to the custom functions directory + expected_file = os.path.join(custom_functions_dir, 'my_custom_function.py') + assert os.path.exists(expected_file) + with open(expected_file, 'rb') as f: + assert b'MY_CUSTOM_FUNCTION' in f.read() + + # Only the frontend reload should have been triggered + assert mock_popen.call_count == 1 + assert 'frontend_reload' in mock_popen.call_args[0][0] + + @mock.patch('subprocess.Popen') + def test_update_valid_module_with_activated_function( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """A valid update with at least one activated function triggers both frontend reload and daemon restart.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + write_existing_module(custom_functions_dir) + + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('my_custom_function.py', VALID_FUNCTION_CONTENT)) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.CustomController') as mock_cc: + # One activated CustomController entry exists for this module + mock_cc.query.filter.return_value.count.return_value = 1 + settings_function_update(form_del, form) + + # File should exist + assert os.path.exists(os.path.join(custom_functions_dir, 'my_custom_function.py')) + + # Both frontend_reload and daemon_restart should have been triggered + assert mock_popen.call_count == 2 + calls = [c[0][0] for c in mock_popen.call_args_list] + assert any('frontend_reload' in c for c in calls) + assert any('daemon_restart' in c for c in calls) + + @mock.patch('subprocess.Popen') + def test_update_overwrites_existing_module( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """Updating an existing module overwrites its file content.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + # Write a recognisably "old" version of the existing module + write_existing_module(custom_functions_dir, VALID_FUNCTION_CONTENT + b'# old version\n') + + new_content = VALID_FUNCTION_CONTENT + b'# new version\n' + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('my_custom_function.py', new_content)) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.CustomController') as mock_cc: + mock_cc.query.filter.return_value.count.return_value = 0 + settings_function_update(form_del, form) + + with open(os.path.join(custom_functions_dir, 'my_custom_function.py'), 'rb') as f: + content = f.read() + assert b'# old version' not in content + assert b'# new version' in content + + @mock.patch('subprocess.Popen') + def test_update_no_file_fails( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """Submitting with no file should not update or trigger any subprocess.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + write_existing_module(custom_functions_dir) + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(None) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir): + settings_function_update(form_del, form) + + mock_popen.assert_not_called() + + @mock.patch('subprocess.Popen') + def test_update_empty_filename_fails( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """Submitting with an empty filename should not update or trigger any subprocess.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + write_existing_module(custom_functions_dir) + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('', VALID_FUNCTION_CONTENT)) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir): + settings_function_update(form_del, form) + + mock_popen.assert_not_called() + + @mock.patch('subprocess.Popen') + def test_update_different_filename_same_unique_name_succeeds( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """Uploaded file with a different filename but the same function_name_unique in its dict + should succeed — the comparison is purely dict-based, not filename-based.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + # Existing module on disk: my_custom_function.py, unique name 'MY_CUSTOM_FUNCTION' + write_existing_module(custom_functions_dir) + + # Uploaded file has a DIFFERENT filename but the SAME function_name_unique in its dict + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('renamed_upload.py', VALID_FUNCTION_CONTENT)) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.CustomController') as mock_cc: + mock_cc.query.filter.return_value.count.return_value = 0 + settings_function_update(form_del, form) + + # The existing module file should have been overwritten (keyed by unique name, not upload filename) + expected_file = os.path.join(custom_functions_dir, 'my_custom_function.py') + assert os.path.exists(expected_file) + with open(expected_file, 'rb') as f: + assert b'MY_CUSTOM_FUNCTION' in f.read() + mock_popen.assert_called_once() + + @mock.patch('subprocess.Popen') + def test_update_different_filename_different_unique_name_fails( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """Uploaded file with a different filename AND a different function_name_unique in its dict + should fail — even though the filename is different, the dict value must match.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + # Existing module on disk: my_custom_function.py, unique name 'MY_CUSTOM_FUNCTION' + write_existing_module(custom_functions_dir) + + # Uploaded file has a different filename AND a different function_name_unique in its dict + different_content = b"""FUNCTION_INFORMATION = { + 'function_name_unique': 'DIFFERENT_FUNCTION', + 'function_name': 'Different Function', +} +""" + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('renamed_upload.py', different_content)) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir): + settings_function_update(form_del, form) + + mock_popen.assert_not_called() + + @mock.patch('subprocess.Popen') + def test_update_invalid_python_fails( + self, mock_popen, app, custom_functions_dir, tmp_install_dir, + mock_mycodo_user): + """An uploaded file with invalid Python should fail without touching the module.""" + from mycodo.mycodo_flask.utils.utils_settings import settings_function_update + + original_content = VALID_FUNCTION_CONTENT + b'# original\n' + write_existing_module(custom_functions_dir, original_content) + + form_del = make_del_form(VALID_FUNCTION_UNIQUE_NAME) + form = make_mod_form(MockFileStorage('my_custom_function.py', b'this is not valid python !@#$%')) + + with patch('mycodo.mycodo_flask.utils.utils_settings.INSTALL_DIRECTORY', tmp_install_dir), \ + patch('mycodo.mycodo_flask.utils.utils_settings.PATH_FUNCTIONS_CUSTOM', custom_functions_dir): + settings_function_update(form_del, form) + + # Existing module should be untouched + with open(os.path.join(custom_functions_dir, 'my_custom_function.py'), 'rb') as f: + assert b'# original' in f.read() + mock_popen.assert_not_called()