Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 0 additions & 66 deletions app/api/sse_manager.py

This file was deleted.

24 changes: 8 additions & 16 deletions app/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
import re
import time
import warnings
from typing import Dict, Optional
from typing import Any, Dict, Optional

from fastapi.encoders import jsonable_encoder
from fastapi_plugins import redis_plugin
from loguru import logger

from app.api.sse_manager import SSEManager
from app.external.sse_starlette import ServerSentEvent
from app.api.ws_manager import WebSocketManager

sse_mgr = SSEManager()
sse_mgr.setup()
ws_mgr = WebSocketManager()


class ProcessResult:
Expand All @@ -37,15 +35,8 @@ def __str__(self) -> str:
)


def build_sse_event(event: str, json_data: Optional[Dict]):
return ServerSentEvent(
event=event,
data=json.dumps(jsonable_encoder(json_data)),
)


async def broadcast_sse_msg(event: str, json_data: Optional[Dict]):
"""Broadcasts a message to all connected clients
async def broadcast_json_ws(event: str, json_data: dict[str, Any]):
"""Broadcasts a json message to all connected clients

Parameters
----------
Expand All @@ -54,8 +45,7 @@ async def broadcast_sse_msg(event: str, json_data: Optional[Dict]):
data : dictionary, optional
The data to include
"""

await sse_mgr.broadcast_to_all(build_sse_event(event, json_data))
await ws_mgr.broadcast_json(json_data={"event": event, "data": json_data})


async def redis_get(key: str) -> str:
Expand Down Expand Up @@ -108,6 +98,8 @@ class SSE:
LN_FORWARD_SUCCESSES = "ln_forward_successes"
WALLET_BALANCE = "wallet_balance"

SERVER_ERROR = "server_error"


# https://gist.github.com/risent/4cab3878d995bec7d1c2
# https://firebase.blog/posts/2015/02/the-2120-ways-to-ensure-unique_68
Expand Down
54 changes: 54 additions & 0 deletions app/api/ws_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Any

from fastapi import WebSocket, WebSocketDisconnect
from loguru import logger


class WebSocketManager:
def __init__(self):
self._ctr = 0
self.active_connections: dict[int, WebSocket] = {}

async def connect(self, websocket: WebSocket) -> int:
await websocket.accept()
id = self._next_id()
self.active_connections[id] = websocket
logger.debug(f"Connecting Websocket with ID {id}")

return id

def disconnect(self, id: int):
del self.active_connections[id]

async def send_json(self, id: int, data: dict[str, Any]):
websocket = self._get(id)
await websocket.send_json(data=data)

async def send_personal_message(self, message: str, id: int):
await self._send(message, id)

async def broadcast(self, message: str):
for id in self.active_connections.keys():
await self._send(message, id)

async def broadcast_json(self, json_data: dict[str, Any]):
for id in self.active_connections.keys():
await self.send_json(id=id, data=json_data)

async def _send(self, message: str, id: int):
try:
websocket = self._get(id)
await websocket.send_text(message)
except WebSocketDisconnect:
self.disconnect(id)

def _get(self, id: int):
try:
return self.active_connections[id]
except Exception as e:
raise e

def _next_id(self):
next_id = self._ctr
self._ctr += 1
return next_id
22 changes: 6 additions & 16 deletions app/apps/impl/native_python.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,21 @@
from fastapi import HTTPException, status

from app.apps.impl.apps_base import AppsBase


class _NotImplemented(HTTPException):
def __init__(self):
super().__init__(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Not available in native python mode.",
)


class NativePythonApps(AppsBase):
async def get_app_status_single(self, app_id: str):
raise _NotImplemented()
raise NotImplementedError()

async def get_app_status(self):
raise _NotImplemented()
raise NotImplementedError()

async def get_app_status_advanced(self, app_id: str):
raise _NotImplemented()
raise NotImplementedError()

async def get_app_status_sub(self):
raise _NotImplemented()
raise NotImplementedError()

async def install_app_sub(self, app_id: str):
raise _NotImplemented()
raise NotImplementedError()

async def uninstall_app_sub(self, app_id: str, delete_data: bool):
raise _NotImplemented()
raise NotImplementedError()
24 changes: 12 additions & 12 deletions app/apps/impl/raspiblitz.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from fastapi.encoders import jsonable_encoder
from loguru import logger as logging

from app.api.utils import SSE, broadcast_sse_msg, call_sudo_script, parse_key_value_text
from app.api.utils import SSE, broadcast_json_ws, call_sudo_script, parse_key_value_text
from app.apps.impl.apps_base import AppsBase

available_app_ids = {
Expand Down Expand Up @@ -196,7 +196,7 @@ async def install_app_sub(self, app_id: str):
detail=app_id + "install script does not exist / is not supported",
)

await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{"id": app_id, "mode": "on", "result": "running", "details": ""},
)
Expand All @@ -212,7 +212,7 @@ async def uninstall_app_sub(self, app_id: str, delete_data: bool):
status.HTTP_400_BAD_REQUEST, detail="script not exist/supported"
)

await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{"id": app_id, "mode": "off", "result": "running", "details": ""},
)
Expand Down Expand Up @@ -279,7 +279,7 @@ async def run_bonus_script(self, app_id: str, params: str):
logging.error(
f"FOUND `error=` returned by script: {stdoutData['error']}"
)
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{
"id": app_id,
Expand All @@ -292,7 +292,7 @@ async def run_bonus_script(self, app_id: str, params: str):
# stdout - consider also script had error
elif "result" not in stdoutData:
logging.error("NO `result=` returned by script:")
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{
"id": app_id,
Expand All @@ -310,7 +310,7 @@ async def run_bonus_script(self, app_id: str, params: str):
if updatedAppData["error"] != "":
logging.warning("Error Detected ...")
logging.warning(f"updatedAppData: {updatedAppData}")
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{
"id": app_id,
Expand All @@ -324,7 +324,7 @@ async def run_bonus_script(self, app_id: str, params: str):
elif mode == "on":
if updatedAppData["installed"]:
logging.info(f"WIN - install of {app_id} was effective")
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{
"id": app_id,
Expand All @@ -335,14 +335,14 @@ async def run_bonus_script(self, app_id: str, params: str):
"details": stdoutData["result"],
},
)
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALLED_APP_STATUS, [updatedAppData]
)
else:
logging.error(f"FAIL - {app_id} was not installed")
logging.debug(f"updatedAppData: {updatedAppData}")
logging.debug(f"params: {params}")
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{
"id": app_id,
Expand All @@ -351,16 +351,16 @@ async def run_bonus_script(self, app_id: str, params: str):
"details": "install was not effective",
},
)
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALLED_APP_STATUS, [updatedAppData]
)

elif mode == "off":
await broadcast_sse_msg(
await broadcast_json_ws(
SSE.INSTALL_APP,
{"id": app_id, "mode": mode, "result": "win"},
)
await broadcast_sse_msg(SSE.INSTALLED_APP_STATUS, [updatedAppData])
await broadcast_json_ws(SSE.INSTALLED_APP_STATUS, [updatedAppData])

if not updatedAppData["installed"]:
logging.info(f"WIN - uninstall of {app_id} was effective")
Expand Down
13 changes: 0 additions & 13 deletions app/apps/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import app.apps.docs as docs
import app.apps.service as repo
from app.auth.auth_bearer import JWTBearer
from app.external.sse_starlette import EventSourceResponse

_PREFIX = "apps"

Expand Down Expand Up @@ -53,18 +52,6 @@ async def get_single_status_advanced(id: str = Path(..., required=True)):
return await repo.get_app_status_advanced(id)


@router.get(
"/status-sub",
name=f"{_PREFIX}/status-sub",
summary="Subscribe to status changes of currently installed apps.",
response_description=docs.get_app_status_sub_response_docs,
dependencies=[Depends(JWTBearer())],
)
@logger.catch(exclude=(HTTPException,))
async def get_status_sub():
return EventSourceResponse(repo.get_app_status_sub())


@router.post(
"/install/{name}",
name=f"{_PREFIX}/install",
Expand Down
6 changes: 5 additions & 1 deletion app/apps/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from decouple import config
from fastapi import HTTPException, status

from app.system.models import APIPlatform

Expand All @@ -21,7 +22,10 @@ async def get_app_status_single(app_id: str):


async def get_app_status():
return await apps.get_app_status()
try:
return await apps.get_app_status()
except NotImplementedError:
return HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED)


async def get_app_status_advanced(app_id: str):
Expand Down
15 changes: 1 addition & 14 deletions app/bitcoind/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
handle_block_sub,
)
from app.bitcoind.utils import bitcoin_rpc
from app.external.sse_starlette import EventSourceResponse

_PREFIX = "bitcoin"

Expand Down Expand Up @@ -132,18 +131,6 @@ async def getnetworkinfo():
async def get_raw_transaction_path(
txid: str = Query(
..., min_length=64, max_length=64, description="The transaction id"
)
),
):
return await get_raw_transaction(txid)


@router.get(
"/block-sub",
name=f"{_PREFIX}.block-sub",
summary="Subscribe to incoming blocks.",
description=blocks_sub_doc,
response_description="A JSON object with information about the new block.",
dependencies=[Depends(JWTBearer())],
)
async def zmq_sub(request: Request, verbosity: int = 1):
return EventSourceResponse(handle_block_sub(request, verbosity))
Loading