首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >当 AI 写代码开始帮你引入第三方库,供应链安全该怎么守

当 AI 写代码开始帮你引入第三方库,供应链安全该怎么守

作者头像
Jumbo
发布2026-03-31 14:48:51
发布2026-03-31 14:48:51
1080
举报
文章被收录于专栏:中国白客联盟中国白客联盟

前言

AI 已经慢慢成为工作的一部分,尤其在 IT 行业,利用 AI 写代码已是日常。但 AI 写的代码就完全可信、安全吗?传统的 SCA 扫描面对 AI 编程场景,是否有更好的解决方案?

最近看到腾讯的一篇文章《幽灵依赖:Agentic Coding 范式下的新型供应链安全威胁》,文中提出了一个有意思的思路——将安全检测左移至 Coding Agent 产生决策的瞬间,在 AI 真正执行依赖操作之前就完成拦截。

但是因为其插件是基于CodeBuddy Code,因此本文就基于这个思路,结合 Claude Code 的 Hook 机制和 OpenSCA,实现一个在 AI 写代码时实时拦截高危依赖的方案。

Claude Code Hook机制

Claude Code提供了完善的Hook机制,其中PreToolUse符合本文的要求,也就是在AI输出方案但是还没产生文件前,安全介入:

然后利用Exit code 2来实现阻止当前Claude Code的操作,那么思路就很清晰了,只要通过Hook PreToolUse,判断当出现高危组件时则终止任务。

Hook具体操作

首先在Claude Code的全局配置文件~/.claude/settings.json 中添加hook操作:

代码语言:javascript
复制
{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Write",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
          }
        ]
      },
      {
        "matcher": "Edit",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
          }
        ]
      },
      {
        "matcher": "Bash",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
          }
        ]
      }
    ]
  }
}

从几个方面来进行Hook来保证全面性:

代码语言:javascript
复制
Edit
Write
BASH

然后编写Hook代码,具体代码如下:

可访问 https://github.com/Jumbo-WJB/ai-code-guard 获取访问代码。

代码语言:javascript
复制
#!/usr/bin/env python3
import sys
import json
import os
import re
import subprocess
import tempfile
import shutil
from datetime import datetime
from typing import Optional, Tuple
# ── 配置区 ───────────────────────────────────────────────────────────────────
OPENSCA_CLI   = os.path.expanduser("~/Downloads/opensca-cli-v3.0.9-darwin-arm64/opensca-cli")
OPENSCA_TOKEN = "your-token-here"
OPENSCA_PROJ  = "claude-hook-scan"
BLOCK_LEVEL   = 2  # 1=Critical, 2=High, 3=Medium, 4=Low
LOG_FILE      = os.path.expanduser("~/.claude/hooks/opensca_guard/opensca_guard.log")
DEPENDENCY_FILES = {
    # Python - Pip
    "requirements.txt",
    "requirements.in",
    "requirements-dev.txt",
    "requirements-prod.txt",
    "setup.py",
    "Pipfile",
    "Pipfile.lock",
    # JavaScript - Npm
    "package.json",
    "package-lock.json",
    "yarn.lock",
    # Java - Maven
    "pom.xml",
    # Java - Gradle
    ".gradle",
    ".gradle.kts",
    # PHP - Composer
    "composer.json",
    "composer.lock",
    # Ruby - gem
    "Gemfile",
    "Gemfile.lock",
    # Golang - gomod
    "go.mod",
    "go.sum",
    "Gopkg.toml",
    "Gopkg.lock",
    # Rust - cargo
    "Cargo.toml",
    "Cargo.lock",
    # Erlang - Rebar
    "rebar.lock",
    "rebar.config",
}
INSTALL_CMD_PATTERNS = [
    # Python
    r"\bpip3?\s+install\b",
    r"\bpipenv\s+install\b",
    # JavaScript
    r"\bnpm\s+install\b",
    r"\bnpm\s+i\b",
    r"\byarn\s+add\b",
    r"\bpnpm\s+add\b",
    r"\bpnpm\s+install\b",
    # Java
    r"\bmvn\s+install\b",
    r"\bmvn\s+package\b",
    r"\bgradle\s+build\b",
    r"\bgradle\s+dependencies\b",
    # PHP
    r"\bcomposer\s+require\b",
    r"\bcomposer\s+install\b",
    # Ruby
    r"\bbundle\s+install\b",
    r"\bgem\s+install\b",
    # Golang
    r"\bgo\s+get\b",
    r"\bgo\s+install\b",
    # Rust
    r"\bcargo\s+add\b",
    r"\bcargo\s+install\b",
    r"\bcargo\s+build\b",
    # Erlang
    r"\brebar3?\s+get-deps\b",
    r"\brebar3?\s+compile\b",
]
LEVEL_NAMES = {1: "Critical", 2: "High", 3: "Medium", 4: "Low"}
# ── 日志 ─────────────────────────────────────────────────────────────────────
def wlog(level: str, msg: str):
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"[{ts}][{level}] {msg}\n")
def log(msg: str):
    print(f"[OpenSCA Guard] {msg}", file=sys.stderr)
# ── 判断逻辑 ─────────────────────────────────────────────────────────────────
def is_dependency_file(path: str) -> bool:
    return os.path.basename(path) in DEPENDENCY_FILES
def is_install_command(command: str) -> bool:
    return any(re.search(p, command, re.IGNORECASE) for p in INSTALL_CMD_PATTERNS)
# ── 各生态 Bash 命令包名提取 ──────────────────────────────────────────────────
def extract_reqs_from_pip_command(command: str) -> list:
    """pip install django==4.2.7 requests -> ["django==4.2.7", "requests"]"""
    reqs = []
    m = re.search(r"pip3?\s+install\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-") or arg.endswith((".txt", ".cfg")):
            continue
        reqs.append(arg)
    return reqs
def extract_reqs_from_npm_command(command: str) -> list:
    """npm install express@4.18.0 -> ["express@4.18.0"]"""
    reqs = []
    m = re.search(r"npm\s+(?:install|i)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs
def extract_reqs_from_yarn_command(command: str) -> list:
    """yarn add express@4.18.0 -> ["express@4.18.0"]"""
    reqs = []
    m = re.search(r"yarn\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs
def extract_reqs_from_go_command(command: str) -> list:
    """go get github.com/gin-gonic/gin@v1.9.0 -> ["github.com/gin-gonic/gin@v1.9.0"]"""
    reqs = []
    m = re.search(r"go\s+(?:get|install)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs
def extract_reqs_from_cargo_command(command: str) -> list:
    """cargo add serde@1.0 -> ["serde@1.0"]"""
    reqs = []
    m = re.search(r"cargo\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs
def extract_reqs_from_composer_command(command: str) -> list:
    """composer require laravel/framework:^10.0 -> ["laravel/framework:^10.0"]"""
    reqs = []
    m = re.search(r"composer\s+require\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
    if not m:
        return reqs
    for arg in m.group(1).split():
        arg = arg.strip()
        if not arg or arg.startswith("-"):
            continue
        reqs.append(arg)
    return reqs
# ── Bash 提取器路由表(必须在所有 extract 函数定义之后)─────────────────────
def _fmt_pip(pkgs: list) -> str:
    return "\n".join(pkgs)
def _fmt_npm(pkgs: list) -> str:
    deps = {}
    for p in pkgs:
        if "@" in p and not p.startswith("@"):
            name, ver = p.split("@", 1)
        elif p.startswith("@") and p.count("@") == 2:
            # 处理 @scope/pkg@version 格式
            parts = p.rsplit("@", 1)
            name, ver = parts[0], parts[1]
        else:
            name, ver = p, "latest"
        deps[name] = ver
    return json.dumps({"dependencies": deps}, indent=2)
def _fmt_go(pkgs: list) -> str:
    requires = "\n".join(f"\t{p}" for p in pkgs)
    return f"module scan\n\ngo 1.21\n\nrequire (\n{requires}\n)"
def _fmt_cargo(pkgs: list) -> str:
    lines = ["[dependencies]"]
    for p in pkgs:
        if "@" in p:
            name, ver = p.split("@", 1)
            lines.append(f'{name} = "{ver}"')
        else:
            lines.append(f'{p} = "*"')
    return "\n".join(lines)
def _fmt_composer(pkgs: list) -> str:
    deps = {}
    for p in pkgs:
        if ":" in p:
            name, ver = p.split(":", 1)
        else:
            name, ver = p, "*"
        deps[name] = ver
    return json.dumps({"require": deps}, indent=2)
BASH_EXTRACTORS = [
    (r"\bpip3?\s+install\b",      extract_reqs_from_pip_command,      "requirements.txt", _fmt_pip),
    (r"\bpipenv\s+install\b",     extract_reqs_from_pip_command,      "requirements.txt", _fmt_pip),
    (r"\bnpm\s+(?:install|i)\b",  extract_reqs_from_npm_command,      "package.json",     _fmt_npm),
    (r"\byarn\s+add\b",           extract_reqs_from_yarn_command,     "package.json",     _fmt_npm),
    (r"\bgo\s+(?:get|install)\b", extract_reqs_from_go_command,       "go.mod",           _fmt_go),
    (r"\bcargo\s+add\b",          extract_reqs_from_cargo_command,    "Cargo.toml",       _fmt_cargo),
    (r"\bcomposer\s+require\b",   extract_reqs_from_composer_command, "composer.json",    _fmt_composer),
]
# ── 核心:处理三种工具的不同字段结构 ─────────────────────────────────────────
def get_scan_target(tool_name: str, tool_input: dict) -> Optional[Tuple[str, str]]:
    """
    返回 (filename, content_to_scan) 或 None
    三种工具的字段结构:
    - Write : {"file_path": "...", "content": "..."}
    - Edit  : {"file_path": "...", "old_string": "...", "new_string": "..."}
    - Bash  : {"command": "pip install django==4.2.7"}
    """
    # ── Write ────────────────────────────────────────────────
    if tool_name == "Write":
        path    = tool_input.get("file_path", "")
        content = tool_input.get("content", "")
        if not is_dependency_file(path):
            wlog("DEBUG", f"Write 非依赖文件,跳过: {path}")
            return None
        wlog("INFO", f"Write 依赖文件: {path}")
        return os.path.basename(path), content
    # ── Edit ─────────────────────────────────────────────────
    elif tool_name == "Edit":
        path       = tool_input.get("file_path", "")
        old_string = tool_input.get("old_string", "")
        new_string = tool_input.get("new_string", "")
        if not is_dependency_file(path):
            wlog("DEBUG", f"Edit 非依赖文件,跳过: {path}")
            return None
        wlog("INFO", f"Edit 依赖文件: {path}")
        wlog("DEBUG", f"old_string: {old_string}")
        wlog("DEBUG", f"new_string: {new_string}")
        full_content = get_post_edit_content(path, old_string, new_string)
        wlog("DEBUG", f"合并后完整内容:\n{full_content}")
        return os.path.basename(path), full_content
    # ── Bash ─────────────────────────────────────────────────
    elif tool_name == "Bash":
        command = tool_input.get("command", "")
        for pattern, extractor, filename, formatter in BASH_EXTRACTORS:
            if re.search(pattern, command, re.IGNORECASE):
                pkgs = extractor(command)
                if not pkgs:
                    wlog("WARN", f"未解析出包名: {command}")
                    return None
                content = formatter(pkgs)
                wlog("INFO", f"Bash 解析出包: {pkgs} -> {filename}")
                return filename, content
        wlog("DEBUG", f"Bash 非安装命令,跳过: {command[:80]}")
        return None
    return None
def get_post_edit_content(file_path: str, old_string: str, new_string: str) -> str:
    """读取磁盘原文件,将 old_string 替换为 new_string,返回替换后的完整内容"""
    if os.path.exists(file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            original = f.read()
        if old_string in original:
            return original.replace(old_string, new_string, 1)
        else:
            wlog("WARN", "old_string 未在原文件中找到,使用 new_string 作为扫描内容")
            return new_string
    else:
        wlog("INFO", "原文件不存在,使用 new_string 作为扫描内容")
        return new_string
# ── OpenSCA 扫描 ──────────────────────────────────────────────────────────────
def run_opensca_scan(filename: str, content: str) -> Optional[dict]:
    tmpdir = tempfile.mkdtemp(prefix="opensca_hook_")
    try:
        target_path = os.path.join(tmpdir, filename)
        with open(target_path, "w", encoding="utf-8") as f:
            f.write(content)
        wlog("INFO", f"临时扫描目录: {tmpdir}")
        result_path = os.path.join(tmpdir, "result.json")
        cmd = [
            OPENSCA_CLI,
            "-token", OPENSCA_TOKEN,
            "-proj",  OPENSCA_PROJ,
            "-path",  tmpdir,
            "-out",   result_path,
        ]
        wlog("INFO", f"执行命令: {' '.join(cmd)}")
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=60,
            cwd=tmpdir,  # 避免 opensca.log 等产物污染项目目录
        )
        wlog("DEBUG", f"opensca stdout: {proc.stdout[:200]}")
        wlog("DEBUG", f"opensca stderr: {proc.stderr[:200]}")
        if not os.path.exists(result_path):
            wlog("WARN", "未生成 result.json")
            return None
        with open(result_path, "r", encoding="utf-8") as f:
            result = json.load(f)
        # 检测云端返回的错误信息
        task_error = result.get("task_info", {}).get("error", "")
        if task_error:
            wlog("WARN", f"opensca 云端报错: {task_error}")
        wlog("INFO", "扫描结果解析成功")
        wlog("DEBUG", f"result.json 原始内容: {json.dumps(result)[:500]}")
        return result
    except subprocess.TimeoutExpired:
        wlog("WARN", "opensca-cli 超时")
        return None
    except Exception as e:
        wlog("ERROR", f"扫描异常: {e}")
        return None
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
# ── 漏洞解析 ─────────────────────────────────────────────────────────────────
def collect_vulnerabilities(scan_result: dict) -> list:
    vulns = []
    def walk(node: dict):
        pkg_name    = node.get("name", "unknown")
        pkg_version = node.get("version", "unknown")
        for v in node.get("vulnerabilities", []):
            level_id = v.get("security_level_id", 4)
            vulns.append({
                "pkg_name":    pkg_name,
                "pkg_version": pkg_version,
                "vuln_name":   v.get("name", ""),
                "cve_id":      v.get("cve_id", ""),
                "level_id":    level_id,
                "level_name":  LEVEL_NAMES.get(level_id, f"Level{level_id}"),
                "suggestion":  v.get("suggestion", ""),
            })
        for child in node.get("children", []):
            walk(child)
    walk(scan_result)
    return vulns
def build_block_message(vulns_to_block: list, all_vulns: list) -> str:
    lines = [
        "=" * 60,
        "🚨 供应链安全扫描发现高危漏洞,已阻断本次依赖操作",
        "=" * 60,
        f"\n共发现 {len(all_vulns)} 个漏洞,其中 {len(vulns_to_block)} 个达到阻断阈值(Critical/High):\n",
    ]
    pkg_map = {}
    for v in vulns_to_block:
        key = f"{v['pkg_name']}=={v['pkg_version']}"
        pkg_map.setdefault(key, []).append(v)
    for pkg_key, pkg_vulns in pkg_map.items():
        lines.append(f"📦 {pkg_key}")
        for v in pkg_vulns:
            lines.append(f"   [{v['level_name']}] {v['cve_id']} — {v['vuln_name']}")
            ver_match = re.search(r"(\d+\.\d+[\.\d]*)", v["suggestion"])
            if ver_match:
                lines.append(f"   💡 建议升级至: {ver_match.group(1)} 或更高版本")
        lines.append("")
    lines += ["─" * 60, "⚡ 请将依赖修改为安全版本后重新执行。", "=" * 60]
    return "\n".join(lines)
# ── 主流程 ───────────────────────────────────────────────────────────────────
def main():
    wlog("INFO", "=" * 50)
    wlog("INFO", "Hook 被触发!")
    raw = sys.stdin.read()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as e:
        wlog("ERROR", f"JSON 解析失败: {e}")
        sys.exit(0)
    tool_name  = data.get("tool_name", "")
    tool_input = data.get("tool_input", {})
    wlog("INFO", f"tool_name={tool_name}")
    wlog("INFO", f"file_path={tool_input.get('file_path') or tool_input.get('command', '')[:80]}")
    scan_target = get_scan_target(tool_name, tool_input)
    if scan_target is None:
        sys.exit(0)
    filename, content = scan_target
    wlog("INFO", f"开始扫描: {filename}")
    scan_result = run_opensca_scan(filename, content)
    if scan_result is None:
        wlog("WARN", "扫描无结果,放行")
        sys.exit(0)
    all_vulns      = collect_vulnerabilities(scan_result)
    vulns_to_block = [v for v in all_vulns if v["level_id"] <= BLOCK_LEVEL]
    wlog("INFO", f"漏洞总数={len(all_vulns)}, 阻断数={len(vulns_to_block)}")
    if not vulns_to_block:
        wlog("INFO", "✅ 无高危漏洞,放行")
        sys.exit(0)
    wlog("WARN", f"🚨 阻断!包: {set(v['pkg_name'] for v in vulns_to_block)}")
    msg = build_block_message(vulns_to_block, all_vulns)
    print(msg, file=sys.stderr)
    sys.exit(2)
if __name__ == "__main__":
    main()

实现效果

当写入低版本的、存在漏洞的依赖包时,会触发Hook终止当前任务,并展示存在漏洞的依赖包和修复方案:

总结

AI 编程工具的普及极大提升了开发效率,但也带来了新的安全盲区——当 AI 开始替我们引入第三方依赖时,传统的事后扫描已经跟不上节奏。

本文基于 Claude Code 的 Hook 机制,结合 OpenSCA 实现了一套实时供应链安全拦截方案,核心思路是将安全检测的时机前移:

1.不等代码提交,在 AI 修改依赖文件的瞬间触发扫描

2.不等人工审查,由 Hook 自动完成漏洞识别与阻断

3.不放过任何入口,无论是直接写文件还是执行安装命令,均在拦截范围之内

整个方案的本质,是把安全审计员的角色嵌入到 Coding Agent 的决策链路中,让 AI 在"想引入一个有漏洞的依赖"时,在执行前就被叫停。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-03-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中国白客联盟 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档