Files

292 lines
8.4 KiB
Python

from __future__ import annotations

import hmac
import logging
from functools import wraps

from flask import Blueprint, current_app, jsonify, render_template, request, session

from .batch import BatchInputError, parse_identifier_content, parse_table_content
from .services import Office365Service, ServiceConfigurationError, ServiceOperationError
from .tasks import BackgroundTaskManager, TaskNotFoundError

# Blueprint on which every route in this module is registered.
bp = Blueprint("office365_admin", __name__)
# Module logger; its name is a fixed string rather than being derived from __name__.
logger = logging.getLogger("office365_admin.routes")
def _settings():
    """Return the object stored under ``SETTINGS`` in the current app's config."""
    app_config = current_app.config
    return app_config["SETTINGS"]
def _service() -> Office365Service:
    """Return the ``office365_service`` extension registered on the current app."""
    registered = current_app.extensions
    return registered["office365_service"]
def _task_manager() -> BackgroundTaskManager:
    """Return the ``task_manager`` extension registered on the current app."""
    registered = current_app.extensions
    return registered["task_manager"]
def _success(data=None, message: str = "ok", status: int = 200):
    """Build a success response.

    Args:
        data: Value placed under the ``data`` key of the JSON body.
        message: Text placed under the ``message`` key.
        status: HTTP status code returned alongside the body.

    Returns:
        A ``(response, status)`` tuple whose body has ``success`` set to True.
    """
    body = {
        "success": True,
        "message": message,
        "data": data,
    }
    response = jsonify(body)
    return response, status
def _error(message: str, status: int = 400, details=None):
    """Build an error response.

    Args:
        message: Text placed under the ``message`` key.
        status: HTTP status code returned alongside the body.
        details: Optional extra information; the ``details`` key is only
            present in the body when this is not ``None``.

    Returns:
        A ``(response, status)`` tuple whose body has ``success`` set to False.
    """
    body = {"success": False, "message": message}
    if details is None:
        return jsonify(body), status
    body["details"] = details
    return jsonify(body), status
def _authenticated() -> bool:
    """Tell whether the current request may access protected routes.

    Returns True unconditionally when ``effective_auth_enabled`` is falsy on
    the settings object; otherwise reflects the session's ``authenticated``
    flag.
    """
    if _settings().effective_auth_enabled:
        return bool(session.get("authenticated"))
    return True
def require_auth(view_func):
    """Decorate a view so it answers 401 unless the request is authenticated.

    Args:
        view_func: The view callable to protect.

    Returns:
        A wrapper that forwards all arguments to ``view_func`` when
        ``_authenticated()`` is true and returns an error response otherwise.
    """

    @wraps(view_func)
    def guarded(*args, **kwargs):
        if _authenticated():
            return view_func(*args, **kwargs)
        return _error("请先登录后台管理平台。", status=401)

    return guarded
def _handle_service_call(callback):
    """Run ``callback`` and translate known failures into error responses.

    Mapping applied, in this order:
        * ``BatchInputError`` -> 400 with the exception text.
        * ``ServiceConfigurationError`` -> 503 with the exception text.
        * ``ServiceOperationError`` -> the exception's own ``status_code``,
          ``message`` and ``details``.
        * ``ValueError`` -> 400 with the exception text.

    Any other exception propagates to the caller.

    Args:
        callback: Zero-argument callable producing the success response.

    Returns:
        Whatever ``callback`` returns, or an error response tuple.
    """
    try:
        result = callback()
    except BatchInputError as err:
        return _error(str(err), status=400)
    except ServiceConfigurationError as err:
        return _error(str(err), status=503)
    except ServiceOperationError as err:
        return _error(err.message, status=err.status_code, details=err.details)
    except ValueError as err:
        return _error(str(err), status=400)
    else:
        return result
def _json_payload() -> dict:
    """Return the request's JSON body as a dict.

    The body is parsed with ``silent=True``, so a missing or unparsable body
    yields ``None`` instead of raising. Anything that is not a JSON object
    (``None``, a list, a string, a number, ...) is replaced by an empty dict
    so callers can always use ``.get(...)`` on the result.

    Returns:
        The parsed JSON object, or ``{}`` when there is no usable object.
    """
    payload = request.get_json(silent=True)
    # A body such as ``[1, 2]`` or ``"text"`` parses fine but is not a mapping.
    if isinstance(payload, dict):
        return payload
    return {}
def _text_payload() -> str:
    """Extract free-text batch input from the request.

    Sources are tried in order and the first non-empty one wins:
        1. the ``content`` key of the JSON body (converted with ``str``),
        2. the ``content`` form field,
        3. an uploaded ``file`` with a filename, decoded as ``utf-8-sig``.

    Returns:
        The text found, or an empty string when no source provides any.
    """
    json_content = _json_payload().get("content")
    if json_content:
        return str(json_content)

    form_content = request.form.get("content")
    if form_content:
        return form_content

    upload = request.files.get("file")
    if upload and upload.filename:
        raw_bytes = upload.read()
        return raw_bytes.decode("utf-8-sig")

    return ""
@bp.get("/")
def index():
    """Render ``index.html`` with the public settings passed as ``bootstrap``."""
    public_settings = _settings().to_public_dict()
    return render_template("index.html", bootstrap=public_settings)
@bp.get("/api/health")
def health():
    """Report the public platform settings and the caller's login state."""
    platform_info = _settings().to_public_dict()
    body = {"platform": platform_info, "authenticated": _authenticated()}
    return _success(body)
@bp.get("/api/session")
def session_info():
    """Report whether the caller is authenticated and whether auth is enabled."""
    is_authenticated = _authenticated()
    auth_enabled = _settings().effective_auth_enabled
    return _success({"authenticated": is_authenticated, "authEnabled": auth_enabled})
@bp.post("/api/login")
def login():
    """Authenticate the caller and mark the session as logged in.

    When ``effective_auth_enabled`` is falsy the session is marked
    authenticated without checking anything. Otherwise the ``username`` and
    ``password`` values of the JSON body (both whitespace-stripped) are
    compared with ``admin_username`` / ``admin_password`` from the settings.

    Returns:
        A success response on login, or a 401 error response when the
        supplied credentials do not match.
    """
    settings = _settings()
    if not settings.effective_auth_enabled:
        session["authenticated"] = True
        session.permanent = True
        return _success({"authenticated": True}, message="当前平台未启用登录保护。")

    payload = _json_payload()
    username = str(payload.get("username", "")).strip()
    password = str(payload.get("password", "")).strip()

    def _matches(supplied: str, expected) -> bool:
        # A configured value that is not a string can never equal user input.
        if not isinstance(expected, str):
            return False
        # compare_digest rejects non-ASCII str, so compare encoded bytes;
        # surrogatepass keeps lone surrogates from JSON escapes from raising.
        return hmac.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    # Evaluate both checks before combining them so the response time does not
    # reveal which of the two fields was wrong.
    username_ok = _matches(username, settings.admin_username)
    password_ok = _matches(password, settings.admin_password)
    if username_ok and password_ok:
        session["authenticated"] = True
        session.permanent = True
        return _success({"authenticated": True}, message="登录成功。")
    return _error("用户名或密码错误。", status=401)
@bp.post("/api/logout")
def logout():
    """Clear the whole session and report the caller as logged out."""
    session.clear()
    body = {"authenticated": False}
    return _success(body, message="已退出登录。")
@bp.get("/api/config")
@require_auth
def config_info():
    """Return the public view of the settings to an authenticated caller."""
    public_settings = _settings().to_public_dict()
    return _success(public_settings)
@bp.get("/api/tasks/<task_id>")
@require_auth
def task_status(task_id: str):
    """Return the task manager's record for ``task_id``.

    Responds with 404 when the task manager raises ``TaskNotFoundError``.
    """
    manager = _task_manager()
    try:
        task = manager.get_task(task_id)
    except TaskNotFoundError:
        return _error("任务不存在。", status=404)
    return _success(task)
@bp.get("/api/licenses")
@require_auth
def licenses():
    """Return the service's license listing, with service errors mapped to responses."""

    def _fetch():
        return _success(_service().list_licenses())

    return _handle_service_call(_fetch)
@bp.get("/api/users")
@require_auth
def list_users():
    """Return one page of users.

    Query parameters:
        search: Optional filter text; surrounding whitespace is stripped.
        page: Page number, defaults to ``1``.
        pageSize: Page size, defaults to the settings' ``default_page_size``.

    Responds with 400 when ``page`` or ``pageSize`` is not an integer.
    """
    query = request.args
    search = query.get("search", "").strip()
    try:
        page = int(query.get("page", "1"))
        page_size = int(query.get("pageSize", str(_settings().default_page_size)))
    except ValueError:
        return _error("page 和 pageSize 必须是整数。", status=400)

    def _fetch():
        users = _service().list_users(search=search, page=page, page_size=page_size)
        return _success(users)

    return _handle_service_call(_fetch)
@bp.get("/api/users/selection")
@require_auth
def list_user_identifiers():
    """Return the service's user identifiers for the optional ``search`` text."""
    search = request.args.get("search", "").strip()

    def _fetch():
        identifiers = _service().list_user_identifiers(search=search)
        return _success(identifiers)

    return _handle_service_call(_fetch)
@bp.get("/api/users/<path:identifier>")
@require_auth
def get_user(identifier: str):
    """Return the service's record for the user named by ``identifier``."""

    def _fetch():
        return _success(_service().get_user(identifier))

    return _handle_service_call(_fetch)
@bp.post("/api/users")
@require_auth
def create_user():
    """Create a user from the JSON body and answer 201 on success."""
    payload = _json_payload()

    def _create():
        created = _service().create_user(payload)
        return _success(created, message="用户创建成功。", status=201)

    return _handle_service_call(_create)
@bp.patch("/api/users/<path:identifier>")
@require_auth
def update_user(identifier: str):
    """Update the user named by ``identifier`` with the JSON body."""
    payload = _json_payload()

    def _update():
        updated = _service().update_user(identifier, payload)
        return _success(updated, message="用户更新成功。")

    return _handle_service_call(_update)
@bp.delete("/api/users/<path:identifier>")
@require_auth
def delete_user(identifier: str):
    """Delete the user named by ``identifier`` through the service."""

    def _delete():
        outcome = _service().delete_user(identifier)
        return _success(outcome, message="用户删除成功。")

    return _handle_service_call(_delete)
@bp.post("/api/users/<path:identifier>/reset-password")
@require_auth
def reset_password(identifier: str):
    """Reset the password of the user named by ``identifier`` using the JSON body."""
    payload = _json_payload()

    def _reset():
        outcome = _service().reset_password(identifier, payload)
        return _success(outcome, message="密码重置成功。")

    return _handle_service_call(_reset)
@bp.post("/api/users/batch/create")
@require_auth
def batch_create():
    """Submit a background task that creates users in bulk.

    The rows come from the ``rows`` key of the JSON body when present;
    otherwise the request's text content is parsed with
    ``parse_table_content``.

    Input parsing runs inside ``_handle_service_call`` so that a
    ``BatchInputError`` (or other ``ValueError``) raised while reading or
    parsing the input becomes a 400 response instead of escaping the view.
    """

    def _submit():
        rows = _json_payload().get("rows")
        if rows is None:
            rows = parse_table_content(_text_payload())
        return _submit_batch_task("create", rows)

    return _handle_service_call(_submit)
@bp.post("/api/users/batch/update")
@require_auth
def batch_update():
    """Submit a background task that updates users in bulk.

    The rows come from the ``rows`` key of the JSON body when present;
    otherwise the request's text content is parsed with
    ``parse_table_content``.

    Input parsing runs inside ``_handle_service_call`` so that a
    ``BatchInputError`` (or other ``ValueError``) raised while reading or
    parsing the input becomes a 400 response instead of escaping the view.
    """

    def _submit():
        rows = _json_payload().get("rows")
        if rows is None:
            rows = parse_table_content(_text_payload())
        return _submit_batch_task("update", rows)

    return _handle_service_call(_submit)
@bp.post("/api/users/batch/delete")
@require_auth
def batch_delete():
    """Submit a background task that deletes users in bulk.

    The identifiers come from the ``identifiers`` key of the JSON body when
    present; otherwise the request's text content is parsed with
    ``parse_identifier_content``.

    Input parsing runs inside ``_handle_service_call`` so that a
    ``BatchInputError`` (or other ``ValueError``) raised while reading or
    parsing the input becomes a 400 response instead of escaping the view.
    """

    def _submit():
        identifiers = _json_payload().get("identifiers")
        if identifiers is None:
            identifiers = parse_identifier_content(_text_payload())
        return _submit_batch_task("delete", identifiers)

    return _handle_service_call(_submit)
@bp.post("/api/users/batch/reset-password")
@require_auth
def batch_reset_password():
    """Submit a background task that resets passwords in bulk.

    The rows come from the ``rows`` key of the JSON body when present.
    Otherwise the request's text content is parsed with
    ``parse_table_content``; if that raises ``BatchInputError`` the same text
    is parsed again with ``parse_identifier_content``.

    Input parsing runs inside ``_handle_service_call`` so that a
    ``BatchInputError`` from the fallback parser (or another ``ValueError``
    raised while reading the input) becomes a 400 response instead of
    escaping the view.
    """

    def _submit():
        rows = _json_payload().get("rows")
        if rows is None:
            text = _text_payload()
            try:
                rows = parse_table_content(text)
            except BatchInputError:
                # Table parsing failed; retry the same text as an identifier list.
                rows = parse_identifier_content(text)
        return _submit_batch_task("reset-password", rows)

    return _handle_service_call(_submit)
def _submit_batch_task(operation: str, items):
    """Queue a bulk operation on the background task manager.

    Args:
        operation: One of ``"create"``, ``"update"``, ``"delete"`` or
            ``"reset-password"``.
        items: Sized collection of rows or identifiers handed to the matching
            ``batch_*`` method of the service.

    Returns:
        A 202 success response whose data is the value returned by the task
        manager's ``submit``.

    Raises:
        ValueError: If ``operation`` is not supported, or if ``items`` is not
            a sized collection of entries (for example a number, string or
            JSON object supplied by the client).
    """
    # operation -> (service method name, message returned to the client)
    batch_operations = {
        "create": ("batch_create", "批量创建任务已提交。"),
        "update": ("batch_update", "批量更新任务已提交。"),
        "delete": ("batch_delete", "批量删除任务已提交。"),
        "reset-password": ("batch_reset_password", "批量重置密码任务已提交。"),
    }
    if operation not in batch_operations:
        raise ValueError("不支持的批量任务类型。")

    # ``items`` may come straight from client JSON. Reject values that cannot
    # be counted, and scalars/mappings that would be iterated element-wise,
    # before anything is queued.
    if isinstance(items, (str, bytes, dict)) or not hasattr(items, "__len__"):
        raise ValueError("批量任务数据必须是列表。")

    method_name, message = batch_operations[operation]
    service = _service()
    task_manager = _task_manager()
    total = len(items)

    def runner(progress):
        # The service method is looked up only when the task actually runs.
        return getattr(service, method_name)(items, progress_callback=progress)

    logger.info("Submitting batch task: operation=%s total=%s", operation, total)
    task = task_manager.submit(operation=operation, total=total, runner=runner)
    return _success(task, message=message, status=202)