diff --git a/ops/model.py b/ops/model.py index 86bb7d152..4d926cab3 100644 --- a/ops/model.py +++ b/ops/model.py @@ -2761,7 +2761,6 @@ def get_notices( user_id: Optional[int] = None, types: Optional[Iterable[Union[pebble.NoticeType, str]]] = None, keys: Optional[Iterable[str]] = None, - after: Optional[datetime.datetime] = None, ) -> List[pebble.Notice]: """Query for notices that match all of the provided filters. @@ -2773,7 +2772,6 @@ def get_notices( user_id=user_id, types=types, keys=keys, - after=after, ) # Define this last to avoid clashes with the imported "pebble" module diff --git a/ops/pebble.py b/ops/pebble.py index 10446a6aa..35a68c69a 100644 --- a/ops/pebble.py +++ b/ops/pebble.py @@ -2744,6 +2744,33 @@ def get_checks( resp = self._request('GET', '/v1/checks', query) return [CheckInfo.from_dict(info) for info in resp['result']] + def notify(self, type: NoticeType, key: str, *, + data: Optional[Dict[str, str]] = None, + repeat_after: Optional[datetime.timedelta] = None) -> str: + """Record an occurrence of a notice with the specified options. + + Args: + type: Notice type (currently only "custom" notices are supported). + key: Notice key; must be in "example.com/path" format. + data: Data fields for this notice. + repeat_after: Only allow this notice to repeat after this duration + has elapsed (the default is to always repeat). + + Returns: + The notice's ID. + """ + body: Dict[str, Any] = { + 'action': 'add', + 'type': type.value, + 'key': key, + } + if data is not None: + body['data'] = data + if repeat_after is not None: + body['repeat-after'] = _format_timeout(repeat_after.total_seconds()) + resp = self._request('POST', '/v1/notices', body=body) + return resp['result']['id'] + def get_notice(self, id: str) -> Notice: """Get details about a single notice by ID. @@ -2760,7 +2787,6 @@ def get_notices( user_id: Optional[int] = None, types: Optional[Iterable[Union[NoticeType, str]]] = None, keys: Optional[Iterable[str]] = None, - after: Optional[datetime.datetime] = None, ) -> List[Notice]: """Query for notices that match all of the provided filters. @@ -2772,14 +2798,18 @@ def get_notices( user (notices whose ``user_id`` matches the requester UID as well as public notices). + Note that the "after" filter is not yet implemented, as it's not + needed right now and it's hard to implement correctly with Python's + datetime type, which only has microsecond precision (and Go's Time + type has nanosecond precision). + Args: - select: select which notices to return (instead of returning - notices for the current user) - user_id: filter for notices for the specified user, including - public notices (only works for Pebble admins) - types: filter for notices with any of the specified types - keys: filter for notices with any of the specified keys - after: filter for notices that were last repeated after this time + select: Select which notices to return (instead of returning + notices for the current user). + user_id: Filter for notices for the specified user, including + public notices (only works for Pebble admins). + types: Filter for notices with any of the specified types. + keys: Filter for notices with any of the specified keys. """ query: Dict[str, Union[str, List[str]]] = {} if select is not None: @@ -2790,8 +2820,6 @@ def get_notices( query['types'] = [(t.value if isinstance(t, NoticeType) else t) for t in types] if keys is not None: query['keys'] = list(keys) - if after is not None: - query['after'] = after.isoformat() resp = self._request('GET', '/v1/notices', query) return [Notice.from_dict(info) for info in resp['result']] diff --git a/ops/testing.py b/ops/testing.py index fced2e310..f91363cda 100644 --- a/ops/testing.py +++ b/ops/testing.py @@ -18,6 +18,7 @@ import dataclasses import datetime import fnmatch +import http import inspect import io import ipaddress @@ -1098,6 +1099,37 @@ def container_pebble_ready(self, container_name: str): self.set_can_connect(container, True) self.charm.on[container_name].pebble_ready.emit(container) + def pebble_notify(self, container_name: str, key: str, *, + data: Optional[Dict[str, str]] = None, + repeat_after: Optional[datetime.timedelta] = None, + type: pebble.NoticeType = pebble.NoticeType.CUSTOM) -> str: + """Record a Pebble notice with the specified key and data. + + If :meth:`begin` has been called and the notice is new or was repeated, + this will trigger a notice event of the appropriate type, for example + :class:`ops.PebbleCustomNoticeEvent`. + + Args: + container_name: Name of workload container. + key: Notice key; must be in "example.com/path" format. + data: Data fields for this notice. + repeat_after: Only allow this notice to repeat after this duration + has elapsed (the default is to always repeat). + type: Notice type (currently only "custom" notices are supported). + + Returns: + The notice's ID. + """ + container = self.model.unit.get_container(container_name) + client = self._backend._pebble_clients[container.name] + + id, new_or_repeated = client._notify(type, key, data=data, repeat_after=repeat_after) + + if self._charm is not None and type == pebble.NoticeType.CUSTOM and new_or_repeated: + self.charm.on[container_name].pebble_custom_notice.emit(container, id, type.value, key) + + return id + def get_workload_version(self) -> str: """Read the workload version that was set by the unit.""" return self._backend._workload_version @@ -2733,6 +2765,8 @@ def __init__(self, backend: _TestingModelBackend, container_root: pathlib.Path): self._root = container_root self._backend = backend self._exec_handlers: Dict[Tuple[str, ...], ExecHandler] = {} + self._notices: Dict[Tuple[str, str], pebble.Notice] = {} + self._last_notice_id = 0 def _handle_exec(self, command_prefix: Sequence[str], handler: ExecHandler): prefix = tuple(command_prefix) @@ -3012,9 +3046,7 @@ def list_files(self, path: str, *, pattern: Optional[str] = None, self._check_absolute_path(path) file_path = self._root / path[1:] if not file_path.exists(): - raise pebble.APIError( - body={}, code=404, status='Not Found', - message=f"stat {path}: no such file or directory") + raise self._api_error(404, f"stat {path}: no such file or directory") files = [file_path] if not itself: try: @@ -3143,19 +3175,13 @@ def exec( handler = self._find_exec_handler(command) if handler is None: message = "execution handler not found, please register one using Harness.handle_exec" - raise pebble.APIError( - body={}, code=500, status='Internal Server Error', message=message - ) + raise self._api_error(500, message) environment = {} if environment is None else environment if service_context is not None: plan = self.get_plan() if service_context not in plan.services: message = f'context service "{service_context}" not found' - body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error', - 'result': {'message': message}} - raise pebble.APIError( - body=body, code=500, status='Internal Server Error', message=message - ) + raise self._api_error(500, message) service = plan.services[service_context] environment = {**service.environment, **environment} working_dir = service.working_dir if working_dir is None else working_dir @@ -3246,11 +3272,7 @@ def send_signal(self, sig: Union[int, str], service_names: Iterable[str]): if service not in plan.services or not self.get_services([service])[0].is_running(): # conform with the real pebble api message = f'cannot send signal to "{service}": service is not running' - body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error', - 'result': {'message': message}} - raise pebble.APIError( - body=body, code=500, status='Internal Server Error', message=message - ) + raise self._api_error(500, message) # Check if signal name is valid try: @@ -3259,19 +3281,86 @@ def send_signal(self, sig: Union[int, str], service_names: Iterable[str]): # conform with the real pebble api first_service = next(iter(service_names)) message = f'cannot send signal to "{first_service}": invalid signal name "{sig}"' - body = {'type': 'error', 'status-code': 500, 'status': 'Internal Server Error', - 'result': {'message': message}} - raise pebble.APIError( - body=body, - code=500, - status='Internal Server Error', - message=message) from None + raise self._api_error(500, message) def get_checks(self, level=None, names=None): # type:ignore raise NotImplementedError(self.get_checks) # type:ignore + def notify(self, type: pebble.NoticeType, key: str, *, + data: Optional[Dict[str, str]] = None, + repeat_after: Optional[datetime.timedelta] = None) -> str: + notice_id, _ = self._notify(type, key, data=data, repeat_after=repeat_after) + return notice_id + + def _notify(self, type: pebble.NoticeType, key: str, *, + data: Optional[Dict[str, str]] = None, + repeat_after: Optional[datetime.timedelta] = None) -> Tuple[str, bool]: + """Record an occurrence of a notice with the specified details. + + Return a tuple of (notice_id, new_or_repeated). + """ + if type != pebble.NoticeType.CUSTOM: + message = f'invalid type "{type.value}" (can only add "custom" notices)' + raise self._api_error(400, message) + + # The shape of the code below is taken from State.AddNotice in Pebble. + now = datetime.datetime.now(tz=datetime.timezone.utc) + + new_or_repeated = False + unique_key = (type.value, key) + notice = self._notices.get(unique_key) + if notice is None: + # First occurrence of this notice uid+type+key + self._last_notice_id += 1 + notice = pebble.Notice( + id=str(self._last_notice_id), + user_id=0, # Charm should always be able to read pebble_notify notices. + type=type, + key=key, + first_occurred=now, + last_occurred=now, + last_repeated=now, + expire_after=datetime.timedelta(days=7), + occurrences=1, + last_data=data or {}, + repeat_after=repeat_after, + ) + self._notices[unique_key] = notice + new_or_repeated = True + else: + # Additional occurrence, update existing notice + last_repeated = notice.last_repeated + if repeat_after is None or now > notice.last_repeated + repeat_after: + # Update last repeated time if repeat-after time has elapsed (or is None) + last_repeated = now + new_or_repeated = True + notice = dataclasses.replace( + notice, + last_occurred=now, + last_repeated=last_repeated, + occurrences=notice.occurrences + 1, + last_data=data or {}, + repeat_after=repeat_after, + ) + self._notices[unique_key] = notice + + return notice.id, new_or_repeated + + def _api_error(self, code: int, message: str) -> pebble.APIError: + status = http.HTTPStatus(code).phrase + body = { + 'type': 'error', + 'status-code': code, + 'status': status, + 'result': {'message': message}, + } + return pebble.APIError(body, code, status, message) + def get_notice(self, id: str) -> pebble.Notice: - raise NotImplementedError(self.get_notice) + for notice in self._notices.values(): + if notice.id == id: + return notice + raise self._api_error(404, f'cannot find notice with ID "{id}"') def get_notices( self, @@ -3280,6 +3369,38 @@ def get_notices( user_id: Optional[int] = None, types: Optional[Iterable[Union[pebble.NoticeType, str]]] = None, keys: Optional[Iterable[str]] = None, - after: Optional[datetime.datetime] = None, ) -> List[pebble.Notice]: - raise NotImplementedError(self.get_notices) + # Similar logic as api_notices.go:v1GetNotices in Pebble. + + filter_user_id = 0 # default is to filter by request UID (root) + if user_id is not None: + filter_user_id = user_id + if select is not None: + if user_id is not None: + raise self._api_error(400, 'cannot use both "select" and "user_id"') + filter_user_id = None + + if types is not None: + types = {(t.value if isinstance(t, pebble.NoticeType) else t) for t in types} + if keys is not None: + keys = set(keys) + + notices = [notice for notice in self._notices.values() if + self._notice_matches(notice, filter_user_id, types, keys)] + notices.sort(key=lambda notice: notice.last_repeated) + return notices + + @staticmethod + def _notice_matches(notice: pebble.Notice, + user_id: Optional[int] = None, + types: Optional[Set[str]] = None, + keys: Optional[Set[str]] = None) -> bool: + # Same logic as NoticeFilter.matches in Pebble. + # For example: if user_id filter is set and it doesn't match, return False. + if user_id is not None and not (notice.user_id is None or user_id == notice.user_id): + return False + if types is not None and notice.type not in types: + return False + if keys is not None and notice.key not in keys: + return False + return True diff --git a/requirements-dev.txt b/requirements-dev.txt index 0f79aa4d1..8406f2cd8 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -11,6 +11,4 @@ pytest-operator~=0.23 coverage[toml]~=7.0 typing_extensions~=4.2 -macaroonbakery != 1.3.3 # TODO: remove once this is fixed: https://github.com/go-macaroon-bakery/py-macaroon-bakery/issues/94 - -r requirements.txt diff --git a/test/test_model.py b/test/test_model.py index abccb7f6a..4d226ad9a 100644 --- a/test/test_model.py +++ b/test/test_model.py @@ -1980,7 +1980,6 @@ def test_get_notices(self): select=pebble.NoticesSelect.ALL, types=[pebble.NoticeType.CUSTOM], keys=['example.com/a', 'example.com/b'], - after=datetime.datetime(2023, 12, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc), ) self.assertEqual(len(notices), 1) self.assertEqual(notices[0].id, '124') @@ -1992,7 +1991,6 @@ def test_get_notices(self): select=pebble.NoticesSelect.ALL, types=[pebble.NoticeType.CUSTOM], keys=['example.com/a', 'example.com/b'], - after=datetime.datetime(2023, 12, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc), ))]) diff --git a/test/test_pebble.py b/test/test_pebble.py index 63f1339ad..f3d24d310 100644 --- a/test/test_pebble.py +++ b/test/test_pebble.py @@ -2750,6 +2750,52 @@ def test_checklevel_conversion(self): ('GET', '/v1/checks', {'level': 'ready', 'names': ['chk2']}, None), ]) + def test_notify_basic(self): + self.client.responses.append({ + 'result': { + 'id': '123', + }, + 'status': 'OK', + 'status-code': 200, + 'type': 'sync', + }) + + notice_id = self.client.notify(pebble.NoticeType.CUSTOM, 'example.com/a') + self.assertEqual(notice_id, '123') + + self.assertEqual(self.client.requests, [ + ('POST', '/v1/notices', None, { + 'action': 'add', + 'key': 'example.com/a', + 'type': 'custom', + }), + ]) + + def test_notify_other_args(self): + self.client.responses.append({ + 'result': { + 'id': '321', + }, + 'status': 'OK', + 'status-code': 200, + 'type': 'sync', + }) + + notice_id = self.client.notify(pebble.NoticeType.CUSTOM, 'example.com/a', + data={'k': 'v'}, + repeat_after=datetime.timedelta(hours=3)) + self.assertEqual(notice_id, '321') + + self.assertEqual(self.client.requests, [ + ('POST', '/v1/notices', None, { + 'action': 'add', + 'key': 'example.com/a', + 'type': 'custom', + 'data': {'k': 'v'}, + 'repeat-after': '10800.000s', + }), + ]) + def test_get_notice(self): self.client.responses.append({ 'result': { @@ -2851,7 +2897,6 @@ def test_get_notices_filters(self): select=pebble.NoticesSelect.ALL, types=[pebble.NoticeType.CUSTOM], keys=['example.com/a', 'example.com/b'], - after=datetime_utc(2023, 12, 1, 2, 3, 4, 5), ) self.assertEqual(len(notices), 2) self.assertEqual(notices[0].id, '123') @@ -2862,7 +2907,6 @@ def test_get_notices_filters(self): 'select': 'all', 'types': ['custom'], 'keys': ['example.com/a', 'example.com/b'], - 'after': '2023-12-01T02:03:04.000005+00:00', } self.assertEqual(self.client.requests, [ ('GET', '/v1/notices', query, None), diff --git a/test/test_real_pebble.py b/test/test_real_pebble.py index 40ecdf222..676c96058 100644 --- a/test/test_real_pebble.py +++ b/test/test_real_pebble.py @@ -12,6 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""Run (some) unit tests against a real Pebble server. + +Set the RUN_REAL_PEBBLE_TESTS environment variable to run these tests +against a real Pebble server. For example, in one terminal, run Pebble: + +$ PEBBLE=~/pebble pebble run --http=:4000 +2021-09-20T04:10:34.934Z [pebble] Started daemon + +In another terminal, run the tests: + +$ source .tox/unit/bin/activate +$ RUN_REAL_PEBBLE_TESTS=1 PEBBLE=~/pebble pytest test/test_real_pebble.py -v +$ deactivate +""" + import json import os import shutil @@ -26,32 +41,23 @@ from ops import pebble -from .test_testing import PebbleStorageAPIsTestMixin +from .test_testing import PebbleNoticesMixin, PebbleStorageAPIsTestMixin + + +def get_socket_path() -> str: + socket_path = os.getenv('PEBBLE_SOCKET') + pebble_path = os.getenv('PEBBLE') + if not socket_path and pebble_path: + assert isinstance(pebble_path, str) + socket_path = os.path.join(pebble_path, '.pebble.socket') + assert socket_path, 'PEBBLE or PEBBLE_SOCKET must be set if RUN_REAL_PEBBLE_TESTS set' + return socket_path -# Set the RUN_REAL_PEBBLE_TESTS environment variable to run these tests -# against a real Pebble server. For example, in one terminal, run Pebble: -# -# $ PEBBLE=~/pebble pebble run --http=:4000 -# 2021-09-20T04:10:34.934Z [pebble] Started daemon -# -# In another terminal, run the tests: -# -# $ source .tox/unit/bin/activate -# $ RUN_REAL_PEBBLE_TESTS=1 PEBBLE=~/pebble pytest test/test_real_pebble.py -v -# $ deactivate -# @unittest.skipUnless(os.getenv('RUN_REAL_PEBBLE_TESTS'), 'RUN_REAL_PEBBLE_TESTS not set') class TestRealPebble(unittest.TestCase): def setUp(self): - socket_path = os.getenv('PEBBLE_SOCKET') - pebble_path = os.getenv('PEBBLE') - if not socket_path and pebble_path: - assert isinstance(pebble_path, str) - socket_path = os.path.join(pebble_path, '.pebble.socket') - assert socket_path, 'PEBBLE or PEBBLE_SOCKET must be set if RUN_REAL_PEBBLE_TESTS set' - - self.client = pebble.Client(socket_path=socket_path) + self.client = pebble.Client(socket_path=get_socket_path()) def test_checks_and_health(self): self.client.add_layer('layer', { @@ -289,14 +295,10 @@ def test_log_forwarding(self): @unittest.skipUnless(os.getenv('RUN_REAL_PEBBLE_TESTS'), 'RUN_REAL_PEBBLE_TESTS not set') class TestPebbleStorageAPIsUsingRealPebble(unittest.TestCase, PebbleStorageAPIsTestMixin): def setUp(self): - socket_path = os.getenv('PEBBLE_SOCKET') - pebble_dir = os.getenv('PEBBLE') - if not socket_path and pebble_dir: - socket_path = os.path.join(pebble_dir, '.pebble.socket') - assert socket_path and pebble_dir, 'PEBBLE must be set if RUN_REAL_PEBBLE_TESTS set' - - self.prefix = tempfile.mkdtemp(dir=pebble_dir) - self.client = pebble.Client(socket_path=socket_path) + pebble_path = os.getenv('PEBBLE') + assert pebble_path is not None + self.prefix = tempfile.mkdtemp(dir=pebble_path) + self.client = pebble.Client(socket_path=get_socket_path()) def tearDown(self): shutil.rmtree(self.prefix) @@ -306,3 +308,9 @@ def tearDown(self): @unittest.skip('pending resolution of https://github.com/canonical/pebble/issues/80') def test_make_dir_with_permission_mask(self): pass + + +@unittest.skipUnless(os.getenv('RUN_REAL_PEBBLE_TESTS'), 'RUN_REAL_PEBBLE_TESTS not set') +class TestNoticesUsingRealPebble(unittest.TestCase, PebbleNoticesMixin): + def setUp(self): + self.client = pebble.Client(socket_path=get_socket_path()) diff --git a/test/test_testing.py b/test/test_testing.py index 1fc624d6c..fd0738fc0 100644 --- a/test/test_testing.py +++ b/test/test_testing.py @@ -27,6 +27,7 @@ import sys import tempfile import textwrap +import time import typing import unittest import uuid @@ -34,7 +35,6 @@ import pytest import yaml -from typing_extensions import Required import ops import ops.testing @@ -2647,7 +2647,7 @@ def __init__(self, framework: ops.Framework): expected_relation_created[0]] self.assertEqual(changes[:2], expected_relation_created) changes = changes[2:] - expected_middle: typing.List[RecordedChange] = [ + expected_middle: typing.List[typing.Dict[str, typing.Any]] = [ {'name': 'leader-elected'}, {'name': 'config-changed', 'data': {}}, {'name': 'start'}, @@ -3053,7 +3053,7 @@ class RelationChangedViewer(ops.Object): def __init__(self, charm: ops.CharmBase, relation_name: str): super().__init__(charm, relation_name) - self.changes: typing.List[typing.Dict[str, str]] = [] + self.changes: typing.List[typing.Dict[str, typing.Any]] = [] charm.framework.observe(charm.on[relation_name].relation_changed, self.on_relation_changed) def on_relation_changed(self, event: ops.RelationEvent): @@ -3066,19 +3066,12 @@ def on_relation_changed(self, event: ops.RelationEvent): self.changes.append(dict(data)) -class RecordedChange(typing.TypedDict, total=False): - name: Required[str] - data: typing.Dict[str, typing.Any] - relation: str - container: typing.Optional[str] - - class RecordingCharm(ops.CharmBase): """Record the events that we see, and any associated data.""" def __init__(self, framework: ops.Framework): super().__init__(framework) - self.changes: typing.List[RecordedChange] = [] + self.changes: typing.List[typing.Dict[str, typing.Any]] = [] self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.leader_elected, self._on_leader_elected) self.framework.observe(self.on.leader_settings_changed, self._on_leader_settings_changed) @@ -3172,10 +3165,11 @@ def _observe_relation_event(self, event_name: str, event: ops.RelationEvent): assert event.departing_unit is not None data['departing_unit'] = event.departing_unit.name - recording: RecordedChange = { + recording: typing.Dict[str, typing.Any] = { 'name': event_name, 'relation': event.relation.name, - 'data': data} + 'data': data, + } if self.record_relation_data_on_events: recording["data"].update({'relation_data': { @@ -3199,16 +3193,25 @@ class ContainerEventCharm(RecordingCharm): def observe_container_events(self, container_name: str): self.framework.observe(self.on[container_name].pebble_ready, self._on_pebble_ready) + self.framework.observe(self.on[container_name].pebble_custom_notice, + self._on_pebble_custom_notice) def _on_pebble_ready(self, event: ops.PebbleReadyEvent): - self._observe_container_event('pebble-ready', event) - - def _observe_container_event(self, event_name: str, event: ops.PebbleReadyEvent): - container_name = None - if event.workload is not None: - container_name = event.workload.name - self.changes.append( - {'name': event_name, 'container': container_name}) + self.changes.append({ + 'name': 'pebble-ready', + 'container': event.workload.name, + }) + + def _on_pebble_custom_notice(self, event: ops.PebbleCustomNoticeEvent): + type_str = (event.notice.type.value if isinstance(event.notice.type, pebble.NoticeType) + else event.notice.type) + self.changes.append({ + 'name': 'pebble-custom-notice', + 'container': event.workload.name, + 'notice_id': event.notice.id, + 'notice_type': type_str, + 'notice_key': event.notice.key, + }) def get_public_methods(obj: object): @@ -5508,3 +5511,199 @@ def test_bad_results(self): self._action_results["A"] = "foo" with self.assertRaises(ValueError): self.harness.run_action("results") + + +class TestNotify(unittest.TestCase): + def test_notify_basics(self): + harness = ops.testing.Harness(ContainerEventCharm, meta=""" + name: notifier + containers: + foo: + resource: foo-image + bar: + resource: foo-image + """) + self.addCleanup(harness.cleanup) + harness.begin() + harness.charm.observe_container_events('foo') + harness.charm.observe_container_events('bar') + + id1a = harness.pebble_notify('foo', 'example.com/n1') + id2 = harness.pebble_notify('foo', 'foo.com/n2') + id3 = harness.pebble_notify('bar', 'example.com/n1') + id1b = harness.pebble_notify('foo', 'example.com/n1') + + self.assertIsInstance(id1a, str) + self.assertNotEqual(id1a, '') + self.assertEqual(id1a, id1b) + + self.assertIsInstance(id2, str) + self.assertNotEqual(id2, '') + self.assertNotEqual(id2, id1a) + + self.assertIsInstance(id3, str) + self.assertNotEqual(id3, '') + self.assertNotEqual(id3, id2) + + expected_changes = [{ + 'name': 'pebble-custom-notice', + 'container': 'foo', + 'notice_id': id1a, + 'notice_type': 'custom', + 'notice_key': 'example.com/n1', + }, { + 'name': 'pebble-custom-notice', + 'container': 'foo', + 'notice_id': id2, + 'notice_type': 'custom', + 'notice_key': 'foo.com/n2', + }, { + 'name': 'pebble-custom-notice', + 'container': 'bar', + 'notice_id': id3, + 'notice_type': 'custom', + 'notice_key': 'example.com/n1', + }, { + 'name': 'pebble-custom-notice', + 'container': 'foo', + 'notice_id': id1a, + 'notice_type': 'custom', + 'notice_key': 'example.com/n1', + }] + self.assertEqual(harness.charm.changes, expected_changes) + + def test_notify_no_repeat(self): + """Ensure event doesn't get triggered when notice occurs but doesn't repeat.""" + harness = ops.testing.Harness(ContainerEventCharm, meta=""" + name: notifier + containers: + foo: + resource: foo-image + """) + self.addCleanup(harness.cleanup) + harness.begin() + harness.charm.observe_container_events('foo') + + id1a = harness.pebble_notify('foo', 'example.com/n1', + repeat_after=datetime.timedelta(days=1)) + id1b = harness.pebble_notify('foo', 'example.com/n1', + repeat_after=datetime.timedelta(days=1)) + + self.assertEqual(id1a, id1b) + + expected_changes = [{ + 'name': 'pebble-custom-notice', + 'container': 'foo', + 'notice_id': id1a, + 'notice_type': 'custom', + 'notice_key': 'example.com/n1', + }] + self.assertEqual(harness.charm.changes, expected_changes) + + def test_notify_no_begin(self): + num_notices = 0 + + class TestCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + self.framework.observe(self.on['c1'].pebble_custom_notice, + self._on_pebble_custom_notice) + + def _on_pebble_custom_notice(self, event: ops.PebbleCustomNoticeEvent): + nonlocal num_notices + num_notices += 1 + + harness = ops.testing.Harness(TestCharm, meta=""" + name: notifier + containers: + c1: + resource: c1-image + """) + self.addCleanup(harness.cleanup) + + id = harness.pebble_notify('c1', 'example.com/n1') + + self.assertIsInstance(id, str) + self.assertNotEqual(id, '') + self.assertEqual(num_notices, 0) + + +class PebbleNoticesMixin: + client: ops.pebble.Client + + assertEqual = unittest.TestCase.assertEqual # noqa + assertIsNone = unittest.TestCase.assertIsNone # noqa + assertLess = unittest.TestCase.assertLess # noqa + assertRaises = unittest.TestCase.assertRaises # noqa + assertGreaterEqual = unittest.TestCase.assertGreaterEqual # noqa + + def test_get_notice_by_id(self): + client = self.client + key1 = 'example.com/' + os.urandom(16).hex() + key2 = 'example.com/' + os.urandom(16).hex() + id1 = client.notify(pebble.NoticeType.CUSTOM, key1) + id2 = client.notify(pebble.NoticeType.CUSTOM, key2, data={'x': 'y'}) + time.sleep(0.000_001) # Ensure times are different. + client.notify(pebble.NoticeType.CUSTOM, key2, data={'k': 'v', 'foo': 'bar'}) + + notice = client.get_notice(id1) + self.assertEqual(notice.id, id1) + self.assertEqual(notice.type, pebble.NoticeType.CUSTOM) + self.assertEqual(notice.key, key1) + self.assertEqual(notice.first_occurred, notice.last_occurred) + self.assertEqual(notice.first_occurred, notice.last_repeated) + self.assertEqual(notice.occurrences, 1) + self.assertEqual(notice.last_data, {}) + self.assertIsNone(notice.repeat_after) + self.assertEqual(notice.expire_after, datetime.timedelta(days=7)) + + notice = client.get_notice(id2) + self.assertEqual(notice.id, id2) + self.assertEqual(notice.type, pebble.NoticeType.CUSTOM) + self.assertEqual(notice.key, key2) + self.assertLess(notice.first_occurred, notice.last_occurred) + self.assertLess(notice.first_occurred, notice.last_repeated) + self.assertEqual(notice.last_occurred, notice.last_repeated) + self.assertEqual(notice.occurrences, 2) + self.assertEqual(notice.last_data, {'k': 'v', 'foo': 'bar'}) + self.assertIsNone(notice.repeat_after) + self.assertEqual(notice.expire_after, datetime.timedelta(days=7)) + + def test_get_notices(self): + client = self.client + + key1 = 'example.com/' + os.urandom(16).hex() + key2 = 'example.com/' + os.urandom(16).hex() + key3 = 'example.com/' + os.urandom(16).hex() + + client.notify(pebble.NoticeType.CUSTOM, key1) + time.sleep(0.000_001) # Ensure times are different. + client.notify(pebble.NoticeType.CUSTOM, key2) + time.sleep(0.000_001) # Ensure times are different. + client.notify(pebble.NoticeType.CUSTOM, key3) + + notices = client.get_notices() + self.assertGreaterEqual(len(notices), 3) + + notices = client.get_notices(keys=[key1, key2, key3]) + self.assertEqual(len(notices), 3) + self.assertEqual(notices[0].key, key1) + self.assertEqual(notices[1].key, key2) + self.assertEqual(notices[2].key, key3) + self.assertLess(notices[0].last_repeated, notices[1].last_repeated) + self.assertLess(notices[1].last_repeated, notices[2].last_repeated) + + notices = client.get_notices(keys=[key2]) + self.assertEqual(len(notices), 1) + self.assertEqual(notices[0].key, key2) + + notices = client.get_notices(keys=[key1, key3]) + self.assertEqual(len(notices), 2) + self.assertEqual(notices[0].key, key1) + self.assertEqual(notices[1].key, key3) + self.assertLess(notices[0].last_repeated, notices[1].last_repeated) + + +class TestNotices(unittest.TestCase, _TestingPebbleClientMixin, PebbleNoticesMixin): + def setUp(self): + self.client = self.get_testing_client()