ETL(Extract 抽取、Transform 转换、Load 加载)是数据工程的核心,Python 是构建轻量、高效数据管道的最佳工具。
本文带你从零搭建可直接运行的实战 ETL 数据管道,涵盖:
首先安装依赖库:
pip install pandas requests我们构建一个用户订单数据处理管道,实现:
import pandas as pd
import requests
import sqlite3
from datetime import datetime
# ==========================================
# E - Extract 数据抽取
# ==========================================
def extract_users_from_csv(file_path: str) -> pd.DataFrame:
"""从 CSV 文件抽取用户数据"""
try:
df = pd.read_csv(file_path)
print(f"✅ 成功抽取用户数据:{len(df)} 条")
return df
except Exception as e:
print(f"❌ 用户数据抽取失败:{str(e)}")
return pd.DataFrame()
def extract_orders_from_api() -> pd.DataFrame:
"""从公共 API 抽取订单数据(模拟真实接口)"""
# 这里使用免费模拟 API,你可以替换为真实接口
url = "https://jsonplaceholder.typicode.com/orders"
try:
response = requests.get(url, timeout=10)
response.raise_for_status() # 抛出请求异常
df = pd.DataFrame(response.json())
print(f"✅ 成功抽取订单数据:{len(df)} 条")
return df
except Exception as e:
print(f"❌ 订单数据抽取失败:{str(e)}")
return pd.DataFrame()
# ==========================================
# T - Transform 数据转换与清洗
# ==========================================
def transform_data(users_df: pd.DataFrame, orders_df: pd.DataFrame) -> pd.DataFrame:
"""数据清洗、转换、关联"""
# 1. 清洗用户数据
users_df = users_df.drop_duplicates(subset=["user_id"]) # 去重
users_df = users_df.dropna(subset=["user_id", "email"]) # 删除关键字段空值
users_df["email"] = users_df["email"].str.lower() # 邮箱统一小写
# 2. 清洗订单数据
orders_df = orders_df.drop_duplicates(subset=["order_id"])
orders_df = orders_df.dropna(subset=["order_id", "user_id", "amount"])
orders_df["order_date"] = pd.to_datetime(orders_df["order_date"], errors="coerce") # 日期格式化
# 3. 关联用户 + 订单数据(内连接)
merged_df = pd.merge(
users_df, orders_df,
on="user_id", how="inner"
)
# 4. 新增计算字段
merged_df["data_process_time"] = datetime.now()
merged_df["amount"] = merged_df["amount"].round(2) # 金额保留两位小数
print(f"✅ 数据转换完成:最终 {len(merged_df)} 条有效数据")
return merged_df
# ==========================================
# L - Load 数据加载
# ==========================================
def load_to_database(df: pd.DataFrame, db_name: str = "etl_pipeline.db"):
"""将清洗后的数据加载到 SQLite 数据库"""
try:
with sqlite3.connect(db_name) as conn:
# 替换表(可改为 append 追加)
df.to_sql(
name="user_orders",
con=conn,
if_exists="replace",
index=False
)
print(f"✅ 数据成功加载到 {db_name} → user_orders 表")
except Exception as e:
print(f"❌ 数据加载失败:{str(e)}")
# ==========================================
# 主管道:串联 ETL 流程
# ==========================================
def etl_pipeline():
print("===== 开始执行 ETL 数据管道 =====")
# 抽取
users = extract_users_from_csv("users.csv")
orders = extract_orders_from_api()
if users.empty or orders.empty:
print("❌ 原始数据为空,终止管道")
return
# 转换
cleaned_data = transform_data(users, orders)
# 加载
load_to_database(cleaned_data)
print("===== ETL 管道执行完成 =====")
if __name__ == "__main__":
etl_pipeline()在代码同目录创建 users.csv:
user_id,name,email,age,country
1,张三,zhangsan@test.com,28,中国
2,李四,lisi@TEST.COM,32,美国
3,王五,,25,中国
1,张三,zhangsan@test.com,28,中国
4,赵六,zhaoliu@test.com,30,英国pd.read_csv() 支持 CSV/Excel/JSON 等几乎所有文件格式requests 调用 API 获取 JSON 数据包含企业级数据清洗操作:
drop_duplicates()dropna() / fillna()merge() 多表拼接replace:覆盖(测试用)append:追加(生产用)fail:存在则失败import logging
logging.basicConfig(level=logging.INFO)
logging.info("数据抽取成功")# 只拉取昨天的订单
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
orders_df = orders_df[orders_df["order_date"] >= yesterday]# 关键指标校验,不通过则报警
assert merged_df["amount"].min() >= 0, "订单金额不能为负数"schedule 库原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。