Files
inventory/app.py

4612 lines
160 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""库存管理 Flask 应用主文件。
中文说明:
1. 这个文件同时承担了配置加载、数据库模型、数据修复、页面路由、统计计算、AI 补货建议等职责。
2. 为了便于你后续阅读和维护,关键函数上方会保留中文解释,说明“这个函数做什么”以及“为什么这样做”。
3. 这些中文解释属于代码可读性的一部分,不应在后续维护中随意删除。
"""
import os
import re
import csv
import json
import hmac
import difflib
import base64
import random
import string
import hashlib
import time
import urllib.error
import urllib.parse
import urllib.request
from copy import deepcopy
from io import StringIO
from datetime import datetime, timedelta
from flask import Flask, Response, redirect, render_template, request, url_for
from flask_sqlalchemy import SQLAlchemy
# Resolve the on-disk data directory next to this file and create it up front,
# because the SQLite database path below lives inside it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_DIR = os.path.join(BASE_DIR, "data")
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, "inventory.db")
# Basic Flask and SQLAlchemy initialisation.
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{DB_PATH}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
# Global constants are gathered here so the functions below do not scatter
# hard-coded values.
LOW_STOCK_THRESHOLD = 5
# JSON files in the data directory that persist user-edited settings.
BOX_TYPES_OVERRIDE_PATH = os.path.join(DB_DIR, "box_types.json")
AI_SETTINGS_PATH = os.path.join(DB_DIR, "ai_settings.json")
# Base URL and endpoint path of the LCSC/JLC open API used for product lookup.
LCSC_BASE_URL = "https://open-api.jlc.com"
LCSC_BASIC_PATH = "/lcsc/openapi/sku/product/basic"
# Default AI / LCSC settings. Most values can be seeded from environment
# variables; anything stored in AI_SETTINGS_PATH later overrides these.
# NOTE(review): the int(...) conversions run at import time, so a non-numeric
# SILICONFLOW_TIMEOUT or LCSC_TIMEOUT value raises ValueError on startup.
AI_SETTINGS_DEFAULT = {
    "api_url": os.environ.get(
        "SILICONFLOW_API_URL",
        "https://api.siliconflow.cn/v1/chat/completions",
    ),
    "model": os.environ.get(
        "SILICONFLOW_MODEL",
        "Qwen/Qwen2.5-7B-Instruct",
    ),
    "api_key": os.environ.get("SILICONFLOW_API_KEY", ""),
    # `or "30"` covers an environment variable that is set but empty.
    "timeout": int(os.environ.get("SILICONFLOW_TIMEOUT", "30") or "30"),
    "restock_threshold": LOW_STOCK_THRESHOLD,
    "restock_limit": 24,
    "lcsc_timeout": int(os.environ.get("LCSC_TIMEOUT", "20") or "20"),
    "lcsc_app_id": os.environ.get("LCSC_APP_ID", ""),
    "lcsc_access_key": os.environ.get("LCSC_ACCESS_KEY", ""),
    "lcsc_secret_key": os.environ.get("LCSC_SECRET_KEY", ""),
    "lock_storage_mode": False,
}
# Built-in container types. Each entry carries a display label plus the
# default capacity, description and slot-code prefix for that type.
DEFAULT_BOX_TYPES = {
    "small_28": {
        "label": "28格小盒大盒",
        "default_capacity": 28,
        "default_desc": "4连排小盒常见摆放为竖向7排",
        "default_prefix": "A",
    },
    "medium_14": {
        "label": "14格中盒大盒",
        "default_capacity": 14,
        "default_desc": "14格中盒内部摆放方向与28格不同",
        "default_prefix": "B",
    },
    "custom": {
        "label": "自定义容器",
        "default_capacity": 20,
        "default_desc": "可按实际盒型设置格数与编号前缀",
        "default_prefix": "C",
    },
    "bag": {
        "label": "袋装清单",
        "default_capacity": 28,
        "default_desc": "一袋一种器件,按清单管理",
        "default_prefix": "BAG",
    },
}
# Runtime copy that may be mutated by user overrides; the deep copy keeps
# DEFAULT_BOX_TYPES pristine so differences can be computed when saving.
BOX_TYPES = deepcopy(DEFAULT_BOX_TYPES)
# Generic words ("component", "model", "stock", ...).
# NOTE(review): presumably ignored or down-weighted by the search code; the
# consumer is outside this part of the file -- confirm.
SEARCH_GENERIC_TERMS = {
    "元件",
    "器件",
    "相关",
    "相关器件",
    "型号",
    "物料",
    "库存",
    "电子",
}
# Words such as "commonly used", "project", "sample", "spare".
# NOTE(review): the name suggests these hint that a query targets the note
# field; confirm against the search implementation.
SEARCH_NOTE_HINT_TERMS = {
    "常用",
    "项目",
    "样品",
    "替代",
    "调试",
    "电源",
    "测试",
    "备件",
}
# Ordered (category, keywords) pairs; keywords are lower-case English or
# Chinese terms associated with each component category.
COMPONENT_CATEGORY_HINTS = [
    ("电阻", ["电阻", "resistor", "res"]),
    ("电容", ["电容", "capacitor", "cap"]),
    ("电感", ["电感", "inductor"]),
    ("稳压", ["稳压", "ldo", "regulator", "dc-dc", "dcdc"]),
    ("二极管", ["二极管", "diode", "tvs", "esd"]),
    ("三极管", ["三极管", "transistor", "mos", "mosfet", "bjt"]),
    ("接口", ["usb", "type-c", "uart", "rs485", "i2c", "spi", "can"]),
    ("MCU", ["mcu", "stm32", "esp32", "avr", "单片机"]),
    ("存储", ["eeprom", "flash", "存储"]),
    ("晶振", ["晶振", "oscillator", "crystal"]),
    ("连接器", ["连接器", "connector", "header", "socket"]),
    ("传感器", ["sensor", "传感器"]),
    ("驱动", ["driver", "驱动"]),
]
# Per-field weights: part number highest, note lowest.
SEARCH_FIELD_WEIGHTS = {
    "part_no": 6,
    "name": 5,
    "specification": 4,
    "note": 3,
}
# Three named fuzzy-search profiles (labels: strict / balanced / loose).
# Every threshold gets smaller from "strict" to "loose".
# NOTE(review): how each threshold is applied is defined by the search code,
# which is outside this part of the file.
SEARCH_FUZZY_PROFILES = {
    "strict": {
        "label": "严格",
        "field_hit": 0.8,
        "combined_hit": 0.78,
        "soft_hit": 0.66,
        "keyword_hit": 0.74,
        "keyword_soft": 0.62,
        "score_gate": 6.0,
        "coverage_gate": 0.48,
        "high_fuzzy_gate": 0.9,
    },
    "balanced": {
        "label": "平衡",
        "field_hit": 0.75,
        "combined_hit": 0.72,
        "soft_hit": 0.6,
        "keyword_hit": 0.7,
        "keyword_soft": 0.58,
        "score_gate": 4.5,
        "coverage_gate": 0.35,
        "high_fuzzy_gate": 0.82,
    },
    "loose": {
        "label": "宽松",
        "field_hit": 0.7,
        "combined_hit": 0.66,
        "soft_hit": 0.54,
        "keyword_hit": 0.66,
        "keyword_soft": 0.52,
        "score_gate": 3.0,
        "coverage_gate": 0.22,
        "high_fuzzy_gate": 0.74,
    },
}
def _apply_box_type_overrides() -> None:
    """Merge persisted box-type overrides into the runtime ``BOX_TYPES``.

    Defaults live in code. Label, description and prefix edits are read from
    ``BOX_TYPES_OVERRIDE_PATH`` and layered on top of the runtime table.
    A missing, unreadable or malformed file leaves ``BOX_TYPES`` untouched.
    """
    if not os.path.exists(BOX_TYPES_OVERRIDE_PATH):
        return
    try:
        with open(BOX_TYPES_OVERRIDE_PATH, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except (OSError, json.JSONDecodeError):
        return
    if not isinstance(loaded, dict):
        return
    editable_fields = ("label", "default_desc", "default_prefix")
    for type_key, patch in loaded.items():
        # Unknown box types and non-dict entries are ignored.
        if type_key in BOX_TYPES and isinstance(patch, dict):
            BOX_TYPES[type_key].update(
                {name: patch[name] for name in editable_fields if name in patch}
            )
    # Keep bag container capacity fixed by domain rule.
    BOX_TYPES["bag"]["default_capacity"] = 28
def _save_box_type_overrides() -> None:
    """Write the fields of ``BOX_TYPES`` that differ from the defaults to disk.

    Only ``label``, ``default_desc`` and ``default_prefix`` are compared.
    Box types without any difference are omitted from the JSON file.
    """
    tracked = ("label", "default_desc", "default_prefix")
    payload = {}
    for type_key, baseline in DEFAULT_BOX_TYPES.items():
        live = BOX_TYPES.get(type_key, baseline)
        diff = {
            name: live.get(name)
            for name in tracked
            if live.get(name) != baseline.get(name)
        }
        if diff:
            payload[type_key] = diff
    with open(BOX_TYPES_OVERRIDE_PATH, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
# Apply any persisted box-type overrides once, at import time.
_apply_box_type_overrides()
def _load_ai_settings() -> dict:
    """Return the default AI settings overlaid with values saved on disk.

    Only keys that exist in ``AI_SETTINGS_DEFAULT`` are taken from the file.
    A missing, unreadable or malformed file yields a copy of the defaults.
    """
    merged = dict(AI_SETTINGS_DEFAULT)
    if not os.path.exists(AI_SETTINGS_PATH):
        return merged
    try:
        with open(AI_SETTINGS_PATH, "r", encoding="utf-8") as handle:
            stored = json.load(handle)
    except (OSError, json.JSONDecodeError):
        return merged
    if isinstance(stored, dict):
        merged.update({key: stored[key] for key in merged if key in stored})
    return merged
def _save_ai_settings(settings: dict) -> None:
    """Serialise ``settings`` as indented UTF-8 JSON to ``AI_SETTINGS_PATH``."""
    serialised = json.dumps(settings, ensure_ascii=False, indent=2)
    with open(AI_SETTINGS_PATH, "w", encoding="utf-8") as handle:
        handle.write(serialised)
def _get_ai_settings() -> dict:
    """Load the AI / LCSC settings and coerce them into safe, usable values.

    Besides reading the file this clamps numeric fields to a minimum, falls
    back to a default when a number cannot be parsed, strips string fields
    and normalises the boolean flag, so callers need not re-validate.
    """
    settings = _load_ai_settings()
    # (key, fallback when missing or unparsable, lower bound)
    bounded_ints = (
        ("timeout", 30, 5),
        ("restock_threshold", LOW_STOCK_THRESHOLD, 0),
        ("restock_limit", 24, 1),
        ("lcsc_timeout", 20, 5),
    )
    for key, fallback, floor in bounded_ints:
        try:
            settings[key] = max(floor, int(settings.get(key, fallback)))
        except (TypeError, ValueError):
            settings[key] = fallback
    for key in ("api_url", "model", "api_key"):
        settings[key] = (settings.get(key) or "").strip()
    # The LCSC endpoint is fixed in code and is not user-configurable.
    settings["lcsc_base_url"] = LCSC_BASE_URL
    settings["lcsc_basic_path"] = LCSC_BASIC_PATH
    for key in ("lcsc_app_id", "lcsc_access_key", "lcsc_secret_key"):
        settings[key] = (settings.get(key) or "").strip()
    settings["lock_storage_mode"] = bool(settings.get("lock_storage_mode", False))
    return settings
def _generate_nonce(length: int = 32) -> str:
alphabet = string.ascii_letters + string.digits
return "".join(random.choice(alphabet) for _ in range(length))
def _generate_jop_signature(method: str, path: str, timestamp: str, nonce: str, post_data: str, secret_key: str) -> str:
string_to_sign = f"{method}\n{path}\n{timestamp}\n{nonce}\n{post_data}\n"
digest = hmac.new(
secret_key.encode("utf-8"),
string_to_sign.encode("utf-8"),
hashlib.sha256,
).digest()
return base64.b64encode(digest).decode("utf-8")
def _extract_lcsc_product_id_from_input(raw_identifier: str) -> int | None:
text = (raw_identifier or "").strip()
if not text:
return None
# Accept full item detail URL and extract /23913.html.
try:
parsed = urllib.parse.urlparse(text)
except ValueError:
parsed = None
if parsed and parsed.netloc:
path = parsed.path or ""
m = re.search(r"/(\d+)\.html$", path)
if m:
return int(m.group(1))
return None
def _fetch_lcsc_product_basic(product_identifier: str, settings: dict) -> dict:
"""调用立创开放接口,按商品详情页链接读取基础资料。
中文说明:当前业务只接受“商品详情页链接”,先从链接里提取 productId
再按 JOP 鉴权规则签名请求接口,最后返回接口中的第一条商品记录。
"""
raw_identifier = (product_identifier or "").strip()
if not raw_identifier:
raise RuntimeError("立创商品链接不能为空")
product_id_from_input = _extract_lcsc_product_id_from_input(raw_identifier)
if product_id_from_input is None:
raise RuntimeError("请输入立创商品详情页链接,例如 https://item.szlcsc.com/23913.html")
app_id = settings.get("lcsc_app_id", "").strip()
access_key = settings.get("lcsc_access_key", "").strip()
secret_key = settings.get("lcsc_secret_key", "").strip()
if not app_id or not access_key or not secret_key:
raise RuntimeError("立创 JOP 鉴权参数不完整,请填写 app_id/access_key/secret_key")
timeout = int(settings.get("lcsc_timeout", 20))
def request_openapi(payload: dict) -> dict:
post_data_str = json.dumps(payload, ensure_ascii=False)
timestamp = str(int(time.time()))
nonce = _generate_nonce(32)
signature = _generate_jop_signature(
"POST",
LCSC_BASIC_PATH,
timestamp,
nonce,
post_data_str,
secret_key,
)
headers = {
"Content-Type": "application/json; utf-8",
"Authorization": (
f'JOP appid="{app_id}",accesskey="{access_key}",'
f'timestamp="{timestamp}",nonce="{nonce}",signature="{signature}"'
),
}
endpoint = LCSC_BASE_URL + LCSC_BASIC_PATH
req = urllib.request.Request(
endpoint,
data=post_data_str.encode("utf-8"),
method="POST",
headers=headers,
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"立创接口调用失败: HTTP {exc.code} {detail[:180]}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"立创接口调用失败: 连接失败 {exc.reason}") from exc
try:
response = json.loads(raw)
except json.JSONDecodeError as exc:
raise RuntimeError("立创接口返回非 JSON 数据") from exc
code = response.get("code")
successful = response.get("successful", False)
if code != 200 and not successful:
message = str(response.get("message", "立创接口调用失败") or "立创接口调用失败")
raise RuntimeError(f"立创接口调用失败: code={code}, message={message}")
data = response.get("data") or {}
items = data.get("productBasicInfoVOList") or []
if not items:
raise RuntimeError("未查询到商品信息,请检查编号是否正确")
return items[0]
return request_openapi({"productId": product_id_from_input})
def _map_lcsc_product_to_component(product: dict) -> dict:
"""把立创商品字段映射为系统内部元件字段。
中文说明:接口原始字段很多,这里只提取库存系统真正会用到的料号、名称、
规格和备注,尽量保持结果简洁且便于搜索。
"""
product_model = str(product.get("productModel") or "").strip()
product_code = str(product.get("productCode") or "").strip()
product_id = product.get("productId")
part_no = product_model or product_code or (str(product_id) if product_id is not None else "")
name = str(product.get("productName") or "").strip() or part_no or "未命名元件"
brand_name = str(product.get("brandName") or "").strip()
encap_standard = str(product.get("encapStandard") or "").strip()
catalog_name = str(product.get("catalogName") or "").strip()
# Prefer concise, searchable spec fields.
spec_parts = [brand_name, encap_standard, catalog_name]
specification = " / ".join([p for p in spec_parts if p])
arrange_map = {
"biandai": "编带",
"bianpai": "编排",
"daizhuang": "袋装",
"guanzhuang": "管装",
"hezhuang": "盒装",
"juan": "卷装",
"kun": "捆装",
"tuopan": "托盘",
"xiangzhuang": "箱装",
}
unit_map = {
"pan": "圆盘",
"bao": "",
"ben": "",
"dai": "",
"guan": "",
"he": "",
"juan": "",
"kun": "",
"mi": "",
"tuopan": "托盘",
"xiang": "",
}
product_arrange_raw = str(product.get("productArrange") or "").strip().lower()
product_arrange = arrange_map.get(product_arrange_raw, product_arrange_raw)
min_packet_number = product.get("minPacketNumber")
min_packet_unit_raw = str(product.get("minPacketUnit") or "").strip().lower()
min_packet_unit = unit_map.get(min_packet_unit_raw, min_packet_unit_raw)
note_bits = []
if product_code:
note_bits.append(f"LCSC {product_code}")
if product_id is not None:
note_bits.append(f"ID {product_id}")
if product_arrange:
note_bits.append(f"编排 {product_arrange}")
if min_packet_number:
unit_suffix = min_packet_unit or ""
note_bits.append(f"最小包装 {min_packet_number}{unit_suffix}")
note = " | ".join(note_bits)
return {
"part_no": part_no,
"name": name,
"specification": specification,
"note": note,
}
# Box represents a container/box; Component represents the part stored at one slot position inside a box.
class Box(db.Model):
    # A storage container. Slots are addressed by a 1-based slot index that is
    # rendered as "<slot_prefix><start_number + index - 1>".
    __tablename__ = "boxes"
    id = db.Column(db.Integer, primary_key=True)
    # Display name; must be unique across all boxes.
    name = db.Column(db.String(100), nullable=False, unique=True)
    description = db.Column(db.String(255), nullable=True)
    # Values are compared against the keys of BOX_TYPES elsewhere in this file.
    box_type = db.Column(db.String(30), nullable=False, default="small_28")
    # Number of slots in this box.
    slot_capacity = db.Column(db.Integer, nullable=False, default=28)
    # Prefix and first serial number used to build slot codes.
    slot_prefix = db.Column(db.String(16), nullable=False, default="A")
    start_number = db.Column(db.Integer, nullable=False, default=1)
class Component(db.Model):
    # A part stored at one slot of a box.
    __tablename__ = "components"
    id = db.Column(db.Integer, primary_key=True)
    box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False)
    # Slot position inside the owning box.
    slot_index = db.Column(db.Integer, nullable=False)
    part_no = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    specification = db.Column(db.String(120), nullable=True)
    quantity = db.Column(db.Integer, nullable=False, default=0)
    location = db.Column(db.String(120), nullable=True)
    note = db.Column(db.Text, nullable=True)
    # Conflict checks and overview rows only consider enabled records.
    is_enabled = db.Column(db.Boolean, nullable=False, default=True)
    # Adds Box.components as the reverse relationship.
    box = db.relationship("Box", backref=db.backref("components", lazy=True))
    # A slot of a box can hold at most one component row.
    __table_args__ = (
        db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"),
    )
class InventoryEvent(db.Model):
    # One row per logged inventory change. box_id and component_id are plain
    # integers without foreign-key constraints, and every reference column is
    # nullable.
    __tablename__ = "inventory_events"
    id = db.Column(db.Integer, primary_key=True)
    box_id = db.Column(db.Integer, nullable=True)
    box_type = db.Column(db.String(30), nullable=True)
    component_id = db.Column(db.Integer, nullable=True)
    part_no = db.Column(db.String(100), nullable=True)
    # Event name, e.g. "component_merge_confirmed".
    event_type = db.Column(db.String(30), nullable=False)
    # Signed quantity change associated with the event.
    delta = db.Column(db.Integer, nullable=False, default=0)
    # Timestamp is filled in by the database server.
    created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
def _add_column_if_missing(table_name: str, column_name: str, ddl: str) -> None:
    """Add a column to ``table_name`` unless it already exists.

    ``ddl`` is the full column definition appended after ``ADD COLUMN``.
    The table name and DDL are interpolated into SQL text, so they must be
    trusted constants, never user input.
    """
    pragma_rows = db.session.execute(
        db.text(f"PRAGMA table_info({table_name})")
    ).fetchall()
    # Column 1 of each PRAGMA table_info row is the column name.
    existing = {row[1] for row in pragma_rows}
    if column_name in existing:
        return
    db.session.execute(db.text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
def ensure_schema() -> None:
    """Apply minimal incremental migrations to an older database.

    Early databases may lack newer columns. Each missing column is added
    individually so upgrading never requires dropping and recreating tables.
    """
    # (table, column, column DDL) in the order the columns were introduced.
    required_columns = (
        ("boxes", "box_type", "box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'"),
        ("boxes", "slot_capacity", "slot_capacity INTEGER NOT NULL DEFAULT 28"),
        ("boxes", "slot_prefix", "slot_prefix VARCHAR(16) NOT NULL DEFAULT 'A'"),
        ("boxes", "start_number", "start_number INTEGER NOT NULL DEFAULT 1"),
        ("components", "is_enabled", "is_enabled BOOLEAN NOT NULL DEFAULT 1"),
    )
    for table, column, definition in required_columns:
        _add_column_if_missing(table, column, definition)
    db.session.commit()
def slot_code_for_box(box: Box, slot_index: int) -> str:
    """Return the display code (prefix + serial number) of a 1-based slot."""
    offset = slot_index - 1
    return f"{box.slot_prefix}{box.start_number + offset}"
def slot_range_label(box: Box) -> str:
    """Return "<first slot code>-<last slot code>" for the whole box."""
    first_and_last = (
        slot_code_for_box(box, 1),
        slot_code_for_box(box, box.slot_capacity),
    )
    return "-".join(first_and_last)
def _find_enabled_part_no_conflict(part_no: str, exclude_component_id: int = None):
    """Return the first enabled component with the same part number, if any.

    The part number is stripped before comparison; blank input returns
    ``None``. ``exclude_component_id`` skips the record being edited.
    Results are ordered by box id and then slot index.
    """
    wanted = (part_no or "").strip()
    if not wanted:
        return None
    criteria = [Component.part_no == wanted, Component.is_enabled.is_(True)]
    if exclude_component_id is not None:
        criteria.append(Component.id != exclude_component_id)
    return (
        Component.query.filter(*criteria)
        .order_by(Component.box_id.asc(), Component.slot_index.asc())
        .first()
    )
def _normalize_material_text(text: str) -> str:
raw = (text or "").upper().strip()
if not raw:
return ""
raw = raw.replace("", "(").replace("", ")")
raw = re.sub(r"[\s\-_/,|;:,。;:]+", "", raw)
raw = re.sub(r"[()\[\]{}]", "", raw)
return raw
def _material_identity_key(name: str, specification: str) -> str:
    """Return "<name key>|<spec key>", or "" when both normalise to empty."""
    keys = (
        _normalize_material_text(name),
        _normalize_material_text(specification),
    )
    if not any(keys):
        return ""
    return "|".join(keys)
def _find_enabled_material_conflict(
    name: str,
    specification: str,
    exclude_component_id: int = None,
    exclude_part_no: str = "",
):
    """Find an enabled component with the same normalised name and spec.

    Parts can carry different part numbers yet be the same material, so
    de-duplicating by part number alone is not enough. Names and specs are
    normalised before comparison to avoid creating duplicate stock slots.

    Returns the first match ordered by box id and slot index, or ``None``.
    Candidates whose stripped part number equals ``exclude_part_no`` and the
    record with id ``exclude_component_id`` are skipped.
    """
    wanted_key = _material_identity_key(name, specification)
    if not wanted_key:
        return None
    candidates = Component.query.filter(Component.is_enabled.is_(True))
    if exclude_component_id is not None:
        candidates = candidates.filter(Component.id != exclude_component_id)
    skip_part_no = (exclude_part_no or "").strip()
    ordered = candidates.order_by(
        Component.box_id.asc(), Component.slot_index.asc()
    ).all()

    def is_conflict(row) -> bool:
        if skip_part_no and (row.part_no or "").strip() == skip_part_no:
            return False
        return _material_identity_key(row.name, row.specification) == wanted_key

    return next((row for row in ordered if is_conflict(row)), None)
def _is_truthy_form_value(value: str) -> bool:
return (value or "").strip().lower() in {"1", "true", "yes", "on"}
def _append_merge_part_no_note(note: str, part_no: str) -> str:
normalized = (part_no or "").strip()
if not normalized:
return note or ""
line = f"合并料号: {normalized}"
current = note or ""
if line in current:
return current
return f"{current}\n{line}".strip()
def _is_slot_part_replacement(component: Component, new_part_no: str) -> bool:
    """Return True when a slot's non-blank part number is being changed.

    Both the existing and the new part number must be non-blank after
    stripping, and they must differ.
    """
    if component is None:
        return False
    current = (component.part_no or "").strip()
    incoming = (new_part_no or "").strip()
    if not current or not incoming:
        return False
    return current != incoming
def _merge_into_existing_component(
    target: Component,
    incoming_part_no: str,
    incoming_name: str,
    incoming_specification: str,
    incoming_note: str,
    incoming_quantity: int,
    source_component: Component = None,
    source_box: Box = None,
) -> None:
    """Merge a component being saved or imported into an existing record.

    The merge does three things:
    1. Copies name, specification and note onto the target record.
    2. Adds the quantity and logs an inventory event.
    3. Deletes the source record when it is a different row, so the same
       material is not kept twice.

    Nothing is committed here; the caller owns the session.
    """
    # A disabled target counts as zero stock, so re-enabling it logs its
    # full resulting quantity as the delta.
    old_target_enabled_qty = int(target.quantity or 0) if target.is_enabled else 0
    target.name = incoming_name or target.name
    if incoming_specification:
        target.specification = incoming_specification
    # A non-empty incoming note replaces the target's note.
    if incoming_note:
        target.note = incoming_note
    # Record the differing part number after the note has been updated.
    if incoming_part_no and incoming_part_no != target.part_no:
        target.note = _append_merge_part_no_note(target.note, incoming_part_no)
    target.quantity = int(target.quantity or 0) + int(incoming_quantity or 0)
    target.is_enabled = True
    target_delta = int(target.quantity or 0) - old_target_enabled_qty
    if target_delta:
        log_inventory_event(
            event_type="component_merge_confirmed",
            delta=target_delta,
            box=target.box,
            component=target,
            part_no=target.part_no,
        )
    if source_component is not None and source_component.id != target.id:
        # Log the stock leaving the source slot before the row is deleted.
        if source_component.is_enabled and source_component.quantity:
            log_inventory_event(
                event_type="component_merge_cleanup",
                delta=-int(source_component.quantity),
                box=source_box,
                component=source_component,
                part_no=source_component.part_no,
            )
        db.session.delete(source_component)
def _format_component_position(component: Component) -> str:
    """Return "<box name> <slot code>", or a raw-id fallback if the box is gone."""
    owner = Box.query.get(component.box_id)
    if owner:
        return f"{owner.name} {slot_code_for_box(owner, component.slot_index)}"
    return f"盒子#{component.box_id} 位置#{component.slot_index}"
def compose_box_name(base_name: str, prefix: str, start_number: int, slot_capacity: int) -> str:
    """Build a box name of the form "<base> <prefix><start>-<prefix><end>".

    A blank base name falls back to the generic word for "box".
    """
    label = (base_name or "").strip() or "盒子"
    last_number = start_number + slot_capacity - 1
    return f"{label} {prefix}{start_number}-{prefix}{last_number}"
def make_unique_box_name(candidate_name: str, exclude_box_id: int = None) -> str:
    """Return ``candidate_name``, suffixed with " #N" if needed to be unique.

    N starts at 2 and grows until no other box uses the name.
    ``exclude_box_id`` lets a box keep its own current name.
    """

    def taken(name: str) -> bool:
        lookup = Box.query.filter_by(name=name)
        if exclude_box_id is not None:
            lookup = lookup.filter(Box.id != exclude_box_id)
        return bool(lookup.first())

    unique = candidate_name
    suffix = 2
    while taken(unique):
        unique = f"{candidate_name} #{suffix}"
        suffix += 1
    return unique
def infer_base_name(box: Box) -> str:
    """Strip a trailing "<prefix>N-<prefix>M" range (and " #K") from the name.

    Returns the full box name when stripping would leave nothing.
    """
    prefix = re.escape(box.slot_prefix)
    range_suffix = re.compile(rf"\s+{prefix}\d+-{prefix}\d+(?:\s+#\d+)?$")
    stripped = range_suffix.sub("", box.name).strip()
    return stripped if stripped else box.name
def _extract_lcsc_code_from_text(text: str) -> str:
raw = (text or "").upper()
match = re.search(r"\bC\d{3,}\b", raw)
return match.group(0) if match else ""
def _parse_slot_spec_fields(specification: str) -> dict:
parts = [p.strip() for p in (specification or "").split("/") if p.strip()]
return {
"brand": parts[0] if len(parts) >= 1 else "",
"package": parts[1] if len(parts) >= 2 else "",
"usage": parts[2] if len(parts) >= 3 else "",
}
def _parse_note_detail_fields(note: str) -> dict:
    """Pull LCSC code, product id, packing arrangement and min pack from a note.

    Each value is "" when its marker is absent. The arrangement and minimum
    pack values run up to the next "|" separator.
    """
    text = note or ""

    def capture(pattern: str, flags: int = 0) -> str:
        hit = re.search(pattern, text, flags=flags)
        return hit.group(1).strip() if hit else ""

    return {
        "lcsc_code": _extract_lcsc_code_from_text(text),
        "product_id": capture(r"\b(?:ID|productId)\s*(\d+)\b", re.IGNORECASE),
        "arrange": capture(r"编排\s*([^|]+)"),
        "min_pack": capture(r"最小包装\s*([^|]+)"),
    }
def slot_data_for_box(box: Box):
    """Return one dict per slot of ``box``, occupied or not, in slot order.

    Each dict holds the slot index, its display code, the component (or
    ``None``), an LCSC code taken from the note or else the part number, and
    the parsed specification fields.
    """
    by_slot = {
        item.slot_index: item
        for item in Component.query.filter_by(box_id=box.id).all()
    }
    result = []
    for position in range(1, box.slot_capacity + 1):
        occupant = by_slot.get(position)
        if occupant:
            # Prefer a code mentioned in the note; fall back to the part number.
            code = _extract_lcsc_code_from_text(
                occupant.note
            ) or _extract_lcsc_code_from_text(occupant.part_no)
            spec = _parse_slot_spec_fields(occupant.specification)
        else:
            code = ""
            spec = {"brand": "", "package": "", "usage": ""}
        result.append(
            {
                "slot": position,
                "slot_code": slot_code_for_box(box, position),
                "component": occupant,
                "lcsc_code": code,
                "spec_fields": spec,
            }
        )
    return result
def bag_rows_for_box(box: Box):
    """Return a row per stored component of ``box``, ordered by slot index."""
    stored = Component.query.filter_by(box_id=box.id).order_by(
        Component.slot_index.asc()
    )
    return [
        {"component": item, "slot_code": slot_code_for_box(box, item.slot_index)}
        for item in stored.all()
    ]
def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int:
raw = (raw_value or "").strip()
if raw == "":
return default_value
value = int(raw)
if value < 0:
raise ValueError
return value
def normalize_legacy_data() -> None:
    """Repair historical rows so they satisfy the current business rules.

    Covers leftovers of earlier versions: empty box type or prefix, missing
    capacity or start number, components without ``is_enabled``, and makes
    sure the fixed bag-list container exists. Commits at the end.
    """
    backfill_statements = (
        "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''",
        "UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL",
        "UPDATE boxes SET slot_prefix = 'A' WHERE slot_prefix IS NULL OR slot_prefix = ''",
        "UPDATE boxes SET start_number = 1 WHERE start_number IS NULL",
        "UPDATE components SET is_enabled = 1 WHERE is_enabled IS NULL",
    )
    for statement in backfill_statements:
        db.session.execute(db.text(statement))

    for box in Box.query.all():
        # Unknown types are coerced first so the defaults lookup is valid.
        if box.box_type not in BOX_TYPES:
            box.box_type = "small_28"
        if not box.slot_capacity or box.slot_capacity < 1:
            box.slot_capacity = BOX_TYPES[box.box_type]["default_capacity"]
        if not box.slot_prefix:
            box.slot_prefix = BOX_TYPES[box.box_type]["default_prefix"]
        if box.start_number is None or box.start_number < 0:
            box.start_number = 1
        if box.box_type == "bag":
            # The bag list always starts at 1 and has a fixed name.
            box.start_number = 1
            box.name = "袋装清单"

    if not Box.query.filter_by(box_type="bag").first():
        bag_defaults = BOX_TYPES["bag"]
        bag_box = Box(
            name="袋装清单",
            description=bag_defaults["default_desc"],
            box_type="bag",
            slot_capacity=bag_defaults["default_capacity"],
            slot_prefix=bag_defaults["default_prefix"],
            start_number=1,
        )
        db.session.add(bag_box)
    db.session.commit()
def get_fixed_bag_box() -> Box:
    """Return the bag-list box with the lowest id, creating it if absent.

    A newly created box uses the "bag" defaults from ``BOX_TYPES`` and is
    committed immediately.
    """
    existing = Box.query.filter_by(box_type="bag").order_by(Box.id.asc()).first()
    if existing:
        return existing
    defaults = BOX_TYPES["bag"]
    created = Box(
        name="袋装清单",
        description=defaults["default_desc"],
        box_type="bag",
        slot_capacity=defaults["default_capacity"],
        slot_prefix=defaults["default_prefix"],
        start_number=1,
    )
    db.session.add(created)
    db.session.commit()
    return created
@app.route("/box/<int:box_id>/bag-capacity", methods=["POST"])
def update_bag_capacity(box_id: int):
    """Change the number of bag positions of the bag-list container.

    Rejects non-bag boxes, values below 1 or non-integer input, and any
    capacity smaller than the highest slot index already in use.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return bad_request("当前容器不是袋装清单", box.box_type)

    invalid_message = "袋位数量必须是大于等于 1 的整数"
    try:
        requested = _parse_non_negative_int(
            request.form.get("slot_capacity", ""), box.slot_capacity
        )
    except ValueError:
        return render_box_page(box, error=invalid_message)
    if requested < 1:
        return render_box_page(box, error=invalid_message)

    highest_used = (
        db.session.query(db.func.max(Component.slot_index))
        .filter_by(box_id=box.id)
        .scalar()
    ) or 0
    if highest_used > requested:
        return render_box_page(box, error=f"袋位数量不能小于已使用位置 {highest_used}")

    box.slot_capacity = requested
    db.session.commit()
    return redirect(url_for("view_box", box_id=box.id, notice="袋位数量已更新"))
def make_overview_rows(box: Box):
    """Return slot code, name and part number for each enabled component.

    Rows are ordered by slot index.
    """
    enabled = Component.query.filter_by(box_id=box.id, is_enabled=True).order_by(
        Component.slot_index.asc()
    )
    return [
        {
            "slot_code": slot_code_for_box(box, item.slot_index),
            "name": item.name,
            "part_no": item.part_no,
        }
        for item in enabled.all()
    ]
def box_sort_key(box: Box):
    """Sort key: upper-cased prefix, then start number, then name."""
    prefix = (box.slot_prefix or "").upper()
    start = 0 if box.start_number is None else box.start_number
    return (prefix, start, box.name or "")
def has_range_conflict(
    *,
    box_type: str,
    prefix: str,
    start_number: int,
    slot_capacity: int,
    exclude_box_id: int = None,
):
    """Check whether a numbering range overlaps another box of the same kind.

    Only boxes sharing ``box_type`` and ``prefix`` are compared.

    Returns:
        ``(True, other_box)`` for the first overlapping box found, otherwise
        ``(False, None)``.
    """
    last_number = start_number + slot_capacity - 1
    peers = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        peers = peers.filter(Box.id != exclude_box_id)
    for peer in peers.all():
        peer_first = peer.start_number
        peer_last = peer.start_number + peer.slot_capacity - 1
        # Closed ranges overlap when each one starts no later than the other ends.
        if last_number >= peer_first and start_number <= peer_last:
            return True, peer
    return False, None
def suggest_next_start_number(
    *,
    box_type: str,
    prefix: str,
    slot_capacity: int,
    exclude_box_id: int = None,
) -> int:
    """Suggest the first free serial number after all boxes of the same kind.

    Returns 1 when no box with this type and prefix ends above zero.
    ``slot_capacity`` is accepted but not used in the calculation.
    """
    peers = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        peers = peers.filter(Box.id != exclude_box_id)
    range_ends = [peer.start_number + peer.slot_capacity - 1 for peer in peers.all()]
    # Seeding with 0 keeps the result at 1 when there is nothing to follow.
    highest_end = max([0] + range_ends)
    return highest_end + 1
def build_index_anchor(box_type: str = "") -> str:
    """Return the page anchor "group-<box_type>" for known types, else ""."""
    return f"group-{box_type}" if box_type in BOX_TYPES else ""
def bad_request(message: str, box_type: str = ""):
    """Render the error page with HTTP status 400.

    The "back" link targets the page of ``box_type`` when that type is
    known; otherwise the referrer, falling back to the type list page.
    """
    anchor = build_index_anchor(box_type)
    if anchor and box_type in BOX_TYPES:
        back_url = url_for("type_page", box_type=box_type)
    else:
        back_url = request.referrer or url_for("types_page")
    page = render_template(
        "error.html",
        status_code=400,
        title="请求参数有误",
        message=message,
        back_url=back_url,
    )
    return page, 400
def render_box_page(box: Box, error: str = "", notice: str = ""):
    """Render the box detail page.

    Many routes end by returning ``box.html``. The shared template data
    (slots, messages, threshold) is assembled here so routes do not repeat it.
    """
    context = {
        "box": box,
        "slots": slot_data_for_box(box),
        "box_types": BOX_TYPES,
        "slot_range": slot_range_label(box),
        "low_stock_threshold": LOW_STOCK_THRESHOLD,
        "error": error,
        "notice": notice,
    }
    return render_template("box.html", **context)
def _next_empty_slot_index(box: Box, occupied_slots: set[int]):
    """Return the lowest 1-based slot index not in ``occupied_slots``.

    Returns ``None`` when every slot up to the box capacity is taken.
    """
    free_slots = (
        position
        for position in range(1, box.slot_capacity + 1)
        if position not in occupied_slots
    )
    return next(free_slots, None)
def _parse_bulk_line(line: str):
parts = [p.strip() for p in re.split(r"[,\t]", line)]
while len(parts) < 5:
parts.append("")
return {
"part_no": parts[0],
"name": parts[1],
"quantity_raw": parts[2],
"specification": parts[3],
"note": parts[4],
}
def _split_inbound_line_fields(line: str) -> tuple[list[str], list[str]]:
"""把原始入库行尽量拆分成 5 列。
中文说明:用户粘贴的数据分隔符不一定统一,这里优先识别逗号/Tab
其次尝试竖线或连续空格,尽量把一行拆成系统可识别的字段。
"""
warnings = []
raw = (line or "").strip()
if not raw:
return ["", "", "", "", ""], warnings
normalized = raw.replace("", ",")
if "," in normalized or "\t" in normalized:
parts = [p.strip() for p in re.split(r"[,\t]", normalized)]
elif "|" in normalized:
parts = [p.strip() for p in normalized.split("|")]
warnings.append("检测到竖线分隔,已自动转换为标准字段")
else:
parts = [p.strip() for p in re.split(r"\s{2,}", normalized) if p.strip()]
warnings.append("未检测到逗号或Tab已按连续空格尝试拆分")
if len(parts) > 5:
parts = parts[:4] + [" | ".join(parts[4:])]
warnings.append("字段超过 5 列,已将多余内容合并到备注")
while len(parts) < 5:
parts.append("")
return parts, warnings
def _format_inbound_line(part_no: str, name: str, quantity: int, specification: str, note: str) -> str:
safe_quantity = int(quantity or 0)
return f"{part_no}, {name}, {safe_quantity}, {specification}, {note}".strip()
def _dedupe_ordered_text(values: list[str]) -> list[str]:
seen = set()
output = []
for value in values:
text = (value or "").strip()
if not text or text in seen:
continue
seen.add(text)
output.append(text)
return output
def _split_text_fragments(value: str) -> list[str]:
    """Split text on newlines, commas, semicolons and bars into unique parts.

    Fragments are stripped, blanks are dropped and first-seen order is kept.
    """
    source = (value or "").strip()
    if not source:
        return []
    pieces = (chunk.strip() for chunk in re.split(r"[\n,;|]+", source))
    return _dedupe_ordered_text([piece for piece in pieces if piece])
def _normalize_inbound_name(name: str, part_no: str) -> str:
    """Return a tidied component name, falling back to the part number.

    Runs of whitespace are collapsed to one space. When the cleaned name is
    empty the stripped part number is returned instead.
    """
    # Replace the ideographic space (U+3000) with a normal space. An empty
    # search string here would insert a space between every character.
    # NOTE(review): U+3000 is assumed to be the character originally
    # replaced -- confirm against the repository copy of this file.
    cleaned = _compact_spaces(name).replace("\u3000", " ")
    cleaned = re.sub(r"\s{2,}", " ", cleaned)
    if cleaned:
        return cleaned
    return (part_no or "").strip()
def _normalize_inbound_specification(specification: str) -> str:
    """Join at most the first four specification fragments with " / "."""
    fragments = _split_text_fragments(specification)[:4]
    return " / ".join(fragments)
def _normalize_inbound_note(note: str) -> str:
    """Join at most the first eight note fragments with " | "."""
    fragments = _split_text_fragments(note)[:8]
    return " | ".join(fragments)
def _normalize_inbound_row_style(row: dict) -> dict:
    """Normalise the output style of one AI inbound pre-processing row.

    Specification and note are reduced from comma-separated fragments to a
    fixed separator format, which removes ambiguity when the row is parsed
    again on import while keeping the search keywords.

    Returns a copy of ``row`` with cleaned ``part_no``, ``name``,
    ``quantity``, ``specification`` and ``note``, de-duplicated
    ``warnings``, a rebuilt ``errors`` list, ``is_valid`` and
    ``normalized_line``.
    """
    current = dict(row or {})
    part_no = (current.get("part_no") or "").strip()
    name = _normalize_inbound_name(current.get("name") or "", part_no)
    specification = _normalize_inbound_specification(current.get("specification") or "")
    note = _normalize_inbound_note(current.get("note") or "")
    warnings = list(current.get("warnings") or [])
    raw = (current.get("raw") or "").strip()
    quantity = 0
    quantity_raw = str(current.get("quantity_raw", "") or "").strip()
    # An unparsable or negative quantity silently becomes 0.
    try:
        quantity = _parse_non_negative_int(str(current.get("quantity", 0) or "0"), 0)
    except ValueError:
        quantity = 0
    if quantity_raw == "":
        warnings.append("未检测到数量默认为0")
    # True when an earlier step flagged the row as a single-field description.
    is_sparse_description = any("单字段描述" in str(msg or "") for msg in warnings)
    # Placeholder part numbers get a "manufacturer part number to be
    # confirmed" marker in the note, added only once.
    if part_no.upper().startswith("AUTO-"):
        if "待确认厂家型号" not in note:
            note = " | ".join([p for p in [note, "待确认厂家型号"] if p])
    # Keep the original text in the note when it is not already represented.
    if is_sparse_description and raw and raw != name and raw not in note:
        note = " | ".join([p for p in [note, raw] if p])
    current.update(
        {
            "part_no": part_no,
            "name": name,
            "quantity": int(quantity),
            "specification": specification,
            "note": note,
            "warnings": _dedupe_ordered_text(warnings),
            "errors": [],
        }
    )
    # Errors are rebuilt from scratch: part number and name are mandatory.
    if not part_no:
        current["errors"].append("缺少料号")
    if not name:
        current["errors"].append("缺少名称")
    current["is_valid"] = len(current["errors"]) == 0
    current["normalized_line"] = _format_inbound_line(part_no, name, quantity, specification, note)
    return current
def _guess_part_no_from_free_text(text: str) -> str:
"""从自由文本生成一个可用的临时料号。
中文说明:当用户只有“描述句”而不是规范料号时,先生成 AUTO- 前缀的临时料号,
让这一行能继续进入 AI 预处理和人工确认流程,避免直接报“缺少名称/料号”。
"""
raw = (text or "").strip()
if not raw:
return "AUTO-UNKNOWN"
upper = raw.upper().replace("", "(").replace("", ")")
tokens = re.findall(r"[A-Z]{2,}[A-Z0-9.-]*|\d+(?:\.\d+)?(?:V|A|MA|UA|UF|NF|PF|MHZ|GHZ)?", upper)
useful = [token for token in tokens if len(token) >= 2][:3]
if useful:
base = "-".join(useful)[:26].strip("-_")
if base:
return f"AUTO-{base}"
digest = hashlib.md5(raw.encode("utf-8")).hexdigest()[:8].upper()
return f"AUTO-{digest}"
def _auto_patch_sparse_inbound_fields(part_no: str, name: str, warnings: list[str]) -> tuple[str, str, list[str]]:
    """Repair rows where the only field supplied is really a description.

    If the name is empty and the part-number field contains CJK characters or
    several whitespace-separated words, that text is moved to the name and a
    temporary part number is generated from it; a warning records the change.

    Returns:
        ``(part_no, name, warnings)`` with the warnings de-duplicated.
    """
    code = (part_no or "").strip()
    label = (name or "").strip()
    notes = list(warnings or [])
    # A lone field counts as descriptive when it has CJK text or multiple words.
    is_description = bool(code) and not label and (
        re.search(r"[\u4e00-\u9fff]", code) is not None or len(code.split()) >= 2
    )
    if is_description:
        label = code
        code = _guess_part_no_from_free_text(label)
        notes.append("检测到单字段描述,已自动生成临时料号并将描述写入名称")
    return code, label, _dedupe_ordered_text(notes)
def _fetch_open_search_context(query: str, timeout: int) -> dict:
    """Fetch a few short reference snippets for ``query`` from a public search API.

    A single GET request is sent to the DuckDuckGo Instant Answer endpoint and
    up to four sources are collected (the abstract first, then related
    topics, including nested topic groups). Each source is tagged with a
    coarse reliability rating derived from the host of its URL and, failing
    that, from keywords in the snippet.

    Args:
        query: Free-text search phrase; surrounding whitespace is ignored.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        ``{"query": <stripped query>, "sources": [...]}``. Every source holds
        ``title``, ``snippet``, ``url``, ``domain`` and the three
        ``reliability_*`` keys. The lookup is best-effort: a blank query, a
        network error, invalid JSON or an unexpected payload shape all yield
        an empty ``sources`` list instead of raising.
    """
    raw_query = (query or "").strip()
    if not raw_query:
        return {"query": "", "sources": []}
    max_sources = 4
    params = urllib.parse.urlencode(
        {
            "q": raw_query,
            "format": "json",
            "no_html": "1",
            "skip_disambig": "1",
            "kl": "cn-zh",
        }
    )
    endpoint = f"https://api.duckduckgo.com/?{params}"
    req = urllib.request.Request(
        endpoint,
        method="GET",
        headers={"User-Agent": "inventory-ai-inbound/1.0"},
    )

    # Vendor / distributor hosts, matched on whole DNS labels.
    high_domains = (
        "ti.com",
        "analog.com",
        "st.com",
        "nxp.com",
        "microchip.com",
        "onsemi.com",
        "infineon.com",
        "renesas.com",
        "murata.com",
        "tdk.com",
        "jlc.com",
        "szlcsc.com",
        "mouser.com",
        "digikey.com",
        "arrow.com",
    )
    # Fragments that mark datasheet sites wherever they occur in the host.
    high_keywords = ("alldatasheet", "datasheet")
    medium_domains = (
        "wikipedia.org",
        "baike.baidu.com",
        "elecfans.com",
        "eefocus.com",
        "51hei.com",
        "cnblogs.com",
        "csdn.net",
        "bilibili.com",
    )
    low_domains = (
        "tieba.baidu.com",
        "zhihu.com",
        "weibo.com",
        "douyin.com",
        "xiaohongshu.com",
        "taobao.com",
        "tmall.com",
        "1688.com",
        "aliexpress.com",
    )

    def host_matches(host: str, candidates: tuple) -> bool:
        # Compare on label boundaries so "st.com" matches "www.st.com" and
        # "st.com.cn" but not an unrelated host such as "fast.com".
        padded = f".{host}."
        return any(f".{item}." in padded for item in candidates)

    def rating(level: str, label: str, reason: str, domain: str) -> dict:
        return {
            "reliability_level": level,
            "reliability_label": label,
            "reliability_reason": reason,
            "domain": domain,
        }

    def classify_source_reliability(url: str, snippet: str) -> dict:
        parsed = urllib.parse.urlparse(url or "")
        domain = (parsed.netloc or "").lower()
        if domain.startswith("www."):
            domain = domain[4:]
        # hostname drops any port / userinfo, which would defeat the matching.
        host = (parsed.hostname or "").lower()
        snippet_text = (snippet or "").lower()
        if host_matches(host, high_domains) or any(word in host for word in high_keywords):
            return rating("high", "高可信", "官网/数据手册/主流分销来源", domain)
        if host_matches(host, low_domains):
            return rating("low", "低可信", "社区/电商/社媒内容,仅供线索参考", domain)
        if host_matches(host, medium_domains):
            return rating("medium", "中可信", "技术社区或百科内容,建议二次核对", domain)
        if "datasheet" in snippet_text or "规格" in snippet_text or "参数" in snippet_text:
            return rating("medium", "中可信", "文本包含参数关键词,建议核对原始链接", domain)
        return rating("medium", "中可信", "来源类型未知,建议人工确认", domain)

    empty_result = {"query": raw_query, "sources": []}
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8", errors="ignore")
        payload = json.loads(raw)
    except Exception:
        # Deliberate best-effort: online context is optional enrichment.
        return empty_result
    if not isinstance(payload, dict):
        # Valid JSON of an unexpected shape (list, null, ...) has nothing usable.
        return empty_result

    sources = []
    heading = str(payload.get("Heading") or "").strip()
    abstract = str(payload.get("AbstractText") or "").strip()
    abstract_url = str(payload.get("AbstractURL") or "").strip()
    if heading or abstract:
        sources.append(
            {
                "title": heading or raw_query,
                "snippet": abstract,
                "url": abstract_url,
                **classify_source_reliability(abstract_url, abstract),
            }
        )

    def append_related(items) -> None:
        if not isinstance(items, list):
            return
        for item in items:
            if len(sources) >= max_sources:
                # Enough collected; stop walking (also unwinds nested groups).
                return
            if not isinstance(item, dict):
                continue
            if "Topics" in item:
                # Grouped results nest their entries under "Topics".
                append_related(item.get("Topics") or [])
                continue
            text = str(item.get("Text") or "").strip()
            link = str(item.get("FirstURL") or "").strip()
            if not text:
                continue
            sources.append(
                {
                    "title": raw_query,
                    "snippet": text,
                    "url": link,
                    **classify_source_reliability(link, text),
                }
            )

    append_related(payload.get("RelatedTopics") or [])
    return {
        "query": raw_query,
        "sources": sources[:max_sources],
    }
def _build_inbound_online_context(rows: list[dict], timeout: int, max_lines: int = 4) -> list[dict]:
    """Collect online search context for inbound rows that lack information.

    A row qualifies when its name, specification or note is empty. Its raw
    text (falling back to name, then part number) is used as the search
    query; queries shorter than two characters and lookups that return no
    sources are skipped. At most ``max_lines`` rows are enriched.

    Returns:
        A list of ``{"line_no", "query", "sources"}`` dicts.
    """
    collected = []
    for entry in rows:
        if len(collected) >= max_lines:
            break
        # Fully described rows need no online help.
        if entry.get("name") and entry.get("specification") and entry.get("note"):
            continue
        search_text = (entry.get("raw") or entry.get("name") or entry.get("part_no") or "").strip()
        if len(search_text) < 2:
            continue
        lookup = _fetch_open_search_context(search_text, timeout=timeout)
        if not lookup.get("sources"):
            continue
        collected.append(
            {
                "line_no": entry.get("line_no"),
                "query": lookup.get("query", ""),
                "sources": lookup.get("sources", []),
            }
        )
    return collected
def _parse_inbound_preview_rows(raw_lines: list[str]) -> list[dict]:
    """Turn raw inbound text lines into normalized preview rows.

    Each line is split into five fields (part number, name, quantity,
    specification, note), sparse single-field lines are patched, the quantity
    is parsed (falling back to 0 with a warning on a format error), and the
    resulting row is passed through ``_normalize_inbound_row_style``.
    Line numbers start at 1.
    """
    preview = []
    for index, source_line in enumerate(raw_lines, start=1):
        fields, notes = _split_inbound_line_fields(source_line)
        code, label, qty_text, spec, remark = ((fields[pos] or "").strip() for pos in range(5))
        code, label, notes = _auto_patch_sparse_inbound_fields(code, label, notes)
        try:
            amount = _parse_non_negative_int(qty_text, 0)
        except ValueError:
            notes.append("数量格式异常已按0处理")
            amount = 0
        draft = {
            "line_no": index,
            "raw": source_line,
            "part_no": code,
            "name": label,
            "quantity": int(amount),
            "quantity_raw": qty_text,
            "specification": spec,
            "note": remark,
            "errors": [],
            "warnings": notes,
            "is_valid": True,
            "normalized_line": "",
        }
        preview.append(_normalize_inbound_row_style(draft))
    return preview
def _extract_json_object_block(raw_text: str) -> str:
    """Extract the JSON object portion from a model reply.

    Markdown code fences (with an optional language tag) are removed first;
    then the text between the first ``{`` and the last ``}`` is returned. If
    no such pair exists, the fence-stripped text is returned unchanged so the
    caller's ``json.loads`` can report the error.

    Args:
        raw_text: The reply text; ``None`` or blank input yields ``""``.
    """
    text = (raw_text or "").strip()
    if not text:
        return ""
    if text.startswith("```"):
        # In a raw string "\s" is the whitespace class; a doubled backslash
        # would instead demand a literal "\" and the fence would never match.
        text = re.sub(r"^```[A-Za-z0-9_+-]*\s*", "", text)
        text = re.sub(r"\s*```$", "", text)
    first = text.find("{")
    last = text.rfind("}")
    if first >= 0 and last > first:
        return text[first : last + 1]
    return text
def _normalize_ai_inbound_rows(ai_rows: list, fallback_rows: list[dict]) -> list[dict]:
    """Merge AI-proposed row edits onto the rule-parsed rows.

    AI rows are matched to fallback rows by ``line_no``; entries that are not
    dicts, have an unparsable line number, or reference an unknown line are
    ignored. Text fields present in the AI row replace the fallback values,
    the quantity is replaced only when it parses as a non-negative integer,
    and AI warnings are appended to the existing ones. Every merged row is
    re-normalized.

    Returns:
        One row per fallback row, ordered by line number.
    """
    merged = {base["line_no"]: dict(base) for base in fallback_rows}
    for candidate in ai_rows or []:
        if not isinstance(candidate, dict):
            continue
        try:
            key = int(candidate.get("line_no"))
        except (TypeError, ValueError):
            continue
        target = merged.get(key)
        if target is None:
            continue
        code = str(candidate.get("part_no", target["part_no"]) or "").strip()
        label = str(candidate.get("name", target["name"]) or "").strip()
        spec = str(candidate.get("specification", target["specification"]) or "").strip()
        remark = str(candidate.get("note", target["note"]) or "").strip()
        amount = target["quantity"]
        proposed_amount = candidate.get("quantity", target["quantity"])
        try:
            amount = _parse_non_negative_int(str(proposed_amount), 0)
        except (TypeError, ValueError):
            # An untrustworthy AI quantity must not override the parsed value.
            pass
        notes = list(target.get("warnings", []))
        extra_notes = candidate.get("warnings", [])
        if isinstance(extra_notes, list):
            cleaned_notes = (str(item or "").strip() for item in extra_notes)
            notes.extend(item for item in cleaned_notes if item)
        target.update(
            {
                "part_no": code,
                "name": label,
                "quantity": int(amount),
                "specification": spec,
                "note": remark,
                "warnings": notes,
            }
        )
        merged[key] = _normalize_inbound_row_style(target)
    return [merged[line_no] for line_no in sorted(merged)]
def _ai_enhance_inbound_preview(
    raw_lines: list[str],
    mode: str,
    fallback_rows: list[dict],
    settings: dict,
    use_web_search: bool = False,
) -> tuple[list[dict], str, list[dict]]:
    """Refine the rule-parsed inbound rows with an AI pass.

    The AI is asked to split and correct the rows, but its answer is still
    constrained by ``_normalize_ai_inbound_rows``. When the AI settings are
    incomplete, or the call / parsing fails for any reason, the rule-parsed
    rows are returned unchanged so the feature degrades gracefully.

    Args:
        raw_lines: The original text lines as entered.
        mode: Import mode, forwarded to the prompt verbatim.
        fallback_rows: Rows produced by the rule-based parser.
        settings: AI settings; reads ``api_key``, ``api_url``, ``model`` and
            ``timeout``.
        use_web_search: When true, online search context is gathered for
            incomplete rows and appended to the prompt.

    Returns:
        ``(rows, notice, online_context)`` where ``notice`` is a user-facing
        status message.
    """
    api_key = (settings.get("api_key") or "").strip()
    api_url = (settings.get("api_url") or "").strip()
    model = (settings.get("model") or "").strip()
    web_notice = ""
    online_context = []
    if use_web_search:
        online_context = _build_inbound_online_context(
            fallback_rows,
            # Web lookups are capped at 12 s regardless of the AI timeout.
            timeout=min(12, int(settings.get("timeout", 30))),
            max_lines=4,
        )
        if online_context:
            web_notice = f"已联网检索补充 {len(online_context)} 行参考信息"
        else:
            web_notice = "已尝试联网检索,但未找到可用补充信息"

    def with_web_notice(message: str) -> str:
        # Append the web-search status, when there is one, after a separator.
        return f"{message};{web_notice}" if web_notice else message

    if not api_key or not api_url or not model:
        return fallback_rows, with_web_notice("AI 参数未完整配置,已使用规则解析结果"), online_context
    numbered_lines = [{"line_no": idx, "raw": line} for idx, line in enumerate(raw_lines, start=1)]
    system_prompt = (
        "你是电子元件入库清洗助手。"
        "必须只输出 JSON不要 Markdown不要解释文字。"
        "请输出对象: {\"rows\":[{\"line_no\":number,\"part_no\":string,\"name\":string,\"quantity\":number,\"specification\":string,\"note\":string,\"warnings\":string[]}]}。"
        "不要新增或删除行号;每个 line_no 仅返回一条。"
        "quantity 必须是 >=0 的整数;无法确定时返回 0 并在 warnings 中说明。"
        "当原始信息不足时,可结合提供的联网检索摘要补全 name/specification/note并保留 AUTO- 临时料号。"
    )
    user_prompt = (
        f"导入模式: {mode}\n"
        "原始行(JSON):\n"
        + json.dumps(numbered_lines, ensure_ascii=False)
        + "\n规则解析参考(JSON):\n"
        + json.dumps(fallback_rows, ensure_ascii=False)
    )
    if online_context:
        user_prompt += "\n联网检索补充(JSON):\n" + json.dumps(online_context, ensure_ascii=False)
    try:
        suggestion = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=api_url,
            model=model,
            api_key=api_key,
            timeout=int(settings.get("timeout", 30)),
        )
        parsed = json.loads(_extract_json_object_block(suggestion))
        ai_rows = parsed.get("rows", []) if isinstance(parsed, dict) else []
        normalized_rows = _normalize_ai_inbound_rows(ai_rows, fallback_rows)
        notice_parts = ["已自动规范规格为“ / ”分隔、备注为“ | ”分隔"]
        if web_notice:
            notice_parts.append(web_notice)
        # Separate the notices the same way the fallback messages do, so the
        # sentences do not run into each other.
        return normalized_rows, ";".join(notice_parts), online_context
    except Exception:
        # Deliberate catch-all: any AI failure falls back to the rule result.
        return fallback_rows, with_web_notice("AI 解析失败,已自动回退到规则解析结果"), online_context
def log_inventory_event(
    *,
    event_type: str,
    delta: int,
    box: Box = None,
    component: Component = None,
    part_no: str = "",
):
    """Record one inventory change in the event table.

    The statistics views (trend chart, recent activity, net change) read from
    this table, so every operation that changes stock should log through
    here. The event is only added to the session; committing is left to the
    caller.

    The box id comes from ``box`` when given, otherwise from ``component``;
    the box type is recorded only when ``box`` is given. The part number
    falls back to the component's and is stored as ``None`` when blank.
    """
    if box:
        owner_box_id = box.id
        owner_box_type = box.box_type
    else:
        owner_box_id = component.box_id if component else None
        owner_box_type = None
    owner_component_id = component.id if component else None

    resolved_part_no = part_no
    if not resolved_part_no and component:
        resolved_part_no = component.part_no
    resolved_part_no = (resolved_part_no or "").strip() or None

    record = InventoryEvent(
        box_id=owner_box_id,
        box_type=owner_box_type,
        component_id=owner_component_id,
        part_no=resolved_part_no,
        event_type=event_type,
        delta=int(delta),
    )
    db.session.add(record)
def parse_days_value(raw_days: str) -> int:
    """Parse the "days" query value, allowing only 7 or 30.

    Blank, non-numeric or unsupported values all fall back to 7.
    """
    text = (raw_days or "7").strip()
    try:
        parsed = int(text)
    except ValueError:
        return 7
    if parsed in (7, 30):
        return parsed
    return 7
def parse_box_type_filter(raw_box_type: str) -> str:
    """Return the trimmed box type when it is a known ``BOX_TYPES`` key, else ``"all"``."""
    candidate = (raw_box_type or "").strip()
    if candidate in BOX_TYPES:
        return candidate
    return "all"
def _to_date(raw_day):
    """Convert a ``YYYY-MM-DD`` string to a ``date``; pass any other value through unchanged."""
    if not isinstance(raw_day, str):
        return raw_day
    return datetime.strptime(raw_day, "%Y-%m-%d").date()
def query_event_daily_delta(days: int, box_type_filter: str = "all"):
    """Sum inventory deltas per calendar day over the last ``days`` days.

    The window starts at midnight ``days - 1`` days before today and can be
    restricted to one box type.

    Returns:
        A dict mapping each day that has events to its summed delta (int).
    """
    first_day = datetime.now().date() - timedelta(days=days - 1)
    window_start = datetime.combine(first_day, datetime.min.time())
    stmt = db.session.query(
        db.func.date(InventoryEvent.created_at).label("event_day"),
        db.func.sum(InventoryEvent.delta).label("daily_delta"),
    ).filter(InventoryEvent.created_at >= window_start)
    if box_type_filter != "all":
        stmt = stmt.filter(InventoryEvent.box_type == box_type_filter)
    grouped = stmt.group_by(db.func.date(InventoryEvent.created_at)).all()
    # The day may come back as a string, so it is coerced to a date key.
    return {_to_date(entry.event_day): int(entry.daily_delta or 0) for entry in grouped}
def build_trend_points_from_events(days: int, total_quantity: int, box_type_filter: str = "all"):
    """Reconstruct the daily stock total for the last 7 or 30 days.

    Starting from the current total, each day's net delta is subtracted while
    walking backwards in time, which yields the total as of each earlier day.

    Returns:
        A list of ``{"date", "label", "value"}`` points, oldest first.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    deltas = query_event_daily_delta(window, box_type_filter)
    balance = total_quantity
    newest_first = []
    for offset in range(window):
        day = today - timedelta(days=offset)
        newest_first.append(
            {
                "date": day,
                "label": day.strftime("%m-%d"),
                "value": balance,
            }
        )
        # Undo this day's movements to get the previous day's total.
        balance -= deltas.get(day, 0)
    return newest_first[::-1]
def build_box_type_trend_series(days: int, totals_by_type: dict):
    """Build per-box-type daily stock totals for the last 7 or 30 days.

    For every key in ``BOX_TYPES`` the current total is rolled back day by
    day using that type's daily event deltas.

    Returns:
        ``{"labels": [...], "series": {box_type: [...]}}`` with labels and
        values ordered oldest first.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    newest_first = [today - timedelta(days=offset) for offset in range(window)]
    series = {}
    for type_key in BOX_TYPES.keys():
        deltas = query_event_daily_delta(window, type_key)
        balance = int(totals_by_type.get(type_key, 0))
        history = []
        for day in newest_first:
            history.append(balance)
            balance -= deltas.get(day, 0)
        series[type_key] = history[::-1]
    labels = [day.strftime("%m-%d") for day in newest_first[::-1]]
    return {
        "labels": labels,
        "series": series,
    }
def make_sparkline(values: list[int], width: int = 220, height: int = 56) -> str:
    """Convert a value series into a space-separated ``x,y`` point string.

    X positions are spread evenly across ``width``; Y is scaled so the
    minimum maps to ``height`` and the maximum to 0. Returns ``""`` for an
    empty series.
    """
    if not values:
        return ""
    low = min(values)
    high = max(values)
    # Guard against a zero span (flat series) and a single point.
    value_span = max(high - low, 1)
    x_step = width / max(len(values) - 1, 1)
    return " ".join(
        f"{position * x_step:.2f},{height - ((value - low) / value_span) * height:.2f}"
        for position, value in enumerate(values)
    )
def event_type_label(event_type: str) -> str:
    """Return the display label for an event type, or the raw value if unknown."""
    known_labels = {
        "quick_inbound_add": "快速入库新增",
        "quick_inbound_merge": "快速入库合并",
        "component_outbound": "快速出库",
        "component_save": "编辑保存",
        "component_enable": "启用元件",
        "component_disable": "停用元件",
        "component_delete": "删除元件",
        "bag_add": "袋装新增",
        "bag_batch_add": "袋装批量新增",
        "bag_merge": "袋装合并",
        "bag_batch_merge": "袋装批量合并",
        "box_delete": "删除盒子",
    }
    if event_type in known_labels:
        return known_labels[event_type]
    return event_type
def recent_events(limit: int = 20, box_type_filter: str = "all"):
    """Return the newest inventory events as display-ready dicts.

    Events are ordered by creation time, newest first, optionally restricted
    to one box type, and capped at ``limit``.
    """
    query = InventoryEvent.query
    if box_type_filter != "all":
        query = query.filter_by(box_type=box_type_filter)
    latest = query.order_by(InventoryEvent.created_at.desc()).limit(limit).all()
    return [
        {
            "time": entry.created_at.strftime("%Y-%m-%d %H:%M") if entry.created_at else "-",
            "type": event_type_label(entry.event_type),
            "box_type": BOX_TYPES.get(entry.box_type, {}).get("label", "全部"),
            "part_no": entry.part_no or "-",
            "delta": int(entry.delta or 0),
        }
        for entry in latest
    ]
def build_dashboard_context():
    """Assemble the aggregated data used by the home and category pages.

    Box groups, stock statistics and the low-stock list are prepared here in
    one pass so the templates only have to render them.

    Returns:
        A dict with ``groups``, ``stats``, ``category_stats`` and
        ``low_stock_items``.
    """

    def resolve_type(owner):
        # Boxes with a missing or unknown type are shown under "small_28".
        return owner.box_type if owner and owner.box_type in BOX_TYPES else "small_28"

    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    boxes.sort(key=box_sort_key)

    groups = {key: [] for key in BOX_TYPES.keys()}
    for box in boxes:
        type_key = resolve_type(box)
        overview_rows = make_overview_rows(box)
        groups[type_key].append(
            {
                "box": box,
                "used_count": len(overview_rows),
                "slot_range": slot_range_label(box),
                "overview_rows": overview_rows,
                "base_name": infer_base_name(box),
            }
        )

    enabled_components = []
    disabled_components = []
    for item in Component.query.all():
        (enabled_components if item.is_enabled else disabled_components).append(item)
    low_stock_components = [c for c in enabled_components if c.quantity < LOW_STOCK_THRESHOLD]

    trend_points_7d = build_trend_points_from_events(
        days=7,
        total_quantity=sum(c.quantity for c in enabled_components),
        box_type_filter="all",
    )
    if len(trend_points_7d) >= 2:
        period_net_change_7d = trend_points_7d[-1]["value"] - trend_points_7d[0]["value"]
    else:
        period_net_change_7d = 0

    # Only the 12 scarcest items are listed, lowest quantity first.
    scarcest = sorted(low_stock_components, key=lambda item: (item.quantity, item.name or ""))[:12]
    low_stock_items = []
    for c in scarcest:
        owner = box_by_id.get(c.box_id)
        type_key = resolve_type(owner)
        low_stock_items.append(
            {
                "name": c.name,
                "part_no": c.part_no,
                "quantity": c.quantity,
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"{c.box_id}",
                "slot_code": slot_code_for_box(owner, c.slot_index) if owner else str(c.slot_index),
                "edit_url": url_for("edit_component", box_id=c.box_id, slot=c.slot_index),
            }
        )

    # Bucket enabled components by the type of the box that holds them;
    # components whose box no longer exists belong to no category.
    components_by_type = {}
    for c in enabled_components:
        owner = box_by_id.get(c.box_id)
        if owner:
            components_by_type.setdefault(owner.box_type, []).append(c)

    category_stats = []
    max_category_quantity = 0
    for key, meta in BOX_TYPES.items():
        members = components_by_type.get(key, [])
        quantity = sum(c.quantity for c in members)
        max_category_quantity = max(max_category_quantity, quantity)
        category_stats.append(
            {
                "key": key,
                "label": meta["label"],
                "item_count": len(members),
                "quantity": quantity,
            }
        )

    return {
        "groups": groups,
        "stats": {
            "box_count": len(boxes),
            "active_items": len(enabled_components),
            "low_stock_count": len(low_stock_components),
            "disabled_count": len(disabled_components),
            "max_category_quantity": max_category_quantity,
            "period_net_change_7d": period_net_change_7d,
        },
        "category_stats": category_stats,
        "low_stock_items": low_stock_items,
    }
def _build_restock_payload(*, limit: int = 20, threshold: int = LOW_STOCK_THRESHOLD) -> dict:
    """Prepare the input data for the restock analysis.

    The AI never reads the database directly; it receives the low-stock list
    and the last-30-day outbound hot spots built here. This keeps the input
    format stable and lets the rule-based fallback reuse the same data.

    Args:
        limit: Maximum number of low-stock items to include.
        threshold: Enabled components with a quantity below this are listed.

    Returns:
        A dict with ``generated_at``, ``threshold``, ``low_stock_items`` and
        ``top_outbound_30d`` (at most 20 part numbers).
    """
    box_by_id = {box.id: box for box in Box.query.all()}
    enabled_components = Component.query.filter_by(is_enabled=True).all()

    def stock_of(item):
        return int(item.quantity or 0)

    scarce = [c for c in enabled_components if stock_of(c) < threshold]
    scarce.sort(key=lambda item: (stock_of(item), item.name or ""))
    low_items = []
    for c in scarce[:limit]:
        owner = box_by_id.get(c.box_id)
        type_key = owner.box_type if owner and owner.box_type in BOX_TYPES else "small_28"
        low_items.append(
            {
                "part_no": c.part_no,
                "name": c.name,
                "quantity": stock_of(c),
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"{c.box_id}",
                "slot_code": slot_code_for_box(owner, c.slot_index) if owner else str(c.slot_index),
            }
        )

    window_start = datetime.combine(datetime.now().date() - timedelta(days=29), datetime.min.time())
    # Outbound deltas are negative, so they are negated to sum quantities out.
    outbound_rows = (
        db.session.query(InventoryEvent.part_no, db.func.sum(-InventoryEvent.delta).label("outbound_qty"))
        .filter(
            InventoryEvent.event_type == "component_outbound",
            InventoryEvent.created_at >= window_start,
            InventoryEvent.delta < 0,
        )
        .group_by(InventoryEvent.part_no)
        .order_by(db.func.sum(-InventoryEvent.delta).desc())
        .limit(20)
        .all()
    )
    outbound_top = []
    for entry in outbound_rows:
        outbound_top.append({"part_no": entry[0] or "-", "outbound_qty_30d": int(entry[1] or 0)})

    return {
        "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "threshold": int(threshold),
        "low_stock_items": low_items,
        "top_outbound_30d": outbound_top,
    }
def _normalize_part_no_key(part_no: str) -> str:
    """Return the part number trimmed and upper-cased, for use as a comparison key."""
    if not part_no:
        return ""
    return part_no.strip().upper()
def _pick_standard_text(values: list[str]) -> str:
    """Choose the suggested canonical spelling among several text candidates.

    The most frequent spelling wins; ties go to the longer text (which
    usually keeps more information) and then to the lexicographically
    smaller one. Blank candidates are ignored; ``""`` is returned when none
    remain.
    """
    counts = {}
    for candidate in values:
        cleaned = (candidate or "").strip()
        if cleaned:
            counts[cleaned] = counts.get(cleaned, 0) + 1
    if not counts:
        return ""
    winner, _ = min(counts.items(), key=lambda pair: (-pair[1], -len(pair[0]), pair[0]))
    return winner
def _compact_spaces(text: str) -> str:
    """Trim the text and collapse every run of whitespace into a single space."""
    trimmed = (text or "").strip()
    return re.sub(r"\s+", " ", trimmed)
def _dedupe_text_list(values: list[str], limit: int | None = None) -> list[str]:
seen = set()
rows = []
for value in values:
text = _compact_spaces(str(value or ""))
if not text:
continue
key = _normalize_material_text(text)
if not key or key in seen:
continue
seen.add(key)
rows.append(text)
if limit is not None and len(rows) >= limit:
break
return rows
def _split_natural_language_terms(query: str) -> list[str]:
    """Split a free-text query into at most ten distinct search terms.

    Separator punctuation is treated as whitespace. When that yields a single
    chunk (typically text without spaces), the chunk is broken into
    alphanumeric tokens and runs of CJK characters instead.
    """
    compacted = _compact_spaces(query)
    if not compacted:
        return []
    spaced = re.sub(r"[,;/|]+", " ", compacted)
    stripped = (chunk.strip() for chunk in re.split(r"\s+", spaced))
    pieces = [chunk for chunk in stripped if chunk]
    if len(pieces) <= 1:
        pieces = re.findall(r"[A-Za-z0-9.+#%-]+(?:-[A-Za-z0-9.+#%-]+)?|[\u4e00-\u9fff]{1,}", spaced)
    return _dedupe_text_list(pieces, limit=10)
def _looks_like_part_no_term(term: str) -> bool:
    """Tell whether a search term resembles a part number.

    True for ``C`` followed by three or more digits, or for any term of at
    least six characters that mixes letters and digits (case-insensitive).
    """
    candidate = (term or "").strip().upper()
    if not candidate:
        return False
    if re.fullmatch(r"C\d{3,}", candidate):
        return True
    has_letter = re.search(r"[A-Z]", candidate) is not None
    has_digit = re.search(r"\d", candidate) is not None
    return has_letter and has_digit and len(candidate) >= 6
def _looks_like_package_term(term: str) -> bool:
    """Tell whether a search term resembles a component package name.

    Recognises common chip sizes (0402, 0603, ...), well-known package
    families followed by a pin count (SOT-23, QFN32, ...), and the generic
    ``LETTERS-digits`` shape. Matching is case-insensitive.
    """
    candidate = (term or "").strip().upper()
    if not candidate:
        return False
    package_shapes = (
        r"(?:0201|0402|0603|0805|1206|1210|1812|2512)",
        r"(?:SOT|SOP|SOIC|QFN|QFP|LQFP|TQFP|DIP|TO|DFN|BGA)[- ]?\d+[A-Z-]*",
        r"[A-Z]{2,6}-\d{1,3}",
    )
    return any(re.fullmatch(shape, candidate) for shape in package_shapes)
def _looks_like_spec_term(term: str) -> bool:
    """Tell whether a search term resembles a specification value.

    True for a number followed by a unit (``3.3V``, ``10K``, ``1%`` ...), for
    a handful of interface / dielectric keywords, and for anything that looks
    like a package name. Matching is case-insensitive.
    """
    upper = (term or "").strip().upper()
    if not upper:
        return False
    if upper in {"USB", "TYPE-C", "X7R", "X5R", "NPO", "COG", "UART", "I2C", "SPI", "CAN", "LDO", "DC-DC"}:
        return True
    # "%" is not a word character, so a trailing \b after it can never match
    # at the end of the term; only the alphabetic units take the boundary.
    if re.search(
        r"\d(?:\.\d+)?\s?(?:(?:V|A|MA|UA|OHM|R|K|M|UF|NF|PF|UH|MH|W|MHZ|GHZ|KB|MB|BIT)\b|%)",
        upper,
    ):
        return True
    return _looks_like_package_term(upper)
def _build_rule_based_search_plan(query: str) -> dict:
    """Map a natural-language query onto a multi-field search plan.

    The query is split into terms and each term is assigned to the field it
    most resembles: part number, specification, note, or (by default) name.
    This rule layer lets queries such as "3.3V regulator" or "0805 resistor"
    work even when no AI is configured.

    Returns:
        A dict with ``query``, ``mode`` (always ``"rule"``), ``field_map``
        (field -> up to 6 terms), ``keywords`` (up to 10 terms) and a
        human-readable ``summary``.
    """
    terms = _split_natural_language_terms(query)
    field_map = {
        "part_no": [],
        "name": [],
        "specification": [],
        "note": [],
    }
    for term in terms:
        lowered = term.lower()
        if term in SEARCH_GENERIC_TERMS:
            # Filler words carry no search value.
            continue
        if _looks_like_part_no_term(term):
            field_map["part_no"].append(term.upper())
            continue
        if _looks_like_spec_term(term):
            field_map["specification"].append(term)
            continue
        if term in SEARCH_NOTE_HINT_TERMS or lowered in SEARCH_NOTE_HINT_TERMS:
            field_map["note"].append(term)
            continue
        field_map["name"].append(term)
    for key in field_map:
        field_map[key] = _dedupe_text_list(field_map[key], limit=6)
    keywords = _dedupe_text_list(
        field_map["part_no"] + field_map["name"] + field_map["specification"] + field_map["note"],
        limit=10,
    )
    field_labels = {
        "part_no": "料号",
        "name": "名称",
        "specification": "规格",
        "note": "备注",
    }
    summary_bits = [
        f"{field_labels[field]}: {' / '.join(values)}"
        for field, values in field_map.items()
        if values
    ]
    # Separate the per-field fragments; joined with an empty string they run
    # together and the summary becomes unreadable.
    summary = ";".join(summary_bits) if summary_bits else "未识别到明确字段,按全文模糊搜索"
    return {
        "query": query,
        "mode": "rule",
        "field_map": field_map,
        "keywords": keywords,
        "summary": summary,
    }
def _normalize_search_plan(raw_plan: dict, fallback_plan: dict) -> dict:
    """Validate an AI-produced search plan and coerce it into the standard shape.

    Each field accepts a string or a list (anything else, or a missing key,
    falls back to the rule plan's terms) and is de-duplicated to at most six
    terms. Keywords accept a delimited string or a list and default to the
    union of the field terms. If the input is not a dict, or no field ends up
    with any term, ``fallback_plan`` itself is returned.
    """
    if not isinstance(raw_plan, dict):
        return fallback_plan

    fallback_fields = fallback_plan["field_map"]
    field_map = {}
    for field in ("part_no", "name", "specification", "note"):
        proposed = raw_plan.get(field, fallback_fields.get(field, []))
        if isinstance(proposed, str):
            proposed = [proposed]
        elif not isinstance(proposed, list):
            proposed = fallback_fields.get(field, [])
        field_map[field] = _dedupe_text_list(proposed, limit=6)

    proposed_keywords = raw_plan.get("keywords", [])
    if isinstance(proposed_keywords, str):
        proposed_keywords = re.split(r"[,/|\s]+", proposed_keywords)
    if not isinstance(proposed_keywords, list):
        proposed_keywords = []
    keywords = _dedupe_text_list(proposed_keywords, limit=10)
    if not keywords:
        all_terms = field_map["part_no"] + field_map["name"] + field_map["specification"] + field_map["note"]
        keywords = _dedupe_text_list(all_terms, limit=10)

    summary = _compact_spaces(str(raw_plan.get("summary", "") or ""))
    if not summary:
        summary = fallback_plan.get("summary", "")

    # A plan without a single usable term is worse than the rule plan.
    if not any(field_map.values()):
        return fallback_plan
    return {
        "query": fallback_plan.get("query", ""),
        "mode": "ai",
        "field_map": field_map,
        "keywords": keywords,
        "summary": summary,
    }
def _build_search_plan(query: str, settings: dict) -> tuple[dict, str, dict]:
    """Build the search plan for a query, using the AI when it is configured.

    The rule-based plan is always computed first. With complete AI settings
    the model is asked to refine it; any failure falls back to the rule plan.

    Returns:
        ``(plan, notice, trace)`` where ``notice`` is non-empty only when the
        AI call failed, and ``trace`` records what happened (raw AI reply,
        error text, whether the fallback was used, final mode).
    """
    fallback_plan = _build_rule_based_search_plan(query)
    trace = {
        "query": query,
        "fallback_plan": fallback_plan,
        "used_ai": False,
        "used_fallback": False,
        "ai_raw": "",
        "ai_error": "",
        "final_mode": "rule",
    }
    credentials = {
        key: (settings.get(key) or "").strip()
        for key in ("api_key", "api_url", "model")
    }
    if not all(credentials.values()):
        trace["used_fallback"] = True
        return fallback_plan, "", trace

    system_prompt = (
        "你是电子元件库存搜索解析助手。"
        "必须只输出 JSON不要 Markdown不要解释文字。"
        "输出格式: {\"part_no\":[string],\"name\":[string],\"specification\":[string],\"note\":[string],\"keywords\":[string],\"summary\":string}。"
        "目标是把自然语言查询拆成适合库存系统组合搜索的字段词。"
        "不要虚构料号;每个数组最多 6 项。"
    )
    request_payload = {"query": query, "fallback": fallback_plan}
    user_prompt = "用户搜索词:\n" + json.dumps(request_payload, ensure_ascii=False)
    try:
        reply = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=credentials["api_url"],
            model=credentials["model"],
            api_key=credentials["api_key"],
            timeout=int(settings.get("timeout", 30)),
        )
        trace["used_ai"] = True
        trace["ai_raw"] = reply
        final_plan = _normalize_search_plan(json.loads(_extract_json_object_block(reply)), fallback_plan)
        trace["final_mode"] = final_plan.get("mode", "ai")
        # Normalization may itself hand back the rule plan.
        trace["used_fallback"] = trace["final_mode"] != "ai"
        return final_plan, "", trace
    except Exception as exc:
        trace["used_fallback"] = True
        trace["ai_error"] = str(exc)
        return fallback_plan, "AI 搜索解析失败,已回退到规则搜索", trace
def _parse_search_fuzziness(raw: str) -> str:
    """Return the requested fuzziness profile name, or ``"balanced"`` when blank or unknown."""
    candidate = (raw or "balanced").strip().lower()
    return candidate if candidate in SEARCH_FUZZY_PROFILES else "balanced"
def _search_text_contains(text: str, term: str) -> bool:
    """Tell whether the normalized ``term`` occurs inside the normalized ``text``.

    Returns ``False`` when either side normalizes to an empty value.
    """
    haystack = _normalize_material_text(text)
    needle = _normalize_material_text(term)
    return bool(haystack and needle) and needle in haystack
def _fuzzy_ratio(a: str, b: str) -> float:
    """Return the similarity ratio of two strings after normalization.

    Used as the fuzzy fallback for search; yields ``0.0`` when either string
    normalizes to an empty value.
    """
    left = _normalize_material_text(a)
    right = _normalize_material_text(b)
    if left and right:
        return difflib.SequenceMatcher(None, left, right).ratio()
    return 0.0
def _fuzzy_term_match_score(text: str, term: str) -> float:
    """Score how well a single term matches a text, tolerating small differences.

    A direct substring hit scores between 0.75 and 1.0 (longer terms score
    higher). Otherwise the best similarity between the term and either the
    whole text or a sliding window of the term's length is returned, so
    near-synonymous spellings are not missed entirely. One-character terms
    only count on a direct hit.
    """
    haystack = _normalize_material_text(text)
    needle = _normalize_material_text(term)
    if not haystack or not needle:
        return 0.0
    if needle in haystack:
        length_bonus = min(max(len(needle), 1) / 12.0, 0.35)
        return min(1.0, 0.75 + length_bonus)
    if len(needle) <= 1:
        return 0.0
    best = _fuzzy_ratio(haystack, needle)
    size = len(needle)
    if len(haystack) > size and size >= 2:
        # Longer terms slide two characters at a time to limit comparisons.
        stride = 1 if size <= 4 else 2
        for start in range(0, len(haystack) - size + 1, stride):
            best = max(best, _fuzzy_ratio(haystack[start : start + size], needle))
    return best
def _search_component_match_info(component: Component, plan: dict, fuzziness: str = "balanced") -> dict:
    """Score a component against a search plan and decide whether it matches.

    Every field term is scored against its own field and against all fields
    combined; the thresholds come from the selected fuzziness profile.
    Remaining keywords are then scored against the combined text.

    Returns:
        A dict with ``is_match``, ``score``, ``coverage``, ``matched_terms``
        (up to 8), ``matched_fields`` (sorted) and the six best
        ``fuzzy_matches``.
    """
    profile = SEARCH_FUZZY_PROFILES.get(fuzziness, SEARCH_FUZZY_PROFILES["balanced"])
    field_texts = {
        "part_no": component.part_no or "",
        "name": component.name or "",
        "specification": component.specification or "",
        "note": component.note or "",
    }
    combined_text = " ".join(field_texts.values())
    matched_fields = set()
    matched_terms = []
    fuzzy_matches = []
    score = 0
    total_terms = 0

    def record(term, hit_score, where):
        matched_terms.append(term)
        fuzzy_matches.append({"term": term, "score": round(hit_score, 3), "field": where})

    for field, terms in plan.get("field_map", {}).items():
        for term in terms:
            total_terms += 1
            in_field = _fuzzy_term_match_score(field_texts.get(field, ""), term)
            anywhere = _fuzzy_term_match_score(combined_text, term)
            strongest = max(in_field, anywhere)
            if in_field >= profile["field_hit"]:
                # Hit in the intended field: field weight plus a bonus point.
                score += SEARCH_FIELD_WEIGHTS.get(field, 1) + 1
                matched_fields.add(field)
                record(term, in_field, field)
            elif field != "part_no" and anywhere >= profile["combined_hit"]:
                # Found elsewhere in the record; part numbers must hit their own field.
                score += SEARCH_FIELD_WEIGHTS.get(field, 1)
                matched_fields.add(field)
                record(term, anywhere, "all")
            elif strongest >= profile["soft_hit"]:
                # Weak fuzzy hits get a light weight to limit false recalls.
                score += 1
                record(term, strongest, field if in_field >= anywhere else "all")

    for term in plan.get("keywords", []):
        if term in matched_terms:
            continue
        keyword_score = _fuzzy_term_match_score(combined_text, term)
        if keyword_score >= profile["keyword_hit"]:
            score += 1
            record(term, keyword_score, "all")
        elif keyword_score >= profile["keyword_soft"]:
            score += 0.5
            record(term, keyword_score, "all")

    unique_matched_terms = _dedupe_text_list(matched_terms, limit=8)
    denominator = max(total_terms or len(plan.get("keywords", [])) or 1, 1)
    coverage = len(unique_matched_terms) / denominator
    is_match = bool(
        score >= profile["score_gate"]
        or (unique_matched_terms and coverage >= profile["coverage_gate"])
        or (plan.get("keywords") and len(plan.get("keywords", [])) == 1 and unique_matched_terms)
        or any(item["score"] >= profile["high_fuzzy_gate"] for item in fuzzy_matches)
    )
    top_fuzzy = sorted(fuzzy_matches, key=lambda row: row["score"], reverse=True)[:6]
    return {
        "is_match": is_match,
        "score": score,
        "coverage": coverage,
        "matched_terms": unique_matched_terms,
        "matched_fields": sorted(matched_fields),
        "fuzzy_matches": top_fuzzy,
    }
def _infer_component_category(part_no: str, name: str, specification: str, note: str) -> str:
    """Guess a component category from its text fields.

    The fields are joined and lower-cased; the label of the first entry in
    ``COMPONENT_CATEGORY_HINTS`` having a pattern contained in that text is
    returned, or ``""`` when nothing matches.
    """
    haystack = " ".join([part_no or "", name or "", specification or "", note or ""]).lower()
    for label, patterns in COMPONENT_CATEGORY_HINTS:
        if any(pattern in haystack for pattern in patterns):
            return label
    return ""
def _extract_primary_package(specification: str, name: str = "", note: str = "") -> str:
    """Determine the component's package.

    The ``package`` entry parsed from the specification wins when present;
    otherwise the first well-known package token found in the name,
    specification or note is returned upper-cased. Returns ``""`` when no
    package can be identified.
    """
    declared = _compact_spaces(_parse_slot_spec_fields(specification).get("package", ""))
    if declared:
        return declared
    haystack = " ".join([name or "", specification or "", note or ""])
    found = re.search(
        r"\b(0201|0402|0603|0805|1206|1210|1812|2512|SOT-23(?:-\d+)?|SOP-?\d+|SOIC-?\d+|QFN-?\d+|QFP-?\d+|LQFP-?\d+|TQFP-?\d+|DIP-?\d+|DFN-?\d+|TO-?\d+)\b",
        haystack,
        flags=re.IGNORECASE,
    )
    if found is None:
        return ""
    return found.group(1).upper()
def _extract_component_keywords(part_no: str, name: str, specification: str, note: str) -> list[str]:
    """Collect up to eight search keywords describing a component.

    Keywords are gathered in priority order: inferred category, package,
    LCSC code, electrical values / interface keywords found in any field,
    then the remaining meaningful terms of the name and the note.
    """
    combined = " ".join([part_no or "", name or "", specification or "", note or ""])
    keywords = []
    category = _infer_component_category(part_no, name, specification, note)
    if category:
        keywords.append(category)
    package = _extract_primary_package(specification, name=name, note=note)
    if package:
        keywords.append(package)
    lcsc_code = _extract_lcsc_code_from_text(note or part_no or "")
    if lcsc_code:
        keywords.append(lcsc_code)
    value_patterns = [
        r"\b\d+(?:\.\d+)?\s?(?:V|A|mA|uA|W|MHz|GHz|KB|MB|bit)\b",
        r"\b\d+(?:\.\d+)?\s?(?:K|M|R|ohm|Ω)\b",
        r"\b\d+(?:\.\d+)?\s?(?:uF|nF|pF|uH|mH)\b",
        # No trailing \b: "%" is not a word character, so a boundary after it
        # would only match when a letter or digit follows directly.
        r"\b\d+(?:\.\d+)?%",
        r"\b(?:USB|TYPE-C|UART|I2C|SPI|CAN|RS485|LDO|DC-DC|X7R|X5R|NPO|COG)\b",
    ]
    for pattern in value_patterns:
        for match in re.findall(pattern, combined, flags=re.IGNORECASE):
            value = _compact_spaces(str(match)).upper()
            # Restore the conventional mA / uA casing, but only for a unit
            # that follows a number; a blanket replace would corrupt keywords
            # such as "UART".
            value = re.sub(
                r"(?<=\d)(\s?)([MU])A$",
                lambda found: found.group(1) + found.group(2).lower() + "A",
                value,
            )
            keywords.append(value)
    for token in _split_natural_language_terms(name):
        if token in SEARCH_GENERIC_TERMS or len(token) <= 1:
            continue
        if _looks_like_part_no_term(token):
            continue
        keywords.append(token)
    for token in _split_natural_language_terms(note):
        if token in SEARCH_GENERIC_TERMS or len(token) <= 1:
            continue
        keywords.append(token)
    return _dedupe_text_list(keywords, limit=8)
def _truncate_text(text: str, limit: int) -> str:
    """Shorten whitespace-compacted text to at most ``limit`` characters.

    Text that already fits is returned as is. Otherwise it is cut to
    ``limit - 1`` characters (at least one), trailing separators are
    removed, and an ellipsis is appended to show that text was dropped.
    """
    raw = _compact_spaces(text)
    if len(raw) <= limit:
        return raw
    # One character is reserved for the ellipsis, written as an escape so the
    # marker cannot be silently lost.
    return raw[: max(limit - 1, 1)].rstrip(" -_/|") + "\u2026"
def _compose_standardized_note(note: str, keywords: list[str]) -> str:
    """Rebuild a note as ``|``-separated segments ending with a keyword line.

    The note is split on newlines and ``|``; blank segments and any existing
    keyword segment are dropped. When keywords are given, a fresh keyword
    segment listing the first six is appended. Segments are de-duplicated
    and capped at eight.
    """
    compacted = (_compact_spaces(chunk) for chunk in re.split(r"[\n|]+", note or ""))
    segments = [piece for piece in compacted if piece and not piece.startswith("关键词:")]
    if keywords:
        segments.append("关键词: " + ", ".join(keywords[:6]))
    return " | ".join(_dedupe_text_list(segments, limit=8))
def _build_rule_based_standardization(part_no: str, name: str, specification: str, note: str) -> dict:
    """Produce label-printing and note-standardization suggestions by rule.

    Nothing is written to the database; the result (short label, suggested
    name, specification, note and search keywords) is offered to the user
    for confirmation, and doubles as the fallback when the AI is unavailable.

    The short label is built from the category, up to two further keywords
    and the package, capped at 18 characters. The existing name is kept
    unless it is empty or longer than 24 characters.
    """
    category = _infer_component_category(part_no, name, specification, note)
    keywords = _extract_component_keywords(part_no, name, specification, note)
    package = _extract_primary_package(specification, name=name, note=note)

    # Terms that add information beyond category, package and part numbers.
    main_terms = [
        term
        for term in keywords
        if term not in {category, package} and not _looks_like_part_no_term(term)
    ]
    short_bits = [category] if category else []
    for term in main_terms[:2]:
        if term not in short_bits:
            short_bits.append(term)
    if package and package not in short_bits:
        short_bits.append(package)
    joined_bits = " ".join(short_bits)

    fallback_name = _compact_spaces(name or "")
    short_label = _truncate_text(joined_bits or fallback_name or part_no or "未命名元件", 18)
    if fallback_name and len(fallback_name) <= 24:
        standardized_name = fallback_name
    else:
        standardized_name = _truncate_text(joined_bits or part_no or fallback_name or "未命名元件", 24)
    standardized_specification = _compact_spaces(specification or "")
    standardized_note = _compose_standardized_note(note or "", keywords)
    return {
        "short_label": short_label,
        "name": standardized_name,
        "specification": standardized_specification,
        "note": standardized_note,
        "keywords": keywords,
    }
def _normalize_standardization_suggestion(raw: dict, fallback: dict) -> dict:
    """Sanitize an externally produced suggestion, filling gaps from ``fallback``.

    Args:
        raw: Parsed suggestion object; anything that is not a dict is rejected.
        fallback: Rule-based suggestion used for missing or empty fields.

    Returns:
        ``fallback`` itself when ``raw`` is not a dict, otherwise a new dict
        with the keys ``short_label``, ``name``, ``specification``, ``note``
        and ``keywords``.
    """
    if not isinstance(raw, dict):
        return fallback

    def text_field(field: str) -> str:
        # A missing or falsy value in ``raw`` falls back to the rule-based one.
        default = fallback[field]
        return _compact_spaces(str(raw.get(field, default) or default))

    result = {
        "short_label": text_field("short_label"),
        "name": text_field("name"),
        "specification": text_field("specification"),
        "note": text_field("note"),
        "keywords": fallback.get("keywords", []),
    }

    candidates = raw.get("keywords", fallback.get("keywords", []))
    if isinstance(candidates, str):
        # Accept a delimited string as well as a list.
        candidates = re.split(r"[,/|\s]+", candidates)
    if not isinstance(candidates, list):
        candidates = fallback.get("keywords", [])
    result["keywords"] = _dedupe_text_list(candidates, limit=8) or fallback.get("keywords", [])

    for field in ("short_label", "name"):
        if not result[field]:
            result[field] = fallback[field]

    # Make sure the note always carries the unified keyword segment.
    current_note = result["note"]
    if not current_note or "关键词:" not in current_note:
        result["note"] = _compose_standardized_note(current_note, result["keywords"])
    return result
def _build_component_standardization_suggestion(
    part_no: str,
    name: str,
    specification: str,
    note: str,
    settings: dict,
) -> tuple[dict, str]:
    """Return a standardization suggestion, using AI when it is configured.

    The rule-based suggestion is always computed first. When ``api_key``,
    ``api_url`` and ``model`` are all present in ``settings`` the chat API is
    asked for a JSON suggestion, which is then normalized against the
    rule-based one.

    Returns:
        ``(suggestion, warning)``. ``warning`` is empty unless the AI call or
        the parsing of its answer failed, in which case the rule-based
        suggestion is returned with a message.
    """
    fallback = _build_rule_based_standardization(part_no, name, specification, note)

    api_key, api_url, model = (
        (settings.get(field) or "").strip() for field in ("api_key", "api_url", "model")
    )
    if not (api_key and api_url and model):
        return fallback, ""

    system_prompt = (
        "你是电子元件标签与备注标准化助手。"
        "必须只输出 JSON不要 Markdown不要解释文字。"
        "输出格式: {\"short_label\":string,\"name\":string,\"specification\":string,\"note\":string,\"keywords\":[string]}。"
        "要求: short_label 更适合标签打印name 更短但仍可检索note 保留追溯信息并补充统一关键词。"
    )
    component_fields = {
        "part_no": part_no,
        "name": name,
        "specification": specification,
        "note": note,
        "fallback": fallback,
    }
    user_prompt = "元件字段(JSON):\n" + json.dumps(component_fields, ensure_ascii=False)

    try:
        answer = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=api_url,
            model=model,
            api_key=api_key,
            timeout=int(settings.get("timeout", 30)),
        )
        json_block = _extract_json_object_block(answer)
        normalized = _normalize_standardization_suggestion(json.loads(json_block), fallback)
        return normalized, ""
    except Exception:
        # Best effort: any failure degrades to the rule-based suggestion.
        return fallback, "AI 标准化失败,已回退到规则建议"
def _build_duplicate_member(component: Component, box_by_id: dict[int, Box]) -> dict:
    """Describe one component as a member row of a duplicate-audit group.

    Args:
        component: The component to describe.
        box_by_id: Lookup of boxes by id; the component's box may be absent.

    Returns:
        A dict with identity fields, quantity, LCSC code, box/slot labels and
        the URL of the component's edit page.
    """
    # The LCSC code is looked for in the note first, then in the part number.
    lcsc_code = _extract_lcsc_code_from_text(component.note or "") or _extract_lcsc_code_from_text(
        component.part_no or ""
    )

    box = box_by_id.get(component.box_id)
    if box:
        box_name = box.name
        slot_code = slot_code_for_box(box, component.slot_index)
    else:
        # Unknown box: fall back to the raw ids.
        box_name = f"{component.box_id}"
        slot_code = str(component.slot_index)

    edit_url = url_for("edit_component", box_id=component.box_id, slot=component.slot_index)
    return {
        "component_id": component.id,
        "part_no": component.part_no or "",
        "name": component.name or "",
        "specification": component.specification or "",
        "quantity": int(component.quantity or 0),
        "lcsc_code": lcsc_code,
        "box_id": component.box_id,
        "box_name": box_name,
        "slot_code": slot_code,
        "edit_url": edit_url,
    }
def _build_duplicate_audit_payload(limit_groups: int = 60) -> dict:
    """Build the duplicate-material audit report.

    The audit never merges stock on its own. It only lists groups of enabled
    components that look like duplicates, with the reason for each group, so
    that a person can confirm and merge them from the edit page.

    Components are grouped three ways: by normalized part number, by material
    identity (name + specification) and by LCSC code.

    Args:
        limit_groups: Maximum number of groups kept per grouping and in the
            combined ``groups`` list.

    Returns:
        A dict with ``generated_at``, ``total_enabled``, ``total_groups``, the
        three per-type group lists and the combined ``groups`` list.
    """
    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    enabled_components = Component.query.filter_by(is_enabled=True).all()

    by_part_no = {}
    by_material = {}
    by_lcsc = {}
    for component in enabled_components:
        part_no_key = _normalize_part_no_key(component.part_no)
        if part_no_key:
            by_part_no.setdefault(part_no_key, []).append(component)
        material_key = _material_identity_key(component.name, component.specification)
        if material_key:
            by_material.setdefault(material_key, []).append(component)
        # The LCSC code is looked for in the note first, then in the part number.
        lcsc_code = _extract_lcsc_code_from_text(component.note or "")
        if not lcsc_code:
            lcsc_code = _extract_lcsc_code_from_text(component.part_no or "")
        if lcsc_code:
            by_lcsc.setdefault(lcsc_code, []).append(component)

    def make_groups(source: dict, group_type: str, reason_text: str) -> list[dict]:
        """Turn every bucket with two or more members into a report group."""
        groups = []
        for key, members in source.items():
            if len(members) < 2:
                continue
            sorted_members = sorted(members, key=lambda c: (c.box_id, c.slot_index, c.id))
            names = [(c.name or "").strip() for c in sorted_members]
            specifications = [(c.specification or "").strip() for c in sorted_members]
            unique_names = sorted({text for text in names if text})
            unique_specifications = sorted({text for text in specifications if text})
            standard_name = _pick_standard_text(names)
            standard_specification = _pick_standard_text(specifications)

            suggestion_lines = []
            if len(unique_names) > 1 and standard_name:
                suggestion_lines.append(f"建议统一名称为: {standard_name}")
            if len(unique_specifications) > 1 and standard_specification:
                suggestion_lines.append(f"建议统一规格为: {standard_specification}")
            if not suggestion_lines:
                suggestion_lines.append("命名和规格写法基本一致,可仅核对是否需要合并库存位置")

            groups.append(
                {
                    "type": group_type,
                    "key": key,
                    "reason": reason_text,
                    "member_count": len(sorted_members),
                    "unique_name_count": len(unique_names),
                    "unique_specification_count": len(unique_specifications),
                    "standard_name": standard_name,
                    "standard_specification": standard_specification,
                    # Separate the suggestions; joining on "" ran the name
                    # and specification advice together into one word.
                    "suggestion": ";".join(suggestion_lines),
                    "members": [_build_duplicate_member(component, box_by_id) for component in sorted_members],
                }
            )
        # Largest groups first, then by key for a stable order.
        groups.sort(key=lambda row: (-int(row["member_count"]), row["key"]))
        return groups[:limit_groups]

    part_no_groups = make_groups(by_part_no, "part_no", "疑似同料号")
    material_groups = make_groups(by_material, "material", "疑似同参数")
    lcsc_groups = make_groups(by_lcsc, "lcsc", "疑似同立创编号")
    all_groups = part_no_groups + material_groups + lcsc_groups
    all_groups.sort(key=lambda row: (-int(row["member_count"]), row["type"], row["key"]))
    return {
        "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "total_enabled": len(enabled_components),
        "total_groups": len(all_groups),
        "part_no_groups": part_no_groups,
        "material_groups": material_groups,
        "lcsc_groups": lcsc_groups,
        "groups": all_groups[:limit_groups],
    }
def _build_duplicate_audit_summary_with_ai(payload: dict, settings: dict) -> tuple[str, str]:
    """Ask the chat API for a short summary of a duplicate-audit payload.

    Only counters and the first eight groups are sent, to keep the prompt
    small.

    Returns:
        ``(summary, warning)``. ``summary`` is empty and ``warning`` explains
        why when the AI settings are incomplete or the call fails.
    """
    credentials = {
        field: (settings.get(field) or "").strip() for field in ("api_key", "api_url", "model")
    }
    if not all(credentials.values()):
        return "", "AI 参数未完整配置,摘要使用规则生成"

    top_groups = []
    for group in payload.get("groups", [])[:8]:
        top_groups.append(
            {
                "reason": group.get("reason", ""),
                "key": group.get("key", ""),
                "member_count": group.get("member_count", 0),
                "suggestion": group.get("suggestion", ""),
            }
        )
    brief_payload = {
        "generated_at": payload.get("generated_at"),
        "total_enabled": payload.get("total_enabled", 0),
        "total_groups": payload.get("total_groups", 0),
        "part_no_groups": len(payload.get("part_no_groups", [])),
        "material_groups": len(payload.get("material_groups", [])),
        "lcsc_groups": len(payload.get("lcsc_groups", [])),
        "top_groups": top_groups,
    }

    system_prompt = (
        "你是库存数据治理助手。"
        "请输出简短中文总结不超过120字包含风险级别和处理优先顺序。"
        "不要使用Markdown。"
    )
    user_prompt = "重复物料巡检结果(JSON):\n" + json.dumps(brief_payload, ensure_ascii=False)

    try:
        ai_summary = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=credentials["api_url"],
            model=credentials["model"],
            api_key=credentials["api_key"],
            timeout=int(settings.get("timeout", 30)),
        )
    except Exception:
        # Best effort: the caller falls back to its rule-based summary.
        return "", "AI 摘要生成失败,已使用规则结果"
    return ai_summary, ""
def _call_siliconflow_chat(
system_prompt: str,
user_prompt: str,
*,
api_url: str,
model: str,
api_key: str,
timeout: int,
) -> str:
api_key = (api_key or "").strip()
if not api_key:
raise RuntimeError("SILICONFLOW_API_KEY 未配置")
if not api_url:
raise RuntimeError("AI API URL 未配置")
if not model:
raise RuntimeError("AI 模型名称未配置")
payload = {
"model": model,
"temperature": 0.2,
"max_tokens": 700,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
}
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
api_url,
data=body,
method="POST",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"AI 服务返回 HTTP {exc.code}: {detail[:200]}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"AI 服务连接失败: {exc.reason}") from exc
try:
data = json.loads(raw)
return data["choices"][0]["message"]["content"].strip()
except (KeyError, IndexError, TypeError, json.JSONDecodeError) as exc:
raise RuntimeError("AI 返回格式无法解析") from exc
@app.route("/")
def index():
    """Redirect the site root to the container-type overview page."""
    landing_url = url_for("types_page")
    return redirect(landing_url)
@app.route("/types")
def types_page():
    """Render the container-type overview: one card and one low-stock list per type.

    ``notice`` and ``error`` are taken from the query string and passed
    through to the template.
    """
    dashboard = build_dashboard_context()
    stats_by_type = {entry["key"]: entry for entry in dashboard["category_stats"]}

    # Low-stock items bucketed by container type, in BOX_TYPES order.
    low_stock_groups = [
        {
            "key": type_key,
            "label": type_meta["label"],
            "items": [
                row for row in dashboard["low_stock_items"] if row.get("box_type") == type_key
            ],
        }
        for type_key, type_meta in BOX_TYPES.items()
    ]

    type_cards = []
    for type_key, type_meta in BOX_TYPES.items():
        type_stats = stats_by_type.get(type_key, {})
        card = {
            "key": type_key,
            "label": type_meta["label"],
            "desc": type_meta["default_desc"],
            "count": len(dashboard["groups"].get(type_key, [])),
            "item_count": int(type_stats.get("item_count", 0)),
            "quantity": int(type_stats.get("quantity", 0)),
            "url": url_for("type_page", box_type=type_key),
        }
        type_cards.append(card)

    return render_template(
        "types.html",
        type_cards=type_cards,
        stats=dashboard["stats"],
        low_stock_groups=low_stock_groups,
        notice=request.args.get("notice", "").strip(),
        error=request.args.get("error", "").strip(),
    )
@app.route("/container-type/<box_type>/edit", methods=["GET", "POST"])
def edit_container_type(box_type: str):
    """Show and process the form that edits a container type's display attributes.

    On a valid POST the label, default description and default prefix of the
    type are updated in ``BOX_TYPES``, the overrides are saved, and the user
    is redirected to the overview. On validation failure the form is rendered
    again with an error message.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")

    meta = BOX_TYPES[box_type]
    error = ""
    if request.method == "POST":
        form = request.form
        label = form.get("label", "").strip()
        default_desc = form.get("default_desc", "").strip()
        default_prefix = form.get("default_prefix", "").strip().upper()

        if not label:
            error = "容器名称不能为空"
        elif not default_prefix:
            error = "默认前缀不能为空"
        else:
            meta["label"] = label
            # An empty description falls back to the built-in default.
            meta["default_desc"] = default_desc or DEFAULT_BOX_TYPES[box_type]["default_desc"]
            meta["default_prefix"] = default_prefix
            _save_box_type_overrides()
            return redirect(url_for("types_page", notice="容器属性已更新"))

    return render_template("type_edit.html", box_type=box_type, meta=meta, error=error)
@app.route("/type/<box_type>")
def type_page(box_type: str):
    """Render the page of one container type.

    The ``bag`` type is rendered directly as the fixed bag box page; every
    other type gets the index template restricted to that type.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")
    if box_type == "bag":
        bag_box = get_fixed_bag_box()
        return render_box_page(bag_box)

    dashboard = build_dashboard_context()
    template_context = {
        "groups": dashboard["groups"],
        "box_types": BOX_TYPES,
        "stats": dashboard["stats"],
        "category_stats": dashboard["category_stats"],
        "low_stock_items": dashboard["low_stock_items"],
        "view_box_types": [box_type],
        "current_box_type": box_type,
        "separate_mode": True,
    }
    return render_template("index.html", **template_context)
@app.route("/boxes/create", methods=["POST"])
def create_box():
    """Create a box from the submitted form and redirect to its type page.

    The request is rejected through ``bad_request`` when the type is unknown,
    the name is empty, a number field is invalid, or the slot-code range
    overlaps an existing box. Requests for the ``bag`` type are redirected
    without creating anything.
    """
    form = request.form
    box_type = form.get("box_type", "small_28").strip()
    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", box_type)
    if box_type == "bag":
        return redirect(url_for("type_page", box_type="bag"))
    if not base_name:
        return bad_request("盒子名称不能为空", box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box_type)

    type_meta = BOX_TYPES[box_type]
    default_capacity = type_meta["default_capacity"]
    slot_capacity = default_capacity
    if box_type == "custom":
        # Only custom boxes take their capacity from the form.
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(default_capacity)),
                default_capacity,
            )
        except ValueError:
            return bad_request("格数必须是大于等于 1 的整数", box_type)
        if slot_capacity < 1:
            return bad_request("格数必须是大于等于 1 的整数", box_type)

    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    if conflict:
        return bad_request(
            f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号",
            box_type,
        )

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    new_box = Box(
        name=make_unique_box_name(generated_name),
        description=description or type_meta["default_desc"],
        box_type=box_type,
        slot_capacity=slot_capacity,
        slot_prefix=effective_prefix,
        start_number=start_number,
    )
    db.session.add(new_box)
    db.session.commit()

    # Return to the page the form came from, unless it was the "all" view.
    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    target_type = box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/<int:box_id>/update", methods=["POST"])
def update_box(box_id: int):
    """Update a box's name, description, prefix, start number and capacity.

    The capacity can only change for ``custom`` boxes and may not drop below
    the highest slot index already in use. The new slot-code range must not
    overlap any other box. On success the user is redirected to a type page.
    """
    box = Box.query.get_or_404(box_id)
    form = request.form
    box_type = box.box_type
    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if not base_name:
        return bad_request("盒子名称不能为空", box.box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box.box_type)

    slot_capacity = box.slot_capacity
    if box.box_type == "custom":
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(box.slot_capacity)),
                box.slot_capacity,
            )
        except ValueError:
            return bad_request("格数必须是大于等于 1 的整数", box.box_type)
        if slot_capacity < 1:
            return bad_request("格数必须是大于等于 1 的整数", box.box_type)

        # Shrinking must not orphan components stored in higher slots.
        highest_slot_query = db.session.query(db.func.max(Component.slot_index)).filter_by(box_id=box.id)
        max_used_slot = highest_slot_query.scalar() or 0
        if max_used_slot > slot_capacity:
            return bad_request(f"格数不能小于已使用位置 {max_used_slot}", box.box_type)

    type_meta = BOX_TYPES[box.box_type]
    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box.box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
        exclude_box_id=box.id,
    )
    if conflict:
        return bad_request(
            f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号",
            box.box_type,
        )

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    box.name = make_unique_box_name(generated_name, exclude_box_id=box.id)
    box.description = description or BOX_TYPES[box.box_type]["default_desc"]
    box.slot_prefix = effective_prefix
    box.start_number = start_number
    box.slot_capacity = slot_capacity
    db.session.commit()

    # Return to the page the form came from, unless it was the "all" view.
    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    target_type = box.box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/<int:box_id>/delete", methods=["POST"])
def delete_box(box_id: int):
    """Delete a box together with all of its components.

    The fixed ``bag`` box cannot be deleted. When the box still holds enabled
    stock, one ``box_delete`` inventory event with the negative total is
    logged before the rows are removed.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type == "bag":
        return bad_request("袋装清单为固定大容器,不能删除", box.box_type)

    enabled_total_query = db.session.query(db.func.sum(Component.quantity)).filter_by(
        box_id=box.id, is_enabled=True
    )
    enabled_sum = enabled_total_query.scalar() or 0
    if enabled_sum:
        log_inventory_event(event_type="box_delete", delta=-int(enabled_sum), box=box)

    Component.query.filter_by(box_id=box.id).delete()
    # Remember the type before the box row is removed.
    box_type = box.box_type
    db.session.delete(box)
    db.session.commit()

    return_to_type = parse_box_type_filter(request.form.get("return_to_type", ""))
    target_type = box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
@app.route("/boxes/suggest-start")
def suggest_start():
    """Suggest the next free start number for a box's slot-code range.

    Query parameters:
        box_type: Container type (default ``small_28``); must be known.
        slot_prefix: Optional prefix; empty means "use the type's default".
        slot_capacity: Capacity for a new ``custom`` box.
        box_id: Optional id of an existing box being edited. Its stored type
            and capacity are used, and it is excluded from conflict checks.

    Returns:
        A JSON-serializable dict with ``ok``, ``start_number``,
        ``slot_prefix`` and ``preview_range``; or ``({"ok": False, ...},
        status)`` on invalid input.
    """
    box_type = request.args.get("box_type", "small_28").strip()
    if box_type not in BOX_TYPES:
        return {"ok": False, "message": "无效盒子类型"}, 400
    requested_type = box_type

    slot_prefix = request.args.get("slot_prefix", "").strip().upper()
    box_id_raw = request.args.get("box_id", "").strip()
    exclude_box_id = None
    slot_capacity = BOX_TYPES[box_type]["default_capacity"]

    if box_type == "custom" and not box_id_raw:
        try:
            slot_capacity = _parse_non_negative_int(
                request.args.get("slot_capacity", str(slot_capacity)),
                slot_capacity,
            )
        except ValueError:
            return {"ok": False, "message": "格数必须是大于等于 1 的整数"}, 400
        if slot_capacity < 1:
            return {"ok": False, "message": "格数必须是大于等于 1 的整数"}, 400

    if box_id_raw:
        try:
            box_id = int(box_id_raw)
        except ValueError:
            return {"ok": False, "message": "box_id 非法"}, 400
        box = Box.query.get(box_id)
        if not box:
            return {"ok": False, "message": "盒子不存在"}, 404
        box_type = box.box_type
        slot_capacity = box.slot_capacity
        exclude_box_id = box.id

    # Resolve the default prefix only after box_id may have replaced the
    # type: it must come from the box's own type (as in update_box), not
    # from the query-string type.
    type_meta = BOX_TYPES.get(box_type, BOX_TYPES[requested_type])
    effective_prefix = slot_prefix or type_meta["default_prefix"]

    suggested = suggest_next_start_number(
        box_type=box_type,
        prefix=effective_prefix,
        slot_capacity=slot_capacity,
        exclude_box_id=exclude_box_id,
    )
    end_number = suggested + slot_capacity - 1
    return {
        "ok": True,
        "start_number": suggested,
        "slot_prefix": effective_prefix,
        "preview_range": f"{effective_prefix}{suggested}-{effective_prefix}{end_number}",
    }
@app.route("/box/<int:box_id>")
def view_box(box_id: int):
    """Render the page of one box, or respond with 404 when it does not exist."""
    return render_box_page(Box.query.get_or_404(box_id))
@app.route("/box/<int:box_id>/labels/export")
def export_box_labels_csv(box_id: int):
    """Export the enabled components of a box as a label CSV download.

    One row is written per enabled component, ordered by slot index. Besides
    the stored fields, each row carries fields parsed from the specification
    and note, plus the rule-based short label and search keywords.

    Returns:
        A ``text/csv`` attachment response. The content starts with a UTF-8
        byte-order mark character.
    """
    box = Box.query.get_or_404(box_id)
    rows = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
        .all()
    )

    header = [
        "盒子名称(box_name)",
        "位置编号(slot_code)",
        "料号(part_no)",
        "短标签(short_label)",
        "名称(name)",
        "品牌(brand)",
        "封装(package)",
        "用途(usage)",
        "立创编号(lcsc_code)",
        "立创商品ID(lcsc_product_id)",
        "商品编排(arrange)",
        "最小包装(min_pack)",
        "规格(specification)",
        "搜索关键词(search_keywords)",
        "数量(quantity)",
        "位置备注(location)",
        "备注(note)",
    ]

    # The context manager closes the buffer even if a row fails to serialize.
    with StringIO() as output:
        writer = csv.writer(output)
        writer.writerow(header)
        for c in rows:
            slot_code = slot_code_for_box(box, c.slot_index)
            spec_fields = _parse_slot_spec_fields(c.specification)
            note_fields = _parse_note_detail_fields(c.note)
            standardization = _build_rule_based_standardization(c.part_no, c.name, c.specification, c.note)
            if not note_fields["lcsc_code"]:
                # Fall back to the part number; guard against a NULL part_no
                # the same way the duplicate-audit helpers do.
                note_fields["lcsc_code"] = _extract_lcsc_code_from_text(c.part_no or "")
            writer.writerow(
                [
                    box.name,
                    slot_code,
                    c.part_no or "",
                    standardization["short_label"],
                    c.name or "",
                    spec_fields["brand"],
                    spec_fields["package"],
                    spec_fields["usage"],
                    note_fields["lcsc_code"],
                    note_fields["product_id"],
                    note_fields["arrange"],
                    note_fields["min_pack"],
                    c.specification or "",
                    ", ".join(standardization["keywords"]),
                    int(c.quantity or 0),
                    c.location or "",
                    c.note or "",
                ]
            )
        csv_content = "\ufeff" + output.getvalue()

    filename = f"labels_box_{box.id}.csv"
    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
@app.route("/box/<int:box_id>/quick-inbound", methods=["POST"])
def quick_inbound(box_id: int):
    """Bulk-add components to a box from pasted text, one component per line.

    Each non-empty line is parsed with ``_parse_bulk_line`` and placed in the
    next empty slot. A line is skipped, with a reason, when it lacks a part
    number or name, has an invalid quantity, matches an existing enabled
    part number or material, or when the box has no empty slot left.
    Existing stock is never merged automatically.

    Returns:
        The rendered box page, with a notice summarizing added and skipped
        lines or an error when nothing could be imported.
    """
    box = Box.query.get_or_404(box_id)
    raw_lines = request.form.get("lines", "")
    lines = [line.strip() for line in raw_lines.splitlines() if line.strip()]
    if not lines:
        return render_box_page(box, error="快速入库失败: 请至少输入一行")

    components = Component.query.filter_by(box_id=box.id).all()
    occupied_slots = {c.slot_index for c in components}
    added_count = 0
    skipped_lines = []

    for line_no, line in enumerate(lines, start=1):
        parsed = _parse_bulk_line(line)
        part_no = parsed["part_no"]
        name = parsed["name"]
        quantity_raw = parsed["quantity_raw"]
        specification = parsed["specification"]
        note = parsed["note"]

        if not part_no or not name:
            # Report the reason, like every other skip entry in this list.
            skipped_lines.append(f"{line_no} 行(料号或名称为空)")
            continue
        try:
            quantity = _parse_non_negative_int(quantity_raw, 0)
        except ValueError:
            skipped_lines.append(f"{line_no} 行(数量格式错误)")
            continue

        part_no_conflict = _find_enabled_part_no_conflict(part_no)
        if part_no_conflict:
            skipped_lines.append(
                f"{line_no} 行({part_no} 已在 {_format_component_position(part_no_conflict)},需人工确认后再合并)"
            )
            continue
        material_conflict = _find_enabled_material_conflict(
            name=name,
            specification=specification,
            exclude_part_no=part_no,
        )
        if material_conflict:
            skipped_lines.append(
                f"{line_no} 行(检测到同参数物料: {material_conflict.part_no} 已在 "
                f"{_format_component_position(material_conflict)},需人工确认后再合并)"
            )
            continue

        slot_index = _next_empty_slot_index(box, occupied_slots)
        if slot_index is None:
            skipped_lines.append(f"{line_no} 行(盒子已满)")
            continue

        item = Component(
            box_id=box.id,
            slot_index=slot_index,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(item)
        if quantity:
            log_inventory_event(
                event_type="quick_inbound_add",
                delta=quantity,
                box=box,
                component=item,
                part_no=part_no,
            )
        # Reserve the slot so the next line gets a different one.
        occupied_slots.add(slot_index)
        added_count += 1

    if added_count == 0:
        return render_box_page(
            box,
            error="快速入库失败: 没有可导入的数据,请检查格式",
        )
    db.session.commit()

    message = f"快速入库完成: 新增 {added_count}"
    if skipped_lines:
        message += ";跳过: " + ", ".join(skipped_lines)
    return render_box_page(box, notice=message)
@app.route("/box/<int:box_id>/bags/add", methods=["POST"])
def add_bag_item(box_id: int):
    """Append one record to a bag-type box.

    The record takes the slot after the highest used slot index, and the
    box capacity grows when needed. The request is refused when a required
    field is missing, the quantity is invalid, or an enabled component with
    the same part number or material already exists.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    def fail(reason: str):
        # Every validation failure re-renders the box page with this prefix.
        return render_box_page(box, error="袋装新增失败: " + reason)

    form = request.form
    part_no = form.get("part_no", "").strip()
    name = form.get("name", "").strip()
    specification = form.get("specification", "").strip()
    note = form.get("note", "").strip()
    if not (part_no and name):
        return fail("料号和名称不能为空")

    try:
        quantity = _parse_non_negative_int(form.get("quantity", "0"), 0)
    except ValueError:
        return fail("数量必须是大于等于 0 的整数")

    part_no_conflict = _find_enabled_part_no_conflict(part_no)
    if part_no_conflict:
        existing_position = _format_component_position(part_no_conflict)
        return fail(f"料号 {part_no} 已存在于 {existing_position},需人工确认后再合并")

    material_conflict = _find_enabled_material_conflict(
        name=name,
        specification=specification,
        exclude_part_no=part_no,
    )
    if material_conflict:
        existing_position = _format_component_position(material_conflict)
        return fail(
            f"检测到同参数物料 {material_conflict.part_no} 已在 {existing_position},需人工确认后再合并"
        )

    highest_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
    )
    next_slot = (highest_slot or 0) + 1

    bag_item = Component(
        box_id=box.id,
        slot_index=next_slot,
        part_no=part_no,
        name=name,
        quantity=quantity,
        specification=specification or None,
        note=note or None,
        is_enabled=True,
    )
    db.session.add(bag_item)
    if quantity:
        log_inventory_event(
            event_type="bag_add",
            delta=quantity,
            box=box,
            component=bag_item,
            part_no=part_no,
        )
    if next_slot > box.slot_capacity:
        box.slot_capacity = next_slot
    db.session.commit()
    return render_box_page(box, notice="已新增 1 条袋装记录")
@app.route("/box/<int:box_id>/bags/batch", methods=["POST"])
def add_bag_items_batch(box_id: int):
    """Append several records to a bag-type box from pasted text.

    Each non-empty line holds up to five comma- or tab-separated fields:
    part number, name, quantity, specification, note. A line is skipped,
    with a reason, when the part number or name is empty, the quantity is
    invalid, or an enabled component with the same part number or material
    already exists. Accepted lines get consecutive slot indexes after the
    highest one in use, and the box capacity grows to fit them.

    Returns:
        The rendered box page with a notice, or with an error when no line
        could be imported.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400
    raw_lines = request.form.get("lines", "")
    lines = [line.strip() for line in raw_lines.splitlines() if line.strip()]
    if not lines:
        return render_box_page(box, error="批量新增失败: 请至少输入一行")

    skipped_lines = []
    added_count = 0
    next_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
        or 0
    ) + 1

    for line_no, line in enumerate(lines, start=1):
        parts = [p.strip() for p in re.split(r"[,\t]", line)]
        # Pad short lines so the five positional fields always exist.
        while len(parts) < 5:
            parts.append("")
        part_no, name, quantity_raw, specification, note = parts[:5]

        if not part_no or not name:
            skipped_lines.append(f"{line_no} 行(料号或名称为空)")
            continue
        try:
            quantity = _parse_non_negative_int(quantity_raw, 0)
        except ValueError:
            skipped_lines.append(f"{line_no} 行(数量格式错误)")
            continue

        part_no_conflict = _find_enabled_part_no_conflict(part_no)
        if part_no_conflict:
            skipped_lines.append(
                f"{line_no} 行({part_no} 已在 {_format_component_position(part_no_conflict)},需人工确认后再合并)"
            )
            continue
        material_conflict = _find_enabled_material_conflict(
            name=name,
            specification=specification,
            exclude_part_no=part_no,
        )
        if material_conflict:
            skipped_lines.append(
                f"{line_no} 行(检测到同参数物料: {material_conflict.part_no} 已在 "
                f"{_format_component_position(material_conflict)},需人工确认后再合并)"
            )
            continue

        new_component = Component(
            box_id=box.id,
            slot_index=next_slot,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(new_component)
        if quantity:
            log_inventory_event(
                event_type="bag_batch_add",
                delta=quantity,
                box=box,
                component=new_component,
                part_no=part_no,
            )
        added_count += 1
        next_slot += 1

    if added_count:
        # next_slot is one past the last slot that was handed out.
        box.slot_capacity = max(box.slot_capacity, next_slot - 1)
        db.session.commit()

    if skipped_lines and added_count == 0:
        return render_box_page(
            box,
            # Close the parenthesis opened before the skip list.
            error="批量新增失败: 没有可导入的数据(请检查: " + ", ".join(skipped_lines) + ")",
        )
    if skipped_lines:
        return render_box_page(
            box,
            notice=f"已新增 {added_count} 条,以下行被跳过: " + ", ".join(skipped_lines),
        )
    return render_box_page(box, notice=f"批量新增成功:新增 {added_count}")
@app.route("/edit/<int:box_id>/<int:slot>", methods=["GET", "POST"])
def edit_component(box_id: int, slot: int):
    """Show or process the edit form for one slot of a box.

    GET renders the form, taking ``q``, ``notice`` and ``error`` from the
    query string. POST dispatches on the ``action`` form field:

    * ``delete``: removes the slot's component after the user typed the slot
      code as confirmation; refused while lock-storage mode is on.
    * ``toggle_enable`` / ``toggle_disable``: flips ``is_enabled``.
    * anything else (default ``save``): validates and stores the submitted
      fields. Replacing the part bound to the slot, or saving a part that
      duplicates another enabled component, needs an explicit confirmation
      checkbox; a confirmed duplicate is merged into the existing component.

    Stock changes are recorded through ``log_inventory_event``.

    Returns:
        The rendered form (with an error on validation failure), a redirect
        after a successful change, or a plain 400 response when ``slot`` is
        outside the box capacity.
    """
    box = Box.query.get_or_404(box_id)
    if slot < 1 or slot > box.slot_capacity:
        return "无效的格子编号", 400

    search_query = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()
    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()
    settings = _get_ai_settings()
    lock_storage_mode = bool(settings.get("lock_storage_mode", False))

    def render_form(error_text: str, query: str):
        """Render edit.html for this slot with the given error and search text."""
        return render_template(
            "edit.html",
            box=box,
            slot=slot,
            slot_code=slot_code_for_box(box, slot),
            component=component,
            error=error_text,
            notice=notice,
            search_query=query,
            lock_storage_mode=lock_storage_mode,
        )

    if request.method != "POST":
        return render_form(error, search_query)

    action = request.form.get("action", "save")
    search_query_post = request.form.get("q", "").strip()
    search_query_effective = search_query_post or search_query

    def redirect_after_change():
        """Go back to the search results when the user came from a search."""
        if search_query_effective:
            return redirect(url_for("search_page", q=search_query_effective))
        return redirect(url_for("view_box", box_id=box.id))

    if action == "delete":
        if lock_storage_mode:
            return render_form("锁仓模式已开启,禁止删除位置绑定。", search_query_post)
        delete_confirm_slot = request.form.get("delete_confirm_slot", "").strip().upper()
        expected_slot_code = slot_code_for_box(box, slot).strip().upper()
        if delete_confirm_slot != expected_slot_code:
            return render_form(
                f"删除确认失败: 请输入当前位置编号 {expected_slot_code}",
                search_query_post,
            )
        if component:
            # Only enabled stock counts towards inventory, so only that is logged.
            if component.is_enabled and component.quantity:
                log_inventory_event(
                    event_type="component_delete",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.delete(component)
            db.session.commit()
        return redirect_after_change()

    if action in ("toggle_enable", "toggle_disable"):
        enabling = action == "toggle_enable"
        # Nothing to do when the slot is empty or already in the wanted state.
        if component and bool(component.is_enabled) != enabling:
            component.is_enabled = enabling
            if component.quantity:
                signed_quantity = int(component.quantity)
                log_inventory_event(
                    event_type="component_enable" if enabling else "component_disable",
                    delta=signed_quantity if enabling else -signed_quantity,
                    box=box,
                    component=component,
                )
            db.session.commit()
        return redirect(url_for("edit_component", box_id=box.id, slot=slot, q=search_query_post))

    # ---- save ----
    part_no = request.form.get("part_no", "").strip()
    name = request.form.get("name", "").strip()
    specification = request.form.get("specification", "").strip()
    quantity_raw = request.form.get("quantity", "0").strip()
    note = request.form.get("note", "").strip()
    confirm_merge = _is_truthy_form_value(request.form.get("confirm_merge", ""))
    confirm_position_change = _is_truthy_form_value(request.form.get("confirm_position_change", ""))

    if not part_no or not name:
        return render_form("料号和名称不能为空", search_query_post)
    try:
        quantity = _parse_non_negative_int(quantity_raw, 0)
    except ValueError:
        return render_form("数量必须是大于等于 0 的整数", search_query_post)

    is_replacement = _is_slot_part_replacement(component, part_no)
    if lock_storage_mode and is_replacement:
        return render_form("锁仓模式已开启,禁止替换当前位置绑定料号。", search_query_post)
    if is_replacement and not confirm_position_change:
        return render_form(
            "当前位已绑定到固定料号,检测到替换操作。"
            "如需变更位置绑定,请勾选“我确认替换当前位物料”后再保存",
            search_query_post,
        )

    current_component_id = component.id if component is not None else None
    part_no_conflict = _find_enabled_part_no_conflict(part_no, exclude_component_id=current_component_id)
    material_conflict = _find_enabled_material_conflict(
        name=name,
        specification=specification,
        exclude_component_id=current_component_id,
        exclude_part_no=part_no,
    )
    conflict = part_no_conflict or material_conflict

    if conflict and not confirm_merge:
        conflict_reason = "同料号" if part_no_conflict else "同参数"
        return render_form(
            f"检测到{conflict_reason}物料已存在于 {_format_component_position(conflict)},"
            "请勾选“人工确认后合并”再保存",
            search_query_post,
        )

    if conflict:
        # The user confirmed: fold the submitted data into the existing
        # component instead of creating a duplicate.
        _merge_into_existing_component(
            target=conflict,
            incoming_part_no=part_no,
            incoming_name=name,
            incoming_specification=specification,
            incoming_note=note,
            incoming_quantity=quantity,
            source_component=component,
            source_box=box,
        )
        db.session.commit()
        target_box = Box.query.get(conflict.box_id)
        target_slot = conflict.slot_index
        notice_text = (
            f"已人工确认合并到 {_format_component_position(conflict)},"
            f"累计数量 {conflict.quantity}"
        )
        if target_box:
            return redirect(
                url_for(
                    "edit_component",
                    box_id=target_box.id,
                    slot=target_slot,
                    notice=notice_text,
                )
            )
        return redirect(url_for("view_box", box_id=box.id, notice=notice_text))

    old_enabled_qty = 0
    if component is not None and component.is_enabled:
        # A NULL quantity counts as zero, as elsewhere in this module.
        old_enabled_qty = int(component.quantity or 0)
    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = part_no
    component.name = name
    component.specification = specification or None
    component.quantity = quantity
    component.note = note or None
    if component.is_enabled is None:
        component.is_enabled = True

    # Disabled components contribute no stock, so they log no delta.
    new_enabled_qty = quantity if component.is_enabled else 0
    delta = int(new_enabled_qty - old_enabled_qty)
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=part_no,
        )
    db.session.commit()
    return redirect_after_change()
@app.route("/edit/<int:box_id>/<int:slot>/lcsc-import", methods=["POST"])
def lcsc_import_to_edit_slot(box_id: int, slot: int):
    """Import an LCSC product into one slot of a box.

    Reads ``quantity``, ``lcsc_product_id``, ``confirm_merge`` and
    ``confirm_position_change`` from the submitted form, fetches the product
    via ``_fetch_lcsc_product_basic`` and writes the mapped fields into the
    slot. Apart from the 404 raised for an unknown box, every outcome is a
    redirect that carries an ``error`` or ``notice`` query parameter.
    """
    box = Box.query.get_or_404(box_id)

    def back_to_edit(**message):
        # Every rejection (and the final success) lands on this slot's edit page.
        return redirect(url_for("edit_component", box_id=box.id, slot=slot, **message))

    if not (1 <= slot <= box.slot_capacity):
        return back_to_edit(error="目标位置超出当前容器范围")

    try:
        quantity = _parse_non_negative_int(request.form.get("quantity", "0"), 0)
    except ValueError:
        return back_to_edit(error="数量必须是大于等于 0 的整数")

    settings = _get_ai_settings()
    product_identifier = request.form.get("lcsc_product_id", "").strip()
    try:
        product = _fetch_lcsc_product_basic(product_identifier, settings)
    except RuntimeError as exc:
        return back_to_edit(error=f"立创导入失败: {exc}")

    mapped = _map_lcsc_product_to_component(product)
    part_no = mapped["part_no"]
    if not (part_no and mapped["name"]):
        return back_to_edit(error="立创导入失败: 商品信息缺少料号或名称")

    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()
    confirm_merge = _is_truthy_form_value(request.form.get("confirm_merge", ""))
    confirm_position_change = _is_truthy_form_value(request.form.get("confirm_position_change", ""))

    # Replacing the part bound to this slot is refused outright in lock mode,
    # and otherwise requires an explicit confirmation checkbox.
    if _is_slot_part_replacement(component, part_no):
        if bool(settings.get("lock_storage_mode", False)):
            return back_to_edit(error="立创导入失败: 锁仓模式已开启,禁止替换当前位置绑定料号。")
        if not confirm_position_change:
            return back_to_edit(
                error=(
                    "立创导入失败: 当前位已绑定固定料号。"
                    "如需替换,请勾选“我确认替换当前位物料”后再导入"
                )
            )

    current_component_id = None if component is None else component.id
    part_no_conflict = _find_enabled_part_no_conflict(
        part_no,
        exclude_component_id=current_component_id,
    )
    material_conflict = _find_enabled_material_conflict(
        name=mapped["name"],
        specification=mapped["specification"],
        exclude_component_id=current_component_id,
        exclude_part_no=part_no,
    )
    conflict = part_no_conflict or material_conflict

    if conflict:
        if not confirm_merge:
            conflict_reason = "同料号" if part_no_conflict else "同参数"
            return back_to_edit(
                error=(
                    f"立创导入失败: 检测到{conflict_reason}物料已存在于 "
                    f"{_format_component_position(conflict)},请勾选人工确认后合并"
                )
            )

        # Confirmed merge: fold the incoming data into the existing record.
        _merge_into_existing_component(
            target=conflict,
            incoming_part_no=part_no,
            incoming_name=mapped["name"],
            incoming_specification=mapped["specification"],
            incoming_note=mapped["note"],
            incoming_quantity=quantity,
            source_component=component,
            source_box=box,
        )
        db.session.commit()
        target_box = Box.query.get(conflict.box_id)
        target_slot = conflict.slot_index
        notice_text = (
            f"已人工确认合并到 {_format_component_position(conflict)}"
            f"累计数量 {conflict.quantity}"
        )
        if target_box:
            return redirect(
                url_for(
                    "edit_component",
                    box_id=target_box.id,
                    slot=target_slot,
                    notice=notice_text,
                )
            )
        return redirect(url_for("view_box", box_id=box.id, notice=notice_text))

    # No conflict: write straight into the slot. Only a previously enabled
    # component contributes to the "before" quantity used for the event delta.
    previous_enabled_qty = 0
    if component and component.is_enabled:
        previous_enabled_qty = int(component.quantity or 0)

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = part_no
    component.name = mapped["name"]
    component.specification = mapped["specification"] or None
    component.note = mapped["note"] or None
    component.quantity = quantity
    component.is_enabled = True

    delta = int(component.quantity or 0) - previous_enabled_qty
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=component.part_no,
        )
    db.session.commit()

    slot_code = slot_code_for_box(box, slot)
    return back_to_edit(notice=f"立创导入成功: {mapped['part_no']} 已写入 {slot_code}")
@app.route("/search")
def search_page():
    """Render the search page, including ranked matches when ``q`` is given.

    Query parameters read: ``q`` (keyword), ``fuzziness``, ``notice`` and
    ``error``. With a non-empty keyword, a search plan is built via
    ``_build_search_plan`` and every enabled component is scored with
    ``_search_component_match_info``; matches are ordered by descending score,
    then part number, then name.
    """
    args = request.args
    keyword = args.get("q", "").strip()
    fuzziness = _parse_search_fuzziness(args.get("fuzziness", "balanced"))
    notice = args.get("notice", "").strip()
    error = args.get("error", "").strip()

    # Human-readable labels for the fields a match can be attributed to.
    field_labels = {
        "part_no": "料号",
        "name": "名称",
        "specification": "规格",
        "note": "备注",
    }

    search_plan, search_parse_notice, search_trace = None, "", None
    results = []

    if keyword:
        settings = _get_ai_settings()
        search_plan, search_parse_notice, search_trace = _build_search_plan(keyword, settings)
        if search_trace is None:
            search_trace = {}
        search_trace["fuzziness"] = fuzziness
        profile = SEARCH_FUZZY_PROFILES.get(fuzziness, SEARCH_FUZZY_PROFILES["balanced"])
        search_trace["fuzziness_label"] = profile["label"]

        candidates = (
            Component.query.filter_by(is_enabled=True)
            .order_by(Component.part_no.asc(), Component.name.asc())
            .all()
        )
        box_by_id = {box.id: box for box in Box.query.all()}

        for candidate in candidates:
            info = _search_component_match_info(candidate, search_plan, fuzziness=fuzziness)
            if not info["is_match"]:
                continue
            owner = box_by_id.get(candidate.box_id)
            summary = " / ".join(
                field_labels.get(field, field) for field in info["matched_fields"]
            )
            results.append(
                {
                    "component": candidate,
                    "box_name": owner.name if owner else f"{candidate.box_id}",
                    "slot_code": (
                        slot_code_for_box(owner, candidate.slot_index)
                        if owner
                        else str(candidate.slot_index)
                    ),
                    "edit_url": url_for(
                        "edit_component",
                        box_id=candidate.box_id,
                        slot=candidate.slot_index,
                        q=keyword,
                    ),
                    # Fall back to a generic label when no specific field matched.
                    "match_summary": summary or "全文匹配",
                    "matched_terms": info["matched_terms"],
                    "match_score": info["score"],
                    "fuzzy_matches": info.get("fuzzy_matches", []),
                }
            )

        def ranking(row):
            matched = row["component"]
            return (-row["match_score"], matched.part_no or "", matched.name or "")

        results.sort(key=ranking)

    return render_template(
        "search.html",
        keyword=keyword,
        fuzziness=fuzziness,
        fuzziness_profiles=SEARCH_FUZZY_PROFILES,
        results=results,
        search_plan=search_plan,
        search_trace=search_trace,
        search_parse_notice=search_parse_notice,
        notice=notice,
        error=error,
    )
@app.route("/ai/component-standardize", methods=["POST"])
def ai_component_standardize():
    """Return standardization suggestions for a component's labels and note.

    This endpoint only returns a suggestion and writes nothing to the
    database; the user reviews it on the edit page before applying it to the
    form, which keeps a human in the loop.

    Responds with HTTP 400 when both ``part_no`` and ``name`` are blank.
    """
    submitted = {
        key: request.form.get(key, "").strip()
        for key in ("part_no", "name", "specification", "note")
    }
    if not (submitted["part_no"] or submitted["name"]):
        return {"ok": False, "message": "至少需要填写料号或名称后再生成标准化建议"}, 400

    settings = _get_ai_settings()
    suggestion, parse_notice = _build_component_standardization_suggestion(
        submitted["part_no"],
        submitted["name"],
        submitted["specification"],
        submitted["note"],
        settings,
    )
    response = {"ok": True}
    response["suggestion"] = suggestion
    response["parse_notice"] = parse_notice
    return response
@app.route("/ai/inbound-parse", methods=["POST"])
def ai_inbound_parse():
    """Pre-process raw inbound text into structured preview rows.

    1. Input is raw multi-line text from the ``lines`` form field.
    2. Output is the structured rows plus a split into valid / invalid rows.
    3. This is preview only; the actual stock-in is still submitted manually.

    Responds with HTTP 400 when no non-blank line was submitted.
    """
    form = request.form
    raw_text = form.get("lines", "")
    mode = (form.get("mode", "box") or "box").strip().lower()
    use_web_search = _is_truthy_form_value(form.get("use_web_search", ""))
    if mode not in {"box", "bag"}:
        # Unknown modes silently fall back to the box mode.
        mode = "box"

    stripped = (candidate.strip() for candidate in (raw_text or "").splitlines())
    lines = [text for text in stripped if text]
    if not lines:
        return {"ok": False, "message": "请至少输入一行待处理文本"}, 400

    fallback_rows = _parse_inbound_preview_rows(lines)
    settings = _get_ai_settings()
    rows, parse_notice, web_context = _ai_enhance_inbound_preview(
        lines,
        mode,
        fallback_rows,
        settings,
        use_web_search=use_web_search,
    )

    valid_rows = []
    invalid_rows = []
    for row in rows:
        bucket = valid_rows if row.get("is_valid") else invalid_rows
        bucket.append(row)

    # Only valid rows that carry a normalized line feed the text to resubmit.
    normalized_candidates = (row.get("normalized_line", "") for row in valid_rows)
    normalized_lines = "\n".join([text for text in normalized_candidates if text])

    return {
        "ok": True,
        "mode": mode,
        "use_web_search": use_web_search,
        "total_lines": len(rows),
        "valid_count": len(valid_rows),
        "invalid_count": len(invalid_rows),
        "parse_notice": parse_notice,
        "web_context": web_context,
        "rows": rows,
        "normalized_lines": normalized_lines,
    }
@app.route("/ai/restock-plan", methods=["POST"])
def ai_restock_plan():
    """Generate an AI-assisted restock plan.

    The AI is asked for a JSON plan with ``summary``, ``urgent``,
    ``this_week`` and ``defer``. If the AI call fails, or its reply is not a
    JSON object, a rule-based fallback plan is returned instead so the page
    always has something to show.

    Returns:
        A JSON-serializable dict. When the AI call raises ``RuntimeError`` the
        response is ``({"ok": False, ...fallback plan...}, 400)``; otherwise a
        dict with ``ok``, ``suggestion`` (raw AI text), ``plan``,
        ``parse_warning`` and ``data``. When there are no low-stock items the
        AI is not called and an empty plan is returned.
    """
    ai_settings = _get_ai_settings()
    data = _build_restock_payload(
        limit=ai_settings["restock_limit"],
        threshold=ai_settings["restock_threshold"],
    )
    format_warning = "AI 返回格式异常,已切换到规则兜底建议。"

    def _empty_plan(summary: str) -> dict:
        """Return a plan with the given summary and no items."""
        return {
            "summary": summary,
            "urgent": [],
            "this_week": [],
            "defer": [],
        }

    def _normalize_item(raw_item: dict) -> dict:
        """Coerce one AI-provided item into the four string fields the UI expects."""
        if not isinstance(raw_item, dict):
            return {
                "part_no": "-",
                "name": "未命名元件",
                "suggest_qty": "待确认",
                "reason": "AI 返回项格式异常",
            }
        return {
            "part_no": str(raw_item.get("part_no", "-") or "-").strip() or "-",
            "name": str(raw_item.get("name", "未命名元件") or "未命名元件").strip(),
            "suggest_qty": str(raw_item.get("suggest_qty", "待确认") or "待确认").strip(),
            "reason": str(raw_item.get("reason", "") or "").strip() or "",
        }

    def _normalize_plan(raw_plan: dict, default_summary: str) -> dict:
        """Coerce a parsed AI plan into the canonical plan structure."""
        if not isinstance(raw_plan, dict):
            return _empty_plan(default_summary)
        summary = str(raw_plan.get("summary", "") or "").strip() or default_summary

        def to_items(key: str):
            rows = raw_plan.get(key, [])
            if not isinstance(rows, list):
                return []
            return [_normalize_item(row) for row in rows]

        return {
            "summary": summary,
            "urgent": to_items("urgent"),
            "this_week": to_items("this_week"),
            "defer": to_items("defer"),
        }

    def _extract_json_block(raw_text: str) -> str:
        """Strip an optional Markdown code fence and return the outermost ``{...}`` span."""
        text = (raw_text or "").strip()
        if not text:
            return ""
        if text.startswith("```"):
            # The previous patterns used "\\s" inside raw strings, which matches
            # a literal backslash and therefore never removed the fence.
            text = re.sub(r"^```(?:json)?\s*", "", text)
            text = re.sub(r"\s*```$", "", text)
        first = text.find("{")
        last = text.rfind("}")
        if first >= 0 and last > first:
            return text[first : last + 1]
        return text

    def _build_rule_based_plan() -> dict:
        """Build the fallback plan: top up each low-stock item toward twice the threshold."""
        threshold = int(data.get("threshold", LOW_STOCK_THRESHOLD))
        urgent = []
        this_week = []
        for idx, item in enumerate(data.get("low_stock_items", [])):
            qty = int(item.get("quantity", 0) or 0)
            suggest_qty = max(threshold * 2 - qty, 1)
            row = {
                "part_no": item.get("part_no", "-") or "-",
                "name": item.get("name", "未命名元件") or "未命名元件",
                "suggest_qty": str(suggest_qty),
                "reason": f"当前库存 {qty},低于阈值 {threshold}",
            }
            # The first five items are flagged urgent, the rest go to this week.
            if idx < 5:
                urgent.append(row)
            else:
                this_week.append(row)
        return {
            "summary": "已按规则生成兜底补货建议AI 输出异常时使用)",
            "urgent": urgent,
            "this_week": this_week,
            "defer": [],
        }

    if not data["low_stock_items"]:
        plan = _empty_plan("当前没有低库存元件,暂不需要补货。")
        return {
            "ok": True,
            "suggestion": "当前没有低库存元件,暂不需要补货。",
            "plan": plan,
            "data": data,
        }

    system_prompt = (
        "你是电子元器件库存助手。"
        "必须只输出 JSON不要 Markdown不要解释文字。"
        "输出结构必须是: "
        "{\"summary\":string,\"urgent\":[item],\"this_week\":[item],\"defer\":[item]}。"
        "item 结构: {\"part_no\":string,\"name\":string,\"suggest_qty\":string,\"reason\":string}。"
        "各数组允许为空。"
    )
    user_prompt = "库存数据如下(JSON):\n" + json.dumps(data, ensure_ascii=False)

    try:
        suggestion = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=ai_settings["api_url"],
            model=ai_settings["model"],
            api_key=ai_settings["api_key"],
            timeout=ai_settings["timeout"],
        )
    except RuntimeError as exc:
        fallback_plan = _build_rule_based_plan()
        return {
            "ok": False,
            "message": str(exc),
            "plan": fallback_plan,
            "data": data,
        }, 400

    parse_warning = ""
    try:
        parsed_plan = json.loads(_extract_json_block(suggestion))
    except json.JSONDecodeError:
        plan = _build_rule_based_plan()
        parse_warning = format_warning
    else:
        if isinstance(parsed_plan, dict):
            plan = _normalize_plan(parsed_plan, "已生成 AI 补货建议")
        else:
            # Valid JSON that is not an object (a list, string, number, null) is
            # still a malformed plan: use the rule-based fallback rather than
            # presenting an empty plan while low-stock items exist.
            plan = _build_rule_based_plan()
            parse_warning = format_warning

    return {
        "ok": True,
        "suggestion": suggestion,
        "plan": plan,
        "parse_warning": parse_warning,
        "data": data,
    }
@app.route("/ai/duplicate-audit", methods=["POST"])
def ai_duplicate_audit():
    """Audit the inventory for suspected duplicate materials.

    1. Covers three kinds of suspected duplicates: same part number, same
       parameters, same LCSC code.
    2. Returns the duplicate groups together with a summary.
    3. Nothing is merged automatically; the result is a checklist for manual
       review.
    """
    payload = _build_duplicate_audit_payload(limit_groups=60)
    settings = _get_ai_settings()

    summary, parse_warning = "", ""
    has_groups = payload.get("total_groups", 0) > 0
    if has_groups:
        summary, parse_warning = _build_duplicate_audit_summary_with_ai(payload, settings)

    if not summary:
        # No AI summary (or nothing to summarize): report plain group counts.
        counts = {
            key: len(payload.get(key, []))
            for key in ("part_no_groups", "material_groups", "lcsc_groups")
        }
        summary = "".join(
            [
                "巡检完成: ",
                f"同料号 {counts['part_no_groups']} 组,",
                f"同参数 {counts['material_groups']} 组,",
                f"同立创编号 {counts['lcsc_groups']} 组。",
                "请优先处理成员数较多的分组。",
            ]
        )

    return {
        "ok": True,
        "summary": summary,
        "parse_warning": parse_warning,
        "data": payload,
    }
@app.route("/ai/duplicate-audit/export")
def export_duplicate_audit_csv():
    """Export the duplicate-material audit as a CSV download.

    Query parameters: ``limit`` (number of groups, clamped to 1..1000,
    defaulting to 200 when missing or not an integer) and any number of
    ``group_id`` values of the form ``<type>::<key>`` restricting the export
    to those groups. One row is written per group member; a group without
    members still gets one row with the member columns left blank.
    """
    limit_text = (request.args.get("limit", "200") or "200").strip()
    try:
        limit_groups = int(limit_text)
    except ValueError:
        limit_groups = 200
    limit_groups = max(1, min(limit_groups, 1000))

    payload = _build_duplicate_audit_payload(limit_groups=limit_groups)
    groups = payload.get("groups", [])

    selected_group_ids = set()
    for raw_id in request.args.getlist("group_id"):
        cleaned = (raw_id or "").strip()
        if cleaned:
            selected_group_ids.add(cleaned)

    if selected_group_ids:
        groups = [
            group
            for group in groups
            if f"{group.get('type', '')}::{group.get('key', '')}" in selected_group_ids
        ]

    header = [
        "巡检时间(generated_at)",
        "重复类型(type)",
        "重复原因(reason)",
        "分组标识(key)",
        "分组成员数(member_count)",
        "建议(suggestion)",
        "标准名称建议(standard_name)",
        "标准规格建议(standard_specification)",
        "元件ID(component_id)",
        "料号(part_no)",
        "名称(name)",
        "规格(specification)",
        "数量(quantity)",
        "立创编号(lcsc_code)",
        "盒子名称(box_name)",
        "位置编号(slot_code)",
        "编辑链接(edit_url)",
    ]
    # (key, default) for each member column, in output order.
    member_columns = (
        ("component_id", ""),
        ("part_no", ""),
        ("name", ""),
        ("specification", ""),
        ("quantity", 0),
        ("lcsc_code", ""),
        ("box_name", ""),
        ("slot_code", ""),
        ("edit_url", ""),
    )

    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(header)

    generated_at = payload.get("generated_at", "")
    for group in groups:
        group_cells = [
            generated_at,
            group.get("type", ""),
            group.get("reason", ""),
            group.get("key", ""),
            group.get("member_count", 0),
            group.get("suggestion", ""),
            group.get("standard_name", ""),
            group.get("standard_specification", ""),
        ]
        # A member-less group still produces a single placeholder row.
        for member in group.get("members", []) or [None]:
            if member is None:
                member_cells = [""] * len(member_columns)
            else:
                member_cells = [member.get(key, default) for key, default in member_columns]
            writer.writerow(group_cells + member_cells)

    # Leading BOM so that the UTF-8 content is detected by spreadsheet tools.
    csv_content = "\ufeff" + output.getvalue()
    output.close()

    scope = "selected" if selected_group_ids else "all"
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f"duplicate_audit_{scope}_{timestamp}.csv"
    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
@app.route("/ai/settings", methods=["GET", "POST"])
def ai_settings_page():
    """Show and save the AI / LCSC settings form.

    GET renders the current settings. POST validates the submitted form; on
    success the settings are saved and the browser is redirected back with a
    notice, otherwise the form is re-rendered with the submitted values and
    the first validation error encountered.
    """
    settings = _get_ai_settings()
    notice = request.args.get("notice", "").strip()
    error = ""

    if request.method == "POST":
        form = request.form

        def text_field(field):
            return form.get(field, "").strip()

        def int_field(field, default_text, minimum, message):
            # Returns (value, error message); an invalid input keeps the
            # currently stored value so the re-rendered form stays sensible.
            try:
                parsed = int((form.get(field, default_text) or default_text).strip())
            except ValueError:
                return settings[field], message
            if parsed < minimum:
                return settings[field], message
            return parsed, ""

        api_url = text_field("api_url")
        model = text_field("model")
        api_key = text_field("api_key")
        lcsc_app_id = text_field("lcsc_app_id")
        lcsc_access_key = text_field("lcsc_access_key")
        lcsc_secret_key = text_field("lcsc_secret_key")
        lock_storage_mode = _is_truthy_form_value(form.get("lock_storage_mode", ""))

        timeout, timeout_problem = int_field(
            "timeout", "30", 5, "超时时间必须是大于等于 5 的整数"
        )
        restock_threshold, threshold_problem = int_field(
            "restock_threshold", "5", 0, "低库存阈值必须是大于等于 0 的整数"
        )
        restock_limit, limit_problem = int_field(
            "restock_limit", "24", 1, "补货条目数必须是大于等于 1 的整数"
        )
        lcsc_timeout, lcsc_timeout_problem = int_field(
            "lcsc_timeout", "20", 5, "立创接口超时时间必须是大于等于 5 的整数"
        )

        lcsc_complete = bool(lcsc_app_id and lcsc_access_key and lcsc_secret_key)
        # Checks are listed in priority order; only the first failure is shown.
        problems = [
            timeout_problem,
            threshold_problem,
            limit_problem,
            lcsc_timeout_problem,
            "" if api_url else "API URL 不能为空",
            "" if model else "模型名称不能为空",
            "" if lcsc_complete else "立创接口需要填写 app_id、access_key、secret_key",
        ]
        error = next((problem for problem in problems if problem), "")

        submitted = {
            "api_url": api_url,
            "model": model,
            "api_key": api_key,
            "timeout": timeout,
            "restock_threshold": restock_threshold,
            "restock_limit": restock_limit,
            "lcsc_base_url": LCSC_BASE_URL,
            "lcsc_basic_path": LCSC_BASIC_PATH,
            "lcsc_timeout": lcsc_timeout,
            "lcsc_app_id": lcsc_app_id,
            "lcsc_access_key": lcsc_access_key,
            "lcsc_secret_key": lcsc_secret_key,
            "lock_storage_mode": lock_storage_mode,
        }

        if not error:
            _save_ai_settings(submitted)
            return redirect(url_for("ai_settings_page", notice="AI参数已保存"))

        # Validation failed: echo what the user typed back into the form.
        settings.update(submitted)

    return render_template(
        "ai_settings.html",
        settings=settings,
        notice=notice,
        error=error,
    )
@app.route("/component/<int:component_id>/outbound", methods=["POST"])
def quick_outbound(component_id: int):
    """Take stock out of a component directly from the search results.

    Reads ``amount`` plus the search context (``q``, ``fuzziness``) from the
    form, validates the amount against the current stock, decrements the
    quantity, logs a ``component_outbound`` event and redirects back to the
    search page with either a notice or an error.
    """
    component = Component.query.get_or_404(component_id)
    keyword = request.form.get("q", "").strip()
    fuzziness = _parse_search_fuzziness(request.form.get("fuzziness", "balanced"))

    def back_to_search(**message):
        # Keep the user's search context on every return path.
        return redirect(url_for("search_page", q=keyword, fuzziness=fuzziness, **message))

    try:
        amount = _parse_non_negative_int(request.form.get("amount", "0"), 0)
    except ValueError:
        return back_to_search(error="出库数量必须是大于等于 0 的整数")

    if amount <= 0:
        return back_to_search(error="出库数量必须大于 0")
    if not component.is_enabled:
        return back_to_search(error="该元件已停用,不能出库")

    in_stock = int(component.quantity or 0)
    if amount > in_stock:
        return back_to_search(error="出库数量超过当前库存")

    component.quantity = in_stock - amount
    box = Box.query.get(component.box_id)
    log_inventory_event(
        event_type="component_outbound",
        delta=-amount,
        box=box,
        component=component,
        part_no=component.part_no,
    )
    db.session.commit()

    if box:
        slot_code = slot_code_for_box(box, component.slot_index)
    else:
        slot_code = str(component.slot_index)
    notice = f"出库成功: {component.part_no} -{amount}{slot_code}"
    return back_to_search(notice=notice)
@app.route("/stats")
def stats_page():
    """Render the statistics page.

    Aggregates, for the selected ``days`` window and ``box_type`` filter:
    current stock totals, the low-stock count, the number of logged
    operations and active days, the inventory trend line, and a per-category
    (or, when filtered, per-component) ranking. This is the single
    aggregation entry point used by the stats template.

    Quantities are read through ``int(c.quantity or 0)`` everywhere so that a
    NULL quantity counts as zero instead of raising ``TypeError`` part-way
    through the aggregation.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))
    notice = request.args.get("notice", "").strip()

    def _qty(component) -> int:
        """Return the component quantity, treating a missing value as 0."""
        return int(component.quantity or 0)

    def _box_type_of(component):
        """Return the box type of the component's box, or None if the box is unknown."""
        owner = box_by_id.get(component.box_id)
        return owner.box_type if owner else None

    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    components = Component.query.all()

    all_enabled_components = [c for c in components if c.is_enabled]
    overall_total_quantity = sum(_qty(c) for c in all_enabled_components)

    if box_type_filter == "all":
        enabled_components = list(all_enabled_components)
    else:
        enabled_components = [
            c for c in all_enabled_components if _box_type_of(c) == box_type_filter
        ]

    total_quantity = sum(_qty(c) for c in enabled_components)
    total_items = len(enabled_components)
    low_stock_count = sum(1 for c in enabled_components if _qty(c) < LOW_STOCK_THRESHOLD)
    inventory_share = (
        round(total_quantity * 100 / overall_total_quantity, 1) if overall_total_quantity > 0 else 0.0
    )

    # Events are counted from midnight of the first day in the window.
    start_day = datetime.now().date() - timedelta(days=days - 1)
    event_query = InventoryEvent.query.filter(
        InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time())
    )
    if box_type_filter != "all":
        event_query = event_query.filter_by(box_type=box_type_filter)
    period_operation_count = event_query.count()
    active_days = len(
        {
            _to_date(row[0])
            for row in event_query.with_entities(db.func.date(InventoryEvent.created_at)).all()
            if row and row[0]
        }
    )

    # Unfiltered per-type totals, accumulated in one pass over the enabled
    # components instead of rescanning the whole list for each box type.
    totals_by_type = {box_type: 0 for box_type in BOX_TYPES.keys()}
    for c in all_enabled_components:
        box_type = _box_type_of(c)
        if box_type in totals_by_type:
            totals_by_type[box_type] += _qty(c)

    # Per-category item count and quantity within the current filter.
    category_items = {key: 0 for key in BOX_TYPES.keys()}
    category_quantity = {key: 0 for key in BOX_TYPES.keys()}
    for c in enabled_components:
        box_type = _box_type_of(c)
        if box_type in category_items:
            category_items[box_type] += 1
            category_quantity[box_type] += _qty(c)

    category_stats = [
        {
            "key": key,
            "label": meta["label"],
            "item_count": category_items[key],
            "quantity": category_quantity[key],
        }
        for key, meta in BOX_TYPES.items()
    ]
    category_stats.sort(key=lambda row: row["quantity"], reverse=True)
    max_category_quantity = max([row["quantity"] for row in category_stats], default=0)

    chart_mode = "category"
    chart_title = "分类占比"
    chart_rows = category_stats
    max_chart_quantity = max_category_quantity
    if box_type_filter != "all":
        # Within a single category, rank components (grouped by part number
        # and name) by stock and show the top eight.
        chart_mode = "component"
        chart_title = "分类内元件库存 Top"
        bucket = {}
        for c in enabled_components:
            key = (c.part_no or "", c.name or "")
            bucket[key] = bucket.get(key, 0) + _qty(c)
        component_rows = []
        for (part_no, name), quantity in bucket.items():
            label = name or part_no or "未命名元件"
            if part_no:
                label = f"{label} ({part_no})"
            # NOTE(review): item_count is always 1 here even when several
            # components share the same (part_no, name) key - confirm intent.
            component_rows.append({"label": label, "quantity": quantity, "item_count": 1})
        component_rows.sort(key=lambda row: row["quantity"], reverse=True)
        chart_rows = component_rows[:8]
        max_chart_quantity = max([row["quantity"] for row in chart_rows], default=0)

    trend_points = build_trend_points_from_events(
        days=days,
        total_quantity=total_quantity,
        box_type_filter=box_type_filter,
    )
    period_net_change = 0
    if len(trend_points) >= 2:
        period_net_change = trend_points[-1]["value"] - trend_points[0]["value"]

    min_value = min([point["value"] for point in trend_points], default=0)
    max_value = max([point["value"] for point in trend_points], default=0)
    # Never divide by zero when the trend line is flat.
    value_span = max(max_value - min_value, 1)

    svg_points = []
    if trend_points:
        width = 520
        height = 180
        step_x = width / max(len(trend_points) - 1, 1)
        for idx, point in enumerate(trend_points):
            x = idx * step_x
            # SVG y grows downwards, so larger values map to smaller y.
            y = height - ((point["value"] - min_value) / value_span) * height
            svg_points.append(f"{x:.2f},{y:.2f}")

    box_type_series_raw = build_box_type_trend_series(days=days, totals_by_type=totals_by_type)
    box_type_series = []
    for key, meta in BOX_TYPES.items():
        values = box_type_series_raw["series"].get(key, [])
        delta = values[-1] - values[0] if len(values) >= 2 else 0
        box_type_series.append(
            {
                "key": key,
                "label": meta["label"],
                "sparkline": make_sparkline(values),
                "latest": values[-1] if values else 0,
                "delta": delta,
            }
        )

    activity_rows = recent_events(limit=20, box_type_filter=box_type_filter)

    return render_template(
        "stats.html",
        notice=notice,
        days=days,
        box_type_filter=box_type_filter,
        box_types=BOX_TYPES,
        total_quantity=total_quantity,
        total_items=total_items,
        low_stock_count=low_stock_count,
        category_stats=category_stats,
        max_category_quantity=max_category_quantity,
        chart_mode=chart_mode,
        chart_title=chart_title,
        chart_rows=chart_rows,
        max_chart_quantity=max_chart_quantity,
        trend_points=trend_points,
        trend_polyline=" ".join(svg_points),
        min_value=min_value,
        max_value=max_value,
        period_net_change=period_net_change,
        overall_total_quantity=overall_total_quantity,
        inventory_share=inventory_share,
        period_operation_count=period_operation_count,
        active_days=active_days,
        box_type_series=box_type_series,
        activity_rows=activity_rows,
    )
@app.route("/stats/export")
def stats_export_csv():
    """Export the inventory trend for the selected window as a CSV download.

    Uses the same ``days`` and ``box_type`` query parameters as the stats
    page. Each row holds one trend point: its date, the inventory total, that
    day's net delta, and the two filter values repeated for reference.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))

    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    components = Component.query.all()
    enabled_components = [
        c
        for c in components
        if c.is_enabled and (
            box_type_filter == "all"
            or (box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == box_type_filter)
        )
    ]
    # Treat a missing quantity as 0, matching the ``int(x.quantity or 0)``
    # idiom used elsewhere in this file, so one NULL row cannot break the export.
    total_quantity = sum(int(c.quantity or 0) for c in enabled_components)

    trend_points = build_trend_points_from_events(days, total_quantity, box_type_filter)
    delta_by_day = query_event_daily_delta(days, box_type_filter)

    output = StringIO()
    writer = csv.writer(output)
    writer.writerow(["date", "inventory_total", "daily_delta", "days", "box_type_filter"])
    for point in trend_points:
        writer.writerow(
            [
                point["date"].isoformat(),
                point["value"],
                int(delta_by_day.get(point["date"], 0)),
                days,
                box_type_filter,
            ]
        )
    csv_content = output.getvalue()
    output.close()

    filename = f"inventory_stats_{box_type_filter}_{days}d.csv"
    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
@app.route("/stats/clear", methods=["POST"])
def clear_stats_logs():
    """Delete inventory event logs and return to the stats page.

    With ``scope=all`` every event is removed. Otherwise only events matching
    the submitted ``box_type`` filter are removed (all of them when the
    filter is ``all``). The ``days`` value is only carried through to the
    redirect; it does not restrict what is deleted.
    """
    form = request.form
    days = parse_days_value(form.get("days", "7"))
    box_type_filter = parse_box_type_filter(form.get("box_type", "all"))
    clear_scope = (form.get("scope", "current") or "current").strip()

    if clear_scope == "all":
        removed = InventoryEvent.query.delete()
        db.session.commit()
        return redirect(
            url_for(
                "stats_page",
                days=days,
                box_type="all",
                notice=f"已清除全部统计日志,共 {removed}",
            )
        )

    filtered = box_type_filter != "all"
    events = InventoryEvent.query
    if filtered:
        events = events.filter_by(box_type=box_type_filter)
    removed = events.delete(synchronize_session=False)
    db.session.commit()

    if filtered:
        # Prefer the configured display label; fall back to the raw key.
        label = BOX_TYPES.get(box_type_filter, {}).get("label", box_type_filter)
        notice = f"已清除当前筛选({label})统计日志,共 {removed}"
    else:
        notice = f"已清除当前筛选(全部分类)统计日志,共 {removed}"
    return redirect(url_for("stats_page", days=days, box_type=box_type_filter, notice=notice))
def bootstrap() -> None:
    """Initialize the database when the application starts.

    The steps run in a fixed order inside an application context: create
    tables, then ``ensure_schema`` (described in the original note as adding
    missing columns), then ``normalize_legacy_data`` (repairing historical
    data), so that both new and old databases can start up.
    """
    with app.app_context():
        startup_steps = (db.create_all, ensure_schema, normalize_legacy_data)
        for step in startup_steps:
            step()
# Runs at import time, so the database is prepared however the module is loaded.
bootstrap()

if __name__ == "__main__":
    # The server listens on all interfaces. The Werkzeug interactive debugger
    # lets anyone who can reach the port execute arbitrary Python, so debug
    # mode is opt-in via FLASK_DEBUG instead of being always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)