Files
office365manage/office365_admin/batch.py

150 lines
4.2 KiB
Python

from __future__ import annotations
import csv
import io
import json
from typing import Any
IDENTIFIER_KEYS = (
"userprincipalname",
"user_id",
"userid",
"username",
"email",
"mail",
"id",
"upn",
)
class BatchInputError(ValueError):
pass
def _strip_bom(raw_text: str) -> str:
return raw_text.lstrip("\ufeff").strip()
def _clean_dict(row: dict[str, Any]) -> dict[str, Any]:
cleaned: dict[str, Any] = {}
for key, value in row.items():
normalized_key = str(key).strip()
if not normalized_key:
continue
cleaned[normalized_key] = value.strip() if isinstance(value, str) else value
return cleaned
def parse_table_content(raw_text: str) -> list[dict[str, Any]]:
raw_text = _strip_bom(raw_text)
if not raw_text:
raise BatchInputError("批量内容不能为空。")
try:
payload = json.loads(raw_text)
except json.JSONDecodeError:
payload = None
if payload is not None:
if not isinstance(payload, list):
raise BatchInputError("JSON 批量内容必须是数组。")
rows: list[dict[str, Any]] = []
for index, item in enumerate(payload, start=1):
if not isinstance(item, dict):
raise BatchInputError(f"{index} 条 JSON 记录不是对象。")
rows.append(_clean_dict(item))
if not rows:
raise BatchInputError("批量内容中没有可用记录。")
return rows
try:
reader = csv.DictReader(io.StringIO(raw_text))
except csv.Error as exc:
raise BatchInputError(f"CSV 解析失败: {exc}") from exc
if not reader.fieldnames:
raise BatchInputError("CSV 缺少表头。")
rows = []
for row in reader:
if not row:
continue
if not any((value or "").strip() for value in row.values()):
continue
rows.append(_clean_dict(row))
if not rows:
raise BatchInputError("未解析到任何批量记录。")
return rows
def parse_identifier_content(raw_text: str) -> list[str]:
raw_text = _strip_bom(raw_text)
if not raw_text:
raise BatchInputError("批量内容不能为空。")
try:
payload = json.loads(raw_text)
except json.JSONDecodeError:
payload = None
if payload is not None:
if not isinstance(payload, list):
raise BatchInputError("JSON 批量删除内容必须是数组。")
values: list[str] = []
for item in payload:
if isinstance(item, str) and item.strip():
values.append(item.strip())
elif isinstance(item, dict):
identifier = extract_identifier(item)
if identifier:
values.append(identifier)
if not values:
raise BatchInputError("JSON 内容中没有可用的账号标识。")
return values
first_line = raw_text.splitlines()[0] if raw_text.splitlines() else ""
if "," in first_line:
try:
reader = csv.DictReader(io.StringIO(raw_text))
if reader.fieldnames:
values = []
for row in reader:
identifier = extract_identifier(_clean_dict(row))
if identifier:
values.append(identifier)
if values:
return values
except csv.Error:
pass
raw_text = raw_text.replace(",", "\n")
values = []
for line in raw_text.splitlines():
normalized = line.strip()
if normalized and not normalized.startswith("#"):
values.append(normalized)
if not values:
raise BatchInputError("没有找到可用的账号标识。")
return values
def extract_identifier(row: dict[str, Any]) -> str:
key_map = {normalize_key(key): value for key, value in row.items()}
for key in IDENTIFIER_KEYS:
value = key_map.get(normalize_key(key))
if value is None:
continue
if isinstance(value, str) and value.strip():
return value.strip()
if value:
return str(value)
return ""
def normalize_key(key: str) -> str:
return "".join(ch for ch in key.lower() if ch.isalnum())