Files
inventory/app.py
wangbeihong 6f4a8d82f3 feat:增强盒子类型管理和搜索功能
- 引入基于 JSON 的盒子类型覆盖配置,允许动态更新标签、描述和前缀。
- 新增一种容量可调的自定义盒型。
- 实现了应用和保存盒子类型覆盖配置的函数。
- 更新仪表盘,按盒子类型分组显示低库存元件。
- 新建搜索页面,提供增强的搜索功能,便于快速查找元件。
- 用搜索页面取代扫描页面,并将出库功能直接集成到搜索结果中。
- 改进界面元素(包括新增按钮和样式),提升导航和用户体验。
- 移除过时的 scanner.js 文件,并将其功能集成到搜索页面。
- 更新各模板,以反映新的搜索和盒子类型管理功能。
2026-03-11 16:01:11 +08:00

1820 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import csv
import json
from copy import deepcopy
from io import StringIO
from datetime import datetime, timedelta
from flask import Flask, Response, redirect, render_template, request, url_for
from flask_sqlalchemy import SQLAlchemy
# Anchor every path to this file's directory rather than the process working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_DIR = os.path.join(BASE_DIR, "data")
# Create the data directory at import time; the SQLite file and the box-type
# override file defined below both live inside it.
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, "inventory.db")
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{DB_PATH}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
# Enabled components whose quantity is strictly below this value are reported as low stock.
LOW_STOCK_THRESHOLD = 5
# JSON file that stores user edits to each box type's label, description and prefix.
BOX_TYPES_OVERRIDE_PATH = os.path.join(DB_DIR, "box_types.json")
# Built-in container types, keyed by the identifier stored in ``Box.box_type``.
# Each entry provides the display label plus the defaults applied to a new box:
# slot count, description and slot-code prefix.
DEFAULT_BOX_TYPES = {
    "small_28": {
        "label": "28格小盒大盒",
        "default_capacity": 28,
        "default_desc": "4连排小盒常见摆放为竖向7排",
        "default_prefix": "A",
    },
    "medium_14": {
        "label": "14格中盒大盒",
        "default_capacity": 14,
        "default_desc": "14格中盒内部摆放方向与28格不同",
        "default_prefix": "B",
    },
    # The only type whose capacity can be chosen per box (see create_box / update_box).
    "custom": {
        "label": "自定义容器",
        "default_capacity": 20,
        "default_desc": "可按实际盒型设置格数与编号前缀",
        "default_prefix": "C",
    },
    "bag": {
        "label": "袋装清单",
        "default_capacity": 1,
        "default_desc": "一袋一种器件,按清单管理",
        "default_prefix": "BAG",
    },
}
# Live, mutable copy used by the application; the pristine defaults above are kept
# so that only the differences need to be written to the override file.
BOX_TYPES = deepcopy(DEFAULT_BOX_TYPES)
def _apply_box_type_overrides() -> None:
    """Merge the persisted overrides from ``BOX_TYPES_OVERRIDE_PATH`` into ``BOX_TYPES``.

    Only the ``label``, ``default_desc`` and ``default_prefix`` fields of box
    types that already exist in ``BOX_TYPES`` can be overridden.  Loading is
    best-effort: a missing, unreadable or malformed file leaves ``BOX_TYPES``
    untouched instead of preventing the application from starting.

    Values that are not non-blank strings are ignored, because these fields
    are later used as display text and as slot-code prefixes.
    """
    overrides = None
    try:
        with open(BOX_TYPES_OVERRIDE_PATH, "r", encoding="utf-8") as f:
            overrides = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (file is not valid UTF-8).
        overrides = None

    if isinstance(overrides, dict):
        for key, value in overrides.items():
            if key not in BOX_TYPES or not isinstance(value, dict):
                continue
            for field in ("label", "default_desc", "default_prefix"):
                candidate = value.get(field)
                # Skip numbers, null, lists, ... and blank strings from a hand-edited file.
                if not isinstance(candidate, str) or not candidate.strip():
                    continue
                BOX_TYPES[key][field] = candidate.strip()

    # Keep bag list capacity fixed by domain rule.
    BOX_TYPES["bag"]["default_capacity"] = 1
def _save_box_type_overrides() -> None:
    """Persist the fields of ``BOX_TYPES`` that differ from ``DEFAULT_BOX_TYPES``.

    Only ``label``, ``default_desc`` and ``default_prefix`` are compared, and
    only changed fields are written, so the override file stays minimal.

    The JSON is written to a temporary sibling file and then moved over the
    real file with ``os.replace``.  Writing in place would truncate the
    existing file first, so a failure half-way through would leave invalid
    JSON behind and the next start-up would silently drop every override.

    Raises:
        OSError: if the file cannot be written or replaced.
        TypeError: if a changed value is not JSON-serialisable.
    """
    payload = {}
    for key, defaults in DEFAULT_BOX_TYPES.items():
        current = BOX_TYPES.get(key, defaults)
        changed = {
            field: current.get(field)
            for field in ("label", "default_desc", "default_prefix")
            if current.get(field) != defaults.get(field)
        }
        if changed:
            payload[key] = changed

    tmp_path = f"{BOX_TYPES_OVERRIDE_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, BOX_TYPES_OVERRIDE_PATH)
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure, remove the partial file so it does not linger.
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
# Apply persisted overrides once at import time, before any model or route reads BOX_TYPES.
_apply_box_type_overrides()
class Box(db.Model):
    """A container (box or bag list) whose slots are addressed as prefix + serial number."""

    __tablename__ = "boxes"
    id = db.Column(db.Integer, primary_key=True)
    # Display name; unique across all boxes.
    name = db.Column(db.String(100), nullable=False, unique=True)
    description = db.Column(db.String(255), nullable=True)
    # Key into BOX_TYPES ("small_28", "medium_14", "custom", "bag").
    box_type = db.Column(db.String(30), nullable=False, default="small_28")
    # Number of slots in the box.
    slot_capacity = db.Column(db.Integer, nullable=False, default=28)
    # Text placed in front of the serial number in a slot code, e.g. "A" in "A1".
    slot_prefix = db.Column(db.String(16), nullable=False, default="A")
    # Serial number of the first slot; slot i gets start_number + i - 1.
    start_number = db.Column(db.Integer, nullable=False, default=1)
class Component(db.Model):
    """A part stored in one slot of a box."""

    __tablename__ = "components"
    id = db.Column(db.Integer, primary_key=True)
    box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False)
    # 1-based position inside the box; converted to a slot code by slot_code_for_box.
    slot_index = db.Column(db.Integer, nullable=False)
    part_no = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    specification = db.Column(db.String(120), nullable=True)
    quantity = db.Column(db.Integer, nullable=False, default=0)
    location = db.Column(db.String(120), nullable=True)
    note = db.Column(db.Text, nullable=True)
    # Disabled components are left out of overview rows, stock statistics and label export.
    is_enabled = db.Column(db.Boolean, nullable=False, default=True)
    # Owning box; also exposes ``Box.components`` through the backref.
    box = db.relationship("Box", backref=db.backref("components", lazy=True))
    # At most one component per slot of a box.
    __table_args__ = (
        db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"),
    )
class InventoryEvent(db.Model):
    """One logged stock change, used for trend charts and the recent-activity list."""

    __tablename__ = "inventory_events"
    id = db.Column(db.Integer, primary_key=True)
    # box_id / component_id are plain integers without a foreign-key constraint.
    box_id = db.Column(db.Integer, nullable=True)
    # Box type recorded with the event; used to filter events per box type.
    box_type = db.Column(db.String(30), nullable=True)
    component_id = db.Column(db.Integer, nullable=True)
    part_no = db.Column(db.String(100), nullable=True)
    # Machine-readable event name, e.g. "quick_inbound_add" (see event_type_label).
    event_type = db.Column(db.String(30), nullable=False)
    # Signed change in quantity caused by the event.
    delta = db.Column(db.Integer, nullable=False, default=0)
    # Filled in by the database when the row is inserted.
    created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
def _add_column_if_missing(table_name: str, column_name: str, ddl: str) -> None:
    """Add a column to ``table_name`` unless a column called ``column_name`` exists.

    ``ddl`` is the full column definition appended to ``ALTER TABLE ... ADD COLUMN``.
    The statement is executed on the current session but not committed here.
    Identifiers are interpolated into the SQL text, so callers must pass
    trusted constants only.
    """
    pragma = db.text(f"PRAGMA table_info({table_name})")
    table_info = db.session.execute(pragma).fetchall()
    # Index 1 of each PRAGMA table_info row is read as the column name.
    if any(info[1] == column_name for info in table_info):
        return
    db.session.execute(db.text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
def ensure_schema() -> None:
    """Add the columns introduced after the first release to older databases, then commit."""
    required_columns = (
        ("boxes", "box_type", "box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'"),
        ("boxes", "slot_capacity", "slot_capacity INTEGER NOT NULL DEFAULT 28"),
        ("boxes", "slot_prefix", "slot_prefix VARCHAR(16) NOT NULL DEFAULT 'A'"),
        ("boxes", "start_number", "start_number INTEGER NOT NULL DEFAULT 1"),
        ("components", "is_enabled", "is_enabled BOOLEAN NOT NULL DEFAULT 1"),
    )
    for table, column, ddl in required_columns:
        _add_column_if_missing(table, column, ddl)
    db.session.commit()
def slot_code_for_box(box: Box, slot_index: int) -> str:
    """Return the slot code (prefix followed by serial number) of a 1-based slot index."""
    offset = slot_index - 1
    return "{}{}".format(box.slot_prefix, box.start_number + offset)
def slot_range_label(box: Box) -> str:
    """Return the "first-last" slot-code range of a box, or only the prefix for a bag list."""
    if box.box_type != "bag":
        first_code = slot_code_for_box(box, 1)
        last_code = slot_code_for_box(box, box.slot_capacity)
        return f"{first_code}-{last_code}"
    # Bag lists are shown by prefix only, with the type default as a fallback.
    return box.slot_prefix or BOX_TYPES["bag"]["default_prefix"]
def compose_box_name(base_name: str, prefix: str, start_number: int, slot_capacity: int) -> str:
    """Build a box display name of the form "<base> <prefix><start>-<prefix><end>".

    A blank or missing ``base_name`` falls back to the generic name "盒子".
    """
    label = (base_name or "").strip() or "盒子"
    last_number = start_number + slot_capacity - 1
    first_code = f"{prefix}{start_number}"
    last_code = f"{prefix}{last_number}"
    return f"{label} {first_code}-{last_code}"
def make_unique_box_name(candidate_name: str, exclude_box_id: int = None) -> str:
    """Return ``candidate_name`` or, if taken, the first free "<name> #N" with N starting at 2.

    ``exclude_box_id`` lets a box keep its own current name while it is being edited.
    """

    def _is_taken(name: str) -> bool:
        matches = Box.query.filter_by(name=name)
        if exclude_box_id is not None:
            matches = matches.filter(Box.id != exclude_box_id)
        return bool(matches.first())

    if not _is_taken(candidate_name):
        return candidate_name
    suffix = 2
    while _is_taken(f"{candidate_name} #{suffix}"):
        suffix += 1
    return f"{candidate_name} #{suffix}"
def infer_base_name(box: Box) -> str:
    """Strip the generated " <prefix>N-<prefix>M" (and optional " #K") suffix from a box name.

    Returns the full name unchanged when stripping would leave nothing.
    """
    prefix = re.escape(box.slot_prefix)
    range_suffix = re.compile(rf"\s+{prefix}\d+-{prefix}\d+(?:\s+#\d+)?$")
    stripped = range_suffix.sub("", box.name).strip()
    return stripped if stripped else box.name
def slot_data_for_box(box: Box):
    """Return one dict per slot of the box, with its slot code and occupant (or ``None``)."""
    occupant_by_slot = {}
    for comp in Component.query.filter_by(box_id=box.id).all():
        occupant_by_slot[comp.slot_index] = comp
    return [
        {
            "slot": position,
            "slot_code": slot_code_for_box(box, position),
            "component": occupant_by_slot.get(position),
        }
        for position in range(1, box.slot_capacity + 1)
    ]
def bag_rows_for_box(box: Box):
    """Return the box's components in slot order, each paired with its slot code."""
    ordered = Component.query.filter_by(box_id=box.id).order_by(Component.slot_index.asc())
    return [
        {"component": comp, "slot_code": slot_code_for_box(box, comp.slot_index)}
        for comp in ordered.all()
    ]
def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int:
    """Parse a form value as an integer greater than or equal to zero.

    Blank or missing input yields ``default_value``.

    Raises:
        ValueError: if the text is not an integer or the integer is negative.
    """
    text = (raw_value or "").strip()
    if not text:
        return default_value
    number = int(text)
    if number >= 0:
        return number
    raise ValueError
def normalize_legacy_data() -> None:
    """Repair rows written by older versions and make sure one bag-list box exists.

    First fills NULL/empty columns with defaults via SQL, then walks every box
    to fix unknown types and out-of-range values, and finally commits.
    """
    null_fixes = (
        "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''",
        "UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL",
        "UPDATE boxes SET slot_prefix = 'A' WHERE slot_prefix IS NULL OR slot_prefix = ''",
        "UPDATE boxes SET start_number = 1 WHERE start_number IS NULL",
        "UPDATE components SET is_enabled = 1 WHERE is_enabled IS NULL",
    )
    for statement in null_fixes:
        db.session.execute(db.text(statement))

    for box in Box.query.all():
        if box.box_type not in BOX_TYPES:
            box.box_type = "small_28"
        type_meta = BOX_TYPES[box.box_type]
        if not (box.slot_capacity and box.slot_capacity >= 1):
            box.slot_capacity = type_meta["default_capacity"]
        if not box.slot_prefix:
            box.slot_prefix = type_meta["default_prefix"]
        if box.start_number is None or box.start_number < 0:
            box.start_number = 1
        if box.box_type == "bag":
            # Bag list is prefix-based and does not use range numbering.
            box.start_number = 1
            box.name = f"袋装清单 {box.slot_prefix}"

    # Keep bag list as a fixed container; create one if missing.
    existing_bag = Box.query.filter_by(box_type="bag").first()
    if not existing_bag:
        bag_meta = BOX_TYPES["bag"]
        bag_box = Box(
            name=f"袋装清单 {bag_meta['default_prefix']}",
            description=bag_meta["default_desc"],
            box_type="bag",
            slot_capacity=bag_meta["default_capacity"],
            slot_prefix=bag_meta["default_prefix"],
            start_number=1,
        )
        db.session.add(bag_box)
    db.session.commit()
def make_overview_rows(box: Box):
    """Return slot code, name and part number of every enabled component, in slot order."""
    enabled_in_order = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
    )
    return [
        {
            "slot_code": slot_code_for_box(box, comp.slot_index),
            "name": comp.name,
            "part_no": comp.part_no,
        }
        for comp in enabled_in_order.all()
    ]
def box_sort_key(box: Box):
    """Sort key for boxes: upper-cased prefix, then start number, then name."""
    prefix_key = (box.slot_prefix or "").upper()
    start_key = 0 if box.start_number is None else box.start_number
    name_key = box.name or ""
    return prefix_key, start_key, name_key
def has_range_conflict(
    *,
    box_type: str,
    prefix: str,
    start_number: int,
    slot_capacity: int,
    exclude_box_id: int = None,
):
    """Check whether a numbering range overlaps another box of the same type and prefix.

    Returns ``(True, other_box)`` for the first overlapping box found, otherwise
    ``(False, None)``.  ``exclude_box_id`` skips the box being edited.
    """
    new_first = start_number
    new_last = start_number + slot_capacity - 1
    candidates = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        candidates = candidates.filter(Box.id != exclude_box_id)
    for existing in candidates.all():
        existing_first = existing.start_number
        existing_last = existing_first + existing.slot_capacity - 1
        # Closed intervals intersect when each one starts no later than the other ends.
        if new_last >= existing_first and new_first <= existing_last:
            return True, existing
    return False, None
def suggest_next_start_number(
    *,
    box_type: str,
    prefix: str,
    slot_capacity: int,
    exclude_box_id: int = None,
) -> int:
    """Return the serial number right after the highest one used for this type and prefix.

    Returns 1 when no matching box exists.  ``slot_capacity`` is accepted for
    interface symmetry but does not influence the result.
    """
    siblings = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        siblings = siblings.filter(Box.id != exclude_box_id)
    last_numbers = [
        sibling.start_number + sibling.slot_capacity - 1 for sibling in siblings.all()
    ]
    # The leading 0 is the floor: with no boxes (or only ends <= 0) the answer is 1.
    return max([0, *last_numbers]) + 1
def build_index_anchor(box_type: str = "") -> str:
    """Return the "group-<box_type>" anchor for a known box type, else an empty string."""
    return f"group-{box_type}" if box_type in BOX_TYPES else ""
def bad_request(message: str, box_type: str = ""):
    """Render the error page with HTTP status 400.

    The "back" link points to the page of ``box_type`` when it is a known type,
    otherwise to the referrer, falling back to the type overview page.
    """
    known_type = bool(build_index_anchor(box_type)) and box_type in BOX_TYPES
    if known_type:
        back_url = url_for("type_page", box_type=box_type)
    else:
        back_url = request.referrer or url_for("types_page")
    page = render_template(
        "error.html",
        status_code=400,
        title="请求参数有误",
        message=message,
        back_url=back_url,
    )
    return page, 400
def render_box_page(box: Box, error: str = "", notice: str = ""):
    """Render the detail page of a box, optionally with an error or notice message."""
    context = {
        "box": box,
        "slots": slot_data_for_box(box),
        # Bag rows are only computed for the bag list.
        "bag_rows": bag_rows_for_box(box) if box.box_type == "bag" else [],
        "box_types": BOX_TYPES,
        "slot_range": slot_range_label(box),
        "low_stock_threshold": LOW_STOCK_THRESHOLD,
        "error": error,
        "notice": notice,
    }
    return render_template("box.html", **context)
def _next_empty_slot_index(box: Box, occupied_slots: set[int]):
    """Return the slot index to use for a new component, or ``None`` if the box is full.

    A bag list always appends after its highest occupied slot; other boxes
    reuse the lowest free slot within their capacity.
    """
    if box.box_type == "bag":
        return max(occupied_slots, default=0) + 1
    free_slots = (
        idx for idx in range(1, box.slot_capacity + 1) if idx not in occupied_slots
    )
    return next(free_slots, None)
def _parse_bulk_line(line: str):
    """Split one bulk-input line on commas or tabs into its first five fields.

    Field order is part number, name, quantity, specification, note.  Missing
    fields become empty strings and fields beyond the fifth are ignored.
    """
    fields = [piece.strip() for piece in re.split(r"[,\t]", line)]
    # A non-positive multiplier yields an empty list, so longer lines are left alone.
    fields += [""] * (5 - len(fields))
    part_no, name, quantity_raw, specification, note = fields[:5]
    return {
        "part_no": part_no,
        "name": name,
        "quantity_raw": quantity_raw,
        "specification": specification,
        "note": note,
    }
def log_inventory_event(
    *,
    event_type: str,
    delta: int,
    box: Box = None,
    component: Component = None,
    part_no: str = "",
):
    """Queue an ``InventoryEvent`` on the current session (the caller commits).

    Args:
        event_type: machine-readable event name, e.g. ``"quick_inbound_add"``.
        delta: signed quantity change; stored as ``int``.
        box: box the event belongs to.  When omitted, both the box id and the
            box type are derived from ``component`` where possible.
        component: affected component, if any.
        part_no: part number to record; falls back to ``component.part_no``.
            Blank values are stored as ``None``.
    """
    if box is not None:
        box_id, box_type = box.id, box.box_type
    elif component is not None:
        box_id = component.box_id
        # Resolve the type through the component's box so that events logged
        # without an explicit ``box`` still appear in per-type filters.
        owner = component.box
        box_type = owner.box_type if owner is not None else None
    else:
        box_id = box_type = None

    fallback_part_no = component.part_no if component is not None else ""
    resolved_part_no = (part_no or fallback_part_no or "").strip() or None

    event = InventoryEvent(
        box_id=box_id,
        box_type=box_type,
        component_id=component.id if component is not None else None,
        part_no=resolved_part_no,
        event_type=event_type,
        delta=int(delta),
    )
    db.session.add(event)
def parse_days_value(raw_days: str) -> int:
    """Parse the ``days`` query value; anything other than 7 or 30 becomes 7."""
    text = (raw_days or "7").strip()
    try:
        parsed = int(text)
    except ValueError:
        return 7
    return parsed if parsed in (7, 30) else 7
def parse_box_type_filter(raw_box_type: str) -> str:
    """Return the trimmed value if it is a known box type, otherwise ``"all"``."""
    candidate = (raw_box_type or "").strip()
    if candidate in BOX_TYPES:
        return candidate
    return "all"
def _to_date(raw_day):
    """Convert a "YYYY-MM-DD" string to a ``date``; return any other value unchanged."""
    if not isinstance(raw_day, str):
        return raw_day
    return datetime.strptime(raw_day, "%Y-%m-%d").date()
def query_event_daily_delta(days: int, box_type_filter: str = "all"):
    """Return ``{local date: summed delta}`` for events in the last ``days`` days.

    The window starts at local midnight ``days - 1`` days ago and includes
    today.  ``box_type_filter`` restricts events to one box type unless it is
    ``"all"``.  Days without events are absent from the result.

    ``InventoryEvent.created_at`` is filled by the database default, which
    SQLite evaluates as ``CURRENT_TIMESTAMP`` in UTC, while callers work with
    local calendar days from ``datetime.now()``.  The window start is therefore
    converted to UTC before comparing, and events are bucketed by their local
    date via SQLite's ``'localtime'`` modifier.  Without this, events close to
    midnight are attributed to the wrong day in any non-UTC time zone.
    """
    # Function-scope import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    today = datetime.now().date()
    start_day = today - timedelta(days=days - 1)
    local_start = datetime.combine(start_day, datetime.min.time())
    # A naive datetime is interpreted as local time by astimezone(); drop the
    # tzinfo again because the stored timestamps are naive UTC.
    utc_start = local_start.astimezone(timezone.utc).replace(tzinfo=None)

    local_day = db.func.date(InventoryEvent.created_at, "localtime")
    query = db.session.query(
        local_day.label("event_day"),
        db.func.sum(InventoryEvent.delta).label("daily_delta"),
    ).filter(InventoryEvent.created_at >= utc_start)
    if box_type_filter != "all":
        query = query.filter(InventoryEvent.box_type == box_type_filter)
    rows = query.group_by(local_day).all()
    return {_to_date(row.event_day): int(row.daily_delta or 0) for row in rows}
def build_trend_points_from_events(days: int, total_quantity: int, box_type_filter: str = "all"):
    """Reconstruct the daily stock total for the last 7 or 30 days, oldest day first.

    Starts from today's ``total_quantity`` and walks backwards, subtracting
    each day's logged delta to obtain the previous day's value.  Any ``days``
    other than 7 or 30 is treated as 7.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    deltas = query_event_daily_delta(window, box_type_filter)
    level = total_quantity
    newest_first = []
    for offset in range(window):
        day = today - timedelta(days=offset)
        newest_first.append(
            {"date": day, "label": day.strftime("%m-%d"), "value": level}
        )
        # Undo this day's net change to get the total at the end of the day before.
        level -= deltas.get(day, 0)
    return newest_first[::-1]
def build_box_type_trend_series(days: int, totals_by_type: dict):
    """Build per-box-type daily totals for the last 7 or 30 days.

    Returns ``{"labels": [...], "series": {box_type: [...]}}`` with both lists
    ordered oldest day first.  Types missing from ``totals_by_type`` start at 0.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    newest_first = [today - timedelta(days=offset) for offset in range(window)]
    series = {}
    for box_type in BOX_TYPES:
        deltas = query_event_daily_delta(window, box_type)
        level = int(totals_by_type.get(box_type, 0))
        history = []
        for day in newest_first:
            history.append(level)
            level -= deltas.get(day, 0)
        series[box_type] = history[::-1]
    labels = [day.strftime("%m-%d") for day in newest_first[::-1]]
    return {"labels": labels, "series": series}
def make_sparkline(values: list[int], width: int = 220, height: int = 56) -> str:
    """Return space-separated "x,y" points that plot ``values`` inside a width x height box.

    The smallest value maps to ``y = height`` and the largest to ``y = 0``.
    An empty list yields an empty string.
    """
    if not values:
        return ""
    lowest, highest = min(values), max(values)
    # Both divisors are clamped to 1 so flat or single-point series do not divide by zero.
    value_range = max(highest - lowest, 1)
    x_step = width / max(len(values) - 1, 1)
    return " ".join(
        f"{position * x_step:.2f},{height - ((value - lowest) / value_range) * height:.2f}"
        for position, value in enumerate(values)
    )
def event_type_label(event_type: str) -> str:
    """Return the display label of an event type, or the raw value if it is unknown."""
    known_labels = {
        "quick_inbound_add": "快速入库新增",
        "quick_inbound_merge": "快速入库合并",
        "component_outbound": "快速出库",
        "component_save": "编辑保存",
        "component_enable": "启用元件",
        "component_disable": "停用元件",
        "component_delete": "删除元件",
        "bag_add": "袋装新增",
        "bag_batch_add": "袋装批量新增",
        "bag_merge": "袋装合并",
        "bag_batch_merge": "袋装批量合并",
        "box_delete": "删除盒子",
    }
    try:
        return known_labels[event_type]
    except KeyError:
        return event_type
def recent_events(limit: int = 20, box_type_filter: str = "all"):
    """Return the newest ``limit`` events as display dicts, optionally for one box type."""
    events = InventoryEvent.query
    if box_type_filter != "all":
        events = events.filter_by(box_type=box_type_filter)
    latest = events.order_by(InventoryEvent.created_at.desc()).limit(limit).all()
    return [
        {
            "time": ev.created_at.strftime("%Y-%m-%d %H:%M") if ev.created_at else "-",
            "type": event_type_label(ev.event_type),
            # Events with a missing or unknown box type are labelled "全部".
            "box_type": BOX_TYPES.get(ev.box_type, {}).get("label", "全部"),
            "part_no": ev.part_no or "-",
            "delta": int(ev.delta or 0),
        }
        for ev in latest
    ]
def build_dashboard_context():
    """Collect everything the overview pages need in one pass.

    Returns a dict with:
      * ``groups``: box cards grouped by box type (unknown types are listed
        under ``"small_28"``),
      * ``stats``: headline counters, including the 7-day net stock change,
      * ``category_stats``: item count and total quantity per box type,
      * ``low_stock_items``: up to 12 enabled components below
        ``LOW_STOCK_THRESHOLD``, lowest quantity first.
    """
    boxes = Box.query.all()
    box_by_id = {}
    for entry in boxes:
        box_by_id[entry.id] = entry
    boxes.sort(key=box_sort_key)

    groups = {type_key: [] for type_key in BOX_TYPES}
    for entry in boxes:
        group_key = entry.box_type if entry.box_type in BOX_TYPES else "small_28"
        rows = make_overview_rows(entry)
        card = {
            "box": entry,
            "used_count": len(rows),
            "slot_range": slot_range_label(entry),
            "overview_rows": rows,
            "base_name": infer_base_name(entry),
        }
        groups[group_key].append(card)

    enabled_components = []
    disabled_components = []
    for comp in Component.query.all():
        if comp.is_enabled:
            enabled_components.append(comp)
        else:
            disabled_components.append(comp)
    low_stock_components = [
        comp for comp in enabled_components if comp.quantity < LOW_STOCK_THRESHOLD
    ]

    total_enabled_quantity = sum(comp.quantity for comp in enabled_components)
    trend = build_trend_points_from_events(
        days=7,
        total_quantity=total_enabled_quantity,
        box_type_filter="all",
    )
    # Net change is newest point minus oldest point of the 7-day window.
    period_net_change_7d = trend[-1]["value"] - trend[0]["value"] if len(trend) >= 2 else 0

    def _scarcity_order(comp):
        return (comp.quantity, comp.name or "")

    low_stock_items = []
    for comp in sorted(low_stock_components, key=_scarcity_order)[:12]:
        owner = box_by_id.get(comp.box_id)
        if owner and owner.box_type in BOX_TYPES:
            type_key = owner.box_type
        else:
            type_key = "small_28"
        low_stock_items.append(
            {
                "name": comp.name,
                "part_no": comp.part_no,
                "quantity": comp.quantity,
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"{comp.box_id}",
                "slot_code": slot_code_for_box(owner, comp.slot_index) if owner else str(comp.slot_index),
                "edit_url": url_for("edit_component", box_id=comp.box_id, slot=comp.slot_index),
            }
        )

    # Bucket enabled components by the type of their owning box; components
    # whose box is unknown are not counted in any category.
    enabled_by_type = {}
    for comp in enabled_components:
        owner = box_by_id.get(comp.box_id)
        if owner:
            enabled_by_type.setdefault(owner.box_type, []).append(comp)

    category_stats = []
    max_category_quantity = 0
    for type_key, meta in BOX_TYPES.items():
        members = enabled_by_type.get(type_key, [])
        quantity = sum(comp.quantity for comp in members)
        max_category_quantity = max(max_category_quantity, quantity)
        category_stats.append(
            {
                "key": type_key,
                "label": meta["label"],
                "item_count": len(members),
                "quantity": quantity,
            }
        )

    return {
        "groups": groups,
        "stats": {
            "box_count": len(boxes),
            "active_items": len(enabled_components),
            "low_stock_count": len(low_stock_components),
            "disabled_count": len(disabled_components),
            "max_category_quantity": max_category_quantity,
            "period_net_change_7d": period_net_change_7d,
        },
        "category_stats": category_stats,
        "low_stock_items": low_stock_items,
    }
@app.route("/")
def index():
    """Send the site root to the container-type overview page."""
    overview_url = url_for("types_page")
    return redirect(overview_url)
@app.route("/types")
def types_page():
    """Render the overview page: one card per box type plus low-stock items grouped by type.

    Optional ``notice`` and ``error`` query parameters are passed to the template.
    """
    dashboard = build_dashboard_context()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()

    stats_by_type = {}
    for entry in dashboard["category_stats"]:
        stats_by_type[entry["key"]] = entry

    low_stock_groups = [
        {
            "key": type_key,
            "label": meta["label"],
            "items": [
                item
                for item in dashboard["low_stock_items"]
                if item.get("box_type") == type_key
            ],
        }
        for type_key, meta in BOX_TYPES.items()
    ]

    type_cards = []
    for type_key, meta in BOX_TYPES.items():
        type_stats = stats_by_type.get(type_key, {})
        card = {
            "key": type_key,
            "label": meta["label"],
            "desc": meta["default_desc"],
            "count": len(dashboard["groups"].get(type_key, [])),
            "item_count": int(type_stats.get("item_count", 0)),
            "quantity": int(type_stats.get("quantity", 0)),
            "url": url_for("type_page", box_type=type_key),
        }
        type_cards.append(card)

    return render_template(
        "types.html",
        type_cards=type_cards,
        stats=dashboard["stats"],
        low_stock_groups=low_stock_groups,
        notice=notice,
        error=error,
    )
@app.route("/container-type/<box_type>/edit", methods=["GET", "POST"])
def edit_container_type(box_type: str):
    """Show and process the form that edits a box type's label, description and prefix.

    On a valid POST the live ``BOX_TYPES`` entry is updated, the overrides are
    saved and the user is redirected to the overview page.  A blank
    description falls back to the built-in default description.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")

    meta = BOX_TYPES[box_type]
    error = ""
    if request.method == "POST":
        form = request.form
        new_label = form.get("label", "").strip()
        new_desc = form.get("default_desc", "").strip()
        new_prefix = form.get("default_prefix", "").strip().upper()

        if not new_label:
            error = "容器名称不能为空"
        elif not new_prefix:
            error = "默认前缀不能为空"
        else:
            meta["label"] = new_label
            meta["default_desc"] = new_desc or DEFAULT_BOX_TYPES[box_type]["default_desc"]
            meta["default_prefix"] = new_prefix
            _save_box_type_overrides()
            return redirect(url_for("types_page", notice="容器属性已更新"))

    return render_template(
        "type_edit.html",
        box_type=box_type,
        meta=meta,
        error=error,
    )
@app.route("/type/<box_type>")
def type_page(box_type: str):
    """Render the box list restricted to a single box type."""
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")
    dashboard = build_dashboard_context()
    context = {
        "groups": dashboard["groups"],
        "box_types": BOX_TYPES,
        "stats": dashboard["stats"],
        "category_stats": dashboard["category_stats"],
        "low_stock_items": dashboard["low_stock_items"],
        "view_box_types": [box_type],
        "current_box_type": box_type,
        "separate_mode": True,
    }
    return render_template("index.html", **context)
@app.route("/boxes/create", methods=["POST"])
def create_box():
    """Create a box from the submitted form and redirect to its type page.

    Validates type, name, start number and (for ``custom`` only) the slot
    count, rejects numbering ranges that overlap an existing box of the same
    type and prefix, and generates a unique display name.  Only one bag-list
    box may exist.
    """
    form = request.form
    box_type = form.get("box_type", "small_28").strip()
    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", box_type)
    if box_type == "bag" and Box.query.filter_by(box_type="bag").first():
        return bad_request("袋装清单为固定容器,无需新增盒子", box_type)
    if not base_name:
        return bad_request("盒子名称不能为空", box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box_type)

    meta = BOX_TYPES[box_type]
    default_capacity = meta["default_capacity"]
    slot_capacity = default_capacity
    if box_type == "custom":
        # Only custom containers accept a user-chosen slot count.
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(default_capacity)),
                default_capacity,
            )
        except ValueError:
            return bad_request(capacity_error, box_type)
        if slot_capacity < 1:
            return bad_request(capacity_error, box_type)

    effective_prefix = slot_prefix or meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    if conflict:
        conflict_message = (
            "编号范围冲突: 与现有盒子 "
            f"[{other_box.name}]"
            " 重叠,请更换前缀或起始序号"
        )
        return bad_request(conflict_message, box_type)

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    new_box = Box(
        name=make_unique_box_name(generated_name),
        description=description or meta["default_desc"],
        box_type=box_type,
        slot_capacity=slot_capacity,
        slot_prefix=effective_prefix,
        start_number=start_number,
    )
    db.session.add(new_box)
    db.session.commit()

    # Return to the page the form came from when it names a valid type.
    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    target_type = box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/<int:box_id>/update", methods=["POST"])
def update_box(box_id: int):
    """Update name, description, prefix, start number and (custom only) slot count of a box.

    Rejects a slot count below the highest slot already in use and numbering
    ranges that overlap another box of the same type and prefix.  The display
    name is regenerated from the submitted base name.
    """
    box = Box.query.get_or_404(box_id)
    form = request.form
    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()
    current_type = box.box_type

    if not base_name:
        return bad_request("盒子名称不能为空", current_type)
    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", current_type)

    slot_capacity = box.slot_capacity
    if current_type == "custom":
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(box.slot_capacity)),
                box.slot_capacity,
            )
        except ValueError:
            return bad_request(capacity_error, current_type)
        if slot_capacity < 1:
            return bad_request(capacity_error, current_type)
        # Shrinking must not orphan components stored in higher slots.
        highest_used = (
            db.session.query(db.func.max(Component.slot_index))
            .filter_by(box_id=box.id)
            .scalar()
        )
        max_used_slot = highest_used or 0
        if max_used_slot > slot_capacity:
            return bad_request(
                f"格数不能小于已使用位置 {max_used_slot}",
                current_type,
            )

    type_meta = BOX_TYPES[current_type]
    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=current_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
        exclude_box_id=box.id,
    )
    if conflict:
        conflict_message = (
            "编号范围冲突: 与现有盒子 "
            f"[{other_box.name}]"
            " 重叠,请更换前缀或起始序号"
        )
        return bad_request(conflict_message, current_type)

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    box.name = make_unique_box_name(generated_name, exclude_box_id=box.id)
    box.description = description or BOX_TYPES[box.box_type]["default_desc"]
    box.slot_prefix = effective_prefix
    box.start_number = start_number
    box.slot_capacity = slot_capacity
    db.session.commit()

    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    target_type = box.box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/<int:box_id>/delete", methods=["POST"])
def delete_box(box_id: int):
    """Delete a box together with its components; the bag list cannot be deleted.

    If the box held enabled stock, a single ``box_delete`` event with the
    negative total is logged before the rows are removed.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type == "bag":
        return bad_request("袋装清单为固定容器,不能删除", box.box_type)

    enabled_total = (
        db.session.query(db.func.sum(Component.quantity))
        .filter_by(box_id=box.id, is_enabled=True)
        .scalar()
    ) or 0
    if enabled_total:
        log_inventory_event(event_type="box_delete", delta=-int(enabled_total), box=box)

    Component.query.filter_by(box_id=box.id).delete()
    # Remember the type before the row is gone; it is the default redirect target.
    deleted_type = box.box_type
    db.session.delete(box)
    db.session.commit()

    requested_type = parse_box_type_filter(request.form.get("return_to_type", ""))
    destination = deleted_type if requested_type == "all" else requested_type
    return redirect(url_for("type_page", box_type=destination))
@app.route("/boxes/suggest-start")
def suggest_start():
    """Suggest the next free start number for a prefix as JSON.

    Query parameters:
        box_type: requested box type (defaults to ``small_28``).
        slot_prefix: prefix to check; blank means the type's default prefix.
        slot_capacity: slot count, honoured only for a new ``custom`` box.
        box_id: id of the box being edited.  When given, that box's own type
            and capacity are used and the box is excluded from the search.

    Returns ``{"ok": True, "start_number", "slot_prefix", "preview_range"}`` on
    success, or ``{"ok": False, "message"}`` with status 400/404 on bad input.
    """
    requested_type = request.args.get("box_type", "small_28").strip()
    if requested_type not in BOX_TYPES:
        return {"ok": False, "message": "无效盒子类型"}, 400

    slot_prefix = request.args.get("slot_prefix", "").strip().upper()
    box_id_raw = request.args.get("box_id", "").strip()
    box_type = requested_type
    exclude_box_id = None
    slot_capacity = BOX_TYPES[requested_type]["default_capacity"]

    if requested_type == "custom" and not box_id_raw:
        try:
            slot_capacity = _parse_non_negative_int(
                request.args.get("slot_capacity", str(slot_capacity)),
                slot_capacity,
            )
        except ValueError:
            return {"ok": False, "message": "格数必须是大于等于 1 的整数"}, 400
        if slot_capacity < 1:
            return {"ok": False, "message": "格数必须是大于等于 1 的整数"}, 400

    if box_id_raw:
        try:
            box_id = int(box_id_raw)
        except ValueError:
            return {"ok": False, "message": "box_id 非法"}, 400
        box = Box.query.get(box_id)
        if not box:
            return {"ok": False, "message": "盒子不存在"}, 404
        box_type = box.box_type
        slot_capacity = box.slot_capacity
        exclude_box_id = box.id

    # Resolve the default prefix only now: when ``box_id`` is given, the type
    # that matters is the box's own type, not the one named in the request.
    # Fall back to the requested type if the stored type is unknown.
    type_meta = BOX_TYPES.get(box_type) or BOX_TYPES[requested_type]
    effective_prefix = slot_prefix or type_meta["default_prefix"]

    suggested = suggest_next_start_number(
        box_type=box_type,
        prefix=effective_prefix,
        slot_capacity=slot_capacity,
        exclude_box_id=exclude_box_id,
    )
    end_number = suggested + slot_capacity - 1
    return {
        "ok": True,
        "start_number": suggested,
        "slot_prefix": effective_prefix,
        "preview_range": f"{effective_prefix}{suggested}-{effective_prefix}{end_number}",
    }
@app.route("/box/<int:box_id>")
def view_box(box_id: int):
    """Render the detail page of one box, or respond with 404 if it does not exist."""
    requested_box = Box.query.get_or_404(box_id)
    return render_box_page(requested_box)
@app.route("/box/<int:box_id>/labels/export")
def export_box_labels_csv(box_id: int):
    """Download the enabled components of a box as a CSV file, one row per component.

    The body starts with a UTF-8 byte-order mark (presumably so spreadsheet
    programs detect the encoding of the Chinese headers).
    """
    box = Box.query.get_or_404(box_id)
    enabled_in_order = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
        .all()
    )
    header = [
        "盒子名称(box_name)",
        "位置编号(slot_code)",
        "料号(part_no)",
        "名称(name)",
        "规格(specification)",
        "数量(quantity)",
        "位置备注(location)",
        "备注(note)",
    ]
    records = [
        [
            box.name,
            slot_code_for_box(box, comp.slot_index),
            comp.part_no or "",
            comp.name or "",
            comp.specification or "",
            int(comp.quantity or 0),
            comp.location or "",
            comp.note or "",
        ]
        for comp in enabled_in_order
    ]

    buffer = StringIO()
    sheet = csv.writer(buffer)
    sheet.writerow(header)
    sheet.writerows(records)
    body = "\ufeff" + buffer.getvalue()
    buffer.close()

    disposition = f"attachment; filename=labels_box_{box.id}.csv"
    return Response(
        body,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
@app.route("/box/<int:box_id>/quick-inbound", methods=["POST"])
def quick_inbound(box_id: int):
    """Bulk-add stock to a box from pasted lines (part no, name, quantity, spec, note).

    A line whose part number already exists in the box is merged into that
    component (quantity added, component re-enabled); otherwise the component
    goes into the next free slot.  Lines with a missing part number or name,
    an invalid quantity, or no free slot are skipped and reported.
    """
    box = Box.query.get_or_404(box_id)
    submitted = request.form.get("lines", "")
    entries = [text.strip() for text in submitted.splitlines() if text.strip()]
    if not entries:
        return render_box_page(box, error="快速入库失败: 请至少输入一行")

    current_components = Component.query.filter_by(box_id=box.id).all()
    occupied_slots = {comp.slot_index for comp in current_components}
    by_part_no = {comp.part_no: comp for comp in current_components if comp.part_no}
    added_count = 0
    merged_count = 0
    skipped_lines = []
    changed = False

    for line_no, entry in enumerate(entries, start=1):
        fields = _parse_bulk_line(entry)
        part_no, name = fields["part_no"], fields["name"]
        specification, note = fields["specification"], fields["note"]
        if not (part_no and name):
            skipped_lines.append(str(line_no))
            continue
        try:
            quantity = _parse_non_negative_int(fields["quantity_raw"], 0)
        except ValueError:
            skipped_lines.append(str(line_no))
            continue

        existing = by_part_no.get(part_no)
        if existing:
            # A disabled component counts as zero stock before the merge, so
            # re-enabling it logs its whole quantity as incoming.
            qty_before = existing.quantity if existing.is_enabled else 0
            existing.quantity += quantity
            existing.name = name or existing.name
            if specification:
                existing.specification = specification
            if note:
                existing.note = note
            existing.is_enabled = True
            delta = existing.quantity - qty_before
            if delta:
                log_inventory_event(
                    event_type="quick_inbound_merge",
                    delta=delta,
                    box=box,
                    component=existing,
                    part_no=part_no,
                )
            merged_count += 1
            changed = True
        else:
            slot_index = _next_empty_slot_index(box, occupied_slots)
            if slot_index is None:
                skipped_lines.append(f"{line_no} 行(盒子已满)")
                continue
            item = Component(
                box_id=box.id,
                slot_index=slot_index,
                part_no=part_no,
                name=name,
                quantity=quantity,
                specification=specification or None,
                note=note or None,
                is_enabled=True,
            )
            db.session.add(item)
            if quantity:
                log_inventory_event(
                    event_type="quick_inbound_add",
                    delta=quantity,
                    box=box,
                    component=item,
                    part_no=part_no,
                )
            occupied_slots.add(slot_index)
            # Register the new item so a later line with the same part number merges into it.
            by_part_no[part_no] = item
            added_count += 1
            changed = True

    if box.box_type == "bag" and occupied_slots:
        # The bag list grows with its contents.
        box.slot_capacity = max(box.slot_capacity, max(occupied_slots))
    if changed:
        db.session.commit()

    if added_count == 0 and merged_count == 0:
        return render_box_page(
            box,
            error="快速入库失败: 没有可导入的数据,请检查格式",
        )
    message = f"快速入库完成: 新增 {added_count} 条,合并 {merged_count}"
    if skipped_lines:
        message += ";跳过: " + ", ".join(skipped_lines)
    return render_box_page(box, notice=message)
@app.route("/box/<int:box_id>/bags/add", methods=["POST"])
def add_bag_item(box_id: int):
    """Add one item to a bag list, merging into the existing row when the part number matches.

    A new item is appended after the highest used slot and the box capacity
    grows to include it.  Responds with a plain-text 400 if the box is not a
    bag list.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    form = request.form
    part_no = form.get("part_no", "").strip()
    name = form.get("name", "").strip()
    specification = form.get("specification", "").strip()
    note = form.get("note", "").strip()
    if not (part_no and name):
        return render_box_page(box, error="袋装新增失败: 料号和名称不能为空")
    try:
        quantity = _parse_non_negative_int(form.get("quantity", "0"), 0)
    except ValueError:
        return render_box_page(box, error="袋装新增失败: 数量必须是大于等于 0 的整数")

    existing = Component.query.filter_by(box_id=box.id, part_no=part_no).first()
    if existing:
        # A disabled row counts as zero stock before the merge.
        qty_before = existing.quantity if existing.is_enabled else 0
        existing.name = name or existing.name
        existing.quantity += quantity
        existing.specification = specification or existing.specification
        existing.note = note or existing.note
        existing.is_enabled = True
        delta = int(existing.quantity - qty_before)
        if delta:
            log_inventory_event(
                event_type="bag_merge",
                delta=delta,
                box=box,
                component=existing,
                part_no=part_no,
            )
        db.session.commit()
        return render_box_page(box, notice="同料号已存在,已合并到原袋位")

    highest_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
    ) or 0
    next_slot = highest_slot + 1
    item = Component(
        box_id=box.id,
        slot_index=next_slot,
        part_no=part_no,
        name=name,
        quantity=quantity,
        specification=specification or None,
        note=note or None,
        is_enabled=True,
    )
    db.session.add(item)
    if quantity:
        log_inventory_event(
            event_type="bag_add",
            delta=quantity,
            box=box,
            component=item,
            part_no=part_no,
        )
    if next_slot > box.slot_capacity:
        box.slot_capacity = next_slot
    db.session.commit()
    return render_box_page(box, notice="已新增 1 条袋装记录")
@app.route("/box/<int:box_id>/bags/batch", methods=["POST"])
def add_bag_items_batch(box_id: int):
    """Add several bag entries from the multi-line ``lines`` form field.

    Each non-blank line is split on commas or tabs into up to five columns:
    part number, name, quantity, specification, note. Lines missing a part
    number or name, or with an invalid quantity, are skipped and reported by
    their 1-based position. A part number already present in the box (or
    introduced earlier in the same batch) is merged instead of added.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    rows = [row.strip() for row in request.form.get("lines", "").splitlines() if row.strip()]
    if not rows:
        return render_box_page(box, error="批量新增失败: 请至少输入一行")

    highest_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
    )
    next_slot = (highest_slot or 0) + 1

    # Index of entries by part number; also updated with rows added below so
    # duplicates inside one batch merge together.
    known_parts = {}
    for entry in Component.query.filter_by(box_id=box.id).all():
        if entry.part_no:
            known_parts[entry.part_no] = entry

    invalid_lines = []
    added_count = 0
    merged_count = 0

    for line_no, row in enumerate(rows, start=1):
        cells = [cell.strip() for cell in re.split(r"[,\t]", row)]
        cells.extend([""] * (5 - len(cells)))  # pad short rows to five columns
        part_no, name, quantity_raw, specification, note = cells[:5]

        if not (part_no and name):
            invalid_lines.append(f"{line_no}")
            continue
        try:
            quantity = _parse_non_negative_int(quantity_raw, 0)
        except ValueError:
            invalid_lines.append(f"{line_no}")
            continue

        match = known_parts.get(part_no)
        if match:
            # A disabled entry counts as zero stock before the merge.
            counted_before = match.quantity if match.is_enabled else 0
            match.quantity += quantity
            match.is_enabled = True
            match.name = name  # already validated as non-empty
            if specification:
                match.specification = specification
            if note:
                match.note = note
            change = int(match.quantity - counted_before)
            if change:
                log_inventory_event(
                    event_type="bag_batch_merge",
                    delta=change,
                    box=box,
                    component=match,
                    part_no=part_no,
                )
            merged_count += 1
            continue

        created = Component(
            box_id=box.id,
            slot_index=next_slot,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(created)
        known_parts[part_no] = created
        if quantity:
            log_inventory_event(
                event_type="bag_batch_add",
                delta=quantity,
                box=box,
                component=created,
                part_no=part_no,
            )
        added_count += 1
        next_slot += 1

    if added_count:
        # next_slot now points one past the last slot that was filled.
        box.slot_capacity = max(box.slot_capacity, next_slot - 1)
    db.session.commit()

    if invalid_lines:
        skipped_text = ", ".join(invalid_lines)
        if not (added_count or merged_count):
            return render_box_page(
                box,
                error="批量新增失败: 数据格式不正确(请检查: " + skipped_text + "",
            )
        return render_box_page(
            box,
            notice=f"已新增 {added_count} 条,合并 {merged_count} 条,以下行被跳过: " + skipped_text,
        )
    return render_box_page(box, notice=f"批量新增成功:新增 {added_count} 条,合并 {merged_count}")
@app.route("/edit/<int:box_id>/<int:slot>", methods=["GET", "POST"])
def edit_component(box_id: int, slot: int):
    """Show or process the edit form for one slot of a box.

    GET renders the form. POST dispatches on the ``action`` form field:
    ``delete``, ``toggle_enable``, ``toggle_disable``, or (default) save.
    Responds with HTTP 400 when ``slot`` is outside ``1..box.slot_capacity``.

    The search keyword ``q`` may arrive in the form body or the query string.
    The form value wins, and the resulting effective keyword is used for every
    redirect and re-render so the search context is kept on all paths.
    """
    box = Box.query.get_or_404(box_id)
    if slot < 1 or slot > box.slot_capacity:
        return "无效的格子编号", 400

    search_query = request.args.get("q", "").strip()
    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()

    def render_form(query, error=None):
        # Single place that builds the template context. "error" is only
        # passed when set, matching what the template receives on a plain GET.
        context = {
            "box": box,
            "slot": slot,
            "slot_code": slot_code_for_box(box, slot),
            "component": component,
            "search_query": query,
        }
        if error is not None:
            context["error"] = error
        return render_template("edit.html", **context)

    if request.method != "POST":
        return render_form(search_query)

    action = request.form.get("action", "save")
    search_query_post = request.form.get("q", "").strip()
    search_query_effective = search_query_post or search_query

    def redirect_to_origin():
        # Return to the search results when the edit started there.
        if search_query_effective:
            return redirect(url_for("search_page", q=search_query_effective))
        return redirect(url_for("view_box", box_id=box.id))

    def redirect_to_form():
        return redirect(
            url_for("edit_component", box_id=box.id, slot=slot, q=search_query_effective)
        )

    if action == "delete":
        if component:
            # Only enabled stock is counted, so only that is logged as removed.
            if component.is_enabled and component.quantity:
                log_inventory_event(
                    event_type="component_delete",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.delete(component)
            db.session.commit()
        return redirect_to_origin()

    if action == "toggle_enable":
        if component and not component.is_enabled:
            component.is_enabled = True
            if component.quantity:
                log_inventory_event(
                    event_type="component_enable",
                    delta=int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.commit()
        return redirect_to_form()

    if action == "toggle_disable":
        if component and component.is_enabled:
            component.is_enabled = False
            if component.quantity:
                log_inventory_event(
                    event_type="component_disable",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.commit()
        return redirect_to_form()

    # Default action: save the submitted fields into this slot.
    part_no = request.form.get("part_no", "").strip()
    name = request.form.get("name", "").strip()
    specification = request.form.get("specification", "").strip()
    quantity_raw = request.form.get("quantity", "0").strip()
    note = request.form.get("note", "").strip()

    if not part_no or not name:
        return render_form(search_query_effective, error="料号和名称不能为空")

    try:
        quantity = _parse_non_negative_int(quantity_raw, 0)
    except ValueError:
        return render_form(search_query_effective, error="数量必须是大于等于 0 的整数")

    # Stock counted before the edit; disabled or missing components count as 0.
    old_enabled_qty = 0
    if component is not None and component.is_enabled:
        old_enabled_qty = int(component.quantity)

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = part_no
    component.name = name
    component.specification = specification or None
    component.quantity = quantity
    component.note = note or None
    # A freshly constructed component has no is_enabled value yet.
    if component.is_enabled is None:
        component.is_enabled = True

    new_enabled_qty = quantity if component.is_enabled else 0
    delta = int(new_enabled_qty - old_enabled_qty)
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=part_no,
        )
    db.session.commit()
    return redirect_to_origin()
@app.route("/search")
def search_page():
    """Render the search page, listing enabled components matching ``q``.

    The keyword is matched case-insensitively as a literal substring of the
    part number or the name. ``notice`` and ``error`` query parameters are
    passed through to the template.
    """
    keyword = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()

    results = []
    if keyword:
        # Escape LIKE wildcards so input such as "1%" or "A_1" is matched
        # literally instead of acting as a pattern.
        escaped = (
            keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        raw_results = (
            Component.query.join(Box, Box.id == Component.box_id)
            .filter(
                Component.is_enabled.is_(True),
                db.or_(
                    Component.part_no.ilike(pattern, escape="\\"),
                    Component.name.ilike(pattern, escape="\\"),
                ),
            )
            .order_by(Component.part_no.asc(), Component.name.asc())
            .all()
        )

        # Load every referenced box in one query instead of one per result.
        box_ids = {c.box_id for c in raw_results}
        boxes_by_id = {}
        if box_ids:
            boxes_by_id = {
                box.id: box for box in Box.query.filter(Box.id.in_(box_ids)).all()
            }

        for c in raw_results:
            box = boxes_by_id.get(c.box_id)
            results.append(
                {
                    "component": c,
                    "box_name": box.name if box else f"{c.box_id}",
                    "slot_code": slot_code_for_box(box, c.slot_index) if box else str(c.slot_index),
                    "edit_url": url_for("edit_component", box_id=c.box_id, slot=c.slot_index, q=keyword),
                }
            )

    return render_template(
        "search.html",
        keyword=keyword,
        results=results,
        notice=notice,
        error=error,
    )
@app.route("/component/<int:component_id>/outbound", methods=["POST"])
def quick_outbound(component_id: int):
    """Take ``amount`` units of a component out of stock from the search page.

    Always redirects back to the search page for the submitted keyword ``q``,
    carrying either an ``error`` or a ``notice`` message.
    """
    component = Component.query.get_or_404(component_id)
    keyword = request.form.get("q", "").strip()

    def back_to_search(**message):
        return redirect(url_for("search_page", q=keyword, **message))

    try:
        amount = _parse_non_negative_int(request.form.get("amount", "0"), 0)
    except ValueError:
        return back_to_search(error="出库数量必须是大于等于 0 的整数")

    # Checks run in this order so the first failing rule decides the message.
    if amount <= 0:
        return back_to_search(error="出库数量必须大于 0")
    if not component.is_enabled:
        return back_to_search(error="该元件已停用,不能出库")

    in_stock = int(component.quantity or 0)
    if amount > in_stock:
        return back_to_search(error="出库数量超过当前库存")

    component.quantity = in_stock - amount
    box = Box.query.get(component.box_id)
    log_inventory_event(
        event_type="component_outbound",
        delta=-amount,
        box=box,
        component=component,
        part_no=component.part_no,
    )
    db.session.commit()

    if box:
        slot_code = slot_code_for_box(box, component.slot_index)
    else:
        slot_code = str(component.slot_index)
    notice = f"出库成功: {component.part_no} -{amount}{slot_code}"
    return back_to_search(notice=notice)
@app.route("/stats")
def stats_page():
    """Render the statistics dashboard.

    Query parameters: ``days`` (window length), ``box_type`` (a box type key
    or ``"all"``) and ``notice`` (message to display). Only enabled components
    are counted. With a specific box type selected, the main chart switches
    from per-category totals to the top 8 components of that category.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))
    notice = request.args.get("notice", "").strip()

    box_by_id = {box.id: box for box in Box.query.all()}

    def box_type_of(component):
        # None when the component's box no longer exists.
        owner = box_by_id.get(component.box_id)
        return owner.box_type if owner else None

    def quantity_of(component) -> int:
        # Treat a missing quantity as zero, as the component chart already did.
        return int(component.quantity or 0)

    all_enabled_components = [c for c in Component.query.all() if c.is_enabled]
    overall_total_quantity = sum(quantity_of(c) for c in all_enabled_components)

    if box_type_filter == "all":
        enabled_components = all_enabled_components
    else:
        enabled_components = [
            c for c in all_enabled_components if box_type_of(c) == box_type_filter
        ]

    total_quantity = sum(quantity_of(c) for c in enabled_components)
    total_items = len(enabled_components)
    low_stock_count = sum(
        1 for c in enabled_components if quantity_of(c) < LOW_STOCK_THRESHOLD
    )
    inventory_share = (
        round(total_quantity * 100 / overall_total_quantity, 1) if overall_total_quantity > 0 else 0.0
    )

    # Events inside the window, which starts at midnight (days - 1) days ago.
    start_day = datetime.now().date() - timedelta(days=days - 1)
    event_query = InventoryEvent.query.filter(
        InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time())
    )
    if box_type_filter != "all":
        event_query = event_query.filter_by(box_type=box_type_filter)
    period_operation_count = event_query.count()
    active_days = len(
        {
            _to_date(row[0])
            for row in event_query.with_entities(db.func.date(InventoryEvent.created_at)).all()
            if row and row[0]
        }
    )

    # Per-type totals over ALL enabled components (ignores the filter),
    # accumulated in one pass instead of one scan per box type.
    totals_by_type = dict.fromkeys(BOX_TYPES, 0)
    for c in all_enabled_components:
        type_key = box_type_of(c)
        if type_key in totals_by_type:
            totals_by_type[type_key] += quantity_of(c)

    # Per-type counts and quantities over the FILTERED components.
    category_counts = dict.fromkeys(BOX_TYPES, 0)
    category_quantities = dict.fromkeys(BOX_TYPES, 0)
    for c in enabled_components:
        type_key = box_type_of(c)
        if type_key in category_counts:
            category_counts[type_key] += 1
            category_quantities[type_key] += quantity_of(c)

    category_stats = [
        {
            "key": key,
            "label": meta["label"],
            "item_count": category_counts[key],
            "quantity": category_quantities[key],
        }
        for key, meta in BOX_TYPES.items()
    ]
    category_stats.sort(key=lambda row: row["quantity"], reverse=True)
    max_category_quantity = max([row["quantity"] for row in category_stats], default=0)

    chart_mode = "category"
    chart_title = "分类占比"
    chart_rows = category_stats
    max_chart_quantity = max_category_quantity
    if box_type_filter != "all":
        chart_mode = "component"
        chart_title = "分类内元件库存 Top"
        # Merge components sharing (part_no, name) and track how many were
        # merged, so item_count reflects the real number of entries.
        bucket = {}
        for c in enabled_components:
            key = (c.part_no or "", c.name or "")
            entry = bucket.setdefault(key, {"quantity": 0, "item_count": 0})
            entry["quantity"] += quantity_of(c)
            entry["item_count"] += 1

        component_rows = []
        for (part_no, name), entry in bucket.items():
            label = name or part_no or "未命名元件"
            if part_no:
                label = f"{label} ({part_no})"
            component_rows.append(
                {
                    "label": label,
                    "quantity": entry["quantity"],
                    "item_count": entry["item_count"],
                }
            )
        component_rows.sort(key=lambda row: row["quantity"], reverse=True)
        chart_rows = component_rows[:8]
        max_chart_quantity = max([row["quantity"] for row in chart_rows], default=0)

    trend_points = build_trend_points_from_events(
        days=days,
        total_quantity=total_quantity,
        box_type_filter=box_type_filter,
    )
    period_net_change = 0
    if len(trend_points) >= 2:
        period_net_change = trend_points[-1]["value"] - trend_points[0]["value"]

    min_value = min([point["value"] for point in trend_points], default=0)
    max_value = max([point["value"] for point in trend_points], default=0)
    value_span = max(max_value - min_value, 1)  # avoid dividing by zero on a flat trend

    # Map trend points onto a 520x180 canvas; y is flipped so larger values
    # are drawn higher.
    svg_points = []
    if trend_points:
        width = 520
        height = 180
        step_x = width / max(len(trend_points) - 1, 1)
        for idx, point in enumerate(trend_points):
            x = idx * step_x
            y = height - ((point["value"] - min_value) / value_span) * height
            svg_points.append(f"{x:.2f},{y:.2f}")

    box_type_series_raw = build_box_type_trend_series(days=days, totals_by_type=totals_by_type)
    box_type_series = []
    for key, meta in BOX_TYPES.items():
        values = box_type_series_raw["series"].get(key, [])
        delta = values[-1] - values[0] if len(values) >= 2 else 0
        box_type_series.append(
            {
                "key": key,
                "label": meta["label"],
                "sparkline": make_sparkline(values),
                "latest": values[-1] if values else 0,
                "delta": delta,
            }
        )

    activity_rows = recent_events(limit=20, box_type_filter=box_type_filter)

    return render_template(
        "stats.html",
        notice=notice,
        days=days,
        box_type_filter=box_type_filter,
        box_types=BOX_TYPES,
        total_quantity=total_quantity,
        total_items=total_items,
        low_stock_count=low_stock_count,
        category_stats=category_stats,
        max_category_quantity=max_category_quantity,
        chart_mode=chart_mode,
        chart_title=chart_title,
        chart_rows=chart_rows,
        max_chart_quantity=max_chart_quantity,
        trend_points=trend_points,
        trend_polyline=" ".join(svg_points),
        min_value=min_value,
        max_value=max_value,
        period_net_change=period_net_change,
        overall_total_quantity=overall_total_quantity,
        inventory_share=inventory_share,
        period_operation_count=period_operation_count,
        active_days=active_days,
        box_type_series=box_type_series,
        activity_rows=activity_rows,
    )
@app.route("/stats/export")
def stats_export_csv():
    """Download the inventory trend for the selected window as a CSV attachment.

    Uses the same ``days`` and ``box_type`` query parameters as the stats page.
    Each row holds the date, the inventory total, that day's net change, and
    the two filter values.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))

    box_by_id = {box.id: box for box in Box.query.all()}

    def in_scope(component):
        # Enabled, and either unfiltered or stored in a box of the chosen type.
        if not component.is_enabled:
            return False
        if box_type_filter == "all":
            return True
        owner = box_by_id.get(component.box_id)
        return bool(owner) and owner.box_type == box_type_filter

    total_quantity = sum(c.quantity for c in Component.query.all() if in_scope(c))

    trend_points = build_trend_points_from_events(days, total_quantity, box_type_filter)
    delta_by_day = query_event_daily_delta(days, box_type_filter)

    with StringIO() as buffer:
        sheet = csv.writer(buffer)
        sheet.writerow(["date", "inventory_total", "daily_delta", "days", "box_type_filter"])
        sheet.writerows(
            [
                point["date"].isoformat(),
                point["value"],
                int(delta_by_day.get(point["date"], 0)),
                days,
                box_type_filter,
            ]
            for point in trend_points
        )
        csv_content = buffer.getvalue()

    filename = f"inventory_stats_{box_type_filter}_{days}d.csv"
    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
@app.route("/stats/clear", methods=["POST"])
def clear_stats_logs():
    """Delete inventory event logs, then redirect to the stats page with a notice.

    With form field ``scope == "all"`` every event is deleted. Otherwise the
    deletion is limited to the submitted ``box_type`` (all types when it is
    ``"all"``). ``days`` is only carried through to the redirect.
    """
    days = parse_days_value(request.form.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.form.get("box_type", "all"))
    clear_scope = (request.form.get("scope", "current") or "current").strip()

    if clear_scope == "all":
        removed = InventoryEvent.query.delete()
        db.session.commit()
        return redirect(
            url_for(
                "stats_page",
                days=days,
                box_type="all",
                notice=f"已清除全部统计日志,共 {removed}",
            )
        )

    filtered = box_type_filter != "all"
    events = InventoryEvent.query
    if filtered:
        events = events.filter_by(box_type=box_type_filter)
    removed = events.delete(synchronize_session=False)
    db.session.commit()

    if filtered:
        # Fall back to the raw key when the type has no configured label.
        label = BOX_TYPES.get(box_type_filter, {}).get("label", box_type_filter)
        notice = f"已清除当前筛选({label})统计日志,共 {removed}"
    else:
        notice = f"已清除当前筛选(全部分类)统计日志,共 {removed}"
    return redirect(url_for("stats_page", days=days, box_type=box_type_filter, notice=notice))
def bootstrap() -> None:
    """Prepare the database inside an application context.

    Runs, in order: table creation, ``ensure_schema`` and
    ``normalize_legacy_data``.
    """
    startup_steps = (db.create_all, ensure_schema, normalize_legacy_data)
    with app.app_context():
        for step in startup_steps:
            step()
# Prepare the database at import time so it is ready however the app is served.
bootstrap()


if __name__ == "__main__":
    # The server listens on all interfaces, and the Werkzeug interactive
    # debugger allows arbitrary code execution, so debug mode is opt-in:
    # set FLASK_DEBUG=1 to enable it.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)