"""Admin-only user management operations (list, search, update, delete).

Every public function takes a JWT ``token``, resolves it to a username via
``verify_token`` and requires that user's ``user_type`` to be ``"admin"``
before touching any data. Failures are reported as ``fastapi.HTTPException``.
"""

import hashlib
from contextlib import contextmanager
# NOTE(review): `connection` is not referenced by any function in this module;
# the import is kept in case other code relies on it being importable from here.
from multiprocessing import connection  # noqa: F401

from fastapi import HTTPException

from ..db import get_connection
from ..utils.jwt_handler import verify_token

# Columns returned by the listing/search queries (the password is never selected).
_USER_COLUMNS = (
    "user_id, username, points, phone_number, gender, email, "
    "user_type, created_at, updated_at"
)

# Maps the public `query_mode` values of `query_user` to real column names.
# Only these fixed literals are ever interpolated into SQL text.
_QUERY_MODE_COLUMNS = {
    "phone_number": "phone_number",
    "email": "email",
    "username": "username",
    "uid": "user_id",
}

_VALID_USER_TYPES = ("admin", "player")


def _username_from_token(token: str) -> str:
    """Return the ``sub`` claim of *token*, or raise 401 if verification fails."""
    try:
        payload = verify_token(token)
        return payload["sub"]
    except ValueError as exc:
        raise HTTPException(status_code=401, detail=str(exc)) from exc


def _require_admin(cursor, username: str) -> None:
    """Raise 401 if *username* is unknown, 403 if its user_type is not 'admin'."""
    cursor.execute("SELECT user_type FROM users WHERE username = %s;", (username,))
    admin_user = cursor.fetchone()
    if not admin_user:
        raise HTTPException(status_code=401, detail="Admin user not found.")
    if admin_user["user_type"] != "admin":
        raise HTTPException(
            status_code=403, detail="Permission denied: Not an admin user."
        )


def _require_user_exists(cursor, uid: int) -> None:
    """Raise 404 unless a row with ``user_id = uid`` exists in ``users``."""
    cursor.execute("SELECT user_id FROM users WHERE user_id = %s;", (uid,))
    if not cursor.fetchone():
        raise HTTPException(status_code=404, detail=f"User with ID {uid} not found.")


@contextmanager
def _admin_session(token: str, error_prefix: str = None):
    """Yield ``(conn, cursor)`` for a verified admin and always clean up.

    Steps: verify the token (401), open a connection (500 if unavailable),
    create a dictionary cursor and check admin rights (401/403).

    ``HTTPException`` raised by the caller's ``with`` body passes through
    untouched, so deliberate 4xx responses are not rewritten as 500. Any other
    exception is wrapped as ``HTTPException(500, "<error_prefix>: <error>")``
    when *error_prefix* is given, and re-raised unchanged when it is ``None``.
    """
    username = _username_from_token(token)

    conn = get_connection()
    if not conn:
        raise HTTPException(status_code=500, detail="Database connection failed!")

    # Start as None so `finally` is safe even if conn.cursor() itself raises.
    cursor = None
    try:
        cursor = conn.cursor(dictionary=True)
        _require_admin(cursor, username)
        yield conn, cursor
    except HTTPException:
        raise
    except Exception as exc:
        if error_prefix is None:
            raise
        raise HTTPException(
            status_code=500, detail=f"{error_prefix}: {str(exc)}"
        ) from exc
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()


def query_users_by_range(token: str, start: int, end: int):
    """Return users whose ``user_id`` lies in ``[start, end]``, ordered by id.

    Args:
        token: JWT of the calling admin.
        start: Lower bound of the id range (inclusive, SQL ``BETWEEN``).
        end: Upper bound of the id range (inclusive).

    Returns:
        The rows from ``cursor.fetchall()`` (dictionary cursor), possibly empty.

    Raises:
        HTTPException: 401 invalid token / unknown caller, 403 caller is not
            an admin, 500 connection failure or unexpected query error.
    """
    with _admin_session(token, "Error during query") as (_, cursor):
        cursor.execute(
            f"SELECT {_USER_COLUMNS} FROM users "
            "WHERE user_id BETWEEN %s AND %s ORDER BY user_id ASC",
            (start, end),
        )
        return cursor.fetchall()


def delete_user_by_token(token: str, uid: int):
    """Delete user *uid*, refusing if the user still has orders.

    Args:
        token: JWT of the calling admin.
        uid: ``user_id`` of the user to delete.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401/403 authorization failure, 404 user not found,
            400 user has rows in ``orders``, 500 connection failure.
        Exception: Unexpected database errors propagate unwrapped.
    """
    with _admin_session(token) as (conn, cursor):
        _require_user_exists(cursor, uid)

        cursor.execute(
            "SELECT COUNT(*) AS order_count FROM orders WHERE user_id = %s;", (uid,)
        )
        order_info = cursor.fetchone()
        if order_info["order_count"] > 0:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot delete user with ID {uid}: user has existing orders.",
            )

        cursor.execute("DELETE FROM users WHERE user_id = %s;", (uid,))
        conn.commit()
        return f"User with ID {uid} deleted successfully."


def update_user(token: str, uid: int, username: str = None, email: str = None,
                phone_number: str = None, gender: str = None, user_type: str = None):
    """Update the supplied profile fields of user *uid*.

    Only truthy arguments are written; ``None`` and empty strings are skipped.

    Args:
        token: JWT of the calling admin.
        uid: ``user_id`` of the user to update.
        username, email, phone_number, gender: New values, if provided.
        user_type: New role; must be ``'admin'`` or ``'player'`` if provided.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401/403 authorization failure, 404 user not found,
            400 invalid ``user_type`` or nothing to update, 500 connection
            failure or unexpected database error.
    """
    with _admin_session(token, "Error during update") as (conn, cursor):
        _require_user_exists(cursor, uid)

        if user_type and user_type not in _VALID_USER_TYPES:
            raise HTTPException(
                status_code=400,
                detail="Invalid user_type. Must be 'admin' or 'player'.",
            )

        # Keys are fixed column names (never user input), so building the SET
        # clause from them is safe; values are always bound as parameters.
        requested = {
            "username": username,
            "email": email,
            "phone_number": phone_number,
            "gender": gender,
            "user_type": user_type,
        }
        changes = {column: value for column, value in requested.items() if value}
        if not changes:
            raise HTTPException(status_code=400, detail="No fields to update.")

        set_clause = ", ".join(f"{column} = %s" for column in changes)
        cursor.execute(
            f"UPDATE users SET {set_clause} WHERE user_id = %s;",
            (*changes.values(), uid),
        )
        conn.commit()
        return f"User with ID {uid} updated successfully."


def get_total_of_users(token: str):
    """Return the total number of rows in ``users``.

    Args:
        token: JWT of the calling admin.

    Returns:
        The ``COUNT(*)`` value reported by the database.

    Raises:
        HTTPException: 401/403 authorization failure, 500 connection failure,
            404 if the count query yields no row.
        Exception: Unexpected database errors propagate unwrapped.
    """
    with _admin_session(token) as (_, cursor):
        cursor.execute("SELECT COUNT(*) AS user_count FROM users")
        result = cursor.fetchone()
        if not result:
            # Defensive guard; status code and message kept as-is for
            # existing clients.
            raise HTTPException(status_code=404, detail="Database connection failed!")
        return result["user_count"]


def query_user(token: str, query_mode: str, query_value: str):
    """Search users by ``phone_number``, ``email``, ``username`` or ``uid``.

    ``uid`` is matched exactly; the other modes use a substring match
    (``LIKE '%value%'``).

    Args:
        token: JWT of the calling admin.
        query_mode: One of ``"phone_number"``, ``"email"``, ``"username"``,
            ``"uid"``.
        query_value: Value to search for.

    Returns:
        A non-empty list of matching rows.

    Raises:
        HTTPException: 401/403 authorization failure, 400 unknown
            ``query_mode``, 404 no match, 500 connection failure or
            unexpected query error.
    """
    with _admin_session(token, "Error during query") as (_, cursor):
        search_column = _QUERY_MODE_COLUMNS.get(query_mode)
        if search_column is None:
            raise HTTPException(status_code=400, detail="Invalid query mode.")

        if query_mode == "uid":
            operator, search_value = "=", query_value
        else:
            operator, search_value = "LIKE", f"%{query_value}%"

        # search_column and operator come from fixed literals above; the
        # user-supplied value is always passed as a bound parameter.
        cursor.execute(
            f"SELECT {_USER_COLUMNS} FROM users WHERE {search_column} {operator} %s",
            (search_value,),
        )
        users = cursor.fetchall()
        if not users:
            raise HTTPException(status_code=404, detail="User not found.")
        return users


def update_user_password(token: str, uid: int, new_password: str):
    """Overwrite the stored password hash of user *uid*.

    Args:
        token: JWT of the calling admin.
        uid: ``user_id`` of the user whose password is changed.
        new_password: New plaintext password.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401/403 authorization failure, 404 user not found,
            500 connection failure or unexpected database error.
    """
    with _admin_session(token, "Error during password update") as (conn, cursor):
        _require_user_exists(cursor, uid)

        # NOTE(security): unsalted MD5 is not a safe password hash. It is kept
        # because presumably the login path compares MD5 hex digests (confirm);
        # changing it here alone would lock users out. Migrate both sides
        # together to a salted KDF.
        hashed_password = hashlib.md5(new_password.encode()).hexdigest()
        cursor.execute(
            "UPDATE users SET password = %s WHERE user_id = %s;",
            (hashed_password, uid),
        )
        conn.commit()
        return f"User with ID {uid} password updated successfully."


def update_user_points(token: str, uid: int, points: int, reason: str):
    """Add *points* to user *uid* and record the change in ``points_history``.

    Args:
        token: JWT of the calling admin.
        uid: ``user_id`` of the user whose balance changes.
        points: Amount added to the current balance (``points = points + %s``).
        reason: Text stored alongside the history row.

    Returns:
        A confirmation message string.

    Raises:
        HTTPException: 401/403 authorization failure, 404 user not found,
            500 connection failure or unexpected database error.
    """
    with _admin_session(token, "Error during points update") as (conn, cursor):
        # Without this check an unknown uid would update zero rows and still
        # attempt to insert a history record.
        _require_user_exists(cursor, uid)

        cursor.execute(
            "UPDATE users SET points = points + %s WHERE user_id = %s", (points, uid)
        )
        cursor.execute(
            "INSERT INTO points_history (user_id, change_amount, reason) "
            "VALUES (%s, %s, %s)",
            (uid, points, reason),
        )
        # Both statements are committed together.
        conn.commit()
        return f"用户 {uid} 积分已更新"