Files
inventory/app.py
wangbeihong 10da4c2859 feat:集成 LCSC 产品 API 用于袋子管理
- 增加了 LCSC API 集成,可利用 app_id、access_key 和 secret_key 获取产品详情。
- 实现了用于安全 API 请求的随机数(nonce)和签名生成。
- 通过新端点改进袋装容量管理,支持更新袋位(插槽)容量。
- 更新界面,支持 LCSC 产品直接导入袋口。
- 改进了 API 响应和用户输入验证的错误处理。
- 重构箱子渲染逻辑,以适应新的包包功能和展示产品详情。
- 为与 LCSC 产品信息相关的新 UI 元素添加了 CSS 样式。
- 更新了 AI 设置页面,包含了 LCSC API 配置选项。
2026-03-12 13:46:28 +08:00

2600 lines
86 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import csv
import hashlib
import hmac
import json
import os
import random
import re
import secrets
import string
import time
import urllib.error
import urllib.parse
import urllib.request
from copy import deepcopy
from datetime import datetime, timedelta
from io import StringIO

from flask import Flask, Response, redirect, render_template, request, url_for
from flask_sqlalchemy import SQLAlchemy
# Resolve on-disk locations. The data directory is created at import time so
# the SQLite file and the JSON settings files below always have a home.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_DIR = os.path.join(BASE_DIR, "data")
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, "inventory.db")
# Flask application backed by the local SQLite database at DB_PATH.
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{DB_PATH}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
# Components whose quantity is below this value are reported as low stock.
LOW_STOCK_THRESHOLD = 5
# JSON files (inside DB_DIR) holding box-type overrides and saved AI/LCSC settings.
BOX_TYPES_OVERRIDE_PATH = os.path.join(DB_DIR, "box_types.json")
AI_SETTINGS_PATH = os.path.join(DB_DIR, "ai_settings.json")
# Fixed host and request path of the LCSC product "basic info" endpoint.
LCSC_BASE_URL = "https://open-api.jlc.com"
LCSC_BASIC_PATH = "/lcsc/openapi/sku/product/basic"
# Baseline AI / LCSC settings. Most values can be seeded from environment
# variables; _load_ai_settings() layers the saved JSON file on top of these.
AI_SETTINGS_DEFAULT = {
    "api_url": os.environ.get(
        "SILICONFLOW_API_URL",
        "https://api.siliconflow.cn/v1/chat/completions",
    ),
    "model": os.environ.get(
        "SILICONFLOW_MODEL",
        "Qwen/Qwen2.5-7B-Instruct",
    ),
    "api_key": os.environ.get("SILICONFLOW_API_KEY", ""),
    # `or "30"` covers a variable that is set but empty, which int() would reject.
    "timeout": int(os.environ.get("SILICONFLOW_TIMEOUT", "30") or "30"),
    "restock_threshold": LOW_STOCK_THRESHOLD,
    "restock_limit": 24,
    "lcsc_timeout": int(os.environ.get("LCSC_TIMEOUT", "20") or "20"),
    "lcsc_app_id": os.environ.get("LCSC_APP_ID", ""),
    "lcsc_access_key": os.environ.get("LCSC_ACCESS_KEY", ""),
    "lcsc_secret_key": os.environ.get("LCSC_SECRET_KEY", ""),
}
# Built-in container types, keyed by the value stored in Box.box_type.
# Each entry supplies the display label plus the defaults used for new boxes.
DEFAULT_BOX_TYPES = {
    "small_28": {
        "label": "28格小盒大盒",
        "default_capacity": 28,
        "default_desc": "4连排小盒常见摆放为竖向7排",
        "default_prefix": "A",
    },
    "medium_14": {
        "label": "14格中盒大盒",
        "default_capacity": 14,
        "default_desc": "14格中盒内部摆放方向与28格不同",
        "default_prefix": "B",
    },
    "custom": {
        "label": "自定义容器",
        "default_capacity": 20,
        "default_desc": "可按实际盒型设置格数与编号前缀",
        "default_prefix": "C",
    },
    "bag": {
        "label": "袋装清单",
        "default_capacity": 28,
        "default_desc": "一袋一种器件,按清单管理",
        "default_prefix": "BAG",
    },
}
# Live, mutable copy: overrides are applied to BOX_TYPES while DEFAULT_BOX_TYPES
# stays pristine so _save_box_type_overrides() can diff against it.
BOX_TYPES = deepcopy(DEFAULT_BOX_TYPES)
def _apply_box_type_overrides() -> None:
    """Merge the saved box-type overrides file into ``BOX_TYPES`` in place.

    Only ``label``, ``default_desc`` and ``default_prefix`` of already known
    box types are taken from the file. A missing, unreadable or malformed
    file leaves ``BOX_TYPES`` untouched.
    """
    editable_fields = ("label", "default_desc", "default_prefix")
    if not os.path.exists(BOX_TYPES_OVERRIDE_PATH):
        return
    try:
        with open(BOX_TYPES_OVERRIDE_PATH, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except (OSError, json.JSONDecodeError):
        return
    if not isinstance(loaded, dict):
        return
    for type_key, patch in loaded.items():
        if type_key in BOX_TYPES and isinstance(patch, dict):
            BOX_TYPES[type_key].update(
                {name: patch[name] for name in editable_fields if name in patch}
            )
    # Keep bag container capacity fixed by domain rule.
    BOX_TYPES["bag"]["default_capacity"] = 28
def _save_box_type_overrides() -> None:
    """Write the fields of ``BOX_TYPES`` that differ from the defaults to disk.

    For each built-in type, only ``label``, ``default_desc`` and
    ``default_prefix`` are compared; types with no differences are omitted
    from the written JSON object.
    """
    tracked_fields = ("label", "default_desc", "default_prefix")
    diff = {}
    for type_key, baseline in DEFAULT_BOX_TYPES.items():
        live = BOX_TYPES.get(type_key, baseline)
        delta = {
            name: live.get(name)
            for name in tracked_fields
            if live.get(name) != baseline.get(name)
        }
        if delta:
            diff[type_key] = delta
    with open(BOX_TYPES_OVERRIDE_PATH, "w", encoding="utf-8") as handle:
        json.dump(diff, handle, ensure_ascii=False, indent=2)
# Apply any saved overrides once at import time so BOX_TYPES reflects them.
_apply_box_type_overrides()
def _load_ai_settings() -> dict:
    """Return the default settings overlaid with values from the settings file.

    Only keys present in ``AI_SETTINGS_DEFAULT`` are taken from the file.
    A missing, unreadable or malformed file yields a copy of the defaults.
    """
    merged = dict(AI_SETTINGS_DEFAULT)
    if os.path.exists(AI_SETTINGS_PATH):
        try:
            with open(AI_SETTINGS_PATH, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
        except (OSError, json.JSONDecodeError):
            stored = None
        if isinstance(stored, dict):
            merged.update({name: stored[name] for name in list(merged) if name in stored})
    return merged
def _save_ai_settings(settings: dict) -> None:
    """Persist ``settings`` as pretty-printed UTF-8 JSON at ``AI_SETTINGS_PATH``.

    Args:
        settings: JSON-serializable mapping to store.

    Raises:
        TypeError / ValueError: If ``settings`` cannot be serialized. The
            existing file is left untouched in that case.
        OSError: If the file cannot be written.
    """
    # Serialize before opening: opening with "w" truncates the file, so a
    # serialization error afterwards would destroy the previously saved settings.
    serialized = json.dumps(settings, ensure_ascii=False, indent=2)
    with open(AI_SETTINGS_PATH, "w", encoding="utf-8") as f:
        f.write(serialized)
def _get_ai_settings() -> dict:
    """Load the AI/LCSC settings and coerce them into usable values.

    Integer settings are clamped to a minimum and fall back to a default when
    they cannot be converted; string settings are stripped. The fixed LCSC
    base URL and path are added under ``lcsc_base_url`` / ``lcsc_basic_path``.
    """
    settings = _load_ai_settings()

    # (key, fallback when missing or unparsable, lower bound)
    integer_rules = (
        ("timeout", 30, 5),
        ("restock_threshold", LOW_STOCK_THRESHOLD, 0),
        ("restock_limit", 24, 1),
        ("lcsc_timeout", 20, 5),
    )
    for name, fallback, floor in integer_rules:
        try:
            settings[name] = max(floor, int(settings.get(name, fallback)))
        except (TypeError, ValueError):
            settings[name] = fallback

    for name in ("api_url", "model", "api_key"):
        settings[name] = (settings.get(name) or "").strip()

    settings["lcsc_base_url"] = LCSC_BASE_URL
    settings["lcsc_basic_path"] = LCSC_BASIC_PATH

    for name in ("lcsc_app_id", "lcsc_access_key", "lcsc_secret_key"):
        settings[name] = (settings.get(name) or "").strip()
    return settings
def _generate_nonce(length: int = 32) -> str:
    """Return a random alphanumeric nonce of ``length`` characters.

    The nonce is sent in the signed ``Authorization`` header, so it is drawn
    from the OS-backed CSPRNG (``secrets``) rather than the predictable
    Mersenne Twister behind ``random``.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
def _generate_jop_signature(method: str, path: str, timestamp: str, nonce: str, post_data: str, secret_key: str) -> str:
    """Return the Base64-encoded HMAC-SHA256 signature for a JOP request.

    The signed text is the five request fields, each terminated by a newline,
    in the order: method, path, timestamp, nonce, body.
    """
    canonical = "".join(
        f"{part}\n" for part in (method, path, timestamp, nonce, post_data)
    )
    mac = hmac.digest(
        secret_key.encode("utf-8"),
        canonical.encode("utf-8"),
        hashlib.sha256,
    )
    return base64.b64encode(mac).decode("utf-8")
def _extract_lcsc_product_id_from_input(raw_identifier: str) -> int | None:
text = (raw_identifier or "").strip()
if not text:
return None
# Accept full item detail URL and extract /23913.html.
try:
parsed = urllib.parse.urlparse(text)
except ValueError:
parsed = None
if parsed and parsed.netloc:
path = parsed.path or ""
m = re.search(r"/(\d+)\.html$", path)
if m:
return int(m.group(1))
return None
def _fetch_lcsc_product_basic(product_identifier: str, settings: dict) -> dict:
    """Fetch the basic product record for an LCSC product-detail URL.

    Args:
        product_identifier: Product detail page URL; the numeric product id is
            extracted by ``_extract_lcsc_product_id_from_input``.
        settings: Mapping providing ``lcsc_app_id``, ``lcsc_access_key``,
            ``lcsc_secret_key`` and optionally ``lcsc_timeout`` (seconds).

    Returns:
        The first entry of ``data.productBasicInfoVOList`` from the response.

    Raises:
        RuntimeError: For every failure: empty or unrecognised input, missing
            credentials, HTTP or connection errors (including read timeouts),
            a body that is not a JSON object, an API-level error, or an empty
            result list.
    """
    raw_identifier = (product_identifier or "").strip()
    if not raw_identifier:
        raise RuntimeError("立创商品链接不能为空")
    product_id_from_input = _extract_lcsc_product_id_from_input(raw_identifier)
    if product_id_from_input is None:
        raise RuntimeError("请输入立创商品详情页链接,例如 https://item.szlcsc.com/23913.html")

    # `or ""` keeps a key stored as null from raising AttributeError on .strip().
    app_id = (settings.get("lcsc_app_id") or "").strip()
    access_key = (settings.get("lcsc_access_key") or "").strip()
    secret_key = (settings.get("lcsc_secret_key") or "").strip()
    if not app_id or not access_key or not secret_key:
        raise RuntimeError("立创 JOP 鉴权参数不完整,请填写 app_id/access_key/secret_key")
    try:
        timeout = int(settings.get("lcsc_timeout", 20))
    except (TypeError, ValueError):
        timeout = 20

    def request_openapi(payload: dict) -> dict:
        """POST ``payload`` to the signed endpoint and return the first product."""
        post_data_str = json.dumps(payload, ensure_ascii=False)
        timestamp = str(int(time.time()))
        nonce = _generate_nonce(32)
        # The exact body string that is sent must also be the one that is signed.
        signature = _generate_jop_signature(
            "POST",
            LCSC_BASIC_PATH,
            timestamp,
            nonce,
            post_data_str,
            secret_key,
        )
        headers = {
            # NOTE(review): "; utf-8" is not a well-formed charset parameter; left
            # unchanged because the API's tolerance is unknown — confirm with API docs.
            "Content-Type": "application/json; utf-8",
            "Authorization": (
                f'JOP appid="{app_id}",accesskey="{access_key}",'
                f'timestamp="{timestamp}",nonce="{nonce}",signature="{signature}"'
            ),
        }
        endpoint = LCSC_BASE_URL + LCSC_BASIC_PATH
        req = urllib.request.Request(
            endpoint,
            data=post_data_str.encode("utf-8"),
            method="POST",
            headers=headers,
        )
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw_bytes = resp.read()
        except urllib.error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="ignore")
            raise RuntimeError(f"立创接口调用失败: HTTP {exc.code} {detail[:180]}") from exc
        except urllib.error.URLError as exc:
            raise RuntimeError(f"立创接口调用失败: 连接失败 {exc.reason}") from exc
        except OSError as exc:
            # Timeouts or resets while reading the body are raised as plain
            # OSError subclasses, not wrapped in URLError.
            raise RuntimeError(f"立创接口调用失败: 连接失败 {exc}") from exc

        try:
            response = json.loads(raw_bytes.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            raise RuntimeError("立创接口返回非 JSON 数据") from exc
        if not isinstance(response, dict):
            raise RuntimeError("立创接口返回非 JSON 数据")

        code = response.get("code")
        successful = response.get("successful", False)
        if code != 200 and not successful:
            message = str(response.get("message", "立创接口调用失败") or "立创接口调用失败")
            raise RuntimeError(f"立创接口调用失败: code={code}, message={message}")

        data = response.get("data")
        items = data.get("productBasicInfoVOList") if isinstance(data, dict) else None
        # Anything other than a non-empty list of objects is treated as "not found".
        if not isinstance(items, list) or not items or not isinstance(items[0], dict):
            raise RuntimeError("未查询到商品信息,请检查编号是否正确")
        return items[0]

    return request_openapi({"productId": product_id_from_input})
def _map_lcsc_product_to_component(product: dict) -> dict:
    """Translate an LCSC product record into local component fields.

    Returns a dict with ``part_no``, ``name``, ``specification`` (brand /
    package / catalog joined by " / ") and ``note`` (LCSC code, product id,
    packaging arrangement and minimum pack size joined by " | ").
    """

    def text_of(key: str) -> str:
        # Missing or falsy values collapse to an empty string.
        return str(product.get(key) or "").strip()

    arrange_labels = {
        "biandai": "编带",
        "bianpai": "编排",
        "daizhuang": "袋装",
        "guanzhuang": "管装",
        "hezhuang": "盒装",
        "juan": "卷装",
        "kun": "捆装",
        "tuopan": "托盘",
        "xiangzhuang": "箱装",
    }
    unit_labels = {
        "pan": "圆盘",
        "bao": "",
        "ben": "",
        "dai": "",
        "guan": "",
        "he": "",
        "juan": "",
        "kun": "",
        "mi": "",
        "tuopan": "托盘",
        "xiang": "",
    }

    model = text_of("productModel")
    code = text_of("productCode")
    pid = product.get("productId")

    # Part number preference: model, then LCSC code, then the numeric id.
    part_no = model or code or ("" if pid is None else str(pid))
    name = text_of("productName") or part_no or "未命名元件"

    # Prefer concise, searchable spec fields.
    spec_source = (text_of("brandName"), text_of("encapStandard"), text_of("catalogName"))
    specification = " / ".join(filter(None, spec_source))

    arrange_key = text_of("productArrange").lower()
    arrange = arrange_labels.get(arrange_key, arrange_key)
    pack_count = product.get("minPacketNumber")
    unit_key = text_of("minPacketUnit").lower()
    unit = unit_labels.get(unit_key, unit_key)

    notes = []
    if code:
        notes.append(f"LCSC {code}")
    if pid is not None:
        notes.append(f"ID {pid}")
    if arrange:
        notes.append(f"编排 {arrange}")
    if pack_count:
        notes.append(f"最小包装 {pack_count}{unit}")

    return {
        "part_no": part_no,
        "name": name,
        "specification": specification,
        "note": " | ".join(notes),
    }
class Box(db.Model):
    # One storage container. Slot codes are built from slot_prefix plus a serial
    # number that starts at start_number (see slot_code_for_box).
    __tablename__ = "boxes"
    id = db.Column(db.Integer, primary_key=True)
    # Display name; unique across all boxes.
    name = db.Column(db.String(100), nullable=False, unique=True)
    description = db.Column(db.String(255), nullable=True)
    # Expected to be a key of BOX_TYPES; normalize_legacy_data() resets unknown values.
    box_type = db.Column(db.String(30), nullable=False, default="small_28")
    # Number of slots in this container.
    slot_capacity = db.Column(db.Integer, nullable=False, default=28)
    slot_prefix = db.Column(db.String(16), nullable=False, default="A")
    start_number = db.Column(db.Integer, nullable=False, default=1)
class Component(db.Model):
    # A part stored in one slot of a box; (box_id, slot_index) is unique.
    __tablename__ = "components"
    id = db.Column(db.Integer, primary_key=True)
    box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False)
    # Position inside the box; slot_data_for_box() iterates 1..slot_capacity.
    slot_index = db.Column(db.Integer, nullable=False)
    part_no = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    # "brand / package / usage" style text, split by _parse_slot_spec_fields().
    specification = db.Column(db.String(120), nullable=True)
    quantity = db.Column(db.Integer, nullable=False, default=0)
    location = db.Column(db.String(120), nullable=True)
    note = db.Column(db.Text, nullable=True)
    # Disabled components are left out of the overview rows and dashboard totals.
    is_enabled = db.Column(db.Boolean, nullable=False, default=True)
    # Owning box; also exposes Box.components as the reverse collection.
    box = db.relationship("Box", backref=db.backref("components", lazy=True))
    __table_args__ = (
        db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"),
    )
class InventoryEvent(db.Model):
    # A logged stock change; rows are created by log_inventory_event().
    __tablename__ = "inventory_events"
    id = db.Column(db.Integer, primary_key=True)
    # box_id / component_id are plain integers (no foreign key constraint).
    box_id = db.Column(db.Integer, nullable=True)
    box_type = db.Column(db.String(30), nullable=True)
    component_id = db.Column(db.Integer, nullable=True)
    part_no = db.Column(db.String(100), nullable=True)
    # Machine-readable event name; event_type_label() maps it to display text.
    event_type = db.Column(db.String(30), nullable=False)
    # Signed quantity change; the restock report sums negative deltas as outbound.
    delta = db.Column(db.Integer, nullable=False, default=0)
    # Filled in by the database when the row is inserted.
    created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
def _add_column_if_missing(table_name: str, column_name: str, ddl: str) -> None:
    """Add ``column_name`` to ``table_name`` using ``ddl`` unless it already exists.

    ``table_name`` and ``ddl`` are interpolated directly into SQL; within this
    file they are only passed as literals by ``ensure_schema``. The change is
    not committed here.
    """
    pragma_rows = db.session.execute(
        db.text(f"PRAGMA table_info({table_name})")
    ).fetchall()
    # The second field of each table_info row is the column name.
    existing_columns = {row[1] for row in pragma_rows}
    if column_name in existing_columns:
        return
    db.session.execute(db.text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
def ensure_schema() -> None:
    """Add columns introduced after the first schema version, then commit."""
    # (table, column, column DDL) in the order they are applied.
    pending_columns = (
        ("boxes", "box_type", "box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'"),
        ("boxes", "slot_capacity", "slot_capacity INTEGER NOT NULL DEFAULT 28"),
        ("boxes", "slot_prefix", "slot_prefix VARCHAR(16) NOT NULL DEFAULT 'A'"),
        ("boxes", "start_number", "start_number INTEGER NOT NULL DEFAULT 1"),
        ("components", "is_enabled", "is_enabled BOOLEAN NOT NULL DEFAULT 1"),
    )
    for table_name, column_name, ddl in pending_columns:
        _add_column_if_missing(table_name, column_name, ddl)
    db.session.commit()
def slot_code_for_box(box: Box, slot_index: int) -> str:
    """Return the display code of a slot: the box prefix followed by its serial.

    Slot 1 maps to ``box.start_number``; each following slot adds one.
    """
    offset = slot_index - 1
    return "{}{}".format(box.slot_prefix, box.start_number + offset)
def slot_range_label(box: Box) -> str:
    """Return "<first code>-<last code>" covering every slot of ``box``."""
    endpoints = [
        slot_code_for_box(box, 1),
        slot_code_for_box(box, box.slot_capacity),
    ]
    return "-".join(endpoints)
def compose_box_name(base_name: str, prefix: str, start_number: int, slot_capacity: int) -> str:
    """Build a box name of the form "<base> <prefix><start>-<prefix><end>".

    A blank ``base_name`` is replaced by the generic name "盒子".
    """
    label = (base_name or "").strip() or "盒子"
    last_number = start_number + slot_capacity - 1
    return "{} {}{}-{}{}".format(label, prefix, start_number, prefix, last_number)
def make_unique_box_name(candidate_name: str, exclude_box_id: int = None) -> str:
    """Return ``candidate_name``, or a " #N" variant that no other box uses.

    Suffix numbers start at 2. The box with id ``exclude_box_id`` (if given)
    is ignored when checking for collisions.
    """

    def is_taken(name: str) -> bool:
        lookup = Box.query.filter_by(name=name)
        if exclude_box_id is not None:
            lookup = lookup.filter(Box.id != exclude_box_id)
        return bool(lookup.first())

    attempt = candidate_name
    suffix = 2
    while is_taken(attempt):
        attempt = f"{candidate_name} #{suffix}"
        suffix += 1
    return attempt
def infer_base_name(box: Box) -> str:
    """Strip the trailing "<prefix>N-<prefix>M" (and optional " #K") from the name.

    Falls back to the full name when nothing would be left after stripping.
    """
    prefix = re.escape(box.slot_prefix)
    range_suffix = re.compile(rf"\s+{prefix}\d+-{prefix}\d+(?:\s+#\d+)?$")
    stripped = range_suffix.sub("", box.name).strip()
    return stripped if stripped else box.name
def _extract_lcsc_code_from_text(text: str) -> str:
    """Return the first LCSC-style code ("C" + at least 3 digits) in ``text``.

    Matching is done on the upper-cased text; returns "" when nothing matches.
    """
    hit = re.search(r"\bC\d{3,}\b", (text or "").upper())
    if hit is None:
        return ""
    return hit.group(0)
def _parse_slot_spec_fields(specification: str) -> dict:
    """Split a "/"-separated specification into brand, package and usage.

    Empty segments are dropped; missing positions are returned as "".
    """
    pieces = [segment.strip() for segment in (specification or "").split("/")]
    pieces = [segment for segment in pieces if segment]
    # Pad so that the first three positions always exist.
    brand, package, usage = (pieces + ["", "", ""])[:3]
    return {"brand": brand, "package": package, "usage": usage}
def _parse_note_detail_fields(note: str) -> dict:
    """Pull LCSC code, product id, arrangement and minimum pack from a note.

    Each field is "" when its marker is not found in the note text.
    """
    raw = (note or "")

    def capture(pattern: str, flags: int = 0) -> str:
        # First capture group of the first match, or "" when absent.
        hit = re.search(pattern, raw, flags=flags)
        return hit.group(1) if hit else ""

    return {
        "lcsc_code": _extract_lcsc_code_from_text(raw),
        "product_id": capture(r"\b(?:ID|productId)\s*(\d+)\b", re.IGNORECASE),
        "arrange": capture(r"编排\s*([^|]+)").strip(),
        "min_pack": capture(r"最小包装\s*([^|]+)").strip(),
    }
def slot_data_for_box(box: Box):
    """Return one dict per slot of ``box`` (1..slot_capacity), empty slots included.

    Each dict carries the slot number, its display code, the component in it
    (or ``None``), an LCSC code taken from the note or else the part number,
    and the parsed specification fields.
    """
    occupants = {}
    for item in Component.query.filter_by(box_id=box.id).all():
        occupants[item.slot_index] = item

    result = []
    for position in range(1, box.slot_capacity + 1):
        occupant = occupants.get(position)
        if occupant:
            # The note is the preferred source; the part number is the fallback.
            code = (
                _extract_lcsc_code_from_text(occupant.note)
                or _extract_lcsc_code_from_text(occupant.part_no)
            )
            spec = _parse_slot_spec_fields(occupant.specification)
        else:
            code = ""
            spec = {"brand": "", "package": "", "usage": ""}
        result.append(
            {
                "slot": position,
                "slot_code": slot_code_for_box(box, position),
                "component": occupant,
                "lcsc_code": code,
                "spec_fields": spec,
            }
        )
    return result
def bag_rows_for_box(box: Box):
    """Return the components of ``box`` ordered by slot, each with its slot code."""
    ordered = (
        Component.query.filter_by(box_id=box.id)
        .order_by(Component.slot_index.asc())
        .all()
    )
    return [
        {"component": item, "slot_code": slot_code_for_box(box, item.slot_index)}
        for item in ordered
    ]
def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int:
    """Parse ``raw_value`` as an integer >= 0.

    Blank or missing input returns ``default_value``.

    Raises:
        ValueError: If the text is not an integer or is negative.
    """
    text = (raw_value or "").strip()
    if not text:
        return default_value
    parsed = int(text)
    if parsed >= 0:
        return parsed
    raise ValueError
def normalize_legacy_data() -> None:
    """Repair missing or invalid values in existing rows and ensure a bag box exists.

    Runs SQL backfills for empty columns, then fixes each box in Python
    (unknown type, non-positive capacity, empty prefix, negative start), forces
    bag boxes to start at 1 with the fixed name, creates the bag box when none
    exists, and commits.
    """
    backfill_statements = (
        "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''",
        "UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL",
        "UPDATE boxes SET slot_prefix = 'A' WHERE slot_prefix IS NULL OR slot_prefix = ''",
        "UPDATE boxes SET start_number = 1 WHERE start_number IS NULL",
        "UPDATE components SET is_enabled = 1 WHERE is_enabled IS NULL",
    )
    for statement in backfill_statements:
        db.session.execute(db.text(statement))

    for box in Box.query.all():
        if box.box_type not in BOX_TYPES:
            box.box_type = "small_28"
        # Safe lookup: the type was just forced to a known key.
        type_defaults = BOX_TYPES[box.box_type]
        if not box.slot_capacity or box.slot_capacity < 1:
            box.slot_capacity = type_defaults["default_capacity"]
        if not box.slot_prefix:
            box.slot_prefix = type_defaults["default_prefix"]
        if box.start_number is None or box.start_number < 0:
            box.start_number = 1
        if box.box_type == "bag":
            box.start_number = 1
            box.name = "袋装清单"

    existing_bag = Box.query.filter_by(box_type="bag").first()
    if not existing_bag:
        bag_defaults = BOX_TYPES["bag"]
        db.session.add(
            Box(
                name="袋装清单",
                description=bag_defaults["default_desc"],
                box_type="bag",
                slot_capacity=bag_defaults["default_capacity"],
                slot_prefix=bag_defaults["default_prefix"],
                start_number=1,
            )
        )
    db.session.commit()
def get_fixed_bag_box() -> Box:
    """Return the bag box with the lowest id, creating and committing it if absent."""
    found = Box.query.filter_by(box_type="bag").order_by(Box.id.asc()).first()
    if not found:
        defaults = BOX_TYPES["bag"]
        found = Box(
            name="袋装清单",
            description=defaults["default_desc"],
            box_type="bag",
            slot_capacity=defaults["default_capacity"],
            slot_prefix=defaults["default_prefix"],
            start_number=1,
        )
        db.session.add(found)
        db.session.commit()
    return found
@app.route("/box/<int:box_id>/bag-capacity", methods=["POST"])
def update_bag_capacity(box_id: int):
    """Change the slot count of a bag box from the ``slot_capacity`` form field.

    Rejects non-bag boxes, values below 1, and values smaller than the highest
    slot index currently in use. A blank field keeps the current capacity.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return bad_request("当前容器不是袋装清单", box.box_type)

    invalid_message = "袋位数量必须是大于等于 1 的整数"
    try:
        requested = _parse_non_negative_int(
            request.form.get("slot_capacity", ""), box.slot_capacity
        )
    except ValueError:
        requested = None
    if requested is None or requested < 1:
        return render_box_page(box, error=invalid_message)

    highest_in_use = (
        db.session.query(db.func.max(Component.slot_index))
        .filter_by(box_id=box.id)
        .scalar()
        or 0
    )
    if highest_in_use > requested:
        return render_box_page(box, error=f"袋位数量不能小于已使用位置 {highest_in_use}")

    box.slot_capacity = requested
    db.session.commit()
    return redirect(url_for("view_box", box_id=box.id, notice="袋位数量已更新"))
def make_overview_rows(box: Box):
    """Return slot code, name and part number of each enabled component, by slot."""
    active = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
        .all()
    )
    return [
        {
            "slot_code": slot_code_for_box(box, item.slot_index),
            "name": item.name,
            "part_no": item.part_no,
        }
        for item in active
    ]
def box_sort_key(box: Box):
    """Sort key for boxes: upper-cased prefix, then start number, then name.

    Missing values sort as "" (prefix, name) or 0 (start number).
    """
    prefix = (box.slot_prefix or "").upper()
    start = 0 if box.start_number is None else box.start_number
    return prefix, start, box.name or ""
def has_range_conflict(
    *,
    box_type: str,
    prefix: str,
    start_number: int,
    slot_capacity: int,
    exclude_box_id: int = None,
):
    """Check whether a numbering range overlaps another box of the same type and prefix.

    Returns:
        ``(True, other_box)`` for the first overlapping box found, otherwise
        ``(False, None)``. The box with id ``exclude_box_id`` is ignored.
    """
    new_first = start_number
    new_last = start_number + slot_capacity - 1

    candidates = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        candidates = candidates.filter(Box.id != exclude_box_id)

    for existing in candidates.all():
        existing_first = existing.start_number
        existing_last = existing.start_number + existing.slot_capacity - 1
        # Inclusive ranges overlap when each one starts no later than the other ends.
        if new_last >= existing_first and new_first <= existing_last:
            return True, existing
    return False, None
def suggest_next_start_number(
    *,
    box_type: str,
    prefix: str,
    slot_capacity: int,
    exclude_box_id: int = None,
) -> int:
    """Return the number after the highest slot used by boxes of this type and prefix.

    Returns 1 when no such box exists or the highest end number is not positive.
    ``slot_capacity`` is accepted but not used in the calculation.
    """
    candidates = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        candidates = candidates.filter(Box.id != exclude_box_id)

    end_numbers = [
        existing.start_number + existing.slot_capacity - 1
        for existing in candidates.all()
    ]
    # The leading 0 is the floor used when there are no boxes to compare against.
    highest_end = max([0] + end_numbers)
    if highest_end > 0:
        return highest_end + 1
    return 1
def build_index_anchor(box_type: str = "") -> str:
    """Return the anchor "group-<box_type>" for known box types, else ""."""
    return f"group-{box_type}" if box_type in BOX_TYPES else ""
def bad_request(message: str, box_type: str = ""):
    """Render the error page with HTTP status 400.

    The "back" link targets the page of ``box_type`` when that type is known;
    otherwise it uses the request referrer, falling back to the types page.
    """
    if build_index_anchor(box_type) and box_type in BOX_TYPES:
        back_url = url_for("type_page", box_type=box_type)
    else:
        back_url = request.referrer or url_for("types_page")

    page = render_template(
        "error.html",
        status_code=400,
        title="请求参数有误",
        message=message,
        back_url=back_url,
    )
    return page, 400
def render_box_page(box: Box, error: str = "", notice: str = ""):
    """Render ``box.html`` for ``box`` with its slot data and optional messages."""
    context = {
        "box": box,
        "slots": slot_data_for_box(box),
        "box_types": BOX_TYPES,
        "slot_range": slot_range_label(box),
        "low_stock_threshold": LOW_STOCK_THRESHOLD,
        "error": error,
        "notice": notice,
    }
    return render_template("box.html", **context)
def _next_empty_slot_index(box: Box, occupied_slots: set[int]):
    """Return the lowest slot index in 1..slot_capacity not in ``occupied_slots``.

    Returns ``None`` when every slot is occupied.
    """
    free_slots = (
        position
        for position in range(1, box.slot_capacity + 1)
        if position not in occupied_slots
    )
    return next(free_slots, None)
def _parse_bulk_line(line: str):
    """Split one comma- or tab-separated line into the five bulk-import fields.

    Missing trailing fields are returned as ""; fields beyond the fifth are
    ignored. The quantity is returned unparsed under ``quantity_raw``.
    """
    field_names = ("part_no", "name", "quantity_raw", "specification", "note")
    cells = [cell.strip() for cell in re.split(r"[,\t]", line)]
    # Multiplying by a negative count yields [], so long lines are left as-is.
    cells += [""] * (len(field_names) - len(cells))
    return dict(zip(field_names, cells))
def log_inventory_event(
    *,
    event_type: str,
    delta: int,
    box: Box = None,
    component: Component = None,
    part_no: str = "",
):
    """Add an ``InventoryEvent`` to the session (the caller commits).

    The box id comes from ``box`` or else from ``component``; the box type is
    only recorded when ``box`` is given. The part number comes from
    ``part_no`` or else the component, and is stored as NULL when blank.
    """
    if box:
        resolved_box_id = box.id
        resolved_box_type = box.box_type
    else:
        resolved_box_id = component.box_id if component else None
        resolved_box_type = None

    fallback_part_no = component.part_no if component else ""
    cleaned_part_no = (part_no or fallback_part_no or "").strip()

    db.session.add(
        InventoryEvent(
            box_id=resolved_box_id,
            box_type=resolved_box_type,
            component_id=component.id if component else None,
            part_no=cleaned_part_no or None,
            event_type=event_type,
            delta=int(delta),
        )
    )
def parse_days_value(raw_days: str) -> int:
    """Return 7 or 30 from a query value; anything else (or blank) becomes 7."""
    try:
        requested = int((raw_days or "7").strip())
    except ValueError:
        return 7
    if requested in (7, 30):
        return requested
    return 7
def parse_box_type_filter(raw_box_type: str) -> str:
    """Return the stripped value when it is a known box type, otherwise "all"."""
    cleaned = (raw_box_type or "").strip()
    if cleaned in BOX_TYPES:
        return cleaned
    return "all"
def _to_date(raw_day):
    """Convert a "YYYY-MM-DD" string to a ``date``; pass other values through."""
    return (
        datetime.strptime(raw_day, "%Y-%m-%d").date()
        if isinstance(raw_day, str)
        else raw_day
    )
def query_event_daily_delta(days: int, box_type_filter: str = "all"):
    """Sum event deltas per calendar day over the last ``days`` days, today included.

    Returns a dict mapping ``date`` to the summed delta; days without events
    are absent. ``box_type_filter`` other than "all" restricts the events.
    """
    first_day = datetime.now().date() - timedelta(days=days - 1)
    window_start = datetime.combine(first_day, datetime.min.time())
    day_column = db.func.date(InventoryEvent.created_at)

    query = db.session.query(
        day_column.label("event_day"),
        db.func.sum(InventoryEvent.delta).label("daily_delta"),
    ).filter(InventoryEvent.created_at >= window_start)
    if box_type_filter != "all":
        query = query.filter(InventoryEvent.box_type == box_type_filter)

    grouped = query.group_by(db.func.date(InventoryEvent.created_at)).all()
    return {
        _to_date(record.event_day): int(record.daily_delta or 0)
        for record in grouped
    }
def build_trend_points_from_events(days: int, total_quantity: int, box_type_filter: str = "all"):
    """Reconstruct daily totals, oldest first, by walking back from today's total.

    ``days`` other than 7 or 30 is treated as 7. Each point has ``date``,
    ``label`` (MM-DD) and ``value``; today's value is ``total_quantity``.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    deltas = query_event_daily_delta(window, box_type_filter)

    newest_first = []
    balance = total_quantity
    for offset in range(window):
        day = today - timedelta(days=offset)
        newest_first.append(
            {
                "date": day,
                "label": day.strftime("%m-%d"),
                "value": balance,
            }
        )
        # Undo this day's change to obtain the total of the day before.
        balance -= deltas.get(day, 0)
    return newest_first[::-1]
def build_box_type_trend_series(days: int, totals_by_type: dict):
    """Build per-box-type daily totals over the window, oldest first.

    Returns ``{"labels": [MM-DD, ...], "series": {box_type: [values]}}``.
    ``days`` other than 7 or 30 is treated as 7; a type missing from
    ``totals_by_type`` starts from 0.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    days_newest_first = [today - timedelta(days=offset) for offset in range(window)]

    series = {}
    for box_type in BOX_TYPES:
        deltas = query_event_daily_delta(window, box_type)
        balance = int(totals_by_type.get(box_type, 0))
        history = []
        for day in days_newest_first:
            history.append(balance)
            balance -= deltas.get(day, 0)
        series[box_type] = history[::-1]

    return {
        "labels": [day.strftime("%m-%d") for day in days_newest_first[::-1]],
        "series": series,
    }
def make_sparkline(values: list[int], width: int = 220, height: int = 56) -> str:
    """Return space-separated "x,y" points scaling ``values`` into width x height.

    The minimum value maps to ``y == height`` and the maximum to ``y == 0``.
    Returns "" for an empty list.
    """
    if not values:
        return ""
    lowest, highest = min(values), max(values)
    # Both divisors are floored at 1 so flat or single-point input cannot divide by zero.
    value_span = max(highest - lowest, 1)
    x_step = width / max(len(values) - 1, 1)
    return " ".join(
        f"{position * x_step:.2f},{height - ((value - lowest) / value_span) * height:.2f}"
        for position, value in enumerate(values)
    )
def event_type_label(event_type: str) -> str:
    """Return the display label of an event type, or the raw value when unknown."""
    known_labels = {
        "quick_inbound_add": "快速入库新增",
        "quick_inbound_merge": "快速入库合并",
        "component_outbound": "快速出库",
        "component_save": "编辑保存",
        "component_enable": "启用元件",
        "component_disable": "停用元件",
        "component_delete": "删除元件",
        "bag_add": "袋装新增",
        "bag_batch_add": "袋装批量新增",
        "bag_merge": "袋装合并",
        "bag_batch_merge": "袋装批量合并",
        "box_delete": "删除盒子",
    }
    if event_type in known_labels:
        return known_labels[event_type]
    return event_type
def recent_events(limit: int = 20, box_type_filter: str = "all"):
    """Return the newest events as display dicts, optionally limited to one box type.

    Each dict has ``time``, ``type`` (label), ``box_type`` (label, or "全部"
    when the type is unknown), ``part_no`` and ``delta``.
    """
    query = InventoryEvent.query
    if box_type_filter != "all":
        query = query.filter_by(box_type=box_type_filter)
    newest = query.order_by(InventoryEvent.created_at.desc()).limit(limit).all()

    return [
        {
            "time": record.created_at.strftime("%Y-%m-%d %H:%M") if record.created_at else "-",
            "type": event_type_label(record.event_type),
            "box_type": BOX_TYPES.get(record.box_type, {}).get("label", "全部"),
            "part_no": record.part_no or "-",
            "delta": int(record.delta or 0),
        }
        for record in newest
    ]
def build_dashboard_context():
    """Collect the data shared by the dashboard-style pages.

    Returns a dict with:
        ``groups``: boxes per box type, each with usage and overview rows.
        ``stats``: headline counters, including the 7-day net quantity change.
        ``category_stats``: enabled item count and quantity per box type.
        ``low_stock_items``: up to 12 enabled components below the threshold,
            lowest quantity first.
    """
    all_boxes = Box.query.all()
    box_by_id = {item.id: item for item in all_boxes}
    all_boxes.sort(key=box_sort_key)

    def display_type(owner) -> str:
        # Unknown types and missing boxes are reported under "small_28".
        return owner.box_type if owner and owner.box_type in BOX_TYPES else "small_28"

    groups = {type_key: [] for type_key in BOX_TYPES}
    for item in all_boxes:
        rows = make_overview_rows(item)
        groups[display_type(item)].append(
            {
                "box": item,
                "used_count": len(rows),
                "slot_range": slot_range_label(item),
                "overview_rows": rows,
                "base_name": infer_base_name(item),
            }
        )

    every_component = Component.query.all()
    active = [c for c in every_component if c.is_enabled]
    inactive = [c for c in every_component if not c.is_enabled]
    running_low = [c for c in active if c.quantity < LOW_STOCK_THRESHOLD]

    trend = build_trend_points_from_events(
        days=7,
        total_quantity=sum(c.quantity for c in active),
        box_type_filter="all",
    )
    net_change_7d = trend[-1]["value"] - trend[0]["value"] if len(trend) >= 2 else 0

    low_stock_items = []
    lowest_first = sorted(running_low, key=lambda c: (c.quantity, c.name or ""))
    for c in lowest_first[:12]:
        owner = box_by_id.get(c.box_id)
        type_key = display_type(owner)
        low_stock_items.append(
            {
                "name": c.name,
                "part_no": c.part_no,
                "quantity": c.quantity,
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"{c.box_id}",
                "slot_code": slot_code_for_box(owner, c.slot_index) if owner else str(c.slot_index),
                "edit_url": url_for("edit_component", box_id=c.box_id, slot=c.slot_index),
            }
        )

    category_stats = []
    for type_key, meta in BOX_TYPES.items():
        # Here a component counts only toward the exact type of its (existing) box.
        members = [
            c
            for c in active
            if box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == type_key
        ]
        category_stats.append(
            {
                "key": type_key,
                "label": meta["label"],
                "item_count": len(members),
                "quantity": sum(c.quantity for c in members),
            }
        )
    max_category_quantity = max([0] + [entry["quantity"] for entry in category_stats])

    return {
        "groups": groups,
        "stats": {
            "box_count": len(all_boxes),
            "active_items": len(active),
            "low_stock_count": len(running_low),
            "disabled_count": len(inactive),
            "max_category_quantity": max_category_quantity,
            "period_net_change_7d": net_change_7d,
        },
        "category_stats": category_stats,
        "low_stock_items": low_stock_items,
    }
def _build_restock_payload(*, limit: int = 20, threshold: int = LOW_STOCK_THRESHOLD) -> dict:
    """Assemble the restock report: low-stock items plus 30-day outbound leaders.

    Args:
        limit: Maximum number of low-stock items to include.
        threshold: Enabled components with a quantity below this are low stock.

    Returns:
        Dict with ``generated_at``, ``threshold``, ``low_stock_items`` (lowest
        quantity first) and ``top_outbound_30d`` (at most 20 part numbers).
    """
    box_by_id = {item.id: item for item in Box.query.all()}

    def quantity_of(component) -> int:
        return int(component.quantity or 0)

    active = Component.query.filter_by(is_enabled=True).all()
    running_low = [c for c in active if quantity_of(c) < threshold]
    lowest_first = sorted(running_low, key=lambda c: (quantity_of(c), c.name or ""))

    low_items = []
    for c in lowest_first[:limit]:
        owner = box_by_id.get(c.box_id)
        type_key = owner.box_type if owner and owner.box_type in BOX_TYPES else "small_28"
        low_items.append(
            {
                "part_no": c.part_no,
                "name": c.name,
                "quantity": quantity_of(c),
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"{c.box_id}",
                "slot_code": slot_code_for_box(owner, c.slot_index) if owner else str(c.slot_index),
            }
        )

    # Window: today plus the 29 preceding days, starting at midnight.
    first_day = datetime.now().date() - timedelta(days=29)
    window_start = datetime.combine(first_day, datetime.min.time())
    outbound_rows = (
        db.session.query(InventoryEvent.part_no, db.func.sum(-InventoryEvent.delta).label("outbound_qty"))
        .filter(
            InventoryEvent.event_type == "component_outbound",
            InventoryEvent.created_at >= window_start,
            InventoryEvent.delta < 0,
        )
        .group_by(InventoryEvent.part_no)
        .order_by(db.func.sum(-InventoryEvent.delta).desc())
        .limit(20)
        .all()
    )
    top_outbound = [
        {"part_no": part or "-", "outbound_qty_30d": int(total or 0)}
        for part, total in outbound_rows
    ]

    return {
        "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "threshold": int(threshold),
        "low_stock_items": low_items,
        "top_outbound_30d": top_outbound,
    }
def _call_siliconflow_chat(
    system_prompt: str,
    user_prompt: str,
    *,
    api_url: str,
    model: str,
    api_key: str,
    timeout: int,
) -> str:
    """Send one system + user prompt to a chat-completions endpoint.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.
        api_url: Endpoint URL the JSON payload is POSTed to.
        model: Model name placed in the payload.
        api_key: Bearer token for the ``Authorization`` header.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The stripped ``choices[0].message.content`` text of the response.

    Raises:
        RuntimeError: For every failure: missing configuration, a malformed
            URL, HTTP or connection errors (including read timeouts), or a
            response that does not have the expected shape.
    """
    api_key = (api_key or "").strip()
    if not api_key:
        raise RuntimeError("SILICONFLOW_API_KEY 未配置")
    if not api_url:
        raise RuntimeError("AI API URL 未配置")
    if not model:
        raise RuntimeError("AI 模型名称未配置")

    payload = {
        "model": model,
        "temperature": 0.2,
        "max_tokens": 700,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    }
    body = json.dumps(payload).encode("utf-8")
    try:
        req = urllib.request.Request(
            api_url,
            data=body,
            method="POST",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
        )
    except ValueError as exc:
        # Request() rejects URLs without a scheme ("unknown url type").
        raise RuntimeError(f"AI API URL 无效: {exc}") from exc

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"AI 服务返回 HTTP {exc.code}: {detail[:200]}") from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(f"AI 服务连接失败: {exc.reason}") from exc
    except OSError as exc:
        # Timeouts or resets while reading the body are raised as plain
        # OSError subclasses, not wrapped in URLError.
        raise RuntimeError(f"AI 服务连接失败: {exc}") from exc

    try:
        data = json.loads(raw.decode("utf-8"))
        return data["choices"][0]["message"]["content"].strip()
    except (KeyError, IndexError, TypeError, AttributeError, ValueError) as exc:
        # AttributeError: content is null; ValueError: invalid JSON or bad UTF-8.
        raise RuntimeError("AI 返回格式无法解析") from exc
@app.route("/")
def index():
    """Redirect the site root to the container types page."""
    target = url_for("types_page")
    return redirect(target)
@app.route("/types")
def types_page():
    """Render the container types overview with per-type cards and low-stock groups.

    ``notice`` and ``error`` query parameters are passed through to the template.
    """
    dashboard = build_dashboard_context()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()

    stats_by_key = {entry["key"]: entry for entry in dashboard["category_stats"]}

    low_stock_groups = [
        {
            "key": type_key,
            "label": meta["label"],
            "items": [
                entry
                for entry in dashboard["low_stock_items"]
                if entry.get("box_type") == type_key
            ],
        }
        for type_key, meta in BOX_TYPES.items()
    ]

    type_cards = []
    for type_key, meta in BOX_TYPES.items():
        totals = stats_by_key.get(type_key, {})
        type_cards.append(
            {
                "key": type_key,
                "label": meta["label"],
                "desc": meta["default_desc"],
                "count": len(dashboard["groups"].get(type_key, [])),
                "item_count": int(totals.get("item_count", 0)),
                "quantity": int(totals.get("quantity", 0)),
                "url": url_for("type_page", box_type=type_key),
            }
        )

    return render_template(
        "types.html",
        type_cards=type_cards,
        stats=dashboard["stats"],
        low_stock_groups=low_stock_groups,
        notice=notice,
        error=error,
    )
@app.route("/container-type/<box_type>/edit", methods=["GET", "POST"])
def edit_container_type(box_type: str):
    """Show or process the form that edits a container type's metadata.

    On a valid POST the ``BOX_TYPES`` entry is updated in place, the
    overrides are saved and the user is redirected to the overview page.
    On GET, or when validation fails, the edit form is rendered.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")

    meta = BOX_TYPES[box_type]

    def render_form(message: str):
        return render_template(
            "type_edit.html",
            box_type=box_type,
            meta=meta,
            error=message,
        )

    if request.method != "POST":
        return render_form("")

    form = request.form
    new_label = form.get("label", "").strip()
    new_desc = form.get("default_desc", "").strip()
    new_prefix = form.get("default_prefix", "").strip().upper()

    if not new_label:
        return render_form("容器名称不能为空")
    if not new_prefix:
        return render_form("默认前缀不能为空")

    meta["label"] = new_label
    # An empty description falls back to the built-in default for this type.
    meta["default_desc"] = new_desc or DEFAULT_BOX_TYPES[box_type]["default_desc"]
    meta["default_prefix"] = new_prefix
    _save_box_type_overrides()
    return redirect(url_for("types_page", notice="容器属性已更新"))
@app.route("/type/<box_type>")
def type_page(box_type: str):
    """Render the listing page restricted to one container type.

    The ``bag`` type is special-cased: it renders the fixed bag box directly
    instead of the generic listing.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")
    if box_type == "bag":
        bag_box = get_fixed_bag_box()
        return render_box_page(bag_box)

    dashboard = build_dashboard_context()
    template_context = {
        "groups": dashboard["groups"],
        "box_types": BOX_TYPES,
        "stats": dashboard["stats"],
        "category_stats": dashboard["category_stats"],
        "low_stock_items": dashboard["low_stock_items"],
        "view_box_types": [box_type],
        "current_box_type": box_type,
        "separate_mode": True,
    }
    return render_template("index.html", **template_context)
@app.route("/boxes/create", methods=["POST"])
def create_box():
    """Create a new box from the submitted form and redirect to its type page.

    Validates the box type, name, start number and (for ``custom`` boxes) the
    slot capacity, rejects overlapping slot-code ranges, then stores the box
    under a generated unique name.
    """
    form = request.form
    box_type = form.get("box_type", "small_28").strip()
    requested_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    requested_prefix = form.get("slot_prefix", "").strip().upper()

    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", box_type)
    if box_type == "bag":
        # Bag boxes are not created here; just go back to the bag page.
        return redirect(url_for("type_page", box_type="bag"))
    if not requested_name:
        return bad_request("盒子名称不能为空", box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box_type)

    type_meta = BOX_TYPES[box_type]
    slot_capacity = type_meta["default_capacity"]
    if box_type == "custom":
        # Only custom boxes take their capacity from the form.
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(type_meta["default_capacity"])),
                type_meta["default_capacity"],
            )
        except ValueError:
            return bad_request(capacity_error, box_type)
        if slot_capacity < 1:
            return bad_request(capacity_error, box_type)

    effective_prefix = requested_prefix or type_meta["default_prefix"]
    range_args = {
        "prefix": effective_prefix,
        "start_number": start_number,
        "slot_capacity": slot_capacity,
    }

    conflict, other_box = has_range_conflict(box_type=box_type, **range_args)
    if conflict:
        return bad_request(
            f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号",
            box_type,
        )

    final_name = make_unique_box_name(
        compose_box_name(base_name=requested_name, **range_args)
    )
    new_box = Box(
        name=final_name,
        description=description or type_meta["default_desc"],
        box_type=box_type,
        slot_capacity=slot_capacity,
        slot_prefix=effective_prefix,
        start_number=start_number,
    )
    db.session.add(new_box)
    db.session.commit()

    # Return to the page the form came from unless it was the "all" view.
    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    if return_to_type == "all":
        target_type = box_type
    else:
        target_type = return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/<int:box_id>/update", methods=["POST"])
def update_box(box_id: int):
    """Apply the submitted name, description, prefix and numbering to a box.

    For ``custom`` boxes the slot capacity may also change, but never below
    the highest slot index already in use. Responds 404 when the box does
    not exist.
    """
    box = Box.query.get_or_404(box_id)
    form = request.form
    requested_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    requested_prefix = form.get("slot_prefix", "").strip().upper()

    if not requested_name:
        return bad_request("盒子名称不能为空", box.box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box.box_type)

    slot_capacity = box.slot_capacity
    if box.box_type == "custom":
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(box.slot_capacity)),
                box.slot_capacity,
            )
        except ValueError:
            return bad_request(capacity_error, box.box_type)
        if slot_capacity < 1:
            return bad_request(capacity_error, box.box_type)

        # Shrinking must not orphan components stored in higher slots.
        highest_slot_query = db.session.query(
            db.func.max(Component.slot_index)
        ).filter_by(box_id=box.id)
        max_used_slot = highest_slot_query.scalar() or 0
        if max_used_slot > slot_capacity:
            return bad_request(
                f"格数不能小于已使用位置 {max_used_slot}",
                box.box_type,
            )

    type_meta = BOX_TYPES[box.box_type]
    effective_prefix = requested_prefix or type_meta["default_prefix"]
    range_args = {
        "prefix": effective_prefix,
        "start_number": start_number,
        "slot_capacity": slot_capacity,
    }

    conflict, other_box = has_range_conflict(
        box_type=box.box_type,
        exclude_box_id=box.id,
        **range_args,
    )
    if conflict:
        return bad_request(
            f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号",
            box.box_type,
        )

    generated_name = compose_box_name(base_name=requested_name, **range_args)
    box.name = make_unique_box_name(generated_name, exclude_box_id=box.id)
    box.description = description or BOX_TYPES[box.box_type]["default_desc"]
    box.slot_prefix = effective_prefix
    box.start_number = start_number
    box.slot_capacity = slot_capacity
    db.session.commit()

    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    if return_to_type == "all":
        target_type = box.box_type
    else:
        target_type = return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/<int:box_id>/delete", methods=["POST"])
def delete_box(box_id: int):
    """Delete a box together with all of its components.

    Boxes of type ``bag`` are refused. When the box holds enabled stock, one
    ``box_delete`` inventory event with the negative total is logged first.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type == "bag":
        return bad_request("袋装清单为固定大容器,不能删除", box.box_type)

    enabled_total_query = db.session.query(
        db.func.sum(Component.quantity)
    ).filter_by(box_id=box.id, is_enabled=True)
    enabled_total = enabled_total_query.scalar() or 0
    if enabled_total:
        log_inventory_event(
            event_type="box_delete",
            delta=-int(enabled_total),
            box=box,
        )

    Component.query.filter_by(box_id=box.id).delete()
    # Remember the type before the row is deleted; it picks the redirect target.
    deleted_type = box.box_type
    db.session.delete(box)
    db.session.commit()

    return_to_type = parse_box_type_filter(request.form.get("return_to_type", ""))
    if return_to_type == "all":
        target_type = deleted_type
    else:
        target_type = return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/suggest-start")
def suggest_start():
    """Suggest the next free start number for a box's slot-code range.

    Query parameters: ``box_type``, ``slot_prefix``, ``box_id`` and, for new
    ``custom`` boxes, ``slot_capacity``. When ``box_id`` is given, the type
    and capacity of that box take precedence and the box is excluded from
    the conflict search.

    Returns a JSON-serialisable dict; validation failures carry status 400
    and an unknown ``box_id`` carries 404.
    """
    requested_type = request.args.get("box_type", "small_28").strip()
    if requested_type not in BOX_TYPES:
        return {"ok": False, "message": "无效盒子类型"}, 400

    slot_prefix = request.args.get("slot_prefix", "").strip().upper()
    box_id_raw = request.args.get("box_id", "").strip()

    box_type = requested_type
    exclude_box_id = None
    slot_capacity = BOX_TYPES[requested_type]["default_capacity"]

    if requested_type == "custom" and not box_id_raw:
        capacity_error = {"ok": False, "message": "格数必须是大于等于 1 的整数"}
        try:
            slot_capacity = _parse_non_negative_int(
                request.args.get("slot_capacity", str(slot_capacity)),
                slot_capacity,
            )
        except ValueError:
            return capacity_error, 400
        if slot_capacity < 1:
            return capacity_error, 400

    if box_id_raw:
        try:
            box_id = int(box_id_raw)
        except ValueError:
            return {"ok": False, "message": "box_id 非法"}, 400
        box = Box.query.get(box_id)
        if not box:
            return {"ok": False, "message": "盒子不存在"}, 404
        box_type = box.box_type
        slot_capacity = box.slot_capacity
        exclude_box_id = box.id

    # Resolve the default prefix only after the box lookup, so an existing
    # box falls back to the prefix of its own type rather than the type named
    # in the query string. A stored type missing from BOX_TYPES falls back to
    # the requested, already validated type.
    prefix_type = box_type if box_type in BOX_TYPES else requested_type
    effective_prefix = slot_prefix or BOX_TYPES[prefix_type]["default_prefix"]

    suggested = suggest_next_start_number(
        box_type=box_type,
        prefix=effective_prefix,
        slot_capacity=slot_capacity,
        exclude_box_id=exclude_box_id,
    )
    end_number = suggested + slot_capacity - 1
    return {
        "ok": True,
        "start_number": suggested,
        "slot_prefix": effective_prefix,
        "preview_range": f"{effective_prefix}{suggested}-{effective_prefix}{end_number}",
    }
@app.route("/box/<int:box_id>")
def view_box(box_id: int):
    """Render the detail page of one box, or respond 404 if it is missing."""
    requested_box = Box.query.get_or_404(box_id)
    return render_box_page(requested_box)
@app.route("/box/<int:box_id>/labels/export")
def export_box_labels_csv(box_id: int):
    """Download the enabled components of a box as a label CSV file.

    Rows are ordered by slot index. The payload starts with a UTF-8 byte
    order mark character and is sent as an attachment.
    """
    box = Box.query.get_or_404(box_id)
    enabled_query = Component.query.filter_by(box_id=box.id, is_enabled=True)
    components = enabled_query.order_by(Component.slot_index.asc()).all()

    header = [
        "盒子名称(box_name)",
        "位置编号(slot_code)",
        "料号(part_no)",
        "名称(name)",
        "品牌(brand)",
        "封装(package)",
        "用途(usage)",
        "立创编号(lcsc_code)",
        "立创商品ID(lcsc_product_id)",
        "商品编排(arrange)",
        "最小包装(min_pack)",
        "规格(specification)",
        "数量(quantity)",
        "位置备注(location)",
        "备注(note)",
    ]

    with StringIO() as buffer:
        writer = csv.writer(buffer)
        writer.writerow(header)
        for component in components:
            slot_code = slot_code_for_box(box, component.slot_index)
            spec_fields = _parse_slot_spec_fields(component.specification)
            note_fields = _parse_note_detail_fields(component.note)
            # Fall back to the part number when the note has no LCSC code.
            if not note_fields["lcsc_code"]:
                note_fields["lcsc_code"] = _extract_lcsc_code_from_text(
                    component.part_no
                )
            record = [
                box.name,
                slot_code,
                component.part_no or "",
                component.name or "",
                spec_fields["brand"],
                spec_fields["package"],
                spec_fields["usage"],
                note_fields["lcsc_code"],
                note_fields["product_id"],
                note_fields["arrange"],
                note_fields["min_pack"],
                component.specification or "",
                int(component.quantity or 0),
                component.location or "",
                component.note or "",
            ]
            writer.writerow(record)
        csv_content = "\ufeff" + buffer.getvalue()

    filename = f"labels_box_{box.id}.csv"
    disposition = f"attachment; filename={filename}"
    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
@app.route("/box/<int:box_id>/quick-inbound", methods=["POST"])
def quick_inbound(box_id: int):
    """Bulk-import components into a box from the ``lines`` form field.

    Each non-empty line is parsed with ``_parse_bulk_line``. A line whose
    part number already exists in the box is merged into that component;
    any other line takes the next empty slot. Lines that cannot be parsed,
    or that find no empty slot, are reported as skipped.
    """
    box = Box.query.get_or_404(box_id)
    raw_lines = request.form.get("lines", "")
    lines = [stripped for stripped in map(str.strip, raw_lines.splitlines()) if stripped]
    if not lines:
        return render_box_page(box, error="快速入库失败: 请至少输入一行")

    components = Component.query.filter_by(box_id=box.id).all()
    occupied_slots = {entry.slot_index for entry in components}
    by_part_no = {entry.part_no: entry for entry in components if entry.part_no}

    added_count = 0
    merged_count = 0
    skipped_lines = []

    for line_no, line in enumerate(lines, start=1):
        parsed = _parse_bulk_line(line)
        part_no = parsed["part_no"]
        name = parsed["name"]
        specification = parsed["specification"]
        note = parsed["note"]

        if not part_no or not name:
            skipped_lines.append(str(line_no))
            continue
        try:
            quantity = _parse_non_negative_int(parsed["quantity_raw"], 0)
        except ValueError:
            skipped_lines.append(str(line_no))
            continue

        existing = by_part_no.get(part_no)
        if existing:
            # A disabled component contributes nothing to tracked stock, so
            # re-enabling it counts its whole quantity as incoming.
            previous_qty = existing.quantity if existing.is_enabled else 0
            existing.quantity += quantity
            existing.name = name or existing.name
            if specification:
                existing.specification = specification
            if note:
                existing.note = note
            existing.is_enabled = True
            delta = existing.quantity - previous_qty
            if delta:
                log_inventory_event(
                    event_type="quick_inbound_merge",
                    delta=delta,
                    box=box,
                    component=existing,
                    part_no=part_no,
                )
            merged_count += 1
            continue

        slot_index = _next_empty_slot_index(box, occupied_slots)
        if slot_index is None:
            skipped_lines.append(f"{line_no} 行(盒子已满)")
            continue

        created = Component(
            box_id=box.id,
            slot_index=slot_index,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(created)
        if quantity:
            log_inventory_event(
                event_type="quick_inbound_add",
                delta=quantity,
                box=box,
                component=created,
                part_no=part_no,
            )
        # Track the new row so later lines see the slot and part as taken.
        occupied_slots.add(slot_index)
        by_part_no[part_no] = created
        added_count += 1

    if not (added_count or merged_count):
        return render_box_page(
            box,
            error="快速入库失败: 没有可导入的数据,请检查格式",
        )

    db.session.commit()
    message = f"快速入库完成: 新增 {added_count} 条,合并 {merged_count}"
    if skipped_lines:
        message += ";跳过: " + ", ".join(skipped_lines)
    return render_box_page(box, notice=message)
@app.route("/box/<int:box_id>/bags/add", methods=["POST"])
def add_bag_item(box_id: int):
    """Add one record to a bag box, merging into an existing part number.

    Responds 400 when the box is not of type ``bag``. A new record takes the
    slot after the highest one in use, and the box capacity grows to match.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    form = request.form
    part_no = form.get("part_no", "").strip()
    name = form.get("name", "").strip()
    specification = form.get("specification", "").strip()
    note = form.get("note", "").strip()
    if not part_no or not name:
        return render_box_page(box, error="袋装新增失败: 料号和名称不能为空")

    try:
        quantity = _parse_non_negative_int(form.get("quantity", "0"), 0)
    except ValueError:
        return render_box_page(box, error="袋装新增失败: 数量必须是大于等于 0 的整数")

    existing = Component.query.filter_by(box_id=box.id, part_no=part_no).first()
    if existing:
        previous_qty = existing.quantity if existing.is_enabled else 0
        existing.name = name or existing.name
        existing.quantity += quantity
        existing.specification = specification or existing.specification
        existing.note = note or existing.note
        existing.is_enabled = True
        delta = int(existing.quantity - previous_qty)
        if delta:
            log_inventory_event(
                event_type="bag_merge",
                delta=delta,
                box=box,
                component=existing,
                part_no=part_no,
            )
        db.session.commit()
        return render_box_page(box, notice="同料号已存在,已合并到原袋位")

    highest_slot_query = db.session.query(
        db.func.max(Component.slot_index)
    ).filter(Component.box_id == box.id)
    next_slot = (highest_slot_query.scalar() or 0) + 1

    created = Component(
        box_id=box.id,
        slot_index=next_slot,
        part_no=part_no,
        name=name,
        quantity=quantity,
        specification=specification or None,
        note=note or None,
        is_enabled=True,
    )
    db.session.add(created)
    if quantity:
        log_inventory_event(
            event_type="bag_add",
            delta=quantity,
            box=box,
            component=created,
            part_no=part_no,
        )
    # The bag box grows so that the new slot stays inside its capacity.
    if next_slot > box.slot_capacity:
        box.slot_capacity = next_slot
    db.session.commit()
    return render_box_page(box, notice="已新增 1 条袋装记录")
@app.route("/box/<int:box_id>/bags/batch", methods=["POST"])
def add_bag_items_batch(box_id: int):
    """Add several records to a bag box from the ``lines`` form field.

    Each line is split on commas or tabs into part number, name, quantity,
    specification and note. Lines whose part number is already present are
    merged; the rest are appended in consecutive new slots. Responds 400
    when the box is not of type ``bag``.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    raw_lines = request.form.get("lines", "")
    lines = [stripped for stripped in map(str.strip, raw_lines.splitlines()) if stripped]
    if not lines:
        return render_box_page(box, error="批量新增失败: 请至少输入一行")

    invalid_lines = []
    added_count = 0
    merged_count = 0

    highest_slot_query = db.session.query(
        db.func.max(Component.slot_index)
    ).filter(Component.box_id == box.id)
    next_slot = (highest_slot_query.scalar() or 0) + 1

    existing_by_part_no = {}
    for stored in Component.query.filter_by(box_id=box.id).all():
        if stored.part_no:
            existing_by_part_no[stored.part_no] = stored

    for line_no, line in enumerate(lines, start=1):
        parts = [piece.strip() for piece in re.split(r"[,\t]", line)]
        # Pad short lines so the five leading fields can always be read.
        parts.extend([""] * (5 - len(parts)))
        part_no, name, quantity_raw, specification, note = parts[:5]

        if not part_no or not name:
            invalid_lines.append(str(line_no))
            continue
        try:
            quantity = _parse_non_negative_int(quantity_raw, 0)
        except ValueError:
            invalid_lines.append(str(line_no))
            continue

        existing = existing_by_part_no.get(part_no)
        if existing:
            previous_qty = existing.quantity if existing.is_enabled else 0
            existing.name = name or existing.name
            existing.quantity += quantity
            existing.specification = specification or existing.specification
            existing.note = note or existing.note
            existing.is_enabled = True
            delta = int(existing.quantity - previous_qty)
            if delta:
                log_inventory_event(
                    event_type="bag_batch_merge",
                    delta=delta,
                    box=box,
                    component=existing,
                    part_no=part_no,
                )
            merged_count += 1
            continue

        created = Component(
            box_id=box.id,
            slot_index=next_slot,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(created)
        # Later lines with the same part number merge into this new row.
        existing_by_part_no[part_no] = created
        if quantity:
            log_inventory_event(
                event_type="bag_batch_add",
                delta=quantity,
                box=box,
                component=created,
                part_no=part_no,
            )
        added_count += 1
        next_slot += 1

    if added_count:
        box.slot_capacity = max(box.slot_capacity, next_slot - 1)
    db.session.commit()

    skipped = ", ".join(invalid_lines)
    if invalid_lines and added_count == 0 and merged_count == 0:
        return render_box_page(
            box,
            error="批量新增失败: 数据格式不正确(请检查: " + skipped + "",
        )
    if invalid_lines:
        return render_box_page(
            box,
            notice=f"已新增 {added_count} 条,合并 {merged_count} 条,以下行被跳过: " + skipped,
        )
    return render_box_page(
        box,
        notice=f"批量新增成功:新增 {added_count} 条,合并 {merged_count}",
    )
@app.route("/edit/<int:box_id>/<int:slot>", methods=["GET", "POST"])
def edit_component(box_id: int, slot: int):
    """Show or process the edit form for one slot of a box.

    GET renders the form. POST dispatches on the ``action`` form field:
    ``delete``, ``toggle_enable``, ``toggle_disable``, or anything else to
    save the submitted fields. Every stock change is recorded through
    ``log_inventory_event``. Responds 400 when ``slot`` lies outside the
    box capacity.
    """
    box = Box.query.get_or_404(box_id)
    if not 1 <= slot <= box.slot_capacity:
        return "无效的格子编号", 400

    search_query = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()
    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()

    def render_form(error_text, query_text):
        return render_template(
            "edit.html",
            box=box,
            slot=slot,
            slot_code=slot_code_for_box(box, slot),
            component=component,
            error=error_text,
            notice=notice,
            search_query=query_text,
        )

    if request.method != "POST":
        return render_form(error, search_query)

    form = request.form
    action = form.get("action", "save")
    search_query_post = form.get("q", "").strip()
    search_query_effective = search_query_post or search_query

    def back_to_origin():
        # Return to the search results when the edit started from a search.
        if search_query_effective:
            return redirect(url_for("search_page", q=search_query_effective))
        return redirect(url_for("view_box", box_id=box.id))

    def back_to_form():
        return redirect(
            url_for("edit_component", box_id=box.id, slot=slot, q=search_query_post)
        )

    if action == "delete":
        if component:
            if component.is_enabled and component.quantity:
                log_inventory_event(
                    event_type="component_delete",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.delete(component)
            db.session.commit()
        return back_to_origin()

    if action == "toggle_enable":
        if component and not component.is_enabled:
            component.is_enabled = True
            if component.quantity:
                log_inventory_event(
                    event_type="component_enable",
                    delta=int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.commit()
        return back_to_form()

    if action == "toggle_disable":
        if component and component.is_enabled:
            component.is_enabled = False
            if component.quantity:
                log_inventory_event(
                    event_type="component_disable",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.commit()
        return back_to_form()

    # Any other action saves the submitted fields.
    part_no = form.get("part_no", "").strip()
    name = form.get("name", "").strip()
    specification = form.get("specification", "").strip()
    quantity_raw = form.get("quantity", "0").strip()
    note = form.get("note", "").strip()

    if not part_no or not name:
        return render_form("料号和名称不能为空", search_query_post)
    try:
        quantity = _parse_non_negative_int(quantity_raw, 0)
    except ValueError:
        return render_form("数量必须是大于等于 0 的整数", search_query_post)

    if component is not None and component.is_enabled:
        previous_qty = int(component.quantity)
    else:
        previous_qty = 0

    target = component
    if target is None:
        target = Component(box_id=box.id, slot_index=slot)
        db.session.add(target)

    target.part_no = part_no
    target.name = name
    target.specification = specification or None
    target.quantity = quantity
    target.note = note or None
    # A component that has no enabled flag yet is treated as enabled.
    if target.is_enabled is None:
        target.is_enabled = True

    current_qty = quantity if target.is_enabled else 0
    delta = int(current_qty - previous_qty)
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=target,
            part_no=part_no,
        )
    db.session.commit()
    return back_to_origin()
@app.route("/edit/<int:box_id>/<int:slot>/lcsc-import", methods=["POST"])
def lcsc_import_to_edit_slot(box_id: int, slot: int):
    """Fill one slot with product data fetched from the LCSC API.

    Every outcome redirects back to the slot's edit page, carrying either an
    ``error`` or a ``notice`` query parameter.
    """
    box = Box.query.get_or_404(box_id)

    def back_to_form(**params):
        return redirect(url_for("edit_component", box_id=box.id, slot=slot, **params))

    if not 1 <= slot <= box.slot_capacity:
        return back_to_form(error="目标位置超出当前容器范围")

    try:
        quantity = _parse_non_negative_int(request.form.get("quantity", "0"), 0)
    except ValueError:
        return back_to_form(error="数量必须是大于等于 0 的整数")

    settings = _get_ai_settings()
    product_identifier = request.form.get("lcsc_product_id", "").strip()
    try:
        product = _fetch_lcsc_product_basic(product_identifier, settings)
    except RuntimeError as exc:
        return back_to_form(error=f"立创导入失败: {exc}")

    mapped = _map_lcsc_product_to_component(product)
    if not (mapped["part_no"] and mapped["name"]):
        return back_to_form(error="立创导入失败: 商品信息缺少料号或名称")

    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()
    if component and component.is_enabled:
        previous_qty = int(component.quantity or 0)
    else:
        previous_qty = 0

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = mapped["part_no"]
    component.name = mapped["name"]
    component.specification = mapped["specification"] or None
    component.note = mapped["note"] or None
    component.quantity = quantity
    # An import always leaves the slot enabled.
    component.is_enabled = True

    delta = int(component.quantity or 0) - previous_qty
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=component.part_no,
        )
    db.session.commit()

    slot_code = slot_code_for_box(box, slot)
    return back_to_form(
        notice=f"立创导入成功: {mapped['part_no']} 已写入 {slot_code}",
    )
@app.route("/search")
def search_page():
    """Search enabled components by part number or name.

    The ``q`` query parameter is matched case-insensitively as a substring.
    An empty keyword renders the page without running a query.
    """
    keyword = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()

    results = []
    if keyword:
        pattern = f"%{keyword}%"
        matches_part_or_name = db.or_(
            Component.part_no.ilike(pattern),
            Component.name.ilike(pattern),
        )
        matches = (
            Component.query.join(Box, Box.id == Component.box_id)
            .filter(Component.is_enabled.is_(True), matches_part_or_name)
            .order_by(Component.part_no.asc(), Component.name.asc())
            .all()
        )
        for found in matches:
            owner = Box.query.get(found.box_id)
            if owner:
                box_name = owner.name
                slot_code = slot_code_for_box(owner, found.slot_index)
            else:
                # Fallback labels used when the box row cannot be loaded.
                box_name = f"{found.box_id}"
                slot_code = str(found.slot_index)
            edit_url = url_for(
                "edit_component",
                box_id=found.box_id,
                slot=found.slot_index,
                q=keyword,
            )
            results.append(
                {
                    "component": found,
                    "box_name": box_name,
                    "slot_code": slot_code,
                    "edit_url": edit_url,
                }
            )

    return render_template(
        "search.html",
        keyword=keyword,
        results=results,
        notice=notice,
        error=error,
    )
@app.route("/ai/restock-plan", methods=["POST"])
def ai_restock_plan():
    """Build a restock plan for low-stock components with the AI service.

    Returns a JSON-serialisable dict. When the AI call fails, the response
    has ``ok: False``, status 400 and a rule-based fallback plan. When the AI
    reply is not valid JSON, the rule-based plan is returned with a
    ``parse_warning``.
    """
    ai_settings = _get_ai_settings()
    data = _build_restock_payload(
        limit=ai_settings["restock_limit"],
        threshold=ai_settings["restock_threshold"],
    )

    def _empty_plan(summary: str) -> dict:
        """Return a plan with the given summary and no items."""
        return {
            "summary": summary,
            "urgent": [],
            "this_week": [],
            "defer": [],
        }

    def _normalize_item(raw_item: dict) -> dict:
        """Coerce one AI plan item to four string fields with placeholders."""
        if not isinstance(raw_item, dict):
            return {
                "part_no": "-",
                "name": "未命名元件",
                "suggest_qty": "待确认",
                "reason": "AI 返回项格式异常",
            }
        return {
            "part_no": str(raw_item.get("part_no", "-") or "-").strip() or "-",
            "name": str(raw_item.get("name", "未命名元件") or "未命名元件").strip(),
            "suggest_qty": str(raw_item.get("suggest_qty", "待确认") or "待确认").strip(),
            "reason": str(raw_item.get("reason", "") or "").strip() or "",
        }

    def _normalize_plan(raw_plan: dict, default_summary: str) -> dict:
        """Coerce a parsed AI reply into the summary/urgent/this_week/defer shape."""
        if not isinstance(raw_plan, dict):
            return _empty_plan(default_summary)
        summary = str(raw_plan.get("summary", "") or "").strip() or default_summary

        def to_items(key: str):
            rows = raw_plan.get(key, [])
            if not isinstance(rows, list):
                return []
            return [_normalize_item(row) for row in rows]

        return {
            "summary": summary,
            "urgent": to_items("urgent"),
            "this_week": to_items("this_week"),
            "defer": to_items("defer"),
        }

    def _extract_json_block(raw_text: str) -> str:
        """Strip a Markdown code fence and return the outermost ``{...}`` span."""
        text = (raw_text or "").strip()
        if not text:
            return ""
        if text.startswith("```"):
            # In a raw string, whitespace is written \s. The previous \\s
            # matched a literal backslash, so fences were never removed.
            text = re.sub(r"^```(?:json)?\s*", "", text)
            text = re.sub(r"\s*```$", "", text)
        first = text.find("{")
        last = text.rfind("}")
        if first >= 0 and last > first:
            return text[first : last + 1]
        return text

    def _build_rule_based_plan() -> dict:
        """Build a fallback plan from the low-stock list without the AI.

        The first five low-stock items are marked urgent and the rest go to
        this week. The suggested quantity tops stock up to twice the
        threshold, with a minimum of 1.
        """
        threshold = int(data.get("threshold", LOW_STOCK_THRESHOLD))
        urgent = []
        this_week = []
        for idx, item in enumerate(data.get("low_stock_items", [])):
            qty = int(item.get("quantity", 0) or 0)
            suggest_qty = max(threshold * 2 - qty, 1)
            row = {
                "part_no": item.get("part_no", "-") or "-",
                "name": item.get("name", "未命名元件") or "未命名元件",
                "suggest_qty": str(suggest_qty),
                "reason": f"当前库存 {qty},低于阈值 {threshold}",
            }
            if idx < 5:
                urgent.append(row)
            else:
                this_week.append(row)
        return {
            "summary": "已按规则生成兜底补货建议AI 输出异常时使用)",
            "urgent": urgent,
            "this_week": this_week,
            "defer": [],
        }

    if not data["low_stock_items"]:
        # Nothing to restock, so the AI service is not called.
        plan = _empty_plan("当前没有低库存元件,暂不需要补货。")
        return {
            "ok": True,
            "suggestion": "当前没有低库存元件,暂不需要补货。",
            "plan": plan,
            "data": data,
        }

    system_prompt = (
        "你是电子元器件库存助手。"
        "必须只输出 JSON不要 Markdown不要解释文字。"
        "输出结构必须是: "
        "{\"summary\":string,\"urgent\":[item],\"this_week\":[item],\"defer\":[item]}。"
        "item 结构: {\"part_no\":string,\"name\":string,\"suggest_qty\":string,\"reason\":string}。"
        "各数组允许为空。"
    )
    user_prompt = "库存数据如下(JSON):\n" + json.dumps(data, ensure_ascii=False)

    try:
        suggestion = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=ai_settings["api_url"],
            model=ai_settings["model"],
            api_key=ai_settings["api_key"],
            timeout=ai_settings["timeout"],
        )
    except RuntimeError as exc:
        fallback_plan = _build_rule_based_plan()
        return {
            "ok": False,
            "message": str(exc),
            "plan": fallback_plan,
            "data": data,
        }, 400

    parse_warning = ""
    try:
        parsed_plan = json.loads(_extract_json_block(suggestion))
        plan = _normalize_plan(parsed_plan, "已生成 AI 补货建议")
    except json.JSONDecodeError:
        plan = _build_rule_based_plan()
        parse_warning = "AI 返回格式异常,已切换到规则兜底建议。"

    return {
        "ok": True,
        "suggestion": suggestion,
        "plan": plan,
        "parse_warning": parse_warning,
        "data": data,
    }
@app.route("/ai/settings", methods=["GET", "POST"])
def ai_settings_page():
    """Show or save the AI and LCSC API settings.

    A valid POST saves the settings and redirects back with a notice. An
    invalid POST re-renders the form with the submitted values and the first
    validation error that was found.
    """
    settings = _get_ai_settings()
    error = ""
    notice = request.args.get("notice", "").strip()

    if request.method == "POST":
        form = request.form

        def read_int(field: str, default: str, minimum: int, message: str) -> int:
            """Parse an integer field; on failure keep the stored value."""
            nonlocal error
            try:
                value = int((form.get(field, default) or default).strip())
                if value < minimum:
                    raise ValueError
            except ValueError:
                # Only the first validation failure is reported.
                if not error:
                    error = message
                return settings[field]
            return value

        api_url = form.get("api_url", "").strip()
        model = form.get("model", "").strip()
        api_key = form.get("api_key", "").strip()
        lcsc_app_id = form.get("lcsc_app_id", "").strip()
        lcsc_access_key = form.get("lcsc_access_key", "").strip()
        lcsc_secret_key = form.get("lcsc_secret_key", "").strip()

        timeout = read_int("timeout", "30", 5, "超时时间必须是大于等于 5 的整数")
        restock_threshold = read_int(
            "restock_threshold", "5", 0, "低库存阈值必须是大于等于 0 的整数"
        )
        restock_limit = read_int(
            "restock_limit", "24", 1, "补货条目数必须是大于等于 1 的整数"
        )
        lcsc_timeout = read_int(
            "lcsc_timeout", "20", 5, "立创接口超时时间必须是大于等于 5 的整数"
        )

        if not error and not api_url:
            error = "API URL 不能为空"
        if not error and not model:
            error = "模型名称不能为空"
        lcsc_complete = lcsc_app_id and lcsc_access_key and lcsc_secret_key
        if not error and not lcsc_complete:
            error = "立创接口需要填写 app_id、access_key、secret_key"

        submitted = {
            "api_url": api_url,
            "model": model,
            "api_key": api_key,
            "timeout": timeout,
            "restock_threshold": restock_threshold,
            "restock_limit": restock_limit,
            "lcsc_base_url": LCSC_BASE_URL,
            "lcsc_basic_path": LCSC_BASIC_PATH,
            "lcsc_timeout": lcsc_timeout,
            "lcsc_app_id": lcsc_app_id,
            "lcsc_access_key": lcsc_access_key,
            "lcsc_secret_key": lcsc_secret_key,
        }
        if not error:
            _save_ai_settings(submitted)
            return redirect(url_for("ai_settings_page", notice="AI参数已保存"))

        # Validation failed: show what the user typed, not the stored values.
        settings.update(submitted)

    return render_template(
        "ai_settings.html",
        settings=settings,
        notice=notice,
        error=error,
    )
@app.route("/component/<int:component_id>/outbound", methods=["POST"])
def quick_outbound(component_id: int):
    """Remove a quantity of one component from stock.

    Every outcome redirects to the search page with the original keyword and
    either an ``error`` or a ``notice`` query parameter.
    """
    component = Component.query.get_or_404(component_id)
    keyword = request.form.get("q", "").strip()

    def back_to_search(**params):
        return redirect(url_for("search_page", q=keyword, **params))

    try:
        amount = _parse_non_negative_int(request.form.get("amount", "0"), 0)
    except ValueError:
        return back_to_search(error="出库数量必须是大于等于 0 的整数")

    if amount <= 0:
        return back_to_search(error="出库数量必须大于 0")
    if not component.is_enabled:
        return back_to_search(error="该元件已停用,不能出库")

    in_stock = int(component.quantity or 0)
    if amount > in_stock:
        return back_to_search(error="出库数量超过当前库存")

    component.quantity = in_stock - amount
    box = Box.query.get(component.box_id)
    log_inventory_event(
        event_type="component_outbound",
        delta=-amount,
        box=box,
        component=component,
        part_no=component.part_no,
    )
    db.session.commit()

    if box:
        slot_code = slot_code_for_box(box, component.slot_index)
    else:
        slot_code = str(component.slot_index)
    notice = f"出库成功: {component.part_no} -{amount}{slot_code}"
    return back_to_search(notice=notice)
@app.route("/stats")
def stats_page():
    """Render the inventory statistics page.

    Query parameters: ``days`` (trend window), ``box_type`` (type filter or
    ``all``) and ``notice``. Only enabled components count towards totals.
    A component whose quantity is ``None`` is counted as 0.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))
    notice = request.args.get("notice", "").strip()

    box_by_id = {box.id: box for box in Box.query.all()}
    components = Component.query.all()

    def _qty(component) -> int:
        # Guard against NULL quantities, as other handlers in this file do.
        return int(component.quantity or 0)

    def _has_type(component, type_key) -> bool:
        owner = box_by_id.get(component.box_id)
        return bool(owner) and owner.box_type == type_key

    all_enabled_components = [c for c in components if c.is_enabled]
    overall_total_quantity = sum(_qty(c) for c in all_enabled_components)

    if box_type_filter == "all":
        enabled_components = list(all_enabled_components)
    else:
        enabled_components = [
            c for c in all_enabled_components if _has_type(c, box_type_filter)
        ]

    total_quantity = sum(_qty(c) for c in enabled_components)
    total_items = len(enabled_components)
    low_stock_count = sum(
        1 for c in enabled_components if _qty(c) < LOW_STOCK_THRESHOLD
    )
    inventory_share = (
        round(total_quantity * 100 / overall_total_quantity, 1)
        if overall_total_quantity > 0
        else 0.0
    )

    start_day = datetime.now().date() - timedelta(days=days - 1)
    event_query = InventoryEvent.query.filter(
        InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time())
    )
    if box_type_filter != "all":
        event_query = event_query.filter_by(box_type=box_type_filter)
    period_operation_count = event_query.count()
    active_days = len(
        {
            _to_date(row[0])
            for row in event_query.with_entities(
                db.func.date(InventoryEvent.created_at)
            ).all()
            if row and row[0]
        }
    )

    # Unfiltered per-type totals, accumulated in one pass over the components.
    totals_by_type = {type_key: 0 for type_key in BOX_TYPES}
    for c in all_enabled_components:
        owner = box_by_id.get(c.box_id)
        if owner and owner.box_type in totals_by_type:
            totals_by_type[owner.box_type] += _qty(c)

    # Per-type item count and quantity within the current filter.
    filtered_counts = {type_key: [0, 0] for type_key in BOX_TYPES}
    for c in enabled_components:
        owner = box_by_id.get(c.box_id)
        if owner and owner.box_type in filtered_counts:
            bucket_entry = filtered_counts[owner.box_type]
            bucket_entry[0] += 1
            bucket_entry[1] += _qty(c)

    category_stats = [
        {
            "key": key,
            "label": meta["label"],
            "item_count": filtered_counts[key][0],
            "quantity": filtered_counts[key][1],
        }
        for key, meta in BOX_TYPES.items()
    ]
    category_stats.sort(key=lambda row: row["quantity"], reverse=True)
    max_category_quantity = max(
        [row["quantity"] for row in category_stats], default=0
    )

    chart_mode = "category"
    chart_title = "分类占比"
    chart_rows = category_stats
    max_chart_quantity = max_category_quantity
    if box_type_filter != "all":
        # With a type filter the chart shows the largest components instead.
        chart_mode = "component"
        chart_title = "分类内元件库存 Top"
        bucket = {}
        for c in enabled_components:
            key = (c.part_no or "", c.name or "")
            bucket[key] = bucket.get(key, 0) + _qty(c)
        component_rows = []
        for (part_no, name), quantity in bucket.items():
            label = name or part_no or "未命名元件"
            if part_no:
                label = f"{label} ({part_no})"
            component_rows.append(
                {"label": label, "quantity": quantity, "item_count": 1}
            )
        component_rows.sort(key=lambda row: row["quantity"], reverse=True)
        chart_rows = component_rows[:8]
        max_chart_quantity = max([row["quantity"] for row in chart_rows], default=0)

    trend_points = build_trend_points_from_events(
        days=days,
        total_quantity=total_quantity,
        box_type_filter=box_type_filter,
    )
    period_net_change = 0
    if len(trend_points) >= 2:
        period_net_change = trend_points[-1]["value"] - trend_points[0]["value"]

    min_value = min([point["value"] for point in trend_points], default=0)
    max_value = max([point["value"] for point in trend_points], default=0)
    # Avoid dividing by zero when every point has the same value.
    value_span = max(max_value - min_value, 1)

    svg_points = []
    if trend_points:
        width = 520
        height = 180
        step_x = width / max(len(trend_points) - 1, 1)
        for idx, point in enumerate(trend_points):
            x = idx * step_x
            # y is computed as height minus the scaled value, so larger
            # values get smaller y coordinates.
            y = height - ((point["value"] - min_value) / value_span) * height
            svg_points.append(f"{x:.2f},{y:.2f}")

    box_type_series_raw = build_box_type_trend_series(
        days=days, totals_by_type=totals_by_type
    )
    box_type_series = []
    for key, meta in BOX_TYPES.items():
        values = box_type_series_raw["series"].get(key, [])
        delta = values[-1] - values[0] if len(values) >= 2 else 0
        box_type_series.append(
            {
                "key": key,
                "label": meta["label"],
                "sparkline": make_sparkline(values),
                "latest": values[-1] if values else 0,
                "delta": delta,
            }
        )

    activity_rows = recent_events(limit=20, box_type_filter=box_type_filter)

    return render_template(
        "stats.html",
        notice=notice,
        days=days,
        box_type_filter=box_type_filter,
        box_types=BOX_TYPES,
        total_quantity=total_quantity,
        total_items=total_items,
        low_stock_count=low_stock_count,
        category_stats=category_stats,
        max_category_quantity=max_category_quantity,
        chart_mode=chart_mode,
        chart_title=chart_title,
        chart_rows=chart_rows,
        max_chart_quantity=max_chart_quantity,
        trend_points=trend_points,
        trend_polyline=" ".join(svg_points),
        min_value=min_value,
        max_value=max_value,
        period_net_change=period_net_change,
        overall_total_quantity=overall_total_quantity,
        inventory_share=inventory_share,
        period_operation_count=period_operation_count,
        active_days=active_days,
        box_type_series=box_type_series,
        activity_rows=activity_rows,
    )
@app.route("/stats/export")
def stats_export_csv():
    """Export the inventory trend for the selected window as a CSV download.

    Query parameters:
        days: look-back window, normalized by ``parse_days_value``
            (defaults to "7" when absent).
        box_type: box-type filter, normalized by ``parse_box_type_filter``
            (defaults to "all" when absent).

    Returns:
        A CSV attachment response with a header row followed by one row per
        trend point: ISO date, inventory total, that day's delta, and the
        ``days`` / ``box_type_filter`` values echoed on every row.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))

    box_by_id = {box.id: box for box in Box.query.all()}

    def _in_scope(component) -> bool:
        # Only enabled components count; with a concrete filter the component
        # must also sit in a known box of that type.
        if not component.is_enabled:
            return False
        if box_type_filter == "all":
            return True
        box = box_by_id.get(component.box_id)
        return bool(box) and box.box_type == box_type_filter

    total_quantity = sum(c.quantity for c in Component.query.all() if _in_scope(c))
    trend_points = build_trend_points_from_events(days, total_quantity, box_type_filter)
    delta_by_day = query_event_daily_delta(days, box_type_filter)

    # The context manager releases the buffer even if a row fails to serialize.
    with StringIO() as output:
        writer = csv.writer(output)
        writer.writerow(["date", "inventory_total", "daily_delta", "days", "box_type_filter"])
        writer.writerows(
            [
                point["date"].isoformat(),
                point["value"],
                int(delta_by_day.get(point["date"], 0)),
                days,
                box_type_filter,
            ]
            for point in trend_points
        )
        csv_content = output.getvalue()

    filename = f"inventory_stats_{box_type_filter}_{days}d.csv"
    return Response(
        csv_content,
        # Pass the bare mimetype: Werkzeug appends "; charset=utf-8" to text/*
        # mimetypes itself, so including the charset here would duplicate it
        # in the Content-Type header.
        mimetype="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
@app.route("/stats/clear", methods=["POST"])
def clear_stats_logs():
    """Delete inventory event logs and redirect back to the stats page.

    Form fields:
        days: look-back window, normalized by ``parse_days_value``; it is
            only forwarded to the redirect target.
        box_type: box-type filter, normalized by ``parse_box_type_filter``.
        scope: "all" deletes every event regardless of the filter; any other
            value deletes only the events matching the current filter.

    Returns:
        A redirect to ``stats_page`` carrying a notice with the deleted count.
    """
    form = request.form
    days = parse_days_value(form.get("days", "7"))
    box_type_filter = parse_box_type_filter(form.get("box_type", "all"))
    scope = (form.get("scope", "current") or "current").strip()

    if scope == "all":
        # Full wipe: ignore the filter and send the user back to the "all" view.
        removed = InventoryEvent.query.delete()
        db.session.commit()
        return redirect(
            url_for(
                "stats_page",
                days=days,
                box_type="all",
                notice=f"已清除全部统计日志,共 {removed}",
            )
        )

    # Scoped wipe: narrow the query only when a concrete box type is selected.
    events = InventoryEvent.query
    if box_type_filter != "all":
        events = events.filter_by(box_type=box_type_filter)
    removed = events.delete(synchronize_session=False)
    db.session.commit()

    if box_type_filter != "all":
        type_label = BOX_TYPES.get(box_type_filter, {}).get("label", box_type_filter)
        message = f"已清除当前筛选({type_label})统计日志,共 {removed}"
    else:
        message = f"已清除当前筛选(全部分类)统计日志,共 {removed}"

    return redirect(
        url_for("stats_page", days=days, box_type=box_type_filter, notice=message)
    )
def bootstrap() -> None:
    """Prepare the database inside an application context.

    Runs, in order: table creation via ``db.create_all``, then
    ``ensure_schema``, then ``normalize_legacy_data``.
    """
    startup_steps = (db.create_all, ensure_schema, normalize_legacy_data)
    with app.app_context():
        for step in startup_steps:
            step()
# Called at module level, so the database is prepared on import as well as
# when the file is executed directly.
bootstrap()

if __name__ == "__main__":
    # The server listens on all interfaces, so the Werkzeug debugger must not
    # be enabled unconditionally: it allows code execution from the browser.
    # Debug mode is therefore opt-in via the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)