Fix token expiration issue - auto refresh token before expiry

This commit is contained in:
youbin
2026-03-28 15:09:09 +08:00
parent 841d30d3c3
commit 3317b8e8cd
2 changed files with 15 additions and 5 deletions

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import time
from typing import Any
import requests
@@ -16,11 +17,13 @@ class TokenManager:
self.token_endpoint = token_endpoint
self.scope = scope
self._token: str | None = None
self._token_expires_at: float = 0
def get_token(self) -> str:
if self._token:
if self._token and time.time() < self._token_expires_at - 60:
return self._token
self.clear_token()
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
@@ -29,11 +32,16 @@ class TokenManager:
}
response = requests.post(self.token_endpoint, data=data, timeout=30)
response.raise_for_status()
self._token = response.json()["access_token"]
token_data = response.json()
self._token = token_data["access_token"]
expires_in = token_data.get("expires_in", 3600)
self._token_expires_at = time.time() + expires_in
logger.info(f"Token refreshed, expires in {expires_in} seconds")
return self._token
def clear_token(self) -> None:
self._token = None
self._token_expires_at = 0
class GraphAPIError(Exception):

View File

@@ -42,7 +42,7 @@ class Office365Service:
self._graph_client = GraphClient(token_manager, self.settings.graph_base_url)
return self._graph_client
def create_user(self, username: str, password: str | None = None, display_name: str | None = None) -> dict[str, Any]:
def create_user(self, username: str, password: str | None = None, display_name: str | None = None, retry: bool = True) -> dict[str, Any]:
client = self._ensure_client()
upn = f"{username}@{self.settings.default_domain}"
@@ -64,13 +64,15 @@ class Office365Service:
try:
user = client.create_user(create_payload)
except GraphAPIError as exc:
if retry and "token is expired" in str(exc).lower():
logger.info("Token expired, refreshing and retrying...")
self._graph_client.token_manager.clear_token()
return self.create_user(username, password, display_name, retry=False)
raise self._translate_graph_error(exc, f"创建用户 {upn} 失败")
license_result = None
logger.info(f"Creating user: {upn}, default_license_sku: {self.settings.default_license_sku}")
if self.settings.default_license_sku:
license_result = self._assign_license(user["id"])
logger.info(f"License assignment result: {license_result}")
return {
"user": user,