Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 45 additions & 37 deletions octoprint_psucontrol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(self):
self._idleTimer = None
self._waitForHeaters = False
self._skipIdleTimer = False
self._configuredGPIOPins = {}
self._configuredGPIOPins: Dict[str, periphery.CdevGPIO] = {}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plugin still supports older Python 2.7 installations which doesn't support type annotations.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about that, I did not realize that octoprint still has support for python 2.7.

self._noSensing_isPSUOn = False
self.isPSUOn = False

Expand Down Expand Up @@ -166,6 +166,7 @@ def cleanup_gpio(self):
for k, pin in self._configuredGPIOPins.items():
self._logger.debug("Cleaning up {} pin {}".format(k, pin.name))
try:
# should work, even if we try to clean up the same GPIO twice
pin.close()
except Exception:
self._logger.exception(
Expand All @@ -181,44 +182,55 @@ def configure_gpio(self):
self._logger.info("Using GPIO for On/Off")
self._logger.info("Configuring GPIO for pin {}".format(self.config['onoffGPIOPin']))

if not self.config['invertonoffGPIOPin']:
initial_output = 'low'
else:
initial_output = 'high'

try:
pin = periphery.GPIO(self.config['GPIODevice'], self.config['onoffGPIOPin'], initial_output)
self._configuredGPIOPins['switch'] = pin
pin = periphery.CdevGPIO(
path=self.config['GPIODevice'],
line=self.config['onoffGPIOPin'],
direction="out",
inverted=self.config['invertonoffGPIOPin']
)
except Exception:
self._logger.exception(
"Exception while setting up GPIO pin {}".format(self.config['onoffGPIOPin'])
)
else:
self._configuredGPIOPins['switch'] = pin

if self.config['sensingMethod'] == 'GPIO':
self._logger.info("Using GPIO sensing to determine PSU on/off state.")
self._logger.info("Configuring GPIO for pin {}".format(self.config['senseGPIOPin']))


if not SUPPORTS_LINE_BIAS:
if self.config['senseGPIOPinPUD'] != '':
self._logger.warning("Kernel version 5.5 or greater required for GPIO bias. Using 'default'.")
bias = "default"
elif self.config['senseGPIOPinPUD'] == '':
bias = "disable"
elif self.config['senseGPIOPinPUD'] == 'PULL_UP':
bias = "pull_up"
elif self.config['senseGPIOPinPUD'] == 'PULL_DOWN':
bias = "pull_down"
else:
bias = "default"

try:
pin = periphery.CdevGPIO(path=self.config['GPIODevice'], line=self.config['senseGPIOPin'], direction='in', bias=bias)

if self.config['switchingMethod'] == 'GPIO' and self.config['senseGPIOPin'] == self.config['onoffGPIOPin']:
self._logger.info("Sensing through same pin as switching.")
self._configuredGPIOPins['sense'] = pin
except Exception:
self._logger.exception(
"Exception while setting up GPIO pin {}".format(self.config['senseGPIOPin'])
)
else:
self._logger.info("Configuring GPIO for pin {}".format(self.config['senseGPIOPin']))
if not SUPPORTS_LINE_BIAS:
if self.config['senseGPIOPinPUD'] != '':
self._logger.warning("Kernel version 5.5 or greater required for GPIO bias. Using 'default'.")
bias = "default"
elif self.config['senseGPIOPinPUD'] == '':
bias = "disable"
elif self.config['senseGPIOPinPUD'] == 'PULL_UP':
bias = "pull_up"
elif self.config['senseGPIOPinPUD'] == 'PULL_DOWN':
bias = "pull_down"
else:
bias = "default"

try:
pin = periphery.CdevGPIO(
path=self.config['GPIODevice'],
line=self.config['senseGPIOPin'],
direction='in',
bias=bias,
inverted=self.config['invertsenseGPIOPin']
)
self._configuredGPIOPins['sense'] = pin
except Exception:
self._logger.exception(
"Exception while setting up GPIO pin {}".format(self.config['senseGPIOPin'])
)


def _get_plugin_key(self, implementation):
Expand Down Expand Up @@ -248,15 +260,15 @@ def _check_psu_state(self):
self._logger.debug("Polling PSU state...")

if self.config['sensingMethod'] == 'GPIO':
r = 0
r = False
try:
r = self._configuredGPIOPins['sense'].read()
except Exception:
self._logger.exception("Exception while reading GPIO line")

self._logger.debug("Result: {}".format(r))

new_isPSUOn = r ^ self.config['invertsenseGPIOPin']
new_isPSUOn = r

self.isPSUOn = new_isPSUOn
elif self.config['sensingMethod'] == 'SYSTEM':
Expand Down Expand Up @@ -472,10 +484,8 @@ def turn_psu_on(self):
self._logger.debug("On system command returned: {}".format(r))
elif self.config['switchingMethod'] == 'GPIO':
self._logger.debug("Switching PSU On Using GPIO: {}".format(self.config['onoffGPIOPin']))
pin_output = bool(1 ^ self.config['invertonoffGPIOPin'])

try:
self._configuredGPIOPins['switch'].write(pin_output)
self._configuredGPIOPins['switch'].write(True)
except Exception :
self._logger.exception("Exception while writing GPIO line")
return
Expand Down Expand Up @@ -538,10 +548,8 @@ def turn_psu_off(self):
self._logger.debug("Off system command returned: {}".format(r))
elif self.config['switchingMethod'] == 'GPIO':
self._logger.debug("Switching PSU Off Using GPIO: {}".format(self.config['onoffGPIOPin']))
pin_output = bool(0 ^ self.config['invertonoffGPIOPin'])

try:
self._configuredGPIOPins['switch'].write(pin_output)
self._configuredGPIOPins['switch'].write(False)
except Exception:
self._logger.exception("Exception while writing GPIO line")
return
Expand Down