首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Python 数据管道构建:ETL 与数据处理实战

Python 数据管道构建:ETL 与数据处理实战

原创
作者头像
小丹资料
发布2026-04-22 20:39:03
发布2026-04-22 20:39:03
1440
举报

Python 数据管道构建:ETL 与数据处理实战

ETL(Extract 抽取、Transform 转换、Load 加载)是数据工程的核心,Python 是构建轻量、高效数据管道的最佳工具

本文带你从零搭建可直接运行的实战 ETL 数据管道,涵盖:

  • 数据源:CSV + 模拟 API
  • 处理库:Pandas(核心)、Requests
  • 目标库:SQLite(本地数据库)
  • 完整流程:抽取 → 清洗 → 转换 → 校验 → 加载

一、环境准备

首先安装依赖库:

代码语言:bash
复制
pip install pandas requests

二、ETL 管道完整实战代码

我们构建一个用户订单数据处理管道,实现:

  1. 从 CSV 抽取用户数据
  2. 从 API 抽取订单数据
  3. 数据清洗(去重、空值、格式修正)
  4. 数据关联转换
  5. 加载到数据库
代码语言:python
复制
import pandas as pd
import requests
import sqlite3
from datetime import datetime

# ==========================================
# E - Extract 数据抽取
# ==========================================
def extract_users_from_csv(file_path: str) -> pd.DataFrame:
    """从 CSV 文件抽取用户数据"""
    try:
        df = pd.read_csv(file_path)
        print(f"✅ 成功抽取用户数据:{len(df)} 条")
        return df
    except Exception as e:
        print(f"❌ 用户数据抽取失败:{str(e)}")
        return pd.DataFrame()

def extract_orders_from_api() -> pd.DataFrame:
    """从公共 API 抽取订单数据(模拟真实接口)"""
    # 这里使用免费模拟 API,你可以替换为真实接口
    url = "https://jsonplaceholder.typicode.com/orders"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # 抛出请求异常
        df = pd.DataFrame(response.json())
        print(f"✅ 成功抽取订单数据:{len(df)} 条")
        return df
    except Exception as e:
        print(f"❌ 订单数据抽取失败:{str(e)}")
        return pd.DataFrame()

# ==========================================
# T - Transform 数据转换与清洗
# ==========================================
def transform_data(users_df: pd.DataFrame, orders_df: pd.DataFrame) -> pd.DataFrame:
    """数据清洗、转换、关联"""
    # 1. 清洗用户数据
    users_df = users_df.drop_duplicates(subset=["user_id"])  # 去重
    users_df = users_df.dropna(subset=["user_id", "email"])  # 删除关键字段空值
    users_df["email"] = users_df["email"].str.lower()  # 邮箱统一小写

    # 2. 清洗订单数据
    orders_df = orders_df.drop_duplicates(subset=["order_id"])
    orders_df = orders_df.dropna(subset=["order_id", "user_id", "amount"])
    orders_df["order_date"] = pd.to_datetime(orders_df["order_date"], errors="coerce")  # 日期格式化

    # 3. 关联用户 + 订单数据(内连接)
    merged_df = pd.merge(
        users_df, orders_df,
        on="user_id", how="inner"
    )

    # 4. 新增计算字段
    merged_df["data_process_time"] = datetime.now()
    merged_df["amount"] = merged_df["amount"].round(2)  # 金额保留两位小数

    print(f"✅ 数据转换完成:最终 {len(merged_df)} 条有效数据")
    return merged_df

# ==========================================
# L - Load 数据加载
# ==========================================
def load_to_database(df: pd.DataFrame, db_name: str = "etl_pipeline.db"):
    """将清洗后的数据加载到 SQLite 数据库"""
    try:
        with sqlite3.connect(db_name) as conn:
            # 替换表(可改为 append 追加)
            df.to_sql(
                name="user_orders",
                con=conn,
                if_exists="replace",
                index=False
            )
        print(f"✅ 数据成功加载到 {db_name} → user_orders 表")
    except Exception as e:
        print(f"❌ 数据加载失败:{str(e)}")

# ==========================================
# 主管道:串联 ETL 流程
# ==========================================
def etl_pipeline():
    print("===== 开始执行 ETL 数据管道 =====")
    
    # 抽取
    users = extract_users_from_csv("users.csv")
    orders = extract_orders_from_api()
    
    if users.empty or orders.empty:
        print("❌ 原始数据为空,终止管道")
        return

    # 转换
    cleaned_data = transform_data(users, orders)
    
    # 加载
    load_to_database(cleaned_data)
    
    print("===== ETL 管道执行完成 =====")

if __name__ == "__main__":
    etl_pipeline()

三、配套测试数据(users.csv)

在代码同目录创建 users.csv

代码语言:csv
复制
user_id,name,email,age,country
1,张三,zhangsan@test.com,28,中国
2,李四,lisi@TEST.COM,32,美国
3,王五,,25,中国
1,张三,zhangsan@test.com,28,中国
4,赵六,zhaoliu@test.com,30,英国

四、核心 ETL 步骤详解

1. Extract(抽取)

  • 文件抽取pd.read_csv() 支持 CSV/Excel/JSON 等几乎所有文件格式
  • 接口抽取requests 调用 API 获取 JSON 数据
  • 容错处理:捕获异常,避免管道崩溃

2. Transform(转换)—— 最核心环节

包含企业级数据清洗操作

  • 去重:drop_duplicates()
  • 缺失值处理:dropna() / fillna()
  • 格式统一:大小写、日期、数值精度
  • 数据关联:merge() 多表拼接
  • 字段计算:新增业务字段

3. Load(加载)

  • 轻量场景:SQLite(无需安装服务)
  • 企业场景:MySQL、PostgreSQL、Hive、Doris
  • 加载模式:
    • replace:覆盖(测试用)
    • append:追加(生产用)
    • fail:存在则失败

五、生产级优化(必看)

1. 日志替代 print

代码语言:python
复制
import logging
logging.basicConfig(level=logging.INFO)
logging.info("数据抽取成功")

2. 增量抽取(避免全量拉取)

代码语言:python
复制
# 只拉取昨天的订单
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
orders_df = orders_df[orders_df["order_date"] >= yesterday]

3. 数据校验

代码语言:python
复制
# 关键指标校验,不通过则报警
assert merged_df["amount"].min() >= 0, "订单金额不能为负数"

4. 定时调度

  • 轻量:schedule
  • 企业:Airflow / Prefect

六、扩展方向

  1. 大数据场景:替换 Pandas 为 PySpark
  2. 实时数据:结合 Kafka + Flink
  3. 可视化:对接 Metabase / Tableau
  4. 云平台:AWS Glue / Azure Data Factory

总结

  1. 这套 ETL 管道开箱即用,直接复制代码 + CSV 即可运行
  2. 核心流程:抽取 → 清洗 → 转换 → 校验 → 加载
  3. Python + Pandas 是轻量数据管道的最优组合
  4. 生产环境只需增加日志、增量、调度即可直接上线

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Python 数据管道构建:ETL 与数据处理实战
    • 一、环境准备
    • 二、ETL 管道完整实战代码
    • 三、配套测试数据(users.csv)
    • 四、核心 ETL 步骤详解
      • 1. Extract(抽取)
      • 2. Transform(转换)—— 最核心环节
      • 3. Load(加载)
    • 五、生产级优化(必看)
      • 1. 日志替代 print
      • 2. 增量抽取(避免全量拉取)
      • 3. 数据校验
      • 4. 定时调度
    • 六、扩展方向
      • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档