整理文件夹及架构,加入打印机页面,octo反代有问题
This commit is contained in:
163
app/utils/conf_parse.py
Normal file
163
app/utils/conf_parse.py
Normal file
@@ -0,0 +1,163 @@
|
||||
import configparser
|
||||
import json
|
||||
import copy
|
||||
import math
|
||||
|
||||
class ConfParse:
|
||||
def __init__(self, config_def_json_files_paths):
|
||||
self.config_def_json_files_paths = config_def_json_files_paths
|
||||
self.configs = {}
|
||||
for file in self.config_def_json_files_paths:
|
||||
with open(file, 'r') as f:
|
||||
jf = json.load(f)
|
||||
if "settings" in jf:
|
||||
setting_json = jf["settings"]
|
||||
for key, value in setting_json.items():
|
||||
# print(key, value)
|
||||
all_items = self._expend_children(value["children"])
|
||||
for item_key, item_value in all_items.items():
|
||||
self.configs[item_key] = item_value
|
||||
elif "overrides" in jf:
|
||||
override_json = jf["overrides"]
|
||||
for key, value in override_json.items():
|
||||
if key in self.configs:
|
||||
for item_key, item_value in value.items():
|
||||
self.configs[key][item_key] = item_value
|
||||
else:
|
||||
self.configs[key] = value
|
||||
|
||||
def _expend_children(self, config):
|
||||
tmp_c = {}
|
||||
for key,val in config.items():
|
||||
if "children" in val:
|
||||
tmp_c.update(self._expend_children(val["children"]))
|
||||
val.pop("children", None)
|
||||
tmp_c[key] = val
|
||||
return tmp_c
|
||||
|
||||
def add_inst_cfg(self, inst_cfg_files_paths):
|
||||
copy_settings = copy.deepcopy(self.configs)
|
||||
config = configparser.ConfigParser()
|
||||
for file in inst_cfg_files_paths:
|
||||
config.read(file)
|
||||
if config.has_section('values'):
|
||||
for key, val in config.items('values'):
|
||||
v = str(val)
|
||||
if key not in copy_settings:
|
||||
copy_settings[key] = {}
|
||||
if v.startswith("="):
|
||||
copy_settings[key]["value"] = v[1:]
|
||||
else:
|
||||
copy_settings[key]["value"] = v
|
||||
return copy_settings
|
||||
|
||||
|
||||
def parse_configs(self, settings):
|
||||
class ConfigStr(str):
|
||||
def __mul__(self, other): raise TypeError()
|
||||
def __rmul__(self, other): raise TypeError()
|
||||
def __add__(self, other): raise TypeError()
|
||||
def __radd__(self, other): raise TypeError()
|
||||
def __sub__(self, other): raise TypeError()
|
||||
def __rsub__(self, other): raise TypeError()
|
||||
def __truediv__(self, other): raise TypeError()
|
||||
def __rtruediv__(self, other): raise TypeError()
|
||||
def __pow__(self, other): raise TypeError()
|
||||
def __rpow__(self, other): raise TypeError()
|
||||
|
||||
parsed_settings = copy.deepcopy(settings)
|
||||
|
||||
last_unparsed = -1
|
||||
while True:
|
||||
unparsed = 0
|
||||
|
||||
# 构建上下文环境变量上下文,用作eval的变量替换
|
||||
context = {}
|
||||
for k, v in parsed_settings.items():
|
||||
# 上下文里的变量取值:优先使用 value(如果有),否则使用 default_value
|
||||
if "value" in v:
|
||||
val = v["value"]
|
||||
elif "default_value" in v:
|
||||
val = v["default_value"]
|
||||
else:
|
||||
val = None
|
||||
|
||||
if isinstance(val, str):
|
||||
if val.lower() == "true":
|
||||
val = True
|
||||
elif val.lower() == "false":
|
||||
val = False
|
||||
else:
|
||||
try:
|
||||
val = int(val)
|
||||
except ValueError:
|
||||
try:
|
||||
val = float(val)
|
||||
except ValueError:
|
||||
val = ConfigStr(val)
|
||||
context[k] = val
|
||||
|
||||
# 自定义函数实现
|
||||
def resolveOrValue(key):
|
||||
return context.get(key)
|
||||
|
||||
def extruderValues(key):
|
||||
# 兼容简易的多挤出机查询逻辑,目前单挤出机环境下返回一个单元素列表
|
||||
return [context.get(key)]
|
||||
|
||||
def extruderValue(extruder_position, key):
|
||||
# 对于单挤出机环境或者全局配置,直接忽略 extruder_position,返回指定的 key 对应的值
|
||||
return context.get(key)
|
||||
|
||||
def defaultExtruderPosition():
|
||||
return 0
|
||||
|
||||
# 提供基础的运算函数支持
|
||||
builtin_funcs = {
|
||||
"max": max,
|
||||
"min": min,
|
||||
"abs": abs,
|
||||
"round": round,
|
||||
"int": int,
|
||||
"float": float,
|
||||
"bool": bool,
|
||||
"math": math,
|
||||
"resolveOrValue": resolveOrValue,
|
||||
"extruderValues": extruderValues,
|
||||
"extruderValue": extruderValue,
|
||||
"defaultExtruderPosition": defaultExtruderPosition
|
||||
}
|
||||
|
||||
for key, val_dict in parsed_settings.items():
|
||||
for field, field_val in val_dict.items():
|
||||
# 仅对字符串进行尝试计算
|
||||
if "type" == field:
|
||||
continue
|
||||
|
||||
if isinstance(field_val, str):
|
||||
try:
|
||||
# 如果是一个普通的纯字符串,比如分类名(如"Machine Type" ),eval可能会抛出SyntaxError或NameError
|
||||
# 如果它是一个python表达式,则会被顺利计算出结果
|
||||
evaluated = eval(field_val, {"__builtins__": builtin_funcs}, context)
|
||||
# 避免将原本只是用来作类型声明的 "int"/"float" 纯字符串,被错误求值为内置 Python type class 打断 JSON 序列化
|
||||
if evaluated != field_val and not isinstance(evaluated, type):
|
||||
if val_dict.get("type") == "str" and not isinstance(evaluated, str):
|
||||
if isinstance(evaluated, (list, dict)):
|
||||
import json
|
||||
val_dict[field] = json.dumps(evaluated).replace(" ", "")
|
||||
else:
|
||||
val_dict[field] = str(evaluated)
|
||||
else:
|
||||
val_dict[field] = evaluated
|
||||
except Exception:
|
||||
# 解析失败(例如是个普通英文字符串或者依赖还没被解开)则暂不做处理
|
||||
unparsed += 1
|
||||
|
||||
# 如果在这一轮中未解析出的表达式数量不再减少,说明已经到达极限,跳出循环
|
||||
if unparsed == last_unparsed:
|
||||
break
|
||||
last_unparsed = unparsed
|
||||
|
||||
return parsed_settings
|
||||
|
||||
|
||||
152
app/utils/octoprint_client.py
Normal file
152
app/utils/octoprint_client.py
Normal file
@@ -0,0 +1,152 @@
|
||||
import requests
|
||||
from urllib.parse import urljoin
|
||||
|
||||
class OctoPrintClient:
|
||||
"""
|
||||
Client for interacting with the OctoPrint API using Application Keys or standard API Keys.
|
||||
Designed to be easily extensible.
|
||||
"""
|
||||
def __init__(self, base_url, api_key=None):
|
||||
self.base_url = base_url.rstrip('/')
|
||||
self.api_key = api_key
|
||||
self.session = requests.Session()
|
||||
if self.api_key:
|
||||
self.session.headers.update({"X-Api-Key": self.api_key})
|
||||
|
||||
def _request(self, method, endpoint, **kwargs):
|
||||
"""Internal method to handle API requests and standard parsing."""
|
||||
url = urljoin(self.base_url, endpoint)
|
||||
response = self.session.request(method, url, **kwargs)
|
||||
response.raise_for_status()
|
||||
|
||||
# Octoprint often returns 204 No Content for successful commands
|
||||
if response.status_code == 204:
|
||||
return True
|
||||
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError:
|
||||
return response.text
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Application Key Workflow
|
||||
# -------------------------------------------------------------------------
|
||||
def request_app_key(self, app_name="My OctoPrint App"):
|
||||
"""
|
||||
Step 1: Start the Application Key authorization flow.
|
||||
Returns: (app_token, client_token)
|
||||
You should poll verify_app_key(client_token) until the user allows it in OctoPrint UI.
|
||||
"""
|
||||
data = self._request("POST", "/plugin/appkeys/request", json={"app": app_name})
|
||||
return data.get("app_token"), data.get("client_token")
|
||||
|
||||
def verify_app_key(self, client_token):
|
||||
"""
|
||||
Step 2: Check if the requested app key is approved.
|
||||
Returns True if authorized, False if still pending.
|
||||
Raises an exception if denied or timed out.
|
||||
"""
|
||||
url = urljoin(self.base_url, "/plugin/appkeys/probe")
|
||||
# App Key probe requires passing the client_token as a Bearer token
|
||||
# Don't use self._request since it uses the established session/api_key
|
||||
response = requests.get(url, headers={"Authorization": f"Bearer {client_token}"})
|
||||
|
||||
if response.status_code == 204:
|
||||
return True
|
||||
elif response.status_code == 202:
|
||||
return False # Pending approval
|
||||
else:
|
||||
raise Exception(f"App key request denied or expired (HTTP {response.status_code})")
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Files
|
||||
# -------------------------------------------------------------------------
|
||||
def get_files(self, location="local"):
|
||||
"""
|
||||
Retrieve all files available on OctoPrint.
|
||||
location: 'local' (internal storage) or 'sdcard' (SD card on printer)
|
||||
"""
|
||||
return self._request("GET", f"/api/files/{location}")
|
||||
|
||||
def select_file(self, location, path, print_after_select=False):
|
||||
"""Select a file, and optionally start printing it immediately."""
|
||||
payload = {"command": "select", "print": print_after_select}
|
||||
return self._request("POST", f"/api/files/{location}/{path}", json=payload)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Printer Status
|
||||
# -------------------------------------------------------------------------
|
||||
def get_printer_status(self):
|
||||
"""
|
||||
Get the current printer state (e.g., temperatures, operational state).
|
||||
Note: If printer is disconnected, this may return an HTTP error.
|
||||
"""
|
||||
try:
|
||||
return self._request("GET", "/api/printer")
|
||||
except requests.HTTPError as e:
|
||||
if e.response.status_code == 409:
|
||||
return {"state": {"text": "Offline/Disconnected"}}
|
||||
raise
|
||||
|
||||
def get_job_info(self):
|
||||
"""Get information about the current print job and progress."""
|
||||
return self._request("GET", "/api/job")
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Printer Control
|
||||
# -------------------------------------------------------------------------
|
||||
def start_print(self):
|
||||
"""Start the print job (Requires a file to be selected first)."""
|
||||
return self._request("POST", "/api/job", json={"command": "start"})
|
||||
|
||||
def pause_print(self, action="pause"):
|
||||
"""
|
||||
Pause, resume, or toggle the print job.
|
||||
action: 'pause', 'resume', or 'toggle'
|
||||
"""
|
||||
return self._request("POST", "/api/job", json={"command": "pause", "action": action})
|
||||
|
||||
def cancel_print(self):
|
||||
"""Cancel the current print job."""
|
||||
return self._request("POST", "/api/job", json={"command": "cancel"})
|
||||
|
||||
def send_gcode(self, commands):
|
||||
"""
|
||||
Send a string or list of G-Code commands directly to the printer.
|
||||
Example: send_gcode("G28") or send_gcode(["G28", "G29"])
|
||||
"""
|
||||
if isinstance(commands, str):
|
||||
commands = [commands]
|
||||
return self._request("POST", "/api/printer/command", json={"commands": commands})
|
||||
|
||||
def home_axes(self, axes=["x", "y", "z"]):
|
||||
"""Convenience method to home the printer axes."""
|
||||
return self._request("POST", "/api/printer/printhead", json={"command": "home", "axes": axes})
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Webcam / Video
|
||||
# -------------------------------------------------------------------------
|
||||
def get_webcam_stream_url(self):
|
||||
"""
|
||||
Attempts to fetch the configured webcam stream URL from OctoPrint settings.
|
||||
Provides a fallback if settings are inaccessible.
|
||||
"""
|
||||
try:
|
||||
settings = self._request("GET", "/api/settings")
|
||||
stream_url = settings.get("webcam", {}).get("streamUrl", "/webcam/?action=stream")
|
||||
if stream_url.startswith("/"):
|
||||
return urljoin(self.base_url, stream_url)
|
||||
return stream_url
|
||||
except requests.HTTPError:
|
||||
# Fallback standard URL
|
||||
return urljoin(self.base_url, "/webcam/?action=stream")
|
||||
|
||||
# --- Example Usage / Extensibility test ---
|
||||
if __name__ == "__main__":
|
||||
# Example snippet of how to use the client:
|
||||
OCTOPRINT_URL = "http://octopi.local"
|
||||
# OCTOPRINT_KEY = "YOUR_APP_KEY_HERE"
|
||||
|
||||
# client = OctoPrintClient(OCTOPRINT_URL, OCTOPRINT_KEY)
|
||||
# print(client.get_printer_status())
|
||||
pass
|
||||
107
app/utils/stl_merger.py
Normal file
107
app/utils/stl_merger.py
Normal file
@@ -0,0 +1,107 @@
|
||||
import struct
|
||||
import math
|
||||
import os
|
||||
|
||||
def merge_stls(input_files, output_path):
|
||||
try:
|
||||
from stl import mesh
|
||||
import numpy as np
|
||||
|
||||
meshes = []
|
||||
for path, matrix in input_files:
|
||||
try:
|
||||
# 重新换回轻量级的 numpy-stl 以防内存溢出 (OOM)
|
||||
m = mesh.Mesh.from_file(path)
|
||||
|
||||
mat = np.array(matrix, dtype=np.float64).reshape((4, 4)).T
|
||||
|
||||
vectors = m.vectors.reshape(-1, 3)
|
||||
hom_vectors = np.hstack((vectors, np.ones((len(vectors), 1), dtype=np.float32)))
|
||||
transformed = (mat @ hom_vectors.T).T
|
||||
m.vectors = transformed[:, :3].reshape(-1, 3, 3)
|
||||
|
||||
# 检测缩放矩阵是否引发镜像翻转 (行列式为负数)
|
||||
det = np.linalg.det(mat[:3, :3])
|
||||
if det < 0:
|
||||
# 发生镜像反转,不仅法线会反向,三角形三个顶点的顺逆时针(Winding Order)也会错乱
|
||||
# 强行交换每个三角形的顶点2和顶点3以纠正渲染正反面
|
||||
m.vectors[:, [1, 2]] = m.vectors[:, [2, 1]]
|
||||
|
||||
m.update_normals()
|
||||
meshes.append(m)
|
||||
except Exception as e:
|
||||
print(f"Error processing path {path} with stl mesh: {e}")
|
||||
|
||||
if not meshes:
|
||||
return
|
||||
|
||||
if len(meshes) == 1:
|
||||
meshes[0].save(output_path)
|
||||
return
|
||||
|
||||
merged_data = np.concatenate([m.data for m in meshes])
|
||||
merged_mesh = mesh.Mesh(merged_data)
|
||||
merged_mesh.save(output_path)
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
print(f"Mesh fast-merge failed: {e}. Falling back to struct parsing.")
|
||||
|
||||
# Extreme fallback just in case no stl libraries work
|
||||
total_faces = 0
|
||||
meshes_data = []
|
||||
|
||||
for path, matrix in input_files:
|
||||
with open(path, 'rb') as f:
|
||||
f.read(80)
|
||||
faces = struct.unpack('<I', f.read(4))[0]
|
||||
data = f.read(faces * 50)
|
||||
|
||||
def apply_m(_x, _y, _z):
|
||||
w = _x * matrix[3] + _y * matrix[7] + _z * matrix[11] + matrix[15]
|
||||
nx = (_x * matrix[0] + _y * matrix[4] + _z * matrix[8] + matrix[12]) / w
|
||||
ny = (_x * matrix[1] + _y * matrix[5] + _z * matrix[9] + matrix[13]) / w
|
||||
nz = (_x * matrix[2] + _y * matrix[6] + _z * matrix[10] + matrix[14]) / w
|
||||
return nx, ny, nz
|
||||
|
||||
new_data = bytearray(faces * 50)
|
||||
src_offset = 0
|
||||
dst_offset = 0
|
||||
for _ in range(faces):
|
||||
n_x, n_y, n_z, v1x, v1y, v1z, v2x, v2y, v2z, v3x, v3y, v3z, attr = struct.unpack_from('<12fH', data, src_offset)
|
||||
|
||||
nv1x, nv1y, nv1z = apply_m(v1x, v1y, v1z)
|
||||
nv2x, nv2y, nv2z = apply_m(v2x, v2y, v2z)
|
||||
nv3x, nv3y, nv3z = apply_m(v3x, v3y, v3z)
|
||||
|
||||
# Recalculate normal properly using cross product to fix flipped/sheared surfaces
|
||||
Ux = nv2x - nv1x
|
||||
Uy = nv2y - nv1y
|
||||
Uz = nv2z - nv1z
|
||||
Vx = nv3x - nv1x
|
||||
Vy = nv3y - nv1y
|
||||
Vz = nv3z - nv1z
|
||||
|
||||
nnx = Uy * Vz - Uz * Vy
|
||||
nny = Uz * Vx - Ux * Vz
|
||||
nnz = Ux * Vy - Uy * Vx
|
||||
|
||||
l = math.sqrt(nnx**2 + nny**2 + nnz**2)
|
||||
if l > 1e-8:
|
||||
nnx, nny, nnz = nnx/l, nny/l, nnz/l
|
||||
else:
|
||||
nnx, nny, nnz = 0.0, 0.0, 0.0
|
||||
|
||||
struct.pack_into('<12fH', new_data, dst_offset, nnx, nny, nnz, nv1x, nv1y, nv1z, nv2x, nv2y, nv2z, nv3x, nv3y, nv3z, attr)
|
||||
src_offset += 50
|
||||
dst_offset += 50
|
||||
|
||||
meshes_data.append(new_data)
|
||||
total_faces += faces
|
||||
|
||||
with open(output_path, 'wb') as f:
|
||||
f.write(b'\0' * 80)
|
||||
f.write(struct.pack('<I', total_faces))
|
||||
for d in meshes_data:
|
||||
f.write(d)
|
||||
|
||||
170
app/utils/stl_simplifier.py
Normal file
170
app/utils/stl_simplifier.py
Normal file
@@ -0,0 +1,170 @@
|
||||
import numpy as np
|
||||
from stl import mesh
|
||||
import struct
|
||||
import sys
|
||||
import os
|
||||
|
||||
def simplify_stl(input_path, output_path, keep_ratio=0.1):
|
||||
try:
|
||||
# Try using professional pymeshlab first
|
||||
import pymeshlab
|
||||
ms = pymeshlab.MeshSet()
|
||||
ms.load_new_mesh(input_path)
|
||||
|
||||
target_faces = int(ms.current_mesh().face_number() * keep_ratio)
|
||||
|
||||
# Optimize using quadric edge collapse to preserve 95% visual effect
|
||||
try:
|
||||
ms.apply_filter('meshing_decimation_quadric_edge_collapse',
|
||||
targetfacenum=target_faces,
|
||||
preserveboundary=True,
|
||||
preservenormal=True,
|
||||
preservetopology=True)
|
||||
except AttributeError:
|
||||
ms.meshing_decimation_quadric_edge_collapse(
|
||||
targetfacenum=target_faces,
|
||||
preserveboundary=True,
|
||||
preservenormal=True,
|
||||
preservetopology=True
|
||||
)
|
||||
|
||||
ms.save_current_mesh(output_path)
|
||||
return True
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"Pymeshlab simplification failed: {e}. Falling back to Open3D...")
|
||||
|
||||
try:
|
||||
# Try using open3d as second fallback
|
||||
import open3d as o3d
|
||||
o3d_mesh = o3d.io.read_triangle_mesh(input_path)
|
||||
if len(o3d_mesh.triangles) > 0:
|
||||
target_faces = max(1, int(len(o3d_mesh.triangles) * keep_ratio))
|
||||
smp_mesh = o3d_mesh.simplify_quadric_decimation(target_number_of_triangles=target_faces)
|
||||
smp_mesh.compute_triangle_normals()
|
||||
o3d.io.write_triangle_mesh(output_path, smp_mesh)
|
||||
return True
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"Open3D simplification failed: {e}. Falling back to PyFQMR...")
|
||||
|
||||
try:
|
||||
# Try using pyfqmr as third fallback
|
||||
import pyfqmr
|
||||
import trimesh
|
||||
mesh_data = trimesh.load(input_path, file_type='stl')
|
||||
target_faces = max(1, int(len(mesh_data.faces) * keep_ratio))
|
||||
simplifier = pyfqmr.Simplify()
|
||||
simplifier.setMesh(mesh_data.vertices, mesh_data.faces)
|
||||
simplifier.simplify_mesh(target_count=target_faces, aggressiveness=7, preserve_border=True, verbose=False)
|
||||
|
||||
mesh_parts = simplifier.getMesh()
|
||||
smp_mesh = trimesh.Trimesh(vertices=mesh_parts[0], faces=mesh_parts[1], process=False)
|
||||
smp_mesh.export(output_path, file_type='stl')
|
||||
return True
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"PyFQMR simplification failed: {e}. Falling back to custom algorithm...")
|
||||
|
||||
try:
|
||||
try:
|
||||
import trimesh
|
||||
mesh_data = trimesh.load(input_path, file_type='stl')
|
||||
if hasattr(mesh_data, 'triangles'):
|
||||
vertices = mesh_data.triangles.reshape(-1, 3)
|
||||
else:
|
||||
vertices = mesh_data.vertices[mesh_data.faces].reshape(-1, 3)
|
||||
use_trimesh = True
|
||||
except ImportError:
|
||||
# Load mesh using numpy-stl fallback
|
||||
m = mesh.Mesh.from_file(input_path)
|
||||
vertices = m.vectors.reshape(-1, 3)
|
||||
use_trimesh = False
|
||||
|
||||
min_v = vertices.min(axis=0)
|
||||
max_v = vertices.max(axis=0)
|
||||
bbox_size = max_v - min_v
|
||||
max_dim = np.max(bbox_size)
|
||||
|
||||
if max_dim == 0:
|
||||
if use_trimesh:
|
||||
mesh_data.export(output_path, file_type='stl')
|
||||
else:
|
||||
m.save(output_path)
|
||||
return True
|
||||
|
||||
# Target roughly a resolution that gives us keep_ratio faces.
|
||||
# This is a heuristic approach to grid-based vertex clustering.
|
||||
# Function to simplify given a grid size
|
||||
def do_simplify(g_size):
|
||||
v_idx = np.round((vertices - min_v) / g_size).astype(np.int64)
|
||||
|
||||
# Fast 1D hash to avoid extremely slow np.unique(axis=0) on 2D arrays
|
||||
max_idx = v_idx.max(axis=0) + 1
|
||||
v_1d = v_idx[:, 0] + v_idx[:, 1] * max_idx[0] + v_idx[:, 2] * max_idx[0] * max_idx[1]
|
||||
|
||||
# Find unique grid cells and map old vertices to them
|
||||
_, unique_idx, inv_idx = np.unique(v_1d, return_index=True, return_inverse=True)
|
||||
new_verts = vertices[unique_idx]
|
||||
|
||||
# Map faces to new vertices
|
||||
faces = inv_idx.reshape(-1, 3)
|
||||
|
||||
# Remove degenerate faces (faces where at least two vertices resolve to the same cell)
|
||||
valid = (faces[:,0] != faces[:,1]) & (faces[:,1] != faces[:,2]) & (faces[:,0] != faces[:,2])
|
||||
valid_faces = faces[valid]
|
||||
|
||||
return new_verts, valid_faces
|
||||
|
||||
target_faces = max(1, int((len(vertices) // 3) * keep_ratio))
|
||||
low_g = max_dim * 0.0005
|
||||
high_g = max_dim * 0.2
|
||||
best_verts = vertices
|
||||
best_faces = np.arange(len(vertices)).reshape(-1, 3)
|
||||
|
||||
# Binary search for the right grid size
|
||||
for _ in range(8):
|
||||
g_size = (low_g + high_g) / 2
|
||||
v, f = do_simplify(g_size)
|
||||
best_verts, best_faces = v, f
|
||||
|
||||
if len(f) > target_faces:
|
||||
# too many faces, make grid coarser (larger)
|
||||
low_g = g_size
|
||||
else:
|
||||
# too few faces, make grid finer (smaller)
|
||||
high_g = g_size
|
||||
|
||||
if abs(len(f) - target_faces) < target_faces * 0.05:
|
||||
break
|
||||
|
||||
new_vertices, valid_faces = best_verts, best_faces
|
||||
|
||||
if use_trimesh:
|
||||
simplified = trimesh.Trimesh(vertices=new_vertices, faces=valid_faces, process=False)
|
||||
simplified.export(output_path, file_type='stl')
|
||||
return True
|
||||
|
||||
# Build the simplified mesh using fallback
|
||||
new_m = mesh.Mesh(np.zeros(valid_faces.shape[0], dtype=mesh.Mesh.dtype))
|
||||
|
||||
# Vectorized assignment
|
||||
new_m.vectors[:, 0, :] = new_vertices[valid_faces[:, 0]]
|
||||
new_m.vectors[:, 1, :] = new_vertices[valid_faces[:, 1]]
|
||||
new_m.vectors[:, 2, :] = new_vertices[valid_faces[:, 2]]
|
||||
|
||||
# Calculate normals correctly
|
||||
new_m.update_normals()
|
||||
new_m.save(output_path)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error simplifying STL: {e}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) > 2:
|
||||
simplify_stl(sys.argv[1], sys.argv[2])
|
||||
297
app/utils/tasks.py
Normal file
297
app/utils/tasks.py
Normal file
@@ -0,0 +1,297 @@
|
||||
from huey import SqliteHuey
|
||||
import subprocess
|
||||
import os
|
||||
from app.models import db, PrintFile, SystemConfig
|
||||
from app.utils.conf_parse import ConfParse
|
||||
import json
|
||||
import uuid
|
||||
import configparser
|
||||
|
||||
|
||||
import os
|
||||
|
||||
# Ensure instance directory exists
|
||||
instance_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), '..', 'instance')
|
||||
os.makedirs(instance_dir, exist_ok=True)
|
||||
huey_db_path = os.path.join(instance_dir, 'huey_queue.db')
|
||||
|
||||
huey = SqliteHuey(filename=huey_db_path)
|
||||
|
||||
|
||||
@huey.task()
|
||||
def slice_stl_task(file_id, stl_filepath, quality_preset=None, infill_density=None, support_enable=None, support_pattern=None, delete_stl=False):
|
||||
# This is run by the Huey worker
|
||||
# We need to create an app context to interact with the database
|
||||
from app import create_app
|
||||
app = create_app()
|
||||
with app.app_context():
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if not print_file:
|
||||
return
|
||||
|
||||
# Cache variables and commit slicing status
|
||||
gcode_filename = print_file.filename.rsplit('.', 1)[0] + '.gcode'
|
||||
gcode_filepath = os.path.join(app.config['UPLOAD_FOLDER'], gcode_filename)
|
||||
print_file.status = 'slicing'
|
||||
db.session.commit()
|
||||
|
||||
# Remove DB session to avoid locking the sqlite db during long slicing operations
|
||||
db.session.remove()
|
||||
|
||||
tmp_def_path = None
|
||||
|
||||
try:
|
||||
# Create Cura engine options
|
||||
# use our local minimal configurations detached from the entire Cura framework
|
||||
print_config_path = os.path.abspath(os.path.join(app.root_path, '..', 'print_config'))
|
||||
printers_path = os.path.join(print_config_path, 'printers')
|
||||
extruders_path = os.path.join(print_config_path, 'extruders')
|
||||
materials_path = os.path.join(print_config_path, 'materials')
|
||||
presets_path = os.path.join(print_config_path, 'quality')
|
||||
variants_path = os.path.join(print_config_path, 'variants')
|
||||
|
||||
env = os.environ.copy()
|
||||
env["CURA_ENGINE_SEARCH_PATH"] = f"{printers_path}:{extruders_path}:{materials_path}:{presets_path}:{variants_path}"
|
||||
|
||||
def_files = [
|
||||
os.path.join(printers_path, "fdmprinter.def.json"),
|
||||
os.path.join(printers_path, "fdmextruder.def.json"),
|
||||
os.path.join(printers_path, "creality_base.def.json"),
|
||||
os.path.join(printers_path, "creality_ender3v3se.def.json")
|
||||
]
|
||||
|
||||
inst_files_list = []
|
||||
|
||||
if quality_preset:
|
||||
config = configparser.ConfigParser()
|
||||
preset_path = os.path.join(presets_path, 'creality', 'presets', quality_preset)
|
||||
if os.path.exists(preset_path):
|
||||
config.read(preset_path)
|
||||
material_type = config.get('metadata', 'material', fallback=None)
|
||||
variant_type = config.get('metadata', 'variant', fallback=None)
|
||||
quality_type = config.get('metadata', 'quality_type', fallback=None)
|
||||
|
||||
if material_type:
|
||||
m_path = os.path.join(materials_path, f"{material_type}.inst.cfg")
|
||||
if os.path.exists(m_path): inst_files_list.append(m_path)
|
||||
if variant_type:
|
||||
variant_d = variant_type.split("mm")[0]
|
||||
v_path = os.path.join(variants_path, "creality", f"creality_ender3v3se_{variant_d}.inst.cfg")
|
||||
if os.path.exists(v_path): inst_files_list.append(v_path)
|
||||
|
||||
if support_pattern == 'tree':
|
||||
t_path = os.path.join(print_config_path, 'supports', 'tree.inst.cfg')
|
||||
if os.path.exists(t_path): inst_files_list.append(t_path)
|
||||
elif support_pattern and support_pattern != 'false':
|
||||
n_path = os.path.join(print_config_path, 'supports', 'normal.inst.cfg')
|
||||
if os.path.exists(n_path): inst_files_list.append(n_path)
|
||||
|
||||
if quality_preset and quality_type:
|
||||
g_path = os.path.join(presets_path, 'creality', 'globals', f"{quality_type}.inst.cfg")
|
||||
if os.path.exists(g_path): inst_files_list.append(g_path)
|
||||
|
||||
if quality_preset and os.path.exists(preset_path):
|
||||
inst_files_list.append(preset_path)
|
||||
|
||||
|
||||
p = ConfParse(def_files)
|
||||
settings_with_inst = p.add_inst_cfg(inst_files_list)
|
||||
|
||||
if infill_density is not None:
|
||||
if "infill_sparse_density" not in settings_with_inst: settings_with_inst["infill_sparse_density"] = {}
|
||||
settings_with_inst["infill_sparse_density"]["value"] = str(infill_density)
|
||||
if "infill_line_distance" not in settings_with_inst: settings_with_inst["infill_line_distance"] = {}
|
||||
settings_with_inst["infill_line_distance"]["value"] = str(100 / int(infill_density)) if int(infill_density) > 0 else "9999"
|
||||
|
||||
if support_enable is not None:
|
||||
if "support_enable" not in settings_with_inst: settings_with_inst["support_enable"] = {}
|
||||
settings_with_inst["support_enable"]["value"] = True if support_enable in ['true', 'buildplate'] else False
|
||||
if "support_type" not in settings_with_inst: settings_with_inst["support_type"] = {}
|
||||
settings_with_inst["support_type"]["value"] = "'buildplate'" if support_enable == 'buildplate' else "'everywhere'"
|
||||
|
||||
if support_pattern == 'tree':
|
||||
if "support_structure" not in settings_with_inst: settings_with_inst["support_structure"] = {}
|
||||
settings_with_inst["support_structure"]["value"] = "'tree'"
|
||||
elif support_pattern in settings_with_inst["support_pattern"]["options"].keys():
|
||||
if "support_structure" not in settings_with_inst: settings_with_inst["support_structure"] = {}
|
||||
settings_with_inst["support_structure"]["value"] = "'normal'"
|
||||
if "support_pattern" not in settings_with_inst: settings_with_inst["support_pattern"] = {}
|
||||
settings_with_inst["support_pattern"]["value"] = f"'{support_pattern}'"
|
||||
|
||||
# Parse to exact values
|
||||
res = p.parse_configs(settings_with_inst)
|
||||
|
||||
override_dict = {}
|
||||
for k, v in res.items():
|
||||
if v.get("enabled", True):
|
||||
val = v.get("value", None)
|
||||
if val is not None:
|
||||
# Filter out our protective ConfigStr wrappers
|
||||
# if type(val).__name__ == "ConfigStr": pass
|
||||
# else: override_dict[k] = {"default_value": val}
|
||||
override_dict[k] = {"value": val,"default_value": val}
|
||||
elif "default_value" in v:
|
||||
override_dict[k] = {"default_value": v["default_value"], "value": v["default_value"]}
|
||||
|
||||
|
||||
|
||||
tmp_def_filename = f"tmp_{uuid.uuid4().hex}.def.json"
|
||||
tmp_def_path = os.path.join(app.config['UPLOAD_FOLDER'], tmp_def_filename)
|
||||
|
||||
tmp_def_obj = {
|
||||
"version": 2,
|
||||
"name": "TempProfile",
|
||||
"inherits": "fdmprinter",
|
||||
"metadata": {
|
||||
"visible": True,
|
||||
"author": "System",
|
||||
"manufacturer": "System",
|
||||
"file_formats": "text/x-gcode",
|
||||
"first_start_actions": ["MachineSettingsAction"],
|
||||
"has_materials": True,
|
||||
"has_variants": True,
|
||||
"has_machine_quality": True,
|
||||
"variants_name": "Nozzle Size",
|
||||
|
||||
"preferred_variant_name": "0.4mm Nozzle",
|
||||
"preferred_quality_type": "standard",
|
||||
"preferred_material": "generic_pla",
|
||||
|
||||
},
|
||||
"overrides": override_dict
|
||||
}
|
||||
|
||||
pretty_json = json.dumps(tmp_def_obj, indent=4)
|
||||
|
||||
with open(tmp_def_path, "w") as f:
|
||||
f.write(pretty_json)
|
||||
|
||||
command = [
|
||||
"CuraEngine", "slice",
|
||||
"-j", tmp_def_path,
|
||||
"-l", stl_filepath,
|
||||
"-o", gcode_filepath
|
||||
]
|
||||
|
||||
app.logger.info(f"Running command: {' '.join(command)}")
|
||||
# print(f"Running command: {' '.join(command)}")
|
||||
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
|
||||
stdout, stderr = process.communicate()
|
||||
|
||||
# if stdout:
|
||||
# print(f"[CuraEngine STDOUT]\n{stdout.decode('utf-8', errors='ignore')}")
|
||||
# if stderr:
|
||||
# print(f"[CuraEngine STDERR]\n{stderr.decode('utf-8', errors='ignore')}", flush=True)
|
||||
|
||||
# Re-fetch print_file and update status
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if not print_file:
|
||||
return
|
||||
|
||||
if process.returncode == 0:
|
||||
print_file.status = 'sliced'
|
||||
else:
|
||||
print_file.status = 'failed'
|
||||
app.logger.error(f"CuraEngine Error: {stderr.decode()}")
|
||||
|
||||
except Exception as e:
|
||||
# Re-fetch in case of exception
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if print_file:
|
||||
print_file.status = 'failed'
|
||||
app.logger.error(f"Subprocess Exception: {e}")
|
||||
|
||||
if delete_stl and os.path.exists(stl_filepath):
|
||||
try:
|
||||
os.remove(stl_filepath)
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to delete temp STL {stl_filepath}: {e}")
|
||||
|
||||
if tmp_def_path and os.path.exists(tmp_def_path):
|
||||
try:
|
||||
os.remove(tmp_def_path)
|
||||
# pass
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to delete temp JSON config {tmp_def_path}: {e}")
|
||||
|
||||
db.session.commit()
|
||||
db.session.remove()
|
||||
|
||||
|
||||
@huey.task()
|
||||
def merge_and_slice_task(file_id, inputs, merged_filepath, quality_preset=None, infill_density=None, support_enable=None, support_pattern=None, delete_stl=False):
|
||||
from app import create_app
|
||||
app = create_app()
|
||||
with app.app_context():
|
||||
from app.models import PrintFile, db
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if not print_file:
|
||||
return
|
||||
|
||||
db.session.remove()
|
||||
|
||||
try:
|
||||
from app.utils.stl_merger import merge_stls
|
||||
merge_stls(inputs, merged_filepath)
|
||||
|
||||
# Now trigger the regular slicing task
|
||||
# We can just call the slicing logic or enqueue it
|
||||
slice_stl_task(file_id, merged_filepath, quality_preset, infill_density, support_enable, support_pattern, delete_stl=delete_stl)
|
||||
except Exception as e:
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if print_file:
|
||||
print_file.status = 'failed'
|
||||
db.session.commit()
|
||||
app.logger.error(f"Merge Exception: {e}")
|
||||
finally:
|
||||
db.session.remove()
|
||||
|
||||
@huey.task()
|
||||
def simplify_stl_task(file_id, filepath):
|
||||
from app import create_app
|
||||
app = create_app()
|
||||
with app.app_context():
|
||||
from app.models import PrintFile, SystemConfig, db
|
||||
import os
|
||||
from app.utils.stl_simplifier import simplify_stl
|
||||
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if not print_file:
|
||||
return
|
||||
|
||||
try:
|
||||
file_size_mb = os.path.getsize(filepath) / (1024 * 1024)
|
||||
|
||||
configs = {c.key: c.value for c in SystemConfig.query.all()}
|
||||
skip_size = float(configs.get('proxy_skip_size_mb', '5.0'))
|
||||
|
||||
proxy_path = filepath + '.proxy.stl'
|
||||
|
||||
if file_size_mb <= skip_size:
|
||||
# File is small enough, no proxy needed
|
||||
print_file.status = 'uploaded'
|
||||
db.session.commit()
|
||||
return
|
||||
|
||||
# Aim for approx 7.5 MB for the proxy
|
||||
target_mb = 7.5
|
||||
keep_ratio = target_mb / file_size_mb
|
||||
if keep_ratio > 1.0:
|
||||
keep_ratio = 1.0
|
||||
elif keep_ratio < 0.01:
|
||||
keep_ratio = 0.01
|
||||
|
||||
app.logger.info(f"Simplifying {filepath}... Size: {file_size_mb:.2f}MB, Target Ratio: {keep_ratio:.3f}")
|
||||
|
||||
simplify_stl(filepath, proxy_path, keep_ratio=keep_ratio)
|
||||
|
||||
except Exception as e:
|
||||
app.logger.error(f"Simplify task error: {e}")
|
||||
|
||||
# Update status to uploaded regardless of success or failure of proxy generation
|
||||
# So the user can still slice or download it
|
||||
print_file = PrintFile.query.get(file_id)
|
||||
if print_file:
|
||||
print_file.status = 'uploaded'
|
||||
db.session.commit()
|
||||
db.session.remove()
|
||||
Reference in New Issue
Block a user