Files
inventory/app.py

792 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
from flask import Flask, redirect, render_template, request, url_for
from flask_sqlalchemy import SQLAlchemy
# The SQLite database file lives in a "data" directory next to this module;
# the directory is created on import if it does not exist yet.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_DIR = os.path.join(BASE_DIR, "data")
DB_PATH = os.path.join(DB_DIR, "inventory.db")
os.makedirs(DB_DIR, exist_ok=True)

# Flask application and its SQLAlchemy binding (configured before `db` is built).
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI=f"sqlite:///{DB_PATH}",
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)

# Passed to the box page template as `low_stock_threshold`.
LOW_STOCK_THRESHOLD = 5
# Registry of supported box types, keyed by the value stored in Box.box_type.
# Each entry provides:
#   label            - display text for the type
#   default_capacity - slot count given to a newly created box of this type
#   default_desc     - description used when the form leaves it blank
#   default_prefix   - slot-code prefix used when the form leaves it blank
BOX_TYPES = {
    "small_28": {
        "label": "28格小盒大盒",
        "default_capacity": 28,
        "default_desc": "4连排小盒常见摆放为竖向7排",
        "default_prefix": "A",
    },
    "medium_14": {
        "label": "14格中盒大盒",
        "default_capacity": 14,
        "default_desc": "14格中盒内部摆放方向与28格不同",
        "default_prefix": "B",
    },
    "bag": {
        "label": "袋装清单",
        "default_capacity": 1,
        "default_desc": "用于较小防静电袋存放",
        "default_prefix": "BAG",
    },
}
class Box(db.Model):
    """A storage box (table ``boxes``) owning a numbered run of slots.

    Slot codes are formed from ``slot_prefix`` plus a serial number that
    starts at ``start_number`` (see ``slot_code_for_box``).
    """

    __tablename__ = "boxes"
    id = db.Column(db.Integer, primary_key=True)
    # Unique display name; the routes generate it from a base name plus the slot range.
    name = db.Column(db.String(100), nullable=False, unique=True)
    description = db.Column(db.String(255), nullable=True)
    # One of the keys of BOX_TYPES.
    box_type = db.Column(db.String(30), nullable=False, default="small_28")
    # Number of slots; slot indexes run from 1 to slot_capacity.
    slot_capacity = db.Column(db.Integer, nullable=False, default=28)
    # Prefix of every slot code in this box.
    slot_prefix = db.Column(db.String(16), nullable=False, default="A")
    # Serial number given to slot index 1.
    start_number = db.Column(db.Integer, nullable=False, default=1)
class Component(db.Model):
    """An item stored in one slot of a box (table ``components``).

    A box can hold at most one component per slot index, enforced by the
    ``uq_box_slot`` unique constraint.
    """

    __tablename__ = "components"
    id = db.Column(db.Integer, primary_key=True)
    box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False)
    # 1-based position inside the owning box.
    slot_index = db.Column(db.Integer, nullable=False)
    part_no = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    specification = db.Column(db.String(120), nullable=True)
    quantity = db.Column(db.Integer, nullable=False, default=0)
    location = db.Column(db.String(120), nullable=True)
    note = db.Column(db.Text, nullable=True)
    # Only enabled components are listed in the index overview rows.
    is_enabled = db.Column(db.Boolean, nullable=False, default=True)
    # Owning box; also exposes ``Box.components`` through the backref.
    box = db.relationship("Box", backref=db.backref("components", lazy=True))
    __table_args__ = (
        db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"),
    )
def _add_column_if_missing(table_name: str, column_name: str, ddl: str) -> None:
    """Add a column to ``table_name`` unless it is already present.

    Existing column names are read from the second field of each
    ``PRAGMA table_info`` row. ``ddl`` is the full column definition appended
    to ``ALTER TABLE ... ADD COLUMN``. The statement is executed on the
    session but not committed here.
    """
    info_rows = db.session.execute(
        db.text(f"PRAGMA table_info({table_name})")
    ).fetchall()
    existing_names = {info[1] for info in info_rows}
    if column_name in existing_names:
        return
    db.session.execute(db.text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
def ensure_schema() -> None:
    """Add any columns missing from an older database, then commit.

    Each entry is (table, column, column DDL) and is applied in order through
    ``_add_column_if_missing``.
    """
    required_columns = (
        ("boxes", "box_type", "box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'"),
        ("boxes", "slot_capacity", "slot_capacity INTEGER NOT NULL DEFAULT 28"),
        ("boxes", "slot_prefix", "slot_prefix VARCHAR(16) NOT NULL DEFAULT 'A'"),
        ("boxes", "start_number", "start_number INTEGER NOT NULL DEFAULT 1"),
        ("components", "is_enabled", "is_enabled BOOLEAN NOT NULL DEFAULT 1"),
    )
    for table, column, definition in required_columns:
        _add_column_if_missing(table, column, definition)
    db.session.commit()
def slot_code_for_box(box: Box, slot_index: int) -> str:
    """Return the slot code for a 1-based ``slot_index`` in ``box``.

    The code is the box prefix followed by ``start_number + slot_index - 1``.
    """
    return f"{box.slot_prefix}{box.start_number + slot_index - 1}"
def slot_range_label(box: Box) -> str:
    """Return ``"<first code>-<last code>"`` covering every slot of ``box``."""
    endpoints = (
        slot_code_for_box(box, 1),
        slot_code_for_box(box, box.slot_capacity),
    )
    return "-".join(endpoints)
def compose_box_name(base_name: str, prefix: str, start_number: int, slot_capacity: int) -> str:
    """Build a box name of the form ``"<base> <prefix><start>-<prefix><end>"``.

    A blank or missing ``base_name`` falls back to a default base. ``end`` is
    the last serial number of the range, ``start_number + slot_capacity - 1``.
    """
    label = (base_name or "").strip() or "盒子"
    last_number = start_number + slot_capacity - 1
    first_code = f"{prefix}{start_number}"
    last_code = f"{prefix}{last_number}"
    return f"{label} {first_code}-{last_code}"
def make_unique_box_name(candidate_name: str, exclude_box_id: int = None) -> str:
    """Return ``candidate_name``, or a ``" #N"``-suffixed variant, not used by another box.

    Suffixes start at ``#2`` and increase until no box has the resulting
    name. The box with id ``exclude_box_id`` (if given) is ignored, so a box
    may keep its own name.
    """

    def _is_taken(name: str) -> bool:
        lookup = Box.query.filter_by(name=name)
        if exclude_box_id is not None:
            lookup = lookup.filter(Box.id != exclude_box_id)
        return bool(lookup.first())

    if not _is_taken(candidate_name):
        return candidate_name

    suffix = 2
    while _is_taken(f"{candidate_name} #{suffix}"):
        suffix += 1
    return f"{candidate_name} #{suffix}"
def infer_base_name(box: Box) -> str:
    """Strip the generated range suffix from ``box.name`` to recover its base name.

    The removed suffix is ``" <prefix>N-<prefix>M"``, optionally followed by a
    ``" #K"`` uniqueness marker. If nothing is left after stripping, the full
    name is returned.
    """
    escaped_prefix = re.escape(box.slot_prefix)
    suffix_pattern = rf"\s+{escaped_prefix}\d+-{escaped_prefix}\d+(?:\s+#\d+)?$"
    stripped = re.sub(suffix_pattern, "", box.name).strip()
    if stripped:
        return stripped
    return box.name
def slot_data_for_box(box: Box):
    """Return one dict per slot of ``box``, in slot order.

    Each dict has ``slot`` (1-based index), ``slot_code`` and ``component``
    (the Component stored there, or ``None`` when the slot is empty).
    """
    stored = Component.query.filter_by(box_id=box.id).all()
    by_slot = {item.slot_index: item for item in stored}
    return [
        {
            "slot": position,
            "slot_code": slot_code_for_box(box, position),
            "component": by_slot.get(position),
        }
        for position in range(1, box.slot_capacity + 1)
    ]
def bag_rows_for_box(box: Box):
    """Return ``{"component", "slot_code"}`` dicts for every component in ``box``.

    Rows are ordered by ascending slot index.
    """
    ordered_items = (
        Component.query.filter_by(box_id=box.id)
        .order_by(Component.slot_index.asc())
        .all()
    )
    return [
        {"component": item, "slot_code": slot_code_for_box(box, item.slot_index)}
        for item in ordered_items
    ]
def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int:
raw = (raw_value or "").strip()
if raw == "":
return default_value
value = int(raw)
if value < 0:
raise ValueError
return value
def normalize_legacy_data() -> None:
    """Backfill missing or invalid values in existing rows, then commit.

    First runs bulk UPDATEs that replace NULL/empty column values with
    defaults, then walks every box and repairs an unknown type, a capacity
    below 1, an empty prefix, or a missing/negative start number.
    """
    backfill_statements = (
        "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''",
        "UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL",
        "UPDATE boxes SET slot_prefix = 'A' WHERE slot_prefix IS NULL OR slot_prefix = ''",
        "UPDATE boxes SET start_number = 1 WHERE start_number IS NULL",
        "UPDATE components SET is_enabled = 1 WHERE is_enabled IS NULL",
    )
    for statement in backfill_statements:
        db.session.execute(db.text(statement))

    for box in Box.query.all():
        # Fix the type first: the remaining defaults are looked up by type.
        if box.box_type not in BOX_TYPES:
            box.box_type = "small_28"
        if not box.slot_capacity or box.slot_capacity < 1:
            box.slot_capacity = BOX_TYPES[box.box_type]["default_capacity"]
        if not box.slot_prefix:
            box.slot_prefix = BOX_TYPES[box.box_type]["default_prefix"]
        if box.start_number is None or box.start_number < 0:
            box.start_number = 1
    db.session.commit()
def make_overview_rows(box: Box):
    """Return summary dicts for the enabled components of ``box``.

    Rows are ordered by ascending slot index and carry ``slot_code``,
    ``name`` and ``part_no``.
    """
    enabled_items = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
        .all()
    )
    return [
        {
            "slot_code": slot_code_for_box(box, item.slot_index),
            "name": item.name,
            "part_no": item.part_no,
        }
        for item in enabled_items
    ]
def box_sort_key(box: Box):
    """Sort key for boxes: upper-cased prefix, then start number, then name.

    Missing values sort as an empty prefix, start number 0 and empty name.
    """
    prefix_key = (box.slot_prefix or "").upper()
    start_key = 0 if box.start_number is None else box.start_number
    name_key = box.name or ""
    return (prefix_key, start_key, name_key)
def has_range_conflict(
    *,
    box_type: str,
    prefix: str,
    start_number: int,
    slot_capacity: int,
    exclude_box_id: int = None,
):
    """Check whether a serial range overlaps an existing box's range.

    Only boxes with the same ``box_type`` and ``prefix`` are compared; the
    box with id ``exclude_box_id`` (if given) is skipped.

    Returns:
        ``(True, other_box)`` for the first overlapping box found, otherwise
        ``(False, None)``.
    """
    new_first = start_number
    new_last = start_number + slot_capacity - 1

    candidates = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        candidates = candidates.filter(Box.id != exclude_box_id)

    for existing in candidates.all():
        existing_first = existing.start_number
        existing_last = existing.start_number + existing.slot_capacity - 1
        # Inclusive ranges overlap when each one starts no later than the other ends.
        if new_last >= existing_first and new_first <= existing_last:
            return True, existing
    return False, None
def suggest_next_start_number(
    *,
    box_type: str,
    prefix: str,
    slot_capacity: int,
    exclude_box_id: int = None,
) -> int:
    """Suggest the first serial number after every existing range.

    Considers boxes with the same ``box_type`` and ``prefix`` (skipping
    ``exclude_box_id`` if given) and returns one more than the highest range
    end, or 1 when no range ends above 0. ``slot_capacity`` is accepted but
    not used in the computation.
    """
    siblings = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        siblings = siblings.filter(Box.id != exclude_box_id)

    range_ends = [
        sibling.start_number + sibling.slot_capacity - 1 for sibling in siblings.all()
    ]
    highest_end = max(range_ends, default=0)
    if highest_end <= 0:
        return 1
    return highest_end + 1
def build_index_anchor(box_type: str = "") -> str:
    """Return the anchor ``"group-<box_type>"`` for a known box type, else ``""``."""
    return f"group-{box_type}" if box_type in BOX_TYPES else ""
def bad_request(message: str, box_type: str = ""):
    """Render the error page for an invalid request and return it with status 400.

    The "back" link points at the index anchor of ``box_type`` when that type
    is known; otherwise at the referring page, falling back to the index.
    """
    anchor = build_index_anchor(box_type)
    back_url = (
        url_for("index") + "#" + anchor
        if anchor
        else request.referrer or url_for("index")
    )
    page = render_template(
        "error.html",
        status_code=400,
        title="请求参数有误",
        message=message,
        back_url=back_url,
    )
    return page, 400
def render_box_page(box: Box, error: str = "", notice: str = ""):
    """Render ``box.html`` for ``box`` with an optional error or notice message.

    Bag rows are computed only for boxes of type ``"bag"``; other types get
    an empty list.
    """
    context = {
        "box": box,
        "slots": slot_data_for_box(box),
        "bag_rows": bag_rows_for_box(box) if box.box_type == "bag" else [],
        "box_types": BOX_TYPES,
        "slot_range": slot_range_label(box),
        "low_stock_threshold": LOW_STOCK_THRESHOLD,
        "error": error,
        "notice": notice,
    }
    return render_template("box.html", **context)
@app.route("/")
def index():
    """Render the index page with all boxes grouped by box type.

    Boxes are sorted with ``box_sort_key``; a box whose type is unknown is
    listed under ``"small_28"``. ``used_count`` is the number of enabled
    components in the box.
    """
    groups = {type_key: [] for type_key in BOX_TYPES}
    for box in sorted(Box.query.all(), key=box_sort_key):
        group_key = box.box_type if box.box_type in BOX_TYPES else "small_28"
        overview_rows = make_overview_rows(box)
        entry = {
            "box": box,
            "used_count": len(overview_rows),
            "slot_range": slot_range_label(box),
            "overview_rows": overview_rows,
            "base_name": infer_base_name(box),
        }
        groups[group_key].append(entry)
    return render_template("index.html", groups=groups, box_types=BOX_TYPES)
@app.route("/boxes/create", methods=["POST"])
def create_box():
    """Create a box from the submitted form and redirect to the index.

    Responds with the 400 error page when the box type is unknown, the name
    is blank, the start number is not a non-negative integer, or the
    requested serial range overlaps an existing box of the same type and
    prefix. Capacity always comes from the type's default.
    """
    form = request.form
    box_type = form.get("box_type", "small_28").strip()
    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", box_type)
    if not base_name:
        return bad_request("盒子名称不能为空", box_type)
    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box_type)

    meta = BOX_TYPES[box_type]
    capacity = meta["default_capacity"]
    effective_prefix = slot_prefix or meta["default_prefix"]

    conflict, other_box = has_range_conflict(
        box_type=box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=capacity,
    )
    if conflict:
        conflict_message = (
            "编号范围冲突: 与现有盒子 "
            f"[{other_box.name}]"
            " 重叠,请更换前缀或起始序号"
        )
        return bad_request(conflict_message, box_type)

    # The stored name embeds the slot range and is de-duplicated with a "#N" suffix.
    final_name = make_unique_box_name(
        compose_box_name(
            base_name=base_name,
            prefix=effective_prefix,
            start_number=start_number,
            slot_capacity=capacity,
        )
    )
    new_box = Box(
        name=final_name,
        description=description or meta["default_desc"],
        box_type=box_type,
        slot_capacity=capacity,
        slot_prefix=effective_prefix,
        start_number=start_number,
    )
    db.session.add(new_box)
    db.session.commit()
    return redirect(url_for("index"))
@app.route("/boxes/<int:box_id>/update", methods=["POST"])
def update_box(box_id: int):
    """Update a box's name, description, prefix and start number, then redirect.

    Responds 404 when the box does not exist, and with the 400 error page
    when the name is blank, the start number is invalid, or the new serial
    range overlaps another box of the same type and prefix. Type and capacity
    are left unchanged.
    """
    box = Box.query.get_or_404(box_id)
    form = request.form
    type_meta = BOX_TYPES[box.box_type]

    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if not base_name:
        return bad_request("盒子名称不能为空", box.box_type)
    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box.box_type)

    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box.box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=box.slot_capacity,
        exclude_box_id=box.id,
    )
    if conflict:
        conflict_message = (
            "编号范围冲突: 与现有盒子 "
            f"[{other_box.name}]"
            " 重叠,请更换前缀或起始序号"
        )
        return bad_request(conflict_message, box.box_type)

    regenerated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=box.slot_capacity,
    )
    # Exclude this box from the uniqueness check so it can keep its own name.
    box.name = make_unique_box_name(regenerated_name, exclude_box_id=box.id)
    box.description = description or type_meta["default_desc"]
    box.slot_prefix = effective_prefix
    box.start_number = start_number
    db.session.commit()
    return redirect(url_for("index"))
@app.route("/boxes/<int:box_id>/delete", methods=["POST"])
def delete_box(box_id: int):
    """Delete a box and all of its components, then redirect to the index.

    Responds 404 when the box does not exist.
    """
    doomed = Box.query.get_or_404(box_id)
    # Remove the components first: they reference the box through box_id.
    Component.query.filter_by(box_id=doomed.id).delete()
    db.session.delete(doomed)
    db.session.commit()
    return redirect(url_for("index"))
@app.route("/boxes/suggest-start")
def suggest_start():
    """Suggest the next free start number for a box type and prefix (JSON).

    Query parameters:
        box_type: box type key (default ``"small_28"``); 400 if unknown.
        slot_prefix: optional prefix; blank falls back to the type's default.
        box_id: optional id of a box being edited. When given, that box's own
            type and capacity are used and the box is excluded from the
            search. 400 if not an integer, 404 if no such box.

    Returns:
        ``{"ok": True, "start_number", "slot_prefix", "preview_range"}`` on
        success, or ``{"ok": False, "message"}`` with the status above.
    """
    requested_type = request.args.get("box_type", "small_28").strip()
    if requested_type not in BOX_TYPES:
        return {"ok": False, "message": "无效盒子类型"}, 400

    slot_prefix = request.args.get("slot_prefix", "").strip().upper()
    box_id_raw = request.args.get("box_id", "").strip()

    box_type = requested_type
    slot_capacity = BOX_TYPES[requested_type]["default_capacity"]
    exclude_box_id = None

    if box_id_raw:
        try:
            box_id = int(box_id_raw)
        except ValueError:
            return {"ok": False, "message": "box_id 非法"}, 400
        box = Box.query.get(box_id)
        if not box:
            return {"ok": False, "message": "盒子不存在"}, 404
        box_type = box.box_type
        slot_capacity = box.slot_capacity
        exclude_box_id = box.id

    # Resolve the default prefix only after the box lookup, so an edited box
    # falls back to the default of its own type (as update_box does) rather
    # than the type named in the query string. If the stored type is not a
    # known key, keep using the validated request type.
    prefix_type = box_type if box_type in BOX_TYPES else requested_type
    effective_prefix = slot_prefix or BOX_TYPES[prefix_type]["default_prefix"]

    suggested = suggest_next_start_number(
        box_type=box_type,
        prefix=effective_prefix,
        slot_capacity=slot_capacity,
        exclude_box_id=exclude_box_id,
    )
    end_number = suggested + slot_capacity - 1
    return {
        "ok": True,
        "start_number": suggested,
        "slot_prefix": effective_prefix,
        "preview_range": f"{effective_prefix}{suggested}-{effective_prefix}{end_number}",
    }
@app.route("/box/<int:box_id>")
def view_box(box_id: int):
    """Render the detail page of a box; responds 404 when it does not exist."""
    return render_box_page(Box.query.get_or_404(box_id))
@app.route("/box/<int:box_id>/bags/add", methods=["POST"])
def add_bag_item(box_id: int):
    """Append one component to a bag-type box and re-render the box page.

    Responds 404 for an unknown box and 400 when the box is not of type
    ``"bag"``. Validation failures re-render the page with an error. The new
    item takes the slot index after the current maximum, and the box capacity
    grows to include it.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    form = request.form
    text_fields = {
        key: form.get(key, "").strip()
        for key in ("part_no", "name", "specification", "location", "note")
    }
    if not text_fields["part_no"] or not text_fields["name"]:
        return render_box_page(box, error="袋装新增失败: 料号和名称不能为空")
    try:
        quantity = _parse_non_negative_int(form.get("quantity", "0"), 0)
    except ValueError:
        return render_box_page(box, error="袋装新增失败: 数量必须是大于等于 0 的整数")

    highest_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
    )
    next_slot = (highest_slot or 0) + 1

    db.session.add(
        Component(
            box_id=box.id,
            slot_index=next_slot,
            part_no=text_fields["part_no"],
            name=text_fields["name"],
            quantity=quantity,
            # Blank optional fields are stored as NULL.
            specification=text_fields["specification"] or None,
            location=text_fields["location"] or None,
            note=text_fields["note"] or None,
            is_enabled=True,
        )
    )
    if next_slot > box.slot_capacity:
        box.slot_capacity = next_slot
    db.session.commit()
    return render_box_page(box, notice="已新增 1 条袋装记录")
@app.route("/box/<int:box_id>/bags/batch", methods=["POST"])
def add_bag_items_batch(box_id: int):
    """Append several components to a bag-type box from pasted text.

    The ``lines`` form field holds one item per line, with fields separated
    by a comma or a tab in the order: part number, name, quantity,
    specification, location, note. Missing trailing fields are treated as
    blank. Lines without a part number or name, or with an invalid quantity,
    are skipped and reported by line number.

    Responds 404 for an unknown box and 400 when the box is not of type
    ``"bag"``; otherwise re-renders the box page with an error or notice.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    raw_lines = request.form.get("lines", "")
    # Number lines before dropping blank ones, so reported numbers match the
    # text the user submitted.
    numbered_lines = [
        (line_no, line.strip())
        for line_no, line in enumerate(raw_lines.splitlines(), start=1)
        if line.strip()
    ]
    if not numbered_lines:
        return render_box_page(box, error="批量新增失败: 请至少输入一行")

    invalid_lines = []
    added_count = 0
    next_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
        or 0
    ) + 1

    for line_no, line in numbered_lines:
        parts = [p.strip() for p in re.split(r"[,\t]", line)]
        # Pad so the six expected fields always exist; extra fields are ignored.
        parts += [""] * (6 - len(parts))
        part_no, name, quantity_raw, specification, location, note = parts[:6]

        if not part_no or not name:
            invalid_lines.append(f"{line_no}")
            continue
        try:
            quantity = _parse_non_negative_int(quantity_raw, 0)
        except ValueError:
            invalid_lines.append(f"{line_no}")
            continue

        db.session.add(
            Component(
                box_id=box.id,
                slot_index=next_slot,
                part_no=part_no,
                name=name,
                quantity=quantity,
                specification=specification or None,
                location=location or None,
                note=note or None,
                is_enabled=True,
            )
        )
        added_count += 1
        next_slot += 1

    if added_count:
        # Grow the box so every newly assigned slot index is within capacity.
        box.slot_capacity = max(box.slot_capacity, next_slot - 1)
        db.session.commit()

    if invalid_lines and added_count == 0:
        return render_box_page(
            box,
            error="批量新增失败: 数据格式不正确(请检查: " + ", ".join(invalid_lines) + ")",
        )
    if invalid_lines:
        return render_box_page(
            box,
            notice=f"已新增 {added_count} 条,以下行被跳过: " + ", ".join(invalid_lines),
        )
    return render_box_page(box, notice=f"批量新增成功,共 {added_count}")
@app.route("/edit/<int:box_id>/<int:slot>", methods=["GET", "POST"])
def edit_component(box_id: int, slot: int):
    """Show or process the edit form for one slot of a box.

    GET renders ``edit.html``. POST dispatches on the ``action`` form field:
    ``delete`` removes the slot's component, ``toggle_enable`` and
    ``toggle_disable`` set its enabled flag, and anything else saves the
    submitted fields, creating the component if the slot was empty.

    Responds 404 for an unknown box and 400 when ``slot`` is outside
    ``1..slot_capacity``. Validation failures re-render the form with an error.
    """
    box = Box.query.get_or_404(box_id)
    if not 1 <= slot <= box.slot_capacity:
        return "无效的格子编号", 400

    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()

    def render_form(**extra):
        # `error` is passed to the template only on validation failure.
        return render_template(
            "edit.html",
            box=box,
            slot=slot,
            slot_code=slot_code_for_box(box, slot),
            component=component,
            **extra,
        )

    if request.method != "POST":
        return render_form()

    form = request.form
    action = form.get("action", "save")

    if action == "delete":
        if component:
            db.session.delete(component)
            db.session.commit()
        return redirect(url_for("view_box", box_id=box.id))

    if action in ("toggle_enable", "toggle_disable"):
        if component:
            component.is_enabled = action == "toggle_enable"
            db.session.commit()
        return redirect(url_for("edit_component", box_id=box.id, slot=slot))

    part_no = form.get("part_no", "").strip()
    name = form.get("name", "").strip()
    specification = form.get("specification", "").strip()
    quantity_raw = form.get("quantity", "0").strip()
    location = form.get("location", "").strip()
    note = form.get("note", "").strip()
    is_enabled = form.get("is_enabled") == "1"

    if not (part_no and name):
        return render_form(error="料号和名称不能为空")
    try:
        quantity = _parse_non_negative_int(quantity_raw, 0)
    except ValueError:
        return render_form(error="数量必须是大于等于 0 的整数")

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = part_no
    component.name = name
    # Blank optional fields are stored as NULL.
    component.specification = specification or None
    component.quantity = quantity
    component.location = location or None
    component.note = note or None
    component.is_enabled = is_enabled
    db.session.commit()
    return redirect(url_for("view_box", box_id=box.id))
@app.route("/scan")
def scan_page():
    """Render the search page, listing components that match ``q``.

    A component matches when its part number or name contains the keyword
    (case-insensitive ``ilike``). Results are ordered by part number and
    carry the component, its box name and its slot code. A blank keyword
    yields no results.
    """
    keyword = request.args.get("q", "").strip()
    results = []
    if keyword:
        # Escape LIKE wildcards so "_" and "%" in the keyword (common in part
        # numbers) match literally instead of matching arbitrary characters.
        escaped = (
            keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        raw_results = (
            Component.query.join(Box, Box.id == Component.box_id)
            .filter(
                db.or_(
                    Component.part_no.ilike(pattern, escape="\\"),
                    Component.name.ilike(pattern, escape="\\"),
                )
            )
            .order_by(Component.part_no.asc())
            .all()
        )
        for c in raw_results:
            box = Box.query.get(c.box_id)
            results.append(
                {
                    "component": c,
                    "box_name": box.name if box else f"{c.box_id}",
                    "slot_code": slot_code_for_box(box, c.slot_index) if box else str(c.slot_index),
                }
            )
    return render_template("scan.html", keyword=keyword, results=results)
@app.route("/api/scan")
def scan_api():
    """Look up a component by exact part number and return it as JSON.

    Query parameter ``code`` is the part number; 400 if blank, 404 if no
    component has it. When several components share the part number, the
    first one returned by the query is used.
    """
    code = request.args.get("code", "").strip()
    if not code:
        return {"ok": False, "message": "code 不能为空"}, 400

    item = Component.query.filter_by(part_no=code).first()
    if item is None:
        return {"ok": False, "message": "未找到元件"}, 404

    box = Box.query.get(item.box_id)
    if box:
        box_name = box.name
        slot_code = slot_code_for_box(box, item.slot_index)
    else:
        # Without the box, fall back to the raw slot index as the code.
        box_name = None
        slot_code = str(item.slot_index)

    payload = {
        "id": item.id,
        "part_no": item.part_no,
        "name": item.name,
        "quantity": item.quantity,
        "box_id": item.box_id,
        "box_name": box_name,
        "slot_index": item.slot_index,
        "slot_code": slot_code,
        "is_enabled": bool(item.is_enabled),
    }
    return {"ok": True, "data": payload}
def bootstrap() -> None:
    """Prepare the database inside an application context.

    Runs, in order: table creation, the missing-column upgrade, and the
    legacy-data normalization.
    """
    startup_steps = (db.create_all, ensure_schema, normalize_legacy_data)
    with app.app_context():
        for step in startup_steps:
            step()
# Prepare the database at import time, so it is ready whether the module is
# run directly or imported by a WSGI server.
bootstrap()

if __name__ == "__main__":
    # The interactive debugger lets anyone who can reach the server run
    # arbitrary Python, and this server listens on all interfaces. Debug mode
    # is therefore opt-in via the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)