from __future__ import annotations

import logging
from typing import Any

from .graph import GraphAPIError, GraphClient, TokenManager
from .settings import Settings

logger = logging.getLogger("office365_self_service.service")


class ServiceOperationError(RuntimeError):
    """Raised when a service operation fails.

    Attributes are plain instance fields:
      * ``message``     - human-readable description of the failure.
      * ``status_code`` - status code stored for the caller (defaults to 400).
      * ``details``     - optional extra payload describing the failure.
    """

    def __init__(self, message: str, status_code: int = 400, details: Any = None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.details = details


class ServiceConfigurationError(RuntimeError):
    """Raised when the Graph settings are reported as not ready."""


class Office365Service:
    """User provisioning operations built on top of ``GraphClient``."""

    def __init__(self, settings: Settings):
        self.settings = settings
        # Built lazily by _ensure_client() on first use.
        self._graph_client: GraphClient | None = None

    def _ensure_client(self) -> GraphClient:
        """Return the cached Graph client, building it on first use.

        Raises:
            ServiceConfigurationError: if ``settings.graph_ready`` is falsy;
                the message lists ``settings.validation_errors``.
        """
        if not self.settings.graph_ready:
            joined = ";".join(self.settings.validation_errors)
            raise ServiceConfigurationError(f"Graph 配置不完整: {joined}")

        if self._graph_client is None:
            token_manager = TokenManager(
                client_id=self.settings.client_id,
                client_secret=self.settings.client_secret,
                token_endpoint=self.settings.token_endpoint,
                scope=self.settings.scope,
            )
            self._graph_client = GraphClient(token_manager, self.settings.graph_base_url)
        return self._graph_client

    def create_user(
        self,
        username: str,
        password: str | None = None,
        display_name: str | None = None,
        retry: bool = True,
    ) -> dict[str, Any]:
        """Create a user and, if a default SKU is configured, assign a license.

        Args:
            username: Local part of the account; used as ``mailNickname`` and
                combined with ``settings.default_domain`` to form the UPN.
            password: Initial password; falls back to
                ``settings.default_password`` when falsy.
            display_name: Display name; falls back to ``username`` when falsy.
            retry: When true, a failure whose text contains
                "token is expired" clears the token and retries exactly once.

        Returns:
            A dict with the keys ``user``, ``userPrincipalName``,
            ``temporaryPassword``, ``licenseAssigned`` and ``licenseResult``.

        Raises:
            ServiceConfigurationError: if the Graph settings are not ready.
            ServiceOperationError: if the create call fails.
        """
        client = self._ensure_client()

        upn = f"{username}@{self.settings.default_domain}"
        password = password or self.settings.default_password
        display_name = display_name or username

        create_payload = {
            "accountEnabled": True,
            "displayName": display_name,
            "mailNickname": username,
            "userPrincipalName": upn,
            "passwordProfile": {
                "password": password,
                "forceChangePasswordNextSignIn": self.settings.force_change_password,
            },
            "usageLocation": self.settings.default_usage_location,
        }

        try:
            user = client.create_user(create_payload)
        except GraphAPIError as exc:
            if retry and "token is expired" in str(exc).lower():
                logger.info("Token expired, refreshing and retrying...")
                # Use the local, non-optional client rather than the Optional
                # attribute; they are the same object at this point.
                client.token_manager.clear_token()
                # retry=False bounds the recursion to a single extra attempt.
                return self.create_user(username, password, display_name, retry=False)
            # Chain explicitly so the original Graph error stays the cause.
            raise self._translate_graph_error(exc, f"创建用户 {upn} 失败") from exc

        license_result = None
        if self.settings.default_license_sku:
            license_result = self._assign_license(user["id"])

        return {
            "user": user,
            "userPrincipalName": upn,
            "temporaryPassword": password,
            "licenseAssigned": bool(license_result),
            "licenseResult": license_result,
        }

    def _assign_license(self, user_id: str) -> dict[str, Any] | None:
        """Best-effort assignment of ``settings.default_license_sku``.

        Every failure is logged as a warning and reported as ``None`` rather
        than raised, so a licensing problem does not fail the caller.

        Returns:
            The result of ``client.assign_license`` on success, otherwise
            ``None`` (no SKU configured, SKU listing failed, SKU not found,
            no free seats, or the assignment call failed).
        """
        sku_part_number = self.settings.default_license_sku
        if not sku_part_number:
            # Nothing to assign; also avoids matching SKUs with an empty
            # part number against an empty configured value.
            return None

        client = self._ensure_client()

        try:
            skus = client.list_subscribed_skus()
        except GraphAPIError as exc:
            logger.warning("获取许可证列表失败: %s", exc)
            return None

        # Case-insensitive match on the SKU part number.
        wanted = sku_part_number.upper()
        matched = next(
            (sku for sku in skus if (sku.get("skuPartNumber") or "").upper() == wanted),
            None,
        )
        if not matched:
            logger.warning("未找到许可证 SKU: %s", sku_part_number)
            return None

        # "prepaidUnits" may be present but null; treat that like a missing
        # key instead of crashing on None.get(...).
        prepaid_units = matched.get("prepaidUnits") or {}
        consumed = int(matched.get("consumedUnits", 0) or 0)
        enabled = int(prepaid_units.get("enabled", 0) or 0)
        if consumed >= enabled:
            logger.warning("许可证 %s 已无可用席位", sku_part_number)
            return None

        try:
            return client.assign_license(
                user_id,
                add_licenses=[{"skuId": matched["skuId"], "disabledPlans": []}],
            )
        except GraphAPIError as exc:
            logger.warning("分配许可证失败: %s", exc)
            return None

    def _translate_graph_error(self, exc: GraphAPIError, fallback_message: str) -> ServiceOperationError:
        """Convert a ``GraphAPIError`` into a ``ServiceOperationError``.

        The message is ``fallback_message``, suffixed with ``exc.message``
        when that is truthy. The status code is ``exc.status_code`` (502 when
        falsy), overridden to 409 when the combined message contains
        "already exists" or "another object with the same value".
        ``exc.response`` is passed through as ``details``.
        """
        message = fallback_message
        if exc.message:
            message = f"{fallback_message}: {exc.message}"

        status_code = exc.status_code or 502
        lowered = message.lower()
        if "already exists" in lowered or "another object with the same value" in lowered:
            status_code = 409

        return ServiceOperationError(message=message, status_code=status_code, details=exc.response)