"""Routes for the main blueprint: session checks, guest accounts, files and slicing."""
import configparser
import json
import os
import re
import uuid
from datetime import datetime

import markdown
from flask import (
    Blueprint, render_template, request, redirect, url_for, flash,
    current_app, session, make_response, send_file, abort, jsonify,
)
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename

from app.models import db, User, PrintFile, SystemConfig, UserSession
from app.utils.tasks import merge_and_slice_task, slice_stl_task, simplify_stl_task
from app import i18n_dict
from app.utils.stl_simplifier import simplify_stl
from app.routes.admin_routes import get_gcode_dir
from app.utils.slice_engines import get_slicer_engine
from app.utils.gcode_parser import get_gcode_metadata

main_bp = Blueprint('main', __name__)
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')

# File extensions that mark an uploaded file as G-code rather than a model.
_GCODE_EXTENSIONS = ('.gcode', '.gco', '.g')
# Number of G-code lines shown in the preview page.
_PREVIEW_LINE_LIMIT = 500
_ONE_YEAR_SECONDS = 60 * 60 * 24 * 365
_BYTES_PER_MB = 1024 * 1024
# Endpoints that stay reachable while an admin is forced to change password.
_PASSWORD_CHANGE_EXEMPT_ENDPOINTS = ('main.account', 'auth.logout', 'static')


# --- Private helpers ---

def _to_float(value, default=0.0):
    """Return ``float(value)``, or *default* when the value cannot be parsed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _is_ajax():
    """Return True when the request carries ``X-Requested-With: XMLHttpRequest``."""
    return request.headers.get('X-Requested-With') == 'XMLHttpRequest'


def _first_existing_size(*paths):
    """Return the size in bytes of the first path that exists, or 0 if none do."""
    for path in paths:
        try:
            return os.path.getsize(path)
        except OSError:
            continue
    return 0


def _remove_if_exists(path):
    """Delete *path*; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _format_size(size_bytes):
    """Format a byte count as a string in B, KB, MB or GB."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.2f} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.2f} MB"
    else:
        return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"


def _gcode_filename(print_file):
    """Return the stored filename of *print_file* with its extension swapped for ``.gcode``."""
    return print_file.filename.rsplit('.', 1)[0] + '.gcode'


def _configured_gcode_dir():
    """Return the ``gcode_upload_folder`` config value if it exists on disk, else UPLOAD_FOLDER."""
    conf = SystemConfig.query.filter_by(key='gcode_upload_folder').first()
    if conf and conf.value and os.path.exists(conf.value):
        return conf.value
    return current_app.config['UPLOAD_FOLDER']


def _find_gcode_path(print_file):
    """Return the path of the G-code for *print_file*, or None when it is not on disk.

    The configured G-code folder is tried first, then the upload folder.
    """
    name = _gcode_filename(print_file)
    for folder in (_configured_gcode_dir(), current_app.config['UPLOAD_FOLDER']):
        candidate = os.path.join(folder, name)
        if os.path.exists(candidate):
            return candidate
    return None


def _get_bed_dimensions():
    """Return the configured engine's ``get_bed_dimensions()`` result, or None.

    None is returned when there is no ``slicer_engine`` row in SystemConfig.
    """
    engine_conf = SystemConfig.query.filter_by(key='slicer_engine').first()
    if not engine_conf:
        return None
    engine = get_slicer_engine(
        str(engine_conf.value),
        current_app.config['PRINT_CONFIG_FOLDER'],
        current_app.config['PRUSA_SLICE_BIN'],
    )
    return engine.get_bed_dimensions()


def _accessible_file(file_id):
    """Return the PrintFile with *file_id* if the current user owns it or is admin, else None."""
    print_file = PrintFile.query.get(file_id)
    if print_file and (print_file.user_id == current_user.id or current_user.is_admin):
        return print_file
    return None


def _single_payload(matrix, settings):
    """Build the ``transform_matrix`` JSON structure for a single (non-composite) model."""
    return {"is_composite": False, "matrix": matrix, "settings": settings}


def _composite_payload(owned_pieces, settings):
    """Build the ``transform_matrix`` JSON structure for a composite of several parts.

    *owned_pieces* is a list of ``(piece_dict, PrintFile)`` pairs.
    """
    return {
        "is_composite": True,
        "parts": [
            {
                "file_id": pf.id,
                "name": pf.original_filename,
                "url": url_for('main.serve_proxy_file', file_id=pf.id),
                "raw_matrix": piece.get('raw_matrix', piece['matrix']),
            }
            for piece, pf in owned_pieces
        ],
        "settings": settings,
    }


# --- Request hooks ---

@main_bp.before_app_request
def check_user_session():
    """Validate the tracked UserSession of a logged-in, non-guest user.

    Returns a redirect when the session was terminated or when an admin still
    uses the default password; returns None to let the request continue.
    """
    if not current_user.is_authenticated or current_user.is_guest:
        return None

    session_token = session.get('user_session_token')
    client_ip = request.headers.get('X-Real-IP') or request.remote_addr

    if session_token:
        user_session = UserSession.query.filter_by(session_token=session_token).first()
        if not user_session or not user_session.is_active:
            logout_user()
            session.pop('user_session_token', None)
            flash('Your session has been terminated.', 'warning')
            return redirect(url_for('auth.login'))
        user_session.last_active = datetime.utcnow()
        user_session.ip_address = client_ip
        db.session.commit()
    else:
        # Re-authenticated via remember me, but no session token
        new_session_token = str(uuid.uuid4())
        user_session = UserSession(
            user_id=current_user.id,
            session_token=new_session_token,
            ip_address=client_ip,
            user_agent=request.user_agent.string,
            last_active=datetime.utcnow(),
        )
        db.session.add(user_session)
        db.session.commit()
        session['user_session_token'] = new_session_token

    # Check default admin password securely without checking hash on every request
    if current_user.is_admin:
        if session.get('pwd_check_done') is None:
            session['pwd_check_done'] = True
            if check_password_hash(current_user.password_hash, 'admin123'):
                session['must_change_password'] = True
            else:
                session.pop('must_change_password', None)

        if session.get('must_change_password'):
            if request.endpoint and request.endpoint not in _PASSWORD_CHANGE_EXEMPT_ENDPOINTS:
                flash('For security reasons, please change your default admin password.', 'warning')
                return redirect(url_for('main.account'))
    return None


def get_quota_info(user, file_type):
    """Return ``(quota_mb, current_size_bytes)`` for *user* and *file_type*.

    *file_type* is ``'stl'`` or ``'gcode'``. A quota of 0.0 means "no limit"
    to the callers in this module. Admins always get 0.0; other users get
    their per-user SystemConfig value, falling back to the guest/user default.
    """
    if user.is_admin:
        quota_mb = 0.0
    else:
        conf = SystemConfig.query.filter_by(key=f"user_{user.id}_{file_type}_quota_mb").first()
        quota_mb = _to_float(conf.value) if conf else 0.0
        if quota_mb == 0.0:
            audience = 'guest' if user.is_guest else 'user'
            def_conf = SystemConfig.query.filter_by(
                key=f"default_{audience}_{file_type}_quota_mb"
            ).first()
            quota_mb = _to_float(def_conf.value) if def_conf else 0.0

    upload_dir = current_app.config.get('UPLOAD_FOLDER', 'uploads')
    gcode_dir = get_gcode_dir()
    current_size = 0
    for pf in PrintFile.query.filter_by(user_id=user.id).all():
        if file_type == 'stl':
            # Uploaded G-code files do not count against the STL quota.
            if not pf.original_filename.lower().endswith(_GCODE_EXTENSIONS):
                current_size += _first_existing_size(os.path.join(upload_dir, pf.filename))
        elif file_type == 'gcode':
            g_filename = pf.filename.rsplit('.', 1)[0] + '.gcode'
            current_size += _first_existing_size(
                os.path.join(gcode_dir, g_filename),
                os.path.join(upload_dir, g_filename),
            )
    return quota_mb, current_size


def check_quota(user, file_type, size_bytes):
    """Return True when *user* may store *size_bytes* more bytes of *file_type*."""
    if user.is_admin:
        return True
    quota_mb, current_size = get_quota_info(user, file_type)
    if quota_mb <= 0.0:
        return True
    return current_size + size_bytes <= quota_mb * _BYTES_PER_MB


# Guest User Middleware
@main_bp.before_app_request
def assign_guest_cookie():
    """Log anonymous visitors in as a guest user (skipped for ``/api/`` paths).

    A visitor without a ``guest_id`` cookie gets a new guest User; the cookie
    itself is written by ``set_guest_cookie`` after the request.
    """
    if request.path.startswith('/api/'):
        return
    if current_user.is_authenticated:
        return

    guest_id = request.cookies.get('guest_id')
    if not guest_id:
        guest_id = str(uuid.uuid4())
        user = User(username=f'guest_{guest_id[:8]}', is_guest=True, guest_cookie_id=guest_id)
        db.session.add(user)
        db.session.commit()
        login_user(user)
        # We will set the cookie in the response after request, see below
        request.guest_id_to_set = guest_id
    else:
        user = User.query.filter_by(guest_cookie_id=guest_id).first()
        if user:
            login_user(user)


@main_bp.after_app_request
def set_guest_cookie(response):
    """Attach the ``guest_id`` cookie when a guest was created during this request."""
    guest_id = getattr(request, 'guest_id_to_set', None)
    if guest_id is not None:
        # The cookie identifies the guest account, so keep it away from scripts
        # and from cross-site subrequests.
        response.set_cookie(
            'guest_id', guest_id,
            max_age=_ONE_YEAR_SECONDS,  # 1 year
            httponly=True, samesite='Lax',
        )
    return response


# --- Main Routes ---

@main_bp.route('/')
@login_required
def index():
    """Render the dashboard with STL / G-code counts, sizes and quota usage."""
    if current_user.is_admin:
        # Admins see statistics over every user's files.
        user_files = PrintFile.query.all()
    else:
        user_files = PrintFile.query.filter_by(user_id=current_user.id).all()

    stl_count = stl_size = gcode_count = gcode_size = 0
    upload_dir = current_app.config.get('UPLOAD_FOLDER', 'uploads')
    gcode_dir = get_gcode_dir()

    for f in user_files:
        if f.original_filename.lower().endswith(_GCODE_EXTENSIONS):
            gcode_count += 1
            gcode_size += _first_existing_size(
                os.path.join(gcode_dir, f.filename.replace('.stl', '.gcode'))
            )
            continue

        stl_count += 1
        stl_size += _first_existing_size(os.path.join(upload_dir, f.filename))
        if f.status == 'sliced':
            gcode_count += 1
            gcode_filename = f.filename.rsplit('.', 1)[0] + '.gcode'
            gcode_size += _first_existing_size(
                os.path.join(gcode_dir, gcode_filename),
                os.path.join(upload_dir, gcode_filename),
            )

    stl_quota_mb, stl_used_bytes = get_quota_info(current_user, 'stl')
    gcode_quota_mb, gcode_used_bytes = get_quota_info(current_user, 'gcode')

    return render_template(
        'slice/index.html',
        stl_count=stl_count,
        stl_size_str=_format_size(stl_size),
        gcode_count=gcode_count,
        gcode_size_str=_format_size(gcode_size),
        stl_used_bytes=stl_used_bytes,
        stl_quota_mb=stl_quota_mb,
        gcode_used_bytes=gcode_used_bytes,
        gcode_quota_mb=gcode_quota_mb,
        format_size=_format_size,
    )


@main_bp.route('/set_language/<lang>')
def set_language(lang):
    """Store the UI language in the ``lang`` cookie and return to the previous page."""
    if lang not in i18n_dict:
        lang = 'en'
    # return to previous page
    response = make_response(redirect(request.referrer or url_for('main.index')))
    response.set_cookie('lang', lang, max_age=_ONE_YEAR_SECONDS)
    return response


@main_bp.route('/files', methods=['GET', 'POST'])
@login_required
def files():
    """List the user's STL files (GET) or accept STL uploads (POST).

    AJAX uploads get JSON answers; form uploads get flash messages and a
    redirect back to the listing.
    """
    if request.method == 'POST':
        if 'file' not in request.files:
            if _is_ajax():
                return jsonify({'error': 'No file part'}), 400
            flash('No file part', 'danger')
            return redirect(request.url)

        success_count = 0
        for file in request.files.getlist('file'):
            if file.filename == '':
                continue
            if not (file and file.filename.lower().endswith('.stl')):
                continue

            # Measure the upload by seeking to its end, then rewind for save().
            file.seek(0, os.SEEK_END)
            size_bytes = file.tell()
            file.seek(0, os.SEEK_SET)

            if not check_quota(current_user, 'stl', size_bytes):
                if _is_ajax():
                    return jsonify({'error': 'STL Storage Quota Exceeded'}), 400
                flash('STL Storage Quota Exceeded', 'danger')
                continue

            # Do not use secure_filename to keep Chinese characters: the
            # original name is only stored in the database, while the name on
            # disk is generated below. The extension is always '.stl' here
            # because of the endswith() check above.
            original_filename = file.filename
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            unique_filename = f"{timestamp}_{uuid.uuid4().hex}_{success_count}.stl"
            filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], unique_filename)
            file.save(filepath)

            print_file = PrintFile(
                filename=unique_filename,
                original_filename=original_filename,
                file_type='stl',
                user_id=current_user.id,
                status='simplifying',  # Set to simplifying while proxy is generated
            )
            db.session.add(print_file)
            db.session.commit()

            # Start background simplification
            simplify_stl_task(print_file.id, filepath)
            success_count += 1

        if success_count > 0:
            if _is_ajax():
                return jsonify({'success': True, 'count': success_count})
            flash(f'{success_count} file(s) uploaded successfully!', 'success')
        else:
            if _is_ajax():
                return jsonify({'error': 'No valid files uploaded'}), 400
            flash('No valid files uploaded', 'danger')
        return redirect(url_for('main.files'))

    # Order by newest first
    user_files = (
        PrintFile.query
        .filter_by(user_id=current_user.id, file_type='stl')
        .order_by(PrintFile.created_at.desc())
        .all()
    )
    return render_template('slice/files.html', files=user_files)


@main_bp.route('/api/files_status')
@login_required
def files_status():
    """Return a JSON object mapping each of the user's file ids to its status."""
    user_files = PrintFile.query.filter_by(user_id=current_user.id).all()
    return jsonify({str(f.id): f.status for f in user_files})


@main_bp.route('/download/<int:file_id>')
@login_required
def download_gcode(file_id):
    """Send the sliced G-code of a file as an attachment (owner or admin only)."""
    print_file = PrintFile.query.get_or_404(file_id)
    if print_file.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    if print_file.status != 'sliced':
        flash('File is not ready yet.', 'warning')
        return redirect(url_for('main.files'))

    filepath = _find_gcode_path(print_file)
    if filepath:
        safe_name = print_file.original_filename.rsplit('.', 1)[0] + '.gcode'
        return send_file(filepath, as_attachment=True, download_name=safe_name)

    flash('GCode file not found. It might have been deleted.', 'danger')
    return redirect(url_for('main.files'))


@main_bp.route('/preview_gcode/<int:file_id>')
@login_required
def preview_gcode(file_id):
    """Render a preview of the first lines of a file's G-code plus its metadata."""
    print_file = PrintFile.query.get_or_404(file_id)
    if print_file.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    dimensions = _get_bed_dimensions()
    if dimensions is None:
        # Without a configured engine there are no bed dimensions to render.
        flash('Slicer engine is not configured.', 'danger')
        return redirect(url_for('main.files'))
    w, h, hd = dimensions

    content = "File not found or not ready."
    line_count = 0
    time_info = "-"
    layer1_time = "-"
    filament_used = "-"

    filepath = _find_gcode_path(print_file)
    if filepath:
        metadata = get_gcode_metadata(filepath)
        time_info = metadata.get('print_time', '-')
        layer1_time = metadata.get('first_layer_time', '-')
        filament_used = metadata.get('filament_used', '-')

        # Stream the file: only the preview lines are kept in memory, and
        # undecodable bytes are replaced instead of raising.
        preview_lines = []
        with open(filepath, 'r', encoding='utf-8', errors='replace') as gcode_file:
            for line_count, line in enumerate(gcode_file, start=1):
                if line_count <= _PREVIEW_LINE_LIMIT:
                    preview_lines.append(line)
        content = "".join(preview_lines)  # Preview first 500 lines
        if line_count > _PREVIEW_LINE_LIMIT:
            content += f"\n... \n[Preview truncated. Total lines: {line_count}. Please download to view full file.]"

    configs = {c.key: c.value for c in SystemConfig.query.all()}
    offset_x = _to_float(configs.get('offset_x', '0.0'))
    offset_y = _to_float(configs.get('offset_y', '0.0'))

    return render_template(
        'slice/gcode_preview.html',
        file=print_file,
        content=content,
        line_count=line_count,
        machine_width=w,
        machine_depth=h,
        machine_height=hd,
        offset_x=offset_x,
        offset_y=offset_y,
        time_info=time_info,
        layer1_time=layer1_time,
        filament_used=filament_used,
    )


@main_bp.route('/delete_file/<int:file_id>', methods=['POST'])
@login_required
def delete_file(file_id):
    """Delete a file record and its STL, proxy STL and G-code from disk."""
    print_file = PrintFile.query.get_or_404(file_id)
    if print_file.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    upload_dir = current_app.config['UPLOAD_FOLDER']
    stl_path = os.path.join(upload_dir, print_file.filename)
    gcode_filename = _gcode_filename(print_file)

    for path in (
        stl_path,
        stl_path + '.proxy.stl',
        os.path.join(_configured_gcode_dir(), gcode_filename),
        os.path.join(upload_dir, gcode_filename),
    ):
        _remove_if_exists(path)

    # Read the display name before the row is deleted.
    deleted_name = print_file.original_filename
    db.session.delete(print_file)
    db.session.commit()
    flash(f"Deleted {deleted_name} successfully.", 'success')
    return redirect(url_for('main.files'))


# --- Plater & Slicing Routes ---

@main_bp.route('/plater')
@login_required
def plater():
    """Render the plater page with bed dimensions, defaults and the user's models."""
    quota_mb, current_size = get_quota_info(current_user, 'gcode')
    quota_exceeded = (quota_mb > 0 and current_size >= quota_mb * _BYTES_PER_MB)

    dimensions = _get_bed_dimensions()
    if dimensions is None:
        # Without a configured engine there are no bed dimensions to render.
        flash('Slicer engine is not configured.', 'danger')
        return redirect(url_for('main.index'))
    w, h, hd = dimensions
    current_app.logger.debug("Bed dimensions: %sx%sx%s", w, h, hd)

    configs = {c.key: c.value for c in SystemConfig.query.all()}
    offset_x = _to_float(configs.get('offset_x', '0.0'))
    offset_y = _to_float(configs.get('offset_y', '0.0'))
    default_infill = configs.get('default_infill', '20')
    default_support = configs.get('default_support', 'false')
    default_support_pattern = configs.get('default_support_pattern', 'tree')
    default_quality = configs.get('default_quality', 'base_global_standard.inst.cfg')
    default_material = configs.get('default_material', '')

    user_files = (
        PrintFile.query
        .filter_by(user_id=current_user.id, file_type='stl')
        .order_by(PrintFile.created_at.desc())
        .all()
    )
    models = [
        {
            'id': f.id,
            'name': f.original_filename,
            'status': f.status,
            'url': url_for('main.serve_proxy_file', file_id=f.id),
            'transform_matrix': f.transform_matrix,
        }
        for f in user_files
    ]

    return render_template(
        'slice/plater.html',
        w=w, h=h, hd=hd,
        last_quality=default_quality,
        last_material=default_material,
        models=models,
        offset_x=offset_x,
        offset_y=offset_y,
        default_infill=default_infill,
        default_support=default_support,
        default_support_pattern=default_support_pattern,
        quota_exceeded=quota_exceeded,
        configs=configs,
    )


@main_bp.route('/helper_slice')
def helper_slice():
    """Render the slicing help page from the markdown file for the user's language."""
    lang = request.cookies.get('lang', 'en')
    # The cookie is client-controlled and ends up in a file path, so only
    # accept known language codes (same rule as set_language).
    if lang not in i18n_dict:
        lang = 'en'

    doc_dir = os.path.join(current_app.root_path, 'assets', 'doc')
    filepath = os.path.join(doc_dir, f'slice_helper_{lang}.md')
    if not os.path.exists(filepath):
        filepath = os.path.join(doc_dir, 'slice_helper_en.md')

    content_html = ""
    if os.path.exists(filepath):
        with open(filepath, 'r', encoding='utf-8') as f:
            md_text = f.read()
        content_html = markdown.markdown(md_text, extensions=['fenced_code', 'tables'])
        # Rewrite relative image links to /assets/doc/
        content_html = re.sub(r'src="(?!http|/)([^"]+)"', r'src="/assets/doc/\1"', content_html)

    return render_template('slice/helper_slice.html', content_html=content_html)


@main_bp.route('/file/<int:file_id>')
@login_required
def serve_file(file_id):
    """Send the stored upload for a file (owner or admin only)."""
    f = PrintFile.query.get_or_404(file_id)
    if f.user_id != current_user.id and not current_user.is_admin:
        abort(403)
    path = os.path.join(current_app.config['UPLOAD_FOLDER'], f.filename)
    return send_file(path)


@main_bp.route('/proxy/<int:file_id>')
@login_required
def serve_proxy_file(file_id):
    """Send the ``.proxy.stl`` sibling of a file if present, else the file itself."""
    f = PrintFile.query.get_or_404(file_id)
    if f.user_id != current_user.id and not current_user.is_admin:
        abort(403)
    path = os.path.join(current_app.config['UPLOAD_FOLDER'], f.filename)
    proxy_path = path + '.proxy.stl'
    if os.path.exists(proxy_path):
        return send_file(proxy_path)
    return send_file(path)


@main_bp.route('/api/merge_and_slice', methods=['POST'])
@login_required
def merge_and_slice():
    """Queue a merge-and-slice job for the pieces described in the JSON body.

    Expected JSON keys: ``pieces`` (list of objects with ``file_id``, ``matrix``
    and optionally ``raw_matrix``), ``quality``, ``material``, ``infill``,
    ``support``, ``support_pattern``, ``is_edit`` and ``target_file_id``.
    Pieces the current user may not access are ignored.
    """
    quota_mb, current_size = get_quota_info(current_user, 'gcode')
    if quota_mb > 0 and current_size >= quota_mb * _BYTES_PER_MB:
        return jsonify({'success': False, 'error': 'GCode Storage Quota Exceeded. Please delete some files first.'})

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid JSON payload'}), 400

    pieces = data.get('pieces', [])
    quality = data.get('quality', 'base_global_standard.inst.cfg')
    material = data.get('material', '')
    infill_density = data.get('infill', '20')
    support_enable = data.get('support', 'false')
    support_pattern = data.get('support_pattern', 'lines')
    is_edit = data.get('is_edit', False)
    target_file_id = data.get('target_file_id')

    if not pieces:
        return jsonify({'error': 'No pieces provided'}), 400
    if not isinstance(pieces, list) or not all(
        isinstance(p, dict) and 'file_id' in p and 'matrix' in p for p in pieces
    ):
        return jsonify({'error': 'Malformed pieces'}), 400

    settings = {
        "quality": quality,
        "material": material,
        "infill": infill_density,
        "support": support_enable,
        "support_pattern": support_pattern,
    }

    # Resolve every piece once; entries are None for missing or foreign files.
    piece_files = [_accessible_file(p['file_id']) for p in pieces]
    owned = [(p, pf) for p, pf in zip(pieces, piece_files) if pf is not None]
    if not owned:
        return jsonify({'error': 'No accessible pieces provided'}), 400

    # Build a combined name
    # Cap names at 3 to avoid super long string
    names = [pf.original_filename.replace('.stl', '') for pf in piece_files[:3] if pf is not None]
    combined_name = ", ".join(names)
    if len(pieces) > 3:
        combined_name += "等合并切片"
    elif len(pieces) > 1:
        combined_name += "合并切片"
    else:
        combined_name += " 单独切片"

    # Look the edit target up before anything is written.
    target_file = None
    if is_edit and target_file_id:
        target_file = PrintFile.query.get(target_file_id)
        if not target_file:
            return jsonify({'error': 'Original file not found'}), 404
        if target_file.user_id != current_user.id and not current_user.is_admin:
            return jsonify({'error': 'Forbidden'}), 403

    upload_dir = current_app.config['UPLOAD_FOLDER']
    inputs = [(os.path.join(upload_dir, pf.filename), p['matrix']) for p, pf in owned]

    # Only in single-model edit mode do we overwrite the source model's stored
    # matrix (in multi-model / new mode the originals are left untouched and
    # the transforms are recorded on the new composite entry instead).
    if is_edit and len(pieces) == 1 and 'raw_matrix' in pieces[0]:
        piece, source_file = owned[0]
        source_file.transform_matrix = json.dumps(_single_payload(piece['raw_matrix'], settings))
        db.session.add(source_file)
        db.session.commit()

    if target_file is not None:
        target_file.status = 'merging'
        # NOTE(review): this is a substring test on the stored JSON text, and
        # both payload shapes written by this view contain the key
        # "is_composite", so it looks like it is true for any file this view
        # has saved before. Kept as is; confirm the intended condition.
        if target_file.transform_matrix and 'is_composite' in target_file.transform_matrix:
            target_file.transform_matrix = json.dumps(_composite_payload(owned, settings))
        elif len(pieces) == 1:
            matrix = pieces[0].get('raw_matrix', pieces[0]['matrix'])
            target_file.transform_matrix = json.dumps(_single_payload(matrix, settings))
        db.session.commit()

        temp_filename = f"temp_edit_{uuid.uuid4().hex}.stl"
        temp_filepath = os.path.join(upload_dir, temp_filename)
        merge_and_slice_task(target_file_id, inputs, temp_filepath, quality, material,
                             infill_density, support_enable, support_pattern, delete_stl=True)

    elif len(inputs) == 1 and is_edit:
        target_file_id = owned[0][0]['file_id']
        print_file = owned[0][1]
        print_file.status = 'merging'
        db.session.commit()

        # We still need to apply transforms to a temporary STL to generate correct GCode
        temp_filename = f"temp_{uuid.uuid4().hex}.stl"
        temp_filepath = os.path.join(upload_dir, temp_filename)
        merge_and_slice_task(target_file_id, inputs, temp_filepath, quality, material,
                             infill_density, support_enable, support_pattern, delete_stl=True)

    else:
        # Multiple models, create a new "Merged Slice" PrintFile entry to keep track of combination
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique_filename = f"{timestamp}_{uuid.uuid4().hex}.stl"
        merged_filepath = os.path.join(upload_dir, unique_filename)

        # Build the composite-file metadata tree (is_composite: true)
        print_file = PrintFile(
            filename=unique_filename,
            original_filename=f"{combined_name}.stl",
            file_type='stl',
            user_id=current_user.id,
            status='merging',
            transform_matrix=json.dumps(_composite_payload(owned, settings)),
        )
        db.session.add(print_file)
        db.session.commit()

        merge_and_slice_task(print_file.id, inputs, merged_filepath, quality, material,
                             infill_density, support_enable, support_pattern, delete_stl=False)

    return jsonify({'success': True, 'message': 'Plater slice queued!'})


@main_bp.route('/api/build_plate_model')
@login_required
def build_plate_model():
    """Send the file configured as ``build_plate_model_path``, or 404 if unavailable."""
    conf = SystemConfig.query.filter_by(key='build_plate_model_path').first()
    if conf and conf.value and os.path.exists(conf.value):
        return send_file(conf.value)
    abort(404)


@main_bp.route('/api/engine_options/<engine_name>')
@login_required
def engine_options(engine_name):
    """Return the presets, support patterns, materials and printers of an engine as JSON."""
    engine = get_slicer_engine(
        engine_name,
        current_app.config['PRINT_CONFIG_FOLDER'],
        current_app.config['PRUSA_SLICE_BIN'],
    )
    presets = engine.get_quality_presets()
    patterns = engine.get_support_patterns()
    # Materials and printers are optional capabilities of an engine.
    materials = engine.get_materials() if hasattr(engine, 'get_materials') else []
    printers = engine.get_all_printers() if hasattr(engine, 'get_all_printers') else []

    return jsonify({
        'presets': presets,
        'support_patterns': patterns,
        'materials': materials,
        'printers': printers,
    })


@main_bp.route('/account', methods=['GET', 'POST'])
@login_required
def account():
    """Show the account page; handle password changes and session termination."""
    if current_user.is_guest:
        flash('Guests cannot manage accounts.', 'danger')
        return redirect(url_for('main.index'))

    if request.method == 'POST':
        action = request.form.get('action')

        if action == 'change_password':
            # Default to '' so a missing form field fails validation instead
            # of raising on None.
            current_pass = request.form.get('current_password', '')
            new_pass = request.form.get('new_password', '')
            confirm_pass = request.form.get('confirm_password', '')

            if not check_password_hash(current_user.password_hash, current_pass):
                flash('Current password is incorrect.', 'danger')
            elif new_pass != confirm_pass:
                flash('New passwords do not match.', 'danger')
            elif len(new_pass) < 6:
                flash('New password must be at least 6 characters.', 'danger')
            elif current_user.is_admin and new_pass == 'admin123':
                flash('Your new password cannot be the default "admin123".', 'danger')
            else:
                current_user.password_hash = generate_password_hash(new_pass)
                db.session.commit()
                # If they just changed it, clear the must change flag
                session.pop('must_change_password', None)
                flash('Password updated successfully.', 'success')

        elif action == 'terminate_session':
            session_id = request.form.get('session_id')
            token_to_terminate = request.form.get('session_token')
            my_session_token = session.get('user_session_token')

            if token_to_terminate == my_session_token:
                flash('You cannot terminate your current session from here. Please logout instead.', 'warning')
            else:
                # Restrict the lookup to the current user's own sessions.
                us = UserSession.query.filter_by(id=session_id, user_id=current_user.id).first()
                if us:
                    us.is_active = False
                    db.session.commit()
                    flash('Session terminated.', 'success')

        return redirect(url_for('main.account'))

    sessions = (
        UserSession.query
        .filter_by(user_id=current_user.id, is_active=True)
        .order_by(UserSession.last_active.desc())
        .all()
    )
    current_token = session.get('user_session_token')
    return render_template('slice/account.html', sessions=sessions, current_token=current_token)