Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkpointer/checkpointer/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.8.10"
__version__ = "2.8.11"
709 changes: 373 additions & 336 deletions checkpointer/uv.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lmax-connector/lmax_connector/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.8.10"
__version__ = "2.8.11"
759 changes: 424 additions & 335 deletions lp-pricer/uv.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pragma-sdk/pragma_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.8.10"
__version__ = "2.8.11"
421 changes: 2 additions & 419 deletions pragma-sdk/uv.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pragma-utils/pragma_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.8.10"
__version__ = "2.8.11"
157 changes: 1 addition & 156 deletions pragma-utils/uv.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion price-pusher/price_pusher/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "2.8.10"
__version__ = "2.8.11"
14 changes: 12 additions & 2 deletions price-pusher/price_pusher/core/pusher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import time
import logging
from typing import List, Optional, Dict
from typing import List, Optional, Dict, Callable
from starknet_py.contract import InvokeResult
from starknet_py.net.client_errors import ClientError
from requests.exceptions import RequestException
Expand Down Expand Up @@ -30,10 +30,15 @@ async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]: ...


class PricePusher(IPricePusher):
def __init__(self, client: PragmaClient):
def __init__(
self,
client: PragmaClient,
on_successful_push: Optional[Callable[[], None]] = None,
):
self.client = client
self.consecutive_push_error = 0
self.onchain_lock = asyncio.Lock()
self.on_successful_push = on_successful_push

# Setup RPC health monitoring if using onchain client
if isinstance(self.client, PragmaOnChainClient):
Expand Down Expand Up @@ -82,6 +87,11 @@ async def update_price_feeds(self, entries: List[Entry]) -> Optional[Dict]:
f"(took {(end_t - start_t):.2f}s)"
)
self.consecutive_push_error = 0

# Notify health server of successful push
if self.on_successful_push:
self.on_successful_push()

return response

except (ClientError, RequestException) as e:
Expand Down
96 changes: 96 additions & 0 deletions price-pusher/price_pusher/health_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import time
import logging
from typing import Optional
from aiohttp import web

logger = logging.getLogger(__name__)


class HealthServer:
"""
Health check server that monitors the price pusher service.
Returns healthy status while warming up, then monitors push frequency.
"""

def __init__(self, port: int = 8080, max_seconds_without_push: int = 300):
"""
Initialize the health server.

Args:
port: Port to run the health server on
max_seconds_without_push: Maximum seconds without a push before unhealthy
"""
self.port = port
self.last_push_timestamp: Optional[float] = None
self.max_seconds_without_push = max_seconds_without_push
self.startup_time = time.time()
self.total_pushes = 0

def update_last_push(self) -> None:
"""Called by pusher after successful push"""
self.last_push_timestamp = time.time()
self.total_pushes += 1
logger.debug(f"Health server: Push #{self.total_pushes} recorded")

async def health_check(self, request) -> web.Response:
"""
Health check endpoint.

Returns:
- 200 (healthy) if warming up or push within threshold
- 503 (unhealthy) if no push for too long after first push
"""
# No push yet = still warming up = healthy
if not self.last_push_timestamp:
seconds_since_startup = time.time() - self.startup_time
return web.json_response(
{
"status": "healthy",
"state": "warming_up",
"seconds_since_startup": int(seconds_since_startup),
"total_pushes": 0,
"message": f"Waiting for first push ({int(seconds_since_startup)}s since startup)",
},
status=200,
)

# We have pushed at least once
seconds_since_push = time.time() - self.last_push_timestamp

if seconds_since_push > self.max_seconds_without_push:
return web.json_response(
{
"status": "unhealthy",
"state": "stale",
"last_push_seconds_ago": int(seconds_since_push),
"max_allowed_seconds": self.max_seconds_without_push,
"total_pushes": self.total_pushes,
"message": f"No push for {int(seconds_since_push)} seconds",
},
status=503,
)

return web.json_response(
{
"status": "healthy",
"state": "active",
"last_push_seconds_ago": int(seconds_since_push),
"max_allowed_seconds": self.max_seconds_without_push,
"total_pushes": self.total_pushes,
},
status=200,
)

async def start(self) -> None:
"""Start the health check HTTP server"""
app = web.Application()
app.router.add_get("/health", self.health_check)
app.router.add_get("/healthz", self.health_check) # k8s convention
app.router.add_get("/", self.health_check) # root endpoint

runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", self.port)
await site.start()

logger.info(f"🏥 Health server started on port {self.port}")
39 changes: 38 additions & 1 deletion price-pusher/price_pusher/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from price_pusher.orchestrator import Orchestrator
from price_pusher.price_types import Target, Network
from price_pusher.health_server import HealthServer

logger = logging.getLogger(__name__)

Expand All @@ -46,6 +47,8 @@ async def main(
max_fee: Optional[int] = None,
pagination: Optional[int] = None,
enable_strk_fees: Optional[bool] = None,
health_port: Optional[int] = None,
max_seconds_without_push: Optional[int] = None,
) -> None:
"""
Main function of the price pusher.
Expand Down Expand Up @@ -74,13 +77,25 @@ async def main(
)

logger.info("⏳ Starting orchestration...")

# Create health server if configured
health_server = None
if health_port:
health_server = HealthServer(
port=health_port, max_seconds_without_push=max_seconds_without_push or 300
)

poller = PricePoller(fetcher_client=fetcher_client)
pusher = PricePusher(client=pragma_client)
pusher = PricePusher(
client=pragma_client,
on_successful_push=health_server.update_last_push if health_server else None,
)
orchestrator = Orchestrator(
poller=poller,
poller_refresh_interval=poller_refresh_interval,
listeners=_create_listeners(price_configs, target, pragma_client),
pusher=pusher,
health_server=health_server,
)

logger.info("🚀 Orchestration starting 🚀")
Expand Down Expand Up @@ -274,6 +289,20 @@ def _create_client(
default=5,
help="Interval in seconds between poller refreshes. Default to 5 seconds.",
)
@click.option(
"--health-port",
type=click.IntRange(min=1, max=65535),
required=False,
default=8080,
help="Port for health check HTTP server. Default to 8080. Set to 0 to disable.",
)
@click.option(
"--max-seconds-without-push",
type=click.IntRange(min=60),
required=False,
default=300,
help="Maximum seconds without push before unhealthy. Default to 300 seconds (5 minutes).",
)
def cli_entrypoint(
config_file: str,
log_level: str,
Expand All @@ -290,6 +319,8 @@ def cli_entrypoint(
pagination: Optional[int],
enable_strk_fees: Optional[bool],
poller_refresh_interval: int,
health_port: Optional[int],
max_seconds_without_push: Optional[int],
) -> None:
if target == "offchain":
if not api_key or not api_base_url:
Expand Down Expand Up @@ -334,6 +365,10 @@ def cli_entrypoint(
if api_base_url is not None and api_base_url.endswith("/"):
api_base_url = api_base_url.rstrip()[:-1]

# Disable health server if port is 0
if health_port == 0:
health_port = None

asyncio.run(
main(
price_configs=price_configs,
Expand All @@ -350,6 +385,8 @@ def cli_entrypoint(
pagination=pagination,
enable_strk_fees=enable_strk_fees,
poller_refresh_interval=poller_refresh_interval,
health_port=health_port,
max_seconds_without_push=max_seconds_without_push,
)
)

Expand Down
26 changes: 23 additions & 3 deletions price-pusher/price_pusher/orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import asyncio

from typing import List, Dict
from typing import List, Dict, Optional

from pragma_sdk.common.types.entry import Entry

Expand All @@ -11,6 +11,7 @@
from price_pusher.core.listener import PriceListener
from price_pusher.core.pusher import PricePusher
from price_pusher.price_types import LatestOrchestratorPairPrices
from price_pusher.health_server import HealthServer

logger = logging.getLogger(__name__)

Expand All @@ -37,10 +38,12 @@ def __init__(
listeners: List[PriceListener],
pusher: PricePusher,
poller_refresh_interval: int = 5,
health_server: Optional[HealthServer] = None,
) -> None:
self.poller = poller
self.listeners = listeners
self.pusher = pusher
self.health_server = health_server

# Time between poller refresh
self.poller_refresh_interval = poller_refresh_interval
Expand All @@ -67,12 +70,19 @@ async def run_forever(self) -> None:
- the listener, that listen our oracle and push an event when the
data is outdated and needs new entries,
- the pusher, that pushes entries to our oracle.
- the health server (if configured), that provides health status.
"""
await asyncio.gather(
tasks = [
self._poller_service(),
self._listener_services(),
self._pusher_service(),
)
]

# Start health server if configured
if self.health_server:
tasks.append(self._health_server_service())

await asyncio.gather(*tasks)

async def _poller_service(self) -> None:
"""
Expand Down Expand Up @@ -196,3 +206,13 @@ def callback_update_prices(self, entries: List[Entry]) -> None:
if source not in self.latest_prices[pair_id][data_type]:
self.latest_prices[pair_id][data_type][source] = {}
self.latest_prices[pair_id][data_type][source][expiry] = entry

async def _health_server_service(self) -> None:
"""
Start the health check HTTP server.
"""
if self.health_server:
await self.health_server.start()
# Keep the service running
while True:
await asyncio.sleep(3600) # Sleep for an hour
1 change: 1 addition & 0 deletions price-pusher/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies = [
"click>=8.1.0",
"pydantic>=2.7.4",
"asgiref>=3.4.1",
"aiohttp>=3.9.0",
]

dynamic = ["version"]
Expand Down
Loading
Loading