225 lines
8.9 KiB
Python
225 lines
8.9 KiB
Python
import json
|
|
import trimesh
|
|
import uuid
|
|
import os
|
|
import configparser
|
|
from datetime import datetime
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, session, make_response, send_file, abort, jsonify
|
|
from flask_login import login_user, logout_user, login_required, current_user
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from werkzeug.utils import secure_filename
|
|
from app.models import db, User, PrintFile, SystemConfig
|
|
from app.utils.tasks import merge_and_slice_task, slice_stl_task, simplify_stl_task
|
|
from app import i18n_dict
|
|
# import trimesh.repair
|
|
from app.utils.stl_simplifier import simplify_stl
|
|
from app.utils.slice_engines import get_all_engines
|
|
|
|
|
|
main_bp = Blueprint('main', __name__)
|
|
|
|
|
|
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
|
|
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
|
|
|
|
def get_gcode_dir():
|
|
conf = SystemConfig.query.filter_by(key='gcode_upload_folder').first()
|
|
if conf and conf.value and os.path.exists(conf.value):
|
|
return conf.value
|
|
return current_app.config['UPLOAD_FOLDER']
|
|
|
|
# Guest User Middleware
|
|
@admin_bp.before_request
|
|
def require_admin():
|
|
if not current_user.is_authenticated or not current_user.is_admin:
|
|
flash('Admin access required', 'danger')
|
|
return redirect(url_for('main.index'))
|
|
|
|
@admin_bp.route('/settings', methods=['GET', 'POST'])
|
|
def settings():
|
|
if request.method == 'POST':
|
|
# concurrent_slices = request.form.get('concurrent_slices')
|
|
offset_x = request.form.get('offset_x', '0')
|
|
offset_y = request.form.get('offset_y', '0')
|
|
proxy_skip_size_mb = request.form.get('proxy_skip_size_mb', '5.0')
|
|
default_infill = request.form.get('default_infill', '20')
|
|
default_support = request.form.get('default_support', 'false')
|
|
default_support_pattern = request.form.get('default_support_pattern', 'tree')
|
|
default_quality = request.form.get('default_quality', 'base_global_standard.inst.cfg')
|
|
default_material = request.form.get('default_material', '')
|
|
default_printer = request.form.get('default_printer', '')
|
|
gcode_upload_folder = request.form.get('gcode_upload_folder', '').strip()
|
|
slicer_engine = request.form.get('slicer_engine', 'cura')
|
|
build_plate_model_path = request.form.get('build_plate_model_path', '').strip()
|
|
|
|
# update or create config entries
|
|
config_items = [
|
|
('offset_x', offset_x),
|
|
('offset_y', offset_y),
|
|
('proxy_skip_size_mb', proxy_skip_size_mb),
|
|
('default_infill', default_infill),
|
|
('default_support', default_support),
|
|
('default_support_pattern', default_support_pattern),
|
|
('default_quality', default_quality),
|
|
('default_material', default_material),
|
|
('default_printer', default_printer),
|
|
('gcode_upload_folder', gcode_upload_folder),
|
|
('slicer_engine', slicer_engine),
|
|
('build_plate_model_path', build_plate_model_path),
|
|
('default_guest_stl_quota_mb', request.form.get('default_guest_stl_quota_mb', '0')),
|
|
('default_guest_gcode_quota_mb', request.form.get('default_guest_gcode_quota_mb', '0')),
|
|
('default_user_stl_quota_mb', request.form.get('default_user_stl_quota_mb', '0')),
|
|
('default_user_gcode_quota_mb', request.form.get('default_user_gcode_quota_mb', '0'))
|
|
]
|
|
for key, val in config_items:
|
|
conf = SystemConfig.query.filter_by(key=key).first()
|
|
if not conf:
|
|
conf = SystemConfig(key=key)
|
|
db.session.add(conf)
|
|
conf.value = val
|
|
db.session.commit()
|
|
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
|
|
return jsonify({'success': True}), 200
|
|
flash('Settings updated successfully', 'success')
|
|
return redirect(url_for('admin.settings'))
|
|
|
|
configs = {c.key: c.value for c in SystemConfig.query.all()}
|
|
engines = get_all_engines()
|
|
return render_template('admin/settings.html', configs=configs, engines=engines)
|
|
|
|
@admin_bp.route('/users')
|
|
def users():
|
|
all_users = User.query.order_by(User.created_at.desc()).all()
|
|
user_quotas = {}
|
|
for u in all_users:
|
|
sq = SystemConfig.query.filter_by(key=f"user_{u.id}_stl_quota_mb").first()
|
|
gq = SystemConfig.query.filter_by(key=f"user_{u.id}_gcode_quota_mb").first()
|
|
user_quotas[u.id] = {
|
|
'stl': sq.value if sq else '0',
|
|
'gcode': gq.value if gq else '0'
|
|
}
|
|
return render_template('admin/users.html', users=all_users, user_quotas=user_quotas)
|
|
|
|
@admin_bp.route('/user/add', methods=['POST'])
|
|
def add_user():
|
|
username = request.form.get('username')
|
|
password = request.form.get('password')
|
|
is_admin = request.form.get('is_admin') == 'on'
|
|
|
|
if User.query.filter_by(username=username).first():
|
|
flash("Username already exists", "danger")
|
|
return redirect(url_for('admin.users'))
|
|
|
|
u = User(username=username, is_guest=False, is_admin=is_admin)
|
|
u.password_hash = generate_password_hash(password)
|
|
db.session.add(u)
|
|
db.session.commit()
|
|
|
|
# Save quotas
|
|
stl_quota = request.form.get('stl_quota_mb', '0')
|
|
gcode_quota = request.form.get('gcode_quota_mb', '0')
|
|
if stl_quota != '0':
|
|
db.session.add(SystemConfig(key=f"user_{u.id}_stl_quota_mb", value=stl_quota))
|
|
if gcode_quota != '0':
|
|
db.session.add(SystemConfig(key=f"user_{u.id}_gcode_quota_mb", value=gcode_quota))
|
|
db.session.commit()
|
|
|
|
flash("User created.", "success")
|
|
return redirect(url_for('admin.users'))
|
|
|
|
@admin_bp.route('/user/<int:user_id>/quota', methods=['POST'])
|
|
def update_quota(user_id):
|
|
stl_quota = request.form.get('stl_quota_mb', '0')
|
|
gcode_quota = request.form.get('gcode_quota_mb', '0')
|
|
|
|
def set_conf(k, v):
|
|
c = SystemConfig.query.filter_by(key=k).first()
|
|
if not c:
|
|
c = SystemConfig(key=k)
|
|
db.session.add(c)
|
|
c.value = str(v)
|
|
|
|
set_conf(f"user_{user_id}_stl_quota_mb", stl_quota)
|
|
set_conf(f"user_{user_id}_gcode_quota_mb", gcode_quota)
|
|
db.session.commit()
|
|
flash("Quotas updated.", "success")
|
|
return redirect(url_for('admin.users'))
|
|
|
|
@admin_bp.route('/user/<int:user_id>/password', methods=['POST'])
|
|
def reset_password(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if user.is_guest:
|
|
flash("Cannot set password for guests.", "danger")
|
|
return redirect(url_for('admin.users'))
|
|
|
|
pwd = request.form.get('password')
|
|
user.password_hash = generate_password_hash(pwd)
|
|
db.session.commit()
|
|
flash("Password updated.", "success")
|
|
return redirect(url_for('admin.users'))
|
|
|
|
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
|
|
def delete_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
if user.id == current_user.id:
|
|
flash('You cannot delete yourself.', 'danger')
|
|
return redirect(url_for('admin.users'))
|
|
|
|
print_files = PrintFile.query.filter_by(user_id=user.id).all()
|
|
for print_file in print_files:
|
|
stl_path = os.path.join(current_app.config['UPLOAD_FOLDER'], print_file.filename)
|
|
gcode_filename = print_file.filename.rsplit('.', 1)[0] + '.gcode'
|
|
gcode_path = os.path.join(get_gcode_dir(), gcode_filename)
|
|
proxy_path = stl_path + '.proxy.stl'
|
|
|
|
if os.path.exists(stl_path):
|
|
try: os.remove(stl_path)
|
|
except: pass
|
|
if os.path.exists(proxy_path):
|
|
try: os.remove(proxy_path)
|
|
except: pass
|
|
if os.path.exists(gcode_path):
|
|
try: os.remove(gcode_path)
|
|
except: pass
|
|
|
|
db.session.delete(print_file)
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash(f'User {user.username} and all their files have been deleted.', 'success')
|
|
return redirect(url_for('admin.users'))
|
|
|
|
|
|
|
|
|
|
@admin_bp.route('/api_keys')
|
|
def api_keys():
|
|
from app.models import ApiKey
|
|
keys = ApiKey.query.order_by(ApiKey.created_at.desc()).all()
|
|
return render_template('admin/api_keys.html', keys=keys)
|
|
|
|
@admin_bp.route('/api_key/add', methods=['POST'])
|
|
def add_api_key():
|
|
from app.models import ApiKey
|
|
import secrets
|
|
name = request.form.get('name')
|
|
if not name:
|
|
flash("Name is required", "danger")
|
|
return redirect(url_for('admin.api_keys'))
|
|
|
|
key_value = secrets.token_urlsafe(32)
|
|
new_key = ApiKey(name=name, key=key_value)
|
|
db.session.add(new_key)
|
|
db.session.commit()
|
|
flash(f"API Key '{name}' created.", "success")
|
|
return redirect(url_for('admin.api_keys'))
|
|
|
|
@admin_bp.route('/api_key/<int:key_id>/delete', methods=['POST'])
|
|
def delete_api_key(key_id):
|
|
from app.models import ApiKey
|
|
key = ApiKey.query.get_or_404(key_id)
|
|
db.session.delete(key)
|
|
db.session.commit()
|
|
flash(f'API Key {key.name} deleted.', 'success')
|
|
return redirect(url_for('admin.api_keys'))
|