Initial commit: Office365 web management platform
This commit is contained in:
149
office365_admin/batch.py
Normal file
149
office365_admin/batch.py
Normal file
@@ -0,0 +1,149 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
|
||||
IDENTIFIER_KEYS = (
|
||||
"userprincipalname",
|
||||
"user_id",
|
||||
"userid",
|
||||
"username",
|
||||
"email",
|
||||
"mail",
|
||||
"id",
|
||||
"upn",
|
||||
)
|
||||
|
||||
|
||||
class BatchInputError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
def _strip_bom(raw_text: str) -> str:
|
||||
return raw_text.lstrip("\ufeff").strip()
|
||||
|
||||
|
||||
def _clean_dict(row: dict[str, Any]) -> dict[str, Any]:
|
||||
cleaned: dict[str, Any] = {}
|
||||
for key, value in row.items():
|
||||
normalized_key = str(key).strip()
|
||||
if not normalized_key:
|
||||
continue
|
||||
cleaned[normalized_key] = value.strip() if isinstance(value, str) else value
|
||||
return cleaned
|
||||
|
||||
|
||||
def parse_table_content(raw_text: str) -> list[dict[str, Any]]:
|
||||
raw_text = _strip_bom(raw_text)
|
||||
if not raw_text:
|
||||
raise BatchInputError("批量内容不能为空。")
|
||||
|
||||
try:
|
||||
payload = json.loads(raw_text)
|
||||
except json.JSONDecodeError:
|
||||
payload = None
|
||||
|
||||
if payload is not None:
|
||||
if not isinstance(payload, list):
|
||||
raise BatchInputError("JSON 批量内容必须是数组。")
|
||||
rows: list[dict[str, Any]] = []
|
||||
for index, item in enumerate(payload, start=1):
|
||||
if not isinstance(item, dict):
|
||||
raise BatchInputError(f"第 {index} 条 JSON 记录不是对象。")
|
||||
rows.append(_clean_dict(item))
|
||||
if not rows:
|
||||
raise BatchInputError("批量内容中没有可用记录。")
|
||||
return rows
|
||||
|
||||
try:
|
||||
reader = csv.DictReader(io.StringIO(raw_text))
|
||||
except csv.Error as exc:
|
||||
raise BatchInputError(f"CSV 解析失败: {exc}") from exc
|
||||
|
||||
if not reader.fieldnames:
|
||||
raise BatchInputError("CSV 缺少表头。")
|
||||
|
||||
rows = []
|
||||
for row in reader:
|
||||
if not row:
|
||||
continue
|
||||
if not any((value or "").strip() for value in row.values()):
|
||||
continue
|
||||
rows.append(_clean_dict(row))
|
||||
|
||||
if not rows:
|
||||
raise BatchInputError("未解析到任何批量记录。")
|
||||
return rows
|
||||
|
||||
|
||||
def parse_identifier_content(raw_text: str) -> list[str]:
|
||||
raw_text = _strip_bom(raw_text)
|
||||
if not raw_text:
|
||||
raise BatchInputError("批量内容不能为空。")
|
||||
|
||||
try:
|
||||
payload = json.loads(raw_text)
|
||||
except json.JSONDecodeError:
|
||||
payload = None
|
||||
|
||||
if payload is not None:
|
||||
if not isinstance(payload, list):
|
||||
raise BatchInputError("JSON 批量删除内容必须是数组。")
|
||||
values: list[str] = []
|
||||
for item in payload:
|
||||
if isinstance(item, str) and item.strip():
|
||||
values.append(item.strip())
|
||||
elif isinstance(item, dict):
|
||||
identifier = extract_identifier(item)
|
||||
if identifier:
|
||||
values.append(identifier)
|
||||
if not values:
|
||||
raise BatchInputError("JSON 内容中没有可用的账号标识。")
|
||||
return values
|
||||
|
||||
first_line = raw_text.splitlines()[0] if raw_text.splitlines() else ""
|
||||
if "," in first_line:
|
||||
try:
|
||||
reader = csv.DictReader(io.StringIO(raw_text))
|
||||
if reader.fieldnames:
|
||||
values = []
|
||||
for row in reader:
|
||||
identifier = extract_identifier(_clean_dict(row))
|
||||
if identifier:
|
||||
values.append(identifier)
|
||||
if values:
|
||||
return values
|
||||
except csv.Error:
|
||||
pass
|
||||
|
||||
raw_text = raw_text.replace(",", "\n")
|
||||
values = []
|
||||
for line in raw_text.splitlines():
|
||||
normalized = line.strip()
|
||||
if normalized and not normalized.startswith("#"):
|
||||
values.append(normalized)
|
||||
|
||||
if not values:
|
||||
raise BatchInputError("没有找到可用的账号标识。")
|
||||
return values
|
||||
|
||||
|
||||
def extract_identifier(row: dict[str, Any]) -> str:
|
||||
key_map = {normalize_key(key): value for key, value in row.items()}
|
||||
for key in IDENTIFIER_KEYS:
|
||||
value = key_map.get(normalize_key(key))
|
||||
if value is None:
|
||||
continue
|
||||
if isinstance(value, str) and value.strip():
|
||||
return value.strip()
|
||||
if value:
|
||||
return str(value)
|
||||
return ""
|
||||
|
||||
|
||||
def normalize_key(key: str) -> str:
|
||||
return "".join(ch for ch in key.lower() if ch.isalnum())
|
||||
|
||||
Reference in New Issue
Block a user