import sys import json import time import base64 import random import signal import logging import socket import threading import subprocess import urllib.parse from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from dataclasses import dataclass, asdict from datetime import datetime import yaml import requests from flask import Flask, jsonify, request # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('airtosocks') DEFAULT_TEST_URL = 'http://ifconfig.me/ip' # 常量 BASE_DIR = Path(__file__).parent CONFIG_DIR = BASE_DIR / 'config' DATA_DIR = BASE_DIR / 'data' BIN_DIR = BASE_DIR / 'bin' XRAY_BIN = BIN_DIR / 'xray' CONFIG_DIR.mkdir(exist_ok=True) DATA_DIR.mkdir(exist_ok=True) BIN_DIR.mkdir(exist_ok=True) @dataclass class ProxyNode: """代理节点信息""" id: str name: str protocol: str # vmess, vless, trojan, ss address: str port: int uuid: str = "" alter_id: int = 0 security: str = "auto" network: str = "tcp" ws_path: str = "" ws_host: str = "" tls: bool = False sni: str = "" flow: str = "" raw_link: str = "" available: bool = False last_check: str = "" socks_port: int = 0 # 分配的本地socks5端口 xray_pid: int = 0 subscription_id: str = "" @dataclass class Subscription: id: str name: str url: str node_ids: List[str] last_sync: str = "" last_error: str = "" sync_interval: int = 0 # 分钟,0=不自动同步 @dataclass class NodePool: id: str name: str node_ids: List[str] created_at: str = "" def safe_b64decode(value: str) -> bytes: padding = (-len(value)) % 4 return base64.b64decode(value + ('=' * padding)) def make_node_id(protocol: str, address: str, port: int, user_id: str) -> str: return f"{protocol}:{address}:{port}:{user_id}" class LinkParser: """解析代理链接""" @staticmethod def parse_vmess(link: str) -> Optional[ProxyNode]: """解析vmess链接""" try: if not link.startswith('vmess://'): return None encoded = link[8:] data = json.loads(safe_b64decode(encoded).decode('utf-8')) address = str(data.get('add', '')) port = int(data.get('port', 443)) uuid = str(data.get('id', '')) node = ProxyNode( id=make_node_id('vmess', address, port, uuid), name=str(data.get('ps', data.get('add', 'unknown'))), protocol='vmess', address=address, port=port, uuid=uuid, alter_id=int(data.get('aid', 0)), security=str(data.get('scy', 'auto')), network=str(data.get('net', 'tcp')), ws_path=str(data.get('path', '')), ws_host=str(data.get('host', '')), tls=str(data.get('tls', '')) == 'tls', sni=str(data.get('sni') or data.get('host') or ''), raw_link=link ) return node except Exception as e: logger.error(f"解析vmess链接失败: {e}") return None @staticmethod def parse_vless(link: str) -> Optional[ProxyNode]: """解析vless链接""" try: if not link.startswith('vless://'): return None parsed = urllib.parse.urlparse(link) params = urllib.parse.parse_qs(parsed.query) address = parsed.hostname or '' port = parsed.port or 443 uuid = parsed.username or '' node = ProxyNode( id=make_node_id('vless', address, port, uuid), name=urllib.parse.unquote(parsed.fragment) if parsed.fragment else parsed.hostname or '', protocol='vless', address=address, port=port, uuid=uuid, security=params.get('security', ['none'])[0], network=params.get('type', ['tcp'])[0], ws_path=params.get('path', ['/'])[0], ws_host=params.get('host', [''])[0], tls=params.get('security', ['none'])[0] == 'tls', sni=params.get('sni', [address])[0] or '', flow=params.get('flow', [''])[0] or '', raw_link=link ) return node except Exception as e: logger.error(f"解析vless链接失败: {e}") return None @staticmethod def parse_trojan(link: str) -> Optional[ProxyNode]: """解析trojan链接""" try: if not link.startswith('trojan://'): return None parsed = urllib.parse.urlparse(link) params = urllib.parse.parse_qs(parsed.query) address = parsed.hostname or '' port = parsed.port or 443 password = parsed.password or parsed.username or '' node = ProxyNode( id=make_node_id('trojan', address, port, password), name=urllib.parse.unquote(parsed.fragment) if parsed.fragment else parsed.hostname or '', protocol='trojan', address=address, port=port, uuid=password, network=params.get('type', ['tcp'])[0], tls=True, sni=params.get('sni', [address])[0] or '', raw_link=link ) return node except Exception as e: logger.error(f"解析trojan链接失败: {e}") return None @staticmethod def parse_ss(link: str) -> Optional[ProxyNode]: """解析ss链接""" try: if not link.startswith('ss://'): return None # ss://base64(method:password)@server:port#name # 或 ss://base64(method:password@server:port)#name content = link[5:] if '@' in content: # 新格式 parsed = urllib.parse.urlparse(link) userinfo = safe_b64decode(parsed.username or '').decode('utf-8') method, password = userinfo.split(':', 1) address = parsed.hostname or '' port = parsed.port or 8388 node = ProxyNode( id=make_node_id('ss', address, port, password), name=urllib.parse.unquote(parsed.fragment) if parsed.fragment else parsed.hostname or '', protocol='ss', address=address, port=port, uuid=password, security=method, raw_link=link ) return node else: # 旧格式 ss://base64 payload = content.split('#')[0] decoded = safe_b64decode(payload).decode('utf-8') parts = decoded.rsplit('@', 1) method, password = parts[0].split(':', 1) host, port = parts[1].rsplit(':', 1) node = ProxyNode( id=make_node_id('ss', host, int(port), password), name=content.split('#')[1] if '#' in content else host, protocol='ss', address=host, port=int(port), uuid=password, security=method, raw_link=link ) return node except Exception as e: logger.error(f"解析ss链接失败: {e}") return None @staticmethod def parse_link(link: str) -> Optional[ProxyNode]: """自动识别并解析链接""" link = link.strip() if link.startswith('vmess://'): return LinkParser.parse_vmess(link) elif link.startswith('vless://'): return LinkParser.parse_vless(link) elif link.startswith('trojan://'): return LinkParser.parse_trojan(link) elif link.startswith('ss://'): return LinkParser.parse_ss(link) else: logger.warning(f"不支持的协议: {link[:20]}...") return None class XrayConfigGenerator: """生成xray配置""" @staticmethod def generate_config(node: ProxyNode, socks_port: int) -> Dict[str, Any]: """生成xray配置""" config = { "log": { "loglevel": "warning" }, "inbounds": [ { "tag": "socks", "port": socks_port, "listen": "0.0.0.0", "protocol": "socks", "settings": { "auth": "noauth", "udp": True } } ], "outbounds": [XrayConfigGenerator._get_outbound(node)], "routing": { "domainStrategy": "AsIs" } } return config @staticmethod def _get_outbound(node: ProxyNode) -> Dict[str, Any]: """根据节点类型生成outbound配置""" if node.protocol == 'vmess': return { "protocol": "vmess", "settings": { "vnext": [{ "address": node.address, "port": node.port, "users": [{ "id": node.uuid, "alterId": node.alter_id, "security": node.security }] }] }, "streamSettings": XrayConfigGenerator._get_stream_settings(node) } elif node.protocol == 'vless': vless_user: Dict[str, Any] = { "id": node.uuid, "encryption": "none" } if node.flow: vless_user["flow"] = node.flow return { "protocol": "vless", "settings": { "vnext": [{ "address": node.address, "port": node.port, "users": [vless_user] }] }, "streamSettings": XrayConfigGenerator._get_stream_settings(node) } elif node.protocol == 'trojan': return { "protocol": "trojan", "settings": { "servers": [{ "address": node.address, "port": node.port, "password": node.uuid }] }, "streamSettings": XrayConfigGenerator._get_stream_settings(node) } elif node.protocol == 'ss': return { "protocol": "shadowsocks", "settings": { "servers": [{ "address": node.address, "port": node.port, "method": node.security, "password": node.uuid }] } } return {} @staticmethod def _get_stream_settings(node: ProxyNode) -> Dict[str, Any]: """生成传输层配置""" settings: Dict[str, Any] = { "network": node.network if node.network in ['tcp', 'ws', 'grpc', 'http'] else "tcp" } if node.tls: settings["security"] = "tls" settings["tlsSettings"] = { "serverName": node.sni, "allowInsecure": True } if node.network == 'ws': settings["wsSettings"] = { "path": node.ws_path, "headers": { "Host": node.ws_host or node.address } } return settings class ProxyManager: """代理管理器""" def __init__(self): self.nodes: Dict[str, ProxyNode] = {} self.lock = threading.RLock() self.base_port = 10000 self.port_pool = set(range(self.base_port, self.base_port + 1000)) self.used_ports = set() self.xray_processes: Dict[str, subprocess.Popen] = {} self.config_file = CONFIG_DIR / 'proxies.json' self.subscription_file = CONFIG_DIR / 'subscriptions.json' self.subscriptions: Dict[str, Subscription] = {} self.pools: Dict[str, NodePool] = {} self.pool_file = CONFIG_DIR / 'pools.json' self._load_nodes() self._load_subscriptions() self._load_pools() def _load_nodes(self): """加载已保存的节点""" if self.config_file.exists(): try: with open(self.config_file, 'r') as f: data = json.load(f) for node_data in data: node = ProxyNode(**node_data) self.nodes[node.id] = node logger.info(f"加载了 {len(self.nodes)} 个节点") except Exception as e: logger.error(f"加载节点失败: {e}") def _save_nodes(self): """保存节点到文件""" try: with open(self.config_file, 'w') as f: json.dump([asdict(n) for n in self.nodes.values()], f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"保存节点失败: {e}") def _load_subscriptions(self): if self.subscription_file.exists(): try: with open(self.subscription_file, 'r') as f: data = json.load(f) for sub_data in data: sub = Subscription(**sub_data) self.subscriptions[sub.id] = sub logger.info(f"加载了 {len(self.subscriptions)} 个订阅") except Exception as e: logger.error(f"加载订阅失败: {e}") def _save_subscriptions(self): try: with open(self.subscription_file, 'w') as f: json.dump([asdict(s) for s in self.subscriptions.values()], f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"保存订阅失败: {e}") def _load_pools(self): if self.pool_file.exists(): try: with open(self.pool_file, 'r') as f: data = json.load(f) for p in data: pool = NodePool(**p) self.pools[pool.id] = pool logger.info(f"加载了 {len(self.pools)} 个节点池") except Exception as e: logger.error(f"加载节点池失败: {e}") def _save_pools(self): try: with open(self.pool_file, 'w') as f: json.dump([asdict(p) for p in self.pools.values()], f, indent=2, ensure_ascii=False) except Exception as e: logger.error(f"保存节点池失败: {e}") def search_nodes(self, keyword: str, only_available: bool = True) -> List[ProxyNode]: """按关键字搜索节点""" kw = keyword.strip().lower() with self.lock: results = [] for n in self.nodes.values(): if '0.0.0.0' in n.address: continue if only_available and not n.available: continue text = f"{n.name} {n.address} {n.protocol}".lower() if kw in text: results.append(n) return results def create_pool(self, name: str, node_ids: List[str]) -> NodePool: pool_id = f"pool:{base64.urlsafe_b64encode(name.encode()).decode().rstrip('=')}:{int(time.time())}" pool = NodePool( id=pool_id, name=name, node_ids=node_ids, created_at=datetime.now().isoformat() ) with self.lock: self.pools[pool_id] = pool self._save_pools() return pool def update_pool(self, pool_id: str, name: str = "", node_ids: Optional[List[str]] = None) -> Optional[NodePool]: with self.lock: pool = self.pools.get(pool_id) if not pool: return None if name: pool.name = name if node_ids is not None: pool.node_ids = node_ids self._save_pools() return pool def delete_pool(self, pool_id: str) -> bool: with self.lock: if pool_id not in self.pools: return False del self.pools[pool_id] self._save_pools() return True def get_pool(self, pool_id: str) -> Optional[NodePool]: with self.lock: return self.pools.get(pool_id) def get_all_pools(self) -> List[NodePool]: with self.lock: return list(self.pools.values()) def get_random_from_pool(self, pool_id: str) -> Optional[ProxyNode]: """从指定池中随机获取可用节点""" with self.lock: pool = self.pools.get(pool_id) if not pool: return None candidates = [] for nid in pool.node_ids: n = self.nodes.get(nid) if n and n.available and n.socks_port > 0: candidates.append(n) if not candidates: # 降级:返回任意有端口的 for nid in pool.node_ids: n = self.nodes.get(nid) if n and n.socks_port > 0 and '0.0.0.0' not in n.address: candidates.append(n) if not candidates: return None random.shuffle(candidates) for node in candidates: if self._is_port_alive(node.socks_port): return node return None def add_link(self, link: str) -> Tuple[bool, str]: """添加代理链接""" return self.add_link_with_subscription(link, "") def add_link_with_subscription(self, link: str, subscription_id: str = "") -> Tuple[bool, str]: """添加代理链接并记录订阅来源""" node = LinkParser.parse_link(link) if not node: return False, "解析链接失败" with self.lock: if node.id in self.nodes: existing = self.nodes[node.id] if subscription_id: existing.subscription_id = subscription_id self._save_nodes() return False, "节点已存在" node.subscription_id = subscription_id self.nodes[node.id] = node self._save_nodes() logger.info(f"添加节点: {node.name} ({node.protocol}://{node.address}:{node.port})") return True, f"添加成功: {node.name}" def add_links(self, links: List[str]) -> Tuple[int, int]: """批量添加链接""" success = 0 fail = 0 for link in links: ok, _ = self.add_link(link) if ok: success += 1 else: fail += 1 return success, fail def _normalize_subscription_text(self, text: str) -> str: stripped = text.strip() if not stripped: return "" if '://' in stripped: return stripped try: return safe_b64decode(stripped).decode('utf-8', errors='ignore') except Exception: return stripped def _extract_proxy_links(self, text: str) -> List[str]: normalized = self._normalize_subscription_text(text) links: List[str] = [] prefixes = ('vmess://', 'vless://', 'trojan://', 'ss://') for line in normalized.replace('\r', '\n').split('\n'): line = line.strip() if line.startswith(prefixes): links.append(line) return links def sync_subscription(self, subscription_id: str) -> Tuple[bool, str, int, int]: with self.lock: sub = self.subscriptions.get(subscription_id) if not sub: return False, '订阅不存在', 0, 0 url = sub.url try: resp = requests.get(url, timeout=30) resp.raise_for_status() links = self._extract_proxy_links(resp.text) if not links: with self.lock: sub.last_sync = datetime.now().isoformat() sub.last_error = '订阅中没有可解析的代理链接' self._save_subscriptions() return False, sub.last_error, 0, 0 with self.lock: old_node_ids = list(sub.node_ids) for node_id in old_node_ids: self.delete_node(node_id) success = 0 fail = 0 new_node_ids: List[str] = [] for link in links: ok, _ = self.add_link_with_subscription(link, subscription_id) node = LinkParser.parse_link(link) if ok and node: new_node_ids.append(node.id) success += 1 else: fail += 1 with self.lock: sub.node_ids = new_node_ids sub.last_sync = datetime.now().isoformat() sub.last_error = '' if success > 0 else '没有成功导入任何节点' self._save_subscriptions() return success > 0, '同步完成', success, fail except Exception as e: with self.lock: sub = self.subscriptions.get(subscription_id) if sub: sub.last_sync = datetime.now().isoformat() sub.last_error = str(e) self._save_subscriptions() return False, f'同步失败: {e}', 0, 0 def add_subscription(self, url: str, name: str = '', sync_interval: int = 0) -> Tuple[bool, str, Optional[Subscription], int, int]: subscription_id = f"sub:{base64.urlsafe_b64encode(url.encode()).decode().rstrip('=')}" with self.lock: if subscription_id in self.subscriptions: return False, '订阅已存在', None, 0, 0 sub = Subscription( id=subscription_id, name=name or url, url=url, node_ids=[], sync_interval=sync_interval ) self.subscriptions[subscription_id] = sub self._save_subscriptions() ok, message, success, fail = self.sync_subscription(subscription_id) with self.lock: sub = self.subscriptions.get(subscription_id) return ok, message, sub, success, fail def delete_subscription(self, subscription_id: str) -> bool: with self.lock: sub = self.subscriptions.get(subscription_id) if not sub: return False node_ids = list(sub.node_ids) for node_id in node_ids: self.delete_node(node_id) with self.lock: self.subscriptions.pop(subscription_id, None) self._save_subscriptions() return True def get_all_subscriptions(self) -> List[Subscription]: with self.lock: return list(self.subscriptions.values()) def get_available_port(self) -> Optional[int]: """获取可用端口""" with self.lock: available = self.port_pool - self.used_ports if not available: return None port = random.choice(list(available)) self.used_ports.add(port) return port def release_port(self, port: int): """释放端口""" with self.lock: self.used_ports.discard(port) def start_xray(self, node_id: str) -> bool: """启动xray进程""" node = self.nodes.get(node_id) if not node: return False # 停止旧进程 self.stop_xray(node_id) # 获取端口 socks_port = self.get_available_port() if not socks_port: logger.error("没有可用端口") return False try: # 生成配置 config = XrayConfigGenerator.generate_config(node, socks_port) config_file = DATA_DIR / f'xray_{node_id}.json' with open(config_file, 'w') as f: json.dump(config, f, ensure_ascii=False, indent=2) # 启动xray if not XRAY_BIN.exists(): logger.error(f"xray不存在: {XRAY_BIN}") self.release_port(socks_port) return False proc = subprocess.Popen( [str(XRAY_BIN), 'run', '-config', str(config_file)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) # 等待一下检查进程是否启动 time.sleep(0.5) if proc.poll() is not None: logger.error(f"xray进程启动失败: {node_id}") self.release_port(socks_port) return False with self.lock: node.socks_port = socks_port node.xray_pid = proc.pid self.xray_processes[node_id] = proc self._save_nodes() logger.info(f"启动xray: {node.name} -> socks5://127.0.0.1:{socks_port}") return True except Exception as e: logger.error(f"启动xray失败: {e}") self.release_port(socks_port) return False def stop_xray(self, node_id: str): """停止xray进程""" with self.lock: proc = self.xray_processes.pop(node_id, None) node = self.nodes.get(node_id) if proc: try: proc.terminate() proc.wait(timeout=5) except: proc.kill() if node and node.socks_port: self.release_port(node.socks_port) node.socks_port = 0 node.xray_pid = 0 self._save_nodes() def check_node(self, node_id: str, timeout: int = 8) -> bool: """检查节点可用性""" node = self.nodes.get(node_id) if not node: return False # 如果没有运行,先启动 if node.socks_port == 0: if not self.start_xray(node_id): return False time.sleep(0.3) # 通过代理测试连接 try: proxies = { 'http': f'socks5h://127.0.0.1:{node.socks_port}', 'https': f'socks5h://127.0.0.1:{node.socks_port}' } test_url = DEFAULT_TEST_URL resp = requests.get( test_url, proxies=proxies, timeout=timeout ) available = resp.status_code in (200, 204) with self.lock: node.available = available node.last_check = datetime.now().isoformat() self._save_nodes() return available except Exception as e: logger.warning(f"检查节点失败 {node.name}: {e}") with self.lock: node.available = False node.last_check = datetime.now().isoformat() self._save_nodes() return False def _check_node_wrapper(self, node_id: str): try: self.check_node(node_id) except Exception as e: logger.warning(f"检查异常 {node_id}: {e}") def check_all_nodes(self, max_workers: int = 20): """并发检查所有节点""" node_ids = list(self.nodes.keys()) if not node_ids: return logger.info(f"开始检查 {len(node_ids)} 个节点, 并发={max_workers}...") from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(max_workers=min(max_workers, len(node_ids))) as pool: pool.map(self._check_node_wrapper, node_ids) logger.info("检查完成") def _is_port_alive(self, port: int) -> bool: try: s = socket.create_connection(("127.0.0.1", port), timeout=1) s.close() return True except Exception: return False def get_random_available(self) -> Optional[ProxyNode]: """获取随机可用节点,返回前快速验证端口是否存活""" with self.lock: available_nodes = [n for n in self.nodes.values() if n.available and n.socks_port > 0] if not available_nodes: available_nodes = [n for n in self.nodes.values() if n.socks_port > 0 and '0.0.0.0' not in n.address] if not available_nodes: return None # 打乱顺序,逐个验证端口存活 random.shuffle(available_nodes) for node in available_nodes: if self._is_port_alive(node.socks_port): return node return None def get_all_nodes(self) -> List[ProxyNode]: """获取所有节点""" with self.lock: return list(self.nodes.values()) def delete_node(self, node_id: str) -> bool: """删除节点""" with self.lock: if node_id not in self.nodes: return False subscription_id = self.nodes[node_id].subscription_id self.stop_xray(node_id) del self.nodes[node_id] if subscription_id and subscription_id in self.subscriptions: sub = self.subscriptions[subscription_id] sub.node_ids = [nid for nid in sub.node_ids if nid != node_id] self._save_subscriptions() self._save_nodes() return True def cleanup(self): """清理所有进程""" for node_id in list(self.xray_processes.keys()): self.stop_xray(node_id) class HealthChecker: """健康检查调度器""" def __init__(self, manager: ProxyManager, interval: int = 600): self.manager = manager self.interval = interval # 默认5分钟 self.running = False self.thread = None def start(self): """启动定时检查""" if self.running: return self.running = True self.thread = threading.Thread(target=self._run, daemon=True) self.thread.start() logger.info(f"健康检查已启动,间隔 {self.interval} 秒") def stop(self): """停止定时检查""" self.running = False if self.thread: self.thread.join(timeout=5) logger.info("健康检查已停止") def _run(self): """运行检查循环""" while self.running: try: self.manager.check_all_nodes() except Exception as e: logger.error(f"健康检查异常: {e}") # 等待下次检查 for _ in range(self.interval): if not self.running: break time.sleep(1) class SubscriptionSyncer: """订阅自动同步调度器,每60秒检查一次,到期的订阅自动同步""" def __init__(self, manager: ProxyManager): self.manager = manager self.running = False self.thread = None def start(self): if self.running: return self.running = True self.thread = threading.Thread(target=self._run, daemon=True) self.thread.start() logger.info("订阅自动同步已启动") def stop(self): self.running = False if self.thread: self.thread.join(timeout=5) logger.info("订阅自动同步已停止") def _run(self): while self.running: try: self._check_and_sync() except Exception as e: logger.error(f"订阅自动同步异常: {e}") for _ in range(60): if not self.running: break time.sleep(1) def _check_and_sync(self): now = datetime.now() for sub in list(self.manager.get_all_subscriptions()): if sub.sync_interval <= 0: continue if not sub.last_sync: continue try: last = datetime.fromisoformat(sub.last_sync) elapsed_min = (now - last).total_seconds() / 60 if elapsed_min >= sub.sync_interval: logger.info(f"自动同步订阅: {sub.name}") self.manager.sync_subscription(sub.id) except Exception: pass # Flask应用 app = Flask(__name__) manager = ProxyManager() checker = HealthChecker(manager) syncer = SubscriptionSyncer(manager) @app.route('/') def index(): """首页""" return '''
填写订阅地址,系统会抓取并同步其中的 vmess 链接
创建专属节点池,通过关键字搜索快速选择节点,然后通过池的随机接口获取代理
支持 vmess://, vless://, trojan://, ss:// 格式,每行一个
全局随机获取 socks5:
访问此链接将随机返回一个可用的 socks5 代理信息
节点池随机获取 socks5:
将 <pool_id> 替换为节点池 ID,可从下方节点池列表复制