import os import re import csv import json import hmac import base64 import random import string import hashlib import time import urllib.error import urllib.parse import urllib.request from copy import deepcopy from io import StringIO from datetime import datetime, timedelta from flask import Flask, Response, redirect, render_template, request, url_for from flask_sqlalchemy import SQLAlchemy BASE_DIR = os.path.dirname(os.path.abspath(__file__)) DB_DIR = os.path.join(BASE_DIR, "data") os.makedirs(DB_DIR, exist_ok=True) DB_PATH = os.path.join(DB_DIR, "inventory.db") app = Flask(__name__) app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{DB_PATH}" app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False db = SQLAlchemy(app) LOW_STOCK_THRESHOLD = 5 BOX_TYPES_OVERRIDE_PATH = os.path.join(DB_DIR, "box_types.json") AI_SETTINGS_PATH = os.path.join(DB_DIR, "ai_settings.json") LCSC_BASE_URL = "https://open-api.jlc.com" LCSC_BASIC_PATH = "/lcsc/openapi/sku/product/basic" AI_SETTINGS_DEFAULT = { "api_url": os.environ.get( "SILICONFLOW_API_URL", "https://api.siliconflow.cn/v1/chat/completions", ), "model": os.environ.get( "SILICONFLOW_MODEL", "Qwen/Qwen2.5-7B-Instruct", ), "api_key": os.environ.get("SILICONFLOW_API_KEY", ""), "timeout": int(os.environ.get("SILICONFLOW_TIMEOUT", "30") or "30"), "restock_threshold": LOW_STOCK_THRESHOLD, "restock_limit": 24, "lcsc_timeout": int(os.environ.get("LCSC_TIMEOUT", "20") or "20"), "lcsc_app_id": os.environ.get("LCSC_APP_ID", ""), "lcsc_access_key": os.environ.get("LCSC_ACCESS_KEY", ""), "lcsc_secret_key": os.environ.get("LCSC_SECRET_KEY", ""), "lock_storage_mode": False, } DEFAULT_BOX_TYPES = { "small_28": { "label": "28格小盒大盒", "default_capacity": 28, "default_desc": "4连排小盒,常见摆放为竖向7排", "default_prefix": "A", }, "medium_14": { "label": "14格中盒大盒", "default_capacity": 14, "default_desc": "14格中盒,内部摆放方向与28格不同", "default_prefix": "B", }, "custom": { "label": "自定义容器", "default_capacity": 20, "default_desc": "可按实际盒型设置格数与编号前缀", "default_prefix": 
"C", }, "bag": { "label": "袋装清单", "default_capacity": 28, "default_desc": "一袋一种器件,按清单管理", "default_prefix": "BAG", }, } BOX_TYPES = deepcopy(DEFAULT_BOX_TYPES) def _apply_box_type_overrides() -> None: if not os.path.exists(BOX_TYPES_OVERRIDE_PATH): return try: with open(BOX_TYPES_OVERRIDE_PATH, "r", encoding="utf-8") as f: overrides = json.load(f) except (OSError, json.JSONDecodeError): return if not isinstance(overrides, dict): return for key, value in overrides.items(): if key not in BOX_TYPES or not isinstance(value, dict): continue for field in ("label", "default_desc", "default_prefix"): if field not in value: continue BOX_TYPES[key][field] = value[field] # Keep bag container capacity fixed by domain rule. BOX_TYPES["bag"]["default_capacity"] = 28 def _save_box_type_overrides() -> None: payload = {} for key, defaults in DEFAULT_BOX_TYPES.items(): current = BOX_TYPES.get(key, defaults) changed = {} for field in ("label", "default_desc", "default_prefix"): if current.get(field) != defaults.get(field): changed[field] = current.get(field) if changed: payload[key] = changed with open(BOX_TYPES_OVERRIDE_PATH, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) _apply_box_type_overrides() def _load_ai_settings() -> dict: settings = dict(AI_SETTINGS_DEFAULT) if not os.path.exists(AI_SETTINGS_PATH): return settings try: with open(AI_SETTINGS_PATH, "r", encoding="utf-8") as f: saved = json.load(f) except (OSError, json.JSONDecodeError): return settings if not isinstance(saved, dict): return settings for key in settings.keys(): if key in saved: settings[key] = saved[key] return settings def _save_ai_settings(settings: dict) -> None: with open(AI_SETTINGS_PATH, "w", encoding="utf-8") as f: json.dump(settings, f, ensure_ascii=False, indent=2) def _get_ai_settings() -> dict: settings = _load_ai_settings() try: settings["timeout"] = max(5, int(settings.get("timeout", 30))) except (TypeError, ValueError): settings["timeout"] = 30 try: 
settings["restock_threshold"] = max(0, int(settings.get("restock_threshold", LOW_STOCK_THRESHOLD))) except (TypeError, ValueError): settings["restock_threshold"] = LOW_STOCK_THRESHOLD try: settings["restock_limit"] = max(1, int(settings.get("restock_limit", 24))) except (TypeError, ValueError): settings["restock_limit"] = 24 try: settings["lcsc_timeout"] = max(5, int(settings.get("lcsc_timeout", 20))) except (TypeError, ValueError): settings["lcsc_timeout"] = 20 settings["api_url"] = (settings.get("api_url") or "").strip() settings["model"] = (settings.get("model") or "").strip() settings["api_key"] = (settings.get("api_key") or "").strip() settings["lcsc_base_url"] = LCSC_BASE_URL settings["lcsc_basic_path"] = LCSC_BASIC_PATH settings["lcsc_app_id"] = (settings.get("lcsc_app_id") or "").strip() settings["lcsc_access_key"] = (settings.get("lcsc_access_key") or "").strip() settings["lcsc_secret_key"] = (settings.get("lcsc_secret_key") or "").strip() settings["lock_storage_mode"] = bool(settings.get("lock_storage_mode", False)) return settings def _generate_nonce(length: int = 32) -> str: alphabet = string.ascii_letters + string.digits return "".join(random.choice(alphabet) for _ in range(length)) def _generate_jop_signature(method: str, path: str, timestamp: str, nonce: str, post_data: str, secret_key: str) -> str: string_to_sign = f"{method}\n{path}\n{timestamp}\n{nonce}\n{post_data}\n" digest = hmac.new( secret_key.encode("utf-8"), string_to_sign.encode("utf-8"), hashlib.sha256, ).digest() return base64.b64encode(digest).decode("utf-8") def _extract_lcsc_product_id_from_input(raw_identifier: str) -> int | None: text = (raw_identifier or "").strip() if not text: return None # Accept full item detail URL and extract /23913.html. 
try: parsed = urllib.parse.urlparse(text) except ValueError: parsed = None if parsed and parsed.netloc: path = parsed.path or "" m = re.search(r"/(\d+)\.html$", path) if m: return int(m.group(1)) return None def _fetch_lcsc_product_basic(product_identifier: str, settings: dict) -> dict: raw_identifier = (product_identifier or "").strip() if not raw_identifier: raise RuntimeError("立创商品链接不能为空") product_id_from_input = _extract_lcsc_product_id_from_input(raw_identifier) if product_id_from_input is None: raise RuntimeError("请输入立创商品详情页链接,例如 https://item.szlcsc.com/23913.html") app_id = settings.get("lcsc_app_id", "").strip() access_key = settings.get("lcsc_access_key", "").strip() secret_key = settings.get("lcsc_secret_key", "").strip() if not app_id or not access_key or not secret_key: raise RuntimeError("立创 JOP 鉴权参数不完整,请填写 app_id/access_key/secret_key") timeout = int(settings.get("lcsc_timeout", 20)) def request_openapi(payload: dict) -> dict: post_data_str = json.dumps(payload, ensure_ascii=False) timestamp = str(int(time.time())) nonce = _generate_nonce(32) signature = _generate_jop_signature( "POST", LCSC_BASIC_PATH, timestamp, nonce, post_data_str, secret_key, ) headers = { "Content-Type": "application/json; utf-8", "Authorization": ( f'JOP appid="{app_id}",accesskey="{access_key}",' f'timestamp="{timestamp}",nonce="{nonce}",signature="{signature}"' ), } endpoint = LCSC_BASE_URL + LCSC_BASIC_PATH req = urllib.request.Request( endpoint, data=post_data_str.encode("utf-8"), method="POST", headers=headers, ) try: with urllib.request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="ignore") raise RuntimeError(f"立创接口调用失败: HTTP {exc.code} {detail[:180]}") from exc except urllib.error.URLError as exc: raise RuntimeError(f"立创接口调用失败: 连接失败 {exc.reason}") from exc try: response = json.loads(raw) except json.JSONDecodeError as exc: raise RuntimeError("立创接口返回非 JSON 数据") 
from exc code = response.get("code") successful = response.get("successful", False) if code != 200 and not successful: message = str(response.get("message", "立创接口调用失败") or "立创接口调用失败") raise RuntimeError(f"立创接口调用失败: code={code}, message={message}") data = response.get("data") or {} items = data.get("productBasicInfoVOList") or [] if not items: raise RuntimeError("未查询到商品信息,请检查编号是否正确") return items[0] return request_openapi({"productId": product_id_from_input}) def _map_lcsc_product_to_component(product: dict) -> dict: product_model = str(product.get("productModel") or "").strip() product_code = str(product.get("productCode") or "").strip() product_id = product.get("productId") part_no = product_model or product_code or (str(product_id) if product_id is not None else "") name = str(product.get("productName") or "").strip() or part_no or "未命名元件" brand_name = str(product.get("brandName") or "").strip() encap_standard = str(product.get("encapStandard") or "").strip() catalog_name = str(product.get("catalogName") or "").strip() # Prefer concise, searchable spec fields. 
spec_parts = [brand_name, encap_standard, catalog_name] specification = " / ".join([p for p in spec_parts if p]) arrange_map = { "biandai": "编带", "bianpai": "编排", "daizhuang": "袋装", "guanzhuang": "管装", "hezhuang": "盒装", "juan": "卷装", "kun": "捆装", "tuopan": "托盘", "xiangzhuang": "箱装", } unit_map = { "pan": "圆盘", "bao": "包", "ben": "本", "dai": "袋", "guan": "管", "he": "盒", "juan": "卷", "kun": "捆", "mi": "米", "tuopan": "托盘", "xiang": "箱", } product_arrange_raw = str(product.get("productArrange") or "").strip().lower() product_arrange = arrange_map.get(product_arrange_raw, product_arrange_raw) min_packet_number = product.get("minPacketNumber") min_packet_unit_raw = str(product.get("minPacketUnit") or "").strip().lower() min_packet_unit = unit_map.get(min_packet_unit_raw, min_packet_unit_raw) note_bits = [] if product_code: note_bits.append(f"LCSC {product_code}") if product_id is not None: note_bits.append(f"ID {product_id}") if product_arrange: note_bits.append(f"编排 {product_arrange}") if min_packet_number: unit_suffix = min_packet_unit or "" note_bits.append(f"最小包装 {min_packet_number}{unit_suffix}") note = " | ".join(note_bits) return { "part_no": part_no, "name": name, "specification": specification, "note": note, } class Box(db.Model): __tablename__ = "boxes" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), nullable=False, unique=True) description = db.Column(db.String(255), nullable=True) box_type = db.Column(db.String(30), nullable=False, default="small_28") slot_capacity = db.Column(db.Integer, nullable=False, default=28) slot_prefix = db.Column(db.String(16), nullable=False, default="A") start_number = db.Column(db.Integer, nullable=False, default=1) class Component(db.Model): __tablename__ = "components" id = db.Column(db.Integer, primary_key=True) box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False) slot_index = db.Column(db.Integer, nullable=False) part_no = db.Column(db.String(100), nullable=False) name = 
db.Column(db.String(120), nullable=False) specification = db.Column(db.String(120), nullable=True) quantity = db.Column(db.Integer, nullable=False, default=0) location = db.Column(db.String(120), nullable=True) note = db.Column(db.Text, nullable=True) is_enabled = db.Column(db.Boolean, nullable=False, default=True) box = db.relationship("Box", backref=db.backref("components", lazy=True)) __table_args__ = ( db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"), ) class InventoryEvent(db.Model): __tablename__ = "inventory_events" id = db.Column(db.Integer, primary_key=True) box_id = db.Column(db.Integer, nullable=True) box_type = db.Column(db.String(30), nullable=True) component_id = db.Column(db.Integer, nullable=True) part_no = db.Column(db.String(100), nullable=True) event_type = db.Column(db.String(30), nullable=False) delta = db.Column(db.Integer, nullable=False, default=0) created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now()) def _add_column_if_missing(table_name: str, column_name: str, ddl: str) -> None: columns = { row[1] for row in db.session.execute(db.text(f"PRAGMA table_info({table_name})")).fetchall() } if column_name not in columns: db.session.execute(db.text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}")) def ensure_schema() -> None: _add_column_if_missing( "boxes", "box_type", "box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'", ) _add_column_if_missing( "boxes", "slot_capacity", "slot_capacity INTEGER NOT NULL DEFAULT 28", ) _add_column_if_missing( "boxes", "slot_prefix", "slot_prefix VARCHAR(16) NOT NULL DEFAULT 'A'", ) _add_column_if_missing( "boxes", "start_number", "start_number INTEGER NOT NULL DEFAULT 1", ) _add_column_if_missing( "components", "is_enabled", "is_enabled BOOLEAN NOT NULL DEFAULT 1", ) db.session.commit() def slot_code_for_box(box: Box, slot_index: int) -> str: serial = box.start_number + slot_index - 1 return f"{box.slot_prefix}{serial}" def slot_range_label(box: Box) -> str: start_code = 
slot_code_for_box(box, 1) end_code = slot_code_for_box(box, box.slot_capacity) return f"{start_code}-{end_code}" def _find_enabled_part_no_conflict(part_no: str, exclude_component_id: int = None): normalized = (part_no or "").strip() if not normalized: return None query = Component.query.filter( Component.part_no == normalized, Component.is_enabled.is_(True), ) if exclude_component_id is not None: query = query.filter(Component.id != exclude_component_id) return query.order_by(Component.box_id.asc(), Component.slot_index.asc()).first() def _normalize_material_text(text: str) -> str: raw = (text or "").upper().strip() if not raw: return "" raw = raw.replace("(", "(").replace(")", ")") raw = re.sub(r"[\s\-_/,|;:,。;:]+", "", raw) raw = re.sub(r"[()\[\]{}]", "", raw) return raw def _material_identity_key(name: str, specification: str) -> str: name_key = _normalize_material_text(name) spec_key = _normalize_material_text(specification) if not name_key and not spec_key: return "" return f"{name_key}|{spec_key}" def _find_enabled_material_conflict( name: str, specification: str, exclude_component_id: int = None, exclude_part_no: str = "", ): target_key = _material_identity_key(name, specification) if not target_key: return None query = Component.query.filter(Component.is_enabled.is_(True)) if exclude_component_id is not None: query = query.filter(Component.id != exclude_component_id) normalized_exclude_part_no = (exclude_part_no or "").strip() for candidate in query.order_by(Component.box_id.asc(), Component.slot_index.asc()).all(): if normalized_exclude_part_no and (candidate.part_no or "").strip() == normalized_exclude_part_no: continue if _material_identity_key(candidate.name, candidate.specification) == target_key: return candidate return None def _is_truthy_form_value(value: str) -> bool: return (value or "").strip().lower() in {"1", "true", "yes", "on"} def _append_merge_part_no_note(note: str, part_no: str) -> str: normalized = (part_no or "").strip() if not 
normalized: return note or "" line = f"合并料号: {normalized}" current = note or "" if line in current: return current return f"{current}\n{line}".strip() def _is_slot_part_replacement(component: Component, new_part_no: str) -> bool: if component is None: return False old_part_no = (component.part_no or "").strip() target_part_no = (new_part_no or "").strip() return bool(old_part_no and target_part_no and old_part_no != target_part_no) def _merge_into_existing_component( target: Component, incoming_part_no: str, incoming_name: str, incoming_specification: str, incoming_note: str, incoming_quantity: int, source_component: Component = None, source_box: Box = None, ) -> None: old_target_enabled_qty = int(target.quantity or 0) if target.is_enabled else 0 target.name = incoming_name or target.name if incoming_specification: target.specification = incoming_specification if incoming_note: target.note = incoming_note if incoming_part_no and incoming_part_no != target.part_no: target.note = _append_merge_part_no_note(target.note, incoming_part_no) target.quantity = int(target.quantity or 0) + int(incoming_quantity or 0) target.is_enabled = True target_delta = int(target.quantity or 0) - old_target_enabled_qty if target_delta: log_inventory_event( event_type="component_merge_confirmed", delta=target_delta, box=target.box, component=target, part_no=target.part_no, ) if source_component is not None and source_component.id != target.id: if source_component.is_enabled and source_component.quantity: log_inventory_event( event_type="component_merge_cleanup", delta=-int(source_component.quantity), box=source_box, component=source_component, part_no=source_component.part_no, ) db.session.delete(source_component) def _format_component_position(component: Component) -> str: target_box = Box.query.get(component.box_id) if not target_box: return f"盒子#{component.box_id} 位置#{component.slot_index}" return f"{target_box.name} {slot_code_for_box(target_box, component.slot_index)}" def 
compose_box_name(base_name: str, prefix: str, start_number: int, slot_capacity: int) -> str: base = (base_name or "").strip() if not base: base = "盒子" end_number = start_number + slot_capacity - 1 return f"{base} {prefix}{start_number}-{prefix}{end_number}" def make_unique_box_name(candidate_name: str, exclude_box_id: int = None) -> str: name = candidate_name counter = 2 while True: query = Box.query.filter_by(name=name) if exclude_box_id is not None: query = query.filter(Box.id != exclude_box_id) if not query.first(): return name name = f"{candidate_name} #{counter}" counter += 1 def infer_base_name(box: Box) -> str: pattern = rf"\s+{re.escape(box.slot_prefix)}\d+-{re.escape(box.slot_prefix)}\d+(?:\s+#\d+)?$" base = re.sub(pattern, "", box.name).strip() return base or box.name def _extract_lcsc_code_from_text(text: str) -> str: raw = (text or "").upper() match = re.search(r"\bC\d{3,}\b", raw) return match.group(0) if match else "" def _parse_slot_spec_fields(specification: str) -> dict: parts = [p.strip() for p in (specification or "").split("/") if p.strip()] return { "brand": parts[0] if len(parts) >= 1 else "", "package": parts[1] if len(parts) >= 2 else "", "usage": parts[2] if len(parts) >= 3 else "", } def _parse_note_detail_fields(note: str) -> dict: raw = (note or "") lcsc_code = _extract_lcsc_code_from_text(raw) product_id = "" id_match = re.search(r"\b(?:ID|productId)\s*(\d+)\b", raw, flags=re.IGNORECASE) if id_match: product_id = id_match.group(1) arrange = "" arrange_match = re.search(r"编排\s*([^|]+)", raw) if arrange_match: arrange = arrange_match.group(1).strip() min_pack = "" min_pack_match = re.search(r"最小包装\s*([^|]+)", raw) if min_pack_match: min_pack = min_pack_match.group(1).strip() return { "lcsc_code": lcsc_code, "product_id": product_id, "arrange": arrange, "min_pack": min_pack, } def slot_data_for_box(box: Box): components = Component.query.filter_by(box_id=box.id).all() slot_map = {c.slot_index: c for c in components} slots = [] for slot in 
range(1, box.slot_capacity + 1): component = slot_map.get(slot) lcsc_code = _extract_lcsc_code_from_text(component.note if component else "") if not lcsc_code and component: lcsc_code = _extract_lcsc_code_from_text(component.part_no) spec_fields = _parse_slot_spec_fields(component.specification) if component else { "brand": "", "package": "", "usage": "", } slots.append( { "slot": slot, "slot_code": slot_code_for_box(box, slot), "component": component, "lcsc_code": lcsc_code, "spec_fields": spec_fields, } ) return slots def bag_rows_for_box(box: Box): rows = [] components = ( Component.query.filter_by(box_id=box.id) .order_by(Component.slot_index.asc()) .all() ) for c in components: rows.append({"component": c, "slot_code": slot_code_for_box(box, c.slot_index)}) return rows def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int: raw = (raw_value or "").strip() if raw == "": return default_value value = int(raw) if value < 0: raise ValueError return value def normalize_legacy_data() -> None: db.session.execute( db.text( "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''" ) ) db.session.execute( db.text("UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL") ) db.session.execute( db.text("UPDATE boxes SET slot_prefix = 'A' WHERE slot_prefix IS NULL OR slot_prefix = ''") ) db.session.execute( db.text("UPDATE boxes SET start_number = 1 WHERE start_number IS NULL") ) db.session.execute( db.text("UPDATE components SET is_enabled = 1 WHERE is_enabled IS NULL") ) for box in Box.query.all(): if box.box_type not in BOX_TYPES: box.box_type = "small_28" if not box.slot_capacity or box.slot_capacity < 1: box.slot_capacity = BOX_TYPES[box.box_type]["default_capacity"] if not box.slot_prefix: box.slot_prefix = BOX_TYPES[box.box_type]["default_prefix"] if box.start_number is None or box.start_number < 0: box.start_number = 1 if box.box_type == "bag": box.start_number = 1 box.name = "袋装清单" if not 
Box.query.filter_by(box_type="bag").first(): default_meta = BOX_TYPES["bag"] db.session.add( Box( name="袋装清单", description=default_meta["default_desc"], box_type="bag", slot_capacity=default_meta["default_capacity"], slot_prefix=default_meta["default_prefix"], start_number=1, ) ) db.session.commit() def get_fixed_bag_box() -> Box: bag_box = Box.query.filter_by(box_type="bag").order_by(Box.id.asc()).first() if bag_box: return bag_box meta = BOX_TYPES["bag"] bag_box = Box( name="袋装清单", description=meta["default_desc"], box_type="bag", slot_capacity=meta["default_capacity"], slot_prefix=meta["default_prefix"], start_number=1, ) db.session.add(bag_box) db.session.commit() return bag_box @app.route("/box//bag-capacity", methods=["POST"]) def update_bag_capacity(box_id: int): box = Box.query.get_or_404(box_id) if box.box_type != "bag": return bad_request("当前容器不是袋装清单", box.box_type) try: slot_capacity = _parse_non_negative_int(request.form.get("slot_capacity", ""), box.slot_capacity) except ValueError: return render_box_page(box, error="袋位数量必须是大于等于 1 的整数") if slot_capacity < 1: return render_box_page(box, error="袋位数量必须是大于等于 1 的整数") max_used_slot = ( db.session.query(db.func.max(Component.slot_index)) .filter_by(box_id=box.id) .scalar() or 0 ) if max_used_slot > slot_capacity: return render_box_page(box, error=f"袋位数量不能小于已使用位置 {max_used_slot}") box.slot_capacity = slot_capacity db.session.commit() return redirect(url_for("view_box", box_id=box.id, notice="袋位数量已更新")) def make_overview_rows(box: Box): enabled_components = ( Component.query.filter_by(box_id=box.id, is_enabled=True) .order_by(Component.slot_index.asc()) .all() ) rows = [] for c in enabled_components: rows.append( { "slot_code": slot_code_for_box(box, c.slot_index), "name": c.name, "part_no": c.part_no, } ) return rows def box_sort_key(box: Box): return ( (box.slot_prefix or "").upper(), box.start_number if box.start_number is not None else 0, box.name or "", ) def has_range_conflict( *, box_type: str, prefix: 
str, start_number: int, slot_capacity: int, exclude_box_id: int = None, ): end_number = start_number + slot_capacity - 1 query = Box.query.filter_by(box_type=box_type, slot_prefix=prefix) if exclude_box_id is not None: query = query.filter(Box.id != exclude_box_id) for other in query.all(): other_start = other.start_number other_end = other.start_number + other.slot_capacity - 1 # Two ranges overlap unless one is strictly before the other. if not (end_number < other_start or start_number > other_end): return True, other return False, None def suggest_next_start_number( *, box_type: str, prefix: str, slot_capacity: int, exclude_box_id: int = None, ) -> int: max_end = 0 query = Box.query.filter_by(box_type=box_type, slot_prefix=prefix) if exclude_box_id is not None: query = query.filter(Box.id != exclude_box_id) for other in query.all(): other_end = other.start_number + other.slot_capacity - 1 if other_end > max_end: max_end = other_end return max_end + 1 if max_end > 0 else 1 def build_index_anchor(box_type: str = "") -> str: if box_type in BOX_TYPES: return f"group-{box_type}" return "" def bad_request(message: str, box_type: str = ""): anchor = build_index_anchor(box_type) if anchor and box_type in BOX_TYPES: back_url = url_for("type_page", box_type=box_type) else: back_url = request.referrer or url_for("types_page") return ( render_template( "error.html", status_code=400, title="请求参数有误", message=message, back_url=back_url, ), 400, ) def render_box_page(box: Box, error: str = "", notice: str = ""): slots = slot_data_for_box(box) return render_template( "box.html", box=box, slots=slots, box_types=BOX_TYPES, slot_range=slot_range_label(box), low_stock_threshold=LOW_STOCK_THRESHOLD, error=error, notice=notice, ) def _next_empty_slot_index(box: Box, occupied_slots: set[int]): for idx in range(1, box.slot_capacity + 1): if idx not in occupied_slots: return idx return None def _parse_bulk_line(line: str): parts = [p.strip() for p in re.split(r"[,\t]", line)] while 
len(parts) < 5: parts.append("") return { "part_no": parts[0], "name": parts[1], "quantity_raw": parts[2], "specification": parts[3], "note": parts[4], } def log_inventory_event( *, event_type: str, delta: int, box: Box = None, component: Component = None, part_no: str = "", ): event = InventoryEvent( box_id=box.id if box else (component.box_id if component else None), box_type=box.box_type if box else None, component_id=component.id if component else None, part_no=(part_no or (component.part_no if component else "") or "").strip() or None, event_type=event_type, delta=int(delta), ) db.session.add(event) def parse_days_value(raw_days: str) -> int: try: days = int((raw_days or "7").strip()) except ValueError: days = 7 return days if days in (7, 30) else 7 def parse_box_type_filter(raw_box_type: str) -> str: box_type = (raw_box_type or "").strip() return box_type if box_type in BOX_TYPES else "all" def _to_date(raw_day): if isinstance(raw_day, str): return datetime.strptime(raw_day, "%Y-%m-%d").date() return raw_day def query_event_daily_delta(days: int, box_type_filter: str = "all"): today = datetime.now().date() start_day = today - timedelta(days=days - 1) query = db.session.query( db.func.date(InventoryEvent.created_at).label("event_day"), db.func.sum(InventoryEvent.delta).label("daily_delta"), ).filter(InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time())) if box_type_filter != "all": query = query.filter(InventoryEvent.box_type == box_type_filter) rows = query.group_by(db.func.date(InventoryEvent.created_at)).all() delta_by_day = {} for row in rows: delta_by_day[_to_date(row.event_day)] = int(row.daily_delta or 0) return delta_by_day def build_trend_points_from_events(days: int, total_quantity: int, box_type_filter: str = "all"): safe_days = days if days in (7, 30) else 7 today = datetime.now().date() delta_by_day = query_event_daily_delta(safe_days, box_type_filter) points = [] running_total = total_quantity reverse_days = [today - 
timedelta(days=offset) for offset in range(safe_days)] for day in reverse_days: points.append( { "date": day, "label": day.strftime("%m-%d"), "value": running_total, } ) running_total -= delta_by_day.get(day, 0) points.reverse() return points def build_box_type_trend_series(days: int, totals_by_type: dict): safe_days = days if days in (7, 30) else 7 today = datetime.now().date() all_days = [today - timedelta(days=offset) for offset in range(safe_days)] series = {} for box_type in BOX_TYPES.keys(): delta_by_day = query_event_daily_delta(safe_days, box_type) running_total = int(totals_by_type.get(box_type, 0)) values = [] for day in all_days: values.append(running_total) running_total -= delta_by_day.get(day, 0) values.reverse() series[box_type] = values return { "labels": [day.strftime("%m-%d") for day in reversed(all_days)], "series": series, } def make_sparkline(values: list[int], width: int = 220, height: int = 56) -> str: if not values: return "" min_value = min(values) max_value = max(values) span = max(max_value - min_value, 1) step_x = width / max(len(values) - 1, 1) points = [] for idx, value in enumerate(values): x = idx * step_x y = height - ((value - min_value) / span) * height points.append(f"{x:.2f},{y:.2f}") return " ".join(points) def event_type_label(event_type: str) -> str: labels = { "quick_inbound_add": "快速入库新增", "quick_inbound_merge": "快速入库合并", "component_outbound": "快速出库", "component_save": "编辑保存", "component_enable": "启用元件", "component_disable": "停用元件", "component_delete": "删除元件", "bag_add": "袋装新增", "bag_batch_add": "袋装批量新增", "bag_merge": "袋装合并", "bag_batch_merge": "袋装批量合并", "box_delete": "删除盒子", } return labels.get(event_type, event_type) def recent_events(limit: int = 20, box_type_filter: str = "all"): query = InventoryEvent.query if box_type_filter != "all": query = query.filter_by(box_type=box_type_filter) rows = query.order_by(InventoryEvent.created_at.desc()).limit(limit).all() items = [] for row in rows: items.append( { "time": 
row.created_at.strftime("%Y-%m-%d %H:%M") if row.created_at else "-", "type": event_type_label(row.event_type), "box_type": BOX_TYPES.get(row.box_type, {}).get("label", "全部"), "part_no": row.part_no or "-", "delta": int(row.delta or 0), } ) return items def build_dashboard_context(): boxes = Box.query.all() box_by_id = {box.id: box for box in boxes} boxes.sort(key=box_sort_key) groups = {key: [] for key in BOX_TYPES.keys()} for box in boxes: box_type = box.box_type if box.box_type in BOX_TYPES else "small_28" overview_rows = make_overview_rows(box) groups[box_type].append( { "box": box, "used_count": len(overview_rows), "slot_range": slot_range_label(box), "overview_rows": overview_rows, "base_name": infer_base_name(box), } ) components = Component.query.all() enabled_components = [c for c in components if c.is_enabled] disabled_components = [c for c in components if not c.is_enabled] low_stock_components = [c for c in enabled_components if c.quantity < LOW_STOCK_THRESHOLD] trend_points_7d = build_trend_points_from_events( days=7, total_quantity=sum(c.quantity for c in enabled_components), box_type_filter="all", ) period_net_change_7d = 0 if len(trend_points_7d) >= 2: period_net_change_7d = trend_points_7d[-1]["value"] - trend_points_7d[0]["value"] low_stock_items = [] for c in sorted(low_stock_components, key=lambda item: (item.quantity, item.name or ""))[:12]: box = box_by_id.get(c.box_id) box_type_key = box.box_type if box and box.box_type in BOX_TYPES else "small_28" low_stock_items.append( { "name": c.name, "part_no": c.part_no, "quantity": c.quantity, "box_type": box_type_key, "box_type_label": BOX_TYPES[box_type_key]["label"], "box_name": box.name if box else f"盒 {c.box_id}", "slot_code": slot_code_for_box(box, c.slot_index) if box else str(c.slot_index), "edit_url": url_for("edit_component", box_id=c.box_id, slot=c.slot_index), } ) category_stats = [] max_category_quantity = 0 for key, meta in BOX_TYPES.items(): category_components = [ c for c in 
enabled_components if box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == key ] quantity = sum(c.quantity for c in category_components) max_category_quantity = max(max_category_quantity, quantity) category_stats.append( { "key": key, "label": meta["label"], "item_count": len(category_components), "quantity": quantity, } ) stats = { "box_count": len(boxes), "active_items": len(enabled_components), "low_stock_count": len(low_stock_components), "disabled_count": len(disabled_components), "max_category_quantity": max_category_quantity, "period_net_change_7d": period_net_change_7d, } return { "groups": groups, "stats": stats, "category_stats": category_stats, "low_stock_items": low_stock_items, } def _build_restock_payload(*, limit: int = 20, threshold: int = LOW_STOCK_THRESHOLD) -> dict: boxes = Box.query.all() box_by_id = {box.id: box for box in boxes} enabled_components = Component.query.filter_by(is_enabled=True).all() low_stock_components = [c for c in enabled_components if int(c.quantity or 0) < threshold] low_items = [] for c in sorted(low_stock_components, key=lambda item: (int(item.quantity or 0), item.name or ""))[:limit]: box = box_by_id.get(c.box_id) box_type = box.box_type if box and box.box_type in BOX_TYPES else "small_28" low_items.append( { "part_no": c.part_no, "name": c.name, "quantity": int(c.quantity or 0), "box_type": box_type, "box_type_label": BOX_TYPES[box_type]["label"], "box_name": box.name if box else f"盒 {c.box_id}", "slot_code": slot_code_for_box(box, c.slot_index) if box else str(c.slot_index), } ) start_day = datetime.now().date() - timedelta(days=29) outbound_rows = ( db.session.query(InventoryEvent.part_no, db.func.sum(-InventoryEvent.delta).label("outbound_qty")) .filter( InventoryEvent.event_type == "component_outbound", InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time()), InventoryEvent.delta < 0, ) .group_by(InventoryEvent.part_no) .order_by(db.func.sum(-InventoryEvent.delta).desc()) .limit(20) 
.all() ) outbound_top = [ {"part_no": row[0] or "-", "outbound_qty_30d": int(row[1] or 0)} for row in outbound_rows ] return { "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "threshold": int(threshold), "low_stock_items": low_items, "top_outbound_30d": outbound_top, } def _call_siliconflow_chat( system_prompt: str, user_prompt: str, *, api_url: str, model: str, api_key: str, timeout: int, ) -> str: api_key = (api_key or "").strip() if not api_key: raise RuntimeError("SILICONFLOW_API_KEY 未配置") if not api_url: raise RuntimeError("AI API URL 未配置") if not model: raise RuntimeError("AI 模型名称未配置") payload = { "model": model, "temperature": 0.2, "max_tokens": 700, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], } body = json.dumps(payload).encode("utf-8") req = urllib.request.Request( api_url, data=body, method="POST", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", }, ) try: with urllib.request.urlopen(req, timeout=timeout) as resp: raw = resp.read().decode("utf-8") except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="ignore") raise RuntimeError(f"AI 服务返回 HTTP {exc.code}: {detail[:200]}") from exc except urllib.error.URLError as exc: raise RuntimeError(f"AI 服务连接失败: {exc.reason}") from exc try: data = json.loads(raw) return data["choices"][0]["message"]["content"].strip() except (KeyError, IndexError, TypeError, json.JSONDecodeError) as exc: raise RuntimeError("AI 返回格式无法解析") from exc @app.route("/") def index(): return redirect(url_for("types_page")) @app.route("/types") def types_page(): dashboard = build_dashboard_context() notice = request.args.get("notice", "").strip() error = request.args.get("error", "").strip() category_stats_map = {item["key"]: item for item in dashboard["category_stats"]} low_stock_groups = [] for key, meta in BOX_TYPES.items(): grouped_items = [ item for item in dashboard["low_stock_items"] if 
# The rule carries the ``box_type`` path variable that the view signature
# requires; without it Flask cannot supply the argument.
@app.route("/container-type/<box_type>/edit", methods=["GET", "POST"])
def edit_container_type(box_type: str):
    """Show and process the form that edits one container type's metadata.

    GET renders ``type_edit.html`` for the requested type. POST validates the
    submitted label and default prefix, updates the in-memory ``BOX_TYPES``
    entry, persists the overrides and redirects to the type overview page.
    On a validation error the form is re-rendered with the message and the
    stored metadata is left untouched.

    Args:
        box_type: Key into ``BOX_TYPES``; unknown keys yield ``bad_request``.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")

    meta = BOX_TYPES[box_type]
    error = ""

    if request.method == "POST":
        label = request.form.get("label", "").strip()
        default_desc = request.form.get("default_desc", "").strip()
        default_prefix = request.form.get("default_prefix", "").strip().upper()

        if not label:
            error = "容器名称不能为空"
        elif not default_prefix:
            error = "默认前缀不能为空"

        if not error:
            # An empty description falls back to the built-in default; if the
            # type has no built-in entry, keep the current description rather
            # than failing with KeyError.
            builtin_desc = DEFAULT_BOX_TYPES.get(box_type, {}).get("default_desc")
            meta["label"] = label
            meta["default_desc"] = (
                default_desc or builtin_desc or meta.get("default_desc", "")
            )
            meta["default_prefix"] = default_prefix
            _save_box_type_overrides()
            return redirect(url_for("types_page", notice="容器属性已更新"))

    return render_template(
        "type_edit.html",
        box_type=box_type,
        meta=meta,
        error=error,
    )
def _posted_custom_slot_capacity(default_capacity: int):
    """Return the posted ``slot_capacity``, or None unless it is an int >= 1."""
    try:
        capacity = _parse_non_negative_int(
            request.form.get("slot_capacity", str(default_capacity)),
            default_capacity,
        )
    except ValueError:
        return None
    return capacity if capacity >= 1 else None


def _range_conflict_message(other_box) -> str:
    """Build the user-facing message for an overlapping slot-number range."""
    return f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号"


def _redirect_after_box_form(fallback_type: str):
    """Redirect to the posted ``return_to_type`` page, else to *fallback_type*."""
    return_to_type = parse_box_type_filter(request.form.get("return_to_type", ""))
    target_type = return_to_type if return_to_type != "all" else fallback_type
    return redirect(url_for("type_page", box_type=target_type))


@app.route("/boxes/create", methods=["POST"])
def create_box():
    """Create a box from the posted form and redirect to its type page.

    Validation failures return ``bad_request`` with a message. Posting the
    ``bag`` type creates nothing and redirects to the bag page. Only the
    ``custom`` type reads ``slot_capacity`` from the form; every other type
    uses its configured default capacity.
    """
    box_type = request.form.get("box_type", "small_28").strip()
    base_name = request.form.get("name", "").strip()
    description = request.form.get("description", "").strip()
    slot_prefix = request.form.get("slot_prefix", "").strip().upper()

    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", box_type)
    if box_type == "bag":
        return redirect(url_for("type_page", box_type="bag"))
    if not base_name:
        return bad_request("盒子名称不能为空", box_type)

    try:
        start_number = _parse_non_negative_int(
            request.form.get("start_number", "1"), 1
        )
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box_type)

    meta = BOX_TYPES[box_type]
    slot_capacity = meta["default_capacity"]
    if box_type == "custom":
        slot_capacity = _posted_custom_slot_capacity(meta["default_capacity"])
        if slot_capacity is None:
            return bad_request("格数必须是大于等于 1 的整数", box_type)

    effective_prefix = slot_prefix or meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    if conflict:
        return bad_request(_range_conflict_message(other_box), box_type)

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    box = Box(
        name=make_unique_box_name(generated_name),
        description=description or meta["default_desc"],
        box_type=box_type,
        slot_capacity=slot_capacity,
        slot_prefix=effective_prefix,
        start_number=start_number,
    )
    db.session.add(box)
    db.session.commit()

    return _redirect_after_box_form(box_type)


# ``<int:box_id>`` supplies the integer id the view passes to get_or_404.
@app.route("/boxes/<int:box_id>/update", methods=["POST"])
def update_box(box_id: int):
    """Apply the posted name / numbering changes to an existing box.

    The box type itself is never changed. For ``custom`` boxes the capacity
    may be edited but cannot drop below the highest slot index in use.

    Args:
        box_id: Primary key of the box; a missing box yields 404.
    """
    box = Box.query.get_or_404(box_id)
    base_name = request.form.get("name", "").strip()
    description = request.form.get("description", "").strip()
    slot_prefix = request.form.get("slot_prefix", "").strip().upper()

    if not base_name:
        return bad_request("盒子名称不能为空", box.box_type)

    try:
        start_number = _parse_non_negative_int(
            request.form.get("start_number", "1"), 1
        )
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box.box_type)

    slot_capacity = box.slot_capacity
    if box.box_type == "custom":
        slot_capacity = _posted_custom_slot_capacity(box.slot_capacity)
        if slot_capacity is None:
            return bad_request("格数必须是大于等于 1 的整数", box.box_type)

        max_used_slot = (
            db.session.query(db.func.max(Component.slot_index))
            .filter_by(box_id=box.id)
            .scalar()
            or 0
        )
        if max_used_slot > slot_capacity:
            return bad_request(
                f"格数不能小于已使用位置 {max_used_slot}",
                box.box_type,
            )

    type_meta = BOX_TYPES[box.box_type]
    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box.box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
        exclude_box_id=box.id,  # a box never conflicts with itself
    )
    if conflict:
        return bad_request(_range_conflict_message(other_box), box.box_type)

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    box.name = make_unique_box_name(generated_name, exclude_box_id=box.id)
    box.description = description or type_meta["default_desc"]
    box.slot_prefix = effective_prefix
    box.start_number = start_number
    box.slot_capacity = slot_capacity
    db.session.commit()

    return _redirect_after_box_form(box.box_type)
# ``<int:box_id>`` supplies the integer id the view passes to get_or_404.
@app.route("/boxes/<int:box_id>/delete", methods=["POST"])
def delete_box(box_id: int):
    """Delete a box together with all of its components.

    The fixed ``bag`` container cannot be deleted. When the box still holds
    enabled stock, one ``box_delete`` inventory event with the negative total
    is logged before the rows are removed. Afterwards the client is
    redirected to the posted ``return_to_type`` page, or to the deleted box's
    own type page when that field is "all".

    Args:
        box_id: Primary key of the box; a missing box yields 404.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type == "bag":
        return bad_request("袋装清单为固定大容器,不能删除", box.box_type)

    enabled_sum = (
        db.session.query(db.func.sum(Component.quantity))
        .filter_by(box_id=box.id, is_enabled=True)
        .scalar()
        or 0
    )
    if enabled_sum:
        log_inventory_event(event_type="box_delete", delta=-int(enabled_sum), box=box)

    Component.query.filter_by(box_id=box.id).delete()
    # Remember the type before the row is deleted; it is needed for the redirect.
    box_type = box.box_type
    db.session.delete(box)
    db.session.commit()

    return_to_type = parse_box_type_filter(request.form.get("return_to_type", ""))
    target_type = return_to_type if return_to_type != "all" else box_type
    return redirect(url_for("type_page", box_type=target_type))
# ``<int:box_id>`` supplies the integer id the view passes to get_or_404.
@app.route("/box/<int:box_id>/labels/export")
def export_box_labels_csv(box_id: int):
    """Download the enabled components of one box as a label CSV.

    Rows are ordered by slot index. Brand / package / usage come from the
    parsed specification, the LCSC fields from the parsed note; when the note
    carries no LCSC code, one is extracted from the part number instead.
    The payload starts with a UTF-8 BOM character.

    Args:
        box_id: Primary key of the box; a missing box yields 404.
    """
    box = Box.query.get_or_404(box_id)
    rows = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
        .all()
    )

    header = [
        "盒子名称(box_name)",
        "位置编号(slot_code)",
        "料号(part_no)",
        "名称(name)",
        "品牌(brand)",
        "封装(package)",
        "用途(usage)",
        "立创编号(lcsc_code)",
        "立创商品ID(lcsc_product_id)",
        "商品编排(arrange)",
        "最小包装(min_pack)",
        "规格(specification)",
        "数量(quantity)",
        "位置备注(location)",
        "备注(note)",
    ]

    # The buffer is closed by the ``with`` block even if a row fails to serialize.
    with StringIO() as output:
        writer = csv.writer(output)
        writer.writerow(header)
        for c in rows:
            spec_fields = _parse_slot_spec_fields(c.specification)
            note_fields = _parse_note_detail_fields(c.note)
            if not note_fields["lcsc_code"]:
                note_fields["lcsc_code"] = _extract_lcsc_code_from_text(c.part_no)
            writer.writerow(
                [
                    box.name,
                    slot_code_for_box(box, c.slot_index),
                    c.part_no or "",
                    c.name or "",
                    spec_fields["brand"],
                    spec_fields["package"],
                    spec_fields["usage"],
                    note_fields["lcsc_code"],
                    note_fields["product_id"],
                    note_fields["arrange"],
                    note_fields["min_pack"],
                    c.specification or "",
                    int(c.quantity or 0),
                    c.location or "",
                    c.note or "",
                ]
            )
        csv_content = "\ufeff" + output.getvalue()

    filename = f"labels_box_{box.id}.csv"
    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
# ``<int:box_id>`` supplies the integer id the view passes to get_or_404.
@app.route("/box/<int:box_id>/bags/add", methods=["POST"])
def add_bag_item(box_id: int):
    """Append one component to a bag-type box.

    The new row takes the slot after the highest one in use, and the box
    capacity grows when that slot exceeds it. The request is rejected (the
    box page is re-rendered with an error) when part number or name is empty,
    the quantity is not a non-negative integer, or an enabled component with
    the same part number or the same name / specification already exists.

    Args:
        box_id: Primary key of the box; a missing box yields 404, a non-bag
            box yields a plain 400 response.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    part_no = request.form.get("part_no", "").strip()
    name = request.form.get("name", "").strip()
    specification = request.form.get("specification", "").strip()
    note = request.form.get("note", "").strip()

    if not part_no or not name:
        return render_box_page(box, error="袋装新增失败: 料号和名称不能为空")

    try:
        quantity = _parse_non_negative_int(request.form.get("quantity", "0"), 0)
    except ValueError:
        return render_box_page(box, error="袋装新增失败: 数量必须是大于等于 0 的整数")

    part_no_conflict = _find_enabled_part_no_conflict(part_no)
    if part_no_conflict:
        return render_box_page(
            box,
            error=(
                f"袋装新增失败: 料号 {part_no} 已存在于 "
                f"{_format_component_position(part_no_conflict)},需人工确认后再合并"
            ),
        )

    material_conflict = _find_enabled_material_conflict(
        name=name,
        specification=specification,
        exclude_part_no=part_no,
    )
    if material_conflict:
        return render_box_page(
            box,
            error=(
                "袋装新增失败: 检测到同参数物料 "
                f"{material_conflict.part_no} 已在 "
                f"{_format_component_position(material_conflict)},需人工确认后再合并"
            ),
        )

    highest_used_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
        or 0
    )
    next_slot = highest_used_slot + 1

    item = Component(
        box_id=box.id,
        slot_index=next_slot,
        part_no=part_no,
        name=name,
        quantity=quantity,
        specification=specification or None,
        note=note or None,
        is_enabled=True,
    )
    db.session.add(item)

    # Zero-quantity placeholders do not generate an inventory event.
    if quantity:
        log_inventory_event(
            event_type="bag_add",
            delta=quantity,
            box=box,
            component=item,
            part_no=part_no,
        )

    if next_slot > box.slot_capacity:
        box.slot_capacity = next_slot

    db.session.commit()
    return render_box_page(box, notice="已新增 1 条袋装记录")
# ``<int:box_id>`` / ``<int:slot>`` supply the two integers the view signature
# requires (the id is passed to get_or_404, the slot is range-checked).
@app.route("/edit/<int:box_id>/<int:slot>", methods=["GET", "POST"])
def edit_component(box_id: int, slot: int):
    """View or modify the component stored in one slot of a box.

    GET renders ``edit.html``. POST dispatches on the ``action`` form field:

      * ``delete``: removes the component; requires the slot code typed into
        ``delete_confirm_slot`` and is refused while lock-storage mode is on.
      * ``toggle_enable`` / ``toggle_disable``: flip ``is_enabled`` and log
        the quantity entering / leaving active stock.
      * anything else (default ``save``): validates the form, guards against
        replacing the bound part number, offers a confirmed merge when an
        enabled component with the same part number or the same
        name / specification exists elsewhere, and otherwise creates or
        updates the component in this slot.

    Args:
        box_id: Primary key of the box; a missing box yields 404.
        slot: 1-based slot index; values outside ``1..slot_capacity`` yield 400.
    """
    box = Box.query.get_or_404(box_id)
    if slot < 1 or slot > box.slot_capacity:
        return "无效的格子编号", 400

    search_query = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()
    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()
    settings = _get_ai_settings()
    lock_storage_mode = bool(settings.get("lock_storage_mode", False))
    slot_code = slot_code_for_box(box, slot)

    def render_edit(error_text, query):
        # Single place that renders the form; only the error text and the
        # search query echoed back differ between call sites.
        return render_template(
            "edit.html",
            box=box,
            slot=slot,
            slot_code=slot_code,
            component=component,
            error=error_text,
            notice=notice,
            search_query=query,
            lock_storage_mode=lock_storage_mode,
        )

    if request.method != "POST":
        return render_edit(error, search_query)

    action = request.form.get("action", "save")
    search_query_post = request.form.get("q", "").strip()
    search_query_effective = search_query_post or search_query

    def redirect_after_change():
        # Return to the search results when the edit was opened from a search.
        if search_query_effective:
            return redirect(url_for("search_page", q=search_query_effective))
        return redirect(url_for("view_box", box_id=box.id))

    if action == "delete":
        delete_confirm_slot = request.form.get("delete_confirm_slot", "").strip().upper()
        expected_slot_code = slot_code.strip().upper()
        if lock_storage_mode:
            return render_edit("锁仓模式已开启,禁止删除位置绑定。", search_query_post)
        if delete_confirm_slot != expected_slot_code:
            return render_edit(
                f"删除确认失败: 请输入当前位置编号 {expected_slot_code}",
                search_query_post,
            )
        if component:
            # Only enabled stock counts towards inventory totals.
            if component.is_enabled and component.quantity:
                log_inventory_event(
                    event_type="component_delete",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.delete(component)
            db.session.commit()
        return redirect_after_change()

    if action == "toggle_enable":
        if component and not component.is_enabled:
            component.is_enabled = True
            if component.quantity:
                log_inventory_event(
                    event_type="component_enable",
                    delta=int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.commit()
        return redirect(
            url_for("edit_component", box_id=box.id, slot=slot, q=search_query_post)
        )

    if action == "toggle_disable":
        if component and component.is_enabled:
            component.is_enabled = False
            if component.quantity:
                log_inventory_event(
                    event_type="component_disable",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.commit()
        return redirect(
            url_for("edit_component", box_id=box.id, slot=slot, q=search_query_post)
        )

    # ---- default action: save -------------------------------------------
    part_no = request.form.get("part_no", "").strip()
    name = request.form.get("name", "").strip()
    specification = request.form.get("specification", "").strip()
    quantity_raw = request.form.get("quantity", "0").strip()
    note = request.form.get("note", "").strip()
    confirm_merge = _is_truthy_form_value(request.form.get("confirm_merge", ""))
    confirm_position_change = _is_truthy_form_value(
        request.form.get("confirm_position_change", "")
    )

    if not part_no or not name:
        return render_edit("料号和名称不能为空", search_query_post)

    try:
        quantity = _parse_non_negative_int(quantity_raw, 0)
    except ValueError:
        return render_edit("数量必须是大于等于 0 的整数", search_query_post)

    is_replacement = _is_slot_part_replacement(component, part_no)
    if lock_storage_mode and is_replacement:
        return render_edit("锁仓模式已开启,禁止替换当前位置绑定料号。", search_query_post)
    if is_replacement and not confirm_position_change:
        return render_edit(
            "当前位已绑定到固定料号,检测到替换操作。"
            "如需变更位置绑定,请勾选“我确认替换当前位物料”后再保存",
            search_query_post,
        )

    current_component_id = component.id if component is not None else None
    part_no_conflict = _find_enabled_part_no_conflict(
        part_no, exclude_component_id=current_component_id
    )
    material_conflict = _find_enabled_material_conflict(
        name=name,
        specification=specification,
        exclude_component_id=current_component_id,
        exclude_part_no=part_no,
    )
    conflict = part_no_conflict or material_conflict

    if conflict and not confirm_merge:
        conflict_reason = "同料号" if part_no_conflict else "同参数"
        return render_edit(
            f"检测到{conflict_reason}物料已存在于 {_format_component_position(conflict)};"
            "请勾选“人工确认后合并”再保存",
            search_query_post,
        )

    if conflict:
        # confirm_merge is set here: fold the submitted data into the
        # existing component instead of writing this slot.
        _merge_into_existing_component(
            target=conflict,
            incoming_part_no=part_no,
            incoming_name=name,
            incoming_specification=specification,
            incoming_note=note,
            incoming_quantity=quantity,
            source_component=component,
            source_box=box,
        )
        db.session.commit()
        target_box = Box.query.get(conflict.box_id)
        notice_text = (
            f"已人工确认合并到 {_format_component_position(conflict)},"
            f"累计数量 {conflict.quantity}"
        )
        if target_box:
            return redirect(
                url_for(
                    "edit_component",
                    box_id=target_box.id,
                    slot=conflict.slot_index,
                    notice=notice_text,
                )
            )
        return redirect(url_for("view_box", box_id=box.id, notice=notice_text))

    old_enabled_qty = 0
    if component is not None and component.is_enabled:
        # ``or 0`` guards against a stored NULL quantity.
        old_enabled_qty = int(component.quantity or 0)

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = part_no
    component.name = name
    component.specification = specification or None
    component.quantity = quantity
    component.note = note or None
    # A freshly constructed row has no enabled flag yet; new items start enabled.
    if component.is_enabled is None:
        component.is_enabled = True

    new_enabled_qty = quantity if component.is_enabled else 0
    delta = int(new_enabled_qty - old_enabled_qty)
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=part_no,
        )

    db.session.commit()
    return redirect_after_change()
confirm_position_change: return redirect( url_for( "edit_component", box_id=box.id, slot=slot, error=( "立创导入失败: 当前位已绑定固定料号。" "如需替换,请勾选“我确认替换当前位物料”后再导入" ), ) ) current_component_id = component.id if component is not None else None part_no_conflict = _find_enabled_part_no_conflict( mapped["part_no"], exclude_component_id=current_component_id, ) material_conflict = _find_enabled_material_conflict( name=mapped["name"], specification=mapped["specification"], exclude_component_id=current_component_id, exclude_part_no=mapped["part_no"], ) conflict = part_no_conflict or material_conflict if conflict and not confirm_merge: conflict_reason = "同料号" if part_no_conflict else "同参数" return redirect( url_for( "edit_component", box_id=box.id, slot=slot, error=( f"立创导入失败: 检测到{conflict_reason}物料已存在于 " f"{_format_component_position(conflict)},请勾选人工确认后合并" ), ) ) if conflict and confirm_merge: _merge_into_existing_component( target=conflict, incoming_part_no=mapped["part_no"], incoming_name=mapped["name"], incoming_specification=mapped["specification"], incoming_note=mapped["note"], incoming_quantity=quantity, source_component=component, source_box=box, ) db.session.commit() target_box = Box.query.get(conflict.box_id) target_slot = conflict.slot_index notice_text = ( f"已人工确认合并到 {_format_component_position(conflict)}," f"累计数量 {conflict.quantity}" ) if target_box: return redirect( url_for( "edit_component", box_id=target_box.id, slot=target_slot, notice=notice_text, ) ) return redirect(url_for("view_box", box_id=box.id, notice=notice_text)) old_enabled_qty = int(component.quantity or 0) if component and component.is_enabled else 0 if component is None: component = Component(box_id=box.id, slot_index=slot) db.session.add(component) component.part_no = mapped["part_no"] component.name = mapped["name"] component.specification = mapped["specification"] or None component.note = mapped["note"] or None component.quantity = quantity component.is_enabled = True delta = int(component.quantity or 0) 
@app.route("/ai/restock-plan", methods=["POST"])
def ai_restock_plan():
    """Return a restock plan for low-stock components as a JSON response.

    The low-stock payload is sent to the configured chat model, which is asked
    to answer with a JSON plan (``summary`` plus ``urgent`` / ``this_week`` /
    ``defer`` item lists). Outcomes:

      * nothing is low on stock: ``ok=True`` with an empty plan, no AI call;
      * the AI call fails: HTTP 400, ``ok=False``, the error message and a
        rule-based fallback plan;
      * the AI answer is not a JSON object: ``ok=True`` with the rule-based
        plan and a ``parse_warning``;
      * otherwise: ``ok=True`` with the normalized AI plan.
    """
    ai_settings = _get_ai_settings()
    data = _build_restock_payload(
        limit=ai_settings["restock_limit"],
        threshold=ai_settings["restock_threshold"],
    )

    def _empty_plan(summary: str) -> dict:
        return {
            "summary": summary,
            "urgent": [],
            "this_week": [],
            "defer": [],
        }

    def _normalize_item(raw_item: dict) -> dict:
        # Coerce one AI-provided item into four non-empty display strings.
        if not isinstance(raw_item, dict):
            return {
                "part_no": "-",
                "name": "未命名元件",
                "suggest_qty": "待确认",
                "reason": "AI 返回项格式异常",
            }
        return {
            "part_no": str(raw_item.get("part_no", "-") or "-").strip() or "-",
            "name": str(raw_item.get("name", "未命名元件") or "未命名元件").strip(),
            "suggest_qty": str(raw_item.get("suggest_qty", "待确认") or "待确认").strip(),
            "reason": str(raw_item.get("reason", "") or "").strip() or "无",
        }

    def _normalize_plan(raw_plan: dict, default_summary: str) -> dict:
        if not isinstance(raw_plan, dict):
            return _empty_plan(default_summary)
        summary = str(raw_plan.get("summary", "") or "").strip() or default_summary

        def to_items(key: str):
            rows = raw_plan.get(key, [])
            if not isinstance(rows, list):
                return []
            return [_normalize_item(row) for row in rows]

        return {
            "summary": summary,
            "urgent": to_items("urgent"),
            "this_week": to_items("this_week"),
            "defer": to_items("defer"),
        }

    def _extract_json_block(raw_text: str) -> str:
        # Strip an optional Markdown code fence, then cut to the outermost
        # ``{...}`` span so leading / trailing chatter is ignored.
        text = (raw_text or "").strip()
        if not text:
            return ""
        if text.startswith("```"):
            # ``\s`` must be the whitespace class; in a raw string a doubled
            # backslash would match a literal backslash and the fence would
            # never be removed.
            text = re.sub(r"^```(?:json)?\s*", "", text)
            text = re.sub(r"\s*```$", "", text)
        first = text.find("{")
        last = text.rfind("}")
        if first >= 0 and last > first:
            return text[first : last + 1]
        return text

    def _build_rule_based_plan() -> dict:
        # Deterministic fallback: top the stock up to twice the threshold;
        # the first five low-stock rows are urgent, the rest "this week".
        threshold = int(data.get("threshold", LOW_STOCK_THRESHOLD))
        urgent = []
        this_week = []
        for idx, item in enumerate(data.get("low_stock_items", [])):
            qty = int(item.get("quantity", 0) or 0)
            suggest_qty = max(threshold * 2 - qty, 1)
            row = {
                "part_no": item.get("part_no", "-") or "-",
                "name": item.get("name", "未命名元件") or "未命名元件",
                "suggest_qty": str(suggest_qty),
                "reason": f"当前库存 {qty},低于阈值 {threshold}",
            }
            if idx < 5:
                urgent.append(row)
            else:
                this_week.append(row)
        return {
            "summary": "已按规则生成兜底补货建议(AI 输出异常时使用)",
            "urgent": urgent,
            "this_week": this_week,
            "defer": [],
        }

    if not data["low_stock_items"]:
        plan = _empty_plan("当前没有低库存元件,暂不需要补货。")
        return {
            "ok": True,
            "suggestion": "当前没有低库存元件,暂不需要补货。",
            "plan": plan,
            "data": data,
        }

    system_prompt = (
        "你是电子元器件库存助手。"
        "必须只输出 JSON,不要 Markdown,不要解释文字。"
        "输出结构必须是: "
        '{"summary":string,"urgent":[item],"this_week":[item],"defer":[item]}。'
        'item 结构: {"part_no":string,"name":string,"suggest_qty":string,"reason":string}。'
        "各数组允许为空。"
    )
    user_prompt = "库存数据如下(JSON):\n" + json.dumps(data, ensure_ascii=False)

    try:
        suggestion = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=ai_settings["api_url"],
            model=ai_settings["model"],
            api_key=ai_settings["api_key"],
            timeout=ai_settings["timeout"],
        )
    except RuntimeError as exc:
        fallback_plan = _build_rule_based_plan()
        return {
            "ok": False,
            "message": str(exc),
            "plan": fallback_plan,
            "data": data,
        }, 400

    parse_warning = ""
    try:
        parsed_plan = json.loads(_extract_json_block(suggestion))
    except json.JSONDecodeError:
        parsed_plan = None
    if isinstance(parsed_plan, dict):
        plan = _normalize_plan(parsed_plan, "已生成 AI 补货建议")
    else:
        # Unparseable text and valid JSON that is not an object (a bare list
        # or string) are both unusable: fall back to the rule-based plan
        # rather than showing an empty one.
        plan = _build_rule_based_plan()
        parse_warning = "AI 返回格式异常,已切换到规则兜底建议。"

    return {
        "ok": True,
        "suggestion": suggestion,
        "plan": plan,
        "parse_warning": parse_warning,
        "data": data,
    }
raise ValueError except ValueError: if not error: error = "立创接口超时时间必须是大于等于 5 的整数" lcsc_timeout = settings["lcsc_timeout"] if not api_url and not error: error = "API URL 不能为空" if not model and not error: error = "模型名称不能为空" if (not lcsc_app_id or not lcsc_access_key or not lcsc_secret_key) and not error: error = "立创接口需要填写 app_id、access_key、secret_key" if not error: settings = { "api_url": api_url, "model": model, "api_key": api_key, "timeout": timeout, "restock_threshold": restock_threshold, "restock_limit": restock_limit, "lcsc_base_url": LCSC_BASE_URL, "lcsc_basic_path": LCSC_BASIC_PATH, "lcsc_timeout": lcsc_timeout, "lcsc_app_id": lcsc_app_id, "lcsc_access_key": lcsc_access_key, "lcsc_secret_key": lcsc_secret_key, "lock_storage_mode": lock_storage_mode, } _save_ai_settings(settings) return redirect(url_for("ai_settings_page", notice="AI参数已保存")) settings.update( { "api_url": api_url, "model": model, "api_key": api_key, "timeout": timeout, "restock_threshold": restock_threshold, "restock_limit": restock_limit, "lcsc_base_url": LCSC_BASE_URL, "lcsc_basic_path": LCSC_BASIC_PATH, "lcsc_timeout": lcsc_timeout, "lcsc_app_id": lcsc_app_id, "lcsc_access_key": lcsc_access_key, "lcsc_secret_key": lcsc_secret_key, "lock_storage_mode": lock_storage_mode, } ) return render_template( "ai_settings.html", settings=settings, notice=notice, error=error, ) @app.route("/component//outbound", methods=["POST"]) def quick_outbound(component_id: int): component = Component.query.get_or_404(component_id) keyword = request.form.get("q", "").strip() try: amount = _parse_non_negative_int(request.form.get("amount", "0"), 0) except ValueError: return redirect(url_for("search_page", q=keyword, error="出库数量必须是大于等于 0 的整数")) if amount <= 0: return redirect(url_for("search_page", q=keyword, error="出库数量必须大于 0")) if not component.is_enabled: return redirect(url_for("search_page", q=keyword, error="该元件已停用,不能出库")) if amount > int(component.quantity or 0): return redirect(url_for("search_page", q=keyword, 
error="出库数量超过当前库存")) component.quantity = int(component.quantity or 0) - amount box = Box.query.get(component.box_id) log_inventory_event( event_type="component_outbound", delta=-amount, box=box, component=component, part_no=component.part_no, ) db.session.commit() slot_code = slot_code_for_box(box, component.slot_index) if box else str(component.slot_index) notice = f"出库成功: {component.part_no} -{amount}({slot_code})" return redirect(url_for("search_page", q=keyword, notice=notice)) @app.route("/stats") def stats_page(): days = parse_days_value(request.args.get("days", "7")) box_type_filter = parse_box_type_filter(request.args.get("box_type", "all")) notice = request.args.get("notice", "").strip() boxes = Box.query.all() box_by_id = {box.id: box for box in boxes} components = Component.query.all() all_enabled_components = [c for c in components if c.is_enabled] overall_total_quantity = sum(c.quantity for c in all_enabled_components) enabled_components = [ c for c in components if c.is_enabled and ( box_type_filter == "all" or (box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == box_type_filter) ) ] total_quantity = sum(c.quantity for c in enabled_components) total_items = len(enabled_components) low_stock_count = len([c for c in enabled_components if c.quantity < LOW_STOCK_THRESHOLD]) inventory_share = ( round(total_quantity * 100 / overall_total_quantity, 1) if overall_total_quantity > 0 else 0.0 ) start_day = datetime.now().date() - timedelta(days=days - 1) event_query = InventoryEvent.query.filter( InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time()) ) if box_type_filter != "all": event_query = event_query.filter_by(box_type=box_type_filter) period_operation_count = event_query.count() active_days = len( { _to_date(row[0]) for row in event_query.with_entities(db.func.date(InventoryEvent.created_at)).all() if row and row[0] } ) totals_by_type = {} for box_type in BOX_TYPES.keys(): totals_by_type[box_type] = sum( c.quantity for c 
in components if c.is_enabled and box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == box_type ) category_stats = [] for key, meta in BOX_TYPES.items(): category_components = [ c for c in enabled_components if box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == key ] category_stats.append( { "key": key, "label": meta["label"], "item_count": len(category_components), "quantity": sum(c.quantity for c in category_components), } ) category_stats.sort(key=lambda row: row["quantity"], reverse=True) max_category_quantity = max([row["quantity"] for row in category_stats], default=0) chart_mode = "category" chart_title = "分类占比" chart_rows = category_stats max_chart_quantity = max_category_quantity if box_type_filter != "all": chart_mode = "component" chart_title = "分类内元件库存 Top" bucket = {} for c in enabled_components: key = (c.part_no or "", c.name or "") if key not in bucket: bucket[key] = 0 bucket[key] += int(c.quantity or 0) component_rows = [] for (part_no, name), quantity in bucket.items(): label = name or part_no or "未命名元件" if part_no: label = f"{label} ({part_no})" component_rows.append({"label": label, "quantity": quantity, "item_count": 1}) component_rows.sort(key=lambda row: row["quantity"], reverse=True) chart_rows = component_rows[:8] max_chart_quantity = max([row["quantity"] for row in chart_rows], default=0) trend_points = build_trend_points_from_events( days=days, total_quantity=total_quantity, box_type_filter=box_type_filter, ) period_net_change = 0 if len(trend_points) >= 2: period_net_change = trend_points[-1]["value"] - trend_points[0]["value"] min_value = min([point["value"] for point in trend_points], default=0) max_value = max([point["value"] for point in trend_points], default=0) value_span = max(max_value - min_value, 1) svg_points = [] if trend_points: width = 520 height = 180 step_x = width / max(len(trend_points) - 1, 1) for idx, point in enumerate(trend_points): x = idx * step_x y = height - ((point["value"] - min_value) / 
value_span) * height svg_points.append(f"{x:.2f},{y:.2f}") box_type_series_raw = build_box_type_trend_series(days=days, totals_by_type=totals_by_type) box_type_series = [] for key, meta in BOX_TYPES.items(): values = box_type_series_raw["series"].get(key, []) delta = values[-1] - values[0] if len(values) >= 2 else 0 box_type_series.append( { "key": key, "label": meta["label"], "sparkline": make_sparkline(values), "latest": values[-1] if values else 0, "delta": delta, } ) activity_rows = recent_events(limit=20, box_type_filter=box_type_filter) return render_template( "stats.html", notice=notice, days=days, box_type_filter=box_type_filter, box_types=BOX_TYPES, total_quantity=total_quantity, total_items=total_items, low_stock_count=low_stock_count, category_stats=category_stats, max_category_quantity=max_category_quantity, chart_mode=chart_mode, chart_title=chart_title, chart_rows=chart_rows, max_chart_quantity=max_chart_quantity, trend_points=trend_points, trend_polyline=" ".join(svg_points), min_value=min_value, max_value=max_value, period_net_change=period_net_change, overall_total_quantity=overall_total_quantity, inventory_share=inventory_share, period_operation_count=period_operation_count, active_days=active_days, box_type_series=box_type_series, activity_rows=activity_rows, ) @app.route("/stats/export") def stats_export_csv(): days = parse_days_value(request.args.get("days", "7")) box_type_filter = parse_box_type_filter(request.args.get("box_type", "all")) boxes = Box.query.all() box_by_id = {box.id: box for box in boxes} components = Component.query.all() enabled_components = [ c for c in components if c.is_enabled and ( box_type_filter == "all" or (box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == box_type_filter) ) ] total_quantity = sum(c.quantity for c in enabled_components) trend_points = build_trend_points_from_events(days, total_quantity, box_type_filter) delta_by_day = query_event_daily_delta(days, box_type_filter) output = StringIO() writer 
= csv.writer(output) writer.writerow(["date", "inventory_total", "daily_delta", "days", "box_type_filter"]) for point in trend_points: writer.writerow( [ point["date"].isoformat(), point["value"], int(delta_by_day.get(point["date"], 0)), days, box_type_filter, ] ) csv_content = output.getvalue() output.close() filename = f"inventory_stats_{box_type_filter}_{days}d.csv" return Response( csv_content, mimetype="text/csv; charset=utf-8", headers={"Content-Disposition": f"attachment; filename={filename}"}, ) @app.route("/stats/clear", methods=["POST"]) def clear_stats_logs(): days = parse_days_value(request.form.get("days", "7")) box_type_filter = parse_box_type_filter(request.form.get("box_type", "all")) clear_scope = (request.form.get("scope", "current") or "current").strip() if clear_scope == "all": deleted_count = InventoryEvent.query.delete() db.session.commit() notice = f"已清除全部统计日志,共 {deleted_count} 条" return redirect(url_for("stats_page", days=days, box_type="all", notice=notice)) query = InventoryEvent.query if box_type_filter != "all": query = query.filter_by(box_type=box_type_filter) deleted_count = query.delete(synchronize_session=False) db.session.commit() if box_type_filter == "all": notice = f"已清除当前筛选(全部分类)统计日志,共 {deleted_count} 条" else: label = BOX_TYPES.get(box_type_filter, {}).get("label", box_type_filter) notice = f"已清除当前筛选({label})统计日志,共 {deleted_count} 条" return redirect(url_for("stats_page", days=days, box_type=box_type_filter, notice=notice)) def bootstrap() -> None: with app.app_context(): db.create_all() ensure_schema() normalize_legacy_data() bootstrap() if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)