150 lines
4.2 KiB
Python
150 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import json
|
|
from typing import Any
|
|
|
|
|
|
# Field names that may carry an account identifier. extract_identifier() tries
# them in this order, so earlier names take priority when a record has several.
IDENTIFIER_KEYS = (
    "userprincipalname", "user_id", "userid", "username",
    "email", "mail", "id", "upn",
)
|
|
|
|
|
|
class BatchInputError(ValueError):
    """Raised when batch input text is empty, malformed, or yields no records."""
|
|
|
|
|
|
def _strip_bom(raw_text: str) -> str:
|
|
return raw_text.lstrip("\ufeff").strip()
|
|
|
|
|
|
def _clean_dict(row: dict[str, Any]) -> dict[str, Any]:
|
|
cleaned: dict[str, Any] = {}
|
|
for key, value in row.items():
|
|
normalized_key = str(key).strip()
|
|
if not normalized_key:
|
|
continue
|
|
cleaned[normalized_key] = value.strip() if isinstance(value, str) else value
|
|
return cleaned
|
|
|
|
|
|
def parse_table_content(raw_text: str) -> list[dict[str, Any]]:
    """Parse batch table input given as a JSON array of objects or as CSV.

    The text is first tried as JSON. If it does not decode (or decodes to
    ``null``), it is read as CSV whose first line is the header row.

    Args:
        raw_text: Raw input text. A leading BOM and surrounding whitespace
            are ignored.

    Returns:
        One dict per record, cleaned with ``_clean_dict``. CSV rows whose
        named columns are all blank are skipped, and fields beyond the
        header's column count are discarded.

    Raises:
        BatchInputError: If the input is empty, the JSON document is not an
            array, a JSON element is not an object, the CSV has no header,
            the CSV reader reports an error, or no record is found.
    """
    raw_text = _strip_bom(raw_text)
    if not raw_text:
        raise BatchInputError("批量内容不能为空。")

    try:
        payload = json.loads(raw_text)
    except json.JSONDecodeError:
        payload = None

    if payload is not None:
        if not isinstance(payload, list):
            raise BatchInputError("JSON 批量内容必须是数组。")
        rows: list[dict[str, Any]] = []
        for index, item in enumerate(payload, start=1):
            if not isinstance(item, dict):
                raise BatchInputError(f"第 {index} 条 JSON 记录不是对象。")
            rows.append(_clean_dict(item))
        if not rows:
            raise BatchInputError("批量内容中没有可用记录。")
        return rows

    rows = []
    try:
        # DictReader is lazy: the header is read on first access to
        # ``fieldnames`` and rows are read during iteration, so both must be
        # inside the ``try`` for csv.Error to be converted.
        reader = csv.DictReader(io.StringIO(raw_text))
        if not reader.fieldnames:
            raise BatchInputError("CSV 缺少表头。")
        for row in reader:
            if not row:
                continue
            # Surplus fields of an over-long row are stored as a list under
            # the key None; they have no column name, so drop them before
            # the blank-row test (a list has no .strip()).
            row.pop(None, None)
            has_data = any(
                isinstance(value, str) and value.strip() for value in row.values()
            )
            if not has_data:
                continue
            rows.append(_clean_dict(row))
    except csv.Error as exc:
        raise BatchInputError(f"CSV 解析失败: {exc}") from exc

    if not rows:
        raise BatchInputError("未解析到任何批量记录。")
    return rows
|
|
|
|
|
|
def parse_identifier_content(raw_text: str) -> list[str]:
    """Extract account identifiers from JSON, CSV, or plain-text input.

    Formats are tried in this order:

    1. JSON array. String items are trimmed, object items are resolved with
       ``extract_identifier``, and integer items are converted with ``str``
       (matching how ``extract_identifier`` treats numeric values). Other
       items are ignored. A JSON object at the top level is rejected.
       A bare JSON scalar (for example a single numeric ID) is not treated
       as a JSON document and is handled as plain text.
    2. CSV with a header row, attempted only when the first line contains a
       comma. It is used only if at least one row yields an identifier.
    3. Plain text: identifiers separated by newlines and/or commas. Lines
       starting with ``#`` are comments and are skipped as a whole, so a
       comma inside a comment never produces an identifier. Individual
       comma-separated entries starting with ``#`` are skipped as well.

    Args:
        raw_text: Raw input text. A leading BOM and surrounding whitespace
            are ignored.

    Returns:
        The identifiers in input order (duplicates are kept).

    Raises:
        BatchInputError: If the input is empty, is a JSON object instead of
            an array, or contains no usable identifier.
    """
    raw_text = _strip_bom(raw_text)
    if not raw_text:
        raise BatchInputError("批量内容不能为空。")

    try:
        payload = json.loads(raw_text)
    except json.JSONDecodeError:
        payload = None

    if isinstance(payload, dict):
        raise BatchInputError("JSON 批量删除内容必须是数组。")

    if isinstance(payload, list):
        values: list[str] = []
        for item in payload:
            if isinstance(item, str):
                identifier = item.strip()
            elif isinstance(item, dict):
                identifier = extract_identifier(item)
            elif isinstance(item, int) and not isinstance(item, bool):
                identifier = str(item)
            else:
                identifier = ""
            if identifier:
                values.append(identifier)
        if not values:
            raise BatchInputError("JSON 内容中没有可用的账号标识。")
        return values

    # raw_text is non-empty and stripped here, so there is at least one line.
    lines = raw_text.splitlines()

    if "," in lines[0]:
        try:
            reader = csv.DictReader(io.StringIO(raw_text))
            values = []
            if reader.fieldnames:
                for row in reader:
                    identifier = extract_identifier(_clean_dict(row))
                    if identifier:
                        values.append(identifier)
        except csv.Error:
            # Best effort: unreadable CSV falls back to plain-text parsing.
            values = []
        if values:
            return values

    values = []
    for line in lines:
        entry = line.strip()
        if not entry or entry.startswith("#"):
            # Skip the whole comment line before splitting on commas.
            continue
        for part in entry.split(","):
            token = part.strip()
            if token and not token.startswith("#"):
                values.append(token)

    if not values:
        raise BatchInputError("没有找到可用的账号标识。")
    return values
|
|
|
|
|
|
def extract_identifier(row: dict[str, Any]) -> str:
    """Return the first usable account identifier found in *row*.

    Row keys are compared after ``normalize_key`` (non-string keys are
    converted with ``str`` first), and the names in ``IDENTIFIER_KEYS`` are
    tried in order.

    Args:
        row: Mapping of field name to value.

    Returns:
        The trimmed string value, or ``str(value)`` for a truthy non-string
        value, of the first matching field. Returns ``""`` when no field
        holds a usable value. Blank or whitespace-only strings, ``None`` and
        other falsy values are skipped.
    """
    by_key = {normalize_key(str(key)): value for key, value in row.items()}
    for candidate in IDENTIFIER_KEYS:
        value = by_key.get(normalize_key(candidate))
        if isinstance(value, str):
            # Trim before the truthiness test so "   " is not returned as an
            # identifier.
            value = value.strip()
        if value:
            return str(value)
    return ""
|
|
|
|
|
|
def normalize_key(key: str) -> str:
    """Lower-case *key* and keep only its alphanumeric characters."""
    lowered = key.lower()
    return "".join(filter(str.isalnum, lowered))
|
|
|