""" 3D Printer STM32 MQTT Web Control Panel ======================================== Broker: mqtt.beihong.wang:1883 Subscribe: /3D/message/put (STM32 responses) Publish: /3D/message (commands to STM32) """ import json import time import threading import queue from flask import Flask, render_template, jsonify, request, Response import paho.mqtt.client as mqtt app = Flask(__name__) # --- MQTT Config --- MQTT_BROKER = "mqtt.beihong.wang" MQTT_PORT = 1883 MQTT_USER = "3D_Stm32" MQTT_PASS = "123456" MQTT_CLIENT_ID = "3D_WebPanel" MQTT_TOPIC_PUB = "/3D/message" MQTT_TOPIC_SUB = "/3D/message/put" # --- Message queue for SSE --- msg_queue = queue.Queue() # --- MQTT connection state --- mqtt_state = {"connected": False} def on_connect(client, userdata, flags, reason_code, properties=None): if reason_code == 0: mqtt_state["connected"] = True client.subscribe(MQTT_TOPIC_SUB, qos=1) print(f"[MQTT] Connected, subscribed to {MQTT_TOPIC_SUB}") else: mqtt_state["connected"] = False print(f"[MQTT] Connect failed, code={reason_code}") def on_disconnect(client, userdata, reason_code, properties=None): mqtt_state["connected"] = False print(f"[MQTT] Disconnected, code={reason_code}") def on_message(client, userdata, msg): try: payload = msg.payload.decode("utf-8", errors="replace") print(f"[MQTT] << {msg.topic}: {payload}") msg_queue.put({"topic": msg.topic, "payload": payload, "time": time.strftime("%H:%M:%S")}) except Exception as e: print(f"[MQTT] Message decode error: {e}") def mqtt_loop(): client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=MQTT_CLIENT_ID) client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_connect = on_connect client.on_disconnect = on_disconnect client.on_message = on_message while True: try: print(f"[MQTT] Connecting to {MQTT_BROKER}:{MQTT_PORT} ...") client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60) client.loop_forever() except Exception as e: print(f"[MQTT] Error: {e}, retrying in 5s ...") mqtt_state["connected"] = False time.sleep(5) # --- Flask routes --- @app.route("/") def index(): return render_template("index.html") @app.route("/api/status") def api_status(): return jsonify({"connected": mqtt_state["connected"]}) @app.route("/api/send", methods=["POST"]) def api_send(): data = request.get_json(force=True, silent=True) if not data: return jsonify({"ok": False, "msg": "invalid json"}), 400 if not mqtt_state["connected"]: return jsonify({"ok": False, "msg": "mqtt disconnected"}), 503 try: payload = json.dumps(data, separators=(',', ':'), ensure_ascii=False) # Publish to MQTT broker mqtt_client.publish(MQTT_TOPIC_PUB, payload, qos=1) print(f"[MQTT] >> {MQTT_TOPIC_PUB}: {payload}") return jsonify({"ok": True}) except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 @app.route("/api/stream") def api_stream(): """Server-Sent Events endpoint for real-time messages""" def generate(): while True: try: msg = msg_queue.get(timeout=25) yield f"data: {json.dumps(msg, ensure_ascii=False)}\n\n" except queue.Empty: yield ": keepalive\n\n" resp = Response(generate(), mimetype="text/event-stream") resp.headers["Cache-Control"] = "no-cache" resp.headers["X-Accel-Buffering"] = "no" return resp @app.route("/api/poll") def api_poll(): """轮询获取新消息""" try: msg = msg_queue.get_nowait() return jsonify(msg) except queue.Empty: return jsonify({"msg": None}) # --- Global MQTT client reference --- mqtt_client = None @app.before_request def ensure_mqtt_client(): global mqtt_client if mqtt_client is None: # Create a client just for publishing (the loop thread has its own) mqtt_client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2, client_id=MQTT_CLIENT_ID + "_pub") mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS) try: mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60) mqtt_client.loop_start() except Exception: pass if __name__ == "__main__": # Start MQTT background thread t = threading.Thread(target=mqtt_loop, daemon=True) t.start() # Start Flask web server app.run(host="127.0.0.1", port=3000, debug=False)