gcode预览测试

This commit is contained in:
2026-05-14 20:21:16 +08:00
parent 65f221a5d8
commit 837996c436
17 changed files with 1363 additions and 296 deletions

View File

@@ -17,58 +17,58 @@ class AIOPrrintSystemAPI:
def get_status(self):
# test_data = {
# 'job': {
# 'job': {
# 'estimatedPrintTime': 1234,
# 'filament': {'length': 765, 'volume': 24356},
# 'file': {'display_name': 'Test File','date': None, 'name': '20260414135441_42bff5215c6148b8b5f4d8c4f15d5ddc.gcode', 'origin': 'local', 'path': None, 'size': 1468987},
# 'lastPrintTime': None,
# 'user': None
# },
# 'progress': {
# 'completion': 74.8,
# 'filepos': 1234,
# 'printTime': 1235,
# 'printTimeLeft': 6353,
# 'printTimeLeftOrigin': 5366
# },
# 'state': 'Operational'
# },
# 'status': {
# 'sd': {'ready': False},
# 'state': {
# 'error': '',
# 'flags': {
# 'cancelling': False,
# 'closedOrError': False,
# 'error': False,
# 'finishing': False,
# 'operational': True,
# 'paused': False,
# 'pausing': False,
# 'printing': False,
# 'ready': True,
# 'resuming': False,
# 'sdReady': False
# },
# 'text': 'Operational test'
# },
# 'temperature': {
# 'bed': {'actual': 85, 'offset': 0, 'target': 56},
# 'tool0': {'actual': 0.0, 'offset': 0, 'target': 340}
# }
# }
# }
# return test_data
test_data = {
'job': {
'job': {
'estimatedPrintTime': 1234,
'filament': {'length': 765, 'volume': 24356},
'file': {'display_name': 'Test File','date': None, 'name': '20260508141659_085359c9908947bebcaa0fe7490641e8.gcode', 'origin': 'local', 'path': None, 'size': 1468987},
'lastPrintTime': None,
'user': None
},
'progress': {
'completion': 74.8,
'filepos': 1234,
'printTime': 1235,
'printTimeLeft': 6353,
'printTimeLeftOrigin': 5366
},
'state': 'Operational'
},
'status': {
'sd': {'ready': False},
'state': {
'error': '',
'flags': {
'cancelling': False,
'closedOrError': False,
'error': False,
'finishing': False,
'operational': True,
'paused': False,
'pausing': False,
'printing': False,
'ready': True,
'resuming': False,
'sdReady': False
},
'text': 'Operational test'
},
'temperature': {
'bed': {'actual': 85, 'offset': 0, 'target': 56},
'tool0': {'actual': 0.0, 'offset': 0, 'target': 340}
}
}
}
return test_data
url = f"{self.api_url}/status"
try:
r = requests.get(url, headers=self.headers, timeout=5)
r.raise_for_status()
return r.json()
except:
return {"status": {}, "job": {}}
# url = f"{self.api_url}/status"
# try:
# r = requests.get(url, headers=self.headers, timeout=5)
# r.raise_for_status()
# return r.json()
# except:
# return {"status": {}, "job": {}}
def pause_print(self):
return self._post_action("pause_print", action="pause")
@@ -83,7 +83,7 @@ class AIOPrrintSystemAPI:
return self._post_action("auto_leveling")
def send_gcode(self, gcode):
return self._post_action("send_gcode", gcode=gcode)
return self._post_action("send_gcode", commands=gcode)
def off_motors(self):
return self.send_gcode("M84")

View File

@@ -5,6 +5,7 @@ from PyQt6.QtCore import QTimer
class AutoFanStatus:
def __init__(self, update_interval_ms=1000):
self.cpu_temp = 0.0
self.cpu_load = 0.0 # 1 分钟 CPU 负载
self.fan_speed = 0
self.fan_state = "Unknown"
self.fan_rpm = 0
@@ -37,4 +38,12 @@ class AutoFanStatus:
self.fan_speed = 0
self.fan_state = "Unknown"
self.fan_rpm = 0
self.is_auto_fan_service_running = False
self.is_auto_fan_service_running = False
# 读取 CPU 负载(始终执行)
try:
with open("/proc/loadavg", "r") as f:
fields = f.read().split()
self.cpu_load = float(fields[0]) # 1 分钟平均负载
except (OSError, IndexError, ValueError):
self.cpu_load = 0.0

View File

@@ -42,8 +42,6 @@ class ConfigParse(QObject):
self._last_mtime = mtime
except OSError:
return
print("Config changed")
old_config = self.config
new_config = self._load_config()
@@ -68,4 +66,5 @@ class ConfigParse(QObject):
self.gcode_dir = self.config.get("gcode_dir", None)
self.move_axis_area = self.config.get("move_axis_area", None)
self.move_max_speed = self.config.get("move_max_speed", {"x": 3000, "y": 3000, "z": 200})
self.home_positions = self.config.get("home_positions", {"x": 134, "y": 123, "z": 10})
self.move_max_speed = self.config.get("move_max_speed", None)

574
utils/gcode_viewer.py Normal file
View File

@@ -0,0 +1,574 @@
import numpy as np
from PyQt6.QtOpenGLWidgets import QOpenGLWidget
from PyQt6.QtCore import Qt, QEvent, QThread, pyqtSignal
from PyQt6.QtGui import QTouchEvent, QSurfaceFormat
from PyQt6.QtOpenGL import QOpenGLShaderProgram, QOpenGLShader, QOpenGLBuffer
class GCodeParseWorker(QThread):
finished = pyqtSignal(dict)
def __init__(self, filepath, type_map, default_colors, parent=None):
super().__init__(parent)
self.filepath = filepath
self.TYPE_MAP = type_map
self.DEFAULT_COLORS = default_colors
def run(self):
points = []
colors = []
type_segments = {}
type_visibility = {}
x = y = z = e = 0.0
vertex_idx = 0
feature_type = 'OTHER'
current_segment_type = 'OTHER'
segment_start = 0
type_visibility['TRAVEL'] = False
relative_e = False
min_x = min_y = min_z = float('inf')
max_x = max_y = max_z = float('-inf')
current_offset = 0
max_z_seen = -999.0
layer_map = [(0, 0)]
def add_segment(t_name, start, length):
if length > 0:
type_segments.setdefault(t_name, []).append((start, length))
try:
with open(self.filepath, 'rb') as f:
for line_bytes in f:
current_offset += len(line_bytes)
line = line_bytes.decode('utf-8', errors='ignore').strip()
if not line:
continue
if line.startswith('M82'):
relative_e = False
continue
if line.startswith('M83'):
relative_e = True
continue
if line.startswith(';'):
if line.startswith(';TYPE:'):
raw_type = line.split(':', 1)[1].strip()
else:
raw_type = line[1:].strip()
if raw_type not in self.TYPE_MAP and not any(k in raw_type.lower() for k in ('perimeter', 'infill', 'material', 'skirt/brim')):
continue
if 'Skirt/Brim' in raw_type:
raw_type = 'Skirt'
new_type = self.TYPE_MAP.get(raw_type, 'OTHER')
if new_type != feature_type:
feature_type = new_type
if feature_type not in type_visibility:
type_visibility[feature_type] = True
continue
if line.startswith(('G0', 'G1')):
new_x, new_y, new_z = x, y, z
e_val = 0.0
has_e = False
parts = line.split(';')[0].split()
for p in parts[1:]:
try:
if p.startswith('X'): new_x = float(p[1:])
elif p.startswith('Y'): new_y = float(p[1:])
elif p.startswith('Z'): new_z = float(p[1:])
elif p.startswith('E'):
e_val = float(p[1:])
has_e = True
except ValueError:
pass
if new_x == x and new_y == y and new_z == z:
if has_e:
if relative_e: e += e_val
else: e = e_val
continue
is_extrusion = False
if has_e:
if relative_e:
is_extrusion = e_val > 0
new_e = e + e_val
else:
is_extrusion = e_val > e
new_e = e_val
else:
new_e = e
if is_extrusion:
seg_type = feature_type
c = self.DEFAULT_COLORS.get(feature_type, self.DEFAULT_COLORS['OTHER'])
else:
seg_type = 'TRAVEL'
c = self.DEFAULT_COLORS['TRAVEL']
if 'TRAVEL' not in type_visibility:
type_visibility['TRAVEL'] = False
if seg_type != current_segment_type:
if vertex_idx > segment_start:
add_segment(current_segment_type, segment_start, vertex_idx - segment_start)
current_segment_type = seg_type
segment_start = vertex_idx
if new_x < min_x: min_x = new_x
if new_x > max_x: max_x = new_x
if new_y < min_y: min_y = new_y
if new_y > max_y: max_y = new_y
if new_z < min_z: min_z = new_z
if new_z > max_z: max_z = new_z
if new_z > max_z_seen and is_extrusion:
max_z_seen = new_z
layer_map.append((current_offset, vertex_idx))
points.extend([x, y, z, new_x, new_y, new_z])
colors.extend([*c, *c])
vertex_idx += 2
x, y, z, e = new_x, new_y, new_z, new_e
elif line.startswith('G92'):
parts = line.split(';')[0].split()
for p in parts[1:]:
try:
if p.startswith('E'): e = float(p[1:])
elif p.startswith('X'): x = float(p[1:])
elif p.startswith('Y'): y = float(p[1:])
elif p.startswith('Z'): z = float(p[1:])
except ValueError:
pass
if vertex_idx > segment_start:
add_segment(current_segment_type, segment_start, vertex_idx - segment_start)
cx = (min_x + max_x) / 2.0 if max_x >= min_x else 110.0
cy = (min_y + max_y) / 2.0 if max_y >= min_y else 110.0
cz = (min_z + max_z) / 2.0 if max_z >= min_z else 0.0
result = {
'vertices': np.array(points, dtype=np.float32) if vertex_idx > 0 else np.zeros((0,), dtype=np.float32),
'colors': np.array(colors, dtype=np.float32) if vertex_idx > 0 else np.zeros((0,), dtype=np.float32),
'vertex_count': vertex_idx,
'center_x': cx,
'center_y': cy,
'center_z': cz,
'type_segments': type_segments,
'type_visibility': type_visibility,
'layer_map': layer_map
}
self.finished.emit(result)
except Exception as e:
print("ParseGCode Error:", e)
self.finished.emit({})
class GCodeViewerWidget(QOpenGLWidget):
"""
3D G-code 预览控件OpenGL ES / eglfs 兼容版)
使用可编程管线替代固定管线,适配树莓派 4B
"""
# PrusaSlicer 类型映射、颜色定义等保持不变
TYPE_MAP = {
'External perimeter': 'WALL-OUTER',
'Perimeter': 'WALL-INNER',
'Overhang perimeter': 'WALL-OUTER',
'Solid infill': 'SKIN',
'Top solid infill': 'SKIN',
'Bridge infill': 'SKIN',
'Internal infill': 'FILL',
'Support material': 'SUPPORT',
'Support material interface': 'SUPPORT-INTERFACE',
'Skirt': 'SKIRT',
'Brim': 'SKIRT',
'Custom': 'OTHER',
}
DEFAULT_COLORS = {
'WALL-OUTER': (0.92, 0.55, 0.22), # 0xeb8b38
'WALL-INNER': (0.25, 0.50, 0.81), # 0x4080cf
'FILL': (0.80, 0.75, 0.29), # 0xccc04b
'SKIN': (0.62, 0.38, 0.70), # 0x9e60b3
'SUPPORT': (0.34, 0.70, 0.34), # 0x57b357
'SUPPORT-INTERFACE': (0.17, 0.42, 0.17), # 0x2b6b2b
'SKIRT': (0.00, 1.00, 1.00), # 0x00ffff
'OTHER': (0.67, 0.67, 0.67), # 0xaaaaaa
'TRAVEL': (0.25, 0.31, 0.38), # 0x405060
}
# 顶点着色器GLSL ES 1.00
VERTEX_SHADER = """
attribute vec3 aPos;
attribute vec3 aColor;
varying vec3 vColor;
uniform mat4 uMVP;
void main() {
gl_Position = uMVP * vec4(aPos, 1.0);
vColor = aColor;
}
"""
# 片段着色器(添加可调节颜色深度的 uniform 以实现边界加深)
FRAGMENT_SHADER = """
precision mediump float;
varying vec3 vColor;
uniform float uDarken;
void main() {
gl_FragColor = vec4(vColor * uDarken, 1.0);
}
"""
def __init__(self, parent=None):
# 请求 OpenGL ES 2.0 上下文
fmt = QSurfaceFormat()
fmt.setRenderableType(QSurfaceFormat.RenderableType.OpenGLES)
fmt.setVersion(2, 0)
super().__init__(parent)
self.setMinimumSize(400, 300)
self.setAttribute(Qt.WidgetAttribute.WA_AcceptTouchEvents, True)
self.setFormat(fmt)
# 数据缓冲区
self.vertices = None
self.colors = None
self.vertex_count = 0
self.vbo_vertices = None
self.vbo_colors = None
self.vbo_ready = False
# 类型分段
self.type_segments = {}
self.type_visibility = {}
self.current_type = 'OTHER'
# 视角参数
self.view_rot_x = -60.0
self.view_rot_z = 45.0
self.view_zoom = -250.0 # 稍微退后一点以便看到整个打印平台
self.view_trans_x = 0.0
self.view_trans_y = 0.0
# ...
self.progress_ratio = 1.0
self.progress_vertices = 0
# 按需渲染:缓存当前的 filepos 对应的顶点数
self.layer_map = [(0, 0)] # (offset_bytes, vertex_idx)
self.last_reported_filepos = -1
# 模型中心点
self.center_x = 110.0
self.center_y = 110.0
self.center_z = 0.0
# 触摸状态
self.last_mouse_pos = None
self._touch_points = {}
self._pinch_start_dist = 0.0
self._pinch_start_zoom = 0.0
self._pinch_start_center = None
self._pinch_start_trans = (0.0, 0.0)
self._ignore_wheel = False
# 着色器程序
self.shader_program = None
self.aPos_location = None
self.aColor_location = None
self.uMVP_location = None
# ── 公开接口 ──
def load_gcode(self, filepath: str):
if hasattr(self, '_worker') and self._worker.isRunning():
self._worker.terminate()
self._worker.wait()
self._worker = GCodeParseWorker(filepath, self.TYPE_MAP, self.DEFAULT_COLORS)
self._worker.finished.connect(self._on_parse_finished)
self._worker.start()
def _on_parse_finished(self, result: dict):
if not result:
return
self.vertices = result['vertices']
self.colors = result['colors']
self.vertex_count = result['vertex_count']
self.center_x = result['center_x']
self.center_y = result['center_y']
self.center_z = result['center_z']
self.type_segments = result['type_segments']
self.type_visibility = result['type_visibility']
self.layer_map = result['layer_map']
self.vbo_ready = False
# 初始时先不显示,让 update_by_filepos 决定或者如果是0则自动更新
# 这里把 progress_vertices 赋予当前的 target
target = 0
if self.layer_map:
target = self.layer_map[-1][1] # 全显
self.progress_vertices = target
self.last_reported_filepos = -1
self.update()
def update_processes(self, progress: float):
pass
def update_by_filepos(self, filepos: int, is_printing: bool = True):
import bisect
if not hasattr(self, 'layer_map') or not self.layer_map:
return
if not is_printing:
# 不在打印途中时,渲染包含所有顶点(完整模型)
target_vertices = self.vertex_count
else:
# 使用二分查找快速定位当前 filepos 对应的最多展示顶点数
keys = [item[0] for item in self.layer_map]
idx = bisect.bisect_right(keys, filepos)
target_vertices = 0 if idx == 0 else self.layer_map[idx-1][1]
# 按需渲染:只有当层级(计算出的可见顶点数)真正发生跳变时才调用 update()
if target_vertices != getattr(self, 'progress_vertices', -1):
self.progress_vertices = target_vertices
self.update()
def update_switch(self, type_name: str, visible: bool):
if type_name in self.type_visibility:
self.type_visibility[type_name] = visible
self.update()
def set_view_angles(self, rot_x: float, rot_z: float, zoom: float = None):
self.view_rot_x = max(-90.0, min(0.0, rot_x))
self.view_rot_z = rot_z
if zoom is not None:
self.view_zoom = zoom
self.update()
if length == 0:
return
self.type_segments.setdefault(type_name, []).append((start, length))
# ── OpenGL 可编程管线初始化 ──
def initializeGL(self):
import OpenGL.GL as gl
gl.glClearColor(0.15, 0.15, 0.15, 1.0)
gl.glEnable(gl.GL_DEPTH_TEST)
gl.glLineWidth(1.0)
# 编译着色器
self.shader_program = QOpenGLShaderProgram()
self.shader_program.addShaderFromSourceCode(QOpenGLShader.ShaderTypeBit.Vertex, self.VERTEX_SHADER)
self.shader_program.addShaderFromSourceCode(QOpenGLShader.ShaderTypeBit.Fragment, self.FRAGMENT_SHADER)
self.shader_program.link()
# 获取属性和 uniform 位置
self.aPos_location = self.shader_program.attributeLocation("aPos")
self.aColor_location = self.shader_program.attributeLocation("aColor")
self.uMVP_location = self.shader_program.uniformLocation("uMVP")
self.uDarken_location = self.shader_program.uniformLocation("uDarken")
# 创建缓冲对象
self.vbo_vertices = QOpenGLBuffer(QOpenGLBuffer.Type.VertexBuffer)
self.vbo_colors = QOpenGLBuffer(QOpenGLBuffer.Type.VertexBuffer)
def resizeGL(self, w, h):
import OpenGL.GL as gl
gl.glViewport(0, 0, w, h)
# ── 构建 MVP 矩阵(替代 glOrtho/glRotate ──
def _build_mvp(self):
from PyQt6.QtGui import QMatrix4x4
mat = QMatrix4x4()
aspect = self.width() / self.height() if self.height() else 1
mat.perspective(45.0, aspect, 1.0, 1000.0)
mat.translate(self.view_trans_x, self.view_trans_y, self.view_zoom)
mat.rotate(self.view_rot_x, 1.0, 0.0, 0.0)
mat.rotate(self.view_rot_z, 0.0, 0.0, 1.0)
mat.translate(-self.center_x, -self.center_y, -self.center_z)
return mat
# ── 渲染 ──
def paintGL(self):
import OpenGL.GL as gl
gl.glClear(gl.GL_COLOR_BUFFER_BIT | gl.GL_DEPTH_BUFFER_BIT)
if self.vertices is None or self.vertex_count == 0:
return
if not self.vbo_ready:
self._create_vbos()
self.vbo_ready = True
# 使用着色器程序
self.shader_program.bind()
# 设置 MVP 矩阵
import math
from PyQt6.QtGui import QMatrix4x4
mvp_mat = self._build_mvp()
self.shader_program.setUniformValue(self.uMVP_location, mvp_mat)
# 绑定 VBO
self.vbo_vertices.bind()
self.shader_program.setAttributeBuffer(self.aPos_location, gl.GL_FLOAT, 0, 3, 0)
self.shader_program.enableAttributeArray(self.aPos_location)
self.vbo_colors.bind()
self.shader_program.setAttributeBuffer(self.aColor_location, gl.GL_FLOAT, 0, 3, 0)
self.shader_program.enableAttributeArray(self.aColor_location)
# 允许 z-fighting 覆盖,用于同一位置多次渲染线条
gl.glDepthFunc(gl.GL_LEQUAL)
# 渲染两次:一次绘制加粗加深的边界底线,一次绘制正常宽度的原色骨架线
# 在树莓派等性能有限的平台上使用真实的3D圆柱/方块代替线条会导致顶点数暴增十几倍直接卡顿,
# 因此通过动态加粗像素级线宽来性能无损地模拟出“体积感”)
for pass_idx in range(2):
if pass_idx == 0:
gl.glLineWidth(6.0) # 底线宽度(加大以模拟体积轮廓)
self.shader_program.setUniformValue(self.uDarken_location, 0.8) # 加深颜色至 40% 亮度
else:
gl.glLineWidth(3.0) # 主体宽度(加大以模拟线条厚度)
self.shader_program.setUniformValue(self.uDarken_location, 1.0) # 保持原色
# 按类型分段绘制
for type_name, segments in self.type_segments.items():
if not self.type_visibility.get(type_name, True):
continue
for start, length in segments:
if start >= self.progress_vertices:
continue
end = start + length
visible_start = start
visible_count = length
if end > self.progress_vertices:
visible_count = self.progress_vertices - start
if visible_count > 0:
gl.glDrawArrays(gl.GL_LINES, visible_start, visible_count)
# 恢复默认深度测试模式
gl.glDepthFunc(gl.GL_LESS)
self.shader_program.disableAttributeArray(self.aPos_location)
self.shader_program.disableAttributeArray(self.aColor_location)
self.vbo_vertices.release()
self.vbo_colors.release()
self.shader_program.release()
def _create_vbos(self):
if self.vbo_vertices.isCreated():
self.vbo_vertices.destroy()
if self.vbo_colors.isCreated():
self.vbo_colors.destroy()
self.vbo_vertices.create()
self.vbo_vertices.bind()
self.vbo_vertices.allocate(self.vertices.tobytes(), self.vertices.nbytes)
self.vbo_vertices.release()
self.vbo_colors.create()
self.vbo_colors.bind()
self.vbo_colors.allocate(self.colors.tobytes(), self.colors.nbytes)
self.vbo_colors.release()
# ── 触摸/鼠标交互(完全不变) ──
def mousePressEvent(self, event):
self.last_mouse_pos = event.position()
def mouseMoveEvent(self, event):
if self.last_mouse_pos is None:
return
dx = event.position().x() - self.last_mouse_pos.x()
dy = event.position().y() - self.last_mouse_pos.y()
if event.buttons() & Qt.MouseButton.LeftButton:
self.view_rot_x += dy * 0.5
self.view_rot_x = max(-90.0, min(0.0, self.view_rot_x)) # 限制垂直视角的翻转
self.view_rot_z += dx * 0.5
self.last_mouse_pos = event.position()
self.update()
def wheelEvent(self, event):
delta = event.angleDelta().y() / 120
self.view_zoom += delta * 10
self.update()
def event(self, e):
if e.type() in (QEvent.Type.TouchBegin, QEvent.Type.TouchUpdate, QEvent.Type.TouchEnd):
self._ignore_wheel = True
self.touchEvent(e)
return True
elif e.type() == QEvent.Type.Wheel:
if self._ignore_wheel:
self._ignore_wheel = False
return True
return super().event(e)
def touchEvent(self, event: QTouchEvent):
points = event.points()
if not points:
return
if event.type() == QEvent.Type.TouchEnd:
self._touch_points.clear()
self._pinch_start_center = None
self._pinch_start_dist = 0.0
event.accept()
return
if len(points) == 1:
p = points[0]
# 如果是刚检测到单指,或者从双指变回单指
if p.id() not in self._touch_points or self._pinch_start_center is not None:
self._touch_points.clear()
self._touch_points[p.id()] = p.position()
self._pinch_start_center = None
else:
last = self._touch_points[p.id()]
dx = p.position().x() - last.x()
dy = p.position().y() - last.y()
self.view_rot_x += dy * 0.5
self.view_rot_x = max(-90.0, min(0.0, self.view_rot_x)) # 限制垂直视角的翻转
self.view_rot_z += dx * 0.5
self._touch_points[p.id()] = p.position()
self.update()
elif len(points) == 2:
p1, p2 = points[0], points[1]
dx_p = p1.position().x() - p2.position().x()
dy_p = p1.position().y() - p2.position().y()
dist = (dx_p**2 + dy_p**2)**0.5
center_x = (p1.position().x() + p2.position().x()) / 2.0
center_y = (p1.position().y() + p2.position().y()) / 2.0
# 初始化双指状态
if self._pinch_start_center is None:
self._pinch_start_dist = dist
self._pinch_start_zoom = self.view_zoom
self._pinch_start_center = (center_x, center_y)
self._pinch_start_trans = (self.view_trans_x, self.view_trans_y)
self._touch_points.clear() # 清除单指记录
else:
# 缩放 (双指捏合)
if self._pinch_start_dist > 0:
scale = dist / self._pinch_start_dist
self.view_zoom = self._pinch_start_zoom * (1 / scale)
# 平移 (双指并行移动)
dcx = center_x - self._pinch_start_center[0]
dcy = center_y - self._pinch_start_center[1]
pan_speed = abs(self.view_zoom) * 0.002
self.view_trans_x = self._pinch_start_trans[0] + dcx * pan_speed
self.view_trans_y = self._pinch_start_trans[1] - dcy * pan_speed
self.update()
event.accept()

View File

@@ -3,8 +3,9 @@ import time
import re
class WifiManager:
def __init__(self, interface="wlan0"):
def __init__(self, interface="wlan0", backend="nmcli"):
self.interface = interface
self.backend = backend # "nmcli" or "wpa_cli"
def _run_wpa_cli(self, *args):
"""执行 wpa_cli 命令"""
@@ -16,128 +17,209 @@ class WifiManager:
print(f"执行 wpa_cli {args} 失败: {e.stderr}")
return ""
def _run_nmcli(self, *args):
"""执行 nmcli 命令"""
cmd = ["nmcli"] + list(args)
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
print(f"执行 nmcli {args} 失败: {e.stderr}")
return ""
def list_saved_networks(self):
"""列出已保存的 wifiwpa_cli list_networks 以 Tab 分隔)"""
output = self._run_wpa_cli("list_networks")
networks = []
lines = output.splitlines()
if len(lines) > 1:
for line in lines[1:]: # skip header
parts = line.split('\t')
if len(parts) >= 4:
if self.backend == "wpa_cli":
output = self._run_wpa_cli("list_networks")
networks = []
lines = output.splitlines()
if len(lines) > 1:
for line in lines[1:]:
parts = line.split('\t')
if len(parts) >= 4:
networks.append({
"network_id": parts[0],
"ssid": parts[1],
"bssid": parts[2],
"flags": parts[3]
})
return networks
else:
output = self._run_nmcli("-t", "-f", "UUID,NAME,TYPE", "connection", "show")
networks = []
for line in output.splitlines():
parts = line.split(':')
# UUID:NAME:TYPE
if len(parts) >= 3 and "wireless" in parts[-1]:
networks.append({
"network_id": parts[0],
"ssid": parts[1],
"bssid": parts[2],
"flags": parts[3]
"bssid": "",
"flags": parts[-1]
})
return networks
return networks
def scan_networks(self):
"""扫描范围内的 wifi"""
self._run_wpa_cli("scan")
time.sleep(3) # 等待扫描完成
output = self._run_wpa_cli("scan_results")
networks = []
lines = output.splitlines()
if len(lines) > 1:
for line in lines[1:]:
# bssid / frequency / signal level / flags / ssid
parts = line.split('\t')
if len(parts) >= 5:
networks.append({
"bssid": parts[0],
"frequency": parts[1],
"signal_level": parts[2],
"flags": parts[3],
"ssid": parts[4]
})
return networks
if self.backend == "wpa_cli":
self._run_wpa_cli("scan")
time.sleep(3)
output = self._run_wpa_cli("scan_results")
networks = []
lines = output.splitlines()
if len(lines) > 1:
for line in lines[1:]:
parts = line.split('\t')
if len(parts) >= 5:
networks.append({
"bssid": parts[0],
"frequency": parts[1],
"signal_level": parts[2],
"flags": parts[3],
"ssid": parts[4]
})
return networks
else:
output = self._run_nmcli("-t", "-m", "multiline", "-f", "BSSID,FREQ,SIGNAL,SECURITY,SSID", "device", "wifi", "list", "--rescan", "yes")
networks = []
current = {}
for line in output.splitlines():
if ':' in line:
k, v = line.split(':', 1)
current[k] = v
if k == "SSID":
networks.append({
"bssid": current.get("BSSID", ""),
"frequency": current.get("FREQ", "").replace(" MHz", ""),
"signal_level": current.get("SIGNAL", ""),
"flags": current.get("SECURITY", ""),
"ssid": current.get("SSID", "")
})
current = {}
return networks
def connect_wifi(self, ssid, password=None):
"""连接普通 Wi-Fi (WPA2-PSK)"""
network_id = self._run_wpa_cli("add_network")
if not network_id.isdigit():
return False
self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"')
if password:
self._run_wpa_cli("set_network", network_id, "psk", f'"{password}"')
if self.backend == "wpa_cli":
network_id = self._run_wpa_cli("add_network")
if not network_id.isdigit(): return False
self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"')
if password: self._run_wpa_cli("set_network", network_id, "psk", f'"{password}"')
else: self._run_wpa_cli("set_network", network_id, "key_mgmt", "NONE")
self._run_wpa_cli("enable_network", network_id)
self._run_wpa_cli("select_network", network_id)
self._run_wpa_cli("save_config")
return True
else:
self._run_wpa_cli("set_network", network_id, "key_mgmt", "NONE")
self._run_wpa_cli("enable_network", network_id)
self._run_wpa_cli("select_network", network_id)
self._run_wpa_cli("save_config")
return True
args = ["device", "wifi", "connect", ssid, "ifname", self.interface]
if password:
args.extend(["password", password])
res = self._run_nmcli(*args)
return "successfully" in res.lower() or "成功" in res
def connect_eap(self, ssid, identity, password):
"""连接企业级 Wi-Fi (WPA-EAP PEAP/MSCHAPv2)"""
network_id = self._run_wpa_cli("add_network")
if not network_id.isdigit():
if self.backend == "wpa_cli":
network_id = self._run_wpa_cli("add_network")
if not network_id.isdigit(): return False
self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"')
self._run_wpa_cli("set_network", network_id, "key_mgmt", "WPA-EAP")
self._run_wpa_cli("set_network", network_id, "eap", "PEAP")
self._run_wpa_cli("set_network", network_id, "phase2", '"auth=MSCHAPV2"')
self._run_wpa_cli("set_network", network_id, "identity", f'"{identity}"')
self._run_wpa_cli("set_network", network_id, "password", f'"{password}"')
self._run_wpa_cli("enable_network", network_id)
self._run_wpa_cli("select_network", network_id)
self._run_wpa_cli("save_config")
return True
else:
res = self._run_nmcli("con", "add", "type", "wifi", "con-name", ssid, "ifname", self.interface, "ssid", ssid,
"--", "802-11-wireless-security.key-mgmt", "wpa-eap", "802-1x.eap", "peap",
"802-1x.phase2-auth", "mschapv2", "802-1x.identity", identity, "802-1x.password", password)
if "successfully" in res.lower() or "成功" in res:
self._run_nmcli("con", "up", ssid)
return True
return False
self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"')
self._run_wpa_cli("set_network", network_id, "key_mgmt", "WPA-EAP")
self._run_wpa_cli("set_network", network_id, "eap", "PEAP")
self._run_wpa_cli("set_network", network_id, "phase2", '"auth=MSCHAPV2"')
self._run_wpa_cli("set_network", network_id, "identity", f'"{identity}"')
self._run_wpa_cli("set_network", network_id, "password", f'"{password}"')
self._run_wpa_cli("enable_network", network_id)
self._run_wpa_cli("select_network", network_id)
self._run_wpa_cli("save_config")
return True
def connect_network_id(self, network_id):
"""通过 network_id 连接已保存的网络"""
ret1 = self._run_wpa_cli("select_network", str(network_id))
self._run_wpa_cli("save_config")
return bool(ret1)
if self.backend == "wpa_cli":
ret1 = self._run_wpa_cli("select_network", str(network_id))
self._run_wpa_cli("save_config")
return bool(ret1)
else:
res = self._run_nmcli("connection", "up", "uuid", str(network_id))
return "successfully" in res.lower() or "成功" in res
def remove_network(self, network_id):
"""删除某个已保存的网络"""
self._run_wpa_cli("remove_network", str(network_id))
self._run_wpa_cli("save_config")
if self.backend == "wpa_cli":
self._run_wpa_cli("remove_network", str(network_id))
self._run_wpa_cli("save_config")
else:
self._run_nmcli("connection", "delete", "uuid", str(network_id))
def get_current_status(self):
"""获取当前网络状态"""
output = self._run_wpa_cli("status")
status = {}
for line in output.splitlines():
if '=' in line:
key, val = line.split('=', 1)
status[key] = val
return status
if self.backend == "wpa_cli":
output = self._run_wpa_cli("status")
status = {}
for line in output.splitlines():
if '=' in line:
key, val = line.split('=', 1)
status[key] = val
return status
else:
output = self._run_nmcli("-t", "-m", "multiline", "-f", "GENERAL.STATE,GENERAL.CONNECTION,IP4.ADDRESS", "device", "show", self.interface)
status = {"wpa_state": "DISCONNECTED", "ssid": "", "signal_level": None}
for line in output.splitlines():
if line.startswith("GENERAL.STATE:"):
state_val = line.split(':', 1)[1]
if "connected" in state_val.lower():
status["wpa_state"] = "COMPLETED"
elif line.startswith("GENERAL.CONNECTION:"):
conn_val = line.split(':', 1)[1]
if conn_val:
status["ssid"] = conn_val
elif line.startswith("IP4.ADDRESS[1]:"):
ip_val = line.split(':', 1)[1]
status["ip_address"] = ip_val.split('/')[0]
# 额外获取当前连接网络的信号强度nmcli device wifi 才有 SIGNAL 字段)
if status["wpa_state"] == "COMPLETED":
wifi_out = self._run_nmcli("-t", "-f", "SSID,SIGNAL,IN-USE", "device", "wifi")
for line in wifi_out.splitlines():
parts = line.split(':')
if len(parts) >= 3 and parts[2] == '*':
status["signal_level"] = parts[1] # SIGNAL 字段(百分比 0-100
break
return status
def open_hotspot(self, ssid, password, channel=6):
"""
开启热点 (AP 模式)
注意:要求网卡和 wpa_supplicant 均支持 AP 模式 (mode=2)
"""
network_id = self._run_wpa_cli("add_network")
if not network_id.isdigit():
return False
self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"')
self._run_wpa_cli("set_network", network_id, "mode", "2")
self._run_wpa_cli("set_network", network_id, "key_mgmt", "WPA-PSK")
self._run_wpa_cli("set_network", network_id, "psk", f'"{password}"')
self._run_wpa_cli("set_network", network_id, "frequency", str(2412 + (channel - 1) * 5))
self._run_wpa_cli("select_network", network_id)
self._run_wpa_cli("save_config")
return network_id
if self.backend == "wpa_cli":
network_id = self._run_wpa_cli("add_network")
if not network_id.isdigit(): return False
self._run_wpa_cli("set_network", network_id, "ssid", f'"{ssid}"')
self._run_wpa_cli("set_network", network_id, "mode", "2")
self._run_wpa_cli("set_network", network_id, "key_mgmt", "WPA-PSK")
self._run_wpa_cli("set_network", network_id, "psk", f'"{password}"')
self._run_wpa_cli("set_network", network_id, "frequency", str(2412 + (channel - 1) * 5))
self._run_wpa_cli("select_network", network_id)
self._run_wpa_cli("save_config")
return network_id
else:
self._run_nmcli("device", "wifi", "hotspot", "ifname", self.interface, "con-name", ssid, "ssid", ssid, "band", "bg", "channel", str(channel), "password", password)
networks = self.list_saved_networks()
for net in networks:
if net.get("ssid") == ssid:
return net.get("network_id")
return ssid
def close_hotspot(self, network_id=None):
"""关闭热点"""
if network_id is not None:
self._run_wpa_cli("remove_network", str(network_id))
self._run_wpa_cli("reconfigure")
self._run_wpa_cli("save_config")
if self.backend == "wpa_cli":
if network_id is not None:
self._run_wpa_cli("remove_network", str(network_id))
self._run_wpa_cli("reconfigure")
self._run_wpa_cli("save_config")
else:
if network_id is not None:
self._run_nmcli("connection", "down", str(network_id))
self._run_nmcli("connection", "delete", str(network_id))
if __name__ == "__main__":
# Example Usage
wifi = WifiManager("wlan0")
wifi = WifiManager("wlan0", backend="nmcli")
print("Current status:", wifi.get_current_status())
print("Saved networks:", wifi.list_saved_networks())