diff --git a/docs/agents/access_director.rst b/docs/agents/access_director.rst new file mode 100644 index 00000000..99d59573 --- /dev/null +++ b/docs/agents/access_director.rst @@ -0,0 +1,199 @@ +.. highlight:: rst + +.. _access_director: + +===================== +Access Director Agent +===================== + +The Access Director Agent distributes Access Control information +(passwords) to all subscribed agents in the OCS instance. See the +main article on :ref:`access_control_user`, and the Agent +:ref:`access_director_description` below. + + +.. argparse:: + :module: ocs.agents.access_director.agent + :func: make_parser + :prog: agent.py + + + +Configuration File Examples +--------------------------- + +OCS Site Config +``````````````` + +To configure the Access Director Agent we need to add an AccessDirector +block to our ocs configuration file. Here is an example configuration +block using all of the available arguments:: + + {'agent-class': 'AccessDirector', + 'instance-id': 'access-dir', + 'arguments': [['--config-file', '/config/access-director.yaml']]} + +Docker Compose +`````````````` + +The Access Director Agent can also be run in a Docker container. An +example docker-compose service configuration is shown here:: + + ocs-access-dir: + image: simonsobs/ocs:latest + hostname: ocs-docker + environment: + - LOGLEVEL=info + - INSTANCE_ID=access-dir + volumes: + - ${OCS_CONFIG_DIR}:/config:ro + +.. _access_director_config_file: + +Access Director Configuration File +`````````````````````````````````` + +The format is described below, but for a browsable schema see +:class:`ocs.access.AccessDirectorConfig`. + +Here is an example configuration file: + +.. code-block:: yaml + + # Policy file for OCS Access Directory Agent + + passwords_block_default_hashfunc: none + distrib_hashfunc: md5 + + passwords: + - default: true + password_4: 'superuserPassword!' 
+ - agent_class: 'FakeDataAgent' + instance_id: '!faker4' + password_2: 'fake2ser' + - instance_id: 'faker4' + password_2: 'specialLevel2' + password_3: 'speciallevel3' + + exclusive_access_blocks: + - name: "the-fakers" + password: "lockout-test" + grants: + - instance_id: "faker4" + lock_levels: [1,2,3] + cred_level: 1 + - instance_id: "faker*,!faker4" + lock_levels: [3] + cred_level: 3 + + +The ``passwords`` entry is a list of password assignment blocks, which +define passwords that should grant clients certain credential levels +on certain agents. + +The syntax of the assignment blocks is described in +:class:`ocs.access.AccessPasswordItem`. + +**Examples** + +Example 1 -- set the level (1,2,3,4) passwords for all agents to ('', 'special2', +'special3', 'superuser'):: + + passwords: + - default: true + password_1: '' + password_2: 'special2' + password_3: 'special3' + password_4: 'superuser' + +Example 2 -- like Example 1 except that any agent with the class +"FakeDataAgent" will have level 2 password set to 'fake2':: + + passwords: + - default: true + password_1: '' + password_2: 'special2' + password_3: 'special3' + password_4: 'superuser' + - agent_class: FakeDataAgent + password_2: 'fake2' + +Example 3 -- like Example 2 except that the agent with `instance_id` +of "danger4" will have the level 2 and level 3 access totally +disabled (even if "danger4" is a FakeDataAgent.):: + + passwords: + - default: true + password_1: '' + password_2: 'special2' + password_3: 'special3' + password_4: 'superuser' + - agent_class: FakeDataAgent + instance_id: '!danger4' + password_2: 'fake2' + + +**exclusive_access_blocks** + +The ``exclusive_access_blocks`` entry is a list of access grant +blocks. Each block must at least have a (unique) "name" entry, and a +list of :class:`GrantConfigItem`. Each +GrantConfigItem targets some set of agent instances (using the +`default` / `agent_class` / `instance_id` keys in the same way that +:class:`ocs.access.AccessPasswordItem` does). 
The `cred_level` +declares what level to give the grantee, on the targeted instances. +The `lockout_levels` is a list of Credential Levels to *lock out*, +during this grant. + +The additional settings items are: + +- ``passwords_block_default_hashfunc``: name of the hash function to + assume for passwords provided in the "passwords" block. (Default: + "none", meaning they are the cleartext.) +- ``distrib_hashfunc``: hashfunc to use, instead of cleartext, when + distributing passwords to agents. (Does not affect passwords that + were provided already hashed.) + + + +.. _access_director_description: + + +Description +----------- + +The role of this Agent is to distribute Access Control information to +Agent Instances and Clients in the OCS instance. In an OCS there will +normally be zero or one instance of the Access Director (but it's +possible to set up multiple instances, and have different agents +attuned to different Access Directors). + +The agent configuration is provided through the Access Config File, +the path to which is a command-line parameter. The ``manager`` +process distributes access information to agents on a special feed +(``...feeds.controls``). The task ``reload_config`` may be used to +trigger a reload of the config file. If there are syntax errors in +the config file, this will normally cause the reload to be ignored and +the existing configuration to persist. + +Two "special access points" are exposed by the Access Director. The +``agent_poll`` method is used by agents to request an update of their +current access information; they will normally do this when they +connect or reconnect to crossbar. + +The ``request_exclusive`` access point is used by clients that wish to +establish an Exclusive Access Lock. The client would provide the name +of the access block, and a password (if defined). The Agent then +returns to that client a randomly generated password that the client +can use to talk to all agents covered by the grant. 
Starting then, +and until the grant expires or is released, the Agent distributes +updated access information to all agents that reflects the new access +rules -- i.e. special access levels, granted based on that password, +and also any lock-outs that are associated with the grant. + + +Agent API +--------- + +.. autoclass:: ocs.agents.access_director.agent.AccessDirector + :members: diff --git a/docs/api.rst b/docs/api.rst index 18238503..6676b1db 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -9,6 +9,15 @@ agents The ``agents/`` directory contains the OCS Agents, as well as any supporting code. +agents.access_director +`````````````````````` + +.. automodule:: ocs.agents.access_director.agent + :members: + :undoc-members: + :show-inheritance: + :noindex: + agents.aggregator ````````````````` @@ -87,6 +96,14 @@ ocs.agent_cli :undoc-members: :show-inheritance: +ocs.access +---------- + +.. automodule:: ocs.access + :members: + :undoc-members: + :show-inheritance: + .. _ocs_base_api: ocs.base diff --git a/docs/developer/access_control.rst b/docs/developer/access_control.rst new file mode 100644 index 00000000..8621c76c --- /dev/null +++ b/docs/developer/access_control.rst @@ -0,0 +1,165 @@ +.. _access_control_dev: + +Access Control +============== + +This section discusses the implementation of Access Control in Agent +code. A connected Client will provide a password to the operation, +and this will automatically be compared to the set of accepted +passwords to establish what Privilege Level to use. + +From the perspective of the Agent code, there are two ways access can +be restricted: + +1. When the Task or Process is registered, on Agent start-up, pass + ``min_privs=2`` or ``min_privs=3``. Then, when a client tries to + start or stop the Operation, any provided password will + automatically be checked, and the request will be immediately + rejected if the resulting privilege level is insufficient. + +2. 
In the body of the Task or Process, the agent code can check the + current privilege level, and take different actions depending on + that level. This is appropriate when certain conditions, which + need to be assessed on the fly, might warrant requiring a higher + level of privilege to run. + +The next two sections go into more detail on those two approaches. + + +Restriction of Operations at Agent Start-up +------------------------------------------- + +The simplest form of restriction is to simply require a minimum +privilege level for access to a Task or Process. + +To restrict access to an Operation, state the required access +level in the ``min_privs`` argument when registering the op. For +example:: + + agent.register_task('deep_clean', washingmachine.deep_clean, + min_privs=2) + +If the need for Access Control depends somewhat on the particular +instance of an Agent, it may be convenient to have ``min_privs`` set +based on a command-line parameter. The FakeData Agent shows an +example of this. + +When restrictions are set up in this way, checking of privileges is +handled automatically and immediately when a client calls ``start`` or +``stop`` on an operation -- if the privileges are not sufficient the +client gets an *immediate* error response indicating the failure. + +In the case that an Agent has privilege levels coded in, but you want +to disable (or effectively disable) those restrictions, there are two +simple options: + +1. You can tell the agent to run with no privilege enforcement, by + passing ``--access-policy none``. In the SCF that would look like + this:: + + {'agent-class': 'FakeDataAgent', + 'instance-id': 'faker4', + 'arguments': [['--access-policy', 'none']], + }, + + +2. 
You can add an Access Director configuration block, that accepts an + empty password for access to level 3:: + + passwords: + - instance_id: 'faker4' + password_3: '' + + +Note that in case (1), the agent will entirely ignore the Access +Director -- so Exclusive Access grants will also not have any effect +on this agent. In case (2) such grants will still be respected (if +they include a lock-out for general level 3 access). + + +Dynamic Access Restrictions +--------------------------- + +A more complex imposition of restrictions is to make run-time +decisions, in code. This is achieved, within the body of the Task or +Process function, by checking ``session.cred_level``. + +E.g.:: + + class WashingMachine(OCSAgent): + # ... + + def deep_clean(self, session, args): + """deep_clean(force=False) + + **Task** -- Perform a deep clean cycle. + + If a normal wash cycle is in progress, cred_level=2 is + required to start the deep_clean (which will cause the + current wash cycle to be aborted); in that case force=True + must be passed. + + """ + if self.wash_cycle_in_progress: + if session.cred_level < 2 or not force: + return False, ("CredLevel=2 and force=True are required " + "to start deep_clean during a wash cycle.") + self._abort_wash_cycle() + + self.washer._hardware.initiate_deep_clean() + ... + + +Note that rejections in the function body cause the Operation to exit +with error, rather than for the ``start`` call of the Operation to +return an immediate error. + +It is good practice for an operation to not have drastically different +behavior, depending *only* on the credential level. Users/clients may +sometimes provide their high-privilege credentials to routine +operations, and safe-guards should remain in place despite that +privileged access. This is the reason for requiring ``force=True`` in +the ``deep_clean`` example, above -- even a high-privilege user +probably doesn't want, accidentally, to run ``deep_clean`` while a +wash cycle is in progress. 
+ + +Choosing What to Protect +------------------------ + +When developing agents with Access Control in mind, you should +consider what functionality of the agent should be restricted. In a +complex system, operation by users can become very awkward if numerous +different passwords are required to access standard functionality of +various devices. We thus recommend that Access Control be used only +to guard against the accidental entry into unsafe or highly +inconvenient hardware states. + +As a general guideline: + +- Require privilege level 3 for operations that could lead to damage, + long-term outages, or degrade observatory safety. +- Require privilege level 2 for activities that could lead to awkward + device states that might delay observatory function temporarily, or + require expert attention to recover from. +- Use default privilege level (1) for everything else. This is true, + even if some expertise is required to use the device properly. + +Testing and Debugging +--------------------- + +When testing an agent's Access Control, recall that the +``--access-policy`` argument can be used to set the level 2 and 3 +passwords, independent of whether an Access Director agent is running +in the OCS instance. An example SCF entry for a FakeData agent with +passwords is:: + + {'agent-class': 'FakeDataAgent', + 'instance-id': 'faker4', + 'arguments': [['--access-policy', 'override:fake-pw-2,fake-pw-3']], + }, + +You can override the ``--access-policy`` on the command line when +using ``ocs-agent-cli``; e.g.:: + + $ ocs-agent-cli --instance-id=faker4 --access-policy=none diff --git a/docs/developer/clients.rst b/docs/developer/clients.rst index 02c654d3..49e64f70 100644 --- a/docs/developer/clients.rst +++ b/docs/developer/clients.rst @@ -192,6 +192,30 @@ OCSReply is a namedtuple. The elements of the tuple are: docstring for :func:`ocs.ocs_agent.OpSession.encoded` and the Data Access section on :ref:`session_data`. + +.. 
_clients_passwords: + +Passwords and Credentials +````````````````````````` + +When :ref:`Access Control <access_control_user>` has been configured, +clients will need to provide passwords to access some Agent +functionality. Use the ``privs`` argument, when instantiating a +client, to cause the client to use a password:: + + # Use access password + client = OCSClient('agent-instance-id', privs='my-secret-password') + +Alternatively, a specific access level can be requested, as an integer:: + + # Get credential level 3 + client = OCSClient('agent-instance-id', privs=3) + +For that to work, an OCS password file must be available. See +:func:`ocs.access.client_get_password` for a discussion of how to set +up the OCS password file. + + Examples ```````` diff --git a/docs/index.rst b/docs/index.rst index b27120bd..f3d306e1 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -30,6 +30,7 @@ write OCS Agents or Clients. :caption: Agent Reference :maxdepth: 3 + agents/access_director agents/aggregator agents/barebones agents/influxdb_publisher @@ -48,6 +49,7 @@ write OCS Agents or Clients. developer/clients developer/data developer/writing-plugins + developer/access_control developer/docker developer/testing diff --git a/docs/user/access_control.rst b/docs/user/access_control.rst new file mode 100644 index 00000000..a1f312da --- /dev/null +++ b/docs/user/access_control.rst @@ -0,0 +1,151 @@ +.. _access_control_user: + +Access Control +============== + +In OCS, Access Control is a set of features to restrict some Agent +activities, so that they can only be controlled by certain privileged +clients. The goals of the Access Control system are: + +- Provide a way to restrict some Agent functionality, such as + dangerous or time-consuming operations, to clients or users that + have provided special access passwords. 
+- Provide a way for special clients, such as the main observatory + scheduling system, to request exclusive access to Agent functions, + and thus prevent other clients from initiating potentially + interfering activities. + +To enable the full features of OCS Access Control, it is necessary to +include an instance of the :ref:`Access Director <access_director>` +Agent in the OCS setup. The Access Director distributes access +information to Agents on the network using a special feed, and +processes requests from agents and clients using special WAMP access +points. + + +Important configuration files +----------------------------- + +Access Control is affected by the following configuration files: + +**Site Config File** + +- The SCF includes a global setting that determines whether agents in + the system should refer to the Access Director (AD) agent, or not. + See :class:`ocs.site_config.HubConfig`. +- The SCF also must include a block that sets up an instance of the + AD, which will include specifying the path to the Access Config + File; see :ref:`Access Director <access_director>`. + +**Access Config File** + +- The Access Config File is a YAML file read by the AD. It sets the + passwords that clients can use to obtain different credential levels + for different agent instances. +- It may also define "Exclusive Access" grant blocks (see below). +- The format of this file, too, is described with the :ref:`Access + Director <access_director>` Agent. + + +**Client Access Config Files** + +- Although you can include passwords when instantiating + :class:`ocs.ocs_client.OCSClient` objects, users can store passwords in + a config file for automatic retrieval. +- See :ref:`clients_passwords`. + + +Setting up the Access Director +------------------------------ + +The Access Director agent is set up just like any other agent, with an +entry in the Site Config File. See the Agent's instructions for the +standard SCF and docker-compose blocks. 
Because it will be critical +to proper functioning of OCS, it is a good idea to configure it to run +on the same host as the crossbar server and other critical agents +(Registry, Aggregator) if possible. + + +Setting up Agents to Listen to the Access Director +-------------------------------------------------- + +By default, other Agents will not know to refer to the Access Director +for access information. Agents need to be configured to refer to the +Access Director, by either: + +- Passing the ``--access-policy=director:<instance-id>`` argument to + the Agent, providing the instance-id of the Access Director; OR +- Setting ``'access_policy'`` in the SCF, which sets the default + value for the ``--access-policy`` argument for all agents in the OCS + instance. + +Before enabling the "director" access policy on the whole system, the +behavior can be tested on a subset of Agents by passing +``--access-policy`` explicitly in their instance args. + +Once an Agent instance has been launched with +``--access-policy=director...``, communication with the instance *will +depend on* the presence and proper functioning of the Access Director +agent on the network. (Without such stringent policies, the power of +exclusive access grants would be diminished.) + + +Developing Agents with Access Control Features +---------------------------------------------- + +In most cases it will be necessary to include access controls +(e.g. requiring a specific level to perform a certain task) in the +Agent code. See :ref:`access_control_dev`. + +Agents that do not have any explicit Access Control functionality +built in are still subject to constraints from the Access +Director -- provided that the Agent instance is running on a version +of ``ocs`` that is Access Control-aware. Even if an agent doesn't set +any minimum credential levels for operations, the Access Director can +still restrict access to that agent by imposing passwords for level 1 +access. 
+ + +Passwords and Credential Levels +------------------------------- + +The basic approach to access control is that a client will provide a +password whenever it performs calls to an Agent Instance Operation +(e.g. starting a Task). Based on the password, the client will be +granted some level of access. The access levels are: + +- 0: Access is blocked. +- 1: Basic Access. +- 2: Advanced Access. +- 3: Full Access. +- 4: Super-user Access. + +Operations within the Agent Instance will be hard-coded, or will +somehow decide dynamically, to require some minimum access level to +perform a given operation. If the access level associated with the +password is equal to or higher than the required access level, then +the operation is allowed to proceed. Otherwise, the call will return +an error immediately (or, in dynamically decided cases, the operation +will exit with error). + +For details on how to add Credential Level awareness to Agent code, +see :ref:`access_control_dev`. + + +Exclusive Access Grants +----------------------- + +Exclusive Access Grants are a means for some clients to obtain +exclusive access to certain operations. For example, a sequence of +activities that requires careful interleaving of otherwise routine +tasks between two agents might request exclusive access to those two +agent instances, locking out all other users. + +Multiple Exclusive Access Grant blocks can be defined, in the Access +Config File, and then may be referenced by "name" and activated by +providing a (optional) password to the Access Director agent. + +For a description of the Exclusive Access Grant block format, see +:ref:`access_director_config_file`. For details on how a client may +request, renew, and relinquish an Exclusive Access Grant, see +:class:`ocs.access.ExclusiveAccessClient`. 
class CredLevel(IntEnum):
    """Integer-valued credential levels.

    A member describes either the credential level held by a client
    or the credential level required for some operation.  Because
    this is an ``IntEnum``, members compare and sort like the
    integers 0 through 4.

    """
    BLOCKED = 0
    BASIC = 1
    ADVANCED = 2
    FULL = 3
    SUPERUSER = 4

    def __str__(self):
        # Rendered as "<number>-<NAME>", e.g. "2-ADVANCED".
        return '-'.join((str(self.value), self.name))
These are +#: used in config blocks for Access Director and OCS clients, for +#: example. +CRED_KEYS = [ + ('password_1', CredLevel(1)), + ('password_2', CredLevel(2)), + ('password_3', CredLevel(3)), + ('password_4', CredLevel(4)), +] + + +# Password and hashing support + +def _get_random_string(length=8): + return ''.join([random.choice(string.ascii_letters) for i in range(length)]) + + +def _hashfunc_no_hash(x): + return x + + +def _hashfunc_short_md5(x): + return hashlib.md5(x.encode('utf-8')).hexdigest()[:16] + + +HASHFUNCS = { + 'none': _hashfunc_no_hash, + 'blocked': lambda x: ValueError('Should not call this hash func.'), + 'md5': _hashfunc_short_md5, +} + + +@dataclass +class HashedPass: + """Container for hashed password values, for internal storage in + Access Director agent as well as for distribution to Agents that + need to validate clients. + + """ + + #: The identifier for the hash being used (use "none") for clear + #: text, and "blocked" to refuse to match any user password. + hash: str + + #: The hashed string. If this string is empty, and hash != + #: "blocked", then *any* provided password will match. + value: str = '' + + def check(self, value: str) -> bool: + """Check if the provided clear-text password matches this + object's stored password, after hashing. + + """ + if value is None: + value = '' + if self.hash == 'blocked': + return False + if self.value == '': + return True + hashfunc = HASHFUNCS[self.hash] + return hashfunc(value) == self.value + + @classmethod + def create_blocked(cls): + """Return an object representing "blocked" access.""" + return cls(hash='blocked') + + @classmethod + def create_free(cls): + """Return an object representing passwordless access.""" + return cls(hash='none') + + @classmethod + def create_from_value(cls, hash, clear_text): + """Return on object constructed from the hashed value of + clear_text password. 
@dataclass
class AccessPasswordItem:
    """One entry of a password configuration file.

    The same structure appears in two places: the Access
    Configuration File (which configures the Access Director) and the
    OCS Password File (which clients consult to find the password to
    use for a given agent).

    Every field is either a **selector** or a **password**.

    **Selectors** decide whether the entry applies to a particular
    agent instance, identified by its *agent_class* and
    *instance_id*:

    - ``default`` (bool): if True the entry applies to all agent
      instances, and the other selector keys are ignored.
    - ``agent_class`` (str): pattern to match against the instance's
      *agent_class*.  Ignored if absent or None.
    - ``instance_id`` (str): pattern to match against the instance's
      *instance_id*.  Ignored if absent or None.

    Patterns and matching are as described in this module's
    :func:`pattern_match`.

    **Passwords**:

    - ``password_1``: password associated with credential level 1.
    - ``password_2``: password associated with credential level 2.
    - ``password_3``: password associated with credential level 3.
    - ``password_4``: password associated with credential level 4.

    In the Access Configuration File, the *password* entries define
    passwords that should be accepted, granting the client that level
    of access.  They may be strings, interpreted either as clear text
    or as pre-hashed values depending on the
    ``passwords_block_default_hashfunc`` setting.  When the Agent
    checks a password it considers all the rules and returns the
    highest credential level matched by the password and target.

    In the OCS Password File, only ``password_2`` and/or
    ``password_3`` should be used, and the values should be strings;
    they are taken as the clear-text passwords for the Client to use.

    """
    #: Selector variables
    default: Optional[bool] = False
    agent_class: Optional[str] = None
    instance_id: Optional[str] = None

    #: Password variables
    password_1: Optional[Union[str, HashedPass]] = None
    password_2: Optional[Union[str, HashedPass]] = None
    password_3: Optional[Union[str, HashedPass]] = None
    password_4: Optional[Union[str, HashedPass]] = None

    def get_scope_spec(self):
        """Return a ScopeSpec carrying this entry's selector fields."""
        selectors = {
            key: getattr(self, key)
            for key in ('default', 'agent_class', 'instance_id')
        }
        return ScopeSpec(**selectors)
@dataclass
class ScopeSpec:
    """A specification of the scope of an AccessRule.  See ``check``
    function for matching details.

    """
    default: Optional[bool] = False
    agent_class: Optional[str] = None
    instance_id: Optional[str] = None

    def check(self, agent: AgentSpec) -> bool:
        """Determine whether ``agent`` matches the present ScopeSpec.

        If ``self.default`` is True, then this function returns True.
        Otherwise, if ``self.agent_class`` and ``self.instance_id``
        are both None, then this function returns False.  Otherwise
        the agent must match every selector pattern that is set (not
        None): ``self.agent_class`` against ``agent.agent_class`` and
        ``self.instance_id`` against ``agent.instance_id``.

        See :func:`pattern_match` in this module for a description of
        patterns and matching.

        """
        if self.default:
            return True
        if self.agent_class is None and self.instance_id is None:
            # A non-default scope without any selector matches nothing.
            return False
        if (self.agent_class is not None
                and not pattern_match(agent.agent_class, self.agent_class)):
            return False
        if (self.instance_id is not None
                and not pattern_match(agent.instance_id, self.instance_id)):
            return False
        return True

    def get_specificity(self) -> int:
        """Return the *specificity* of this rule, for sorting.  The
        principles are that positive patterns are more specific than
        negative patterns; and then that instance_id selection is more
        specific than agent_class selection.

        If default=True, then 0 is returned.  If not default, but no
        selector contributes any sub-pattern (for example agent_class
        and instance_id are both None), then -1 is returned, i.e. a
        value below that of a default scope.

        Otherwise, the specificity is the sum of the following
        contributions:

        - 8 if instance_id includes any positive subpatterns.
        - 4 if agent_class includes any positive subpatterns.
        - 2 if instance_id includes any negative subpatterns.
        - 1 if agent_class includes any negative subpatterns.

        """
        if self.default:
            return 0
        specificity = 0
        # (pattern, weight of positive sub-patterns, weight of negative ones)
        weighted = (
            (self.instance_id, 8, 2),
            (self.agent_class, 4, 1),
        )
        for pattern, pos_weight, neg_weight in weighted:
            if pattern is None:
                continue
            # raw=True returns one match result per positive / negative
            # sub-pattern; only the *presence* of sub-patterns matters
            # here, so the (empty) target string is irrelevant.
            pos, neg = pattern_match('', pattern, raw=True)
            if pos:
                specificity += pos_weight
            if neg:
                specificity += neg_weight
        # Every weight is positive, so 0 means nothing contributed.
        if specificity == 0:
            return -1
        return specificity
def agent_get_policy_default(policy: Optional[str]) -> AgentAccessRules:
    """Get the default access rules, based on a "policy" string
    (probably from the --access-policy Agent argument).

    The policy passed in by the user should be one of the following:

    - "none" (or "", or None).  This effectively disables Access
      Control, as the returned rules will give a caller maximum
      privileges, no matter what password.
    - "override:pw2,pw3", where "pw2" and "pw3" represent the desired
      level 2 and level 3 passwords (unhashed).
    - "director:access-dir", where "access-dir" represents the
      instance-id of an Access Director agent.  If the instance-id is
      omitted ("director" or "director:"), "access-director" is
      used.  The rules returned in this case will, initially, block
      all access; it's expected these rules will be updated by
      messages from the Access Director.

    Returns:
      An AgentAccessRules object with ``policy`` set to "none",
      "director" or "override" and ``rules`` populated accordingly.

    Raises:
      ValueError: if the policy name is not recognized, or if the
        "override" policy does not provide both passwords.

    """
    default_scope = ScopeSpec(default=True)

    def uniform_rules(hashed_pass):
        # One rule per level 1-3, all guarded by the same password.
        return [AccessRule(scope_spec=default_scope,
                           hashed_pass=hashed_pass,
                           cred_level=CredLevel(level))
                for level in [1, 2, 3]]

    if policy in [None, '', 'none']:
        return AgentAccessRules(
            policy='none',
            rules=uniform_rules(HashedPass.create_free()))

    # partition() tolerates a missing ":", so that a bare "director"
    # reaches the default director id below instead of failing on
    # tuple unpacking.
    name, _, args = policy.partition(':')

    if name == 'director':
        return AgentAccessRules(
            policy='director',
            director_id=args or 'access-director',
            rules=uniform_rules(HashedPass.create_blocked()))

    if name == 'override':
        pws = args.split(',')
        if len(pws) < 2:
            raise ValueError(
                'The "override" policy requires two passwords, '
                'as in "override:pw2,pw3".')
        return AgentAccessRules(
            policy='override',
            rules=[
                AccessRule(scope_spec=default_scope,
                           hashed_pass=HashedPass.create_free(),
                           cred_level=CredLevel(1)),
                AccessRule(scope_spec=default_scope,
                           hashed_pass=HashedPass(hash='none', value=pws[0]),
                           cred_level=CredLevel(2)),
                AccessRule(scope_spec=default_scope,
                           hashed_pass=HashedPass(hash='none', value=pws[1]),
                           cred_level=CredLevel(3)),
            ])

    raise ValueError(f'Invalid policy "{name}"')
def client_get_password(privs, agent_class, instance_id):
    """For OCSClient use -- determine the best client password to use.
    This may lead to inspection of OCS password files.

    Args:
      privs (str, int): If this is a string, it will be used as the
        password.  If this argument is an integer, it represents a
        desired credential level and the local password configuration
        will be inspected to find a password associated with access at
        that level.  None is treated as level 1.
      agent_class (str or None): If specified, will be used to match
        rules in the password config file.
      instance_id (str or None): If specified, will be used to match
        rules in the password config file.

    Returns:
      A string representing the password to use in all requests the
      client makes (these are passed to the agent in the
      "password=..." argument of the _ops_handler).  The empty string
      is returned if no rule in the password file applies.

    Raises:
      ValueError: if privs is neither a string, None, nor an integer
        in the range 1 to 3.

    Notes:
      If privs is a string, then this password is used directly and no
      inspection of config files is performed.

      If privs is 1, then the password '' is returned.

      If privs is 2 or 3, then the OCS password file is loaded and
      inspected for a suitable password.  The password file will be
      loaded from ``~/.ocs-passwords.yaml``, unless overridden by the
      environment variable ``OCS_PASSWORDS_FILE``.

      The password file is in yaml format, containing a single list.
      Each entry in the list is a dictionary referred to as a rule.
      Each rule is a dict with the schema described in
      :class:`ocs.access.AccessPasswordItem`.

      To find a suitable password, all rules are considered.  When
      multiple rules have selectors that match the target and contain
      a password of the required credential level or higher, then the
      one that is most *specific* is taken.  When multiple rules are
      tied for specificity, the one occurring latest in the list is
      taken.  "Specificity" is outlined in
      :func:`ScopeSpec.get_specificity`.

      Here's an example passwords file::

        - default: true
          password_3: "general-access-password"

        - agent_class: "FakeDataAgent"
          instance_id: "!faker4,!faker1"
          password_2: "normal-faker-password"

        - instance_id: "faker*"
          password_2: "special-faker-password"

        - instance_id: "faker4"
          password_2: "special-faker4-password"

      Suppose faker1 and faker4 both have agent_class "FakeDataAgent".
      For level 2 access, "faker1" matches the rules at index 0 and 2,
      but the most specific rule is 2 so that is used.  For "faker4",
      rules 0, 2, and 3 match.  Rules 2 and 3 are equally specific but
      rule 3 occurs later so it is used.

    """
    if isinstance(privs, str):
        return privs
    if privs is None:
        privs = 1
    if not isinstance(privs, int):
        raise ValueError("privs argument should be int or str.")
    # Explicit check rather than assert, so it survives "python -O".
    if not 1 <= privs <= 3:
        raise ValueError("privs, if an integer, should be 1, 2 or 3.")

    if privs == 1:
        return ''

    pw_file = (os.getenv('OCS_PASSWORDS_FILE')
               or os.path.expanduser('~/.ocs-passwords.yaml'))

    agent = AgentSpec(agent_class=agent_class, instance_id=instance_id)

    # Close the file promptly instead of relying on garbage collection.
    with open(pw_file, 'rb') as fin:
        creds = yaml.safe_load(fin)

    candidates = []
    # An empty yaml file loads as None; treat that as "no rules".
    for i, row in enumerate(creds or []):
        scope = ScopeSpec(**{k: row.get(k)
                             for k in ['default', 'agent_class', 'instance_id']})
        if not scope.check(agent):
            continue
        # Prefer the lowest-level password that satisfies the request.
        if 'password_2' in row and privs <= 2:
            key = 'password_2'
        elif 'password_3' in row and privs <= 3:
            key = 'password_3'
        else:
            continue
        # Tuples sort by specificity, then by position in the file.
        candidates.append((scope.get_specificity(), i, row[key]))

    if not candidates:
        return ''
    return max(candidates)[2]
def renew(self, expire_at: float = None) -> Tuple[bool, dict]:
    """Renew the Exclusive Access grant.  ``expire_at`` is an
    optional unix timestamp to suggest as the new grant expiry time;
    if it is None, 100 seconds from now is requested.

    The response is stored in ``self.last_response`` (as ``acquire``
    does).  If the request succeeds, ``self.expire_at`` is updated
    with the expiry time reported in the response.

    Returns:
      The tuple ``(ok, detail)``, where ``ok`` is False if the
      response contains an "error" entry, and ``detail`` is the
      response itself.

    """
    if expire_at is None:
        expire_at = time.time() + 100.
    resp = self._client._client.special('request_exclusive',
                                        action='renew',
                                        expire_at=expire_at,
                                        **self._gargs)
    # Keep the most recent response for inspection, success or not.
    self.last_response = resp
    if 'error' in resp:
        return False, resp
    self.expire_at = resp['expire_at']
    return True, resp
def director_parse_config(config: Optional[dict]) -> dict:
    """Parse a config file and return the config dict.  This includes
    some validation and some translation.

    The elements at the top level of the returned dict will become
    attributes of the AD instance, so be careful what you add in
    there.

    Args:
      config: The decoded config file contents.  None (as obtained
        from an empty yaml file) is treated like an empty dict, so
        all AccessDirectorConfig defaults apply.

    Returns:
      A dict with entries '_hashname' and '_hashfunc' (the hash used
      for distribution), '_rules' (list of AccessRule built from the
      ``passwords`` entries) and '_grant_blocks' (the
      ``exclusive_access_blocks`` entries).

    """
    # Do a bunch of validation ...
    adc = AccessDirectorConfig(**(config or {}))

    out = {
        '_hashname': adc.distrib_hashfunc,
        '_hashfunc': HASHFUNCS[adc.distrib_hashfunc],
    }

    def promote_pw(pw):
        """Encode a password with the chosen default hash; unless it
        is the empty password in which case just leave it unhashed.

        """
        if isinstance(pw, str):
            if pw == '':
                pw = HashedPass.create_free()
            else:
                pw = HashedPass(hash=adc.passwords_block_default_hashfunc,
                                value=pw)
        # Already hashed, or the (unhashed) empty password: keep as is.
        if pw.hash != 'none' or pw.value == '':
            return pw
        # Plain-text password: hash it for distribution.
        return HashedPass.create_from_value(out['_hashname'], pw.value)

    rules = []
    for entry in adc.passwords:
        scope = entry.get_scope_spec()
        for key, level in CRED_KEYS:
            raw_pw = getattr(entry, key)
            if raw_pw is None:
                continue
            rules.append(AccessRule(
                hashed_pass=promote_pw(raw_pw), cred_level=level,
                scope_spec=scope))

    out.update({
        '_rules': rules,
        '_grant_blocks': adc.exclusive_access_blocks,
    })
    return out
class AccessDirector:
    """Agent for distributing Access Control information to all Agents
    in a system.

    Args:
      agent: The OCSAgent instance; a 'controls' feed is registered
        on it and used to publish access rules.
      config_file (str): Path to the Access Director config file,
        which is loaded immediately.

    """

    def __init__(self, agent, config_file):
        self.agent = agent
        self.agent.register_feed('controls', record=False)
        self.log = agent.log

        # Whether the special access points have been registered.
        self._registered = False

        self._config_file = config_file
        # Pending publication requests, consumed by the manager process.
        self._requests = []
        # AccessGrant objects for exclusive access currently in force.
        self._active_grants = []

        self._load_config()

    @ocs_agent.param('_')
    @inlineCallbacks
    def manager(self, session, params):
        """manager()

        **Process** Update the main access control feed with new
        access information.  This occurs in response to agent queries,
        or if new grants require updates to access.

        """
        session.set_status('running')
        session.data = {}

        if not self._registered:
            yield self.agent.register(
                self.agent_poll,
                f'{self.agent.agent_address}.agent_poll')
            yield self.agent.register(
                self.request_exclusive,
                f'{self.agent.agent_address}.request_exclusive')
            self._registered = True

        # Time of the last full ("reset") publication.
        last_blast = 0

        while session.status in ['running']:
            yield dsleep(1)
            now = time.time()

            # Expire grants?
            keepers = []
            for _g in self._active_grants:
                if _g.expire_at <= now:
                    self.log.info(f'Grant expiring: {_g.name}')
                else:
                    keepers.append(_g)
            if len(keepers) < len(self._active_grants):
                self._active_grants = keepers
                self._update_all()

            session.data['grants'] = [v.encode() for v in self._active_grants]
            while self._requests:
                req = self._requests.pop(0)
                msg = {'reset': True,
                       'ac_version': access.AC_VERSION,
                       'rules': list(self._rules)}

                for _grant in self._active_grants:
                    msg['rules'].extend(_grant.rules)

                if req['type'] == 'reset':
                    last_blast = time.time()
                elif req['type'] == 'single':
                    # Send only the rules relevant to the one agent.
                    agent = access.AgentSpec(
                        agent_class=req['agent_class'],
                        instance_id=req['instance_id'])
                    subrules = access.agent_filter_rules(msg['rules'], agent)
                    msg = {'target': req['instance_id'],
                           'ac_version': access.AC_VERSION,
                           'rules': subrules}

                if 'rules' in msg:
                    msg['rules'] = [access.asdict(rule)
                                    for rule in msg['rules']]

                self.agent.publish_to_feed('controls', msg)

            # Re-publish everything if it has been a while.
            if time.time() - last_blast > 60:
                self._update_all()

        return True, 'Exited.'

    def _load_config(self):
        """Read the config file, store the parsed settings as
        attributes of self, and queue a full update.

        """
        # Read the file and convert to internal rep.
        with open(self._config_file, 'rb') as fin:
            config_raw = yaml.safe_load(fin)
        config = access.director_parse_config(config_raw)

        # Transfer new config to self.
        for k, v in config.items():
            setattr(self, k, v)

        # Request a complete update.
        self._update_all()

    def _update_all(self):
        """Queue a "reset" request, i.e. a publication of all rules."""
        self._requests.append({'type': 'reset'})

    @ocs_agent.param('_')
    @inlineCallbacks
    def reload_config(self, session, params):
        """reload_config()

        **Task** - Reload access config file.

        """
        yield self._load_config()
        return True, 'Update requested.'

    @inlineCallbacks
    def agent_poll(self, instance_id=None, agent_class=None):
        """*Special access point.*  This is used for agents to request
        an announcement of their password rules on the control feed.
        The instance_id and agent_class arguments must both be
        specified.

        """
        self._requests.append({
            'type': 'single',
            'instance_id': instance_id,
            'agent_class': agent_class,
        })
        yield self.log.info(f'agent-poll received from {agent_class}:{instance_id}')

    def request_exclusive(self, grant_name=None, password=None,
                          action=None, expire_at=None, grantee=None,
                          strict=None):
        """*Special access point.*  Request, renew, or release an
        exclusive access grant.

        Args:
          grant_name (str): Name of the grant, to match an entry in
            the "exclusive_access_blocks" section of the config file.
          password (str): The password, to be checked against the
            password specified in the grant block of the config.
          action (str): One of "acquire", "renew" or "release".
          expire_at (float): Unix timestamp for the desired expiry
            time of the grant.
          grantee (str): A string representing the client that has
            requested the lock.  When passed with "acquire", it is
            stored for distribution to clients so they can explain who
            has locked them out.
          strict (bool): If True (the default), "acquire" is rejected
            when the grant is already held and "release" is rejected
            when the grant is not held.  If False, "acquire" replaces
            any existing grant and "release" of an inactive grant
            succeeds trivially.  "renew" always requires the grant to
            be held.

        Returns:
          A dict with useful info.

          On error (including an unrecognized action), the dict has
          only an entry "error" with an error message in it.

          On success, the returned dict has at least the items
          'grant_name' (which matches the requested grant_name) and
          'message'; the 'message' is just "grant acquired" / "grant
          renewed" / "grant released".

          Additionally, if the 'action' is 'acquire' or 'renew' then
          the dict will include an entry 'expire_at' with the unix
          timestamp that the grant will be cancelled automatically.

          If the action is 'acquire', then the dict also has an entry
          'password', containing the password client should use to
          access the exclusive access targets for the duration of the
          access grant.

        """
        if grant_name is None:
            return {'error': 'No grant_name specified.'}
        if strict is None:
            strict = True

        for block in self._grant_blocks:
            if block.name == grant_name:
                break
        else:
            return {'error': f'Named grant not found ({grant_name})'}
        if block.password is not None and block.password != password:
            return {'error': f'Credential failed to access ({grant_name})'}

        # Does this grant already exist somewhere?
        for grant_idx, g in enumerate(self._active_grants):
            if g.name == grant_name:
                break
        else:
            grant_idx = None

        if action == 'acquire':
            if grant_idx is not None:
                if strict:
                    return {'error': 'Grant is already held; release it first.'}
                # Non-strict: drop the existing grant and replace it.
                self._active_grants.pop(grant_idx)

            # Generate passwords for this grant.
            password, rules = access.director_get_access_rules(block, grantee)
            new_grant = AccessGrant(grant_name, rules, expire_at, grantee)
            self._active_grants.append(new_grant)
            self._update_all()
            self.log.info(f'Exclusive access granted: {grant_name}')
            return {'message': 'grant acquired', 'password': password,
                    'grant_name': grant_name,
                    'expire_at': new_grant.expire_at}

        if action == 'renew':
            if grant_idx is None:
                return {'error': 'Grant is not currently held; cannot renew.'}
            self._active_grants[grant_idx].renew(expire_at)
            return {'message': 'grant renewed',
                    'grant_name': grant_name,
                    'expire_at': self._active_grants[grant_idx].expire_at}

        if action == 'release':
            if grant_idx is None:
                if strict:
                    return {'error': 'Grant is not currently held; cannot release.'}
                # Non-strict: nothing is held, so there is nothing to do.
            else:
                self._active_grants.pop(grant_idx)
                self._update_all()
                self.log.info(f'Exclusive access relinquished: {grant_name}')
            return {'message': 'grant released',
                    'grant_name': grant_name,
                    }

        # Always answer with a dict, as documented, rather than None.
        return {'error': f'Invalid action ({action}); expected '
                         '"acquire", "renew" or "release".'}

    @inlineCallbacks
    def _simple_stop(self, session, params):
        """Stopper for the manager process: mark the session as
        stopping so that the manager loop exits.

        """
        yield
        if session.status not in ['stopping', 'done']:
            session.set_status('stopping')
            return True, 'Stop initiated.'
        return False, 'Already done/stopping.'
def main(args=None):
    """Entry point: parse arguments, start the Access Director agent
    and run it.

    Args:
      args: Passed through to ``site_config.parse_args``.

    A relative --config-file path is interpreted relative to the
    directory containing the site config file.  If --config-file is
    not provided, the parser reports an error and the program exits.

    """
    parser = make_parser()
    args = site_config.parse_args(agent_class='AccessDirector',
                                  parser=parser,
                                  args=args)
    config_file = args.config_file
    if not config_file:
        # The parser default is None; fail with a usage message rather
        # than a TypeError when indexing into it.
        parser.error('The --config-file argument is required.')
    if not os.path.isabs(config_file):
        # Relative to SCF location.
        config_file = os.path.join(
            os.path.dirname(args.site_file), config_file)

    # Force the access_policy arg to "none", to prevent OCSAgent from
    # stalling, trying to get access passwords from this instance ...
    args.access_policy = 'none'

    agent, runner = ocs_agent.init_site_agent(args)
    access_director = AccessDirector(agent, config_file)

    agent.register_process('manager',
                           access_director.manager,
                           access_director._simple_stop,
                           blocking=False,
                           startup=True)
    agent.register_task('reload_config',
                        access_director.reload_config,
                        blocking=False)
    runner.run(agent, auto_reconnect=True)
def special(self, subaddr, *args, **kwargs):
    """Execute an arbitrary method associated with an Agent.
    This is intended for use with special, centralized services,
    such as to communicate with the Access Director.

    Args:
        subaddr (str): Name appended, after a ".", to the agent
            address to form the address that is called.
        *args, **kwargs: Forwarded unchanged to ``self.call``.

    Returns:
        Whatever ``self.call`` returns for that address.

    """
    return self.call(self.agent_addr + '.' + subaddr, *args, **kwargs)
def _ops_handler(self, action, op_name, params=None, timeout=None, password=None):
    """Dispatch an Operation request to the method named by
    ``action``.

    Args:
        action (str): One of 'start', 'stop', 'abort', 'wait' or
            'status'.
        op_name (str): Name of the Operation to act on.
        params: Passed to start, stop and abort.
        timeout: Passed to wait.
        password: The client's password; forwarded to every handler.

    Returns:
        The result of the selected method; or, if ``action`` is not
        recognized, an (ocs.ERROR, message, {}) tuple.

    """
    if action == 'start':
        return self.start(op_name, params=params, password=password)
    if action == 'stop':
        return self.stop(op_name, params=params, password=password)
    if action == 'abort':
        return self.abort(op_name, params=params, password=password)
    if action == 'wait':
        return self.wait(op_name, timeout=timeout, password=password)
    if action == 'status':
        return self.status(op_name, password=password)
    # It is the action, not the operation name, that was not recognized.
    return (ocs.ERROR, 'No implementation for "%s"' % action, {})
Notes: @@ -473,13 +521,14 @@ def register_task(self, name, func, aborter=None, blocking=True, """ self.tasks[name] = AgentTask( func, blocking=blocking, aborter=aborter, - aborter_blocking=aborter_blocking) + aborter_blocking=aborter_blocking, + min_privs=min_privs) self.sessions[name] = None if startup is not False: self.startup_ops.append(('task', name, startup)) def register_process(self, name, start_func, stop_func, blocking=True, - stopper_blocking=None, startup=False): + stopper_blocking=None, startup=False, min_privs=0): """Register a Process for this agent. Args: @@ -502,6 +551,8 @@ def register_process(self, name, start_func, stop_func, blocking=True, launched on startup. If the ``startup`` argument is a dictionary, this is passed to the Operation's start function. + min_privs (int): Minimum privilege level required to start + or stop this operation (1, 2, 3). See Access Control. Notes: The functions start_func and stop_func will be called with @@ -514,7 +565,8 @@ def register_process(self, name, start_func, stop_func, blocking=True, """ self.processes[name] = AgentProcess( start_func, stop_func, blocking=blocking, - stopper_blocking=stopper_blocking) + stopper_blocking=stopper_blocking, + min_privs=min_privs) self.sessions[name] = None if startup is not False: self.startup_ops.append(('process', name, startup)) @@ -707,7 +759,7 @@ def _handle_task_error(self, *args, **kw): Agent's Operations. Some methods are valid on Processs, some on Tasks, and some on both.""" - def start(self, op_name, params=None): + def start(self, op_name, params=None, password=None): """ Launch an operation. Note that successful return of this function does not mean that the operation is running; it only means @@ -746,6 +798,17 @@ def start(self, op_name, params=None): op = self.processes[op_name] msg = 'Started process "%s".' % op_name + # Check access. 
+ actx = access.ActionContext(op_name=op_name, action='start') + cred_level, detail = access.agent_get_creds( + password, self.access_config, self.access_config.agent, actx) + if cred_level < op.min_privs: + self.log.info('Rejected underprivileged "start" request.') + return (ocs.ERROR, + access.agent_rejection_message( + cred_level, op.min_privs, detail), + {}) + # Pre-process params? if hasattr(op.launcher, '_ocs_prescreen'): try: @@ -759,7 +822,8 @@ def start(self, op_name, params=None): return (ocs.ERROR, f'CRASH: during param pre-processing: {str(err)}', {}) # Mark as started. - session = OpSession(self.next_session_id, op_name, app=self) + session = OpSession(self.next_session_id, op_name, app=self, + cred_level=cred_level.value) self.next_session_id += 1 self.sessions[op_name] = session @@ -775,7 +839,7 @@ def start(self, op_name, params=None): return (ocs.ERROR, 'No task or process called "%s"' % op_name, {}) @inlineCallbacks - def wait(self, op_name, timeout=None): + def wait(self, op_name, timeout=None, password=None): """Wait for the specified Operation to become idle, or for timeout seconds to elapse. If timeout==None, the timeout is disabled and the function will not return until the Operation @@ -840,7 +904,7 @@ def wait(self, op_name, timeout=None): return (ocs.TIMEOUT, 'Operation "%s" still running; wait timed out.' % op_name, session.encoded()) - def _stop_helper(self, stop_type, op_name, params): + def _stop_helper(self, stop_type, op_name, params, password): """Common stopper/aborter code for Process stop and Task abort. @@ -849,6 +913,7 @@ def _stop_helper(self, stop_type, op_name, params): op_name (str): the op_name. params (dict or None): Params to be passed to stopper function. + password: Password of the client. 
""" print(f'{stop_type} called for {op_name}') @@ -873,6 +938,14 @@ def _stop_helper(self, stop_type, op_name, params): return (ocs.ERROR, f'Cannot "{stop_type}" "{op_name}" because ' 'it is a "{op_type}".', {}) + actx = access.ActionContext(op_name=op_name, action='stop') + cred_level, detail = access.agent_get_creds( + password, self.access_config, self.access_config.agent, actx) + if cred_level < op.min_privs: + return (ocs.ERROR, + access.agent_rejection_message(cred_level, op.min_privs), + {}) + session = self.sessions.get(op_name) if session is None: return (ocs.ERROR, 'No session active.', {}) @@ -921,7 +994,7 @@ def _errback(*args, **kw): return (ocs.OK, f'Requested {stop_type} on {op_type} {op_name}".', session.encoded()) - def stop(self, op_name, params=None): + def stop(self, op_name, params=None, password=None): """ Initiate a Process stop routine. @@ -934,9 +1007,9 @@ def stop(self, op_name, params=None): ocs.OK: the Process stop routine has been launched. """ - return self._stop_helper('stop', op_name, params) + return self._stop_helper('stop', op_name, params, password) - def abort(self, op_name, params=None): + def abort(self, op_name, params=None, password=None): """ Initiate a Task abort routine. @@ -950,9 +1023,9 @@ def abort(self, op_name, params=None): ocs.OK: the Process stop routine has been launched. """ - return self._stop_helper('abort', op_name, params) + return self._stop_helper('abort', op_name, params, password) - def status(self, op_name, params=None): + def status(self, op_name, params=None, password=None): """ Get an Operation's session data. 
@@ -997,7 +1070,7 @@ def _running_wrapper(session, params): class AgentTask(AgentOp): def __init__(self, launcher, blocking=None, aborter=None, - aborter_blocking=None): + aborter_blocking=None, min_privs=0): self.launcher = launcher self.blocking = blocking self.aborter = aborter @@ -1005,6 +1078,7 @@ def __init__(self, launcher, blocking=None, aborter=None, aborter_blocking = blocking self.aborter_blocking = aborter_blocking self.docstring = launcher.__doc__ + self.min_privs = access.CredLevel(max(1, min_privs)) def encoded(self): """Dict of static info for API self-description.""" @@ -1013,11 +1087,13 @@ def encoded(self): 'abortable': (self.aborter is not None), 'docstring': self.docstring, 'op_type': 'task', + 'min_privs': self.min_privs.value, } class AgentProcess(AgentOp): - def __init__(self, launcher, stopper, blocking=None, stopper_blocking=None): + def __init__(self, launcher, stopper, blocking=None, stopper_blocking=None, + min_privs=0): self.launcher = launcher self.stopper = stopper self.blocking = blocking @@ -1025,6 +1101,7 @@ def __init__(self, launcher, stopper, blocking=None, stopper_blocking=None): stopper_blocking = blocking self.stopper_blocking = stopper_blocking self.docstring = launcher.__doc__ + self.min_privs = access.CredLevel(max(1, min_privs)) def encoded(self): """Dict of static info for API self-description.""" @@ -1032,6 +1109,7 @@ def encoded(self): 'blocking': self.blocking, 'docstring': self.docstring, 'op_type': 'process', + 'min_privs': self.min_privs.value, } @@ -1072,7 +1150,7 @@ class OpSession: """ def __init__(self, session_id, op_name, status='starting', - app=None, purge_policy=None): + app=None, purge_policy=None, cred_level=1): # Note that some data members are used internally, while others are # communicated over WAMP to Agent control clients. 
@@ -1086,6 +1164,7 @@ def __init__(self, session_id, op_name, status='starting', self.app = app self.success = None self.status = None + self.cred_level = cred_level # This has to be the last call since it depends on init... self.set_status(status, timestamp=self.start_time) @@ -1132,6 +1211,9 @@ def encoded(self): op_code : int The OpCode, which combines information from status, success, and degraded; see :class:`ocs.base.OpCode`. + cred_level: int + The Credential Level (see Access Control) with which the + operation was started. status : str The Operation run status (e.g. 'starting', 'done', ...). See :data:`ocs.ocs_agent.SESSION_STATUS_CODES`. @@ -1214,6 +1296,7 @@ def json_safe(data, check_ok=False): return {'session_id': self.session_id, 'op_name': self.op_name, 'op_code': self.op_code.value, + 'cred_level': self.cred_level, 'status': self.status, 'degraded': self.degraded, 'success': self.success, diff --git a/ocs/ocs_client.py b/ocs/ocs_client.py index ed229e6e..406d03ee 100644 --- a/ocs/ocs_client.py +++ b/ocs/ocs_client.py @@ -2,10 +2,13 @@ import time import ocs -from ocs import site_config +from ocs import site_config, access +import logging +logger = logging.getLogger(__name__) -def _get_op(op_type, name, encoded, client): + +def _get_op(op_type, name, encoded, client, password): """Factory for generating matched operations. This will make sure op.start's docstring is the docstring of the operation. @@ -16,21 +19,33 @@ def _get_op(op_type, name, encoded, client): :class:`ocs.ocs_agent.AgentProcess` dictionary. client (ControlClient): Client object, which will be used to issue the requests for operation actions. + password (str): Client-supplied password, or None to not use a + password. """ + # It's important to not pass password='', if the agent doesn't + # support password API. 
+ feature_kw = {} + if password is not None and password != '': + feature_kw['password'] = password + class MatchedOp: def start(self, **kwargs): - return OCSReply(*client.request('start', name, params=kwargs)) + return OCSReply(*client.request('start', name, params=kwargs, + **feature_kw)) def wait(self, timeout=None): - return OCSReply(*client.request('wait', name, timeout=timeout)) + return OCSReply(*client.request('wait', name, timeout=timeout, + **feature_kw)) def status(self): - return OCSReply(*client.request('status', name)) + return OCSReply(*client.request('status', name, + **feature_kw)) class MatchedTask(MatchedOp): def abort(self): - return OCSReply(*client.request('abort', name)) + return OCSReply(*client.request('abort', name, + **feature_kw)) def __call__(self, **kw): """Runs self.start(**kw) and, if that succeeds, self.wait().""" @@ -41,7 +56,8 @@ def __call__(self, **kw): class MatchedProcess(MatchedOp): def stop(self): - return OCSReply(*client.request('stop', name)) + return OCSReply(*client.request('stop', name, + **feature_kw)) def __call__(self): """Equivalent to self.status().""" @@ -83,7 +99,7 @@ class OCSClient: """ - def __init__(self, instance_id, **kwargs): + def __init__(self, instance_id, privs=None, **kwargs): """ Args: instance_id (str): Instance id for agent to run @@ -104,13 +120,24 @@ def __init__(self, instance_id, **kwargs): self.instance_id = instance_id self._api = self._client.get_api() + # If target agent doesn't support AC, don't pass password arg. 
+ use_password = (self._api.get('access_control') is not None) + + self._password = access.client_get_password( + privs, self._api['agent_class'], instance_id) + + if self._password not in [None, ''] and not use_password: + logger.warning('WARNING: client wants to pass a password to ' + 'agent instance, but it does not support passwords.') + self._password = None + for name, _, encoded in self._api['tasks']: setattr(self, _opname_to_attr(name), - _get_op('task', name, encoded, self._client)) + _get_op('task', name, encoded, self._client, self._password)) for name, _, encoded in self._api['processes']: setattr(self, _opname_to_attr(name), - _get_op('process', name, encoded, self._client)) + _get_op('process', name, encoded, self._client, self._password)) def __repr__(self): return f"OCSClient('{self.instance_id}')" diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index afc60b93..1adc970d 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -1127,7 +1127,7 @@ def eligible(subsys): if any([soln == 'process' for soln, text in supports.analysis]): hm = supports.host_manager['ctrl'] - hm.agent_control('start', ['all']) + hm.agent_control('up', ['all']) time.sleep(2) elif action == 'stop': diff --git a/ocs/plugin.py b/ocs/plugin.py index 11bad958..18f87906 100644 --- a/ocs/plugin.py +++ b/ocs/plugin.py @@ -6,4 +6,5 @@ 'FakeDataAgent': {'module': 'ocs.agents.fake_data.agent', 'entry_point': 'main'}, 'InfluxDBAgent': {'module': 'ocs.agents.influxdb_publisher.agent', 'entry_point': 'main'}, 'BarebonesAgent': {'module': 'ocs.agents.barebones.agent', 'entry_point': 'main'}, + 'AccessDirector': {'module': 'ocs.agents.access_director.agent', 'entry_point': 'main'}, } diff --git a/ocs/site_config.py b/ocs/site_config.py index 8e517706..6cb70160 100644 --- a/ocs/site_config.py +++ b/ocs/site_config.py @@ -188,6 +188,12 @@ def from_dict(cls, data, parent=None): ``observatory`` or ``detlab``. (Command line override: ``--address-root``.) 
+ ``access_policy`` (optional): Specify the default access + policy for all agents. To tell all agents to contact an + Access Director Agent with instance_id ``access-dir``, + set this to the string "director:access-dir". (Command + line override: ``--access-policy``.) + """ self = cls() self.parent = parent @@ -425,6 +431,9 @@ def add_arguments(parser=None): Length of time in seconds that the Agent will try to reconnect to the crossbar server before shutting down. + ``--access-policy=...``: + Override the default Access Control policy. + """ if parser is None: parser = argparse.ArgumentParser() @@ -451,6 +460,7 @@ def add_arguments(parser=None): group.add_argument('--crossbar-timeout', type=int, help="Length of time in seconds " "that the Agent will try to reconnect to the crossbar server before " "shutting down. Note this is set per Agent in an instance's arguments list.") + group.add_argument('--access-policy', help="Override Access Control policy.") return parser @@ -575,10 +585,14 @@ def add_site_attributes(args, site, host=None): args.site_realm = site.hub.data['wamp_realm'] if args.address_root is None: args.address_root = site.hub.data['address_root'] + if args.access_policy is None: + args.access_policy = site.hub.data.get('access_policy') if (args.log_dir is None) and (host is not None): args.log_dir = host.log_dir if (args.crossbar_timeout is None) and (host is not None): args.crossbar_timeout = host.crossbar_timeout + if args.site_file is None: + args.site_file = site.source_file @deprecation.deprecated(deprecated_in='v0.6.0', diff --git a/ocs/testing.py b/ocs/testing.py index 7d7f9bb3..0f8c69b3 100644 --- a/ocs/testing.py +++ b/ocs/testing.py @@ -155,7 +155,7 @@ def run_agent(cov): return run_agent -def create_client_fixture(instance_id, timeout=30): +def create_client_fixture(instance_id, timeout=30, privs=None): """Create the fixture that provides tests a Client object. 
Parameters: @@ -165,6 +165,7 @@ def create_client_fixture(instance_id, timeout=30): between attempts. This is useful if it takes some time for the Agent to start accepting connections, which varies depending on the Agent. + privs (str or int): privs argument for OCSClient constructor. """ @pytest.fixture() @@ -176,7 +177,7 @@ def client_fixture(): while attempts < timeout: try: - client = OCSClient(instance_id) + client = OCSClient(instance_id, privs=privs) return client except RuntimeError as e: print(f"Caught error: {e}") diff --git a/requirements.txt b/requirements.txt index 12d7c960..6b2f51c4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ deprecation PyYAML importlib_metadata setproctitle +pydantic # InfluxDB Publisher influxdb diff --git a/tests/agents/util.py b/tests/agents/util.py index 9e931855..7634781e 100644 --- a/tests/agents/util.py +++ b/tests/agents/util.py @@ -8,15 +8,19 @@ txaio.use_twisted() -def create_agent_fixture(agent_class, agent_kwargs={}): +def create_agent_fixture(agent_class, agent_kwargs={}, site_args={}): """Create a fixture for a given Agent.""" @pytest.fixture def agent(): - site_args = mock.MagicMock() - site_args.log_dir = '/tmp/' + _site_args = mock.MagicMock() + _site_args.log_dir = '/tmp/' + _site_args.access_policy = None + for k, v in site_args.items(): + setattr(_site_args, k, v) + config = mock.MagicMock() - mock_agent = OCSAgent(config, site_args) + mock_agent = OCSAgent(config, _site_args) log = txaio.make_logger() txaio.start_logging(level='debug') diff --git a/tests/default.yaml b/tests/default.yaml index f5643426..b3cbd592 100644 --- a/tests/default.yaml +++ b/tests/default.yaml @@ -11,6 +11,9 @@ hosts: localhost: { 'log-dir': './log/', 'agent-instances': [ + {'agent-class': 'AccessDirector', + 'instance-id': 'access-dir', + 'arguments': ['--config-file', 'ocs-access.yaml']}, {'agent-class': 'HostManager', 'instance-id': 'host-manager-1', 'arguments': []}, @@ -21,6 +24,10 @@ hosts: 
'--num-channels', '16', '--sample-rate', '5', '--frame-length', '10']}, + {'agent-class': 'FakeDataAgent', + 'instance-id': 'fake-data-access-dir', + 'manage': 'host/down', + 'arguments': ['--access-policy', 'director:access-dir']}, {'agent-class': 'RegistryAgent', 'instance-id': 'registry', 'manage': 'host/up', diff --git a/tests/integration/test_access_integration.py b/tests/integration/test_access_integration.py new file mode 100644 index 00000000..19dc87d2 --- /dev/null +++ b/tests/integration/test_access_integration.py @@ -0,0 +1,145 @@ +import pytest +import time + +import ocs +from ocs.base import OpCode +from ocs import access, ocs_client + +from ocs.testing import ( + create_agent_runner_fixture, + create_client_fixture, +) + +from integration.util import ( + create_crossbar_fixture, +) +from integration.util import docker_compose_file # noqa: F401 + + +wait_for_crossbar = create_crossbar_fixture() + +# For debugging. +_LOG_ARGS = [] # ["--log-dir", "./logs"] + +# Tests in this module make use of two agent instances: +FAKER_ID = 'fake-data-access-dir' +ADIR_ID = 'access-dir' + +FAKER_PATH = '../ocs/agents/fake_data/agent.py' +ADIR_PATH = '../ocs/agents/access_director/agent.py' + + +def create_fakedata_fixture(access_policy=None): + args = ['--instance-id', FAKER_ID] + _LOG_ARGS + if access_policy: + args += ['--access-policy', access_policy] + return create_agent_runner_fixture( + FAKER_PATH, 'accessdir_override', args) + + +# Different ways to launch the faker. +run_fakedata_nodir = create_fakedata_fixture(access_policy='none') +run_fakedata_override = create_fakedata_fixture(access_policy='override:a,b') +run_fakedata_accessdir = create_fakedata_fixture() + +# Clients to the faker. 
+client_no_privs = create_client_fixture(FAKER_ID) +client_bad_privs = create_client_fixture(FAKER_ID, privs='xyz') +client_good_privs = create_client_fixture(FAKER_ID, privs='a') +client_super_user = create_client_fixture(FAKER_ID, privs='superuser') + +# Access Director agent and client +run_access_director = create_agent_runner_fixture( + ADIR_PATH, ADIR_ID, _LOG_ARGS) +client_adir = create_client_fixture(ADIR_ID) + + +@pytest.mark.integtest +def test_access_lockout(wait_for_crossbar, + run_fakedata_accessdir, + client_good_privs): + """Test that when an agent is told to take credentials from the + AccessDirector, it rejects all requests if the Access Director is + not online. + + """ + resp = client_good_privs.delay_task.start(delay=0.01) + assert resp.status == ocs.ERROR + + resp = client_good_privs.acq.start() + assert resp.status == ocs.ERROR + + +@pytest.fixture +def setup_access_system(wait_for_crossbar, + run_access_director, + run_fakedata_accessdir): + """Launch crossbar, Access Director, and FakeData agent; wait a + couple of seconds for them to synchronize. + + """ + time.sleep(2) + + +@pytest.mark.integtest +def test_access_director(setup_access_system, + client_good_privs, + client_bad_privs, + client_no_privs): + """Test that agent receives credentials from Access Director and + that the programmed passwords work. + + """ + # This should work tho... + resp = client_bad_privs.acq.start() + assert resp.status == ocs.OK + + # Without password, start should fail immediately. + resp = client_no_privs.delay_task.start(delay=0.01) + assert resp.status == ocs.ERROR + + # With password, should succeed. + resp = client_good_privs.delay_task(delay=0.01) + assert resp.status == ocs.OK + time.sleep(1) + assert resp.session['op_code'] == OpCode.SUCCEEDED.value + + +@pytest.mark.integtest +def test_exclusive_access(setup_access_system, + client_adir, + client_good_privs, + client_super_user): + """Test that exclusive access system locks out normal clients. 
+ + """ + # Obtain exclusive access to the "test-grant". + eac = access.ExclusiveAccessClient(client_adir, 'test-grantee', 'test-grant') + ok, info = eac.acquire() + assert ok + time.sleep(1) + + # Confirm old passwords don't work + resp = client_good_privs.delay_task.start(delay=0.1) + assert resp.status == ocs.ERROR + + # ... but excl-access special password works. + client_exac = ocs_client.OCSClient(FAKER_ID, privs=info['password']) + resp = client_exac.delay_task.start(delay=0.01) + assert resp.status == ocs.OK + client_exac.delay_task.wait() + + # ... and super-user still works. + resp = client_super_user.delay_task.start(delay=0.01) + assert resp.status == ocs.OK + client_super_user.delay_task.wait() + + # Release the lock. + ok, info = eac.release() + assert ok + time.sleep(1) + + # And confirm old behavior. + resp = client_good_privs.delay_task.start(delay=0.1) + assert resp.status == ocs.OK + client_good_privs.delay_task.wait() diff --git a/tests/integration/test_fake_data_agent_integration.py b/tests/integration/test_fake_data_agent_integration.py index aab999ac..78f17cb5 100644 --- a/tests/integration/test_fake_data_agent_integration.py +++ b/tests/integration/test_fake_data_agent_integration.py @@ -14,11 +14,12 @@ from integration.util import docker_compose_file # noqa: F401 AGENT_PATH = '../ocs/agents/fake_data/agent.py' +AGENT_ID = 'fake-data-local' wait_for_crossbar = create_crossbar_fixture() run_agent = create_agent_runner_fixture( - AGENT_PATH, 'fake_data') -client = create_client_fixture('fake-data-local') + AGENT_PATH, 'fake_data', ['--instance-id', AGENT_ID]) +client = create_client_fixture(AGENT_ID) @pytest.mark.integtest @@ -51,7 +52,7 @@ def test_fake_data_agent_acq(wait_for_crossbar, run_agent, client): run_agent_acq = create_agent_runner_fixture( AGENT_PATH, 'fake_data', - args=['--mode', 'acq']) + args=['--instance-id', AGENT_ID, '--mode', 'acq']) @pytest.mark.integtest diff --git a/tests/ocs-access.yaml b/tests/ocs-access.yaml new file 
mode 100644 index 00000000..f27118eb --- /dev/null +++ b/tests/ocs-access.yaml @@ -0,0 +1,15 @@ +passwords: + - default: true + password_1: "" + password_2: "a" + password_4: "superuser" + +distrib_hashfunc: md5 + +exclusive_access_blocks: + - name: test-grant + grants: + - agent_class: "FakeDataAgent" + instance_id: "fake-data*,!fake-data-orama" + cred_level: 2 + lockout_levels: [1,2,3] diff --git a/tests/test_access.py b/tests/test_access.py new file mode 100644 index 00000000..46b69867 --- /dev/null +++ b/tests/test_access.py @@ -0,0 +1,277 @@ +import pytest +import time +import yaml + +from unittest import mock + +from ocs import access + + +def test_access_enums(): + """Check ordering and stringification of access enums.""" + access.agent_rejection_message(access.CredLevel(1), + access.CredLevel(2)) + assert access.CredLevel(1) == access.CredLevel(1) + assert access.CredLevel(1) < access.CredLevel(2) + + +def test_access_policy(): + """Check that agent_get_creds produces correct CredLevel for + typical policy requests. 
+ + """ + agent = access.AgentSpec('my-id', 'my-class') + + rules = access.agent_get_policy_default('director:') + assert access.agent_get_creds('', rules, agent)[0].value == 0 + + rules = access.agent_get_policy_default('override:a,b') + assert access.agent_get_creds('', rules, agent)[0].value == 1 + assert access.agent_get_creds('a', rules, agent)[0].value == 2 + assert access.agent_get_creds('b', rules, agent)[0].value == 3 + + with pytest.raises(ValueError): + rules = access.agent_get_policy_default('invalid:') + + +def test_agent_get_creds(): + """Check that agent_get_creds does things correctly.""" + + def make_pw(pw, hash='none'): + pw = access.HASHFUNCS[hash](pw) + return {'hash': hash, 'value': pw} + + def test(pw, rules, agent, level): + arules = access.AgentAccessRules(rules=rules) + assert access.agent_get_creds(pw, arules, agent)[0].value == level + + agent = access.AgentSpec(agent_class='my-class', + instance_id='my-instance') + + # No rules -> reject. + test('', [], agent, 0) + + # Any password will match empty password. + rules = [ + {'cred_level': 2, + 'hashed_pass': make_pw('')}] + test('', rules, agent, 2) + test('anything', rules, agent, 2) + test(None, rules, agent, 2) + + # Even if it matches other things + rules = [ + {'cred_level': 1, + 'hashed_pass': make_pw('anything', 'md5')}, + {'cred_level': 2, + 'hashed_pass': make_pw('')}, + ] + test('', rules, agent, 2) + test('anything', rules, agent, 2) + test(None, rules, agent, 2) + + # Match this agent + rules = [ + {'cred_level': 1, + 'hashed_pass': make_pw('')}, + {'cred_level': 2, + 'hashed_pass': make_pw('my-pw2'), + 'scope_spec': {'agent_class': 'my-class'}}, + {'cred_level': 3, + 'hashed_pass': make_pw('my-pw3'), + 'scope_spec': {'instance_id': 'my-instance'}}, + ] + test('', rules, agent, 1) + test('my-pw2', rules, agent, 2) + test('my-pw3', rules, agent, 3) + + # Don't match other agents. 
+ rules = [ + {'cred_level': 1, + 'hashed_pass': make_pw('my-pw')}, + {'cred_level': 2, + 'hashed_pass': make_pw('my-pw'), + 'scope_spec': {'agent_class': 'other-class'}}, + {'cred_level': 3, + 'hashed_pass': make_pw('my-pw'), + 'scope_spec': {'instance_id': 'other-instance'}}, + ] + test('', rules, agent, 0) + test('my-pw', rules, agent, 1) + + +def test_pattern_matching(): + def should_succeed(pattern, targets): + for target in targets: + assert access.pattern_match(target, pattern) + + def should_fail(pattern, targets): + for target in targets: + assert not access.pattern_match(target, pattern) + + pat = 'HWP*,Fake*,!HWPSuper,!FakeSerpent' + should_succeed(pat, ['HWPxyz', 'FakeBlomp', 'HWPSuperX']) + should_fail(pat, ['HWPSuper', 'FakeSerpent', 'NormalSerpent']) + + +def test_access_hashfuncs(): + """Check each hash function value stability.""" + # hashfunc test cases + assert access.HASHFUNCS['none']('a') == 'a' + assert len(access.HASHFUNCS['md5']('a')) > 1 + + # hashfunc mapping + agent = access.AgentSpec('my-id', 'my-class') + bad_rules = access.AgentAccessRules( + policy='test', + rules=[ + access.AccessRule(cred_level=access.CredLevel(2), + hashed_pass=access.HashedPass(hash='invalid', + value='0x0x')), + access.AccessRule(cred_level=access.CredLevel(1), + hashed_pass=access.HashedPass.create_free()) + ]) + with pytest.raises(KeyError): + access.agent_get_creds('a', bad_rules, agent) + + +@pytest.fixture(scope='session') +def password_file(tmp_path_factory): + """Write a password file.""" + fn = tmp_path_factory.mktemp('ocs') / 'passwords.yaml' + yaml.dump([ + {'default': True, + 'password_2': 'two'}, + {'instance_id': 'test-agent1', + 'password_2': 'ta-two', + 'password_3': 'ta-three'}, + {'agent_class': 'TestAgent', + 'password_2': 'TA-two'}, + {'agent_class': '!NormalAgent', + 'password_2': 'spec-test'}, + ], fn.open('w')) + return fn + + +def test_access_client_get_password(password_file): + """Check that client_get_password parses the password_file 
data + appropriately. + + """ + with mock.patch('os.getenv', mock.Mock(return_value=str(password_file))): + # Fall through to default + assert access.client_get_password(2, 'NormalAgent', 'I') == 'two' + + # Agent / instance matching + assert access.client_get_password(2, 'TestAgent', 'test-agent1') == 'ta-two' + assert access.client_get_password(3, 'TestAgent', 'test-agent1') == 'ta-three' + + assert access.client_get_password(2, 'OtherAgent', 'other-agent-x') == 'spec-test' + + # No-match + assert access.client_get_password(3, 'A', 'I') == '' + + # Passing strings just returns the string + assert access.client_get_password('x', 'A', 'I') == 'x' + + +def test_director_parse_config(): + config_raw = { + 'distrib_hashfunc': 'md5', + 'passwords': [ + {'default': True, + 'password_3': "test"}, + {'instance_id': 'faker4', + 'password_2': {'hash': 'none', + 'value': 'blech'}}], + 'exclusive_access_blocks': [ + {'name': 'fake-subsystem', + 'password': 'lockout-test', + 'grants': [ + {'instance_id': 'faker4', + 'cred_level': 3, + 'lockout_levels': [1, 2, 3]}, + ]} + ], + } + + config = access.director_parse_config(config_raw) + assert config['_rules'][1].hashed_pass.hash == 'md5' + + +def test_lockouts(): + dag = access.DistributedAccessGrant( + name='test-lockout', + grants=[ + {'instance_id': 'my-instance', + 'lockout_levels': [1, 2, 3], + 'cred_level': 3, + }, + {'agent_class': 'my-class', + 'lockout_levels': [2, 3], + 'cred_level': 2, + }, + {'default': True, + 'cred_level': 1, + }, + ]) + pw, rules = access.director_get_access_rules(dag, 'this-owner') + assert (pw != '') + + agent = access.AgentSpec(instance_id='my-instance', + agent_class='my-class') + related_agent = access.AgentSpec(instance_id='other-instance', + agent_class='my-class') + + arules = access.AgentAccessRules(rules=rules) + assert access.agent_get_creds(pw, arules, agent)[0].value == 3 + assert access.agent_get_creds(pw, arules, related_agent)[0].value == 2 + + assert access.agent_get_creds('not-' + 
pw, arules, agent)[0].value == 0 + + # What if we had existing permissions? + rules0 = [access.AccessRule(scope_spec={'default': True}, cred_level=3, + hashed_pass={'hash': 'none', 'value': 'backdoor'}), + access.AccessRule(scope_spec={'default': True}, cred_level=4, + hashed_pass={'hash': 'none', 'value': 'superu'})] + arules = access.AgentAccessRules(rules=rules0 + rules) + # This gets blocked by lockout_levels. + assert access.agent_get_creds('backdoor', arules, agent)[0].value == 0 + # But super-user cannot be blocked. + assert access.agent_get_creds('superu', arules, agent)[0].value == 4 + + # If not my-instance, then access is only partially blocked... + assert access.agent_get_creds('backdoor', arules, related_agent)[0].value == 1 + + +def test_exclusive_access_grant_helper(): + def fake_accessdir(special, action=None, grant_name=None, **kwargs): + if action == 'acquire': + return { + 'grant_name': grant_name, + 'message': 'grant acquired', + 'password': 'abcdefg', + 'expire_at': time.time() + 5, + } + elif action == 'renew': + return { + 'grant_name': grant_name, + 'message': 'grant renewed', + 'expire_at': time.time() + 5, + } + elif action == 'release': + return { + 'grant_name': grant_name, + 'message': 'grant released', + } + + client = mock.MagicMock() + client._client.special = fake_accessdir + eag = access.ExclusiveAccessClient(client, 'test-func', 'fake-subsystem', + password='lockout-test') + ok, detail = eag.acquire() + assert ok + ok, detail = eag.renew() + assert ok + ok, detail = eag.release() + assert ok diff --git a/tests/test_ocs_agent.py b/tests/test_ocs_agent.py index 53cc3d21..3c72e1a8 100644 --- a/tests/test_ocs_agent.py +++ b/tests/test_ocs_agent.py @@ -58,6 +58,7 @@ def mock_agent(): mock_site_args = MagicMock() mock_site_args.working_dir = "./" mock_site_args.log_dir = "./" + mock_site_args.access_policy = None a = OCSAgent(mock_config, mock_site_args, address='test.address') return a diff --git a/tests/test_ocs_client.py 
b/tests/test_ocs_client.py index ffdfdda7..9d20643b 100644 --- a/tests/test_ocs_client.py +++ b/tests/test_ocs_client.py @@ -19,6 +19,8 @@ mocked_client = MagicMock() mock_from_yaml = MagicMock() +DUMMY_PASS = 'test-password' + @patch('ocs.client_http.ControlClient', mocked_client) @patch('ocs.site_config.SiteConfig.from_yaml', mock_from_yaml) @@ -65,7 +67,7 @@ def test_opname_to_attr(input_, expected): class TestGetOp: def test_invalid_op_type(self): with pytest.raises(ValueError): - _get_op('not_valid', 'name', MagicMock(), MagicMock()) + _get_op('not_valid', 'name', MagicMock(), MagicMock(), '') def mock_client(self, session_name, response_code): """Mock a ControlClient object that has a predefined request response for @@ -88,7 +90,8 @@ def mock_client(self, session_name, response_code): return client - def _client_operation(self, op_type, op_name, response_code=ocs.OK): + def _client_operation(self, op_type, op_name, response_code=ocs.OK, + password=DUMMY_PASS): """Build a mocked client, and get an Operation for it, returning both. 
@@ -96,7 +99,7 @@ def _client_operation(self, op_type, op_name, response_code=ocs.OK): client = self.mock_client(op_name, response_code) encoded_task = {'blocking': True, 'docstring': 'Example docstring'} - task = _get_op(op_type, op_name, encoded_task, client) + task = _get_op(op_type, op_name, encoded_task, client, password) return (client, task) @@ -108,47 +111,64 @@ def client_task(self): def client_process(self): return self._client_operation('process', 'process_name') + @pytest.fixture + def client_process_nopass(self): + return self._client_operation('process', 'process_name', password='') + def test_task_abort(self, client_task): client, task = client_task print(task.abort()) - client.request.assert_called_with('abort', 'task_name') + client.request.assert_called_with('abort', 'task_name', password=DUMMY_PASS) def test_task_start(self, client_task): client, task = client_task print(task.start()) assert task.start.__doc__ == 'Example docstring' - client.request.assert_called_with('start', 'task_name', params={}) + client.request.assert_called_with('start', 'task_name', params={}, + password=DUMMY_PASS) def test_task_wait(self, client_task): client, task = client_task print(task.wait()) - client.request.assert_called_with('wait', 'task_name', timeout=None) + client.request.assert_called_with('wait', 'task_name', timeout=None, + password=DUMMY_PASS) def test_task_status(self, client_task): client, task = client_task print(task.status()) - client.request.assert_called_with('status', 'task_name') + client.request.assert_called_with('status', 'task_name', + password=DUMMY_PASS) def test_task_call(self): client, task = self._client_operation('task', 'task_name', ocs.OK) print(task()) # equivalent to 'start' + 'wait', but we can only check the last call - client.request.assert_called_with('wait', 'task_name', timeout=None) + client.request.assert_called_with('wait', 'task_name', timeout=None, + password=DUMMY_PASS) def test_task_call_w_error(self): client, task = 
self._client_operation('task', 'task_name', ocs.ERROR) print(task()) # error skips the 'wait' call after 'start' - client.request.assert_called_with('start', 'task_name', params={}) + client.request.assert_called_with('start', 'task_name', params={}, + password=DUMMY_PASS) def test_process_stop(self, client_process): client, process = client_process print(process.stop()) - client.request.assert_called_with('stop', 'process_name') + client.request.assert_called_with('stop', 'process_name', + password=DUMMY_PASS) def test_process_call(self, client_process): client, process = client_process print(process()) + client.request.assert_called_with('status', 'process_name', + password=DUMMY_PASS) + + def test_op_no_password(self, client_process_nopass): + # Backwards compatibility: if no pw, don't pass the password arg + client, process = client_process_nopass + print(process()) client.request.assert_called_with('status', 'process_name')