forked from Gozargah/Marzban
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadmin.py
133 lines (109 loc) · 4.11 KB
/
admin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.exc import IntegrityError
from app.db import Session, crud, get_db
from app.dependencies import get_admin_by_username, validate_admin
from app.models.admin import Admin, AdminCreate, AdminModify, Token
from app.utils import report, responses
from app.utils.jwt import create_admin_token
from config import LOGIN_NOTIFY_WHITE_LIST
# Router shared by every admin endpoint in this module: all routes are served
# under the "/api" prefix, tagged "Admin", and document a 401 response.
router = APIRouter(
    prefix="/api",
    tags=["Admin"],
    responses={401: responses._401},
)
def get_client_ip(request: Request) -> str:
    """Return a best-effort client IP address for ``request``.

    Resolution order:

    1. The left-most non-empty entry of the ``X-Forwarded-For`` header.
    2. The peer address in ``request.client.host``.
    3. The literal string ``"Unknown"`` when neither is available.

    Args:
        request: The incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The resolved address as a string, never empty.
    """
    # NOTE(review): X-Forwarded-For is supplied by the caller unless a trusted
    # proxy overwrites it. admin_token() compares this value against
    # LOGIN_NOTIFY_WHITE_LIST, so a spoofed header could affect that check --
    # confirm the deployment always sits behind a proxy that sets the header.
    forwarded_for = request.headers.get("X-Forwarded-For")
    if forwarded_for:
        # Skip blank entries (e.g. " , 10.0.0.1" or a whitespace-only header)
        # so an empty string is never reported as the client address.
        for entry in forwarded_for.split(","):
            candidate = entry.strip()
            if candidate:
                return candidate
    if request.client:
        return request.client.host
    return "Unknown"
@router.post("/admin/token", response_model=Token)
def admin_token(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db),
):
    """Check the submitted admin credentials and return an access token.

    Responds with 401 and a ``WWW-Authenticate: Bearer`` header when the
    username/password pair is rejected.
    """
    ip_address = get_client_ip(request)
    username = form_data.username
    authenticated_admin = validate_admin(db, username, form_data.password)

    if authenticated_admin:
        # Successful logins are reported only for addresses outside the
        # configured whitelist; the password is replaced by a placeholder.
        if ip_address not in LOGIN_NOTIFY_WHITE_LIST:
            report.login(username, "🔒", ip_address, True)
        access_token = create_admin_token(username, authenticated_admin.is_sudo)
        return Token(access_token=access_token)

    # Failed attempt: report it (with the submitted password) before rejecting.
    report.login(username, form_data.password, ip_address, False)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )
@router.post(
    "/admin",
    response_model=Admin,
    responses={403: responses._403, 409: responses._409},
)
def create_admin(
    new_admin: AdminCreate,
    db: Session = Depends(get_db),
    admin: Admin = Depends(Admin.check_sudo_admin),
):
    """Create a new admin account.

    The request is gated by the ``Admin.check_sudo_admin`` dependency.
    Responds with 409 when the database rejects the insert with an
    integrity error.
    """
    try:
        created_admin = crud.create_admin(db, new_admin)
    except IntegrityError:
        # Undo the failed insert so the session remains usable.
        db.rollback()
        raise HTTPException(status_code=409, detail="Admin already exists")
    else:
        return created_admin
@router.put(
    "/admin/{username}",
    response_model=Admin,
    responses={403: responses._403, 404: responses._404},
)
def modify_admin(
    modified_admin: AdminModify,
    dbadmin: Admin = Depends(get_admin_by_username),
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(Admin.check_sudo_admin),
):
    """Update the admin identified by ``username`` and return the result.

    Responds with 403 when the target is a sudo account other than the
    caller's own.
    """
    targets_other_account = dbadmin.username != current_admin.username
    if targets_other_account and dbadmin.is_sudo:
        # A sudo admin may edit their own account but not another sudoer's.
        raise HTTPException(
            status_code=403,
            detail="You're not allowed to edit another sudoer's account. Use marzban-cli instead.",
        )

    return crud.update_admin(db, dbadmin, modified_admin)
@router.delete(
    "/admin/{username}",
    responses={403: responses._403},
)
def remove_admin(
    dbadmin: Admin = Depends(get_admin_by_username),
    db: Session = Depends(get_db),
    current_admin: Admin = Depends(Admin.check_sudo_admin),
):
    """Delete the admin identified by ``username``.

    Responds with 403 when the target is a sudo account; otherwise returns
    a confirmation message.
    """
    if not dbadmin.is_sudo:
        crud.remove_admin(db, dbadmin)
        return {"detail": "Admin removed successfully"}

    # Sudo accounts cannot be deleted through this endpoint.
    raise HTTPException(
        status_code=403,
        detail="You're not allowed to delete sudo accounts. Use marzban-cli instead.",
    )
@router.get(
    "/admin",
    response_model=Admin,
)
def get_current_admin(
    admin: Admin = Depends(Admin.get_current),
):
    """Return the admin resolved by the ``Admin.get_current`` dependency."""
    # The dependency does all the work; the handler only echoes its result.
    return admin
@router.get(
    "/admins",
    response_model=List[Admin],
    responses={403: responses._403},
)
def get_admins(
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    username: Optional[str] = None,
    db: Session = Depends(get_db),
    admin: Admin = Depends(Admin.check_sudo_admin),
):
    """List admins.

    ``offset``, ``limit`` and ``username`` are optional and are passed
    unchanged to ``crud.get_admins``.
    """
    matching_admins = crud.get_admins(db, offset, limit, username)
    return matching_admins