一、文件处理基础
# Traditional approach: the file has to be closed by hand.
f = open('data.txt', 'r')  # open the file
content = f.read()  # read its contents
f.close()  # close the file
# Problem: if anything between open() and close() raises, close() is never
# reached and the file may stay open.

# Preferred approach: a `with` block closes the file on every exit path.
with open('data.txt', 'r') as f:
    content = f.read()
# 离开代码块自动关闭文件

| 模式 | 描述 | 文件存在 | 文件不存在 |
|---|---|---|---|
| 'r' | 读取(默认) | 打开 | 错误 |
| 'w' | 写入 | 清空 | 创建 |
| 'a' | 追加 | 追加 | 创建 |
| 'x' | 独占创建 | 错误 | 创建 |
| 'b' | 二进制模式 | - | - |
| 't' | 文本模式(默认) | - | - |
| '+' | 读写更新 | - | - |
具体区别可查询官网
https://docs.python.org/3.11/tutorial/inputoutput.html
# Read line by line
with open('large.log', 'r') as f:
    for line in f:  # a file object is its own iterator, so lines are pulled one at a time
        process(line)
# Specify the buffer size
with open('huge.bin', 'rb', buffering=8192) as f:  # 8 KB buffer
    while chunk := f.read(4096):  # read 4 KB per iteration; b'' at EOF ends the loop
        process_chunk(chunk)

# Manage several files in a single `with` statement
with open('input.txt', 'r') as fin, open('output.txt', 'w') as fout:
    for line in fin:
        fout.write(line.upper())

# Random access inside a binary file
with open('data.bin', 'rb+') as f:
    f.seek(1024)  # move to the 1 KB offset
    header = f.read(100)
    f.seek(-50, 2)  # whence=2: offset is relative to the end, i.e. 50 bytes before EOF
    footer = f.read(50)


import time


class SmartFileHandler:
    """Context manager that opens a file, times it and reports errors.

    Entering the block opens the file and returns the file object. Leaving
    the block closes the file and prints how long it was open. An
    ``Exception`` raised inside the block is printed and suppressed;
    ``KeyboardInterrupt``, ``SystemExit`` and other non-``Exception``
    exits still propagate.

    Args:
        filename: Path of the file to open.
        mode: Mode string handed to ``open()``.
        encoding: Text encoding; ignored when ``mode`` is a binary mode.
    """

    def __init__(self, filename, mode='r', encoding='utf-8'):
        self.filename = filename
        self.mode = mode
        self.encoding = encoding
        self.file = None
        self.open_time = None

    def __enter__(self):
        """Open the file, record the start time and return the file object.

        Raises:
            OSError: Whatever ``open()`` raises (e.g. ``FileNotFoundError``).
        """
        self.open_time = time.time()
        # open() raises ValueError if an encoding is given in binary mode.
        encoding = None if 'b' in self.mode else self.encoding
        # No retry on FileNotFoundError: write/append modes already create a
        # missing file, so re-opening with 'w' would just fail the same way.
        self.file = open(self.filename, self.mode, encoding=encoding)
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the file, print the elapsed time and report any error.

        Returns:
            True when an ``Exception`` was reported (it is then suppressed),
            False otherwise.
        """
        if self.file:
            self.file.close()
            elapsed = time.time() - self.open_time
            print(f"文件 {self.filename} 已关闭,打开时长: {elapsed:.2f}秒")
        if exc_type is None:
            return False
        # Never swallow Ctrl-C or interpreter shutdown.
        if not issubclass(exc_type, Exception):
            return False
        print(f"发生错误: {exc_val}")
        # Returning True marks the exception as handled so it stops here.
        return True
# Usage example
with SmartFileHandler('smart.log', 'w') as f:
    f.write("自定义上下文管理器示例\n")
    # An exception raised here is caught and reported by the handler

from contextlib import contextmanager
import os
@contextmanager
def atomic_write(filepath, mode='w', encoding='utf-8'):
    """Write a file atomically: either fully written or the original is kept.

    Data goes to ``<filepath>.tmp`` first. Only when the ``with`` body
    finishes without error is the temporary file moved over ``filepath``
    with a single ``os.replace`` call, so ``filepath`` never goes missing
    and never holds partial content.

    Args:
        filepath: Destination path.
        mode: Mode used to open the temporary file (``'w'``, ``'wb'``, ...).
        encoding: Text encoding; ignored when ``mode`` is a binary mode.

    Yields:
        The open temporary file object to write to.

    Raises:
        Any exception raised by the ``with`` body or by the file operations;
        the temporary file is removed before it propagates.
    """
    temp_path = f"{filepath}.tmp"
    # open() raises ValueError if an encoding is given in binary mode.
    open_kwargs = {} if 'b' in mode else {'encoding': encoding}
    try:
        with open(temp_path, mode, **open_kwargs) as f:
            yield f
            # Push the data to disk before the rename makes it visible.
            f.flush()
            os.fsync(f.fileno())
        # One rename replaces the target. No backup step is used, because
        # moving the original aside first leaves a window with no file.
        os.replace(temp_path, filepath)
    except BaseException:
        # Clean up on every failure, including KeyboardInterrupt.
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass
        raise
# Usage example
import json

try:
    with atomic_write('important.json') as f:
        json.dump({"key": "value"}, f)
        # Uncomment the next line to simulate a failed write
        # raise IOError("磁盘错误")
except Exception as e:
    print(f"写入失败: {e}")
else:
    print("写入成功")

import mmap
def process_large_file(filepath):
    """Search a file through a read-only memory map.

    Prints a notice when the bytes ``b'signature'`` occur in the file, then
    prints the start offset of every match of the byte pattern ``pattern``.
    An empty file produces no output.

    Args:
        filepath: Path of the file to scan.
    """
    import re

    # The map is read-only, so plain 'rb' is enough and works on files the
    # process may not write to.
    with open(filepath, 'rb') as f:
        # mmap cannot map an empty file; seek() returns the end offset.
        if f.seek(0, 2) == 0:
            return
        # Length 0 maps the whole file.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # The map supports bytes-like operations such as find()
            if mm.find(b'signature') != -1:
                print("找到签名")
            # Regex search runs directly on the mapped buffer
            for match in re.finditer(rb'pattern', mm):
                print(f"在位置 {match.start()} 找到匹配")


from tempfile import NamedTemporaryFile, TemporaryDirectory
# Secure temporary file. delete=False keeps the file after the block, so
# whoever uses temp_path afterwards is responsible for removing it.
with NamedTemporaryFile('w+', delete=False, encoding='utf-8') as temp_file:
    temp_file.write("临时内容")
    temp_path = temp_file.name
# Temporary directory that is removed automatically
with TemporaryDirectory() as temp_dir:
    temp_file_path = os.path.join(temp_dir, 'temp.txt')
    # Explicit UTF-8 so the non-ASCII text is writable under any locale
    with open(temp_file_path, 'w', encoding='utf-8') as f:
        f.write("在临时目录中的文件")
# The whole directory is deleted once the with block is left

import gzip
import zipfile
# Read a GZIP file
with gzip.open('data.gz', 'rt') as f:  # 'rt' selects text mode
    content = f.read()
# Create a ZIP archive
with zipfile.ZipFile('archive.zip', 'w') as zf:
    zf.write('document.txt')
    zf.write('image.png')
# Read a member out of a ZIP archive
with zipfile.ZipFile('archive.zip', 'r') as zf:
    with zf.open('document.txt') as f:  # members are opened as binary streams
        text = f.read().decode('utf-8')

import os
from pathlib import Path
# Using os.walk
for root, dirs, files in os.walk('project'):
    for file in files:
        if file.endswith('.py'):
            filepath = os.path.join(root, file)
            print(f"Python文件: {filepath}")
# Using pathlib (more modern)
py_files = Path('project').glob('**/*.py')  # '**' makes the pattern recursive
for file in py_files:
    print(f"Python文件: {file}")
# 下面这个性能更好一点
import os
from typing import Generator
def scan_directory(path: str) -> Generator[str, None, None]:
    """Recursively scan a directory and yield the path of every file.

    Symbolic links are not followed: a link is reported neither as a file
    nor descended into as a directory.

    Args:
        path: Directory to start from.

    Yields:
        The path of each regular file found, built from ``path`` by
        ``os.scandir``.

    Raises:
        OSError: If ``path`` or one of its subdirectories cannot be listed.
    """
    # The with block releases the scandir handle even if the consumer stops
    # iterating early.
    with os.scandir(path) as entries:
        for entry in entries:
            if entry.is_file(follow_symlinks=False):
                yield entry.path
            elif entry.is_dir(follow_symlinks=False):
                yield from scan_directory(entry.path)
# Usage example
for file_path in scan_directory("/path/to/large/directory"):
    print(file_path)

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class ChangeHandler(FileSystemEventHandler):
    """Event handler that prints the path of every modified file."""

    def on_modified(self, event):
        """Print the source path of a modification event, skipping directories."""
        if not event.is_directory:
            print(f"文件修改: {event.src_path}")
import time

# Watch the current directory and everything below it
observer = Observer()
observer.schedule(ChangeHandler(), path='.', recursive=True)
observer.start()
try:
    # Keep the main thread alive while the observer runs
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C: ask the observer to stop
    observer.stop()
observer.join()


class EncryptedFile:
    """File wrapper that XOR-encrypts on write and decrypts on read.

    The file on disk always holds raw bytes. In binary mode (``'b'`` in
    ``mode``) ``read``/``write`` work with ``bytes``; in text mode they work
    with ``str`` encoded as UTF-8. The keystream position is taken from the
    file offset, so reading or writing in several chunks gives the same
    result as a single call. XOR with a repeating key is NOT real
    encryption; it is only a demonstration.

    Args:
        filename: Path of the file.
        mode: ``open()``-style mode such as ``'r'``, ``'wb'`` or ``'a'``.
        key: Non-empty password string; its UTF-8 bytes form the XOR key.

    Raises:
        ValueError: If ``key`` is empty.
    """

    def __init__(self, filename, mode='r', key='secret'):
        if not key:
            # An empty key would divide by zero in _cipher.
            raise ValueError("key must not be empty")
        self.filename = filename
        self.mode = mode
        self.key = key.encode('utf-8')
        self.file = None

    def _cipher(self, data, offset=0):
        """XOR ``data`` with the key, starting at keystream position ``offset``."""
        key = self.key
        key_len = len(key)
        return bytes(b ^ key[(offset + i) % key_len] for i, b in enumerate(data))

    def __enter__(self):
        """Open the underlying file in binary form of ``mode`` and return self."""
        # Keep the r/w/a/x/+ flags but always open in binary, because the
        # ciphertext is arbitrary bytes.
        raw_mode = self.mode.replace('t', '').replace('b', '') + 'b'
        self.file = open(self.filename, raw_mode)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the underlying file; exceptions are not suppressed."""
        if self.file:
            self.file.close()

    def read(self, size=-1):
        """Read up to ``size`` bytes from disk and return them decrypted.

        Returns ``bytes`` in binary mode and ``str`` in text mode. In text
        mode ``size`` counts bytes; if it ends inside a multi-byte character
        the remaining bytes of that character are read as well.
        """
        offset = self.file.tell()
        plain = self._cipher(self.file.read(size), offset)
        if 'b' in self.mode:
            return plain

        import codecs

        decoder = codecs.getincrementaldecoder('utf-8')()
        text = decoder.decode(plain)
        # getstate()[0] holds bytes of a character that is not complete yet.
        while decoder.getstate()[0]:
            offset = self.file.tell()
            extra = self.file.read(1)
            if not extra:
                break
            text += decoder.decode(self._cipher(extra, offset))
        # final=True raises if the file ends in the middle of a character.
        return text + decoder.decode(b'', final=True)

    def write(self, data):
        """Encrypt ``data`` (``bytes`` in binary mode, ``str`` in text mode) and write it."""
        payload = data if 'b' in self.mode else data.encode('utf-8')
        self.file.write(self._cipher(payload, self.file.tell()))
# Usage example
with EncryptedFile('secret.dat', 'wb', key='mypass') as f:
    f.write(b"Sensitive data")
with EncryptedFile('secret.dat', 'rb', key='mypass') as f:
    print(f.read())  # b'Sensitive data'

from contextlib import contextmanager, ExitStack
@contextmanager
def file_pipeline(input_path, output_path, log_path='process.log'):
    """File-processing pipeline that manages several files at once.

    Opens the input file for reading, the output file for writing and the
    log file for appending. All three are closed when the ``with`` block
    ends, whether or not it raised.

    Args:
        input_path: File to read from.
        output_path: File to write to (truncated on open).
        log_path: Log file appended to by ``process``; written as UTF-8.

    Yields:
        A ``(fin, fout, process)`` tuple. ``process(line)`` logs the
        stripped line and returns the line upper-cased.
    """
    with ExitStack() as stack:
        # Each file registered on the stack is closed when the stack exits.
        fin = stack.enter_context(open(input_path, 'r'))
        fout = stack.enter_context(open(output_path, 'w'))
        # The log prefix is non-ASCII, so do not rely on the locale default.
        flog = stack.enter_context(open(log_path, 'a', encoding='utf-8'))

        def process(line):
            """Log the line and return it upper-cased."""
            flog.write(f"处理行: {line.strip()}\n")
            return line.upper()

        yield fin, fout, process
# Usage example
with file_pipeline('input.txt', 'output.txt') as (fin, fout, process):
    for line in fin:
        fout.write(process(line))

简单文件简单处理
#Python文件 #Python