"""Blueprint definitions and admin views (settings, users, API keys)."""

import json
import uuid
import os
import configparser
import secrets
from datetime import datetime

from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, session, make_response, send_file, abort, jsonify
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename

from app.models import db, User, PrintFile, SystemConfig, ApiKey
from app.utils.tasks import merge_and_slice_task, slice_stl_task, simplify_stl_task
from app import i18n_dict
from app.utils.stl_simplifier import simplify_stl
from app.utils.slice_engines import get_all_engines

main_bp = Blueprint('main', __name__)
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')

# Form field name -> default used when the field is absent from the POST.
# Order matches the order in which the settings are persisted.
_SETTINGS_FORM_DEFAULTS = (
    ('offset_x', '0'),
    ('offset_y', '0'),
    ('proxy_skip_size_mb', '5.0'),
    ('default_infill', '20'),
    ('default_support', 'false'),
    ('default_support_pattern', 'tree'),
    ('default_quality', 'base_global_standard.inst.cfg'),
    ('default_material', ''),
    ('default_printer', ''),
    ('gcode_upload_folder', ''),
    ('slicer_engine', 'cura'),
    ('build_plate_model_path', ''),
    ('default_guest_stl_quota_mb', '0'),
    ('default_guest_gcode_quota_mb', '0'),
    ('default_user_stl_quota_mb', '0'),
    ('default_user_gcode_quota_mb', '0'),
)

# Path-like settings whose surrounding whitespace is trimmed before saving.
_STRIPPED_SETTINGS = frozenset({'gcode_upload_folder', 'build_plate_model_path'})


def _set_config(key, value):
    """Create or update the SystemConfig row for *key* (does not commit)."""
    conf = SystemConfig.query.filter_by(key=key).first()
    if not conf:
        conf = SystemConfig(key=key)
        db.session.add(conf)
    conf.value = str(value)


def _remove_file_quietly(path):
    """Best-effort delete of *path*; a missing file is fine, other errors are logged."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        # Deletion stays best-effort: a stale file must not block user removal.
        current_app.logger.warning("Could not remove %s: %s", path, exc)


def get_gcode_dir():
    """Return the directory used for G-code files.

    Uses the ``gcode_upload_folder`` system setting when it points at an
    existing directory, otherwise falls back to ``UPLOAD_FOLDER`` from the
    application config.
    """
    conf = SystemConfig.query.filter_by(key='gcode_upload_folder').first()
    if conf and conf.value and os.path.isdir(conf.value):
        return conf.value
    return current_app.config['UPLOAD_FOLDER']


# Admin access guard: runs before every view registered on admin_bp.
@admin_bp.before_request
def require_admin():
    """Redirect to the main index unless the current user is an authenticated admin."""
    if not current_user.is_authenticated or not current_user.is_admin:
        flash('Admin access required', 'danger')
        return redirect(url_for('main.index'))


@admin_bp.route('/settings', methods=['GET', 'POST'])
def settings():
    """Show the settings page (GET) or persist the submitted settings (POST).

    On POST every known setting is upserted into SystemConfig. XMLHttpRequest
    callers receive a JSON success payload; other callers are redirected back
    to the settings page.
    """
    if request.method == 'POST':
        for key, default in _SETTINGS_FORM_DEFAULTS:
            value = request.form.get(key, default)
            if key in _STRIPPED_SETTINGS:
                value = value.strip()
            _set_config(key, value)
        db.session.commit()

        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
            return jsonify({'success': True}), 200

        flash('Settings updated successfully', 'success')
        return redirect(url_for('admin.settings'))

    configs = {c.key: c.value for c in SystemConfig.query.all()}
    engines = get_all_engines()
    return render_template('admin/settings.html', configs=configs, engines=engines)


@admin_bp.route('/users')
def users():
    """List all users, newest first, with their configured and effective quotas.

    A per-user quota of ``'0'`` means "not set"; the effective quota then falls
    back to the guest/user default (admins fall back to ``'0'``).
    """
    all_users = User.query.order_by(User.created_at.desc()).all()

    # One query for the whole config table instead of several per user.
    configs = {c.key: c.value for c in SystemConfig.query.all()}

    guest_defaults = (
        configs.get('default_guest_stl_quota_mb', '0'),
        configs.get('default_guest_gcode_quota_mb', '0'),
    )
    user_defaults = (
        configs.get('default_user_stl_quota_mb', '0'),
        configs.get('default_user_gcode_quota_mb', '0'),
    )

    user_quotas = {}
    for u in all_users:
        if u.is_admin:
            eff_stl, eff_gcode = '0', '0'
        elif u.is_guest:
            eff_stl, eff_gcode = guest_defaults
        else:
            eff_stl, eff_gcode = user_defaults

        user_stl = configs.get(f"user_{u.id}_stl_quota_mb", '0')
        user_gcode = configs.get(f"user_{u.id}_gcode_quota_mb", '0')

        user_quotas[u.id] = {
            'stl': user_stl,
            'gcode': user_gcode,
            'eff_stl': eff_stl if user_stl == '0' else user_stl,
            'eff_gcode': eff_gcode if user_gcode == '0' else user_gcode,
        }

    return render_template('admin/users.html', users=all_users, user_quotas=user_quotas)


@admin_bp.route('/user/add', methods=['POST'])
def add_user():
    """Create a non-guest user from the submitted form, with optional quotas."""
    username = (request.form.get('username') or '').strip()
    password = request.form.get('password')
    is_admin = request.form.get('is_admin') == 'on'

    # A missing password would otherwise crash inside generate_password_hash.
    if not username or not password:
        flash("Username and password are required", "danger")
        return redirect(url_for('admin.users'))

    if User.query.filter_by(username=username).first():
        flash("Username already exists", "danger")
        return redirect(url_for('admin.users'))

    new_user = User(username=username, is_guest=False, is_admin=is_admin)
    new_user.password_hash = generate_password_hash(password)
    db.session.add(new_user)
    # Flush so the primary key is available for the quota keys while keeping
    # the user and the quotas in a single transaction.
    db.session.flush()

    # Only store quotas that differ from the "not set" marker '0'.
    stl_quota = request.form.get('stl_quota_mb', '0')
    gcode_quota = request.form.get('gcode_quota_mb', '0')
    if stl_quota != '0':
        _set_config(f"user_{new_user.id}_stl_quota_mb", stl_quota)
    if gcode_quota != '0':
        _set_config(f"user_{new_user.id}_gcode_quota_mb", gcode_quota)

    db.session.commit()
    flash("User created.", "success")
    return redirect(url_for('admin.users'))


# NOTE(review): the URL variables below use the int converter on the
# assumption that User/ApiKey primary keys are integers -- confirm against
# the models.
@admin_bp.route('/user/<int:user_id>/quota', methods=['POST'])
def update_quota(user_id):
    """Upsert the STL and G-code quota settings for the given user id."""
    stl_quota = request.form.get('stl_quota_mb', '0')
    gcode_quota = request.form.get('gcode_quota_mb', '0')

    _set_config(f"user_{user_id}_stl_quota_mb", stl_quota)
    _set_config(f"user_{user_id}_gcode_quota_mb", gcode_quota)
    db.session.commit()

    flash("Quotas updated.", "success")
    return redirect(url_for('admin.users'))


@admin_bp.route('/user/<int:user_id>/password', methods=['POST'])
def reset_password(user_id):
    """Set a new password for a non-guest user; responds 404 for unknown ids."""
    user = User.query.get_or_404(user_id)
    if user.is_guest:
        flash("Cannot set password for guests.", "danger")
        return redirect(url_for('admin.users'))

    pwd = request.form.get('password')
    if not pwd:
        flash("Password is required", "danger")
        return redirect(url_for('admin.users'))

    user.password_hash = generate_password_hash(pwd)
    db.session.commit()
    flash("Password updated.", "success")
    return redirect(url_for('admin.users'))


@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
def delete_user(user_id):
    """Delete a user together with their print files and quota settings.

    The STL, its ``.proxy.stl`` companion and the derived ``.gcode`` file of
    every PrintFile are removed from disk on a best-effort basis. Admins
    cannot delete their own account. Responds 404 for unknown ids.
    """
    user = User.query.get_or_404(user_id)

    if user.id == current_user.id:
        flash('You cannot delete yourself.', 'danger')
        return redirect(url_for('admin.users'))

    # Read what the flash message needs before the row is deleted.
    username = user.username

    upload_dir = current_app.config['UPLOAD_FOLDER']
    gcode_dir = get_gcode_dir()

    for print_file in PrintFile.query.filter_by(user_id=user.id).all():
        stl_path = os.path.join(upload_dir, print_file.filename)
        gcode_filename = print_file.filename.rsplit('.', 1)[0] + '.gcode'
        gcode_path = os.path.join(gcode_dir, gcode_filename)
        proxy_path = stl_path + '.proxy.stl'

        for path in (stl_path, proxy_path, gcode_path):
            _remove_file_quietly(path)

        db.session.delete(print_file)

    # Drop the per-user quota rows so they are not left orphaned.
    for quota_key in (f"user_{user.id}_stl_quota_mb", f"user_{user.id}_gcode_quota_mb"):
        quota_conf = SystemConfig.query.filter_by(key=quota_key).first()
        if quota_conf:
            db.session.delete(quota_conf)

    db.session.delete(user)
    db.session.commit()

    flash(f'User {username} and all their files have been deleted.', 'success')
    return redirect(url_for('admin.users'))


@admin_bp.route('/api_keys')
def api_keys():
    """List all API keys, newest first."""
    keys = ApiKey.query.order_by(ApiKey.created_at.desc()).all()
    return render_template('admin/api_keys.html', keys=keys)


@admin_bp.route('/api_key/add', methods=['POST'])
def add_api_key():
    """Create a named API key with a randomly generated value."""
    name = (request.form.get('name') or '').strip()
    if not name:
        flash("Name is required", "danger")
        return redirect(url_for('admin.api_keys'))

    key_value = secrets.token_urlsafe(32)
    new_key = ApiKey(name=name, key=key_value)
    db.session.add(new_key)
    db.session.commit()

    flash(f"API Key '{name}' created.", "success")
    return redirect(url_for('admin.api_keys'))


@admin_bp.route('/api_key/<int:key_id>/delete', methods=['POST'])
def delete_api_key(key_id):
    """Delete the API key with the given id; responds 404 for unknown ids."""
    key = ApiKey.query.get_or_404(key_id)
    # Read the name before the row is deleted.
    key_name = key.name
    db.session.delete(key)
    db.session.commit()
    flash(f'API Key {key_name} deleted.', 'success')
    return redirect(url_for('admin.api_keys'))