# 告别手动复制粘贴：Python批量下载风云卫星数据的保姆级教程（附避坑指南）

风云卫星数据作为气象、环境监测等领域的重要资源，其获取效率直接影响科研和业务进度。传统手动下载方式不仅耗时费力，还容易因网络波动或操作失误导致前功尽弃。本文将带你用Python构建一个健壮的批量下载工具，从环境配置到异常处理全程详解，即使零基础也能轻松上手。

## 1. 环境准备与项目初始化

### 1.1 必备软件安装

确保系统已安装：

- Python 3.6+（推荐3.9+版本）
- pip包管理工具（通常随Python自动安装）

验证安装：

```bash
python --version
pip --version
```

### 1.2 依赖库安装

在终端执行以下命令安装必要库：

```bash
pip install requests ftplib3 retrying
```

> 注：retrying库将用于实现自动重试机制

### 1.3 项目目录结构

建议按如下结构组织项目：

```text
├── download_script.py   # 主脚本文件
├── target/              # 存放下载列表文件
├── downloads/           # 下载文件存储目录
└── logs/                # 运行日志目录
```

## 2. 核心下载功能实现

### 2.1 HTTP下载增强版

```python
import os
from urllib.parse import urlparse
import requests
from retrying import retry


@retry(stop_max_attempt_number=3, wait_fixed=2000)
def download_http(url, save_dir='downloads'):
    filename = os.path.basename(urlparse(url).path)
    save_path = os.path.join(save_dir, filename)

    try:
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # 过滤keep-alive空chunk
                        f.write(chunk)
        print(f"[成功] HTTP下载完成: {filename}")
        return True
    except Exception as e:
        print(f"[失败] HTTP下载错误: {filename} - {str(e)}")
        raise
```

### 2.2 FTP下载优化方案

```python
from ftplib import FTP
import socket


def download_ftp(ftp_url, save_dir='downloads'):
    parsed = urlparse(ftp_url)
    host = parsed.hostname
    path = parsed.path
    filename = os.path.basename(path)
    save_path = os.path.join(save_dir, filename)

    try:
        with FTP(host, timeout=60) as ftp:
            ftp.login()  # 匿名登录
            with open(save_path, 'wb') as f:
                ftp.retrbinary(f"RETR {path}", f.write)
        print(f"[成功] FTP下载完成: {filename}")
        return True
    except (socket.timeout, EOFError) as e:
        print(f"[失败] FTP连接超时: {filename}")
        return False
    except Exception as e:
        print(f"[失败] FTP下载错误: {filename} - {str(e)}")
        return False
```

3. 
批量处理与性能优化

### 3.1 多线程下载改造

使用concurrent.futures实现并行下载：

```python
from concurrent.futures import ThreadPoolExecutor


def batch_download(url_list, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for url in url_list:
            if url.startswith('http'):
                futures.append(executor.submit(download_http, url))
            elif url.startswith('ftp'):
                futures.append(executor.submit(download_ftp, url))

        results = [f.result() for f in futures]
        return sum(results)  # 返回成功下载数量
```

### 3.2 下载列表文件解析

支持多种格式的列表文件：

```python
def parse_list_file(file_path):
    extensions = ('.txt', '.csv', '.lst')
    if not file_path.endswith(extensions):
        raise ValueError("仅支持txt/csv/lst格式列表文件")

    with open(file_path, 'r') as f:
        lines = f.readlines()

    urls = []
    for line in lines:
        line = line.strip()
        if line and not line.startswith('#'):  # 跳过空行和注释
            if '|' in line:  # 处理CSV格式
                url = line.split('|')[0].strip()
            else:
                url = line
            urls.append(url)
    return urls
```

## 4. 实战避坑指南

### 4.1 常见问题解决方案

| 问题现象 | 可能原因 | 解决方案 |
| --- | --- | --- |
| SSL证书错误 | 网站证书过期 | 添加verify=False参数（仅限测试环境） |
| 下载文件不完整 | 网络中断 | 启用分块下载+断点续传 |
| 中文文件名乱码 | 编码问题 | 使用response.apparent_encoding检测编码 |
| FTP连接超时 | 服务器限制 | 调整timeout参数至60秒以上 |

### 4.2 日志记录与监控

建议添加日志记录功能：

```python
import logging
from datetime import datetime


def setup_logger():
    if not os.path.exists('logs'):
        os.makedirs('logs')

    log_file = f"logs/download_{datetime.now().strftime('%Y%m%d')}.log"
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger()
```

### 4.3 特殊字符处理

针对URL中的特殊字符：

```python
from urllib.parse import quote


def sanitize_url(url):
    parts = url.split('://')
    if len(parts) == 2:
        scheme = parts[0] + '://'
        path = quote(parts[1])
        return scheme + path
    return url
```

5. 
完整脚本集成

### 5.1 主程序逻辑

```python
import sys
import glob


def main():
    logger = setup_logger()

    if not os.path.exists('target'):
        print("错误：未找到target目录")
        sys.exit(1)

    list_files = glob.glob('target/*.txt') + glob.glob('target/*.csv')
    if not list_files:
        print("错误：target目录中未找到列表文件")
        sys.exit(1)

    for list_file in list_files:
        try:
            urls = parse_list_file(list_file)
            success = batch_download(urls)
            logger.info(f"文件{list_file}处理完成，成功下载{success}/{len(urls)}个文件")
        except Exception as e:
            logger.error(f"处理{list_file}时出错: {str(e)}")


if __name__ == '__main__':
    main()
```

### 5.2 使用示例

1. 将下载链接列表保存到target/list.txt（每行一个URL）
2. 运行脚本：

   ```bash
   python download_script.py
   ```

3. 查看下载结果：
   - 成功下载的文件保存在downloads目录
   - 运行日志存储在logs目录

## 6. 进阶优化方向

### 6.1 断点续传实现

HTTP断点续传代码片段：

```python
headers = {}
if os.path.exists(save_path):
    file_size = os.path.getsize(save_path)
    headers = {'Range': f'bytes={file_size}-'}

response = requests.get(url, headers=headers, stream=True)
if response.status_code == 206:  # 部分内容
    with open(save_path, 'ab') as f:  # 追加模式
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
```

### 6.2 速度限制与进度显示

使用tqdm显示进度条：

```python
from tqdm import tqdm

response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))

with open(save_path, 'wb') as f, tqdm(
    desc=filename,
    total=total_size,
    unit='B',
    unit_scale=True
) as pbar:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
        pbar.update(len(chunk))
```

在实际项目中，我发现网络不稳定时段使用max_workers=2能显著降低失败率。对于超大型文件（1GB以上），建议单独处理并添加MD5校验功能，确保数据完整性。