71 lines
2.1 KiB
Python
71 lines
2.1 KiB
Python
import requests
|
||
import time
|
||
import configparser
|
||
|
||
# 全局变量用于缓存 Token 信息
|
||
_access_token = None
|
||
_expires_time = 0
|
||
_appid = None
|
||
_secret = None
|
||
|
||
|
||
def get_access_token() -> str:
|
||
"""
|
||
获取微信 Access Token(带缓存机制)
|
||
|
||
参数:
|
||
appid: 微信小程序 AppID
|
||
secret: 微信小程序 AppSecret
|
||
|
||
返回:
|
||
str: 有效的 Access Token
|
||
|
||
异常:
|
||
Exception: 包含错误信息的异常
|
||
"""
|
||
config = configparser.ConfigParser()
|
||
config.read('backend/config.conf')
|
||
wechat = config['wechat']
|
||
appid = wechat['appid']
|
||
secret = wechat['secret']
|
||
|
||
global _access_token, _expires_time, _appid, _secret
|
||
|
||
# 检查缓存有效性(相同凭证且未过期)
|
||
if _appid == appid and _secret == secret and _access_token and time.time() < _expires_time:
|
||
print(f"使用缓存的 Access Token: {_access_token},有效期至 {time.ctime(_expires_time)}")
|
||
return _access_token
|
||
|
||
# 请求新的 Access Token
|
||
url = "https://api.weixin.qq.com/cgi-bin/token"
|
||
params = {
|
||
"grant_type": "client_credential",
|
||
"appid": appid,
|
||
"secret": secret
|
||
}
|
||
|
||
try:
|
||
response = requests.get(url, params=params)
|
||
response.raise_for_status() # 检查 HTTP 状态码
|
||
data = response.json()
|
||
except requests.RequestException as e:
|
||
raise Exception(f"网络请求失败: {str(e)}")
|
||
except ValueError:
|
||
raise Exception("无效的 JSON 响应")
|
||
|
||
# 处理微信接口错误
|
||
if "errcode" in data:
|
||
raise Exception(f"微信接口错误 [{data['errcode']}]: {data['errmsg']}")
|
||
|
||
# 验证响应数据
|
||
if "access_token" not in data or "expires_in" not in data:
|
||
raise Exception("响应数据不完整")
|
||
|
||
# 更新缓存信息(预留 5 秒缓冲时间防止临界点)
|
||
_access_token = data["access_token"]
|
||
_expires_time = time.time() + data["expires_in"] - 5
|
||
_appid = appid
|
||
_secret = secret
|
||
print(f"Access Token 已更新: {_access_token},有效期至 {time.ctime(_expires_time)}")
|
||
|
||
return _access_token |