Build Your Own Final-Exam Review Library from Scratch with a Python Crawler + Scapy Packet Capture: A Step-by-Step Guide (Full Code Included)
Building an Intelligent Review Library with Python: Crawling and Packet Capture in Practice

Exam season always turns into a scramble: lecture notes, problem sets, and slide decks are scattered across different platforms, and collecting them by hand is slow. As computer science students, we can solve this with tooling instead — by building an automated, searchable personal review library. This article walks through the whole process from scratch, combining Python web crawling with network packet capture to build a system that automatically collects, classifies, and stores study materials.

1. Project Planning and Technology Selection

Before writing any code, it pays to pin down the goals and pick a suitable stack. The review library needs three core capabilities: automatic resource collection, content parsing and storage, and handling anti-crawling measures.

Tech stack comparison

| Requirement | Technology | Strengths | Typical use case |
| --- | --- | --- | --- |
| Fetching page content | Requests | Simple and efficient, well suited to static pages | Most academic resource sites |
| Rendering dynamic content | Selenium | Handles JavaScript-loaded content | Single-page application (SPA) sites |
| HTML parsing | BeautifulSoup | Concise syntax, gentle learning curve | Parsing ordinary page structure |
| Analyzing network requests | Scapy | Low-level packet manipulation | Studying anti-crawling mechanisms and simulating requests |
| Data storage | SQLite + local file system | Lightweight, no extra services needed | A small personal library |

The Requests + BeautifulSoup + Scapy combination covers most academic resource sites and copes with basic anti-crawling measures; Selenium can be added later for particularly complex dynamic sites. Tip: in practice, start with plain Requests and only bring in heavier tools when you actually hit a wall — avoid premature optimization.

2. Building the Basic Crawler Framework

Let's first build a robust crawler skeleton with request management, exception handling, and basic parsing. A modular design will make later extensions much easier.

```python
import requests
from bs4 import BeautifulSoup
import time
import os
from urllib.parse import urljoin, urlparse


class AcademicSpider:
    def __init__(self, base_url, output_dir="materials"):
        self.base_url = base_url
        self.output_dir = output_dir
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Accept-Language": "en-US,en;q=0.9"
        })
        self.visited_urls = set()
        self.create_output_dir()

    def create_output_dir(self):
        # Create the output directory tree if it does not exist yet
        os.makedirs(self.output_dir, exist_ok=True)
        os.makedirs(os.path.join(self.output_dir, "pdfs"), exist_ok=True)
        os.makedirs(os.path.join(self.output_dir, "ppts"), exist_ok=True)
        os.makedirs(os.path.join(self.output_dir, "html"), exist_ok=True)

    def is_valid_url(self, url):
        # Only follow links whose host belongs to the base URL
        parsed = urlparse(url)
        return bool(parsed.netloc) and parsed.netloc in self.base_url

    def download_file(self, url, file_type):
        try:
            response = self.session.get(url, stream=True, timeout=10)
            response.raise_for_status()
            filename = os.path.basename(urlparse(url).path) or f"file_{int(time.time())}"
            save_path = os.path.join(self.output_dir, file_type, filename)
            with open(save_path, "wb") as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            return True
        except Exception as e:
            print(f"Download failed {url}: {e}")
            return False

    def extract_links(self, url):
        try:
            response = self.session.get(url, timeout=8)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # Detect and download resource files
            for link in soup.find_all("a", href=True):
                href = link["href"].lower()
                if href.endswith(".pdf"):
                    pdf_url = urljoin(url, link["href"])
                    self.download_file(pdf_url, "pdfs")
                elif href.endswith((".ppt", ".pptx")):
                    ppt_url = urljoin(url, link["href"])
                    self.download_file(ppt_url, "ppts")

            # Return every in-scope link on the page for further crawling
            return [urljoin(url, a["href"]) for a in soup.find_all("a", href=True)
                    if self.is_valid_url(urljoin(url, a["href"]))]
        except Exception as e:
            print(f"Failed to parse page {url}: {e}")
            return []

    def crawl(self, start_url, max_depth=2):
        queue = [(start_url, 0)]
        while queue:
            url, depth = queue.pop(0)
            if url in self.visited_urls or depth > max_depth:
                continue
            print(f"Crawling: {url} (depth {depth})")
            self.visited_urls.add(url)
            time.sleep(1)  # polite delay
            links = self.extract_links(url)
            queue.extend((link, depth + 1) for link in links
                         if link not in self.visited_urls)
```

This basic framework already has several key properties:

- Session management: requests.Session reuses connections for efficiency.
- Resource detection: PDF and PPT files are recognized and downloaded automatically.
- Polite crawling: a 1-second delay between requests keeps the load on the server low.
- Breadth-first traversal: the crawl depth is capped to prevent unbounded recursion.
- Exception handling: network errors are caught and logged instead of crashing the crawl.
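The class above only defines the crawler; a minimal usage sketch might look like the following. The course URL is a placeholder for illustration, not a real site — point it at a site you are allowed to crawl.

```python
# Hypothetical course page used only as an example
spider = AcademicSpider("https://ocw.example.edu")
spider.crawl("https://ocw.example.edu/cs101/lecture-notes", max_depth=2)
print(f"Visited {len(spider.visited_urls)} pages; files saved under ./materials/")
```

Because is_valid_url only follows links whose host appears in base_url, the crawl stays on the course site instead of wandering across the whole web.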
3. Analyzing Network Requests with Scapy

When a site pushes back with anti-crawling measures, we need a deeper view of how it actually works. Scapy lets us inspect raw network traffic and identify key API requests and authentication mechanisms. Note that the payload parsing below only works for unencrypted HTTP traffic; HTTPS payloads are encrypted and cannot be read this way.

```python
from scapy.all import sniff, IP, TCP
from collections import defaultdict
import json


class TrafficAnalyzer:
    def __init__(self, target_domain):
        self.target_domain = target_domain
        self.api_endpoints = defaultdict(int)
        self.auth_headers = set()

    def packet_handler(self, packet):
        # The BPF filter passed to sniff() already restricts the capture to the
        # target host, so every packet seen here belongs to the target domain.
        if IP in packet and TCP in packet:
            payload = bytes(packet[TCP].payload)
            try:
                # Try to parse the payload as an HTTP request
                if b"HTTP" in payload[:20]:
                    http_lines = payload.decode("utf-8", errors="ignore").split("\r\n")
                    request_line = http_lines[0]

                    # Record API endpoints
                    if any(method in request_line for method in ["GET", "POST", "PUT"]):
                        path = request_line.split(" ")[1]
                        self.api_endpoints[path] += 1

                    # Extract authentication headers
                    for line in http_lines[1:]:
                        if line.startswith(("Authorization:", "X-API-Key:")):
                            self.auth_headers.add(line)
            except UnicodeDecodeError:
                pass

    def start_capture(self, duration=60):
        print(f"Capturing traffic for {self.target_domain} ...")
        sniff(filter=f"tcp and host {self.target_domain}",
              prn=self.packet_handler, timeout=duration)

    def generate_report(self):
        report = {
            "most_frequent_endpoints": sorted(self.api_endpoints.items(),
                                              key=lambda x: x[1], reverse=True)[:5],
            "authentication_headers": list(self.auth_headers)
        }
        with open("traffic_report.json", "w") as f:
            json.dump(report, f, indent=2)
        return report


# Usage example
analyzer = TrafficAnalyzer("example.edu")
analyzer.start_capture(120)
report = analyzer.generate_report()
```

This traffic analyzer helps us:

- identify the API endpoints the site hits most often;
- discover hidden authentication mechanisms;
- understand how the site loads its data;
- spot request headers that may be used for anti-crawling checks.

Note: make sure you are allowed to monitor the traffic in question. Only analyze traffic of sites you own or are authorized to test.

4. Dealing with Common Anti-Crawling Strategies

Academic resource sites typically rely on request-rate limits, behavior analysis, and captchas. Here are some practical countermeasures.

1. Request rate control

```python
import random
import time
from functools import wraps

import requests


def randomized_delay(min_wait=1, max_wait=3):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = random.uniform(min_wait, max_wait)
            time.sleep(delay)
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Usage
@randomized_delay(0.5, 2.5)
def make_request(url):
    return requests.get(url)
```

2. Header rotation

```python
import random

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (X11; Linux x86_64)"
]


def rotate_headers():
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/"
    }
```

3. Handling captchas

For simple captchas, an OCR library is worth a try:

```python
import io

import pytesseract
from PIL import Image


def solve_captcha(image_bytes):
    try:
        image = Image.open(io.BytesIO(image_bytes))
        text = pytesseract.image_to_string(image)
        return text.strip()
    except Exception:
        return None
```

For complex captchas, better options are to log in manually once and reuse the saved cookies (a sketch follows at the end of this section), to look for captcha-free API endpoints, or to use a commercial captcha-solving service.

4. Keeping a session alive

```python
from bs4 import BeautifulSoup


def maintain_session(spider, login_url, credentials):
    # Fetch the login page first to obtain the CSRF token
    login_page = spider.session.get(login_url)
    soup = BeautifulSoup(login_page.text, "html.parser")
    csrf_token = soup.find("input", {"name": "csrf_token"})["value"]

    # Submit the login form
    credentials["csrf_token"] = csrf_token
    response = spider.session.post(login_url, data=credentials)

    # Check whether the login succeeded
    if "welcome" in response.url:
        print("Login succeeded, session established")
        return True
    return False
```
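As suggested above for complex captchas, one practical route is to complete the login (and captcha) manually once and then persist that session's cookies for later runs. Below is a minimal sketch using pickle; the cookies.pkl filename is just an example, not part of the original project.

```python
import os
import pickle

import requests


def save_cookies(session, path="cookies.pkl"):
    # Persist the cookie jar after a successful (manual) login
    with open(path, "wb") as f:
        pickle.dump(session.cookies, f)


def load_cookies(session, path="cookies.pkl"):
    # Restore previously saved cookies into a fresh session
    if os.path.exists(path):
        with open(path, "rb") as f:
            session.cookies.update(pickle.load(f))
    return session


# Reuse the saved cookies on the next run (until they expire)
session = load_cookies(requests.Session())
```

This only works for as long as the site keeps the session cookies valid, so expect to refresh them from time to time.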
5. Material Storage and Retrieval System

Collected material only pays off once it is well organized. Here we design an SQLite-based store with full-text search support.

```python
import os
import sqlite3

import fitz  # PyMuPDF


class KnowledgeBase:
    def __init__(self, db_path="knowledge.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        cursor = self.conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS materials (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                file_path TEXT UNIQUE NOT NULL,
                file_type TEXT NOT NULL,
                source_url TEXT,
                content_text TEXT,
                keywords TEXT,
                added_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Enable full-text search (FTS5)
        cursor.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS materials_fts
            USING fts5(title, content_text, keywords)
        """)
        self.conn.commit()

    def index_pdf(self, filepath):
        """Extract the text of a PDF and add it to the index."""
        try:
            doc = fitz.open(filepath)
            text = ""
            for page in doc:
                text += page.get_text()

            title = os.path.basename(filepath)
            keywords = self._extract_keywords(text)

            cursor = self.conn.cursor()
            cursor.execute("""
                INSERT INTO materials (title, file_path, file_type, content_text, keywords)
                VALUES (?, ?, 'pdf', ?, ?)
            """, (title, filepath, text, keywords))

            # Keep the full-text index in sync with the materials table
            cursor.execute("""
                INSERT INTO materials_fts (rowid, title, content_text, keywords)
                VALUES (last_insert_rowid(), ?, ?, ?)
            """, (title, text, keywords))

            self.conn.commit()
            return True
        except Exception as e:
            print(f"Failed to index PDF {filepath}: {e}")
            return False

    def _extract_keywords(self, text, top_n=10):
        """Naive keyword extraction based on word frequency."""
        from collections import Counter
        import re
        words = re.findall(r"\w{4,}", text.lower())
        word_counts = Counter(words)
        return ",".join(word for word, _ in word_counts.most_common(top_n))

    def search(self, query):
        """Full-text search with highlighted snippets."""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT m.title, m.file_path,
                   snippet(materials_fts, 1, '<b>', '</b>', '...', 64)
            FROM materials m
            JOIN materials_fts ON m.id = materials_fts.rowid
            WHERE materials_fts MATCH ?
            ORDER BY rank
            LIMIT 10
        """, (query,))
        return cursor.fetchall()

    def close(self):
        self.conn.close()


# Usage example
kb = KnowledgeBase()
kb.index_pdf("materials/pdfs/computer_networks.pdf")
results = kb.search("TCP handshake")
for title, path, snippet in results:
    print(f"{title}: {snippet}\n- {path}")
```

This knowledge base provides:

- PDF text extraction;
- automatic keyword generation;
- fast full-text search;
- highlighted matches in the results;
- ranking by relevance.

6. Deployment and Automation

To keep the library fresh, we can run the crawler on a schedule and expose the collection through a small web interface.

1. Scheduling with APScheduler

```python
import os

from apscheduler.schedulers.blocking import BlockingScheduler


def scheduled_crawl():
    # Re-crawl the course site, then index any newly downloaded PDFs
    spider = AcademicSpider("https://ocw.example.edu")
    spider.crawl("https://ocw.example.edu/cs101")

    kb = KnowledgeBase()
    for root, _, files in os.walk("materials/pdfs"):
        for file in files:
            if file.endswith(".pdf"):
                kb.index_pdf(os.path.join(root, file))
    kb.close()


scheduler = BlockingScheduler()
scheduler.add_job(scheduled_crawl, "cron", day_of_week="mon,wed,fri", hour=2)
scheduler.start()
```

2. A simple web interface with Flask

```python
from flask import Flask, render_template, request

app = Flask(__name__)
# Note: the dev server handles requests in threads; to share one connection you
# may need sqlite3.connect(db_path, check_same_thread=False) in KnowledgeBase.
kb = KnowledgeBase()


@app.route("/")
def home():
    query = request.args.get("q", "")
    results = []
    if query:
        results = kb.search(query)
    return render_template("search.html", query=query, results=results)


if __name__ == "__main__":
    app.run(port=5000)
```

The matching HTML template (templates/search.html):

```html
<!DOCTYPE html>
<html>
<head>
    <title>Personal Knowledge Base</title>
    <style>
        .result { margin-bottom: 20px; padding: 15px; border: 1px solid #ddd; }
        .snippet { color: #666; }
        .path { font-size: 0.8em; color: #999; }
    </style>
</head>
<body>
    <h1>Review Material Search</h1>
    <form method="get">
        <input type="text" name="q" value="{{ query }}" placeholder="Enter search keywords">
        <button type="submit">Search</button>
    </form>
    {% if query %}
        <h2>Search results</h2>
        {% if results %}
            {% for title, path, snippet in results %}
                <div class="result">
                    <h3>{{ title }}</h3>
                    <p class="snippet">{{ snippet|safe }}</p>
                    <p class="path">{{ path }}</p>
                </div>
            {% endfor %}
        {% else %}
            <p>No results found</p>
        {% endif %}
    {% endif %}
</body>
</html>
```
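The two snippets above run the scheduler and the web server as separate processes. If you would rather keep everything in one process, APScheduler's BackgroundScheduler can run alongside Flask. A minimal sketch, assuming the scheduled_crawl function and the app object defined above:

```python
from apscheduler.schedulers.background import BackgroundScheduler

# scheduled_crawl and app are assumed to come from the snippets above
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_crawl, "cron", day_of_week="mon,wed,fri", hour=2)
scheduler.start()

# use_reloader=False prevents the dev server from starting the job twice
app.run(port=5000, use_reloader=False)
```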
7. Lessons from Real-World Use

A few issues came up during development that are worth mentioning. The first is that changes to a site's structure silently break the crawler; the fix is to add health checks and alerting for the crawlers you rely on:

```python
def check_crawler_health(spider, test_url):
    try:
        links = spider.extract_links(test_url)
        if not links:
            # send_alert_email is your own notification helper (e.g. via SMTP)
            send_alert_email("Crawler may be broken: no links were extracted")
        return bool(links)
    except Exception as e:
        send_alert_email(f"Crawler health check failed: {e}")
        return False
```

Another common problem is duplicate resources: the same lecture notes may show up on several pages. Hashing the file content avoids storing them twice:

```python
import hashlib


def file_hash(filepath):
    with open(filepath, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()


def is_duplicate(filepath):
    kb = KnowledgeBase()
    cursor = kb.conn.cursor()
    current_hash = file_hash(filepath)
    # Assumes the materials table has a file_hash column
    # (not part of the schema in section 5 -- see the sketch below)
    cursor.execute("SELECT file_path FROM materials WHERE file_hash = ?", (current_hash,))
    return cursor.fetchone() is not None
```

Finally, for academic platforms that require a login, store credentials in environment variables rather than in the source code, and plan for two-factor authentication where the platform enforces it:

```python
import os
from getpass import getpass


def get_credentials():
    username = os.getenv("UNIV_USERNAME")
    password = os.getenv("UNIV_PASSWORD")

    if not username:
        username = input("Student ID: ")
    if not password:
        password = getpass("Password: ")

    return {"username": username, "password": password}
```
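As noted above, is_duplicate relies on a file_hash column that the section-5 schema does not create. Here is a minimal sketch of one way to add and populate that column; the column name is an assumption chosen to match the query above:

```python
import hashlib
import sqlite3


def ensure_hash_column(conn: sqlite3.Connection):
    # Add the (assumed) file_hash column if it does not exist yet
    columns = [row[1] for row in conn.execute("PRAGMA table_info(materials)")]
    if "file_hash" not in columns:
        conn.execute("ALTER TABLE materials ADD COLUMN file_hash TEXT")
        conn.commit()


def store_file_hash(conn: sqlite3.Connection, filepath: str):
    # Record the content hash for a file that has already been indexed
    with open(filepath, "rb") as f:
        digest = hashlib.md5(f.read()).hexdigest()
    conn.execute("UPDATE materials SET file_hash = ? WHERE file_path = ?",
                 (digest, filepath))
    conn.commit()
```

Calling ensure_hash_column once at startup and store_file_hash right after each index_pdf keeps the deduplication check above working.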