Files
AIO_3D_Print_Web_Platform/app/utils/api_handle.py

133 lines
4.6 KiB
Python

import functools
from flask import Blueprint, request, jsonify
from app.models import ApiKey, PrintFile, SystemConfig
from app.utils.octoprint_client import OctoPrintClient
# Blueprint that collects the routes defined in this module; every route
# registered on it is served under the /api/v1 URL prefix.
api_bp = Blueprint('api_handle', __name__, url_prefix='/api/v1')
def get_octo_client():
    """Build an ``OctoPrintClient`` from the stored system configuration.

    Reads the ``SystemConfig`` rows keyed ``octoprint_url`` and
    ``octoprint_apikey``.

    Returns:
        An ``OctoPrintClient`` constructed from the two stored values, or
        ``None`` when either row is missing or holds a falsy value.
    """
    # Both rows are fetched up front, before any validation takes place.
    url_row, key_row = [
        SystemConfig.query.filter_by(key=config_key).first()
        for config_key in ('octoprint_url', 'octoprint_apikey')
    ]

    if not (url_row and url_row.value):
        return None
    if not (key_row and key_row.value):
        return None
    return OctoPrintClient(url_row.value, key_row.value)
def _enrich_job_data(job_data):
if job_data and job_data.get('job', {}).get('file', {}).get('name'):
internal_name = job_data['job']['file']['name']
internal_stl_name = str(internal_name)[:-5]+"stl"
pf = PrintFile.query.filter_by(filename=internal_stl_name).first()
if pf:
job_data['job']['file']['display_name'] = pf.original_filename
else:
job_data['job']['file']['display_name'] = internal_name
return job_data
def require_api_key(f):
    """Decorator that only runs the view when a known API key is supplied.

    The key is read from the ``X-Api-Key`` request header and looked up in
    ``ApiKey``.  A missing header or an unknown key yields a JSON error
    body with HTTP status 401; otherwise the wrapped view is called with
    its original arguments and its return value is passed through.
    """
    @functools.wraps(f)
    def guarded(*args, **kwargs):
        supplied_key = request.headers.get('X-Api-Key')
        if supplied_key:
            if ApiKey.query.filter_by(key=supplied_key).first():
                return f(*args, **kwargs)
            return jsonify({'error': 'Invalid API Key'}), 401
        return jsonify({'error': 'Missing API Key in headers (X-Api-Key)'}), 401

    return guarded
@api_bp.route('/status', methods=['GET'])
@require_api_key
def get_status():
    """Return a JSON document with ``job`` and ``status`` sections.

    NOTE: this view currently answers with a hard-coded fixture and does
    not contact the printer.  The printer-backed implementation is kept,
    commented out, at the end of the function.
    """
    file_entry = {
        'display_name': 'Test File',
        'date': None,
        'name': '20260414135441_42bff5215c6148b8b5f4d8c4f15d5ddc.gcode',
        'origin': 'local',
        'path': None,
        'size': 1468987,
    }
    job_details = {
        'estimatedPrintTime': 1234,
        'filament': {'length': 765, 'volume': 24356},
        'file': file_entry,
        'lastPrintTime': None,
        'user': None,
    }
    progress = {
        'completion': 74.8,
        'filepos': 1234,
        'printTime': 1235,
        'printTimeLeft': 6353,
        'printTimeLeftOrigin': 5366,
    }

    # Every flag starts out False; only the two below are switched on.
    # Re-assigning an existing key keeps its original position in the dict.
    state_flags = dict.fromkeys(
        (
            'cancelling',
            'closedOrError',
            'error',
            'finishing',
            'operational',
            'paused',
            'pausing',
            'printing',
            'ready',
            'resuming',
            'sdReady',
        ),
        False,
    )
    state_flags['operational'] = True
    state_flags['ready'] = True

    temperatures = {
        'bed': {'actual': 85, 'offset': 0, 'target': 56},
        'tool0': {'actual': 0.0, 'offset': 0, 'target': 340},
    }

    test_data = {
        'job': {
            'job': job_details,
            'progress': progress,
            'state': 'Operational',
        },
        'status': {
            'sd': {'ready': False},
            'state': {
                'error': '',
                'flags': state_flags,
                'text': 'Operational test',
            },
            'temperature': temperatures,
        },
    }
    return jsonify(test_data)

    # Printer-backed implementation, currently disabled:
    # client = get_octo_client()
    # if not client:
    #     return jsonify({'error': 'Printer not configured'}), 503
    # try:
    #     status_data = client.get_printer_status()
    #     job_data = client.get_job_info()
    #     job_data = _enrich_job_data(job_data)
    #     return jsonify({'status': status_data, 'job': job_data})
    # except Exception as e:
    #     return jsonify({'error': str(e)}), 500
@api_bp.route('/octoprint_client', methods=['POST'])
@require_api_key
def invoke_octoprint_client():
    """Call a public ``OctoPrintClient`` method named in the JSON body.

    Expects JSON payload like:
    {
        "method": "pause_print",
        "args": [],
        "kwargs": {"action": "pause"}
    }
    ``args`` and ``kwargs`` are optional and default to no arguments.

    Responses (all JSON):
        200: ``{'success': True, 'result': <method return value>}``
        400: body is not a JSON object, ``method`` is missing or not a
             string, ``args``/``kwargs`` have the wrong type, or the
             method does not exist on the client.
        403: the attribute is private (leading underscore) or not callable.
        500: the method (or serialising its result) raised an exception.
        503: no printer is configured.
    """
    client = get_octo_client()
    if not client:
        return jsonify({'error': 'Printer not configured'}), 503

    # silent=True makes get_json return None for a malformed body or a
    # non-JSON content type instead of raising, so the caller always gets
    # this endpoint's JSON error shape.
    data = request.get_json(silent=True)
    # The isinstance check also rejects JSON lists/strings/numbers, which
    # would otherwise blow up on data['method'] below.
    if not isinstance(data, dict) or 'method' not in data:
        return jsonify({'error': 'Missing method in JSON payload'}), 400

    method_name = data['method']
    if not isinstance(method_name, str):
        # hasattr()/getattr() raise TypeError for non-string names.
        return jsonify({'error': 'Method name must be a string'}), 400

    # Treat an explicit JSON null the same as an omitted key.
    args = data.get('args')
    kwargs = data.get('kwargs')
    args = [] if args is None else args
    kwargs = {} if kwargs is None else kwargs
    if not isinstance(args, list) or not isinstance(kwargs, dict):
        return jsonify({'error': 'args must be a list and kwargs must be an object'}), 400

    if not hasattr(client, method_name):
        return jsonify({'error': f'Method {method_name} not found on OctoPrintClient'}), 400

    # SECURITY NOTE(review): the method name comes straight from the
    # request, so every public callable attribute of the client is
    # reachable by any API-key holder.  Consider replacing this reflection
    # with an explicit allowlist of supported method names.
    func = getattr(client, method_name)
    if not callable(func) or method_name.startswith('_'):
        return jsonify({'error': f'Method {method_name} is not allowed'}), 403

    try:
        result = func(*args, **kwargs)
        # jsonify stays inside the try so an unserialisable result is
        # reported as a JSON 500 like any other failure.
        return jsonify({'success': True, 'result': result})
    except Exception as e:
        return jsonify({'error': str(e)}), 500