当前位置:首页 > 问答 > 正文

探索Python文件下载的进阶方法:提升效率的核心技巧与实战指南

探索Python文件下载的进阶方法:提升效率的核心技巧与实战指南

最近在折腾一个项目,需要批量下载一堆文件——图片、压缩包、API返回的数据,啥都有,一开始我用的是经典的 requests + urllib,简单粗暴,但很快就发现效率太低了,下载500个文件居然花了半小时,还时不时因为网络波动崩掉几个,气得我差点把键盘砸了 😤,于是我开始琢磨:有没有更高效、更稳定的方法?

后来我发现,文件下载这事儿,真不是简单的“发请求-收数据-写文件”就完事了,里面有很多细节可以优化,比如并发、断点续传、错误处理,甚至如何“礼貌”地不被服务器拉黑,下面我就分享一些自己踩坑后总结的进阶方法,希望能帮你少走弯路。


别再用“一行流”了,试试分块下载和流式写入

很多人喜欢用这种“经典”写法:

import requests
response = requests.get(url)
with open("file.zip", "wb") as f:
    f.write(response.content)

问题在哪?如果文件很大(比如几个G),response.content 会一次性加载到内存,容易爆内存,而且中途失败就得重头再来。

改进方法:用流式下载 + 分块写入:

response = requests.get(url, stream=True)
with open("file.zip", "wb") as f:
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:
            f.write(chunk)

这样内存占用小,还能实时写入。chunk_size 可以根据网络情况调整,我一般设成8KB或16KB,亲测稳定。


并发下载:别傻傻地一个个下

如果你要下100个文件,用循环串行下载简直是在浪费生命,这时候必须上并发!但我建议别直接用 threading,容易把自己搞乱,我用的是 concurrent.futures,简单又可控。

from concurrent.futures import ThreadPoolExecutor
import requests
def download_one(url, path):
    # 这里可以加重试、超时等逻辑
    response = requests.get(url, stream=True)
    with open(path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
urls = ["http://example.com/file1", "http://example.com/file2"]
paths = ["file1.zip", "file2.zip"]
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(download_one, urls, paths)

注意别开太多线程,我一般控制在5-10个,不然容易被服务器当成攻击(别问我怎么知道的 🥲)。


断点续传:下载大文件的神器

有一次我下到95%突然断网,差点崩溃,后来才知道 requests 支持断点续传,但得手动处理——检查本地已下载大小,然后设置 Range 头。

探索Python文件下载的进阶方法:提升效率的核心技巧与实战指南

import os
headers = {}
if os.path.exists("file.zip"):
    downloaded_size = os.path.getsize("file.zip")
    headers["Range"] = f"bytes={downloaded_size}-"
response = requests.get(url, headers=headers, stream=True)
with open("file.zip", "ab") as f:  # 注意这里是ab,追加模式
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)

这个方法适合大文件,但要注意服务器是否支持 Range 请求(一般都支持)。


异步?小心别掉坑里

我试过用 aiohttp 做异步下载,速度确实快,但写起来复杂多了,除非你是下载狂魔(比如同时下几千个文件),否则不一定值得,而且异步代码调试起来真是要命,我经常搞不清哪个任务挂了。

# 伪代码,实际还得处理异常和重试
async with aiohttp.ClientSession() as session:
    async with session.get(url) as response:
        with open(path, "wb") as f:
            while True:
                chunk = await response.content.read(8192)
                if not chunk:
                    break
                f.write(chunk)

如果你不是老司机,建议先用线程池,稳定更重要。


伪装和限速:做个“礼貌”的下载者

有些服务器会封杀频繁请求的IP,所以最好加个User-Agent,甚至用随机延迟:

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
# 随机延迟,避免请求太密集
import time
import random
time.sleep(random.uniform(0.5, 1.5))

如果你用的是云服务,注意别把人家带宽打满(我被运维警告过两次🙃)。

探索Python文件下载的进阶方法:提升效率的核心技巧与实战指南


实战案例:下载Unsplash图片集

去年我做了一个小项目,要爬取Unsplash上的某个主题图片(大约300张),一开始直接串行下载,慢得要死还经常429,后来改成:

  • 用线程池(8个worker)
  • 每个请求加随机延迟和UA头
  • 失败自动重试3次

最终下载时间从15分钟降到2分钟,而且一张都没漏。

代码大概长这样:

from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))
def download_with_retry(url, path):
    # 加上超时、头部、延迟等
    time.sleep(random.uniform(0.3, 1.0))
    response = requests.get(url, headers=headers, timeout=10)
    # 写入文件...

最后一点思考

文件下载看似简单,但想做得稳健高效,其实有很多细节要注意,我现在习惯性地会问自己:

  • 要不要断点续传?
  • 并发开多少合适?
  • 需不需要伪装和限流?

有时候还会想,如果Python标准库能内置一个更强大的下载工具就好了……不过自己折腾也挺有意思的,至少能学到很多东西。

如果你有更好的方法,欢迎交流~我至今还在坑里爬着,偶尔能捡到宝石💎,但更多的是踩到雷哈哈哈。