# users.py from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify import requests import math from frontend.config import Config users_bp = Blueprint('users', __name__, url_prefix='/users') # 显示用户列表,带分页 @users_bp.route('/') def list_users(): """显示用户列表,带分页""" if not session.get('token'): flash("请先登录", "warning") return redirect(url_for('auth.login')) # 1. 获取用户总数 response = requests.post(f'{Config.BASE_API_URL}/admin/users/sum', json={"token": session.get('token')}) total_users = 0 if response.status_code == 200: try: total_users = int(response.json().get('message', 0)) except (ValueError, TypeError): total_users = 0 # 2. 分页查询用户 page = request.args.get('page', 1, type=int) start = (page - 1) * 20 + 1 end = page * 20 coupon_resp = requests.get( f"{Config.BASE_API_URL}/admin/coupons", params={"token": session['token']} ) coupons = coupon_resp.json() if coupon_resp.status_code == 200 else [] response2 = requests.post(f'{Config.BASE_API_URL}/admin/users/query', json={"token": session.get('token'), "start": start, "end": end}) users = response2.json() if response2.status_code == 200 else [] total_pages = math.ceil(total_users / 20) return render_template("users.html", users=users, coupons=coupons, page=page, total_users=total_users, total_pages=total_pages) # 搜索接口 @users_bp.route('/search', methods=['POST']) def search_users(): """ 前端在 /users 页面点击搜索时 POST 到 /users/search 不需要从前端传 token,直接使用 session['token'] """ if not session.get('token'): return jsonify([]) # 或返回 401 data = request.get_json() or {} query_mode = data.get('query_mode') query_value = data.get('query_value') # 若搜索值为空,直接返回空 if not query_value: return jsonify([]) payload = { "token": session['token'], "query_mode": query_mode, "query_value": query_value } resp = requests.post(f"{Config.BASE_API_URL}/admin/users/search", json=payload) if resp.status_code == 200: return jsonify(resp.json()) return jsonify([]) # 删除用户 @users_bp.route('/delete_user', methods=['POST']) def delete_user(): if not session.get('token'): flash("请先登录", "warning") return redirect(url_for('auth.login')) uid = request.form.get("uid") page = request.args.get('page', 1, type=int) payload = { "token": session['token'], "uid": int(uid) } resp = requests.post(f"{Config.BASE_API_URL}/admin/users/del", json=payload) if resp.status_code == 200: flash("删除成功", "success") else: flash("删除失败", "danger") return redirect(url_for('users.list_users', page=page)) # 更新用户 @users_bp.route('/update_user', methods=['POST']) def update_user(): if not session.get('token'): flash("请先登录", "warning") return redirect(url_for('auth.login')) uid = request.form.get('uid') page = request.args.get('page', 1, type=int) payload = { "token": session['token'], "uid": int(uid), "username": request.form.get('username'), "email": request.form.get('email'), "phone_number": request.form.get('phone_number'), "gender": request.form.get('gender'), "user_type": request.form.get('user_type') } resp = requests.post(f"{Config.BASE_API_URL}/admin/users/update", json=payload) if resp.status_code == 200: flash("更新成功", "success") else: flash("更新失败", "danger") return redirect(url_for('users.list_users', page=page)) # 重置密码(实现具体逻辑) @users_bp.route('/reset_password', methods=['POST']) def reset_password(): """ 这个路由接收新密码并调用后端 API 进行密码重置。 """ if not session.get('token'): flash("请先登录", "warning") return redirect(url_for('auth.login')) uid = request.form.get('uid') new_password = request.form.get('password') # 确保 uid 和 new_password 不为空 if not uid or not new_password: flash("缺少必要参数", "danger") return redirect(url_for('users.list_users', page=request.args.get('page', 1, type=int))) payload = { "token": session['token'], "uid": int(uid), "new_password": new_password } resp = requests.post(f"{Config.BASE_API_URL}/admin/users/update_password", json=payload) if resp.status_code == 200: flash("重置密码成功", "success") else: flash("重置密码失败", "danger") return redirect(url_for('users.list_users', page=request.args.get('page', 1, type=int))) # 在users_bp蓝图下添加新路由 @users_bp.route('/adjust_points', methods=['POST']) def adjust_points(): if not session.get('token'): flash("请先登录", "warning") return redirect(url_for('auth.login')) uid = request.form.get('uid') points = int(request.form.get('points')) reason = request.form.get('reason') payload = { "token": session['token'], "uid": uid, "points": points, "reason": reason } resp = requests.post(f"{Config.BASE_API_URL}/admin/users/update_points", json=payload) if resp.status_code == 200: flash("积分更新成功", "success") else: flash("积分更新失败", "danger") return redirect(url_for('users.list_users', page=request.args.get('page', 1))) @users_bp.route('/issue-coupon', methods=['POST']) def issue_coupon(): if not session.get('token'): return redirect(url_for('auth.login')) try: user_id = int(request.form['user_id']) coupon_id = int(request.form['coupon_id']) # 通过Flask中转请求到FastAPI response = requests.post( f"{Config.BASE_API_URL}/admin/coupons/issue", json={ "token": session['token'], "user_id": user_id, "coupon_id": coupon_id } ) print(request.form) if response.status_code == 200: flash("优惠券发放成功", "success") else: flash(response.json().get('detail', '发放失败'), "danger") except requests.exceptions.ConnectionError: flash("后端服务不可用", "danger") return redirect(url_for('users.list_users'))