194 lines
6.8 KiB
Python
194 lines
6.8 KiB
Python
from flask import Blueprint, render_template, request, jsonify, flash, redirect, url_for, current_app, Response
|
|
from flask_login import login_required, current_user
|
|
import requests
|
|
from .models import SystemConfig, db
|
|
from .octoprint_client import OctoPrintClient
|
|
|
|
printer_bp = Blueprint('printer', __name__, url_prefix='/printer')
|
|
|
|
def get_octo_client():
|
|
url = SystemConfig.query.filter_by(key='octoprint_url').first()
|
|
apikey = SystemConfig.query.filter_by(key='octoprint_apikey').first()
|
|
if url and url.value and apikey and apikey.value:
|
|
return OctoPrintClient(url.value, apikey.value)
|
|
return None
|
|
|
|
@printer_bp.route('/status')
|
|
@login_required
|
|
def status():
|
|
client = get_octo_client()
|
|
status_data = None
|
|
job_data = None
|
|
error = None
|
|
if client:
|
|
try:
|
|
status_data = client.get_printer_status()
|
|
job_data = client.get_job_info()
|
|
except Exception as e:
|
|
error = str(e)
|
|
else:
|
|
error = "OctoPrint is not configured."
|
|
|
|
return render_template('printer/status.html', status=status_data, job=job_data, error=error)
|
|
|
|
@printer_bp.route('/prepare')
|
|
@login_required
|
|
def prepare():
|
|
client = get_octo_client()
|
|
files = []
|
|
error = None
|
|
if client:
|
|
try:
|
|
res = client.get_files()
|
|
files = res.get('files', [])
|
|
except Exception as e:
|
|
error = str(e)
|
|
else:
|
|
error = "OctoPrint is not configured."
|
|
return render_template('printer/prepare.html', files=files, error=error)
|
|
|
|
@printer_bp.route('/api/print_file', methods=['POST'])
|
|
@login_required
|
|
def api_print_file():
|
|
path = request.json.get('path')
|
|
location = request.json.get('origin', 'local')
|
|
client = get_octo_client()
|
|
if client and path:
|
|
try:
|
|
client.select_file(location, path, print_after_select=True)
|
|
return jsonify({"success": True})
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)})
|
|
return jsonify({"success": False, "error": "Not configured or missing path"})
|
|
|
|
@printer_bp.route('/control')
|
|
@login_required
|
|
def control():
|
|
client = get_octo_client()
|
|
webcam_url = None
|
|
error = None
|
|
if client:
|
|
try:
|
|
webcam_url = client.get_webcam_stream_url()
|
|
except Exception as e:
|
|
error = str(e)
|
|
else:
|
|
error = "OctoPrint is not configured."
|
|
return render_template('printer/control.html', webcam_url=webcam_url, error=error)
|
|
|
|
@printer_bp.route('/api/command', methods=['POST'])
|
|
@login_required
|
|
def api_command():
|
|
cmd = request.json.get('command')
|
|
client = get_octo_client()
|
|
if client and cmd:
|
|
try:
|
|
if cmd == 'home':
|
|
client.home_axes()
|
|
elif cmd == 'pause':
|
|
client.pause_print()
|
|
elif cmd == 'cancel':
|
|
client.cancel_print()
|
|
return jsonify({"success": True})
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)})
|
|
return jsonify({"success": False, "error": "Invalid client or command"})
|
|
|
|
@printer_bp.route('/octo_config', methods=['GET', 'POST'])
|
|
@login_required
|
|
def octo_config():
|
|
if not current_user.is_admin:
|
|
flash("Admin access required", "danger")
|
|
return redirect(url_for('printer.status'))
|
|
|
|
if request.method == 'POST':
|
|
url = request.form.get('octoprint_url', '')
|
|
apikey = request.form.get('octoprint_apikey', '')
|
|
|
|
conf_url = SystemConfig.query.filter_by(key='octoprint_url').first()
|
|
if not conf_url:
|
|
conf_url = SystemConfig(key='octoprint_url')
|
|
db.session.add(conf_url)
|
|
conf_url.value = url.rstrip('/')
|
|
|
|
conf_key = SystemConfig.query.filter_by(key='octoprint_apikey').first()
|
|
if not conf_key:
|
|
conf_key = SystemConfig(key='octoprint_apikey')
|
|
db.session.add(conf_key)
|
|
conf_key.value = apikey
|
|
|
|
db.session.commit()
|
|
flash("OctoPrint settings updated", "success")
|
|
return redirect(url_for('printer.octo_config'))
|
|
|
|
configs = {c.key: c.value for c in SystemConfig.query.all()}
|
|
return render_template('printer/octo_config.html', configs=configs)
|
|
|
|
@printer_bp.route('/octo_embed')
|
|
@login_required
|
|
def octo_embed():
|
|
if not current_user.is_admin:
|
|
flash("Admin access required", "danger")
|
|
return redirect(url_for('printer.status'))
|
|
|
|
url = SystemConfig.query.filter_by(key='octoprint_url').first()
|
|
embed_url = url_for('printer.octo_proxy') if url and url.value else None
|
|
return render_template('printer/octo_embed.html', embed_url=embed_url)
|
|
|
|
@printer_bp.route('/proxy', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
|
|
@printer_bp.route('/proxy/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
|
|
@login_required
|
|
def octo_proxy(path):
|
|
if not current_user.is_admin:
|
|
return "Unauthorized", 403
|
|
|
|
url_config = SystemConfig.query.filter_by(key='octoprint_url').first()
|
|
if not url_config or not url_config.value:
|
|
return "OctoPrint URL not configured", 404
|
|
|
|
from urllib.parse import urlparse
|
|
base_url = url_config.value.rstrip('/')
|
|
target_url = f"{base_url}/{path}"
|
|
|
|
if request.query_string:
|
|
target_url = f"{target_url}?{request.query_string.decode('utf-8')}"
|
|
|
|
# Build headers for reverse proxy, masking origin/referer to avoid CSRF
|
|
parsed_base = urlparse(base_url)
|
|
headers = {k: v for k, v in request.headers if k.lower() not in ['host', 'content-length']}
|
|
headers['Host'] = parsed_base.netloc
|
|
|
|
if 'Origin' in headers:
|
|
headers['Origin'] = base_url
|
|
if 'Referer' in headers:
|
|
headers['Referer'] = f"{base_url}/{path}"
|
|
|
|
headers['X-Forwarded-For'] = request.remote_addr
|
|
headers['X-Forwarded-Host'] = request.host
|
|
headers['X-Forwarded-Proto'] = request.scheme
|
|
headers['X-Script-Name'] = '/printer/proxy'
|
|
|
|
try:
|
|
resp = requests.request(
|
|
method=request.method,
|
|
url=target_url,
|
|
headers=headers,
|
|
data=request.get_data(),
|
|
cookies=request.cookies,
|
|
allow_redirects=False,
|
|
stream=True
|
|
)
|
|
except requests.exceptions.RequestException as e:
|
|
return f"Proxy connection error: {str(e)}", 502
|
|
|
|
# Strip headers that might break the iframe or framing
|
|
excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection', 'x-frame-options', 'content-security-policy']
|
|
response_headers = [(name, value) for (name, value) in resp.headers.items() if name.lower() not in excluded_headers]
|
|
|
|
def generate():
|
|
for chunk in resp.iter_content(chunk_size=8192):
|
|
if chunk:
|
|
yield chunk
|
|
|
|
return Response(generate(), resp.status_code, response_headers)
|