Compare commits

..

2 Commits

Author SHA1 Message Date
zeer
a65b67485e Add Yaohuo verification-based self-service signup 2026-04-15 15:36:50 +08:00
youbin
de130f1052 Harden redemption flow and improve operational safety 2026-03-31 08:14:31 +08:00
14 changed files with 1696 additions and 116 deletions

View File

@@ -9,6 +9,7 @@ PORT=8000
DEBUG=false
# 数据库 (SQLite)
# 默认 sqlite:///redemption.db 会落到 Flask 的 instance/redemption.db
DATABASE_URL=sqlite:///redemption.db
# Flask会话密钥 (建议使用随机长字符串)
@@ -30,4 +31,11 @@ DEFAULT_DOMAIN=yourtenant.onmicrosoft.com
DEFAULT_PASSWORD=P@ssw0rd123!
DEFAULT_USAGE_LOCATION=US
DEFAULT_LICENSE_SKU=ENTERPRISEPACK
FORCE_CHANGE_PASSWORD=true
LICENSE_ASSIGNMENT_REQUIRED=false
FORCE_CHANGE_PASSWORD=true
# 妖火论坛私信验证
YAOHUO_VERIFICATION_ENABLED=false
YAOHUO_COOKIE=
YAOHUO_MESSAGE_URL=https://www.yaohuo.me/bbs/messagelist_add.aspx
YAOHUO_VERIFICATION_CODE_TTL_SECONDS=600

38
AGENTS.md Normal file
View File

@@ -0,0 +1,38 @@
# AGENTS.md
## Stack And Entry Points
- Single-package Flask app. Runtime entrypoint is `app.py`, which exposes `app = create_app()` for Gunicorn and local runs.
- App wiring lives in `office365_self_service/__init__.py`: settings load first, logging is configured, SQLite directories are created if needed, `db.create_all()` runs during app startup, and blueprints from `office365_self_service/routes.py` are registered there.
- Core behavior is split by file: `routes.py` handles both HTML pages and JSON APIs, `services.py` contains Office 365 business logic, `graph.py` wraps Microsoft Graph calls, and `models.py` defines the SQLite-backed SQLAlchemy models.
## Commands
- Create a local env and install deps with `python3 -m venv .venv && source .venv/bin/activate && pip install -r requirements.txt`.
- Run locally with `python3 app.py`.
- Run the test suite with `python3 -m unittest`.
- Run one test class with `python3 -m unittest tests.test_app.AppRouteTests`.
- Run one test method with `python3 -m unittest tests.test_app.AppRouteTests.test_redeem_marks_code_used_and_prevents_second_use`.
- Container flow is `docker compose up -d --build`, with healthcheck hitting `http://localhost:8000/api/health`.
## Verified Repo Conventions
- There is no configured linter, formatter, typechecker, pytest, or task runner in this repo. Do not invent `pytest`, `ruff`, `mypy`, `npm`, or `make` workflows.
- Tests use the standard library `unittest` module in `tests/test_app.py`.
- The Docker image runs Gunicorn via `gunicorn -w 2 -b 0.0.0.0:8000 app:app`; local development uses Flask's built-in server from `app.py`.
## Config And Data Gotchas
- Copy `.env.example` to `.env` before running locally or via Docker Compose.
- Graph readiness is configuration-driven in `office365_self_service/settings.py`. Missing `CLIENT_ID`, `TENANT_ID`, `CLIENT_SECRET`, or `DEFAULT_PASSWORD` does not stop app startup, but service calls fail later with configuration errors.
- `WEB_AUTH_ENABLED=true` only enables real admin login protection when both `ADMIN_USERNAME` and `ADMIN_PASSWORD` are set. Otherwise settings downgrade to effectively unauthenticated admin APIs and emit a warning.
- Relative SQLite URLs such as `sqlite:///redemption.db` resolve through Flask into `instance/redemption.db`.
- If `DATABASE_URL` is set to a container path like `sqlite:////app/data/redemption.db` outside Docker, settings automatically remap it to the matching local repo path and record a warning.
- App startup creates tables automatically with `db.create_all()`. There are no migrations in this repo.
- Logs are written to `logs/office365_self_service.log` via a rotating file handler during app startup.
## API And Behavior Notes
- Public health/config endpoints are `/api/health` and `/api/config`. Admin also has `/admin/api/health`, `/admin/api/session`, and authenticated config/data endpoints under `/admin/api/*`.
- Redemption is stateful in the database: codes move `available -> processing -> used`, and failures release codes back to `available`.
- Username handling is easy to guess wrong: plain usernames are lowercased and expanded with `DEFAULT_DOMAIN`; full UPNs are accepted as-is. If `DEFAULT_DOMAIN` is empty, callers must submit a full email address.
- License assignment is optional. When `DEFAULT_LICENSE_SKU` is set and `LICENSE_ASSIGNMENT_REQUIRED=true`, failed license assignment triggers deletion of the newly created user and surfaces an error instead of a warning.
## Editing Guidance
- Preserve the app-factory shape and the `service_factory` injection seam in `create_app()`. Tests rely on injecting fake services there.
- Keep focused verification lightweight: for most backend changes, run the relevant `python3 -m unittest` target rather than assuming extra tooling exists.

View File

@@ -8,6 +8,7 @@
- **自助开通**:用户输入兑换码和用户名自助开通 Office 365 账号
- **自动授权**:账号开通时自动分配许可证
- **兑换记录**:后台记录兑换码与已开通账号的对应关系
- **审计日志**:后台分页查看生成、删除、兑换成功/失败等关键事件
## 项目结构
@@ -80,15 +81,22 @@ docker compose down
| `DEFAULT_PASSWORD` | 是 | 新建账号默认密码 | 自定义高强度密码 |
| `DEFAULT_DOMAIN` | 建议 | 默认域名 | 例如 `yourtenant.onmicrosoft.com` |
| `DEFAULT_LICENSE_SKU` | 可选 | 默认许可证 SKU | 例如 `ENTERPRISEPACK``M365_BUSINESS_PREMIUM` |
| `LICENSE_ASSIGNMENT_REQUIRED` | 可选 | 许可证分配失败时是否回滚删除新账号 | 默认 `false` |
| `DEFAULT_USAGE_LOCATION` | 建议 | 默认使用地区 | 国际版常用:`US``SG``JP` |
| `WEB_AUTH_ENABLED` | 可选 | 后台登录保护 | `true``false` |
| `ADMIN_USERNAME` | 建议 | 后台登录用户名 | 自定义 |
| `ADMIN_PASSWORD` | 建议 | 后台登录密码 | 自定义 |
| `YAOHUO_VERIFICATION_ENABLED` | 可选 | 是否启用妖火论坛私信验证免兑换码开通 | `true``false` |
| `YAOHUO_COOKIE` | 妖火验证必填 | 后台已登录妖火账号的 Cookie | 浏览器复制完整 Cookie |
| `YAOHUO_MESSAGE_URL` | 可选 | 妖火发私信地址 | 默认 `https://www.yaohuo.me/bbs/messagelist_add.aspx` |
| `YAOHUO_VERIFICATION_CODE_TTL_SECONDS` | 可选 | 妖火验证码有效期(秒) | 默认 `600` |
| `SESSION_SECRET` | 建议 | Flask 会话密钥 | 随机长字符串 |
| `HOST` | 可选 | 服务监听地址 | 默认 `0.0.0.0` |
| `PORT` | 可选 | 服务监听端口 | 默认 `8000` |
| `DEBUG` | 可选 | 调试模式 | 默认 `false` |
提示:如果本地误用了容器内的 SQLite 路径(例如 `sqlite:////app/data/redemption.db`),项目现在会自动映射到当前仓库下的对应本地路径。
### Entra ID (Azure AD) 应用配置
1. **创建应用注册**
@@ -126,7 +134,7 @@ docker compose down
1. 使用设置的 admin 账号登录
2. 点击「生成兑换码」批量生成兑换码
3. 可以查看所有兑换码兑换记录
3. 可以查看兑换码兑换记录和审计日志
### 用户自助开通
@@ -137,6 +145,15 @@ docker compose down
3. 点击「立即开通」
4. 系统返回临时密码,首次登录后需更改密码
### 妖火论坛验证开通
1.`.env` 中启用 `YAOHUO_VERIFICATION_ENABLED=true`
2. 配置可用的 `YAOHUO_COOKIE`
3. 用户在首页切换到「妖火验证开通」
4. 输入目标妖火 ID系统向该 ID 发送私信验证码
5. 对方提供验证码后完成验证
6. 验证通过后无需兑换码即可直接开通账号
## 技术栈
- Python 3.9+
@@ -148,5 +165,6 @@ docker compose down
## 注意事项
- `DEFAULT_LICENSE_SKU` 必须是租户中实际存在的 SKU 名称
- 如果希望“建号和授权”保持强一致,可设置 `LICENSE_ASSIGNMENT_REQUIRED=true`
- 兑换码使用后立即失效,无法重复使用
- 生产环境建议使用 `DEBUG=false` 并配置反向代理
- 生产环境建议使用 `DEBUG=false` 并配置反向代理

View File

@@ -5,12 +5,13 @@ services:
- "8000:8000"
volumes:
- ./data:/app/data
- ./instance:/app/instance
- ./logs:/app/logs
env_file:
- .env
restart: unless-stopped
healthcheck:
test: ["CMD", "python", "-c", "from urllib.request import urlopen; urlopen('http://localhost:8000/admin/api/health', timeout=5).read()"]
test: ["CMD", "python", "-c", "from urllib.request import urlopen; urlopen('http://localhost:8000/api/health', timeout=5).read()"]
interval: 30s
timeout: 10s
retries: 3

View File

@@ -9,14 +9,22 @@ from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.engine.url import make_url
from .services import Office365Service
from .services import Office365Service, YaohuoVerificationService
from .settings import Settings, load_settings
db = SQLAlchemy()
def _ensure_sqlite_directory(database_url: str) -> None:
url = make_url(database_url)
if url.drivername != "sqlite" or not url.database or url.database == ":memory:":
return
Path(url.database).parent.mkdir(parents=True, exist_ok=True)
def _configure_logging(app: Flask) -> None:
log_dir = Path(app.root_path).parent / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
@@ -62,11 +70,12 @@ def create_app(
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
_configure_logging(app)
_ensure_sqlite_directory(settings.database_url)
db.init_app(app)
with app.app_context():
from .models import RedemptionCode
from .models import AuditEvent, RedemptionCode
db.create_all()
if service_factory is None:
@@ -77,10 +86,11 @@ def create_app(
service = service_factory
app.extensions["office365_service"] = service
app.extensions["yaohuo_verification_service"] = YaohuoVerificationService(settings)
from .routes import bp_admin, bp_user
app.register_blueprint(bp_admin)
app.register_blueprint(bp_user)
return app
return app

View File

@@ -30,9 +30,29 @@ class TokenManager:
"client_secret": self.client_secret,
"scope": self.scope,
}
response = requests.post(self.token_endpoint, data=data, timeout=30)
response.raise_for_status()
token_data = response.json()
try:
response = requests.post(self.token_endpoint, data=data, timeout=30)
response.raise_for_status()
except requests.RequestException as exc:
status_code = getattr(getattr(exc, "response", None), "status_code", 0) or 0
response_payload = None
response_text = ""
if getattr(exc, "response", None) is not None:
response_text = exc.response.text[:200]
try:
response_payload = exc.response.json()
except ValueError:
response_payload = None
message = "获取访问令牌失败"
if response_text:
message = f"{message}: {response_text}"
raise GraphAPIError(message, status_code=status_code, response=response_payload) from exc
try:
token_data = response.json()
except ValueError as exc:
raise GraphAPIError("解析访问令牌响应失败", response.status_code) from exc
self._token = token_data["access_token"]
expires_in = token_data.get("expires_in", 3600)
self._token_expires_at = time.time() + expires_in
@@ -127,4 +147,4 @@ class GraphClient:
else:
payload["addLicenses"] = []
payload["removeLicenses"] = remove_licenses if remove_licenses else []
return self.post(f"/users/{user_id}/assignLicense", json=payload)
return self.post(f"/users/{user_id}/assignLicense", json=payload)

View File

@@ -1,17 +1,30 @@
from __future__ import annotations
from datetime import datetime
import json
from datetime import datetime, timezone
from . import db
def utc_now() -> datetime:
return datetime.now(timezone.utc)
def serialize_datetime(value: datetime | None) -> str | None:
if value is None:
return None
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
class RedemptionCode(db.Model):
__tablename__ = "redemption_codes"
id = db.Column(db.Integer, primary_key=True)
code = db.Column(db.String(64), unique=True, nullable=False, index=True)
status = db.Column(db.String(16), nullable=False, default="available")
created_at = db.Column(db.DateTime, nullable=False, default=lambda: datetime.now())
created_at = db.Column(db.DateTime, nullable=False, default=utc_now)
used_at = db.Column(db.DateTime, nullable=True)
used_by_username = db.Column(db.String(256), nullable=True)
used_by_principal_name = db.Column(db.String(256), nullable=True)
@@ -21,8 +34,42 @@ class RedemptionCode(db.Model):
"id": self.id,
"code": self.code,
"status": self.status,
"createdAt": self.created_at.isoformat() if self.created_at else None,
"usedAt": self.used_at.isoformat() if self.used_at else None,
"createdAt": serialize_datetime(self.created_at),
"usedAt": serialize_datetime(self.used_at),
"usedByUsername": self.used_by_username,
"usedByPrincipalName": self.used_by_principal_name,
}
}
class AuditEvent(db.Model):
__tablename__ = "audit_events"
id = db.Column(db.Integer, primary_key=True)
event_type = db.Column(db.String(64), nullable=False, index=True)
status = db.Column(db.String(16), nullable=False, default="success", index=True)
actor = db.Column(db.String(128), nullable=False, default="system")
code = db.Column(db.String(64), nullable=True, index=True)
username = db.Column(db.String(256), nullable=True)
principal_name = db.Column(db.String(256), nullable=True)
details = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, nullable=False, default=utc_now, index=True)
def to_dict(self):
parsed_details = None
if self.details:
try:
parsed_details = json.loads(self.details)
except ValueError:
parsed_details = {"raw": self.details}
return {
"id": self.id,
"eventType": self.event_type,
"status": self.status,
"actor": self.actor,
"code": self.code,
"username": self.username,
"principalName": self.principal_name,
"details": parsed_details,
"createdAt": serialize_datetime(self.created_at),
}

View File

@@ -1,19 +1,27 @@
from __future__ import annotations
import json
import logging
import secrets
from datetime import datetime, timezone, timedelta
from datetime import datetime, timezone
from functools import wraps
from flask import Blueprint, current_app, jsonify, render_template, request, session
from sqlalchemy import func
from sqlalchemy import func, update
from . import db
from .models import RedemptionCode
from .services import Office365Service, ServiceConfigurationError, ServiceOperationError
from .models import AuditEvent, RedemptionCode, utc_now
from .services import Office365Service, ServiceConfigurationError, ServiceOperationError, YaohuoVerificationService
bp_admin = Blueprint("admin", __name__, url_prefix="/admin")
bp_user = Blueprint("user", __name__)
logger = logging.getLogger("office365_self_service.routes")
STATUS_AVAILABLE = "available"
STATUS_PROCESSING = "processing"
STATUS_USED = "used"
YAOHUO_SESSION_KEY = "yaohuo_verification"
def _settings():
@@ -24,6 +32,10 @@ def _service() -> Office365Service:
return current_app.extensions["office365_service"]
def _yaohuo_service() -> YaohuoVerificationService:
return current_app.extensions["yaohuo_verification_service"]
def _success(data=None, message: str = "ok", status: int = 200):
return jsonify({"success": True, "message": message, "data": data}), status
@@ -67,6 +79,232 @@ def _json_payload() -> dict:
return request.get_json(silent=True) or {}
def _session_verification_state() -> dict:
payload = session.get(YAOHUO_SESSION_KEY)
return payload if isinstance(payload, dict) else {}
def _clear_yaohuo_verification() -> None:
session.pop(YAOHUO_SESSION_KEY, None)
def _verification_expired(expires_at: str | None) -> bool:
if not expires_at:
return True
try:
return datetime.fromisoformat(expires_at) <= datetime.now(timezone.utc)
except ValueError:
return True
def _store_yaohuo_verification(target_user_id: str, code: str, expires_at: str) -> None:
session[YAOHUO_SESSION_KEY] = {
"targetUserId": target_user_id,
"code": code,
"expiresAt": expires_at,
"verified": False,
}
def _mark_yaohuo_verified() -> None:
state = _session_verification_state()
if not state:
return
state["verified"] = True
state.pop("code", None)
session[YAOHUO_SESSION_KEY] = state
def _code_match(code: str):
return func.lower(RedemptionCode.code) == code.lower()
def _health_payload() -> dict:
settings = _settings()
return {
"platform": settings.to_public_dict(),
"authenticated": _authenticated(),
"yaohuoVerified": bool(_session_verification_state().get("verified")),
}
def _current_actor(default: str = "system") -> str:
if _authenticated():
return session.get("admin_username") or _settings().admin_username or default
return default
def _build_audit_event(
event_type: str,
*,
status: str = "success",
actor: str | None = None,
code: str | None = None,
username: str | None = None,
principal_name: str | None = None,
details: dict | None = None,
) -> AuditEvent:
return AuditEvent(
event_type=event_type,
status=status,
actor=actor or "system",
code=code,
username=username,
principal_name=principal_name,
details=json.dumps(details, ensure_ascii=False, sort_keys=True) if details is not None else None,
)
def _provision_account(username: str):
actor = _current_actor("public")
try:
user_result = _service().create_user(username=username)
except ServiceConfigurationError as exc:
_record_audit_events(
_build_audit_event(
"account_provisioned",
status="failed",
actor=actor,
username=username,
details={"message": str(exc)},
)
)
return _error(str(exc), status=503)
except ServiceOperationError as exc:
_record_audit_events(
_build_audit_event(
"account_provisioned",
status="failed",
actor=actor,
username=username,
principal_name=(exc.details or {}).get("userPrincipalName") if isinstance(exc.details, dict) else None,
details={"message": exc.message, "serviceDetails": exc.details},
)
)
return _error(exc.message, status=exc.status_code, details=exc.details)
except Exception as exc:
logger.exception("妖火验证建号时发生未预期错误")
_record_audit_events(
_build_audit_event(
"account_provisioned",
status="failed",
actor=actor,
username=username,
details={"message": f"创建账号失败: {exc}"},
)
)
return _error(f"创建账号失败: {exc}", status=500)
_record_audit_events(
_build_audit_event(
"account_provisioned",
status="success",
actor=actor,
username=username,
principal_name=user_result.get("userPrincipalName"),
details={
"licenseAssigned": user_result.get("licenseAssigned"),
"licenseMessage": user_result.get("licenseMessage"),
"source": "yaohuo_verification",
},
)
)
return _success({
"userPrincipalName": user_result.get("userPrincipalName"),
"temporaryPassword": user_result.get("temporaryPassword"),
"licenseAssigned": user_result.get("licenseAssigned"),
"licenseMessage": user_result.get("licenseMessage"),
}, "账号开通成功!", status=201)
def _record_audit_events(*events: AuditEvent) -> None:
pending = [event for event in events if event is not None]
if not pending:
return
try:
db.session.add_all(pending)
db.session.commit()
except Exception:
db.session.rollback()
logger.exception("写入审计日志失败,共 %s 条事件。", len(pending))
def _pagination_params() -> tuple[int, int, int]:
settings = _settings()
try:
page = int(request.args.get("page", "1"))
except ValueError:
page = 1
try:
page_size = int(request.args.get("pageSize", str(settings.default_page_size)))
except ValueError:
page_size = settings.default_page_size
page = max(page, 1)
page_size = min(max(page_size, 1), settings.max_page_size)
return page, page_size, (page - 1) * page_size
def _pagination_payload(page: int, page_size: int, total: int) -> dict[str, int]:
pages = (total + page_size - 1) // page_size if total else 0
return {
"page": page,
"pageSize": page_size,
"total": total,
"pages": pages,
}
def _normalize_pagination(page: int, page_size: int, total: int) -> tuple[int, int, int]:
pages = (total + page_size - 1) // page_size if total else 0
if pages and page > pages:
page = pages
return page, (page - 1) * page_size, pages
def _reserve_code(code: str) -> bool:
result = db.session.execute(
update(RedemptionCode)
.where(_code_match(code), RedemptionCode.status == STATUS_AVAILABLE)
.values(status=STATUS_PROCESSING)
)
db.session.commit()
return result.rowcount == 1
def _release_code(code: str) -> None:
db.session.rollback()
db.session.execute(
update(RedemptionCode)
.where(_code_match(code), RedemptionCode.status == STATUS_PROCESSING)
.values(
status=STATUS_AVAILABLE,
used_at=None,
used_by_username=None,
used_by_principal_name=None,
)
)
db.session.commit()
def _complete_redemption(code: str, username: str, principal_name: str | None) -> bool:
result = db.session.execute(
update(RedemptionCode)
.where(_code_match(code), RedemptionCode.status == STATUS_PROCESSING)
.values(
status=STATUS_USED,
used_at=utc_now(),
used_by_username=username,
used_by_principal_name=principal_name,
)
)
db.session.commit()
return result.rowcount == 1
@bp_admin.get("/")
def admin_index():
if not _authenticated():
@@ -76,13 +314,7 @@ def admin_index():
@bp_admin.get("/api/health")
def health():
settings = _settings()
return _success(
{
"platform": settings.to_public_dict(),
"authenticated": _authenticated(),
}
)
return _success(_health_payload())
@bp_admin.get("/api/session")
@@ -110,6 +342,7 @@ def login():
if username == settings.admin_username and password == settings.admin_password:
session["authenticated"] = True
session.permanent = True
session["admin_username"] = username
return _success({"authenticated": True}, message="登录成功。")
return _error("用户名或密码错误。", status=401)
@@ -126,20 +359,54 @@ def config_info():
return _success(_settings().to_public_dict())
@bp_admin.get("/api/audit-events")
@require_auth
def list_audit_events():
event_type = request.args.get("eventType", "").strip()
status = request.args.get("status", "").strip()
page, page_size, offset = _pagination_params()
query = db.select(AuditEvent)
count_query = db.select(func.count()).select_from(AuditEvent)
if event_type:
query = query.where(AuditEvent.event_type == event_type)
count_query = count_query.where(AuditEvent.event_type == event_type)
if status:
query = query.where(AuditEvent.status == status)
count_query = count_query.where(AuditEvent.status == status)
total = db.session.execute(count_query).scalar_one()
page, offset, _ = _normalize_pagination(page, page_size, total)
events = db.session.execute(
query.order_by(AuditEvent.created_at.desc(), AuditEvent.id.desc()).offset(offset).limit(page_size)
).scalars().all()
return _success({
"events": [event.to_dict() for event in events],
**_pagination_payload(page, page_size, total),
})
@bp_admin.get("/api/codes")
@require_auth
def list_codes():
status = request.args.get("status")
page, page_size, offset = _pagination_params()
query = db.select(RedemptionCode)
count_query = db.select(func.count()).select_from(RedemptionCode)
if status == "available":
query = query.where(RedemptionCode.status == "available")
elif status == "used":
query = query.where(RedemptionCode.status == "used")
if status in {STATUS_AVAILABLE, STATUS_PROCESSING, STATUS_USED}:
query = query.where(RedemptionCode.status == status)
count_query = count_query.where(RedemptionCode.status == status)
result = db.session.execute(query.order_by(RedemptionCode.created_at.desc())).scalars().all()
total = db.session.execute(count_query).scalar_one()
page, offset, _ = _normalize_pagination(page, page_size, total)
result = db.session.execute(
query.order_by(RedemptionCode.created_at.desc()).offset(offset).limit(page_size)
).scalars().all()
codes = [code.to_dict() for code in result]
return _success({"codes": codes, "total": len(codes)})
return _success({"codes": codes, **_pagination_payload(page, page_size, total)})
@bp_admin.post("/api/codes/generate")
@@ -147,6 +414,7 @@ def list_codes():
def generate_codes():
payload = _json_payload()
count = payload.get("count", 1)
actor = _current_actor("auth-disabled-admin")
if count < 1:
count = 1
if count > 100:
@@ -163,40 +431,58 @@ def generate_codes():
codes.append(code)
db.session.commit()
_record_audit_events(
*[
_build_audit_event(
"code_generated",
actor=actor,
code=generated_code,
details={"batchCount": len(codes)},
)
for generated_code in codes
]
)
return _success({"codes": codes, "count": len(codes)}, f"成功生成 {count} 个兑换码。")
@bp_admin.delete("/api/codes/<code>")
@require_auth
def delete_code(code: str):
actor = _current_actor("auth-disabled-admin")
redemption_code = RedemptionCode.query.filter_by(code=code).first()
if not redemption_code:
return _error("兑换码不存在。", status=404)
if redemption_code.status != STATUS_AVAILABLE:
return _error("仅可删除未使用的兑换码。", status=409)
db.session.delete(redemption_code)
db.session.commit()
_record_audit_events(
_build_audit_event(
"code_deleted",
actor=actor,
code=code,
)
)
return _success(message="兑换码已删除。")
@bp_admin.get("/api/records")
@require_auth
def list_records():
page = int(request.args.get("page", "1"))
page_size = int(request.args.get("pageSize", "25"))
page, page_size, offset = _pagination_params()
query = db.select(RedemptionCode).where(RedemptionCode.status == "used")
result = db.session.execute(query.order_by(RedemptionCode.used_at.desc())).scalars().all()
total = len(result)
start = (page - 1) * page_size
end = start + page_size
records = result[start:end]
query = db.select(RedemptionCode).where(RedemptionCode.status == STATUS_USED)
count_query = db.select(func.count()).select_from(RedemptionCode).where(RedemptionCode.status == STATUS_USED)
total = db.session.execute(count_query).scalar_one()
page, offset, _ = _normalize_pagination(page, page_size, total)
records = db.session.execute(
query.order_by(RedemptionCode.used_at.desc()).offset(offset).limit(page_size)
).scalars().all()
return _success({
"records": [code.to_dict() for code in records],
"page": page,
"pageSize": page_size,
"total": total,
**_pagination_payload(page, page_size, total),
})
@@ -205,43 +491,213 @@ def index():
return render_template("user_redemption.html", settings=_settings())
@bp_user.get("/api/health")
def user_health():
return _success(_health_payload())
@bp_user.post("/api/redeem")
def redeem():
payload = _json_payload()
code = str(payload.get("code", "")).strip().upper()
username = str(payload.get("username", "")).strip().lower()
actor = _current_actor("public")
if not code:
return _error("请输入兑换码。", status=400)
if not username:
return _error("请输入用户名。", status=400)
redemption_code = RedemptionCode.query.filter(
func.lower(RedemptionCode.code) == code.lower(),
RedemptionCode.status == "available"
).first()
if not redemption_code:
if not _reserve_code(code):
_record_audit_events(
_build_audit_event(
"redeem_completed",
status="failed",
actor=actor,
code=code,
username=username,
details={"message": "兑换码无效或已被使用。"},
)
)
return _error("兑换码无效或已被使用。", status=404)
try:
user_result = _service().create_user(username=username)
except ServiceConfigurationError as exc:
_release_code(code)
_record_audit_events(
_build_audit_event(
"redeem_completed",
status="failed",
actor=actor,
code=code,
username=username,
details={"message": str(exc)},
)
)
return _error(str(exc), status=503)
except ServiceOperationError as exc:
return _error(str(exc), status=500)
_release_code(code)
_record_audit_events(
_build_audit_event(
"redeem_completed",
status="failed",
actor=actor,
code=code,
username=username,
principal_name=(exc.details or {}).get("userPrincipalName") if isinstance(exc.details, dict) else None,
details={"message": exc.message, "serviceDetails": exc.details},
)
)
return _error(exc.message, status=exc.status_code, details=exc.details)
except Exception as exc:
logger.exception("兑换码 %s 开通账号时发生未预期错误", code)
_release_code(code)
_record_audit_events(
_build_audit_event(
"redeem_completed",
status="failed",
actor=actor,
code=code,
username=username,
details={"message": f"创建账号失败: {exc}"},
)
)
return _error(f"创建账号失败: {exc}", status=500)
redemption_code.status = "used"
redemption_code.used_at = datetime.now(timezone.utc)
redemption_code.used_by_username = username
redemption_code.used_by_principal_name = user_result.get("userPrincipalName")
db.session.commit()
if not _complete_redemption(code, username, user_result.get("userPrincipalName")):
logger.error("账号 %s 已创建,但兑换码 %s 未能完成最终状态更新。", user_result.get("userPrincipalName"), code)
_record_audit_events(
_build_audit_event(
"redeem_completed",
status="warning",
actor=actor,
code=code,
username=username,
principal_name=user_result.get("userPrincipalName"),
details={"message": "账号已创建,但兑换码状态更新失败。"},
)
)
return _error(
"账号已创建,但兑换码状态更新失败,请联系管理员处理。",
status=500,
details={"userPrincipalName": user_result.get("userPrincipalName")},
)
_record_audit_events(
_build_audit_event(
"redeem_completed",
status="success",
actor=actor,
code=code,
username=username,
principal_name=user_result.get("userPrincipalName"),
details={
"licenseAssigned": user_result.get("licenseAssigned"),
"licenseMessage": user_result.get("licenseMessage"),
},
)
)
return _success({
"userPrincipalName": user_result.get("userPrincipalName"),
"temporaryPassword": user_result.get("temporaryPassword"),
"licenseAssigned": user_result.get("licenseAssigned"),
"licenseMessage": user_result.get("licenseMessage"),
}, "账号开通成功!", status=201)
@bp_user.post("/api/yaohuo/send-code")
def yaohuo_send_code():
payload = _json_payload()
target_user_id = str(payload.get("targetUserId", "")).strip()
actor = _current_actor("public")
try:
code = _yaohuo_service().generate_code()
expires_at = _yaohuo_service().expires_at().isoformat()
_yaohuo_service().send_verification_code(target_user_id, code)
_store_yaohuo_verification(target_user_id, code, expires_at)
except ServiceConfigurationError as exc:
return _error(str(exc), status=503)
except ServiceOperationError as exc:
_record_audit_events(
_build_audit_event(
"yaohuo_verification_requested",
status="failed",
actor=actor,
details={"targetUserId": target_user_id, "message": exc.message},
)
)
return _error(exc.message, status=exc.status_code, details=exc.details)
except ValueError as exc:
return _error(str(exc), status=400)
_record_audit_events(
_build_audit_event(
"yaohuo_verification_requested",
actor=actor,
details={"targetUserId": target_user_id},
)
)
return _success({"expiresAt": expires_at}, "验证码已发送到指定妖火 ID 的私信。")
@bp_user.post("/api/yaohuo/verify")
def yaohuo_verify():
payload = _json_payload()
submitted_code = str(payload.get("code", "")).strip()
state = _session_verification_state()
actor = _current_actor("public")
if not state:
return _error("请先发送验证码。", status=400)
if _verification_expired(state.get("expiresAt")):
_clear_yaohuo_verification()
return _error("验证码已过期,请重新发送。", status=410)
if not submitted_code:
return _error("请输入验证码。", status=400)
if submitted_code != str(state.get("code", "")):
_record_audit_events(
_build_audit_event(
"yaohuo_verified",
status="failed",
actor=actor,
details={"targetUserId": state.get("targetUserId"), "message": "验证码错误。"},
)
)
return _error("验证码错误。", status=400)
_mark_yaohuo_verified()
_record_audit_events(
_build_audit_event(
"yaohuo_verified",
actor=actor,
details={"targetUserId": state.get("targetUserId")},
)
)
return _success({"verified": True}, "妖火论坛验证成功。")
@bp_user.post("/api/yaohuo/provision")
def yaohuo_provision():
payload = _json_payload()
username = str(payload.get("username", "")).strip().lower()
state = _session_verification_state()
if not username:
return _error("请输入用户名。", status=400)
if not state or not state.get("verified"):
return _error("请先完成妖火论坛验证。", status=403)
if _verification_expired(state.get("expiresAt")):
_clear_yaohuo_verification()
return _error("验证状态已过期,请重新验证。", status=410)
result = _provision_account(username)
if result[1] < 400:
_clear_yaohuo_verification()
return result
@bp_user.get("/api/config")
def config():
return _success(_settings().to_public_dict())
return _success(_settings().to_public_dict())

View File

@@ -1,8 +1,13 @@
from __future__ import annotations
import logging
import random
import re
from datetime import datetime, timedelta, timezone
from typing import Any
import requests
from .graph import GraphAPIError, GraphClient, TokenManager
from .settings import Settings
@@ -43,16 +48,16 @@ class Office365Service:
return self._graph_client
def create_user(self, username: str, password: str | None = None, display_name: str | None = None, retry: bool = True) -> dict[str, Any]:
upn, mail_nickname = self._build_user_identifiers(username)
client = self._ensure_client()
upn = f"{username}@{self.settings.default_domain}"
password = password or self.settings.default_password
display_name = display_name or username
display_name = display_name or mail_nickname
create_payload = {
"accountEnabled": True,
"displayName": display_name,
"mailNickname": username,
"mailNickname": mail_nickname,
"userPrincipalName": upn,
"passwordProfile": {
"password": password,
@@ -71,8 +76,17 @@ class Office365Service:
raise self._translate_graph_error(exc, f"创建用户 {upn} 失败")
license_result = None
license_message = None
if self.settings.default_license_sku:
license_result = self._assign_license(user["id"])
license_result, license_message, license_status = self._assign_license(user["id"])
if license_message and self.settings.license_assignment_required:
self._rollback_user_for_license_failure(
client=client,
user_id=user["id"],
user_principal_name=upn,
license_message=license_message,
license_status=license_status,
)
return {
"user": user,
@@ -80,37 +94,98 @@ class Office365Service:
"temporaryPassword": password,
"licenseAssigned": bool(license_result),
"licenseResult": license_result,
"licenseMessage": license_message,
}
def _assign_license(self, user_id: str) -> dict[str, Any]:
def _build_user_identifiers(self, username: str) -> tuple[str, str]:
normalized = (username or "").strip().lower()
if not normalized:
raise ValueError("请输入用户名。")
if "@" in normalized:
local_part, _, domain = normalized.partition("@")
if not local_part or not domain:
raise ValueError("请输入有效的完整邮箱地址。")
return normalized, local_part
if not self.settings.default_domain:
raise ServiceConfigurationError("DEFAULT_DOMAIN 未配置,请输入完整邮箱地址后重试。")
return f"{normalized}@{self.settings.default_domain}", normalized
def _assign_license(self, user_id: str) -> tuple[dict[str, Any] | None, str | None, int]:
client = self._ensure_client()
sku_part_number = self.settings.default_license_sku
try:
skus = client.list_subscribed_skus()
except GraphAPIError as exc:
logger.warning("获取许可证列表失败: %s", exc)
return None
message = f"获取许可证列表失败: {exc.message or exc}"
logger.warning(message)
return None, message, exc.status_code or 502
matched = next(
(sku for sku in skus if (sku.get("skuPartNumber") or "").upper() == sku_part_number.upper()),
None,
)
if not matched:
logger.warning("未找到许可证 SKU: %s", sku_part_number)
return None
message = f"未找到许可证 SKU: {sku_part_number}"
logger.warning(message)
return None, message, 409
if int(matched.get("consumedUnits", 0) or 0) >= int(matched.get("prepaidUnits", {}).get("enabled", 0) or 0):
logger.warning("许可证 %s 已无可用席位", sku_part_number)
return None
message = f"许可证 {sku_part_number} 已无可用席位"
logger.warning(message)
return None, message, 409
try:
return client.assign_license(
user_id,
add_licenses=[{"skuId": matched["skuId"], "disabledPlans": []}],
return (
client.assign_license(
user_id,
add_licenses=[{"skuId": matched["skuId"], "disabledPlans": []}],
),
None,
200,
)
except GraphAPIError as exc:
logger.warning("分配许可证失败: %s", exc)
return None
message = f"分配许可证失败: {exc.message or exc}"
logger.warning(message)
return None, message, exc.status_code or 502
def _rollback_user_for_license_failure(
self,
client: GraphClient,
user_id: str,
user_principal_name: str,
license_message: str,
license_status: int,
) -> None:
try:
client.delete_user(user_id)
except GraphAPIError as exc:
delete_message = exc.message or str(exc)
raise ServiceOperationError(
message=(
f"账号 {user_principal_name} 已创建,但许可证分配失败且回滚删除失败。"
f"{license_message};删除失败: {delete_message}"
),
status_code=502,
details={
"userPrincipalName": user_principal_name,
"licenseError": license_message,
"rollbackDeleteError": delete_message,
"rolledBack": False,
},
) from exc
raise ServiceOperationError(
message=f"许可证分配失败,已回滚删除账号 {user_principal_name}{license_message}",
status_code=license_status or 409,
details={
"userPrincipalName": user_principal_name,
"licenseError": license_message,
"rolledBack": True,
},
)
def _translate_graph_error(self, exc: GraphAPIError, fallback_message: str) -> ServiceOperationError:
message = fallback_message
@@ -120,4 +195,73 @@ class Office365Service:
lowered = message.lower()
if "already exists" in lowered or "another object with the same value" in lowered:
status_code = 409
return ServiceOperationError(message=message, status_code=status_code, details=exc.response)
return ServiceOperationError(message=message, status_code=status_code, details=exc.response)
class YaohuoVerificationService:
def __init__(self, settings: Settings):
self.settings = settings
def verification_ready(self) -> bool:
return self.settings.yaohuo_verification_enabled and bool(self.settings.yaohuo_cookie)
def ensure_ready(self) -> None:
if not self.settings.yaohuo_verification_enabled:
raise ServiceConfigurationError("妖火论坛验证功能未启用。")
if not self.settings.yaohuo_cookie:
raise ServiceConfigurationError("YAOHUO_COOKIE 未配置,无法发送妖火私信验证码。")
def generate_code(self) -> str:
return f"{random.randint(0, 999999):06d}"
def expires_at(self) -> datetime:
return datetime.now(timezone.utc) + timedelta(seconds=self.settings.yaohuo_verification_code_ttl_seconds)
def send_verification_code(self, target_user_id: str, code: str) -> None:
self.ensure_ready()
normalized_user_id = self._normalize_target_user_id(target_user_id)
content = f"【Office 365 自助开通验证】您的验证码是:{code}{self.settings.yaohuo_verification_code_ttl_seconds // 60} 分钟内有效。"
payload = {
"touseridlist": normalized_user_id,
"content": content,
"action": "gomod",
"classid": "0",
"siteid": "1000",
"types": "",
"issystem": "",
"g": "发送消息",
}
try:
response = requests.post(
self.settings.yaohuo_message_url,
data=payload,
headers={
"Cookie": self.settings.yaohuo_cookie,
"Referer": self.settings.yaohuo_message_url,
"User-Agent": "Mozilla/5.0",
},
timeout=30,
)
except requests.RequestException as exc:
raise ServiceOperationError(f"发送妖火验证码失败: {exc}", status_code=502) from exc
if response.status_code >= 400:
raise ServiceOperationError(
f"发送妖火验证码失败,状态码 {response.status_code}",
status_code=502,
)
body = response.text
if "发短信息" in body and "发送成功" not in body and "返回上级" in body:
logger.warning("妖火私信发送结果无法明确判断成功,按成功处理。")
return
if any(keyword in body for keyword in ("成功", "发送成功", "发送完毕")):
return
def _normalize_target_user_id(self, target_user_id: str) -> str:
normalized = re.sub(r"\s+", "", str(target_user_id or ""))
if not normalized or not normalized.isdigit():
raise ValueError("请输入有效的妖火 ID。")
return normalized

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from dotenv import load_dotenv
@@ -22,6 +23,24 @@ def _env_int(name: str, default: int) -> int:
return default
def _normalize_database_url(database_url: str, warnings: list[str]) -> str:
normalized = database_url.strip()
if not normalized:
return "sqlite:///redemption.db"
container_prefix = "sqlite:////app/"
if normalized.startswith(container_prefix) and not Path("/.dockerenv").exists():
local_relative = normalized.removeprefix(container_prefix)
project_root = Path(__file__).resolve().parent.parent
local_path = (project_root / local_relative).resolve()
warnings.append(
f"DATABASE_URL 使用容器路径时,已自动映射到本地路径 {local_path}"
)
return f"sqlite:///{local_path}"
return normalized
@dataclass
class Settings:
app_name: str
@@ -39,11 +58,16 @@ class Settings:
default_domain: str
default_usage_location: str
default_license_sku: str
license_assignment_required: bool
force_change_password: bool
graph_base_url: str
token_endpoint: str
scope: str
database_url: str
yaohuo_cookie: str
yaohuo_message_url: str
yaohuo_verification_enabled: bool
yaohuo_verification_code_ttl_seconds: int
default_page_size: int = 25
max_page_size: int = 100
validation_errors: tuple[str, ...] = field(default_factory=tuple)
@@ -68,9 +92,11 @@ class Settings:
"defaultDomain": self.default_domain,
"defaultUsageLocation": self.default_usage_location,
"defaultLicenseSku": self.default_license_sku,
"licenseAssignmentRequired": self.license_assignment_required,
"forceChangePassword": self.force_change_password,
"pageSize": self.default_page_size,
"maxPageSize": self.max_page_size,
"yaohuoVerificationEnabled": self.yaohuo_verification_enabled,
}
@@ -84,6 +110,7 @@ def load_settings() -> Settings:
validation_errors: list[str] = []
warnings: list[str] = []
database_url = _normalize_database_url(os.getenv("DATABASE_URL", "sqlite:///redemption.db"), warnings)
required_fields = {
"CLIENT_ID": os.getenv("CLIENT_ID", "").strip(),
@@ -121,13 +148,19 @@ def load_settings() -> Settings:
default_domain=os.getenv("DEFAULT_DOMAIN", "").strip(),
default_usage_location=os.getenv("DEFAULT_USAGE_LOCATION", "US").strip() or "US",
default_license_sku=os.getenv("DEFAULT_LICENSE_SKU", "").strip(),
license_assignment_required=_env_bool("LICENSE_ASSIGNMENT_REQUIRED", False),
force_change_password=_env_bool("FORCE_CHANGE_PASSWORD", True),
graph_base_url=graph_base_url,
token_endpoint=token_endpoint,
scope=scope,
database_url=os.getenv("DATABASE_URL", "sqlite:///redemption.db").strip(),
database_url=database_url,
yaohuo_cookie=os.getenv("YAOHUO_COOKIE", "").strip(),
yaohuo_message_url=os.getenv("YAOHUO_MESSAGE_URL", "https://www.yaohuo.me/bbs/messagelist_add.aspx").strip()
or "https://www.yaohuo.me/bbs/messagelist_add.aspx",
yaohuo_verification_enabled=_env_bool("YAOHUO_VERIFICATION_ENABLED", False),
yaohuo_verification_code_ttl_seconds=min(max(_env_int("YAOHUO_VERIFICATION_CODE_TTL_SECONDS", 600), 60), 3600),
default_page_size=min(max(_env_int("DEFAULT_PAGE_SIZE", 25), 1), 100),
max_page_size=min(max(_env_int("MAX_PAGE_SIZE", 100), 10), 500),
validation_errors=tuple(validation_errors),
warnings=tuple(warnings),
)
)

View File

@@ -21,6 +21,7 @@
<nav class="nav flex-column">
<a class="nav-link active" href="#" data-tab="codes">兑换码管理</a>
<a class="nav-link" href="#" data-tab="records">兑换记录</a>
<a class="nav-link" href="#" data-tab="audit">审计日志</a>
<a class="nav-link" href="#" id="logoutBtn">退出登录</a>
</nav>
<div class="px-3 mt-3">
@@ -42,6 +43,7 @@
<div class="mb-3">
<button class="btn btn-sm btn-outline-secondary filter-btn" data-filter="all">全部</button>
<button class="btn btn-sm btn-outline-secondary filter-btn" data-filter="available">可用</button>
<button class="btn btn-sm btn-outline-secondary filter-btn" data-filter="processing">处理中</button>
<button class="btn btn-sm btn-outline-secondary filter-btn" data-filter="used">已使用</button>
</div>
<div class="table-responsive">
@@ -59,6 +61,18 @@
<tbody></tbody>
</table>
</div>
<div class="d-flex flex-column flex-md-row justify-content-between align-items-md-center gap-2">
<div class="d-flex align-items-center gap-2">
<label class="form-label mb-0" for="codesPageSize">每页</label>
<select class="form-select form-select-sm w-auto" id="codesPageSize"></select>
<small class="text-muted" id="codesSummary">共 0 条</small>
</div>
<div class="btn-group">
<button class="btn btn-sm btn-outline-secondary" id="codesPrevBtn">上一页</button>
<button class="btn btn-sm btn-outline-secondary disabled" id="codesPageIndicator">第 1 / 1 页</button>
<button class="btn btn-sm btn-outline-secondary" id="codesNextBtn">下一页</button>
</div>
</div>
</div>
</div>
</div>
@@ -80,6 +94,54 @@
<tbody></tbody>
</table>
</div>
<div class="d-flex flex-column flex-md-row justify-content-between align-items-md-center gap-2">
<div class="d-flex align-items-center gap-2">
<label class="form-label mb-0" for="recordsPageSize">每页</label>
<select class="form-select form-select-sm w-auto" id="recordsPageSize"></select>
<small class="text-muted" id="recordsSummary">共 0 条</small>
</div>
<div class="btn-group">
<button class="btn btn-sm btn-outline-secondary" id="recordsPrevBtn">上一页</button>
<button class="btn btn-sm btn-outline-secondary disabled" id="recordsPageIndicator">第 1 / 1 页</button>
<button class="btn btn-sm btn-outline-secondary" id="recordsNextBtn">下一页</button>
</div>
</div>
</div>
</div>
</div>
<div class="tab-content d-none" id="auditTab">
<h4 class="mb-3">审计日志</h4>
<div class="card">
<div class="card-body">
<div class="table-responsive">
<table class="table table-hover" id="auditTable">
<thead>
<tr>
<th>时间</th>
<th>事件</th>
<th>状态</th>
<th>操作人</th>
<th>兑换码</th>
<th>账号</th>
<th>详情</th>
</tr>
</thead>
<tbody></tbody>
</table>
</div>
<div class="d-flex flex-column flex-md-row justify-content-between align-items-md-center gap-2">
<div class="d-flex align-items-center gap-2">
<label class="form-label mb-0" for="auditPageSize">每页</label>
<select class="form-select form-select-sm w-auto" id="auditPageSize"></select>
<small class="text-muted" id="auditSummary">共 0 条</small>
</div>
<div class="btn-group">
<button class="btn btn-sm btn-outline-secondary" id="auditPrevBtn">上一页</button>
<button class="btn btn-sm btn-outline-secondary disabled" id="auditPageIndicator">第 1 / 1 页</button>
<button class="btn btn-sm btn-outline-secondary" id="auditNextBtn">下一页</button>
</div>
</div>
</div>
</div>
</div>
@@ -114,42 +176,174 @@
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
<script>
const defaultPageSize = {{ settings.default_page_size }};
const maxPageSize = {{ settings.max_page_size }};
let currentFilter = 'all';
const paginationState = {
codes: { page: 1, pageSize: defaultPageSize, total: 0, pages: 0 },
records: { page: 1, pageSize: defaultPageSize, total: 0, pages: 0 },
audit: { page: 1, pageSize: defaultPageSize, total: 0, pages: 0 }
};
async function loadCodes() {
const url = currentFilter === 'all' ? '/admin/api/codes' : `/admin/api/codes?status=${currentFilter}`;
function renderStatusBadge(status) {
if (status === 'available') {
return '<span class="badge bg-success">可用</span>';
}
if (status === 'processing') {
return '<span class="badge bg-warning text-dark">处理中</span>';
}
return '<span class="badge bg-secondary">已使用</span>';
}
function renderAuditStatus(status) {
if (status === 'success') {
return '<span class="badge bg-success">成功</span>';
}
if (status === 'warning') {
return '<span class="badge bg-warning text-dark">警告</span>';
}
return '<span class="badge bg-danger">失败</span>';
}
function escapeHtml(value) {
return String(value ?? '')
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&#39;');
}
function renderAuditDetails(details) {
if (!details) return '-';
if (details.message) return escapeHtml(details.message);
return escapeHtml(JSON.stringify(details));
}
function pageSizeOptions() {
return [...new Set([10, 25, 50, 100, defaultPageSize])]
.filter(size => size <= maxPageSize)
.sort((a, b) => a - b);
}
function initPageSizeSelect(id, value, onChange) {
const select = document.getElementById(id);
select.innerHTML = pageSizeOptions().map(size => `
<option value="${size}" ${size === value ? 'selected' : ''}>${size}</option>
`).join('');
select.addEventListener('change', () => onChange(parseInt(select.value, 10) || defaultPageSize));
}
function updatePager(prefix, state) {
const pageCount = state.pages || 1;
document.getElementById(`${prefix}Summary`).textContent = `${state.total}`;
document.getElementById(`${prefix}PageIndicator`).textContent = `${state.page} / ${pageCount}`;
document.getElementById(`${prefix}PrevBtn`).disabled = state.page <= 1;
document.getElementById(`${prefix}NextBtn`).disabled = state.total === 0 || state.page >= pageCount;
}
function renderEmptyRow(tableBodySelector, colspan, message) {
document.querySelector(tableBodySelector).innerHTML = `
<tr><td colspan="${colspan}" class="text-center text-muted py-4">${message}</td></tr>
`;
}
async function loadCodes(page = paginationState.codes.page) {
const params = new URLSearchParams({
page: String(page),
pageSize: String(paginationState.codes.pageSize)
});
if (currentFilter !== 'all') {
params.set('status', currentFilter);
}
const url = `/admin/api/codes?${params.toString()}`;
const response = await fetch(url, { credentials: 'same-origin' });
const data = await response.json();
if (data.success) {
paginationState.codes.page = data.data.page;
paginationState.codes.pageSize = data.data.pageSize;
paginationState.codes.total = data.data.total;
paginationState.codes.pages = data.data.pages;
const tbody = document.querySelector('#codesTable tbody');
tbody.innerHTML = data.data.codes.map(code => `
<tr>
<td><code>${code.code}</code></td>
<td><span class="badge ${code.status === 'available' ? 'bg-success' : 'bg-secondary'}">${code.status === 'available' ? '可用' : '已使用'}</span></td>
<td>${code.createdAt ? new Date(code.createdAt).toLocaleString() : '-'}</td>
<td>${code.usedAt ? new Date(code.usedAt).toLocaleString() : '-'}</td>
<td>${code.usedByUsername || '-'}</td>
<td class="table-actions">
${code.status === 'available' ? `<button class="btn btn-danger btn-sm" onclick="deleteCode('${code.code}')">删除</button>` : ''}
</td>
</tr>
`).join('');
if (!data.data.codes.length) {
renderEmptyRow('#codesTable tbody', 6, '当前筛选条件下暂无兑换码');
} else {
tbody.innerHTML = data.data.codes.map(code => `
<tr>
<td><code>${escapeHtml(code.code)}</code></td>
<td>${renderStatusBadge(code.status)}</td>
<td>${code.createdAt ? new Date(code.createdAt).toLocaleString() : '-'}</td>
<td>${code.usedAt ? new Date(code.usedAt).toLocaleString() : '-'}</td>
<td>${escapeHtml(code.usedByUsername || '-')}</td>
<td class="table-actions">
${code.status === 'available' ? `<button class="btn btn-danger btn-sm" onclick="deleteCode('${code.code}')">删除</button>` : ''}
</td>
</tr>
`).join('');
}
updatePager('codes', paginationState.codes);
}
}
async function loadRecords() {
const response = await fetch('/admin/api/records', { credentials: 'same-origin' });
async function loadRecords(page = paginationState.records.page) {
const params = new URLSearchParams({
page: String(page),
pageSize: String(paginationState.records.pageSize)
});
const response = await fetch(`/admin/api/records?${params.toString()}`, { credentials: 'same-origin' });
const data = await response.json();
if (data.success) {
paginationState.records.page = data.data.page;
paginationState.records.pageSize = data.data.pageSize;
paginationState.records.total = data.data.total;
paginationState.records.pages = data.data.pages;
const tbody = document.querySelector('#recordsTable tbody');
tbody.innerHTML = data.data.records.map(code => `
<tr>
<td><code>${code.code}</code></td>
<td>${code.usedByUsername || '-'}</td>
<td>${code.usedByPrincipalName || '-'}</td>
<td>${code.usedAt ? new Date(code.usedAt).toLocaleString() : '-'}</td>
</tr>
`).join('');
if (!data.data.records.length) {
renderEmptyRow('#recordsTable tbody', 4, '暂无兑换记录');
} else {
tbody.innerHTML = data.data.records.map(code => `
<tr>
<td><code>${escapeHtml(code.code)}</code></td>
<td>${escapeHtml(code.usedByUsername || '-')}</td>
<td>${escapeHtml(code.usedByPrincipalName || '-')}</td>
<td>${code.usedAt ? new Date(code.usedAt).toLocaleString() : '-'}</td>
</tr>
`).join('');
}
updatePager('records', paginationState.records);
}
}
async function loadAudit(page = paginationState.audit.page) {
const params = new URLSearchParams({
page: String(page),
pageSize: String(paginationState.audit.pageSize)
});
const response = await fetch(`/admin/api/audit-events?${params.toString()}`, { credentials: 'same-origin' });
const data = await response.json();
if (data.success) {
paginationState.audit.page = data.data.page;
paginationState.audit.pageSize = data.data.pageSize;
paginationState.audit.total = data.data.total;
paginationState.audit.pages = data.data.pages;
const tbody = document.querySelector('#auditTable tbody');
if (!data.data.events.length) {
renderEmptyRow('#auditTable tbody', 7, '暂无审计日志');
} else {
tbody.innerHTML = data.data.events.map(event => `
<tr>
<td>${event.createdAt ? new Date(event.createdAt).toLocaleString() : '-'}</td>
<td>${escapeHtml(event.eventType)}</td>
<td>${renderAuditStatus(event.status)}</td>
<td>${escapeHtml(event.actor || '-')}</td>
<td>${event.code ? `<code>${escapeHtml(event.code)}</code>` : '-'}</td>
<td>${escapeHtml(event.principalName || event.username || '-')}</td>
<td>${renderAuditDetails(event.details)}</td>
</tr>
`).join('');
}
updatePager('audit', paginationState.audit);
}
}
@@ -158,7 +352,7 @@
const response = await fetch(`/admin/api/codes/${code}`, { method: 'DELETE', credentials: 'same-origin' });
const data = await response.json();
if (data.success) {
loadCodes();
loadCodes(paginationState.codes.page);
} else {
alert(data.message);
}
@@ -169,7 +363,8 @@
document.querySelectorAll('.filter-btn').forEach(b => b.classList.remove('btn-secondary', 'active'));
btn.classList.add('btn-secondary', 'active');
currentFilter = btn.dataset.filter;
loadCodes();
paginationState.codes.page = 1;
loadCodes(1);
});
});
@@ -182,6 +377,7 @@
document.getElementById(link.dataset.tab + 'Tab').classList.remove('d-none');
if (link.dataset.tab === 'codes') loadCodes();
if (link.dataset.tab === 'records') loadRecords();
if (link.dataset.tab === 'audit') loadAudit();
});
});
@@ -198,7 +394,8 @@
const textarea = document.querySelector('#generatedCodes textarea');
textarea.value = data.data.codes.join('\n');
document.getElementById('generatedCodes').classList.remove('d-none');
loadCodes();
paginationState.codes.page = 1;
loadCodes(1);
}
});
@@ -207,7 +404,29 @@
window.location.href = '/admin/';
});
initPageSizeSelect('codesPageSize', paginationState.codes.pageSize, (value) => {
paginationState.codes.pageSize = value;
paginationState.codes.page = 1;
loadCodes(1);
});
initPageSizeSelect('recordsPageSize', paginationState.records.pageSize, (value) => {
paginationState.records.pageSize = value;
paginationState.records.page = 1;
loadRecords(1);
});
initPageSizeSelect('auditPageSize', paginationState.audit.pageSize, (value) => {
paginationState.audit.pageSize = value;
paginationState.audit.page = 1;
loadAudit(1);
});
document.getElementById('codesPrevBtn').addEventListener('click', () => loadCodes(paginationState.codes.page - 1));
document.getElementById('codesNextBtn').addEventListener('click', () => loadCodes(paginationState.codes.page + 1));
document.getElementById('recordsPrevBtn').addEventListener('click', () => loadRecords(paginationState.records.page - 1));
document.getElementById('recordsNextBtn').addEventListener('click', () => loadRecords(paginationState.records.page + 1));
document.getElementById('auditPrevBtn').addEventListener('click', () => loadAudit(paginationState.audit.page - 1));
document.getElementById('auditNextBtn').addEventListener('click', () => loadAudit(paginationState.audit.page + 1));
loadCodes();
</script>
</body>
</html>
</html>

View File

@@ -27,6 +27,15 @@
</form>
</div>
<script>
function escapeHtml(value) {
return String(value ?? '')
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&#39;');
}
document.getElementById('loginForm').addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData(e.target);
@@ -40,9 +49,9 @@
if (data.success) {
window.location.href = '/admin/';
} else {
document.getElementById('message').innerHTML = `<div class="alert alert-danger">${data.message}</div>`;
document.getElementById('message').innerHTML = `<div class="alert alert-danger">${escapeHtml(data.message)}</div>`;
}
});
</script>
</body>
</html>
</html>

View File

@@ -7,17 +7,23 @@
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
<style>
body { background-color: #f5f5f5; min-height: 100vh; display: flex; align-items: center; justify-content: center; padding: 2rem 0; }
.redemption-card { max-width: 500px; width: 100%; padding: 2rem; background: white; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); }
.redemption-card { max-width: 640px; width: 100%; padding: 2rem; background: white; border-radius: 8px; box-shadow: 0 2px 8px rgba(0,0,0,0.1); }
.result-box { background: #f8f9fa; border: 1px solid #dee2e6; border-radius: 4px; padding: 1rem; }
.mode-switch .btn { flex: 1; }
</style>
</head>
<body>
<div class="redemption-card">
<h3 class="text-center mb-4">{{ settings.app_name }}</h3>
<p class="text-center text-muted mb-4">兑换码开通 Office 365 账号</p>
<p class="text-center text-muted mb-4">支持兑换码开通,或通过妖火论坛验证后免兑换码开通</p>
<div id="message"></div>
<div class="btn-group w-100 mode-switch mb-4" role="group">
<button type="button" class="btn btn-primary" id="redeemModeBtn">兑换码开通</button>
<button type="button" class="btn btn-outline-primary" id="yaohuoModeBtn">妖火验证开通</button>
</div>
<div id="redeemForm">
<div class="mb-3">
<label class="form-label">兑换码</label>
@@ -25,15 +31,51 @@
</div>
<div class="mb-3">
<label class="form-label">用户名</label>
{% if settings.default_domain %}
<div class="input-group">
<input type="text" class="form-control" id="usernameInput" placeholder="请输入用户名" required>
<span class="input-group-text">@{{ settings.default_domain }}</span>
</div>
<div class="form-text">请输入您想要的用户名,将自动拼接域名为完整邮箱地址</div>
{% else %}
<input type="text" class="form-control" id="usernameInput" placeholder="请输入完整邮箱地址,例如 alice@example.com" required>
<div class="form-text">当前未配置默认域名,请直接输入完整邮箱地址。</div>
{% endif %}
</div>
<button type="submit" class="btn btn-primary w-100" id="redeemBtn">立即开通</button>
</div>
<div id="yaohuoForm" class="d-none">
<div class="mb-3">
<label class="form-label">妖火 ID</label>
<input type="text" class="form-control" id="yaohuoIdInput" placeholder="请输入对方妖火 ID">
<div class="form-text">系统会通过后台已登录的妖火账号,向该 ID 发送私信验证码。</div>
</div>
<div class="d-grid gap-2 mb-3">
<button type="button" class="btn btn-outline-primary" id="sendYaohuoCodeBtn">发送验证码</button>
</div>
<div class="mb-3">
<label class="form-label">验证码</label>
<input type="text" class="form-control" id="yaohuoCodeInput" placeholder="请输入收到的验证码">
</div>
<div class="d-grid gap-2 mb-3">
<button type="button" class="btn btn-outline-success" id="verifyYaohuoBtn">验证妖火身份</button>
</div>
<div class="mb-3">
<label class="form-label">用户名</label>
{% if settings.default_domain %}
<div class="input-group">
<input type="text" class="form-control" id="yaohuoUsernameInput" placeholder="请输入用户名">
<span class="input-group-text">@{{ settings.default_domain }}</span>
</div>
{% else %}
<input type="text" class="form-control" id="yaohuoUsernameInput" placeholder="请输入完整邮箱地址,例如 alice@example.com">
{% endif %}
<div class="form-text">完成妖火验证后,无需兑换码即可开通账号。</div>
</div>
<button type="button" class="btn btn-success w-100" id="yaohuoProvisionBtn">验证后免码开通</button>
</div>
<div id="successResult" class="d-none">
<div class="text-center mb-4">
<div class="text-success mb-3">
@@ -50,17 +92,68 @@
<div class="alert alert-info">
<strong>提示:</strong>首次登录后系统会要求您更改密码,请使用临时密码登录。
</div>
<div class="alert alert-warning d-none" id="licenseWarning"></div>
<button class="btn btn-outline-secondary w-100" onclick="location.reload()">开通另一个账号</button>
</div>
</div>
<script>
function escapeHtml(value) {
return String(value ?? '')
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&#39;');
}
function showMessage(html) {
document.getElementById('message').innerHTML = html;
}
function switchMode(mode) {
const redeemForm = document.getElementById('redeemForm');
const yaohuoForm = document.getElementById('yaohuoForm');
const redeemModeBtn = document.getElementById('redeemModeBtn');
const yaohuoModeBtn = document.getElementById('yaohuoModeBtn');
showMessage('');
if (mode === 'yaohuo') {
redeemForm.classList.add('d-none');
yaohuoForm.classList.remove('d-none');
redeemModeBtn.className = 'btn btn-outline-primary';
yaohuoModeBtn.className = 'btn btn-primary';
return;
}
redeemForm.classList.remove('d-none');
yaohuoForm.classList.add('d-none');
redeemModeBtn.className = 'btn btn-primary';
yaohuoModeBtn.className = 'btn btn-outline-primary';
}
async function showProvisionSuccess(data) {
document.getElementById('redeemForm').classList.add('d-none');
document.getElementById('yaohuoForm').classList.add('d-none');
document.getElementById('successResult').classList.remove('d-none');
document.getElementById('resultEmail').textContent = data.userPrincipalName;
document.getElementById('resultPassword').textContent = data.temporaryPassword;
const licenseWarning = document.getElementById('licenseWarning');
if (data.licenseAssigned === false && data.licenseMessage) {
licenseWarning.textContent = data.licenseMessage;
licenseWarning.classList.remove('d-none');
} else {
licenseWarning.classList.add('d-none');
licenseWarning.textContent = '';
}
}
document.getElementById('redeemBtn').addEventListener('click', async () => {
const code = document.getElementById('codeInput').value.trim();
const username = document.getElementById('usernameInput').value.trim();
if (!code || !username) {
document.getElementById('message').innerHTML = '<div class="alert alert-danger">请填写完整的兑换码和用户名</div>';
showMessage('<div class="alert alert-danger">请填写完整的兑换码和用户名</div>');
return;
}
@@ -77,21 +170,102 @@
const data = await response.json();
if (data.success) {
document.getElementById('redeemForm').classList.add('d-none');
document.getElementById('successResult').classList.remove('d-none');
document.getElementById('resultEmail').textContent = data.data.userPrincipalName;
document.getElementById('resultPassword').textContent = data.data.temporaryPassword;
showProvisionSuccess(data.data);
} else {
document.getElementById('message').innerHTML = `<div class="alert alert-danger">${data.message}</div>`;
showMessage(`<div class="alert alert-danger">${escapeHtml(data.message)}</div>`);
btn.disabled = false;
btn.textContent = '立即开通';
}
} catch (e) {
document.getElementById('message').innerHTML = '<div class="alert alert-danger">网络错误,请稍后重试</div>';
showMessage('<div class="alert alert-danger">网络错误,请稍后重试</div>');
btn.disabled = false;
btn.textContent = '立即开通';
}
});
document.getElementById('redeemModeBtn').addEventListener('click', () => switchMode('redeem'));
document.getElementById('yaohuoModeBtn').addEventListener('click', () => switchMode('yaohuo'));
document.getElementById('sendYaohuoCodeBtn').addEventListener('click', async () => {
const targetUserId = document.getElementById('yaohuoIdInput').value.trim();
if (!targetUserId) {
showMessage('<div class="alert alert-danger">请输入妖火 ID</div>');
return;
}
const btn = document.getElementById('sendYaohuoCodeBtn');
btn.disabled = true;
btn.textContent = '发送中...';
try {
const response = await fetch('/api/yaohuo/send-code', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ targetUserId })
});
const data = await response.json();
if (data.success) {
showMessage(`<div class="alert alert-success">${escapeHtml(data.message)}</div>`);
} else {
showMessage(`<div class="alert alert-danger">${escapeHtml(data.message)}</div>`);
}
} catch (e) {
showMessage('<div class="alert alert-danger">网络错误,请稍后重试</div>');
} finally {
btn.disabled = false;
btn.textContent = '发送验证码';
}
});
document.getElementById('verifyYaohuoBtn').addEventListener('click', async () => {
const code = document.getElementById('yaohuoCodeInput').value.trim();
if (!code) {
showMessage('<div class="alert alert-danger">请输入验证码</div>');
return;
}
const response = await fetch('/api/yaohuo/verify', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ code })
});
const data = await response.json();
if (data.success) {
showMessage(`<div class="alert alert-success">${escapeHtml(data.message)}</div>`);
} else {
showMessage(`<div class="alert alert-danger">${escapeHtml(data.message)}</div>`);
}
});
document.getElementById('yaohuoProvisionBtn').addEventListener('click', async () => {
const username = document.getElementById('yaohuoUsernameInput').value.trim();
if (!username) {
showMessage('<div class="alert alert-danger">请输入用户名</div>');
return;
}
const btn = document.getElementById('yaohuoProvisionBtn');
btn.disabled = true;
btn.textContent = '开通中...';
try {
const response = await fetch('/api/yaohuo/provision', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ username })
});
const data = await response.json();
if (data.success) {
showProvisionSuccess(data.data);
} else {
showMessage(`<div class="alert alert-danger">${escapeHtml(data.message)}</div>`);
btn.disabled = false;
btn.textContent = '验证后免码开通';
}
} catch (e) {
showMessage('<div class="alert alert-danger">网络错误,请稍后重试</div>');
btn.disabled = false;
btn.textContent = '验证后免码开通';
}
});
</script>
</body>
</html>
</html>

403
tests/test_app.py Normal file
View File

@@ -0,0 +1,403 @@
import os
from datetime import datetime, timezone
import tempfile
import unittest
from unittest.mock import patch
from pathlib import Path
from office365_self_service import create_app, db
from office365_self_service.models import AuditEvent, RedemptionCode
from office365_self_service.services import Office365Service, ServiceConfigurationError, ServiceOperationError, YaohuoVerificationService
from office365_self_service.settings import GRAPH_BASE_URL, GRAPH_SCOPE, Settings, TOKEN_ENDPOINT_TEMPLATE, load_settings
def build_settings(database_url: str, **overrides) -> Settings:
tenant_id = overrides.pop("tenant_id", "tenant-id")
base = {
"app_name": "Office 365 Self Service Test",
"host": "127.0.0.1",
"port": 5000,
"debug": False,
"session_secret": "test-secret",
"auth_enabled": False,
"admin_username": "",
"admin_password": "",
"client_id": "client-id",
"tenant_id": tenant_id,
"client_secret": "client-secret",
"default_password": "TempPassw0rd!",
"default_domain": "example.com",
"default_usage_location": "US",
"default_license_sku": "",
"license_assignment_required": False,
"force_change_password": True,
"graph_base_url": GRAPH_BASE_URL,
"token_endpoint": TOKEN_ENDPOINT_TEMPLATE.format(tenant_id=tenant_id),
"scope": GRAPH_SCOPE,
"database_url": database_url,
"yaohuo_cookie": "",
"yaohuo_message_url": "https://www.yaohuo.me/bbs/messagelist_add.aspx",
"yaohuo_verification_enabled": False,
"yaohuo_verification_code_ttl_seconds": 600,
"validation_errors": (),
"warnings": (),
}
base.update(overrides)
return Settings(**base)
class FakeService:
def __init__(self, result=None, error=None):
self.result = result or {
"userPrincipalName": "alice@example.com",
"temporaryPassword": "TempPassw0rd!",
"licenseAssigned": True,
"licenseMessage": None,
}
self.error = error
self.calls = []
def create_user(self, username: str, **kwargs):
self.calls.append(username)
if self.error:
raise self.error
return self.result
class FakeGraphClient:
def __init__(self, skus=None, assign_result=None, assign_error=None, delete_error=None):
self.payloads = []
self.deleted_users = []
self.skus = skus or []
self.assign_result = assign_result or {"status": "ok"}
self.assign_error = assign_error
self.delete_error = delete_error
def create_user(self, payload):
self.payloads.append(payload)
return {"id": "user-1"}
def list_subscribed_skus(self):
return self.skus
def assign_license(self, user_id, add_licenses=None, remove_licenses=None):
if self.assign_error:
raise self.assign_error
return self.assign_result
def delete_user(self, user_id):
if self.delete_error:
raise self.delete_error
self.deleted_users.append(user_id)
class FakeYaohuoService:
def __init__(self):
self.sent = []
def generate_code(self) -> str:
return "123456"
def expires_at(self):
return datetime(2026, 4, 15, 12, 0, 0, tzinfo=timezone.utc)
def send_verification_code(self, target_user_id: str, code: str) -> None:
self.sent.append((target_user_id, code))
class AppRouteTests(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
db_path = Path(self.temp_dir.name) / "test.db"
self.settings = build_settings(f"sqlite:///{db_path}")
self.service = FakeService()
self.app = create_app(
settings_override=self.settings,
service_factory=lambda _settings: self.service,
)
self.app.testing = True
self.client = self.app.test_client()
self.app.extensions["yaohuo_verification_service"] = FakeYaohuoService()
with self.app.app_context():
db.drop_all()
db.create_all()
def tearDown(self):
self.temp_dir.cleanup()
def add_code(self, code: str, status: str = "available"):
with self.app.app_context():
db.session.add(RedemptionCode(code=code, status=status))
db.session.commit()
def fetch_code(self, code: str) -> RedemptionCode:
with self.app.app_context():
return RedemptionCode.query.filter_by(code=code).first()
def fetch_audit_events(self) -> list[AuditEvent]:
with self.app.app_context():
return AuditEvent.query.order_by(AuditEvent.created_at.asc(), AuditEvent.id.asc()).all()
def test_redeem_marks_code_used_and_prevents_second_use(self):
self.add_code("CODE-001")
response = self.client.post("/api/redeem", json={"code": "code-001", "username": "alice"})
payload = response.get_json()
self.assertEqual(response.status_code, 201)
self.assertTrue(payload["success"])
self.assertEqual(payload["data"]["userPrincipalName"], "alice@example.com")
self.assertEqual(self.service.calls, ["alice"])
code = self.fetch_code("CODE-001")
self.assertEqual(code.status, "used")
self.assertEqual(code.used_by_username, "alice")
self.assertEqual(code.used_by_principal_name, "alice@example.com")
second = self.client.post("/api/redeem", json={"code": "CODE-001", "username": "bob"})
second_payload = second.get_json()
self.assertEqual(second.status_code, 404)
self.assertFalse(second_payload["success"])
self.assertEqual(self.service.calls, ["alice"])
def test_redeem_releases_code_when_service_fails(self):
self.service = FakeService(error=ServiceOperationError("用户名已存在。", status_code=409))
self.app = create_app(
settings_override=self.settings,
service_factory=lambda _settings: self.service,
)
self.app.testing = True
self.client = self.app.test_client()
with self.app.app_context():
db.drop_all()
db.create_all()
db.session.add(RedemptionCode(code="CODE-002"))
db.session.commit()
response = self.client.post("/api/redeem", json={"code": "CODE-002", "username": "alice"})
payload = response.get_json()
self.assertEqual(response.status_code, 409)
self.assertFalse(payload["success"])
code = self.fetch_code("CODE-002")
self.assertEqual(code.status, "available")
self.assertIsNone(code.used_by_username)
self.assertEqual(self.service.calls, ["alice"])
def test_public_health_endpoint_is_available(self):
response = self.client.get("/api/health")
payload = response.get_json()
self.assertEqual(response.status_code, 200)
self.assertTrue(payload["success"])
self.assertIn("platform", payload["data"])
def test_generate_delete_and_failed_redeem_are_audited(self):
generate = self.client.post("/admin/api/codes/generate", json={"count": 1})
generated_code = generate.get_json()["data"]["codes"][0]
delete = self.client.delete(f"/admin/api/codes/{generated_code}")
self.assertEqual(delete.status_code, 200)
failed = self.client.post("/api/redeem", json={"code": "MISSING-CODE", "username": "alice"})
self.assertEqual(failed.status_code, 404)
audit_response = self.client.get("/admin/api/audit-events?page=1&pageSize=10")
audit_payload = audit_response.get_json()
self.assertEqual(audit_response.status_code, 200)
self.assertEqual(audit_payload["data"]["total"], 3)
self.assertEqual(
[event["eventType"] for event in audit_payload["data"]["events"]],
["redeem_completed", "code_deleted", "code_generated"],
)
def test_successful_redeem_creates_success_audit_event(self):
self.add_code("CODE-004")
response = self.client.post("/api/redeem", json={"code": "CODE-004", "username": "alice"})
self.assertEqual(response.status_code, 201)
events = self.fetch_audit_events()
self.assertEqual(len(events), 1)
event_payload = events[0].to_dict()
self.assertEqual(event_payload["eventType"], "redeem_completed")
self.assertEqual(event_payload["status"], "success")
self.assertEqual(event_payload["code"], "CODE-004")
self.assertEqual(event_payload["principalName"], "alice@example.com")
self.assertEqual(event_payload["details"]["licenseAssigned"], True)
def test_codes_api_uses_database_pagination(self):
for index in range(5):
self.add_code(f"CODE-{index:03d}")
response = self.client.get("/admin/api/codes?page=2&pageSize=2")
payload = response.get_json()
self.assertEqual(response.status_code, 200)
self.assertEqual(payload["data"]["page"], 2)
self.assertEqual(payload["data"]["pageSize"], 2)
self.assertEqual(payload["data"]["total"], 5)
self.assertEqual(payload["data"]["pages"], 3)
self.assertEqual(len(payload["data"]["codes"]), 2)
def test_records_api_uses_database_pagination(self):
with self.app.app_context():
for index in range(5):
db.session.add(
RedemptionCode(
code=f"USED-{index:03d}",
status="used",
used_by_username=f"user{index}",
used_by_principal_name=f"user{index}@example.com",
)
)
db.session.commit()
response = self.client.get("/admin/api/records?page=2&pageSize=2")
payload = response.get_json()
self.assertEqual(response.status_code, 200)
self.assertEqual(payload["data"]["page"], 2)
self.assertEqual(payload["data"]["pageSize"], 2)
self.assertEqual(payload["data"]["total"], 5)
self.assertEqual(payload["data"]["pages"], 3)
self.assertEqual(len(payload["data"]["records"]), 2)
def test_delete_rejects_non_available_codes(self):
self.add_code("CODE-003", status="used")
response = self.client.delete("/admin/api/codes/CODE-003")
payload = response.get_json()
self.assertEqual(response.status_code, 409)
self.assertFalse(payload["success"])
def test_yaohuo_send_verify_and_provision_without_redemption_code(self):
response = self.client.post("/api/yaohuo/send-code", json={"targetUserId": "12345"})
payload = response.get_json()
self.assertEqual(response.status_code, 200)
self.assertTrue(payload["success"])
self.assertEqual(self.app.extensions["yaohuo_verification_service"].sent, [("12345", "123456")])
verify = self.client.post("/api/yaohuo/verify", json={"code": "123456"})
verify_payload = verify.get_json()
self.assertEqual(verify.status_code, 200)
self.assertTrue(verify_payload["success"])
provision = self.client.post("/api/yaohuo/provision", json={"username": "alice"})
provision_payload = provision.get_json()
self.assertEqual(provision.status_code, 201)
self.assertTrue(provision_payload["success"])
self.assertEqual(provision_payload["data"]["userPrincipalName"], "alice@example.com")
self.assertEqual(self.service.calls, ["alice"])
def test_yaohuo_provision_requires_verified_session(self):
response = self.client.post("/api/yaohuo/provision", json={"username": "alice"})
payload = response.get_json()
self.assertEqual(response.status_code, 403)
self.assertFalse(payload["success"])
self.assertEqual(self.service.calls, [])
class ServiceBehaviorTests(unittest.TestCase):
def test_create_user_accepts_full_upn_without_default_domain(self):
settings = build_settings("sqlite:////tmp/unused.db", default_domain="")
service = Office365Service(settings)
fake_client = FakeGraphClient()
service._graph_client = fake_client
result = service.create_user("alice@example.com")
self.assertEqual(result["userPrincipalName"], "alice@example.com")
self.assertEqual(fake_client.payloads[0]["mailNickname"], "alice")
self.assertEqual(fake_client.payloads[0]["userPrincipalName"], "alice@example.com")
def test_create_user_requires_full_upn_when_default_domain_missing(self):
settings = build_settings("sqlite:////tmp/unused.db", default_domain="")
service = Office365Service(settings)
service._graph_client = FakeGraphClient()
with self.assertRaises(ServiceConfigurationError):
service.create_user("alice")
def test_create_user_returns_license_warning_when_not_strict(self):
settings = build_settings(
"sqlite:////tmp/unused.db",
default_license_sku="ENTERPRISEPACK",
license_assignment_required=False,
)
service = Office365Service(settings)
service._graph_client = FakeGraphClient(skus=[])
result = service.create_user("alice")
self.assertFalse(result["licenseAssigned"])
self.assertEqual(result["licenseMessage"], "未找到许可证 SKU: ENTERPRISEPACK")
def test_create_user_rolls_back_when_license_required(self):
settings = build_settings(
"sqlite:////tmp/unused.db",
default_license_sku="ENTERPRISEPACK",
license_assignment_required=True,
)
service = Office365Service(settings)
fake_client = FakeGraphClient(skus=[])
service._graph_client = fake_client
with self.assertRaises(ServiceOperationError) as context:
service.create_user("alice")
self.assertIn("已回滚删除账号 alice@example.com", str(context.exception))
self.assertEqual(fake_client.deleted_users, ["user-1"])
class YaohuoServiceTests(unittest.TestCase):
def test_generate_code_returns_six_digits(self):
settings = build_settings("sqlite:////tmp/unused.db", yaohuo_verification_enabled=True, yaohuo_cookie="cookie")
service = YaohuoVerificationService(settings)
code = service.generate_code()
self.assertEqual(len(code), 6)
self.assertTrue(code.isdigit())
class ModelSerializationTests(unittest.TestCase):
def test_redemption_code_serializes_datetimes_as_utc_z(self):
code = RedemptionCode(
code="CODE-UTC",
created_at=datetime(2026, 3, 31, 12, 0, 0),
used_at=datetime(2026, 3, 31, 13, 0, 0, tzinfo=timezone.utc),
)
payload = code.to_dict()
self.assertEqual(payload["createdAt"], "2026-03-31T12:00:00Z")
self.assertEqual(payload["usedAt"], "2026-03-31T13:00:00Z")
class SettingsTests(unittest.TestCase):
def test_container_database_url_is_remapped_locally(self):
env = {
"CLIENT_ID": "client-id",
"TENANT_ID": "tenant-id",
"CLIENT_SECRET": "client-secret",
"DEFAULT_PASSWORD": "TempPassw0rd!",
"DATABASE_URL": "sqlite:////app/data/redemption.db",
}
with patch.dict(os.environ, env, clear=False):
settings = load_settings()
self.assertTrue(settings.database_url.endswith("/office365-self-service/data/redemption.db"))
self.assertIn("已自动映射到本地路径", " ".join(settings.warnings))
if __name__ == "__main__":
unittest.main()