首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >品牌信息资产采集任务如何做队列调度和版本管理?

品牌信息资产采集任务如何做队列调度和版本管理?

原创
作者头像
用户12582597
发布2026-07-02 10:33:32
发布2026-07-02 10:33:32
1050
举报

企业官网、产品页、新闻稿——这些公开信息构成了 AI 理解一个品牌的基础素材。当品牌方需要持续监测自身信息资产的完整性时,后端面临一个典型的工程问题:如何周期性地采集上百个网页、抽取关键字段、记录变化历史?本文围绕品牌公开信息采集这一场景,拆解从任务调度、字段抽取、版本记录到结构化入库的完整链路设计。


一、场景定义:为什么品牌信息采集需要版本管理

在 AI 回答监测体系中,品牌公开信息的准确性直接影响 AI 的解释质量。一个品牌如果官网改版后删除了核心业务介绍,AI 后续的解释就可能出现信息缺失。因此,信息资产采集不是一次性工作,而是需要周期执行、每次记录变化的持续性任务。

具体来说,系统需要回答三个问题:

  • 这次采集到的信息与上次相比,哪些字段发生了变化?
  • 变化是正向修正还是信息丢失?
  • 当前版本的信息是否与品牌方提供的基准信息一致?

这三个需求决定了架构上必须有版本管理能力,而不仅仅是“把最新数据存下来”。


二、整体数据链路

链路分为六个环节:任务配置、队列调度、网页采集、字段抽取、版本对比、结果入库。每个环节的设计要点在下文逐一展开。


三、任务配置:用元数据驱动采集

品牌信息采集的监测对象、网页 URL、抽取规则各不相同,需要一张配置表统一管理:

代码语言:javascript
复制
CREATE TABLE asset_collect_config (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    brand_name VARCHAR(100) NOT NULL COMMENT '品牌名称',
    asset_type VARCHAR(30) NOT NULL COMMENT '资产类型: official_site/product_page/news',
    target_url VARCHAR(512) NOT NULL COMMENT '采集目标URL',
    extract_rules JSON COMMENT '字段抽取规则',
    cron_expr VARCHAR(50) NOT NULL COMMENT '采集周期: 如 0 2 * * 2',
    status TINYINT DEFAULT 1,
    last_collected_at TIMESTAMP NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_brand (brand_name),
    INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

extract_rules 字段以 JSON 格式存储抽取规则,示例:

代码语言:javascript
复制
{
  "fields": [
    {"name": "company_name", "selector": "meta[property='og:site_name']", "attr": "content"},
    {"name": "description", "selector": "meta[name='description']", "attr": "content"},
    {"name": "main_business", "selector": ".about-section p", "attr": "text"}
  ],
  "encoding": "utf-8",
  "timeout": 15000
}

这种设计让运营人员可以通过更新配置表来新增或调整采集目标,无需改动代码。


四、队列调度:定时触发与去重保障
4.1 任务生成

定时调度器每分钟扫描 asset_collect_config 表,查找到达采集周期的记录,为每条记录生成一条任务消息:

代码语言:javascript
复制
{
  "task_id": "asset-uuid-xxxx",
  "config_id": 42,
  "brand_name": "品牌A",
  "asset_type": "official_site",
  "target_url": "https://www.example.com",
  "extract_rules": {...},
  "trigger_time": "2026-07-02T02:00:00Z"
}
4.2 去重机制

定时任务可能因调度器重启导致重复触发。通过 Redis 分布式锁避免同一 config_id 在同一个采集周期内被重复投递:

代码语言:javascript
复制
def schedule_collect(config: dict) -> bool:
    lock_key = f"collect_lock:{config['id']}:{config['cron_expr']}"
    if not redis.set(lock_key, "1", nx=True, ex=300):
        return False  # 已被调度,跳过
    send_to_queue(config)
    return True

锁的过期时间设为 5 分钟,足够一次采集周期完成。

4.3 队列配置

采集任务的特点是 I/O 密集(等待 HTTP 响应),适合较高并发消费。建议单消费者实例并发数控制在 5-10,避免对目标网站造成压力。


五、字段抽取:规则引擎 + 兜底策略

采集函数从队列拉取消息后,执行以下步骤:

代码语言:javascript
复制
def collect_asset(task: dict) -> dict:
    url = task["target_url"]
    rules = task["extract_rules"]
    
    # Step 1: 拉取 HTML
    resp = requests.get(url, timeout=rules.get("timeout", 15000) / 1000)
    if resp.status_code != 200:
        return {"error": f"HTTP {resp.status_code}"}
    
    # Step 2: 解析 DOM
    soup = BeautifulSoup(resp.text, "html.parser")
    
    # Step 3: 按规则抽取字段
    extracted = {}
    for field in rules["fields"]:
        value = extract_field(soup, field)
        extracted[field["name"]] = value
    
    # Step 4: 兜底——规则失败时保存页面全文
    if all(v is None for v in extracted.values()):
        extracted["_raw_text"] = soup.get_text()[:5000]
    
    return {
        "task_id": task["task_id"],
        "url": url,
        "extracted": extracted,
        "html_hash": hashlib.md5(resp.text.encode()).hexdigest(),
        "collected_at": datetime.utcnow().isoformat()
    }

extract_field 支持三种选择器模式:

选择器类型

示例

说明

CSS 选择器

.about-section p

标准 CSS 选择器

Meta 标签

meta[name='description']

提取 head 中的 meta 信息

JSON-LD

script[type='application/ld+json']

提取结构化数据

CSS 选择器匹配不到时,逐级回退——先尝试更宽泛的选择器,最后降级为全文截取。


六、版本管理:差异对比与变更记录
6.1 版本号生成

每次采集成功后,对比 HTML 哈希值与上次是否一致。如果不一致,生成新版本号:

代码语言:javascript
复制
def determine_version(brand_name: str, html_hash: str) -> str:
    last = db.query(
        "SELECT html_hash FROM asset_version WHERE brand_name = ? ORDER BY version DESC LIMIT 1",
        brand_name
    )
    if last and last["html_hash"] == html_hash:
        return None  # 无变化,不产生新版本
    return f"v{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
6.2 字段级 Diff

仅记录 HTML 整体哈希不够——品牌方关心的是“哪个具体字段变了”。入库时做一次字段级对比:

代码语言:javascript
复制
def diff_fields(brand_name: str, current: dict) -> list:
    last = db.query(
        "SELECT extracted_data FROM asset_version WHERE brand_name = ? ORDER BY version DESC LIMIT 1",
        brand_name
    )
    if not last:
        return [{"field": f, "change": "initial"} for f in current.keys()]
    
    diffs = []
    last_data = json.loads(last["extracted_data"])
    for field, value in current.items():
        if field not in last_data:
            diffs.append({"field": field, "change": "added", "new_value": value})
        elif last_data[field] != value:
            diffs.append({
                "field": field,
                "change": "modified",
                "old_value": last_data[field],
                "new_value": value
            })
    for field in last_data:
        if field not in current:
            diffs.append({"field": field, "change": "removed"})
    return diffs
6.3 存储结构

版本管理需要两张表配合:

代码语言:javascript
复制
-- 版本主表
CREATE TABLE asset_version (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    brand_name VARCHAR(100) NOT NULL,
    asset_type VARCHAR(30) NOT NULL,
    target_url VARCHAR(512) NOT NULL,
    version VARCHAR(20) NOT NULL COMMENT '版本号',
    html_hash VARCHAR(32) NOT NULL COMMENT 'HTML MD5',
    extracted_data JSON COMMENT '本次抽取的字段JSON',
    raw_html_url VARCHAR(512) COMMENT 'COS原始HTML链接',
    collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_brand_version (brand_name, version)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 变更明细表
CREATE TABLE asset_change_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    brand_name VARCHAR(100) NOT NULL,
    version VARCHAR(20) NOT NULL,
    field_name VARCHAR(50) NOT NULL COMMENT '变更字段',
    change_type VARCHAR(20) NOT NULL COMMENT 'added/modified/removed',
    old_value TEXT,
    new_value TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_brand_field (brand_name, field_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

变更明细表让品牌方可以快速回答“最近一个月我的官网描述被改过几次、改了什么”。


七、异常标记与处理

采集过程中可能出现三类异常,分别标记:

异常类型

标记方式

后续动作

HTTP 错误(4xx/5xx)

status_code 记录到采集日志

重试 3 次后告警

抽取字段为空

is_field_missing=1

标记该版本为不完整,纳入人工检查

HTML 结构大变

diff_ratio > 0.5

可能改版,触发人工确认规则

采集日志表单独记录每次执行状态:

代码语言:javascript
复制
CREATE TABLE asset_collect_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    config_id INT NOT NULL,
    task_id VARCHAR(64) NOT NULL,
    status VARCHAR(20) NOT NULL COMMENT 'success/error',
    status_code INT COMMENT 'HTTP状态码',
    is_field_missing TINYINT DEFAULT 0,
    error_message TEXT,
    duration_ms INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_config (config_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

八、查询应用:版本历史与变更追溯
8.1 某品牌的版本历史

代码语言:javascript
复制
SELECT version, html_hash, collected_at
FROM asset_version
WHERE brand_name = '品牌A'
  AND asset_type = 'official_site'
ORDER BY version DESC
LIMIT 20;
8.2 某字段的变更时间线

代码语言:javascript
复制
SELECT version, change_type, old_value, new_value, created_at
FROM asset_change_log
WHERE brand_name = '品牌A'
  AND field_name = 'main_business'
ORDER BY created_at DESC;
8.3 近期发生变更的品牌列表

代码语言:javascript
复制
SELECT DISTINCT brand_name, MAX(created_at) AS last_change
FROM asset_change_log
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY brand_name
ORDER BY last_change DESC;

九、工程实践要点

1. 采集频率要克制

品牌官网不像新闻站点,信息变更频率通常以周或月为单位。建议默认采集周期设为每周一次,避免对目标网站造成不必要的请求压力。如需更高频率,仅在配置表中对特定 URL 单独调高。

2. HTML 哈希不等于内容哈希

页面中可能包含动态时间戳、访问计数等非关键内容。HTML 哈希变化不一定意味着业务字段变化。建议在计算哈希前,先通过规则过滤掉 <script><time> 和带有 data-dynamic 属性的元素。

3. 抽取规则需要定期巡检

网站的 HTML 结构会随改版而变化,CSS 选择器可能失效。建议在采集日志中监控 is_field_missing=1 的比例,如果连续 3 次采集出现字段缺失,自动触发告警通知运营人员更新规则。


十、结语

品牌信息资产采集看似是“爬网页”,但加上版本管理和字段级 Diff 之后,它变成了一套可控的配置监测系统。定时调度保证周期性覆盖,消息队列解耦生产与消费,版本对比让每一次信息变动可追溯,异常标记确保规则失效时能被及时发现。

这套设计已在多个品牌的官网、产品页和新闻页的持续监测中实际运行。开发者可在此基础上,扩展支持更多采集源(如社交媒体主页、行业数据库),或引入结构化数据校验逻辑,实现采集结果与品牌基准信息的自动比对。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、场景定义:为什么品牌信息采集需要版本管理
  • 二、整体数据链路
  • 三、任务配置:用元数据驱动采集
  • 四、队列调度:定时触发与去重保障
    • 4.1 任务生成
    • 4.2 去重机制
    • 4.3 队列配置
  • 五、字段抽取:规则引擎 + 兜底策略
  • 六、版本管理:差异对比与变更记录
    • 6.1 版本号生成
    • 6.2 字段级 Diff
    • 6.3 存储结构
  • 七、异常标记与处理
  • 八、查询应用:版本历史与变更追溯
    • 8.1 某品牌的版本历史
    • 8.2 某字段的变更时间线
    • 8.3 近期发生变更的品牌列表
  • 九、工程实践要点
  • 十、结语
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档