import json
import trimesh
import uuid
import os
import configparser
from datetime import datetime
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, session, make_response, send_file, abort, jsonify
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
from app.models import db, User, PrintFile, SystemConfig
from app.utils.tasks import merge_and_slice_task, slice_stl_task, simplify_stl_task
from app import i18n_dict
# import trimesh.repair
from app.utils.stl_simplifier import simplify_stl

main_bp = Blueprint('main', __name__)
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')

# Guest User Middleware

# --- Auth Routes ---

# Extensions that mark an upload as ready-made G-code rather than a model.
_GCODE_EXTENSIONS = ('.gcode', '.gco', '.g')


def _client_ip():
    """Return the client address for the current request.

    Prefers the ``X-Real-IP`` header and falls back to
    ``request.remote_addr`` when the header is missing or empty.
    """
    # Try to get the real IP passed along by the reverse proxy.
    # NOTE(review): this header is client-controlled unless a trusted proxy
    # overwrites it -- confirm the deployment always sits behind one.
    return request.headers.get('X-Real-IP') or request.remote_addr


def _record_user_session(user):
    """Persist a ``UserSession`` row for *user* and remember its token.

    The generated token is stored in the Flask session under
    ``'user_session_token'`` so that ``logout`` can deactivate the row.
    """
    # Imported locally, as the original code did (presumably to avoid an
    # import cycle -- TODO confirm).
    from app.models import UserSession

    session_token = str(uuid.uuid4())
    user_session = UserSession(
        user_id=user.id,
        session_token=session_token,
        ip_address=_client_ip(),
        user_agent=request.user_agent.string,
    )
    db.session.add(user_session)
    db.session.commit()
    session['user_session_token'] = session_token


def _quota_bytes(quota):
    """Convert a quota to bytes; a non-positive quota means "no limit".

    The quota value is multiplied by 1024 * 1024, i.e. it is treated as a
    number of MiB.
    """
    return quota * 1024 * 1024 if quota > 0 else float('inf')


def _first_existing_size(paths):
    """Return the size of the first path in *paths* that can be stat'ed.

    Returns 0 when none of the candidates is accessible. Asking for the size
    directly (instead of ``exists()`` followed by ``getsize()``) avoids a
    race with concurrent deletion.
    """
    for path in paths:
        try:
            return os.path.getsize(path)
        except OSError:
            continue
    return 0


def _remove_file_best_effort(path):
    """Delete *path*, tolerating a missing file and logging other failures.

    Cleanup runs after the user is already authenticated, so a filesystem
    problem must not turn a successful login into an error page.
    """
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        current_app.logger.warning(
            'Could not remove %s while merging guest data', path, exc_info=True
        )


def _classify_guest_file(pf, upload_dir, gcode_dir):
    """Return ``(file_type, file_size)`` for a guest's ``PrintFile``.

    A file counts as ``'gcode'`` when it was uploaded with a G-code
    extension or its status is ``'sliced'``; its size is then taken from the
    ``.gcode`` sibling in *gcode_dir*, falling back to *upload_dir*.
    Everything else counts as ``'stl'`` and is sized from *upload_dir*.
    A file that cannot be found on disk has size 0.
    """
    original_name = (pf.original_filename or '').lower()
    if original_name.endswith(_GCODE_EXTENSIONS) or pf.status == 'sliced':
        gcode_name = pf.filename.rsplit('.', 1)[0] + '.gcode'
        candidates = (
            os.path.join(gcode_dir, gcode_name),
            os.path.join(upload_dir, gcode_name),
        )
        return 'gcode', _first_existing_size(candidates)
    return 'stl', _first_existing_size((os.path.join(upload_dir, pf.filename),))


def _delete_print_file_artifacts(pf, upload_dir, gcode_dir):
    """Remove every on-disk artifact that belongs to *pf*.

    Covers the uploaded model, its ``.proxy.stl`` companion and the
    ``.gcode`` output in both *gcode_dir* and *upload_dir*.
    """
    stl_path = os.path.join(upload_dir, pf.filename)
    gcode_name = pf.filename.rsplit('.', 1)[0] + '.gcode'
    for path in (
        stl_path,
        stl_path + '.proxy.stl',
        os.path.join(gcode_dir, gcode_name),
        os.path.join(upload_dir, gcode_name),
    ):
        _remove_file_best_effort(path)


def _merge_guest_data(user, guest_user):
    """Move *guest_user*'s files to *user*, then delete the guest account.

    Files are reassigned one by one while they still fit into the matching
    ('stl' or 'gcode') quota of *user*; admins are not limited. A file that
    does not fit is deleted from disk and from the database.
    """
    # Imported locally, as the original code did (presumably to avoid an
    # import cycle between route modules -- TODO confirm).
    from app.routes.main_routes import get_quota_info
    from app.routes.admin_routes import get_gcode_dir

    guest_files = PrintFile.query.filter_by(user_id=guest_user.id).all()

    stl_quota, stl_used = get_quota_info(user, 'stl')
    gcode_quota, gcode_used = get_quota_info(user, 'gcode')
    used = {'stl': stl_used, 'gcode': gcode_used}
    limit = {'stl': _quota_bytes(stl_quota), 'gcode': _quota_bytes(gcode_quota)}

    upload_dir = current_app.config.get('UPLOAD_FOLDER', 'uploads')
    gcode_dir = get_gcode_dir()

    for pf in guest_files:
        file_type, file_size = _classify_guest_file(pf, upload_dir, gcode_dir)

        # Check quota
        fits = user.is_admin or used[file_type] + file_size <= limit[file_type]
        if fits:
            pf.user_id = user.id
            used[file_type] += file_size
        else:
            # delete from disk to prevent orphans
            _delete_print_file_artifacts(pf, upload_dir, gcode_dir)
            db.session.delete(pf)

    # Save changes to files first so SQLAlchemy doesn't try to nullify related keys
    db.session.commit()

    # Delete guest user after merge
    db.session.delete(guest_user)
    db.session.commit()


@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form (GET) or authenticate a user (POST).

    Form fields read on POST: ``username``, ``password``, ``remember`` and
    ``merge_data``. On success the user is logged in, a ``UserSession`` row
    is recorded and the client is redirected to ``main.index``. When
    ``merge_data`` is set, files of the guest identified by the ``guest_id``
    cookie are merged into the account and the cookie is cleared. On failure
    an error is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        # A missing field must fail authentication, not crash the hash check.
        password = request.form.get('password') or ''
        user = User.query.filter_by(username=username, is_guest=False).first()
        remember = bool(request.form.get('remember'))
        merge_data = bool(request.form.get('merge_data'))

        if user and user.password_hash and check_password_hash(user.password_hash, password):
            login_user(user, remember=remember)
            _record_user_session(user)

            if merge_data:
                guest_id = request.cookies.get('guest_id')
                if guest_id:
                    guest_user = User.query.filter_by(
                        guest_cookie_id=guest_id, is_guest=True
                    ).first()
                    if guest_user:
                        _merge_guest_data(user, guest_user)

            response = make_response(redirect(url_for('main.index')))
            if merge_data:
                response.delete_cookie('guest_id')
            return response

        flash('Invalid username or password', 'danger')

    return render_template('auth/login.html')


@auth_bp.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to ``main.index``.

    Marks the ``UserSession`` row matching the stored session token as
    inactive (if there is one), drops the token from the Flask session and
    clears the ``guest_id`` cookie.
    """
    session_token = session.get('user_session_token')
    if session_token:
        # Imported locally, as the original code did.
        from app.models import UserSession

        user_session = UserSession.query.filter_by(session_token=session_token).first()
        if user_session:
            user_session.is_active = False
            db.session.commit()

    logout_user()
    session.pop('user_session_token', None)

    response = make_response(redirect(url_for('main.index')))
    response.delete_cookie('guest_id')  # Optionally clear guest cookie
    return response


# --- Admin Routes ---