# 文章目录

- 发现宝藏
- 一、目标
- 二、浅析
- 三、获取所有模块
- 四、请求处理模块、版面、文章
  - 1. 分析切换页面的参数传递
  - 2. 获取共有多少页标签并遍历版面
  - 3. 解析版面并保存版面信息
  - 4. 解析文章列表和文章
  - 5. 清洗文章
  - 6. 保存文章图片
- 五、完整代码
- 六、效果展示

## 发现宝藏

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。

## 一、目标

爬取 news.mit.edu 的字段,包含:标题、内容、作者、发布时间、链接地址、文章快照(可能需要翻墙才能访问)。

## 二、浅析

1. 全部新闻大致分为 4 个模块
2. 每个模块的标签列表大致如下
3. 每个标签对应的文章列表大致如下
4. 具体每篇文章对应的结构如下

## 三、获取所有模块

其实就四个模块,列举出来就好,然后对每个分别解析,爬取每个模块。

```python
class MitnewsScraper:
    def __init__(self, root_url, model_url, img_output_dir):
        self.root_url = root_url
        self.model_url = model_url
        self.img_output_dir = img_output_dir
        self.headers = {
            'Referer': 'https://news.mit.edu/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
            'Cookie': '替换成你自己的',
        }

    ...


def run():
    root_url = 'https://news.mit.edu/'
    model_urls = ['https://news.mit.edu/topic', 'https://news.mit.edu/clp',
                  'https://news.mit.edu/department', 'https://news.mit.edu/']
    output_dir = r'D:\imgs\mit-news'

    for model_url in model_urls:
        scraper = MitnewsScraper(root_url, model_url, output_dir)
        scraper.catalogue_all_pages()
```

## 四、请求处理模块、版面、文章

先处理一个模块(TOPICS)。

### 1. 分析切换页面的参数传递

如图可知是 get 请求,需要传一个参数 page。

2. 
获取共有多少页标签并遍历版面

实际上是获取所有的 page 参数,然后进行遍历,获取所有的标签。

> 注:本文代码在转载/抽取过程中丢失了引号、等号、加号、尖括号与反斜杠等字符,以下代码按上下文还原,正则表达式等细节请以原文为准。

```python
    # 获取一个模块有多少版面
    def catalogue_all_pages(self):
        response = requests.get(self.model_url, headers=self.headers)
        soup = BeautifulSoup(response.text, 'html.parser')
        try:
            match = re.search(r'of (\d+) topics', soup.text)
            total_catalogues = int(match.group(1))
            total_pages = math.ceil(total_catalogues / 20)
            print('topics模块一共有' + match.group(1) + '个版面,' + str(total_pages) + '页数据')
            for page in range(0, total_pages):
                self.parse_catalogues(page)
                print(f'Finished catalogues page {page + 1}')
        except:
            self.parse_catalogues(0)
```

### 3. 解析版面并保存版面信息

前三个模块的版面列表:

第四个模块的版面列表:

```python
    # 解析版面列表里的版面
    def parse_catalogues(self, page):
        params = {'page': page}
        response = requests.get(self.model_url, params=params, headers=self.headers)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            if self.root_url == self.model_url:
                catalogue_list = soup.find('div',
                                           'site-browse--recommended-section site-browse--recommended-section--schools')
                catalogues_list = catalogue_list.find_all('li')
            else:
                catalogue_list = soup.find('ul', 'page-vocabulary--views--list')
                catalogues_list = catalogue_list.find_all('li')

            for index, catalogue in enumerate(catalogues_list):
                # 操作时间
                date = datetime.now()
                # 版面标题
                catalogue_title = catalogue.find('a').get_text(strip=True)
                print('第' + str(index + 1) + '个版面标题为:' + catalogue_title)
                catalogue_href = catalogue.find('a').get('href')
                # 版面id
                catalogue_id = catalogue_href[1:]
                catalogue_url = self.root_url + catalogue_href
                print('第' + str(index + 1) + '个版面地址为:' + catalogue_url)
                # 根据版面url解析文章列表
                response = requests.get(catalogue_url, headers=self.headers)
                soup = BeautifulSoup(response.text, 'html.parser')
                match = re.search(r'of (\d+)', soup.text)
                # 查找一个版面有多少篇文章
                total_cards = int(match.group(1))
                total_pages = math.ceil(total_cards / 15)
                print(f'{catalogue_title}版面一共有{total_cards}篇文章,' + f'{total_pages}页数据')
                for page in range(0, total_pages):
                    self.parse_cards_list(page, catalogue_url, catalogue_id)
                    print(f'Finished {catalogue_title} 版面 page {page + 1}')
                # 连接 MongoDB 数据库服务器
                client = MongoClient('mongodb://localhost:27017/')
                # 创建或选择数据库
                db = client['mit-news']
                # 创建或选择集合
                catalogues_collection = db['catalogues']
                # 插入示例数据到 catalogues 集合
                catalogue_data = {
                    'id': catalogue_id,
                    'date': date,
                    'title': catalogue_title,
                    'url': catalogue_url,
                    'cardSize': total_cards
                }
                catalogues_collection.insert_one(catalogue_data)
            return True
        else:
            raise Exception(f'Failed to fetch page {page}. Status code: {response.status_code}')
```

### 4. 解析文章列表和文章

寻找冗余部分并删除,例如:

```python
    # 解析文章列表里的文章
    def parse_cards_list(self, page, url, catalogue_id):
        params = {'page': page}
        response = requests.get(url, params=params, headers=self.headers)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            card_list = soup.find('div', 'page-term--views--list')
            cards_list = card_list.find_all('div', 'page-term--views--list-item')
            for index, card in enumerate(cards_list):
                # 对应的版面id
                catalogue_id = catalogue_id
                # 操作时间
                date = datetime.now()
                # 文章标题
                card_title = card.find('a', 'term-page--news-article--item--title--link').find('span').get_text(
                    strip=True)
                # 文章简介
                card_introduction = card.find('p', 'term-page--news-article--item--dek').find('span').get_text(
                    strip=True)
                # 文章更新时间
                publish_time = card.find('p', 'term-page--news-article--item--publication-date').find('time').get(
                    'datetime')
                updateTime = datetime.strptime(publish_time, '%Y-%m-%dT%H:%M:%SZ')
                # 文章地址
                temp_url = card.find('div', 'term-page--news-article--item--cover-image').find('a').get('href')
                url = 'https://news.mit.edu' + temp_url
                # 文章id
                pattern = r'(\w+(-\w+)*)-(\d+)'
                match = re.search(pattern, temp_url)
                card_id = str(match.group(0))
                card_response = requests.get(url, headers=self.headers)
                soup = BeautifulSoup(card_response.text, 'html.parser')
                # 原始htmldom结构
                html_title = soup.find('div', id='block-mit-page-title')
                html_content = soup.find('div', id='block-mit-content')

                # 合并标题和内容
                html_title.append(html_content)
                html_cut1 = soup.find('div', 'news-article--topics')
                html_cut2 = soup.find('div', 'news-article--archives')
                html_cut3 = soup.find('div', 'news-article--content--side-column')
                html_cut4 = soup.find('div', 'news-article--press-inquiries')
                html_cut5 = soup.find_all('div', 'visually-hidden')
                html_cut6 = soup.find('p', 'news-article--images-gallery--nav--inner')

                # 移除元素
                if html_cut1:
                    html_cut1.extract()
                if html_cut2:
                    html_cut2.extract()
                if html_cut3:
                    html_cut3.extract()
                if html_cut4:
                    html_cut4.extract()
                if html_cut5:
                    for item in html_cut5:
                        item.extract()
                if html_cut6:
                    html_cut6.extract()
                # 获取合并后的内容文本
                html_content = html_title
                # 文章作者
                author_list = html_content.find('div', 'news-article--authored-by').find_all('span')
                author = ''
                for item in author_list:
                    author = author + item.get_text()
                # 增加保留html样式的源文本
                origin_html = html_content.prettify()  # String
                # 转义网页中的图片标签
                str_html = self.transcoding_tags(origin_html)
                # 再包装成
                temp_soup = BeautifulSoup(str_html, 'html.parser')
                # 反转译文件中的插图
                str_html = self.translate_tags(temp_soup.text)
                # 绑定更新内容
                content = self.clean_content(str_html)
                # 下载图片
                imgs = []
                img_array = soup.find_all('div', 'news-article--image-item')
                for item in img_array:
                    img_url = self.root_url + item.find('img').get('data-src')
                    imgs.append(img_url)
                if len(imgs) != 0:
                    # 下载图片
                    illustrations = self.download_images(imgs, card_id)
                # 连接 MongoDB 数据库服务器
                client = MongoClient('mongodb://localhost:27017/')
                # 创建或选择数据库
                db = client['mit-news']
                # 创建或选择集合
                cards_collection = db['cards']
                # 插入示例数据到 catalogues 集合
                card_data = {
                    'id': card_id,
                    'catalogueId': catalogue_id,
                    'type': 'mit-news',
                    'date': date,
                    'title': card_title,
                    'author': author,
                    'card_introduction': card_introduction,
                    'updatetime': updateTime,
                    'url': url,
                    'html_content': str(html_content),
                    'content': content,
                    'illustrations': illustrations,
                }
                cards_collection.insert_one(card_data)
            return True
        else:
            raise Exception(f'Failed to fetch page {page}. Status code: {response.status_code}')
```

5. 
清洗文章

> 注:本文代码在转载/抽取过程中丢失了引号、等号、加号、尖括号、`&` 与反斜杠等字符,以下代码按上下文还原;其中 `transcoding_tags`、`translate_tags`、`clean_content` 里的正则与替换字符串为推断结果,请以原文为准。

```python
    # 工具 转义标签
    def transcoding_tags(self, htmlstr):
        re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
        s = re_img.sub(r'\n ##\1## \n', htmlstr)  # IMG 转义
        return s

    # 工具 转义标签
    def translate_tags(self, htmlstr):
        re_img = re.compile(r'##(img.*?)##', re.M)
        s = re_img.sub(r'<\1>', htmlstr)  # IMG 转义
        return s

    # 清洗文章
    def clean_content(self, content):
        if content is not None:
            content = re.sub(r'\r', r'\n', content)
            content = re.sub(r'\n{2,}', '', content)
            content = re.sub(r' {6,}', '', content)
            content = re.sub(r' {3,}\n', '', content)
            content = re.sub(r'<img src="../../../image/zxbl.gif"/>', '', content)
            content = content.replace(
                '<img border="0" src="****处理标记[Article]时 字段 [SnapUrl] 在数据源中没有找到! ****"/> ', '')
            content = content.replace(
                ' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh" style="display:none">',
                '') \
                .replace(' <!--enpcontent', '').replace('<TABLE>', '')
            content = content.replace('<P>', '').replace('<\P>', '').replace('&nbsp;', ' ')
        return content
```

### 6. 保存文章图片

```python
    # 下载图片
    def download_images(self, img_urls, card_id):
        # 根据card_id创建一个新的子目录
        images_dir = os.path.join(self.img_output_dir, card_id)
        if not os.path.exists(images_dir):
            os.makedirs(images_dir)
            downloaded_images = []
            for index, img_url in enumerate(img_urls):
                try:
                    response = requests.get(img_url, stream=True, headers=self.headers)
                    if response.status_code == 200:
                        # 从URL中提取图片文件名
                        img_name_with_extension = img_url.split('/')[-1]
                        pattern = r'^[^?]*'
                        match = re.search(pattern, img_name_with_extension)
                        img_name = match.group(0)

                        # 保存图片
                        with open(os.path.join(images_dir, img_name), 'wb') as f:
                            f.write(response.content)
                        downloaded_images.append([img_url, os.path.join(images_dir, img_name)])
                except requests.exceptions.RequestException as e:
                    print(f'请求图片时发生错误{e}')
                except Exception as e:
                    print(f'保存图片时发生错误{e}')
            return downloaded_images
        # 如果文件夹存在则跳过
        else:
            print(f'文章id为{card_id}的图片文件夹已经存在')
            return []
```

## 五、完整代码

```python
import os
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from pymongo import MongoClient
import re
import math


class MitnewsScraper:
    def __init__(self, root_url, model_url, img_output_dir):
        self.root_url = root_url
        self.model_url = model_url
        self.img_output_dir = img_output_dir
        self.headers = {
            'Referer': 'https://news.mit.edu/',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
            'Cookie': '替换成你自己的'
        }

    # 获取一个模块有多少版面
    def catalogue_all_pages(self):
        response = requests.get(self.model_url, headers=self.headers)
        soup = BeautifulSoup(response.text, 'html.parser')
        try:
            match = re.search(r'of (\d+) topics', soup.text)
            total_catalogues = int(match.group(1))
            total_pages = math.ceil(total_catalogues / 20)
            print('topics模块一共有' + match.group(1) + '个版面,' + str(total_pages) + '页数据')
            for page in range(0, total_pages):
                self.parse_catalogues(page)
                print(f'Finished catalogues page {page + 1}')
        except:
            self.parse_catalogues(0)

    # 解析版面列表里的版面
    def parse_catalogues(self, page):
        params = {'page': page}
        response = requests.get(self.model_url, params=params, headers=self.headers)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            if self.root_url == self.model_url:
                catalogue_list = soup.find('div',
                                           'site-browse--recommended-section site-browse--recommended-section--schools')
                catalogues_list = catalogue_list.find_all('li')
            else:
                catalogue_list = soup.find('ul', 'page-vocabulary--views--list')
                catalogues_list = catalogue_list.find_all('li')

            for index, catalogue in enumerate(catalogues_list):
                # 操作时间
                date = datetime.now()
                # 版面标题
                catalogue_title = catalogue.find('a').get_text(strip=True)
                print('第' + str(index + 1) + '个版面标题为:' + catalogue_title)
                catalogue_href = catalogue.find('a').get('href')
                # 版面id
                catalogue_id = catalogue_href[1:]
                catalogue_url = self.root_url + catalogue_href
                print('第' + str(index + 1) + '个版面地址为:' + catalogue_url)
                # 根据版面url解析文章列表
                response = requests.get(catalogue_url, headers=self.headers)
                soup = BeautifulSoup(response.text, 'html.parser')
                match = re.search(r'of (\d+)', soup.text)
                # 查找一个版面有多少篇文章
                total_cards = int(match.group(1))
                total_pages = math.ceil(total_cards / 15)
                print(f'{catalogue_title}版面一共有{total_cards}篇文章,' + f'{total_pages}页数据')
                for page in range(0, total_pages):
                    self.parse_cards_list(page, catalogue_url, catalogue_id)
                    print(f'Finished {catalogue_title} 版面 page {page + 1}')
                # 连接 MongoDB 数据库服务器
                client = MongoClient('mongodb://localhost:27017/')
                # 创建或选择数据库
                db = client['mit-news']
                # 创建或选择集合
                catalogues_collection = db['catalogues']
                # 插入示例数据到 catalogues 集合
                catalogue_data = {
                    'id': catalogue_id,
                    'date': date,
                    'title': catalogue_title,
                    'url': catalogue_url,
                    'cardSize': total_cards
                }
                catalogues_collection.insert_one(catalogue_data)
            return True
        else:
            raise Exception(f'Failed to fetch page {page}. Status code: {response.status_code}')

    # 解析文章列表里的文章
    def parse_cards_list(self, page, url, catalogue_id):
        params = {'page': page}
        response = requests.get(url, params=params, headers=self.headers)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            card_list = soup.find('div', 'page-term--views--list')
            cards_list = card_list.find_all('div', 'page-term--views--list-item')
            for index, card in enumerate(cards_list):
                # 对应的版面id
                catalogue_id = catalogue_id
                # 操作时间
                date = datetime.now()
                # 文章标题
                card_title = card.find('a', 'term-page--news-article--item--title--link').find('span').get_text(
                    strip=True)
                # 文章简介
                card_introduction = card.find('p', 'term-page--news-article--item--dek').find('span').get_text(
                    strip=True)
                # 文章更新时间
                publish_time = card.find('p', 'term-page--news-article--item--publication-date').find('time').get(
                    'datetime')
                updateTime = datetime.strptime(publish_time, '%Y-%m-%dT%H:%M:%SZ')
                # 文章地址
                temp_url = card.find('div', 'term-page--news-article--item--cover-image').find('a').get('href')
                url = 'https://news.mit.edu' + temp_url
                # 文章id
                pattern = r'(\w+(-\w+)*)-(\d+)'
                match = re.search(pattern, temp_url)
                card_id = str(match.group(0))
                card_response = requests.get(url, headers=self.headers)
                soup = BeautifulSoup(card_response.text, 'html.parser')
                # 原始htmldom结构
                html_title = soup.find('div', id='block-mit-page-title')
                html_content = soup.find('div', id='block-mit-content')

                # 合并标题和内容
                html_title.append(html_content)
                html_cut1 = soup.find('div', 'news-article--topics')
                html_cut2 = soup.find('div', 'news-article--archives')
                html_cut3 = soup.find('div', 'news-article--content--side-column')
                html_cut4 = soup.find('div', 'news-article--press-inquiries')
                html_cut5 = soup.find_all('div', 'visually-hidden')
                html_cut6 = soup.find('p', 'news-article--images-gallery--nav--inner')

                # 移除元素
                if html_cut1:
                    html_cut1.extract()
                if html_cut2:
                    html_cut2.extract()
                if html_cut3:
                    html_cut3.extract()
                if html_cut4:
                    html_cut4.extract()
                if html_cut5:
                    for item in html_cut5:
                        item.extract()
                if html_cut6:
                    html_cut6.extract()
                # 获取合并后的内容文本
                html_content = html_title
                # 文章作者
                author_list = html_content.find('div', 'news-article--authored-by').find_all('span')
                author = ''
                for item in author_list:
                    author = author + item.get_text()
                # 增加保留html样式的源文本
                origin_html = html_content.prettify()  # String
                # 转义网页中的图片标签
                str_html = self.transcoding_tags(origin_html)
                # 再包装成
                temp_soup = BeautifulSoup(str_html, 'html.parser')
                # 反转译文件中的插图
                str_html = self.translate_tags(temp_soup.text)
                # 绑定更新内容
                content = self.clean_content(str_html)
                # 下载图片
                imgs = []
                img_array = soup.find_all('div', 'news-article--image-item')
                for item in img_array:
                    img_url = self.root_url + item.find('img').get('data-src')
                    imgs.append(img_url)
                if len(imgs) != 0:
                    # 下载图片
                    illustrations = self.download_images(imgs, card_id)
                # 连接 MongoDB 数据库服务器
                client = MongoClient('mongodb://localhost:27017/')
                # 创建或选择数据库
                db = client['mit-news']
                # 创建或选择集合
                cards_collection = db['cards']
                # 插入示例数据到 catalogues 集合
                card_data = {
                    'id': card_id,
                    'catalogueId': catalogue_id,
                    'type': 'mit-news',
                    'date': date,
                    'title': card_title,
                    'author': author,
                    'card_introduction': card_introduction,
                    'updatetime': updateTime,
                    'url': url,
                    'html_content': str(html_content),
                    'content': content,
                    'illustrations': illustrations,
                }
                cards_collection.insert_one(card_data)
            return True
        else:
            raise Exception(f'Failed to fetch page {page}. Status code: {response.status_code}')

    # 下载图片
    def download_images(self, img_urls, card_id):
        # 根据card_id创建一个新的子目录
        images_dir = os.path.join(self.img_output_dir, card_id)
        if not os.path.exists(images_dir):
            os.makedirs(images_dir)
            downloaded_images = []
            for index, img_url in enumerate(img_urls):
                try:
                    response = requests.get(img_url, stream=True, headers=self.headers)
                    if response.status_code == 200:
                        # 从URL中提取图片文件名
                        img_name_with_extension = img_url.split('/')[-1]
                        pattern = r'^[^?]*'
                        match = re.search(pattern, img_name_with_extension)
                        img_name = match.group(0)

                        # 保存图片
                        with open(os.path.join(images_dir, img_name), 'wb') as f:
                            f.write(response.content)
                        downloaded_images.append([img_url, os.path.join(images_dir, img_name)])
                except requests.exceptions.RequestException as e:
                    print(f'请求图片时发生错误{e}')
                except Exception as e:
                    print(f'保存图片时发生错误{e}')
            return downloaded_images
        # 如果文件夹存在则跳过
        else:
            print(f'文章id为{card_id}的图片文件夹已经存在')
            return []

    # 工具 转义标签
    def transcoding_tags(self, htmlstr):
        re_img = re.compile(r'\s*<(img.*?)>\s*', re.M)
        s = re_img.sub(r'\n ##\1## \n', htmlstr)  # IMG 转义
        return s

    # 工具 转义标签
    def translate_tags(self, htmlstr):
        re_img = re.compile(r'##(img.*?)##', re.M)
        s = re_img.sub(r'<\1>', htmlstr)  # IMG 转义
        return s

    # 清洗文章
    def clean_content(self, content):
        if content is not None:
            content = re.sub(r'\r', r'\n', content)
            content = re.sub(r'\n{2,}', '', content)
            content = re.sub(r' {6,}', '', content)
            content = re.sub(r' {3,}\n', '', content)
            content = re.sub(r'<img src="../../../image/zxbl.gif"/>', '', content)
            content = content.replace(
                '<img border="0" src="****处理标记[Article]时 字段 [SnapUrl] 在数据源中没有找到! ****"/> ', '')
            content = content.replace(
                ' <!--/enpcontent<INPUT type=checkbox value=0 name=titlecheckbox sourceid="<Source>SourcePh" style="display:none">',
                '') \
                .replace(' <!--enpcontent', '').replace('<TABLE>', '')
            content = content.replace('<P>', '').replace('<\P>', '').replace('&nbsp;', ' ')
        return content


def run():
    root_url = 'https://news.mit.edu/'
    model_urls = ['https://news.mit.edu/topic', 'https://news.mit.edu/clp',
                  'https://news.mit.edu/department', 'https://news.mit.edu/']
    output_dir = r'D:\imgs\mit-news'

    for model_url in model_urls:
        scraper = MitnewsScraper(root_url, model_url, output_dir)
        scraper.catalogue_all_pages()


if __name__ == '__main__':
    run()
```

## 六、效果展示