table_game/backend/app/services/admin_user_service.py
2025-03-10 08:35:19 +08:00

395 lines
14 KiB
Python

from multiprocessing import connection
from fastapi import HTTPException
from ..db import get_connection
from ..utils.jwt_handler import verify_token
import hashlib
def query_users_by_range(token: str, start: int, end: int):
    """Return basic profile rows for users whose ``user_id`` is in ``[start, end]``.

    The caller must present a token accepted by ``verify_token`` whose ``sub``
    claim names a user with ``user_type == "admin"``.

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.
        start: Lower bound of the ``user_id`` range (inclusive, SQL ``BETWEEN``).
        end: Upper bound of the ``user_id`` range (inclusive).

    Returns:
        The result of ``cursor.fetchall()`` for the range query, ordered by
        ``user_id`` ascending. The cursor is opened with ``dictionary=True``,
        so presumably each row is a dict keyed by column name.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 500 if no database
            connection is available or the query fails unexpectedly.
    """
    # Step 1: authenticate. A ValueError from verify_token is mapped to 401.
    try:
        payload = verify_token(token)
        username = payload["sub"]
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e)) from e

    connection = get_connection()
    if not connection:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # Initialised before the try so the finally block can tell whether a
    # cursor was actually opened.
    cursor = None
    try:
        cursor = connection.cursor(dictionary=True)

        # Step 2: the token's subject must be an existing admin account.
        cursor.execute("SELECT user_type FROM users WHERE username = %s;", (username,))
        admin_user = cursor.fetchone()
        if not admin_user:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if admin_user["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        # Step 3: fetch the requested id range.
        cursor.execute(
            """
            SELECT
                user_id,
                username,
                points,
                phone_number,
                gender,
                email,
                user_type,
                created_at,
                updated_at
            FROM users
            WHERE user_id BETWEEN %s AND %s
            ORDER BY user_id ASC
            """,
            (start, end),
        )
        return cursor.fetchall()
    except HTTPException:
        # The 401/403 raised above are deliberate responses; without this
        # clause the generic handler below would rewrite them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during query: {str(e)}") from e
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
def delete_user_by_token(token: str, uid: int):
    """Delete the user with id ``uid`` on behalf of an admin caller.

    Steps:
        1. Verify the token and confirm its subject is an admin.
        2. Confirm the target user exists.
        3. Refuse the deletion if the user has any rows in ``orders``.
        4. Delete the user and commit.

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.
        uid: ``user_id`` of the user to delete.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 404 if ``uid`` does
            not exist; 400 if the user still has orders; 500 if no database
            connection is available.
    """
    # Authenticate: a ValueError from verify_token is reported as 401.
    try:
        caller_name = verify_token(token)["sub"]
    except ValueError as err:
        raise HTTPException(status_code=401, detail=str(err))

    conn = get_connection()
    if not conn:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # No except clause here: HTTPExceptions and database errors propagate to
    # the caller as-is; only cleanup is guaranteed.
    try:
        cur = conn.cursor(dictionary=True)

        # Authorise: the token's subject must be an existing admin account.
        cur.execute("SELECT user_type FROM users WHERE username = %s;", (caller_name,))
        caller_row = cur.fetchone()
        if not caller_row:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if caller_row["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        # The target must exist before anything is deleted.
        cur.execute("SELECT * FROM users WHERE user_id = %s;", (uid,))
        if not cur.fetchone():
            raise HTTPException(status_code=404, detail=f"User with ID {uid} not found.")

        # Users that still own orders are protected from deletion.
        cur.execute("SELECT COUNT(*) AS order_count FROM orders WHERE user_id = %s;", (uid,))
        order_count = cur.fetchone()["order_count"]
        if order_count > 0:
            raise HTTPException(status_code=400, detail=f"Cannot delete user with ID {uid}: user has existing orders.")

        cur.execute("DELETE FROM users WHERE user_id = %s;", (uid,))
        conn.commit()
        return f"User with ID {uid} deleted successfully."
    finally:
        cur.close()
        conn.close()
def update_user(token: str, uid: int, username: str = None, email: str = None,
                phone_number: str = None, gender: str = None, user_type: str = None):
    """Update selected profile fields of a user on behalf of an admin caller.

    Only arguments with a truthy value are written; ``None`` and empty strings
    leave the corresponding column unchanged.

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.
        uid: ``user_id`` of the user to update.
        username: New username, if it should change.
        email: New email, if it should change.
        phone_number: New phone number, if it should change.
        gender: New gender value, if it should change.
        user_type: New user type; must be ``'admin'`` or ``'player'``.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 404 if ``uid`` does
            not exist; 400 if ``user_type`` is invalid or no field was
            supplied; 500 if no database connection is available or the
            update fails unexpectedly.
    """
    # Step 1: authenticate. A ValueError from verify_token is mapped to 401.
    try:
        payload = verify_token(token)
        admin_username = payload["sub"]
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e)) from e

    connection = get_connection()
    if not connection:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # Initialised before the try so the finally block can tell whether a
    # cursor was actually opened.
    cursor = None
    try:
        cursor = connection.cursor(dictionary=True)

        # Step 2: the token's subject must be an existing admin account.
        cursor.execute("SELECT user_type FROM users WHERE username = %s;", (admin_username,))
        admin_user = cursor.fetchone()
        if not admin_user:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if admin_user["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        # Step 3: the target user must exist.
        cursor.execute("SELECT * FROM users WHERE user_id = %s;", (uid,))
        if not cursor.fetchone():
            raise HTTPException(status_code=404, detail=f"User with ID {uid} not found.")

        # Step 4: collect the columns to change. Column names are fixed
        # literals, so interpolating them into the statement is safe; the
        # values always travel as bound parameters.
        if user_type and user_type not in ("admin", "player"):
            raise HTTPException(status_code=400, detail="Invalid user_type. Must be 'admin' or 'player'.")

        requested = (
            ("username", username),
            ("email", email),
            ("phone_number", phone_number),
            ("gender", gender),
            ("user_type", user_type),
        )
        changes = [(column, value) for column, value in requested if value]
        if not changes:
            raise HTTPException(status_code=400, detail="No fields to update.")

        set_clause = ", ".join(f"{column} = %s" for column, _ in changes)
        params = [value for _, value in changes]
        params.append(uid)
        update_query = f"UPDATE users SET {set_clause} WHERE user_id = %s;"

        # Step 5: apply and persist the change.
        cursor.execute(update_query, tuple(params))
        connection.commit()
        return f"User with ID {uid} updated successfully."
    except HTTPException:
        # The 4xx errors raised above are deliberate responses; without this
        # clause the generic handler below would rewrite them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during update: {str(e)}") from e
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
def get_total_of_users(token: str):
    """Return the total number of rows in ``users`` for an admin caller.

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.

    Returns:
        The ``user_count`` value produced by ``SELECT COUNT(*)`` on ``users``.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 404 if the count
            query yields no row; 500 if no database connection is available.
    """
    # Authenticate: a ValueError from verify_token is reported as 401.
    try:
        admin_name = verify_token(token)["sub"]
    except ValueError as err:
        raise HTTPException(status_code=401, detail=str(err))

    conn = get_connection()
    if not conn:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # No except clause here: HTTPExceptions and database errors propagate to
    # the caller as-is; only cleanup is guaranteed.
    try:
        cur = conn.cursor(dictionary=True)

        # Authorise: the token's subject must be an existing admin account.
        cur.execute("SELECT user_type FROM users WHERE username = %s;", (admin_name,))
        caller_row = cur.fetchone()
        if not caller_row:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if caller_row["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        cur.execute("SELECT COUNT(*) AS user_count FROM users")
        count_row = cur.fetchone()
        if not count_row:
            raise HTTPException(status_code=404, detail="Database connection failed!")
        return count_row["user_count"]
    finally:
        cur.close()
        conn.close()
def query_user(token: str, query_mode: str, query_value: str):
    """Search users by ``phone_number``, ``email``, ``username`` or ``uid``.

    ``uid`` is matched exactly against ``user_id``; the other modes use a
    substring match (``LIKE '%value%'``).

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.
        query_mode: One of ``"phone_number"``, ``"email"``, ``"username"``,
            ``"uid"``.
        query_value: The value to search for.

    Returns:
        The non-empty result of ``cursor.fetchall()`` for the search. The
        cursor is opened with ``dictionary=True``, so presumably each row is a
        dict keyed by column name.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 400 if
            ``query_mode`` is not supported; 404 if nothing matches; 500 if no
            database connection is available or the query fails unexpectedly.
    """
    # Step 1: authenticate. A ValueError from verify_token is mapped to 401.
    try:
        payload = verify_token(token)
        username = payload["sub"]
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e)) from e

    connection = get_connection()
    if not connection:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # Initialised before the try so the finally block can tell whether a
    # cursor was actually opened.
    cursor = None
    try:
        cursor = connection.cursor(dictionary=True)

        # Step 2: the token's subject must be an existing admin account.
        cursor.execute("SELECT user_type FROM users WHERE username = %s;", (username,))
        admin_user = cursor.fetchone()
        if not admin_user:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if admin_user["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        # Step 3: map the public mode name onto a column. This whitelist is
        # what makes it safe to interpolate the column name into the SQL.
        valid_modes = {
            "phone_number": "phone_number",
            "email": "email",
            "username": "username",
            "uid": "user_id",
        }
        if query_mode not in valid_modes:
            raise HTTPException(status_code=400, detail="Invalid query mode.")
        search_column = valid_modes[query_mode]

        # Ids are matched exactly; every other mode is a substring search.
        if query_mode == "uid":
            operator, search_value = "=", query_value
        else:
            operator, search_value = "LIKE", f"%{query_value}%"

        query = f"""
            SELECT user_id, username, points, phone_number, gender, email, user_type, created_at, updated_at
            FROM users
            WHERE {search_column} {operator} %s
        """
        cursor.execute(query, (search_value,))
        users = cursor.fetchall()
        if not users:
            raise HTTPException(status_code=404, detail="User not found.")
        return users
    except HTTPException:
        # The 4xx errors raised above (including the 404 for "no match") are
        # deliberate responses; without this clause the generic handler below
        # would rewrite them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during query: {str(e)}") from e
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
def update_user_password(token: str, uid: int, new_password: str):
    """Set a new password for a user on behalf of an admin caller.

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.
        uid: ``user_id`` of the user whose password is replaced.
        new_password: The new plaintext password; its MD5 hex digest is what
            gets stored in ``users.password``.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 404 if ``uid`` does
            not exist; 500 if no database connection is available or the
            update fails unexpectedly.
    """
    # Step 1: authenticate. A ValueError from verify_token is mapped to 401.
    try:
        payload = verify_token(token)
        admin_username = payload["sub"]
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e)) from e

    connection = get_connection()
    if not connection:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # Initialised before the try so the finally block can tell whether a
    # cursor was actually opened.
    cursor = None
    try:
        cursor = connection.cursor(dictionary=True)

        # Step 2: the token's subject must be an existing admin account.
        cursor.execute("SELECT user_type FROM users WHERE username = %s;", (admin_username,))
        admin_user = cursor.fetchone()
        if not admin_user:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if admin_user["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        # Step 3: the target user must exist.
        cursor.execute("SELECT * FROM users WHERE user_id = %s;", (uid,))
        if not cursor.fetchone():
            raise HTTPException(status_code=404, detail=f"User with ID {uid} not found.")

        # Step 4: store the new password hash.
        # NOTE(review): unsalted MD5 is not suitable for password storage.
        # It is kept here because the stored format presumably has to match
        # what the login path verifies - migrate both sides together.
        hashed_password = hashlib.md5(new_password.encode()).hexdigest()
        cursor.execute("UPDATE users SET password = %s WHERE user_id = %s;", (hashed_password, uid))
        connection.commit()
        return f"User with ID {uid} password updated successfully."
    except HTTPException:
        # The 401/403/404 raised above are deliberate responses; without this
        # clause the generic handler below would rewrite them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during password update: {str(e)}") from e
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()
# Admin operation: adjust a user's points balance.
def update_user_points(token: str, uid: int, points: int, reason: str):
    """Adjust a user's points balance and record the change.

    ``points`` is a delta: it is added to the stored balance
    (``points = points + %s``), and the same amount is written to
    ``points_history`` together with ``reason``.

    Args:
        token: Token handed to ``verify_token``; its ``sub`` claim is used as
            the caller's username.
        uid: ``user_id`` of the user whose balance changes.
        points: Amount added to the current balance.
        reason: Text stored in ``points_history.reason``.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401 if the token is rejected or its subject is not in
            ``users``; 403 if the subject is not an admin; 404 if ``uid`` does
            not exist; 500 if no database connection is available or the
            update fails unexpectedly.
    """
    # Step 1: authenticate. A ValueError from verify_token is mapped to 401.
    try:
        payload = verify_token(token)
        admin_username = payload["sub"]
    except ValueError as e:
        raise HTTPException(status_code=401, detail=str(e)) from e

    connection = get_connection()
    if not connection:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # Initialised before the try so the finally block can tell whether a
    # cursor was actually opened.
    cursor = None
    try:
        cursor = connection.cursor(dictionary=True)

        # Step 2: the token's subject must be an existing admin account.
        cursor.execute("SELECT user_type FROM users WHERE username = %s;", (admin_username,))
        admin_user = cursor.fetchone()
        if not admin_user:
            raise HTTPException(status_code=401, detail="Admin user not found.")
        if admin_user["user_type"] != "admin":
            raise HTTPException(status_code=403, detail="Permission denied: Not an admin user.")

        # Step 3: the target user must exist. Without this check an unknown
        # uid would update zero rows and still get a points_history entry.
        # An explicit lookup is used rather than the UPDATE's row count,
        # because a zero-point delta may legitimately report no changed rows.
        cursor.execute("SELECT user_id FROM users WHERE user_id = %s;", (uid,))
        if not cursor.fetchone():
            raise HTTPException(status_code=404, detail=f"User with ID {uid} not found.")

        # Step 4: apply the delta and log it; both are committed together.
        cursor.execute(
            "UPDATE users SET points = points + %s WHERE user_id = %s",
            (points, uid),
        )
        cursor.execute(
            "INSERT INTO points_history (user_id, change_amount, reason) VALUES (%s, %s, %s)",
            (uid, points, reason),
        )
        connection.commit()
        return f"用户 {uid} 积分已更新"
    except HTTPException:
        # The 401/403/404 raised above are deliberate responses; without this
        # clause the generic handler below would rewrite them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error during points update: {str(e)}") from e
    finally:
        if cursor is not None:
            cursor.close()
        connection.close()