import os import re from flask import Flask, redirect, render_template, request, url_for from flask_sqlalchemy import SQLAlchemy BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DB_DIR = os.path.join(BASE_DIR, "data") os.makedirs(DB_DIR, exist_ok=True) DB_PATH = os.path.join(DB_DIR, "inventory.db") app = Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{DB_PATH}" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False db = SQLAlchemy(app) BOX_TYPES = { "small_28": { "label": "28格小盒大盒", "default_capacity": 28, "default_desc": "4连排小盒,常见摆放为竖向7排", }, "medium_14": { "label": "14格中盒大盒", "default_capacity": 14, "default_desc": "装十四个中等盒子(尺寸不固定)", }, "bag": { "label": "袋装清单", "default_capacity": 1, "default_desc": "用于较小防静电袋存放", }, } class Box(db.Model): __tablename__ = "boxes" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False, unique=True) description = db.Column(db.String(255), nullable=True) box_type = db.Column(db.String(30), nullable=False, default="small_28") slot_capacity = db.Column(db.Integer, nullable=False, default=28) class Component(db.Model): __tablename__ = "components" id = db.Column(db.Integer, primary_key=True) box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False) slot_index = db.Column(db.Integer, nullable=False) part_no = db.Column(db.String(100), nullable=False) name = db.Column(db.String(120), nullable=False) specification = db.Column(db.String(120), nullable=True) quantity = db.Column(db.Integer, nullable=False, default=0) location = db.Column(db.String(120), nullable=True) note = db.Column(db.Text, nullable=True) box = db.relationship("Box", backref=db.backref("components", lazy=True)) __table_args__ = ( db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"), ) def ensure_default_box() -> None: defaults = [ ("默认28格大盒", "small_28"), ("默认14格中盒", "medium_14"), ("默认袋装清单", "bag"), ] for box_name, box_type in defaults: exists = Box.query.filter_by(name=box_name).first() if exists: continue meta = BOX_TYPES[box_type] db.session.add( Box( name=box_name, description=meta["default_desc"], box_type=box_type, slot_capacity=meta["default_capacity"], ) ) db.session.commit() def slot_data_for_box(box: Box): components = Component.query.filter_by(box_id=box.id).all() slot_map = {c.slot_index: c for c in components} slots = [] for slot in range(1, box.slot_capacity + 1): slots.append({"slot": slot, "component": slot_map.get(slot)}) return slots def bag_items_for_box(box: Box): return ( Component.query.filter_by(box_id=box.id) .order_by(Component.slot_index.asc()) .all() ) def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int: raw = (raw_value or "").strip() if raw == "": return default_value value = int(raw) if value < 0: raise ValueError return value def _add_bag_item( box: Box, part_no: str, name: str, quantity: int, specification: str, location: str, note: str, ) -> None: next_slot = ( db.session.query(db.func.max(Component.slot_index)) .filter(Component.box_id == box.id) .scalar() or 0 ) + 1 item = Component( box_id=box.id, slot_index=next_slot, part_no=part_no, name=name, quantity=quantity, specification=specification or None, location=location or None, note=note or None, ) db.session.add(item) if next_slot > box.slot_capacity: box.slot_capacity = next_slot def render_box_page(box: Box, error: str = "", notice: str = ""): slots = slot_data_for_box(box) bag_items = bag_items_for_box(box) if box.box_type == "bag" else [] return render_template( "box.html", box=box, slots=slots, bag_items=bag_items, box_types=BOX_TYPES, error=error, notice=notice, ) def ensure_box_schema() -> None: columns = { row[1] for row in db.session.execute(db.text("PRAGMA table_info(boxes)")).fetchall() } if "box_type" not in columns: db.session.execute( db.text( "ALTER TABLE boxes ADD COLUMN box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'" ) ) if "slot_capacity" not in columns: db.session.execute( db.text( "ALTER TABLE boxes ADD COLUMN slot_capacity INTEGER NOT NULL DEFAULT 28" ) ) db.session.commit() @app.route("/") def index(): boxes = Box.query.order_by(Box.id.asc()).all() groups = {key: [] for key in BOX_TYPES.keys()} for box in boxes: used_count = Component.query.filter_by(box_id=box.id).count() box_type = box.box_type if box.box_type in BOX_TYPES else "small_28" groups[box_type].append({"box": box, "used_count": used_count}) return render_template("index.html", groups=groups, box_types=BOX_TYPES) @app.route("/boxes/create", methods=["POST"]) def create_box(): box_type = request.form.get("box_type", "small_28").strip() name = request.form.get("name", "").strip() description = request.form.get("description", "").strip() if box_type not in BOX_TYPES: return "无效盒子类型", 400 if not name: return "盒子名称不能为空", 400 if Box.query.filter_by(name=name).first(): return "盒子名称已存在,请更换", 400 meta = BOX_TYPES[box_type] box = Box( name=name, description=description or meta["default_desc"], box_type=box_type, slot_capacity=meta["default_capacity"], ) db.session.add(box) db.session.commit() return redirect(url_for("index")) @app.route("/box/") def view_box(box_id: int): box = Box.query.get_or_404(box_id) return render_box_page(box) @app.route("/box//bags/add", methods=["POST"]) def add_bag_item(box_id: int): box = Box.query.get_or_404(box_id) if box.box_type != "bag": return "当前盒子不是袋装清单", 400 part_no = request.form.get("part_no", "").strip() name = request.form.get("name", "").strip() specification = request.form.get("specification", "").strip() location = request.form.get("location", "").strip() note = request.form.get("note", "").strip() if not part_no or not name: return render_box_page(box, error="袋装新增失败: 料号和名称不能为空") try: quantity = _parse_non_negative_int(request.form.get("quantity", "0"), 0) except ValueError: return render_box_page(box, error="袋装新增失败: 数量必须是大于等于 0 的整数") _add_bag_item(box, part_no, name, quantity, specification, location, note) db.session.commit() return render_box_page(box, notice="已新增 1 条袋装记录") @app.route("/box//bags/batch", methods=["POST"]) def add_bag_items_batch(box_id: int): box = Box.query.get_or_404(box_id) if box.box_type != "bag": return "当前盒子不是袋装清单", 400 raw_lines = request.form.get("lines", "") lines = [line.strip() for line in raw_lines.splitlines() if line.strip()] if not lines: return render_box_page(box, error="批量新增失败: 请至少输入一行") invalid_lines = [] added_count = 0 for line_no, line in enumerate(lines, start=1): parts = [p.strip() for p in re.split(r"[,\t]", line)] while len(parts) < 6: parts.append("") part_no = parts[0] name = parts[1] quantity_raw = parts[2] specification = parts[3] location = parts[4] note = parts[5] if not part_no or not name: invalid_lines.append(f"第 {line_no} 行") continue try: quantity = _parse_non_negative_int(quantity_raw, 0) except ValueError: invalid_lines.append(f"第 {line_no} 行") continue _add_bag_item(box, part_no, name, quantity, specification, location, note) added_count += 1 if added_count: db.session.commit() if invalid_lines and added_count == 0: return render_box_page( box, error="批量新增失败: 数据格式不正确(请检查: " + ", ".join(invalid_lines) + ")", ) if invalid_lines: return render_box_page( box, notice=f"已新增 {added_count} 条,以下行被跳过: " + ", ".join(invalid_lines), ) return render_box_page(box, notice=f"批量新增成功,共 {added_count} 条") @app.route("/edit//", methods=["GET", "POST"]) def edit_component(box_id: int, slot: int): box = Box.query.get_or_404(box_id) if slot < 1 or slot > box.slot_capacity: return "无效的格子编号", 400 component = Component.query.filter_by(box_id=box.id, slot_index=slot).first() if request.method == "POST": action = request.form.get("action", "save") if action == "delete": if component: db.session.delete(component) db.session.commit() return redirect(url_for("view_box", box_id=box.id)) part_no = request.form.get("part_no", "").strip() name = request.form.get("name", "").strip() specification = request.form.get("specification", "").strip() quantity_raw = request.form.get("quantity", "0").strip() location = request.form.get("location", "").strip() note = request.form.get("note", "").strip() if not part_no or not name: error = "料号和名称不能为空" return render_template( "edit.html", box=box, slot=slot, component=component, error=error, ) try: quantity = int(quantity_raw) if quantity < 0: raise ValueError except ValueError: error = "数量必须是大于等于 0 的整数" return render_template( "edit.html", box=box, slot=slot, component=component, error=error, ) if component is None: component = Component(box_id=box.id, slot_index=slot) db.session.add(component) component.part_no = part_no component.name = name component.specification = specification or None component.quantity = quantity component.location = location or None component.note = note or None db.session.commit() return redirect(url_for("view_box", box_id=box.id)) return render_template("edit.html", box=box, slot=slot, component=component) @app.route("/scan") def scan_page(): keyword = request.args.get("q", "").strip() results = [] if keyword: results = ( Component.query.filter( db.or_( Component.part_no.ilike(f"%{keyword}%"), Component.name.ilike(f"%{keyword}%"), ) ) .order_by(Component.part_no.asc()) .all() ) return render_template("scan.html", keyword=keyword, results=results) @app.route("/api/scan") def scan_api(): code = request.args.get("code", "").strip() if not code: return {"ok": False, "message": "code 不能为空"}, 400 item = Component.query.filter_by(part_no=code).first() if not item: return {"ok": False, "message": "未找到元件"}, 404 return { "ok": True, "data": { "id": item.id, "part_no": item.part_no, "name": item.name, "quantity": item.quantity, "box_id": item.box_id, "slot_index": item.slot_index, }, } def bootstrap() -> None: with app.app_context(): db.create_all() ensure_box_schema() db.session.execute( db.text( "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''" ) ) db.session.execute( db.text("UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL") ) db.session.commit() ensure_default_box() bootstrap() if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)