3016 lines
102 KiB
Python
3016 lines
102 KiB
Python
"""库存管理 Flask 应用主文件。
|
||
|
||
中文说明:
|
||
1. 这个文件同时承担了配置加载、数据库模型、数据修复、页面路由、统计计算、AI 补货建议等职责。
|
||
2. 为了便于你后续阅读和维护,关键函数上方会保留中文解释,说明“这个函数做什么”以及“为什么这样做”。
|
||
3. 这些中文解释属于代码可读性的一部分,不应在后续维护中随意删除。
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import csv
|
||
import json
|
||
import hmac
|
||
import base64
|
||
import random
|
||
import string
|
||
import hashlib
|
||
import time
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
from copy import deepcopy
|
||
from io import StringIO
|
||
from datetime import datetime, timedelta
|
||
|
||
from flask import Flask, Response, redirect, render_template, request, url_for
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
|
||
|
||
# Everything the app persists lives in a "data" directory next to this file.
# The directory is created at import time so the database file and the JSON
# settings files below always have somewhere to go.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_DIR = os.path.join(BASE_DIR, "data")
os.makedirs(DB_DIR, exist_ok=True)
DB_PATH = os.path.join(DB_DIR, "inventory.db")

# Basic Flask and SQLAlchemy initialisation.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI=f"sqlite:///{DB_PATH}",
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
db = SQLAlchemy(app)

# Global constants are collected here so later functions do not scatter
# hard-coded values.
LOW_STOCK_THRESHOLD = 5
BOX_TYPES_OVERRIDE_PATH = os.path.join(DB_DIR, "box_types.json")
AI_SETTINGS_PATH = os.path.join(DB_DIR, "ai_settings.json")
LCSC_BASE_URL = "https://open-api.jlc.com"
LCSC_BASIC_PATH = "/lcsc/openapi/sku/product/basic"
|
||
def _env_int(name: str, fallback: int) -> int:
    """Read an integer environment variable without ever raising.

    Returns *fallback* when the variable is unset, blank, or not a valid
    integer, so a typo in the environment cannot crash the module import.
    """
    raw = (os.environ.get(name) or "").strip()
    if not raw:
        return fallback
    try:
        return int(raw)
    except ValueError:
        return fallback


# Built-in defaults for the AI / LCSC settings.  Environment variables seed
# the values; _load_ai_settings() copies this dict and overlays the values
# saved in AI_SETTINGS_PATH.
AI_SETTINGS_DEFAULT = {
    "api_url": os.environ.get(
        "SILICONFLOW_API_URL",
        "https://api.siliconflow.cn/v1/chat/completions",
    ),
    "model": os.environ.get(
        "SILICONFLOW_MODEL",
        "Qwen/Qwen2.5-7B-Instruct",
    ),
    "api_key": os.environ.get("SILICONFLOW_API_KEY", ""),
    "timeout": _env_int("SILICONFLOW_TIMEOUT", 30),
    "restock_threshold": LOW_STOCK_THRESHOLD,
    "restock_limit": 24,
    "lcsc_timeout": _env_int("LCSC_TIMEOUT", 20),
    "lcsc_app_id": os.environ.get("LCSC_APP_ID", ""),
    "lcsc_access_key": os.environ.get("LCSC_ACCESS_KEY", ""),
    "lcsc_secret_key": os.environ.get("LCSC_SECRET_KEY", ""),
    "lock_storage_mode": False,
}
|
||
|
||
|
||
# Built-in container types.  Each entry carries the display label plus the
# default capacity, description and slot-code prefix.
DEFAULT_BOX_TYPES = {
    "small_28": dict(
        label="28格小盒大盒",
        default_capacity=28,
        default_desc="4连排小盒,常见摆放为竖向7排",
        default_prefix="A",
    ),
    "medium_14": dict(
        label="14格中盒大盒",
        default_capacity=14,
        default_desc="14格中盒,内部摆放方向与28格不同",
        default_prefix="B",
    ),
    "custom": dict(
        label="自定义容器",
        default_capacity=20,
        default_desc="可按实际盒型设置格数与编号前缀",
        default_prefix="C",
    ),
    "bag": dict(
        label="袋装清单",
        default_capacity=28,
        default_desc="一袋一种器件,按清单管理",
        default_prefix="BAG",
    ),
}

# Runtime copy that overrides are merged into; the defaults stay pristine so
# they can be diffed against when overrides are saved.
BOX_TYPES = deepcopy(DEFAULT_BOX_TYPES)
|
||
|
||
|
||
def _apply_box_type_overrides() -> None:
    """Load the box-type override file and merge it into ``BOX_TYPES``.

    The default box types are defined in code.  Customised labels,
    descriptions and prefixes are stored in ``BOX_TYPES_OVERRIDE_PATH``
    (data/box_types.json); this function folds them back into the runtime
    configuration.  A missing, unreadable or malformed file is ignored, as
    are unknown box types and non-dict entries.
    """
    if not os.path.exists(BOX_TYPES_OVERRIDE_PATH):
        return

    try:
        with open(BOX_TYPES_OVERRIDE_PATH, "r", encoding="utf-8") as handle:
            stored = json.load(handle)
    except (OSError, json.JSONDecodeError):
        return

    if not isinstance(stored, dict):
        return

    # Only these fields may be overridden; capacity is never taken from disk.
    overridable = ("label", "default_desc", "default_prefix")
    for type_key, patch in stored.items():
        if type_key in BOX_TYPES and isinstance(patch, dict):
            BOX_TYPES[type_key].update(
                {name: patch[name] for name in overridable if name in patch}
            )

    # Keep bag container capacity fixed by domain rule.
    BOX_TYPES["bag"]["default_capacity"] = 28
|
||
|
||
|
||
def _save_box_type_overrides() -> None:
    """Write the customised box-type fields to ``BOX_TYPES_OVERRIDE_PATH``.

    Only the label, description and prefix values that differ from
    ``DEFAULT_BOX_TYPES`` are stored, keyed by box type.  Types with no
    differences are left out of the file.
    """
    tracked = ("label", "default_desc", "default_prefix")
    payload = {}
    for type_key, baseline in DEFAULT_BOX_TYPES.items():
        live = BOX_TYPES.get(type_key, baseline)
        diff = {
            name: live.get(name)
            for name in tracked
            if live.get(name) != baseline.get(name)
        }
        if diff:
            payload[type_key] = diff

    with open(BOX_TYPES_OVERRIDE_PATH, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
|
||
|
||
|
||
_apply_box_type_overrides()
|
||
|
||
|
||
def _load_ai_settings() -> dict:
    """Return the default settings overlaid with the values saved on disk.

    Starts from a copy of ``AI_SETTINGS_DEFAULT`` and replaces only the keys
    that also appear in ``AI_SETTINGS_PATH``.  Keys unknown to the defaults
    are ignored.  If the file is missing, unreadable, not valid JSON or not a
    JSON object, the plain defaults are returned.
    """
    settings = dict(AI_SETTINGS_DEFAULT)

    if os.path.exists(AI_SETTINGS_PATH):
        try:
            with open(AI_SETTINGS_PATH, "r", encoding="utf-8") as handle:
                stored = json.load(handle)
        except (OSError, json.JSONDecodeError):
            return settings

        if isinstance(stored, dict):
            overlay = {key: stored[key] for key in settings if key in stored}
            settings.update(overlay)

    return settings
|
||
|
||
|
||
def _save_ai_settings(settings: dict) -> None:
    """Serialise *settings* as indented UTF-8 JSON into ``AI_SETTINGS_PATH``.

    The file is overwritten; non-ASCII characters are written unescaped.
    """
    with open(AI_SETTINGS_PATH, "w", encoding="utf-8") as handle:
        json.dump(settings, handle, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def _get_ai_settings() -> dict:
    """Load and sanitise the AI / LCSC API settings.

    This does more than read the file: timeouts, thresholds and boolean
    fields are coerced into safe, usable values so the business logic does
    not have to repeat None checks and type conversions.

    Returns:
        The settings dict with integer fields clamped to a minimum (falling
        back to a default when unparsable), text fields stripped, the fixed
        LCSC base URL and path added, and ``lock_storage_mode`` as a bool.
    """
    settings = _load_ai_settings()

    # (key, fallback when missing or unparsable, lower bound)
    integer_rules = (
        ("timeout", 30, 5),
        ("restock_threshold", LOW_STOCK_THRESHOLD, 0),
        ("restock_limit", 24, 1),
        ("lcsc_timeout", 20, 5),
    )
    for key, fallback, minimum in integer_rules:
        try:
            settings[key] = max(minimum, int(settings.get(key, fallback)))
        except (TypeError, ValueError):
            settings[key] = fallback

    for key in ("api_url", "model", "api_key"):
        settings[key] = (settings.get(key) or "").strip()

    # The LCSC endpoint is not user-configurable; always use the constants.
    settings["lcsc_base_url"] = LCSC_BASE_URL
    settings["lcsc_basic_path"] = LCSC_BASIC_PATH

    for key in ("lcsc_app_id", "lcsc_access_key", "lcsc_secret_key"):
        settings[key] = (settings.get(key) or "").strip()

    settings["lock_storage_mode"] = bool(settings.get("lock_storage_mode", False))
    return settings
|
||
|
||
|
||
def _generate_nonce(length: int = 32) -> str:
|
||
alphabet = string.ascii_letters + string.digits
|
||
return "".join(random.choice(alphabet) for _ in range(length))
|
||
|
||
|
||
def _generate_jop_signature(method: str, path: str, timestamp: str, nonce: str, post_data: str, secret_key: str) -> str:
|
||
string_to_sign = f"{method}\n{path}\n{timestamp}\n{nonce}\n{post_data}\n"
|
||
digest = hmac.new(
|
||
secret_key.encode("utf-8"),
|
||
string_to_sign.encode("utf-8"),
|
||
hashlib.sha256,
|
||
).digest()
|
||
return base64.b64encode(digest).decode("utf-8")
|
||
|
||
|
||
def _extract_lcsc_product_id_from_input(raw_identifier: str) -> int | None:
|
||
text = (raw_identifier or "").strip()
|
||
if not text:
|
||
return None
|
||
|
||
# Accept full item detail URL and extract /23913.html.
|
||
try:
|
||
parsed = urllib.parse.urlparse(text)
|
||
except ValueError:
|
||
parsed = None
|
||
|
||
if parsed and parsed.netloc:
|
||
path = parsed.path or ""
|
||
m = re.search(r"/(\d+)\.html$", path)
|
||
if m:
|
||
return int(m.group(1))
|
||
|
||
return None
|
||
|
||
|
||
def _fetch_lcsc_product_basic(product_identifier: str, settings: dict) -> dict:
|
||
"""调用立创开放接口,按商品详情页链接读取基础资料。
|
||
|
||
中文说明:当前业务只接受“商品详情页链接”,先从链接里提取 productId,
|
||
再按 JOP 鉴权规则签名请求接口,最后返回接口中的第一条商品记录。
|
||
"""
|
||
raw_identifier = (product_identifier or "").strip()
|
||
if not raw_identifier:
|
||
raise RuntimeError("立创商品链接不能为空")
|
||
|
||
product_id_from_input = _extract_lcsc_product_id_from_input(raw_identifier)
|
||
if product_id_from_input is None:
|
||
raise RuntimeError("请输入立创商品详情页链接,例如 https://item.szlcsc.com/23913.html")
|
||
|
||
app_id = settings.get("lcsc_app_id", "").strip()
|
||
access_key = settings.get("lcsc_access_key", "").strip()
|
||
secret_key = settings.get("lcsc_secret_key", "").strip()
|
||
if not app_id or not access_key or not secret_key:
|
||
raise RuntimeError("立创 JOP 鉴权参数不完整,请填写 app_id/access_key/secret_key")
|
||
|
||
timeout = int(settings.get("lcsc_timeout", 20))
|
||
def request_openapi(payload: dict) -> dict:
|
||
post_data_str = json.dumps(payload, ensure_ascii=False)
|
||
timestamp = str(int(time.time()))
|
||
nonce = _generate_nonce(32)
|
||
signature = _generate_jop_signature(
|
||
"POST",
|
||
LCSC_BASIC_PATH,
|
||
timestamp,
|
||
nonce,
|
||
post_data_str,
|
||
secret_key,
|
||
)
|
||
headers = {
|
||
"Content-Type": "application/json; utf-8",
|
||
"Authorization": (
|
||
f'JOP appid="{app_id}",accesskey="{access_key}",'
|
||
f'timestamp="{timestamp}",nonce="{nonce}",signature="{signature}"'
|
||
),
|
||
}
|
||
|
||
endpoint = LCSC_BASE_URL + LCSC_BASIC_PATH
|
||
req = urllib.request.Request(
|
||
endpoint,
|
||
data=post_data_str.encode("utf-8"),
|
||
method="POST",
|
||
headers=headers,
|
||
)
|
||
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||
raw = resp.read().decode("utf-8")
|
||
except urllib.error.HTTPError as exc:
|
||
detail = exc.read().decode("utf-8", errors="ignore")
|
||
raise RuntimeError(f"立创接口调用失败: HTTP {exc.code} {detail[:180]}") from exc
|
||
except urllib.error.URLError as exc:
|
||
raise RuntimeError(f"立创接口调用失败: 连接失败 {exc.reason}") from exc
|
||
|
||
try:
|
||
response = json.loads(raw)
|
||
except json.JSONDecodeError as exc:
|
||
raise RuntimeError("立创接口返回非 JSON 数据") from exc
|
||
|
||
code = response.get("code")
|
||
successful = response.get("successful", False)
|
||
if code != 200 and not successful:
|
||
message = str(response.get("message", "立创接口调用失败") or "立创接口调用失败")
|
||
raise RuntimeError(f"立创接口调用失败: code={code}, message={message}")
|
||
|
||
data = response.get("data") or {}
|
||
items = data.get("productBasicInfoVOList") or []
|
||
if not items:
|
||
raise RuntimeError("未查询到商品信息,请检查编号是否正确")
|
||
return items[0]
|
||
|
||
return request_openapi({"productId": product_id_from_input})
|
||
|
||
|
||
def _map_lcsc_product_to_component(product: dict) -> dict:
|
||
"""把立创商品字段映射为系统内部元件字段。
|
||
|
||
中文说明:接口原始字段很多,这里只提取库存系统真正会用到的料号、名称、
|
||
规格和备注,尽量保持结果简洁且便于搜索。
|
||
"""
|
||
product_model = str(product.get("productModel") or "").strip()
|
||
product_code = str(product.get("productCode") or "").strip()
|
||
product_id = product.get("productId")
|
||
part_no = product_model or product_code or (str(product_id) if product_id is not None else "")
|
||
name = str(product.get("productName") or "").strip() or part_no or "未命名元件"
|
||
|
||
brand_name = str(product.get("brandName") or "").strip()
|
||
encap_standard = str(product.get("encapStandard") or "").strip()
|
||
catalog_name = str(product.get("catalogName") or "").strip()
|
||
|
||
# Prefer concise, searchable spec fields.
|
||
spec_parts = [brand_name, encap_standard, catalog_name]
|
||
specification = " / ".join([p for p in spec_parts if p])
|
||
|
||
arrange_map = {
|
||
"biandai": "编带",
|
||
"bianpai": "编排",
|
||
"daizhuang": "袋装",
|
||
"guanzhuang": "管装",
|
||
"hezhuang": "盒装",
|
||
"juan": "卷装",
|
||
"kun": "捆装",
|
||
"tuopan": "托盘",
|
||
"xiangzhuang": "箱装",
|
||
}
|
||
unit_map = {
|
||
"pan": "圆盘",
|
||
"bao": "包",
|
||
"ben": "本",
|
||
"dai": "袋",
|
||
"guan": "管",
|
||
"he": "盒",
|
||
"juan": "卷",
|
||
"kun": "捆",
|
||
"mi": "米",
|
||
"tuopan": "托盘",
|
||
"xiang": "箱",
|
||
}
|
||
|
||
product_arrange_raw = str(product.get("productArrange") or "").strip().lower()
|
||
product_arrange = arrange_map.get(product_arrange_raw, product_arrange_raw)
|
||
min_packet_number = product.get("minPacketNumber")
|
||
min_packet_unit_raw = str(product.get("minPacketUnit") or "").strip().lower()
|
||
min_packet_unit = unit_map.get(min_packet_unit_raw, min_packet_unit_raw)
|
||
|
||
note_bits = []
|
||
if product_code:
|
||
note_bits.append(f"LCSC {product_code}")
|
||
if product_id is not None:
|
||
note_bits.append(f"ID {product_id}")
|
||
if product_arrange:
|
||
note_bits.append(f"编排 {product_arrange}")
|
||
if min_packet_number:
|
||
unit_suffix = min_packet_unit or ""
|
||
note_bits.append(f"最小包装 {min_packet_number}{unit_suffix}")
|
||
note = " | ".join(note_bits)
|
||
|
||
return {
|
||
"part_no": part_no,
|
||
"name": name,
|
||
"specification": specification,
|
||
"note": note,
|
||
}
|
||
|
||
|
||
# Box 表示一个容器/盒子;Component 表示盒内某个位置上的元件。
|
||
class Box(db.Model):
    """A container (box) whose numbered slots hold Component rows."""

    __tablename__ = "boxes"

    id = db.Column(db.Integer, primary_key=True)
    # Display name; unique across all boxes.
    name = db.Column(db.String(100), nullable=False, unique=True)
    description = db.Column(db.String(255), nullable=True)
    # Box type key; code elsewhere in this file checks it against BOX_TYPES.
    box_type = db.Column(db.String(30), nullable=False, default="small_28")
    # Number of slots in this box.
    slot_capacity = db.Column(db.Integer, nullable=False, default=28)
    # Slot codes are built as slot_prefix followed by a serial number that
    # starts at start_number (see slot_code_for_box).
    slot_prefix = db.Column(db.String(16), nullable=False, default="A")
    start_number = db.Column(db.Integer, nullable=False, default=1)
|
||
|
||
|
||
class Component(db.Model):
    """A component stored at one slot position inside a Box."""

    __tablename__ = "components"

    id = db.Column(db.Integer, primary_key=True)
    box_id = db.Column(db.Integer, db.ForeignKey("boxes.id"), nullable=False)
    # Position within the box; slot_data_for_box enumerates slots from 1.
    slot_index = db.Column(db.Integer, nullable=False)

    part_no = db.Column(db.String(100), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    specification = db.Column(db.String(120), nullable=True)
    quantity = db.Column(db.Integer, nullable=False, default=0)
    location = db.Column(db.String(120), nullable=True)
    note = db.Column(db.Text, nullable=True)
    # Conflict checks and overview rows only consider enabled components.
    is_enabled = db.Column(db.Boolean, nullable=False, default=True)

    # Also exposes Box.components through the backref.
    box = db.relationship("Box", backref=db.backref("components", lazy=True))

    # At most one component row per (box, slot) pair.
    __table_args__ = (
        db.UniqueConstraint("box_id", "slot_index", name="uq_box_slot"),
    )
|
||
|
||
|
||
class InventoryEvent(db.Model):
    """One stock-change log entry, written by log_inventory_event."""

    __tablename__ = "inventory_events"

    id = db.Column(db.Integer, primary_key=True)
    # Plain integers, not foreign keys: the box/component ids are stored as
    # values with no database-level link to their tables.
    box_id = db.Column(db.Integer, nullable=True)
    box_type = db.Column(db.String(30), nullable=True)
    component_id = db.Column(db.Integer, nullable=True)
    part_no = db.Column(db.String(100), nullable=True)
    event_type = db.Column(db.String(30), nullable=False)
    # Signed quantity change caused by the event.
    delta = db.Column(db.Integer, nullable=False, default=0)
    # Filled in by the database (server-side now()) when the row is inserted.
    created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now())
|
||
|
||
|
||
def _add_column_if_missing(table_name: str, column_name: str, ddl: str) -> None:
    """Add a column to *table_name* unless ``PRAGMA table_info`` lists it.

    Args:
        table_name: Table to inspect and alter.
        column_name: Column whose presence is checked.
        ddl: Full column definition appended after ``ADD COLUMN``.

    Note:
        ``table_name`` and ``ddl`` are interpolated directly into SQL, so
        callers must pass trusted constants only.  Nothing is committed here.
    """
    info_rows = db.session.execute(
        db.text(f"PRAGMA table_info({table_name})")
    ).fetchall()
    # Index 1 of each table_info row is the column name.
    existing = {info[1] for info in info_rows}
    if column_name in existing:
        return
    db.session.execute(db.text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
|
||
|
||
|
||
def ensure_schema() -> None:
    """Apply minimal incremental migrations to an older database.

    Databases created by early versions may lack newer columns.  Each missing
    column is added individually ("add whatever is missing"), so upgrading
    never forces the database to be dropped and rebuilt.  The changes are
    committed at the end.
    """
    pending_columns = (
        ("boxes", "box_type", "box_type VARCHAR(30) NOT NULL DEFAULT 'small_28'"),
        ("boxes", "slot_capacity", "slot_capacity INTEGER NOT NULL DEFAULT 28"),
        ("boxes", "slot_prefix", "slot_prefix VARCHAR(16) NOT NULL DEFAULT 'A'"),
        ("boxes", "start_number", "start_number INTEGER NOT NULL DEFAULT 1"),
        ("components", "is_enabled", "is_enabled BOOLEAN NOT NULL DEFAULT 1"),
    )
    for table_name, column_name, ddl in pending_columns:
        _add_column_if_missing(table_name, column_name, ddl)
    db.session.commit()
|
||
|
||
|
||
def slot_code_for_box(box: Box, slot_index: int) -> str:
    """Return the display code of a slot: the box prefix plus a serial number.

    Slot 1 maps to ``box.start_number``; each following slot adds one.
    """
    offset = slot_index - 1
    return "{}{}".format(box.slot_prefix, box.start_number + offset)
|
||
|
||
|
||
def slot_range_label(box: Box) -> str:
    """Return "<first code>-<last code>" covering every slot of *box*."""
    boundary_slots = (1, box.slot_capacity)
    return "-".join(slot_code_for_box(box, slot) for slot in boundary_slots)
|
||
|
||
|
||
def _find_enabled_part_no_conflict(part_no: str, exclude_component_id: int = None):
    """Find an enabled component that already uses *part_no*.

    Args:
        part_no: Part number to look for; surrounding whitespace is ignored.
        exclude_component_id: Component id to leave out of the search,
            typically the record being edited.

    Returns:
        The first match ordered by box id then slot index, or ``None`` when
        the part number is blank or unused.
    """
    wanted = (part_no or "").strip()
    if not wanted:
        return None

    criteria = [
        Component.part_no == wanted,
        Component.is_enabled.is_(True),
    ]
    if exclude_component_id is not None:
        criteria.append(Component.id != exclude_component_id)

    ordered = Component.query.filter(*criteria).order_by(
        Component.box_id.asc(), Component.slot_index.asc()
    )
    return ordered.first()
|
||
|
||
|
||
def _normalize_material_text(text: str) -> str:
|
||
raw = (text or "").upper().strip()
|
||
if not raw:
|
||
return ""
|
||
raw = raw.replace("(", "(").replace(")", ")")
|
||
raw = re.sub(r"[\s\-_/,|;:,。;:]+", "", raw)
|
||
raw = re.sub(r"[()\[\]{}]", "", raw)
|
||
return raw
|
||
|
||
|
||
def _material_identity_key(name: str, specification: str) -> str:
    """Build the "<name>|<specification>" identity key from normalised text.

    Returns ``""`` when both parts normalise to nothing, meaning no
    meaningful identity can be derived.
    """
    parts = (
        _normalize_material_text(name),
        _normalize_material_text(specification),
    )
    return "|".join(parts) if any(parts) else ""
|
||
|
||
|
||
def _find_enabled_material_conflict(
    name: str,
    specification: str,
    exclude_component_id: int = None,
    exclude_part_no: str = "",
):
    """Find an enabled component with the same name and specification.

    Some components carry different part numbers but are the same material,
    so de-duplicating by ``part_no`` alone is not enough.  Name and
    specification are normalised before comparison to avoid creating a second
    stock position for the same material.

    Args:
        name: Name of the material being saved.
        specification: Specification of the material being saved.
        exclude_component_id: Component id to leave out of the search.
        exclude_part_no: Candidates with exactly this part number (after
            stripping) are skipped.

    Returns:
        The first matching component ordered by box id then slot index, or
        ``None`` when nothing matches or no identity key can be derived.
    """
    wanted_key = _material_identity_key(name, specification)
    if not wanted_key:
        return None

    candidates = Component.query.filter(Component.is_enabled.is_(True))
    if exclude_component_id is not None:
        candidates = candidates.filter(Component.id != exclude_component_id)
    candidates = candidates.order_by(Component.box_id.asc(), Component.slot_index.asc())

    skipped_part_no = (exclude_part_no or "").strip()
    # The key is computed in Python, so every enabled row has to be loaded.
    for row in candidates.all():
        has_skipped_part_no = bool(skipped_part_no) and (row.part_no or "").strip() == skipped_part_no
        if has_skipped_part_no:
            continue
        if _material_identity_key(row.name, row.specification) == wanted_key:
            return row
    return None
|
||
|
||
|
||
def _is_truthy_form_value(value: str) -> bool:
|
||
return (value or "").strip().lower() in {"1", "true", "yes", "on"}
|
||
|
||
|
||
def _append_merge_part_no_note(note: str, part_no: str) -> str:
|
||
normalized = (part_no or "").strip()
|
||
if not normalized:
|
||
return note or ""
|
||
line = f"合并料号: {normalized}"
|
||
current = note or ""
|
||
if line in current:
|
||
return current
|
||
return f"{current}\n{line}".strip()
|
||
|
||
|
||
def _is_slot_part_replacement(component: Component, new_part_no: str) -> bool:
    """Tell whether saving *new_part_no* would replace the slot's part number.

    True only when the slot has a component, both the old and the new part
    number are non-blank, and they differ after stripping whitespace.
    """
    if component is None:
        return False

    previous = (component.part_no or "").strip()
    incoming = (new_part_no or "").strip()
    if not previous or not incoming:
        return False
    return previous != incoming
|
||
|
||
|
||
def _merge_into_existing_component(
    target: Component,
    incoming_part_no: str,
    incoming_name: str,
    incoming_specification: str,
    incoming_note: str,
    incoming_quantity: int,
    source_component: Component = None,
    source_box: Box = None,
) -> None:
    """Merge a component being saved or imported into an existing record.

    Three things happen:

    1. Name, specification and note are carried over onto the target, and a
       differing incoming part number is recorded in the target's note.
    2. The quantity is added to the target, the target is enabled, and an
       inventory event is logged for the resulting change.
    3. If the source position already held a different record, that record
       is deleted so the same material is not kept twice.

    Nothing is committed here; the caller owns the transaction.
    """
    # A disabled target counts as holding nothing, so enabling it shows up
    # as a stock gain in the event log.
    baseline_qty = int(target.quantity or 0) if target.is_enabled else 0

    target.name = incoming_name or target.name
    if incoming_specification:
        target.specification = incoming_specification
    if incoming_note:
        target.note = incoming_note
    if incoming_part_no and incoming_part_no != target.part_no:
        target.note = _append_merge_part_no_note(target.note, incoming_part_no)

    merged_qty = int(target.quantity or 0) + int(incoming_quantity or 0)
    target.quantity = merged_qty
    target.is_enabled = True

    gained = merged_qty - baseline_qty
    if gained:
        log_inventory_event(
            event_type="component_merge_confirmed",
            delta=gained,
            box=target.box,
            component=target,
            part_no=target.part_no,
        )

    if source_component is None or source_component.id == target.id:
        return

    # Log the stock leaving the old position before the row disappears.
    if source_component.is_enabled and source_component.quantity:
        log_inventory_event(
            event_type="component_merge_cleanup",
            delta=-int(source_component.quantity),
            box=source_box,
            component=source_component,
            part_no=source_component.part_no,
        )
    db.session.delete(source_component)
|
||
|
||
|
||
def _format_component_position(component: Component) -> str:
    """Describe where a component lives as "<box name> <slot code>".

    Falls back to raw box and slot numbers when the box cannot be loaded.
    """
    owner = Box.query.get(component.box_id)
    if owner:
        return f"{owner.name} {slot_code_for_box(owner, component.slot_index)}"
    return f"盒子#{component.box_id} 位置#{component.slot_index}"
|
||
|
||
|
||
def compose_box_name(base_name: str, prefix: str, start_number: int, slot_capacity: int) -> str:
    """Build a box name of the form "<base> <prefix><start>-<prefix><end>".

    A blank base name is replaced by a generic placeholder.  The end number
    is the last slot serial, ``start_number + slot_capacity - 1``.
    """
    label = (base_name or "").strip() or "盒子"
    last_number = start_number + slot_capacity - 1
    return f"{label} {prefix}{start_number}-{prefix}{last_number}"
|
||
|
||
|
||
def make_unique_box_name(candidate_name: str, exclude_box_id: int = None) -> str:
    """Return *candidate_name*, suffixed with " #N" if the name is taken.

    Args:
        candidate_name: Desired box name.
        exclude_box_id: Box id that may keep its current name (the box being
            edited), so it does not clash with itself.

    Returns:
        The candidate itself when free, otherwise the first free name among
        "<candidate> #2", "<candidate> #3", ...
    """

    def is_taken(name: str) -> bool:
        lookup = Box.query.filter_by(name=name)
        if exclude_box_id is not None:
            lookup = lookup.filter(Box.id != exclude_box_id)
        return bool(lookup.first())

    name = candidate_name
    suffix = 2
    while is_taken(name):
        name = f"{candidate_name} #{suffix}"
        suffix += 1
    return name
|
||
|
||
|
||
def infer_base_name(box: Box) -> str:
    """Recover the base name of a box by stripping the generated suffix.

    Removes a trailing " <prefix><n>-<prefix><m>" range, optionally followed
    by a " #<k>" uniqueness suffix.  If nothing is left, the full name is
    returned unchanged.
    """
    prefix = re.escape(box.slot_prefix)
    generated_suffix = rf"\s+{prefix}\d+-{prefix}\d+(?:\s+#\d+)?$"
    stripped = re.sub(generated_suffix, "", box.name).strip()
    return stripped if stripped else box.name
|
||
|
||
|
||
def _extract_lcsc_code_from_text(text: str) -> str:
|
||
raw = (text or "").upper()
|
||
match = re.search(r"\bC\d{3,}\b", raw)
|
||
return match.group(0) if match else ""
|
||
|
||
|
||
def _parse_slot_spec_fields(specification: str) -> dict:
|
||
parts = [p.strip() for p in (specification or "").split("/") if p.strip()]
|
||
return {
|
||
"brand": parts[0] if len(parts) >= 1 else "",
|
||
"package": parts[1] if len(parts) >= 2 else "",
|
||
"usage": parts[2] if len(parts) >= 3 else "",
|
||
}
|
||
|
||
|
||
def _parse_note_detail_fields(note: str) -> dict:
    """Extract the structured details embedded in a component note.

    Looks for the fragments a mapped LCSC note contains: the LCSC code, the
    product id, the packaging arrangement and the minimum pack size.

    Returns:
        Dict with ``lcsc_code``, ``product_id``, ``arrange`` and ``min_pack``;
        each is ``""`` when the note does not contain it.
    """
    raw = note or ""

    def first_group(pattern: str, flags: int = 0) -> str:
        """Return capture group 1 of the first match, or "" if no match."""
        hit = re.search(pattern, raw, flags=flags)
        return hit.group(1) if hit else ""

    return {
        "lcsc_code": _extract_lcsc_code_from_text(raw),
        "product_id": first_group(r"\b(?:ID|productId)\s*(\d+)\b", re.IGNORECASE),
        # Both values run up to the next "|" separator.
        "arrange": first_group(r"编排\s*([^|]+)").strip(),
        "min_pack": first_group(r"最小包装\s*([^|]+)").strip(),
    }
|
||
|
||
|
||
def slot_data_for_box(box: Box):
    """Build one display record per slot of *box*, occupied or not.

    Returns:
        A list ordered by slot number (1..slot_capacity).  Each entry has the
        slot number, its slot code, the component in it (or ``None``), the
        LCSC code found in the component's note or, failing that, its part
        number, and the parsed brand/package/usage fields.
    """
    by_slot = {
        item.slot_index: item
        for item in Component.query.filter_by(box_id=box.id).all()
    }

    result = []
    for position in range(1, box.slot_capacity + 1):
        occupant = by_slot.get(position)
        if occupant:
            code = _extract_lcsc_code_from_text(occupant.note)
            if not code:
                code = _extract_lcsc_code_from_text(occupant.part_no)
            spec = _parse_slot_spec_fields(occupant.specification)
        else:
            code = _extract_lcsc_code_from_text("")
            spec = {"brand": "", "package": "", "usage": ""}

        result.append(
            {
                "slot": position,
                "slot_code": slot_code_for_box(box, position),
                "component": occupant,
                "lcsc_code": code,
                "spec_fields": spec,
            }
        )
    return result
|
||
|
||
|
||
def bag_rows_for_box(box: Box):
    """List the components of *box* with their slot codes, by slot order.

    Unlike slot_data_for_box, empty positions produce no row.
    """
    stored = Component.query.filter_by(box_id=box.id).order_by(Component.slot_index.asc())
    return [
        {"component": item, "slot_code": slot_code_for_box(box, item.slot_index)}
        for item in stored.all()
    ]
|
||
|
||
|
||
def _parse_non_negative_int(raw_value: str, default_value: int = 0) -> int:
|
||
raw = (raw_value or "").strip()
|
||
if raw == "":
|
||
return default_value
|
||
value = int(raw)
|
||
if value < 0:
|
||
raise ValueError
|
||
return value
|
||
|
||
|
||
def normalize_legacy_data() -> None:
    """Repair legacy rows so that old data satisfies the current rules.

    This mainly handles leftovers from earlier versions: empty ``box_type``,
    empty prefix, rows that predate the ``is_enabled`` column, and making
    sure the fixed bag-list container always exists.  All changes are
    committed at the end.
    """
    # Bulk SQL first: fill NULL / empty columns with the schema defaults.
    column_repairs = (
        "UPDATE boxes SET box_type = 'small_28' WHERE box_type IS NULL OR box_type = ''",
        "UPDATE boxes SET slot_capacity = 28 WHERE slot_capacity IS NULL",
        "UPDATE boxes SET slot_prefix = 'A' WHERE slot_prefix IS NULL OR slot_prefix = ''",
        "UPDATE boxes SET start_number = 1 WHERE start_number IS NULL",
        "UPDATE components SET is_enabled = 1 WHERE is_enabled IS NULL",
    )
    for statement in column_repairs:
        db.session.execute(db.text(statement))

    # Then per-row checks that depend on the BOX_TYPES table.
    for box in Box.query.all():
        if box.box_type not in BOX_TYPES:
            box.box_type = "small_28"
        type_meta = BOX_TYPES[box.box_type]
        if not box.slot_capacity or box.slot_capacity < 1:
            box.slot_capacity = type_meta["default_capacity"]
        if not box.slot_prefix:
            box.slot_prefix = type_meta["default_prefix"]
        if box.start_number is None or box.start_number < 0:
            box.start_number = 1
        if box.box_type == "bag":
            # Bag containers always start at 1 and use the fixed name.
            box.start_number = 1
            box.name = "袋装清单"

    bag_exists = Box.query.filter_by(box_type="bag").first()
    if not bag_exists:
        bag_meta = BOX_TYPES["bag"]
        bag_box = Box(
            name="袋装清单",
            description=bag_meta["default_desc"],
            box_type="bag",
            slot_capacity=bag_meta["default_capacity"],
            slot_prefix=bag_meta["default_prefix"],
            start_number=1,
        )
        db.session.add(bag_box)

    db.session.commit()
|
||
|
||
|
||
def get_fixed_bag_box() -> Box:
    """Return the bag-list container, creating and committing it if absent.

    When several bag boxes exist, the one with the lowest id is returned.
    """
    existing = Box.query.filter_by(box_type="bag").order_by(Box.id.asc()).first()
    if existing:
        return existing

    bag_meta = BOX_TYPES["bag"]
    created = Box(
        name="袋装清单",
        description=bag_meta["default_desc"],
        box_type="bag",
        slot_capacity=bag_meta["default_capacity"],
        slot_prefix=bag_meta["default_prefix"],
        start_number=1,
    )
    db.session.add(created)
    db.session.commit()
    return created
|
||
|
||
|
||
@app.route("/box/<int:box_id>/bag-capacity", methods=["POST"])
def update_bag_capacity(box_id: int):
    """Change the number of positions in a bag-list container.

    Reads ``slot_capacity`` from the form (blank keeps the current value).
    The request is rejected when the box is not a bag container, when the
    value is not an integer of at least 1, or when it is lower than the
    highest position already in use.  On success the new capacity is
    committed and the client is redirected to the box page.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return bad_request("当前容器不是袋装清单", box.box_type)

    invalid_message = "袋位数量必须是大于等于 1 的整数"
    submitted = request.form.get("slot_capacity", "")
    try:
        requested = _parse_non_negative_int(submitted, box.slot_capacity)
    except ValueError:
        return render_box_page(box, error=invalid_message)
    if requested < 1:
        return render_box_page(box, error=invalid_message)

    # Shrinking must not cut off positions that already hold a component.
    highest_used = db.session.query(db.func.max(Component.slot_index)).filter_by(box_id=box.id).scalar()
    max_used_slot = highest_used or 0
    if max_used_slot > requested:
        return render_box_page(box, error=f"袋位数量不能小于已使用位置 {max_used_slot}")

    box.slot_capacity = requested
    db.session.commit()
    return redirect(url_for("view_box", box_id=box.id, notice="袋位数量已更新"))
|
||
|
||
|
||
def make_overview_rows(box: Box):
    """Summarise the enabled components of *box* for an overview listing.

    Returns:
        A list, ordered by slot, of dicts holding the slot code, the name
        and the part number of each enabled component.
    """
    enabled = Component.query.filter_by(box_id=box.id, is_enabled=True).order_by(
        Component.slot_index.asc()
    )
    return [
        {
            "slot_code": slot_code_for_box(box, item.slot_index),
            "name": item.name,
            "part_no": item.part_no,
        }
        for item in enabled.all()
    ]
|
||
|
||
|
||
def box_sort_key(box: Box):
    """Sort key for boxes: upper-cased prefix, then start number, then name.

    Missing values sort as an empty prefix, start number 0 and empty name.
    """
    prefix = (box.slot_prefix or "").upper()
    start = 0 if box.start_number is None else box.start_number
    return (prefix, start, box.name or "")
|
||
|
||
|
||
def has_range_conflict(
    *,
    box_type: str,
    prefix: str,
    start_number: int,
    slot_capacity: int,
    exclude_box_id: int = None,
):
    """Check whether a slot number range overlaps another box's range.

    Only boxes with the same type and prefix are compared.

    Args:
        box_type: Box type of the range being checked.
        prefix: Slot prefix of the range being checked.
        start_number: First serial number of the range.
        slot_capacity: Number of slots in the range.
        exclude_box_id: Box id to ignore, typically the box being edited.

    Returns:
        ``(True, other_box)`` for the first overlapping box found, otherwise
        ``(False, None)``.
    """
    end_number = start_number + slot_capacity - 1

    peers = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        peers = peers.filter(Box.id != exclude_box_id)

    for peer in peers.all():
        peer_first = peer.start_number
        peer_last = peer.start_number + peer.slot_capacity - 1
        # Closed intervals overlap when each one starts no later than the
        # other one ends.
        if end_number >= peer_first and start_number <= peer_last:
            return True, peer

    return False, None
|
||
|
||
|
||
def suggest_next_start_number(
    *,
    box_type: str,
    prefix: str,
    slot_capacity: int,
    exclude_box_id: int = None,
) -> int:
    """Suggest the first free serial number after all boxes of a type/prefix.

    Args:
        box_type: Box type to look at.
        prefix: Slot prefix to look at.
        slot_capacity: Accepted for interface symmetry; not used here.
        exclude_box_id: Box id to ignore, typically the box being edited.

    Returns:
        One more than the highest end number in use, or 1 when no box of
        that type and prefix ends above 0.
    """
    peers = Box.query.filter_by(box_type=box_type, slot_prefix=prefix)
    if exclude_box_id is not None:
        peers = peers.filter(Box.id != exclude_box_id)

    end_numbers = [peer.start_number + peer.slot_capacity - 1 for peer in peers.all()]
    # The leading 0 is the floor used when there are no peers or when every
    # end number is non-positive.
    highest_end = max([0, *end_numbers])
    return highest_end + 1 if highest_end > 0 else 1
|
||
|
||
|
||
def build_index_anchor(box_type: str = "") -> str:
    """Return the anchor "group-<box_type>" for a known type, else ""."""
    return f"group-{box_type}" if box_type in BOX_TYPES else ""
|
||
|
||
|
||
def bad_request(message: str, box_type: str = ""):
    """Render the error page for an invalid request with HTTP status 400.

    Args:
        message: Text shown to the user.
        box_type: When it names a known box type, the "back" link points to
            that type's page; otherwise it points to the referring page or,
            failing that, the type overview.

    Returns:
        A ``(rendered error.html, 400)`` tuple.
    """
    has_type_page = bool(build_index_anchor(box_type)) and box_type in BOX_TYPES
    if has_type_page:
        back_url = url_for("type_page", box_type=box_type)
    else:
        back_url = request.referrer or url_for("types_page")

    page = render_template(
        "error.html",
        status_code=400,
        title="请求参数有误",
        message=message,
        back_url=back_url,
    )
    return page, 400
|
||
|
||
|
||
def render_box_page(box: Box, error: str = "", notice: str = ""):
    """Render the box detail page (box.html) in one place.

    Many routes end by returning box.html; the template data they all need
    (slots, messages, threshold) is prepared here so each route does not
    assemble it again.

    Args:
        box: Box to display.
        error: Optional error message for the page.
        notice: Optional informational message for the page.
    """
    context = {
        "box": box,
        "slots": slot_data_for_box(box),
        "box_types": BOX_TYPES,
        "slot_range": slot_range_label(box),
        "low_stock_threshold": LOW_STOCK_THRESHOLD,
        "error": error,
        "notice": notice,
    }
    return render_template("box.html", **context)
|
||
|
||
|
||
def _next_empty_slot_index(box: Box, occupied_slots: set[int]):
    """Return the lowest slot index (from 1) not in *occupied_slots*.

    Returns ``None`` when every slot up to ``box.slot_capacity`` is taken.
    """
    free_slots = (
        index
        for index in range(1, box.slot_capacity + 1)
        if index not in occupied_slots
    )
    return next(free_slots, None)
|
||
|
||
|
||
def _parse_bulk_line(line: str):
|
||
parts = [p.strip() for p in re.split(r"[,\t]", line)]
|
||
while len(parts) < 5:
|
||
parts.append("")
|
||
return {
|
||
"part_no": parts[0],
|
||
"name": parts[1],
|
||
"quantity_raw": parts[2],
|
||
"specification": parts[3],
|
||
"note": parts[4],
|
||
}
|
||
|
||
|
||
def log_inventory_event(
    *,
    event_type: str,
    delta: int,
    box: Box = None,
    component: Component = None,
    part_no: str = "",
):
    """Record one stock-change event.

    The trend chart, recent activity and net-change figures on the
    statistics page all rely on the event table, so every operation that
    changes stock (stock-in, stock-out, delete, merge, ...) should be booked
    through this function.  The event is only added to the session; the
    caller commits.

    Args:
        event_type: Short label of the operation.
        delta: Signed quantity change.
        box: Box the event belongs to.  When omitted, the component's own
            box is used, so ``box_type`` is still recorded and the event is
            not lost to the per-type statistics filters.
        component: Component concerned, if any.
        part_no: Part number to store; defaults to the component's.
    """
    if not box and component is not None:
        # Resolve the box through the relationship without autoflush, so
        # half-finished changes in the session are not flushed as a side
        # effect of writing a log entry.
        with db.session.no_autoflush:
            box = component.box

    event = InventoryEvent(
        box_id=box.id if box else (component.box_id if component else None),
        box_type=box.box_type if box else None,
        component_id=component.id if component else None,
        part_no=(part_no or (component.part_no if component else "") or "").strip() or None,
        event_type=event_type,
        delta=int(delta),
    )
    db.session.add(event)
|
||
|
||
|
||
def parse_days_value(raw_days: str) -> int:
    """Parse the "days" query value; only 7 and 30 are accepted.

    Anything else (blank, non-numeric or another number) yields 7.
    """
    text = (raw_days or "7").strip()
    try:
        parsed = int(text)
    except ValueError:
        return 7
    if parsed in (7, 30):
        return parsed
    return 7
|
||
|
||
|
||
def parse_box_type_filter(raw_box_type: str) -> str:
    """Normalise a box-type filter value.

    Args:
        raw_box_type: Raw text value; ``None`` is treated as empty.

    Returns:
        The stripped value when it is a key of ``BOX_TYPES``, otherwise
        the string ``"all"``.
    """
    candidate = (raw_box_type or "").strip()
    if candidate in BOX_TYPES:
        return candidate
    return "all"
|
||
|
||
|
||
def _to_date(raw_day):
|
||
if isinstance(raw_day, str):
|
||
return datetime.strptime(raw_day, "%Y-%m-%d").date()
|
||
return raw_day
|
||
|
||
|
||
def query_event_daily_delta(days: int, box_type_filter: str = "all"):
    """Sum inventory deltas per calendar day over the last ``days`` days.

    The window starts at midnight of ``today - (days - 1)`` (local clock via
    ``datetime.now()``) and includes today.

    Args:
        days: Number of days in the window.
        box_type_filter: Box type to restrict events to; ``"all"`` disables
            the restriction.

    Returns:
        A dict mapping each day that has events to its summed delta. Days
        without events are absent.
    """
    first_day = datetime.now().date() - timedelta(days=days - 1)
    window_start = datetime.combine(first_day, datetime.min.time())

    stmt = db.session.query(
        db.func.date(InventoryEvent.created_at).label("event_day"),
        db.func.sum(InventoryEvent.delta).label("daily_delta"),
    ).filter(InventoryEvent.created_at >= window_start)
    if box_type_filter != "all":
        stmt = stmt.filter(InventoryEvent.box_type == box_type_filter)

    grouped = stmt.group_by(db.func.date(InventoryEvent.created_at)).all()
    # The day column may come back as text, so it is normalised via _to_date.
    return {
        _to_date(record.event_day): int(record.daily_delta or 0)
        for record in grouped
    }
|
||
|
||
|
||
def build_trend_points_from_events(days: int, total_quantity: int, box_type_filter: str = "all"):
    """Reconstruct a daily stock-level series by walking events backwards.

    Starting from the current total, each earlier day's value is obtained
    by subtracting the summed delta of the day after it.

    Args:
        days: Window length; anything other than 7 or 30 falls back to 7.
        total_quantity: Current total quantity, used as today's value.
        box_type_filter: Box type passed on to ``query_event_daily_delta``.

    Returns:
        A list of dicts (``date``, ``label`` as ``MM-DD``, ``value``) in
        chronological order, oldest first.
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    daily_delta = query_event_daily_delta(window, box_type_filter)

    newest_first = []
    level = total_quantity
    for offset in range(window):
        day = today - timedelta(days=offset)
        newest_first.append(
            {"date": day, "label": day.strftime("%m-%d"), "value": level}
        )
        # Undo this day's net change to get the level at the previous day's end.
        level -= daily_delta.get(day, 0)

    return newest_first[::-1]
|
||
|
||
|
||
def build_box_type_trend_series(days: int, totals_by_type: dict):
    """Build one reconstructed stock-level series per box type.

    Args:
        days: Window length; anything other than 7 or 30 falls back to 7.
        totals_by_type: Current total quantity per box-type key; missing
            keys count as 0.

    Returns:
        A dict with ``labels`` (``MM-DD`` strings, oldest first) and
        ``series`` (box-type key -> list of values in the same order).
    """
    window = days if days in (7, 30) else 7
    today = datetime.now().date()
    newest_first = [today - timedelta(days=offset) for offset in range(window)]

    series = {}
    for type_key in BOX_TYPES:
        daily_delta = query_event_daily_delta(window, type_key)
        level = int(totals_by_type.get(type_key, 0))
        history = []
        for day in newest_first:
            history.append(level)
            # Step back one day by removing that day's net change.
            level -= daily_delta.get(day, 0)
        series[type_key] = history[::-1]

    labels = [day.strftime("%m-%d") for day in newest_first[::-1]]
    return {"labels": labels, "series": series}
|
||
|
||
|
||
def make_sparkline(values: list[int], width: int = 220, height: int = 56) -> str:
    """Turn a value series into an ``x,y x,y ...`` coordinate string.

    Points are spread evenly across ``width``; the minimum value maps to
    ``y == height`` and the maximum to ``y == 0``. A flat series uses a span
    of 1, so every point lands on ``y == height``.

    Args:
        values: Series to plot.
        width: Horizontal extent of the drawing area.
        height: Vertical extent of the drawing area.

    Returns:
        Space-separated ``x,y`` pairs with two decimals, or ``""`` when
        ``values`` is empty.
    """
    if not values:
        return ""

    lowest = min(values)
    value_span = max(max(values) - lowest, 1)
    x_step = width / max(len(values) - 1, 1)

    return " ".join(
        f"{position * x_step:.2f},{height - ((value - lowest) / value_span) * height:.2f}"
        for position, value in enumerate(values)
    )
|
||
|
||
|
||
def event_type_label(event_type: str) -> str:
    """Map an inventory event type key to its display label.

    Args:
        event_type: Event type key as stored on the event row.

    Returns:
        The display label for known keys; unknown keys are returned as-is.
    """
    known_labels = dict(
        quick_inbound_add="快速入库新增",
        quick_inbound_merge="快速入库合并",
        component_outbound="快速出库",
        component_save="编辑保存",
        component_enable="启用元件",
        component_disable="停用元件",
        component_delete="删除元件",
        bag_add="袋装新增",
        bag_batch_add="袋装批量新增",
        bag_merge="袋装合并",
        bag_batch_merge="袋装批量合并",
        box_delete="删除盒子",
    )
    if event_type in known_labels:
        return known_labels[event_type]
    return event_type
|
||
|
||
|
||
def recent_events(limit: int = 20, box_type_filter: str = "all"):
    """Return the most recent inventory events as display-ready dicts.

    Args:
        limit: Maximum number of events to return.
        box_type_filter: Box type to restrict events to; ``"all"`` disables
            the restriction.

    Returns:
        A list of dicts (``time``, ``type``, ``box_type``, ``part_no``,
        ``delta``), newest first.
    """
    base_query = InventoryEvent.query
    if box_type_filter != "all":
        base_query = base_query.filter_by(box_type=box_type_filter)
    latest = base_query.order_by(InventoryEvent.created_at.desc()).limit(limit).all()

    def _as_item(event):
        # Missing timestamps and part numbers are shown as "-".
        stamp = event.created_at.strftime("%Y-%m-%d %H:%M") if event.created_at else "-"
        return {
            "time": stamp,
            "type": event_type_label(event.event_type),
            "box_type": BOX_TYPES.get(event.box_type, {}).get("label", "全部"),
            "part_no": event.part_no or "-",
            "delta": int(event.delta or 0),
        }

    return [_as_item(event) for event in latest]
|
||
|
||
|
||
def build_dashboard_context():
    """Assemble the aggregated data used by the overview and type pages.

    Box groups, stock statistics and the low-stock list are prepared here so
    the templates only have to display them.

    Returns:
        A dict with the keys ``groups`` (box-type key -> list of box
        entries), ``stats`` (summary counters), ``category_stats`` (one
        entry per box type) and ``low_stock_items`` (at most 12 entries,
        lowest quantity first).
    """
    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    boxes.sort(key=box_sort_key)

    groups = {type_key: [] for type_key in BOX_TYPES}
    for box in boxes:
        # Boxes with an unknown type are listed under "small_28".
        group_key = box.box_type if box.box_type in BOX_TYPES else "small_28"
        rows = make_overview_rows(box)
        entry = {
            "box": box,
            "used_count": len(rows),
            "slot_range": slot_range_label(box),
            "overview_rows": rows,
            "base_name": infer_base_name(box),
        }
        groups[group_key].append(entry)

    enabled_components = []
    disabled_components = []
    for comp in Component.query.all():
        bucket = enabled_components if comp.is_enabled else disabled_components
        bucket.append(comp)
    low_stock_components = [
        comp for comp in enabled_components if comp.quantity < LOW_STOCK_THRESHOLD
    ]

    trend_points_7d = build_trend_points_from_events(
        days=7,
        total_quantity=sum(comp.quantity for comp in enabled_components),
        box_type_filter="all",
    )
    if len(trend_points_7d) >= 2:
        period_net_change_7d = trend_points_7d[-1]["value"] - trend_points_7d[0]["value"]
    else:
        period_net_change_7d = 0

    most_urgent = sorted(
        low_stock_components,
        key=lambda item: (item.quantity, item.name or ""),
    )[:12]
    low_stock_items = []
    for comp in most_urgent:
        owner = box_by_id.get(comp.box_id)
        if owner and owner.box_type in BOX_TYPES:
            type_key = owner.box_type
        else:
            type_key = "small_28"
        low_stock_items.append(
            {
                "name": comp.name,
                "part_no": comp.part_no,
                "quantity": comp.quantity,
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"盒 {comp.box_id}",
                "slot_code": (
                    slot_code_for_box(owner, comp.slot_index)
                    if owner
                    else str(comp.slot_index)
                ),
                "edit_url": url_for("edit_component", box_id=comp.box_id, slot=comp.slot_index),
            }
        )

    category_stats = []
    for type_key, meta in BOX_TYPES.items():
        members = [
            comp
            for comp in enabled_components
            if box_by_id.get(comp.box_id) and box_by_id[comp.box_id].box_type == type_key
        ]
        category_stats.append(
            {
                "key": type_key,
                "label": meta["label"],
                "item_count": len(members),
                "quantity": sum(comp.quantity for comp in members),
            }
        )
    # Seeded with 0 so the maximum never drops below zero.
    max_category_quantity = max([0] + [stat["quantity"] for stat in category_stats])

    stats = {
        "box_count": len(boxes),
        "active_items": len(enabled_components),
        "low_stock_count": len(low_stock_components),
        "disabled_count": len(disabled_components),
        "max_category_quantity": max_category_quantity,
        "period_net_change_7d": period_net_change_7d,
    }

    return {
        "groups": groups,
        "stats": stats,
        "category_stats": category_stats,
        "low_stock_items": low_stock_items,
    }
|
||
|
||
|
||
def _build_restock_payload(*, limit: int = 20, threshold: int = LOW_STOCK_THRESHOLD) -> dict:
    """Build the input data for the restock analysis.

    The AI does not read the database directly; it reads the low-stock list
    and the 30-day outbound hot spots prepared here. That keeps the input
    format stable and lets the same data feed a rule-based fallback.

    Args:
        limit: Maximum number of low-stock items to include.
        threshold: Enabled components with a quantity below this count as
            low stock.

    Returns:
        A dict with ``generated_at`` (timestamp string), ``threshold``,
        ``low_stock_items`` and ``top_outbound_30d`` (at most 20 entries,
        highest outbound quantity first).
    """

    def _quantity(comp):
        # A missing quantity counts as zero.
        return int(comp.quantity or 0)

    box_by_id = {box.id: box for box in Box.query.all()}
    enabled_components = Component.query.filter_by(is_enabled=True).all()
    shortlist = sorted(
        (comp for comp in enabled_components if _quantity(comp) < threshold),
        key=lambda item: (_quantity(item), item.name or ""),
    )[:limit]

    low_items = []
    for comp in shortlist:
        owner = box_by_id.get(comp.box_id)
        if owner and owner.box_type in BOX_TYPES:
            type_key = owner.box_type
        else:
            type_key = "small_28"
        low_items.append(
            {
                "part_no": comp.part_no,
                "name": comp.name,
                "quantity": _quantity(comp),
                "box_type": type_key,
                "box_type_label": BOX_TYPES[type_key]["label"],
                "box_name": owner.name if owner else f"盒 {comp.box_id}",
                "slot_code": (
                    slot_code_for_box(owner, comp.slot_index)
                    if owner
                    else str(comp.slot_index)
                ),
            }
        )

    # 30-day window including today, starting at midnight of the first day.
    first_day = datetime.now().date() - timedelta(days=29)
    window_start = datetime.combine(first_day, datetime.min.time())
    outbound_rows = (
        db.session.query(
            InventoryEvent.part_no,
            db.func.sum(-InventoryEvent.delta).label("outbound_qty"),
        )
        .filter(
            InventoryEvent.event_type == "component_outbound",
            InventoryEvent.created_at >= window_start,
            InventoryEvent.delta < 0,
        )
        .group_by(InventoryEvent.part_no)
        .order_by(db.func.sum(-InventoryEvent.delta).desc())
        .limit(20)
        .all()
    )
    outbound_top = []
    for record in outbound_rows:
        outbound_top.append(
            {"part_no": record[0] or "-", "outbound_qty_30d": int(record[1] or 0)}
        )

    return {
        "generated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "threshold": int(threshold),
        "low_stock_items": low_items,
        "top_outbound_30d": outbound_top,
    }
|
||
|
||
|
||
def _call_siliconflow_chat(
|
||
system_prompt: str,
|
||
user_prompt: str,
|
||
*,
|
||
api_url: str,
|
||
model: str,
|
||
api_key: str,
|
||
timeout: int,
|
||
) -> str:
|
||
api_key = (api_key or "").strip()
|
||
if not api_key:
|
||
raise RuntimeError("SILICONFLOW_API_KEY 未配置")
|
||
if not api_url:
|
||
raise RuntimeError("AI API URL 未配置")
|
||
if not model:
|
||
raise RuntimeError("AI 模型名称未配置")
|
||
|
||
payload = {
|
||
"model": model,
|
||
"temperature": 0.2,
|
||
"max_tokens": 700,
|
||
"messages": [
|
||
{"role": "system", "content": system_prompt},
|
||
{"role": "user", "content": user_prompt},
|
||
],
|
||
}
|
||
body = json.dumps(payload).encode("utf-8")
|
||
req = urllib.request.Request(
|
||
api_url,
|
||
data=body,
|
||
method="POST",
|
||
headers={
|
||
"Authorization": f"Bearer {api_key}",
|
||
"Content-Type": "application/json",
|
||
},
|
||
)
|
||
|
||
try:
|
||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||
raw = resp.read().decode("utf-8")
|
||
except urllib.error.HTTPError as exc:
|
||
detail = exc.read().decode("utf-8", errors="ignore")
|
||
raise RuntimeError(f"AI 服务返回 HTTP {exc.code}: {detail[:200]}") from exc
|
||
except urllib.error.URLError as exc:
|
||
raise RuntimeError(f"AI 服务连接失败: {exc.reason}") from exc
|
||
|
||
try:
|
||
data = json.loads(raw)
|
||
return data["choices"][0]["message"]["content"].strip()
|
||
except (KeyError, IndexError, TypeError, json.JSONDecodeError) as exc:
|
||
raise RuntimeError("AI 返回格式无法解析") from exc
|
||
|
||
|
||
@app.route("/")
def index():
    """Redirect the site root to the container-type overview page."""
    landing_url = url_for("types_page")
    return redirect(landing_url)
|
||
|
||
|
||
@app.route("/types")
def types_page():
    """Render the container-type overview page (``types.html``).

    Builds one card per box type and groups the dashboard's low-stock items
    by box type. ``notice`` and ``error`` are taken from the query string.
    """
    dashboard = build_dashboard_context()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()
    stats_by_key = {entry["key"]: entry for entry in dashboard["category_stats"]}

    low_stock_groups = [
        {
            "key": type_key,
            "label": meta["label"],
            "items": [
                item
                for item in dashboard["low_stock_items"]
                if item.get("box_type") == type_key
            ],
        }
        for type_key, meta in BOX_TYPES.items()
    ]

    type_cards = []
    for type_key, meta in BOX_TYPES.items():
        type_stats = stats_by_key.get(type_key, {})
        card = {
            "key": type_key,
            "label": meta["label"],
            "desc": meta["default_desc"],
            "count": len(dashboard["groups"].get(type_key, [])),
            "item_count": int(type_stats.get("item_count", 0)),
            "quantity": int(type_stats.get("quantity", 0)),
            "url": url_for("type_page", box_type=type_key),
        }
        type_cards.append(card)

    return render_template(
        "types.html",
        type_cards=type_cards,
        stats=dashboard["stats"],
        low_stock_groups=low_stock_groups,
        notice=notice,
        error=error,
    )
|
||
|
||
|
||
@app.route("/container-type/<box_type>/edit", methods=["GET", "POST"])
def edit_container_type(box_type: str):
    """Show or save the editable attributes of one container type.

    GET renders ``type_edit.html``. POST validates the label and default
    prefix; on success the in-memory ``BOX_TYPES`` entry is updated, the
    overrides are saved and the browser is redirected to the overview page.
    On a validation error the form is rendered again with the message.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")

    meta = BOX_TYPES[box_type]
    error = ""

    if request.method == "POST":
        form = request.form
        label = form.get("label", "").strip()
        default_desc = form.get("default_desc", "").strip()
        default_prefix = form.get("default_prefix", "").strip().upper()

        if not label:
            error = "容器名称不能为空"
        elif not default_prefix:
            error = "默认前缀不能为空"
        else:
            meta["label"] = label
            # An empty description falls back to the built-in default text.
            meta["default_desc"] = default_desc or DEFAULT_BOX_TYPES[box_type]["default_desc"]
            meta["default_prefix"] = default_prefix

            _save_box_type_overrides()
            return redirect(url_for("types_page", notice="容器属性已更新"))

    return render_template(
        "type_edit.html",
        box_type=box_type,
        meta=meta,
        error=error,
    )
|
||
|
||
|
||
@app.route("/type/<box_type>")
def type_page(box_type: str):
    """Render the box list for one container type.

    The ``bag`` type has a single fixed box, so its detail page is rendered
    directly; every other type gets ``index.html`` restricted to that type.
    """
    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", "")
    if box_type == "bag":
        return render_box_page(get_fixed_bag_box())

    dashboard = build_dashboard_context()
    context = {
        "groups": dashboard["groups"],
        "box_types": BOX_TYPES,
        "stats": dashboard["stats"],
        "category_stats": dashboard["category_stats"],
        "low_stock_items": dashboard["low_stock_items"],
        "view_box_types": [box_type],
        "current_box_type": box_type,
        "separate_mode": True,
    }
    return render_template("index.html", **context)
|
||
|
||
|
||
@app.route("/boxes/create", methods=["POST"])
def create_box():
    """Create a new box from the submitted form.

    Validates type, name, start number and (for ``custom`` boxes) slot
    capacity, rejects numbering ranges that conflict with an existing box,
    then stores the box and redirects to a type page. Requests for the
    ``bag`` type are redirected without creating anything.
    """
    form = request.form
    box_type = form.get("box_type", "small_28").strip()
    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if box_type not in BOX_TYPES:
        return bad_request("无效盒子类型", box_type)
    if box_type == "bag":
        return redirect(url_for("type_page", box_type="bag"))
    if not base_name:
        return bad_request("盒子名称不能为空", box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box_type)

    type_meta = BOX_TYPES[box_type]
    slot_capacity = type_meta["default_capacity"]
    if box_type == "custom":
        # Only custom boxes take their capacity from the form.
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(type_meta["default_capacity"])),
                type_meta["default_capacity"],
            )
        except ValueError:
            return bad_request(capacity_error, box_type)
        if slot_capacity < 1:
            return bad_request(capacity_error, box_type)

    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    if conflict:
        conflict_message = (
            f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号"
        )
        return bad_request(conflict_message, box_type)

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )
    new_box = Box(
        name=make_unique_box_name(generated_name),
        description=description or type_meta["default_desc"],
        box_type=box_type,
        slot_capacity=slot_capacity,
        slot_prefix=effective_prefix,
        start_number=start_number,
    )
    db.session.add(new_box)
    db.session.commit()

    # Return to the requested type page, or to the new box's own type.
    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    target_type = box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
|
||
|
||
|
||
@app.route("/boxes/<int:box_id>/update", methods=["POST"])
def update_box(box_id: int):
    """Update name, description, prefix, start number and capacity of a box.

    For ``custom`` boxes the capacity comes from the form and may not drop
    below the highest slot index in use. Numbering ranges that conflict with
    another box are rejected. On success the browser is redirected to a
    type page.
    """
    box = Box.query.get_or_404(box_id)
    form = request.form

    base_name = form.get("name", "").strip()
    description = form.get("description", "").strip()
    slot_prefix = form.get("slot_prefix", "").strip().upper()

    if not base_name:
        return bad_request("盒子名称不能为空", box.box_type)

    try:
        start_number = _parse_non_negative_int(form.get("start_number", "1"), 1)
    except ValueError:
        return bad_request("起始序号必须是大于等于 0 的整数", box.box_type)

    slot_capacity = box.slot_capacity
    if box.box_type == "custom":
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                form.get("slot_capacity", str(box.slot_capacity)),
                box.slot_capacity,
            )
        except ValueError:
            return bad_request(capacity_error, box.box_type)
        if slot_capacity < 1:
            return bad_request(capacity_error, box.box_type)

        # Shrinking must not cut off slots that are still in use.
        highest_used = (
            db.session.query(db.func.max(Component.slot_index))
            .filter_by(box_id=box.id)
            .scalar()
        )
        max_used_slot = highest_used or 0
        if max_used_slot > slot_capacity:
            return bad_request(
                f"格数不能小于已使用位置 {max_used_slot}",
                box.box_type,
            )

    type_meta = BOX_TYPES[box.box_type]
    effective_prefix = slot_prefix or type_meta["default_prefix"]
    conflict, other_box = has_range_conflict(
        box_type=box.box_type,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
        exclude_box_id=box.id,
    )
    if conflict:
        conflict_message = (
            f"编号范围冲突: 与现有盒子 [{other_box.name}] 重叠,请更换前缀或起始序号"
        )
        return bad_request(conflict_message, box.box_type)

    generated_name = compose_box_name(
        base_name=base_name,
        prefix=effective_prefix,
        start_number=start_number,
        slot_capacity=slot_capacity,
    )

    box.name = make_unique_box_name(generated_name, exclude_box_id=box.id)
    box.description = description or BOX_TYPES[box.box_type]["default_desc"]
    box.slot_prefix = effective_prefix
    box.start_number = start_number
    box.slot_capacity = slot_capacity
    db.session.commit()

    return_to_type = parse_box_type_filter(form.get("return_to_type", ""))
    target_type = box.box_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
|
||
|
||
|
||
@app.route("/boxes/<int:box_id>/delete", methods=["POST"])
def delete_box(box_id: int):
    """Delete a box together with all of its components.

    The fixed ``bag`` box cannot be deleted. When the box still holds
    enabled stock, the removed quantity is logged as a negative
    ``box_delete`` event before the rows are deleted.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type == "bag":
        return bad_request("袋装清单为固定大容器,不能删除", box.box_type)

    enabled_total = (
        db.session.query(db.func.sum(Component.quantity))
        .filter_by(box_id=box.id, is_enabled=True)
        .scalar()
    )
    # The sum is None for a box without enabled components; nothing to log then.
    if enabled_total:
        log_inventory_event(event_type="box_delete", delta=-int(enabled_total), box=box)

    Component.query.filter_by(box_id=box.id).delete()
    deleted_type = box.box_type
    db.session.delete(box)
    db.session.commit()

    return_to_type = parse_box_type_filter(request.form.get("return_to_type", ""))
    target_type = deleted_type if return_to_type == "all" else return_to_type
    return redirect(url_for("type_page", box_type=target_type))
|
||
|
||
|
||
@app.route("/boxes/suggest-start")
def suggest_start():
    """Suggest the next free start number for a box numbering range.

    Query parameters: ``box_type``, ``slot_prefix``, ``slot_capacity`` (only
    read for a new ``custom`` box) and ``box_id`` (when editing an existing
    box, whose stored type and capacity are used and which is excluded from
    the search).

    Returns:
        A JSON-serialisable dict with ``ok``, ``start_number``,
        ``slot_prefix`` and ``preview_range``; or an error dict with status
        400 or 404.
    """
    requested_type = request.args.get("box_type", "small_28").strip()
    if requested_type not in BOX_TYPES:
        return {"ok": False, "message": "无效盒子类型"}, 400

    slot_prefix = request.args.get("slot_prefix", "").strip().upper()
    box_id_raw = request.args.get("box_id", "").strip()

    box_type = requested_type
    exclude_box_id = None
    slot_capacity = BOX_TYPES[box_type]["default_capacity"]

    if box_type == "custom" and not box_id_raw:
        capacity_error = "格数必须是大于等于 1 的整数"
        try:
            slot_capacity = _parse_non_negative_int(
                request.args.get("slot_capacity", str(slot_capacity)),
                slot_capacity,
            )
        except ValueError:
            return {"ok": False, "message": capacity_error}, 400
        if slot_capacity < 1:
            return {"ok": False, "message": capacity_error}, 400

    if box_id_raw:
        try:
            box_id = int(box_id_raw)
        except ValueError:
            return {"ok": False, "message": "box_id 非法"}, 400

        box = Box.query.get(box_id)
        if not box:
            return {"ok": False, "message": "盒子不存在"}, 404

        box_type = box.box_type
        slot_capacity = box.slot_capacity
        exclude_box_id = box.id

    # Resolve the default prefix only after the effective box type is known,
    # so an existing box gets its own type's default, as update_box does.
    # An unknown stored type falls back to the requested, validated type.
    prefix_type = box_type if box_type in BOX_TYPES else requested_type
    effective_prefix = slot_prefix or BOX_TYPES[prefix_type]["default_prefix"]

    suggested = suggest_next_start_number(
        box_type=box_type,
        prefix=effective_prefix,
        slot_capacity=slot_capacity,
        exclude_box_id=exclude_box_id,
    )
    end_number = suggested + slot_capacity - 1

    return {
        "ok": True,
        "start_number": suggested,
        "slot_prefix": effective_prefix,
        "preview_range": f"{effective_prefix}{suggested}-{effective_prefix}{end_number}",
    }
|
||
|
||
|
||
@app.route("/box/<int:box_id>")
def view_box(box_id: int):
    """Render the detail page of one box, or respond with 404 if it is missing."""
    return render_box_page(Box.query.get_or_404(box_id))
|
||
|
||
|
||
@app.route("/box/<int:box_id>/labels/export")
def export_box_labels_csv(box_id: int):
    """Export the enabled components of a box as a CSV label sheet.

    Rows are ordered by slot index. The content starts with a UTF-8 byte
    order mark and is sent as an attachment named ``labels_box_<id>.csv``.
    """
    box = Box.query.get_or_404(box_id)
    rows = (
        Component.query.filter_by(box_id=box.id, is_enabled=True)
        .order_by(Component.slot_index.asc())
        .all()
    )

    header = [
        "盒子名称(box_name)",
        "位置编号(slot_code)",
        "料号(part_no)",
        "名称(name)",
        "品牌(brand)",
        "封装(package)",
        "用途(usage)",
        "立创编号(lcsc_code)",
        "立创商品ID(lcsc_product_id)",
        "商品编排(arrange)",
        "最小包装(min_pack)",
        "规格(specification)",
        "数量(quantity)",
        "位置备注(location)",
        "备注(note)",
    ]

    # The context manager closes the buffer even if building a row fails.
    with StringIO() as output:
        writer = csv.writer(output)
        writer.writerow(header)

        for c in rows:
            slot_code = slot_code_for_box(box, c.slot_index)
            spec_fields = _parse_slot_spec_fields(c.specification)
            note_fields = _parse_note_detail_fields(c.note)
            # Fall back to a code found in the part number when the note has none.
            if not note_fields["lcsc_code"]:
                note_fields["lcsc_code"] = _extract_lcsc_code_from_text(c.part_no)
            writer.writerow(
                [
                    box.name,
                    slot_code,
                    c.part_no or "",
                    c.name or "",
                    spec_fields["brand"],
                    spec_fields["package"],
                    spec_fields["usage"],
                    note_fields["lcsc_code"],
                    note_fields["product_id"],
                    note_fields["arrange"],
                    note_fields["min_pack"],
                    c.specification or "",
                    int(c.quantity or 0),
                    c.location or "",
                    c.note or "",
                ]
            )

        csv_content = "\ufeff" + output.getvalue()

    filename = f"labels_box_{box.id}.csv"

    # content_type carries the full header value. mimetype is documented as
    # "without charset": a charset is appended to text/* mimetypes, so passing
    # one there would duplicate the parameter.
    return Response(
        csv_content,
        content_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
|
||
|
||
|
||
@app.route("/box/<int:box_id>/quick-inbound", methods=["POST"])
def quick_inbound(box_id: int):
    """Bulk-add components to a box from pasted text lines.

    Each non-empty line is parsed with ``_parse_bulk_line``. A line is
    skipped when part number or name is missing, the quantity is invalid,
    an enabled component with the same part number or the same
    name/specification exists, or the box has no free slot. Accepted lines
    are placed in the lowest free slot and logged as ``quick_inbound_add``
    when the quantity is non-zero.
    """
    box = Box.query.get_or_404(box_id)
    raw_lines = request.form.get("lines", "")
    entries = [text.strip() for text in raw_lines.splitlines() if text.strip()]
    if not entries:
        return render_box_page(box, error="快速入库失败: 请至少输入一行")

    existing = Component.query.filter_by(box_id=box.id).all()
    occupied_slots = {comp.slot_index for comp in existing}

    added_count = 0
    skipped_lines = []

    for line_no, entry in enumerate(entries, start=1):
        fields = _parse_bulk_line(entry)
        part_no = fields["part_no"]
        name = fields["name"]
        specification = fields["specification"]
        note = fields["note"]

        if not (part_no and name):
            skipped_lines.append(f"第 {line_no} 行")
            continue

        try:
            quantity = _parse_non_negative_int(fields["quantity_raw"], 0)
        except ValueError:
            skipped_lines.append(f"第 {line_no} 行")
            continue

        part_no_conflict = _find_enabled_part_no_conflict(part_no)
        if part_no_conflict:
            position = _format_component_position(part_no_conflict)
            skipped_lines.append(
                f"第 {line_no} 行({part_no} 已在 {position},需人工确认后再合并)"
            )
            continue

        material_conflict = _find_enabled_material_conflict(
            name=name,
            specification=specification,
            exclude_part_no=part_no,
        )
        if material_conflict:
            skipped_lines.append(
                f"第 {line_no} 行(检测到同参数物料: {material_conflict.part_no} 已在 "
                f"{_format_component_position(material_conflict)},需人工确认后再合并)"
            )
            continue

        slot_index = _next_empty_slot_index(box, occupied_slots)
        if slot_index is None:
            skipped_lines.append(f"第 {line_no} 行(盒子已满)")
            continue

        new_component = Component(
            box_id=box.id,
            slot_index=slot_index,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(new_component)
        if quantity:
            log_inventory_event(
                event_type="quick_inbound_add",
                delta=quantity,
                box=box,
                component=new_component,
                part_no=part_no,
            )
        # Reserve the slot so later lines in this request do not reuse it.
        occupied_slots.add(slot_index)
        added_count += 1

    if added_count == 0:
        return render_box_page(
            box,
            error="快速入库失败: 没有可导入的数据,请检查格式",
        )

    db.session.commit()

    message = f"快速入库完成: 新增 {added_count} 条"
    if skipped_lines:
        message += ";跳过: " + ", ".join(skipped_lines)
    return render_box_page(box, notice=message)
|
||
|
||
|
||
@app.route("/box/<int:box_id>/bags/add", methods=["POST"])
def add_bag_item(box_id: int):
    """Add a single record to the bag list.

    The new record gets the slot after the highest slot index in use, and
    the box capacity grows when that slot lies beyond it. The request is
    rejected when an enabled component with the same part number or the
    same name/specification exists.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    form = request.form
    part_no = form.get("part_no", "").strip()
    name = form.get("name", "").strip()
    specification = form.get("specification", "").strip()
    note = form.get("note", "").strip()

    if not (part_no and name):
        return render_box_page(box, error="袋装新增失败: 料号和名称不能为空")

    try:
        quantity = _parse_non_negative_int(form.get("quantity", "0"), 0)
    except ValueError:
        return render_box_page(box, error="袋装新增失败: 数量必须是大于等于 0 的整数")

    part_no_conflict = _find_enabled_part_no_conflict(part_no)
    if part_no_conflict:
        conflict_text = (
            f"袋装新增失败: 料号 {part_no} 已存在于 "
            f"{_format_component_position(part_no_conflict)},需人工确认后再合并"
        )
        return render_box_page(box, error=conflict_text)

    material_conflict = _find_enabled_material_conflict(
        name=name,
        specification=specification,
        exclude_part_no=part_no,
    )
    if material_conflict:
        conflict_text = (
            "袋装新增失败: 检测到同参数物料 "
            f"{material_conflict.part_no} 已在 "
            f"{_format_component_position(material_conflict)},需人工确认后再合并"
        )
        return render_box_page(box, error=conflict_text)

    highest_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
    )
    next_slot = (highest_slot or 0) + 1

    new_component = Component(
        box_id=box.id,
        slot_index=next_slot,
        part_no=part_no,
        name=name,
        quantity=quantity,
        specification=specification or None,
        note=note or None,
        is_enabled=True,
    )
    db.session.add(new_component)
    if quantity:
        log_inventory_event(
            event_type="bag_add",
            delta=quantity,
            box=box,
            component=new_component,
            part_no=part_no,
        )

    # Grow the capacity so the new slot is inside it.
    if next_slot > box.slot_capacity:
        box.slot_capacity = next_slot

    db.session.commit()
    return render_box_page(box, notice="已新增 1 条袋装记录")
|
||
|
||
|
||
@app.route("/box/<int:box_id>/bags/batch", methods=["POST"])
def add_bag_items_batch(box_id: int):
    """Add several records to the bag list from pasted text lines.

    Each non-empty line is parsed with ``_parse_bulk_line``, the same helper
    the quick-inbound route uses. A line is skipped, with a reason, when
    part number or name is missing, the quantity is invalid, or an enabled
    component with the same part number or the same name/specification
    exists. Accepted lines take consecutive slots after the highest slot
    index in use, and the box capacity grows to cover them.
    """
    box = Box.query.get_or_404(box_id)
    if box.box_type != "bag":
        return "当前盒子不是袋装清单", 400

    raw_lines = request.form.get("lines", "")
    lines = [line.strip() for line in raw_lines.splitlines() if line.strip()]
    if not lines:
        return render_box_page(box, error="批量新增失败: 请至少输入一行")

    skipped_lines = []
    added_count = 0
    next_slot = (
        db.session.query(db.func.max(Component.slot_index))
        .filter(Component.box_id == box.id)
        .scalar()
        or 0
    ) + 1

    for line_no, line in enumerate(lines, start=1):
        parsed = _parse_bulk_line(line)
        part_no = parsed["part_no"]
        name = parsed["name"]
        quantity_raw = parsed["quantity_raw"]
        specification = parsed["specification"]
        note = parsed["note"]

        if not part_no or not name:
            skipped_lines.append(f"第 {line_no} 行(料号或名称为空)")
            continue

        try:
            quantity = _parse_non_negative_int(quantity_raw, 0)
        except ValueError:
            skipped_lines.append(f"第 {line_no} 行(数量格式错误)")
            continue

        part_no_conflict = _find_enabled_part_no_conflict(part_no)
        if part_no_conflict:
            skipped_lines.append(
                f"第 {line_no} 行({part_no} 已在 {_format_component_position(part_no_conflict)},需人工确认后再合并)"
            )
            continue

        material_conflict = _find_enabled_material_conflict(
            name=name,
            specification=specification,
            exclude_part_no=part_no,
        )
        if material_conflict:
            skipped_lines.append(
                "第 "
                f"{line_no} 行(检测到同参数物料: {material_conflict.part_no} 已在 "
                f"{_format_component_position(material_conflict)},需人工确认后再合并)"
            )
            continue

        new_component = Component(
            box_id=box.id,
            slot_index=next_slot,
            part_no=part_no,
            name=name,
            quantity=quantity,
            specification=specification or None,
            note=note or None,
            is_enabled=True,
        )
        db.session.add(new_component)
        if quantity:
            log_inventory_event(
                event_type="bag_batch_add",
                delta=quantity,
                box=box,
                component=new_component,
                part_no=part_no,
            )
        added_count += 1
        next_slot += 1

    if added_count:
        # next_slot is one past the last slot that was assigned.
        box.slot_capacity = max(box.slot_capacity, next_slot - 1)
        db.session.commit()

    if skipped_lines and added_count == 0:
        return render_box_page(
            box,
            error="批量新增失败: 没有可导入的数据(请检查: " + ", ".join(skipped_lines) + ")",
        )

    if skipped_lines:
        return render_box_page(
            box,
            notice=f"已新增 {added_count} 条,以下行被跳过: " + ", ".join(skipped_lines),
        )

    return render_box_page(box, notice=f"批量新增成功:新增 {added_count} 条")
|
||
|
||
|
||
@app.route("/edit/<int:box_id>/<int:slot>", methods=["GET", "POST"])
def edit_component(box_id: int, slot: int):
    """Show and process the edit form for one slot of a box.

    GET renders ``edit.html`` for the slot. POST dispatches on the ``action``
    form field:

    * ``delete`` - remove the component bound to the slot (blocked in lock
      storage mode, and requires the slot code to be typed as confirmation).
    * ``toggle_enable`` / ``toggle_disable`` - flip ``is_enabled`` and log the
      resulting stock delta.
    * anything else (default ``save``) - validate the form, run the
      replacement and duplicate checks, optionally merge into an existing
      component, then create or update the slot's component.

    Returns a 400 plain-text response when ``slot`` is outside
    ``1..box.slot_capacity``.
    """
    box = Box.query.get_or_404(box_id)
    if slot < 1 or slot > box.slot_capacity:
        return "无效的格子编号", 400

    search_query = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()
    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()
    settings = _get_ai_settings()
    lock_storage_mode = bool(settings.get("lock_storage_mode", False))

    def _render_form(error_text: str, query_text: str):
        # Single place that knows the template context for edit.html.
        return render_template(
            "edit.html",
            box=box,
            slot=slot,
            slot_code=slot_code_for_box(box, slot),
            component=component,
            error=error_text,
            notice=notice,
            search_query=query_text,
            lock_storage_mode=lock_storage_mode,
        )

    if request.method != "POST":
        return _render_form(error, search_query)

    action = request.form.get("action", "save")
    search_query_post = request.form.get("q", "").strip()
    # Prefer the posted search keyword, but fall back to the one in the URL so
    # the search context survives error re-renders and toggle redirects too.
    search_query_effective = search_query_post or search_query

    def _leave_page():
        # After a successful save/delete go back to where the user came from.
        if search_query_effective:
            return redirect(url_for("search_page", q=search_query_effective))
        return redirect(url_for("view_box", box_id=box.id))

    def _reload_slot():
        return redirect(
            url_for("edit_component", box_id=box.id, slot=slot, q=search_query_effective)
        )

    if action == "delete":
        if lock_storage_mode:
            return _render_form("锁仓模式已开启,禁止删除位置绑定。", search_query_effective)

        delete_confirm_slot = request.form.get("delete_confirm_slot", "").strip().upper()
        expected_slot_code = slot_code_for_box(box, slot).strip().upper()
        if delete_confirm_slot != expected_slot_code:
            return _render_form(
                f"删除确认失败: 请输入当前位置编号 {expected_slot_code}",
                search_query_effective,
            )

        if component:
            # Only enabled stock counts towards inventory, so only that is logged.
            if component.is_enabled and component.quantity:
                log_inventory_event(
                    event_type="component_delete",
                    delta=-int(component.quantity),
                    box=box,
                    component=component,
                )
            db.session.delete(component)
            db.session.commit()
        return _leave_page()

    if action == "toggle_enable":
        if component:
            if not component.is_enabled:
                component.is_enabled = True
                if component.quantity:
                    log_inventory_event(
                        event_type="component_enable",
                        delta=int(component.quantity),
                        box=box,
                        component=component,
                    )
            db.session.commit()
        return _reload_slot()

    if action == "toggle_disable":
        if component:
            if component.is_enabled:
                component.is_enabled = False
                if component.quantity:
                    log_inventory_event(
                        event_type="component_disable",
                        delta=-int(component.quantity),
                        box=box,
                        component=component,
                    )
            db.session.commit()
        return _reload_slot()

    # ---- default action: save -------------------------------------------
    part_no = request.form.get("part_no", "").strip()
    name = request.form.get("name", "").strip()
    specification = request.form.get("specification", "").strip()
    quantity_raw = request.form.get("quantity", "0").strip()
    note = request.form.get("note", "").strip()
    confirm_merge = _is_truthy_form_value(request.form.get("confirm_merge", ""))
    confirm_position_change = _is_truthy_form_value(
        request.form.get("confirm_position_change", "")
    )

    if not part_no or not name:
        return _render_form("料号和名称不能为空", search_query_effective)

    try:
        quantity = _parse_non_negative_int(quantity_raw, 0)
    except ValueError:
        return _render_form("数量必须是大于等于 0 的整数", search_query_effective)

    is_replacement = _is_slot_part_replacement(component, part_no)
    if lock_storage_mode and is_replacement:
        return _render_form("锁仓模式已开启,禁止替换当前位置绑定料号。", search_query_effective)

    if is_replacement and not confirm_position_change:
        return _render_form(
            "当前位已绑定到固定料号,检测到替换操作。"
            "如需变更位置绑定,请勾选“我确认替换当前位物料”后再保存",
            search_query_effective,
        )

    current_component_id = component.id if component is not None else None
    part_no_conflict = _find_enabled_part_no_conflict(
        part_no, exclude_component_id=current_component_id
    )
    material_conflict = _find_enabled_material_conflict(
        name=name,
        specification=specification,
        exclude_component_id=current_component_id,
        exclude_part_no=part_no,
    )
    conflict = part_no_conflict or material_conflict

    if conflict and not confirm_merge:
        conflict_reason = "同料号" if part_no_conflict else "同参数"
        return _render_form(
            f"检测到{conflict_reason}物料已存在于 {_format_component_position(conflict)};"
            "请勾选“人工确认后合并”再保存",
            search_query_effective,
        )

    if conflict and confirm_merge:
        _merge_into_existing_component(
            target=conflict,
            incoming_part_no=part_no,
            incoming_name=name,
            incoming_specification=specification,
            incoming_note=note,
            incoming_quantity=quantity,
            source_component=component,
            source_box=box,
        )
        db.session.commit()

        target_box = Box.query.get(conflict.box_id)
        target_slot = conflict.slot_index
        notice_text = (
            f"已人工确认合并到 {_format_component_position(conflict)},"
            f"累计数量 {conflict.quantity}"
        )
        if target_box:
            return redirect(
                url_for(
                    "edit_component",
                    box_id=target_box.id,
                    slot=target_slot,
                    notice=notice_text,
                )
            )
        return redirect(url_for("view_box", box_id=box.id, notice=notice_text))

    # Quantity that counted towards inventory before this save; a stored NULL
    # quantity is treated as 0, as the LCSC import handler does.
    old_enabled_qty = 0
    if component is not None and component.is_enabled:
        old_enabled_qty = int(component.quantity or 0)

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = part_no
    component.name = name
    component.specification = specification or None
    component.quantity = quantity
    component.note = note or None
    # A freshly constructed row has no is_enabled value until it is flushed.
    if component.is_enabled is None:
        component.is_enabled = True

    new_enabled_qty = quantity if component.is_enabled else 0
    delta = int(new_enabled_qty - old_enabled_qty)
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=part_no,
        )

    db.session.commit()
    return _leave_page()
|
||
|
||
|
||
@app.route("/edit/<int:box_id>/<int:slot>/lcsc-import", methods=["POST"])
def lcsc_import_to_edit_slot(box_id: int, slot: int):
    """Fetch an LCSC product and write it into one slot of a box.

    The product is looked up via ``_fetch_lcsc_product_basic`` using the
    ``lcsc_product_id`` form field, mapped with
    ``_map_lcsc_product_to_component`` and then subjected to the same
    replacement / duplicate / merge checks as a manual save. Every outcome is
    reported by redirecting back to the slot's edit page with a ``notice`` or
    ``error`` query parameter, except for an out-of-range slot, which is
    reported on the box page.
    """
    box = Box.query.get_or_404(box_id)
    if slot < 1 or slot > box.slot_capacity:
        # The edit page rejects an out-of-range slot with a bare 400, so
        # redirecting there would swallow this message; show it on the box page.
        return render_box_page(box, error="目标位置超出当前容器范围")

    def _back_to_slot(**params):
        return redirect(url_for("edit_component", box_id=box.id, slot=slot, **params))

    try:
        quantity = _parse_non_negative_int(request.form.get("quantity", "0"), 0)
    except ValueError:
        return _back_to_slot(error="数量必须是大于等于 0 的整数")

    settings = _get_ai_settings()
    product_identifier = request.form.get("lcsc_product_id", "").strip()
    try:
        product = _fetch_lcsc_product_basic(product_identifier, settings)
    except RuntimeError as exc:
        return _back_to_slot(error=f"立创导入失败: {exc}")

    mapped = _map_lcsc_product_to_component(product)
    if not mapped["part_no"] or not mapped["name"]:
        return _back_to_slot(error="立创导入失败: 商品信息缺少料号或名称")

    component = Component.query.filter_by(box_id=box.id, slot_index=slot).first()
    confirm_merge = _is_truthy_form_value(request.form.get("confirm_merge", ""))
    confirm_position_change = _is_truthy_form_value(
        request.form.get("confirm_position_change", "")
    )

    is_replacement = _is_slot_part_replacement(component, mapped["part_no"])
    if bool(settings.get("lock_storage_mode", False)) and is_replacement:
        return _back_to_slot(error="立创导入失败: 锁仓模式已开启,禁止替换当前位置绑定料号。")

    if is_replacement and not confirm_position_change:
        return _back_to_slot(
            error=(
                "立创导入失败: 当前位已绑定固定料号。"
                "如需替换,请勾选“我确认替换当前位物料”后再导入"
            )
        )

    current_component_id = component.id if component is not None else None
    part_no_conflict = _find_enabled_part_no_conflict(
        mapped["part_no"],
        exclude_component_id=current_component_id,
    )
    material_conflict = _find_enabled_material_conflict(
        name=mapped["name"],
        specification=mapped["specification"],
        exclude_component_id=current_component_id,
        exclude_part_no=mapped["part_no"],
    )
    conflict = part_no_conflict or material_conflict

    if conflict and not confirm_merge:
        conflict_reason = "同料号" if part_no_conflict else "同参数"
        return _back_to_slot(
            error=(
                f"立创导入失败: 检测到{conflict_reason}物料已存在于 "
                f"{_format_component_position(conflict)},请勾选人工确认后合并"
            )
        )

    if conflict and confirm_merge:
        _merge_into_existing_component(
            target=conflict,
            incoming_part_no=mapped["part_no"],
            incoming_name=mapped["name"],
            incoming_specification=mapped["specification"],
            incoming_note=mapped["note"],
            incoming_quantity=quantity,
            source_component=component,
            source_box=box,
        )
        db.session.commit()

        target_box = Box.query.get(conflict.box_id)
        target_slot = conflict.slot_index
        notice_text = (
            f"已人工确认合并到 {_format_component_position(conflict)},"
            f"累计数量 {conflict.quantity}"
        )
        if target_box:
            return redirect(
                url_for(
                    "edit_component",
                    box_id=target_box.id,
                    slot=target_slot,
                    notice=notice_text,
                )
            )
        return redirect(url_for("view_box", box_id=box.id, notice=notice_text))

    # Stock that counted towards inventory before the import (0 if disabled/empty).
    old_enabled_qty = int(component.quantity or 0) if component and component.is_enabled else 0

    if component is None:
        component = Component(box_id=box.id, slot_index=slot)
        db.session.add(component)

    component.part_no = mapped["part_no"]
    component.name = mapped["name"]
    component.specification = mapped["specification"] or None
    component.note = mapped["note"] or None
    component.quantity = quantity
    # An import always leaves the slot enabled, even if it was disabled before.
    component.is_enabled = True

    delta = int(component.quantity or 0) - old_enabled_qty
    if delta:
        log_inventory_event(
            event_type="component_save",
            delta=delta,
            box=box,
            component=component,
            part_no=component.part_no,
        )

    db.session.commit()
    slot_code = slot_code_for_box(box, slot)
    return _back_to_slot(notice=f"立创导入成功: {mapped['part_no']} 已写入 {slot_code}")
|
||
|
||
|
||
@app.route("/search")
def search_page():
    """Render the search page for enabled components.

    The ``q`` query parameter is matched case-insensitively as a substring of
    either the part number or the name. ``notice`` and ``error`` query
    parameters are passed through to the template unchanged.
    """
    keyword = request.args.get("q", "").strip()
    notice = request.args.get("notice", "").strip()
    error = request.args.get("error", "").strip()
    results = []

    if keyword:
        # Escape LIKE metacharacters so a literal "%" or "_" typed by the user
        # is searched for verbatim instead of acting as a wildcard.
        escaped = (
            keyword.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        raw_results = (
            Component.query.join(Box, Box.id == Component.box_id)
            .filter(
                Component.is_enabled.is_(True),
                db.or_(
                    Component.part_no.ilike(pattern, escape="\\"),
                    Component.name.ilike(pattern, escape="\\"),
                ),
            )
            .order_by(Component.part_no.asc(), Component.name.asc())
            .all()
        )

        # Load every referenced box with one query instead of one lookup per row.
        box_ids = {c.box_id for c in raw_results}
        box_by_id = (
            {b.id: b for b in Box.query.filter(Box.id.in_(box_ids)).all()}
            if box_ids
            else {}
        )

        for c in raw_results:
            box = box_by_id.get(c.box_id)
            results.append(
                {
                    "component": c,
                    "box_name": box.name if box else f"盒 {c.box_id}",
                    "slot_code": slot_code_for_box(box, c.slot_index) if box else str(c.slot_index),
                    "edit_url": url_for(
                        "edit_component", box_id=c.box_id, slot=c.slot_index, q=keyword
                    ),
                }
            )

    return render_template(
        "search.html",
        keyword=keyword,
        results=results,
        notice=notice,
        error=error,
    )
|
||
|
||
|
||
@app.route("/ai/restock-plan", methods=["POST"])
def ai_restock_plan():
    """Generate a restock suggestion, preferring the AI and falling back to rules.

    The low-stock payload is sent to the chat endpoint with a prompt that asks
    for a JSON plan (``summary`` plus ``urgent`` / ``this_week`` / ``defer``
    item lists). Outcomes:

    * no low-stock items: an empty plan with ``ok=True``;
    * the chat call raises ``RuntimeError``: a rule-based plan with
      ``ok=False`` and HTTP status 400;
    * the reply is not valid JSON: a rule-based plan with ``ok=True`` and a
      ``parse_warning``;
    * otherwise: the normalized AI plan.
    """
    ai_settings = _get_ai_settings()
    data = _build_restock_payload(
        limit=ai_settings["restock_limit"],
        threshold=ai_settings["restock_threshold"],
    )

    def _empty_plan(summary: str) -> dict:
        return {
            "summary": summary,
            "urgent": [],
            "this_week": [],
            "defer": [],
        }

    def _normalize_item(raw_item: dict) -> dict:
        # Coerce one AI-provided row into the four string fields the UI expects.
        if not isinstance(raw_item, dict):
            return {
                "part_no": "-",
                "name": "未命名元件",
                "suggest_qty": "待确认",
                "reason": "AI 返回项格式异常",
            }
        return {
            "part_no": str(raw_item.get("part_no", "-") or "-").strip() or "-",
            "name": str(raw_item.get("name", "未命名元件") or "未命名元件").strip(),
            "suggest_qty": str(raw_item.get("suggest_qty", "待确认") or "待确认").strip(),
            "reason": str(raw_item.get("reason", "") or "").strip() or "无",
        }

    def _normalize_plan(raw_plan: dict, default_summary: str) -> dict:
        # Tolerate missing keys and wrong container types in the AI reply.
        if not isinstance(raw_plan, dict):
            return _empty_plan(default_summary)

        summary = str(raw_plan.get("summary", "") or "").strip() or default_summary

        def to_items(key: str):
            rows = raw_plan.get(key, [])
            if not isinstance(rows, list):
                return []
            return [_normalize_item(row) for row in rows]

        return {
            "summary": summary,
            "urgent": to_items("urgent"),
            "this_week": to_items("this_week"),
            "defer": to_items("defer"),
        }

    def _extract_json_block(raw_text: str) -> str:
        # Pull the outermost {...} object out of a reply that may be wrapped
        # in a Markdown code fence or surrounded by extra prose.
        text = (raw_text or "").strip()
        if not text:
            return ""
        if text.startswith("```"):
            # These are raw strings, so whitespace is a single-backslash "\s";
            # the previous "\\s" matched a literal backslash and never stripped
            # the fence.
            text = re.sub(r"^```(?:json)?\s*", "", text)
            text = re.sub(r"\s*```$", "", text)
        first = text.find("{")
        last = text.rfind("}")
        if first >= 0 and last > first:
            return text[first : last + 1]
        return text

    def _build_rule_based_plan() -> dict:
        # Deterministic fallback: top up each low-stock item to twice the
        # threshold; the first five rows are urgent, the rest go to this week.
        threshold = int(data.get("threshold", LOW_STOCK_THRESHOLD))
        urgent = []
        this_week = []

        for idx, item in enumerate(data.get("low_stock_items", [])):
            qty = int(item.get("quantity", 0) or 0)
            suggest_qty = max(threshold * 2 - qty, 1)
            row = {
                "part_no": item.get("part_no", "-") or "-",
                "name": item.get("name", "未命名元件") or "未命名元件",
                "suggest_qty": str(suggest_qty),
                "reason": f"当前库存 {qty},低于阈值 {threshold}",
            }
            if idx < 5:
                urgent.append(row)
            else:
                this_week.append(row)

        return {
            "summary": "已按规则生成兜底补货建议(AI 输出异常时使用)",
            "urgent": urgent,
            "this_week": this_week,
            "defer": [],
        }

    if not data["low_stock_items"]:
        plan = _empty_plan("当前没有低库存元件,暂不需要补货。")
        return {
            "ok": True,
            "suggestion": "当前没有低库存元件,暂不需要补货。",
            "plan": plan,
            "data": data,
        }

    system_prompt = (
        "你是电子元器件库存助手。"
        "必须只输出 JSON,不要 Markdown,不要解释文字。"
        "输出结构必须是: "
        "{\"summary\":string,\"urgent\":[item],\"this_week\":[item],\"defer\":[item]}。"
        "item 结构: {\"part_no\":string,\"name\":string,\"suggest_qty\":string,\"reason\":string}。"
        "各数组允许为空。"
    )
    user_prompt = "库存数据如下(JSON):\n" + json.dumps(data, ensure_ascii=False)

    try:
        suggestion = _call_siliconflow_chat(
            system_prompt,
            user_prompt,
            api_url=ai_settings["api_url"],
            model=ai_settings["model"],
            api_key=ai_settings["api_key"],
            timeout=ai_settings["timeout"],
        )
    except RuntimeError as exc:
        fallback_plan = _build_rule_based_plan()
        return {
            "ok": False,
            "message": str(exc),
            "plan": fallback_plan,
            "data": data,
        }, 400

    parse_warning = ""
    try:
        parsed_plan = json.loads(_extract_json_block(suggestion))
        plan = _normalize_plan(parsed_plan, "已生成 AI 补货建议")
    except json.JSONDecodeError:
        plan = _build_rule_based_plan()
        parse_warning = "AI 返回格式异常,已切换到规则兜底建议。"

    return {
        "ok": True,
        "suggestion": suggestion,
        "plan": plan,
        "parse_warning": parse_warning,
        "data": data,
    }
|
||
|
||
|
||
@app.route("/ai/settings", methods=["GET", "POST"])
def ai_settings_page():
    """Display the AI / LCSC settings form and persist submitted values.

    On POST every field is validated; only the first validation error is
    reported. A valid submission is saved and followed by a redirect carrying
    a notice. An invalid one re-renders the form with the submitted values
    (numeric fields that failed validation fall back to the stored setting).
    """
    settings = _get_ai_settings()
    notice = request.args.get("notice", "").strip()

    if request.method != "POST":
        return render_template(
            "ai_settings.html",
            settings=settings,
            notice=notice,
            error="",
        )

    form = request.form
    problems = []

    def _bounded_int(field: str, default_text: str, minimum: int, message: str) -> int:
        # Parse an integer form field; on failure record the message and fall
        # back to the currently stored value for that setting.
        try:
            number = int((form.get(field, default_text) or default_text).strip())
            if number < minimum:
                raise ValueError
        except ValueError:
            problems.append(message)
            return settings[field]
        return number

    api_url = form.get("api_url", "").strip()
    model = form.get("model", "").strip()
    api_key = form.get("api_key", "").strip()
    lcsc_app_id = form.get("lcsc_app_id", "").strip()
    lcsc_access_key = form.get("lcsc_access_key", "").strip()
    lcsc_secret_key = form.get("lcsc_secret_key", "").strip()
    lock_storage_mode = _is_truthy_form_value(form.get("lock_storage_mode", ""))

    timeout = _bounded_int("timeout", "30", 5, "超时时间必须是大于等于 5 的整数")
    restock_threshold = _bounded_int(
        "restock_threshold", "5", 0, "低库存阈值必须是大于等于 0 的整数"
    )
    restock_limit = _bounded_int(
        "restock_limit", "24", 1, "补货条目数必须是大于等于 1 的整数"
    )
    lcsc_timeout = _bounded_int(
        "lcsc_timeout", "20", 5, "立创接口超时时间必须是大于等于 5 的整数"
    )

    # Required-text checks run in a fixed order and only matter when nothing
    # earlier has already failed.
    if not problems and not api_url:
        problems.append("API URL 不能为空")
    if not problems and not model:
        problems.append("模型名称不能为空")
    if not problems and not (lcsc_app_id and lcsc_access_key and lcsc_secret_key):
        problems.append("立创接口需要填写 app_id、access_key、secret_key")

    error = problems[0] if problems else ""

    submitted = {
        "api_url": api_url,
        "model": model,
        "api_key": api_key,
        "timeout": timeout,
        "restock_threshold": restock_threshold,
        "restock_limit": restock_limit,
        "lcsc_base_url": LCSC_BASE_URL,
        "lcsc_basic_path": LCSC_BASIC_PATH,
        "lcsc_timeout": lcsc_timeout,
        "lcsc_app_id": lcsc_app_id,
        "lcsc_access_key": lcsc_access_key,
        "lcsc_secret_key": lcsc_secret_key,
        "lock_storage_mode": lock_storage_mode,
    }

    if not error:
        _save_ai_settings(submitted)
        return redirect(url_for("ai_settings_page", notice="AI参数已保存"))

    # Keep what the user typed so the form can be corrected in place.
    settings.update(submitted)
    return render_template(
        "ai_settings.html",
        settings=settings,
        notice=notice,
        error=error,
    )
|
||
|
||
|
||
@app.route("/component/<int:component_id>/outbound", methods=["POST"])
def quick_outbound(component_id: int):
    """Take stock out of a component and return to the search page.

    The ``amount`` form field must be a positive integer no larger than the
    current quantity, and the component must be enabled. Every outcome is
    reported via a redirect to the search page, preserving the ``q`` keyword.
    """
    component = Component.query.get_or_404(component_id)
    keyword = request.form.get("q", "").strip()

    def _reject(message: str):
        return redirect(url_for("search_page", q=keyword, error=message))

    try:
        amount = _parse_non_negative_int(request.form.get("amount", "0"), 0)
    except ValueError:
        return _reject("出库数量必须是大于等于 0 的整数")

    if amount <= 0:
        return _reject("出库数量必须大于 0")
    if not component.is_enabled:
        return _reject("该元件已停用,不能出库")

    in_stock = int(component.quantity or 0)
    if amount > in_stock:
        return _reject("出库数量超过当前库存")

    component.quantity = in_stock - amount
    box = Box.query.get(component.box_id)
    log_inventory_event(
        event_type="component_outbound",
        delta=-amount,
        box=box,
        component=component,
        part_no=component.part_no,
    )
    db.session.commit()

    if box:
        slot_code = slot_code_for_box(box, component.slot_index)
    else:
        slot_code = str(component.slot_index)
    return redirect(
        url_for(
            "search_page",
            q=keyword,
            notice=f"出库成功: {component.part_no} -{amount}({slot_code})",
        )
    )
|
||
|
||
|
||
@app.route("/stats")
def stats_page():
    """Render the statistics page.

    Aggregates, for the ``days`` window and ``box_type`` filter taken from the
    query string: current stock totals, the low-stock count, the number of
    logged operations and active days, a per-category (or per-component, when
    a box type is selected) ranking, the inventory trend line as SVG polyline
    points, a per-box-type sparkline series, and the most recent events.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))
    notice = request.args.get("notice", "").strip()

    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    components = Component.query.all()

    def _qty(component) -> int:
        # A stored NULL quantity counts as zero instead of breaking the sums.
        return int(component.quantity or 0)

    def _box_type_of(component):
        # Box type of the component's box, or None when the box is missing.
        owner = box_by_id.get(component.box_id)
        return owner.box_type if owner else None

    all_enabled_components = [c for c in components if c.is_enabled]
    overall_total_quantity = sum(_qty(c) for c in all_enabled_components)

    if box_type_filter == "all":
        enabled_components = all_enabled_components
    else:
        enabled_components = [
            c for c in all_enabled_components if _box_type_of(c) == box_type_filter
        ]
    total_quantity = sum(_qty(c) for c in enabled_components)
    total_items = len(enabled_components)
    low_stock_count = len([c for c in enabled_components if _qty(c) < LOW_STOCK_THRESHOLD])
    inventory_share = (
        round(total_quantity * 100 / overall_total_quantity, 1) if overall_total_quantity > 0 else 0.0
    )

    start_day = datetime.now().date() - timedelta(days=days - 1)
    event_query = InventoryEvent.query.filter(
        InventoryEvent.created_at >= datetime.combine(start_day, datetime.min.time())
    )
    if box_type_filter != "all":
        event_query = event_query.filter_by(box_type=box_type_filter)
    period_operation_count = event_query.count()

    active_days = len(
        {
            _to_date(row[0])
            for row in event_query.with_entities(db.func.date(InventoryEvent.created_at)).all()
            if row and row[0]
        }
    )

    # Unfiltered per-type totals (input for the sparkline series), one pass.
    totals_by_type = {box_type: 0 for box_type in BOX_TYPES.keys()}
    for c in all_enabled_components:
        box_type = _box_type_of(c)
        if box_type in totals_by_type:
            totals_by_type[box_type] += _qty(c)

    # Per-category counts and quantities under the current filter, one pass.
    category_totals = {key: [0, 0] for key in BOX_TYPES.keys()}
    for c in enabled_components:
        box_type = _box_type_of(c)
        if box_type in category_totals:
            category_totals[box_type][0] += 1
            category_totals[box_type][1] += _qty(c)

    category_stats = [
        {
            "key": key,
            "label": meta["label"],
            "item_count": category_totals[key][0],
            "quantity": category_totals[key][1],
        }
        for key, meta in BOX_TYPES.items()
    ]
    category_stats.sort(key=lambda row: row["quantity"], reverse=True)
    max_category_quantity = max([row["quantity"] for row in category_stats], default=0)

    chart_mode = "category"
    chart_title = "分类占比"
    chart_rows = category_stats
    max_chart_quantity = max_category_quantity

    if box_type_filter != "all":
        # With a single box type selected, rank components inside it instead.
        chart_mode = "component"
        chart_title = "分类内元件库存 Top"
        bucket = {}
        for c in enabled_components:
            key = (c.part_no or "", c.name or "")
            if key not in bucket:
                bucket[key] = 0
            bucket[key] += _qty(c)

        component_rows = []
        for (part_no, name), quantity in bucket.items():
            label = name or part_no or "未命名元件"
            if part_no:
                label = f"{label} ({part_no})"
            component_rows.append({"label": label, "quantity": quantity, "item_count": 1})

        component_rows.sort(key=lambda row: row["quantity"], reverse=True)
        chart_rows = component_rows[:8]
        max_chart_quantity = max([row["quantity"] for row in chart_rows], default=0)

    trend_points = build_trend_points_from_events(
        days=days,
        total_quantity=total_quantity,
        box_type_filter=box_type_filter,
    )
    period_net_change = 0
    if len(trend_points) >= 2:
        period_net_change = trend_points[-1]["value"] - trend_points[0]["value"]

    min_value = min([point["value"] for point in trend_points], default=0)
    max_value = max([point["value"] for point in trend_points], default=0)
    # Never divide by zero when the trend is flat.
    value_span = max(max_value - min_value, 1)

    svg_points = []
    if trend_points:
        width = 520
        height = 180
        step_x = width / max(len(trend_points) - 1, 1)
        for idx, point in enumerate(trend_points):
            x = idx * step_x
            # SVG's y axis grows downwards, so larger values sit nearer the top.
            y = height - ((point["value"] - min_value) / value_span) * height
            svg_points.append(f"{x:.2f},{y:.2f}")

    box_type_series_raw = build_box_type_trend_series(days=days, totals_by_type=totals_by_type)
    box_type_series = []
    for key, meta in BOX_TYPES.items():
        values = box_type_series_raw["series"].get(key, [])
        delta = values[-1] - values[0] if len(values) >= 2 else 0
        box_type_series.append(
            {
                "key": key,
                "label": meta["label"],
                "sparkline": make_sparkline(values),
                "latest": values[-1] if values else 0,
                "delta": delta,
            }
        )

    activity_rows = recent_events(limit=20, box_type_filter=box_type_filter)

    return render_template(
        "stats.html",
        notice=notice,
        days=days,
        box_type_filter=box_type_filter,
        box_types=BOX_TYPES,
        total_quantity=total_quantity,
        total_items=total_items,
        low_stock_count=low_stock_count,
        category_stats=category_stats,
        max_category_quantity=max_category_quantity,
        chart_mode=chart_mode,
        chart_title=chart_title,
        chart_rows=chart_rows,
        max_chart_quantity=max_chart_quantity,
        trend_points=trend_points,
        trend_polyline=" ".join(svg_points),
        min_value=min_value,
        max_value=max_value,
        period_net_change=period_net_change,
        overall_total_quantity=overall_total_quantity,
        inventory_share=inventory_share,
        period_operation_count=period_operation_count,
        active_days=active_days,
        box_type_series=box_type_series,
        activity_rows=activity_rows,
    )
|
||
|
||
|
||
@app.route("/stats/export")
def stats_export_csv():
    """Download the inventory trend for the current filter as a CSV file.

    One row is written per trend point with the columns ``date``,
    ``inventory_total``, ``daily_delta``, ``days`` and ``box_type_filter``.
    The ``days`` and ``box_type`` query parameters select the window and
    category, as on the statistics page.
    """
    days = parse_days_value(request.args.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.args.get("box_type", "all"))

    boxes = Box.query.all()
    box_by_id = {box.id: box for box in boxes}
    components = Component.query.all()

    enabled_components = [
        c
        for c in components
        if c.is_enabled and (
            box_type_filter == "all"
            or (box_by_id.get(c.box_id) and box_by_id[c.box_id].box_type == box_type_filter)
        )
    ]
    # A stored NULL quantity counts as zero instead of raising inside sum().
    total_quantity = sum(int(c.quantity or 0) for c in enabled_components)
    trend_points = build_trend_points_from_events(days, total_quantity, box_type_filter)
    delta_by_day = query_event_daily_delta(days, box_type_filter)

    # The context manager releases the buffer even if a row fails to serialize.
    with StringIO() as output:
        writer = csv.writer(output)
        writer.writerow(["date", "inventory_total", "daily_delta", "days", "box_type_filter"])
        for point in trend_points:
            writer.writerow(
                [
                    point["date"].isoformat(),
                    point["value"],
                    int(delta_by_day.get(point["date"], 0)),
                    days,
                    box_type_filter,
                ]
            )
        csv_content = output.getvalue()

    filename = f"inventory_stats_{box_type_filter}_{days}d.csv"

    return Response(
        csv_content,
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
|
||
|
||
|
||
@app.route("/stats/clear", methods=["POST"])
def clear_stats_logs():
    """Delete inventory event logs and return to the statistics page.

    With ``scope=all`` every event is removed. Otherwise the events matching
    the posted ``box_type`` filter are removed (all events when the filter is
    ``all``). The number of deleted rows is reported in the redirect notice.
    """
    days = parse_days_value(request.form.get("days", "7"))
    box_type_filter = parse_box_type_filter(request.form.get("box_type", "all"))
    clear_scope = (request.form.get("scope", "current") or "current").strip()

    if clear_scope == "all":
        removed = InventoryEvent.query.delete()
        db.session.commit()
        return redirect(
            url_for(
                "stats_page",
                days=days,
                box_type="all",
                notice=f"已清除全部统计日志,共 {removed} 条",
            )
        )

    filter_is_all = box_type_filter == "all"
    events = InventoryEvent.query
    if not filter_is_all:
        events = events.filter_by(box_type=box_type_filter)
    removed = events.delete(synchronize_session=False)
    db.session.commit()

    if filter_is_all:
        message = f"已清除当前筛选(全部分类)统计日志,共 {removed} 条"
    else:
        # Show the human-readable category label, falling back to the raw key.
        label = BOX_TYPES.get(box_type_filter, {}).get("label", box_type_filter)
        message = f"已清除当前筛选({label})统计日志,共 {removed} 条"

    return redirect(
        url_for("stats_page", days=days, box_type=box_type_filter, notice=message)
    )
|
||
|
||
|
||
def bootstrap() -> None:
    """Initialise the database when the application starts.

    Runs, inside an application context and in this order: table creation,
    schema patching (``ensure_schema``), and legacy-data normalisation
    (``normalize_legacy_data``).
    """
    with app.app_context():
        # Order matters: tables must exist before columns are patched, and the
        # schema must be current before historical rows are repaired.
        db.create_all()
        ensure_schema()
        normalize_legacy_data()


# Runs at import time so the database is ready however the app is launched.
bootstrap()
|
||
|
||
|
||
if __name__ == "__main__":
    # The Werkzeug interactive debugger allows arbitrary code execution, so it
    # must not be on by default while the server listens on every interface.
    # Opt in explicitly with FLASK_DEBUG=1 during local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)
|