Initial commit: Office 365 Self Service - 兑换码自助开通系统

This commit is contained in:
youbin
2026-03-28 00:32:30 +08:00
commit 8f5a643ed3
16 changed files with 1219 additions and 0 deletions

View File

@@ -0,0 +1,121 @@
from __future__ import annotations
import logging
from typing import Any
from .graph import GraphAPIError, GraphClient, TokenManager
from .settings import Settings
logger = logging.getLogger("office365_self_service.service")
class ServiceOperationError(RuntimeError):
def __init__(self, message: str, status_code: int = 400, details=None):
super().__init__(message)
self.message = message
self.status_code = status_code
self.details = details
class ServiceConfigurationError(RuntimeError):
pass
class Office365Service:
def __init__(self, settings: Settings):
self.settings = settings
self._graph_client: GraphClient | None = None
def _ensure_client(self) -> GraphClient:
if not self.settings.graph_ready:
joined = "".join(self.settings.validation_errors)
raise ServiceConfigurationError(f"Graph 配置不完整: {joined}")
if self._graph_client is None:
token_manager = TokenManager(
client_id=self.settings.client_id,
client_secret=self.settings.client_secret,
token_endpoint=self.settings.token_endpoint,
scope=self.settings.scope,
)
self._graph_client = GraphClient(token_manager, self.settings.graph_base_url)
return self._graph_client
def create_user(self, username: str, password: str | None = None, display_name: str | None = None) -> dict[str, Any]:
client = self._ensure_client()
upn = f"{username}@{self.settings.default_domain}"
password = password or self.settings.default_password
display_name = display_name or username
create_payload = {
"accountEnabled": True,
"displayName": display_name,
"mailNickname": username,
"userPrincipalName": upn,
"passwordProfile": {
"password": password,
"forceChangePasswordNextSignIn": self.settings.force_change_password,
},
"usageLocation": self.settings.default_usage_location,
}
try:
user = client.create_user(create_payload)
except GraphAPIError as exc:
raise self._translate_graph_error(exc, f"创建用户 {upn} 失败")
license_result = None
logger.info(f"Creating user: {upn}, default_license_sku: {self.settings.default_license_sku}")
if self.settings.default_license_sku:
license_result = self._assign_license(user["id"])
logger.info(f"License assignment result: {license_result}")
return {
"user": user,
"userPrincipalName": upn,
"temporaryPassword": password,
"licenseAssigned": bool(license_result),
"licenseResult": license_result,
}
def _assign_license(self, user_id: str) -> dict[str, Any]:
client = self._ensure_client()
sku_part_number = self.settings.default_license_sku
try:
skus = client.list_subscribed_skus()
except GraphAPIError as exc:
logger.warning("获取许可证列表失败: %s", exc)
return None
matched = next(
(sku for sku in skus if (sku.get("skuPartNumber") or "").upper() == sku_part_number.upper()),
None,
)
if not matched:
logger.warning("未找到许可证 SKU: %s", sku_part_number)
return None
if int(matched.get("consumedUnits", 0) or 0) >= int(matched.get("prepaidUnits", {}).get("enabled", 0) or 0):
logger.warning("许可证 %s 已无可用席位", sku_part_number)
return None
try:
return client.assign_license(
user_id,
add_licenses=[{"skuId": matched["skuId"], "disabledPlans": []}],
)
except GraphAPIError as exc:
logger.warning("分配许可证失败: %s", exc)
return None
def _translate_graph_error(self, exc: GraphAPIError, fallback_message: str) -> ServiceOperationError:
message = fallback_message
if exc.message:
message = f"{fallback_message}: {exc.message}"
status_code = exc.status_code or 502
lowered = message.lower()
if "already exists" in lowered or "another object with the same value" in lowered:
status_code = 409
return ServiceOperationError(message=message, status_code=status_code, details=exc.response)