Skip to content

High interface fetches hw address #130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 113 additions & 20 deletions xled/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ def __init__(self, hw_address=None, client=None, auto_refresh_token=True, **kwar
:param kwargs: Arguments to pass to the BaseUrlSession initializer.
Most useful is `base_url`.
"""
self.hw_address = hw_address
self.client = client or ClientApplication()
self.client = client or ClientApplicationValidating(hw_address)
self.auto_refresh_token = auto_refresh_token
super(BaseUrlChallengeResponseAuthSession, self).__init__(**kwargs)

Expand Down Expand Up @@ -272,7 +271,6 @@ def fetch_token(self):
prepared = self.prepare_request_challenge()
response = self.send(prepared)
self.client.parse_response_challenge(response)
self.client.challenge_response_valid(self.hw_address)

prepared = self.prepare_request_verify()
response = self.send(prepared)
Expand Down Expand Up @@ -382,10 +380,100 @@ def add_authorization(self, headers):
return headers


class BaseUrlChallengeResponseAuthHwAddressFetchingSession(
BaseUrlChallengeResponseAuthSession
):
def __init__(self, hw_address=None, client=None, auto_refresh_token=True, **kwargs):
"""Construct a new client session.

:param str hw_address: Hardware address of server. Used to validation during
login phase.
:param client: Object with :class:`ClientApplication` interface.
:param bool auto_refresh_token: (optional) if token is found expired
automatically request new one.
:param kwargs: Arguments to pass to the BaseUrlSession initializer.
Most useful is `base_url`.
"""
self.client = client or ClientApplicationHwAddressFetchingValidating(hw_address)
self.auto_refresh_token = auto_refresh_token
super(BaseUrlChallengeResponseAuthHwAddressFetchingSession, self).__init__(
client=self.client, **kwargs
)

@property
def device_info_url(self):
"""Full URL of device_info (gestalt) endpoint

:return: Full URL.
:rtype: str
"""
return self.create_url("gestalt")

def prepare_request_device_info(self):
"""Creates prepared request to send device_info (gestalt)

:return: prepared request
:rtype: requests.PreparedRequest
"""
request = Request("GET", self.device_info_url)
return request.prepare()

def fetch_token(self):
if self.client.hw_address is None:
log.debug("Requesting device_info (gestalt) for hw_address (mac).")
prepared = self.prepare_request_device_info()
response = self.send(prepared)
self.client.parse_response_device_info(response)
return super(
BaseUrlChallengeResponseAuthHwAddressFetchingSession, self
).fetch_token()


class HwAddressFetchingClientMixin(object):
def populate_device_info_attributes(self, response):
"""Fetches mac address from application response

:param: app_response response Response from login endpoint.
:type: application_response :class:`~xled.response.ApplicationResponse`
"""
if "mac" in response:
self.hw_address = response.get("mac")

def parse_response_device_info(self, response):
app_response = ApplicationResponse(response)
try:
app_response.raise_for_status()
except ApplicationError:
log.error("Get device info failed: %r", app_response.data)
raise AuthenticationError()

self.populate_device_info_attributes(app_response)
log.debug("Device has fw_address (mac): %s", self.hw_address)


class ValidatingClientMixin(object):
"""Mixin adds functionality to :class:`ClientApplication` to authenticate server"""

def challenge_response_valid(self, hw_address=None):
def __init__(self, hw_address=None, **kwargs):
self.hw_address = hw_address
super(ValidatingClientMixin, self).__init__(**kwargs)

def challenge_response_valid(self):
expected = xled.security.make_challenge_response(
self._challenge, self.hw_address
)
if expected != self._challenge_response:
msg = (
"challenge-response invalid. "
"Received challenge-response: %r but %r was expected."
)
log.error(msg, self._challenge_response, expected)
raise ValidationError()

msg = "challenge-response is correct."
log.debug(msg)

def parse_response_challenge(self, response):
"""Verifies server with hardware address returned correct challenge response

Creates challenge-response for server's hardware address, challenge and
Expand All @@ -397,26 +485,19 @@ def challenge_response_valid(self, hw_address=None):
:rtype: bool or None
:raises ValidationError: if chalenge-response is invalid
"""
if not hw_address:
msg = "Can not validate challenge-response without hw_address."
log.debug(msg)
return None
response = super(ValidatingClientMixin, self).parse_response_challenge(response)

expected = xled.security.make_challenge_response(self._challenge, hw_address)
if expected != self._challenge_response:
msg = (
"challenge-response invalid. "
"Received challenge-response: %r but %r was expected."
)
log.error(msg, self._challenge_response, expected)
raise ValidationError()
if not self.hw_address:
msg = "Can not validate challenge-response without hw_address %r."
log.debug(msg, self.hw_address)
return response

msg = "challenge-response is correct."
log.debug(msg)
return True
self.challenge_response_valid()

return response

class ClientApplication(ValidatingClientMixin):

class ClientApplication(object):
def __init__(self, challenge=None):
self.authentication_token = None
self.expires_at = None
Expand All @@ -429,6 +510,8 @@ def __init__(self, challenge=None):
self._challenge_response = None
self._expires_in = None

self._hw_address = None

def new_challenge(self):
"""Generates a challenge string to be used in authorizations."""
try:
Expand Down Expand Up @@ -547,3 +630,13 @@ def parse_response_verify(self, response, **kwargs):
self.authentication_token = self._authentication_token
self._authentication_token = None
return response


class ClientApplicationValidating(ValidatingClientMixin, ClientApplication):
pass


class ClientApplicationHwAddressFetchingValidating(
ValidatingClientMixin, HwAddressFetchingClientMixin, ClientApplication
):
pass
22 changes: 21 additions & 1 deletion xled/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@
import xled.util
import xled.security
from xled.udp_client import UDPClient
from xled.auth import BaseUrlChallengeResponseAuthSession
from xled.auth import (
BaseUrlChallengeResponseAuthSession,
BaseUrlChallengeResponseAuthHwAddressFetchingSession,
)
from xled.compat import xrange
from xled.exceptions import HighInterfaceError
from xled.response import ApplicationResponse


log = logging.getLogger(__name__)

#: UDP port to send realtime frames to
Expand Down Expand Up @@ -954,6 +958,22 @@ class HighControlInterface(ControlInterface):
High level interface to control specific device
"""

@property
def session(self):
"""
Session object to operate on

:return: session object with auth
:py:class:`~.auth.BaseUrlChallengeResponseAuthHwAddressFetchingSession()`.
:rtype: requests.Session
"""
if not self._session:
self._session = BaseUrlChallengeResponseAuthHwAddressFetchingSession(
hw_address=self.hw_address, base_url=self.base_url
)
assert self._session
return self._session

def update_firmware(self, stage0, stage1):
"""
Uploads firmware and runs update
Expand Down