import subprocess import time import re class WifiManager: def __init__(self, interface="wlan0", backend="nmcli"): self.interface = interface self.backend = backend # "nmcli" or "wpa_cli" def _run_wpa_cli(self, *args): """执行 wpa_cli 命令""" cmd = ["wpa_cli", "-i", self.interface] + list(args) try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) return result.stdout.strip() except subprocess.CalledProcessError as e: print(f"执行 wpa_cli {args} 失败: {e.stderr}") return "" def _run_nmcli(self, *args): """执行 nmcli 命令""" cmd = ["nmcli"] + list(args) try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) return result.stdout.strip() except subprocess.CalledProcessError as e: print(f"执行 nmcli {args} 失败: {e.stderr}") return "" def list_saved_networks(self): if self.backend == "wpa_cli": output = self._run_wpa_cli("list_networks") networks = [] lines = output.splitlines() if len(lines) > 1: for line in lines[1:]: parts = line.split('\t') if len(parts) >= 4: networks.append({ "network_id": parts[0], "ssid": parts[1], "bssid": parts[2], "flags": parts[3] }) return networks else: output = self._run_nmcli("-t", "-f", "UUID,NAME,TYPE", "connection", "show") networks = [] for line in output.splitlines(): parts = line.split(':') # UUID:NAME:TYPE if len(parts) >= 3 and "wireless" in parts[-1]: networks.append({ "network_id": parts[0], "ssid": parts[1], "bssid": "", "flags": parts[-1] }) return networks def scan_networks(self): if self.backend == "wpa_cli": self._run_wpa_cli("scan") time.sleep(3) output = self._run_wpa_cli("scan_results") networks = [] lines = output.splitlines() if len(lines) > 1: for line in lines[1:]: parts = line.split('\t') if len(parts) >= 5: networks.append({ "bssid": parts[0], "frequency": parts[1], "signal_level": parts[2], "flags": parts[3], "ssid": parts[4] }) return networks else: output = self._run_nmcli("-t", "-m", "multiline", "-f", "BSSID,FREQ,SIGNAL,SECURITY,SSID", "device", "wifi", "list", "--rescan", "yes") networks = [] current = {} for line in output.splitlines(): if ':' in line: k, v = line.split(':', 1) current[k] = v if k == "SSID": networks.append({ "bssid": current.get("BSSID", ""), "frequency": current.get("FREQ", "").replace(" MHz", ""), "signal_level": current.get("SIGNAL", ""), "flags": current.get("SECURITY", ""), "ssid": current.get("SSID", "") }) current = {} return networks def connect_wifi(self, ssid, password=None): if self.backend == "wpa_cli": network_id = self._run_wpa_cli("add_network") if not network_id.isdigit(): return False self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"') if password: self._run_wpa_cli("set_network", network_id, "psk", f'"{password}"') else: self._run_wpa_cli("set_network", network_id, "key_mgmt", "NONE") self._run_wpa_cli("enable_network", network_id) self._run_wpa_cli("select_network", network_id) self._run_wpa_cli("save_config") return True else: args = ["device", "wifi", "connect", ssid, "ifname", self.interface] if password: args.extend(["password", password]) res = self._run_nmcli(*args) return "successfully" in res.lower() or "成功" in res def connect_eap(self, ssid, identity, password): if self.backend == "wpa_cli": network_id = self._run_wpa_cli("add_network") if not network_id.isdigit(): return False self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"') self._run_wpa_cli("set_network", network_id, "key_mgmt", "WPA-EAP") self._run_wpa_cli("set_network", network_id, "eap", "PEAP") self._run_wpa_cli("set_network", network_id, "phase2", '"auth=MSCHAPV2"') self._run_wpa_cli("set_network", network_id, "identity", f'"{identity}"') self._run_wpa_cli("set_network", network_id, "password", f'"{password}"') self._run_wpa_cli("enable_network", network_id) self._run_wpa_cli("select_network", network_id) self._run_wpa_cli("save_config") return True else: res = self._run_nmcli("con", "add", "type", "wifi", "con-name", ssid, "ifname", self.interface, "ssid", ssid, "--", "802-11-wireless-security.key-mgmt", "wpa-eap", "802-1x.eap", "peap", "802-1x.phase2-auth", "mschapv2", "802-1x.identity", identity, "802-1x.password", password) if "successfully" in res.lower() or "成功" in res: self._run_nmcli("con", "up", ssid) return True return False def connect_network_id(self, network_id): if self.backend == "wpa_cli": ret1 = self._run_wpa_cli("select_network", str(network_id)) self._run_wpa_cli("save_config") return bool(ret1) else: res = self._run_nmcli("connection", "up", "uuid", str(network_id)) return "successfully" in res.lower() or "成功" in res def remove_network(self, network_id): if self.backend == "wpa_cli": self._run_wpa_cli("remove_network", str(network_id)) self._run_wpa_cli("save_config") else: self._run_nmcli("connection", "delete", "uuid", str(network_id)) def get_current_status(self): if self.backend == "wpa_cli": output = self._run_wpa_cli("status") status = {} for line in output.splitlines(): if '=' in line: key, val = line.split('=', 1) status[key] = val return status else: output = self._run_nmcli("-t", "-m", "multiline", "-f", "GENERAL.STATE,GENERAL.CONNECTION,IP4.ADDRESS", "device", "show", self.interface) status = {"wpa_state": "DISCONNECTED", "ssid": "", "signal_level": None} for line in output.splitlines(): if line.startswith("GENERAL.STATE:"): state_val = line.split(':', 1)[1] if "connected" in state_val.lower(): status["wpa_state"] = "COMPLETED" elif line.startswith("GENERAL.CONNECTION:"): conn_val = line.split(':', 1)[1] if conn_val: status["ssid"] = conn_val elif line.startswith("IP4.ADDRESS[1]:"): ip_val = line.split(':', 1)[1] status["ip_address"] = ip_val.split('/')[0] # 额外获取当前连接网络的信号强度(nmcli device wifi 才有 SIGNAL 字段) if status["wpa_state"] == "COMPLETED": wifi_out = self._run_nmcli("-t", "-f", "SSID,SIGNAL,IN-USE", "device", "wifi") for line in wifi_out.splitlines(): parts = line.split(':') if len(parts) >= 3 and parts[2] == '*': status["signal_level"] = parts[1] # SIGNAL 字段(百分比 0-100) break return status def open_hotspot(self, ssid, password, channel=6): if self.backend == "wpa_cli": network_id = self._run_wpa_cli("add_network") if not network_id.isdigit(): return False self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"') self._run_wpa_cli("set_network", network_id, "mode", "2") self._run_wpa_cli("set_network", network_id, "key_mgmt", "WPA-PSK") self._run_wpa_cli("set_network", network_id, "psk", f'"{password}"') self._run_wpa_cli("set_network", network_id, "frequency", str(2412 + (channel - 1) * 5)) self._run_wpa_cli("select_network", network_id) self._run_wpa_cli("save_config") return network_id else: self._run_nmcli("device", "wifi", "hotspot", "ifname", self.interface, "con-name", ssid, "ssid", ssid, "band", "bg", "channel", str(channel), "password", password) networks = self.list_saved_networks() for net in networks: if net.get("ssid") == ssid: return net.get("network_id") return ssid def close_hotspot(self, network_id=None): if self.backend == "wpa_cli": if network_id is not None: self._run_wpa_cli("remove_network", str(network_id)) self._run_wpa_cli("reconfigure") self._run_wpa_cli("save_config") else: if network_id is not None: self._run_nmcli("connection", "down", str(network_id)) self._run_nmcli("connection", "delete", str(network_id)) if __name__ == "__main__": wifi = WifiManager("wlan0", backend="nmcli") print("Current status:", wifi.get_current_status()) print("Saved networks:", wifi.list_saved_networks())