Files
office365-self-service/office365_self_service/services.py

121 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
from typing import Any
from .graph import GraphAPIError, GraphClient, TokenManager
from .settings import Settings
logger = logging.getLogger("office365_self_service.service")
class ServiceOperationError(RuntimeError):
def __init__(self, message: str, status_code: int = 400, details=None):
super().__init__(message)
self.message = message
self.status_code = status_code
self.details = details
class ServiceConfigurationError(RuntimeError):
pass
class Office365Service:
def __init__(self, settings: Settings):
self.settings = settings
self._graph_client: GraphClient | None = None
def _ensure_client(self) -> GraphClient:
if not self.settings.graph_ready:
joined = "".join(self.settings.validation_errors)
raise ServiceConfigurationError(f"Graph 配置不完整: {joined}")
if self._graph_client is None:
token_manager = TokenManager(
client_id=self.settings.client_id,
client_secret=self.settings.client_secret,
token_endpoint=self.settings.token_endpoint,
scope=self.settings.scope,
)
self._graph_client = GraphClient(token_manager, self.settings.graph_base_url)
return self._graph_client
def create_user(self, username: str, password: str | None = None, display_name: str | None = None) -> dict[str, Any]:
client = self._ensure_client()
upn = f"{username}@{self.settings.default_domain}"
password = password or self.settings.default_password
display_name = display_name or username
create_payload = {
"accountEnabled": True,
"displayName": display_name,
"mailNickname": username,
"userPrincipalName": upn,
"passwordProfile": {
"password": password,
"forceChangePasswordNextSignIn": self.settings.force_change_password,
},
"usageLocation": self.settings.default_usage_location,
}
try:
user = client.create_user(create_payload)
except GraphAPIError as exc:
raise self._translate_graph_error(exc, f"创建用户 {upn} 失败")
license_result = None
logger.info(f"Creating user: {upn}, default_license_sku: {self.settings.default_license_sku}")
if self.settings.default_license_sku:
license_result = self._assign_license(user["id"])
logger.info(f"License assignment result: {license_result}")
return {
"user": user,
"userPrincipalName": upn,
"temporaryPassword": password,
"licenseAssigned": bool(license_result),
"licenseResult": license_result,
}
def _assign_license(self, user_id: str) -> dict[str, Any]:
client = self._ensure_client()
sku_part_number = self.settings.default_license_sku
try:
skus = client.list_subscribed_skus()
except GraphAPIError as exc:
logger.warning("获取许可证列表失败: %s", exc)
return None
matched = next(
(sku for sku in skus if (sku.get("skuPartNumber") or "").upper() == sku_part_number.upper()),
None,
)
if not matched:
logger.warning("未找到许可证 SKU: %s", sku_part_number)
return None
if int(matched.get("consumedUnits", 0) or 0) >= int(matched.get("prepaidUnits", {}).get("enabled", 0) or 0):
logger.warning("许可证 %s 已无可用席位", sku_part_number)
return None
try:
return client.assign_license(
user_id,
add_licenses=[{"skuId": matched["skuId"], "disabledPlans": []}],
)
except GraphAPIError as exc:
logger.warning("分配许可证失败: %s", exc)
return None
def _translate_graph_error(self, exc: GraphAPIError, fallback_message: str) -> ServiceOperationError:
message = fallback_message
if exc.message:
message = f"{fallback_message}: {exc.message}"
status_code = exc.status_code or 502
lowered = message.lower()
if "already exists" in lowered or "another object with the same value" in lowered:
status_code = 409
return ServiceOperationError(message=message, status_code=status_code, details=exc.response)