# 单条件筛选
critical_bugs = df[df['severity'] == 'Critical'] # 筛选严重缺陷
# 多条件筛选(& 表示 AND,| 表示 OR)
p0_failed = df[
(df['priority'] == 'P0') &
(df['last_result'] == 'Failed') # 筛选P0级失败用例
]
# 反选(~ 表示 NOT)
automated_cases = df[~df['is_automated']] # 筛选非自动化用例query方法(SQL风格)# 多条件组合(避免括号嵌套)
env_issues = df.query(
"environment == 'Production' and "
"severity in ['Critical', 'Blocker'] and "
"status != 'Closed'"
)loc[] 条件索引(条件+列选择)# 筛选指定列+条件
regression_data = df.loc[
(df['version'] == 'v2.3') &
(df['test_type'] == 'Regression'), # 行条件
['case_id', 'module', 'executor', 'result'] # 选择的列
]# 包含关键词(na=False处理空值)
timeout_issues = df[df['title'].str.contains('timeout', case=False, na=False)]
# 正则表达式匹配(如匹配特定错误码)
error_code_cases = df[df['error_log'].str.contains(r'ERR_\d{4}', regex=True)]# 设置时间列为datetime类型
df['create_time'] = pd.to_datetime(df['create_time'])
# 筛选最近3天的缺陷
recent_bugs = df[df['create_time'] > (pd.Timestamp.now() - pd.DateOffset(days=3))]import pandas as pd
# 读取测试报告
report = pd.read_excel("test_report.xlsx")
# 筛选今日失败的P1用例(带详细日志)
today_failures = report[
(report['exec_date'] == pd.Timestamp.today().date()) &
(report['result'] == 'Failed') &
(report['priority'].isin(['P1', 'P0']))
].loc[:, ['case_id', 'module', 'error_detail', 'screenshot']]
# 导出到新文件(用于缺陷创建)
today_failures.to_excel("daily_failures.xlsx", index=False)import pandas as pd
import pymysql
# 连接测试数据库
conn = pymysql.connect(host="qa-db", user="tester", password="****", database="bug_tracker")
# 直接SQL筛选(减少数据传输)
sql = """
SELECT bug_id, title, severity, status, repro_steps
FROM bugs
WHERE status = 'Open'
AND environment = 'iOS'
AND create_date > CURDATE() - INTERVAL 7 DAY
"""
ios_open_bugs = pd.read_sql(sql, conn)
# 本地二次筛选(复杂逻辑)
urgent_bugs = ios_open_bugs.query(
"severity == 'Critical' or "
"title.str.contains('crash', case=False)"
)
# 输出统计结果
print(f"紧急iOS缺陷: {len(urgent_bugs)}个")
print(urgent_bugs[['bug_id', 'title']])# 构建动态筛选条件
filters = {
'priority': 'P0',
'environment': ['Android', 'iOS'],
'status': ['Open', 'Reopen']
}
# 应用筛选
query_str = " and ".join([
f"{k} in {tuple(v)}"ifisinstance(v, list) elsef"{k}=='{v}'"
for k, v in filters.items()
])
filtered_data = df.query(query_str)# 关联两个数据源
cases = pd.read_excel("test_cases.xlsx")
bugs = pd.read_sql("SELECT * FROM bugs", conn)
# 筛选有未关闭缺陷的用例
bug_related_cases = cases.merge(
bugs[bugs['status'] != 'Closed'],
left_on='case_id',
right_on='related_case',
how='inner'
)# 按模块统计失败率
module_stats = report.groupby('module').agg(
total_cases=('case_id', 'count'),
failed_cases=('result', lambda x: (x == 'Failed').sum())
)
module_stats['fail_rate'] = module_stats['failed_cases'] / module_stats['total_cases']
# 筛选高风险模块
high_risk_modules = module_stats[module_stats['fail_rate'] > 0.2]# 1. 使用dtype指定类型(减少内存)
dtypes = {'case_id': 'int32', 'result': 'category', 'priority': 'category'}
df = pd.read_csv("large_report.csv", dtype=dtypes)
# 2. 分块筛选(GB级数据)
chunk_size = 50000
critical_bugs_chunks = []
for chunk in pd.read_sql("SELECT * FROM bugs", conn, chunksize=chunk_size):
critical_bugs_chunks.append(chunk[chunk['severity'] == 'Critical'])
critical_bugs = pd.concat(critical_bugs_chunks)
# 3. 使用eval()加速计算(100万+数据)
df = df.eval("is_urgent = (severity == 'Critical') & (status == 'Open')")
urgent_items = df[df['is_urgent']]关键原则:筛选前先缩小数据范围(用SQL条件或列选择)
#Python #Pandas筛选