-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathroutes.py
More file actions
119 lines (94 loc) · 3.02 KB
/
routes.py
File metadata and controls
119 lines (94 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""API Routes for Space Entropy Generator"""
from fastapi import APIRouter, HTTPException, Query, Path
from fastapi.responses import JSONResponse, HTMLResponse
from typing import Optional
import base64
import logging
from pathlib import Path as FilePath
from app.entropy.pool import entropy_pool
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/", response_class=HTMLResponse)
async def root():
"""Root endpoint - serve web interface"""
html_file = FilePath(__file__).parent.parent / "templates" / "index.html"
if html_file.exists():
return html_file.read_text()
else:
# Fallback to JSON if HTML not found
return JSONResponse({
"name": "Space Entropy Generator",
"version": "0.1.0",
"description": "True Randomness as a Service using space imagery",
"endpoints": {
"/random/{n}": "Get n random bytes (Base64 encoded)",
"/health": "Health check",
"/stats": "Entropy pool statistics"
}
})
@router.get("/health")
async def health_check():
"""
Health check endpoint
Returns service health status
"""
health = await entropy_pool.health_check()
return {
"status": "healthy" if health.get('healthy', False) else "degraded",
"service": "space-entropy-generator",
"version": "0.1.0",
"redis": health
}
@router.get("/stats")
async def get_stats():
"""
Get entropy pool statistics
Returns information about the entropy pool status
"""
stats = await entropy_pool.get_stats()
return stats
@router.get("/random/{n}")
async def get_random_bytes(
n: int = Path(..., ge=1, le=10240, description="Number of random bytes to generate"),
):
"""
Get n random bytes from the entropy pool
Args:
n: Number of bytes to return (1-10240)
Returns:
Base64 encoded random bytes
"""
# Validate input
if n < 1 or n > 10240:
raise HTTPException(
status_code=400,
detail="Number of bytes must be between 1 and 10240"
)
# Get entropy from pool
try:
entropy = await entropy_pool.get_entropy(n)
if entropy is None:
raise HTTPException(
status_code=503,
detail="Entropy pool is empty. Please try again later."
)
# Encode as base64 for safe transport
encoded = base64.b64encode(entropy).decode('utf-8')
return {
"bytes": encoded,
"length": len(entropy),
"format": "base64"
}
except ConnectionError as e:
raise HTTPException(
status_code=503,
detail=f"Entropy service unavailable: {str(e)}"
)
@router.get("/random")
async def get_random_default():
"""
Get default amount of random bytes (256 bytes)
Returns:
Base64 encoded random bytes
"""
return await get_random_bytes(256)