前言
AI 已经慢慢成为工作的一部分,尤其在 IT 行业,利用 AI 写代码已是日常。但 AI 写的代码就完全可信、安全吗?传统的 SCA 扫描面对 AI 编程场景,是否有更好的解决方案?
最近看到腾讯的一篇文章《幽灵依赖:Agentic Coding 范式下的新型供应链安全威胁》,文中提出了一个有意思的思路——将安全检测左移至 Coding Agent 产生决策的瞬间,在 AI 真正执行依赖操作之前就完成拦截。
但是因为其插件是基于CodeBuddy Code,因此本文就基于这个思路,结合 Claude Code 的 Hook 机制和 OpenSCA,实现一个在 AI 写代码时实时拦截高危依赖的方案。
Claude Code Hook机制
Claude Code提供了完善的Hook机制,其中PreToolUse符合本文的要求,也就是在AI输出方案但是还没产生文件前,安全介入:

然后利用Exit code 2来实现阻止当前Claude Code的操作,那么思路就很清晰了,只要通过Hook PreToolUse,判断当出现高危组件时则终止任务。
Hook具体操作
首先在Claude Code的全局配置文件~/.claude/settings.json 中添加hook操作:
{
"hooks": {
"PreToolUse": [
{
"matcher": "Write",
"hooks": [
{
"type": "command",
"command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
}
]
},
{
"matcher": "Edit",
"hooks": [
{
"type": "command",
"command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
}
]
},
{
"matcher": "Bash",
"hooks": [
{
"type": "command",
"command": "python3 ~/.claude/hooks/opensca_guard/opensca_guard.py"
}
]
}
]
}
}从几个方面来进行Hook来保证全面性:
Edit
Write
BASH然后编写Hook代码,具体代码如下:
可访问 https://github.com/Jumbo-WJB/ai-code-guard 获取访问代码。
#!/usr/bin/env python3
import sys
import json
import os
import re
import subprocess
import tempfile
import shutil
from datetime import datetime
from typing import Optional, Tuple
# ── 配置区 ───────────────────────────────────────────────────────────────────
OPENSCA_CLI = os.path.expanduser("~/Downloads/opensca-cli-v3.0.9-darwin-arm64/opensca-cli")
OPENSCA_TOKEN = "your-token-here"
OPENSCA_PROJ = "claude-hook-scan"
BLOCK_LEVEL = 2 # 1=Critical, 2=High, 3=Medium, 4=Low
LOG_FILE = os.path.expanduser("~/.claude/hooks/opensca_guard/opensca_guard.log")
DEPENDENCY_FILES = {
# Python - Pip
"requirements.txt",
"requirements.in",
"requirements-dev.txt",
"requirements-prod.txt",
"setup.py",
"Pipfile",
"Pipfile.lock",
# JavaScript - Npm
"package.json",
"package-lock.json",
"yarn.lock",
# Java - Maven
"pom.xml",
# Java - Gradle
".gradle",
".gradle.kts",
# PHP - Composer
"composer.json",
"composer.lock",
# Ruby - gem
"Gemfile",
"Gemfile.lock",
# Golang - gomod
"go.mod",
"go.sum",
"Gopkg.toml",
"Gopkg.lock",
# Rust - cargo
"Cargo.toml",
"Cargo.lock",
# Erlang - Rebar
"rebar.lock",
"rebar.config",
}
INSTALL_CMD_PATTERNS = [
# Python
r"\bpip3?\s+install\b",
r"\bpipenv\s+install\b",
# JavaScript
r"\bnpm\s+install\b",
r"\bnpm\s+i\b",
r"\byarn\s+add\b",
r"\bpnpm\s+add\b",
r"\bpnpm\s+install\b",
# Java
r"\bmvn\s+install\b",
r"\bmvn\s+package\b",
r"\bgradle\s+build\b",
r"\bgradle\s+dependencies\b",
# PHP
r"\bcomposer\s+require\b",
r"\bcomposer\s+install\b",
# Ruby
r"\bbundle\s+install\b",
r"\bgem\s+install\b",
# Golang
r"\bgo\s+get\b",
r"\bgo\s+install\b",
# Rust
r"\bcargo\s+add\b",
r"\bcargo\s+install\b",
r"\bcargo\s+build\b",
# Erlang
r"\brebar3?\s+get-deps\b",
r"\brebar3?\s+compile\b",
]
LEVEL_NAMES = {1: "Critical", 2: "High", 3: "Medium", 4: "Low"}
# ── 日志 ─────────────────────────────────────────────────────────────────────
def wlog(level: str, msg: str):
with open(LOG_FILE, "a", encoding="utf-8") as f:
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{ts}][{level}] {msg}\n")
def log(msg: str):
print(f"[OpenSCA Guard] {msg}", file=sys.stderr)
# ── 判断逻辑 ─────────────────────────────────────────────────────────────────
def is_dependency_file(path: str) -> bool:
return os.path.basename(path) in DEPENDENCY_FILES
def is_install_command(command: str) -> bool:
return any(re.search(p, command, re.IGNORECASE) for p in INSTALL_CMD_PATTERNS)
# ── 各生态 Bash 命令包名提取 ──────────────────────────────────────────────────
def extract_reqs_from_pip_command(command: str) -> list:
"""pip install django==4.2.7 requests -> ["django==4.2.7", "requests"]"""
reqs = []
m = re.search(r"pip3?\s+install\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
if not m:
return reqs
for arg in m.group(1).split():
arg = arg.strip()
if not arg or arg.startswith("-") or arg.endswith((".txt", ".cfg")):
continue
reqs.append(arg)
return reqs
def extract_reqs_from_npm_command(command: str) -> list:
"""npm install express@4.18.0 -> ["express@4.18.0"]"""
reqs = []
m = re.search(r"npm\s+(?:install|i)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
if not m:
return reqs
for arg in m.group(1).split():
arg = arg.strip()
if not arg or arg.startswith("-"):
continue
reqs.append(arg)
return reqs
def extract_reqs_from_yarn_command(command: str) -> list:
"""yarn add express@4.18.0 -> ["express@4.18.0"]"""
reqs = []
m = re.search(r"yarn\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
if not m:
return reqs
for arg in m.group(1).split():
arg = arg.strip()
if not arg or arg.startswith("-"):
continue
reqs.append(arg)
return reqs
def extract_reqs_from_go_command(command: str) -> list:
"""go get github.com/gin-gonic/gin@v1.9.0 -> ["github.com/gin-gonic/gin@v1.9.0"]"""
reqs = []
m = re.search(r"go\s+(?:get|install)\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
if not m:
return reqs
for arg in m.group(1).split():
arg = arg.strip()
if not arg or arg.startswith("-"):
continue
reqs.append(arg)
return reqs
def extract_reqs_from_cargo_command(command: str) -> list:
"""cargo add serde@1.0 -> ["serde@1.0"]"""
reqs = []
m = re.search(r"cargo\s+add\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
if not m:
return reqs
for arg in m.group(1).split():
arg = arg.strip()
if not arg or arg.startswith("-"):
continue
reqs.append(arg)
return reqs
def extract_reqs_from_composer_command(command: str) -> list:
"""composer require laravel/framework:^10.0 -> ["laravel/framework:^10.0"]"""
reqs = []
m = re.search(r"composer\s+require\s+(.+?)(\s*&&|\s*;|\s*\|\||\s*$)", command)
if not m:
return reqs
for arg in m.group(1).split():
arg = arg.strip()
if not arg or arg.startswith("-"):
continue
reqs.append(arg)
return reqs
# ── Bash 提取器路由表(必须在所有 extract 函数定义之后)─────────────────────
def _fmt_pip(pkgs: list) -> str:
return "\n".join(pkgs)
def _fmt_npm(pkgs: list) -> str:
deps = {}
for p in pkgs:
if "@" in p and not p.startswith("@"):
name, ver = p.split("@", 1)
elif p.startswith("@") and p.count("@") == 2:
# 处理 @scope/pkg@version 格式
parts = p.rsplit("@", 1)
name, ver = parts[0], parts[1]
else:
name, ver = p, "latest"
deps[name] = ver
return json.dumps({"dependencies": deps}, indent=2)
def _fmt_go(pkgs: list) -> str:
requires = "\n".join(f"\t{p}" for p in pkgs)
return f"module scan\n\ngo 1.21\n\nrequire (\n{requires}\n)"
def _fmt_cargo(pkgs: list) -> str:
lines = ["[dependencies]"]
for p in pkgs:
if "@" in p:
name, ver = p.split("@", 1)
lines.append(f'{name} = "{ver}"')
else:
lines.append(f'{p} = "*"')
return "\n".join(lines)
def _fmt_composer(pkgs: list) -> str:
deps = {}
for p in pkgs:
if ":" in p:
name, ver = p.split(":", 1)
else:
name, ver = p, "*"
deps[name] = ver
return json.dumps({"require": deps}, indent=2)
BASH_EXTRACTORS = [
(r"\bpip3?\s+install\b", extract_reqs_from_pip_command, "requirements.txt", _fmt_pip),
(r"\bpipenv\s+install\b", extract_reqs_from_pip_command, "requirements.txt", _fmt_pip),
(r"\bnpm\s+(?:install|i)\b", extract_reqs_from_npm_command, "package.json", _fmt_npm),
(r"\byarn\s+add\b", extract_reqs_from_yarn_command, "package.json", _fmt_npm),
(r"\bgo\s+(?:get|install)\b", extract_reqs_from_go_command, "go.mod", _fmt_go),
(r"\bcargo\s+add\b", extract_reqs_from_cargo_command, "Cargo.toml", _fmt_cargo),
(r"\bcomposer\s+require\b", extract_reqs_from_composer_command, "composer.json", _fmt_composer),
]
# ── 核心:处理三种工具的不同字段结构 ─────────────────────────────────────────
def get_scan_target(tool_name: str, tool_input: dict) -> Optional[Tuple[str, str]]:
"""
返回 (filename, content_to_scan) 或 None
三种工具的字段结构:
- Write : {"file_path": "...", "content": "..."}
- Edit : {"file_path": "...", "old_string": "...", "new_string": "..."}
- Bash : {"command": "pip install django==4.2.7"}
"""
# ── Write ────────────────────────────────────────────────
if tool_name == "Write":
path = tool_input.get("file_path", "")
content = tool_input.get("content", "")
if not is_dependency_file(path):
wlog("DEBUG", f"Write 非依赖文件,跳过: {path}")
return None
wlog("INFO", f"Write 依赖文件: {path}")
return os.path.basename(path), content
# ── Edit ─────────────────────────────────────────────────
elif tool_name == "Edit":
path = tool_input.get("file_path", "")
old_string = tool_input.get("old_string", "")
new_string = tool_input.get("new_string", "")
if not is_dependency_file(path):
wlog("DEBUG", f"Edit 非依赖文件,跳过: {path}")
return None
wlog("INFO", f"Edit 依赖文件: {path}")
wlog("DEBUG", f"old_string: {old_string}")
wlog("DEBUG", f"new_string: {new_string}")
full_content = get_post_edit_content(path, old_string, new_string)
wlog("DEBUG", f"合并后完整内容:\n{full_content}")
return os.path.basename(path), full_content
# ── Bash ─────────────────────────────────────────────────
elif tool_name == "Bash":
command = tool_input.get("command", "")
for pattern, extractor, filename, formatter in BASH_EXTRACTORS:
if re.search(pattern, command, re.IGNORECASE):
pkgs = extractor(command)
if not pkgs:
wlog("WARN", f"未解析出包名: {command}")
return None
content = formatter(pkgs)
wlog("INFO", f"Bash 解析出包: {pkgs} -> {filename}")
return filename, content
wlog("DEBUG", f"Bash 非安装命令,跳过: {command[:80]}")
return None
return None
def get_post_edit_content(file_path: str, old_string: str, new_string: str) -> str:
"""读取磁盘原文件,将 old_string 替换为 new_string,返回替换后的完整内容"""
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
original = f.read()
if old_string in original:
return original.replace(old_string, new_string, 1)
else:
wlog("WARN", "old_string 未在原文件中找到,使用 new_string 作为扫描内容")
return new_string
else:
wlog("INFO", "原文件不存在,使用 new_string 作为扫描内容")
return new_string
# ── OpenSCA 扫描 ──────────────────────────────────────────────────────────────
def run_opensca_scan(filename: str, content: str) -> Optional[dict]:
tmpdir = tempfile.mkdtemp(prefix="opensca_hook_")
try:
target_path = os.path.join(tmpdir, filename)
with open(target_path, "w", encoding="utf-8") as f:
f.write(content)
wlog("INFO", f"临时扫描目录: {tmpdir}")
result_path = os.path.join(tmpdir, "result.json")
cmd = [
OPENSCA_CLI,
"-token", OPENSCA_TOKEN,
"-proj", OPENSCA_PROJ,
"-path", tmpdir,
"-out", result_path,
]
wlog("INFO", f"执行命令: {' '.join(cmd)}")
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60,
cwd=tmpdir, # 避免 opensca.log 等产物污染项目目录
)
wlog("DEBUG", f"opensca stdout: {proc.stdout[:200]}")
wlog("DEBUG", f"opensca stderr: {proc.stderr[:200]}")
if not os.path.exists(result_path):
wlog("WARN", "未生成 result.json")
return None
with open(result_path, "r", encoding="utf-8") as f:
result = json.load(f)
# 检测云端返回的错误信息
task_error = result.get("task_info", {}).get("error", "")
if task_error:
wlog("WARN", f"opensca 云端报错: {task_error}")
wlog("INFO", "扫描结果解析成功")
wlog("DEBUG", f"result.json 原始内容: {json.dumps(result)[:500]}")
return result
except subprocess.TimeoutExpired:
wlog("WARN", "opensca-cli 超时")
return None
except Exception as e:
wlog("ERROR", f"扫描异常: {e}")
return None
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
# ── 漏洞解析 ─────────────────────────────────────────────────────────────────
def collect_vulnerabilities(scan_result: dict) -> list:
vulns = []
def walk(node: dict):
pkg_name = node.get("name", "unknown")
pkg_version = node.get("version", "unknown")
for v in node.get("vulnerabilities", []):
level_id = v.get("security_level_id", 4)
vulns.append({
"pkg_name": pkg_name,
"pkg_version": pkg_version,
"vuln_name": v.get("name", ""),
"cve_id": v.get("cve_id", ""),
"level_id": level_id,
"level_name": LEVEL_NAMES.get(level_id, f"Level{level_id}"),
"suggestion": v.get("suggestion", ""),
})
for child in node.get("children", []):
walk(child)
walk(scan_result)
return vulns
def build_block_message(vulns_to_block: list, all_vulns: list) -> str:
lines = [
"=" * 60,
"🚨 供应链安全扫描发现高危漏洞,已阻断本次依赖操作",
"=" * 60,
f"\n共发现 {len(all_vulns)} 个漏洞,其中 {len(vulns_to_block)} 个达到阻断阈值(Critical/High):\n",
]
pkg_map = {}
for v in vulns_to_block:
key = f"{v['pkg_name']}=={v['pkg_version']}"
pkg_map.setdefault(key, []).append(v)
for pkg_key, pkg_vulns in pkg_map.items():
lines.append(f"📦 {pkg_key}")
for v in pkg_vulns:
lines.append(f" [{v['level_name']}] {v['cve_id']} — {v['vuln_name']}")
ver_match = re.search(r"(\d+\.\d+[\.\d]*)", v["suggestion"])
if ver_match:
lines.append(f" 💡 建议升级至: {ver_match.group(1)} 或更高版本")
lines.append("")
lines += ["─" * 60, "⚡ 请将依赖修改为安全版本后重新执行。", "=" * 60]
return "\n".join(lines)
# ── 主流程 ───────────────────────────────────────────────────────────────────
def main():
wlog("INFO", "=" * 50)
wlog("INFO", "Hook 被触发!")
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError as e:
wlog("ERROR", f"JSON 解析失败: {e}")
sys.exit(0)
tool_name = data.get("tool_name", "")
tool_input = data.get("tool_input", {})
wlog("INFO", f"tool_name={tool_name}")
wlog("INFO", f"file_path={tool_input.get('file_path') or tool_input.get('command', '')[:80]}")
scan_target = get_scan_target(tool_name, tool_input)
if scan_target is None:
sys.exit(0)
filename, content = scan_target
wlog("INFO", f"开始扫描: {filename}")
scan_result = run_opensca_scan(filename, content)
if scan_result is None:
wlog("WARN", "扫描无结果,放行")
sys.exit(0)
all_vulns = collect_vulnerabilities(scan_result)
vulns_to_block = [v for v in all_vulns if v["level_id"] <= BLOCK_LEVEL]
wlog("INFO", f"漏洞总数={len(all_vulns)}, 阻断数={len(vulns_to_block)}")
if not vulns_to_block:
wlog("INFO", "✅ 无高危漏洞,放行")
sys.exit(0)
wlog("WARN", f"🚨 阻断!包: {set(v['pkg_name'] for v in vulns_to_block)}")
msg = build_block_message(vulns_to_block, all_vulns)
print(msg, file=sys.stderr)
sys.exit(2)
if __name__ == "__main__":
main()实现效果
当写入低版本的、存在漏洞的依赖包时,会触发Hook终止当前任务,并展示存在漏洞的依赖包和修复方案:


总结
AI 编程工具的普及极大提升了开发效率,但也带来了新的安全盲区——当 AI 开始替我们引入第三方依赖时,传统的事后扫描已经跟不上节奏。
本文基于 Claude Code 的 Hook 机制,结合 OpenSCA 实现了一套实时供应链安全拦截方案,核心思路是将安全检测的时机前移:
1.不等代码提交,在 AI 修改依赖文件的瞬间触发扫描
2.不等人工审查,由 Hook 自动完成漏洞识别与阻断
3.不放过任何入口,无论是直接写文件还是执行安装命令,均在拦截范围之内
整个方案的本质,是把安全审计员的角色嵌入到 Coding Agent 的决策链路中,让 AI 在"想引入一个有漏洞的依赖"时,在执行前就被叫停。