# Python Web 上位机实施文档(给开发AI直接执行) ## 1. 目标与技术选型 目标:实现一个可运行、可联调、可扩展的上位机系统,满足以下要求: - 与小车通过 TCP 协议通信 - 浏览器可视化控制(自动模式 + 手动麦轮模式) - 实时状态显示(500ms级) - 日志、重连、超时、错误恢复完整 推荐技术栈: - 后端:Python 3.11+ - Web框架:FastAPI - 实时推送:WebSocket - 异步:asyncio - 前端:Vue3 或 React(都可以,本文给中立接口规范) - 部署:Windows 本地运行(开发阶段) 建议架构: - Browser UI <-> WebSocket/HTTP <-> Python Backend <-> TCP Socket <-> Car --- ## 2. 协议契约(必须严格一致) 帧格式: LOGI::# 校验和: - 对字符串 LOGI: 做 ASCII 累加 - 取低8位 - 转 2位大写十六进制 Python函数(必须实现并复用): ```python def build_frame(payload: str) -> str: base = f"LOGI:{payload}" cs = sum(base.encode("ascii")) & 0xFF return f"{base}:{cs:02X}#" ``` 已验证可直接发送的关键命令: - LOGI:MD:MAN:0C# - LOGI:MD:AUTO:69# - LOGI:MV:FWD:23# - LOGI:MV:BWD:1F# - LOGI:MV:LEFT:6D# - LOGI:MV:RIGHT:C0# - LOGI:MV:LF:D4# - LOGI:MV:RF:DA# - LOGI:MV:LB:D0# - LOGI:MV:RB:D6# - LOGI:MV:CW:DC# - LOGI:MV:CCW:1F# - LOGI:MV:STOP:88# - LOGI:SP:030:D5# - LOGI:SP:050:D7# - LOGI:SP:080:DA# - LOGI:GS:001:CA# - LOGI:GS:002:CB# - LOGI:ST:RUN:3B# - LOGI:ST:STOP:8C# --- ## 3. 项目目录(AI生成代码必须遵循) ```text upper-computer-python-web/ backend/ app/ main.py config.py models.py protocol.py tcp_client.py command_service.py state_store.py ws_hub.py parser.py logger.py api/ http_routes.py ws_routes.py requirements.txt frontend/ src/ main.ts api.ts ws.ts store.ts components/ ConnectionPanel.vue ModePanel.vue SpeedPanel.vue AutoPanel.vue ManualPad.vue StatusPanel.vue LogPanel.vue package.json README.md ``` --- ## 4. 后端职责与实现要求 ### 4.1 TCP客户端(tcp_client.py) 必须实现: - connect(host, port) - disconnect() - send_frame(frame) - background_read_loop() - auto_reconnect_loop() 重连策略: - 1s -> 2s -> 3s -> 3s循环 连接状态事件: - connected - disconnected - reconnecting - connect_failed ### 4.2 流式解析器(parser.py) TCP是流,必须做粘包拆包。 规则: 1. 维护 bytearray 缓冲区 2. 找 LOGI: 起点 3. 找 # 终点 4. 抽取候选帧 5. 校验和正确才分发 6. 错帧丢弃并记录日志 必须支持: - 一次收多帧 - 一帧分多次收 - 帧前垃圾字节 ### 4.3 协议服务(protocol.py) 必须提供: - build_frame(payload) - parse_frame(frame) -> dict - verify_checksum(frame) -> bool parse_frame 输出建议: - type: status / feedback / unknown - payload_raw - fields: dict ### 4.4 命令服务(command_service.py) 必须实现命令队列与反馈匹配: - send_command(payload, wait_feedback=True) - 超时默认 800ms - 可重试命令:SP、MD、MV:STOP(最多2次) - 非幂等命令 ST:RUN、GS 不自动重试 反馈格式: - FB::<0|1> ### 4.5 状态存储(state_store.py) 维护全局状态对象: - connection - speed - station - run - mode - manual_dir - distance - trk - dev - obs - rpm[4] - last_status_ts 超过2秒无状态包: - 标记 stale = true - 推送前端告警 ### 4.6 WebSocket中枢(ws_hub.py) 前端订阅以下事件: - conn_update - status_update - feedback_update - tx_log - rx_log - warn - error --- ## 5. HTTP 与 WebSocket 接口规范 ### 5.1 HTTP 1) POST /api/connect - body: {"host":"192.168.1.50","port":3456} - resp: {"ok":true} 2) POST /api/disconnect - resp: {"ok":true} 3) POST /api/cmd - body: {"payload":"MV:FWD","waitFeedback":true} - resp: {"ok":true,"feedback":{"cmd":"MV","status":1}} 4) GET /api/state - resp: 全量状态JSON 5) GET /api/health - resp: {"ok":true} ### 5.2 WebSocket - ws://host:port/ws 消息统一格式: - {"event":"status_update","data":{...}} - {"event":"feedback_update","data":{...}} - {"event":"tx_log","data":{"frame":"..."}} - {"event":"rx_log","data":{"frame":"..."}} --- ## 6. 前端页面与按钮行为 页面最少包含: - 连接面板:IP、端口、连接/断开 - 模式面板:AUTO / MAN - 速度面板:0-100滑条 + 发送按钮 - 自动面板:站点001/002、RUN、STOP - 手动面板:8方向 + CW/CCW + STOP - 状态面板:SP/STA/RUN/MODE/MAN/DIS/TRK/DEV/OBS/RPM - 日志面板:TX/RX 按钮映射要求: 1) 切手动 - 发送 payload: MD:MAN 2) 切自动 - 发送 payload: MD:AUTO 3) 速度 - 发送 payload: SP:xxx(补零3位) 4) 自动站点 - 001 -> GS:001 - 002 -> GS:002 5) 自动运行控制 - RUN -> ST:RUN - STOP -> ST:STOP 6) 手动方向 - 前进 MV:FWD - 后退 MV:BWD - 左移 MV:LEFT - 右移 MV:RIGHT - 左前 MV:LF - 右前 MV:RF - 左后 MV:LB - 右后 MV:RB - 顺时针 MV:CW - 逆时针 MV:CCW - 停止 MV:STOP 手动安全行为(必须): - 按下方向键:发对应MV - 松开方向键:立即发 MV:STOP --- ## 7. 关键业务逻辑(必须实现) ### 7.1 模式冲突恢复 如果发送 MV:* 收到 FB:MV:0: 1. 自动提示“当前不在手动模式” 2. 自动发送 MD:MAN 3. 重试一次原MV命令 ### 7.2 断线保护 断线时: - 禁用全部运动按钮 - UI状态置灰 - 重连成功后,不自动恢复运动,必须人工再次点方向 ### 7.3 急停策略 急停按钮逻辑: 1. 先发 MV:STOP 2. 若当前模式是AUTO,再发 ST:STOP ### 7.4 自动流程约束 自动流程推荐顺序: - SP -> MD:AUTO -> GS -> ST:RUN 到站后固件会锁存,下一次运行前应: - 重发 GS - 再发 ST:RUN --- ## 8. 后端参考代码骨架(最小可运行) main.py 示例: ```python from fastapi import FastAPI, WebSocket from app.api.http_routes import router as http_router from app.api.ws_routes import router as ws_router app = FastAPI(title="Car Upper Computer") app.include_router(http_router, prefix="/api") app.include_router(ws_router) @app.get("/api/health") async def health(): return {"ok": True} ``` command_service.py 的命令发送入口示例: ```python async def send_payload(payload: str, wait_feedback: bool = True): frame = build_frame(payload) await tcp_client.send_frame(frame) await ws_hub.broadcast("tx_log", {"frame": frame}) if not wait_feedback: return {"ok": True} fb = await feedback_waiter.wait(payload.split(":", 1)[0], timeout=0.8) return {"ok": fb is not None, "feedback": fb} ``` --- ## 9. 联调步骤(按此执行) 1. 启动后端 - uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload 2. 启动前端 - npm install - npm run dev 3. 页面连接到小车TCP - 输入IP和端口 - 点击连接 4. 先测基础命令 - SP:050 - MD:MAN - MV:FWD - MV:STOP 5. 再测自动流程 - MD:AUTO - GS:001 - ST:RUN 6. 检查状态面板字段是否持续刷新 --- ## 10. 测试清单(必须通过) 单元测试: - build_frame校验值正确 - verify_checksum正确拦截坏包 - parser支持粘包拆包 集成测试: - 命令发送后能收到FB - 状态包断流时 stale 告警出现 - 断线后自动重连并恢复状态展示 交互测试: - 手动按钮按下/松开逻辑正确 - 急停在任意状态都可触发 --- ## 11. 给开发AI的最终任务提示词 请实现一个 Python Web 上位机系统,后端用 FastAPI + asyncio TCP + WebSocket,前端用 Vue3 或 React。协议为 LOGI::#,CS 为 LOGI: 的 ASCII 累加低8位。实现 SP/ST/GS/MD/MV 命令,解析 STAT 与 FB。支持手动麦轮方向控制(8方向+旋转+STOP)和自动模式控制。必须实现 TCP 粘包拆包解析、命令反馈超时处理、自动重连、状态实时推送、日志面板与安全急停逻辑。