Files
AIO_3D_Print_Web_Platform/app/routes/main_routes.py

635 lines
27 KiB
Python

import json
import uuid
import os
import configparser
from datetime import datetime
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app, session, make_response, send_file, abort, jsonify
from flask_login import login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
from app.models import db, User, PrintFile, SystemConfig, UserSession
from app.utils.tasks import merge_and_slice_task, slice_stl_task, simplify_stl_task
from app import i18n_dict
from app.utils.stl_simplifier import simplify_stl
from app.routes.admin_routes import get_gcode_dir
from app.utils.slice_engines import get_slicer_engine
from app.utils.gcode_parser import get_gcode_metadata
# Blueprint that carries the site's main routes and the app-wide request hooks
# (before_app_request / after_app_request) registered further down in this module.
main_bp = Blueprint('main', __name__)
@main_bp.before_app_request
def check_user_session():
    """Validate the tracked login session of a signed-in, non-guest user.

    Runs before every request of the application. When the Flask session
    holds a ``user_session_token``, the matching ``UserSession`` row is looked
    up: a missing or inactive row logs the user out and redirects to the login
    page, an active row gets its ``last_active`` timestamp refreshed.
    Requests without a token, guests and anonymous visitors are left alone.
    """
    # Nothing to verify for anonymous visitors or guest accounts.
    if not current_user.is_authenticated or current_user.is_guest:
        return None

    token = session.get('user_session_token')
    if not token:
        return None

    tracked = UserSession.query.filter_by(session_token=token).first()
    if tracked and tracked.is_active:
        # Session is still valid: record the activity and carry on.
        tracked.last_active = datetime.utcnow()
        db.session.commit()
        return None

    # The session row was removed or deactivated (e.g. terminated remotely).
    logout_user()
    session.pop('user_session_token', None)
    flash('Your session has been terminated.', 'warning')
    return redirect(url_for('auth.login'))
# Blueprints declared with the '/auth' and '/admin' URL prefixes.
# NOTE(review): no route in the visible part of this module is registered on
# either of them (every handler below uses main_bp) — confirm whether they are
# still needed here or are superseded by blueprints defined in other modules.
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
def get_quota_info(user, file_type):
    """Return ``(quota_mb, used_bytes)`` for *user* and *file_type*.

    ``quota_mb`` is 0.0 for admins. For other users the
    ``user_<id>_<file_type>_quota_mb`` setting is read first; when it is
    absent or zero the guest or regular-user default setting is used instead
    (0.0 if that is absent too).

    ``used_bytes`` is the summed on-disk size of the user's files:
    for ``'stl'`` every record whose original name is not a G-code file is
    looked up in the upload folder; for ``'gcode'`` the ``.gcode`` sibling of
    every record is looked up in the G-code folder first and in the upload
    folder second. Any other *file_type* yields 0 bytes.
    """
    quota_mb = 0.0
    if not user.is_admin:
        own_setting = SystemConfig.query.filter_by(key=f"user_{user.id}_{file_type}_quota_mb").first()
        if own_setting:
            quota_mb = float(own_setting.value)
        if quota_mb == 0.0:
            # No personal limit configured: fall back to the role default.
            if user.is_guest:
                default_setting = SystemConfig.query.filter_by(key=f"default_guest_{file_type}_quota_mb").first()
            else:
                default_setting = SystemConfig.query.filter_by(key=f"default_user_{file_type}_quota_mb").first()
            quota_mb = float(default_setting.value) if default_setting else 0.0

    records = PrintFile.query.filter_by(user_id=user.id).all()
    upload_dir = current_app.config.get('UPLOAD_FOLDER', 'uploads')
    gcode_dir = get_gcode_dir()

    used_bytes = 0
    if file_type == 'stl':
        for record in records:
            # Records uploaded as G-code do not count against the STL quota.
            if record.original_filename.lower().endswith(('.gcode', '.gco', '.g')):
                continue
            stl_path = os.path.join(upload_dir, record.filename)
            if os.path.exists(stl_path):
                used_bytes += os.path.getsize(stl_path)
    elif file_type == 'gcode':
        for record in records:
            gcode_name = record.filename.rsplit('.', 1)[0] + '.gcode'
            primary = os.path.join(gcode_dir, gcode_name)
            secondary = os.path.join(upload_dir, gcode_name)
            if os.path.exists(primary):
                used_bytes += os.path.getsize(primary)
            elif os.path.exists(secondary):
                used_bytes += os.path.getsize(secondary)
    return quota_mb, used_bytes
def check_quota(user, file_type, size_bytes):
    """Tell whether *user* may store another *size_bytes* of *file_type*.

    Admins are always allowed. A quota of 0.0 MB or less means "no limit".
    Otherwise the upload is allowed only when the current usage plus
    *size_bytes* does not exceed the quota (converted from MiB to bytes).
    """
    if user.is_admin:
        return True

    quota_mb, used_bytes = get_quota_info(user, file_type)
    if quota_mb <= 0.0:
        return True

    limit_bytes = quota_mb * 1024 * 1024
    return not (used_bytes + size_bytes > limit_bytes)
# Guest User Middleware
@main_bp.before_app_request
def assign_guest_cookie():
    """Log anonymous visitors in as a guest account before each request.

    Requests under ``/api/`` and already-authenticated users are skipped.
    A visitor presenting a ``guest_id`` cookie that matches a user's
    ``guest_cookie_id`` is logged in as that user. A visitor without a cookie
    - or with a cookie that no longer matches any user - gets a fresh guest
    account; its id is stored on the request as ``guest_id_to_set`` so the
    after-request hook can write the cookie.
    """
    if request.path.startswith('/api/'):
        return
    if current_user.is_authenticated:
        return

    guest_id = request.cookies.get('guest_id')
    if guest_id:
        # NOTE(review): the lookup is not restricted to is_guest users —
        # confirm that registered accounts never keep a guest_cookie_id.
        user = User.query.filter_by(guest_cookie_id=guest_id).first()
        if user:
            login_user(user)
            return
        # Stale cookie (the guest row is gone): previously the visitor stayed
        # unauthenticated forever; fall through and issue a new guest instead.

    guest_id = str(uuid.uuid4())
    user = User(username=f'guest_{guest_id[:8]}', is_guest=True, guest_cookie_id=guest_id)
    db.session.add(user)
    db.session.commit()
    login_user(user)
    # The cookie itself is written by the after-request hook, which reads
    # this attribute from the request object.
    request.guest_id_to_set = guest_id
@main_bp.after_app_request
def set_guest_cookie(response):
    """Attach the ``guest_id`` cookie when a guest id was stored on the request.

    The cookie is only written if the current request object carries a
    ``guest_id_to_set`` attribute; the response is returned in every case.
    """
    try:
        pending_guest_id = request.guest_id_to_set
    except AttributeError:
        # No guest id was queued during this request.
        return response

    one_year = 60*60*24*365
    response.set_cookie('guest_id', pending_guest_id, max_age=one_year)
    return response
# --- Main Routes ---
@main_bp.route('/')
@login_required
def index():
    """Render the dashboard with file counts, disk usage and quota figures.

    Admins see statistics over every ``PrintFile``; other users only over
    their own. Records whose original name ends in a G-code extension count
    as G-code; every other record counts as an STL, plus one more G-code when
    its status is ``'sliced'``. Quota figures always refer to the current user.
    """
    def size_of_first_existing(*candidates):
        """Return the size of the first candidate path that exists, else 0."""
        for candidate in candidates:
            try:
                # EAFP: avoids the race between a separate exists() and getsize().
                return os.path.getsize(candidate)
            except (OSError, ValueError):
                continue
        return 0

    def format_size(size_bytes):
        """Format a byte count as B / KB / MB / GB with two decimals."""
        if size_bytes < 1024:
            return f"{size_bytes} B"
        elif size_bytes < 1024 * 1024:
            return f"{size_bytes / 1024:.2f} KB"
        elif size_bytes < 1024 * 1024 * 1024:
            return f"{size_bytes / (1024 * 1024):.2f} MB"
        else:
            return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"

    # The former admin query OR-ed the owner filter with a constant True,
    # which selects every row; an unfiltered query says the same thing.
    if current_user.is_admin:
        user_files = PrintFile.query.all()
    else:
        user_files = PrintFile.query.filter_by(user_id=current_user.id).all()

    stl_count = 0
    stl_size = 0
    gcode_count = 0
    gcode_size = 0
    upload_dir = current_app.config.get('UPLOAD_FOLDER', 'uploads')
    gcode_dir = get_gcode_dir()

    for f in user_files:
        if f.original_filename.lower().endswith(('.gcode', '.gco', '.g')):
            gcode_count += 1
            # NOTE(review): other handlers in this module derive the G-code name
            # with rsplit('.', 1)[0] + '.gcode'; this lookup differs for
            # '.gco' / '.g' uploads — confirm which name is stored on disk.
            gcode_size += size_of_first_existing(
                os.path.join(gcode_dir, f.filename.replace('.stl', '.gcode'))
            )
            continue

        stl_count += 1
        stl_size += size_of_first_existing(os.path.join(upload_dir, f.filename))
        if f.status == 'sliced':
            gcode_count += 1
            gcode_filename = f.filename.rsplit('.', 1)[0] + '.gcode'
            # Sliced output is looked up in the G-code folder first and in
            # the upload folder as a fallback.
            gcode_size += size_of_first_existing(
                os.path.join(gcode_dir, gcode_filename),
                os.path.join(upload_dir, gcode_filename),
            )

    stl_quota_mb, stl_used_bytes = get_quota_info(current_user, 'stl')
    gcode_quota_mb, gcode_used_bytes = get_quota_info(current_user, 'gcode')
    return render_template(
        'slice/index.html',
        stl_count=stl_count,
        stl_size_str=format_size(stl_size),
        gcode_count=gcode_count,
        gcode_size_str=format_size(gcode_size),
        stl_used_bytes=stl_used_bytes,
        stl_quota_mb=stl_quota_mb,
        gcode_used_bytes=gcode_used_bytes,
        gcode_quota_mb=gcode_quota_mb,
        format_size=format_size,
    )
@main_bp.route('/set_language/<lang>')
def set_language(lang):
    """Store the UI language in a one-year ``lang`` cookie and go back.

    Unknown language codes fall back to ``'en'``. The visitor is redirected
    to the referring page only when it belongs to this host; otherwise (or
    when no referrer was sent) to the index page.
    """
    from urllib.parse import urlsplit

    if lang not in i18n_dict:
        lang = 'en'

    # The Referer header is client-controlled, so redirecting to it blindly
    # is an open redirect. Only follow it when it points back at this host.
    # NOTE(review): behind a reverse proxy request.host must reflect the
    # public host name for this comparison to succeed.
    target = None
    referrer = request.referrer
    if referrer:
        parts = urlsplit(referrer)
        if parts.scheme in ('http', 'https') and parts.netloc == request.host:
            target = referrer
    if target is None:
        target = url_for('main.index')

    response = make_response(redirect(target))
    response.set_cookie('lang', lang, max_age=60*60*24*365)
    return response
@main_bp.route('/files', methods=['GET', 'POST'])
@login_required
def files():
    """List the current user's STL files (GET) or accept STL uploads (POST).

    On POST every part named ``file`` whose name ends in ``.stl`` is checked
    against the user's STL quota, saved under a generated unique name,
    recorded as a ``PrintFile`` with status ``'simplifying'`` and handed to
    ``simplify_stl_task``. Requests flagged as XMLHttpRequest get JSON
    answers; other requests get flash messages and a redirect.
    """
    if request.method == 'POST':
        is_xhr = request.headers.get('X-Requested-With') == 'XMLHttpRequest'

        if 'file' not in request.files:
            if is_xhr:
                return jsonify({'error': 'No file part'}), 400
            flash('No file part', 'danger')
            return redirect(request.url)

        success_count = 0
        for upload in request.files.getlist('file'):
            # Skip empty form slots and anything that is not an STL.
            if not upload or not upload.filename.lower().endswith('.stl'):
                continue

            # Measure the upload without consuming the stream.
            upload.seek(0, os.SEEK_END)
            size_bytes = upload.tell()
            upload.seek(0, os.SEEK_SET)

            if not check_quota(current_user, 'stl', size_bytes):
                if is_xhr:
                    # Files accepted earlier in this request stay stored.
                    return jsonify({'error': 'STL Storage Quota Exceeded'}), 400
                flash('STL Storage Quota Exceeded', 'danger')
                continue

            # secure_filename is deliberately not used so that non-ASCII
            # (e.g. Chinese) names survive; only directory components that
            # some clients send along are dropped from the display name.
            original_filename = upload.filename.replace('\\', '/').rsplit('/', 1)[-1]

            # The name check above guarantees an STL, so the stored
            # extension is always '.stl'.
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            unique_filename = f"{timestamp}_{uuid.uuid4().hex}_{success_count}.stl"
            filepath = os.path.join(current_app.config['UPLOAD_FOLDER'], unique_filename)
            upload.save(filepath)

            print_file = PrintFile(
                filename=unique_filename,
                original_filename=original_filename,
                file_type='stl',
                user_id=current_user.id,
                status='simplifying'  # stays 'simplifying' while the proxy mesh is generated
            )
            db.session.add(print_file)
            db.session.commit()

            # Start background simplification
            simplify_stl_task(print_file.id, filepath)
            success_count += 1

        if success_count > 0:
            if is_xhr:
                return jsonify({'success': True, 'count': success_count})
            flash(f'{success_count} file(s) uploaded successfully!', 'success')
        else:
            if is_xhr:
                return jsonify({'error': 'No valid files uploaded'}), 400
            flash('No valid files uploaded', 'danger')
        return redirect(url_for('main.files'))

    # Order by newest first
    user_files = (
        PrintFile.query
        .filter_by(user_id=current_user.id, file_type='stl')
        .order_by(PrintFile.created_at.desc())
        .all()
    )
    return render_template('slice/files.html', files=user_files)
@main_bp.route('/api/files_status')
@login_required
def files_status():
    """Return a JSON object mapping each of the user's file ids to its status."""
    status_by_id = {}
    for record in PrintFile.query.filter_by(user_id=current_user.id).all():
        # JSON object keys are strings, so the id is stringified explicitly.
        status_by_id[str(record.id)] = record.status
    return jsonify(status_by_id)
@main_bp.route('/download/<int:file_id>')
@login_required
def download_gcode(file_id):
    """Send the sliced G-code of a file as an attachment.

    Responds 404 for an unknown id and 403 when the file belongs to someone
    else and the caller is not an admin. Files whose status is not
    ``'sliced'``, or whose G-code cannot be found on disk, redirect to the
    file list with a flash message.
    """
    record = PrintFile.query.get_or_404(file_id)
    if not (record.user_id == current_user.id or current_user.is_admin):
        abort(403)

    if record.status != 'sliced':
        flash('File is not ready yet.', 'warning')
        return redirect(url_for('main.files'))

    gcode_name = record.filename.rsplit('.', 1)[0] + '.gcode'

    # Use the configured G-code folder when it is set and exists on disk,
    # otherwise the upload folder.
    folder_setting = SystemConfig.query.filter_by(key='gcode_upload_folder').first()
    if folder_setting and folder_setting.value and os.path.exists(folder_setting.value):
        base_dir = folder_setting.value
    else:
        base_dir = current_app.config['UPLOAD_FOLDER']

    target = os.path.join(base_dir, gcode_name)
    if not os.path.exists(target):
        # Second chance: the same name inside the upload folder.
        alternative = os.path.join(current_app.config['UPLOAD_FOLDER'], gcode_name)
        if os.path.exists(alternative):
            target = alternative

    if not os.path.exists(target):
        flash('GCode file not found. It might have been deleted.', 'danger')
        return redirect(url_for('main.files'))

    # Offer the download under the user's original name with a .gcode suffix.
    attachment_name = record.original_filename.rsplit('.', 1)[0] + '.gcode'
    return send_file(target, as_attachment=True, download_name=attachment_name)
@main_bp.route('/preview_gcode/<int:file_id>')
@login_required
def preview_gcode(file_id):
    """Render a preview page for the G-code that belongs to a file.

    Shows the first 500 lines of the G-code, the total line count, the
    values reported by ``get_gcode_metadata`` and the bed dimensions of the
    configured slicer engine. Responds 404 for an unknown id and 403 when
    the file belongs to someone else and the caller is not an admin.
    """
    preview_lines = 500

    print_file = PrintFile.query.get_or_404(file_id)
    if print_file.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    gcode_filename = print_file.filename.rsplit('.', 1)[0] + '.gcode'
    conf = SystemConfig.query.filter_by(key='gcode_upload_folder').first()
    gcode_dir = conf.value if (conf and conf.value and os.path.exists(conf.value)) else current_app.config['UPLOAD_FOLDER']
    filepath = os.path.join(gcode_dir, gcode_filename)
    if not os.path.exists(filepath):
        fallback = os.path.join(current_app.config['UPLOAD_FOLDER'], gcode_filename)
        if os.path.exists(fallback):
            filepath = fallback

    content = "File not found or not ready."
    line_count = 0
    time_info = "-"
    layer1_time = "-"
    filament_used = "-"
    if os.path.exists(filepath):
        metadata = get_gcode_metadata(filepath)
        time_info = metadata.get('print_time', '-')
        layer1_time = metadata.get('first_layer_time', '-')
        filament_used = metadata.get('filament_used', '-')

        # Stream the file instead of loading it whole: only the preview
        # lines are kept in memory while the rest is merely counted.
        # Undecodable bytes are replaced so odd comments cannot crash the page.
        head = []
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                if line_count < preview_lines:
                    head.append(line)
                line_count += 1
        content = "".join(head)
        if line_count > preview_lines:
            content += f"\n... \n[Preview truncated. Total lines: {line_count}. Please download to view full file.]"

    # Generic fallback bed size so the page still renders when no slicer
    # engine is configured (the names used to be unbound in that case).
    # These numbers are not read from any configuration.
    w, h, hd = 220.0, 220.0, 250.0
    engine_name = SystemConfig.query.filter_by(key='slicer_engine').first()
    if engine_name:
        engine = get_slicer_engine(str(engine_name.value), current_app.config['PRINT_CONFIG_FOLDER'])
        w, h, hd = engine.get_bed_dimensions()

    configs = {c.key: c.value for c in SystemConfig.query.all()}
    offset_x = float(configs.get('offset_x', '0.0'))
    offset_y = float(configs.get('offset_y', '0.0'))
    return render_template(
        'slice/gcode_preview.html',
        file=print_file,
        content=content,
        line_count=line_count,
        machine_width=w,
        machine_depth=h,
        machine_height=hd,
        offset_x=offset_x,
        offset_y=offset_y,
        time_info=time_info,
        layer1_time=layer1_time,
        filament_used=filament_used,
    )
@main_bp.route('/delete_file/<int:file_id>', methods=['POST'])
@login_required
def delete_file(file_id):
    """Delete a file record together with its artefacts on disk.

    Removes the stored model, its ``.proxy.stl`` companion and the G-code
    (from the configured G-code folder and from the upload folder), then
    deletes the database row. Responds 404 for an unknown id and 403 when
    the file belongs to someone else and the caller is not an admin.
    """
    print_file = PrintFile.query.get_or_404(file_id)
    if print_file.user_id != current_user.id and not current_user.is_admin:
        abort(403)

    upload_dir = current_app.config['UPLOAD_FOLDER']
    stl_path = os.path.join(upload_dir, print_file.filename)
    gcode_filename = print_file.filename.rsplit('.', 1)[0] + '.gcode'
    conf = SystemConfig.query.filter_by(key='gcode_upload_folder').first()
    gcode_dir = conf.value if (conf and conf.value and os.path.exists(conf.value)) else upload_dir

    # Read the display name now: the instance is gone after the commit below.
    display_name = print_file.original_filename

    artefacts = (
        stl_path,
        stl_path + '.proxy.stl',
        os.path.join(gcode_dir, gcode_filename),
        os.path.join(upload_dir, gcode_filename),
    )
    for path in artefacts:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already absent (never created, removed concurrently, or the
            # same path listed twice when both folders coincide).
            pass

    db.session.delete(print_file)
    db.session.commit()
    flash(f"Deleted {display_name} successfully.", 'success')
    return redirect(url_for('main.files'))
# --- Plater / Slicing Routes ---
@main_bp.route('/plater')
@login_required
def plater():
    """Render the plater page where models are arranged and sliced.

    Passes the template the bed dimensions of the configured slicer engine,
    the slicing defaults read from ``SystemConfig``, the user's STL files
    (newest first) and whether the user's G-code quota is already used up.
    """
    quota_mb, current_size = get_quota_info(current_user, 'gcode')
    quota_exceeded = (quota_mb > 0 and current_size >= quota_mb * 1024 * 1024)

    # Generic fallback bed size so the page still renders when no slicer
    # engine is configured (the names used to be unbound in that case).
    # These numbers are not read from any configuration.
    w, h, hd = 220.0, 220.0, 250.0
    engine_name = SystemConfig.query.filter_by(key='slicer_engine').first()
    if engine_name:
        engine = get_slicer_engine(str(engine_name.value), current_app.config['PRINT_CONFIG_FOLDER'])
        w, h, hd = engine.get_bed_dimensions()
        current_app.logger.debug("Bed dimensions: %sx%sx%s", w, h, hd)

    configs = {c.key: c.value for c in SystemConfig.query.all()}
    offset_x = float(configs.get('offset_x', '0.0'))
    offset_y = float(configs.get('offset_y', '0.0'))
    default_infill = configs.get('default_infill', '20')
    default_support = configs.get('default_support', 'false')
    default_support_pattern = configs.get('default_support_pattern', 'tree')
    default_quality = configs.get('default_quality', 'base_global_standard.inst.cfg')
    default_material = configs.get('default_material', '')

    user_files = (
        PrintFile.query
        .filter_by(user_id=current_user.id, file_type='stl')
        .order_by(PrintFile.created_at.desc())
        .all()
    )
    models = [
        {
            'id': f.id,
            'name': f.original_filename,
            'status': f.status,
            'url': url_for('main.serve_proxy_file', file_id=f.id),
            'transform_matrix': f.transform_matrix,
        }
        for f in user_files
    ]
    return render_template(
        'slice/plater.html',
        w=w,
        h=h,
        hd=hd,
        last_quality=default_quality,
        last_material=default_material,
        models=models,
        offset_x=offset_x,
        offset_y=offset_y,
        default_infill=default_infill,
        default_support=default_support,
        default_support_pattern=default_support_pattern,
        quota_exceeded=quota_exceeded,
        configs=configs,
    )
@main_bp.route('/file/<int:file_id>')
@login_required
def serve_file(file_id):
    """Send the stored model file of a ``PrintFile``.

    Responds 404 for an unknown id or when the file is missing on disk, and
    403 when the file belongs to someone else and the caller is not an admin.
    """
    f = PrintFile.query.get_or_404(file_id)
    if f.user_id != current_user.id and not current_user.is_admin:
        abort(403)
    path = os.path.join(current_app.config['UPLOAD_FOLDER'], f.filename)
    try:
        return send_file(path)
    except FileNotFoundError:
        # The record exists but its file is gone: "not found", not a 500.
        abort(404)
@main_bp.route('/proxy/<int:file_id>')
@login_required
def serve_proxy_file(file_id):
    """Send the ``.proxy.stl`` companion of a file, or the file itself.

    The proxy is preferred; when it does not exist the stored model is sent
    instead. Responds 404 for an unknown id or when neither file exists on
    disk, and 403 when the file belongs to someone else and the caller is
    not an admin.
    """
    f = PrintFile.query.get_or_404(file_id)
    if f.user_id != current_user.id and not current_user.is_admin:
        abort(403)
    path = os.path.join(current_app.config['UPLOAD_FOLDER'], f.filename)
    proxy_path = path + '.proxy.stl'
    for candidate in (proxy_path, path):
        try:
            return send_file(candidate)
        except FileNotFoundError:
            # Try the next candidate; a missing file must not become a 500.
            continue
    abort(404)
def _plater_owned_file(file_id):
    """Return the PrintFile *file_id* if the current user may use it, else None."""
    if file_id is None:
        return None
    record = PrintFile.query.get(file_id)
    if record is not None and (record.user_id == current_user.id or current_user.is_admin):
        return record
    return None


def _plater_is_composite(print_file):
    """Tell whether the stored transform metadata marks *print_file* as composite."""
    raw = print_file.transform_matrix
    if not raw:
        return False
    try:
        meta = json.loads(raw)
    except (TypeError, ValueError):
        return False
    # A substring test on the JSON text would also match '"is_composite": false'.
    return isinstance(meta, dict) and bool(meta.get('is_composite'))


def _plater_single_meta(matrix, settings):
    """Build the metadata stored for a single (non-composite) model."""
    return {"is_composite": False, "matrix": matrix, "settings": settings}


def _plater_composite_meta(usable, settings):
    """Build the metadata tree stored for a combination of several models."""
    return {
        "is_composite": True,
        "parts": [
            {
                "file_id": pf.id,
                "name": pf.original_filename,
                "url": url_for('main.serve_proxy_file', file_id=pf.id),
                "raw_matrix": piece.get('raw_matrix', piece['matrix']),
            }
            for piece, pf in usable
        ],
        "settings": settings,
    }


@main_bp.route('/api/merge_and_slice', methods=['POST'])
@login_required
def merge_and_slice():
    """Queue a merge-and-slice job for the models placed on the plater.

    Expects a JSON object with ``pieces`` (each holding ``file_id``,
    ``matrix`` and optionally ``raw_matrix``), the slicing settings, and
    optionally ``is_edit`` / ``target_file_id``. Depending on the request
    the job re-slices an existing record (edit mode) or creates a new
    composite ``PrintFile``. Only files owned by the caller (or any file for
    admins) are used, both as pieces and as edit target.

    Returns JSON: ``success`` on queueing, ``success: False`` with HTTP 200
    when the G-code quota is used up, and an ``error`` with 400/404 for
    invalid requests.
    """
    quota_mb, current_size = get_quota_info(current_user, 'gcode')
    if quota_mb > 0 and current_size >= quota_mb * 1024 * 1024:
        return jsonify({'success': False, 'error': 'GCode Storage Quota Exceeded. Please delete some files first.'})

    # A missing or non-object body is treated like an empty request.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    pieces = data.get('pieces', [])
    quality = data.get('quality', 'base_global_standard.inst.cfg')
    material = data.get('material', '')
    infill_density = data.get('infill', '20')
    support_enable = data.get('support', 'false')
    support_pattern = data.get('support_pattern', 'lines')
    is_edit = data.get('is_edit', False)
    target_file_id = data.get('target_file_id')

    if not pieces:
        return jsonify({'error': 'No pieces provided'}), 400
    if not isinstance(pieces, list) or not all(
        isinstance(p, dict) and 'file_id' in p and 'matrix' in p for p in pieces
    ):
        return jsonify({'error': 'Invalid pieces payload'}), 400

    settings = {
        "quality": quality,
        "material": material,
        "infill": infill_density,
        "support": support_enable,
        "support_pattern": support_pattern,
    }
    upload_dir = current_app.config['UPLOAD_FOLDER']

    # Resolve every piece once; pieces the caller may not use become None.
    resolved = [(p, _plater_owned_file(p['file_id'])) for p in pieces]
    usable = [(p, pf) for p, pf in resolved if pf is not None]
    if not usable:
        return jsonify({'error': 'No accessible models in request'}), 400
    inputs = [(os.path.join(upload_dir, pf.filename), p['matrix']) for p, pf in usable]

    # Work out the edit target (if any) before anything is modified.
    explicit_target = bool(is_edit and target_file_id)
    target = None
    target_is_composite = False
    if explicit_target:
        target = _plater_owned_file(target_file_id)
        if target is None:
            return jsonify({'error': 'Original file not found'}), 404
        target_is_composite = _plater_is_composite(target)
    elif is_edit and len(inputs) == 1:
        target = usable[0][1]

    # Only in single-model edit mode is the source model's own matrix
    # updated; for multi-model or new jobs the source models stay untouched
    # and the placement is recorded on the composite record instead.
    if is_edit and len(pieces) == 1 and 'raw_matrix' in pieces[0]:
        piece, source = usable[0]
        source.transform_matrix = json.dumps(_plater_single_meta(piece['raw_matrix'], settings))
        db.session.add(source)

    if explicit_target:
        target.status = 'merging'
        if target_is_composite:
            target.transform_matrix = json.dumps(_plater_composite_meta(usable, settings))
        elif len(pieces) == 1:
            matrix = pieces[0].get('raw_matrix', pieces[0]['matrix'])
            target.transform_matrix = json.dumps(_plater_single_meta(matrix, settings))
        db.session.commit()

        temp_filepath = os.path.join(upload_dir, f"temp_edit_{uuid.uuid4().hex}.stl")
        merge_and_slice_task(target.id, inputs, temp_filepath, quality, material, infill_density, support_enable, support_pattern, delete_stl=True)
    elif target is not None:
        target.status = 'merging'
        db.session.commit()

        # The transforms still have to be applied to a temporary STL to
        # generate correct G-code.
        temp_filepath = os.path.join(upload_dir, f"temp_{uuid.uuid4().hex}.stl")
        merge_and_slice_task(target.id, inputs, temp_filepath, quality, material, infill_density, support_enable, support_pattern, delete_stl=True)
    else:
        # New job: create a "merged slice" record that tracks the combination.
        # The display name lists at most the first three models to avoid an
        # overly long string.
        names = [pf.original_filename.replace('.stl', '') for _, pf in resolved[:3] if pf is not None]
        combined_name = ", ".join(names)
        if len(pieces) > 3:
            combined_name += "等合并切片"
        elif len(pieces) > 1:
            combined_name += "合并切片"
        else:
            combined_name += " 单独切片"

        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique_filename = f"{timestamp}_{uuid.uuid4().hex}.stl"
        merged_filepath = os.path.join(upload_dir, unique_filename)

        print_file = PrintFile(
            filename=unique_filename,
            original_filename=f"{combined_name}.stl",
            file_type='stl',
            user_id=current_user.id,
            status='merging',
            transform_matrix=json.dumps(_plater_composite_meta(usable, settings))
        )
        db.session.add(print_file)
        db.session.commit()
        merge_and_slice_task(print_file.id, inputs, merged_filepath, quality, material, infill_density, support_enable, support_pattern, delete_stl=False)

    return jsonify({'success': True, 'message': 'Plater slice queued!'})
@main_bp.route('/api/build_plate_model')
@login_required
def build_plate_model():
    """Send the file named by the ``build_plate_model_path`` setting.

    Responds 404 when the setting is absent, empty, or points to a path that
    does not exist.
    """
    setting = SystemConfig.query.filter_by(key='build_plate_model_path').first()
    model_path = setting.value if setting else None
    if not model_path or not os.path.exists(model_path):
        abort(404)
    return send_file(model_path)
@main_bp.route('/api/engine_options/<engine_name>')
@login_required
def engine_options(engine_name):
    """Return the selectable options of a slicer engine as JSON.

    The answer always contains ``presets`` and ``support_patterns``;
    ``materials`` and ``printers`` are empty lists for engines that do not
    provide ``get_materials`` / ``get_all_printers``.
    """
    engine = get_slicer_engine(engine_name, current_app.config['PRINT_CONFIG_FOLDER'])

    quality_presets = engine.get_quality_presets()
    support_patterns = engine.get_support_patterns()

    # Optional capabilities: not every engine exposes these two getters.
    material_list = []
    if hasattr(engine, 'get_materials'):
        material_list = engine.get_materials()
    printer_list = []
    if hasattr(engine, 'get_all_printers'):
        printer_list = engine.get_all_printers()

    return jsonify({
        'presets': quality_presets,
        'support_patterns': support_patterns,
        'materials': material_list,
        'printers': printer_list,
    })
@main_bp.route('/account', methods=['GET', 'POST'])
@login_required
def account():
    """Account page: change the password and terminate other sessions.

    Guests are redirected to the index. A POST with
    ``action=change_password`` verifies the current password and stores the
    new one (at least 6 characters, confirmed twice); a POST with
    ``action=terminate_session`` deactivates one of the user's own
    ``UserSession`` rows other than the current one. Every POST redirects
    back to this page; a GET lists the user's active sessions.
    """
    if current_user.is_guest:
        flash('Guests cannot manage accounts.', 'danger')
        return redirect(url_for('main.index'))

    if request.method == 'POST':
        action = request.form.get('action')
        if action == 'change_password':
            # Missing form fields become empty strings so they fail the
            # checks below instead of raising on None.
            current_pass = request.form.get('current_password') or ''
            new_pass = request.form.get('new_password') or ''
            confirm_pass = request.form.get('confirm_password') or ''

            stored_hash = current_user.password_hash
            if not stored_hash or not check_password_hash(stored_hash, current_pass):
                flash('Current password is incorrect.', 'danger')
            elif new_pass != confirm_pass:
                flash('New passwords do not match.', 'danger')
            elif len(new_pass) < 6:
                flash('New password must be at least 6 characters.', 'danger')
            else:
                current_user.password_hash = generate_password_hash(new_pass)
                db.session.commit()
                flash('Password updated successfully.', 'success')
        elif action == 'terminate_session':
            session_id = request.form.get('session_id')
            token_to_terminate = request.form.get('session_token')
            my_session_token = session.get('user_session_token')

            # Restricting the lookup to the user's own rows keeps other
            # users' sessions out of reach.
            us = UserSession.query.filter_by(id=session_id, user_id=current_user.id).first()

            # Decide "is this my current session" from the stored row as
            # well as the submitted token: the form value alone is
            # client-controlled and could simply be omitted.
            targets_current = bool(my_session_token) and (
                token_to_terminate == my_session_token
                or (us is not None and us.session_token == my_session_token)
            )
            if targets_current:
                flash('You cannot terminate your current session from here. Please logout instead.', 'warning')
            elif us:
                us.is_active = False
                db.session.commit()
                flash('Session terminated.', 'success')
        return redirect(url_for('main.account'))

    sessions = (
        UserSession.query
        .filter_by(user_id=current_user.id, is_active=True)
        .order_by(UserSession.last_active.desc())
        .all()
    )
    current_token = session.get('user_session_token')
    return render_template('slice/account.html', sessions=sessions, current_token=current_token)