Skip to content

Retrieving Camera status before initiating preset move #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
110 changes: 100 additions & 10 deletions occameracontrol/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from requests.auth import HTTPDigestAuth
from typing import Optional

from occameracontrol.agent import Agent
from agent import Agent
from occameracontrol.metrics import register_camera_move, \
register_camera_expectation
register_camera_expectation, register_camera_status


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,13 +75,78 @@ def __str__(self) -> str:
'''
return f"'{self.agent.agent_id}' @ '{self.url}'"

def activate_camera(self, on=True):
def is_on(self) -> bool:
"""Retrieve whether or not the camera is in Standby.
For Panasonic camera AW-UE70:
0 if Standby
1 if On
3 if Transferring from Standby to On
TODO:
- Which panasonic models do we have?
- Maybe there is a difference for other models?
--> Works for the two models that we have

For Sony camera:
0 if Standby
1 if On

-1 if Something went wrong
"""
state = False
if self.type == CameraType.panasonic:
url = f'{self.url}/cgi-bin/aw_ptz'
command = "#O"
params = {'cmd': command, 'res': 1}
auth = (self.user, self.password) \
if self.user and self.password else None
logger.debug('GET %s with params: %s', url, params)
response = requests.get(url, auth=auth, params=params, timeout=5)
response.raise_for_status()
state_int = int(response.content.decode().removeprefix('p'))
while state_int == 3:
# Escape the transition from standby to on
time.sleep(3)
response = requests.get(
url,
auth=auth,
params=params,
timeout=5)
response.raise_for_status()
state = bool(state_int)

if self.type == CameraType.sony:
url = f'{self.url}/command/inquiry.cgi'
params = {'inq': 'system'}
headers = {'referer': f'{self.url}/'}
auth = HTTPDigestAuth(self.user, self.password) \
if self.user and self.password else None
logger.debug('GET %s with params: %s', url, params)
response = requests.get(url,
auth=auth,
headers=headers,
params=params,
timeout=5)
response.raise_for_status()
values = response.content.decode().split("&")
for v in values:
if "Power" in v:
if v.removeprefix("Power=")[1] == 'on':
state = True
else:
state = False

register_camera_status(self.url, int(state))
return state

def set_power(self, turn_on=True):
"""Activate the camera or put it into standby mode.
:param bool on: camera should be online or standby (default: True)
:param bool on: camera should be turned on (True)
or set to standby (False) (default: True)
"""
if self.type == CameraType.panasonic:
url = f'{self.url}/cgi-bin/aw_ptz'
command = '#On' if on else '#Of'
# If the camera is in Standby, turn it on
command = '#On' if not turn_on else '#Of'
params = {'cmd': command, 'res': 1}
auth = (self.user, self.password) \
if self.user and self.password else None
Expand All @@ -91,7 +156,8 @@ def activate_camera(self, on=True):

elif self.type == CameraType.sony:
url = f'{self.url}/command/main.cgi'
command = 'on' if on else 'standby'
# If the camera is in Standby, turn it on
command = 'on' if not turn_on else 'standby'
params = {'System': command}
headers = {'referer': f'{self.url}/'}
auth = HTTPDigestAuth(self.user, self.password) \
Expand All @@ -104,6 +170,8 @@ def activate_camera(self, on=True):
timeout=5)
response.raise_for_status()

self.is_on()

def move_to_preset(self, preset: int):
'''Move the PTZ camera to the specified preset position
'''
Expand Down Expand Up @@ -170,17 +238,39 @@ def update_position(self):
logger.info('[%s] Event `%s` started', agent_id, event.title)
logger.info('[%s] Moving to preset %i', agent_id,
self.preset_active)
self.activate_camera()
logger.debug('[%s] Retrieving the camera state', agent_id)
if not self.is_on():
self.set_power()
time.sleep(10)

# To update the metrics
self.is_on()

self.move_to_preset(self.preset_active)
else: # No active event
if self.position != self.preset_inactive:
logger.info('[%s] Returning to preset %i', agent_id,
self.preset_inactive)
self.activate_camera()
self.move_to_preset(self.preset_inactive)
logger.debug('[%s] Retrieving the camera state', agent_id)
if not self.is_on():
self.set_power()
time.sleep(10)

# To update the metrics
self.is_on()

self.move_to_preset(self.preset_inactive)
# Regular update
if time.time() - self.last_updated >= self.update_frequency:
logger.info('[%s] Re-sending preset %i to camera', agent_id,
self.position)
self.activate_camera()

logger.debug('[%s] Retrieving the camera state', agent_id)
if not self.is_on():
self.set_power()
time.sleep(10)

# To update the metrics
self.is_on()

self.move_to_preset(self.position)
14 changes: 14 additions & 0 deletions occameracontrol/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
'camera_position_expected',
'The position (preset number) a camera should be in',
('camera',))
camera_is_on = Gauge(
'camera_status',
'Whether the camera is On (1.0) or Standby (0.0)',
('camera',))


class RequestErrorHandler():
Expand Down Expand Up @@ -120,6 +124,16 @@ def register_camera_expectation(camera: str, position: int):
camera_position_expected.labels(camera).set(position)


def register_camera_status(camera: str, status: int):
'''Update metrics for the status of the camera. This ensures the (power)
state of the camera is available as part of the metrics.

:param camera: Camera identified
:param status: 1.0 if camera is 'on', 0.0 if 'standby'
'''
camera_is_on.labels(camera).set(status)


def start_metrics_exporter():
'''Start the web server for the metrics exporter endpoint if it is enabled
in the configuration.
Expand Down
Loading