Hello there,
Great work on the component! I was wondering whether you had given any thought to using the LinkTap callback feature for state updates.
I've been playing around with this:
You could allow an optional config setting when adding a gateway, which would register a webhook. I'm not sure whether there is a local HTTP API to set the callback URL, but I would imagine so, since it can be set in the web interface. The JSON data coming in from the webhook is identical to the data from polling, but it arrives almost instantly and does not require any polling or retrying.
Below is my first pass at setting up the webhook. It would still need to be connected to the coordinator for updates.
from __future__ import annotations

import logging

from aiohttp import web

from homeassistant.components import cloud, webhook
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_WEBHOOK_ID, URL_API
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send

from .const import (
    CONF_CLOUDHOOK_URL,
    DOMAIN,
    WEBHOOK_CONST_ID,
    WEBHOOK_UPDATE,
)

_LOGGER = logging.getLogger(__name__)


@callback
def async_register_webhook(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Register a webhook."""
    webhook_id: str = entry.data[CONF_WEBHOOK_ID]

    async def _async_handle_linktap_webhook(
        hass: HomeAssistant, webhook_id: str, request: web.Request
    ) -> web.Response:
        """Handle webhook calls from the server."""
        data = await request.json()
        tap = data.get("dev_id")
        _LOGGER.debug("Received webhook data: %s", data)
        # Forward the payload, together with the tap's device ID, to any listeners.
        async_dispatcher_send(hass, WEBHOOK_UPDATE, tap, data)
        return web.Response(status=web.HTTPNoContent.status_code)

    webhook.async_register(
        hass,
        DOMAIN,
        "Linktap2",
        webhook_id,
        _async_handle_linktap_webhook,
        local_only=True,
    )


@callback
def async_unregister_webhook(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Unregister a webhook."""
    webhook_id: str = entry.data[CONF_WEBHOOK_ID]
    webhook.async_unregister(hass, webhook_id)


async def async_get_or_create_registered_webhook_id_and_url(
    hass: HomeAssistant, entry: ConfigEntry
) -> str:
    """Generate webhook url."""
    config = entry.data.copy()
    updated_config = False
    webhook_url = None

    # Create and store a webhook ID the first time this entry is set up.
    if not (webhook_id := config.get(CONF_WEBHOOK_ID)):
        webhook_id = webhook.async_generate_id()
        config[CONF_WEBHOOK_ID] = webhook_id
        updated_config = True

    if not webhook_url:
        webhook_url = webhook.async_generate_url(hass, webhook_id, True, False)

    if updated_config:
        hass.config_entries.async_update_entry(entry, data=config)

    return webhook_url
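For the missing coordinator hookup, here is a rough, untested sketch of one way it could work. It assumes the coordinator keeps its data as a dict keyed by `dev_id`, which may not match how this integration actually stores it. `merge_webhook_update` is a hypothetical helper, and the `is_watering` and `signal` fields in the usage lines are made up for illustration. The helper is plain Python so it can be tried without Home Assistant.

```python
from __future__ import annotations

from typing import Any


def merge_webhook_update(
    current: dict[str, dict[str, Any]], payload: dict[str, Any]
) -> dict[str, dict[str, Any]]:
    """Return a copy of the cached data with one tap's webhook payload merged in.

    Assumption: `current` maps dev_id -> last known state for that tap.
    """
    dev_id = payload.get("dev_id")
    if dev_id is None:
        # Nothing to route this payload to, so leave the cache untouched.
        return current
    updated = dict(current)
    # Fields in the payload overwrite the cached values for that tap only.
    updated[dev_id] = {**current.get(dev_id, {}), **payload}
    return updated


# Usage with hypothetical field names:
cached = {"tap1": {"dev_id": "tap1", "is_watering": False, "signal": 80}}
merged = merge_webhook_update(cached, {"dev_id": "tap1", "is_watering": True})
```

Inside the integration, a listener registered with `async_dispatcher_connect(hass, WEBHOOK_UPDATE, ...)` could pass the merged dict to the coordinator's `async_set_updated_data`, so entities refresh without waiting for the next poll.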
